use std::collections::VecDeque;
use std::sync::Arc;
use futures::StreamExt;
use net_sdk::dataforts::{BlobAdapter, BlobInventoryEntry, BlobListOptions, MeshBlobAdapter};
use net_sdk::deck::{AdminAuditRecord, DeckClient, FailureRecord, LogFilter, LogRecord};
use parking_lot::Mutex;
/// One recorded NRPC call, as stored in an `NrpcTail`.
///
/// NOTE(review): field meanings below are inferred from names and types only;
/// the producer of these records is not visible in this part of the file.
// NOTE(review): `dead_code` is presumably allowed because nothing visible here
// constructs or reads this type yet — confirm, and drop the attribute once wired up.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct NrpcCall {
    /// Timestamp in milliseconds (epoch/reference point not shown here — confirm).
    pub ts_ms: u64,
    /// Numeric identifier of the calling side.
    pub caller: u64,
    /// Numeric identifier of the called side.
    pub callee: u64,
    /// Name of the invoked method.
    pub method: String,
    /// Call latency in milliseconds.
    pub latency_ms: u32,
    /// Outcome of the call.
    pub status: NrpcStatus,
    /// Size of the request in bytes.
    pub request_bytes: u32,
    /// Size of the response in bytes.
    pub response_bytes: u32,
}
/// Outcome of an NRPC call.
// NOTE(review): `dead_code` is presumably allowed because not every variant is
// constructed yet — confirm.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum NrpcStatus {
    /// The call succeeded.
    Ok,
    /// The call has not finished yet.
    InFlight,
    /// The call failed; the payload is a text description of the failure.
    Error(String),
    /// The call timed out.
    Timeout,
}
/// Capacity constant for the NRPC call tail.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the
/// value callers pass to `NrpcTail::new` — confirm.
pub const NRPC_TAIL_CAP: usize = 5_000;
/// Shared, bounded buffer of recent [`NrpcCall`] records.
///
/// Cloning the handle clones the `Arc`, so all clones observe the same deque.
// NOTE(review): `dead_code` is presumably allowed because no producer is wired up
// in the visible code — confirm.
#[allow(dead_code)]
#[derive(Clone)]
pub struct NrpcTail {
    /// Retained calls; `push` appends at the back and evicts from the front.
    pub records: Arc<Mutex<VecDeque<NrpcCall>>>,
    /// Retention limit consulted by `push`.
    pub cap: usize,
}
// NOTE(review): `dead_code` is presumably allowed because not every method is
// called yet — confirm.
#[allow(dead_code)]
impl NrpcTail {
    /// Creates an empty tail that retains at most `cap` calls.
    ///
    /// Only `min(cap, 1024)` slots are reserved up front; the deque grows on
    /// demand beyond that.
    pub fn new(cap: usize) -> Self {
        Self {
            records: Arc::new(Mutex::new(VecDeque::with_capacity(cap.min(1024)))),
            cap,
        }
    }

    /// Returns a copy of every retained call, oldest first.
    pub fn snapshot(&self) -> Vec<NrpcCall> {
        let g = self.records.lock();
        g.iter().cloned().collect()
    }

    /// Returns a copy of the newest `n` retained calls, oldest first.
    ///
    /// Yields fewer than `n` entries when fewer are retained, and an empty
    /// vector when `n` is zero.
    pub fn snapshot_tail(&self, n: usize) -> Vec<NrpcCall> {
        if n == 0 {
            return Vec::new();
        }
        let g = self.records.lock();
        let start = g.len().saturating_sub(n);
        g.iter().skip(start).cloned().collect()
    }

