flows_arrow/
project_columns.rs1use arrow_array::RecordBatch;
4use async_flow::{Inputs, Outputs, Port, Result};
5
6pub async fn project_columns(
10 columns: &[usize],
11 mut inputs: Inputs<RecordBatch>,
12 outputs: Outputs<RecordBatch>,
13) -> Result {
14 while let Some(input) = inputs.recv().await? {
15 if input.num_rows() == 0 {
16 continue; }
18
19 let output = input.project(columns).unwrap();
20 if !outputs.is_closed() {
21 outputs.send(output).await?;
22 }
23 }
24
25 Ok(())
26}
27
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::{boxed::Box, vec};
    use arrow_array::record_batch;
    use async_flow::{Channel, InputPort};
    use core::error::Error;

    /// Sends the same three-column, three-row batch twice through
    /// `project_columns(&[1], ..)` and checks that two single-column,
    /// three-row batches come out.
    #[tokio::test]
    async fn test_project_columns() -> Result<(), Box<dyn Error>> {
        let mut inputs = Channel::bounded(10);
        let mut outputs = Channel::bounded(10);
        let task = tokio::spawn(project_columns(&[1], inputs.rx, outputs.tx));

        let batch = record_batch!(
            ("a", Int32, [1, 2, 3]),
            ("b", Float64, [Some(4.0), None, Some(5.0)]),
            ("c", Utf8, ["alpha", "beta", "gamma"])
        )?;
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 3);

        // Feed two identical batches, then close the sending side.
        for _ in 0..2 {
            inputs.tx.send(batch.clone()).await?;
        }
        inputs.tx.close();

        // Wait for the spawned task; its result is deliberately discarded.
        let _ = task.await;

        let projected = outputs.rx.recv_all().await?;
        assert_eq!(projected.len(), 2);
        for result in projected {
            assert_eq!(result.num_columns(), 1);
            assert_eq!(result.num_rows(), 3);
        }

        Ok(())
    }
}