use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use crate::config::atomic;
use crate::config::json::Value as JsonValue;
use crate::config::Config;
use crate::validator::SchemaValidator;
/// Buffered, schema-validated sink for engine telemetry.
///
/// Payloads arrive through the `collect_*` methods on three channels
/// (events, quality metrics, training samples). Each channel owns a
/// mutex-guarded in-memory buffer that is flushed to a JSONL file in
/// `output_dir` once it reaches its configured size.
///
/// Cloning is cheap: buffers and stats are shared behind `Arc`, so all
/// clones observe and feed the same state.
#[derive(Clone)]
pub struct DataCollector {
/// Validates payloads against named schemas when `strict_validation` is on.
validator: Arc<SchemaValidator>,
/// Directory the per-channel JSONL files are written into.
output_dir: PathBuf,
/// Flush threshold for the events buffer.
events_buffer_size: usize,
/// Flush threshold for the quality-metrics buffer.
quality_buffer_size: usize,
/// Flush threshold for the training-samples buffer.
training_buffer_size: usize,
/// Score cutoff for the "gold" tier of auto-generated training samples.
gold_threshold: f64,
/// Score cutoff for the "silver" tier.
silver_threshold: f64,
/// Score cutoff for the "bronze" tier; below this no sample is generated.
bronze_threshold: f64,
/// When set, high-scoring comparison results spawn training samples.
auto_generate_samples: bool,
/// When set, payloads failing schema validation are rejected.
strict_validation: bool,
/// Version string stamped into auto-generated training samples.
schema_version: String,
/// Channel on/off switches.
events_enabled: bool,
quality_enabled: bool,
training_enabled: bool,
/// Per-channel output filenames (joined onto `output_dir`).
events_filename: String,
quality_filename: String,
training_filename: String,
// NOTE(review): the auto-flush intervals are only reported by get_stats()
// in this file — presumably a background timer elsewhere consumes them.
// TODO confirm against the flush scheduler.
events_auto_flush_seconds: u64,
quality_auto_flush_seconds: u64,
training_auto_flush_seconds: u64,
/// In-memory staging buffers, one per channel.
events_buffer: Arc<Mutex<Vec<JsonValue>>>,
quality_buffer: Arc<Mutex<Vec<JsonValue>>>,
training_buffer: Arc<Mutex<Vec<JsonValue>>>,
/// Acceptance counters shared by all clones.
stats: Arc<Mutex<CollectorStats>>,
}
/// Aggregate acceptance counters, overall and broken down per schema type.
#[derive(Debug, Default)]
struct CollectorStats {
/// Payloads seen, regardless of validation outcome.
total_received: u64,
/// Payloads that passed validation (or arrived with validation off).
total_accepted: u64,
/// Payloads rejected by strict schema validation.
total_rejected: u64,
/// Accepted/rejected counts keyed by schema type name.
by_type: std::collections::HashMap<String, TypeStats>,
}
/// Accepted/rejected tallies for a single schema type.
#[derive(Debug, Default, Clone)]
struct TypeStats {
accepted: u64,
rejected: u64,
}
impl DataCollector {
    /// Builds a collector rooted at `output_dir`, sizing each channel buffer
    /// from the configuration. Creates `output_dir` if it does not exist.
    ///
    /// # Errors
    /// Returns the I/O error from `create_dir_all` if the directory cannot
    /// be created.
    pub fn new(
        validator: Arc<SchemaValidator>,
        output_dir: PathBuf,
        config: &Config,
    ) -> std::io::Result<Self> {
        std::fs::create_dir_all(&output_dir)?;
        let channels = &config.json_engine.output.channels;
        let training = &config.json_engine.training;
        Ok(Self {
            validator,
            output_dir,
            events_buffer_size: channels.events.buffer_size,
            quality_buffer_size: channels.quality_metrics.buffer_size,
            training_buffer_size: channels.training_samples.buffer_size,
            gold_threshold: training.gold_threshold,
            silver_threshold: training.silver_threshold,
            bronze_threshold: training.bronze_threshold,
            auto_generate_samples: training.auto_generate_samples,
            strict_validation: config.json_engine.strict_validation,
            schema_version: config.json_engine.schema_version.clone(),
            events_enabled: channels.events.enabled,
            quality_enabled: channels.quality_metrics.enabled,
            training_enabled: channels.training_samples.enabled,
            events_filename: channels.events.filename.clone(),
            quality_filename: channels.quality_metrics.filename.clone(),
            training_filename: channels.training_samples.filename.clone(),
            events_auto_flush_seconds: channels.events.auto_flush_seconds,
            quality_auto_flush_seconds: channels.quality_metrics.auto_flush_seconds,
            training_auto_flush_seconds: channels.training_samples.auto_flush_seconds,
            events_buffer: Arc::new(Mutex::new(Vec::with_capacity(channels.events.buffer_size))),
            quality_buffer: Arc::new(Mutex::new(Vec::with_capacity(
                channels.quality_metrics.buffer_size,
            ))),
            training_buffer: Arc::new(Mutex::new(Vec::with_capacity(
                channels.training_samples.buffer_size,
            ))),
            stats: Arc::new(Mutex::new(CollectorStats::default())),
        })
    }

