parent_ai_json_engine 0.0.2

Crate provides a JSON engine for collecting, aggregating, and managing models.
Documentation
  • Coverage
  • 0%
    0 out of 59 items documented0 out of 3 items with examples
  • Size
  • Source code size: 91.23 kB This is the summed size of all the files inside the crates.io package for this release.
  • Documentation size: 7.12 MB This is the summed size of all files generated by rustdoc for all configured targets
  • Ø build duration
  • this release: 19s Average build duration of successful builds.
  • all releases: 20s Average build duration of successful builds in releases after 2024-10-23.
  • Links
  • crates.io
  • Dependencies
  • Versions
  • Owners
  • rayanmorel4498-ai

parent_ai_json_engine

Lightweight Rust crate to collect, aggregate, and persist JSONL events for monitoring and model training.

Overview

This crate provides thread-safe primitives to collect JSONL events, optionally validate them against bundled JSON Schemas (schemas/v1/), persist them atomically to disk, and periodically aggregate metrics per engine.

Key components

  • DataCollector — buffer, validate (optional), and persist engine_execution, comparison_result, and training_sample JSONL lines.
  • MetricsAggregator — read events and write periodic aggregated statistics (min/max/avg latencies, p50/p95/p99, success rates, mean quality) per engine_id.
  • Config — configuration and the atomic_append helper for safe small writes.
  • SchemaValidator — lightweight loader for the JSON Schema files under schemas/v1/.
  • Optional native feature — enables native model backend; crate works without it.

Quick example

use std::sync::Arc;
use parent_ai_json_engine::{Config, SchemaValidator, create_collector_from_config, MetricsAggregator};

fn main() -> parent_ai_json_engine::AnyResult<()> {
	let config = Config::default();
	let out = config.resolve_output_dir(std::path::Path::new("."));
	let validator = Arc::new(SchemaValidator::new(std::path::Path::new("schemas"))?);
	let collector = create_collector_from_config(validator.clone(), out.clone(), &config)?;

	collector.collect_engine_execution("{...json...}".to_string())?;

	let aggregator = std::sync::Arc::new(MetricsAggregator::new(out, config.json_engine.aggregation.window_minutes));
	aggregator.start();

	Ok(())
}

Data flow

  • Input APIs: collect_engine_execution, collect_comparison_result, collect_training_sample with payloads matching the schemas in schemas/v1/.
  • Buffering: channels buffer events in memory and flush by size or interval.
  • Persistence: JSONL lines are appended atomically to output_dir/<filename> using the crate helper.

What json_engine does (detailed)

  • Accepts JSON payloads for three primary event types and persists them as JSONL:

    • engine_execution: records a single run of a given engine (latency, status, trace id, input/output).
    • comparison_result: contains a query and multiple candidate engine responses plus a winner with a score used to rank outputs.
    • training_sample: curated examples derived from comparison_result or produced externally for fine-tuning.
  • Validation: incoming payloads are optionally validated with SchemaValidator against the JSON Schema files in schemas/v1/. Invalid payloads are logged and skipped (configurable behavior).

  • Buffering and channels:

    • Each output channel buffers events in memory. Channels are configurable via Config with per-channel options: enabled, filename, buffer_size, auto_flush_seconds.
    • Default filenames produced by the crate: events.jsonl (engine executions), comparisons.jsonl (comparison results), training_samples.jsonl (generated or ingested training samples), and aggregated_stats.jsonl (aggregator output).
    • Flush triggers: buffer size threshold, periodic auto-flush interval, or explicit flush on shutdown.
  • Atomic persistence:

    • The crate uses an atomic_append helper to ensure safe on-disk writes. For small atomic updates the helper writes to a temporary file in the same directory, fsyncs the file, renames into place, and (on Unix) fsyncs the directory to minimize corruption risk.
    • Writes are designed to avoid producing partially-written JSONL lines visible to readers.
  • Aggregation:

    • MetricsAggregator periodically scans events.jsonl (and other sources as configured), computes per-engine statistics (min/max/avg latency, p50/p95/p99, success rate, mean quality), and appends one aggregated JSON record per period to aggregated_stats.jsonl.
  • Training-sample generation:

    • When enabled, DataCollector can auto-generate training_sample entries from comparison_result input using configurable thresholds applied to winner.score (tiers: gold, silver, bronze). Generated samples are validated and appended to training_samples.jsonl.
  • Model management:

    • The crate exposes a model_store utility to save model binaries atomically. A MAX_MODEL_BYTES limit (e.g. 200_000_000) prevents saving oversized artifacts.
  • Shutdown and durability:

    • Background workers (collector and aggregator) flush buffers during graceful shutdown. Callers can also call MetricsAggregator::stop() to request a clean stop and final flush.
  • Observability:

    • The crate exposes minimal counters (events received, writes flushed) and emits compact structured logs for operational visibility.

Example JSONL lines

engine_execution (one line per event):

{"schema_version":1,"trace_id":"t-123","task_id":"task-1","engine_id":"gpt-x","timestamp":1670000000,"input":"hello","output":"hi","metrics":{"latency_ms":42,"success":true}}

comparison_result:

{"comparison_id":"c-1","query":"translate this","candidates":[{"engine_id":"a","output":"...","metrics":{"latency_ms":20,"quality_score":0.7}},{"engine_id":"b","output":"...","metrics":{"latency_ms":35,"quality_score":0.9}}],"winner":{"engine_id":"b","score":0.9}}

