weavegraph 0.7.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
//! Event stream lifecycle helpers for workflow execution.
use crate::event_bus::{Event, EventBus, INVOCATION_END_SCOPE, STREAM_END_SCOPE};

/// Internal reason a workflow's event stream is being closed.
///
/// Each variant carries the data that `format_message` renders into the
/// `key=value` diagnostic text emitted when a stream or invocation ends.
///
/// The standard traits are derived so values can be debug-printed in logs and
/// compared directly in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum StreamEndReason {
    /// Workflow ran to completion.
    Completed {
        /// Final step number.
        step: u64,
    },
    /// Workflow halted due to an error.
    Error {
        /// Step at which the error occurred, if known.
        step: Option<u64>,
        /// Human-readable error description.
        error: String,
    },
}

impl StreamEndReason {
    /// Render this reason as a single-line `key=value` message for `session_id`.
    ///
    /// The `step=` field is left out of the error form when the failing step
    /// is not known.
    fn format_message(&self, session_id: &str) -> String {
        match self {
            Self::Completed { step } => {
                format!("session={session_id} status=completed step={step}")
            }
            Self::Error {
                step: Some(failed_step),
                error,
            } => {
                format!("session={session_id} status=error step={failed_step} error={error}")
            }
            Self::Error { step: None, error } => {
                format!("session={session_id} status=error error={error}")
            }
        }
    }
}

/// Announce the end of a workflow's event stream and, if needed, close the channel.
///
/// A diagnostic event under `STREAM_END_SCOPE` is emitted with `reason`
/// rendered for `session_id`. A failed emit is logged at debug level and is
/// not propagated to the caller.
///
/// The channel is closed only when `*event_stream_taken` is `true`; the flag
/// is then reset to `false`. When the flag is already `false` it is left
/// untouched and the channel stays open.
pub(crate) fn finalize_event_stream(
    event_bus: &EventBus,
    session_id: &str,
    reason: StreamEndReason,
    event_stream_taken: &mut bool,
) {
    let summary = reason.format_message(session_id);
    // The event takes its own copy; `summary` is still needed for the failure log.
    let end_event = Event::diagnostic(STREAM_END_SCOPE, summary.clone());

    if let Err(emit_failure) = event_bus.get_emitter().emit(end_event) {
        tracing::debug!(
            session = %session_id,
            scope = STREAM_END_SCOPE,
            completion_message = %summary,
            error = ?emit_failure,
            "failed to emit stream termination event"
        );
    }

    // Nothing to close unless the stream was handed out.
    if !*event_stream_taken {
        return;
    }
    event_bus.close_channel();
    *event_stream_taken = false;
}

/// Publish an invocation completion marker, leaving the event channel open.
///
/// The marker is a diagnostic event under `INVOCATION_END_SCOPE` whose text is
/// `reason` rendered for `session_id`. A failed emit is logged at debug level
/// and is not propagated to the caller.
pub(crate) fn emit_invocation_end(event_bus: &EventBus, session_id: &str, reason: StreamEndReason) {
    let summary = reason.format_message(session_id);
    // The event takes its own copy; `summary` is still needed for the failure log.
    let marker = Event::diagnostic(INVOCATION_END_SCOPE, summary.clone());

    if let Err(emit_failure) = event_bus.get_emitter().emit(marker) {
        tracing::debug!(
            session = %session_id,
            scope = INVOCATION_END_SCOPE,
            completion_message = %summary,
            error = ?emit_failure,
            "failed to emit invocation completion event"
        );
    }
}