iridis_node/primitives/
queryable.rs

1//! This module contains implementations for this primitive.
2
3use std::{collections::HashMap, sync::Arc};
4
5use crate::prelude::{thirdparty::arrow_array::Array, *};
6
7/// Typed Queryable to queryable data to the dataflow
8pub struct Queryable<T: ArrowMessage, F: ArrowMessage> {
9    pub raw: RawQueryable,
10
11    _phantom: std::marker::PhantomData<(T, F)>,
12}
13
14impl<T: ArrowMessage, F: ArrowMessage> Queryable<T, F> {
15    /// Create a new typed Queryable
16    pub fn new(
17        tx: HashMap<Uuid, MessageSender>,
18        rx: MessageReceiver,
19        clock: Arc<HLC>,
20        source: NodeID,
21        layout: QueryableID,
22    ) -> Self {
23        Self {
24            raw: RawQueryable::new(tx, rx, clock, source, layout),
25            _phantom: std::marker::PhantomData,
26        }
27    }
28
29    /// Let the queryable handle a message, converting it from Arrow format, asynchrously
30    pub async fn on_query(
31        &mut self,
32        response: impl AsyncFnOnce(TypedDataflowMessage<T>) -> Result<F>,
33    ) -> Result<()> {
34        let source = self.raw.source.clone();
35        let layout = self.raw.layout.clone();
36
37        self.raw
38            .on_query(async move |message| {
39                let result = response(
40                    message
41                        .try_into()
42                        .wrap_err(report_failed_conversion_from_arrow::<T>(&source, &layout))?,
43                )
44                .await?;
45
46                Ok(result
47                    .try_into_arrow()
48                    .wrap_err(report_failed_conversion_to_arrow::<F>(&source, &layout))?
49                    .into_data())
50            })
51            .await
52    }
53}