arrow_pg/
row_encoder.rs

1use std::sync::Arc;
2
3#[cfg(not(feature = "datafusion"))]
4use arrow::array::RecordBatch;
5#[cfg(feature = "datafusion")]
6use datafusion::arrow::array::RecordBatch;
7
8use pgwire::{
9    api::results::{DataRowEncoder, FieldInfo},
10    error::PgWireResult,
11    messages::data::DataRow,
12};
13
14use crate::encoder::encode_value;
15
16pub struct RowEncoder {
17    rb: RecordBatch,
18    curr_idx: usize,
19    fields: Arc<Vec<FieldInfo>>,
20}
21
22impl RowEncoder {
23    pub fn new(rb: RecordBatch, fields: Arc<Vec<FieldInfo>>) -> Self {
24        assert_eq!(rb.num_columns(), fields.len());
25        Self {
26            rb,
27            fields,
28            curr_idx: 0,
29        }
30    }
31
32    pub fn next_row(&mut self) -> Option<PgWireResult<DataRow>> {
33        if self.curr_idx == self.rb.num_rows() {
34            return None;
35        }
36        let mut encoder = DataRowEncoder::new(self.fields.clone());
37        for col in 0..self.rb.num_columns() {
38            let array = self.rb.column(col);
39            let field = &self.fields[col];
40            let type_ = field.datatype();
41            let format = field.format();
42            encode_value(&mut encoder, array, self.curr_idx, type_, format).unwrap();
43        }
44        self.curr_idx += 1;
45        Some(encoder.finish())
46    }
47}