dora_message/
node_to_daemon.rs1pub use crate::common::{
2 DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped,
3};
4use crate::{
5 DataflowId, current_crate_version,
6 id::{DataId, NodeId},
7 metadata::Metadata,
8 versions_compatible,
9};
10
11#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
12pub enum DaemonRequest {
13 Register(NodeRegisterRequest),
14 Subscribe,
15 SendMessage {
16 output_id: DataId,
17 metadata: Metadata,
18 data: Option<DataMessage>,
19 },
20 CloseOutputs(Vec<DataId>),
21 OutputsDone,
24 NextEvent {
25 drop_tokens: Vec<DropToken>,
26 },
27 ReportDropTokens {
28 drop_tokens: Vec<DropToken>,
29 },
30 SubscribeDrop,
31 NextFinishedDropTokens,
32 EventStreamDropped,
33 NodeConfig {
34 node_id: NodeId,
35 },
36}
37
38impl DaemonRequest {
39 pub fn expects_tcp_bincode_reply(&self) -> bool {
40 #[allow(clippy::match_like_matches_macro)]
41 match self {
42 DaemonRequest::SendMessage { .. }
43 | DaemonRequest::NodeConfig { .. }
44 | DaemonRequest::ReportDropTokens { .. } => false,
45 DaemonRequest::Register(NodeRegisterRequest { .. })
46 | DaemonRequest::Subscribe
47 | DaemonRequest::CloseOutputs(_)
48 | DaemonRequest::OutputsDone
49 | DaemonRequest::NextEvent { .. }
50 | DaemonRequest::SubscribeDrop
51 | DaemonRequest::NextFinishedDropTokens
52 | DaemonRequest::EventStreamDropped => true,
53 }
54 }
55
56 pub fn expects_tcp_json_reply(&self) -> bool {
57 #[allow(clippy::match_like_matches_macro)]
58 match self {
59 DaemonRequest::NodeConfig { .. } => true,
60 DaemonRequest::Register(NodeRegisterRequest { .. })
61 | DaemonRequest::Subscribe
62 | DaemonRequest::CloseOutputs(_)
63 | DaemonRequest::OutputsDone
64 | DaemonRequest::NextEvent { .. }
65 | DaemonRequest::SubscribeDrop
66 | DaemonRequest::NextFinishedDropTokens
67 | DaemonRequest::ReportDropTokens { .. }
68 | DaemonRequest::SendMessage { .. }
69 | DaemonRequest::EventStreamDropped => false,
70 }
71 }
72}
73
74#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
75pub struct NodeRegisterRequest {
76 pub dataflow_id: DataflowId,
77 pub node_id: NodeId,
78 dora_version: semver::Version,
79}
80
81impl NodeRegisterRequest {
82 pub fn new(dataflow_id: DataflowId, node_id: NodeId) -> Self {
83 Self {
84 dataflow_id,
85 node_id,
86 dora_version: semver::Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
87 }
88 }
89
90 pub fn check_version(&self) -> Result<(), String> {
91 let crate_version = current_crate_version();
92 let specified_version = &self.dora_version;
93
94 if versions_compatible(&crate_version, specified_version)? {
95 Ok(())
96 } else {
97 Err(format!(
98 "version mismatch: message format v{} is not compatible \
99 with expected message format v{crate_version}",
100 self.dora_version
101 ))
102 }
103 }
104}
105
106#[derive(Debug, serde::Serialize, serde::Deserialize)]
107pub struct DropEvent {
108 pub tokens: Vec<DropToken>,
109}
110
111#[derive(Debug, serde::Serialize, serde::Deserialize)]
112pub enum InputData {
113 SharedMemory(SharedMemoryInput),
114 Vec(Vec<u8>),
115}
116
117impl InputData {
118 pub fn drop_token(&self) -> Option<DropToken> {
119 match self {
120 InputData::SharedMemory(data) => Some(data.drop_token),
121 InputData::Vec(_) => None,
122 }
123 }
124}
125
126#[derive(Debug, serde::Serialize, serde::Deserialize)]
127pub struct SharedMemoryInput {
128 pub shared_memory_id: SharedMemoryId,
129 pub len: usize,
130 pub drop_token: DropToken,
131}
132
133#[derive(Debug, serde::Deserialize, serde::Serialize)]
134pub enum DynamicNodeEvent {
135 NodeConfig { node_id: NodeId },
136}