Skip to main content

synaptic_graph/
compiled.rs

1use std::collections::{HashMap, HashSet};
2use std::hash::{Hash, Hasher};
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use futures::Stream;
8use serde_json::Value;
9use synaptic_core::SynapticError;
10use tokio::sync::RwLock;
11
12use crate::checkpoint::{Checkpoint, CheckpointConfig, Checkpointer};
13use crate::command::{CommandGoto, GraphResult, NodeOutput};
14use crate::edge::{ConditionalEdge, Edge};
15use crate::node::Node;
16use crate::state::State;
17use crate::END;
18
/// Cache policy for node-level caching.
///
/// A policy currently consists of a single setting: how long a cached node
/// output stays usable after it was stored.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachePolicy {
    /// Time-to-live for cached entries.
    pub ttl: Duration,
}

impl CachePolicy {
    /// Create a new cache policy with the given TTL.
    ///
    /// This is a `const fn`, so a policy can also be built in `const` and
    /// `static` items.
    #[must_use]
    pub const fn new(ttl: Duration) -> Self {
        Self { ttl }
    }
}
32
33/// Cached node output with expiry.
34pub(crate) struct CachedEntry<S: State> {
35    output: NodeOutput<S>,
36    created: Instant,
37    ttl: Duration,
38}
39
40impl<S: State> CachedEntry<S> {
41    fn is_valid(&self) -> bool {
42        self.created.elapsed() < self.ttl
43    }
44}
45
46/// Hash a serializable state to use as a cache key.
47fn hash_state(value: &Value) -> u64 {
48    let mut hasher = std::collections::hash_map::DefaultHasher::new();
49    let canonical = value.to_string();
50    canonical.hash(&mut hasher);
51    hasher.finish()
52}
53
/// Selects what a streaming run should report after each node.
///
/// `stream_modes_with_config` emits one event per requested mode per node and
/// tags it with the mode. In that method only [`StreamMode::Updates`] changes
/// the payload (it carries the state from *before* the node ran); every other
/// mode carries the state after the node ran.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamMode {
    /// Full state after each node executes.
    Values,
    /// Intended to expose what a node changed. The multi-mode stream yields
    /// the pre-node state so callers can diff it against a `Values` event.
    Updates,
    /// Intended for AI messages (useful for chat UIs). The multi-mode stream
    /// yields the full state and leaves the filtering to the caller.
    Messages,
    /// Intended for detailed debug information such as node timing.
    Debug,
    /// Intended for custom events emitted via StreamWriter.
    Custom,
}
68
/// One item of a graph stream: which node ran and a state snapshot.
#[derive(Clone, Debug)]
pub struct GraphEvent<S> {
    /// Name of the node that just executed.
    pub node: String,
    /// State snapshot attached to the event. `stream_with_config` always
    /// attaches the state after the node ran; `stream_modes_with_config`
    /// attaches the state from before the node ran for `StreamMode::Updates`
    /// and the post-node state for every other mode.
    pub state: S,
}
77
78/// An event yielded during multi-mode streaming, tagged with its stream mode.
79#[derive(Debug, Clone)]
80pub struct MultiGraphEvent<S> {
81    /// Which stream mode produced this event.
82    pub mode: StreamMode,
83    /// The underlying graph event.
84    pub event: GraphEvent<S>,
85}
86
87/// A stream of graph events.
88pub type GraphStream<'a, S> =
89    Pin<Box<dyn Stream<Item = Result<GraphEvent<S>, SynapticError>> + Send + 'a>>;
90
91/// A stream of multi-mode graph events.
92pub type MultiGraphStream<'a, S> =
93    Pin<Box<dyn Stream<Item = Result<MultiGraphEvent<S>, SynapticError>> + Send + 'a>>;
94
95/// The compiled, executable graph.
96pub struct CompiledGraph<S: State> {
97    pub(crate) nodes: HashMap<String, Box<dyn Node<S>>>,
98    pub(crate) edges: Vec<Edge>,
99    pub(crate) conditional_edges: Vec<ConditionalEdge<S>>,
100    pub(crate) entry_point: String,
101    pub(crate) interrupt_before: HashSet<String>,
102    pub(crate) interrupt_after: HashSet<String>,
103    pub(crate) checkpointer: Option<Arc<dyn Checkpointer>>,
104    /// Cache policies keyed by node name.
105    pub(crate) cache_policies: HashMap<String, CachePolicy>,
106    /// Node-level cache: node_name -> (state_hash -> cached_output).
107    #[expect(clippy::type_complexity)]
108    pub(crate) cache: Arc<RwLock<HashMap<String, HashMap<u64, CachedEntry<S>>>>>,
109    /// Nodes marked as deferred (wait for all incoming edges).
110    pub(crate) deferred: HashSet<String>,
111}
112
113impl<S: State> std::fmt::Debug for CompiledGraph<S> {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        f.debug_struct("CompiledGraph")
116            .field("entry_point", &self.entry_point)
117            .field("node_count", &self.nodes.len())
118            .field("edge_count", &self.edges.len())
119            .field("conditional_edge_count", &self.conditional_edges.len())
120            .finish()
121    }
122}
123
124/// Internal helper: process a `NodeOutput` and return the next node to visit.
125/// Returns `(next_node_or_none, interrupt_value_or_none)`.
126fn handle_node_output<S: State>(
127    output: NodeOutput<S>,
128    state: &mut S,
129    current_node: &str,
130    find_next: impl Fn(&str, &S) -> String,
131) -> (Option<String>, Option<serde_json::Value>) {
132    match output {
133        NodeOutput::State(new_state) => {
134            *state = new_state;
135            (None, None) // use normal routing
136        }
137        NodeOutput::Command(cmd) => {
138            // Apply state update if present
139            if let Some(update) = cmd.update {
140                state.merge(update);
141            }
142
143            // Check for interrupt
144            if let Some(interrupt_value) = cmd.interrupt_value {
145                return (None, Some(interrupt_value));
146            }
147
148            // Determine routing
149            match cmd.goto {
150                Some(CommandGoto::One(target)) => (Some(target), None),
151                Some(CommandGoto::Many(_sends)) => {
152                    // Fan-out: for now, execute Send targets sequentially
153                    // Full parallel execution is handled in the main loop
154                    (Some("__fanout__".to_string()), None)
155                }
156                None => {
157                    let next = find_next(current_node, state);
158                    (Some(next), None)
159                }
160            }
161        }
162    }
163}
164
165/// Helper to serialize state into a checkpoint.
166fn make_checkpoint<S: serde::Serialize>(
167    state: &S,
168    next_node: Option<String>,
169    node_name: &str,
170) -> Result<Checkpoint, SynapticError> {
171    let state_val = serde_json::to_value(state)
172        .map_err(|e| SynapticError::Graph(format!("serialize state: {e}")))?;
173    Ok(Checkpoint::new(state_val, next_node).with_metadata("source", serde_json::json!(node_name)))
174}
175
176impl<S: State> CompiledGraph<S> {
177    /// Set a checkpointer for state persistence.
178    pub fn with_checkpointer(mut self, checkpointer: Arc<dyn Checkpointer>) -> Self {
179        self.checkpointer = Some(checkpointer);
180        self
181    }
182
183    /// Execute the graph with initial state.
184    pub async fn invoke(&self, state: S) -> Result<GraphResult<S>, SynapticError>
185    where
186        S: serde::Serialize + serde::de::DeserializeOwned,
187    {
188        self.invoke_with_config(state, None).await
189    }
190
191    /// Execute with optional checkpoint config for resumption.
192    pub async fn invoke_with_config(
193        &self,
194        mut state: S,
195        config: Option<CheckpointConfig>,
196    ) -> Result<GraphResult<S>, SynapticError>
197    where
198        S: serde::Serialize + serde::de::DeserializeOwned,
199    {
200        // If there's a checkpoint, try to resume from it
201        let mut resume_from: Option<String> = None;
202        if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
203            if let Some(checkpoint) = checkpointer.get(cfg).await? {
204                state = serde_json::from_value(checkpoint.state).map_err(|e| {
205                    SynapticError::Graph(format!("failed to deserialize checkpoint state: {e}"))
206                })?;
207                resume_from = checkpoint.next_node;
208            }
209        }
210
211        let mut current_node = resume_from.unwrap_or_else(|| self.entry_point.clone());
212        let mut max_iterations = 100; // safety guard
213
214        loop {
215            if current_node == END {
216                break;
217            }
218            if max_iterations == 0 {
219                return Err(SynapticError::Graph(
220                    "max iterations (100) exceeded — possible infinite loop".to_string(),
221                ));
222            }
223            max_iterations -= 1;
224
225            // Check interrupt_before
226            if self.interrupt_before.contains(&current_node) {
227                if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
228                    let checkpoint =
229                        make_checkpoint(&state, Some(current_node.clone()), &current_node)?;
230                    checkpointer.put(cfg, &checkpoint).await?;
231                }
232                return Ok(GraphResult::Interrupted {
233                    state,
234                    interrupt_value: serde_json::json!({
235                        "reason": format!("interrupted before node '{current_node}'")
236                    }),
237                });
238            }
239
240            // Execute node (with optional cache)
241            let node = self
242                .nodes
243                .get(&current_node)
244                .ok_or_else(|| SynapticError::Graph(format!("node '{current_node}' not found")))?;
245            let output = self
246                .execute_with_cache(&current_node, node.as_ref(), state.clone())
247                .await?;
248
249            // Handle the output
250            let (next_override, interrupt_value) =
251                handle_node_output(output, &mut state, &current_node, |cur, s| {
252                    self.find_next_node(cur, s)
253                });
254
255            // Check for interrupt from Command
256            if let Some(interrupt_val) = interrupt_value {
257                if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
258                    let next = self.find_next_node(&current_node, &state);
259                    let checkpoint = make_checkpoint(&state, Some(next), &current_node)?;
260                    checkpointer.put(cfg, &checkpoint).await?;
261                }
262                return Ok(GraphResult::Interrupted {
263                    state,
264                    interrupt_value: interrupt_val,
265                });
266            }
267
268            // Handle fan-out (Send)
269            if next_override.as_deref() == Some("__fanout__") {
270                // TODO: full parallel fan-out
271                break;
272            }
273
274            let next = if let Some(target) = next_override {
275                target
276            } else {
277                // Check interrupt_after (only when no command override)
278                if self.interrupt_after.contains(&current_node) {
279                    let next = self.find_next_node(&current_node, &state);
280                    if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
281                        let checkpoint = make_checkpoint(&state, Some(next), &current_node)?;
282                        checkpointer.put(cfg, &checkpoint).await?;
283                    }
284                    return Ok(GraphResult::Interrupted {
285                        state,
286                        interrupt_value: serde_json::json!({
287                            "reason": format!("interrupted after node '{current_node}'")
288                        }),
289                    });
290                }
291
292                // Normal routing
293                self.find_next_node(&current_node, &state)
294            };
295
296            // Save checkpoint after each node
297            if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
298                let checkpoint = make_checkpoint(&state, Some(next.clone()), &current_node)?;
299                checkpointer.put(cfg, &checkpoint).await?;
300            }
301
302            current_node = next;
303        }
304
305        Ok(GraphResult::Complete(state))
306    }
307
308    /// Stream graph execution, yielding a `GraphEvent` after each node.
309    pub fn stream(&self, state: S, mode: StreamMode) -> GraphStream<'_, S>
310    where
311        S: serde::Serialize + serde::de::DeserializeOwned + Clone,
312    {
313        self.stream_with_config(state, mode, None)
314    }
315
316    /// Stream graph execution with optional checkpoint config.
317    pub fn stream_with_config(
318        &self,
319        state: S,
320        _mode: StreamMode,
321        config: Option<CheckpointConfig>,
322    ) -> GraphStream<'_, S>
323    where
324        S: serde::Serialize + serde::de::DeserializeOwned + Clone,
325    {
326        Box::pin(async_stream::stream! {
327            let mut state = state;
328
329            // If there's a checkpoint, try to resume from it
330            let mut resume_from: Option<String> = None;
331            if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
332                match checkpointer.get(cfg).await {
333                    Ok(Some(checkpoint)) => {
334                        match serde_json::from_value(checkpoint.state) {
335                            Ok(s) => {
336                                state = s;
337                                resume_from = checkpoint.next_node;
338                            }
339                            Err(e) => {
340                                yield Err(SynapticError::Graph(format!(
341                                    "failed to deserialize checkpoint state: {e}"
342                                )));
343                                return;
344                            }
345                        }
346                    }
347                    Ok(None) => {}
348                    Err(e) => {
349                        yield Err(e);
350                        return;
351                    }
352                }
353            }
354
355            let mut current_node = resume_from.unwrap_or_else(|| self.entry_point.clone());
356            let mut max_iterations = 100;
357
358            loop {
359                if current_node == END {
360                    break;
361                }
362                if max_iterations == 0 {
363                    yield Err(SynapticError::Graph(
364                        "max iterations (100) exceeded — possible infinite loop".to_string(),
365                    ));
366                    return;
367                }
368                max_iterations -= 1;
369
370                // Check interrupt_before
371                if self.interrupt_before.contains(&current_node) {
372                    if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
373                        match make_checkpoint(&state, Some(current_node.clone()), &current_node) {
374                            Ok(checkpoint) => {
375                                if let Err(e) = checkpointer.put(cfg, &checkpoint).await {
376                                    yield Err(e);
377                                    return;
378                                }
379                            }
380                            Err(e) => {
381                                yield Err(e);
382                                return;
383                            }
384                        }
385                    }
386                    yield Err(SynapticError::Graph(format!(
387                        "interrupted before node '{current_node}'"
388                    )));
389                    return;
390                }
391
392                // Execute node
393                let node = match self.nodes.get(&current_node) {
394                    Some(n) => n,
395                    None => {
396                        yield Err(SynapticError::Graph(format!("node '{current_node}' not found")));
397                        return;
398                    }
399                };
400
401                let output = match node.process(state.clone()).await {
402                    Ok(o) => o,
403                    Err(e) => {
404                        yield Err(e);
405                        return;
406                    }
407                };
408
409                // Handle the node output
410                let mut interrupt_val = None;
411                let next_override = match output {
412                    NodeOutput::State(new_state) => {
413                        state = new_state;
414                        None
415                    }
416                    NodeOutput::Command(cmd) => {
417                        if let Some(update) = cmd.update {
418                            state.merge(update);
419                        }
420
421                        if let Some(iv) = cmd.interrupt_value {
422                            interrupt_val = Some(iv);
423                            None
424                        } else {
425                            match cmd.goto {
426                                Some(CommandGoto::One(target)) => Some(target),
427                                Some(CommandGoto::Many(_)) => Some(END.to_string()),
428                                None => None,
429                            }
430                        }
431                    }
432                };
433
434                // Yield event
435                let event = GraphEvent {
436                    node: current_node.clone(),
437                    state: state.clone(),
438                };
439                yield Ok(event);
440
441                // Check for interrupt from Command
442                if let Some(iv) = interrupt_val {
443                    if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
444                        let next = self.find_next_node(&current_node, &state);
445                        match make_checkpoint(&state, Some(next), &current_node) {
446                            Ok(checkpoint) => {
447                                if let Err(e) = checkpointer.put(cfg, &checkpoint).await {
448                                    yield Err(e);
449                                    return;
450                                }
451                            }
452                            Err(e) => {
453                                yield Err(e);
454                                return;
455                            }
456                        }
457                    }
458                    yield Err(SynapticError::Graph(format!(
459                        "interrupted by node '{current_node}': {iv}"
460                    )));
461                    return;
462                }
463
464                let next = if let Some(target) = next_override {
465                    target
466                } else {
467                    // Check interrupt_after (only when no command override)
468                    if self.interrupt_after.contains(&current_node) {
469                        let next = self.find_next_node(&current_node, &state);
470                        if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
471                            match make_checkpoint(&state, Some(next), &current_node) {
472                                Ok(checkpoint) => {
473                                    if let Err(e) = checkpointer.put(cfg, &checkpoint).await {
474                                        yield Err(e);
475                                        return;
476                                    }
477                                }
478                                Err(e) => {
479                                    yield Err(e);
480                                    return;
481                                }
482                            }
483                        }
484                        yield Err(SynapticError::Graph(format!(
485                            "interrupted after node '{current_node}'"
486                        )));
487                        return;
488                    }
489
490                    // Find next node via normal edge routing
491                    self.find_next_node(&current_node, &state)
492                };
493
494                // Save checkpoint
495                if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
496                    match make_checkpoint(&state, Some(next.clone()), &current_node) {
497                        Ok(checkpoint) => {
498                            if let Err(e) = checkpointer.put(cfg, &checkpoint).await {
499                                yield Err(e);
500                                return;
501                            }
502                        }
503                        Err(e) => {
504                            yield Err(e);
505                            return;
506                        }
507                    }
508                }
509
510                current_node = next;
511            }
512        })
513    }
514
515    /// Stream graph execution with multiple stream modes.
516    ///
517    /// Each event is tagged with the `StreamMode` that produced it.
518    /// For a single node execution, one event per requested mode is emitted.
519    pub fn stream_modes(&self, state: S, modes: Vec<StreamMode>) -> MultiGraphStream<'_, S>
520    where
521        S: serde::Serialize + serde::de::DeserializeOwned + Clone,
522    {
523        self.stream_modes_with_config(state, modes, None)
524    }
525
526    /// Stream graph execution with multiple stream modes and optional checkpoint config.
527    pub fn stream_modes_with_config(
528        &self,
529        state: S,
530        modes: Vec<StreamMode>,
531        config: Option<CheckpointConfig>,
532    ) -> MultiGraphStream<'_, S>
533    where
534        S: serde::Serialize + serde::de::DeserializeOwned + Clone,
535    {
536        Box::pin(async_stream::stream! {
537            let mut state = state;
538
539            // If there's a checkpoint, try to resume from it
540            let mut resume_from: Option<String> = None;
541            if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
542                match checkpointer.get(cfg).await {
543                    Ok(Some(checkpoint)) => {
544                        match serde_json::from_value(checkpoint.state) {
545                            Ok(s) => {
546                                state = s;
547                                resume_from = checkpoint.next_node;
548                            }
549                            Err(e) => {
550                                yield Err(SynapticError::Graph(format!(
551                                    "failed to deserialize checkpoint state: {e}"
552                                )));
553                                return;
554                            }
555                        }
556                    }
557                    Ok(None) => {}
558                    Err(e) => {
559                        yield Err(e);
560                        return;
561                    }
562                }
563            }
564
565            let mut current_node = resume_from.unwrap_or_else(|| self.entry_point.clone());
566            let mut max_iterations = 100;
567
568            loop {
569                if current_node == END {
570                    break;
571                }
572                if max_iterations == 0 {
573                    yield Err(SynapticError::Graph(
574                        "max iterations (100) exceeded — possible infinite loop".to_string(),
575                    ));
576                    return;
577                }
578                max_iterations -= 1;
579
580                // Check interrupt_before
581                if self.interrupt_before.contains(&current_node) {
582                    if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
583                        match make_checkpoint(&state, Some(current_node.clone()), &current_node) {
584                            Ok(checkpoint) => {
585                                if let Err(e) = checkpointer.put(cfg, &checkpoint).await {
586                                    yield Err(e);
587                                    return;
588                                }
589                            }
590                            Err(e) => {
591                                yield Err(e);
592                                return;
593                            }
594                        }
595                    }
596                    yield Err(SynapticError::Graph(format!(
597                        "interrupted before node '{current_node}'"
598                    )));
599                    return;
600                }
601
602                // Snapshot state before node execution (for Updates mode diff)
603                let state_before = state.clone();
604
605                // Execute node
606                let node = match self.nodes.get(&current_node) {
607                    Some(n) => n,
608                    None => {
609                        yield Err(SynapticError::Graph(format!("node '{current_node}' not found")));
610                        return;
611                    }
612                };
613
614                let output = match node.process(state.clone()).await {
615                    Ok(o) => o,
616                    Err(e) => {
617                        yield Err(e);
618                        return;
619                    }
620                };
621
622                // Handle the node output
623                let mut interrupt_val = None;
624                let next_override = match output {
625                    NodeOutput::State(new_state) => {
626                        state = new_state;
627                        None
628                    }
629                    NodeOutput::Command(cmd) => {
630                        if let Some(update) = cmd.update {
631                            state.merge(update);
632                        }
633
634                        if let Some(iv) = cmd.interrupt_value {
635                            interrupt_val = Some(iv);
636                            None
637                        } else {
638                            match cmd.goto {
639                                Some(CommandGoto::One(target)) => Some(target),
640                                Some(CommandGoto::Many(_)) => Some(END.to_string()),
641                                None => None,
642                            }
643                        }
644                    }
645                };
646
647                // Yield events for each requested mode
648                for mode in &modes {
649                    let event = match mode {
650                        StreamMode::Values | StreamMode::Debug | StreamMode::Custom => {
651                            // Full state after node execution
652                            GraphEvent {
653                                node: current_node.clone(),
654                                state: state.clone(),
655                            }
656                        }
657                        StreamMode::Updates => {
658                            // State before node (the "delta" is the difference)
659                            // For Updates, we yield the pre-node state so callers
660                            // can diff against the full Values event
661                            GraphEvent {
662                                node: current_node.clone(),
663                                state: state_before.clone(),
664                            }
665                        }
666                        StreamMode::Messages => {
667                            // Same as Values — callers filter for AI messages
668                            GraphEvent {
669                                node: current_node.clone(),
670                                state: state.clone(),
671                            }
672                        }
673                    };
674                    yield Ok(MultiGraphEvent {
675                        mode: *mode,
676                        event,
677                    });
678                }
679
680                // Check for interrupt from Command
681                if let Some(iv) = interrupt_val {
682                    if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
683                        let next = self.find_next_node(&current_node, &state);
684                        match make_checkpoint(&state, Some(next), &current_node) {
685                            Ok(checkpoint) => {
686                                if let Err(e) = checkpointer.put(cfg, &checkpoint).await {
687                                    yield Err(e);
688                                    return;
689                                }
690                            }
691                            Err(e) => {
692                                yield Err(e);
693                                return;
694                            }
695                        }
696                    }
697                    yield Err(SynapticError::Graph(format!(
698                        "interrupted by node '{current_node}': {iv}"
699                    )));
700                    return;
701                }
702
703                let next = if let Some(target) = next_override {
704                    target
705                } else {
706                    // Check interrupt_after (only when no command override)
707                    if self.interrupt_after.contains(&current_node) {
708                        let next = self.find_next_node(&current_node, &state);
709                        if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
710                            match make_checkpoint(&state, Some(next), &current_node) {
711                                Ok(checkpoint) => {
712                                    if let Err(e) = checkpointer.put(cfg, &checkpoint).await {
713                                        yield Err(e);
714                                        return;
715                                    }
716                                }
717                                Err(e) => {
718                                    yield Err(e);
719                                    return;
720                                }
721                            }
722                        }
723                        yield Err(SynapticError::Graph(format!(
724                            "interrupted after node '{current_node}'"
725                        )));
726                        return;
727                    }
728
729                    // Find next node via normal edge routing
730                    self.find_next_node(&current_node, &state)
731                };
732
733                // Save checkpoint
734                if let (Some(ref checkpointer), Some(ref cfg)) = (&self.checkpointer, &config) {
735                    match make_checkpoint(&state, Some(next.clone()), &current_node) {
736                        Ok(checkpoint) => {
737                            if let Err(e) = checkpointer.put(cfg, &checkpoint).await {
738                                yield Err(e);
739                                return;
740                            }
741                        }
742                        Err(e) => {
743                            yield Err(e);
744                            return;
745                        }
746                    }
747                }
748
749                current_node = next;
750            }
751        })
752    }
753
754    /// Update state on an interrupted graph (for human-in-the-loop).
755    pub async fn update_state(
756        &self,
757        config: &CheckpointConfig,
758        update: S,
759    ) -> Result<(), SynapticError>
760    where
761        S: serde::Serialize + serde::de::DeserializeOwned,
762    {
763        let checkpointer = self
764            .checkpointer
765            .as_ref()
766            .ok_or_else(|| SynapticError::Graph("no checkpointer configured".to_string()))?;
767
768        let checkpoint = checkpointer
769            .get(config)
770            .await?
771            .ok_or_else(|| SynapticError::Graph("no checkpoint found".to_string()))?;
772
773        let mut current_state: S = serde_json::from_value(checkpoint.state)
774            .map_err(|e| SynapticError::Graph(format!("deserialize: {e}")))?;
775
776        current_state.merge(update);
777
778        let updated = Checkpoint::new(
779            serde_json::to_value(&current_state)
780                .map_err(|e| SynapticError::Graph(format!("serialize: {e}")))?,
781            checkpoint.next_node,
782        )
783        .with_metadata("source", serde_json::json!("update_state"));
784        checkpointer.put(config, &updated).await?;
785
786        Ok(())
787    }
788
789    /// Get the current state for a thread from the checkpointer.
790    ///
791    /// Returns `None` if no checkpoint exists for the given thread.
792    pub async fn get_state(&self, config: &CheckpointConfig) -> Result<Option<S>, SynapticError>
793    where
794        S: serde::de::DeserializeOwned,
795    {
796        let checkpointer = self
797            .checkpointer
798            .as_ref()
799            .ok_or_else(|| SynapticError::Graph("no checkpointer configured".to_string()))?;
800
801        match checkpointer.get(config).await? {
802            Some(checkpoint) => {
803                let state: S = serde_json::from_value(checkpoint.state).map_err(|e| {
804                    SynapticError::Graph(format!("failed to deserialize checkpoint state: {e}"))
805                })?;
806                Ok(Some(state))
807            }
808            None => Ok(None),
809        }
810    }
811
812    /// Get the state history for a thread (all checkpoints).
813    ///
814    /// Returns a list of `(state, next_node)` pairs, ordered from oldest to newest.
815    pub async fn get_state_history(
816        &self,
817        config: &CheckpointConfig,
818    ) -> Result<Vec<(S, Option<String>)>, SynapticError>
819    where
820        S: serde::de::DeserializeOwned,
821    {
822        let checkpointer = self
823            .checkpointer
824            .as_ref()
825            .ok_or_else(|| SynapticError::Graph("no checkpointer configured".to_string()))?;
826
827        let checkpoints = checkpointer.list(config).await?;
828        let mut history = Vec::with_capacity(checkpoints.len());
829
830        for checkpoint in checkpoints {
831            let state: S = serde_json::from_value(checkpoint.state).map_err(|e| {
832                SynapticError::Graph(format!("failed to deserialize checkpoint state: {e}"))
833            })?;
834            history.push((state, checkpoint.next_node));
835        }
836
837        Ok(history)
838    }
839
840    /// Execute a node, using cache if a CachePolicy is set for it.
841    async fn execute_with_cache(
842        &self,
843        node_name: &str,
844        node: &dyn Node<S>,
845        state: S,
846    ) -> Result<NodeOutput<S>, SynapticError>
847    where
848        S: serde::Serialize,
849    {
850        let policy = self.cache_policies.get(node_name);
851        if policy.is_none() {
852            return node.process(state).await;
853        }
854        let policy = policy.unwrap();
855
856        // Compute state hash for cache key
857        let state_val = serde_json::to_value(&state)
858            .map_err(|e| SynapticError::Graph(format!("cache: serialize state: {e}")))?;
859        let key = hash_state(&state_val);
860
861        // Check cache
862        {
863            let cache = self.cache.read().await;
864            if let Some(node_cache) = cache.get(node_name) {
865                if let Some(entry) = node_cache.get(&key) {
866                    if entry.is_valid() {
867                        return Ok(entry.output.clone());
868                    }
869                }
870            }
871        }
872
873        // Cache miss — execute the node
874        let output = node.process(state).await?;
875
876        // Store in cache
877        {
878            let mut cache = self.cache.write().await;
879            let node_cache = cache.entry(node_name.to_string()).or_default();
880            node_cache.insert(
881                key,
882                CachedEntry {
883                    output: output.clone(),
884                    created: Instant::now(),
885                    ttl: policy.ttl,
886                },
887            );
888        }
889
890        Ok(output)
891    }
892
893    /// Returns true if the given node is deferred (waits for all incoming paths).
894    pub fn is_deferred(&self, node_name: &str) -> bool {
895        self.deferred.contains(node_name)
896    }
897
898    /// Returns the number of incoming edges (fixed + conditional) for a node.
899    pub fn incoming_edge_count(&self, node_name: &str) -> usize {
900        let fixed = self.edges.iter().filter(|e| e.target == node_name).count();
901        // Conditional edges may route to this node but we can't statically count them,
902        // so we count the path_map entries that reference this node.
903        let conditional = self
904            .conditional_edges
905            .iter()
906            .filter_map(|ce| ce.path_map.as_ref())
907            .flat_map(|pm| pm.values())
908            .filter(|target| *target == node_name)
909            .count();
910        fixed + conditional
911    }
912
913    fn find_next_node(&self, current: &str, state: &S) -> String {
914        // Check conditional edges first
915        for ce in &self.conditional_edges {
916            if ce.source == current {
917                return (ce.router)(state);
918            }
919        }
920
921        // Check fixed edges
922        for edge in &self.edges {
923            if edge.source == current {
924                return edge.target.clone();
925            }
926        }
927
928        // No outgoing edge means END
929        END.to_string()
930    }
931}