zenoh_flow_runtime/
instance.rs

1//
2// Copyright (c) 2021 - 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use crate::runners::Runner;
16
17use std::{collections::HashMap, fmt::Display, ops::Deref};
18
19use serde::{Deserialize, Serialize};
20use uhlc::{Timestamp, HLC};
21use zenoh_flow_commons::{NodeId, Result, RuntimeId};
22use zenoh_flow_records::DataFlowRecord;
23
/// A `DataFlowInstance` keeps track of the parts of a data flow managed by the Zenoh-Flow runtime.
///
/// A `DataFlowInstance` structure is thus *local* to a Zenoh-Flow runtime. For a data flow that spawns on multiple
/// runtimes, there will be one such structure at each runtime.
///
/// All instances will share the same [record](DataFlowRecord) but their internal state will differ.
pub struct DataFlowInstance {
    /// The current [state](InstanceState) of this instance, as tracked by this runtime.
    pub(crate) state: InstanceState,
    /// The [record](DataFlowRecord) describing the data flow. It is also what a `DataFlowInstance` dereferences to.
    pub(crate) record: DataFlowRecord,
    /// The [runners](Runner) held by this instance, keyed by the [identifier](NodeId) of their node.
    pub(crate) runners: HashMap<NodeId, Runner>,
}
35
36/// The different states of a [DataFlowInstance].
37///
38/// Note that *a state is tied to a Zenoh-Flow [runtime]*: if a data flow is distributed across multiple Zenoh-Flow
39/// runtimes, their respective state for the same instance could be different (but should eventually converge).
40///
41/// [runtime]: crate::Runtime
42#[derive(Clone, Deserialize, Serialize, Debug)]
43pub enum InstanceState {
44    /// A [runtime] listing a [DataFlowInstance] in the `Creating` state is in the process of loading all the nodes it
45    /// manages.
46    ///
47    /// [runtime]: crate::Runtime
48    Creating(Timestamp),
49    /// A [runtime] listing a [DataFlowInstance] in the `Loaded` state successfully instantiated all the nodes it manages
50    /// and is ready to start them.
51    ///
52    /// A `Loaded` data flow can be started or deleted.
53    ///
54    /// [runtime]: crate::Runtime
55    Loaded(Timestamp),
56    /// A [runtime] listing a [DataFlowInstance] in the `Running` state has (re)started all the nodes it manages.
57    ///
58    /// A `Running` data flow can be aborted or deleted.
59    ///
60    /// [runtime]: crate::Runtime
61    Running(Timestamp),
62    /// A [runtime] listing a [DataFlowInstance] in the `Aborted` state has abruptly stopped all the nodes it manages.
63    ///
64    /// An `Aborted` data flow can be restarted or deleted.
65    ///
66    /// [runtime]: crate::Runtime
67    Aborted(Timestamp),
68    /// A [runtime] listing a [DataFlowInstance] in the `Failed` state failed to load at least one of the nodes of this
69    /// instance it manages.
70    ///
71    /// A data flow in the `Failed` state can only be deleted.
72    ///
73    /// [runtime]: crate::Runtime
74    Failed((Timestamp, String)),
75}
76
77impl Display for InstanceState {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        match self {
80            InstanceState::Creating(ts) => write!(f, "Creation started on {}", ts.get_time()),
81            InstanceState::Loaded(ts) => write!(f, "Loaded on {}", ts.get_time()),
82            InstanceState::Running(ts) => write!(f, "Running since {}", ts.get_time()),
83            InstanceState::Aborted(ts) => write!(f, "Aborted on {}", ts.get_time()),
84            InstanceState::Failed((ts, reason)) => {
85                write!(f, "Failed on {} with:\n{}", ts.get_time(), reason)
86            }
87        }
88    }
89}
90
91/// The `InstanceStatus` provides information about the data flow instance.
92///
93/// It details:
94/// - from which runtime this information comes from (through its [identifier](RuntimeId)),
95/// - the [state](InstanceState) of the data flow instance,
96/// - the list of nodes (through their [identifier](NodeId)) the runtime manages --- and thus for which the state
97///   applies.
98///
99/// This information is what is displayed by the `zfctl` tool when requesting the status of a data flow instance.
100#[derive(Deserialize, Serialize, Debug)]
101pub struct InstanceStatus {
102    /// The identifier of the [runtime](crate::Runtime) this information comes from.
103    pub runtime_id: RuntimeId,
104    /// The state of the data flow instance --- on this runtime.
105    pub state: InstanceState,
106    /// The nodes managed by this runtime, for which the state applies.
107    pub nodes: Vec<NodeId>,
108}
109
110impl Deref for DataFlowInstance {
111    type Target = DataFlowRecord;
112
113    fn deref(&self) -> &Self::Target {
114        &self.record
115    }
116}
117
118impl DataFlowInstance {
119    /// Creates a new `DataFlowInstance`, setting its state to [Creating](InstanceState::Creating).
120    pub(crate) fn new(record: DataFlowRecord, hlc: &HLC) -> Self {
121        Self {
122            state: InstanceState::Creating(hlc.new_timestamp()),
123            record,
124            runners: HashMap::default(),
125        }
126    }
127
128    /// (re-)Starts the `DataFlowInstance`.
129    ///
130    /// The [hlc](HLC) is required to keep track of when this call was made.
131    ///
132    /// # Errors
133    ///
134    /// This method can fail when attempting to re-start: when re-starting a data flow, the method
135    /// [on_resume] is called for each node and is faillible.
136    ///
137    /// [on_resume]: zenoh_flow_nodes::prelude::Node::on_resume()
138    pub async fn start(&mut self, hlc: &HLC) -> Result<()> {
139        for (node_id, runner) in self.runners.iter_mut() {
140            runner.start().await?;
141            tracing::trace!("Started node < {} >", node_id);
142        }
143
144        self.state = InstanceState::Running(hlc.new_timestamp());
145        Ok(())
146    }
147
148    /// Aborts the `DataFlowInstance`.
149    ///
150    /// The [hlc](HLC) is required to keep track of when this call was made.
151    pub async fn abort(&mut self, hlc: &HLC) {
152        for (node_id, runner) in self.runners.iter_mut() {
153            runner.abort().await;
154            tracing::trace!("Aborted node < {} >", node_id);
155        }
156
157        self.state = InstanceState::Aborted(hlc.new_timestamp());
158    }
159
160    /// Returns the [state](InstanceState) of this `DataFlowInstance`.
161    pub fn state(&self) -> &InstanceState {
162        &self.state
163    }
164
165    /// Returns the [status](InstanceStatus) of this `DataFlowInstance`.
166    ///
167    /// This structure was intended as a way to retrieve and display information about the instance. This is what the
168    /// `zfctl` tool leverages for its `instance status` command.
169    pub fn status(&self, runtime_id: &RuntimeId) -> InstanceStatus {
170        InstanceStatus {
171            runtime_id: runtime_id.clone(),
172            state: self.state.clone(),
173            nodes: self
174                .runners
175                .keys()
176                .filter(|&node_id| {
177                    !(self.senders().contains_key(node_id)
178                        || self.receivers().contains_key(node_id))
179                })
180                .cloned()
181                .collect(),
182        }
183    }
184}