methylsieve 0.1.0

Fast per-template tagging and filtering of unconverted reads in bisulfite / EM-seq SAM/BAM files
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
//! methylsieve — the command-line application.
//!
//! This binary is the whole tool: CLI parsing ([`Args`]), end-to-end run
//! orchestration ([`run`]), and the end-of-run resource-usage footer. The
//! per-template tagging engine and IO live in sibling modules (`sieve`,
//! `reference`, the readers/writers, `stats`, …) — this file wires them
//! together and talks to the user.
//!
//! Long-form flags follow GNU style (`--kebab-case`). Short flags mirror the
//! conventions of sibling tools: `-i` input, `-o` output, `-r` reference,
//! `-q` quiet.

mod io_threading;
mod raw_reader;
mod raw_writer;
mod reference;
mod sam_reader;
mod sieve;
mod stats;

use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, Write};
use std::path::PathBuf;
use std::process::ExitCode;
use std::time::Instant;

use anyhow::{Context as _, Result, bail};
use bgzf::CompressionLevel;
use clap::{Parser, ValueEnum};
use fgumi_raw_bam::RawRecord;
use noodles_sam::Header;
use noodles_sam::header::record::value::Map;
use noodles_sam::header::record::value::map::Program;
use noodles_sam::header::record::value::map::program::tag as program_tag;

use crate::raw_reader::RawBamReader;
use crate::raw_writer::RawBamWriter;
use crate::reference::{Context, RefEncoding, Reference};
use crate::sam_reader::SamReader;
use crate::sieve::{DecisionMode, ProcessorOptions, RecordProcessor, Stats};

/// Crate build identifier shown in `--version` and the `@PG VN:` tag.
const METHYLSIEVE_BUILD: &str = env!("CARGO_PKG_VERSION");

/// Global allocator — mimalloc keeps the per-block temporary allocations that
/// dominate the streaming worker cheap.
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

// ── Command-line interface ──────────────────────────────────────────────────

/// CLI mirror of [`RefEncoding`] so the kebab-case spellings live with the CLI.
#[derive(Copy, Clone, Debug, PartialEq, Eq, ValueEnum)]
pub(crate) enum RefEncodingCli {
    /// 1 byte per base — fastest, ~3.1 GB for a human genome.
    #[value(name = "bytes")]
    Bytes,
    /// 4-bit packed, 2 bases/byte — ~½ the memory.
    #[value(name = "nibble")]
    Nibble,
    /// 2-bit packed, 4 bases/byte — ~¼ the memory; non-ACGT folded to A.
    #[value(name = "twobit")]
    TwoBit,
}

impl From<RefEncodingCli> for RefEncoding {
    fn from(c: RefEncodingCli) -> Self {
        match c {
            RefEncodingCli::Bytes => RefEncoding::Bytes,
            RefEncodingCli::Nibble => RefEncoding::Nibble,
            RefEncodingCli::TwoBit => RefEncoding::TwoBit,
        }
    }
}

/// CLI mirror of [`DecisionMode`] so the kebab-case spellings and per-variant
/// help live with the CLI.
#[derive(Copy, Clone, Debug, PartialEq, Eq, ValueEnum)]
pub(crate) enum DecisionModeCli {
    /// Count only: flag when the unconverted count reaches
    /// `--max-unconverted-count`. `--min-sites` and the fraction are ignored.
    #[value(name = "count")]
    Count,
    /// Proportion only: flag when a template has at least `--min-sites` sites
    /// AND its unconverted fraction exceeds `--max-unconverted-fraction`.
    /// Templates with fewer than `--min-sites` sites are NEVER flagged — the
    /// proportion can't be estimated, so they pass through unevaluated.
    #[value(name = "proportion")]
    Proportion,
    /// Either: flag when the count OR the proportion test fires.
    #[value(name = "either")]
    Either,
    /// (Default) Adaptive: use the proportion test at/above `--min-sites` and
    /// the count test below it. Low-site templates are still evaluated (by
    /// count), while high-site templates are judged on rate — avoiding an
    /// absolute count that over-penalizes long reads / read pairs.
    #[value(name = "adaptive")]
    Adaptive,
}

impl From<DecisionModeCli> for DecisionMode {
    fn from(c: DecisionModeCli) -> Self {
        match c {
            DecisionModeCli::Count => DecisionMode::Count,
            DecisionModeCli::Proportion => DecisionMode::Proportion,
            DecisionModeCli::Either => DecisionMode::Either,
            DecisionModeCli::Adaptive => DecisionMode::Adaptive,
        }
    }
}

/// Parse a `--compression-level` argument, delegating range validation to
/// `bgzf::CompressionLevel::new` (0 = stored, 1-9 zlib, 10-12 libdeflate).
fn parse_compression_level(s: &str) -> Result<CompressionLevel, String> {
    let n: u8 = s.parse().map_err(|e| format!("not a u8: {e}"))?;
    CompressionLevel::new(n).map_err(|e| format!("{e}"))
}

/// The subset of the four contexts (CpA/CpC/CpG/CpT) that count toward the
/// unconverted-decision threshold. Stored as a presence flag per context,
/// indexed by [`Context::index`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct ContextMask([bool; 4]);

impl ContextMask {
    /// Whether `ctx` participates in the threshold.
    #[inline]
    #[must_use]
    pub(crate) fn contains(self, ctx: Context) -> bool {
        self.0[ctx.index()]
    }
}

