use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::graph::builder::{Branch, BuilderNode, NodeMeta};
use crate::graph::checkpoint::{
CheckpointConfig, CheckpointMetadata, Checkpointer, DurabilityMode,
};
use crate::graph::command::Interrupt;
use crate::graph::observability::{GraphEventJournal, GraphStatusStore};
use crate::graph::recursion::{ChildRun, RunTree};
use crate::graph::reducer::StateReducer;
use crate::graph::status::GraphRunStatus;
use crate::graph::stream::GraphEventSink;
use crate::harness::ids::{CheckpointId, GraphId, NodeId, RunId};
pub struct CompiledGraph<State, Update> {
pub(crate) graph_id: GraphId,
pub(crate) name: Option<String>,
pub(crate) nodes: Arc<HashMap<NodeId, BuilderNode<State, Update>>>,
pub(crate) edges: Arc<HashMap<NodeId, NodeId>>,
pub(crate) branches: Arc<HashMap<NodeId, Branch<State>>>,
#[allow(dead_code)]
pub(crate) command_nodes: Arc<HashSet<NodeId>>,
pub(crate) waiting: Arc<HashMap<NodeId, HashSet<NodeId>>>,
pub(crate) node_meta: Arc<HashMap<NodeId, NodeMeta>>,
pub(crate) entry: NodeId,
pub(crate) reducer: Arc<dyn StateReducer<State, Update>>,
pub(crate) recursion_limit: usize,
pub(crate) recursion_policy: crate::graph::recursion::RecursionPolicy,
pub(crate) recursion_frames: Vec<crate::graph::recursion::RecursionFrame>,
pub(crate) recursion_node: Option<NodeId>,
pub(crate) checkpointer: Option<Arc<dyn Checkpointer<State>>>,
pub(crate) event_sink: Option<Arc<dyn GraphEventSink>>,
pub(crate) journal: Option<Arc<dyn GraphEventJournal>>,
pub(crate) status_store: Option<Arc<dyn GraphStatusStore>>,
pub(crate) namespace: Vec<String>,
pub(crate) parallel: bool,
pub(crate) max_concurrency: Option<usize>,
pub(crate) node_timeout: Option<std::time::Duration>,
pub(crate) durability: DurabilityMode,
}
impl<State, Update> std::fmt::Debug for CompiledGraph<State, Update> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CompiledGraph")
.field("graph_id", &self.graph_id)
.field("nodes", &self.nodes.len())
.field("entry", &self.entry)
.field("recursion_limit", &self.recursion_limit)
.field("namespace", &self.namespace)
.field("parallel", &self.parallel)
.finish_non_exhaustive()
}
}
impl<State, Update> Clone for CompiledGraph<State, Update> {
fn clone(&self) -> Self {
Self {
graph_id: self.graph_id.clone(),
name: self.name.clone(),
nodes: self.nodes.clone(),
edges: self.edges.clone(),
branches: self.branches.clone(),
command_nodes: self.command_nodes.clone(),
waiting: self.waiting.clone(),
node_meta: self.node_meta.clone(),
entry: self.entry.clone(),
reducer: self.reducer.clone(),
recursion_limit: self.recursion_limit,
recursion_policy: self.recursion_policy,
recursion_frames: self.recursion_frames.clone(),
recursion_node: self.recursion_node.clone(),
checkpointer: self.checkpointer.clone(),
event_sink: self.event_sink.clone(),
journal: self.journal.clone(),
status_store: self.status_store.clone(),
namespace: self.namespace.clone(),
parallel: self.parallel,
max_concurrency: self.max_concurrency,
node_timeout: self.node_timeout,
durability: self.durability,
}
}
}
#[derive(Clone, Debug)]
pub struct GraphExecution<State> {
pub state: State,
pub run_id: RunId,
pub graph_id: GraphId,
pub root_run_id: RunId,
pub parent_run_id: Option<RunId>,
pub child_runs: Vec<ChildRun>,
pub visited: Vec<NodeId>,
pub steps: usize,
pub interrupts: Vec<Interrupt>,
pub status: GraphRunStatus,
pub checkpoint_id: Option<CheckpointId>,
}
impl<State> GraphExecution<State> {
pub fn is_interrupted(&self) -> bool {
!self.interrupts.is_empty()
}
pub fn run_tree(&self) -> RunTree {
RunTree {
run_id: self.run_id.clone(),
root_run_id: self.root_run_id.clone(),
parent_run_id: self.parent_run_id.clone(),
children: self.child_runs.clone(),
}
}
}
#[derive(Clone, Debug)]
pub struct StateSnapshot<State> {
pub values: State,
pub next_nodes: Vec<NodeId>,
pub tasks: Vec<NodeId>,
pub config: CheckpointConfig,
pub metadata: CheckpointMetadata,
pub parent_config: Option<CheckpointConfig>,
pub pending_interrupts: Vec<Interrupt>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ResumeTarget {
Latest,
Checkpoint(String),
}