training_sample:

{"sample_id":"s-1","created_at":1670000100,"source_trace_id":"t-123","tier":"gold","input":"hello","output":"hi","validation":{"quality_score":0.95,"validated":true}}

Mermaid diagram (data flow)

flowchart LR
	App[Application] -->|collect_* APIs| Collector[DataCollector]
	Collector --> Events[events.jsonl]
	Collector --> Comparisons[comparisons.jsonl]
	Collector --> Training[training_samples.jsonl]
	Events --> Aggregator[MetricsAggregator]
	Comparisons -->|generate| TrainingGen[Training Sample Generator]
	TrainingGen --> Training
	ModelStore[Model Store] -->|save_model| Models(models/)

Schemas

The repository includes JSON Schema v7 files under schemas/v1/ for engine_execution, comparison_result, and training_sample.

Atomic writes

The atomic_append helper writes to a temporary file in the same directory, calls sync_all() on the file, renames into place, and (on Unix) calls sync_all() on the directory to minimize corruption risk.

Configuration

Relevant configuration fields: json_engine.output.base_path, per-channel settings (enabled, filename, buffer_size, auto_flush_seconds), aggregation settings, and training-sample generation thresholds.

Native feature

The native model backend is feature-gated. When the native feature is disabled, model-loading APIs are no-ops and return Ok(None).

Recommended checks before publishing

cargo fmt --all
cargo clippy --all -- -D warnings
cargo test --lib
cargo build --release

Where to look

  • Schemas: schemas/v1/*.json
  • Core code: src/collector, src/aggregator, src/config, src/validator

If you need examples, exact schema field descriptions, or unit tests for atomic_append, tell me which sections to expand and I will add them.

Visual schema diagrams

Below are Mermaid class diagrams that show the main fields and types for the three primary JSONL records handled by this crate. These are intended as a developer-friendly visual reference (not a full JSON Schema export).

classDiagram
	class EngineExecution {
		+int schema_version
		+string trace_id
		+string task_id
		+string engine_id
		+string engine_instance_id
		+int timestamp
		+string input
		+string output
		+Metrics metrics
		+string status
	}

	class Metrics {
		+int latency_ms
		+float memory_mb
		+float cpu_percent
		+bool success
	}

	class ComparisonResult {
		+string comparison_id
		+string query
		+Candidate[] candidates
		+Winner winner
		+Metrics metrics
	}

	class Candidate {
		+string engine_id
		+string output
		+Metrics metrics
	}

	class Winner {
		+string engine_id
		+float score
	}

	class TrainingSample {
		+string sample_id
		+int created_at
		+string source_trace_id
		+string tier  "gold|silver|bronze"
		+string input
		+string output
		+Validation validation
	}

	class Validation {
		+float quality_score
		+bool validated
	}

	EngineExecution --> Metrics
	ComparisonResult --> Candidate
	ComparisonResult --> Winner
	Candidate --> Metrics
	TrainingSample --> Validation

If you prefer an ER-style diagram or a different diagram format (UML, detailed field descriptions, or a downloadable SVG/PNG), tell me which format and I will generate it.

use std::sync::Arc;
use parent_ai_json_engine::{Config, SchemaValidator, create_collector_from_config, MetricsAggregator};

fn main() -> parent_ai_json_engine::AnyResult<()> {
	let config = Config::default();
	let out = config.resolve_output_dir(std::path::Path::new("."));
	let validator = Arc::new(SchemaValidator::new(std::path::Path::new("schemas"))?);
	let collector = create_collector_from_config(validator.clone(), out.clone(), &config)?;

	collector.collect_engine_execution("{...json...}".to_string())?;

	let aggregator = std::sync::Arc::new(MetricsAggregator::new(out, config.json_engine.aggregation.window_minutes));
	aggregator.start();

	Ok(())
}
```

Data flow

- Input: call `collect_engine_execution`, `collect_comparison_result`, or `collect_training_sample` with JSON payloads matching the schemas in `schemas/v1/`.
- Buffering: each output channel buffers data in memory and flushes by size or interval.
- Persistence: JSONL lines are written atomically to `output_dir/<filename>`.

Schemas

See `schemas/v1/` for JSON Schema definitions for `engine_execution.json`, `comparison_result.json`, and `training_sample.json`.

Atomic writes

The crate provides an `atomic_append` helper that writes to a temporary file in the same directory, calls `sync_all()` on the file, renames into place, and (on Unix) calls `sync_all()` on the directory to reduce corruption risk.

Configuration

Relevant `Config` fields include `json_engine.output.base_path`, per-channel settings (`enabled`, `filename`, `buffer_size`, `auto_flush_seconds`), aggregation settings, and training sample generation thresholds.

Native feature

The native neural backend is optional and enabled via the `native` feature. When disabled, model-loading APIs are no-ops and return `Ok(None)`.

Recommended checks

```bash
cargo fmt --all
cargo clippy --all -- -D warnings
cargo test --lib
cargo build --release
```

Where to look

- Schemas: `schemas/v1/*.json`
- Core code: `src/collector`, `src/aggregator`, `src/config`, `src/validator`

If you want additional sections (example payloads, exact schema fields, or tests for atomic writes), tell me which parts to expand.