/// Parse a comma-separated context list (e.g. `CpA,CpC,CpT`). Token matching is
/// case-insensitive and accepts both `CpA` and `CA` spellings.
pub(crate) fn parse_contexts(s: &str) -> Result<ContextMask, String> {
    let mut mask = [false; 4];
    let mut any = false;
    for tok in s.split(',') {
        let t = tok.trim().to_ascii_uppercase();
        let ctx = match t.as_str() {
            "CPA" | "CA" => Context::CpA,
            "CPC" | "CC" => Context::CpC,
            "CPG" | "CG" => Context::CpG,
            "CPT" | "CT" => Context::CpT,
            "" => continue,
            other => {
                return Err(format!(
                    "unknown context '{other}'; valid contexts are CpA, CpC, CpG, CpT"
                ));
            }
        };
        mask[ctx.index()] = true;
        any = true;
    }
    if !any {
        return Err("at least one context must be specified".to_string());
    }
    Ok(ContextMask(mask))
}

/// A parsed aux-tag specification (`--tag`). Only string (`Z`) tags are
/// supported in v1.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct TagSpec {
    /// The two-byte aux tag (e.g. `XX`).
    pub(crate) tag: [u8; 2],
    /// The string value to set (e.g. `UC`).
    pub(crate) value: Vec<u8>,
}

/// Parse a two-character aux-tag name (e.g. `ch`) for `--count-tag`.
fn parse_tag_name(s: &str) -> Result<[u8; 2], String> {
    let b = s.as_bytes();
    if b.len() != 2 || !b.iter().all(u8::is_ascii_alphanumeric) {
        return Err(format!("tag name '{s}' must be exactly two alphanumeric characters"));
    }
    Ok([b[0], b[1]])
}

/// Parse a SAM-style `TAG:Z:VALUE` spec (e.g. `XX:Z:UC`). Only the `Z`
/// (string) type is accepted.
fn parse_tag_spec(s: &str) -> Result<TagSpec, String> {
    let parts: Vec<&str> = s.splitn(3, ':').collect();
    if parts.len() != 3 {
        return Err(format!("tag spec '{s}' must be of the form TAG:Z:VALUE (e.g. XX:Z:UC)"));
    }
    let tag_bytes = parts[0].as_bytes();
    if tag_bytes.len() != 2 || !tag_bytes.iter().all(u8::is_ascii_alphanumeric) {
        return Err(format!("tag '{}' must be exactly two alphanumeric characters", parts[0]));
    }
    if !parts[1].eq_ignore_ascii_case("Z") {
        return Err(format!(
            "only string (Z) tags are supported; got type '{}' in '{s}'",
            parts[1]
        ));
    }
    if parts[2].is_empty() {
        return Err(format!("tag spec '{s}' has an empty value"));
    }
    Ok(TagSpec { tag: [tag_bytes[0], tag_bytes[1]], value: parts[2].as_bytes().to_vec() })
}

/// Parsed command-line arguments — see `--help` for per-flag descriptions.
#[derive(Parser, Debug, Clone)]
#[command(name = "methylsieve", version = METHYLSIEVE_BUILD, about = SHORT_ABOUT, long_about = LONG_ABOUT)]
pub struct Args {
    /// Input SAM/BAM file [stdin]. Must be query-grouped (all records for one
    /// QNAME adjacent, typically straight from the aligner).
    #[arg(short = 'i', long = "input", help_heading = "Inputs / outputs")]
    pub(crate) input: Option<PathBuf>,

    /// Output BAM file [stdout]. The path must end in `.bam` — output is always
    /// BAM, never SAM. Use `-` for stdout (no extension check).
    #[arg(short = 'o', long = "output", help_heading = "Inputs / outputs")]
    pub(crate) output: Option<PathBuf>,

