iridis_api/io/
query.rs

1use std::sync::Arc;
2
3use crate::prelude::*;
4use thirdparty::arrow_array::Array;
5
6/// Typed Query to query data from the dataflow
7pub struct Query<T: ArrowMessage, F: ArrowMessage> {
8    pub raw: RawQuery,
9
10    _phantom: std::marker::PhantomData<(T, F)>,
11}
12
13impl<T: ArrowMessage, F: ArrowMessage> Query<T, F> {
14    /// Create a new typed Query
15    pub fn new(
16        tx: MessageSender,
17        rx: MessageReceiver,
18        clock: Arc<HLC>,
19        source: NodeLayout,
20        layout: QueryLayout,
21    ) -> Self {
22        Self {
23            raw: RawQuery::new(tx, rx, clock, source, layout),
24            _phantom: std::marker::PhantomData,
25        }
26    }
27
28    /// Query a message from the channel and converting it from Arrow format, asynchronously
29    pub async fn query(&mut self, data: T) -> Result<(Header, F)> {
30        let (header, data) = self
31            .raw
32            .query(
33                data.try_into_arrow()
34                    .wrap_err(report_failed_conversion_to_arrow::<T>(
35                        &self.raw.source,
36                        &self.raw.layout,
37                    ))?
38                    .into_data(),
39            )
40            .await?;
41
42        Ok((
43            header,
44            F::try_from_arrow(data).wrap_err(report_failed_conversion_from_arrow::<F>(
45                &self.raw.source,
46                &self.raw.layout,
47            ))?,
48        ))
49    }
50}