dora_message/
daemon_to_node.rs

1use std::{net::SocketAddr, path::PathBuf};
2
3use crate::{
4    config::NodeRunConfig,
5    descriptor::OperatorDefinition,
6    id::{DataId, NodeId, OperatorId},
7    metadata::Metadata,
8    DataflowId,
9};
10
11pub use crate::common::{DataMessage, DropToken, SharedMemoryId, Timestamped};
12
13// Passed via env variable
14#[derive(Debug, serde::Serialize, serde::Deserialize)]
15pub struct RuntimeConfig {
16    pub node: NodeConfig,
17    pub operators: Vec<OperatorDefinition>,
18}
19
20#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
21pub struct NodeConfig {
22    pub dataflow_id: DataflowId,
23    pub node_id: NodeId,
24    pub run_config: NodeRunConfig,
25    pub daemon_communication: DaemonCommunication,
26    pub dataflow_descriptor: serde_yaml::Value,
27    pub dynamic: bool,
28}
29
30#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
31pub enum DaemonCommunication {
32    Shmem {
33        daemon_control_region_id: SharedMemoryId,
34        daemon_drop_region_id: SharedMemoryId,
35        daemon_events_region_id: SharedMemoryId,
36        daemon_events_close_region_id: SharedMemoryId,
37    },
38    Tcp {
39        socket_addr: SocketAddr,
40    },
41    #[cfg(unix)]
42    UnixDomain {
43        socket_file: PathBuf,
44    },
45}
46
47#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
48#[must_use]
49pub enum DaemonReply {
50    Result(Result<(), String>),
51    PreparedMessage { shared_memory_id: SharedMemoryId },
52    NextEvents(Vec<Timestamped<NodeEvent>>),
53    NextDropEvents(Vec<Timestamped<NodeDropEvent>>),
54    NodeConfig { result: Result<NodeConfig, String> },
55    Empty,
56}
57
58#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
59pub enum NodeEvent {
60    Stop,
61    Reload {
62        operator_id: Option<OperatorId>,
63    },
64    Input {
65        id: DataId,
66        metadata: Metadata,
67        data: Option<DataMessage>,
68    },
69    InputClosed {
70        id: DataId,
71    },
72    AllInputsClosed,
73}
74
75#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
76pub enum NodeDropEvent {
77    OutputDropped { drop_token: DropToken },
78}