    /// BGZF compression level for the output BAM (0-9 typical, up to 12 for
    /// libdeflate's strongest tier). Level 0 (the default) emits "stored" BGZF
    /// blocks — a downstream sort will recompress.
    #[arg(long = "compression-level", default_value = "0", value_parser = parse_compression_level,
          help_heading = "Inputs / outputs")]
    pub(crate) compression_level: CompressionLevel,

    /// Reference FASTA (a sibling `.fai` index is required). The FASTA may be a
    /// superset of the BAM's contigs; every BAM `@SQ` must be present with a
    /// matching length. NOTE: every referenced contig is loaded into memory
    /// (~3.1 GB for a human genome), since query-grouped input touches any
    /// contig at any time.
    #[arg(short = 'r', long = "reference", help_heading = "Inputs / outputs")]
    pub(crate) reference: PathBuf,

    /// In-memory reference encoding. `twobit` (2-bit, the default) holds the
    /// genome in ~¼ the memory (~0.8 GB vs ~3.1 GB for a human genome); its
    /// small CPU cost is hidden when methylsieve is input-rate-limited in a
    /// pipe. It folds non-ACGT reference bases (N, IUPAC ambiguity) to A: this
    /// never changes a conversion call (only genuine C/G positions are
    /// monitored, and those are represented exactly) — it can only relabel the
    /// CpH/CpG context of a monitored C/G immediately adjacent to a former-N
    /// (assembly-gap edges; below measurement noise). Use `bytes` (1 byte/base,
    /// fastest) for a single max-throughput stream that isn't rate-limited, or
    /// `nibble` (4-bit, ~½ memory) for bit-identical context labeling.
    #[arg(long = "ref-encoding", value_enum, default_value_t = RefEncodingCli::TwoBit,
          help_heading = "Inputs / outputs")]
    pub(crate) ref_encoding: RefEncodingCli,

    /// Contexts (comma-separated CpA,CpC,CpT,CpG) counted toward the
    /// unconverted-decision threshold. The default is CpH (CpA,CpC,CpT); CpG is
    /// excluded because genuine methylation lives there. All four contexts are
    /// always reported in `--stats` regardless of this subset.
    #[arg(long = "contexts", default_value = "CpA,CpC,CpT", value_parser = parse_contexts,
          help_heading = "Decision")]
    pub(crate) contexts: ContextMask,

    /// How to combine the count and proportion tests. `adaptive` (the default)
    /// uses the proportion at/above `--min-sites` and the count below it; `count`
    /// and `proportion` use only that one test; `either` flags when either fires.
    /// In `proportion` mode, templates with fewer than `--min-sites` sites are
    /// never flagged (the proportion is unestimable) — they pass through.
    #[arg(long = "mode", value_enum, default_value_t = DecisionModeCli::Adaptive,
          help_heading = "Decision")]
    pub(crate) mode: DecisionModeCli,

    /// Count test: a template is called unconverted when its count of
    /// unconverted cytosines (summed over the `--contexts` subset, across all
    /// evaluated records) is at least this value. Used by `count`, `either`, and
    /// (below `--min-sites`) `adaptive`.
    #[arg(long = "max-unconverted-count", default_value_t = 3, help_heading = "Decision")]
    pub(crate) max_unconverted_count: u32,

    /// Proportion test: a template is called unconverted when it has at least
    /// `--min-sites` sites AND its unconverted fraction exceeds this value. Used
    /// by `proportion`, `either`, and (at/above `--min-sites`) `adaptive`;
    /// ignored by `count`. Must be in (0, 1].
    #[arg(long = "max-unconverted-fraction", default_value_t = 0.05, help_heading = "Decision")]
    pub(crate) max_unconverted_fraction: f64,

    /// Floor for the proportion test (in `--contexts`-subset sites): below it the
    /// proportion is unestimable and abstains. In `adaptive` this is also the
    /// switch point — proportion at/above it, count below. The default (40) is
    /// the smallest value that makes the `adaptive` switch continuous given
    /// count=3 and fraction=0.05 (so a read doesn't flip its call as it crosses
    /// the threshold).
    #[arg(long = "min-sites", default_value_t = 40, help_heading = "Decision")]
    pub(crate) min_sites: u32,

    /// Skip read bases with base quality below this value when tallying. Higher
    /// values reduce sequencing-error-driven false unconverted calls but shrink
    /// the site count (so fewer templates clear `--min-sites`).
    #[arg(long = "min-base-quality", default_value_t = 20, help_heading = "Decision")]
    pub(crate) min_base_quality: u8,

    /// Ignore the outermost N bases at each end of the *template* (the original
    /// DNA fragment) when tallying — the bases prone to end-repair fill-in and
    /// A-tailing artifacts. For a mapped pair these are the 5' sequenced ends of
    /// R1 and R2 (the two fragment termini), whose reference positions are
    /// skipped in *both* mates (so an overlapped terminus is trimmed once, in
    /// each read that covers it). For single-end or orphan reads the far end is
    /// unknown, so both ends of the read are trimmed instead. Counted over the
    /// stored SEQ in sequencing order, so 5'-end soft-clips count toward N;
    /// hard-clipped bases are absent from SEQ and do not. Default 0 (off).
    #[arg(long = "ignore-template-ends", default_value_t = 0, help_heading = "Decision")]
    pub(crate) ignore_template_ends: u32,

    /// Exclude supplementary alignments from conversion tallying (they still
    /// receive the tag / qc-fail flag like every other record of an unconverted
    /// template). By default supplementaries contribute evidence.
    #[arg(long = "ignore-supplementary-evidence", help_heading = "Decision")]
    pub(crate) ignore_supplementary_evidence: bool,

    /// Aux tag to set on every record of an unconverted template, as a
    /// SAM-style `TAG:Z:VALUE` spec.
    #[arg(long = "tag", default_value = "XX:Z:UC", value_parser = parse_tag_spec,
          help_heading = "Actions on unconverted templates")]
    pub(crate) tag: TagSpec,

    /// Do not OR the QC-fail flag (0x200) into unconverted records' FLAG.
    /// (QC-fail marking is on by default.)
    #[arg(long = "no-qc-fail", help_heading = "Actions on unconverted templates")]
    pub(crate) no_qc_fail: bool,

    /// Drop every record of an unconverted template from the output entirely,
    /// instead of just tagging/flagging it.
    #[arg(long = "remove-unconverted", help_heading = "Actions on unconverted templates")]
    pub(crate) remove_unconverted: bool,

    /// Spike-in control contig (repeatable). Reads whose primary R1 maps here
    /// are excluded from the main decision, never tagged, and tallied into a
    /// separate `--stats` row.
    #[arg(long = "control-contig", help_heading = "Spike-in controls")]
    pub(crate) control_contig: Vec<String>,

    /// Write a multi-row TSV of run stats to PATH (one row for the `genome`
    /// scope plus one per `--control-contig`).
    #[arg(long = "stats", help_heading = "Stats & misc")]
    pub(crate) stats: Option<PathBuf>,

    /// Write the per-template decision histogram to PATH: one row per observed
    /// `(checked_sites, unconverted_sites)` cell over the decision contexts,
    /// with the template count and the cell's verdict (`decision`,
    /// `decided_by`).
    #[arg(long = "conversion-matrix", help_heading = "Stats & misc")]
    pub(crate) conversion_matrix: Option<PathBuf>,

    /// Sample name for the `sample` column of `--stats`. If omitted, the unique
    /// `@RG SM:` values from the header are comma-joined.
    #[arg(long = "sample", help_heading = "Stats & misc")]
    pub(crate) sample: Option<String>,

    /// Aux tag (2 characters) for the per-record count annotation: `<TAG>:Z:u/n`,
    /// where u is the unconverted count and n the total monitored sites in the
    /// `--contexts` subset — the exact numerator/denominator of the decision. It
    /// is a per-TEMPLATE aggregate stamped on every record of the template (not a
    /// per-read count), so a user can see why any read was (or wasn't) flagged.
    /// On by default with tag `ch`; pass a name to rename, or `--no-count-tag`
    /// to disable.
    #[arg(long = "count-tag", value_parser = parse_tag_name, default_value = "ch",
          conflicts_with = "no_count_tag", help_heading = "Stats & misc")]
    pub(crate) count_tag: [u8; 2],

    /// Disable the per-record count annotation (see `--count-tag`).
    #[arg(long = "no-count-tag", help_heading = "Stats & misc")]
    pub(crate) no_count_tag: bool,

    /// Output fewer statistics.
    #[arg(short = 'q', long = "quiet", help_heading = "Stats & misc")]
    pub(crate) quiet: bool,

    /// Verify BGZF CRC32 on input. Default: on for file input, off for stdin.
    #[arg(long = "check-crc", conflicts_with = "no_check_crc", help_heading = "Stats & misc")]
    pub(crate) check_crc: bool,

    /// Skip BGZF CRC32 verification on input regardless of source.
    #[arg(long = "no-check-crc", conflicts_with = "check_crc", help_heading = "Stats & misc")]
    pub(crate) no_check_crc: bool,

    /// Size (MB) of the ring buffer between the input IO thread and the worker.
    #[arg(long = "read-buffer-mb", default_value_t = 16, value_parser = clap::value_parser!(u32).range(1..=4096),
          help_heading = "Stats & misc")]
    pub(crate) read_buffer_mb: u32,

    /// Size (MB) of the ring buffer between the worker and the output IO thread.
    #[arg(long = "write-buffer-mb", default_value_t = 64, value_parser = clap::value_parser!(u32).range(1..=4096),
          help_heading = "Stats & misc")]
    pub(crate) write_buffer_mb: u32,
}

