// pe-core 0.1.0
//
// Core types for Potential Expectations — messages, channels, state, traits
// Documentation
//! Node traits — the unit of execution within a graph.
//!
//! Nodes are where the magic happens. Edges are plumbing.
//! Based on Decisions 4, 11, 16 and Group 31 of the pre-plan.
//!
//! ## Execution context
//!
//! Every node receives a [`NodeContext`] alongside the state snapshot.
//! This provides engine-computed, read-only metadata: step count,
//! remaining steps, activation reason, and an extensible metadata map.
//!
//! Nodes that don't need context simply ignore the parameter.
//! Nodes that care about convergence use it to decide when to wrap up.
//!
//! ## Optional matrix layer
//!
//! The graph engine works fully without the matrix. When enabled, the
//! optional matrix layer observes execution — it lives between nodes,
//! never inside them:
//!
//! - Nodes can optionally return [`ConvergenceSignal`] instead of plain updates
//! - Without the matrix: `ConvergenceSignal` degrades to a normal `Update`
//! - With the matrix: signals are observed to learn better routing paths
//! - [`ActivationReason`] provides observability either way (debugging,
//!   streaming, HITL visibility) — the matrix just uses it as extra training data
//!
//! Nodes stay pure computation units regardless of whether the matrix is active.

use crate::error::PeError;
use crate::lobe::LobeRuntimeServiceFactory;
use crate::phase_store::PhaseStateStore;
use crate::state::{State, StateUpdate};
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

/// Type-erased async future returned by nodes.
///
/// A heap-allocated, pinned, `Send` future that resolves to a
/// [`NodeResult`]. The alias has no lifetime parameter, so the boxed trait
/// object is `'static`: the future cannot borrow from the `state` or `ctx`
/// arguments of [`NodeFn::call`] and has to own (copy or clone) whatever it
/// needs from them.
pub type NodeFuture<U> = Pin<Box<dyn Future<Output = NodeResult<U>> + Send>>;

/// The core node trait — any async function that takes state and returns a result.
/// Generated by the `#[node]` macro. Users never implement this manually.
///
/// Generic over S: State — works with any user-defined state struct.
/// Implementors must be `Send + Sync`.
///
/// # Context parameter
///
/// Every call receives a [`NodeContext`] with execution metadata.
/// Simple nodes ignore it. Smart nodes use it to adapt behavior.
///
/// [`NodeFuture`] is a `'static` boxed future, so it cannot hold on to the
/// `state` / `ctx` borrows. Copy or clone what the future needs *before*
/// entering the `async move` block:
///
/// ```ignore
/// fn call(&self, state: &MyState, ctx: &NodeContext) -> NodeFuture<MyUpdate> {
///     // Extract owned data up front; the future must not borrow the arguments.
///     let last_step = ctx.is_last_step();
///     let state = state.clone(); // assumes `MyState: Clone`
///     Box::pin(async move {
///         if last_step {
///             // No time for tool calls — give best answer now
///             return NodeResult::Update(wrap_up(&state));
///         }
///         // Normal execution...
///     })
/// }
/// ```
pub trait NodeFn<S: State>: Send + Sync {
    /// Execute the node with a state snapshot and execution context.
    ///
    /// `ctx` provides engine-computed metadata (step, remaining steps,
    /// activation reason). Read-only — the engine owns this data.
    ///
    /// Returns a [`NodeFuture`] resolving to a [`NodeResult`] over the
    /// state's update type (`S::Update`).
    fn call(&self, state: &S, ctx: &NodeContext) -> NodeFuture<S::Update>;

    /// Human-readable name for logging and debugging
    fn name(&self) -> &str;
}

/// Execution context injected by the engine into every node call.
///
/// Read-only from the node's perspective. The engine computes these
/// values before each superstep. Nodes use this to make informed
/// decisions about execution strategy (e.g., wrap up on last step).
///
/// ## Extensibility
///
/// The `metadata` field is the extension point for the runtime layer:
/// - **pe-runtime** (Plan 007): `agent_id`, `thread_id`, budget info
/// - **Custom layers**: any `String → serde_json::Value` pair
///
/// ## Layering
///
/// `NodeContext` is for engine + runtime metadata only. The optional
/// matrix layer does NOT inject values here — it operates between nodes
/// (guiding routing), not inside them. Nodes never see matrix state.
/// This keeps nodes as pure computation units with or without the matrix.
///
/// ## Trait implementations
///
/// `Clone` is derived: the `Arc` handles (`stream_sender`, `tool_observer`,
/// `lobe_runtime_service_factory`) are shared with the clone, while
/// `node_name` and `metadata` are copied.
///
/// `Debug` is written by hand further down in this file rather than derived:
/// at least `dyn ToolObserver` carries no `Debug` bound, so a derive would
/// not compile.
#[derive(Clone)]
pub struct NodeContext {
    /// Current superstep number (1-indexed, increments each BSP cycle).
    ///
    /// Compared against `recursion_limit` by [`NodeContext::remaining_steps`]
    /// and [`NodeContext::is_last_step`].
    pub step: u32,

