// deepbiop_fa/encode/parquet.rs

use crate::encode::record::RecordData;
use std::{
    fmt::Display,
    path::{Path, PathBuf},
    sync::Arc,
};

use crate::encode::traits::Encoder;
use arrow::array::{Array, RecordBatch, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};

use derive_builder::Builder;
use log::{debug, info};
use serde::{Deserialize, Serialize};

use super::record::RecordDataBuilder;
use deepbiop_utils::io::write_parquet;

use super::option::EncoderOption;

use anyhow::{Context, Result};
use pyo3::prelude::*;
use rayon::prelude::*;

use pyo3_stub_gen::derive::*;

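/// Encodes FASTA records as Parquet-ready Arrow data: one non-nullable UTF-8
/// column for record ids and one for sequences.
///
/// A minimal usage sketch mirroring the test at the bottom of this file. The
/// module paths and the input path are illustrative and may differ in the
/// published crate:
///
/// ```ignore
/// use deepbiop_fa::encode::option::EncoderOptionBuilder;
/// use deepbiop_fa::encode::parquet::ParquetEncoderBuilder;
/// use deepbiop_fa::encode::traits::Encoder;
///
/// let option = EncoderOptionBuilder::default().build().unwrap();
/// let mut encoder = ParquetEncoderBuilder::default()
///     .option(option)
///     .build()
///     .unwrap();
///
/// // Encode a FASTA file into Arrow record batches plus their schema.
/// let (batches, schema) = encoder.encode("input.fa").unwrap();
/// ```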
#[gen_stub_pyclass]
#[pyclass(module = "deepbiop.fa")]
#[derive(Debug, Builder, Default, Clone, Serialize, Deserialize)]
pub struct ParquetEncoder {
    pub option: EncoderOption,
}

impl ParquetEncoder {
    pub fn new(option: EncoderOption) -> Self {
        Self { option }
    }

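    /// Build the two-column Arrow schema shared by every batch: `id` and
    /// `seq`, both non-nullable UTF-8.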
    fn generate_schema(&self) -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("seq", DataType::Utf8, false),
        ]))
    }

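    /// Encode records into Arrow record batches, splitting the input into
    /// fixed-size chunks that rayon encodes in parallel.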
    fn generate_batches(
        &self,
        records: &[RecordData],
        schema: &Arc<Schema>,
    ) -> Result<Vec<RecordBatch>> {
        const BATCH_SIZE: usize = 10000;

        let all_batches: Vec<RecordBatch> = records
            .par_chunks(BATCH_SIZE)
            .map(|chunk| {
                let mut id_builder = StringBuilder::new();
                let mut seq_builder = StringBuilder::new();

                for data in chunk {
                    let record = self
                        .encode_record(data.id.as_ref(), data.seq.as_ref())
                        .with_context(|| {
                            format!(
                                "failed to encode fa record id {}",
                                String::from_utf8_lossy(data.id.as_ref())
                            )
                        })?;
                    id_builder.append_value(record.id.to_string());
                    seq_builder.append_value(record.seq.to_string());
                }

                RecordBatch::try_new(
                    schema.clone(),
                    vec![
                        Arc::new(id_builder.finish()) as Arc<dyn Array>,
                        Arc::new(seq_builder.finish()) as Arc<dyn Array>,
                    ],
                )
                .context("failed to build record batch")
            })
            .collect::<Result<Vec<_>>>()?;
        debug!("all batches: {}", all_batches.len());
        Ok(all_batches)
    }

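    /// Encode all records and concatenate the resulting batches into a single
    /// `RecordBatch`.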
    fn generate_batch(&self, records: &[RecordData], schema: &Arc<Schema>) -> Result<RecordBatch> {
        let all_batches = self.generate_batches(records, schema)?;
        arrow::compute::concat_batches(schema, &all_batches)
            .context("Failed to concatenate record batches")
    }

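    /// Encode the records from `path` in chunks of `chunk_size`, writing one
    /// Parquet file per chunk into a sibling `<file_name>_chunks` directory.
    /// Chunks are encoded and written in parallel when `parallel` is true.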
    pub fn encode_chunk<P: AsRef<Path>>(
        &mut self,
        path: P,
        chunk_size: usize,
        parallel: bool,
    ) -> Result<()> {
        let schema = self.generate_schema();
        let records = self.fetch_records(&path)?;
        info!("Encoding records with chunk size {}", chunk_size);

        let file_name = path
            .as_ref()
            .file_name()
            .and_then(|name| name.to_str())
            .context("Failed to extract file name from path")?;
        let chunks_folder = path
            .as_ref()
            .parent()
            .context("Input path has no parent directory")?
            .join(format!("{}_{}", file_name, "chunks"));
        std::fs::create_dir_all(&chunks_folder).context("Failed to create folder for chunks")?;

        // The per-chunk work is identical in both modes; only the chunk
        // iterator differs (rayon parallel vs. sequential).
        let encode_and_write = |(idx, chunk): (usize, &[RecordData])| -> Result<()> {
            let record_batch = self
                .generate_batch(chunk, &schema)
                .with_context(|| format!("Failed to generate record batch for chunk {}", idx))?;
            let parquet_path = chunks_folder.join(format!("{}_{}.parquet", file_name, idx));
            write_parquet(parquet_path, record_batch, schema.clone())
                .with_context(|| format!("Failed to write parquet file for chunk {}", idx))
        };

        if parallel {
            records
                .par_chunks(chunk_size)
                .enumerate()
                .try_for_each(encode_and_write)?;
        } else {
            records
                .chunks(chunk_size)
                .enumerate()
                .try_for_each(encode_and_write)?;
        }

        Ok(())
    }
}

impl Display for ParquetEncoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ParquetEncoder {{ option: {} }}", self.option)
    }
}

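// Generic `Encoder` entry points. `encode` returns the record batches together
// with their schema; `encode_multiple` is not implemented yet.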
impl Encoder for ParquetEncoder {
    type EncodeOutput = Result<(Vec<RecordBatch>, Arc<Schema>)>;
    type RecordOutput = Result<RecordData>;

    fn encode_multiple(&mut self, _paths: &[PathBuf], _parallel: bool) -> Self::EncodeOutput {
        todo!()
    }

    fn encode<P: AsRef<Path>>(&mut self, path: P) -> Self::EncodeOutput {
        let schema = self.generate_schema();
        let records = self.fetch_records(path)?;
        let record_batches = self.generate_batches(&records, &schema)?;
        Ok((record_batches, schema))
    }

    fn encode_record(&self, id: &[u8], seq: &[u8]) -> Self::RecordOutput {
        let result = RecordDataBuilder::default()
            .id(id.into())
            .seq(seq.into())
            .build()
            .context("Failed to build record data")?;
        Ok(result)
    }
}

#[cfg(test)]
mod tests {
    use deepbiop_utils::io::write_parquet_for_batches;

    use crate::encode::option::EncoderOptionBuilder;

    use super::*;

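    // Round-trip smoke test: encode the FASTA fixture, write the batches to a
    // Parquet file, then remove it.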
    #[test]
    fn test_encode_fa_for_parquet() {
        let option = EncoderOptionBuilder::default().build().unwrap();
        let mut encoder = ParquetEncoderBuilder::default()
            .option(option)
            .build()
            .unwrap();

        let (record_batches, schema) = encoder.encode("tests/data/test.fa").unwrap();
        write_parquet_for_batches("test.parquet", &record_batches, schema).unwrap();
        std::fs::remove_file("test.parquet").unwrap();
    }
}