dora_message/
daemon_to_node.rs1use std::{net::SocketAddr, path::PathBuf};
2
3use crate::{
4 DataflowId,
5 config::NodeRunConfig,
6 descriptor::OperatorDefinition,
7 id::{DataId, NodeId, OperatorId},
8 metadata::Metadata,
9};
10
11pub use crate::common::{DataMessage, DropToken, SharedMemoryId, Timestamped};
12
13#[derive(Debug, serde::Serialize, serde::Deserialize)]
15pub struct RuntimeConfig {
16 pub node: NodeConfig,
17 pub operators: Vec<OperatorDefinition>,
18}
19
20#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
21pub struct NodeConfig {
22 pub dataflow_id: DataflowId,
23 pub node_id: NodeId,
24 pub run_config: NodeRunConfig,
25 pub daemon_communication: Option<DaemonCommunication>,
26 pub dataflow_descriptor: serde_yaml::Value,
27 pub dynamic: bool,
28 pub write_events_to: Option<PathBuf>,
29}
30
31#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
32pub enum DaemonCommunication {
33 Tcp {
34 socket_addr: SocketAddr,
35 },
36 #[cfg(unix)]
37 UnixDomain {
38 socket_file: PathBuf,
39 },
40 Interactive,
41}
42
43#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
44#[must_use]
45#[allow(clippy::large_enum_variant)]
46pub enum DaemonReply {
47 Result(Result<(), String>),
48 PreparedMessage { shared_memory_id: SharedMemoryId },
49 NextEvents(Vec<Timestamped<NodeEvent>>),
50 NextDropEvents(Vec<Timestamped<NodeDropEvent>>),
51 NodeConfig { result: Result<NodeConfig, String> },
52 Empty,
53}
54
55#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
56#[allow(clippy::large_enum_variant)]
57pub enum NodeEvent {
58 Stop,
59 Reload {
60 operator_id: Option<OperatorId>,
61 },
62 Input {
63 id: DataId,
64 metadata: Metadata,
65 data: Option<DataMessage>,
66 },
67 InputClosed {
68 id: DataId,
69 },
70 AllInputsClosed,
74 NodeFailed {
75 affected_input_ids: Vec<DataId>,
76 error: String,
77 source_node_id: NodeId,
78 },
79}
80
81#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
82pub enum NodeDropEvent {
83 OutputDropped { drop_token: DropToken },
84}