iridis_node/primitives/
raw_queryable.rs1use std::{collections::HashMap, sync::Arc};
4
5use crate::prelude::{thirdparty::arrow_data::ArrayData, *};
6
7pub struct RawQueryable {
9 pub tx: HashMap<Uuid, MessageSender>,
11 pub rx: MessageReceiver,
13 pub clock: Arc<HLC>,
15
16 pub source: NodeID,
18 pub layout: QueryableID,
20}
21
22impl RawQueryable {
23 pub fn new(
25 tx: HashMap<Uuid, MessageSender>,
26 rx: MessageReceiver,
27 clock: Arc<HLC>,
28 source: NodeID,
29 layout: QueryableID,
30 ) -> Self {
31 Self {
32 tx,
33 rx,
34 clock,
35 source,
36 layout,
37 }
38 }
39
40 pub async fn on_query(
42 &mut self,
43 response: impl AsyncFnOnce(DataflowMessage) -> Result<ArrayData>,
44 ) -> Result<()> {
45 let message = self
46 .rx
47 .recv()
48 .await
49 .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
50
51 let tx = self
52 .tx
53 .get(&message.header.source.1)
54 .ok_or_eyre(report_io_not_found(&self.source, &self.layout))?;
55
56 let data = DataflowMessage {
57 header: Header {
58 timestamp: self.clock.new_timestamp(),
59 source: (self.source.uuid, self.layout.uuid),
60 },
61 data: response(message)
62 .await
63 .wrap_err(report_error_sending(&self.source, &self.layout))?,
64 };
65
66 tx.send(data)
67 .await
68 .wrap_err(report_error_sending(&self.source, &self.layout))
69 }
70}