systemprompt_agent/services/a2a_server/processing/message/stream_processor/
processing.rs1use anyhow::Result;
2use std::sync::Arc;
3use tokio::sync::mpsc;
4
5use super::helpers::{build_artifacts_from_results, synthesize_final_response};
6use super::StreamProcessor;
7use crate::models::a2a::Message;
8use crate::models::AgentRuntimeInfo;
9use crate::services::a2a_server::processing::message::StreamEvent;
10use crate::services::a2a_server::processing::strategies::{
11 ExecutionContext, ExecutionStrategySelector,
12};
13use systemprompt_identifiers::{AgentName, TaskId};
14use systemprompt_models::{AiMessage, MessageRole, RequestContext};
15
16impl StreamProcessor {
17 pub async fn process_message_stream(
18 &self,
19 a2a_message: &Message,
20 agent_runtime: &AgentRuntimeInfo,
21 agent_name: &str,
22 context: &RequestContext,
23 task_id: TaskId,
24 ) -> Result<mpsc::UnboundedReceiver<StreamEvent>> {
25 let (tx, rx) = mpsc::unbounded_channel();
26
27 let ai_service = self.ai_service.clone();
28 let agent_runtime = agent_runtime.clone();
29 let agent_name_string = agent_name.to_string();
30 let agent_name_typed = AgentName::new(agent_name);
31 let (user_text, user_parts) = Self::extract_message_content(a2a_message);
32
33 let context_id = &a2a_message.context_id;
34 let conversation_history = self
35 .context_service
36 .load_conversation_history(context_id.as_str())
37 .await
38 .unwrap_or_else(|e| {
39 tracing::warn!(error = %e, context_id = %context_id, "Failed to load conversation history");
40 vec![]
41 });
42
43 tracing::info!(
44 context_id = %context_id,
45 history_count = conversation_history.len(),
46 "Loaded historical messages for context"
47 );
48
49 let context_id_str = context_id.to_string();
50 let context_id_owned = context_id.clone();
51 let task_id_str = task_id.to_string();
52
53 let request_ctx = context
54 .clone()
55 .with_task_id(task_id.clone())
56 .with_context_id(context_id.clone());
57 let skill_service = self.skill_service.clone();
58 let execution_step_repo = self.execution_step_repo.clone();
59
60 tokio::spawn(async move {
61 tracing::info!(
62 agent_name = %agent_name_string,
63 history_count = conversation_history.len(),
64 "Processing streaming message for agent"
65 );
66
67 let ai_messages = build_ai_messages(
68 &agent_runtime,
69 conversation_history,
70 user_text,
71 user_parts,
72 &skill_service,
73 &request_ctx,
74 )
75 .await;
76
77 let ai_messages_for_synthesis = ai_messages.clone();
78
79 let has_tools = !agent_runtime.mcp_servers.is_empty();
80 tracing::info!(
81 mcp_server_count = agent_runtime.mcp_servers.len(),
82 has_tools = has_tools,
83 "Agent MCP server status"
84 );
85
86 let ai_service_for_builder = ai_service.clone();
87
88 let selector = ExecutionStrategySelector::new();
89 let strategy = selector.select_strategy(has_tools);
90
91 let execution_context = ExecutionContext {
92 ai_service: ai_service.clone(),
93 skill_service: skill_service.clone(),
94 agent_runtime: agent_runtime.clone(),
95 agent_name: agent_name_typed.clone(),
96 task_id: task_id.clone(),
97 context_id: context_id_owned,
98 tx: tx.clone(),
99 request_ctx: request_ctx.clone(),
100 execution_step_repo: execution_step_repo.clone(),
101 };
102
103 let execution_result = match strategy
104 .execute(execution_context, ai_messages.clone())
105 .await
106 {
107 Ok(result) => result,
108 Err(e) => {
109 tracing::error!(error = %e, "Execution failed");
110
111 let tracking =
112 crate::services::ExecutionTrackingService::new(execution_step_repo.clone());
113 if let Err(fail_err) = tracking
114 .fail_in_progress_steps(&task_id, &e.to_string())
115 .await
116 {
117 tracing::error!(error = %fail_err, "Failed to mark steps as failed");
118 }
119
120 if let Err(send_err) =
121 tx.send(StreamEvent::Error(format!("Execution failed: {e}")))
122 {
123 tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
124 }
125 return;
126 },
127 };
128
129 let (accumulated_text, tool_calls, tool_results, tools, _iterations) = (
130 execution_result.accumulated_text,
131 execution_result.tool_calls,
132 execution_result.tool_results,
133 execution_result.tools,
134 execution_result.iterations,
135 );
136
137 tracing::info!(
138 text_len = accumulated_text.len(),
139 tool_call_count = tool_calls.len(),
140 tool_result_count = tool_results.len(),
141 "Processing complete"
142 );
143
144 let artifacts = match build_artifacts_from_results(
145 &tool_results,
146 &tool_calls,
147 &tools,
148 &context_id_str,
149 &task_id_str,
150 ) {
151 Ok(artifacts) => artifacts,
152 Err(e) => {
153 tracing::error!(error = %e, "Failed to build artifacts from tool results");
154 if let Err(send_err) =
155 tx.send(StreamEvent::Error(format!("Artifact building failed: {e}")))
156 {
157 tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
158 }
159 return;
160 },
161 };
162
163 let final_text = synthesize_final_response(
164 &tool_calls,
165 &tool_results,
166 &artifacts,
167 &accumulated_text,
168 ai_service_for_builder,
169 &agent_runtime,
170 ai_messages_for_synthesis,
171 tx.clone(),
172 request_ctx.clone(),
173 skill_service.clone(),
174 )
175 .await;
176
177 tracing::info!(artifact_count = artifacts.len(), "Sending Complete event");
178
179 for (idx, artifact) in artifacts.iter().enumerate() {
180 tracing::info!(
181 artifact_index = idx + 1,
182 total_artifacts = artifacts.len(),
183 artifact_id = %artifact.id,
184 "Complete artifact"
185 );
186 }
187
188 let send_result = tx.send(StreamEvent::Complete {
189 full_text: final_text.clone(),
190 artifacts: artifacts.clone(),
191 });
192
193 if send_result.is_err() {
194 tracing::error!("Failed to send Complete event, channel closed");
195 } else {
196 tracing::info!(artifact_count = artifacts.len(), "Sent Complete event");
197 }
198 });
199
200 Ok(rx)
201 }
202}
203
204async fn build_ai_messages(
205 agent_runtime: &AgentRuntimeInfo,
206 conversation_history: Vec<AiMessage>,
207 user_text: String,
208 user_parts: Vec<systemprompt_models::AiContentPart>,
209 skill_service: &Arc<crate::services::SkillService>,
210 request_ctx: &RequestContext,
211) -> Vec<AiMessage> {
212 let mut ai_messages = Vec::new();
213
214 if !agent_runtime.skills.is_empty() {
215 tracing::info!(
216 skill_count = agent_runtime.skills.len(),
217 skills = ?agent_runtime.skills,
218 "Loading skills for agent"
219 );
220
221 let mut skills_prompt = String::from(
222 "# Your Skills\n\nYou have the following skills that define your capabilities and \
223 writing style:\n\n",
224 );
225
226 for skill_id in &agent_runtime.skills {
227 match skill_service.load_skill(skill_id, request_ctx).await {
228 Ok(skill_content) => {
229 tracing::info!(
230 skill_id = %skill_id,
231 content_len = skill_content.len(),
232 "Loaded skill"
233 );
234 skills_prompt.push_str(&format!(
235 "## {} Skill\n\n{}\n\n---\n\n",
236 skill_id, skill_content
237 ));
238 },
239 Err(e) => {
240 tracing::warn!(skill_id = %skill_id, error = %e, "Failed to load skill");
241 },
242 }
243 }
244
245 ai_messages.push(AiMessage {
246 role: MessageRole::System,
247 content: skills_prompt,
248 parts: Vec::new(),
249 });
250
251 tracing::info!("Skills injected into agent context");
252 }
253
254 if let Some(system_prompt) = &agent_runtime.system_prompt {
255 ai_messages.push(AiMessage {
256 role: MessageRole::System,
257 content: system_prompt.clone(),
258 parts: Vec::new(),
259 });
260 }
261
262 ai_messages.extend(conversation_history);
263
264 ai_messages.push(AiMessage {
265 role: MessageRole::User,
266 content: user_text,
267 parts: user_parts,
268 });
269
270 ai_messages
271}