dora_message/
daemon_to_node.rs

1use std::{net::SocketAddr, path::PathBuf};
2
3use crate::{
4    DataflowId,
5    config::NodeRunConfig,
6    descriptor::OperatorDefinition,
7    id::{DataId, NodeId, OperatorId},
8    metadata::Metadata,
9};
10
11pub use crate::common::{DataMessage, DropToken, SharedMemoryId, Timestamped};
12
13// Passed via env variable
14#[derive(Debug, serde::Serialize, serde::Deserialize)]
15pub struct RuntimeConfig {
16    pub node: NodeConfig,
17    pub operators: Vec<OperatorDefinition>,
18}
19
20#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
21pub struct NodeConfig {
22    pub dataflow_id: DataflowId,
23    pub node_id: NodeId,
24    pub run_config: NodeRunConfig,
25    pub daemon_communication: Option<DaemonCommunication>,
26    pub dataflow_descriptor: serde_yaml::Value,
27    pub dynamic: bool,
28    pub write_events_to: Option<PathBuf>,
29}
30
31#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
32pub enum DaemonCommunication {
33    Shmem {
34        daemon_control_region_id: SharedMemoryId,
35        daemon_drop_region_id: SharedMemoryId,
36        daemon_events_region_id: SharedMemoryId,
37        daemon_events_close_region_id: SharedMemoryId,
38    },
39    Tcp {
40        socket_addr: SocketAddr,
41    },
42    #[cfg(unix)]
43    UnixDomain {
44        socket_file: PathBuf,
45    },
46    Interactive,
47}
48
49#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
50#[must_use]
51#[allow(clippy::large_enum_variant)]
52pub enum DaemonReply {
53    Result(Result<(), String>),
54    PreparedMessage { shared_memory_id: SharedMemoryId },
55    NextEvents(Vec<Timestamped<NodeEvent>>),
56    NextDropEvents(Vec<Timestamped<NodeDropEvent>>),
57    NodeConfig { result: Result<NodeConfig, String> },
58    Empty,
59}
60
61#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
62#[allow(clippy::large_enum_variant)]
63pub enum NodeEvent {
64    Stop,
65    Reload {
66        operator_id: Option<OperatorId>,
67    },
68    Input {
69        id: DataId,
70        metadata: Metadata,
71        data: Option<DataMessage>,
72    },
73    InputClosed {
74        id: DataId,
75    },
76    AllInputsClosed,
77}
78
79#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
80pub enum NodeDropEvent {
81    OutputDropped { drop_token: DropToken },
82}