exon_bam/
array_builder.rs

1// Copyright 2023 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use arrow::{
18    array::{ArrayRef, GenericListBuilder, GenericStringBuilder, Int32Builder, Int64Builder},
19    error::ArrowError,
20};
21use exon_common::ExonArrayBuilder;
22use exon_sam::TagsBuilder;
23use noodles::sam::{
24    alignment::record::{cigar::op::Kind, Cigar},
25    Header,
26};
27
/// Number of items the pre-sized column builders reserve room for up front.
const BATCH_SIZE: usize = 8 * 1024;
29
30use crate::BAMConfig;
31
32use super::indexed_async_batch_stream::SemiLazyRecord;
33
34/// Builds an vector of arrays from a SAM file.
35pub struct BAMArrayBuilder {
36    names: GenericStringBuilder<i32>,
37    flags: Int32Builder,
38    references: GenericStringBuilder<i32>,
39    starts: Int64Builder,
40    ends: Int64Builder,
41    mapping_qualities: GenericStringBuilder<i32>,
42    cigar: GenericStringBuilder<i32>,
43    mate_references: GenericStringBuilder<i32>,
44    sequences: GenericStringBuilder<i32>,
45    quality_scores: GenericListBuilder<i32, Int64Builder>,
46
47    tags: TagsBuilder,
48
49    projection: Vec<usize>,
50
51    rows: usize,
52
53    reference_names: Vec<String>,
54}
55
56impl BAMArrayBuilder {
57    /// Creates a new SAM array builder.
58    pub fn create(header: Arc<Header>, bam_config: Arc<BAMConfig>) -> Self {
59        let reference_names = header
60            .reference_sequences()
61            .keys()
62            .map(|k| k.to_string())
63            .collect::<Vec<_>>();
64
65        let item_capacity = BATCH_SIZE;
66
67        let quality_score_inner = Int64Builder::new();
68
69        let tags_builder = bam_config
70            .file_schema
71            .field_with_name("tags")
72            .map_or(TagsBuilder::default(), |field| {
73                TagsBuilder::try_from(field.data_type()).unwrap()
74            });
75
76        Self {
77            names: GenericStringBuilder::<i32>::new(),
78            flags: Int32Builder::new(),
79            references: GenericStringBuilder::<i32>::with_capacity(
80                item_capacity,
81                item_capacity * 10,
82            ),
83            starts: Int64Builder::with_capacity(item_capacity),
84            ends: Int64Builder::with_capacity(item_capacity),
85            mapping_qualities: GenericStringBuilder::<i32>::new(),
86            cigar: GenericStringBuilder::<i32>::new(),
87            mate_references: GenericStringBuilder::<i32>::new(),
88            sequences: GenericStringBuilder::<i32>::new(),
89            quality_scores: GenericListBuilder::new(quality_score_inner),
90
91            tags: tags_builder,
92
93            projection: bam_config.projection(),
94
95            rows: 0,
96
97            reference_names,
98        }
99    }
100
101    /// Appends a record to the builder.
102    pub(crate) fn append(&mut self, record: &SemiLazyRecord) -> Result<(), ArrowError> {
103        for col_idx in self.projection.iter() {
104            match col_idx {
105                0 => {
106                    if let Some(name) = record.record().name() {
107                        let sam_read_name = std::str::from_utf8(name)?;
108
109                        self.names.append_value(sam_read_name);
110                    } else {
111                        self.names.append_null();
112                    }
113                }
114                1 => {
115                    let flag_bits = record.record().flags().bits();
116                    self.flags.append_value(flag_bits as i32);
117                }
118                2 => match record.record().reference_sequence_id() {
119                    Some(reference_sequence_id) => {
120                        let reference_name = &self.reference_names[reference_sequence_id];
121
122                        self.references.append_value(reference_name);
123                    }
124                    None => {
125                        self.references.append_null();
126                    }
127                },
128                3 => {
129                    self.starts
130                        .append_option(record.record().alignment_start().map(|v| v.get() as i64));
131                }
132                4 => {
133                    let alignment_end = record.alignment_end().map(|v| v.get() as i64);
134                    self.ends.append_option(alignment_end);
135                }
136                5 => {
137                    self.mapping_qualities.append_option(
138                        record
139                            .record()
140                            .mapping_quality()
141                            .map(|v| v.get().to_string()),
142                    );
143                }
144                6 => {
145                    let cigar = record.record().cigar();
146
147                    let mut cigar_to_print = Vec::new();
148
149                    for op_result in cigar.iter() {
150                        let op = op_result?;
151
152                        let kind_str = match op.kind() {
153                            Kind::Deletion => "D",
154                            Kind::Insertion => "I",
155                            Kind::HardClip => "H",
156                            Kind::SoftClip => "S",
157                            Kind::Match => "M",
158                            Kind::SequenceMismatch => "X",
159                            Kind::Skip => "N",
160                            Kind::Pad => "P",
161                            Kind::SequenceMatch => "=",
162                        };
163
164                        cigar_to_print.push(format!("{}{}", op.len(), kind_str));
165                    }
166
167                    self.cigar.append_value(cigar_to_print.join(""));
168                }
169                7 => match record.record().mate_reference_sequence_id() {
170                    Some(mate_reference_sequence_id) => {
171                        let mate_reference_name = &self.reference_names[mate_reference_sequence_id];
172
173                        self.mate_references.append_value(mate_reference_name);
174                    }
175                    None => {
176                        self.mate_references.append_null();
177                    }
178                },
179                8 => {
180                    let sequence = record.record().sequence().as_ref();
181                    let sequence_str = std::str::from_utf8(sequence)?;
182
183                    self.sequences.append_value(sequence_str);
184                }
185                9 => {
186                    let quality_scores = record.record().quality_scores();
187
188                    let quality_scores_str = quality_scores.as_ref();
189                    let slice_i8: &[i8] = unsafe {
190                        std::slice::from_raw_parts(
191                            quality_scores_str.as_ptr() as *const i8,
192                            quality_scores_str.len(),
193                        )
194                    };
195
196                    // Convert the i8s into i64s
197                    let slice_i64 = slice_i8.iter().map(|v| *v as i64).collect::<Vec<_>>();
198
199                    self.quality_scores.values().append_slice(&slice_i64);
200                    self.quality_scores.append(true);
201                }
202                10 => {
203                    let data = record.record().data();
204                    self.tags.append(data)?;
205                }
206                _ => {
207                    return Err(ArrowError::InvalidArgumentError(format!(
208                        "Invalid column index {} for SAM",
209                        col_idx
210                    )))
211                }
212            }
213        }
214
215        self.rows += 1;
216
217        Ok(())
218    }
219
220    /// Finishes the builder and returns an vector of arrays.
221    pub fn finish(&mut self) -> Vec<ArrayRef> {
222        let mut arrays: Vec<ArrayRef> = Vec::new();
223
224        for col_idx in self.projection.iter() {
225            match col_idx {
226                0 => arrays.push(Arc::new(self.names.finish())),
227                1 => arrays.push(Arc::new(self.flags.finish())),
228                2 => arrays.push(Arc::new(self.references.finish())),
229                3 => arrays.push(Arc::new(self.starts.finish())),
230                4 => arrays.push(Arc::new(self.ends.finish())),
231                5 => arrays.push(Arc::new(self.mapping_qualities.finish())),
232                6 => arrays.push(Arc::new(self.cigar.finish())),
233                7 => arrays.push(Arc::new(self.mate_references.finish())),
234                8 => arrays.push(Arc::new(self.sequences.finish())),
235                9 => arrays.push(Arc::new(self.quality_scores.finish())),
236                10 => {
237                    let tags = self.tags.finish();
238                    arrays.push(Arc::new(tags))
239                }
240                _ => panic!("Invalid column index {} for SAM", col_idx),
241            }
242        }
243
244        arrays
245    }
246}
247
248impl ExonArrayBuilder for BAMArrayBuilder {
249    /// Finishes building the internal data structures and returns the built arrays.
250    fn finish(&mut self) -> Vec<ArrayRef> {
251        self.finish()
252    }
253
254    /// Returns the number of elements in the array.
255    fn len(&self) -> usize {
256        self.rows
257    }
258}