flarrow_api/io/
raw_query.rs

1use std::sync::Arc;
2
3use crate::prelude::*;
4use thirdparty::arrow_data::ArrayData;
5
6/// Not typed Query to receive data from the dataflow
7pub struct RawQuery {
8    /// The sender part of the MPSC channel with the Queryable
9    pub tx: MessageSender,
10    /// The receiver part of the MPSC channel with the Queryable
11    pub rx: MessageReceiver,
12    /// Shared clock with the runtime
13    pub clock: Arc<HLC>,
14
15    /// The source node layout, useful for debugging
16    pub source: NodeLayout,
17    /// The layout of the query, useful for debugging
18    pub layout: QueryLayout,
19}
20
21impl RawQuery {
22    /// Create a new RawQuery instance
23    pub fn new(
24        tx: MessageSender,
25        rx: MessageReceiver,
26        clock: Arc<HLC>,
27        source: NodeLayout,
28        layout: QueryLayout,
29    ) -> Self {
30        Self {
31            tx,
32            rx,
33            clock,
34            source,
35            layout,
36        }
37    }
38
39    /// Query a message to a queryable
40    pub fn blocking_query(&mut self, data: ArrayData) -> Result<(Header, ArrayData)> {
41        let data = DataflowMessage {
42            header: Header {
43                timestamp: self.clock.new_timestamp(),
44                source: (self.source.uuid, self.layout.uuid),
45            },
46            data,
47        };
48
49        self.tx
50            .blocking_send(data)
51            .wrap_err(report_error_sending(&self.source, &self.layout))?;
52
53        let DataflowMessage { header, data } = self
54            .rx
55            .blocking_recv()
56            .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
57
58        Ok((header, data))
59    }
60
61    /// Query a message to a queryable
62    pub async fn query(&mut self, data: ArrayData) -> Result<(Header, ArrayData)> {
63        let data = DataflowMessage {
64            header: Header {
65                timestamp: self.clock.new_timestamp(),
66                source: (self.source.uuid, self.layout.uuid),
67            },
68            data,
69        };
70
71        self.tx
72            .send(data)
73            .await
74            .wrap_err(report_error_sending(&self.source, &self.layout))?;
75
76        let DataflowMessage { header, data } = self
77            .rx
78            .recv()
79            .await
80            .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
81
82        Ok((header, data))
83    }
84}