iridis_api/io/
queryable.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4use thirdparty::arrow_array::Array;
5
/// Typed queryable: a thin wrapper around [`RawQueryable`] that fixes the message types.
///
/// `T` is the type an incoming message is converted to from Arrow, and `F` is the type of
/// the reply that is converted back to Arrow (see [`Queryable::on_demand`]).
pub struct Queryable<T: ArrowMessage, F: ArrowMessage> {
    /// Underlying untyped queryable that this wrapper delegates to.
    pub raw: RawQueryable,

    // Zero-sized marker: no value of `T` or `F` is stored, so the type parameters are
    // recorded here to keep them attached to the struct.
    _phantom: std::marker::PhantomData<(T, F)>,
}
12
13impl<T: ArrowMessage, F: ArrowMessage> Queryable<T, F> {
14    /// Create a new typed Queryable
15    pub fn new(
16        tx: HashMap<Uuid, MessageSender>,
17        rx: MessageReceiver,
18        clock: Arc<HLC>,
19        source: NodeLayout,
20        layout: QueryableLayout,
21    ) -> Self {
22        Self {
23            raw: RawQueryable::new(tx, rx, clock, source, layout),
24            _phantom: std::marker::PhantomData,
25        }
26    }
27
28    /// Let the queryable handle a message, converting it from Arrow format, asynchrously
29    pub async fn on_demand(&mut self, response: impl AsyncFnOnce(T) -> Result<F>) -> Result<()> {
30        let source = self.raw.source.clone();
31        let layout = self.raw.layout.clone();
32
33        self.raw
34            .on_demand(async move |message| {
35                let result = response(
36                    T::try_from_arrow(message.data)
37                        .wrap_err(report_failed_conversion_from_arrow::<T>(&source, &layout))?,
38                )
39                .await?;
40
41                Ok(result
42                    .try_into_arrow()
43                    .wrap_err(report_failed_conversion_to_arrow::<F>(&source, &layout))?
44                    .into_data())
45            })
46            .await
47    }
48}