flarrow_api/io/
raw_queryable.rs1use std::{collections::HashMap, sync::Arc};
2
3use crate::prelude::*;
4use thirdparty::arrow_data::ArrayData;
5
6pub struct RawQueryable {
8 pub tx: HashMap<Uuid, MessageSender>,
10 pub rx: MessageReceiver,
12 pub clock: Arc<HLC>,
14
15 pub source: NodeLayout,
17 pub layout: QueryableLayout,
19}
20
21impl RawQueryable {
22 pub fn new(
24 tx: HashMap<Uuid, MessageSender>,
25 rx: MessageReceiver,
26 clock: Arc<HLC>,
27 source: NodeLayout,
28 layout: QueryableLayout,
29 ) -> Self {
30 Self {
31 tx,
32 rx,
33 clock,
34 source,
35 layout,
36 }
37 }
38
39 pub fn blocking_on_demand(
41 &mut self,
42 response: impl FnOnce(DataflowMessage) -> Result<ArrayData>,
43 ) -> Result<()> {
44 let message = self
45 .rx
46 .blocking_recv()
47 .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
48
49 let tx = self
50 .tx
51 .get(&message.header.source.1)
52 .ok_or_eyre(report_io_not_found(&self.source, &self.layout))?;
53
54 let data = DataflowMessage {
55 header: Header {
56 timestamp: self.clock.new_timestamp(),
57 source: (self.source.uuid, self.layout.uuid),
58 },
59 data: response(message).wrap_err(report_error_sending(&self.source, &self.layout))?,
60 };
61
62 tx.blocking_send(data)
63 .wrap_err(report_error_sending(&self.source, &self.layout))
64 }
65
66 pub async fn on_demand(
68 &mut self,
69 response: impl AsyncFnOnce(DataflowMessage) -> Result<ArrayData>,
70 ) -> Result<()> {
71 let message = self
72 .rx
73 .recv()
74 .await
75 .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
76
77 let tx = self
78 .tx
79 .get(&message.header.source.1)
80 .ok_or_eyre(report_io_not_found(&self.source, &self.layout))?;
81
82 let data = DataflowMessage {
83 header: Header {
84 timestamp: self.clock.new_timestamp(),
85 source: (self.source.uuid, self.layout.uuid),
86 },
87 data: response(message)
88 .await
89 .wrap_err(report_error_sending(&self.source, &self.layout))?,
90 };
91
92 tx.send(data)
93 .await
94 .wrap_err(report_error_sending(&self.source, &self.layout))
95 }
96}