dora_message/
node_to_daemon.rs

1pub use crate::common::{
2    DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped,
3};
4use crate::{
5    DataflowId, current_crate_version,
6    id::{DataId, NodeId},
7    metadata::Metadata,
8    versions_compatible,
9};
10
11#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
12pub enum DaemonRequest {
13    Register(NodeRegisterRequest),
14    Subscribe,
15    SendMessage {
16        output_id: DataId,
17        metadata: Metadata,
18        data: Option<DataMessage>,
19    },
20    CloseOutputs(Vec<DataId>),
21    /// Signals that the node is finished sending outputs and that it received all
22    /// required drop tokens.
23    OutputsDone,
24    NextEvent {
25        drop_tokens: Vec<DropToken>,
26    },
27    ReportDropTokens {
28        drop_tokens: Vec<DropToken>,
29    },
30    SubscribeDrop,
31    NextFinishedDropTokens,
32    EventStreamDropped,
33    NodeConfig {
34        node_id: NodeId,
35    },
36}
37
38impl DaemonRequest {
39    pub fn expects_tcp_bincode_reply(&self) -> bool {
40        #[allow(clippy::match_like_matches_macro)]
41        match self {
42            DaemonRequest::SendMessage { .. }
43            | DaemonRequest::NodeConfig { .. }
44            | DaemonRequest::ReportDropTokens { .. } => false,
45            DaemonRequest::Register(NodeRegisterRequest { .. })
46            | DaemonRequest::Subscribe
47            | DaemonRequest::CloseOutputs(_)
48            | DaemonRequest::OutputsDone
49            | DaemonRequest::NextEvent { .. }
50            | DaemonRequest::SubscribeDrop
51            | DaemonRequest::NextFinishedDropTokens
52            | DaemonRequest::EventStreamDropped => true,
53        }
54    }
55
56    pub fn expects_tcp_json_reply(&self) -> bool {
57        #[allow(clippy::match_like_matches_macro)]
58        match self {
59            DaemonRequest::NodeConfig { .. } => true,
60            DaemonRequest::Register(NodeRegisterRequest { .. })
61            | DaemonRequest::Subscribe
62            | DaemonRequest::CloseOutputs(_)
63            | DaemonRequest::OutputsDone
64            | DaemonRequest::NextEvent { .. }
65            | DaemonRequest::SubscribeDrop
66            | DaemonRequest::NextFinishedDropTokens
67            | DaemonRequest::ReportDropTokens { .. }
68            | DaemonRequest::SendMessage { .. }
69            | DaemonRequest::EventStreamDropped => false,
70        }
71    }
72}
73
74#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
75pub struct NodeRegisterRequest {
76    pub dataflow_id: DataflowId,
77    pub node_id: NodeId,
78    dora_version: semver::Version,
79}
80
81impl NodeRegisterRequest {
82    pub fn new(dataflow_id: DataflowId, node_id: NodeId) -> Self {
83        Self {
84            dataflow_id,
85            node_id,
86            dora_version: semver::Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
87        }
88    }
89
90    pub fn check_version(&self) -> Result<(), String> {
91        let crate_version = current_crate_version();
92        let specified_version = &self.dora_version;
93
94        if versions_compatible(&crate_version, specified_version)? {
95            Ok(())
96        } else {
97            Err(format!(
98                "version mismatch: message format v{} is not compatible \
99                with expected message format v{crate_version}",
100                self.dora_version
101            ))
102        }
103    }
104}
105
106#[derive(Debug, serde::Serialize, serde::Deserialize)]
107pub struct DropEvent {
108    pub tokens: Vec<DropToken>,
109}
110
111#[derive(Debug, serde::Serialize, serde::Deserialize)]
112pub enum InputData {
113    SharedMemory(SharedMemoryInput),
114    Vec(Vec<u8>),
115}
116
117impl InputData {
118    pub fn drop_token(&self) -> Option<DropToken> {
119        match self {
120            InputData::SharedMemory(data) => Some(data.drop_token),
121            InputData::Vec(_) => None,
122        }
123    }
124}
125
126#[derive(Debug, serde::Serialize, serde::Deserialize)]
127pub struct SharedMemoryInput {
128    pub shared_memory_id: SharedMemoryId,
129    pub len: usize,
130    pub drop_token: DropToken,
131}
132
133#[derive(Debug, serde::Deserialize, serde::Serialize)]
134pub enum DynamicNodeEvent {
135    NodeConfig { node_id: NodeId },
136}