iridis_api/io/
raw_queryable.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4use thirdparty::arrow_data::ArrayData;
5
6/// Not typed Queryable to receive data from the dataflow
7pub struct RawQueryable {
8    /// The sender part of the MPSC channel with the Query
9    pub tx: HashMap<Uuid, MessageSender>,
10    /// The receiver part of the MPSC channel with the Query
11    pub rx: MessageReceiver,
12    /// Shared clock with the runtime
13    pub clock: Arc<HLC>,
14
15    /// The source node layout, useful for debugging
16    pub source: NodeLayout,
17    /// The layout of the queryable, useful for debugging
18    pub layout: QueryableLayout,
19}
20
21impl RawQueryable {
22    /// Create a new RawQueryable instance
23    pub fn new(
24        tx: HashMap<Uuid, MessageSender>,
25        rx: MessageReceiver,
26        clock: Arc<HLC>,
27        source: NodeLayout,
28        layout: QueryableLayout,
29    ) -> Self {
30        Self {
31            tx,
32            rx,
33            clock,
34            source,
35            layout,
36        }
37    }
38
39    /// Let the queryable handle a message asynchronously
40    pub async fn on_demand(
41        &mut self,
42        response: impl AsyncFnOnce(DataflowMessage) -> Result<ArrayData>,
43    ) -> Result<()> {
44        let message = self
45            .rx
46            .recv()
47            .await
48            .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
49
50        let tx = self
51            .tx
52            .get(&message.header.source.1)
53            .ok_or_eyre(report_io_not_found(&self.source, &self.layout))?;
54
55        let data = DataflowMessage {
56            header: Header {
57                timestamp: self.clock.new_timestamp(),
58                source: (self.source.uuid, self.layout.uuid),
59            },
60            data: response(message)
61                .await
62                .wrap_err(report_error_sending(&self.source, &self.layout))?,
63        };
64
65        tx.send(data)
66            .await
67            .wrap_err(report_error_sending(&self.source, &self.layout))
68    }
69}