dora_message/
coordinator_to_daemon.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    path::PathBuf,
4    time::Duration,
5};
6
7use crate::{
8    BuildId, DataflowId, SessionId,
9    common::{DaemonId, GitSource},
10    descriptor::{Descriptor, ResolvedNode},
11    id::{NodeId, OperatorId},
12};
13
14pub use crate::common::Timestamped;
15
/// Serializable outcome of a daemon registration.
///
/// Carries either the ID assigned to the daemon or an error message. Use
/// [`RegisterResult::to_result`] to convert it into an `eyre::Result`.
// NOTE(review): presumably sent by the coordinator in reply to a daemon's
// registration request (going by the module name) — confirm against the sender.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum RegisterResult {
    /// Registration succeeded.
    Ok {
        /// unique ID assigned by the coordinator
        daemon_id: DaemonId,
    },
    /// Registration failed; the string holds the error message.
    Err(String),
}
24
25impl RegisterResult {
26    pub fn to_result(self) -> eyre::Result<DaemonId> {
27        match self {
28            RegisterResult::Ok { daemon_id } => Ok(daemon_id),
29            RegisterResult::Err(err) => Err(eyre::eyre!(err)),
30        }
31    }
32}
33
/// Serializable event message with one variant per kind of request.
// NOTE(review): judging by the module name (`coordinator_to_daemon`) these are
// sent from the coordinator to a daemon — confirm against the transport code.
// NOTE(review): variant and field order may be part of the wire format
// (depending on the serde format in use); do not reorder without checking.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorEvent {
    /// Build request; the payload is described by [`BuildDataflowNodes`].
    Build(BuildDataflowNodes),
    /// Spawn request; the payload is described by [`SpawnDataflowNodes`].
    Spawn(SpawnDataflowNodes),
    /// Signals that all nodes of the given dataflow are ready.
    AllNodesReady {
        /// Dataflow this event refers to.
        dataflow_id: DataflowId,
        /// Nodes that exited before subscribing.
        // NOTE(review): what "subscribe" refers to is not visible here —
        // presumably the node's event subscription; confirm.
        exited_before_subscribe: Vec<NodeId>,
    },
    /// Requests that the given dataflow be stopped.
    StopDataflow {
        /// Dataflow to stop.
        dataflow_id: DataflowId,
        /// Optional grace period.
        // NOTE(review): presumably the time nodes get to shut down before
        // being killed, with `None` meaning a receiver-side default — confirm.
        grace_duration: Option<Duration>,
        /// Force flag. Falls back to `false` when the field is absent from
        /// the serialized input (see `#[serde(default)]`).
        // NOTE(review): the default presumably keeps messages from senders
        // that predate this field deserializable — confirm.
        #[serde(default)]
        force: bool,
    },
    /// Requests a reload within the given dataflow.
    ReloadDataflow {
        /// Dataflow containing the target node.
        dataflow_id: DataflowId,
        /// Node to reload.
        node_id: NodeId,
        /// Optional operator within the node.
        // NOTE(review): presumably `None` means the whole node is reloaded —
        // confirm against the handler.
        operator_id: Option<OperatorId>,
    },
    /// Requests the logs of a node.
    Logs {
        /// Dataflow containing the node.
        dataflow_id: DataflowId,
        /// Node whose logs are requested.
        node_id: NodeId,
        /// Optional tail limit.
        // NOTE(review): presumably the number of trailing lines to return,
        // with `None` meaning the full log — confirm the unit.
        tail: Option<usize>,
    },
    /// Payload-free destroy request.
    // NOTE(review): presumably tells the daemon to shut down — confirm.
    Destroy,
    /// Payload-free heartbeat message.
    // NOTE(review): presumably a periodic liveness signal — confirm.
    Heartbeat,
}
61
/// Payload of [`DaemonCoordinatorEvent::Build`].
///
/// Bundles the identifiers, git sources, dataflow descriptor, and node
/// selection for a build.
// NOTE(review): field order may be part of the wire format (depending on the
// serde format in use); do not reorder without checking.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct BuildDataflowNodes {
    /// ID of this build.
    pub build_id: BuildId,
    /// ID of the session this build belongs to.
    pub session_id: SessionId,
    /// Allows overwriting the base working dir when CLI and daemon are
    /// running on the same machine.
    ///
    /// Must not be used for multi-machine dataflows.
    ///
    /// Note that nodes with git sources still use a subdirectory of
    /// the base working dir.
    pub local_working_dir: Option<PathBuf>,
    /// Git source per node, keyed by node ID.
    pub git_sources: BTreeMap<NodeId, GitSource>,
    /// Previous git source per node, keyed by node ID.
    // NOTE(review): presumably the sources used by the preceding build, so
    // existing checkouts can be reused — confirm against the build logic.
    pub prev_git_sources: BTreeMap<NodeId, GitSource>,
    /// Descriptor of the dataflow being built.
    pub dataflow_descriptor: Descriptor,
    /// IDs of the nodes located on the receiving machine.
    // NOTE(review): presumably only these nodes are built by the receiver —
    // confirm.
    pub nodes_on_machine: BTreeSet<NodeId>,
    /// `uv` flag.
    // NOTE(review): presumably selects the `uv` Python tooling for building
    // Python nodes — confirm.
    pub uv: bool,
}
80
/// Payload of [`DaemonCoordinatorEvent::Spawn`].
///
/// Bundles the identifiers, resolved nodes, dataflow descriptor, and node
/// selection for spawning a dataflow.
// NOTE(review): field order may be part of the wire format (depending on the
// serde format in use); do not reorder without checking.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnDataflowNodes {
    /// ID of an associated build, if any.
    // NOTE(review): presumably `None` when the dataflow is spawned without a
    // preceding build step — confirm.
    pub build_id: Option<BuildId>,
    /// ID of the session this dataflow belongs to.
    pub session_id: SessionId,
    /// ID of the dataflow to spawn.
    pub dataflow_id: DataflowId,
    /// Allows overwriting the base working dir when CLI and daemon are
    /// running on the same machine.
    ///
    /// Must not be used for multi-machine dataflows.
    ///
    /// Note that nodes with git sources still use a subdirectory of
    /// the base working dir.
    pub local_working_dir: Option<PathBuf>,
    /// Resolved nodes, keyed by node ID.
    // NOTE(review): presumably covers every node of the dataflow, not only
    // those in `spawn_nodes` — confirm.
    pub nodes: BTreeMap<NodeId, ResolvedNode>,
    /// Descriptor of the dataflow being spawned.
    pub dataflow_descriptor: Descriptor,
    /// IDs of the nodes to spawn.
    // NOTE(review): presumably the subset of `nodes` the receiving daemon is
    // responsible for — confirm.
    pub spawn_nodes: BTreeSet<NodeId>,
    /// `uv` flag.
    // NOTE(review): presumably selects the `uv` Python tooling for running
    // Python nodes — confirm.
    pub uv: bool,
    /// Optional path to write events to.
    // NOTE(review): which events are written and in what format is not
    // visible here — confirm.
    pub write_events_to: Option<PathBuf>,
}