1use std::io;
4use std::num::NonZeroUsize;
5use std::path::PathBuf;
6
7use anyhow::Context;
8use noodles::bam;
9use noodles::bgzf;
10use noodles::bgzf::writer::CompressionLevel;
11use noodles::cram;
12use noodles::fasta;
13use noodles::sam::alignment::Record;
14use noodles::sam::header::record::value::map::ReferenceSequence;
15use noodles::sam::header::record::value::Map;
16use tokio::fs::File;
17use tracing::info;
18
19use crate::utils::args::CompressionStrategy;
20use crate::utils::args::NumberOfRecords;
21use crate::utils::display::RecordCounter;
22use crate::utils::formats;
23use crate::utils::formats::sam::ParsedAsyncSAMFile;
24
25pub async fn to_bam_async(
27 from: PathBuf,
28 to: PathBuf,
29 max_records: NumberOfRecords,
30 compression_strategy: CompressionStrategy,
31) -> anyhow::Result<()> {
32 let ParsedAsyncSAMFile { mut reader, header } =
34 formats::sam::open_and_parse_async(from).await?;
35
36 let compression_level: CompressionLevel = compression_strategy.into();
38
39 let mut writer = File::create(to)
41 .await
42 .map(|f| {
43 bgzf::r#async::writer::Builder::default()
44 .set_compression_level(compression_level)
45 .build_with_writer(f)
46 })
47 .map(bam::AsyncWriter::from)
48 .with_context(|| "opening output filestream")?;
49
50 writer.write_header(&header.parsed).await?;
52 writer
53 .write_reference_sequences(header.parsed.reference_sequences())
54 .await?;
55
56 let mut counter = RecordCounter::new();
57 let mut record = Record::default();
58
59 while reader.read_record(&header.parsed, &mut record).await? != 0 {
61 writer
62 .write_alignment_record(&header.parsed, &record)
63 .await?;
64
65 counter.inc();
66
67 if counter.time_to_break(&max_records) {
68 break;
69 }
70 }
71
72 writer.shutdown().await?;
74
75 Ok(())
76}
77
78pub async fn to_cram_async(
80 from: PathBuf,
81 to: PathBuf,
82 fasta: PathBuf,
83 max_records: NumberOfRecords,
84) -> anyhow::Result<()> {
85 let ParsedAsyncSAMFile {
87 mut reader,
88 mut header,
89 ..
90 } = formats::sam::open_and_parse_async(from).await?;
91
92 let mut fasta_reader = formats::fasta::open(fasta)?;
94 let records: Vec<fasta::Record> = fasta_reader
95 .records()
96 .collect::<io::Result<Vec<fasta::Record>>>()?;
97
98 let reference_sequences = header.parsed.reference_sequences_mut();
100 for record in records.iter() {
101 let name_as_string = record.name().to_owned();
102 let name = name_as_string.parse()?;
103 let length = record.sequence().len();
104
105 let reference_sequence = Map::<ReferenceSequence>::new(NonZeroUsize::try_from(length)?);
106 reference_sequences.insert(name, reference_sequence);
107 }
108
109 let repository = fasta::Repository::new(records);
110
111 let handle = File::create(to).await?;
113 let mut writer = cram::r#async::writer::Builder::default()
114 .set_reference_sequence_repository(repository)
115 .build_with_writer(handle);
116
117 info!("Writing the file definition and header to CRAM file.");
119
120 writer.write_file_definition().await?;
121 writer.write_file_header(&header.parsed).await?;
122
123 let mut counter = RecordCounter::new();
124 let mut record = Record::default();
125
126 info!("Writing records to CRAM file.");
128 while reader.read_record(&header.parsed, &mut record).await? != 0 {
129 let cram_record = cram::Record::try_from_alignment_record(&header.parsed, &record)?;
130 writer
131 .write_record(&header.parsed, cram_record)
132 .await
133 .with_context(|| "Writing CRAM record.")?;
134
135 counter.inc();
136
137 if counter.time_to_break(&max_records) {
138 break;
139 }
140 }
141
142 writer.shutdown(&header.parsed).await?;
143 Ok(())
144}