    /// Appends `call`, evicting the oldest entries so that at most `cap` remain.
    ///
    /// A tail with `cap == 0` retains nothing and the call is dropped.
    pub fn push(&self, call: NrpcCall) {
        // Guard first: with `cap == 0` an `==` check would let the buffer grow
        // without bound, and the eviction loop below would never terminate.
        if self.cap == 0 {
            return;
        }
        let mut g = self.records.lock();
        // `>=` rather than `==`: `records` and `cap` are public, so the deque may
        // already hold more than `cap` entries; shrink back under the limit.
        while g.len() >= self.cap {
            g.pop_front();
        }
        g.push_back(call);
    }
}
/// Capacity constant for the log tail.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the
/// value callers pass to `LogsTail::new` — confirm.
pub const LOGS_TAIL_CAP: usize = 5_000;
/// Capacity constant for the admin-audit tail.
///
/// NOTE(review): presumably the value callers pass to `AuditTail::new` — confirm.
pub const AUDIT_TAIL_CAP: usize = 2_000;
/// Capacity constant for the failures tail.
///
/// NOTE(review): presumably the value callers pass to `FailuresTail::new` — confirm.
pub const FAILURES_TAIL_CAP: usize = 2_000;
/// Shared, bounded buffer of recent [`LogRecord`]s.
///
/// Cloning the handle clones the `Arc`, so all clones observe the same deque.
#[derive(Clone)]
pub struct LogsTail {
    /// Retained records; `push` appends at the back and evicts from the front.
    pub records: Arc<Mutex<VecDeque<LogRecord>>>,
    /// Retention limit consulted by `push`.
    pub cap: usize,
}
impl LogsTail {
    /// Creates an empty tail that retains at most `cap` log records.
    ///
    /// Only `min(cap, 1024)` slots are reserved up front; the deque grows on
    /// demand beyond that.
    pub fn new(cap: usize) -> Self {
        Self {
            records: Arc::new(Mutex::new(VecDeque::with_capacity(cap.min(1024)))),
            cap,
        }
    }

    /// Returns a copy of every retained record, oldest first.
    pub fn snapshot(&self) -> Vec<LogRecord> {
        let g = self.records.lock();
        g.iter().cloned().collect()
    }

    /// Appends `record`, evicting the oldest entries so that at most `cap` remain.
    ///
    /// A tail with `cap == 0` retains nothing and the record is dropped.
    pub fn push(&self, record: LogRecord) {
        // Guard first: with `cap == 0` an `==` check would let the buffer grow
        // without bound, and the eviction loop below would never terminate.
        if self.cap == 0 {
            return;
        }
        let mut g = self.records.lock();
        // `>=` rather than `==`: `records` and `cap` are public, so the deque may
        // already hold more than `cap` entries; shrink back under the limit.
        while g.len() >= self.cap {
            g.pop_front();
        }
        g.push_back(record);
    }
}
/// Spawns a Tokio task that pumps the deck's log subscription into `tail`.
///
/// The task subscribes with `LogFilter::new()` and pushes every `Ok` record.
/// On an `Err` item it discards the error, waits 50 ms and keeps reading.
/// The task finishes when the stream yields `None`; it does not resubscribe.
pub fn spawn_logs_stream(deck: Arc<DeckClient>, tail: LogsTail) -> tokio::task::JoinHandle<()> {
    let pump = async move {
        let mut logs = deck.subscribe_logs(LogFilter::new());
        loop {
            match logs.next().await {
                Some(Ok(record)) => tail.push(record),
                // Brief pause so a stream that keeps erroring does not spin the task.
                Some(Err(_)) => tokio::time::sleep(std::time::Duration::from_millis(50)).await,
                None => break,
            }
        }
    };
    tokio::spawn(pump)
}
/// Shared, bounded buffer of recent [`AdminAuditRecord`]s.
///
/// Cloning the handle clones the `Arc`, so all clones observe the same deque.
#[derive(Clone)]
pub struct AuditTail {
    /// Retained records; `push` appends at the back and evicts from the front.
    pub records: Arc<Mutex<VecDeque<AdminAuditRecord>>>,
    /// Retention limit consulted by `push`.
    pub cap: usize,
}
impl AuditTail {
    /// Creates an empty tail that retains at most `cap` audit records.
    ///
    /// Only `min(cap, 512)` slots are reserved up front; the deque grows on
    /// demand beyond that.
    pub fn new(cap: usize) -> Self {
        Self {
            records: Arc::new(Mutex::new(VecDeque::with_capacity(cap.min(512)))),
            cap,
        }
    }

    /// Returns a copy of every retained record, oldest first.
    pub fn snapshot(&self) -> Vec<AdminAuditRecord> {
        let g = self.records.lock();
        g.iter().cloned().collect()
    }

