use anyhow::Result;
use std::sync::Arc;
use tokio::sync::mpsc;
use super::StreamProcessor;
use super::helpers::{
SynthesizeFinalResponseParams, build_artifacts_from_results, synthesize_final_response,
};
use crate::models::AgentRuntimeInfo;
use crate::services::a2a_server::processing::message::{ProcessMessageStreamParams, StreamEvent};
use crate::services::a2a_server::processing::strategies::{
ExecutionContext, ExecutionStrategySelector,
};
use systemprompt_identifiers::AgentName;
use systemprompt_models::{AiMessage, MessageRole, RequestContext};
impl StreamProcessor {
    /// Handles one inbound A2A message as a streaming exchange.
    ///
    /// Loads conversation history inline, then spawns a background task that
    /// builds the AI message list, runs the selected execution strategy,
    /// converts tool results into artifacts, synthesizes a final response,
    /// and emits [`StreamEvent`]s on the returned channel.
    ///
    /// Failures inside the spawned task are surfaced to the consumer as
    /// `StreamEvent::Error` events rather than through the return value.
    ///
    /// # Errors
    ///
    /// Currently never returns `Err`; the `Result` signature leaves room for
    /// fallible setup before the task is spawned.
    pub async fn process_message_stream(
        &self,
        params: ProcessMessageStreamParams<'_>,
    ) -> Result<mpsc::UnboundedReceiver<StreamEvent>> {
        let ProcessMessageStreamParams {
            a2a_message,
            agent_runtime,
            agent_name,
            context,
            task_id,
        } = params;
        let (tx, rx) = mpsc::unbounded_channel();
        // Clone/own everything the spawned task captures: `tokio::spawn`
        // requires a 'static future, so nothing may borrow from `self` or
        // from the incoming params.
        let ai_service = Arc::clone(&self.ai_service);
        let agent_runtime = agent_runtime.clone();
        let agent_name_string = agent_name.to_string();
        let agent_name_typed = AgentName::new(agent_name);
        let (user_text, user_parts) = Self::extract_message_content(a2a_message);
        let context_id = &a2a_message.context_id;
        // History load failures are non-fatal: warn and continue with an
        // empty history rather than failing the whole request.
        let conversation_history = self
            .context_service
            .load_conversation_history(context_id)
            .await
            .unwrap_or_else(|e| {
                tracing::warn!(error = %e, context_id = %context_id, "Failed to load conversation history");
                vec![]
            });
        tracing::info!(
            context_id = %context_id,
            history_count = conversation_history.len(),
            "Loaded historical messages for context"
        );
        let context_id_for_artifacts = context_id.clone();
        let context_id_owned = context_id.clone();
        let task_id_for_artifacts = task_id.clone();
        let request_ctx = context
            .clone()
            .with_task_id(task_id.clone())
            .with_context_id(context_id.clone());
        let skill_service = Arc::clone(&self.skill_service);
        let execution_step_repo = Arc::clone(&self.execution_step_repo);
        // All remaining work runs off the request path; progress and results
        // stream back through `tx` while the caller consumes `rx`.
        tokio::spawn(async move {
            tracing::info!(
                agent_name = %agent_name_string,
                history_count = conversation_history.len(),
                "Processing streaming message for agent"
            );
            let ai_messages = build_ai_messages(BuildAiMessagesParams {
                agent_runtime: &agent_runtime,
                conversation_history,
                user_text,
                user_parts,
                skill_service: &skill_service,
                request_ctx: &request_ctx,
            })
            .await;
            // Kept aside before `ai_messages` is consumed by the strategy,
            // for use in final-response synthesis below.
            let ai_messages_for_synthesis = ai_messages.clone();
            let has_tools = !agent_runtime.mcp_servers.is_empty();
            tracing::info!(
                mcp_server_count = agent_runtime.mcp_servers.len(),
                has_tools = has_tools,
                "Agent MCP server status"
            );
            let ai_service_for_builder = Arc::clone(&ai_service);
            // Strategy choice depends solely on whether the agent has any
            // MCP servers (i.e. tools) available.
            let strategy = ExecutionStrategySelector::select_strategy(has_tools);
            let execution_context = ExecutionContext {
                ai_service: Arc::clone(&ai_service),
                skill_service: Arc::clone(&skill_service),
                agent_runtime: agent_runtime.clone(),
                agent_name: agent_name_typed.clone(),
                task_id: task_id.clone(),
                context_id: context_id_owned,
                tx: tx.clone(),
                request_ctx: request_ctx.clone(),
                execution_step_repo: Arc::clone(&execution_step_repo),
            };
            let execution_result = match strategy
                .execute(execution_context, ai_messages.clone())
                .await
            {
                Ok(result) => result,
                Err(e) => {
                    tracing::error!(error = %e, "Execution failed");
                    // Mark any steps still recorded as in-progress as failed
                    // so persisted state doesn't show them dangling.
                    let tracking = crate::services::ExecutionTrackingService::new(Arc::clone(
                        &execution_step_repo,
                    ));
                    if let Err(fail_err) = tracking
                        .fail_in_progress_steps(&task_id, &e.to_string())
                        .await
                    {
                        tracing::error!(error = %fail_err, "Failed to mark steps as failed");
                    }
                    // A send failure only means the consumer dropped `rx`;
                    // log at trace and stop.
                    if let Err(send_err) =
                        tx.send(StreamEvent::Error(format!("Execution failed: {e}")))
                    {
                        tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
                    }
                    return;
                },
            };
            // `iterations` is intentionally unused here; it is part of the
            // strategy result but not needed for artifact/response building.
            let (accumulated_text, tool_calls, tool_results, tools, _iterations) = (
                execution_result.accumulated_text,
                execution_result.tool_calls,
                execution_result.tool_results,
                execution_result.tools,
                execution_result.iterations,
            );
            tracing::info!(
                text_len = accumulated_text.len(),
                tool_call_count = tool_calls.len(),
                tool_result_count = tool_results.len(),
                "Processing complete"
            );
            // Convert tool outputs into A2A artifacts; failure aborts the
            // stream with an Error event.
            let artifacts = match build_artifacts_from_results(
                &tool_results,
                &tool_calls,
                &tools,
                &context_id_for_artifacts,
                &task_id_for_artifacts,
            ) {
                Ok(artifacts) => artifacts,
                Err(e) => {
                    tracing::error!(error = %e, "Failed to build artifacts from tool results");
                    if let Err(send_err) =
                        tx.send(StreamEvent::Error(format!("Artifact building failed: {e}")))
                    {
                        tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
                    }
                    return;
                },
            };
            // Produce the final user-facing text from the accumulated output,
            // tool activity, and artifacts.
            let final_text = synthesize_final_response(SynthesizeFinalResponseParams {
                tool_calls: &tool_calls,
                tool_results: &tool_results,
                artifacts: &artifacts,
                accumulated_text: &accumulated_text,
                ai_service: ai_service_for_builder,
                agent_runtime: &agent_runtime,
                ai_messages_for_synthesis,
                tx: tx.clone(),
                request_ctx: request_ctx.clone(),
                skill_service: Arc::clone(&skill_service),
            })
            .await;
            tracing::info!(artifact_count = artifacts.len(), "Sending Complete event");
            for (idx, artifact) in artifacts.iter().enumerate() {
                tracing::info!(
                    artifact_index = idx + 1,
                    total_artifacts = artifacts.len(),
                    artifact_id = %artifact.id,
                    "Complete artifact"
                );
            }
            // Terminal event: full text plus all artifacts in one message.
            let send_result = tx.send(StreamEvent::Complete {
                full_text: final_text,
                artifacts: artifacts.clone(),
            });
            if send_result.is_err() {
                tracing::error!("Failed to send Complete event, channel closed");
            } else {
                tracing::info!(artifact_count = artifacts.len(), "Sent Complete event");
            }
        });
        Ok(rx)
    }
}
/// Borrowed and owned inputs for [`build_ai_messages`].
struct BuildAiMessagesParams<'a> {
    /// Runtime configuration of the agent (skills list and system prompt are
    /// read by the builder).
    agent_runtime: &'a AgentRuntimeInfo,
    /// Previously exchanged messages for this context, appended after the
    /// system messages.
    conversation_history: Vec<AiMessage>,
    /// Plain-text content of the new user message.
    user_text: String,
    /// Structured content parts accompanying the new user message.
    user_parts: Vec<systemprompt_models::AiContentPart>,
    /// Service used to load the content of each skill the agent declares.
    skill_service: &'a Arc<crate::services::SkillService>,
    /// Request-scoped context forwarded to skill loading.
    request_ctx: &'a RequestContext,
}
/// Assembles the ordered message list for a model call.
///
/// Messages are pushed in this order:
/// 1. A system message containing loaded skill content (only when the agent
///    declares skills).
/// 2. The agent's own system prompt (only when present).
/// 3. The prior conversation history.
/// 4. The new user message built from the incoming text and parts.
async fn build_ai_messages(params: BuildAiMessagesParams<'_>) -> Vec<AiMessage> {
    let BuildAiMessagesParams {
        agent_runtime,
        conversation_history,
        user_text,
        user_parts,
        skill_service,
        request_ctx,
    } = params;

    let mut messages: Vec<AiMessage> = Vec::new();

    if !agent_runtime.skills.is_empty() {
        tracing::info!(
            skill_count = agent_runtime.skills.len(),
            skills = ?agent_runtime.skills,
            "Loading skills for agent"
        );

        let mut prompt = String::from(
            "# Your Skills\n\nYou have the following skills that define your capabilities and \
             writing style:\n\n",
        );

        for skill_id in &agent_runtime.skills {
            let typed_id = systemprompt_identifiers::SkillId::new(skill_id);
            // Best effort: a skill that fails to load is logged and skipped
            // rather than aborting the whole message build.
            let loaded = skill_service.load_skill(&typed_id, request_ctx).await;
            match loaded {
                Ok(skill_content) => {
                    tracing::info!(
                        skill_id = %skill_id,
                        content_len = skill_content.len(),
                        "Loaded skill"
                    );
                    let section =
                        format!("## {} Skill\n\n{}\n\n---\n\n", skill_id, skill_content);
                    prompt.push_str(&section);
                },
                Err(e) => {
                    tracing::warn!(skill_id = %skill_id, error = %e, "Failed to load skill");
                },
            }
        }

        messages.push(AiMessage {
            role: MessageRole::System,
            content: prompt,
            parts: Vec::new(),
        });
        tracing::info!("Skills injected into agent context");
    }

    if let Some(system_prompt) = &agent_runtime.system_prompt {
        messages.push(AiMessage {
            role: MessageRole::System,
            content: system_prompt.clone(),
            parts: Vec::new(),
        });
    }

    messages.extend(conversation_history);
    messages.push(AiMessage {
        role: MessageRole::User,
        content: user_text,
        parts: user_parts,
    });

    messages
}