weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
//! Configuration types for the runtime event bus, sinks, and diagnostics.
use std::sync::Arc;

use crate::event_bus::{EventBus, EventSink, MemorySink, StdOutSink};
use crate::utils::clock::Clock;

use super::Checkpointer;

/// Configuration for a single [`AppRunner`](crate::runtimes::runner::AppRunner).
///
/// `Debug` is implemented by hand for this type: the checkpointer and clock
/// are rendered only as "is set" booleans rather than as values.
#[derive(Clone)]
pub struct RuntimeConfig {
    /// Session ID used for persistence; a UUID is generated when `None`.
    pub session_id: Option<String>,
    /// Custom checkpointer; takes precedence over the built-in SQLite/Postgres backends.
    pub checkpointer_custom: Option<Arc<dyn Checkpointer>>,
    /// SQLite database file. Falls back to `SQLITE_DB_NAME` env var, then `weavegraph.db`.
    ///
    /// [`RuntimeConfig::new`] and [`Default`] apply that fallback eagerly, so the
    /// field is `Some` after either constructor; it can only be `None` when it
    /// is assigned directly.
    pub sqlite_db_name: Option<String>,
    /// Event bus settings applied when building the [`EventBus`].
    pub event_bus: EventBusConfig,
    /// Clock injected into node execution contexts.
    ///
    /// `None` means no clock was configured (reported as `"unset"` by
    /// [`RuntimeConfig::clock_mode`]).
    pub clock: Option<Arc<dyn Clock>>,
}

impl std::fmt::Debug for RuntimeConfig {
    /// Render the configuration for diagnostics.
    ///
    /// The checkpointer and clock are trait objects, so only a boolean
    /// "is set" flag is printed for each of them.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_checkpointer = self.checkpointer_custom.is_some();
        let has_clock = self.clock.is_some();

        let mut out = f.debug_struct("RuntimeConfig");
        out.field("session_id", &self.session_id);
        out.field("checkpointer_custom", &has_checkpointer);
        out.field("sqlite_db_name", &self.sqlite_db_name);
        out.field("event_bus", &self.event_bus);
        out.field("clock", &has_clock);
        out.finish()
    }
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            session_id: None,
            checkpointer_custom: None,
            sqlite_db_name: Self::resolve_db_name(None),
            event_bus: EventBusConfig::default(),
            clock: None,
        }
    }
}

impl RuntimeConfig {
    /// Resolve the SQLite database name.
    ///
    /// An explicit `name` is returned untouched. Otherwise `.env` is loaded on a
    /// best-effort basis and the `SQLITE_DB_NAME` environment variable is used,
    /// with `weavegraph.db` as the final fallback. The result is always `Some`.
    fn resolve_db_name(name: Option<String>) -> Option<String> {
        if name.is_some() {
            return name;
        }
        // A missing or unreadable `.env` file is deliberately ignored.
        dotenvy::dotenv().ok();
        let db_name =
            std::env::var("SQLITE_DB_NAME").unwrap_or_else(|_| "weavegraph.db".to_string());
        Some(db_name)
    }

    /// Create a `RuntimeConfig` from a session ID and a SQLite database file name.
    ///
    /// A missing `sqlite_db_name` is resolved through the environment fallback;
    /// the event bus starts with its default settings and neither a custom
    /// checkpointer nor a clock is attached.
    pub fn new(session_id: Option<String>, sqlite_db_name: Option<String>) -> Self {
        let sqlite_db_name = Self::resolve_db_name(sqlite_db_name);
        Self {
            session_id,
            sqlite_db_name,
            checkpointer_custom: None,
            clock: None,
            event_bus: EventBusConfig::default(),
        }
    }

    /// Builder step: install a custom [`Checkpointer`].
    #[must_use]
    pub fn checkpointer_custom(self, checkpointer: Arc<dyn Checkpointer>) -> Self {
        Self {
            checkpointer_custom: Some(checkpointer),
            ..self
        }
    }

    /// The custom checkpointer, if one was installed (a new `Arc` handle).
    #[must_use]
    pub fn custom_checkpointer(&self) -> Option<Arc<dyn Checkpointer>> {
        self.checkpointer_custom.clone()
    }

    /// Builder step: install the clock injected into
    /// [`NodeContext`](crate::node::NodeContext).
    #[must_use]
    pub fn with_clock(self, clock: Arc<dyn Clock>) -> Self {
        Self {
            clock: Some(clock),
            ..self
        }
    }

    /// The configured clock, if any (a new `Arc` handle).
    #[must_use]
    pub fn clock(&self) -> Option<Arc<dyn Clock>> {
        self.clock.clone()
    }