    /// Validates (when strict) and buffers an `engine_execution` event.
    /// The payload is mirrored to the quality channel when it is an object
    /// carrying a `quality_score` field.
    ///
    /// # Errors
    /// Returns the validator's message when strict validation rejects the
    /// payload. Flush failures are logged, not returned.
    pub fn collect_engine_execution(&self, data: JsonValue) -> Result<(), String> {
        self.update_stats_received("engine_execution");
        crate::metrics::inc_events();
        eprintln!("[collector] collect_engine_execution received");
        if self.strict_validation {
            if let Err(e) = self.validator.validate_or_error(&data, "engine_execution") {
                self.update_stats_rejected("engine_execution");
                return Err(e.to_string());
            }
        }
        self.update_stats_accepted("engine_execution");
        let wants_quality = self.quality_enabled
            && matches!(&data, JsonValue::Object(m) if m.contains_key("quality_score"));
        // Clone only when the payload must reach both channels.
        if self.events_enabled && wants_quality {
            self.push_and_maybe_flush(
                &self.events_buffer,
                self.events_buffer_size,
                &self.events_filename,
                "events",
                data.clone(),
            );
            self.push_and_maybe_flush(
                &self.quality_buffer,
                self.quality_buffer_size,
                &self.quality_filename,
                "quality",
                data,
            );
        } else if self.events_enabled {
            self.push_and_maybe_flush(
                &self.events_buffer,
                self.events_buffer_size,
                &self.events_filename,
                "events",
                data,
            );
        } else if wants_quality {
            self.push_and_maybe_flush(
                &self.quality_buffer,
                self.quality_buffer_size,
                &self.quality_filename,
                "quality",
                data,
            );
        }
        Ok(())
    }

    /// Validates (when strict) and buffers a `comparison_result`. The payload
    /// goes to both the events and quality channels, and may additionally
    /// spawn an auto-generated training sample when `auto_generate_samples`
    /// is on and the winner's score clears the bronze threshold.
    ///
    /// # Errors
    /// Returns the validator's message when strict validation rejects the
    /// payload. Flush and sample-generation failures are logged, not returned.
    pub fn collect_comparison_result(&self, data: JsonValue) -> Result<(), String> {
        self.update_stats_received("comparison_result");
        crate::metrics::inc_events();
        eprintln!("[collector] collect_comparison_result received");
        if self.strict_validation {
            if let Err(e) = self.validator.validate_or_error(&data, "comparison_result") {
                self.update_stats_rejected("comparison_result");
                return Err(e.to_string());
            }
        }
        self.update_stats_accepted("comparison_result");
        if self.events_enabled {
            self.push_and_maybe_flush(
                &self.events_buffer,
                self.events_buffer_size,
                &self.events_filename,
                "events",
                data.clone(),
            );
        }
        if self.quality_enabled {
            self.push_and_maybe_flush(
                &self.quality_buffer,
                self.quality_buffer_size,
                &self.quality_filename,
                "quality",
                data.clone(),
            );
        }
        if self.auto_generate_samples {
            if let Some(score) = Self::winner_score(&data) {
                if score >= self.bronze_threshold {
                    if let Err(e) = self.auto_generate_training_sample(&data) {
                        eprintln!("[collector] auto_generate_training_sample failed: {}", e);
                    }
                }
            }
        }
        Ok(())
    }

