1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//! This module contains implementations for this primitive.
use std::{collections::HashMap, sync::Arc};
use crate::prelude::{thirdparty::arrow_data::ArrayData, *};
/// Not typed Queryable to receive data from the dataflow
pub struct RawQueryable {
/// The sender part of the MPSC channel with the Query
pub tx: HashMap<Uuid, MessageSender>,
/// The receiver part of the MPSC channel with the Query
pub rx: MessageReceiver,
/// Shared clock with the runtime
pub clock: Arc<HLC>,
/// The source node layout, useful for debugging
pub source: NodeID,
/// The layout of the queryable, useful for debugging
pub layout: QueryableID,
}
impl RawQueryable {
/// Create a new RawQueryable instance
pub fn new(
tx: HashMap<Uuid, MessageSender>,
rx: MessageReceiver,
clock: Arc<HLC>,
source: NodeID,
layout: QueryableID,
) -> Self {
Self {
tx,
rx,
clock,
source,
layout,
}
}
/// Let the queryable handle a message asynchronously
pub async fn on_query(
&mut self,
response: impl AsyncFnOnce(DataflowMessage) -> Result<ArrayData>,
) -> Result<()> {
let message = self
.rx
.recv()
.await
.ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
let tx = self
.tx
.get(&message.header.source.1)
.ok_or_eyre(report_io_not_found(&self.source, &self.layout))?;
let data = DataflowMessage {
header: Header {
timestamp: self.clock.new_timestamp(),
source: (self.source.uuid, self.layout.uuid),
},
data: response(message)
.await
.wrap_err(report_error_sending(&self.source, &self.layout))?,
};
tx.send(data)
.await
.wrap_err(report_error_sending(&self.source, &self.layout))
}
}