triplets 0.14.0-alpha

Composable data sampling primitives for deterministic multi-source ML/AI training-data orchestration.
Documentation
# triplets

[![made-with-rust][rust-logo]][rust-src-page] [![crates.io][crates-badge]][crates-page] [![MIT licensed][mit-license-badge]][mit-license-page] [![Apache 2.0 licensed][apache-2.0-license-badge]][apache-2.0-license-page]

**WORK IN PROGRESS.**

Generate an effectively unlimited stream of [training triplets](https://en.wikipedia.org/wiki/Triplet_loss), pairs, or plaintext samples from your existing corpus. This crate handles ingestion, multi-source mixing, deterministic train/validation/test splitting, and optional [BM25](https://en.wikipedia.org/wiki/Okapi_BM25) hard-negative mining.

## Overview

In metric learning and language model training, a **triplet** consists of an **anchor**, a **positive** example (similar to the anchor), and a **negative** example (dissimilar to the anchor).

`triplets` provides a high-throughput streaming pipeline to:
1. **Ingest** data from local files, Hugging Face, or custom backends.
2. **Mix** sources with configurable weights to balance your training data.
3. **Split** data deterministically into train, validation, and test sets.
4. **Sample** triplets or pairs using rule-based "recipes".
5. **Mine** hard negatives using BM25 to improve model discrimination.

```text
      Anchor
      /    \
 Positive Negative

 Triplet: (Anchor, Positive, Negative)
```

## Getting Started

A `TripletSampler` needs a `SplitStore` for record-to-split assignments and a `SamplerConfig` for runtime behavior.

```rust
use std::sync::Arc;
use triplets::{
    SamplerConfig, TripletSampler, SplitRatios, 
    DeterministicSplitStore, SplitLabel, Sampler
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Define your train/validation/test ratios (e.g., 80/10/10).
    let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };

    // 2. Initialize a deterministic split store.
    // The seed ensures record IDs are always assigned to the same split.
    let seed = 42;
    let store = Arc::new(DeterministicSplitStore::new(ratios, seed)?);

    // 3. Create the sampler.
    let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
    Ok(())
}
```

## Features

| Feature            | What it enables                                           | Default |
| ------------------ | --------------------------------------------------------- | ------- |
| `huggingface`      | Streaming from Hugging Face dataset repositories.         | No      |
| `bm25-mining`      | BM25 hard-negative ranking within strategy-defined pools. | No      |
| `extended-metrics` | Additional per-triplet diagnostics for debugging.         | No      |

## Configuring Sources

### Local File Source
Recursively indexes text files from a directory. Ideal for local datasets and exported corpora.

```rust
use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore};
use triplets::source::{FileSource, FileSourceConfig};

let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
let store = Arc::new(DeterministicSplitStore::new(ratios, 42).unwrap());
let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
// Create a source named "docs" targeting a local directory.
let config = FileSourceConfig::new("docs", "./data/corpus")
    .with_text_files_only(true)
    .with_trust(0.9); // Assign a quality score to this source

let source = FileSource::new(config);
sampler.register_source(Box::new(source));
```


### Hugging Face Source
Streams rows directly from the Hugging Face Hub without requiring a full dataset download.

```rust,no_run
#[cfg(feature = "huggingface")]
{
    use std::sync::Arc;
    use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore, Sampler};
    use triplets::{HuggingFaceRowSource, HuggingFaceRowsConfig};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
        let store = Arc::new(DeterministicSplitStore::new(ratios, 42)?);
        let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
        // Configure the source to pull the "train" split of a dataset.
        // Note: While we specify "train" here as the ingestion source, the crate
        // automatically handles its own deterministic split assignments (train/val/test)
        // at the record level across all loaded data.
        let config = HuggingFaceRowsConfig::new(
            "hf_finance",          // Source identifier
            "financial_phrasebank", // HF Dataset name
            "default",             // Dataset config
            "train",               // Dataset split
            "cache/hf_snapshots"   // Local cache for downloaded shards
        );

        let source = HuggingFaceRowSource::new(config)?;
        sampler.register_source(Box::new(source));
        Ok(())
    }
}
```

### Custom Data Source
Implement the `IndexableSource` trait to integrate any backend that can fetch records by a stable integer index.

```rust
use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore};
use chrono::Utc;
use triplets::{DataRecord, SamplerError};
use triplets::source::{IndexableSource, IndexableAdapter};

struct MyApiSource;

impl IndexableSource for MyApiSource {
    fn id(&self) -> &str { "api_source" }
    fn len_hint(&self) -> Option<usize> { Some(1000) }
    fn record_at(&self, idx: usize) -> Result<Option<DataRecord>, SamplerError> {
        // Fetch record 'idx' from your database or API.
        Ok(Some(DataRecord {
            id: format!("api_{idx}"),
            source: self.id().into(),
            created_at: Utc::now(),
            updated_at: Utc::now(),
            quality: Default::default(),
            taxonomy: vec![],
            sections: vec![], // Add text content here
            meta_prefix: None,
        }))
    }
}

let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
let store = Arc::new(DeterministicSplitStore::new(ratios, 42).unwrap());
let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
let adapter = IndexableAdapter::new(MyApiSource);
sampler.register_source(Box::new(adapter));
```


## Sampling and Mixing

### Weighted Sampling
Adjust per-source sampling frequency to handle class imbalance or dataset quality differences.

```rust,no_run
use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore, SplitLabel, Sampler};
use std::collections::HashMap;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
    let store = Arc::new(DeterministicSplitStore::new(ratios, 42)?);
    let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
    // Pull from HF 70% of the time and local files 30% of the time.
    let mut weights = HashMap::new();
    weights.insert("hf_finance".to_string(), 0.7);
    weights.insert("docs".to_string(), 0.3);

    let batch = sampler.next_triplet_batch_with_weights(SplitLabel::Train, &weights)?;
    Ok(())
}
```

### Output Format
The sampler produces `SampleTriplet` values containing sampled text and associated metadata.

```rust,no_run
use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore, SplitLabel, Sampler};
let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
let store = Arc::new(DeterministicSplitStore::new(ratios, 42).unwrap());
let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
let batch = sampler.next_triplet_batch(SplitLabel::Train).unwrap();
for triplet in batch.triplets {
    // Primary content
    let anchor_text = &triplet.anchor.text;
    let pos_text    = &triplet.positive.text;
    let neg_text    = &triplet.negative.text;
    
    // Metadata
    let recipe      = &triplet.recipe;      // which recipe was used
    let weight      = triplet.weight;       // training weight
    let instruction = triplet.instruction;  // optional instruction string
}
```

## Epochs and Determinism

### Iterating Epochs
In a typical training loop, signal a new epoch so the sampler can reset cursors and reshuffle sources.

```rust,no_run
use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, SplitRatios, DeterministicSplitStore, SplitLabel, Sampler};
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
    let store = Arc::new(DeterministicSplitStore::new(ratios, 42)?);
    let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
    let mut batches_left = 1;
    let mut training_not_finished = || {
        let ret = batches_left > 0;
        batches_left -= 1;
        ret
    };
    // In your training loop:
    for epoch in 0..10 {
        sampler.set_epoch(epoch)?;

        while training_not_finished() {
            let batch = sampler.next_triplet_batch(SplitLabel::Train)?;
            // ... pass batch to your model ...
        }

        // Save state at the end of each epoch to allow resuming if training is interrupted.
        sampler.save_sampler_state(None)?;
    }

    Ok(())
}
```

### Deterministic Resuming
To resume training, initialize a `FileSplitStore` at the same path. The sampler automatically restores cursors, RNG state, and epoch progress from that store.

```rust,no_run
use std::sync::Arc;
use triplets::{SamplerConfig, TripletSampler, FileSplitStore, SplitRatios, Sampler};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ratios = SplitRatios { train: 0.8, validation: 0.1, test: 0.1 };
    let seed = 42;

    // Opening an existing FileSplitStore automatically loads its persisted state.
    let store = Arc::new(FileSplitStore::open("checkpoints/splits.bin", ratios, seed)?);

    // The sampler will resume from the exact record and recipe it was on.
    let mut sampler = TripletSampler::new(SamplerConfig::default(), store);
    Ok(())
}
```

> **Note**: Sampler state is intentionally lightweight. It persists source identifiers, integer record cursors, and compact RNG state vectors, not full data records. This keeps frequent checkpointing practical in long-running training jobs.

## Technical Details

### Threading Model
Concurrency is handled at multiple levels for high throughput:
- **Prefetching**: `BatchPrefetcher` runs a dedicated background worker thread that fills a bounded queue.
- **Parallel Ingestion**: Source refresh executes concurrently across registered sources during ingestion cycles.
- **Synchronous API**: Sampling calls are synchronous at the API boundary for straightforward training-loop integration.
- **Thread-Safe Shared Use**: `TripletSampler` is safe to share across threads (for example via `Arc`); concurrent calls are internally synchronized with a mutex, so a single sampler instance is callable from multiple threads without data races.

### Chunking and Windows
Long documents are handled through a pluggable `ChunkingAlgorithm`. The default `SlidingWindowChunker` splits sections into fixed-size token windows with configurable overlap, preserving full coverage of long text.

### Negative Mining
Negative selection is delegated to a pluggable backend.
- **DefaultBackend**: Uniform random selection from the candidate pool.
- **Bm25Backend**: (Requires `bm25-mining`) Ranks candidates by lexical overlap with the anchor to provide harder training examples.

## Capabilities

| Capability              | Description                                                                   |
| ----------------------- | ----------------------------------------------------------------------------- |
| **Source Agnostic**     | Implement `DataSource` or `IndexableSource` for any DB or API.                |
| **Weighted Sampling**   | Tune source and recipe frequencies to handle class imbalance.                 |
| **Epoch Shuffling**     | Deterministic pseudo-random shuffling that re-permutes per epoch.             |
| **Instruction Tuning**  | Attach task-specific prompts (e.g., "Summarize this...") to specific recipes. |
| **Metadata Decorators** | Inject structured prefixes into sampled text via `KvpPrefixSampler`.          |
| **Anti-Shortcut**       | Includes anchor/positive swapping to avoid asymmetric slot bias.              |

## License

`triplets` is distributed under both the MIT license and the Apache License (Version 2.0).

See [LICENSE-APACHE](LICENSE-APACHE) and [LICENSE-MIT](LICENSE-MIT) for details.

[rust-src-page]: https://www.rust-lang.org/
[rust-logo]: https://img.shields.io/badge/Made%20with-Rust-black
[crates-page]: https://crates.io/crates/triplets
[crates-badge]: https://img.shields.io/crates/v/triplets.svg
[mit-license-page]: https://github.com/jzombie/rust-triplets/blob/main/LICENSE-MIT
[mit-license-badge]: https://img.shields.io/badge/license-MIT-blue.svg
[apache-2.0-license-page]: https://github.com/jzombie/rust-triplets/blob/main/LICENSE-APACHE
[apache-2.0-license-badge]: https://img.shields.io/badge/license-Apache%202.0-blue.svg