parent_ai_json_engine 0.0.2

Crate provides a JSON engine for collecting, aggregating, and managing models.
# parent_ai_json_engine

Lightweight Rust crate to collect, aggregate, and persist JSONL events for monitoring and model training.

Overview

This crate provides thread-safe primitives to collect JSONL events, optionally validate them against bundled JSON Schemas (`schemas/v1/`), persist them atomically to disk, and periodically aggregate metrics per engine.

Key components

- `DataCollector` — buffer, validate (optional), and persist `engine_execution`, `comparison_result`, and `training_sample` JSONL lines.
- `MetricsAggregator` — read events and write periodic aggregated statistics (min/max/avg latencies, p50/p95/p99, success rates, mean quality) per `engine_id`.
- `Config` — configuration and the `atomic_append` helper for safe small writes.
- `SchemaValidator` — lightweight loader for the JSON Schema files under `schemas/v1/`.
- Optional `native` feature — enables native model backend; crate works without it.

Quick example

```rust
use std::path::Path;
use std::sync::Arc;

use parent_ai_json_engine::{
	create_collector_from_config, Config, MetricsAggregator, SchemaValidator,
};

fn main() -> parent_ai_json_engine::AnyResult<()> {
	// Resolve the output directory relative to the current directory.
	let config = Config::default();
	let out = config.resolve_output_dir(Path::new("."));

	// Load the schemas and build a collector that validates against them.
	let validator = Arc::new(SchemaValidator::new(Path::new("schemas"))?);
	let collector = create_collector_from_config(validator.clone(), out.clone(), &config)?;

	// Placeholder payload: pass one serialized `engine_execution` JSON object here.
	collector.collect_engine_execution("{...json...}".to_string())?;

	// Start periodic aggregation over the same output directory.
	let aggregator = Arc::new(MetricsAggregator::new(out, config.json_engine.aggregation.window_minutes));
	aggregator.start();

	Ok(())
}
```

Data flow

- Input APIs: `collect_engine_execution`, `collect_comparison_result`, `collect_training_sample` with payloads matching the schemas in `schemas/v1/`.
- Buffering: channels buffer events in memory and flush by size or interval.
- Persistence: JSONL lines are appended atomically to `output_dir/<filename>` using the crate helper.
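
The "flush by size or interval" behaviour in the buffering step can be pictured with a small standalone sketch. `LineBuffer` and its methods are hypothetical names invented for illustration and are not part of this crate's API; `max_lines` and `flush_interval` stand in for the `buffer_size` and `auto_flush_seconds` options.

```rust
use std::time::{Duration, Instant};

/// Hypothetical in-memory buffer for one output channel (not the crate's own type).
pub struct LineBuffer {
    lines: Vec<String>,
    max_lines: usize,         // stands in for `buffer_size`
    flush_interval: Duration, // stands in for `auto_flush_seconds`
    last_flush: Instant,
}

impl LineBuffer {
    pub fn new(max_lines: usize, flush_interval: Duration, now: Instant) -> Self {
        Self { lines: Vec::new(), max_lines, flush_interval, last_flush: now }
    }

    /// Queue one JSONL line and return the drained batch if a flush is due.
    pub fn push(&mut self, line: String, now: Instant) -> Option<Vec<String>> {
        self.lines.push(line);
        self.take_if_due(now)
    }

    /// Also meant to be called from a timer tick so a quiet channel still flushes.
    pub fn take_if_due(&mut self, now: Instant) -> Option<Vec<String>> {
        let full = self.lines.len() >= self.max_lines;
        let stale = now.saturating_duration_since(self.last_flush) >= self.flush_interval;
        if self.lines.is_empty() || !(full || stale) {
            return None;
        }
        self.last_flush = now;
        Some(std::mem::take(&mut self.lines))
    }
}
```

Passing `now` in explicitly keeps the sketch deterministic under test. A returned batch is what would then be handed to the persistence step.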

What json_engine does (detailed)

- Accepts JSON payloads for three primary event types and persists them as JSONL:
	- `engine_execution`: records a single run of a given engine (latency, status, trace id, input/output).
	- `comparison_result`: contains a query and multiple candidate engine responses plus a `winner` with a `score` used to rank outputs.
	- `training_sample`: curated examples derived from `comparison_result` or produced externally for fine-tuning.

- Validation: incoming payloads are optionally validated with `SchemaValidator` against the JSON Schema files in `schemas/v1/`. Invalid payloads are logged and skipped (configurable behavior).

- Buffering and channels:
	- Each output channel buffers events in memory. Channels are configurable via `Config` with per-channel options: `enabled`, `filename`, `buffer_size`, `auto_flush_seconds`.
	- Default filenames produced by the crate: `events.jsonl` (engine executions), `comparisons.jsonl` (comparison results), `training_samples.jsonl` (generated or ingested training samples), and `aggregated_stats.jsonl` (aggregator output).
	- Flush triggers: buffer size threshold, periodic auto-flush interval, or explicit flush on shutdown.

- Atomic persistence:
	- The crate uses an `atomic_append` helper to ensure safe on-disk writes. For small atomic updates the helper writes to a temporary file in the same directory, fsyncs the file, renames into place, and (on Unix) fsyncs the directory to minimize corruption risk.
	- Writes are designed to avoid producing partially-written JSONL lines visible to readers.

- Aggregation:
	- `MetricsAggregator` periodically scans `events.jsonl` (and other sources as configured), computes per-engine statistics (min/max/avg latency, p50/p95/p99, success rate, mean quality), and appends one aggregated JSON record per period to `aggregated_stats.jsonl`.

- Training-sample generation:
	- When enabled, `DataCollector` can auto-generate `training_sample` entries from `comparison_result` input using configurable thresholds applied to `winner.score` (tiers: `gold`, `silver`, `bronze`). Generated samples are validated and appended to `training_samples.jsonl`.

- Model management:
	- The crate exposes a `model_store` utility to save model binaries atomically. A `MAX_MODEL_BYTES` limit (e.g. 200_000_000) prevents saving oversized artifacts.

- Shutdown and durability:
	- Background workers (collector and aggregator) flush buffers during graceful shutdown. Callers can also call `MetricsAggregator::stop()` to request a clean stop and final flush.

- Observability:
	- The crate exposes minimal counters (events received, writes flushed) and emits compact structured logs for operational visibility.
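
To make the aggregation bullet concrete, here is a rough sketch of how per-engine latency statistics could be computed from `(latency_ms, success)` pairs. The README does not say which percentile method `MetricsAggregator` uses; nearest-rank is assumed here, and `LatencySummary`, `percentile`, and `summarize` are hypothetical names.

```rust
/// Hypothetical per-engine summary; field names are illustrative only.
#[derive(Debug, PartialEq)]
pub struct LatencySummary {
    pub min_ms: u64,
    pub max_ms: u64,
    pub avg_ms: f64,
    pub p50_ms: u64,
    pub p95_ms: u64,
    pub p99_ms: u64,
    pub success_rate: f64,
}

/// Nearest-rank percentile over an already sorted, non-empty slice.
fn percentile(sorted: &[u64], pct: usize) -> u64 {
    let n = sorted.len();
    // rank = ceil(pct / 100 * n), clamped to 1..=n
    let rank = (pct * n + 99) / 100;
    sorted[rank.clamp(1, n) - 1]
}

/// Summarise (latency_ms, success) pairs for one engine; `None` when there are no events.
pub fn summarize(events: &[(u64, bool)]) -> Option<LatencySummary> {
    if events.is_empty() {
        return None;
    }
    let mut latencies: Vec<u64> = events.iter().map(|e| e.0).collect();
    latencies.sort_unstable();
    let n = latencies.len();
    let successes = events.iter().filter(|e| e.1).count();
    Some(LatencySummary {
        min_ms: latencies[0],
        max_ms: latencies[n - 1],
        avg_ms: latencies.iter().sum::<u64>() as f64 / n as f64,
        p50_ms: percentile(&latencies, 50),
        p95_ms: percentile(&latencies, 95),
        p99_ms: percentile(&latencies, 99),
        success_rate: successes as f64 / n as f64,
    })
}
```

Grouping events by `engine_id` and calling `summarize` once per group would yield one record per engine for each aggregation window.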

Example JSONL lines

engine_execution (one line per event):

```json
{"schema_version":1,"trace_id":"t-123","task_id":"task-1","engine_id":"gpt-x","timestamp":1670000000,"input":"hello","output":"hi","metrics":{"latency_ms":42,"success":true}}
```

comparison_result:

```json
{"comparison_id":"c-1","query":"translate this","candidates":[{"engine_id":"a","output":"...","metrics":{"latency_ms":20,"quality_score":0.7}},{"engine_id":"b","output":"...","metrics":{"latency_ms":35,"quality_score":0.9}}],"winner":{"engine_id":"b","score":0.9}}
```

training_sample:

```json
{"sample_id":"s-1","created_at":1670000100,"source_trace_id":"t-123","tier":"gold","input":"hello","output":"hi","validation":{"quality_score":0.95,"validated":true}}
```
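
The `tier` field in a training sample is derived from thresholds on `winner.score`. A minimal sketch of that mapping follows; the `Tier`, `TierThresholds`, and `tier_for` names are hypothetical, and the actual threshold values come from configuration rather than from this example.

```rust
/// Tiers named in the README; the enum itself is a hypothetical illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Tier {
    Gold,
    Silver,
    Bronze,
}

/// Hypothetical threshold holder; real values are read from the crate's configuration.
pub struct TierThresholds {
    pub gold: f64,
    pub silver: f64,
    pub bronze: f64,
}

/// Map a `winner.score` to a tier, or `None` when the score is below every threshold.
/// A NaN score fails all comparisons and therefore also yields `None`.
pub fn tier_for(score: f64, t: &TierThresholds) -> Option<Tier> {
    if score >= t.gold {
        Some(Tier::Gold)
    } else if score >= t.silver {
        Some(Tier::Silver)
    } else if score >= t.bronze {
        Some(Tier::Bronze)
    } else {
        None
    }
}
```

Returning `None` for low scores models the case where a comparison is simply not turned into a training sample.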

Mermaid diagram (data flow)

```mermaid
flowchart LR
	App[Application] -->|collect_* APIs| Collector[DataCollector]
	Collector --> Events[events.jsonl]
	Collector --> Comparisons[comparisons.jsonl]
	Collector --> Training[training_samples.jsonl]
	Events --> Aggregator[MetricsAggregator]
	Comparisons -->|generate| TrainingGen[Training Sample Generator]
	TrainingGen --> Training
	ModelStore[Model Store] -->|save_model| Models(models/)
```


Schemas

The repository includes JSON Schema (draft-07) files under `schemas/v1/` for `engine_execution`, `comparison_result`, and `training_sample`.

Atomic writes

The `atomic_append` helper writes to a temporary file in the same directory, calls `sync_all()` on the file, renames into place, and (on Unix) calls `sync_all()` on the directory to minimize corruption risk.
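
The sequence described above (temporary file, `sync_all()`, rename, directory sync) can be sketched with the standard library alone. This is not the crate's implementation: `atomic_append_sketch` is a hypothetical function, the `.tmp` suffix is an assumption, and copying the existing contents into the temporary file is one assumed way of combining "append" with "rename into place".

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Sketch only: append `line` to `path` by rewriting the file through a sibling temp file.
pub fn atomic_append_sketch(path: &Path, line: &str) -> io::Result<()> {
    let dir = path
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or(Path::new("."));
    let file_name = path
        .file_name()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "path has no file name"))?;

    // The temp file lives in the same directory so the rename stays on one filesystem.
    let mut tmp_name = file_name.to_os_string();
    tmp_name.push(".tmp"); // assumed suffix
    let tmp = dir.join(tmp_name);

    // Start from the current contents, or from nothing if the file does not exist yet.
    let mut data = match fs::read(path) {
        Ok(bytes) => bytes,
        Err(e) if e.kind() == io::ErrorKind::NotFound => Vec::new(),
        Err(e) => return Err(e),
    };
    data.extend_from_slice(line.as_bytes());
    data.push(b'\n');

    // Write and fsync the temp file before it becomes visible under the final name.
    let mut f = File::create(&tmp)?;
    f.write_all(&data)?;
    f.sync_all()?;
    drop(f);

    fs::rename(&tmp, path)?;

    // On Unix, fsync the directory so the rename itself is durable.
    #[cfg(unix)]
    {
        File::open(dir)?.sync_all()?;
    }
    Ok(())
}
```

Because each call rewrites the whole file, this shape only suits small files. The sketch also assumes a single writer, since two concurrent callers would race on the temporary file.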

Configuration

Relevant configuration fields: `json_engine.output.base_path`, per-channel settings (`enabled`, `filename`, `buffer_size`, `auto_flush_seconds`), aggregation settings, and training-sample generation thresholds.
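
As a rough picture of how these fields fit together, the sketch below mirrors the per-channel options in plain structs. The type names, the `path_for` helper, the `data` base path, and the numeric defaults are all hypothetical; only the option names and the default filenames are taken from this README.

```rust
use std::path::PathBuf;

/// Hypothetical mirror of the per-channel options (not the crate's definitions).
#[derive(Debug, Clone, PartialEq)]
pub struct ChannelConfig {
    pub enabled: bool,
    pub filename: String,
    pub buffer_size: usize,
    pub auto_flush_seconds: u64,
}

impl ChannelConfig {
    fn with_filename(filename: &str) -> Self {
        Self {
            enabled: true,
            filename: filename.to_string(),
            buffer_size: 100,      // assumed value, for illustration only
            auto_flush_seconds: 5, // assumed value, for illustration only
        }
    }
}

/// Hypothetical counterpart of the `json_engine.output` section.
#[derive(Debug, Clone)]
pub struct OutputConfig {
    pub base_path: PathBuf,
    pub events: ChannelConfig,
    pub comparisons: ChannelConfig,
    pub training_samples: ChannelConfig,
}

impl Default for OutputConfig {
    fn default() -> Self {
        Self {
            base_path: PathBuf::from("data"), // assumed default
            events: ChannelConfig::with_filename("events.jsonl"),
            comparisons: ChannelConfig::with_filename("comparisons.jsonl"),
            training_samples: ChannelConfig::with_filename("training_samples.jsonl"),
        }
    }
}

impl OutputConfig {
    /// Full output path for a channel, or `None` when the channel is disabled.
    pub fn path_for(&self, channel: &ChannelConfig) -> Option<PathBuf> {
        channel.enabled.then(|| self.base_path.join(&channel.filename))
    }
}
```

Disabling a channel in this sketch simply yields no output path, so nothing would be written for it.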

Native feature

The native model backend is feature-gated. When the `native` feature is disabled, model-loading APIs are no-ops and return `Ok(None)`.
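
Feature gating of this kind is usually written with `cfg` attributes. The sketch below shows the general pattern only: `NativeModel` and `load_model` are hypothetical names, and the enabled branch uses a file-existence check as a stand-in for real model loading.

```rust
use std::io;
use std::path::Path;

/// Placeholder for whatever handle a native backend would return (hypothetical).
#[derive(Debug)]
pub struct NativeModel;

/// Compiled only when the `native` feature is enabled.
#[cfg(feature = "native")]
pub fn load_model(path: &Path) -> io::Result<Option<NativeModel>> {
    // Assumption: an existence check stands in for actual loading.
    if path.exists() {
        Ok(Some(NativeModel))
    } else {
        Ok(None)
    }
}

/// Compiled when `native` is disabled: a no-op that reports "no model".
#[cfg(not(feature = "native"))]
pub fn load_model(_path: &Path) -> io::Result<Option<NativeModel>> {
    Ok(None)
}
```

Keeping the same signature in both branches lets callers compile unchanged whether or not the feature is enabled.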

Recommended checks before publishing

```bash
cargo fmt --all
cargo clippy --all -- -D warnings
cargo test --lib
cargo build --release
```

Where to look

- Schemas: `schemas/v1/*.json`
- Core code: `src/collector`, `src/aggregator`, `src/config`, `src/validator`


Visual schema diagrams

Below are Mermaid class diagrams that show the main fields and types for the three primary JSONL records handled by this crate. These are intended as a developer-friendly visual reference (not a full JSON Schema export).

```mermaid
classDiagram
	class EngineExecution {
		+int schema_version
		+string trace_id
		+string task_id
		+string engine_id
		+string engine_instance_id
		+int timestamp
		+string input
		+string output
		+Metrics metrics
		+string status
	}

	class Metrics {
		+int latency_ms
		+float memory_mb
		+float cpu_percent
		+bool success
	}

	class ComparisonResult {
		+string comparison_id
		+string query
		+Candidate[] candidates
		+Winner winner
		+Metrics metrics
	}

	class Candidate {
		+string engine_id
		+string output
		+Metrics metrics
	}

	class Winner {
		+string engine_id
		+float score
	}

	class TrainingSample {
		+string sample_id
		+int created_at
		+string source_trace_id
		+string tier  "gold|silver|bronze"
		+string input
		+string output
		+Validation validation
	}

	class Validation {
		+float quality_score
		+bool validated
	}

	EngineExecution --> Metrics
	ComparisonResult --> Candidate
	ComparisonResult --> Winner
	Candidate --> Metrics
	TrainingSample --> Validation
```