    /// Validates (when strict) and buffers a `training_sample`.
    ///
    /// # Errors
    /// Returns the validator's message when strict validation rejects the
    /// payload. Flush failures are logged, not returned.
    pub fn collect_training_sample(&self, data: JsonValue) -> Result<(), String> {
        self.update_stats_received("training_sample");
        crate::metrics::inc_events();
        eprintln!("[collector] collect_training_sample received");
        if self.strict_validation {
            if let Err(e) = self.validator.validate_or_error(&data, "training_sample") {
                self.update_stats_rejected("training_sample");
                return Err(e.to_string());
            }
        }
        self.update_stats_accepted("training_sample");
        if self.training_enabled {
            self.push_and_maybe_flush(
                &self.training_buffer,
                self.training_buffer_size,
                &self.training_filename,
                "training",
                data,
            );
        }
        Ok(())
    }

    /// Appends `value` to `buffer` and flushes once the buffer reaches
    /// `capacity`. Flush errors are logged (tagged with `channel`), never
    /// propagated, so a bad disk cannot reject an already-accepted payload.
    fn push_and_maybe_flush(
        &self,
        buffer: &Mutex<Vec<JsonValue>>,
        capacity: usize,
        filename: &str,
        channel: &str,
        value: JsonValue,
    ) {
        let mut buffer = buffer.lock().unwrap();
        buffer.push(value);
        if buffer.len() >= capacity {
            if let Err(e) = self.flush_buffer(&mut buffer, filename) {
                eprintln!("[collector] flush {} error ({}): {}", channel, filename, e);
            }
        }
    }

    /// Extracts `winner.score` from a comparison payload, if present and
    /// numeric.
    fn winner_score(comparison: &JsonValue) -> Option<f64> {
        if let JsonValue::Object(m) = comparison {
            if let Some(JsonValue::Object(winner)) = m.get("winner") {
                if let Some(JsonValue::Number(score)) = winner.get("score") {
                    return Some(*score);
                }
            }
        }
        None
    }

    /// Clones a top-level field out of an object payload, or `Null` when the
    /// field (or the object itself) is absent.
    fn top_level_field(value: &JsonValue, key: &str) -> JsonValue {
        if let JsonValue::Object(m) = value {
            m.get(key).cloned().unwrap_or(JsonValue::Null)
        } else {
            JsonValue::Null
        }
    }

    /// Synthesizes a training sample from a high-scoring comparison result
    /// and buffers it on the training channel. Samples are tiered by the
    /// winner's score (gold / silver / bronze); scores below the bronze
    /// threshold are skipped without error.
    fn auto_generate_training_sample(&self, comparison: &JsonValue) -> Result<(), String> {
        let score = Self::winner_score(comparison).unwrap_or(0.0);
        let tier = if score >= self.gold_threshold {
            "gold"
        } else if score >= self.silver_threshold {
            "silver"
        } else if score >= self.bronze_threshold {
            "bronze"
        } else {
            // Below bronze: nothing to generate, but not an error.
            return Ok(());
        };
        let mut sample = std::collections::BTreeMap::new();
        sample.insert(
            "schema_version".to_string(),
            JsonValue::String(self.schema_version.clone()),
        );
        sample.insert(
            "sample_id".to_string(),
            JsonValue::String(atomic::generate_id()),
        );
        sample.insert(
            "created_at".to_string(),
            JsonValue::String(atomic::current_time_rfc3339()),
        );
        sample.insert(
            "source_trace_id".to_string(),
            Self::top_level_field(comparison, "trace_id"),
        );
        sample.insert(
            "source_type".to_string(),
            JsonValue::String("auto_comparison".to_string()),
        );
        sample.insert("tier".to_string(), JsonValue::String(tier.to_string()));
        sample.insert(
            "input".to_string(),
            Self::top_level_field(comparison, "query"),
        );
        // The winning engine id stands in for the output content.
        let output_content = if let JsonValue::Object(m) = comparison {
            m.get("winner")
                .and_then(|w| {
                    if let JsonValue::Object(wm) = w {
                        wm.get("engine_id").cloned()
                    } else {
                        None
                    }
                })
                .unwrap_or(JsonValue::Null)
        } else {
            JsonValue::Null
        };
        let mut output_map = std::collections::BTreeMap::new();
        output_map.insert("content".to_string(), output_content);
        sample.insert("output".to_string(), JsonValue::Object(output_map));
        let mut validation = std::collections::BTreeMap::new();
        validation.insert("quality_score".to_string(), JsonValue::Number(score));
        validation.insert("validated".to_string(), JsonValue::Bool(true));
        validation.insert(
            "validator".to_string(),
            JsonValue::String("auto".to_string()),
        );
        validation.insert(
            "validation_date".to_string(),
            JsonValue::String(atomic::current_time_rfc3339()),
        );
        sample.insert("validation".to_string(), JsonValue::Object(validation));
        if self.training_enabled {
            self.push_and_maybe_flush(
                &self.training_buffer,
                self.training_buffer_size,
                &self.training_filename,
                "training",
                JsonValue::Object(sample),
            );
        }
        Ok(())
    }