    /// Appends `record`, evicting the oldest entries so that at most `cap` remain.
    ///
    /// A tail with `cap == 0` retains nothing and the record is dropped.
    pub fn push(&self, record: AdminAuditRecord) {
        // Guard first: with `cap == 0` an `==` check would let the buffer grow
        // without bound, and the eviction loop below would never terminate.
        if self.cap == 0 {
            return;
        }
        let mut g = self.records.lock();
        // `>=` rather than `==`: `records` and `cap` are public, so the deque may
        // already hold more than `cap` entries; shrink back under the limit.
        while g.len() >= self.cap {
            g.pop_front();
        }
        g.push_back(record);
    }
}
/// Spawns a Tokio task that pumps the deck's admin-audit stream into `tail`.
///
/// The task reads `deck.audit().stream()` and pushes every `Ok` record.
/// On an `Err` item it discards the error, waits 50 ms and keeps reading.
/// The task finishes when the stream yields `None`; it does not resubscribe.
pub fn spawn_audit_stream(deck: Arc<DeckClient>, tail: AuditTail) -> tokio::task::JoinHandle<()> {
    let pump = async move {
        let mut audit = deck.audit().stream();
        loop {
            match audit.next().await {
                Some(Ok(record)) => tail.push(record),
                // Brief pause so a stream that keeps erroring does not spin the task.
                Some(Err(_)) => tokio::time::sleep(std::time::Duration::from_millis(50)).await,
                None => break,
            }
        }
    };
    tokio::spawn(pump)
}
/// Shared, bounded buffer of recent [`FailureRecord`]s.
///
/// Cloning the handle clones the `Arc`, so all clones observe the same deque.
#[derive(Clone)]
pub struct FailuresTail {
    /// Retained records; `push` appends at the back and evicts from the front.
    pub records: Arc<Mutex<VecDeque<FailureRecord>>>,
    /// Retention limit consulted by `push`.
    pub cap: usize,
}
impl FailuresTail {
    /// Creates an empty tail that retains at most `cap` failure records.
    ///
    /// Only `min(cap, 512)` slots are reserved up front; the deque grows on
    /// demand beyond that.
    pub fn new(cap: usize) -> Self {
        Self {
            records: Arc::new(Mutex::new(VecDeque::with_capacity(cap.min(512)))),
            cap,
        }
    }

    /// Returns a copy of every retained record, oldest first.
    pub fn snapshot(&self) -> Vec<FailureRecord> {
        let g = self.records.lock();
        g.iter().cloned().collect()
    }

    /// Appends `record`, evicting the oldest entries so that at most `cap` remain.
    ///
    /// A tail with `cap == 0` retains nothing and the record is dropped.
    pub fn push(&self, record: FailureRecord) {
        // Guard first: with `cap == 0` an `==` check would let the buffer grow
        // without bound, and the eviction loop below would never terminate.
        if self.cap == 0 {
            return;
        }
        let mut g = self.records.lock();
        // `>=` rather than `==`: `records` and `cap` are public, so the deque may
        // already hold more than `cap` entries; shrink back under the limit.
        while g.len() >= self.cap {
            g.pop_front();
        }
        g.push_back(record);
    }
}
/// Spawns a Tokio task that pumps the deck's failure subscription into `tail`.
///
/// The task reads `deck.subscribe_failures(0)` and pushes every `Ok` record.
/// On an `Err` item it discards the error, waits 50 ms and keeps reading.
/// The task finishes when the stream yields `None`; it does not resubscribe.
///
/// NOTE(review): the meaning of the `0` argument is not visible here
/// (presumably a starting cursor/sequence) — confirm against `DeckClient`.
pub fn spawn_failures_stream(
    deck: Arc<DeckClient>,
    tail: FailuresTail,
) -> tokio::task::JoinHandle<()> {
    let pump = async move {
        let mut failures = deck.subscribe_failures(0);
        loop {
            match failures.next().await {
                Some(Ok(record)) => tail.push(record),
                // Brief pause so a stream that keeps erroring does not spin the task.
                Some(Err(_)) => tokio::time::sleep(std::time::Duration::from_millis(50)).await,
                None => break,
            }
        }
    };
    tokio::spawn(pump)
}
/// Upper bound on blob inventory entries: `spawn_blobs_poll` passes it as the
/// per-adapter `limit` when listing and also truncates the merged list to it.
pub const BLOBS_TAIL_CAP: usize = 5_000;
/// Shared snapshot of the most recently polled blob inventory.
///
/// Unlike the other tails in this file this is not appended to; the whole
/// vector is swapped out by `replace`. Cloning the handle clones the `Arc`, so
/// all clones observe the same vector.
#[derive(Clone)]
pub struct BlobsTail {
    /// Current inventory entries.
    pub records: Arc<Mutex<Vec<BlobInventoryEntry>>>,
}
impl BlobsTail {
    /// Creates a tail holding an empty inventory.
    pub fn new() -> Self {
        let records = Arc::new(Mutex::new(Vec::new()));
        Self { records }
    }

