iridis_node/primitives/
raw_query.rs1use std::sync::Arc;
4
5use crate::prelude::{thirdparty::arrow_data::ArrayData, *};
6
7pub struct RawQuery {
9    pub tx: MessageSender,
11    pub rx: MessageReceiver,
13    pub clock: Arc<HLC>,
15
16    pub source: NodeID,
18    pub layout: QueryID,
20}
21
22impl RawQuery {
23    pub fn new(
25        tx: MessageSender,
26        rx: MessageReceiver,
27        clock: Arc<HLC>,
28        source: NodeID,
29        layout: QueryID,
30    ) -> Self {
31        Self {
32            tx,
33            rx,
34            clock,
35            source,
36            layout,
37        }
38    }
39
40    pub async fn query(&mut self, data: ArrayData) -> Result<DataflowMessage> {
42        let data = DataflowMessage {
43            header: Header {
44                timestamp: self.clock.new_timestamp(),
45                source: (self.source.uuid, self.layout.uuid),
46            },
47            data,
48        };
49
50        self.tx
51            .send(data)
52            .await
53            .wrap_err(report_error_sending(&self.source, &self.layout))?;
54
55        let message = self
56            .rx
57            .recv()
58            .await
59            .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
60
61        Ok(message)
62    }
63}