flarrow_api/io/
query.rs

1use std::sync::Arc;
2
3use crate::prelude::*;
4use thirdparty::arrow_array::Array;
5
6/// Typed Query to query data from the dataflow
7pub struct Query<T: ArrowMessage, F: ArrowMessage> {
8    pub raw: RawQuery,
9
10    _phantom: std::marker::PhantomData<(T, F)>,
11}
12
13impl<T: ArrowMessage, F: ArrowMessage> Query<T, F> {
14    /// Create a new typed Query
15    pub fn new(
16        tx: MessageSender,
17        rx: MessageReceiver,
18        clock: Arc<HLC>,
19        source: NodeLayout,
20        layout: QueryLayout,
21    ) -> Self {
22        Self {
23            raw: RawQuery::new(tx, rx, clock, source, layout),
24            _phantom: std::marker::PhantomData,
25        }
26    }
27
28    /// Query a message from the channel and converting it from Arrow format, blocking until one is available, don't use it
29    /// in async context
30    pub fn blocking_query(&mut self, data: T) -> Result<(Header, F)> {
31        let (header, data) = self.raw.blocking_query(
32            data.try_into_arrow()
33                .wrap_err(report_failed_conversion_to_arrow::<T>(
34                    &self.raw.source,
35                    &self.raw.layout,
36                ))?
37                .into_data(),
38        )?;
39
40        Ok((
41            header,
42            F::try_from_arrow(data).wrap_err(report_failed_conversion_from_arrow::<F>(
43                &self.raw.source,
44                &self.raw.layout,
45            ))?,
46        ))
47    }
48
49    /// Query a message from the channel and converting it from Arrow format, asynchronously
50    pub async fn query(&mut self, data: T) -> Result<(Header, F)> {
51        let (header, data) = self
52            .raw
53            .query(
54                data.try_into_arrow()
55                    .wrap_err(report_failed_conversion_to_arrow::<T>(
56                        &self.raw.source,
57                        &self.raw.layout,
58                    ))?
59                    .into_data(),
60            )
61            .await?;
62
63        Ok((
64            header,
65            F::try_from_arrow(data).wrap_err(report_failed_conversion_from_arrow::<F>(
66                &self.raw.source,
67                &self.raw.layout,
68            ))?,
69        ))
70    }
71}