use crate::ExecutionError;
use crate::node::{ContextMode, NodeId};
use hashbrown::HashMap;
use std::sync::Arc;
use std::time::Duration;
/// Opaque string identifier carried as `run_id` by every [`GraphEvent`] variant.
///
/// The text is stored in an `Arc<str>`, so cloning a `RunId` bumps a reference
/// count instead of copying the string.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RunId(Arc<str>);
impl RunId {
    /// Creates an identifier from a string freshly produced by `nanoid::nanoid!()`.
    #[must_use]
    pub fn new() -> Self {
        let generated = nanoid::nanoid!();
        Self(Arc::from(generated))
    }

    /// Wraps a caller-supplied identifier as-is; no validation is performed.
    pub fn from_string(id: impl Into<Arc<str>>) -> Self {
        let shared: Arc<str> = id.into();
        Self(shared)
    }

    /// Borrows the identifier text.
    #[must_use]
    pub fn as_str(&self) -> &str {
        &*self.0
    }
}
impl Default for RunId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for RunId {
    /// Writes the identifier text verbatim via `write_str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &str = &self.0;
        f.write_str(text)
    }
}
impl serde::Serialize for RunId {
    /// Serializes the id as a plain string (no wrapping struct or tuple).
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let text: &str = &self.0;
        serializer.serialize_str(text)
    }
}
impl<'de> serde::Deserialize<'de> for RunId {
    /// Deserializes a `RunId` from a plain string.
    ///
    /// # Errors
    ///
    /// Returns the deserializer's error when the input is not a string.
    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        // serde's `Cow<str>` impl always produces an owned `String` (it never
        // borrows from the input), so deserialize the `String` directly instead
        // of going through a `Cow` that only looks zero-copy.
        let raw = <String as serde::Deserialize>::deserialize(deserializer)?;
        Ok(Self(Arc::from(raw)))
    }
}
/// String key/value labels carried as `labels` by every [`GraphEvent`] variant.
///
/// The map sits behind an `Arc`, so cloning `RunLabels` shares the same map
/// instead of copying its entries. Equality compares the map contents.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RunLabels {
// Shared label map; the methods visible in this file only read from it.
inner: Arc<HashMap<String, String>>,
}
impl RunLabels {
#[must_use]
pub fn empty() -> Self {
Self::default()
}
#[must_use]
pub fn get(&self, key: &str) -> Option<&str> {
self.inner.get(key).map(String::as_str)
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
#[must_use]
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
self.inner.iter().map(|(k, v)| (k.as_str(), v.as_str()))
}
}
impl<K, V, I> From<I> for RunLabels
where
    K: Into<String>,
    V: Into<String>,
    I: IntoIterator<Item = (K, V)>,
{
    /// Builds a label set from any iterable of `(key, value)` pairs, converting
    /// both halves of each pair into owned `String`s.
    fn from(iter: I) -> Self {
        let entries = iter
            .into_iter()
            .map(|(key, value)| (key.into(), value.into()));
        Self {
            inner: Arc::new(entries.collect::<HashMap<String, String>>()),
        }
    }
}
impl serde::Serialize for RunLabels {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
(*self.inner).serialize(serializer)
}
}
impl<'de> serde::Deserialize<'de> for RunLabels {
    /// Deserializes a string-to-string map and wraps it in a fresh `Arc`.
    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        <HashMap<String, String> as serde::Deserialize>::deserialize(deserializer).map(|labels| {
            Self {
                inner: Arc::new(labels),
            }
        })
    }
}
/// Event describing a graph run.
///
/// Every variant carries the `run_id` and `labels` of the run. All variants
/// except the three graph-level ones (`GraphStart`, `GraphComplete`,
/// `GraphFailure`) also carry the `node_id` and static `node_name` of the node
/// they refer to.
///
/// The enum is `#[non_exhaustive]`, so matches outside the defining crate need
/// a wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum GraphEvent {
/// Graph-level start event (schedule name `OnGraphStart`).
///
/// `node_map` pairs node ids with static names; presumably it lists the
/// `node_count` nodes of the graph — confirm against the emitter.
GraphStart {
run_id: RunId,
labels: RunLabels,
node_count: usize,
node_map: Vec<(NodeId, &'static str)>,
},
/// Graph-level completion event (schedule name `OnGraphComplete`), carrying
/// an executed-node count and a duration.
GraphComplete {
run_id: RunId,
labels: RunLabels,
nodes_executed: usize,
duration: Duration,
},
/// Graph-level failure event (schedule name `OnGraphFailure`), carrying the
/// [`ExecutionError`] that was reported.
GraphFailure {
run_id: RunId,
labels: RunLabels,
error: ExecutionError,
},
/// System-node start event (schedule name `OnSystemStart`).
SystemStart {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
},
/// System-node completion event (schedule name `OnSystemComplete`), carrying
/// a duration.
SystemComplete {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
duration: Duration,
},
/// System-node error event (schedule name `OnSystemError`); the error is
/// carried as shared text rather than as a typed error value.
SystemError {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
error: Arc<str>,
},
/// Decision-node start event (schedule name `OnDecisionStart`).
DecisionStart {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
},
/// Decision-node completion event (schedule name `OnDecisionComplete`),
/// carrying the static name of the selected branch.
DecisionComplete {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
selected_branch: &'static str,
},
/// Switch-node start event (schedule name `OnSwitchStart`), carrying the
/// number of cases and whether a default is present.
SwitchStart {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
case_count: usize,
has_default: bool,
},
/// Switch-node completion event (schedule name `OnSwitchComplete`), carrying
/// the static name of the selected case and a `used_default` flag.
SwitchComplete {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
selected_case: &'static str,
used_default: bool,
},
/// Loop-node start event (schedule name `OnLoopStart`), carrying the
/// configured `max_iterations`.
LoopStart {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
max_iterations: usize,
},
/// Per-iteration loop event (schedule name `OnLoopIteration`).
///
/// NOTE(review): whether `iteration` is 0- or 1-based is not visible here.
LoopIteration {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
iteration: usize,
},
/// Loop-node end event (schedule name `OnLoopEnd`), carrying the iteration
/// count, an executed-node count and a duration.
LoopEnd {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
iterations: usize,
nodes_executed: usize,
duration: Duration,
},
/// Parallel-node start event (schedule name `OnParallelStart`), carrying the
/// number of branches.
ParallelStart {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
branch_count: usize,
},
/// Parallel-node completion event (schedule name `OnParallelComplete`),
/// carrying the branch count, a total executed-node count and a duration.
ParallelComplete {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
branch_count: usize,
total_nodes_executed: usize,
duration: Duration,
},
/// Scope-node start event (schedule name `OnScopeStart`), carrying the
/// scope's [`ContextMode`] and the number of inner nodes.
ScopeStart {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
context_mode: ContextMode,
inner_node_count: usize,
},
/// Scope-node completion event (schedule name `OnScopeComplete`), carrying
/// the scope's [`ContextMode`], an executed-node count and a duration.
ScopeComplete {
run_id: RunId,
labels: RunLabels,
node_id: NodeId,
node_name: &'static str,
context_mode: ContextMode,
nodes_executed: usize,
duration: Duration,
},
}
impl GraphEvent {
#[must_use]
pub fn schedule_name(&self) -> &'static str {
match self {
GraphEvent::GraphStart { .. } => "OnGraphStart",
GraphEvent::GraphComplete { .. } => "OnGraphComplete",
GraphEvent::GraphFailure { .. } => "OnGraphFailure",
GraphEvent::SystemStart { .. } => "OnSystemStart",
GraphEvent::SystemComplete { .. } => "OnSystemComplete",
GraphEvent::SystemError { .. } => "OnSystemError",
GraphEvent::DecisionStart { .. } => "OnDecisionStart",
GraphEvent::DecisionComplete { .. } => "OnDecisionComplete",
GraphEvent::SwitchStart { .. } => "OnSwitchStart",
GraphEvent::SwitchComplete { .. } => "OnSwitchComplete",
GraphEvent::LoopStart { .. } => "OnLoopStart",
GraphEvent::LoopIteration { .. } => "OnLoopIteration",
GraphEvent::LoopEnd { .. } => "OnLoopEnd",
GraphEvent::ParallelStart { .. } => "OnParallelStart",
GraphEvent::ParallelComplete { .. } => "OnParallelComplete",
GraphEvent::ScopeStart { .. } => "OnScopeStart",
GraphEvent::ScopeComplete { .. } => "OnScopeComplete",
}
}
#[must_use]
pub fn run_id(&self) -> &RunId {
match self {
GraphEvent::GraphStart { run_id, .. }
| GraphEvent::GraphComplete { run_id, .. }
| GraphEvent::GraphFailure { run_id, .. }
| GraphEvent::SystemStart { run_id, .. }
| GraphEvent::SystemComplete { run_id, .. }
| GraphEvent::SystemError { run_id, .. }
| GraphEvent::DecisionStart { run_id, .. }
| GraphEvent::DecisionComplete { run_id, .. }
| GraphEvent::SwitchStart { run_id, .. }
| GraphEvent::SwitchComplete { run_id, .. }
| GraphEvent::LoopStart { run_id, .. }
| GraphEvent::LoopIteration { run_id, .. }
| GraphEvent::LoopEnd { run_id, .. }
| GraphEvent::ParallelStart { run_id, .. }
| GraphEvent::ParallelComplete { run_id, .. }
| GraphEvent::ScopeStart { run_id, .. }
| GraphEvent::ScopeComplete { run_id, .. } => run_id,
}
}
#[must_use]
pub fn labels(&self) -> &RunLabels {
match self {
GraphEvent::GraphStart { labels, .. }
| GraphEvent::GraphComplete { labels, .. }
| GraphEvent::GraphFailure { labels, .. }
| GraphEvent::SystemStart { labels, .. }
| GraphEvent::SystemComplete { labels, .. }
| GraphEvent::SystemError { labels, .. }
| GraphEvent::DecisionStart { labels, .. }
| GraphEvent::DecisionComplete { labels, .. }
| GraphEvent::SwitchStart { labels, .. }
| GraphEvent::SwitchComplete { labels, .. }
| GraphEvent::LoopStart { labels, .. }
| GraphEvent::LoopIteration { labels, .. }
| GraphEvent::LoopEnd { labels, .. }
| GraphEvent::ParallelStart { labels, .. }
| GraphEvent::ParallelComplete { labels, .. }
| GraphEvent::ScopeStart { labels, .. }
| GraphEvent::ScopeComplete { labels, .. } => labels,
}
}
#[must_use]
pub fn node_id(&self) -> Option<NodeId> {
match self {
GraphEvent::GraphStart { .. }
| GraphEvent::GraphComplete { .. }
| GraphEvent::GraphFailure { .. } => None,
GraphEvent::SystemStart { node_id, .. }
| GraphEvent::SystemComplete { node_id, .. }
| GraphEvent::SystemError { node_id, .. }
| GraphEvent::DecisionStart { node_id, .. }
| GraphEvent::DecisionComplete { node_id, .. }
| GraphEvent::SwitchStart { node_id, .. }
| GraphEvent::SwitchComplete { node_id, .. }
| GraphEvent::LoopStart { node_id, .. }
| GraphEvent::LoopIteration { node_id, .. }
| GraphEvent::LoopEnd { node_id, .. }
| GraphEvent::ParallelStart { node_id, .. }
| GraphEvent::ParallelComplete { node_id, .. }
| GraphEvent::ScopeStart { node_id, .. }
| GraphEvent::ScopeComplete { node_id, .. } => Some(node_id.clone()),
}
}
}
impl std::fmt::Display for GraphEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let run = self.run_id();
match self {
GraphEvent::GraphStart {
node_count,
node_map,
..
} => {
write!(f, "[{run}] GraphStart(nodes: {node_count})")?;
if !node_map.is_empty() {
writeln!(f, ", node_map:")?;
for (id, name) in node_map {
writeln!(f, " {name:20} {id}")?;
}
};
Ok(())
}
GraphEvent::GraphComplete {
nodes_executed,
duration,
..
} => {
write!(
f,
"[{run}] GraphComplete(executed: {nodes_executed}, duration: {duration:?})"
)
}
GraphEvent::GraphFailure { error, .. } => {
write!(f, "[{run}] GraphFailure(error: {error})")
}
GraphEvent::SystemStart {
node_id, node_name, ..
} => {
write!(f, "[{run}] SystemStart({node_name} @ {node_id:?})")
}
GraphEvent::SystemComplete {
node_id,
node_name,
duration,
..
} => {
write!(
f,
"[{run}] SystemComplete({node_name} @ {node_id:?}, duration: {duration:?})"
)
}
GraphEvent::SystemError {
node_id,
node_name,
error,
..
} => {
write!(
f,
"[{run}] SystemError({node_name} @ {node_id:?}, error: {error})"
)
}
GraphEvent::DecisionStart {
node_id, node_name, ..
} => {
write!(f, "[{run}] DecisionStart({node_name} @ {node_id:?})")
}
GraphEvent::DecisionComplete {
node_id,
node_name,
selected_branch,
..
} => {
write!(
f,
"[{run}] DecisionComplete({node_name} @ {node_id:?}, branch: {selected_branch})"
)
}
GraphEvent::SwitchStart {
node_id,
node_name,
case_count,
has_default,
..
} => {
write!(
f,
"[{run}] SwitchStart({node_name} @ {node_id:?}, cases: {case_count}, default: {has_default})"
)
}
GraphEvent::SwitchComplete {
node_id,
node_name,
selected_case,
used_default,
..
} => {
write!(
f,
"[{run}] SwitchComplete({node_name} @ {node_id:?}, case: {selected_case}, used_default: {used_default})"
)
}
GraphEvent::LoopStart {
node_id,
node_name,
max_iterations,
..
} => {
write!(
f,
"[{run}] LoopStart({node_name} @ {node_id:?}, max_iterations: {max_iterations})"
)
}
GraphEvent::LoopIteration {
node_id,
node_name,
iteration,
..
} => {
write!(
f,
"[{run}] LoopIteration({node_name} @ {node_id:?}, iteration: {iteration})"
)
}
GraphEvent::LoopEnd {
node_id,
node_name,
iterations,
nodes_executed,
duration,
..
} => {
write!(
f,
"[{run}] LoopEnd({node_name} @ {node_id:?}, iterations: {iterations}, executed: {nodes_executed}, duration: {duration:?})"
)
}
GraphEvent::ParallelStart {
node_id,
node_name,
branch_count,
..
} => {
write!(
f,
"[{run}] ParallelStart({node_name} @ {node_id:?}, branches: {branch_count})"
)
}
GraphEvent::ParallelComplete {
node_id,
node_name,
branch_count,
total_nodes_executed,
duration,
..
} => {
write!(
f,
"[{run}] ParallelComplete({node_name} @ {node_id:?}, branches: {branch_count}, executed: {total_nodes_executed}, duration: {duration:?})"
)
}
GraphEvent::ScopeStart {
node_id,
node_name,
context_mode,
inner_node_count,
..
} => {
write!(
f,
"[{run}] ScopeStart({node_name} @ {node_id:?}, mode: {context_mode}, inner_nodes: {inner_node_count})"
)
}
GraphEvent::ScopeComplete {
node_id,
node_name,
context_mode,
nodes_executed,
duration,
..
} => {
write!(
f,
"[{run}] ScopeComplete({node_name} @ {node_id:?}, mode: {context_mode}, executed: {nodes_executed}, duration: {duration:?})"
)
}
}
}
}
#[cfg(test)]
mod serde_tests {
    use super::*;

    /// A `RunId` encodes as a bare JSON string and decodes back to an equal id.
    #[test]
    fn run_id_round_trips_through_json() {
        let original = RunId::from_string("abc-123");
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(encoded, "\"abc-123\"");

        let decoded: RunId = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    /// Labels survive a JSON round trip with both equality and lookups intact.
    #[test]
    fn run_labels_round_trip_through_json() {
        let original = RunLabels::from([
            ("session_id", "s-1".to_owned()),
            ("agent_type", "react".to_owned()),
        ]);
        let encoded = serde_json::to_string(&original).unwrap();

        let decoded: RunLabels = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
        assert_eq!(decoded.get("session_id"), Some("s-1"));
        assert_eq!(decoded.get("agent_type"), Some("react"));
    }

    /// An empty label set encodes as `{}` and decodes back to an empty set.
    #[test]
    fn empty_run_labels_round_trip() {
        let original = RunLabels::empty();
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(encoded, "{}");

        let decoded: RunLabels = serde_json::from_str(&encoded).unwrap();
        assert!(decoded.is_empty());
    }
}