    /// Label for the clock setting: `"configured"` when a clock is present,
    /// `"unset"` otherwise.
    #[must_use]
    pub fn clock_mode(&self) -> &'static str {
        match self.clock {
            Some(_) => "configured",
            None => "unset",
        }
    }

    /// A deterministic hex fingerprint of this configuration's metadata.
    ///
    /// The fingerprint covers a version tag, the session ID, the database
    /// name, whether a custom checkpointer is present, the clock mode, and the
    /// event bus metadata signature, in that order. Absent optional strings
    /// contribute an empty value.
    #[must_use]
    pub fn config_hash(&self) -> String {
        let session = self.session_id.as_deref().unwrap_or("");
        let db_name = self.sqlite_db_name.as_deref().unwrap_or("");
        let has_checkpointer = self.checkpointer_custom.is_some();

        let mut parts = vec![
            "weavegraph-runtime-config-v1".to_string(),
            format!("session_id:{session}"),
            format!("sqlite_db_name:{db_name}"),
            format!("custom_checkpointer:{has_checkpointer}"),
            format!("clock:{}", self.clock_mode()),
        ];
        parts.extend(self.event_bus.metadata_signature());
        fnv1a_hex(&parts)
    }

    /// Builder step: swap in a different event bus configuration.
    #[must_use]
    pub fn with_event_bus(self, event_bus: EventBusConfig) -> Self {
        Self { event_bus, ..self }
    }

    /// Builder step: route events to stdout only.
    #[must_use]
    pub fn with_stdout_event_bus(self) -> Self {
        let bus = EventBusConfig::with_stdout_only();
        self.with_event_bus(bus)
    }

    /// Builder step: capture events in memory (silent; useful in tests).
    #[must_use]
    pub fn with_memory_event_bus(self) -> Self {
        let bus = EventBusConfig::with_memory_sink();
        self.with_event_bus(bus)
    }
}

/// Hash `parts` with 64-bit FNV-1a and render the digest as 16 lowercase hex digits.
///
/// A `0xff` byte is mixed in after every part, so part boundaries feed into
/// the digest. An empty slice yields the FNV offset basis.
fn fnv1a_hex(parts: &[String]) -> String {
    const OFFSET: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    const SEPARATOR: u8 = 0xff;

    let digest = parts
        .iter()
        .flat_map(|part| part.bytes().chain(std::iter::once(SEPARATOR)))
        .fold(OFFSET, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(PRIME)
        });
    format!("{digest:016x}")
}

/// Sink target for an [`EventBusConfig`] entry.
///
/// Each variant is turned into a concrete sink by
/// [`EventBusConfig::build_event_bus`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SinkConfig {
    /// Write events to standard output (built as a `StdOutSink`).
    StdOut,
    /// Capture events in memory (built as a `MemorySink`).
    Memory,
}

/// Settings for the [`EventBus`] used by a runtime.
#[derive(Clone, Debug)]
pub struct EventBusConfig {
    /// Broadcast channel capacity; events are dropped when the buffer fills.
    ///
    /// [`EventBusConfig::new`] replaces `0` with `DEFAULT_BUFFER_CAPACITY`; a
    /// value written directly to this field is used as-is.
    pub buffer_capacity: usize,
    /// Ordered list of sinks that receive events.
    ///
    /// When empty, `build_event_bus` falls back to a single stdout sink.
    pub sinks: Vec<SinkConfig>,
    // Diagnostics channel settings; private, replaced through `with_diagnostics`.
    diagnostics: DiagnosticsConfig,
}

impl EventBusConfig {
    /// Default broadcast channel capacity.
    pub const DEFAULT_BUFFER_CAPACITY: usize = 1024;

    /// Build an `EventBusConfig` with the given capacity and sink list.
    ///
    /// A zero capacity is silently promoted to [`DEFAULT_BUFFER_CAPACITY`](Self::DEFAULT_BUFFER_CAPACITY).
    /// The default diagnostics settings are derived from the capacity *after*
    /// that promotion, so the diagnostics channel starts out the same size as
    /// the event channel instead of being pinned to the raw `0` input.
    #[must_use]
    pub fn new(buffer_capacity: usize, sinks: Vec<SinkConfig>) -> Self {
        // Normalise once so both channels agree on the effective capacity.
        let buffer_capacity = if buffer_capacity == 0 {
            Self::DEFAULT_BUFFER_CAPACITY
        } else {
            buffer_capacity
        };
        Self {
            buffer_capacity,
            sinks,
            diagnostics: DiagnosticsConfig::default_with_capacity(buffer_capacity),
        }
    }

    /// A single stdout sink at the default capacity.
    #[must_use]
    pub fn with_stdout_only() -> Self {
        Self::new(Self::DEFAULT_BUFFER_CAPACITY, vec![SinkConfig::StdOut])
    }

    /// A single in-memory sink at the default capacity (no stdout output).
    #[must_use]
    pub fn with_memory_sink() -> Self {
        Self::new(Self::DEFAULT_BUFFER_CAPACITY, vec![SinkConfig::Memory])
    }

    /// Append `sink` unless it is already present.
    ///
    /// The existing order of the sink list is preserved.
    #[must_use]
    pub fn add_sink(mut self, sink: SinkConfig) -> Self {
        if !self.sinks.contains(&sink) {
            self.sinks.push(sink);
        }
        self
    }

