Skip to main content

flows_arrow/
project_columns.rs

1// This is free and unencumbered software released into the public domain.
2
3use arrow_array::RecordBatch;
4use async_flow::{Inputs, Outputs, Port, Result};
5
6/// A block that projects columns from input batches to output batches.
7///
8/// Panics in case the specified columns are out of bounds.
9pub async fn project_columns(
10    columns: &[usize],
11    mut inputs: Inputs<RecordBatch>,
12    outputs: Outputs<RecordBatch>,
13) -> Result {
14    while let Some(input) = inputs.recv().await? {
15        if input.num_rows() == 0 {
16            continue; // skip empty batches
17        }
18
19        let output = input.project(columns).unwrap();
20        if !outputs.is_closed() {
21            outputs.send(output).await?;
22        }
23    }
24
25    Ok(())
26}
27
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::{boxed::Box, vec};
    use arrow_array::record_batch;
    use async_flow::{Channel, InputPort};
    use core::error::Error;

    /// Feeds two identical three-column batches through the block and checks
    /// that each comes out with a single column and all of its rows.
    #[tokio::test]
    async fn test_project_columns() -> Result<(), Box<dyn Error>> {
        let mut inputs = Channel::bounded(10);
        let mut outputs = Channel::bounded(10);

        // Run the block in the background, selecting only column index 1.
        let worker = tokio::spawn(project_columns(&[1], inputs.rx, outputs.tx));

        let batch = record_batch!(
            ("a", Int32, [1, 2, 3]),
            ("b", Float64, [Some(4.0), None, Some(5.0)]),
            ("c", Utf8, ["alpha", "beta", "gamma"])
        )?;
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 3);

        // Send the same batch twice, then close the sender.
        for _ in 0..2 {
            inputs.tx.send(batch.clone()).await?;
        }
        inputs.tx.close();

        // Wait for the block to finish; its join result is deliberately
        // discarded.
        let _ = worker.await;

        let received = outputs.rx.recv_all().await?;
        assert_eq!(received.len(), 2);
        for projected in received {
            assert_eq!(projected.num_columns(), 1);
            assert_eq!(projected.num_rows(), 3);
        }

        Ok(())
    }
}