flarrow_api/io/
raw_query.rs1use std::sync::Arc;
2
3use crate::prelude::*;
4use thirdparty::arrow_data::ArrayData;
5
6pub struct RawQuery {
8 pub tx: MessageSender,
10 pub rx: MessageReceiver,
12 pub clock: Arc<HLC>,
14
15 pub source: NodeLayout,
17 pub layout: QueryLayout,
19}
20
21impl RawQuery {
22 pub fn new(
24 tx: MessageSender,
25 rx: MessageReceiver,
26 clock: Arc<HLC>,
27 source: NodeLayout,
28 layout: QueryLayout,
29 ) -> Self {
30 Self {
31 tx,
32 rx,
33 clock,
34 source,
35 layout,
36 }
37 }
38
39 pub fn blocking_query(&mut self, data: ArrayData) -> Result<(Header, ArrayData)> {
41 let data = DataflowMessage {
42 header: Header {
43 timestamp: self.clock.new_timestamp(),
44 source: (self.source.uuid, self.layout.uuid),
45 },
46 data,
47 };
48
49 self.tx
50 .blocking_send(data)
51 .wrap_err(report_error_sending(&self.source, &self.layout))?;
52
53 let DataflowMessage { header, data } = self
54 .rx
55 .blocking_recv()
56 .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
57
58 Ok((header, data))
59 }
60
61 pub async fn query(&mut self, data: ArrayData) -> Result<(Header, ArrayData)> {
63 let data = DataflowMessage {
64 header: Header {
65 timestamp: self.clock.new_timestamp(),
66 source: (self.source.uuid, self.layout.uuid),
67 },
68 data,
69 };
70
71 self.tx
72 .send(data)
73 .await
74 .wrap_err(report_error_sending(&self.source, &self.layout))?;
75
76 let DataflowMessage { header, data } = self
77 .rx
78 .recv()
79 .await
80 .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
81
82 Ok((header, data))
83 }
84}