pe_core/node.rs
1//! Node traits — the unit of execution within a graph.
2//!
3//! Nodes are where the magic happens. Edges are plumbing.
4//! Based on Decisions 4, 11, 16 and Group 31 of the pre-plan.
5//!
6//! ## Execution context
7//!
8//! Every node receives a [`NodeContext`] alongside the state snapshot.
9//! This provides engine-computed, read-only metadata: step count,
10//! remaining steps, activation reason, and an extensible metadata map.
11//!
12//! Nodes that don't need context simply ignore the parameter.
13//! Nodes that care about convergence use it to decide when to wrap up.
14//!
15//! ## Optional matrix layer
16//!
17//! The graph engine works fully without the matrix. When enabled, the
18//! optional matrix layer observes execution — it lives between nodes,
19//! never inside them:
20//!
21//! - Nodes can optionally return [`ConvergenceSignal`] instead of plain updates
22//! - Without the matrix: `ConvergenceSignal` degrades to a normal `Update`
23//! - With the matrix: signals are observed to learn better routing paths
24//! - [`ActivationReason`] provides observability either way (debugging,
25//! streaming, HITL visibility) — the matrix just uses it as extra training data
26//!
27//! Nodes stay pure computation units regardless of whether the matrix is active.
28
29use crate::error::PeError;
30use crate::lobe::LobeRuntimeServiceFactory;
31use crate::phase_store::PhaseStateStore;
32use crate::state::{State, StateUpdate};
33use serde::{Deserialize, Serialize};
34use std::any::Any;
35use std::collections::HashMap;
36use std::future::Future;
37use std::pin::Pin;
38use std::sync::Arc;
39
40/// Type-erased async future returned by nodes
41pub type NodeFuture<U> = Pin<Box<dyn Future<Output = NodeResult<U>> + Send>>;
42
43/// The core node trait — any async function that takes state and returns a result.
44/// Generated by the `#[node]` macro. Users never implement this manually.
45///
46/// Generic over S: State — works with any user-defined state struct.
47///
48/// # Context parameter
49///
50/// Every call receives a [`NodeContext`] with execution metadata.
51/// Simple nodes ignore it. Smart nodes use it to adapt behavior:
52///
53/// ```ignore
54/// fn call(&self, state: &MyState, ctx: &NodeContext) -> NodeFuture<MyUpdate> {
55/// Box::pin(async move {
56/// if ctx.is_last_step() {
57/// // No time for tool calls — give best answer now
58/// return NodeResult::Update(wrap_up(state));
59/// }
60/// // Normal execution...
61/// })
62/// }
63/// ```
64pub trait NodeFn<S: State>: Send + Sync {
65 /// Execute the node with a state snapshot and execution context.
66 ///
67 /// `ctx` provides engine-computed metadata (step, remaining steps,
68 /// activation reason). Read-only — the engine owns this data.
69 fn call(&self, state: &S, ctx: &NodeContext) -> NodeFuture<S::Update>;
70
71 /// Human-readable name for logging and debugging
72 fn name(&self) -> &str;
73}
74
75/// Execution context injected by the engine into every node call.
76///
77/// Read-only from the node's perspective. The engine computes these
78/// values before each superstep. Nodes use this to make informed
79/// decisions about execution strategy (e.g., wrap up on last step).
80///
81/// ## Extensibility
82///
83/// The `metadata` field is the extension point for the runtime layer:
84/// - **pe-runtime** (Plan 007): `agent_id`, `thread_id`, budget info
85/// - **Custom layers**: any `String → serde_json::Value` pair
86///
87/// ## Layering
88///
89/// `NodeContext` is for engine + runtime metadata only. The optional
90/// matrix layer does NOT inject values here — it operates between nodes
91/// (guiding routing), not inside them. Nodes never see matrix state.
92/// This keeps nodes as pure computation units with or without the matrix.
93#[derive(Clone)]
94pub struct NodeContext {
95 /// Current superstep number (1-indexed, increments each BSP cycle).
96 pub step: u32,
97
98 /// Maximum supersteps before `PeError::GraphRecursion`.
99 pub recursion_limit: u32,
100
101 /// Name of the node being executed.
102 pub node_name: String,
103
104 /// Why this node was activated in this superstep.
105 pub activation: ActivationReason,
106
107 /// Extensible metadata from higher layers (runtime, custom).
108 ///
109 /// pe-runtime (Plan 007) populates this with:
110 /// - `"agent_id"`: which agent owns this execution
111 /// - `"thread_id"`: conversation thread identifier
112 /// - `"budget_remaining"`: token/cost budget info
113 ///
114 /// Nodes read these opportunistically — if a key is missing, the
115 /// runtime layer isn't active. Graceful degradation.
116 pub metadata: HashMap<String, serde_json::Value>,
117
118 /// Phase state store for the interrupt/resume system.
119 ///
120 /// Populated by the Pregel engine from checkpoint data on resume.
121 /// Nodes using the `node!` DSL read their current phase from here.
122 /// Empty on initial runs (no checkpoint). The engine preserves
123 /// this across interrupt/resume boundaries via checkpoint serialization.
124 pub phase_store: PhaseStateStore,
125
126 /// Type-erased stream sender for the streaming layer (Plan 011).
127 ///
128 /// When streaming is active, pe-runtime injects an
129 /// `Arc<mpsc::Sender<StreamEvent>>` here. Nodes access it via
130 /// pe-runtime's `StreamWriter` helper — never directly.
131 ///
132 /// `None` when streaming is not active (the common non-streaming path).
133 pub stream_sender: Option<Arc<dyn Any + Send + Sync>>,
134
135 /// Optional tool observer for streaming tool call lifecycle events.
136 ///
137 /// When streaming is active, pe-runtime injects a `StreamingToolObserver`
138 /// that emits `ToolCallStarted`/`ToolCallCompleted`/`ToolCallFailed`.
139 /// pe-tools extracts this and calls it around each tool execution.
140 ///
141 /// `None` when streaming is not active.
142 pub tool_observer: Option<Arc<dyn ToolObserver>>,
143
144 /// Optional factory for creating runtime-owned services for lobes.
145 ///
146 /// When present, `LobeNode` can build source-attributed service handles
147 /// for the concrete lobe being executed.
148 pub lobe_runtime_service_factory: Option<Arc<dyn LobeRuntimeServiceFactory>>,
149}
150
151impl std::fmt::Debug for NodeContext {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 f.debug_struct("NodeContext")
154 .field("step", &self.step)
155 .field("recursion_limit", &self.recursion_limit)
156 .field("node_name", &self.node_name)
157 .field("activation", &self.activation)
158 .field("has_stream_sender", &self.stream_sender.is_some())
159 .field("has_tool_observer", &self.tool_observer.is_some())
160 .field(
161 "has_lobe_runtime_service_factory",
162 &self.lobe_runtime_service_factory.is_some(),
163 )
164 .finish()
165 }
166}
167
168impl NodeContext {
169 /// How many supersteps remain before the recursion limit.
170 pub fn remaining_steps(&self) -> u32 {
171 self.recursion_limit.saturating_sub(self.step)
172 }
173
174 /// Whether this is the final superstep before recursion limit.
175 /// Nodes should wrap up and produce a final answer.
176 pub fn is_last_step(&self) -> bool {
177 self.step >= self.recursion_limit
178 }
179}
180
/// Hooks into the lifecycle of each node execution.
///
/// pe-graph invokes these around every node run. pe-runtime ships an
/// implementation that turns them into `StreamEvent`s. This is the extension
/// point for phase-aware visibility. When no observer is installed, the engine
/// skips these calls, so there is no overhead.
///
/// # Why boxed futures
///
/// Each hook returns `Pin<Box<dyn Future + Send>>` instead of being an
/// `async fn`, which keeps the trait usable as a trait object. Keep
/// implementations cheap, ideally a single channel send.
pub trait NodeObserver: Send + Sync {
    /// Invoked just before `node_name` begins executing in superstep `step`.
    fn on_node_start(
        &self,
        node_name: &str,
        step: u32,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Invoked after `node_name` finishes successfully in superstep `step`,
    /// together with a `duration`.
    fn on_node_complete(
        &self,
        node_name: &str,
        step: u32,
        duration: std::time::Duration,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Invoked when `node_name` fails in superstep `step`; `error` is the
    /// failure message.
    fn on_node_error(
        &self,
        node_name: &str,
        step: u32,
        error: &str,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Invoked before the engine retries a failed node.
    ///
    /// `attempt` and `max_attempts` describe the upcoming retry. Implementing
    /// this hook is optional: the default does nothing and completes
    /// immediately.
    fn on_node_retry(
        &self,
        _node_name: &str,
        _step: u32,
        _attempt: u32,
        _max_attempts: u32,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(std::future::ready(()))
    }
}
229
/// Observer for tool call lifecycle events.
///
/// pe-tools calls these methods around each tool execution. pe-runtime
/// provides a streaming implementation that converts them to `StreamEvent`.
///
/// It is injected into [`NodeContext`] alongside the stream sender, through the
/// public `NodeContext::tool_observer` field (an `Option<Arc<dyn ToolObserver>>`),
/// and pe-tools reads it from that field. Because it is stored as a trait
/// object, every hook returns a boxed future rather than being an `async fn`.
pub trait ToolObserver: Send + Sync {
    /// Called before `tool_name` starts executing, with a summary of its input.
    fn on_tool_start(
        &self,
        tool_name: &str,
        input_summary: &str,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Called after `tool_name` completes successfully, with a `duration`.
    fn on_tool_complete(
        &self,
        tool_name: &str,
        duration: std::time::Duration,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Called when `tool_name` fails; `error` is the failure message.
    fn on_tool_error(
        &self,
        tool_name: &str,
        error: &str,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
}
259
260/// Why a node was activated in a particular superstep.
261///
262/// Tracks the causal chain of execution. Useful standalone:
263/// - **Debugging**: trace execution flow through the graph
264/// - **HITL**: show humans WHY a node will run ("activated by edge from chat")
265/// - **Streaming**: emit activation events as they happen
266///
267/// The optional matrix layer also uses activation patterns as training data
268/// to learn which routing paths lead to convergence.
269#[derive(Debug, Clone, Serialize, Deserialize)]
270#[non_exhaustive]
271pub enum ActivationReason {
272 /// First nodes activated from START edges.
273 EntryPoint,
274
275 /// Activated by a fixed edge from a completed node.
276 Edge {
277 /// The node whose completion triggered this activation.
278 from: String,
279 },
280
281 /// Activated by a conditional edge (router selected this node).
282 ConditionalEdge {
283 /// The node whose router selected this activation.
284 from: String,
285 },
286
287 /// Resumed after an interrupt (human-in-the-loop).
288 Resume,
289
290 /// Re-executed after a retryable failure.
291 Retry {
292 /// Which retry attempt this is (1-based).
293 attempt: u32,
294 },
295}
296
297/// What a node can return — unified enum covering all execution outcomes.
298/// Every variant works standalone. `Converge` adds optional matrix data
299/// but degrades to `Update` automatically when the matrix layer isn't active.
300#[derive(Debug, Clone)]
301#[non_exhaustive]
302pub enum NodeResult<U: StateUpdate> {
303 /// Normal completion — apply this update to state
304 Update(U),
305
306 /// Pause for human input — checkpoint and wait
307 Interrupt(InterruptRequest<U>),
308
309 /// Unrecoverable error — propagate to caller
310 Error(PeError),
311
312 /// Optional convergence signal — carries quality/surprise metadata.
313 /// Without the matrix layer, degrades to Update(signal.partial_update).
314 /// With the matrix layer, the extra metadata guides learned routing.
315 Converge(ConvergenceSignal<U>),
316}
317
318impl<U: StateUpdate> NodeResult<U> {
319 /// Strip optional convergence metadata — Converge becomes plain Update.
320 /// Called automatically by the engine when the matrix layer is not active.
321 pub fn into_standard(self) -> NodeResult<U> {
322 match self {
323 Self::Converge(signal) => NodeResult::Update(signal.partial_update),
324 other => other,
325 }
326 }
327
328 /// Check if this result is an interrupt
329 pub fn is_interrupt(&self) -> bool {
330 matches!(self, NodeResult::Interrupt(_))
331 }
332
333 /// Check if this result is an error
334 pub fn is_error(&self) -> bool {
335 matches!(self, NodeResult::Error(_))
336 }
337}
338
339/// Request to pause execution for human input
340#[derive(Debug, Clone, Serialize, Deserialize)]
341pub struct InterruptRequest<U: StateUpdate> {
342 /// Why we're pausing — shown to the human
343 pub reason: String,
344
345 /// Partial update to apply BEFORE checkpointing (saves expensive work).
346 ///
347 /// NOTE: `#[serde(skip)]` is intentional — the partial update is applied
348 /// to state before the checkpoint is saved. On resume, the state already contains
349 /// this update. The field exists only to carry the update from the node to the
350 /// engine within a single execution, not across serialization boundaries.
351 #[serde(skip)]
352 pub partial_update: Option<U>,
353
354 /// Identifies which interrupt point this is (for resume matching)
355 pub resume_point: String,
356}
357
358/// Optional convergence signal — nodes can report quality metadata.
359/// Without the matrix layer, only `partial_update` is used (applied as a normal update).
360/// With the matrix layer, `actual_contribution`, `surprise`, and `quality` guide routing.
361#[derive(Debug, Clone, Serialize, Deserialize)]
362pub struct ConvergenceSignal<U: StateUpdate> {
363 /// How much this node contributed to task completion (0.0 - 1.0)
364 pub actual_contribution: f64,
365
366 /// How unexpected the result was (0.0 = expected, 1.0 = surprise)
367 pub surprise: f64,
368
369 /// Output quality estimate for C value update
370 pub quality: f64,
371
372 /// The state update to apply. Always used, regardless of matrix mode.
373 ///
374 /// NOTE: `#[serde(skip)]` — same rationale as `InterruptRequest::partial_update`.
375 /// Applied to state during the engine's UPDATE phase, already in the checkpoint.
376 #[serde(skip)]
377 pub partial_update: U,
378}
379
380/// Human input received after an interrupt
381#[derive(Debug, Clone, Serialize, Deserialize)]
382pub struct HumanInput {
383 pub approved: bool,
384 #[serde(default)]
385 pub feedback: Option<String>,
386 #[serde(default)]
387 pub data: Option<serde_json::Value>,
388}