const SHORT_ABOUT: &str = "Tag or filter unconverted reads in bisulfite / EM-seq SAM/BAM files.";
const LONG_ABOUT: &str = "Tag or filter incompletely-converted reads in directional bisulfite or \
EM-seq data.\n\
Makes one per-template decision using all of a QNAME's primary and supplementary records, \
propagates it to every record, and emits a per-context / per-spike-in conversion-rate TSV. \
Input must be query-grouped; output is always BAM.";

impl Args {
    /// Resolved CRC-verify setting: explicit flags win; otherwise on for file
    /// input and off for stdin (trusted producer, e.g. piped from bwa-meth).
    #[must_use]
    pub(crate) fn effective_check_crc(&self) -> bool {
        if self.check_crc {
            return true;
        }
        if self.no_check_crc {
            return false;
        }
        matches!(self.input.as_deref(), Some(p) if p.to_string_lossy() != "-")
    }

    /// Whether unconverted templates should be QC-fail flagged (0x200).
    #[must_use]
    pub(crate) fn qc_fail(&self) -> bool {
        !self.no_qc_fail
    }

    /// Resolved per-record count tag: `None` if disabled, else the tag name.
    #[must_use]
    pub(crate) fn effective_count_tag(&self) -> Option<[u8; 2]> {
        if self.no_count_tag { None } else { Some(self.count_tag) }
    }

    /// Validate invariants clap can't express directly.
    ///
    /// # Errors
    /// Returns an error if `-o` is not `-`/`.bam` or `--max-unconverted-fraction`
    /// is out of `(0, 1]`.
    pub(crate) fn validate(&self) -> Result<()> {
        if let Some(p) = &self.output {
            let s = p.to_string_lossy();
            if s != "-" && !s.ends_with(".bam") {
                bail!(
                    "output path {} must end in `.bam` (methylsieve only writes BAM); \
                     use `-` to send BAM to stdout",
                    p.display()
                );
            }
        }
        // At most one output stream may target stdout, or they would interleave
        // and corrupt one another (the BAM in particular). The BAM goes to stdout
        // by default when `-o` is omitted, or with `-o -`; `--stats` and
        // `--conversion-matrix` each take `-` for stdout.
        let mut stdout_streams: Vec<&str> = Vec::new();
        let bam_to_stdout = match self.output.as_deref() {
            None => true,
            Some(p) => p.to_string_lossy() == "-",
        };
        if bam_to_stdout {
            stdout_streams.push("the BAM output");
        }
        if matches!(self.stats.as_deref(), Some(p) if p.to_string_lossy() == "-") {
            stdout_streams.push("--stats");
        }
        if matches!(self.conversion_matrix.as_deref(), Some(p) if p.to_string_lossy() == "-") {
            stdout_streams.push("--conversion-matrix");
        }
        if stdout_streams.len() > 1 {
            bail!(
                "{} are all directed to stdout; at most one stream may use stdout, or \
                 they would interleave. The BAM goes to stdout by default — send it to a \
                 file with `-o out.bam`, or write --stats / --conversion-matrix to a path.",
                stdout_streams.join(" and ")
            );
        }
        if !(self.max_unconverted_fraction > 0.0 && self.max_unconverted_fraction <= 1.0) {
            bail!(
                "--max-unconverted-fraction must be in (0, 1]; got {}",
                self.max_unconverted_fraction
            );
        }
        if self.max_unconverted_count == 0 {
            bail!(
                "--max-unconverted-count must be >= 1 (a threshold of 0 would tag every \
                 template). Use --max-unconverted-fraction for fraction-based filtering."
            );
        }
        Ok(())
    }
}