    /// Maximum supersteps before `PeError::GraphRecursion`.
    pub recursion_limit: u32,

    /// Name of the node being executed.
    pub node_name: String,

    /// Why this node was activated in this superstep.
    pub activation: ActivationReason,

    /// Extensible metadata from higher layers (runtime, custom).
    ///
    /// pe-runtime (Plan 007) populates this with:
    /// - `"agent_id"`: which agent owns this execution
    /// - `"thread_id"`: conversation thread identifier
    /// - `"budget_remaining"`: token/cost budget info
    ///
    /// Nodes read these opportunistically — if a key is missing, the
    /// runtime layer isn't active. Graceful degradation.
    pub metadata: HashMap<String, serde_json::Value>,

    /// Phase state store for the interrupt/resume system.
    ///
    /// Populated by the Pregel engine from checkpoint data on resume.
    /// Nodes using the `node!` DSL read their current phase from here.
    /// Empty on initial runs (no checkpoint). The engine preserves
    /// this across interrupt/resume boundaries via checkpoint serialization.
    pub phase_store: PhaseStateStore,

    /// Type-erased stream sender for the streaming layer (Plan 011).
    ///
    /// When streaming is active, pe-runtime injects an
    /// `Arc<mpsc::Sender<StreamEvent>>` here. Nodes access it via
    /// pe-runtime's `StreamWriter` helper — never directly.
    ///
    /// Stored as `dyn Any`, so this crate does not name the concrete sender
    /// type; consumers have to downcast to recover it.
    ///
    /// `None` when streaming is not active (the common non-streaming path).
    pub stream_sender: Option<Arc<dyn Any + Send + Sync>>,

    /// Optional tool observer for streaming tool call lifecycle events.
    ///
    /// When streaming is active, pe-runtime injects a `StreamingToolObserver`
    /// that emits `ToolCallStarted`/`ToolCallCompleted`/`ToolCallFailed`.
    /// pe-tools extracts this and calls it around each tool execution.
    ///
    /// `None` when streaming is not active.
    pub tool_observer: Option<Arc<dyn ToolObserver>>,

    /// Optional factory for creating runtime-owned services for lobes.
    ///
    /// When present, `LobeNode` can build source-attributed service handles
    /// for the concrete lobe being executed.
    pub lobe_runtime_service_factory: Option<Arc<dyn LobeRuntimeServiceFactory>>,
}

/// Compact diagnostic formatting for [`NodeContext`].
///
/// Prints the scalar execution fields verbatim, the *keys* of `metadata`
/// (sorted, so output is stable across runs), and presence flags for the
/// type-erased handles. Metadata values and `phase_store` are deliberately
/// left out; the trailing `..` produced by `finish_non_exhaustive` signals
/// that the struct has fields that are not shown.
impl std::fmt::Debug for NodeContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the keys are printed: they show which higher layers populated
        // the context without dumping potentially large JSON payloads.
        // `HashMap` iteration order is unspecified, hence the sort.
        let mut metadata_keys: Vec<&str> = self.metadata.keys().map(String::as_str).collect();
        metadata_keys.sort_unstable();

        f.debug_struct("NodeContext")
            .field("step", &self.step)
            .field("recursion_limit", &self.recursion_limit)
            .field("node_name", &self.node_name)
            .field("activation", &self.activation)
            .field("metadata_keys", &metadata_keys)
            // The handles are trait objects; report presence only.
            .field("has_stream_sender", &self.stream_sender.is_some())
            .field("has_tool_observer", &self.tool_observer.is_some())
            .field(
                "has_lobe_runtime_service_factory",
                &self.lobe_runtime_service_factory.is_some(),
            )
            // Marks the output as partial (`.. }`) instead of pretending the
            // listed fields are the whole struct.
            .finish_non_exhaustive()
    }
}

impl NodeContext {
    /// Number of supersteps left before the recursion limit is reached.
    ///
    /// Computed as `recursion_limit - step`, clamped at zero so a step
    /// counter that has already passed the limit never underflows.
    pub fn remaining_steps(&self) -> u32 {
        let Self {
            step,
            recursion_limit,
            ..
        } = *self;
        recursion_limit.saturating_sub(step)
    }

