deepbiop_fa/encode/
parquet.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
use crate::encode::record::RecordData;
use std::{
    fmt::Display,
    path::{Path, PathBuf},
    sync::Arc,
};

use crate::encode::traits::Encoder;
use arrow::array::{Array, RecordBatch, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};

use derive_builder::Builder;
use log::info;
use serde::{Deserialize, Serialize};

use super::{record::RecordDataBuilder, Element};
use deepbiop_utils::io::write_parquet;

use super::option::FaEncoderOption;

use anyhow::{Context, Result};
use pyo3::prelude::*;
use rayon::prelude::*;

use pyo3_stub_gen::derive::*;

#[gen_stub_pyclass]
#[pyclass(module = "deepbiop.fa")]
#[derive(Debug, Builder, Default, Clone, Serialize, Deserialize)]
/// Encoder that turns FASTA records into Apache Parquet output.
///
/// Exposed to Python as part of the `deepbiop.fa` module via pyo3.
pub struct ParquetEncoder {
    /// Options controlling how FASTA records are encoded.
    pub option: FaEncoderOption,
}

impl ParquetEncoder {
    pub fn new(option: FaEncoderOption) -> Self {
        Self { option }
    }

    fn generate_schema(&self) -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("seq", DataType::Utf8, false),
        ]))
    }

    fn generate_batch(&self, records: &[RecordData], schema: &Arc<Schema>) -> Result<RecordBatch> {
        let data: Vec<RecordData> = records
            .into_par_iter()
            .filter_map(|data| {
                let id = data.id.as_ref();
                let seq = data.seq.as_ref();
                match self.encode_record(id, seq).context(format!(
                    "encode fq read id {} error",
                    String::from_utf8_lossy(id)
                )) {
                    Ok(result) => Some(result),
                    Err(_e) => None,
                }
            })
            .collect();

        // Create builders for each field
        let mut id_builder = StringBuilder::new();
        let mut seq_builder = StringBuilder::new();

        // Populate builders
        data.into_iter().for_each(|parquet_record| {
            id_builder.append_value(parquet_record.id.to_string());
            seq_builder.append_value(parquet_record.seq.to_string());
        });

        // Build arrays
        let id_array = Arc::new(id_builder.finish());
        let seq_array = Arc::new(seq_builder.finish());

        // Create a RecordBatch
        let record_batch = RecordBatch::try_new(
            schema.clone(),
            vec![id_array as Arc<dyn Array>, seq_array as Arc<dyn Array>],
        )?;
        Ok(record_batch)
    }

    pub fn encode_chunk<P: AsRef<Path>>(
        &mut self,
        path: P,
        chunk_size: usize,
        parallel: bool,
    ) -> Result<()> {
        let schema = self.generate_schema();
        let records = self.fetch_records(&path)?;
        info!("Encoding records with chunk size {} ", chunk_size);

        // create a folder for the chunk parquet files
        let file_name = path.as_ref().file_name().unwrap().to_str().unwrap();
        let chunks_folder = path
            .as_ref()
            .parent()
            .unwrap()
            .join(format!("{}_{}", file_name, "chunks"))
            .to_path_buf();
        // create the folder
        std::fs::create_dir_all(&chunks_folder).context("Failed to create folder for chunks")?;

        if parallel {
            records
                // .chunks(chunk_size)
                .par_chunks(chunk_size)
                .enumerate()
                .for_each(|(idx, record)| {
                    let record_batch = self
                        .generate_batch(record, &schema)
                        .context(format!("Failed to generate record batch for chunk {}", idx))
                        .unwrap();
                    let parquet_path = chunks_folder.join(format!("{}_{}.parquet", file_name, idx));
                    write_parquet(parquet_path, record_batch, schema.clone())
                        .context(format!("Failed to write parquet file for chunk {}", idx))
                        .unwrap();
                });
        } else {
            records
                .chunks(chunk_size)
                .enumerate()
                .for_each(|(idx, record)| {
                    let record_batch = self
                        .generate_batch(record, &schema)
                        .context(format!("Failed to generate record batch for chunk {}", idx))
                        .unwrap();
                    let parquet_path = chunks_folder.join(format!("{}_{}.parquet", file_name, idx));
                    write_parquet(parquet_path, record_batch, schema.clone())
                        .context(format!("Failed to write parquet file for chunk {}", idx))
                        .unwrap();
                });
        }

        Ok(())
    }
}

impl Display for ParquetEncoder {
    /// Human-readable form, delegating to the option's own `Display` impl.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { option } = self;
        write!(f, "FaEncoder {{ option: {} }}", option)
    }
}

impl Encoder for ParquetEncoder {
    type TargetOutput = Result<Vec<Element>>;
    type RecordOutput = Result<RecordData>;
    type EncodeOutput = Result<(RecordBatch, Arc<Schema>)>;

    /// Wrap one raw `(id, seq)` byte pair into a `RecordData`.
    fn encode_record(&self, id: &[u8], seq: &[u8]) -> Self::RecordOutput {
        RecordDataBuilder::default()
            .id(id.into())
            .seq(seq.into())
            .build()
            .context("Failed to build parquet data")
    }

    /// Encode every record in `path` into a single `RecordBatch`, returning it
    /// together with the schema it conforms to.
    fn encode<P: AsRef<Path>>(&mut self, path: P) -> Self::EncodeOutput {
        let schema = self.generate_schema();
        let records = self.fetch_records(path)?;
        let batch = self.generate_batch(&records, &schema)?;
        Ok((batch, schema))
    }

    /// Not implemented yet; panics if called.
    fn encode_multiple(&mut self, _paths: &[PathBuf], _parallel: bool) -> Self::EncodeOutput {
        todo!()
    }
}

#[cfg(test)]
mod tests {
    use crate::encode::option::FaEncoderOptionBuilder;

    use super::*;

    /// Round-trip smoke test: encode the sample FASTA and write it as parquet.
    #[test]
    fn test_encode_fq_for_parquet() {
        let option = FaEncoderOptionBuilder::default().build().unwrap();
        let mut encoder = ParquetEncoderBuilder::default()
            .option(option)
            .build()
            .unwrap();

        let (record_batch, schema) = encoder.encode("tests/data/test.fa").unwrap();
        // Write into the OS temp dir so the test neither pollutes the working
        // directory nor races with other tests writing a shared "test.parquet".
        let out = std::env::temp_dir().join("deepbiop_fa_test_encode.parquet");
        write_parquet(&out, record_batch, schema).unwrap();
        std::fs::remove_file(&out).unwrap();
    }
}