parent_ai_json_engine
Lightweight Rust crate to collect, aggregate, and persist JSONL events for monitoring and model training.
Overview
This crate provides thread-safe primitives to collect JSONL events, optionally validate them against bundled JSON Schemas (schemas/v1/), persist them atomically to disk, and periodically aggregate metrics per engine.
Key components
- `DataCollector`: buffers, optionally validates, and persists `engine_execution`, `comparison_result`, and `training_sample` JSONL lines.
- `MetricsAggregator`: reads events and writes periodic aggregated statistics (min/max/avg latencies, p50/p95/p99, success rates, mean quality) per `engine_id`.
- `Config`: configuration and the `atomic_append` helper for safe small writes.
- `SchemaValidator`: lightweight loader for the JSON Schema files under `schemas/v1/`.
- Optional `native` feature: enables the native model backend; the crate works without it.
Quick example
```rust
use std::sync::Arc;
use parent_ai_json_engine::{Config, MetricsAggregator, SchemaValidator, create_collector_from_config};

fn main() -> parent_ai_json_engine::AnyResult<()> {
    let config = Config::default();
    let out = config.resolve_output_dir(std::path::Path::new("."));
    let validator = Arc::new(SchemaValidator::new(std::path::Path::new("schemas"))?);
    let collector = create_collector_from_config(validator.clone(), out.clone(), &config)?;
    collector.collect_engine_execution("{...json...}".to_string())?;

    let aggregator = Arc::new(MetricsAggregator::new(out, config.json_engine.aggregation.window_minutes));
    aggregator.start();
    Ok(())
}
```
Data flow
- Input APIs: `collect_engine_execution`, `collect_comparison_result`, and `collect_training_sample`, with payloads matching the schemas in `schemas/v1/`.
- Buffering: channels buffer events in memory and flush by size or interval.
- Persistence: JSONL lines are appended atomically to `output_dir/<filename>` using the crate helper.
What json_engine does (detailed)
- Accepts JSON payloads for three primary event types and persists them as JSONL:
  - `engine_execution`: records a single run of a given engine (latency, status, trace id, input/output).
  - `comparison_result`: contains a query and multiple candidate engine responses, plus a `winner` with a `score` used to rank outputs.
  - `training_sample`: curated examples derived from `comparison_result` or produced externally for fine-tuning.
- Validation: incoming payloads are optionally validated with `SchemaValidator` against the JSON Schema files in `schemas/v1/`. Invalid payloads are logged and skipped (configurable behavior).
- Buffering and channels:
  - Each output channel buffers events in memory. Channels are configurable via `Config` with per-channel options: `enabled`, `filename`, `buffer_size`, `auto_flush_seconds`.
  - Default filenames produced by the crate: `events.jsonl` (engine executions), `comparisons.jsonl` (comparison results), `training_samples.jsonl` (generated or ingested training samples), and `aggregated_stats.jsonl` (aggregator output).
  - Flush triggers: buffer size threshold, periodic auto-flush interval, or explicit flush on shutdown.
- Atomic persistence:
  - The crate uses an `atomic_append` helper to ensure safe on-disk writes. For small atomic updates the helper writes to a temporary file in the same directory, fsyncs the file, renames it into place, and (on Unix) fsyncs the directory to minimize corruption risk.
  - Writes are designed to avoid producing partially written JSONL lines visible to readers.
- Aggregation: `MetricsAggregator` periodically scans `events.jsonl` (and other sources as configured), computes per-engine statistics (min/max/avg latency, p50/p95/p99, success rate, mean quality), and appends one aggregated JSON record per period to `aggregated_stats.jsonl`.
- Training-sample generation: when enabled, `DataCollector` can auto-generate `training_sample` entries from `comparison_result` input using configurable thresholds applied to `winner.score` (tiers: `gold`, `silver`, `bronze`). Generated samples are validated and appended to `training_samples.jsonl`.
- Model management: the crate exposes a `model_store` utility to save model binaries atomically. A `MAX_MODEL_BYTES` limit (e.g. 200_000_000) prevents saving oversized artifacts.
- Shutdown and durability: background workers (the collector and aggregator) flush buffers during graceful shutdown. Callers can also call `MetricsAggregator::stop()` to request a clean stop and a final flush.
- Observability: the crate exposes minimal counters (events received, writes flushed) and emits compact structured logs for operational visibility.
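Three of the mechanics above (percentile statistics, score tiering, and the model-size guard) can be sketched as follows. The thresholds and the nearest-rank percentile convention are hypothetical illustrations; the crate's real values are configurable and its implementation may differ.

```rust
// Illustrative sketches only, not the crate's actual code.

// Nearest-rank percentile over an already sorted latency slice.
fn percentile(sorted: &[u64], p: f64) -> u64 {
    let idx = ((p / 100.0) * (sorted.len() as f64 - 1.0)).round() as usize;
    sorted[idx]
}

// Hypothetical tier thresholds applied to winner.score.
fn tier(score: f64) -> Option<&'static str> {
    match score {
        s if s >= 0.9 => Some("gold"),
        s if s >= 0.7 => Some("silver"),
        s if s >= 0.5 => Some("bronze"),
        _ => None, // below the lowest threshold: no training sample is generated
    }
}

// Guard mirroring the MAX_MODEL_BYTES limit described above.
const MAX_MODEL_BYTES: u64 = 200_000_000;

fn check_model_size(len: u64) -> Result<(), String> {
    if len > MAX_MODEL_BYTES {
        Err(format!("model too large: {len} bytes (limit {MAX_MODEL_BYTES})"))
    } else {
        Ok(())
    }
}

fn main() {
    let mut latencies = vec![120u64, 80, 95, 300, 110, 130, 90, 101, 99, 85];
    latencies.sort_unstable(); // percentiles require sorted input
    println!("p50={}", percentile(&latencies, 50.0));
    println!("tier(0.95)={:?}", tier(0.95));
    println!("oversized rejected: {}", check_model_size(300_000_000).is_err());
}
```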
Example JSONL lines
`engine_execution` (one line per event):
`comparison_result`:
`training_sample`:
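Illustrative single-line records for the three event types, in that order. All field values are hypothetical; the shapes follow the class diagrams later in this document, and `schemas/v1/` remains authoritative.

```json
{"schema_version": 1, "trace_id": "tr-001", "task_id": "task-1", "engine_id": "engine-a", "engine_instance_id": "a-0", "timestamp": 1700000000, "input": "...", "output": "...", "metrics": {"latency_ms": 120, "memory_mb": 64.0, "cpu_percent": 12.5, "success": true}, "status": "ok"}

{"comparison_id": "cmp-001", "query": "...", "candidates": [{"engine_id": "engine-a", "output": "...", "metrics": {"latency_ms": 120, "memory_mb": 64.0, "cpu_percent": 12.5, "success": true}}], "winner": {"engine_id": "engine-a", "score": 0.92}, "metrics": {"latency_ms": 140, "memory_mb": 70.0, "cpu_percent": 14.0, "success": true}}

{"sample_id": "ts-001", "created_at": 1700000000, "source_trace_id": "tr-001", "tier": "gold", "input": "...", "output": "...", "validation": {"quality_score": 0.92, "validated": true}}
```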
Mermaid diagram (data flow)
```mermaid
flowchart LR
    App[Application] -->|collect_* APIs| Collector[DataCollector]
    Collector --> Events[events.jsonl]
    Collector --> Comparisons[comparisons.jsonl]
    Collector --> Training[training_samples.jsonl]
    Events --> Aggregator[MetricsAggregator]
    Aggregator --> Stats[aggregated_stats.jsonl]
    Comparisons -->|generate| TrainingGen[Training Sample Generator]
    TrainingGen --> Training
    ModelStore[Model Store] -->|save_model| Models("models/")
```
Schemas
The repository includes JSON Schema (draft-07) files under `schemas/v1/` for `engine_execution`, `comparison_result`, and `training_sample`.
Atomic writes
The `atomic_append` helper writes to a temporary file in the same directory, calls `sync_all()` on the file, renames it into place, and (on Unix) calls `sync_all()` on the directory to minimize corruption risk.
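A minimal sketch of that write-temp, fsync, rename pattern, assuming a small file that can be rewritten wholesale. This is illustrative only, not the crate's actual `atomic_append` implementation.

```rust
use std::fs::{self, File};
use std::io::Write;
use std::path::Path;

// Append one JSONL line atomically: rebuild the file in a temp sibling,
// fsync it, rename it over the target, then fsync the directory on Unix.
fn atomic_append_sketch(dir: &Path, filename: &str, line: &str) -> std::io::Result<()> {
    let target = dir.join(filename);
    // Small-file assumption: read the existing contents and append in memory.
    let mut contents = fs::read_to_string(&target).unwrap_or_default();
    contents.push_str(line);
    contents.push('\n');
    // Temp file in the same directory keeps the rename on one filesystem.
    let tmp = dir.join(format!("{filename}.tmp"));
    let mut f = File::create(&tmp)?;
    f.write_all(contents.as_bytes())?;
    f.sync_all()?; // flush file data to disk before the rename
    fs::rename(&tmp, &target)?; // atomic replacement on POSIX filesystems
    // Make the rename itself durable by syncing the directory entry.
    #[cfg(unix)]
    File::open(dir)?.sync_all()?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir();
    atomic_append_sketch(&dir, "demo.jsonl", r#"{"event":"demo"}"#)?;
    println!("{}", fs::read_to_string(dir.join("demo.jsonl"))?.contains("demo"));
    Ok(())
}
```

Readers of the target file never observe a partially written line: they see either the old file or the fully renamed replacement.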
Configuration
Relevant configuration fields: `json_engine.output.base_path`, per-channel settings (`enabled`, `filename`, `buffer_size`, `auto_flush_seconds`), aggregation settings, and training-sample generation thresholds.
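A hypothetical configuration layout covering those fields. Key names and values here are illustrative; the `Config` struct in `src/config` is the authoritative reference.

```toml
# Illustrative only; consult the Config struct for actual key names.
[json_engine.output]
base_path = "./data"

[json_engine.channels.engine_execution]
enabled = true
filename = "events.jsonl"
buffer_size = 256
auto_flush_seconds = 5

[json_engine.aggregation]
window_minutes = 5
```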
Native feature
The native model backend is feature-gated. When the `native` feature is disabled, model-loading APIs are no-ops and return `Ok(None)`.
Where to look
- Schemas: `schemas/v1/*.json`
- Core code: `src/collector`, `src/aggregator`, `src/config`, `src/validator`
Visual schema diagrams
Below are Mermaid class diagrams that show the main fields and types for the three primary JSONL records handled by this crate. These are intended as a developer-friendly visual reference (not a full JSON Schema export).
```mermaid
classDiagram
    class EngineExecution {
        +int schema_version
        +string trace_id
        +string task_id
        +string engine_id
        +string engine_instance_id
        +int timestamp
        +string input
        +string output
        +Metrics metrics
        +string status
    }
    class Metrics {
        +int latency_ms
        +float memory_mb
        +float cpu_percent
        +bool success
    }
    class ComparisonResult {
        +string comparison_id
        +string query
        +Candidate[] candidates
        +Winner winner
        +Metrics metrics
    }
    class Candidate {
        +string engine_id
        +string output
        +Metrics metrics
    }
    class Winner {
        +string engine_id
        +float score
    }
    class TrainingSample {
        +string sample_id
        +int created_at
        +string source_trace_id
        +string tier "gold|silver|bronze"
        +string input
        +string output
        +Validation validation
    }
    class Validation {
        +float quality_score
        +bool validated
    }
    EngineExecution --> Metrics
    ComparisonResult --> Candidate
    ComparisonResult --> Winner
    Candidate --> Metrics
    TrainingSample --> Validation
```
Recommended checks
```bash
cargo fmt --all
cargo clippy --all -- -D warnings
cargo test --lib
cargo build --release
```