iridis_api/io/
raw_queryable.rs1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4use thirdparty::arrow_data::ArrayData;
5
6pub struct RawQueryable {
8    pub tx: HashMap<Uuid, MessageSender>,
10    pub rx: MessageReceiver,
12    pub clock: Arc<HLC>,
14
15    pub source: NodeLayout,
17    pub layout: QueryableLayout,
19}
20
21impl RawQueryable {
22    pub fn new(
24        tx: HashMap<Uuid, MessageSender>,
25        rx: MessageReceiver,
26        clock: Arc<HLC>,
27        source: NodeLayout,
28        layout: QueryableLayout,
29    ) -> Self {
30        Self {
31            tx,
32            rx,
33            clock,
34            source,
35            layout,
36        }
37    }
38
39    pub async fn on_query(
41        &mut self,
42        response: impl AsyncFnOnce(DataflowMessage) -> Result<ArrayData>,
43    ) -> Result<()> {
44        let message = self
47            .rx
48            .recv()
49            .await
50            .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
51
52        let tx = self
53            .tx
54            .get(&message.header.source.1)
55            .ok_or_eyre(report_io_not_found(&self.source, &self.layout))?;
56
57        let data = DataflowMessage {
58            header: Header {
59                timestamp: self.clock.new_timestamp(),
60                source: (self.source.uuid, self.layout.uuid),
61            },
62            data: response(message)
63                .await
64                .wrap_err(report_error_sending(&self.source, &self.layout))?,
65        };
66
67        tx.send(data)
68            .await
69            .wrap_err(report_error_sending(&self.source, &self.layout))
70    }
71}