iridis_api/io/
raw_query.rs1use std::sync::Arc;
2
3use crate::prelude::*;
4use thirdparty::arrow_data::ArrayData;
5
/// Request/reply endpoint: holds everything `query` needs to send one
/// message and wait for the message that comes back.
pub struct RawQuery {
    /// Channel end that `query` sends the outgoing `DataflowMessage` on.
    pub tx: MessageSender,
    /// Channel end that `query` awaits the reply `DataflowMessage` from.
    pub rx: MessageReceiver,
    /// Shared clock; `query` calls `new_timestamp()` on it to stamp each
    /// outgoing header.
    pub clock: Arc<HLC>,

    /// Layout of the issuing node; its `uuid` is the first element of the
    /// outgoing header's `source` pair and it is passed to the error helpers.
    pub source: NodeLayout,
    /// Layout of this query; its `uuid` is the second element of the
    /// outgoing header's `source` pair and it is passed to the error helpers.
    pub layout: QueryLayout,
}
20
21impl RawQuery {
22 pub fn new(
24 tx: MessageSender,
25 rx: MessageReceiver,
26 clock: Arc<HLC>,
27 source: NodeLayout,
28 layout: QueryLayout,
29 ) -> Self {
30 Self {
31 tx,
32 rx,
33 clock,
34 source,
35 layout,
36 }
37 }
38
39 pub async fn query(&mut self, data: ArrayData) -> Result<(Header, ArrayData)> {
41 let data = DataflowMessage {
42 header: Header {
43 timestamp: self.clock.new_timestamp(),
44 source: (self.source.uuid, self.layout.uuid),
45 },
46 data,
47 };
48
49 self.tx
50 .send(data)
51 .await
52 .wrap_err(report_error_sending(&self.source, &self.layout))?;
53
54 let DataflowMessage { header, data } = self
55 .rx
56 .recv()
57 .await
58 .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
59
60 Ok((header, data))
61 }
62}