flarrow_api/io/
raw_queryable.rs

1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4use thirdparty::arrow_data::ArrayData;
5
6/// Not typed Queryable to receive data from the dataflow
7pub struct RawQueryable {
8    /// The sender part of the MPSC channel with the Query
9    pub tx: HashMap<Uuid, MessageSender>,
10    /// The receiver part of the MPSC channel with the Query
11    pub rx: MessageReceiver,
12    /// Shared clock with the runtime
13    pub clock: Arc<HLC>,
14
15    /// The source node layout, useful for debugging
16    pub source: NodeLayout,
17    /// The layout of the queryable, useful for debugging
18    pub layout: QueryableLayout,
19}
20
21impl RawQueryable {
22    /// Create a new RawQueryable instance
23    pub fn new(
24        tx: HashMap<Uuid, MessageSender>,
25        rx: MessageReceiver,
26        clock: Arc<HLC>,
27        source: NodeLayout,
28        layout: QueryableLayout,
29    ) -> Self {
30        Self {
31            tx,
32            rx,
33            clock,
34            source,
35            layout,
36        }
37    }
38
39    /// Let the queryable handle a message
40    pub fn blocking_on_demand(
41        &mut self,
42        response: impl FnOnce(DataflowMessage) -> Result<ArrayData>,
43    ) -> Result<()> {
44        let message = self
45            .rx
46            .blocking_recv()
47            .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
48
49        let tx = self
50            .tx
51            .get(&message.header.source.1)
52            .ok_or_eyre(report_io_not_found(&self.source, &self.layout))?;
53
54        let data = DataflowMessage {
55            header: Header {
56                timestamp: self.clock.new_timestamp(),
57                source: (self.source.uuid, self.layout.uuid),
58            },
59            data: response(message).wrap_err(report_error_sending(&self.source, &self.layout))?,
60        };
61
62        tx.blocking_send(data)
63            .wrap_err(report_error_sending(&self.source, &self.layout))
64    }
65
66    /// Let the queryable handle a message asynchronously
67    pub async fn on_demand(
68        &mut self,
69        response: impl AsyncFnOnce(DataflowMessage) -> Result<ArrayData>,
70    ) -> Result<()> {
71        let message = self
72            .rx
73            .recv()
74            .await
75            .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
76
77        let tx = self
78            .tx
79            .get(&message.header.source.1)
80            .ok_or_eyre(report_io_not_found(&self.source, &self.layout))?;
81
82        let data = DataflowMessage {
83            header: Header {
84                timestamp: self.clock.new_timestamp(),
85                source: (self.source.uuid, self.layout.uuid),
86            },
87            data: response(message)
88                .await
89                .wrap_err(report_error_sending(&self.source, &self.layout))?,
90        };
91
92        tx.send(data)
93            .await
94            .wrap_err(report_error_sending(&self.source, &self.layout))
95    }
96}