// iridis_api/io/raw_queryable.rs
use std::{collections::HashMap, sync::Arc};

use crate::prelude::*;
use thirdparty::arrow_data::ArrayData;
5
6pub struct RawQueryable {
8 pub tx: HashMap<Uuid, MessageSender>,
10 pub rx: MessageReceiver,
12 pub clock: Arc<HLC>,
14
15 pub source: NodeLayout,
17 pub layout: QueryableLayout,
19}
20
21impl RawQueryable {
22 pub fn new(
24 tx: HashMap<Uuid, MessageSender>,
25 rx: MessageReceiver,
26 clock: Arc<HLC>,
27 source: NodeLayout,
28 layout: QueryableLayout,
29 ) -> Self {
30 Self {
31 tx,
32 rx,
33 clock,
34 source,
35 layout,
36 }
37 }
38
39 pub async fn on_demand(
41 &mut self,
42 response: impl AsyncFnOnce(DataflowMessage) -> Result<ArrayData>,
43 ) -> Result<()> {
44 let message = self
45 .rx
46 .recv()
47 .await
48 .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
49
50 let tx = self
51 .tx
52 .get(&message.header.source.1)
53 .ok_or_eyre(report_io_not_found(&self.source, &self.layout))?;
54
55 let data = DataflowMessage {
56 header: Header {
57 timestamp: self.clock.new_timestamp(),
58 source: (self.source.uuid, self.layout.uuid),
59 },
60 data: response(message)
61 .await
62 .wrap_err(report_error_sending(&self.source, &self.layout))?,
63 };
64
65 tx.send(data)
66 .await
67 .wrap_err(report_error_sending(&self.source, &self.layout))
68 }
69}