flarrow_api/io/
queryable.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4use thirdparty::arrow_array::Array;
5
6/// Typed Queryable to queryable data to the dataflow
7pub struct Queryable<T: ArrowMessage, F: ArrowMessage> {
8    pub raw: RawQueryable,
9
10    _phantom: std::marker::PhantomData<(T, F)>,
11}
12
13impl<T: ArrowMessage, F: ArrowMessage> Queryable<T, F> {
14    /// Create a new typed Queryable
15    pub fn new(
16        tx: HashMap<Uuid, MessageSender>,
17        rx: MessageReceiver,
18        clock: Arc<HLC>,
19        source: NodeLayout,
20        layout: QueryableLayout,
21    ) -> Self {
22        Self {
23            raw: RawQueryable::new(tx, rx, clock, source, layout),
24            _phantom: std::marker::PhantomData,
25        }
26    }
27
28    /// Let the queryable handle a message, converting it from Arrow format, blocking until one is available, don't use it
29    /// in async context
30    pub fn blocking_on_demand(&mut self, response: impl FnOnce(T) -> Result<F>) -> Result<()> {
31        let source = self.raw.source.clone();
32        let layout = self.raw.layout.clone();
33
34        self.raw.blocking_on_demand(move |message| {
35            let result = response(
36                T::try_from_arrow(message.data)
37                    .wrap_err(report_failed_conversion_from_arrow::<T>(&source, &layout))?,
38            )?;
39
40            Ok(result
41                .try_into_arrow()
42                .wrap_err(report_failed_conversion_to_arrow::<F>(&source, &layout))?
43                .into_data())
44        })
45    }
46
47    /// Let the queryable handle a message, converting it from Arrow format, asynchrously
48    pub async fn on_demand(&mut self, response: impl AsyncFnOnce(T) -> Result<F>) -> Result<()> {
49        let source = self.raw.source.clone();
50        let layout = self.raw.layout.clone();
51
52        self.raw
53            .on_demand(async move |message| {
54                let result = response(
55                    T::try_from_arrow(message.data)
56                        .wrap_err(report_failed_conversion_from_arrow::<T>(&source, &layout))?,
57                )
58                .await?;
59
60                Ok(result
61                    .try_into_arrow()
62                    .wrap_err(report_failed_conversion_to_arrow::<F>(&source, &layout))?
63                    .into_data())
64            })
65            .await
66    }
67}