iridis_node/primitives/
queryable.rs1use std::{collections::HashMap, sync::Arc};
4
5use crate::prelude::{thirdparty::arrow_array::Array, *};
6
7pub struct Queryable<T: ArrowMessage, F: ArrowMessage> {
9 pub raw: RawQueryable,
10
11 _phantom: std::marker::PhantomData<(T, F)>,
12}
13
14impl<T: ArrowMessage, F: ArrowMessage> Queryable<T, F> {
15 pub fn new(
17 tx: HashMap<Uuid, MessageSender>,
18 rx: MessageReceiver,
19 clock: Arc<HLC>,
20 source: NodeID,
21 layout: QueryableID,
22 ) -> Self {
23 Self {
24 raw: RawQueryable::new(tx, rx, clock, source, layout),
25 _phantom: std::marker::PhantomData,
26 }
27 }
28
29 pub async fn on_query(
31 &mut self,
32 response: impl AsyncFnOnce(TypedDataflowMessage<T>) -> Result<F>,
33 ) -> Result<()> {
34 let source = self.raw.source.clone();
35 let layout = self.raw.layout.clone();
36
37 self.raw
38 .on_query(async move |message| {
39 let result = response(
40 message
41 .try_into()
42 .wrap_err(report_failed_conversion_from_arrow::<T>(&source, &layout))?,
43 )
44 .await?;
45
46 Ok(result
47 .try_into_arrow()
48 .wrap_err(report_failed_conversion_to_arrow::<F>(&source, &layout))?
49 .into_data())
50 })
51 .await
52 }
53}