iridis_api/io/
raw_queryable.rs1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4use thirdparty::arrow_data::ArrayData;
5
6pub struct RawQueryable {
8 pub tx: HashMap<Uuid, MessageSender>,
10 pub rx: MessageReceiver,
12 pub clock: Arc<HLC>,
14
15 pub source: NodeLayout,
17 pub layout: QueryableLayout,
19}
20
21impl RawQueryable {
22 pub fn new(
24 tx: HashMap<Uuid, MessageSender>,
25 rx: MessageReceiver,
26 clock: Arc<HLC>,
27 source: NodeLayout,
28 layout: QueryableLayout,
29 ) -> Self {
30 Self {
31 tx,
32 rx,
33 clock,
34 source,
35 layout,
36 }
37 }
38
39 pub async fn on_query(
41 &mut self,
42 response: impl AsyncFnOnce(DataflowMessage) -> Result<ArrayData>,
43 ) -> Result<()> {
44 let message = self
47 .rx
48 .recv()
49 .await
50 .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
51
52 let tx = self
53 .tx
54 .get(&message.header.source.1)
55 .ok_or_eyre(report_io_not_found(&self.source, &self.layout))?;
56
57 let data = DataflowMessage {
58 header: Header {
59 timestamp: self.clock.new_timestamp(),
60 source: (self.source.uuid, self.layout.uuid),
61 },
62 data: response(message)
63 .await
64 .wrap_err(report_error_sending(&self.source, &self.layout))?,
65 };
66
67 tx.send(data)
68 .await
69 .wrap_err(report_error_sending(&self.source, &self.layout))
70 }
71}