// systemprompt_agent/services/a2a_server/processing/message/stream_processor/processing.rs

use anyhow::Result;
2use std::sync::Arc;
3use tokio::sync::mpsc;
4
5use super::StreamProcessor;
6use super::helpers::{
7 SynthesizeFinalResponseParams, build_artifacts_from_results, synthesize_final_response,
8};
9use crate::models::AgentRuntimeInfo;
10use crate::services::a2a_server::processing::message::{ProcessMessageStreamParams, StreamEvent};
11use crate::services::a2a_server::processing::strategies::{
12 ExecutionContext, ExecutionStrategySelector,
13};
14use systemprompt_identifiers::AgentName;
15use systemprompt_models::{AiMessage, MessageRole, RequestContext};
16
17impl StreamProcessor {
18 pub async fn process_message_stream(
19 &self,
20 params: ProcessMessageStreamParams<'_>,
21 ) -> Result<mpsc::UnboundedReceiver<StreamEvent>> {
22 let ProcessMessageStreamParams {
23 a2a_message,
24 agent_runtime,
25 agent_name,
26 context,
27 task_id,
28 } = params;
29 let (tx, rx) = mpsc::unbounded_channel();
30
31 let ai_service = Arc::clone(&self.ai_service);
32 let agent_runtime = agent_runtime.clone();
33 let agent_name_string = agent_name.to_string();
34 let agent_name_typed = AgentName::new(agent_name);
35 let (user_text, user_parts) = Self::extract_message_content(a2a_message);
36
37 let context_id = &a2a_message.context_id;
38 let conversation_history = self
39 .context_service
40 .load_conversation_history(context_id)
41 .await
42 .unwrap_or_else(|e| {
43 tracing::warn!(error = %e, context_id = %context_id, "Failed to load conversation history");
44 vec![]
45 });
46
47 tracing::info!(
48 context_id = %context_id,
49 history_count = conversation_history.len(),
50 "Loaded historical messages for context"
51 );
52
53 let context_id_for_artifacts = context_id.clone();
54 let context_id_owned = context_id.clone();
55 let task_id_for_artifacts = task_id.clone();
56
57 let request_ctx = context
58 .clone()
59 .with_task_id(task_id.clone())
60 .with_context_id(context_id.clone());
61 let skill_service = Arc::clone(&self.skill_service);
62 let execution_step_repo = Arc::clone(&self.execution_step_repo);
63
64 tokio::spawn(async move {
65 tracing::info!(
66 agent_name = %agent_name_string,
67 history_count = conversation_history.len(),
68 "Processing streaming message for agent"
69 );
70
71 let ai_messages = build_ai_messages(BuildAiMessagesParams {
72 agent_runtime: &agent_runtime,
73 conversation_history,
74 user_text,
75 user_parts,
76 skill_service: &skill_service,
77 request_ctx: &request_ctx,
78 })
79 .await;
80
81 let ai_messages_for_synthesis = ai_messages.clone();
82
83 let has_tools = !agent_runtime.mcp_servers.is_empty();
84 tracing::info!(
85 mcp_server_count = agent_runtime.mcp_servers.len(),
86 has_tools = has_tools,
87 "Agent MCP server status"
88 );
89
90 let ai_service_for_builder = Arc::clone(&ai_service);
91
92 let strategy = ExecutionStrategySelector::select_strategy(has_tools);
93
94 let execution_context = ExecutionContext {
95 ai_service: Arc::clone(&ai_service),
96 skill_service: Arc::clone(&skill_service),
97 agent_runtime: agent_runtime.clone(),
98 agent_name: agent_name_typed.clone(),
99 task_id: task_id.clone(),
100 context_id: context_id_owned,
101 tx: tx.clone(),
102 request_ctx: request_ctx.clone(),
103 execution_step_repo: Arc::clone(&execution_step_repo),
104 };
105
106 let execution_result = match strategy
107 .execute(execution_context, ai_messages.clone())
108 .await
109 {
110 Ok(result) => result,
111 Err(e) => {
112 tracing::error!(error = %e, "Execution failed");
113
114 let tracking = crate::services::ExecutionTrackingService::new(Arc::clone(
115 &execution_step_repo,
116 ));
117 if let Err(fail_err) = tracking
118 .fail_in_progress_steps(&task_id, &e.to_string())
119 .await
120 {
121 tracing::error!(error = %fail_err, "Failed to mark steps as failed");
122 }
123
124 if let Err(send_err) =
125 tx.send(StreamEvent::Error(format!("Execution failed: {e}")))
126 {
127 tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
128 }
129 return;
130 },
131 };
132
133 let (accumulated_text, tool_calls, tool_results, tools, _iterations) = (
134 execution_result.accumulated_text,
135 execution_result.tool_calls,
136 execution_result.tool_results,
137 execution_result.tools,
138 execution_result.iterations,
139 );
140
141 tracing::info!(
142 text_len = accumulated_text.len(),
143 tool_call_count = tool_calls.len(),
144 tool_result_count = tool_results.len(),
145 "Processing complete"
146 );
147
148 let artifacts = match build_artifacts_from_results(
149 &tool_results,
150 &tool_calls,
151 &tools,
152 &context_id_for_artifacts,
153 &task_id_for_artifacts,
154 ) {
155 Ok(artifacts) => artifacts,
156 Err(e) => {
157 tracing::error!(error = %e, "Failed to build artifacts from tool results");
158 if let Err(send_err) =
159 tx.send(StreamEvent::Error(format!("Artifact building failed: {e}")))
160 {
161 tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
162 }
163 return;
164 },
165 };
166
167 let final_text = synthesize_final_response(SynthesizeFinalResponseParams {
168 tool_calls: &tool_calls,
169 tool_results: &tool_results,
170 artifacts: &artifacts,
171 accumulated_text: &accumulated_text,
172 ai_service: ai_service_for_builder,
173 agent_runtime: &agent_runtime,
174 ai_messages_for_synthesis,
175 tx: tx.clone(),
176 request_ctx: request_ctx.clone(),
177 skill_service: Arc::clone(&skill_service),
178 })
179 .await;
180
181 tracing::info!(artifact_count = artifacts.len(), "Sending Complete event");
182
183 for (idx, artifact) in artifacts.iter().enumerate() {
184 tracing::info!(
185 artifact_index = idx + 1,
186 total_artifacts = artifacts.len(),
187 artifact_id = %artifact.id,
188 "Complete artifact"
189 );
190 }
191
192 let send_result = tx.send(StreamEvent::Complete {
193 full_text: final_text,
194 artifacts: artifacts.clone(),
195 });
196
197 if send_result.is_err() {
198 tracing::error!("Failed to send Complete event, channel closed");
199 } else {
200 tracing::info!(artifact_count = artifacts.len(), "Sent Complete event");
201 }
202 });
203
204 Ok(rx)
205 }
206}
207
/// Grouped inputs for [`build_ai_messages`], keeping its signature to a
/// single parameter.
struct BuildAiMessagesParams<'a> {
    /// Agent runtime configuration; `skills` and `system_prompt` are read.
    agent_runtime: &'a AgentRuntimeInfo,
    /// Previously loaded messages for this context, appended verbatim.
    conversation_history: Vec<AiMessage>,
    /// Text content of the incoming user message.
    user_text: String,
    /// Structured content parts of the incoming user message.
    user_parts: Vec<systemprompt_models::AiContentPart>,
    /// Service used to load skill content by id.
    skill_service: &'a Arc<crate::services::SkillService>,
    /// Request-scoped context forwarded to skill loading.
    request_ctx: &'a RequestContext,
}
216
217async fn build_ai_messages(params: BuildAiMessagesParams<'_>) -> Vec<AiMessage> {
218 let BuildAiMessagesParams {
219 agent_runtime,
220 conversation_history,
221 user_text,
222 user_parts,
223 skill_service,
224 request_ctx,
225 } = params;
226 let mut ai_messages = Vec::new();
227
228 if !agent_runtime.skills.is_empty() {
229 tracing::info!(
230 skill_count = agent_runtime.skills.len(),
231 skills = ?agent_runtime.skills,
232 "Loading skills for agent"
233 );
234
235 let mut skills_prompt = String::from(
236 "# Your Skills\n\nYou have the following skills that define your capabilities and \
237 writing style:\n\n",
238 );
239
240 for skill_id in &agent_runtime.skills {
241 let skill_id_typed = systemprompt_identifiers::SkillId::new(skill_id);
242 match skill_service.load_skill(&skill_id_typed, request_ctx).await {
243 Ok(skill_content) => {
244 tracing::info!(
245 skill_id = %skill_id,
246 content_len = skill_content.len(),
247 "Loaded skill"
248 );
249 skills_prompt.push_str(&format!(
250 "## {} Skill\n\n{}\n\n---\n\n",
251 skill_id, skill_content
252 ));
253 },
254 Err(e) => {
255 tracing::warn!(skill_id = %skill_id, error = %e, "Failed to load skill");
256 },
257 }
258 }
259
260 ai_messages.push(AiMessage {
261 role: MessageRole::System,
262 content: skills_prompt,
263 parts: Vec::new(),
264 });
265
266 tracing::info!("Skills injected into agent context");
267 }
268
269 if let Some(system_prompt) = &agent_runtime.system_prompt {
270 ai_messages.push(AiMessage {
271 role: MessageRole::System,
272 content: system_prompt.clone(),
273 parts: Vec::new(),
274 });
275 }
276
277 ai_messages.extend(conversation_history);
278
279 ai_messages.push(AiMessage {
280 role: MessageRole::User,
281 content: user_text,
282 parts: user_parts,
283 });
284
285 ai_messages
286}