use std::sync::mpsc::Sender;
use std::time::Duration;
use serde::Deserialize;
use crate::pipeline::LocalPipeline;
use crate::tui::app::{DashEvent, LogEntry};
#[derive(Deserialize)]
struct StatsResponse {
ingested_total: u64,
cache_hit_rate: f64,
detector_calibrated: bool,
#[serde(default)]
anomalies_total: u64,
}
pub fn http_poll_loop(server_url: String, tx: Sender<DashEvent>) {
let stats_url = format!("{server_url}/stats");
let mut prev_ingested = 0u64;
let mut prev_anomalies = 0u64;
loop {
let stats: Option<StatsResponse> = ureq::get(&stats_url)
.timeout(Duration::from_secs(1))
.call()
.ok()
.and_then(|r| r.into_json().ok());
match stats {
Some(stats) => {
let delta_ingested = stats.ingested_total.saturating_sub(prev_ingested);
let delta_anomalies = stats.anomalies_total.saturating_sub(prev_anomalies);
let anomaly_rate = if delta_ingested > 0 {
delta_anomalies as f64 / delta_ingested as f64
} else {
0.0
};
prev_ingested = stats.ingested_total;
prev_anomalies = stats.anomalies_total;
if tx
.send(DashEvent::StatsUpdate {
ingested_total: stats.ingested_total,
cache_hit_rate: stats.cache_hit_rate,
anomaly_rate,
detector_calibrated: stats.detector_calibrated,
})
.is_err()
{
break;
}
}
None => {
if tx
.send(DashEvent::ConnError(format!(
"Cannot reach {server_url}/stats — is the server running?"
)))
.is_err()
{
break;
}
}
}
std::thread::sleep(Duration::from_millis(500));
}
}
pub fn standalone_loop(mut pipeline: LocalPipeline, tx: Sender<DashEvent>) {
use std::io::{BufRead, BufReader};
let stdin = std::io::stdin();
let reader = BufReader::new(stdin.lock());
let mut total: u64 = 0;
let mut anomalies: u64 = 0;
for line in reader.lines() {
let line = match line {
Ok(l) => l,
Err(_) => break,
};
if line.is_empty() {
continue;
}
if let Ok(result) = pipeline.process(&line) {
total += 1;
if result.is_anomaly {
anomalies += 1;
}
let anomaly_rate = if total > 0 {
anomalies as f64 / total as f64
} else {
0.0
};
if tx
.send(DashEvent::LogLine(LogEntry {
text: line,
is_anomaly: result.is_anomaly,
score: result.score,
}))
.is_err()
{
break;
}
if tx
.send(DashEvent::StatsUpdate {
ingested_total: total,
cache_hit_rate: 0.0, anomaly_rate,
detector_calibrated: pipeline.calibrated(),
})
.is_err()
{
break;
}
}
}
}