Skip to main content

dora_message/
daemon_to_node.rs

1use std::{net::SocketAddr, path::PathBuf};
2
3use crate::{
4    DataflowId,
5    config::NodeRunConfig,
6    descriptor::OperatorDefinition,
7    id::{DataId, NodeId, OperatorId},
8    metadata::Metadata,
9};
10
11pub use crate::common::{DataMessage, DropToken, SharedMemoryId, Timestamped};
12
13// Passed via env variable
14#[derive(Debug, serde::Serialize, serde::Deserialize)]
15pub struct RuntimeConfig {
16    pub node: NodeConfig,
17    pub operators: Vec<OperatorDefinition>,
18}
19
20#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
21pub struct NodeConfig {
22    pub dataflow_id: DataflowId,
23    pub node_id: NodeId,
24    pub run_config: NodeRunConfig,
25    pub daemon_communication: Option<DaemonCommunication>,
26    pub dataflow_descriptor: serde_yaml::Value,
27    pub dynamic: bool,
28    pub write_events_to: Option<PathBuf>,
29}
30
31#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
32pub enum DaemonCommunication {
33    Tcp {
34        socket_addr: SocketAddr,
35    },
36    #[cfg(unix)]
37    UnixDomain {
38        socket_file: PathBuf,
39    },
40    Interactive,
41}
42
43#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
44#[must_use]
45#[allow(clippy::large_enum_variant)]
46pub enum DaemonReply {
47    Result(Result<(), String>),
48    PreparedMessage { shared_memory_id: SharedMemoryId },
49    NextEvents(Vec<Timestamped<NodeEvent>>),
50    NextDropEvents(Vec<Timestamped<NodeDropEvent>>),
51    NodeConfig { result: Result<NodeConfig, String> },
52    Empty,
53}
54
55#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
56#[allow(clippy::large_enum_variant)]
57pub enum NodeEvent {
58    Stop,
59    Reload {
60        operator_id: Option<OperatorId>,
61    },
62    Input {
63        id: DataId,
64        metadata: Metadata,
65        data: Option<DataMessage>,
66    },
67    InputClosed {
68        id: DataId,
69    },
70    /// Notifies a node that all its inputs have been closed.
71    ///
72    /// This event is only sent to nodes that have at least one input.
73    AllInputsClosed,
74    NodeFailed {
75        affected_input_ids: Vec<DataId>,
76        error: String,
77        source_node_id: NodeId,
78    },
79}
80
81#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
82pub enum NodeDropEvent {
83    OutputDropped { drop_token: DropToken },
84}