// ── Binary entry point ──────────────────────────────────────────────────────

fn main() -> ExitCode {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
        .format_timestamp(None)
        .init();

    let args = Args::parse();
    match run(args) {
        Ok(()) => ExitCode::SUCCESS,
        Err(err) => {
            eprintln!("methylsieve: {err:#}");
            eprintln!("methylsieve: Premature exit (return code 1).");
            ExitCode::FAILURE
        }
    }
}

// ── Run orchestration ───────────────────────────────────────────────────────

/// Input format auto-detected from the first byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Format {
    Sam,
    Bam,
}

/// Peek the first byte to determine SAM (`@`) vs BAM (BGZF magic `0x1f`).
fn detect_format<R: BufRead>(reader: &mut R) -> Result<Format> {
    let head = reader.fill_buf().context("reading input to detect format")?;
    if head.is_empty() {
        bail!("Empty input: no SAM/BAM header detected");
    }
    match head[0] {
        0x1f => Ok(Format::Bam),
        b'@' => Ok(Format::Sam),
        b => bail!(
            "Input doesn't look like SAM or BAM (first byte 0x{:02x}); expected '@' or 0x1f",
            b
        ),
    }
}

/// Unified reader enum so the main loop is format-agnostic.
enum Reader {
    Bam(RawBamReader<Box<dyn BufRead>>),
    Sam(SamReader<Box<dyn BufRead>>),
}

impl Reader {
    fn read_header(&mut self) -> Result<Header> {
        match self {
            Reader::Bam(r) => r.read_header().context("reading BAM header"),
            Reader::Sam(r) => r.read_header().context("reading SAM header"),
        }
    }

    fn read_record(&mut self, rec: &mut RawRecord) -> std::io::Result<bool> {
        match self {
            Reader::Bam(r) => r.read_record(rec),
            Reader::Sam(r) => r.read_record(rec),
        }
    }
}

