dora_message/
coordinator_to_daemon.rs1use std::{
2 collections::{BTreeMap, BTreeSet},
3 path::PathBuf,
4 time::Duration,
5};
6
7use crate::{
8 BuildId, DataflowId, SessionId,
9 common::{DaemonId, GitSource},
10 descriptor::{Descriptor, ResolvedNode},
11 id::{NodeId, OperatorId},
12};
13
14pub use crate::common::Timestamped;
15
16#[derive(Debug, serde::Serialize, serde::Deserialize)]
17pub enum RegisterResult {
18 Ok {
19 daemon_id: DaemonId,
21 },
22 Err(String),
23}
24
25impl RegisterResult {
26 pub fn to_result(self) -> eyre::Result<DaemonId> {
27 match self {
28 RegisterResult::Ok { daemon_id } => Ok(daemon_id),
29 RegisterResult::Err(err) => Err(eyre::eyre!(err)),
30 }
31 }
32}
33
34#[derive(Debug, serde::Deserialize, serde::Serialize)]
35pub enum DaemonCoordinatorEvent {
36 Build(BuildDataflowNodes),
37 Spawn(SpawnDataflowNodes),
38 AllNodesReady {
39 dataflow_id: DataflowId,
40 exited_before_subscribe: Vec<NodeId>,
41 },
42 StopDataflow {
43 dataflow_id: DataflowId,
44 grace_duration: Option<Duration>,
45 },
46 ReloadDataflow {
47 dataflow_id: DataflowId,
48 node_id: NodeId,
49 operator_id: Option<OperatorId>,
50 },
51 Logs {
52 dataflow_id: DataflowId,
53 node_id: NodeId,
54 },
55 Destroy,
56 Heartbeat,
57}
58
59#[derive(Debug, serde::Deserialize, serde::Serialize)]
60pub struct BuildDataflowNodes {
61 pub build_id: BuildId,
62 pub session_id: SessionId,
63 pub local_working_dir: Option<PathBuf>,
71 pub git_sources: BTreeMap<NodeId, GitSource>,
72 pub prev_git_sources: BTreeMap<NodeId, GitSource>,
73 pub dataflow_descriptor: Descriptor,
74 pub nodes_on_machine: BTreeSet<NodeId>,
75 pub uv: bool,
76}
77
78#[derive(Debug, serde::Deserialize, serde::Serialize)]
79pub struct SpawnDataflowNodes {
80 pub build_id: Option<BuildId>,
81 pub session_id: SessionId,
82 pub dataflow_id: DataflowId,
83 pub local_working_dir: Option<PathBuf>,
91 pub nodes: BTreeMap<NodeId, ResolvedNode>,
92 pub dataflow_descriptor: Descriptor,
93 pub spawn_nodes: BTreeSet<NodeId>,
94 pub uv: bool,
95}