Skip to main content

rosalind/call/
columnkit.rs

1//! ColumnKit: implement one trait, inherit the bounded memory contract.
2//!
3//! A builder who wants their OWN per-locus metric — methylation, a coverage/QC
4//! track, custom ML features, a star-allele genotyper — would otherwise fork into
5//! the raw pileup engine and re-implement, by hand, every surface that makes
6//! Rosalind worth choosing: the whole-genome contig walk, the working-set bound
7//! that `plan`/`--enforce` admit, and the canonical-JSON + BLAKE3 receipt.
8//!
9//! [`ColumnAnalyzer`] + [`run_bounded_whole_genome`] turn "extend the substrate"
10//! into a first-class SDK: implement one trait, run it through the driver, and
11//! inherit the SAME bounded per-contig stream, the SAME working-set bound, and
12//! the SAME verifiable receipt the shipped `variants`/`features` subcommands
13//! enjoy — for free. The trait is *welded* to the bounded kernel: the driver runs
14//! the exact same column stream as `stream_features_whole_genome`, so the
15//! estimator that admits a run upper-bounds the realized working set of
16//! the builder's analyzer too. It is the contract made composable, not a feature
17//! bolted beside it. (`FeatureAnalyzer` is the first impl — proof the trait
18//! carries the real shipped analyzer, not a toy.)
19
20use std::collections::BTreeMap;
21use std::io::{self, Write};
22
23use crate::call::features::{stream_features_whole_genome, write_feature_row, FEATURE_HEADER};
24use crate::core::{ContigSet, CoreError, WorkingSet};
25use crate::genomics::ReferenceView;
26use crate::pileup::{PileupColumn, PileupParams, ReadSource, SkipCounts};
27
28/// A per-locus analyzer over the bounded pileup-column stream. Implement this and
29/// run it with [`run_bounded_whole_genome`] to inherit bounded memory,
30/// byte-identical determinism, and a verifiable receipt.
31pub trait ColumnAnalyzer {
32    /// Optional header written once, before any column (e.g. a TSV header line,
33    /// including its trailing newline). Default: none.
34    fn header(&self) -> Option<String> {
35        None
36    }
37
38    /// Parameters to fold into the run receipt (provenance). Default: none.
39    fn params(&self) -> BTreeMap<String, String> {
40        BTreeMap::new()
41    }
42
43    /// Process one callable pileup column, writing any output to `out`. Called in
44    /// `(contig, position)` order; `contig` is the column's contig name. Keep
45    /// per-call state O(1) (or bounded) to preserve the memory contract.
46    fn on_column(
47        &mut self,
48        col: &PileupColumn,
49        contig: &str,
50        out: &mut dyn Write,
51    ) -> io::Result<()>;
52}
53
54/// Drive `analyzer` over every callable column of a coordinate-sorted read stream
55/// and a persisted reference, under the bounded memory contract: peak ≈ the
56/// largest contig's reference + the pileup working set, independent of input
57/// size. Writes the analyzer's header (if any), then streams columns to it in
58/// `(contig, pos)` order. Returns the max pileup working set observed — the value
59/// `plan`/`--enforce` reason about — and the skip counts, so the analyzer
60/// inherits the receipt.
61pub fn run_bounded_whole_genome<S: ReadSource>(
62    analyzer: &mut dyn ColumnAnalyzer,
63    source: S,
64    ref_view: &ReferenceView,
65    contigs: &ContigSet,
66    pileup_params: PileupParams,
67    out: &mut dyn Write,
68) -> Result<(WorkingSet, SkipCounts), CoreError> {
69    if let Some(header) = analyzer.header() {
70        out.write_all(header.as_bytes())?;
71    }
72    stream_features_whole_genome(
73        source,
74        ref_view,
75        contigs,
76        pileup_params,
77        &mut |col, contig| {
78            analyzer
79                .on_column(col, contig, &mut *out)
80                .map_err(CoreError::from)
81        },
82    )
83}
84
85/// The shipped `features` egress, expressed as a [`ColumnAnalyzer`] — proof the
86/// trait carries the real production analyzer. Counts the rows it emits (folded
87/// into the receipt via [`ColumnAnalyzer::params`]).
88#[derive(Debug, Default)]
89pub struct FeatureAnalyzer {
90    rows: u64,
91}
92
93impl FeatureAnalyzer {
94    /// Number of feature rows written so far.
95    pub fn rows(&self) -> u64 {
96        self.rows
97    }
98}
99
100impl ColumnAnalyzer for FeatureAnalyzer {
101    fn header(&self) -> Option<String> {
102        Some(format!("{FEATURE_HEADER}\n"))
103    }
104
105    fn params(&self) -> BTreeMap<String, String> {
106        let mut p = BTreeMap::new();
107        p.insert("feature_rows".to_string(), self.rows.to_string());
108        p
109    }
110
111    fn on_column(
112        &mut self,
113        col: &PileupColumn,
114        contig: &str,
115        out: &mut dyn Write,
116    ) -> io::Result<()> {
117        write_feature_row(out, contig, col)?;
118        self.rows += 1;
119        Ok(())
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use super::*;
126    use crate::call::features::write_feature_header;
127    use crate::genomics::{GenomeIndex, IndexReader, IndexWriter};
128    use crate::pileup::SliceSource;
129    use std::sync::Arc;
130
131    fn tmp(suffix: &str) -> std::path::PathBuf {
132        let nanos = std::time::SystemTime::now()
133            .duration_since(std::time::UNIX_EPOCH)
134            .unwrap()
135            .as_nanos();
136        let d = std::env::temp_dir().join(format!("rosalind-ck-{suffix}-{nanos}"));
137        std::fs::create_dir_all(&d).unwrap();
138        d.join("ref.idx")
139    }
140
141    fn read_at(contig: u32, pos: u32, seq: &[u8]) -> crate::core::AlignedRead {
142        use crate::core::{CigarOp, CigarOpKind, Position, SamFlags};
143        crate::core::AlignedRead {
144            contig,
145            pos: Position(pos),
146            mapq: 60,
147            flags: SamFlags(0),
148            cigar: vec![CigarOp::new(CigarOpKind::Match, seq.len() as u32)],
149            seq: Arc::from(seq.to_vec().into_boxed_slice()),
150            qual: Arc::from(vec![40u8; seq.len()].into_boxed_slice()),
151        }
152    }
153
154    // Running FeatureAnalyzer through the driver must be byte-identical to the
155    // hand-wired features path (header + write_feature_row per column) — i.e. the
156    // trait IS the production path, not a parallel one.
157    #[test]
158    fn feature_analyzer_via_driver_equals_the_direct_features_path() {
159        let idx = tmp("equiv");
160        let index = GenomeIndex::from_named_sequences(&[
161            ("chr1".to_string(), b"ACGTACGTACGTACGTACGT".to_vec()),
162            ("chr2".to_string(), b"TTTTGGGGCCCCAAAATTTT".to_vec()),
163        ])
164        .unwrap();
165        IndexWriter::create(&idx)
166            .unwrap()
167            .write_genome_index(&index)
168            .unwrap();
169        let loaded = IndexReader::open(&idx).unwrap();
170        let rv = loaded.reference_view().unwrap();
171        let contigs = loaded.contigs();
172        let reads = vec![
173            read_at(0, 0, b"ACGTACGT"),
174            read_at(0, 0, b"ACGTACGT"),
175            read_at(1, 0, b"TTTTGGGG"),
176        ];
177        let pp = PileupParams::default();
178
179        // Direct path: header + stream_features_whole_genome + write_feature_row.
180        let mut direct: Vec<u8> = Vec::new();
181        write_feature_header(&mut direct).unwrap();
182        let (ws_direct, _) = stream_features_whole_genome(
183            SliceSource::new(reads.clone()),
184            &rv,
185            contigs,
186            pp.clone(),
187            &mut |col, name| write_feature_row(&mut direct, name, col).map_err(CoreError::from),
188        )
189        .unwrap();
190
191        // Driver path: FeatureAnalyzer through run_bounded_whole_genome.
192        let mut analyzer = FeatureAnalyzer::default();
193        let mut via_kit: Vec<u8> = Vec::new();
194        let (ws_kit, _) = run_bounded_whole_genome(
195            &mut analyzer,
196            SliceSource::new(reads),
197            &rv,
198            contigs,
199            pp,
200            &mut via_kit,
201        )
202        .unwrap();
203
204        assert_eq!(direct, via_kit, "driver output must be byte-identical");
205        assert_eq!(
206            ws_direct.bytes, ws_kit.bytes,
207            "driver must report the same bounded working set"
208        );
209        assert!(analyzer.rows() > 0, "analyzer should count its rows");
210        assert_eq!(
211            analyzer.params().get("feature_rows").map(String::as_str),
212            Some(analyzer.rows().to_string().as_str())
213        );
214
215        let _ = std::fs::remove_dir_all(idx.parent().unwrap());
216    }
217
218    // A tiny custom analyzer inherits the bounded walk: its working set is the
219    // same coverage-bounded value, independent of how many reads stream in.
220    #[test]
221    fn a_custom_analyzer_inherits_the_bounded_working_set() {
222        struct DepthTrack {
223            max_depth_seen: u32,
224        }
225        impl ColumnAnalyzer for DepthTrack {
226            fn on_column(
227                &mut self,
228                col: &PileupColumn,
229                _contig: &str,
230                _out: &mut dyn Write,
231            ) -> io::Result<()> {
232                self.max_depth_seen = self.max_depth_seen.max(col.depth());
233                Ok(())
234            }
235        }
236
237        let idx = tmp("custom");
238        let index =
239            GenomeIndex::from_named_sequences(&[("chr1".to_string(), vec![b'A'; 500])]).unwrap();
240        IndexWriter::create(&idx)
241            .unwrap()
242            .write_genome_index(&index)
243            .unwrap();
244        let loaded = IndexReader::open(&idx).unwrap();
245        let rv = loaded.reference_view().unwrap();
246        let contigs = loaded.contigs();
247        let pp = PileupParams {
248            max_depth: Some(8),
249            ..PileupParams::default()
250        };
251
252        let run = |n: usize| -> u64 {
253            let reads: Vec<_> = (0..n).map(|_| read_at(0, 0, &[b'C'; 50])).collect();
254            let mut a = DepthTrack { max_depth_seen: 0 };
255            let mut sink = io::sink();
256            run_bounded_whole_genome(
257                &mut a,
258                SliceSource::new(reads),
259                &rv,
260                contigs,
261                pp.clone(),
262                &mut sink,
263            )
264            .unwrap()
265            .0
266            .bytes
267        };
268        // Bounded: 100× more reads at one locus (capped at depth 8) → same WS.
269        assert_eq!(
270            run(20),
271            run(2000),
272            "analyzer working set must not grow with reads"
273        );
274
275        let _ = std::fs::remove_dir_all(idx.parent().unwrap());
276    }
277}