dora_message/
coordinator_to_daemon.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    path::PathBuf,
4    time::Duration,
5};
6
7use crate::{
8    BuildId, DataflowId, SessionId,
9    common::{DaemonId, GitSource},
10    descriptor::{Descriptor, ResolvedNode},
11    id::{NodeId, OperatorId},
12};
13
14pub use crate::common::Timestamped;
15
16#[derive(Debug, serde::Serialize, serde::Deserialize)]
17pub enum RegisterResult {
18    Ok {
19        /// unique ID assigned by the coordinator
20        daemon_id: DaemonId,
21    },
22    Err(String),
23}
24
25impl RegisterResult {
26    pub fn to_result(self) -> eyre::Result<DaemonId> {
27        match self {
28            RegisterResult::Ok { daemon_id } => Ok(daemon_id),
29            RegisterResult::Err(err) => Err(eyre::eyre!(err)),
30        }
31    }
32}
33
34#[derive(Debug, serde::Deserialize, serde::Serialize)]
35pub enum DaemonCoordinatorEvent {
36    Build(BuildDataflowNodes),
37    Spawn(SpawnDataflowNodes),
38    AllNodesReady {
39        dataflow_id: DataflowId,
40        exited_before_subscribe: Vec<NodeId>,
41    },
42    StopDataflow {
43        dataflow_id: DataflowId,
44        grace_duration: Option<Duration>,
45    },
46    ReloadDataflow {
47        dataflow_id: DataflowId,
48        node_id: NodeId,
49        operator_id: Option<OperatorId>,
50    },
51    Logs {
52        dataflow_id: DataflowId,
53        node_id: NodeId,
54    },
55    Destroy,
56    Heartbeat,
57}
58
59#[derive(Debug, serde::Deserialize, serde::Serialize)]
60pub struct BuildDataflowNodes {
61    pub build_id: BuildId,
62    pub session_id: SessionId,
63    /// Allows overwriting the base working dir when CLI and daemon are
64    /// running on the same machine.
65    ///
66    /// Must not be used for multi-machine dataflows.
67    ///
68    /// Note that nodes with git sources still use a subdirectory of
69    /// the base working dir.
70    pub local_working_dir: Option<PathBuf>,
71    pub git_sources: BTreeMap<NodeId, GitSource>,
72    pub prev_git_sources: BTreeMap<NodeId, GitSource>,
73    pub dataflow_descriptor: Descriptor,
74    pub nodes_on_machine: BTreeSet<NodeId>,
75    pub uv: bool,
76}
77
78#[derive(Debug, serde::Deserialize, serde::Serialize)]
79pub struct SpawnDataflowNodes {
80    pub build_id: Option<BuildId>,
81    pub session_id: SessionId,
82    pub dataflow_id: DataflowId,
83    /// Allows overwriting the base working dir when CLI and daemon are
84    /// running on the same machine.
85    ///
86    /// Must not be used for multi-machine dataflows.
87    ///
88    /// Note that nodes with git sources still use a subdirectory of
89    /// the base working dir.
90    pub local_working_dir: Option<PathBuf>,
91    pub nodes: BTreeMap<NodeId, ResolvedNode>,
92    pub dataflow_descriptor: Descriptor,
93    pub spawn_nodes: BTreeSet<NodeId>,
94    pub uv: bool,
95}