weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
//! [`RuntimeObserver`] trait and associated metadata types for workflow telemetry.
//!
//! Provides opt-in hooks at key execution boundaries: invocation start/finish,
//! per-node completion, checkpoint operations, and event-bus emissions. Every
//! method defaults to a no-op; implement only the hooks you need.
//!
//! ## Usage
//!
//! ```rust,no_run
//! use std::sync::Arc;
//! use weavegraph::runtimes::{AppRunner, observer::{RuntimeObserver, NodeFinishMeta}};
//! use weavegraph::app::App;
//!
//! #[derive(Debug)]
//! struct CountingObserver {
//!     count: std::sync::atomic::AtomicU64,
//! }
//!
//! impl RuntimeObserver for CountingObserver {
//!     fn on_node_finish(&self, meta: &NodeFinishMeta<'_>) {
//!         self.count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
//!     }
//! }
//!
//! # async fn example(app: App) {
//! let observer = Arc::new(CountingObserver { count: Default::default() });
//! let runner = AppRunner::builder()
//!     .app(app)
//!     .observer(observer)
//!     .build()
//!     .await;
//! # }
//! ```
//!
//! ## Performance
//!
//! Hooks are called **synchronously** on the execution thread and must be fast
//! and non-blocking. Panics are caught by the runner, which emits a
//! `tracing::warn!` and continues — an observer failure never aborts the graph.
//!
//! ## Per-node timing in 0.6.0
//!
//! `step_duration_ms` in [`NodeFinishMeta`] is the wall-clock duration of the
//! **entire superstep**, shared by every node that ran in the same parallel step.
//! Per-node timing requires scheduler-level instrumentation and is planned for a
//! future release.

use std::fmt;
use std::panic::RefUnwindSafe;

use crate::types::NodeKind;

/// How a workflow invocation ended, as reported to observers.
///
/// Delivered through [`InvocationFinishMeta::outcome`]. The enum is
/// `#[non_exhaustive]`, so `match` expressions outside this crate need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum InvocationOutcome {
    /// Success: the invocation ran all the way to completion.
    Completed,
    /// Failure: a runtime error ended the invocation.
    Error,
}

/// How a single node's execution ended within a superstep.
///
/// Delivered through [`NodeFinishMeta::outcome`]. The enum is
/// `#[non_exhaustive]`, so `match` expressions outside this crate need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NodeOutcome {
    /// Success: the node ran and produced a `NodePartial`.
    Completed,
    /// Failure: the node returned a fatal `NodeError`.
    Error,
    /// Not run: the node was version-gated or is the terminal `End` node.
    Skipped,
}

/// Borrowed context passed to [`RuntimeObserver::on_invocation_start`].
///
/// The struct is `#[non_exhaustive]`: code outside this crate can read the
/// fields but cannot build a value with a struct literal.
#[non_exhaustive]
#[derive(Debug)]
pub struct InvocationStartMeta<'a> {
    /// Identifier of the session this invocation belongs to.
    pub session_id: &'a str,
    /// Stable fingerprint of the compiled graph definition, as computed by
    /// [`App::graph_definition_hash`](crate::app::App::graph_definition_hash).
    pub graph_id: &'a str,
}

/// Borrowed context passed to [`RuntimeObserver::on_invocation_finish`].
///
/// The struct is `#[non_exhaustive]`: code outside this crate can read the
/// fields but cannot build a value with a struct literal.
#[non_exhaustive]
#[derive(Debug)]
pub struct InvocationFinishMeta<'a> {
    /// Identifier of the session this invocation belonged to.
    pub session_id: &'a str,
    /// Stable fingerprint of the compiled graph definition.
    pub graph_id: &'a str,
    /// Wall-clock time the whole invocation took, in milliseconds.
    pub duration_ms: u64,
    /// Whether the invocation completed or ended with an error.
    pub outcome: InvocationOutcome,
}

