deepbiop_fa/encode/parquet.rs

use crate::encode::record::RecordData;
use std::{
    fmt::Display,
    path::{Path, PathBuf},
    sync::Arc,
};

use crate::encode::traits::Encoder;
use arrow::array::{Array, RecordBatch, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};

use derive_builder::Builder;
use log::{debug, info};
use serde::{Deserialize, Serialize};

use super::record::RecordDataBuilder;
use deepbiop_utils::io::write_parquet;

use super::option::EncoderOption;

use anyhow::{Context, Result};
use pyo3::prelude::*;
use rayon::prelude::*;

use pyo3_stub_gen::derive::*;
/// An encoder for converting FASTA records to Parquet format.
///
/// This struct provides functionality to encode FASTA sequence data into
/// Parquet, an efficient columnar storage format.
///
/// # Fields
///
/// * `option` - Configuration options for the encoder, including which bases to consider
///
/// # Example
///
/// ```
/// use deepbiop_fa::encode::{option::EncoderOption, parquet::ParquetEncoder};
///
/// let options = EncoderOption::default();
/// let encoder = ParquetEncoder::new(options);
/// ```
#[gen_stub_pyclass]
#[pyclass(module = "deepbiop.fa")]
#[derive(Debug, Builder, Default, Clone, Serialize, Deserialize)]
pub struct ParquetEncoder {
    pub option: EncoderOption,
}

impl ParquetEncoder {
    pub fn new(option: EncoderOption) -> Self {
        Self { option }
    }

    /// Build the Arrow schema: two non-nullable Utf8 columns, `id` and `seq`.
    fn generate_schema(&self) -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("seq", DataType::Utf8, false),
        ]))
    }

    /// Encode records into per-chunk `RecordBatch`es so that no single batch
    /// exceeds the 2GB limit.
    fn generate_batches(
        &self,
        records: &[RecordData],
        schema: &Arc<Schema>,
    ) -> Result<Vec<RecordBatch>> {
        // Process smaller batches to avoid the 2GB limit.
        const BATCH_SIZE: usize = 10_000; // Adjust this value based on your data size.
        let all_batches: Vec<RecordBatch> = records
            .par_chunks(BATCH_SIZE)
            .map(|chunk| {
                let mut id_builder = StringBuilder::new();
                let mut seq_builder = StringBuilder::new();

                for data in chunk {
                    let record = self
                        .encode_record(data.id.as_ref(), data.seq.as_ref())
                        .with_context(|| {
                            format!(
                                "failed to encode FASTA record id {}",
                                String::from_utf8_lossy(data.id.as_ref())
                            )
                        })?;
                    id_builder.append_value(record.id.to_string());
                    seq_builder.append_value(record.seq.to_string());
                }

                RecordBatch::try_new(
                    schema.clone(),
                    vec![
                        Arc::new(id_builder.finish()) as Arc<dyn Array>,
                        Arc::new(seq_builder.finish()) as Arc<dyn Array>,
                    ],
                )
                .context("failed to build record batch")
            })
            // Propagate the first error instead of panicking inside the
            // parallel closure.
            .collect::<Result<Vec<_>>>()?;
        debug!("all batches: {}", all_batches.len());
        Ok(all_batches)
    }

    /// Encode records into a single `RecordBatch` by concatenating the
    /// per-chunk batches.
    fn generate_batch(&self, records: &[RecordData], schema: &Arc<Schema>) -> Result<RecordBatch> {
        let all_batches = self.generate_batches(records, schema)?;
        // Concatenate all batches.
        arrow::compute::concat_batches(schema, &all_batches)
            .context("Failed to concatenate record batches")
    }

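    /// Encode a FASTA file into multiple Parquet files, one per chunk of
    /// `chunk_size` records, written to a sibling `<file_name>_chunks` folder.
    ///
    /// # Example
    ///
    /// A minimal sketch; `path/to/sequences.fa` is a placeholder path.
    ///
    /// ```no_run
    /// use deepbiop_fa::encode::{option::EncoderOption, parquet::ParquetEncoder};
    ///
    /// let mut encoder = ParquetEncoder::new(EncoderOption::default());
    /// // Write 10_000 records per chunk file, encoding the chunks in parallel.
    /// encoder.encode_chunk("path/to/sequences.fa", 10_000, true).unwrap();
    /// ```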
    pub fn encode_chunk<P: AsRef<Path>>(
        &mut self,
        path: P,
        chunk_size: usize,
        parallel: bool,
    ) -> Result<()> {
        let schema = self.generate_schema();
        let records = self.fetch_records(&path)?;
        info!("Encoding records with chunk size {}", chunk_size);

        // Create a sibling folder for the chunk parquet files.
        let file_name = path
            .as_ref()
            .file_name()
            .and_then(|name| name.to_str())
            .context("Failed to get file name from path")?;
        let chunks_folder = path
            .as_ref()
            .parent()
            .context("Failed to get parent directory of path")?
            .join(format!("{}_chunks", file_name));
        std::fs::create_dir_all(&chunks_folder).context("Failed to create folder for chunks")?;

        // One parquet file per chunk; the parallel and sequential paths share
        // the same per-chunk logic and propagate errors instead of panicking.
        let encode_one = |(idx, chunk): (usize, &[RecordData])| -> Result<()> {
            let record_batch = self
                .generate_batch(chunk, &schema)
                .with_context(|| format!("Failed to generate record batch for chunk {}", idx))?;
            let parquet_path = chunks_folder.join(format!("{}_{}.parquet", file_name, idx));
            write_parquet(parquet_path, record_batch, schema.clone())
                .with_context(|| format!("Failed to write parquet file for chunk {}", idx))
        };

        if parallel {
            records
                .par_chunks(chunk_size)
                .enumerate()
                .try_for_each(encode_one)?;
        } else {
            records
                .chunks(chunk_size)
                .enumerate()
                .try_for_each(encode_one)?;
        }

        Ok(())
    }
}

impl Display for ParquetEncoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ParquetEncoder {{ option: {} }}", self.option)
    }
}

impl Encoder for ParquetEncoder {
    type EncodeOutput = Result<(Vec<RecordBatch>, Arc<Schema>)>;
    type RecordOutput = Result<RecordData>;

    fn encode_multiple(&mut self, _paths: &[PathBuf], _parallel: bool) -> Self::EncodeOutput {
        todo!()
    }

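    /// Encode all records in a FASTA file into Arrow `RecordBatch`es plus the
    /// shared schema.
    ///
    /// # Example
    ///
    /// A minimal sketch; the placeholder path and the `traits::Encoder` import
    /// assume the module layout used at the top of this file is public.
    ///
    /// ```no_run
    /// use deepbiop_fa::encode::traits::Encoder;
    /// use deepbiop_fa::encode::{option::EncoderOption, parquet::ParquetEncoder};
    ///
    /// let mut encoder = ParquetEncoder::new(EncoderOption::default());
    /// let (batches, schema) = encoder.encode("path/to/sequences.fa").unwrap();
    /// println!("encoded {} batches", batches.len());
    /// assert_eq!(schema.fields().len(), 2);
    /// ```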
    fn encode<P: AsRef<Path>>(&mut self, path: P) -> Self::EncodeOutput {
        // The schema has two non-nullable Utf8 columns: id and seq.
        let schema = self.generate_schema();
        let records = self.fetch_records(path)?;
        let record_batches = self.generate_batches(&records, &schema)?;
        Ok((record_batches, schema))
    }

    fn encode_record(&self, id: &[u8], seq: &[u8]) -> Self::RecordOutput {
        let result = RecordDataBuilder::default()
            .id(id.into())
            .seq(seq.into())
            .build()
            .context("Failed to build record data")?;
        Ok(result)
    }
}

#[cfg(test)]
mod tests {
    use deepbiop_utils::io::write_parquet_for_batches;

    use crate::encode::option::EncoderOptionBuilder;

    use super::*;

    #[test]
    fn test_encode_fa_for_parquet() {
        let option = EncoderOptionBuilder::default().build().unwrap();
        let mut encoder = ParquetEncoderBuilder::default()
            .option(option)
            .build()
            .unwrap();

        let (record_batches, schema) = encoder.encode("tests/data/test.fa").unwrap();
        write_parquet_for_batches("test.parquet", &record_batches, schema).unwrap();
        // Clean up the test output.
        std::fs::remove_file("test.parquet").unwrap();
    }
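
    #[test]
    fn test_encode_fa_chunks_for_parquet() {
        // A minimal sketch exercising `encode_chunk`; it assumes the same
        // `tests/data/test.fa` fixture used above and cleans up afterwards.
        let option = EncoderOptionBuilder::default().build().unwrap();
        let mut encoder = ParquetEncoderBuilder::default()
            .option(option)
            .build()
            .unwrap();

        encoder.encode_chunk("tests/data/test.fa", 2, false).unwrap();
        // encode_chunk writes its output into a sibling `<name>_chunks` folder.
        std::fs::remove_dir_all("tests/data/test.fa_chunks").unwrap();
    }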
}