iridis_node/primitives/
query.rs

//! This module contains the typed `Query` primitive, a thin wrapper around `RawQuery`.
2
3use std::sync::Arc;
4
5use crate::prelude::{thirdparty::arrow_array::Array, *};
6
7/// Typed Query to query data from the dataflow
8pub struct Query<T: ArrowMessage, F: ArrowMessage> {
9    pub raw: RawQuery,
10
11    _phantom: std::marker::PhantomData<(T, F)>,
12}
13
14impl<T: ArrowMessage, F: ArrowMessage> Query<T, F> {
15    /// Create a new typed Query
16    pub fn new(
17        tx: MessageSender,
18        rx: MessageReceiver,
19        clock: Arc<HLC>,
20        source: NodeID,
21        layout: QueryID,
22    ) -> Self {
23        Self {
24            raw: RawQuery::new(tx, rx, clock, source, layout),
25            _phantom: std::marker::PhantomData,
26        }
27    }
28
29    /// Query a message from the channel and converting it from Arrow format, asynchronously
30    pub async fn query(&mut self, data: T) -> Result<TypedDataflowMessage<F>> {
31        self.raw
32            .query(
33                data.try_into_arrow()
34                    .wrap_err(report_failed_conversion_to_arrow::<T>(
35                        &self.raw.source,
36                        &self.raw.layout,
37                    ))?
38                    .into_data(),
39            )
40            .await?
41            .try_into()
42            .wrap_err(report_failed_conversion_from_arrow::<F>(
43                &self.raw.source,
44                &self.raw.layout,
45            ))
46    }
47}