§triplets
WORK IN PROGRESS.
Composable data sampling primitives for deterministic multi-source ML/AI training-data orchestration.
triplets is a reusable core for ML/AI training-data orchestration. It provides sampler primitives, split/state persistence, chunking and weighting mechanics, and source abstractions (DataSource, DataRecord) without tying behavior to proprietary corpora.
CI is configured to run tests/linting on macOS, Linux, and Windows.
§High-level features
- Automatic deterministic splits (train/validation/test) from record IDs + seed.
- Runtime batch sampling via `next_triplet_batch`, `next_pair_batch`, and `next_text_batch`.
- Recipe-driven sample construction for triplet/pair/text generation (anchor/positive/negative selectors).
- Weight-aware sampling controls across source weights, recipe weights, and chunk trust/quality weighting.
- Resume support via `persist_state()` and split-store persistence.
- Source-agnostic backends (`DataSource` or `IndexableSource` + `IndexableAdapter`).
- Supply-chain-style orchestration (core layer): multi-source intake (`refresh`) with per-call parallel ingest, optional per-source weighting, staged buffering, deterministic split routing, and batch assembly into train-ready outputs.
- Bounded ingestion windows instead of loading full corpora into memory.
- Per-call source threading: during refresh, each source is fetched on its own short-lived thread, then merged deterministically for batch assembly.
- Streaming-friendly: sources can be finite or unbounded.
This crate does not perform semantic mining/retrieval scoring by itself; instead, it gives you deterministic, metadata-driven sampling primitives you can feed into your downstream mining/retrieval stack.
§Metadata-driven sampling flow
Use triplets to build deterministic training batches that carry metadata context:
- Put structural tags in `DataRecord.taxonomy` (source/date/category/etc.) for filtering and analysis.
- Use recipes/selectors to choose which sections become anchor/positive/negative text.
- Attach optional KVP metadata prefixes (below) so sampled text can include lightweight context headers.
- Keep split assignment deterministic while changing recipe or weighting behavior at runtime.
This gives you metadata-aware sampling orchestration, while semantic retrieval/mining logic stays in your downstream pipeline.
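To make the deterministic-split idea concrete, here is a minimal std-only sketch of how a label can be derived from a record ID plus a seed and ratios. This is illustrative only, not the crate's actual implementation (which derives labels internally from `record_id + seed + ratios`); note also that `DefaultHasher` is not guaranteed stable across Rust releases, so a real implementation would use a pinned hash function.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Debug, PartialEq)]
enum Split {
    Train,
    Validation,
    Test,
}

/// Map a record ID + seed to a stable bucket in [0, 1) and pick a split
/// from the configured ratios. Same inputs always give the same label.
fn split_for(record_id: &str, seed: u64, train: f64, validation: f64) -> Split {
    let mut hasher = DefaultHasher::new();
    record_id.hash(&mut hasher);
    seed.hash(&mut hasher);
    // Normalize the 64-bit hash into [0, 1).
    let bucket = hasher.finish() as f64 / u64::MAX as f64;
    if bucket < train {
        Split::Train
    } else if bucket < train + validation {
        Split::Validation
    } else {
        Split::Test
    }
}

fn main() {
    // The label depends only on (record_id, seed, ratios), so it is
    // reproducible across runs and unaffected by sampling order.
    let a = split_for("record-0001", 42, 0.8, 0.1);
    let b = split_for("record-0001", 42, 0.8, 0.1);
    assert_eq!(a, b);
    println!("record-0001 -> {:?}", a);
}
```

Because the label never depends on traversal order, recipe or weighting changes at runtime cannot leak records across splits.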
§KVP data decorator
- Each `DataRecord` can carry an optional `meta_prefix` sampler (`KvpPrefixSampler`).
- At sample time, the sampler can prepend a header line to chunk text, formatted like `meta: key=value | key2=value2`.
- `KvpField` supports multiple value renderings per key and optional per-field presence probability.
- `KvpPrefixSampler` supports variant selection and overall dropout (emit the prefix sometimes, or always).
- This is designed to give the model useful context signals (date/source/category/etc.) without making a single rigid header pattern easy to memorize.
- Multi-render values, per-field presence control, field-order variation, and prefix dropout reduce shortcut learning and encourage reliance on the underlying content.
- KVP prefixes decorate sampled text; they do not change deterministic split assignment.
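The decoration idea can be sketched in a few lines. This is a hypothetical std-only illustration, not the crate's `KvpPrefixSampler`/`KvpField` API: a real sampler would additionally drive variant choice, per-field presence, and dropout from an RNG.

```rust
/// One metadata field with several interchangeable renderings of its value.
struct Field {
    key: &'static str,
    renderings: Vec<String>,
}

/// Build a `meta: k=v | k2=v2` header, picking one rendering per field from
/// a caller-supplied variant index. Varying the rendering (and, in a real
/// sampler, dropping fields or the whole prefix) keeps the header from
/// becoming a single memorizable pattern.
fn kvp_prefix(fields: &[Field], variant: usize) -> String {
    let parts: Vec<String> = fields
        .iter()
        .map(|f| format!("{}={}", f.key, f.renderings[variant % f.renderings.len()]))
        .collect();
    format!("meta: {}", parts.join(" | "))
}

fn main() {
    let fields = vec![
        Field { key: "date", renderings: vec!["2024-05-01".into(), "May 2024".into()] },
        Field { key: "source", renderings: vec!["wiki".into()] },
    ];
    // Different variant indices render the same metadata differently.
    println!("{}", kvp_prefix(&fields, 0)); // meta: date=2024-05-01 | source=wiki
    println!("{}", kvp_prefix(&fields, 1)); // meta: date=May 2024 | source=wiki
}
```

Since the prefix is applied at sample time, it never participates in the hash that decides split membership.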
§Getting started
Add triplets to a downstream crate:
cargo add triplets

To run the included examples in this repository (for exploration/contributor workflow):

cargo run --example multi_source_demo -- --help

For contributors (development check):

cargo test

Minimal shape:
- Implement one or more `DataSource` backends.
- Create a `SamplerConfig` (chunking, recipes, split policy).
- Open a split store (`DeterministicSplitStore` or `FileSplitStore`).
- Construct a `PairSampler` and register sources.
- Call one of the batch APIs: `next_triplet_batch(split)`, `next_pair_batch(split)`, or `next_text_batch(split)`.
- Call `persist_state()` when you want restart-resume behavior.
§Examples
From the triplets crate:
# sample triplet batches
cargo run --example multi_source_demo
# inspect CLI flags
cargo run --example multi_source_demo -- --help
# metadata-only capacity estimation
cargo run --example estimate_capacity -- --help
cargo run --example estimate_capacity

Source roots can be overridden with repeatable flags:
cargo run --example multi_source_demo -- \
--source-root /path/to/source_1 \
--source-root /path/to/source_2

§Split-store path configuration
The multi_source_demo example persists sampler/split state by default to:
.sampler_store/split_store.bin
You can override persistence location with either:
- `--split-store-path <FILE>` for an explicit file path
- `--split-store-dir <DIR>` to keep the filename `split_store.bin` in a custom directory
§Usage flow
Short version:
- Call `sampler.next_triplet_batch(split)`, `sampler.next_pair_batch(split)`, or `sampler.next_text_batch(split)` to sample batches (ingestion happens automatically).
- Call `sampler.persist_state()` when you want restart-resume behavior.
- Optionally call `sampler.set_epoch(n)` for explicit epoch control.
Step-by-step:
- Build config + open the split store.
- Register sources.
- Call one of `sampler.next_triplet_batch(split)`, `sampler.next_pair_batch(split)`, or `sampler.next_text_batch(split)`.
- Call `sampler.persist_state()` when you want to write persisted sampler/split state, typically at the end of an epoch or at explicit checkpoint boundaries. Do not call this every step: very frequent writes create high I/O overhead and, at very large write counts (for example, tens of millions), can also slow split-store initialization.
- Optionally call `sampler.set_epoch(n)` for explicit epoch replay/order.
Operational notes:
- File-backed indexing is rebuilt per process/run and stored in an OS temp-backed index store.
- Persisting sampler/split state is explicit and manual.
- One split-store file shares sampler/source cursor + RNG state unless you use separate store files.
- Batch calls are thread-safe but serialized; refresh work within a call can be parallelized per source.
- Source cursors advance independently per source, so one source can continue making progress even if another source is sparse or slower.
- Refresh concurrency is per call: source refreshes run in parallel for that call, then the sampler joins all refresh threads before merging buffers (not an always-on per-source background ingest loop).
- Prefetchers smooth latency by filling bounded queues from the existing batch APIs (`next_triplet_batch`, `next_pair_batch`, `next_text_batch`).
- New data from streaming sources is pulled in on the next batch call.
- `sampler.persist_state()` is manual; skipping it means no resume state after restart.
- `sampler.set_epoch(n)` is an advanced override and is not required for normal resume behavior.
- `IngestionManager::source_refresh_stats()` exposes per-source refresh duration/records/throughput/errors.
- `metrics::source_skew` summarizes per-source sample imbalance for a batch.
Example:
use std::sync::Arc;
use triplets::{
DeterministicSplitStore, PairSampler, Sampler, SamplerConfig, SplitLabel, SplitRatios,
};
// `config` (SamplerConfig) and `store` (a split store) are assumed to be
// built beforehand; construction is elided here.
let sampler = Arc::new(PairSampler::new(config, store));
// register sources...
let prefetcher = Arc::clone(&sampler).prefetch_triplet_batches(SplitLabel::Train, 4);
let batch = prefetcher.next().unwrap();
let _ = batch;

- For per-call source weighting, use `next_triplet_batch_with_weights(...)`, `next_pair_batch_with_weights(...)`, or `next_text_batch_with_weights(...)`.
- Missing source ids default to `1.0`; `0.0` disables a source for that call.
Example (different source mix across consecutive batches):
use std::collections::HashMap;
use std::sync::Arc;
use triplets::{
DeterministicSplitStore, PairSampler, Sampler, SamplerConfig, SplitLabel, SplitRatios,
};
let mut weights_a = HashMap::new();
weights_a.insert("source_a".to_string(), 1.0);
weights_a.insert("source_b".to_string(), 0.2);
let mut weights_b = HashMap::new();
weights_b.insert("source_a".to_string(), 0.2);
weights_b.insert("source_b".to_string(), 1.0);
let batch_a = sampler
.next_triplet_batch_with_weights(SplitLabel::Train, &weights_a)
.unwrap();
let batch_b = sampler
.next_triplet_batch_with_weights(SplitLabel::Train, &weights_b)
.unwrap();
let _ = (batch_a, batch_b);

- Production readiness note: if `len_hint` drifts in streaming/append-only sources, epoch order/coverage can repeat or skip records within an epoch, even though split assignment remains deterministic.
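The repeat/skip risk can be seen in a tiny simulation. This is a hypothetical std-only sketch mirroring the wrap-around cursor logic in the manual-source example later on this page, not the crate's exact code:

```rust
/// Wrap-around paging over `total` records, starting at `start`:
/// returns the visited indices and the next resume position.
fn page(total: usize, start: usize, limit: usize) -> (Vec<usize>, usize) {
    // If the corpus shrank and the cursor is now out of range, reset to 0.
    let start = if total > 0 && start >= total { 0 } else { start };
    let taken: Vec<usize> = (0..total).map(|i| (start + i) % total).take(limit).collect();
    let next = (start + taken.len()) % total.max(1);
    (taken, next)
}

fn main() {
    // Epoch planned against len_hint = 6, paging 3 records per call.
    let (first, cursor) = page(6, 0, 3); // visits indices 0, 1, 2
    // The source shrinks to 2 records mid-epoch: the saved cursor (3) is now
    // out of range, so the pager resets to 0. Records 0 and 1 are visited
    // again (repeat), while planned records 3..5 are never seen (skip).
    let (second, _) = page(2, cursor, 3);
    println!("{:?} then {:?}", first, second); // [0, 1, 2] then [0, 1]
}
```

Split assignment is unaffected by this, because labels are a pure function of record ID, seed, and ratios rather than of traversal order.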
§Sampling behavior (current)
This reflects the built-in file-corpus helpers (FileCorpusIndex) used by filesystem-backed sources.
- Ingestion: `next_triplet_batch(split)`, `next_pair_batch(split)`, and `next_text_batch(split)` trigger refresh; per-source buffers refill when empty (or on force refresh).
- Memory bound: refresh/cache limits are bounded by `ingestion_max_records` with a floor at `batch_size`.
- File indexing: deterministic path ordering + deterministic index permutation for paging.
- Source ordering: round-robin by source, deterministic within-source ordering by seed/epoch.
- Splits: labels are deterministic from `record_id + seed + ratios`; split APIs enforce `allowed_splits`.
- Coverage caveat: if `len_hint` drifts mid-epoch in streaming backends, strict single-pass coverage is not guaranteed.
- Weights: recipe/source/chunk weights affect scaling, not deterministic ordering.
- Scale note: full scan/sort/index rebuild cost grows roughly linearly with file count and path bytes.
- Order note: index batching preserves permutation order; chunked index reads do not remove deterministic shuffling.
- Manual epoch control: `sampler.set_epoch(n)` resets per-source cursors and reshuffles deterministically for that epoch.
- Persisted state scope: epoch tracking is split-aware, but sampler/source cursors + RNG/round-robin state are persisted per store file.
- Triplet recipe behavior: per-source recipes are scanned from per-source round-robin hints until a match is found.
- Pair batches: derived from triplets and follow the same source/recipe selection behavior.
- Text recipes: follow per-source behavior when provided; otherwise config recipes are used.
- Oversampling: when sources run dry, cached records may be reused (no global no-repeat guarantee).
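One way to realize a deterministic per-epoch shuffle is to sort indices by a hash of (seed, epoch, index). This is a std-only sketch of the concept, not the crate's implementation, and `DefaultHasher` is not stable across Rust releases (a pinned hash would be used in practice):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Deterministic permutation of `n` record indices for a given seed + epoch.
/// Sorting by a hash of (seed, epoch, index) yields a repeatable shuffle
/// that changes whenever the epoch changes.
fn epoch_permutation(n: usize, seed: u64, epoch: u64) -> Vec<usize> {
    let mut order: Vec<usize> = (0..n).collect();
    order.sort_by_key(|&i| {
        let mut h = DefaultHasher::new();
        (seed, epoch, i).hash(&mut h);
        h.finish()
    });
    order
}

fn main() {
    // Same (n, seed, epoch) => same order, so resuming mid-epoch is safe;
    // bumping the epoch reshuffles deterministically.
    assert_eq!(epoch_permutation(5, 7, 0), epoch_permutation(5, 7, 0));
    println!("epoch 0: {:?}", epoch_permutation(5, 7, 0));
    println!("epoch 1: {:?}", epoch_permutation(5, 7, 1));
}
```

A permutation built this way also lets chunked index reads preserve shuffle order, since any index subrange can be recomputed independently.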
§New-source implementation pattern
For any new backend (file/API/DB/stream), centralize backend configuration/state access in one helper reused by both refresh(...) and reported_record_count().
Why this matters: capacity estimates and runtime sampling stay aligned only when both methods represent the same logical corpus slice.
File-backed pattern:
fn source_index(&self) -> FileCorpusIndex {
FileCorpusIndex::new(&self.root, &self.id)
.with_follow_links(true)
.with_text_files_only(true)
.with_directory_grouping(true)
}
fn refresh(
&self,
cursor: Option<&SourceCursor>,
limit: Option<usize>,
) -> Result<SourceSnapshot, SamplerError> {
self.source_index()
.refresh_indexable(cursor, limit, |path| self.build_record(path))
}
fn reported_record_count(&self) -> Option<u128> {
self.source_index().indexed_record_count().ok().map(|n| n as u128)
}

If your records are time-ordered (oldest → newest), use these APIs:

- `IndexableSource` (you provide `len_hint()` + `record_at(idx)`).
- `IndexableAdapter` (easiest: turns your `IndexableSource` into a `DataSource`).
- `IndexablePager` (use directly only if you are writing a custom `refresh(...)`).
That is the built-in path for shuffled paging + cursor resume.
Helper-based path (uses the APIs above):
use triplets::source::{IndexableAdapter, IndexableSource};
use triplets::{data::DataRecord, SamplerError};
struct MyIndexableSource {
// Could be DB/API client, manifest reader, etc.
// No in-memory ID list required.
total_records: usize,
}
impl MyIndexableSource {
fn load_record(&self, _idx: usize) -> Result<Option<DataRecord>, SamplerError> {
// Fetch by numeric position from your backend.
// `None` means "no record at this index".
todo!("load one record by index")
}
}
impl IndexableSource for MyIndexableSource {
fn id(&self) -> &str { "my_source" }
fn len_hint(&self) -> Option<usize> { Some(self.total_records) }
fn record_at(&self, idx: usize) -> Result<Option<DataRecord>, SamplerError> {
self.load_record(idx)
}
}
// register as a normal DataSource:
// sampler.register_source(Box::new(IndexableAdapter::new(MyIndexableSource { total_records })));

Manual path (does NOT use `IndexableSource`/`IndexableAdapter` directly):
use chrono::Utc;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use triplets::data::DataRecord;
use triplets::source::{SourceCursor, SourceSnapshot};
use triplets::SamplerError;
struct MySource {
// Canonical record IDs for this source.
// We keep IDs separate from record payloads so refresh can page deterministically.
ids: Vec<String>,
}
impl MySource {
fn load_record(&self, _id: &str) -> Result<DataRecord, SamplerError> {
// Put your real fetch logic here (database call, API request, file read, etc.).
// The sampler expects each loaded item to be returned as a DataRecord.
todo!("load record from storage")
}
fn stable_hash(id: &str) -> u64 {
// Convert each ID to a repeatable number so ordering is the same every run.
// This avoids "newest-first" bias when IDs are naturally time-ordered.
let mut hasher = DefaultHasher::new();
id.hash(&mut hasher);
hasher.finish()
}
fn refresh(
&self,
cursor: Option<&SourceCursor>,
limit: Option<usize>,
) -> Result<SourceSnapshot, SamplerError> {
// Make a sorted copy of IDs so this call runs in a repeatable order.
// Note: this copy holds all IDs in memory for this refresh call.
let mut ids = self.ids.clone();
ids.sort_by_key(|id| Self::stable_hash(id));
// How many records exist right now.
let total = ids.len();
// `revision` means "where to resume next time".
// No cursor yet means this is the first run, so start at index 0.
let mut start = cursor.map(|c| c.revision as usize).unwrap_or(0);
// If data size changed and start is now invalid, safely reset to the beginning.
if total > 0 && start >= total {
start = 0;
}
// Hard cap for this call.
// - If `limit` is Some(n), we load at most `n` records this call.
// - If `limit` is None, we allow one full pass (`total` records).
let max = limit.unwrap_or(total);
let mut records = Vec::new();
// Load records one-by-one, starting at `start`, and wrap at the end.
// We stop as soon as `records.len() == max`.
// So this does NOT always load everything; it only loads up to `max`.
for idx in 0..total {
if records.len() >= max {
break;
}
let pos = (start + idx) % total;
records.push(self.load_record(&ids[pos])?);
}
// Save where the next call should continue.
let next_start = (start + records.len()) % total.max(1);
Ok(SourceSnapshot {
records,
cursor: SourceCursor {
// Record when this refresh happened.
last_seen: Utc::now(),
// Store resume position for the next refresh call.
revision: next_start as u64,
},
})
}
}

§Capacity estimates
The estimate helpers compute metadata-only approximations from source-reported counts and recipe structure.
- They do not call source refresh.
- They are floor-like approximations for real chunked training.
- Effective triplet estimates use bounded assumptions (positives/negatives per anchor).
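The floor-like shape of these estimates can be illustrated with simple arithmetic. The actual formulas live in the `heuristics` module; this sketch uses assumed bounds (chunks per record, positives/negatives per anchor) purely for illustration:

```rust
/// Illustrative floor-style triplet capacity estimate from metadata alone.
/// Assumes each record yields `chunks_per_record` anchors, and each anchor
/// is paired with a bounded number of positives and negatives rather than
/// the full combinatorial space -- hence "floor-like".
fn estimated_triplets(
    records: u128,
    chunks_per_record: u128,
    positives_per_anchor: u128,
    negatives_per_anchor: u128,
) -> u128 {
    let anchors = records * chunks_per_record;
    anchors * positives_per_anchor * negatives_per_anchor
}

fn main() {
    // 10k records, ~4 chunks each, 2 positives and 3 negatives per anchor.
    let est = estimated_triplets(10_000, 4, 2, 3);
    println!("estimated triplets: {est}"); // 240000
}
```

Because the inputs come from `reported_record_count()` rather than refresh, the estimate is cheap but only as accurate as the source-reported counts.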
§Potential future directions (optional)
These are ideas, not commitments.
- Add more backend adapters in downstream crates (APIs, DBs, manifests, streams)
- Improve strict-coverage options for drifting/streaming corpora
- Add optional split-keyed sampler cursor state in a single store file
- Extend observability hooks for ingestion latency/skew/error diagnostics
§License
triplets is primarily distributed under the terms of both the MIT license and the Apache License (Version 2.0).
See LICENSE-APACHE and LICENSE-MIT for details.
Re-exports§
pub use config::ChunkingStrategy;
pub use config::NegativeStrategy;
pub use config::SamplerConfig;
pub use config::Selector;
pub use config::TextRecipe;
pub use config::TripletRecipe;
pub use data::DataRecord;
pub use data::PairLabel;
pub use data::QualityScore;
pub use data::RecordChunk;
pub use data::SampleBatch;
pub use data::SamplePair;
pub use data::SampleTriplet;
pub use data::SectionRole;
pub use data::TextBatch;
pub use data::TextSample;
pub use data::TripletBatch;
pub use ingestion::IngestionManager;
pub use ingestion::RecordCache;
pub use kvp::KvpField;
pub use kvp::KvpPrefixSampler;
pub use sampler::BatchPrefetcher;
pub use sampler::PairSampler;
pub use sampler::Sampler;
pub use source::DataSource;
pub use source::SourceCursor;
pub use splits::DeterministicSplitStore;
pub use splits::FileSplitStore;
pub use splits::SplitLabel;
pub use splits::SplitRatios;
pub use splits::SplitStore;
pub use types::CategoryId;
pub use types::HashPart;
pub use types::KvpValue;
pub use types::LogMessage;
pub use types::MetaValue;
pub use types::PathString;
pub use types::RecipeKey;
pub use types::RecordId;
pub use types::Sentence;
pub use types::SourceId;
pub use types::TaxonomyValue;
Modules§
- config
- Sampling configuration types.
- constants
- Centralized constants used across sampler, splits, and sources.
- data
- Data record and sample batch types.
- example_apps
- Reusable example runners shared by downstream crates.
- heuristics
- Capacity and sampling estimation helpers.
- ingestion
- Background ingestion and caching infrastructure.
- kvp
- Key/value prefix sampling helpers.
- metadata
- Metadata keys and helpers.
- metrics
- Aggregate metrics helpers.
- sampler
- Sampler implementations and public sampling API.
- source
- Data source traits, built-in sources, and paging helpers.
- splits
- Split stores and persistence helpers.
- transport
- Input transports used by sources (filesystem today; DBs later).
- types
- Shared type aliases.
- utils
- Text normalization helpers shared by source implementations.
Enums§
- SamplerError
- Error type for sampler configuration, IO, and persistence failures.