    /// Broadcast channel capacity.
    pub fn buffer_capacity(&self) -> usize {
        self.buffer_capacity
    }

    /// Configured sink list.
    pub fn sinks(&self) -> &[SinkConfig] {
        &self.sinks
    }

    /// Deterministic metadata entries describing this configuration.
    ///
    /// The entries are, in order: the buffer capacity, one entry per sink
    /// (tagged with its position in the list), and the diagnostics entries.
    #[must_use]
    pub fn metadata_signature(&self) -> Vec<String> {
        let mut parts = vec![format!("event_buffer:{}", self.buffer_capacity)];
        parts.extend(
            self.sinks
                .iter()
                .enumerate()
                .map(|(i, s)| format!("event_sink:{i}:{s:?}")),
        );
        parts.extend(self.diagnostics.metadata_signature());
        parts
    }

    /// Override the diagnostics configuration.
    ///
    /// If `diagnostics` carries no explicit capacity, it is filled in from this
    /// configuration's current `buffer_capacity`.
    #[must_use]
    pub fn with_diagnostics(mut self, diagnostics: DiagnosticsConfig) -> Self {
        self.diagnostics = diagnostics.with_default_capacity(self.buffer_capacity);
        self
    }

    /// Construct and return the configured [`EventBus`].
    ///
    /// An empty sink list falls back to a single stdout sink; otherwise one
    /// sink is created per [`SinkConfig`] entry, in list order.
    #[must_use]
    pub fn build_event_bus(&self) -> EventBus {
        let sinks: Vec<Box<dyn EventSink>> = if self.sinks.is_empty() {
            vec![Box::new(StdOutSink::default())]
        } else {
            self.sinks
                .iter()
                .map(|s| match s {
                    SinkConfig::StdOut => Box::new(StdOutSink::default()) as Box<dyn EventSink>,
                    SinkConfig::Memory => Box::new(MemorySink::new()) as Box<dyn EventSink>,
                })
                .collect()
        };
        EventBus::with_capacity_and_diag(
            sinks,
            self.buffer_capacity,
            self.diagnostics.effective_capacity(self.buffer_capacity),
            self.diagnostics.enabled,
            self.diagnostics.emit_to_events,
        )
    }
}

impl Default for EventBusConfig {
    fn default() -> Self {
        Self::with_stdout_only()
    }
}

/// Settings for the diagnostics broadcast channel (sink health reporting).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DiagnosticsConfig {
    /// Whether sink diagnostics are enabled.
    pub enabled: bool,
    /// Channel capacity; falls back to the event bus capacity when `None`.
    ///
    /// The value is clamped to at least 1 when it is read through
    /// [`DiagnosticsConfig::effective_capacity`].
    pub buffer_capacity: Option<usize>,
    /// Forward diagnostics events into the main event stream.
    pub emit_to_events: bool,
}

impl DiagnosticsConfig {
    /// Clamp a capacity so that it is never zero.
    fn min_one(n: usize) -> usize {
        n.max(1)
    }

    /// Default settings tied to a specific event bus capacity.
    ///
    /// Diagnostics are enabled, are not forwarded into the event stream, and
    /// use `event_bus_capacity` (at least 1) as their channel capacity.
    pub fn default_with_capacity(event_bus_capacity: usize) -> Self {
        Self {
            enabled: true,
            buffer_capacity: Some(Self::min_one(event_bus_capacity)),
            emit_to_events: false,
        }
    }

    /// Set `buffer_capacity` from `event_bus_capacity` if it is not already provided.
    ///
    /// An explicit capacity is left untouched; a derived one is at least 1.
    pub fn with_default_capacity(mut self, event_bus_capacity: usize) -> Self {
        self.buffer_capacity
            .get_or_insert_with(|| Self::min_one(event_bus_capacity));
        self
    }

    /// Effective channel capacity, falling back to `event_bus_capacity`.
    ///
    /// The result is always at least 1. The clamp covers an explicitly
    /// configured `Some(0)` as well as a zero fallback, because
    /// `buffer_capacity` is a public field and can be set to zero directly.
    pub fn effective_capacity(&self, event_bus_capacity: usize) -> usize {
        Self::min_one(self.buffer_capacity.unwrap_or(event_bus_capacity))
    }

    /// Deterministic metadata entries for these settings.
    ///
    /// An unset capacity is rendered as an empty value.
    fn metadata_signature(&self) -> Vec<String> {
        vec![
            format!("diagnostics_enabled:{}", self.enabled),
            format!(
                "diagnostics_capacity:{}",
                self.buffer_capacity
                    .map_or_else(String::new, |c| c.to_string())
            ),
            format!("diagnostics_emit_to_events:{}", self.emit_to_events),
        ]
    }
}

impl Default for DiagnosticsConfig {
    /// Diagnostics enabled, no explicit capacity, not forwarded into the
    /// event stream.
    fn default() -> Self {
        Self {
            enabled: true,
            buffer_capacity: None,
            emit_to_events: false,
        }
    }
}