1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Copyright 2023 WHERE TRUE Technologies.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{str::FromStr, sync::Arc};

use arrow::{error::ArrowError, error::Result as ArrowResult, record_batch::RecordBatch};

use futures::Stream;
use tokio::io::{AsyncBufRead, AsyncBufReadExt};

use super::{array_builder::GFFArrayBuilder, GFFConfig};

/// Reads a GFF file into arrow record batches.
///
/// The reader pulls text lines from an asynchronous buffered source, parses
/// them as GFF lines, and groups the resulting records into [`RecordBatch`]es
/// according to the settings in the shared [`GFFConfig`].
pub struct BatchReader<R> {
    /// The reader to read from.
    ///
    /// Lines are consumed from this source one at a time.
    reader: R,

    /// The configuration for this reader.
    ///
    /// Supplies the batch size, the file schema used to build each batch,
    /// and the optional column projection applied to the finished batch.
    config: Arc<GFFConfig>,
}

impl<R> BatchReader<R>
where
    R: AsyncBufRead + Unpin + Send,
{
    pub fn new(reader: R, config: Arc<GFFConfig>) -> Self {
        Self { reader, config }
    }

    pub fn into_stream(self) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
        futures::stream::unfold(self, |mut reader| async move {
            match reader.read_batch().await {
                Ok(Some(batch)) => Some((Ok(batch), reader)),
                Ok(None) => None,
                Err(e) => Some((Err(ArrowError::ExternalError(Box::new(e))), reader)),
            }
        })
    }

    async fn read_line(&mut self) -> std::io::Result<Option<noodles::gff::Line>> {
        loop {
            let mut buf = String::new();
            match self.reader.read_line(&mut buf).await {
                Ok(0) => return Ok(None),
                Ok(_) => {
                    buf.pop();

                    #[cfg(target_os = "windows")]
                    if buf.ends_with('\r') {
                        buf.pop();
                    }

                    let line = match noodles::gff::Line::from_str(&buf) {
                        Ok(line) => line,
                        Err(e) => match e {
                            noodles::gff::line::ParseError::InvalidDirective(_) => {
                                continue;
                            }
                            noodles::gff::line::ParseError::InvalidRecord(e) => {
                                return Err(std::io::Error::new(
                                    std::io::ErrorKind::InvalidData,
                                    format!("invalid record: {buf} error: {e}"),
                                ))
                            }
                        },
                    };
                    buf.clear();
                    return Ok(Some(line));
                }
                Err(e) => return Err(e),
            };
        }
    }

    async fn read_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
        let mut gff_array_builder = GFFArrayBuilder::new();

        for _ in 0..self.config.batch_size {
            match self.read_line().await? {
                None => break,
                Some(line) => match line {
                    noodles::gff::Line::Comment(_) => {}
                    noodles::gff::Line::Directive(_) => {}
                    noodles::gff::Line::Record(record) => {
                        gff_array_builder.append(&record)?;
                    }
                },
            }
        }

        if gff_array_builder.is_empty() {
            return Ok(None);
        }

        let batch =
            RecordBatch::try_new(self.config.file_schema.clone(), gff_array_builder.finish())?;

        match &self.config.projection {
            Some(projection) => Ok(Some(batch.project(projection)?)),
            None => Ok(Some(batch)),
        }
    }
}