flarrow_api/io/
queryable.rs1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4use thirdparty::arrow_array::Array;
5
6pub struct Queryable<T: ArrowMessage, F: ArrowMessage> {
8 pub raw: RawQueryable,
9
10 _phantom: std::marker::PhantomData<(T, F)>,
11}
12
13impl<T: ArrowMessage, F: ArrowMessage> Queryable<T, F> {
14 pub fn new(
16 tx: HashMap<Uuid, MessageSender>,
17 rx: MessageReceiver,
18 clock: Arc<HLC>,
19 source: NodeLayout,
20 layout: QueryableLayout,
21 ) -> Self {
22 Self {
23 raw: RawQueryable::new(tx, rx, clock, source, layout),
24 _phantom: std::marker::PhantomData,
25 }
26 }
27
28 pub fn blocking_on_demand(&mut self, response: impl FnOnce(T) -> Result<F>) -> Result<()> {
31 let source = self.raw.source.clone();
32 let layout = self.raw.layout.clone();
33
34 self.raw.blocking_on_demand(move |message| {
35 let result = response(
36 T::try_from_arrow(message.data)
37 .wrap_err(report_failed_conversion_from_arrow::<T>(&source, &layout))?,
38 )?;
39
40 Ok(result
41 .try_into_arrow()
42 .wrap_err(report_failed_conversion_to_arrow::<F>(&source, &layout))?
43 .into_data())
44 })
45 }
46
47 pub async fn on_demand(&mut self, response: impl AsyncFnOnce(T) -> Result<F>) -> Result<()> {
49 let source = self.raw.source.clone();
50 let layout = self.raw.layout.clone();
51
52 self.raw
53 .on_demand(async move |message| {
54 let result = response(
55 T::try_from_arrow(message.data)
56 .wrap_err(report_failed_conversion_from_arrow::<T>(&source, &layout))?,
57 )
58 .await?;
59
60 Ok(result
61 .try_into_arrow()
62 .wrap_err(report_failed_conversion_to_arrow::<F>(&source, &layout))?
63 .into_data())
64 })
65 .await
66 }
67}