triplets
WORK IN PROGRESS
Composable Rust crate for deterministic multi-source sampling and split persistence for ML/AI training data.
triplets is a reusable core for ML/AI training-data orchestration. It provides sampler primitives, split/state persistence, chunking and weighting mechanics, and source abstractions (DataSource, DataRecord) without tying behavior to proprietary corpora.
Why this instead of a static dataset
Compared with a typical static dataset workflow, triplets is designed for deterministic runtime orchestration:
- Online deterministic sampling: sample from multiple sources at runtime instead of consuming one pre-materialized dump.
- Stable split assignment + persistence: keep train/validation/test membership reproducible across restarts and runs.
- Bounded ingestion windows: progress through large or changing corpora without loading everything at once.
- Recipe-time generation: build triplet/pair/text training examples during sampling rather than only reading pre-generated examples.
- Per-call source weighting: adjust source mixture without regenerating a static artifact.
- Streaming-aware refresh: incorporate newly available records on subsequent sampling calls.
Concurrency and source progression model:
- Each source has an independent cursor and buffer, so sources do not advance in lockstep.
- Source refreshes run concurrently within a sampling/refresh call.
- Synchronization happens at call boundaries: refresh threads are joined before buffer merge (not an always-on per-source ingest loop).
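As a minimal sketch of that call-boundary model (plain std threads, not the crate's internals; refresh_source is a placeholder):

```rust
use std::thread;

// Hedged sketch: one refresh task per source, all joined before the merge.
// Synchronization happens at the call boundary, not in a background loop.
fn refresh_all(sources: &[String]) -> Vec<String> {
    let buffers: Vec<Vec<String>> = thread::scope(|s| {
        let handles: Vec<_> = sources
            .iter()
            .map(|src| s.spawn(move || refresh_source(src)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    buffers.into_iter().flatten().collect() // merge only after every join
}

fn refresh_source(_source: &str) -> Vec<String> {
    Vec::new() // placeholder for per-source ingestion
}
```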
Philosophy
You can think of triplets as a training-pipeline orchestrator:
- Composability: recipe-driven pair/triplet/text generation independent of storage backend.
- Abstractions: source backends (filesystem, SQL, APIs, streams) are decoupled from sampling logic.
- Pipeline management: deterministic split assignment, bounded ingestion, chunk weighting, and persisted resume state.
Supply-chain mindset
- Suppliers: each DataSource is a supplier.
- Manifests & traceability: stable record IDs plus deterministic split hashing keep records bound to their train/validation/test assignment.
- Inventory control: per-source cursors bound memory and support large corpora.
- Routing plan: seed + epoch + chunking define deterministic ordering.
- Packaged outputs: recipes emit triplets/pairs/text batches without changing source backends.
Highlights
- Data-source agnostic core – implement DataSource for files, SQL, APIs, streams, etc.
- Semantic recipes – define anchor/positive/negative selectors and mismatch strategies.
- Deterministic split manager – reproducible split assignment and optional persisted state.
- Quality knobs – per-record trust scores and chunk-level weighting.
- Chunk orchestration – overlap-aware windows with summary fallbacks.
- Thread-safe batching – serialized batch construction with multi-threaded source refresh.
- Prefetchers – background queueing for triplet/pair/text batch pipelines.
- Capacity estimation helpers – metadata-only split/pair/triplet/text estimates.
What this does (and does not do)
- Does: deterministic paging, split assignment, state persistence, and reproducible batch assembly.
- Does: enforce bounded ingestion and explicit resume semantics.
- Does: support both finite/index-backed sources and unbounded streaming/append-only sources.
- Does not: perform semantic mining, topic modeling, or relevance scoring by itself.
- Does not: assume every source is infinite.
- Does not: guarantee semantic hardness beyond recipe and source metadata design.
Getting started
Add triplets to a downstream crate:
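Assuming a standard Cargo workflow:

```sh
cargo add triplets            # if published
cargo add --path ../triplets  # or a local path while the crate is unpublished
```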
To run the included examples in this repository (for exploration/contributor workflow):
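For example, with the multi_source_demo example referenced below:

```sh
cargo run --example multi_source_demo
```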
For contributors (development check):
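A typical check sequence, assuming standard cargo tooling:

```sh
cargo fmt --check
cargo clippy --all-targets
cargo test
```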
Minimal shape:
- Implement one or more DataSource backends.
- Create a SamplerConfig (chunking, recipes, split policy).
- Open a split store (DeterministicSplitStore or FileSplitStore).
- Construct a PairSampler and register sources.
- Call the next_*_batch(split) APIs.
- Call persist_state() when you want restart-resume behavior (sketched below).
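Put together, a hedged sketch of that shape (constructor and method signatures here are assumptions drawn from the names above, not the crate's exact API):

```rust
use triplets::{FileSplitStore, PairSampler, SamplerConfig};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Chunking, recipes, and split policy live in the config (fields elided).
    let config = SamplerConfig::default();
    // 2. Open a persistable split store (path matches the demo default).
    let store = FileSplitStore::open(".sampler_store/split_store.bin")?;
    // 3. Construct the sampler and register DataSource backends.
    let mut sampler = PairSampler::new(config, store);
    // sampler.register_source(my_source); // registration API is an assumption
    // 4. Sample batches; ingestion happens inside the call:
    // let batch = sampler.next_triplet_batch(split)?;
    // 5. Persist explicitly for restart-resume behavior.
    sampler.persist_state()?;
    Ok(())
}
```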
Examples
From the triplets crate (the capacity-estimation invocation below is illustrative; inspect --help for the real flags):

```sh
# sample triplet batches
cargo run --example multi_source_demo

# inspect CLI flags
cargo run --example multi_source_demo -- --help

# metadata-only capacity estimation (flag name is hypothetical)
cargo run --example multi_source_demo -- --estimate-capacity
```
Source roots can be overridden with repeatable flags:
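For instance (the flag name here is hypothetical; check --help for the real one):

```sh
cargo run --example multi_source_demo -- \
  --source-root ./corpus_a \
  --source-root ./corpus_b
```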
Split-store path configuration
The multi_source_demo example persists sampler/split state by default to:
.sampler_store/split_store.bin
You can override persistence location with either:
- --split-store-path <FILE> for an explicit file path
- --split-store-dir <DIR> to keep the filename split_store.bin in a custom directory
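For example:

```sh
# explicit file path
cargo run --example multi_source_demo -- --split-store-path /tmp/split_store.bin

# or keep the default filename in a custom directory
cargo run --example multi_source_demo -- --split-store-dir /tmp/sampler_state
```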
Usage flow
Short version:
- Call sampler.next_*_batch(split) to sample batches (ingestion happens automatically).
- Call sampler.persist_state() when you want restart-resume behavior.
- Optionally call sampler.set_epoch(n) for explicit epoch control.
Step-by-step:
- Build config + open the split store.
- Register sources.
- Call sampler.next_*_batch(split).
- Call sampler.persist_state() when you want to save progress.
- Optionally call sampler.set_epoch(n) for explicit epoch replay/ordering (see below).
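For explicit epoch replay, the override is a single call (method name from this README):

```rust
// Deterministically reshuffles and resets per-source cursors for epoch 3.
sampler.set_epoch(3);
```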
Operational notes:
- File-backed indexing is rebuilt per process/run and stored in an OS temp-backed index store.
- Persisting sampler/split state is explicit and manual.
- One split-store file shares sampler/source cursor + RNG state unless you use separate store files.
- Batch calls are thread-safe but serialized; refresh work within a call can be parallelized per source.
- Source cursors advance independently per source, so one source can continue making progress even if another source is sparse or slower.
- Refresh concurrency is per call: source refreshes run in parallel for that call, then the sampler joins all refresh threads before merging buffers (not an always-on per-source background ingest loop).
- Prefetchers smooth latency by filling bounded queues from the existing next_*_batch(split) APIs.
- New data from streaming sources is pulled in on the next next_*_batch(split) call.
- sampler.persist_state() is manual; skipping it means no resume state after restart.
- sampler.set_epoch(n) is an advanced override and is not required for normal resume behavior.
- IngestionManager::source_refresh_stats() exposes per-source refresh duration/records/throughput/errors.
- metrics::source_skew summarizes per-source sample imbalance for a batch.
Example (import paths and constructor arguments are illustrative):

```rust
use std::sync::Arc;
use triplets::{DeterministicSplitStore, PairSampler, SamplerConfig, SplitRatios};

# let split = SplitRatios { /* train/validation/test ratios */ };
# let store = DeterministicSplitStore::new(split);
# let config = SamplerConfig::default();
let sampler = Arc::new(PairSampler::new(config, store));
// register sources...
let prefetcher = Arc::clone(&sampler).prefetch_triplet_batches(/* split + queue options */);
let batch = prefetcher.next().unwrap();
let _ = batch;
```
- For per-call source weighting, use next_*_batch_with_weights(split, &HashMap<SourceId, f32>) (see the sketch below).
- Missing source ids default to 1.0; 0.0 disables a source for that call.
- Production readiness note: if len_hint drifts in streaming/append-only sources, epoch order/coverage can repeat or skip records within an epoch, even though split assignment remains deterministic.
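A hedged sketch of a weighted call (import paths, the concrete method name, and the Split label are assumptions instantiating the pattern above):

```rust
use std::collections::HashMap;
use triplets::{PairSampler, SourceId, Split};

// Per-call mixture control: nothing about persisted state changes.
fn sample_weighted(sampler: &mut PairSampler, clean: SourceId, noisy: SourceId) {
    let mut weights: HashMap<SourceId, f32> = HashMap::new();
    weights.insert(clean, 2.0); // oversample the cleaner source
    weights.insert(noisy, 0.0); // disable the noisy source for this call only
    // Any source id missing from the map defaults to 1.0.
    let _batch = sampler.next_triplet_batch_with_weights(Split::Train, &weights);
}
```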
Sampling behavior (current)
This reflects the built-in file-corpus helpers (FileCorpusIndex) used by filesystem-backed sources.
- Ingestion: next_*_batch(split) triggers refresh; per-source buffers refill when empty (or on forced refresh).
- Memory bound: refresh/cache limits are bounded by ingestion_max_records, with a floor at batch_size.
- File indexing: deterministic path ordering + deterministic index permutation for paging.
- Source ordering: round-robin by source, deterministic within-source ordering by seed/epoch.
- Splits: labels are deterministic from record_id + seed + ratios; split APIs enforce allowed_splits (the hashing idea is sketched after this list).
- Coverage caveat: if len_hint drifts mid-epoch in streaming backends, strict single-pass coverage is not guaranteed.
- Weights: recipe/source/chunk weights affect scaling, not deterministic ordering.
- Scale note: full scan/sort/index rebuild cost grows roughly linearly with file count and path bytes.
- Order note: index batching preserves permutation order; chunked index reads do not remove deterministic shuffling.
- Manual epoch control: sampler.set_epoch(n) resets per-source cursors and reshuffles deterministically for that epoch.
- Persisted state scope: epoch tracking is split-aware, but sampler/source cursors plus RNG/round-robin state are persisted per store file.
- Triplet recipe behavior: per-source recipes are scanned from per-source round-robin hints until a match is found.
- Pair batches: derived from triplets and follow the same source/recipe selection behavior.
- Text recipes: follow per-source behavior when provided; otherwise config recipes are used.
- Oversampling: when sources run dry, cached records may be reused (no global no-repeat guarantee).
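The split-assignment idea in isolation, as a hedged sketch (not the crate's exact hashing scheme):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Deterministic label from record_id + seed + ratios: the same inputs always
// land in the same split across restarts (for a fixed hasher implementation).
fn assign_split(record_id: &str, seed: u64, train: f64, validation: f64) -> &'static str {
    let mut h = DefaultHasher::new();
    record_id.hash(&mut h);
    seed.hash(&mut h);
    // Map the hash to [0, 1) and compare against cumulative ratios.
    let x = (h.finish() % 1_000_000) as f64 / 1_000_000.0;
    if x < train {
        "train"
    } else if x < train + validation {
        "validation"
    } else {
        "test"
    }
}
```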
New-source implementation pattern
For any new backend (file/API/DB/stream), centralize backend configuration/state access in one helper reused by both refresh(...) and reported_record_count().
Why this matters: capacity estimates and runtime sampling stay aligned only when both methods represent the same logical corpus slice.
File-backed pattern:
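A hedged file-backed sketch of that helper pattern (struct and helper names are illustrative, not the crate's API):

```rust
use std::path::PathBuf;

// One helper owns "which files exist right now", reused by both
// refresh() and reported_record_count() so estimates and sampling agree.
struct FileBackend {
    root: PathBuf,
}

impl FileBackend {
    // Single source of truth for the corpus slice this source exposes.
    fn list_corpus_files(&self) -> Vec<PathBuf> {
        let mut files: Vec<PathBuf> = std::fs::read_dir(&self.root)
            .into_iter()
            .flatten()
            .flatten()
            .map(|entry| entry.path())
            .filter(|path| path.is_file())
            .collect();
        files.sort(); // deterministic path ordering
        files
    }

    fn reported_record_count(&self) -> usize {
        self.list_corpus_files().len()
    }

    fn refresh(&self, limit: usize) -> Vec<PathBuf> {
        self.list_corpus_files().into_iter().take(limit).collect()
    }
}
```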
If a source emits sequential IDs, implement indexable paging (IndexableSource + IndexablePager or IndexableAdapter) to avoid time-ordered ingestion bias.
Example hash-sorted refresh skeleton:
```rust
use chrono::Utc; // e.g. for record timestamps
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use triplets::DataRecord;
use triplets::{DataSource, SourceId}; // import paths are assumptions
use triplets::SamplerError;
```
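The refresh body itself, as a sketch that pairs with those imports (the DataRecord id field and the function signature are assumptions; the point is ordering by hash(record_id, seed) rather than arrival time):

```rust
fn hash_sorted_refresh(
    candidates: Vec<DataRecord>,
    seed: u64,
    limit: usize,
) -> Result<Vec<DataRecord>, SamplerError> {
    // Key each candidate by hash(record_id, seed): deterministic and
    // independent of ingestion/arrival order.
    let mut keyed: Vec<(u64, DataRecord)> = candidates
        .into_iter()
        .map(|rec| {
            let mut h = DefaultHasher::new();
            rec.id.hash(&mut h); // assumes a stable id field on DataRecord
            seed.hash(&mut h);
            (h.finish(), rec)
        })
        .collect();
    keyed.sort_by_key(|(key, _)| *key);
    Ok(keyed.into_iter().take(limit).map(|(_, rec)| rec).collect())
}
```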
Capacity estimates
The estimate helpers compute metadata-only approximations from source-reported counts and recipe structure.
- They do not call source refresh.
- They are floor-like approximations for real chunked training.
- Effective triplet estimates use bounded assumptions (positives/negatives per anchor).
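As a floor-style back-of-envelope under those bounded assumptions (helper name and formula are illustrative, not the crate's exact estimator):

```rust
// Metadata-only, floor-like estimate: anchors x bounded positives-per-anchor
// x bounded negatives-per-anchor, with saturating math for large corpora.
fn estimate_triplets(anchors: u64, pos_per_anchor: u64, neg_per_anchor: u64) -> u64 {
    anchors
        .saturating_mul(pos_per_anchor)
        .saturating_mul(neg_per_anchor)
}
```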
Potential future directions (optional)
These are ideas, not commitments.
- Add more backend adapters in downstream crates (APIs, DBs, manifests, streams)
- Improve strict-coverage options for drifting/streaming corpora
- Add optional split-keyed sampler cursor state in a single store file
- Extend observability hooks for ingestion latency/skew/error diagnostics
License
triplets is primarily distributed under the terms of both the MIT license and the Apache License (Version 2.0).
See LICENSE-APACHE and LICENSE-MIT for details.