iridis_node/primitives/
raw_query.rs1use std::sync::Arc;
4
5use crate::prelude::{thirdparty::arrow_data::ArrayData, *};
6
7pub struct RawQuery {
9 pub tx: MessageSender,
11 pub rx: MessageReceiver,
13 pub clock: Arc<HLC>,
15
16 pub source: NodeID,
18 pub layout: QueryID,
20}
21
22impl RawQuery {
23 pub fn new(
25 tx: MessageSender,
26 rx: MessageReceiver,
27 clock: Arc<HLC>,
28 source: NodeID,
29 layout: QueryID,
30 ) -> Self {
31 Self {
32 tx,
33 rx,
34 clock,
35 source,
36 layout,
37 }
38 }
39
40 pub async fn query(&mut self, data: ArrayData) -> Result<DataflowMessage> {
42 let data = DataflowMessage {
43 header: Header {
44 timestamp: self.clock.new_timestamp(),
45 source: (self.source.uuid, self.layout.uuid),
46 },
47 data,
48 };
49
50 self.tx
51 .send(data)
52 .await
53 .wrap_err(report_error_sending(&self.source, &self.layout))?;
54
55 let message = self
56 .rx
57 .recv()
58 .await
59 .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
60
61 Ok(message)
62 }
63}