    /// `true` once no supersteps remain, i.e. `step >= recursion_limit`.
    ///
    /// Nodes should treat this as the signal to wrap up and produce a
    /// final answer.
    pub fn is_last_step(&self) -> bool {
        // With saturating subtraction, "nothing remaining" is exactly
        // `step >= recursion_limit`.
        self.remaining_steps() == 0
    }
}

/// Observer for node lifecycle events within the execution engine.
///
/// pe-graph calls these methods around each node execution. pe-runtime
/// provides a streaming implementation that converts them to `StreamEvent`.
///
/// This is the extension point for phase-aware visibility. Without an
/// observer, no overhead — the engine just skips the calls.
///
/// # Async note
///
/// Returns boxed futures because async trait methods are not yet stable
/// in all contexts. Implementations should be lightweight (just send
/// to a channel).
///
/// # Borrowing
///
/// The `'_` on each returned future is tied to `&self` by lifetime elision.
/// The future may therefore borrow from the observer, but not from the
/// `&str` arguments — copy those (e.g. `to_owned()`) before building the
/// future.
pub trait NodeObserver: Send + Sync {
    /// Called before a node starts executing.
    ///
    /// `node_name` identifies the node, `step` the superstep it runs in.
    fn on_node_start(
        &self,
        node_name: &str,
        step: u32,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Called after a node completes successfully.
    ///
    /// `duration` is the elapsed time reported by the caller for this run.
    fn on_node_complete(
        &self,
        node_name: &str,
        step: u32,
        duration: std::time::Duration,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Called when a node fails with an error.
    ///
    /// `error` is a string rendering of the failure, not a typed error value.
    fn on_node_error(
        &self,
        node_name: &str,
        step: u32,
        error: &str,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Called before a retry attempt for a failed node.
    ///
    /// Optional hook: the default implementation ignores all arguments and
    /// returns an already-complete future, so observers that do not care
    /// about retries need not override it.
    fn on_node_retry(
        &self,
        _node_name: &str,
        _step: u32,
        _attempt: u32,
        _max_attempts: u32,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        // No-op by default.
        Box::pin(async {})
    }
}

/// Observer for tool call lifecycle events.
///
/// pe-tools calls these methods around each tool execution. pe-runtime
/// provides a streaming implementation that converts them to `StreamEvent`.
///
/// Injected into `NodeContext` alongside the stream sender. pe-tools
/// reads it from the public `NodeContext::tool_observer` field, where it is
/// stored as `Option<Arc<dyn ToolObserver>>`.
///
/// # Borrowing
///
/// Methods return boxed futures, which keeps the trait usable as
/// `dyn ToolObserver`. The `'_` lifetime is tied to `&self` by elision, so
/// the future may borrow from the observer but not from the `&str`
/// arguments — copy those before building the future.
pub trait ToolObserver: Send + Sync {
    /// Called before a tool starts executing.
    ///
    /// `input_summary` is a textual summary of the tool input.
    // NOTE(review): how the summary is produced/truncated is decided by the
    // caller and is not visible here — confirm in pe-tools.
    fn on_tool_start(
        &self,
        tool_name: &str,
        input_summary: &str,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Called after a tool completes successfully.
    ///
    /// `duration` is the elapsed time reported by the caller for this call.
    fn on_tool_complete(
        &self,
        tool_name: &str,
        duration: std::time::Duration,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Called when a tool fails with an error.
    ///
    /// `error` is a string rendering of the failure, not a typed error value.
    fn on_tool_error(
        &self,
        tool_name: &str,
        error: &str,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
}

/// Why a node was activated in a particular superstep.
///
/// Tracks the causal chain of execution. Useful standalone:
/// - **Debugging**: trace execution flow through the graph
/// - **HITL**: show humans WHY a node will run ("activated by edge from chat")
/// - **Streaming**: emit activation events as they happen
///
/// The optional matrix layer also uses activation patterns as training data
/// to learn which routing paths lead to convergence.
///
/// Marked `#[non_exhaustive]`: code in other crates must include a wildcard
/// arm when matching, so new reasons can be added without a breaking change.
/// Derives `Serialize`/`Deserialize` with no serde attributes, i.e. serde's
/// default enum representation is used.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ActivationReason {
    /// First nodes activated from START edges.
    EntryPoint,

    /// Activated by a fixed edge from a completed node.
    Edge {
        /// The node whose completion triggered this activation.
        from: String,
    },

    /// Activated by a conditional edge (router selected this node).
    ConditionalEdge {
        /// The node whose router selected this activation.
        from: String,
    },

    /// Resumed after an interrupt (human-in-the-loop).
    Resume,

    /// Re-executed after a retryable failure.
    Retry {
        /// Which retry attempt this is (1-based).
        attempt: u32,
    },
}

/// What a node can return — unified enum covering all execution outcomes.
/// Every variant works standalone. `Converge` adds optional matrix data
/// but degrades to `Update` automatically when the matrix layer isn't active.
///
/// This is the output type of [`NodeFuture`]. Use
/// [`NodeResult::into_standard`] to strip convergence metadata, and
/// [`NodeResult::is_interrupt`] / [`NodeResult::is_error`] for quick
/// variant checks.
///
/// Marked `#[non_exhaustive]`: matches in other crates need a wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum NodeResult<U: StateUpdate> {
    /// Normal completion — apply this update to state
    Update(U),

    /// Pause for human input — checkpoint and wait
    Interrupt(InterruptRequest<U>),

    /// Unrecoverable error — propagate to caller
    Error(PeError),

    /// Optional convergence signal — carries quality/surprise metadata.
    /// Without the matrix layer, degrades to Update(signal.partial_update).
    /// With the matrix layer, the extra metadata guides learned routing.
    Converge(ConvergenceSignal<U>),
}

impl<U: StateUpdate> NodeResult<U> {
    /// Strip optional convergence metadata — Converge becomes plain Update.
    /// Called automatically by the engine when the matrix layer is not active.
    pub fn into_standard(self) -> NodeResult<U> {
        match self {
            Self::Converge(signal) => NodeResult::Update(signal.partial_update),
            other => other,
        }
    }

    /// Check if this result is an interrupt
    pub fn is_interrupt(&self) -> bool {
        matches!(self, NodeResult::Interrupt(_))
    }

    /// Check if this result is an error
    pub fn is_error(&self) -> bool {
        matches!(self, NodeResult::Error(_))
    }
}

/// Request to pause execution for human input.
///
/// Carried by `NodeResult::Interrupt`. Only `reason` and `resume_point` take
/// part in (de)serialization; `partial_update` is skipped (see its field note).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InterruptRequest<U: StateUpdate> {
    /// Why we're pausing — shown to the human
    pub reason: String,

    /// Partial update to apply BEFORE checkpointing (saves expensive work).
    ///
    /// NOTE: `#[serde(skip)]` is intentional — the partial update is applied
    /// to state before the checkpoint is saved. On resume, the state already contains
    /// this update. The field exists only to carry the update from the node to the
    /// engine within a single execution, not across serialization boundaries.
    ///
    /// Consequence of the skip: the field is never written out, and a
    /// deserialized request has it set to its `Default`, i.e. `None`.
    #[serde(skip)]
    pub partial_update: Option<U>,

    /// Identifies which interrupt point this is (for resume matching)
    pub resume_point: String,
}

/// Optional convergence signal — nodes can report quality metadata.
/// Without the matrix layer, only `partial_update` is used (applied as a normal update).
/// With the matrix layer, `actual_contribution`, `surprise`, and `quality` guide routing.
///
/// Carried by `NodeResult::Converge`. The scoring fields are plain `f64`s:
/// the documented ranges are a convention and are not enforced by this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConvergenceSignal<U: StateUpdate> {
    /// How much this node contributed to task completion (0.0 - 1.0)
    pub actual_contribution: f64,

    /// How unexpected the result was (0.0 = expected, 1.0 = surprise)
    pub surprise: f64,

    /// Output quality estimate for C value update
    // NOTE(review): no range is documented for `quality`, unlike the two
    // fields above — confirm the expected scale with the matrix layer.
    pub quality: f64,

    /// The state update to apply. Always used, regardless of matrix mode.
    ///
    /// NOTE: `#[serde(skip)]` — same rationale as `InterruptRequest::partial_update`.
    /// Applied to state during the engine's UPDATE phase, already in the checkpoint.
    ///
    /// Consequence of the skip: the field is never written out, and a
    /// deserialized signal has it filled from the update type's `Default`
    /// value rather than from the serialized data.
    #[serde(skip)]
    pub partial_update: U,
}

/// Human input received after an interrupt.
///
/// `approved` is required when deserializing; `feedback` and `data` are
/// optional and fall back to `None` when absent (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HumanInput {
    /// The human's yes/no decision.
    // NOTE(review): presumably "approve continuing from the interrupt point";
    // what the engine does on `false` is not visible here — confirm.
    pub approved: bool,
    /// Optional free-text feedback; `None` when the key is missing.
    #[serde(default)]
    pub feedback: Option<String>,
    /// Optional arbitrary JSON payload; `None` when the key is missing.
    // NOTE(review): schema is caller-defined as far as this file shows.
    #[serde(default)]
    pub data: Option<serde_json::Value>,
}