use crate::decision_trace::{read_decisions_from_msgpack, sampling, DecisionTrace};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant, SystemTime};
/// Settings for ingesting Depyler decision traces from msgpack files.
///
/// Every field is optional when deserializing: thanks to the container-level
/// `#[serde(default)]`, a missing field takes the value produced by
/// [`DepylerIngestConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DepylerIngestConfig {
    /// Files polled for new decisions; paths that do not exist are skipped.
    pub watch_paths: Vec<PathBuf>,
    /// Delay between polls, in milliseconds (exposed as a `Duration` by
    /// `DepylerWatcher::poll_interval`).
    pub poll_interval_ms: u64,
    /// Rate passed to `sampling::should_sample_trace` by `DepylerWatcher::poll_sampled`
    /// (presumably a probability in `[0.0, 1.0]` — TODO confirm).
    pub remote_sample_rate: f64,
    /// Upper bound on decisions exported per one-second window by
    /// `DepylerWatcher::poll_with_circuit_breaker`.
    pub max_remote_rate: u64,
}
/// Default for `DepylerIngestConfig::watch_paths`: one fixed file under `/tmp`.
fn default_watch_paths() -> Vec<PathBuf> {
    let decisions_file = PathBuf::from("/tmp/depyler_decisions.msgpack");
    vec![decisions_file]
}
/// Default for `DepylerIngestConfig::poll_interval_ms`: 100 milliseconds.
fn default_poll_interval() -> u64 {
    const POLL_INTERVAL_MS: u64 = 100;
    POLL_INTERVAL_MS
}
/// Default for `DepylerIngestConfig::remote_sample_rate`: 0.1.
fn default_sample_rate() -> f64 {
    const SAMPLE_RATE: f64 = 0.1;
    SAMPLE_RATE
}
/// Default for `DepylerIngestConfig::max_remote_rate`: 1000 decisions per window.
fn default_max_rate() -> u64 {
    const MAX_DECISIONS_PER_WINDOW: u64 = 1_000;
    MAX_DECISIONS_PER_WINDOW
}
impl Default for DepylerIngestConfig {
    /// Assembles a configuration from the `default_*` helper functions.
    fn default() -> Self {
        let watch_paths = default_watch_paths();
        let poll_interval_ms = default_poll_interval();
        let remote_sample_rate = default_sample_rate();
        let max_remote_rate = default_max_rate();
        Self { watch_paths, poll_interval_ms, remote_sample_rate, max_remote_rate }
    }
}
/// Running counters kept by `DepylerWatcher` across all polls.
///
/// `PartialEq`/`Eq` let callers and tests compare whole snapshots directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IngestStats {
    /// Decisions returned by `DepylerWatcher::poll` (including polls made on
    /// behalf of the sampled and circuit-breaker variants).
    pub total_decisions_seen: u64,
    /// Decisions kept by `DepylerWatcher::poll_sampled`.
    pub total_decisions_sampled: u64,
    /// Decisions returned by `DepylerWatcher::poll_with_circuit_breaker`.
    pub total_decisions_exported: u64,
    /// Polls in which `DepylerWatcher::poll_with_circuit_breaker` recorded a trip.
    pub circuit_breaker_trips: u64,
}
/// Progress remembered for a single watched file between polls.
#[derive(Default, Debug)]
struct FileState {
    /// Modification time observed at the last read, or `None` if the platform
    /// did not report one (or the file has not been read yet).
    last_mtime: Option<SystemTime>,
    /// Number of decisions the file held at the last read.
    last_count: usize,
}
/// Polls the configured msgpack files and hands out decisions not yet reported.
pub struct DepylerWatcher {
    /// Settings supplied at construction.
    config: DepylerIngestConfig,
    /// Per-path read progress, used to return only newly appended decisions.
    file_states: HashMap<PathBuf, FileState>,
    /// Cumulative counters exposed through `stats()`.
    stats: IngestStats,
    /// Moment the most recent poll finished; `None` until the first poll.
    last_poll: Option<Instant>,
    /// Decisions exported by the circuit breaker in the current window.
    decisions_this_second: u64,
    /// Start of the circuit breaker's current one-second window.
    second_window_start: Instant,
}
impl DepylerWatcher {
pub fn new(config: DepylerIngestConfig) -> Result<Self, String> {
Ok(Self {
config,
file_states: HashMap::new(),
stats: IngestStats::default(),
last_poll: None,
decisions_this_second: 0,
second_window_start: Instant::now(),
})
}
pub fn poll(&mut self) -> Result<Vec<DecisionTrace>, String> {
let mut all_new_decisions = Vec::new();
for path in &self.config.watch_paths.clone() {
let new_decisions = self.poll_file(path)?;
all_new_decisions.extend(new_decisions);
}
self.stats.total_decisions_seen += all_new_decisions.len() as u64;
self.last_poll = Some(Instant::now());
Ok(all_new_decisions)
}
fn poll_file(&mut self, path: &PathBuf) -> Result<Vec<DecisionTrace>, String> {
if !path.exists() {
return Ok(Vec::new());
}
let metadata = std::fs::metadata(path)
.map_err(|e| format!("Failed to get metadata for {path:?}: {e}"))?;
let mtime = metadata.modified().ok();
let state = self.file_states.entry(path.clone()).or_default();
if state.last_mtime == mtime {
return Ok(Vec::new());
}
let all_decisions = read_decisions_from_msgpack(path)?;
let new_decisions = if all_decisions.len() > state.last_count {
all_decisions[state.last_count..].to_vec()
} else {
all_decisions.clone()
};
state.last_mtime = mtime;
state.last_count = all_decisions.len();
Ok(new_decisions)
}
pub fn poll_sampled(&mut self) -> Result<Vec<DecisionTrace>, String> {
let all_decisions = self.poll()?;
let sample_rate = self.config.remote_sample_rate;
let sampled: Vec<DecisionTrace> = all_decisions
.into_iter()
.filter(|_| sampling::should_sample_trace(sample_rate))
.collect();
self.stats.total_decisions_sampled += sampled.len() as u64;
Ok(sampled)
}
pub fn poll_with_circuit_breaker(&mut self) -> Result<Vec<DecisionTrace>, String> {
let all_decisions = self.poll()?;
if self.second_window_start.elapsed() >= Duration::from_secs(1) {
self.decisions_this_second = 0;
self.second_window_start = Instant::now();
}
let remaining_quota =
self.config.max_remote_rate.saturating_sub(self.decisions_this_second);
let exported: Vec<DecisionTrace> =
all_decisions.into_iter().take(remaining_quota as usize).collect();
if exported.len() < remaining_quota as usize {
} else if remaining_quota == 0 {
self.stats.circuit_breaker_trips += 1;
}
self.decisions_this_second += exported.len() as u64;
self.stats.total_decisions_exported += exported.len() as u64;
Ok(exported)
}
pub fn stats(&self) -> &IngestStats {
&self.stats
}
pub fn poll_interval(&self) -> Duration {
Duration::from_millis(self.config.poll_interval_ms)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A path that is expected not to exist on the test machine.
    const MISSING_PATH: &str = "/nonexistent/path.msgpack";

    /// Config watching only [`MISSING_PATH`], every other field at its default.
    fn missing_file_config() -> DepylerIngestConfig {
        DepylerIngestConfig { watch_paths: vec![PathBuf::from(MISSING_PATH)], ..Default::default() }
    }

    /// Builds a watcher, failing the test if construction errors.
    fn watcher_from(config: DepylerIngestConfig) -> DepylerWatcher {
        DepylerWatcher::new(config).expect("test")
    }

    /// Asserts that every counter in `stats` is zero.
    fn assert_all_zero(stats: &IngestStats) {
        assert_eq!(stats.total_decisions_seen, 0);
        assert_eq!(stats.total_decisions_sampled, 0);
        assert_eq!(stats.total_decisions_exported, 0);
        assert_eq!(stats.circuit_breaker_trips, 0);
    }

    #[test]
    fn test_config_default() {
        let DepylerIngestConfig { watch_paths, poll_interval_ms, remote_sample_rate, max_remote_rate } =
            DepylerIngestConfig::default();
        assert_eq!(poll_interval_ms, 100);
        assert!((remote_sample_rate - 0.1).abs() < f64::EPSILON);
        assert_eq!(max_remote_rate, 1000);
        assert_eq!(watch_paths.len(), 1);
    }

    #[test]
    fn test_config_default_functions() {
        assert_eq!(default_poll_interval(), 100);
        assert!((default_sample_rate() - 0.1).abs() < f64::EPSILON);
        assert_eq!(default_max_rate(), 1000);
        assert_eq!(default_watch_paths().len(), 1);
    }

    #[test]
    fn test_watcher_creation() {
        assert!(DepylerWatcher::new(DepylerIngestConfig::default()).is_ok());
    }

    #[test]
    fn test_watcher_poll_nonexistent_file() {
        let mut watcher = watcher_from(missing_file_config());
        let decisions = watcher.poll().expect("test");
        assert!(decisions.is_empty());
    }

    #[test]
    fn test_watcher_stats() {
        let watcher = watcher_from(DepylerIngestConfig::default());
        assert_all_zero(watcher.stats());
    }

    #[test]
    fn test_watcher_poll_interval() {
        let watcher =
            watcher_from(DepylerIngestConfig { poll_interval_ms: 500, ..Default::default() });
        assert_eq!(watcher.poll_interval(), Duration::from_millis(500));
    }

    #[test]
    fn test_ingest_stats_default() {
        assert_all_zero(&IngestStats::default());
    }

    #[test]
    fn test_ingest_stats_clone() {
        let original = IngestStats {
            total_decisions_seen: 100,
            total_decisions_sampled: 50,
            total_decisions_exported: 25,
            circuit_breaker_trips: 2,
        };
        let copy = original.clone();
        assert_eq!((copy.total_decisions_seen, copy.total_decisions_sampled), (100, 50));
    }

    #[test]
    fn test_config_clone() {
        let config = DepylerIngestConfig::default();
        assert_eq!(config.clone().poll_interval_ms, config.poll_interval_ms);
    }

    #[test]
    fn test_config_debug() {
        let config = DepylerIngestConfig::default();
        assert!(format!("{config:?}").contains("DepylerIngestConfig"));
    }

    #[test]
    fn test_ingest_stats_debug() {
        let stats = IngestStats::default();
        assert!(format!("{stats:?}").contains("IngestStats"));
    }

    #[test]
    fn test_file_state_default() {
        let FileState { last_mtime, last_count } = FileState::default();
        assert!(last_mtime.is_none());
        assert_eq!(last_count, 0);
    }

    #[test]
    fn test_watcher_poll_sampled_nonexistent() {
        let config = DepylerIngestConfig { remote_sample_rate: 1.0, ..missing_file_config() };
        let mut watcher = watcher_from(config);
        let sampled = watcher.poll_sampled().expect("test");
        assert!(sampled.is_empty());
    }

    #[test]
    fn test_watcher_poll_with_circuit_breaker_nonexistent() {
        let config = DepylerIngestConfig { max_remote_rate: 100, ..missing_file_config() };
        let mut watcher = watcher_from(config);
        let exported = watcher.poll_with_circuit_breaker().expect("test");
        assert!(exported.is_empty());
    }

    #[test]
    fn test_watcher_empty_watch_paths() {
        let mut watcher =
            watcher_from(DepylerIngestConfig { watch_paths: vec![], ..Default::default() });
        let decisions = watcher.poll().expect("test");
        assert!(decisions.is_empty());
    }

    #[test]
    fn test_watcher_multiple_polls() {
        let mut watcher = watcher_from(missing_file_config());
        for attempt in 0..5 {
            assert!(watcher.poll().is_ok(), "poll #{attempt} failed");
        }
    }

    #[test]
    fn test_watcher_circuit_breaker_stats() {
        let config = DepylerIngestConfig { max_remote_rate: 0, ..missing_file_config() };
        let mut watcher = watcher_from(config);
        let _ = watcher.poll_with_circuit_breaker();
        assert_eq!(watcher.stats().total_decisions_exported, 0);
    }
}