iridis_node/primitives/
raw_query.rs

1//! This module contains implementations for this primitive.
2
3use std::sync::Arc;
4
5use crate::prelude::{thirdparty::arrow_data::ArrayData, *};
6
7/// Not typed Query to receive data from the dataflow
8pub struct RawQuery {
9    /// The sender part of the MPSC channel with the Queryable
10    pub tx: MessageSender,
11    /// The receiver part of the MPSC channel with the Queryable
12    pub rx: MessageReceiver,
13    /// Shared clock with the runtime
14    pub clock: Arc<HLC>,
15
16    /// The source node layout, useful for debugging
17    pub source: NodeID,
18    /// The layout of the query, useful for debugging
19    pub layout: QueryID,
20}
21
22impl RawQuery {
23    /// Create a new RawQuery instance
24    pub fn new(
25        tx: MessageSender,
26        rx: MessageReceiver,
27        clock: Arc<HLC>,
28        source: NodeID,
29        layout: QueryID,
30    ) -> Self {
31        Self {
32            tx,
33            rx,
34            clock,
35            source,
36            layout,
37        }
38    }
39
40    /// Query a message to a queryable
41    pub async fn query(&mut self, data: ArrayData) -> Result<DataflowMessage> {
42        let data = DataflowMessage {
43            header: Header {
44                timestamp: self.clock.new_timestamp(),
45                source: (self.source.uuid, self.layout.uuid),
46            },
47            data,
48        };
49
50        self.tx
51            .send(data)
52            .await
53            .wrap_err(report_error_sending(&self.source, &self.layout))?;
54
55        let message = self
56            .rx
57            .recv()
58            .await
59            .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
60
61        Ok(message)
62    }
63}