/// Main entry point used by both the binary and the test harness.
///
/// # Errors
/// Returns an error on IO failure, malformed input, a missing/mismatched
/// reference contig, or an unknown `--control-contig`.
fn run(args: Args) -> Result<()> {
    log::info!("methylsieve by Fulcrum Genomics - https://github.com/fulcrumgenomics/methylsieve");
    args.validate()?;
    let started = StartedRun::now();
    if !args.quiet {
        eprintln!("methylsieve: Version {METHYLSIEVE_BUILD}");
    }

    // Input goes through a dedicated IO read thread + ring buffer so the worker
    // never blocks on the kernel pipe.
    let raw_source: Box<dyn std::io::Read + Send> = match args.input.as_deref() {
        Some(p) if p.to_string_lossy() != "-" => {
            let f = File::open(p).with_context(|| format!("opening {} for read", p.display()))?;
            Box::new(f)
        }
        _ => Box::new(std::io::stdin()),
    };
    let read_buf_bytes = (args.read_buffer_mb as usize).saturating_mul(1024 * 1024);
    let mut reader_box: Box<dyn BufRead> =
        Box::new(crate::io_threading::ThreadedReader::new(raw_source, read_buf_bytes));
    let input_name =
        args.input.as_deref().map(|p| p.display().to_string()).unwrap_or_else(|| "stdin".into());
    let input_format = detect_format(&mut reader_box)?;
    let check_crc = args.effective_check_crc();
    let mut reader: Reader = match input_format {
        Format::Bam => {
            if !args.quiet {
                eprintln!(
                    "methylsieve: Reading BAM from {input_name} (CRC verify: {}).",
                    if check_crc { "on" } else { "off" }
                );
            }
            Reader::Bam(RawBamReader::new(reader_box, check_crc))
        }
        Format::Sam => {
            if !args.quiet {
                eprintln!("methylsieve: Reading SAM from {input_name}.");
            }
            Reader::Sam(SamReader::new(reader_box))
        }
    };

    let mut header = reader.read_header()?;
    if header.reference_sequences().is_empty() {
        bail!("Input has no @SQ reference sequences. Exiting.");
    }
    if !args.quiet {
        eprintln!(
            "methylsieve: Loaded {} header sequence entries.",
            header.reference_sequences().len()
        );
    }

    // Load the reference (every @SQ contig, 4-bit encoded) and resolve control
    // contigs to a per-tid scope map. Both cross-check against the header.
    let reference = Reference::load(&args.reference, &header, args.ref_encoding.into())
        .with_context(|| format!("loading reference {}", args.reference.display()))?;
    if !args.quiet {
        eprintln!("methylsieve: Loaded reference {}.", args.reference.display());
    }
    let (scope_of_tid, control_names) = resolve_control_scopes(&header, &args.control_contig)?;

    append_methylsieve_pg(&mut header)?;

    let write_buf_bytes = (args.write_buffer_mb as usize).saturating_mul(1024 * 1024);
    let mut out = RawBamWriter::open(
        args.output.as_deref(),
        &header,
        write_buf_bytes,
        args.compression_level,
    )?;

    let opts = ProcessorOptions {
        contexts: args.contexts,
        mode: args.mode.into(),
        max_unconverted_count: args.max_unconverted_count,
        max_unconverted_fraction: args.max_unconverted_fraction,
        min_sites: args.min_sites,
        min_base_quality: args.min_base_quality,
        ignore_template_ends: args.ignore_template_ends,
        ignore_supplementary_evidence: args.ignore_supplementary_evidence,
        tag: args.tag.clone(),
        count_tag: args.effective_count_tag(),
        qc_fail: args.qc_fail(),
        remove_unconverted: args.remove_unconverted,
        scope_of_tid,
        record_matrix: args.conversion_matrix.is_some(),
    };
    let processor = RecordProcessor::new(reference, opts);
    let mut stats = Stats::new(&control_names);

    // methylsieve requires query-grouped input: records are read in maximal
    // runs sharing a QNAME ("blocks") and each block is processed as a unit.
    let mut pool: Vec<RawRecord> = Vec::with_capacity(8);
    for_each_block(&mut reader, &mut pool, |block| {
        processor.process_block(block, &mut stats, &mut out)
    })
    .context("processing record block")?;

    print_run_stats(&stats, &args);
    warn_proportion_blind_spot(&stats, &args);

    if let Some(stats_path) = args.stats.as_deref() {
        crate::stats::write_to_path(stats_path, &stats, &header, args.sample.as_deref())
            .context("writing --stats TSV")?;
    }

    if let Some(matrix_path) = args.conversion_matrix.as_deref() {
        crate::stats::write_matrix_to_path(
            matrix_path,
            &stats,
            &header,
            args.sample.as_deref(),
            |unconv, monitored| processor.classify(unconv, monitored),
        )
        .context("writing --conversion-matrix TSV")?;
    }

    out.finish().context("finishing main output")?;
    report(&started, stats.total_templates, args.quiet);
    Ok(())
}

/// Resolve `--control-contig` names against the header into a per-tid scope map
/// (`None` → genome, `Some(i)` → `controls[i]`) plus the ordered control names.
///
/// # Errors
/// Returns an error if a named control contig is absent from the header.
fn resolve_control_scopes(
    header: &Header,
    control_contigs: &[String],
) -> Result<(Vec<Option<usize>>, Vec<String>)> {
    let n = header.reference_sequences().len();
    let name_to_tid: HashMap<String, usize> = header
        .reference_sequences()
        .keys()
        .enumerate()
        .map(|(tid, name)| (String::from_utf8_lossy(name.as_ref()).into_owned(), tid))
        .collect();

    let mut scope_of_tid = vec![None; n];
    let mut control_names = Vec::with_capacity(control_contigs.len());
    for (ci, name) in control_contigs.iter().enumerate() {
        let tid = *name_to_tid.get(name).ok_or_else(|| {
            anyhow::anyhow!(
                "--control-contig '{name}' is not present in the input @SQ header lines."
            )
        })?;
        scope_of_tid[tid] = Some(ci);
        control_names.push(name.clone());
    }
    Ok((scope_of_tid, control_names))
}

/// Drive the QNAME-block grouping loop over `reader`, invoking `on_block` for
/// each maximal run of records sharing a QNAME. `pool` allocations are reused
/// across blocks.
fn for_each_block(
    reader: &mut Reader,
    pool: &mut Vec<RawRecord>,
    mut on_block: impl FnMut(&mut [RawRecord]) -> Result<()>,
) -> Result<()> {
    if pool.is_empty() {
        pool.push(RawRecord::new());
    }
    let mut block_len: usize = 0;
    let mut current_qname: Vec<u8> = Vec::new();
    loop {
        if pool.len() == block_len {
            pool.push(RawRecord::new());
        }
        let read_idx = block_len;
        let got = reader.read_record(&mut pool[read_idx]).context("reading record")?;
        if !got {
            break;
        }
        let new_qname_differs =
            block_len > 0 && pool[read_idx].read_name() != current_qname.as_slice();
        if new_qname_differs {
            on_block(&mut pool[..block_len])?;
            if read_idx != 0 {
                pool.swap(0, read_idx);
            }
            block_len = 1;
            current_qname.clear();
            current_qname.extend_from_slice(pool[0].read_name());
        } else {
            if block_len == 0 {
                current_qname.clear();
                current_qname.extend_from_slice(pool[0].read_name());
            }
            block_len += 1;
        }
    }
    if block_len > 0 {
        on_block(&mut pool[..block_len])?;
    }
    Ok(())
}