/// Borrowed context passed to [`RuntimeObserver::on_node_finish`].
///
/// Timing here is per superstep, not per node; the module docs explain why in
/// the [per-node timing note](self#per-node-timing-in-060).
///
/// The struct is `#[non_exhaustive]`: code outside this crate can read the
/// fields but cannot build a value with a struct literal.
#[non_exhaustive]
#[derive(Debug)]
pub struct NodeFinishMeta<'a> {
    /// Which node finished.
    pub node_kind: &'a NodeKind,
    /// Identifier of the session the node ran in.
    pub session_id: &'a str,
    /// Number of the step in which this node executed.
    pub step: u64,
    /// Wall-clock duration of the enclosing superstep in milliseconds. Every
    /// node that ran in the same step reports this same value.
    pub step_duration_ms: u64,
    /// How this node's execution ended.
    pub outcome: NodeOutcome,
}

/// Borrowed context passed to [`RuntimeObserver::on_checkpoint_load`].
///
/// The struct is `#[non_exhaustive]`: code outside this crate can read the
/// fields but cannot build a value with a struct literal.
#[non_exhaustive]
#[derive(Debug)]
pub struct CheckpointLoadMeta<'a> {
    /// Identifier of the session whose checkpoint was loaded.
    pub session_id: &'a str,
    /// Name of the checkpoint backend, for example `"sqlite"`, `"postgres"`,
    /// or `"in-memory"`.
    pub backend: &'a str,
    /// Step number that was read back from the checkpoint.
    pub step: u64,
}

/// Borrowed context passed to [`RuntimeObserver::on_checkpoint_save`].
///
/// The struct is `#[non_exhaustive]`: code outside this crate can read the
/// fields but cannot build a value with a struct literal.
#[non_exhaustive]
#[derive(Debug)]
pub struct CheckpointSaveMeta<'a> {
    /// Identifier of the session whose checkpoint was saved.
    pub session_id: &'a str,
    /// Name of the checkpoint backend, for example `"sqlite"`, `"postgres"`,
    /// or `"in-memory"`.
    pub backend: &'a str,
    /// Step number that was written to the checkpoint.
    pub step: u64,
    /// Wall-clock time the save operation took, in milliseconds.
    pub duration_ms: u64,
}

/// Borrowed context passed to [`RuntimeObserver::on_event_bus_emit`].
///
/// The struct is `#[non_exhaustive]`: code outside this crate can read the
/// field but cannot build a value with a struct literal.
#[non_exhaustive]
#[derive(Debug)]
pub struct EventBusEmitMeta<'a> {
    /// Scope label attached to the emitted event, for example `"features"` or
    /// `"__weavegraph_stream_end__"`.
    pub scope: &'a str,
}

/// Telemetry hooks the runtime invokes at key execution boundaries.
///
/// Attach an implementation via
/// [`AppRunnerBuilder::observer`](crate::runtimes::runner::AppRunnerBuilder::observer).
/// Each hook has an empty default body, so an implementor overrides only the
/// events it cares about.
///
/// ## Contract
///
/// * `Send + Sync + 'static` — the runner shares observers across async tasks.
/// * [`RefUnwindSafe`] — lets the runner catch a panic raised inside a hook
///   without wrapping every callsite in `AssertUnwindSafe`. A panicking hook
///   results in a `tracing::warn!` and execution continues.
/// * [`fmt::Debug`] — required of every implementor.
pub trait RuntimeObserver: fmt::Debug + Send + Sync + RefUnwindSafe + 'static {
    /// Hook fired immediately before a workflow invocation begins.
    fn on_invocation_start(&self, _meta: &InvocationStartMeta<'_>) {}

    /// Hook fired once a workflow invocation has finished, whether it
    /// succeeded or ended with an error.
    fn on_invocation_finish(&self, _meta: &InvocationFinishMeta<'_>) {}

    /// Hook fired once for each node, after the superstep that contains it
    /// has completed.
    fn on_node_finish(&self, _meta: &NodeFinishMeta<'_>) {}

    /// Hook fired after a checkpoint has been loaded successfully during
    /// session resumption.
    fn on_checkpoint_load(&self, _meta: &CheckpointLoadMeta<'_>) {}

    /// Hook fired after a checkpoint has been saved successfully.
    fn on_checkpoint_save(&self, _meta: &CheckpointSaveMeta<'_>) {}

    /// Hook fired after every event emitted through the event bus.
    fn on_event_bus_emit(&self, _meta: &EventBusEmitMeta<'_>) {}
}