Skip to main content

pe_core/
node.rs

1//! Node traits — the unit of execution within a graph.
2//!
3//! Nodes are where the magic happens. Edges are plumbing.
4//! Based on Decisions 4, 11, 16 and Group 31 of the pre-plan.
5//!
6//! ## Execution context
7//!
8//! Every node receives a [`NodeContext`] alongside the state snapshot.
9//! This provides engine-computed, read-only metadata: step count,
10//! remaining steps, activation reason, and an extensible metadata map.
11//!
12//! Nodes that don't need context simply ignore the parameter.
13//! Nodes that care about convergence use it to decide when to wrap up.
14//!
15//! ## Optional matrix layer
16//!
17//! The graph engine works fully without the matrix. When enabled, the
18//! optional matrix layer observes execution — it lives between nodes,
19//! never inside them:
20//!
21//! - Nodes can optionally return [`ConvergenceSignal`] instead of plain updates
22//! - Without the matrix: `ConvergenceSignal` degrades to a normal `Update`
23//! - With the matrix: signals are observed to learn better routing paths
24//! - [`ActivationReason`] provides observability either way (debugging,
25//!   streaming, HITL visibility) — the matrix just uses it as extra training data
26//!
27//! Nodes stay pure computation units regardless of whether the matrix is active.
28
29use crate::error::PeError;
30use crate::lobe::LobeRuntimeServiceFactory;
31use crate::phase_store::PhaseStateStore;
32use crate::state::{State, StateUpdate};
33use serde::{Deserialize, Serialize};
34use std::any::Any;
35use std::collections::HashMap;
36use std::future::Future;
37use std::pin::Pin;
38use std::sync::Arc;
39
40/// Type-erased async future returned by nodes
41pub type NodeFuture<U> = Pin<Box<dyn Future<Output = NodeResult<U>> + Send>>;
42
43/// The core node trait — any async function that takes state and returns a result.
44/// Generated by the `#[node]` macro. Users never implement this manually.
45///
46/// Generic over S: State — works with any user-defined state struct.
47///
48/// # Context parameter
49///
50/// Every call receives a [`NodeContext`] with execution metadata.
51/// Simple nodes ignore it. Smart nodes use it to adapt behavior:
52///
53/// ```ignore
54/// fn call(&self, state: &MyState, ctx: &NodeContext) -> NodeFuture<MyUpdate> {
55///     Box::pin(async move {
56///         if ctx.is_last_step() {
57///             // No time for tool calls — give best answer now
58///             return NodeResult::Update(wrap_up(state));
59///         }
60///         // Normal execution...
61///     })
62/// }
63/// ```
64pub trait NodeFn<S: State>: Send + Sync {
65    /// Execute the node with a state snapshot and execution context.
66    ///
67    /// `ctx` provides engine-computed metadata (step, remaining steps,
68    /// activation reason). Read-only — the engine owns this data.
69    fn call(&self, state: &S, ctx: &NodeContext) -> NodeFuture<S::Update>;
70
71    /// Human-readable name for logging and debugging
72    fn name(&self) -> &str;
73}
74
75/// Execution context injected by the engine into every node call.
76///
77/// Read-only from the node's perspective. The engine computes these
78/// values before each superstep. Nodes use this to make informed
79/// decisions about execution strategy (e.g., wrap up on last step).
80///
81/// ## Extensibility
82///
83/// The `metadata` field is the extension point for the runtime layer:
84/// - **pe-runtime** (Plan 007): `agent_id`, `thread_id`, budget info
85/// - **Custom layers**: any `String → serde_json::Value` pair
86///
87/// ## Layering
88///
89/// `NodeContext` is for engine + runtime metadata only. The optional
90/// matrix layer does NOT inject values here — it operates between nodes
91/// (guiding routing), not inside them. Nodes never see matrix state.
92/// This keeps nodes as pure computation units with or without the matrix.
93#[derive(Clone)]
94pub struct NodeContext {
95    /// Current superstep number (1-indexed, increments each BSP cycle).
96    pub step: u32,
97
98    /// Maximum supersteps before `PeError::GraphRecursion`.
99    pub recursion_limit: u32,
100
101    /// Name of the node being executed.
102    pub node_name: String,
103
104    /// Why this node was activated in this superstep.
105    pub activation: ActivationReason,
106
107    /// Extensible metadata from higher layers (runtime, custom).
108    ///
109    /// pe-runtime (Plan 007) populates this with:
110    /// - `"agent_id"`: which agent owns this execution
111    /// - `"thread_id"`: conversation thread identifier
112    /// - `"budget_remaining"`: token/cost budget info
113    ///
114    /// Nodes read these opportunistically — if a key is missing, the
115    /// runtime layer isn't active. Graceful degradation.
116    pub metadata: HashMap<String, serde_json::Value>,
117
118    /// Phase state store for the interrupt/resume system.
119    ///
120    /// Populated by the Pregel engine from checkpoint data on resume.
121    /// Nodes using the `node!` DSL read their current phase from here.
122    /// Empty on initial runs (no checkpoint). The engine preserves
123    /// this across interrupt/resume boundaries via checkpoint serialization.
124    pub phase_store: PhaseStateStore,
125
126    /// Type-erased stream sender for the streaming layer (Plan 011).
127    ///
128    /// When streaming is active, pe-runtime injects an
129    /// `Arc<mpsc::Sender<StreamEvent>>` here. Nodes access it via
130    /// pe-runtime's `StreamWriter` helper — never directly.
131    ///
132    /// `None` when streaming is not active (the common non-streaming path).
133    pub stream_sender: Option<Arc<dyn Any + Send + Sync>>,
134
135    /// Optional tool observer for streaming tool call lifecycle events.
136    ///
137    /// When streaming is active, pe-runtime injects a `StreamingToolObserver`
138    /// that emits `ToolCallStarted`/`ToolCallCompleted`/`ToolCallFailed`.
139    /// pe-tools extracts this and calls it around each tool execution.
140    ///
141    /// `None` when streaming is not active.
142    pub tool_observer: Option<Arc<dyn ToolObserver>>,
143
144    /// Optional factory for creating runtime-owned services for lobes.
145    ///
146    /// When present, `LobeNode` can build source-attributed service handles
147    /// for the concrete lobe being executed.
148    pub lobe_runtime_service_factory: Option<Arc<dyn LobeRuntimeServiceFactory>>,
149}
150
151impl std::fmt::Debug for NodeContext {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        f.debug_struct("NodeContext")
154            .field("step", &self.step)
155            .field("recursion_limit", &self.recursion_limit)
156            .field("node_name", &self.node_name)
157            .field("activation", &self.activation)
158            .field("has_stream_sender", &self.stream_sender.is_some())
159            .field("has_tool_observer", &self.tool_observer.is_some())
160            .field(
161                "has_lobe_runtime_service_factory",
162                &self.lobe_runtime_service_factory.is_some(),
163            )
164            .finish()
165    }
166}
167
168impl NodeContext {
169    /// How many supersteps remain before the recursion limit.
170    pub fn remaining_steps(&self) -> u32 {
171        self.recursion_limit.saturating_sub(self.step)
172    }
173
174    /// Whether this is the final superstep before recursion limit.
175    /// Nodes should wrap up and produce a final answer.
176    pub fn is_last_step(&self) -> bool {
177        self.step >= self.recursion_limit
178    }
179}
180
/// Hooks invoked by the execution engine around every node run.
///
/// pe-graph drives these callbacks; pe-runtime ships a streaming
/// implementation that turns them into `StreamEvent`s. This is the plug-in
/// point for phase-aware visibility, and when no observer is installed the
/// engine simply skips the calls, so there is no overhead.
///
/// # Why boxed futures
///
/// Each hook hands back a pinned, boxed future instead of being an
/// `async fn`, which keeps the trait usable as `dyn NodeObserver`.
/// Implementations should stay cheap — typically a single channel send.
pub trait NodeObserver: Send + Sync {
    /// Fires right before a node begins executing.
    fn on_node_start(
        &self,
        node_name: &str,
        step: u32,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Fires once a node has finished successfully, with how long it ran.
    fn on_node_complete(
        &self,
        node_name: &str,
        step: u32,
        duration: std::time::Duration,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Fires when a node fails; `error` is a textual description of the failure.
    fn on_node_error(
        &self,
        node_name: &str,
        step: u32,
        error: &str,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Fires before a failed node is retried.
    ///
    /// Optional: the default implementation is an already-completed no-op.
    fn on_node_retry(
        &self,
        _node_name: &str,
        _step: u32,
        _attempt: u32,
        _max_attempts: u32,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(std::future::ready(()))
    }
}
229
/// Observer for tool call lifecycle events.
///
/// pe-tools calls these methods around each tool execution. pe-runtime
/// provides a streaming implementation that converts them to `StreamEvent`.
///
/// Injected into [`NodeContext`] alongside the stream sender; pe-tools reads
/// it from the public `NodeContext::tool_observer` field
/// (`Option<Arc<dyn ToolObserver>>`).
///
/// Methods return boxed futures rather than being `async fn`, which keeps
/// the trait usable as `Arc<dyn ToolObserver>`. Implementations should be
/// lightweight (e.g. a channel send).
pub trait ToolObserver: Send + Sync {
    /// Called before a tool starts executing.
    ///
    /// `input_summary` is a caller-provided summary of the tool input.
    fn on_tool_start(
        &self,
        tool_name: &str,
        input_summary: &str,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Called after a tool completes successfully, with how long it ran.
    fn on_tool_complete(
        &self,
        tool_name: &str,
        duration: std::time::Duration,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Called when a tool fails; `error` is a textual description of the failure.
    fn on_tool_error(
        &self,
        tool_name: &str,
        error: &str,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
}
259
260/// Why a node was activated in a particular superstep.
261///
262/// Tracks the causal chain of execution. Useful standalone:
263/// - **Debugging**: trace execution flow through the graph
264/// - **HITL**: show humans WHY a node will run ("activated by edge from chat")
265/// - **Streaming**: emit activation events as they happen
266///
267/// The optional matrix layer also uses activation patterns as training data
268/// to learn which routing paths lead to convergence.
269#[derive(Debug, Clone, Serialize, Deserialize)]
270#[non_exhaustive]
271pub enum ActivationReason {
272    /// First nodes activated from START edges.
273    EntryPoint,
274
275    /// Activated by a fixed edge from a completed node.
276    Edge {
277        /// The node whose completion triggered this activation.
278        from: String,
279    },
280
281    /// Activated by a conditional edge (router selected this node).
282    ConditionalEdge {
283        /// The node whose router selected this activation.
284        from: String,
285    },
286
287    /// Resumed after an interrupt (human-in-the-loop).
288    Resume,
289
290    /// Re-executed after a retryable failure.
291    Retry {
292        /// Which retry attempt this is (1-based).
293        attempt: u32,
294    },
295}
296
297/// What a node can return — unified enum covering all execution outcomes.
298/// Every variant works standalone. `Converge` adds optional matrix data
299/// but degrades to `Update` automatically when the matrix layer isn't active.
300#[derive(Debug, Clone)]
301#[non_exhaustive]
302pub enum NodeResult<U: StateUpdate> {
303    /// Normal completion — apply this update to state
304    Update(U),
305
306    /// Pause for human input — checkpoint and wait
307    Interrupt(InterruptRequest<U>),
308
309    /// Unrecoverable error — propagate to caller
310    Error(PeError),
311
312    /// Optional convergence signal — carries quality/surprise metadata.
313    /// Without the matrix layer, degrades to Update(signal.partial_update).
314    /// With the matrix layer, the extra metadata guides learned routing.
315    Converge(ConvergenceSignal<U>),
316}
317
318impl<U: StateUpdate> NodeResult<U> {
319    /// Strip optional convergence metadata — Converge becomes plain Update.
320    /// Called automatically by the engine when the matrix layer is not active.
321    pub fn into_standard(self) -> NodeResult<U> {
322        match self {
323            Self::Converge(signal) => NodeResult::Update(signal.partial_update),
324            other => other,
325        }
326    }
327
328    /// Check if this result is an interrupt
329    pub fn is_interrupt(&self) -> bool {
330        matches!(self, NodeResult::Interrupt(_))
331    }
332
333    /// Check if this result is an error
334    pub fn is_error(&self) -> bool {
335        matches!(self, NodeResult::Error(_))
336    }
337}
338
339/// Request to pause execution for human input
340#[derive(Debug, Clone, Serialize, Deserialize)]
341pub struct InterruptRequest<U: StateUpdate> {
342    /// Why we're pausing — shown to the human
343    pub reason: String,
344
345    /// Partial update to apply BEFORE checkpointing (saves expensive work).
346    ///
347    /// NOTE: `#[serde(skip)]` is intentional — the partial update is applied
348    /// to state before the checkpoint is saved. On resume, the state already contains
349    /// this update. The field exists only to carry the update from the node to the
350    /// engine within a single execution, not across serialization boundaries.
351    #[serde(skip)]
352    pub partial_update: Option<U>,
353
354    /// Identifies which interrupt point this is (for resume matching)
355    pub resume_point: String,
356}
357
358/// Optional convergence signal — nodes can report quality metadata.
359/// Without the matrix layer, only `partial_update` is used (applied as a normal update).
360/// With the matrix layer, `actual_contribution`, `surprise`, and `quality` guide routing.
361#[derive(Debug, Clone, Serialize, Deserialize)]
362pub struct ConvergenceSignal<U: StateUpdate> {
363    /// How much this node contributed to task completion (0.0 - 1.0)
364    pub actual_contribution: f64,
365
366    /// How unexpected the result was (0.0 = expected, 1.0 = surprise)
367    pub surprise: f64,
368
369    /// Output quality estimate for C value update
370    pub quality: f64,
371
372    /// The state update to apply. Always used, regardless of matrix mode.
373    ///
374    /// NOTE: `#[serde(skip)]` — same rationale as `InterruptRequest::partial_update`.
375    /// Applied to state during the engine's UPDATE phase, already in the checkpoint.
376    #[serde(skip)]
377    pub partial_update: U,
378}
379
380/// Human input received after an interrupt
381#[derive(Debug, Clone, Serialize, Deserialize)]
382pub struct HumanInput {
383    pub approved: bool,
384    #[serde(default)]
385    pub feedback: Option<String>,
386    #[serde(default)]
387    pub data: Option<serde_json::Value>,
388}