triplets 0.15.0-alpha

Composable data sampling primitives for deterministic multi-source ML/AI training-data orchestration.

Work in progress.

Generate an effectively unlimited stream of training triplets, pairs, or plaintext samples from your existing corpus. This crate handles ingestion, multi-source mixing, deterministic train/validation/test splitting, and optional BM25 hard-negative mining.

Overview

In metric learning and language model training, a triplet consists of an anchor, a positive example (similar to the anchor), and a negative example (dissimilar to the anchor).

triplets provides a high-throughput streaming pipeline to:

  1. Ingest data from local text/CSV files, Hugging Face, or custom backends.
  2. Mix sources with configurable weights to balance your training data.
  3. Split data deterministically into train, validation, and test sets.
  4. Sample triplets or pairs using rule-based "recipes".
  5. Mine hard negatives using BM25 to improve model discrimination.

           Anchor
          /      \
     Positive   Negative

     Triplet: (Anchor, Positive, Negative)

Getting Started

A TripletSampler needs a SplitStore for record-to-split assignments and a SamplerConfig for runtime behavior.

use std::sync::Arc;
use triplets::{
    SamplerConfig, TripletSampler, SplitRatios, 
    DeterministicSplitStore, SplitLabel, Sampler
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Define your train/validation/test ratios (e.g., 80/10/10).
    let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };

    // 2. Initialize a deterministic split store.
    // The seed ensures record IDs are always assigned to the same split.
    let seed = 42;
    let store = Arc::new(DeterministicSplitStore::new(ratios, seed)?);

    // 3. Create the sampler.
    let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
    Ok(())
}
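The comment about the seed can be illustrated with a standalone sketch. This is not the crate's actual hashing scheme; it only shows the general idea: with seeded hashing, the split a record lands in is a pure function of its ID and the seed, so assignments never change between runs.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative only (not the crate's internals): hash (record_id, seed)
/// into [0, 1) and bucket by cumulative ratios, so a given ID always
/// lands in the same split for a given seed.
fn assign_split(record_id: &str, seed: u64, train: f64, validation: f64) -> &'static str {
    let mut hasher = DefaultHasher::new();
    record_id.hash(&mut hasher);
    seed.hash(&mut hasher);
    let unit = (hasher.finish() % 10_000) as f64 / 10_000.0;
    if unit < train {
        "train"
    } else if unit < train + validation {
        "validation"
    } else {
        "test"
    }
}

fn main() {
    // Same ID and seed always yield the same split.
    assert_eq!(
        assign_split("doc_17", 42, 0.8, 0.1),
        assign_split("doc_17", 42, 0.8, 0.1)
    );
    println!("doc_17 -> {}", assign_split("doc_17", 42, 0.8, 0.1));
}
```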

Features

Feature           What it enables                                            Default
huggingface       Streaming from Hugging Face dataset repositories.          No
bm25-mining       BM25 hard-negative ranking within strategy-defined pools.  No
extended-metrics  Additional per-triplet diagnostics for debugging.          No
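Optional features are enabled in Cargo.toml in the usual way. The version below matches the header of this README; adjust it to the release you depend on.

```toml
[dependencies]
triplets = { version = "0.15.0-alpha", features = ["huggingface", "bm25-mining"] }
```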

Configuring Sources

Hugging Face Source

Streams rows directly from the Hugging Face Hub without requiring a full dataset download. Map dataset columns to anchor, positive, or plain-text roles the same way as the CSV source.

#[cfg(feature = "huggingface")]
{
    use std::sync::Arc;
    use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore, Sampler};
    use triplets::{HuggingFaceRowSource, HuggingFaceRowsConfig};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
        let store = Arc::new(DeterministicSplitStore::new(ratios, 42)?);
        let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
        // Configure the source to pull the "train" split of a dataset.
        // Note: While we specify "train" here as the ingestion source, the crate
        // automatically handles its own deterministic split assignments (train/val/test)
        // at the record level across all loaded data.
        let config = HuggingFaceRowsConfig::new(
            "hf_finance",          // Source identifier
            "financial_phrasebank", // HF Dataset name
            "default",             // Dataset config
            "train",               // Dataset split
            "cache/hf_snapshots"   // Local cache for downloaded shards
        );

        let source = HuggingFaceRowSource::new(config)?;
        sampler.register_source(Box::new(source));
        Ok(())
    }
}

CSV Source

Load rows from a CSV file with explicit column mappings. The file must have a named header row — columns are always selected by name. Supports two modes:

  • Role mode — map separate columns to anchor and positive (context) roles.
  • Text mode — map a single column for SimCSE-style contrastive pre-training.

use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore};
use triplets::source::{CsvSource, CsvSourceConfig};

let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
let store = Arc::new(DeterministicSplitStore::new(ratios, 42).unwrap());
let mut sampler = TripletSampler::new(SamplerConfig::default(), store);

// Role mode: map "question" → anchor, "answer" → positive.
let config = CsvSourceConfig::new("qna", "data/qna.csv")
    .with_anchor_column("question")
    .with_positive_column("answer")
    .with_trust(0.9);
let source = CsvSource::new(config).unwrap();
sampler.register_source(Box::new(source));

// Text mode (SimCSE): single column used for both anchor and context.
let config2 = CsvSourceConfig::new("corpus", "data/corpus.csv")
    .with_text_column("text");
let source2 = CsvSource::new(config2).unwrap();
sampler.register_source(Box::new(source2));

Rows with empty required fields are skipped. Column name matching is case-insensitive.
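For reference, a minimal data/qna.csv for Role mode (the file name and the "question"/"answer" columns are taken from the example above; the rows are made up) could look like:

```csv
question,answer
"What is a triplet?","An anchor, a positive, and a negative example."
"What does BM25 mining do?","It ranks candidate negatives by lexical overlap with the anchor."
```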

Text File Source

Recursively indexes plain-text files from a directory. Each file's stem (filename without extension) becomes the anchor and its body content becomes the context. Useful for local corpora where files are already titled meaningfully.

use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore};
use triplets::source::{FileSource, FileSourceConfig};

let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
let store = Arc::new(DeterministicSplitStore::new(ratios, 42).unwrap());
let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
// Point at a directory; all text files are indexed recursively.
// The filename stem is the anchor; the file body is the context.
let config = FileSourceConfig::new("docs", "./data/corpus")
    .with_text_files_only(true)
    .with_trust(0.9); // Assign a quality score to this source

let source = FileSource::new(config);
sampler.register_source(Box::new(source));

Custom Sources

Implement the IndexableSource trait to integrate any backend that can fetch records by a stable integer index.

use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore};
use chrono::Utc;
use triplets::{DataRecord, SamplerError};
use triplets::source::{IndexableSource, IndexableAdapter};

struct MyApiSource;

impl IndexableSource for MyApiSource {
    fn id(&self) -> &str { "api_source" }
    fn len_hint(&self) -> Option<usize> { Some(1000) }
    fn record_at(&self, idx: usize) -> Result<Option<DataRecord>, SamplerError> {
        // Fetch record 'idx' from your database or API.
        Ok(Some(DataRecord {
            id: format!("api_{idx}"),
            source: self.id().into(),
            created_at: Utc::now(),
            updated_at: Utc::now(),
            quality: Default::default(),
            taxonomy: vec![],
            sections: vec![], // Add text content here
            meta_prefix: None,
        }))
    }
}

let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
let store = Arc::new(DeterministicSplitStore::new(ratios, 42).unwrap());
let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
let adapter = IndexableAdapter::new(MyApiSource);
sampler.register_source(Box::new(adapter));

Sampling and Mixing

Weighted Sampling

Adjust per-source sampling frequency to handle class imbalance or dataset quality differences.

use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore, SplitLabel, Sampler};
use std::collections::HashMap;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
    let store = Arc::new(DeterministicSplitStore::new(ratios, 42)?);
    let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
    // Pull from HF 70% of the time and local files 30% of the time.
    let mut weights = HashMap::new();
    weights.insert("hf_finance".to_string(), 0.7);
    weights.insert("docs".to_string(), 0.3);

    let batch = sampler.next_triplet_batch_with_weights(SplitLabel::Train, &weights)?;
    Ok(())
}
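The 70/30 mix above amounts to a cumulative-weight draw over source identifiers. The helper below is an illustrative sketch of that mechanism, not the crate's internal selection code:

```rust
/// Illustrative weighted pick (not the crate's internals): choose a source id
/// proportionally to its weight, given a uniform draw `unit` in [0, 1).
fn pick_source<'a>(weights: &[(&'a str, f64)], unit: f64) -> &'a str {
    let total: f64 = weights.iter().map(|(_, w)| w).sum();
    let mut cursor = unit * total;
    for (id, w) in weights.iter().copied() {
        if cursor < w {
            return id;
        }
        cursor -= w;
    }
    // Fall back to the last entry if rounding pushed the cursor past the end.
    weights.last().expect("weights must be non-empty").0
}

fn main() {
    let weights = [("hf_finance", 0.7), ("docs", 0.3)];
    // A draw of 0.5 falls inside hf_finance's 0.7 share; 0.9 falls in docs'.
    assert_eq!(pick_source(&weights, 0.5), "hf_finance");
    assert_eq!(pick_source(&weights, 0.9), "docs");
}
```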

Output Format

The sampler produces SampleTriplet values containing sampled text and associated metadata.

use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore, SplitLabel, Sampler};
let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
let store = Arc::new(DeterministicSplitStore::new(ratios, 42).unwrap());
let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
let batch = sampler.next_triplet_batch(SplitLabel::Train).unwrap();
for triplet in batch.triplets {
    // Primary content
    let anchor_text = &triplet.anchor.text;
    let pos_text    = &triplet.positive.text;
    let neg_text    = &triplet.negative.text;
    
    // Metadata
    let recipe      = &triplet.recipe;      // which recipe was used
    let weight      = triplet.weight;       // training weight
    let instruction = triplet.instruction;  // optional instruction string
}

Epochs and Determinism

Iterating Epochs

In a typical training loop, signal a new epoch so the sampler can reset cursors and reshuffle sources.

use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore, SplitLabel, Sampler};
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
    let store = Arc::new(DeterministicSplitStore::new(ratios, 42)?);
    let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
    // Stand-in for a real stopping condition (yields a single batch here).
    let mut batches_left = 1;
    let mut training_not_finished = || {
        let keep_going = batches_left > 0;
        batches_left -= 1;
        keep_going
    };
    // In your training loop:
    for epoch in 0..10 {
        sampler.set_epoch(epoch)?;

        while training_not_finished() {
            let batch = sampler.next_triplet_batch(SplitLabel::Train)?;
            // ... pass batch to your model ...
        }

        // Save state at the end of each epoch to allow resuming if training is interrupted.
        sampler.save_sampler_state(None)?;
    }

    Ok(())
}

Deterministic Resuming

To resume training, initialize a FileSplitStore at the same path. The sampler automatically restores cursors, RNG state, and epoch progress from that store.

use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, FileSplitStore, SplitRatios, Sampler};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
    let seed = 42;

    // Opening an existing FileSplitStore automatically loads its persisted state.
    let store = Arc::new(FileSplitStore::open("checkpoints/splits.bin", ratios, seed)?);

    // The sampler will resume from the exact record and recipe it was on.
    let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
    Ok(())
}

Note: Sampler state is intentionally lightweight. It persists source identifiers, integer record cursors, and compact RNG state vectors, not full data records. This keeps frequent checkpointing practical in long-running training jobs.

Technical Details

Threading Model

Concurrency is handled at multiple levels for high throughput:

  • Prefetching: BatchPrefetcher runs a dedicated background worker thread that fills a bounded queue.
  • Parallel Ingestion: Source refresh executes concurrently across registered sources during ingestion cycles.
  • Synchronous API: Sampling calls are synchronous at the API boundary for straightforward training-loop integration.
  • Thread-Safe Shared Use: TripletSampler is safe to share across threads (for example via Arc); concurrent calls are internally synchronized with a mutex, so a single sampler instance is callable from multiple threads without data races.

Chunking and Windows

Long documents are handled through a pluggable ChunkingAlgorithm. The default SlidingWindowChunker splits sections into fixed-size token windows with configurable overlap, preserving full coverage of long text.
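The sliding-window idea can be sketched independently of the crate's API. The function below is a simplified illustration over pre-tokenized input (window size and overlap values are arbitrary), not the SlidingWindowChunker implementation itself:

```rust
/// Split a token sequence into fixed-size windows with overlap, so every
/// token appears in at least one window (full coverage of long text).
fn sliding_windows(tokens: &[&str], window: usize, overlap: usize) -> Vec<Vec<String>> {
    assert!(overlap < window, "overlap must be smaller than the window");
    let step = window - overlap;
    let mut out = Vec::new();
    let mut start = 0;
    while start < tokens.len() {
        let end = (start + window).min(tokens.len());
        out.push(tokens[start..end].iter().map(|s| s.to_string()).collect());
        if end == tokens.len() {
            break;
        }
        start += step;
    }
    out
}

fn main() {
    let tokens: Vec<&str> = "a b c d e f g".split(' ').collect();
    let windows = sliding_windows(&tokens, 4, 2);
    // Windows: [a b c d], [c d e f], [e f g]
    println!("{windows:?}");
}
```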

Negative Mining

Negative selection is delegated to a pluggable backend.

  • DefaultBackend: Uniform random selection from the candidate pool.
  • Bm25Backend: (Requires bm25-mining) Ranks candidates by lexical overlap with the anchor to provide harder training examples.
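The effect of BM25-style mining can be approximated with plain term overlap. The function below is a toy stand-in for intuition only (real BM25 also weights terms by rarity and applies length normalization); it is not the Bm25Backend implementation:

```rust
use std::collections::HashSet;

/// Toy stand-in for BM25-style hard-negative mining: score candidates by
/// lexical overlap with the anchor and pick the highest-scoring one.
fn hardest_negative<'a>(anchor: &str, candidates: &[&'a str]) -> Option<&'a str> {
    let anchor_terms: HashSet<&str> = anchor.split_whitespace().collect();
    candidates
        .iter()
        .max_by_key(|c| {
            c.split_whitespace()
                .filter(|t| anchor_terms.contains(t))
                .count()
        })
        .copied()
}

fn main() {
    let anchor = "quarterly revenue growth";
    let candidates = [
        "weather forecast today",
        "revenue fell this quarter",
        "annual revenue growth slowed",
    ];
    // The last candidate shares the most terms with the anchor, making it
    // a "harder" negative than a random pick.
    println!("{:?}", hardest_negative(anchor, &candidates));
}
```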

Capabilities

Capability           Description
Source Agnostic      Implement DataSource or IndexableSource for any DB or API.
Weighted Sampling    Tune source and recipe frequencies to handle class imbalance.
Epoch Shuffling      Deterministic pseudo-random shuffling that re-permutes per epoch.
Instruction Tuning   Attach task-specific prompts (e.g., "Summarize this...") to specific recipes.
Metadata Decorators  Inject structured prefixes into sampled text via KvpPrefixSampler.
Anti-Shortcut        Includes anchor/positive swapping to avoid asymmetric slot bias.

License

triplets is distributed under both the MIT license and the Apache License (Version 2.0).

See LICENSE-APACHE and LICENSE-MIT for details.