dora_message/
daemon_to_node.rs

1use std::{net::SocketAddr, path::PathBuf};
2
3use crate::{
4    DataflowId,
5    config::NodeRunConfig,
6    descriptor::OperatorDefinition,
7    id::{DataId, NodeId, OperatorId},
8    metadata::Metadata,
9};
10
11pub use crate::common::{DataMessage, DropToken, SharedMemoryId, Timestamped};
12
13// Passed via env variable
/// Configuration bundle pairing one node's [`NodeConfig`] with a list of
/// operator definitions.
///
/// Serializable and deserializable through serde. Unlike [`NodeConfig`], this
/// type does not derive `Clone`.
14#[derive(Debug, serde::Serialize, serde::Deserialize)]
15pub struct RuntimeConfig {
    /// Settings of the node itself.
16    pub node: NodeConfig,
    /// Operator definitions that accompany the node configuration.
    // NOTE(review): presumably the operators hosted by this runtime node — confirm.
17    pub operators: Vec<OperatorDefinition>,
18}
19
/// Per-node configuration record.
///
/// Identifies the node and its dataflow, carries the node's run configuration,
/// and describes which channel is used for daemon communication. The type is
/// cloneable and serde-serializable.
20#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
21pub struct NodeConfig {
    /// Identifier of the dataflow this configuration belongs to.
22    pub dataflow_id: DataflowId,
    /// Identifier of the node being configured.
23    pub node_id: NodeId,
    /// Run configuration of the node (see [`NodeRunConfig`]).
24    pub run_config: NodeRunConfig,
    /// Channel description used for communication with the daemon.
25    pub daemon_communication: DaemonCommunication,
    /// Dataflow descriptor stored as an untyped YAML value rather than a
    /// strongly typed descriptor struct.
26    pub dataflow_descriptor: serde_yaml::Value,
    /// Flag marking the node as "dynamic".
    // NOTE(review): the exact semantics of a dynamic node are not visible in
    // this file — confirm against the daemon/node implementation.
27    pub dynamic: bool,
28}
29
/// Describes the channel used for communication with the daemon.
///
/// Exactly one variant is active: shared memory regions, a TCP socket, or
/// (on Unix targets only) a Unix domain socket.
30#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
31pub enum DaemonCommunication {
    /// Communication through four separately identified shared memory regions.
    // NOTE(review): the role of each region is inferred from its field name
    // only (control, drop, events, events-close) — confirm with the daemon code.
32    Shmem {
        /// Identifier of the control region.
33        daemon_control_region_id: SharedMemoryId,
        /// Identifier of the drop region.
34        daemon_drop_region_id: SharedMemoryId,
        /// Identifier of the events region.
35        daemon_events_region_id: SharedMemoryId,
        /// Identifier of the events-close region.
36        daemon_events_close_region_id: SharedMemoryId,
37    },
    /// Communication over TCP.
38    Tcp {
        /// Socket address (IP and port) of the TCP endpoint.
39        socket_addr: SocketAddr,
40    },
    /// Communication over a Unix domain socket.
    ///
    /// This variant only exists when compiling for Unix targets, so the set of
    /// variants differs between platforms.
41    #[cfg(unix)]
42    UnixDomain {
        /// Filesystem path of the socket file.
43        socket_file: PathBuf,
44    },
45}
46
/// Reply message type of the daemon-to-node protocol.
///
/// Marked `#[must_use]`, so the compiler warns when a value of this type is
/// discarded without being inspected.
// NOTE(review): `large_enum_variant` is silenced instead of boxing the large
// variant; the reason is not documented in this file — confirm it is intentional.
47#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
48#[must_use]
49#[allow(clippy::large_enum_variant)]
50pub enum DaemonReply {
    /// Generic outcome: `Ok(())` on success, or an error message string.
51    Result(Result<(), String>),
    /// Carries the identifier of a shared memory region.
    // NOTE(review): presumably the region prepared for an outgoing message — confirm.
52    PreparedMessage { shared_memory_id: SharedMemoryId },
    /// Batch of timestamped [`NodeEvent`]s.
53    NextEvents(Vec<Timestamped<NodeEvent>>),
    /// Batch of timestamped [`NodeDropEvent`]s.
54    NextDropEvents(Vec<Timestamped<NodeDropEvent>>),
    /// Either a [`NodeConfig`] or an error message string.
55    NodeConfig { result: Result<NodeConfig, String> },
    /// Reply without any payload.
56    Empty,
57}
58
/// Events addressed to a node.
///
/// Values of this type are delivered in timestamped batches through
/// [`DaemonReply::NextEvents`].
// NOTE(review): `large_enum_variant` is silenced instead of boxing the large
// variant (likely `Input`); the reason is not documented here — confirm.
59#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
60#[allow(clippy::large_enum_variant)]
61pub enum NodeEvent {
    /// Stop event; carries no payload.
62    Stop,
    /// Reload event, optionally naming a single operator.
    // NOTE(review): the meaning of `None` (e.g. "reload everything") is not
    // visible in this file — confirm with the receiving code.
63    Reload {
        /// Operator the reload refers to, if any.
64        operator_id: Option<OperatorId>,
65    },
    /// An input arrived.
66    Input {
        /// Identifier of the input.
67        id: DataId,
        /// Metadata attached to the input.
68        metadata: Metadata,
        /// Payload of the input; may be absent.
        // NOTE(review): presumably `None` denotes an input without data — confirm.
69        data: Option<DataMessage>,
70    },
    /// A single input, identified by `id`, was closed.
71    InputClosed {
        /// Identifier of the closed input.
72        id: DataId,
73    },
    /// All inputs were closed; carries no payload.
74    AllInputsClosed,
75}
76
/// Drop-related events addressed to a node.
///
/// Values of this type are delivered in timestamped batches through
/// [`DaemonReply::NextDropEvents`].
77#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
78pub enum NodeDropEvent {
    /// An output was dropped; `drop_token` identifies which one.
    // NOTE(review): presumably signals that the output's backing memory may be
    // reused by the sender — confirm against the drop-token handling code.
79    OutputDropped { drop_token: DropToken },
80}