    /// Returns a copy of the current inventory.
    pub fn snapshot(&self) -> Vec<BlobInventoryEntry> {
        let guard = self.records.lock();
        guard.to_vec()
    }

    /// Swaps the stored inventory for `entries`, dropping the previous contents.
    fn replace(&self, entries: Vec<BlobInventoryEntry>) {
        let mut guard = self.records.lock();
        *guard = entries;
    }
}
impl Default for BlobsTail {
fn default() -> Self {
Self::new()
}
}
/// Spawns a Tokio task that periodically lists every adapter and publishes the
/// merged inventory into `tail`.
///
/// On every tick each adapter is listed with `limit = BLOBS_TAIL_CAP`. The
/// results are concatenated in adapter order, truncated to `BLOBS_TAIL_CAP`,
/// and stored with `tail.replace`. Entries from an adapter whose listing failed
/// are simply absent from that round's inventory.
///
/// Adapter health is reported on `toast_tx`:
/// * a listing error is sent when it is the adapter's first recorded result or
///   when the error text differs from the adapter's previous error;
/// * a "recovered" message is sent when a listing succeeds after an error.
///
/// Failures to send a toast are ignored. The loop has no exit condition; the
/// task runs until it is aborted or the runtime shuts down.
///
/// A zero `poll_interval` is clamped to 1 ms, because `tokio::time::interval`
/// panics on a zero period and that panic would silently kill the poller.
pub fn spawn_blobs_poll(
    adapters: Vec<Arc<MeshBlobAdapter>>,
    tail: BlobsTail,
    poll_interval: std::time::Duration,
    toast_tx: std::sync::mpsc::Sender<String>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let period = poll_interval.max(std::time::Duration::from_millis(1));
        let mut ticker = tokio::time::interval(period);
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        // Per adapter id: `None` after a successful listing, `Some(msg)` after a
        // failed one. Used to de-duplicate toasts across polls.
        let mut last_err: std::collections::HashMap<String, Option<String>> =
            std::collections::HashMap::new();

        // The listing options never change, so build them once.
        let opts = BlobListOptions {
            prefix_hex: None,
            limit: BLOBS_TAIL_CAP,
        };

        loop {
            ticker.tick().await;

            let mut merged: Vec<BlobInventoryEntry> = Vec::new();
            for adapter in &adapters {
                let id = adapter.adapter_id().to_string();
                match adapter.list(&opts).await {
                    Ok(entries) => {
                        merged.extend(entries);
                        let was_failing = matches!(last_err.get(&id), Some(Some(_)));
                        if was_failing {
                            let _ = toast_tx.send(format!("BLOBS poll: adapter {id} recovered"));
                        }
                        last_err.insert(id, None);
                    }
                    Err(err) => {
                        let msg = err.to_string();
                        // Only toast when the error is new for this adapter.
                        let is_repeat =
                            matches!(last_err.get(&id), Some(Some(prev)) if *prev == msg);
                        if !is_repeat {
                            let _ = toast_tx
                                .send(format!("BLOBS poll: adapter {id} list error: {msg}"));
                        }
                        last_err.insert(id, Some(msg));
                    }
                }
            }

            // `truncate` is a no-op when the list is already within the cap.
            merged.truncate(BLOBS_TAIL_CAP);
            tail.replace(merged);
        }
    })
}
/// Bundle of every tail handle defined in this file.
pub struct Tails {
    /// Recent log records.
    pub logs: LogsTail,
    /// Recent admin-audit records.
    pub audit: AuditTail,
    /// Recent failure records.
    pub failures: FailuresTail,
    /// Latest polled blob inventory.
    pub blobs: BlobsTail,
    /// Recent NRPC calls.
    pub nrpc: NrpcTail,
}