zenoh_flow_runtime/instance.rs
1//
2// Copyright (c) 2021 - 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use crate::runners::Runner;
16
17use std::{collections::HashMap, fmt::Display, ops::Deref};
18
19use serde::{Deserialize, Serialize};
20use uhlc::{Timestamp, HLC};
21use zenoh_flow_commons::{NodeId, Result, RuntimeId};
22use zenoh_flow_records::DataFlowRecord;
23
/// A `DataFlowInstance` keeps track of the parts of a data flow managed by the Zenoh-Flow runtime.
///
/// A `DataFlowInstance` structure is thus *local* to a Zenoh-Flow runtime. For a data flow that is distributed across
/// multiple runtimes, there will be one such structure at each runtime.
///
/// All instances will share the same [record](DataFlowRecord) but their internal state will differ.
pub struct DataFlowInstance {
    /// The [state](InstanceState) of this instance, as tracked by this runtime.
    pub(crate) state: InstanceState,
    /// The [record](DataFlowRecord) of the data flow this instance was created from.
    pub(crate) record: DataFlowRecord,
    /// The `Runner`s held by this instance, indexed by the [identifier](NodeId) of their node.
    pub(crate) runners: HashMap<NodeId, Runner>,
}
35
/// The different states of a [DataFlowInstance].
///
/// Note that *a state is tied to a Zenoh-Flow [runtime]*: if a data flow is distributed across multiple Zenoh-Flow
/// runtimes, their respective state for the same instance could be different (but should eventually converge).
///
/// Every variant carries the [Timestamp] at which the instance entered that state.
///
/// [runtime]: crate::Runtime
#[derive(Clone, Deserialize, Serialize, Debug)]
pub enum InstanceState {
    /// A [runtime] listing a [DataFlowInstance] in the `Creating` state is in the process of loading all the nodes it
    /// manages.
    ///
    /// This is the state a [DataFlowInstance] is in right after its creation.
    ///
    /// [runtime]: crate::Runtime
    Creating(Timestamp),
    /// A [runtime] listing a [DataFlowInstance] in the `Loaded` state successfully instantiated all the nodes it
    /// manages and is ready to start them.
    ///
    /// A `Loaded` data flow can be started or deleted.
    ///
    /// [runtime]: crate::Runtime
    Loaded(Timestamp),
    /// A [runtime] listing a [DataFlowInstance] in the `Running` state has (re)started all the nodes it manages.
    ///
    /// A `Running` data flow can be aborted or deleted.
    ///
    /// [runtime]: crate::Runtime
    Running(Timestamp),
    /// A [runtime] listing a [DataFlowInstance] in the `Aborted` state has abruptly stopped all the nodes it manages.
    ///
    /// An `Aborted` data flow can be restarted or deleted.
    ///
    /// [runtime]: crate::Runtime
    Aborted(Timestamp),
    /// A [runtime] listing a [DataFlowInstance] in the `Failed` state failed to load at least one of the nodes of this
    /// instance it manages.
    ///
    /// A data flow in the `Failed` state can only be deleted.
    ///
    /// In addition to the [Timestamp], this variant carries a textual description of the reason for the failure.
    ///
    /// [runtime]: crate::Runtime
    Failed((Timestamp, String)),
}
76
77impl Display for InstanceState {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 match self {
80 InstanceState::Creating(ts) => write!(f, "Creation started on {}", ts.get_time()),
81 InstanceState::Loaded(ts) => write!(f, "Loaded on {}", ts.get_time()),
82 InstanceState::Running(ts) => write!(f, "Running since {}", ts.get_time()),
83 InstanceState::Aborted(ts) => write!(f, "Aborted on {}", ts.get_time()),
84 InstanceState::Failed((ts, reason)) => {
85 write!(f, "Failed on {} with:\n{}", ts.get_time(), reason)
86 }
87 }
88 }
89}
90
/// The `InstanceStatus` provides information about the data flow instance.
///
/// It details:
/// - which runtime this information comes from (through its [identifier](RuntimeId)),
/// - the [state](InstanceState) of the data flow instance,
/// - the list of nodes (through their [identifier](NodeId)) the runtime manages --- and thus for which the state
///   applies.
///
/// This information is what is displayed by the `zfctl` tool when requesting the status of a data flow instance.
#[derive(Deserialize, Serialize, Debug)]
pub struct InstanceStatus {
    /// The identifier of the [runtime](crate::Runtime) this information comes from.
    pub runtime_id: RuntimeId,
    /// The state of the data flow instance --- on this runtime.
    pub state: InstanceState,
    /// The nodes managed by this runtime, for which the state applies.
    pub nodes: Vec<NodeId>,
}
109
110impl Deref for DataFlowInstance {
111 type Target = DataFlowRecord;
112
113 fn deref(&self) -> &Self::Target {
114 &self.record
115 }
116}
117
118impl DataFlowInstance {
119 /// Creates a new `DataFlowInstance`, setting its state to [Creating](InstanceState::Creating).
120 pub(crate) fn new(record: DataFlowRecord, hlc: &HLC) -> Self {
121 Self {
122 state: InstanceState::Creating(hlc.new_timestamp()),
123 record,
124 runners: HashMap::default(),
125 }
126 }
127
128 /// (re-)Starts the `DataFlowInstance`.
129 ///
130 /// The [hlc](HLC) is required to keep track of when this call was made.
131 ///
132 /// # Errors
133 ///
134 /// This method can fail when attempting to re-start: when re-starting a data flow, the method
135 /// [on_resume] is called for each node and is faillible.
136 ///
137 /// [on_resume]: zenoh_flow_nodes::prelude::Node::on_resume()
138 pub async fn start(&mut self, hlc: &HLC) -> Result<()> {
139 for (node_id, runner) in self.runners.iter_mut() {
140 runner.start().await?;
141 tracing::trace!("Started node < {} >", node_id);
142 }
143
144 self.state = InstanceState::Running(hlc.new_timestamp());
145 Ok(())
146 }
147
148 /// Aborts the `DataFlowInstance`.
149 ///
150 /// The [hlc](HLC) is required to keep track of when this call was made.
151 pub async fn abort(&mut self, hlc: &HLC) {
152 for (node_id, runner) in self.runners.iter_mut() {
153 runner.abort().await;
154 tracing::trace!("Aborted node < {} >", node_id);
155 }
156
157 self.state = InstanceState::Aborted(hlc.new_timestamp());
158 }
159
160 /// Returns the [state](InstanceState) of this `DataFlowInstance`.
161 pub fn state(&self) -> &InstanceState {
162 &self.state
163 }
164
165 /// Returns the [status](InstanceStatus) of this `DataFlowInstance`.
166 ///
167 /// This structure was intended as a way to retrieve and display information about the instance. This is what the
168 /// `zfctl` tool leverages for its `instance status` command.
169 pub fn status(&self, runtime_id: &RuntimeId) -> InstanceStatus {
170 InstanceStatus {
171 runtime_id: runtime_id.clone(),
172 state: self.state.clone(),
173 nodes: self
174 .runners
175 .keys()
176 .filter(|&node_id| {
177 !(self.senders().contains_key(node_id)
178 || self.receivers().contains_key(node_id))
179 })
180 .cloned()
181 .collect(),
182 }
183 }
184}