/// Append methylsieve's `@PG` line to the header. noodles auto-chains via `PP:`.
fn append_methylsieve_pg(header: &mut Header) -> Result<()> {
    // Defensive: a broken PP chain makes noodles' `programs.add` panic; convert
    // that into a clean error.
    let programs = header.programs().as_ref();
    let known_ids: std::collections::HashSet<&[u8]> =
        programs.keys().map(|k| k.as_slice()).collect();
    for (id, map) in programs.iter() {
        if let Some(pp) = map.other_fields().get(&program_tag::PREVIOUS_PROGRAM_ID) {
            let pp_bytes = pp.as_ref();
            if !known_ids.contains(pp_bytes) {
                bail!(
                    "input header @PG ID:{} has PP:{} but no @PG with that ID exists. \
                     Strip the broken PP tag (e.g. via `samtools reheader`) before re-running.",
                    String::from_utf8_lossy(id.as_slice()),
                    String::from_utf8_lossy(pp_bytes),
                );
            }
        }
    }

    let cl = command_line_for_pg();
    let mut map = Map::<Program>::default();
    map.other_fields_mut().insert(program_tag::NAME, "methylsieve".into());
    map.other_fields_mut().insert(program_tag::VERSION, METHYLSIEVE_BUILD.into());
    map.other_fields_mut().insert(program_tag::COMMAND_LINE, cl.into());
    header.programs_mut().add("methylsieve", map).context("appending @PG methylsieve record")?;
    Ok(())
}

/// Build the `@PG CL:` string from `std::env::args`, with `argv[0]` reduced to
/// its basename.
fn command_line_for_pg() -> String {
    let mut args = std::env::args();
    let prog = args
        .next()
        .map(|a| {
            std::path::Path::new(&a)
                .file_name()
                .map(|s| s.to_string_lossy().into_owned())
                .unwrap_or(a)
        })
        .unwrap_or_else(|| "methylsieve".to_string());
    let rest: Vec<String> = args.collect();
    if rest.is_empty() { prog } else { format!("{prog} {}", rest.join(" ")) }
}

fn print_run_stats(stats: &Stats, args: &Args) {
    if stats.total_templates == 0 {
        eprintln!("methylsieve: No reads processed.");
        return;
    }
    let g = &stats.genome;
    let verb = if args.remove_unconverted { "Removed" } else { "Tagged " };
    let pct =
        if g.n_evaluated > 0 { 100.0 * g.n_unconverted as f64 / g.n_evaluated as f64 } else { 0.0 };
    eprintln!(
        "methylsieve: {} {:>10} of {:>10} ({:5.3}%) evaluated genome templates as unconverted.",
        verb, g.n_unconverted, g.n_evaluated, pct
    );
    if stats.unmapped_templates > 0 || stats.zero_site_templates > 0 {
        eprintln!(
            "methylsieve: {} templates had no mapped primary; {} produced no monitored sites.",
            stats.unmapped_templates, stats.zero_site_templates
        );
    }
}

/// In `proportion` mode, warn about the blind spot: templates below `--min-sites`
/// can't be evaluated by the proportion test and pass through unflagged. Other
/// modes cover those templates with the count test, so no warning is needed.
fn warn_proportion_blind_spot(stats: &Stats, args: &Args) {
    if args.quiet || args.mode != DecisionModeCli::Proportion {
        return;
    }
    if stats.below_min_sites_templates > 0 {
        eprintln!(
            "methylsieve: WARNING proportion mode — {} templates had fewer than {} sites and \
             were NOT evaluated (passed through). Use --mode adaptive or either to catch them \
             with the count test.",
            stats.below_min_sites_templates, args.min_sites
        );
    }
}

// ── End-of-run resource-usage footer ────────────────────────────────────────
//
// Mirrors the C++ samblaster footer that prints memory + timing after the
// summary. Suppressed by `--quiet`. CPU and max-RSS reads use
// `getrusage(RUSAGE_SELF)` and are Unix-only — on other platforms only wall
// time is reported.

/// Captures a starting wall-clock timestamp; pair with [`report`] at end.
struct StartedRun {
    /// Snapshot of `Instant::now()` at process start.
    wall_start: Instant,
}

impl StartedRun {
    /// Snapshot the current wall-clock timestamp.
    fn now() -> Self {
        Self { wall_start: Instant::now() }
    }
}

/// Print a single-line resource-usage footer to stderr. No-op if `quiet`.
fn report(started: &StartedRun, n_templates: u64, quiet: bool) {
    if quiet {
        return;
    }
    let wall = started.wall_start.elapsed().as_secs_f64();
    let stderr = std::io::stderr();
    let mut stderr = stderr.lock();

    #[cfg(unix)]
    if let Some(ru) = read_rusage() {
        let user = ru.user_secs;
        let sys = ru.sys_secs;
        let rss_mb = ru.max_rss_bytes as f64 / (1024.0 * 1024.0);
        let _ = writeln!(
            stderr,
            "methylsieve: Processed {n_templates} templates in {wall:.2}s wall, \
             {user:.2}s user CPU, {sys:.2}s system CPU, max RSS {rss_mb:.1} MB.",
        );
        return;
    }

    let _ = writeln!(stderr, "methylsieve: Processed {n_templates} templates in {wall:.2}s wall.");
}

#[cfg(unix)]
struct Rusage {
    user_secs: f64,
    sys_secs: f64,
    max_rss_bytes: u64,
}

