use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::config::Config;
use crate::validator::SchemaValidator;
use crate::AnyResult;
/// Validates incoming JSON records and batches them into per-channel line-oriented files
/// (events, quality metrics, training samples) under `output_dir`.
///
/// `collect_*` calls flush a channel synchronously once its buffer reaches the configured size;
/// optional background workers flush on a timer, and `Drop` hands any leftovers to a background
/// writer thread.
pub struct DataCollector {
/// Schema checker used by the `collect_*` methods when `strict_validation` is on.
validator: Arc<SchemaValidator>,
/// Directory (created in `new`) onto which every channel filename is joined.
output_dir: PathBuf,
/// Only inspected for presence: auto-generated samples record `model_topology` as
/// `"present"` when this is `Some` and `"null"` otherwise.
network: Option<Arc<()>>,
// Per-channel record counts at which a `collect_*` call flushes that channel's buffer.
events_buffer_size: usize,
quality_buffer_size: usize,
training_buffer_size: usize,
// Score cut-offs for the grade of auto-generated training samples; `bronze_threshold` is also
// the minimum winner score for `collect_comparison_result` to generate a sample at all.
gold_threshold: f64,
silver_threshold: f64,
bronze_threshold: f64,
/// Whether `collect_comparison_result` derives training samples from comparison results.
auto_generate_samples: bool,
/// When set, records failing `validator.validate_or_error` are counted as rejected and returned as errors.
strict_validation: bool,
/// Written into the `schema_version` field of auto-generated training samples.
schema_version: String,
// Whether records are buffered (and therefore written) for each channel.
events_enabled: bool,
quality_enabled: bool,
training_enabled: bool,
// Output filenames, relative to `output_dir`.
events_filename: String,
quality_filename: String,
training_filename: String,
// Auto-flush periods in seconds; 0 means no background worker for that channel.
events_auto_flush_seconds: u64,
quality_auto_flush_seconds: u64,
training_auto_flush_seconds: u64,
// Pending records per channel, written one per line; shared with the auto-flush workers.
events_buffer: Arc<Mutex<Vec<String>>>,
quality_buffer: Arc<Mutex<Vec<String>>>,
training_buffer: Arc<Mutex<Vec<String>>>,
/// Received/accepted/rejected tallies, overall and per schema type.
stats: Arc<Mutex<CollectorStats>>,
/// Incremented by `collect_engine_execution` (the only method in this file that touches it).
events_received_counter: Arc<AtomicU64>,
/// Bumped by the collector's flush paths after a batch is handed to `atomic_append`.
writes_flushed_counter: Arc<AtomicU64>,
/// Sending side of the channel to the background writer thread; taken and dropped in `Drop`.
write_sender: Option<mpsc::Sender<(PathBuf, Vec<String>)>>,
/// Join handle of the background writer thread spawned in `new`.
writer_handle: Arc<Mutex<Option<std::thread::JoinHandle<()>>>>,
/// Join handles of the auto-flush worker threads spawned by `start_auto_flush`.
worker_handles: Arc<Mutex<Vec<std::thread::JoinHandle<()>>>>,
/// Set in `Drop` to tell the auto-flush workers to stop.
shutdown_flag: Arc<AtomicBool>,
}
/// Running tallies of records seen by a `DataCollector`, overall and per schema type.
#[derive(Debug, Default)]
struct CollectorStats {
/// Every `collect_*` call, counted before validation runs.
total_received: u64,
/// Records that passed validation, or were accepted because strict validation is off.
total_accepted: u64,
/// Records rejected by strict validation.
total_rejected: u64,
/// Per-type breakdown keyed by schema names such as `"engine_execution"`.
by_type: std::collections::HashMap<String, TypeStats>,
}
/// Accepted/rejected counts for a single schema type.
#[derive(Debug, Default, Clone)]
struct TypeStats {
// Records of this type that were accepted.
accepted: u64,
// Records of this type rejected by strict validation.
rejected: u64,
}
impl DataCollector {
pub fn new(
validator: Arc<SchemaValidator>,
output_dir: PathBuf,
config: &Config,
network: Option<Arc<()>>,
) -> AnyResult<Self> {
std::fs::create_dir_all(&output_dir)?;
let channels = &config.json_engine.output.channels;
let training = &config.json_engine.training;
let (tx, rx) = mpsc::channel::<(PathBuf, Vec<String>)>();
let writer_handle = crate::io::spawn_blocking(move || {
while let Ok((path, lines)) = rx.recv() {
let joiner = lines.join("\n") + "\n";
if let Err(e) = crate::config::atomic_append(&path, joiner.as_bytes()) {
eprintln!("{{\"component\":\"collector-writer\",\"path\":\"{}\",\"error\":\"{}\",\"trace_id\":null}}", path.display(), e);
}
}
});
let collector = Self {
validator,
output_dir,
network,
events_buffer_size: channels.events.buffer_size,
quality_buffer_size: channels.quality_metrics.buffer_size,
training_buffer_size: channels.training_samples.buffer_size,
gold_threshold: training.gold_threshold,
silver_threshold: training.silver_threshold,
bronze_threshold: training.bronze_threshold,
auto_generate_samples: training.auto_generate_samples,
strict_validation: config.json_engine.strict_validation,
schema_version: config.json_engine.schema_version.clone(),
events_enabled: channels.events.enabled,
quality_enabled: channels.quality_metrics.enabled,
training_enabled: channels.training_samples.enabled,
events_filename: channels.events.filename.clone(),
quality_filename: channels.quality_metrics.filename.clone(),
training_filename: channels.training_samples.filename.clone(),
events_auto_flush_seconds: channels.events.auto_flush_seconds,
quality_auto_flush_seconds: channels.quality_metrics.auto_flush_seconds,
training_auto_flush_seconds: channels.training_samples.auto_flush_seconds,
events_buffer: Arc::new(Mutex::new(Vec::with_capacity(channels.events.buffer_size))),
quality_buffer: Arc::new(Mutex::new(Vec::with_capacity(
channels.quality_metrics.buffer_size,
))),
training_buffer: Arc::new(Mutex::new(Vec::with_capacity(
channels.training_samples.buffer_size,
))),
stats: Arc::new(Mutex::new(CollectorStats::default())),
events_received_counter: Arc::new(AtomicU64::new(0)),
writes_flushed_counter: Arc::new(AtomicU64::new(0)),
write_sender: Some(tx),
writer_handle: Arc::new(Mutex::new(Some(writer_handle))),
worker_handles: Arc::new(Mutex::new(Vec::new())),
shutdown_flag: Arc::new(AtomicBool::new(false)),
};
collector.start_auto_flush();
Ok(collector)
}
/// Records an `engine_execution` payload.
///
/// The record is counted, and `events_received_counter` is bumped. When `strict_validation` is
/// on, the record is validated; on failure a rejection is counted and the validation error is
/// returned.
///
/// Accepted records are buffered on the events and quality channels when those are enabled. A
/// channel is flushed synchronously once it reaches its buffer size. Flush failures are only
/// logged.
pub fn collect_engine_execution(&self, data: String) -> AnyResult<()> {
    let kind = "engine_execution";
    self.update_stats_received(kind);
    self.events_received_counter.fetch_add(1, Ordering::SeqCst);

    if self.strict_validation {
        match self.validator.validate_or_error(&data, kind) {
            Ok(_) => {}
            Err(rejection) => {
                self.update_stats_rejected(kind);
                return Err(rejection);
            }
        }
    }
    self.update_stats_accepted(kind);

    if self.events_enabled {
        let mut events = self.events_buffer.lock().unwrap();
        events.push(data.clone());
        let reached_limit = events.len() >= self.events_buffer_size;
        if reached_limit {
            if let Err(flush_err) = self.flush_buffer(&mut events, &self.events_filename) {
                eprintln!("[collector] flush events error: {}", flush_err);
            }
        }
    }

    if self.quality_enabled {
        let mut quality = self.quality_buffer.lock().unwrap();
        quality.push(data);
        let reached_limit = quality.len() >= self.quality_buffer_size;
        if reached_limit {
            if let Err(flush_err) = self.flush_buffer(&mut quality, &self.quality_filename) {
                eprintln!("[collector] flush quality error: {}", flush_err);
            }
        }
    }

    Ok(())
}
/// Records a `comparison_result` payload.
///
/// The record is counted. When `strict_validation` is on, it is validated; on failure a
/// rejection is counted and the validation error is returned.
///
/// Accepted records are buffered on the events and quality channels when enabled, and a channel
/// is flushed synchronously once it reaches its buffer size. If `auto_generate_samples` is set
/// and the winner's score is at least `bronze_threshold`, a training sample is also derived.
/// Flush and sample-generation failures are only logged.
pub fn collect_comparison_result(&self, data: String) -> AnyResult<()> {
    let kind = "comparison_result";
    self.update_stats_received(kind);

    if self.strict_validation {
        match self.validator.validate_or_error(&data, kind) {
            Ok(_) => {}
            Err(rejection) => {
                self.update_stats_rejected(kind);
                return Err(rejection);
            }
        }
    }
    self.update_stats_accepted(kind);

    if self.events_enabled {
        let mut events = self.events_buffer.lock().unwrap();
        events.push(data.clone());
        if events.len() >= self.events_buffer_size {
            if let Err(flush_err) = self.flush_buffer(&mut events, &self.events_filename) {
                eprintln!("[collector] flush events error: {}", flush_err);
            }
        }
    }

    if self.quality_enabled {
        let mut quality = self.quality_buffer.lock().unwrap();
        quality.push(data.clone());
        if quality.len() >= self.quality_buffer_size {
            if let Err(flush_err) = self.flush_buffer(&mut quality, &self.quality_filename) {
                eprintln!("[collector] flush quality error: {}", flush_err);
            }
        }
    }

    if !self.auto_generate_samples {
        return Ok(());
    }
    let qualifying = extract_winner_score(&data).filter(|&s| s >= self.bronze_threshold);
    if let Some(score) = qualifying {
        if let Err(gen_err) = self.auto_generate_training_sample(&data, score) {
            eprintln!("[collector] auto-generate training sample error: {}", gen_err);
        }
    }
    Ok(())
}
/// Records an explicit `training_sample` payload.
///
/// The record is counted and validated when `strict_validation` is on; a validation error is
/// returned after counting a rejection. An accepted sample is buffered on the training channel
/// when it is enabled. The channel is flushed synchronously once it reaches its buffer size,
/// and flush failures are only logged.
pub fn collect_training_sample(&self, data: String) -> AnyResult<()> {
    let kind = "training_sample";
    self.update_stats_received(kind);

    if self.strict_validation {
        if let Err(rejection) = self.validator.validate_or_error(&data, kind) {
            self.update_stats_rejected(kind);
            return Err(rejection);
        }
    }
    self.update_stats_accepted(kind);

    if !self.training_enabled {
        return Ok(());
    }
    let mut samples = self.training_buffer.lock().unwrap();
    samples.push(data);
    if samples.len() < self.training_buffer_size {
        return Ok(());
    }
    if let Err(flush_err) = self.flush_buffer(&mut samples, &self.training_filename) {
        eprintln!("[collector] flush training error: {}", flush_err);
    }
    Ok(())
}
/// Derives a graded training sample from a comparison result and buffers it on the training
/// channel, flushing synchronously once the buffer reaches `training_buffer_size`.
///
/// The grade is `"gold"` when `score >= gold_threshold`, `"silver"` when
/// `score >= silver_threshold`, and `"bronze"` otherwise. Nothing is produced when the training
/// channel or `auto_generate_samples` is disabled, or when `score` is not finite (it would be
/// rendered as `inf`/`NaN`, which is not valid JSON). Flush failures are only logged.
fn auto_generate_training_sample(&self, comparison: &str, score: f64) -> AnyResult<()> {
    // The comparison payload is not embedded in the sample yet; keep the parameter for callers.
    let _ = comparison;
    if !self.training_enabled || !self.auto_generate_samples {
        return Ok(());
    }
    if !score.is_finite() {
        eprintln!(
            "[collector] skipping auto-generated training sample: non-finite score {}",
            score
        );
        return Ok(());
    }
    let topology = if self.network.is_some() { "present" } else { "null" };
    let grade = if score >= self.gold_threshold {
        "gold"
    } else if score >= self.silver_threshold {
        "silver"
    } else {
        "bronze"
    };
    let id = gen_id();
    let created = now_ts();
    let sample = format!("{{\"schema_version\":\"{}\",\"sample_id\":\"{}\",\"created_at\":\"{}\",\"model_topology\":\"{}\",\"grade\":\"{}\",\"score\":{}}}", self.schema_version, id, created, topology, grade, score);
    let mut buffer = self.training_buffer.lock().unwrap();
    buffer.push(sample);
    if buffer.len() >= self.training_buffer_size {
        if let Err(e) = self.flush_buffer(&mut buffer, &self.training_filename) {
            eprintln!("[collector] flush training error: {}", e);
        }
    }
    Ok(())
}
fn start_auto_flush(&self) {
let events_interval = self.events_auto_flush_seconds;
let quality_interval = self.quality_auto_flush_seconds;
let training_interval = self.training_auto_flush_seconds;
let events_buf = Arc::clone(&self.events_buffer);
let quality_buf = Arc::clone(&self.quality_buffer);
let training_buf = Arc::clone(&self.training_buffer);
let out_dir_events = self.output_dir.clone();
let out_dir_quality = self.output_dir.clone();
let out_dir_training = self.output_dir.clone();
let events_file = self.events_filename.clone();
let quality_file = self.quality_filename.clone();
let training_file = self.training_filename.clone();
if events_interval > 0 {
let writer_opt = self.write_sender.clone();
let shutdown = Arc::clone(&self.shutdown_flag);
let handles = Arc::clone(&self.worker_handles);
let handle = crate::io::spawn_blocking(move || loop {
if shutdown.load(Ordering::SeqCst) {
break;
}
thread::sleep(Duration::from_secs(events_interval));
if shutdown.load(Ordering::SeqCst) {
break;
}
let mut locked = events_buf.lock().unwrap();
if locked.is_empty() {
continue;
}
let to_write = std::mem::take(&mut *locked);
drop(locked);
let file_path = out_dir_events.join(&events_file);
if let Some(tx) = &writer_opt {
if let Err(err) = tx.send((file_path, to_write)) {
eprintln!("{{\"component\":\"collector\",\"op\":\"send_events\",\"error\":\"{}\",\"trace_id\":null}}", err);
}
} else {
let joiner = to_write.join("\n") + "\n";
if let Err(err) = crate::config::atomic_append(&file_path, joiner.as_bytes()) {
eprintln!("{{\"component\":\"collector\",\"op\":\"write_events\",\"file\":\"{}\",\"error\":\"{}\",\"trace_id\":null}}", file_path.display(), err);
} else {
}
}
});
handles.lock().unwrap().push(handle);
}
if quality_interval > 0 {
let writer_opt = self.write_sender.clone();
let shutdown = Arc::clone(&self.shutdown_flag);
let handles = Arc::clone(&self.worker_handles);
let handle = crate::io::spawn_blocking(move || loop {
if shutdown.load(Ordering::SeqCst) {
break;
}
thread::sleep(Duration::from_secs(quality_interval));
if shutdown.load(Ordering::SeqCst) {
break;
}
let mut locked = quality_buf.lock().unwrap();
if locked.is_empty() {
continue;
}
let to_write = std::mem::take(&mut *locked);
drop(locked);
let file_path = out_dir_quality.join(&quality_file);
if let Some(tx) = &writer_opt {
if let Err(err) = tx.send((file_path, to_write)) {
eprintln!("{{\"component\":\"collector\",\"op\":\"send_quality\",\"error\":\"{}\",\"trace_id\":null}}", err);
}
} else {
let joiner = to_write.join("\n") + "\n";
if let Err(err) = crate::config::atomic_append(&file_path, joiner.as_bytes()) {
eprintln!("{{\"component\":\"collector\",\"op\":\"write_quality\",\"file\":\"{}\",\"error\":\"{}\",\"trace_id\":null}}", file_path.display(), err);
}
}
});
handles.lock().unwrap().push(handle);
}
if training_interval > 0 {
let writer_opt = self.write_sender.clone();
let shutdown = Arc::clone(&self.shutdown_flag);
let handles = Arc::clone(&self.worker_handles);
let handle = crate::io::spawn_blocking(move || loop {
if shutdown.load(Ordering::SeqCst) {
break;
}
thread::sleep(Duration::from_secs(training_interval));
if shutdown.load(Ordering::SeqCst) {
break;
}
let mut locked = training_buf.lock().unwrap();
if locked.is_empty() {
continue;
}
let to_write = std::mem::take(&mut *locked);
drop(locked);
let file_path = out_dir_training.join(&training_file);
if let Some(tx) = &writer_opt {
if let Err(err) = tx.send((file_path, to_write)) {
eprintln!("{{\"component\":\"collector\",\"op\":\"send_training\",\"error\":\"{}\",\"trace_id\":null}}", err);
}
} else {
let joiner = to_write.join("\n") + "\n";
if let Err(err) = crate::config::atomic_append(&file_path, joiner.as_bytes()) {
eprintln!("{{\"component\":\"collector\",\"op\":\"write_training\",\"file\":\"{}\",\"error\":\"{}\",\"trace_id\":null}}", file_path.display(), err);
}
}
});
handles.lock().unwrap().push(handle);
}
}
pub fn flush_all(&self) -> AnyResult<()> {
let events_vec = std::mem::take(&mut *self.events_buffer.lock().unwrap());
if !events_vec.is_empty() {
let path = self.output_dir.join(&self.events_filename);
let joiner = events_vec.join("\n") + "\n";
crate::config::atomic_append(&path, joiner.as_bytes())?;
self.writes_flushed_counter.fetch_add(1, Ordering::SeqCst);
}
let quality_vec = std::mem::take(&mut *self.quality_buffer.lock().unwrap());
if !quality_vec.is_empty() {
let path = self.output_dir.join(&self.quality_filename);
let joiner = quality_vec.join("\n") + "\n";
crate::config::atomic_append(&path, joiner.as_bytes())?;
self.writes_flushed_counter.fetch_add(1, Ordering::SeqCst);
}
let training_vec = std::mem::take(&mut *self.training_buffer.lock().unwrap());
if !training_vec.is_empty() {
let path = self.output_dir.join(&self.training_filename);
let joiner = training_vec.join("\n") + "\n";
crate::config::atomic_append(&path, joiner.as_bytes())?;
self.writes_flushed_counter.fetch_add(1, Ordering::SeqCst);
}
Ok(())
}
/// Appends every line in `buffer` to `output_dir/filename` as newline-terminated records.
///
/// Every caller in this file passes the channel's locked mutex guard, so nothing else can push
/// to `buffer` while this runs. On success, the buffer is cleared (keeping its preallocated
/// capacity) and `writes_flushed_counter` is bumped.
///
/// On failure, the error is logged and returned, and the lines stay in `buffer`. This lets a
/// later flush retry them instead of silently discarding them. If the destination keeps
/// failing, the buffer keeps growing.
///
/// # Errors
/// Returns the error from `crate::config::atomic_append`.
fn flush_buffer(&self, buffer: &mut Vec<String>, filename: &str) -> AnyResult<()> {
    if buffer.is_empty() {
        return Ok(());
    }
    let file_path = self.output_dir.join(filename);
    let joiner = buffer.join("\n") + "\n";
    if let Err(err) = crate::config::atomic_append(&file_path, joiner.as_bytes()) {
        eprintln!("{{\"component\":\"collector\",\"file\":\"{}\",\"error\":\"{}\",\"trace_id\":null}}", file_path.display(), err);
        return Err(err.into());
    }
    buffer.clear();
    self.writes_flushed_counter.fetch_add(1, Ordering::SeqCst);
    Ok(())
}
pub fn get_stats(&self) -> String {
let stats = self.stats.lock().unwrap();
format!(
"total_received={} total_accepted={} total_rejected={}",
stats.total_received, stats.total_accepted, stats.total_rejected
)
}
fn update_stats_received(&self, schema_type: &str) {
let mut stats = self.stats.lock().unwrap();
stats.total_received += 1;
stats.by_type.entry(schema_type.to_string()).or_default();
}
fn update_stats_accepted(&self, schema_type: &str) {
let mut stats = self.stats.lock().unwrap();
stats.total_accepted += 1;
stats
.by_type
.entry(schema_type.to_string())
.or_default()
.accepted += 1;
}
fn update_stats_rejected(&self, schema_type: &str) {
let mut stats = self.stats.lock().unwrap();
stats.total_rejected += 1;
stats
.by_type
.entry(schema_type.to_string())
.or_default()
.rejected += 1;
}
}
/// Returns a process-unique sample id of the form `id-<unix millis>-<sequence>`.
///
/// A per-process sequence number is appended because two calls within the same millisecond
/// would otherwise produce the same id. The timestamp falls back to 0 if the system clock reads
/// earlier than the Unix epoch.
fn gen_id() -> String {
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);
    let seq = SEQUENCE.fetch_add(1, Ordering::SeqCst);
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis())
        .unwrap_or(0);
    format!("id-{}-{}", millis, seq)
}
/// Current Unix time in whole seconds as a decimal string, or `"0"` if the clock reads earlier
/// than the Unix epoch.
fn now_ts() -> String {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs().to_string())
        .unwrap_or_else(|_| String::from("0"))
}
/// Heuristically pulls the winner's score out of a comparison-result JSON string.
///
/// The scan proceeds in steps:
/// 1. Find the first `"score"` key that appears after the first `"winner"` key.
/// 2. Skip the colon and any following whitespace, plus an optional opening quote, so a quoted
///    `"0.9"` is still accepted.
/// 3. Parse the run of JSON-number characters (digits, `.`, `-`, `+`, `e`, `E`) that follows.
///
/// Returns `None` if any step fails, if the value is not numeric (e.g. `null`), or if the
/// parsed value is not finite. This is a string scan, not a JSON parser: the `"score"` found is
/// not guaranteed to belong to the `"winner"` object.
fn extract_winner_score(s: &str) -> Option<f64> {
    const SCORE_KEY: &str = "\"score\"";
    let winner_at = s.find("\"winner\"")?;
    let after_winner = &s[winner_at..];
    let score_at = after_winner.find(SCORE_KEY)?;
    let after_key = &after_winner[score_at + SCORE_KEY.len()..];
    let colon_at = after_key.find(':')?;
    let value = after_key[colon_at + 1..].trim_start();
    let value = value.strip_prefix('"').unwrap_or(value);
    // Stop at the first character that cannot be part of a JSON number, instead of skipping
    // ahead into unrelated fields.
    let end = value
        .find(|c: char| !(c.is_ascii_digit() || matches!(c, '.' | '-' | '+' | 'e' | 'E')))
        .unwrap_or(value.len());
    value[..end].parse::<f64>().ok().filter(|v| v.is_finite())
}
impl Drop for DataCollector {
fn drop(&mut self) {
self.shutdown_flag.store(true, Ordering::SeqCst);
let events = std::mem::take(&mut *self.events_buffer.lock().unwrap());
let quality = std::mem::take(&mut *self.quality_buffer.lock().unwrap());
let training = std::mem::take(&mut *self.training_buffer.lock().unwrap());
if let Some(tx) = self.write_sender.take() {
if !events.is_empty() {
if let Err(err) = tx.send((self.output_dir.join(&self.events_filename), events)) {
eprintln!(
"[collector] failed to send remaining events to writer: {}",
err
);
}
}
if !quality.is_empty() {
if let Err(err) = tx.send((self.output_dir.join(&self.quality_filename), quality)) {
eprintln!(
"[collector] failed to send remaining quality to writer: {}",
err
);
}
}
if !training.is_empty() {
if let Err(err) = tx.send((self.output_dir.join(&self.training_filename), training))
{
eprintln!(
"[collector] failed to send remaining training to writer: {}",
err
);
}
}
drop(tx);
} else {
if !events.is_empty() {
let p = self.output_dir.join(&self.events_filename);
let joiner = events.join("\n") + "\n";
if let Err(err) = crate::config::atomic_append(&p, joiner.as_bytes()) {
eprintln!("{{\"component\":\"collector\",\"op\":\"final_write_events\",\"file\":\"{}\",\"error\":\"{}\",\"trace_id\":null}}", p.display(), err);
} else {
self.writes_flushed_counter.fetch_add(1, Ordering::SeqCst);
}
let p = self.output_dir.join(&self.quality_filename);
let joiner = quality.join("\n") + "\n";
if let Err(err) = crate::config::atomic_append(&p, joiner.as_bytes()) {
eprintln!("{{\"component\":\"collector\",\"op\":\"final_write_quality\",\"file\":\"{}\",\"error\":\"{}\",\"trace_id\":null}}", p.display(), err);
} else {
self.writes_flushed_counter.fetch_add(1, Ordering::SeqCst);
}
let p = self.output_dir.join(&self.training_filename);
let joiner = training.join("\n") + "\n";
if let Err(err) = crate::config::atomic_append(&p, joiner.as_bytes()) {
eprintln!("{{\"component\":\"collector\",\"op\":\"final_write_training\",\"file\":\"{}\",\"error\":\"{}\",\"trace_id\":null}}", p.display(), err);
} else {
self.writes_flushed_counter.fetch_add(1, Ordering::SeqCst);
}
}
let mut wh = self.worker_handles.lock().unwrap();
for h in wh.drain(..) {
if let Err(e) = h.join() {
let any = &*e;
if let Some(s) = any.downcast_ref::<&str>() {
eprintln!("[collector] worker thread panicked: {}", s);
} else if let Some(s) = any.downcast_ref::<String>() {
eprintln!("[collector] worker thread panicked: {}", s);
} else {
eprintln!("[collector] worker thread panicked with unknown payload");
}
}
}
if let Some(h) = self.writer_handle.lock().unwrap().take() {
if let Err(e) = h.join() {
let any = &*e;
if let Some(s) = any.downcast_ref::<&str>() {
eprintln!("[collector] writer thread panicked: {}", s);
} else if let Some(s) = any.downcast_ref::<String>() {
eprintln!("[collector] writer thread panicked: {}", s);
} else {
eprintln!("[collector] writer thread panicked with unknown payload");
}
}
}
}
}
}