systemprompt-agent 0.14.4

Agent-to-Agent (A2A) protocol for systemprompt.io AI governance: streaming, JSON-RPC models, task lifecycle, .well-known discovery, and governed agent orchestration.
Documentation
use systemprompt_models::{AgUiEventBuilder, AgUiMessageRole, RequestContext};

use crate::models::a2a::{Artifact, Message, Task};
use crate::services::a2a_server::processing::message::StreamEvent;
use crate::services::a2a_server::streaming::webhook_client::broadcast_agui_event;
use crate::services::shared::{AgentServiceError, Result};

pub(super) async fn collect_stream_response(
    mut chunk_rx: tokio::sync::mpsc::Receiver<StreamEvent>,
    context: &RequestContext,
) -> Result<(String, Vec<Artifact>)> {
    let mut response_text = String::new();
    let mut tool_artifacts = Vec::new();

    while let Some(event) = chunk_rx.recv().await {
        match event {
            StreamEvent::Text(text) => {
                response_text.push_str(&text);
            },
            StreamEvent::Complete {
                full_text,
                artifacts,
            } => {
                response_text = full_text;
                tool_artifacts = artifacts;
            },
            StreamEvent::Error(error) => {
                let error_event =
                    AgUiEventBuilder::run_error(error.clone(), Some("EXECUTION_ERROR".to_owned()));
                if let Err(e) = broadcast_agui_event(
                    context.user_id(),
                    error_event,
                    context.auth_token().as_str(),
                )
                .await
                {
                    tracing::debug!(error = %e, "Failed to broadcast error event");
                }
                return Err(AgentServiceError::Internal(error.clone()));
            },
            _ => {},
        }
    }

    Ok((response_text, tool_artifacts))
}

pub(super) struct BroadcastAguiLifecycleParams<'a> {
    pub context: &'a RequestContext,
    pub context_id: &'a systemprompt_identifiers::ContextId,
    pub task: &'a Task,
    pub agent_message: &'a Message,
    pub response_text: &'a str,
}

pub(super) async fn broadcast_agui_lifecycle(params: BroadcastAguiLifecycleParams<'_>) {
    let user_id = params.context.user_id();
    let auth_token = params.context.auth_token().as_str();
    let task_id = params.task.id.clone();
    let message_id = params.agent_message.message_id.clone();

    let start_event =
        AgUiEventBuilder::run_started(params.context_id.clone(), task_id.clone(), None);
    if let Err(e) = broadcast_agui_event(user_id, start_event, auth_token).await {
        tracing::debug!(error = %e, "Failed to broadcast run_started event");
    }

    let msg_start =
        AgUiEventBuilder::text_message_start(message_id.to_string(), AgUiMessageRole::Assistant);
    if let Err(e) = broadcast_agui_event(user_id, msg_start, auth_token).await {
        tracing::debug!(error = %e, "Failed to broadcast text_message_start event");
    }

    let msg_content =
        AgUiEventBuilder::text_message_content(message_id.to_string(), params.response_text);
    if let Err(e) = broadcast_agui_event(user_id, msg_content, auth_token).await {
        tracing::debug!(error = %e, "Failed to broadcast text_message_content event");
    }

    let msg_end = AgUiEventBuilder::text_message_end(message_id.to_string());
    if let Err(e) = broadcast_agui_event(user_id, msg_end, auth_token).await {
        tracing::debug!(error = %e, "Failed to broadcast text_message_end event");
    }

    let result = serde_json::json!({
        "text": params.response_text,
        "artifacts": params.task.artifacts,
    });
    let finish_event =
        AgUiEventBuilder::run_finished(params.context_id.clone(), task_id, Some(result));
    if let Err(e) = broadcast_agui_event(user_id, finish_event, auth_token).await {
        tracing::debug!(error = %e, "Failed to broadcast run_finished event");
    }
}