iridis_node/primitives/
raw_queryable.rs

//! This module contains the implementation of the `RawQueryable` primitive.
2
3use std::{collections::HashMap, sync::Arc};
4
5use crate::prelude::{thirdparty::arrow_data::ArrayData, *};
6
/// Untyped queryable: receives query messages from the dataflow and answers
/// them with raw Arrow `ArrayData` (see `on_query`).
pub struct RawQueryable {
    /// Reply senders, keyed by `Uuid`. `on_query` picks the sender stored
    /// under the second component of the incoming message's `header.source`
    /// and sends the response through it.
    pub tx: HashMap<Uuid, MessageSender>,
    /// Receiver from which `on_query` awaits the next incoming query message.
    pub rx: MessageReceiver,
    /// Shared clock with the runtime; `on_query` uses it to timestamp every
    /// response it emits.
    pub clock: Arc<HLC>,

    /// The node owning this queryable. Its `uuid` is the first component of
    /// the `source` of every response header, and it is passed to the error
    /// report helpers, which is useful for debugging.
    pub source: NodeID,
    /// The identifier of this queryable. Its `uuid` is the second component
    /// of the `source` of every response header, and it is passed to the
    /// error report helpers, which is useful for debugging.
    pub layout: QueryableID,
}
21
22impl RawQueryable {
23    /// Create a new RawQueryable instance
24    pub fn new(
25        tx: HashMap<Uuid, MessageSender>,
26        rx: MessageReceiver,
27        clock: Arc<HLC>,
28        source: NodeID,
29        layout: QueryableID,
30    ) -> Self {
31        Self {
32            tx,
33            rx,
34            clock,
35            source,
36            layout,
37        }
38    }
39
40    /// Let the queryable handle a message asynchronously
41    pub async fn on_query(
42        &mut self,
43        response: impl AsyncFnOnce(DataflowMessage) -> Result<ArrayData>,
44    ) -> Result<()> {
45        let message = self
46            .rx
47            .recv()
48            .await
49            .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
50
51        let tx = self
52            .tx
53            .get(&message.header.source.1)
54            .ok_or_eyre(report_io_not_found(&self.source, &self.layout))?;
55
56        let data = DataflowMessage {
57            header: Header {
58                timestamp: self.clock.new_timestamp(),
59                source: (self.source.uuid, self.layout.uuid),
60            },
61            data: response(message)
62                .await
63                .wrap_err(report_error_sending(&self.source, &self.layout))?,
64        };
65
66        tx.send(data)
67            .await
68            .wrap_err(report_error_sending(&self.source, &self.layout))
69    }
70}