    /// Flushes every channel buffer to disk.
    ///
    /// Unlike the size-triggered flushes, errors here are propagated. All
    /// three buffers are attempted even when an earlier one fails, and the
    /// first error encountered is returned.
    pub fn flush_all(&self) -> std::io::Result<()> {
        let targets = [
            (&self.events_buffer, self.events_filename.as_str()),
            (&self.quality_buffer, self.quality_filename.as_str()),
            (&self.training_buffer, self.training_filename.as_str()),
        ];
        let mut first_err: Option<std::io::Error> = None;
        for (buffer, filename) in targets {
            let mut guard = buffer.lock().unwrap();
            if let Err(e) = self.flush_buffer(&mut guard, filename) {
                if first_err.is_none() {
                    first_err = Some(e);
                }
            }
        }
        match first_err {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    /// Serializes the buffered entries as JSON-lines and atomically appends
    /// them to `<output_dir>/<filename>`, clearing the buffer on success.
    /// A failed append leaves the buffer intact for a later retry.
    fn flush_buffer(&self, buffer: &mut Vec<JsonValue>, filename: &str) -> std::io::Result<()> {
        if buffer.is_empty() {
            return Ok(());
        }
        let file_path = self.output_dir.join(filename);
        let mut out_bytes = Vec::new();
        for event in buffer.iter() {
            let s = format!("{}", event);
            out_bytes.extend_from_slice(s.as_bytes());
            out_bytes.push(b'\n');
        }
        crate::config::atomic::atomic_append(&file_path, &out_bytes)?;
        crate::metrics::inc_writes_flushed();
        eprintln!(
            "[collector] flushed entries={} path={}",
            buffer.len(),
            file_path.display()
        );
        buffer.clear();
        Ok(())
    }

    /// Describes one channel's static configuration as a JSON object.
    fn channel_summary(
        enabled: bool,
        filename: &str,
        buffer_size: usize,
        auto_flush_seconds: u64,
    ) -> JsonValue {
        let mut m = std::collections::BTreeMap::new();
        m.insert("enabled".to_string(), JsonValue::Bool(enabled));
        m.insert(
            "filename".to_string(),
            JsonValue::String(filename.to_string()),
        );
        m.insert(
            "buffer_size".to_string(),
            JsonValue::Number(buffer_size as f64),
        );
        m.insert(
            "auto_flush_seconds".to_string(),
            JsonValue::Number(auto_flush_seconds as f64),
        );
        JsonValue::Object(m)
    }

    /// Returns a JSON snapshot of the acceptance counters (overall and per
    /// schema type) plus the configuration of each output channel.
    pub fn get_stats(&self) -> JsonValue {
        let stats = self.stats.lock().unwrap();
        let mut by_type = std::collections::BTreeMap::new();
        for (key, val) in &stats.by_type {
            let mut m = std::collections::BTreeMap::new();
            m.insert(
                "accepted".to_string(),
                JsonValue::Number(val.accepted as f64),
            );
            m.insert(
                "rejected".to_string(),
                JsonValue::Number(val.rejected as f64),
            );
            by_type.insert(key.clone(), JsonValue::Object(m));
        }
        let mut channels = std::collections::BTreeMap::new();
        channels.insert(
            "events".to_string(),
            Self::channel_summary(
                self.events_enabled,
                &self.events_filename,
                self.events_buffer_size,
                self.events_auto_flush_seconds,
            ),
        );
        channels.insert(
            "quality_metrics".to_string(),
            Self::channel_summary(
                self.quality_enabled,
                &self.quality_filename,
                self.quality_buffer_size,
                self.quality_auto_flush_seconds,
            ),
        );
        channels.insert(
            "training_samples".to_string(),
            Self::channel_summary(
                self.training_enabled,
                &self.training_filename,
                self.training_buffer_size,
                self.training_auto_flush_seconds,
            ),
        );
        let mut out = std::collections::BTreeMap::new();
        out.insert(
            "total_received".to_string(),
            JsonValue::Number(stats.total_received as f64),
        );
        out.insert(
            "total_accepted".to_string(),
            JsonValue::Number(stats.total_accepted as f64),
        );
        out.insert(
            "total_rejected".to_string(),
            JsonValue::Number(stats.total_rejected as f64),
        );
        out.insert("by_type".to_string(), JsonValue::Object(by_type));
        out.insert("channels".to_string(), JsonValue::Object(channels));
        JsonValue::Object(out)
    }

    /// Counts a payload as received and ensures a per-type entry exists so
    /// rejected-only types still show up in stats.
    fn update_stats_received(&self, schema_type: &str) {
        let mut stats = self.stats.lock().unwrap();
        stats.total_received += 1;
        stats.by_type.entry(schema_type.to_string()).or_default();
    }

    /// Counts a payload as accepted, overall and for its schema type.
    fn update_stats_accepted(&self, schema_type: &str) {
        let mut stats = self.stats.lock().unwrap();
        stats.total_accepted += 1;
        stats
            .by_type
            .entry(schema_type.to_string())
            .or_default()
            .accepted += 1;
    }

    /// Counts a payload as rejected, overall and for its schema type.
    fn update_stats_rejected(&self, schema_type: &str) {
        let mut stats = self.stats.lock().unwrap();
        stats.total_rejected += 1;
        stats
            .by_type
            .entry(schema_type.to_string())
            .or_default()
            .rejected += 1;
    }
}
pub mod model_store {
    //! Disk persistence for trained model binaries and their metadata
    //! sidecars (`<id>.bin` plus `<id>.meta.json` under `data/models`).
    use crate::config::atomic;
    use crate::config::json::Value as JsonValue;
    use std::fs;
    use std::io;
    use std::path::PathBuf;

    /// Metadata persisted alongside every model binary.
    #[derive(Debug, Clone)]
    pub struct ModelMetadata {
        pub model_id: String,
        pub epoch: u64,
        pub checksum: String,
        pub format_version: u8,
        pub created_at: String,
    }

    impl ModelMetadata {
        /// Serializes the metadata to a JSON object. Integer fields are
        /// widened to f64, the only numeric representation `JsonValue` has.
        fn to_json(&self) -> JsonValue {
            let mut m = std::collections::BTreeMap::new();
            m.insert(
                "model_id".to_string(),
                JsonValue::String(self.model_id.clone()),
            );
            m.insert("epoch".to_string(), JsonValue::Number(self.epoch as f64));
            m.insert(
                "checksum".to_string(),
                JsonValue::String(self.checksum.clone()),
            );
            m.insert(
                "format_version".to_string(),
                JsonValue::Number(self.format_version as f64),
            );
            m.insert(
                "created_at".to_string(),
                JsonValue::String(self.created_at.clone()),
            );
            JsonValue::Object(m)
        }

        /// Parses metadata from JSON, returning `None` when any required
        /// field is missing or has the wrong type. The offending field name
        /// is logged so bad sidecar files are diagnosable.
        fn from_json(v: &JsonValue) -> Option<ModelMetadata> {
            let JsonValue::Object(m) = v else {
                return None;
            };
            let string_field = |key: &str| match m.get(key) {
                Some(JsonValue::String(s)) => Some(s.clone()),
                _ => {
                    eprintln!(
                        "[model_store] ModelMetadata.from_json: missing or invalid '{}'",
                        key
                    );
                    None
                }
            };
            let number_field = |key: &str| match m.get(key) {
                Some(JsonValue::Number(n)) => Some(*n),
                _ => {
                    eprintln!(
                        "[model_store] ModelMetadata.from_json: missing or invalid '{}'",
                        key
                    );
                    None
                }
            };
            Some(ModelMetadata {
                model_id: string_field("model_id")?,
                epoch: number_field("epoch")? as u64,
                checksum: string_field("checksum")?,
                format_version: number_field("format_version")? as u8,
                created_at: string_field("created_at")?,
            })
        }
    }

    /// Root directory for model binaries and metadata sidecars.
    fn models_dir() -> PathBuf {
        PathBuf::from("data/models")
    }

    /// Atomically writes the model binary and its metadata sidecar, then
    /// returns the generated metadata.
    ///
    /// The size cap defaults to 100 MiB and can be overridden via the
    /// `CHILD_JSON_ENGINE_MAX_MODEL_BYTES` env var (preferred) or
    /// `JSON_ENGINE_MAX_MODEL_BYTES` (fallback).
    ///
    /// # Errors
    /// `InvalidInput` when `bytes` exceeds the cap; otherwise any I/O error
    /// from directory creation or the atomic writes.
    pub fn save_model(model_id: &str, bytes: &[u8], epoch: u64) -> io::Result<ModelMetadata> {
        const DEFAULT_MAX_MODEL_BYTES: usize = 100 * 1024 * 1024;
        let max_size: usize = std::env::var("CHILD_JSON_ENGINE_MAX_MODEL_BYTES")
            .ok()
            .and_then(|s| s.parse().ok())
            .or_else(|| {
                std::env::var("JSON_ENGINE_MAX_MODEL_BYTES")
                    .ok()
                    .and_then(|s| s.parse().ok())
            })
            .unwrap_or(DEFAULT_MAX_MODEL_BYTES);
        if bytes.len() > max_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("model size {} exceeds max {} bytes", bytes.len(), max_size),
            ));
        }
        let checksum = atomic::sha256_bytes_local(bytes);
        let dir = models_dir();
        fs::create_dir_all(&dir)?;
        let bin_path = dir.join(format!("{}.bin", model_id));
        let meta_path = dir.join(format!("{}.meta.json", model_id));
        // Write the binary first so a sidecar never points at a missing blob.
        atomic::atomic_write(&bin_path, bytes)?;
        let meta = ModelMetadata {
            model_id: model_id.to_string(),
            epoch,
            checksum,
            format_version: 1,
            created_at: atomic::current_time_rfc3339(),
        };
        let meta_bytes = meta.to_json().to_vec_pretty();
        atomic::atomic_write(&meta_path, &meta_bytes)?;
        crate::metrics::inc_models();
        eprintln!(
            "[model_store] saved model {} ({} bytes)",
            model_id,
            bytes.len()
        );
        Ok(meta)
    }

    /// Reads the raw model binary for `model_id`.
    ///
    /// # Errors
    /// Propagates the underlying read error (e.g. `NotFound`).
    pub fn load_model(model_id: &str) -> io::Result<Vec<u8>> {
        let dir = models_dir();
        let bin_path = dir.join(format!("{}.bin", model_id));
        let data = fs::read(&bin_path)?;
        Ok(data)
    }

    /// Scans the models directory for `*.meta.json` sidecars and parses each.
    /// Unreadable or malformed entries are skipped; a missing directory
    /// yields an empty list rather than an error.
    pub fn list_models() -> io::Result<Vec<ModelMetadata>> {
        let dir = models_dir();
        let mut out = Vec::new();
        let entries = match fs::read_dir(&dir) {
            Ok(e) => e,
            Err(_) => return Ok(out),
        };
        for entry in entries.flatten() {
            let path = entry.path();
            // The ".meta.json" suffix test already implies a "json" extension.
            let is_meta = path
                .file_name()
                .and_then(|s| s.to_str())
                .is_some_and(|n| n.ends_with(".meta.json"));
            if !is_meta {
                continue;
            }
            if let Ok(bytes) = fs::read(&path) {
                if let Ok(s) = std::str::from_utf8(&bytes) {
                    if let Ok(jv) = s.parse::<crate::config::json::Value>() {
                        if let Some(meta) = ModelMetadata::from_json(&jv) {
                            out.push(meta);
                        }
                    }
                }
            }
        }
        Ok(out)
    }
}
pub mod vocab {
    //! Persistence helpers for vocabulary JSON files under `data/vocab`.
    use crate::config::atomic;
    use crate::config::json::Value as JsonValue;
    use std::fs;
    use std::io;
    use std::path::PathBuf;

    /// Directory where vocabulary files live.
    pub fn vocab_dir() -> PathBuf {
        PathBuf::from("data/vocab")
    }

    /// Atomically writes `value` as pretty-printed JSON to
    /// `data/vocab/<name>.json`, creating the directory if needed.
    pub fn save_vocab(name: &str, value: &JsonValue) -> io::Result<()> {
        let target_dir = vocab_dir();
        fs::create_dir_all(&target_dir)?;
        let target = target_dir.join(format!("{}.json", name));
        atomic::atomic_write(&target, &value.to_vec_pretty())?;
        Ok(())
    }

    /// Reads and parses `data/vocab/<name>.json`.
    ///
    /// # Errors
    /// Propagates the read error, or `InvalidData` when the file is not
    /// valid UTF-8 or not parseable JSON.
    pub fn load_vocab(name: &str) -> io::Result<JsonValue> {
        let raw = fs::read(vocab_dir().join(format!("{}.json", name)))?;
        let text = std::str::from_utf8(&raw)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        text.parse::<JsonValue>()
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
    }
}