/// Snapshot the current process's user/sys CPU and max RSS via
/// `getrusage(RUSAGE_SELF)`. Returns `None` if the syscall fails.
///
/// `ru_maxrss` is in **bytes** on macOS and **kilobytes** on Linux — we
/// normalize to bytes here so the caller doesn't need to care.
#[cfg(unix)]
fn read_rusage() -> Option<Rusage> {
    // SAFETY: `getrusage` writes a `rusage` struct that we just allocated.
    // Both the syscall number and the struct layout come from `libc`.
    let mut ru: libc::rusage = unsafe { std::mem::zeroed() };
    let rc = unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut ru) };
    if rc != 0 {
        return None;
    }
    let user_secs = ru.ru_utime.tv_sec as f64 + ru.ru_utime.tv_usec as f64 * 1e-6;
    let sys_secs = ru.ru_stime.tv_sec as f64 + ru.ru_stime.tv_usec as f64 * 1e-6;
    let max_rss = ru.ru_maxrss as u64;
    // macOS reports bytes; Linux + BSD report kilobytes.
    #[cfg(target_os = "macos")]
    let max_rss_bytes = max_rss;
    #[cfg(not(target_os = "macos"))]
    let max_rss_bytes = max_rss.saturating_mul(1024);
    Some(Rusage { user_secs, sys_secs, max_rss_bytes })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_contexts_default_is_cph() {
        let m = parse_contexts("CpA,CpC,CpT").unwrap();
        assert!(m.contains(Context::CpA));
        assert!(m.contains(Context::CpC));
        assert!(m.contains(Context::CpT));
        assert!(!m.contains(Context::CpG));
    }

    #[test]
    fn parse_contexts_accepts_short_spelling_and_case() {
        let m = parse_contexts("ca,CG").unwrap();
        assert!(m.contains(Context::CpA));
        assert!(m.contains(Context::CpG));
        assert!(!m.contains(Context::CpC));
    }

    #[test]
    fn parse_contexts_rejects_unknown() {
        assert!(parse_contexts("CpA,CpZ").is_err());
        assert!(parse_contexts("").is_err());
    }

    #[test]
    fn parse_tag_spec_default() {
        let t = parse_tag_spec("XX:Z:UC").unwrap();
        assert_eq!(t.tag, [b'X', b'X']);
        assert_eq!(t.value, b"UC");
    }

    #[test]
    fn parse_tag_spec_rejects_non_z_and_malformed() {
        assert!(parse_tag_spec("XX:i:3").is_err());
        assert!(parse_tag_spec("X:Z:UC").is_err());
        assert!(parse_tag_spec("XX:Z:").is_err());
        assert!(parse_tag_spec("garbage").is_err());
    }

    #[test]
    fn validate_rejects_non_bam_output() {
        let mut a = minimal_args();
        a.output = Some(PathBuf::from("out.sam"));
        assert!(a.validate().is_err());
    }

    #[test]
    fn validate_rejects_default_bam_stdout_plus_stats_stdout() {
        // BAM defaults to stdout when -o is omitted; stats to stdout collides.
        let mut a = minimal_args();
        a.output = None;
        a.stats = Some(PathBuf::from("-"));
        let err = a.validate().unwrap_err().to_string();
        assert!(err.contains("stdout"), "got: {err}");
    }

    #[test]
    fn validate_rejects_stats_and_matrix_both_to_stdout() {
        let mut a = minimal_args();
        a.output = Some(PathBuf::from("out.bam")); // BAM to a file
        a.stats = Some(PathBuf::from("-"));
        a.conversion_matrix = Some(PathBuf::from("-"));
        assert!(a.validate().is_err());
    }

    #[test]
    fn validate_allows_a_single_stdout_stream() {
        // BAM to a file frees stdout for exactly one TSV.
        let mut a = minimal_args();
        a.output = Some(PathBuf::from("out.bam"));
        a.stats = Some(PathBuf::from("-"));
        a.conversion_matrix = Some(PathBuf::from("matrix.tsv"));
        assert!(a.validate().is_ok());
    }

    #[test]
    fn validate_rejects_bad_fraction() {
        let mut a = minimal_args();
        a.max_unconverted_fraction = 1.5;
        assert!(a.validate().is_err());
        a.max_unconverted_fraction = 0.0;
        assert!(a.validate().is_err());
        a.max_unconverted_fraction = 0.5;
        assert!(a.validate().is_ok());
    }

    fn minimal_args() -> Args {
        Args {
            input: None,
            output: None,
            compression_level: CompressionLevel::new(0).unwrap(),
            reference: PathBuf::from("ref.fa"),
            ref_encoding: RefEncodingCli::Bytes,
            contexts: parse_contexts("CpA,CpC,CpT").unwrap(),
            mode: DecisionModeCli::Adaptive,
            max_unconverted_count: 3,
            max_unconverted_fraction: 0.05,
            min_sites: 40,
            min_base_quality: 20,
            ignore_template_ends: 0,
            ignore_supplementary_evidence: false,
            tag: parse_tag_spec("XX:Z:UC").unwrap(),
            count_tag: [b'c', b'h'],
            no_count_tag: false,
            no_qc_fail: false,
            remove_unconverted: false,
            control_contig: vec![],
            stats: None,
            conversion_matrix: None,
            sample: None,
            quiet: true,
            check_crc: false,
            no_check_crc: false,
            read_buffer_mb: 16,
            write_buffer_mb: 64,
        }
    }
}