// systemprompt_agent/services/a2a_server/processing/message/stream_processor/processing.rs
use anyhow::Result;
2use std::sync::Arc;
3use tokio::sync::mpsc;
4
5use super::StreamProcessor;
6use super::helpers::{
7 SynthesizeFinalResponseParams, build_artifacts_from_results, synthesize_final_response,
8};
9use crate::models::AgentRuntimeInfo;
10use crate::services::a2a_server::processing::message::{ProcessMessageStreamParams, StreamEvent};
11use crate::services::a2a_server::processing::strategies::{
12 ExecutionContext, ExecutionStrategySelector,
13};
14use systemprompt_identifiers::AgentName;
15use systemprompt_models::{AiMessage, MessageRole, RequestContext};
16
impl StreamProcessor {
    /// Processes an incoming A2A message and streams results back as
    /// [`StreamEvent`]s on the returned channel.
    ///
    /// Loads the conversation history for the message's context (best effort),
    /// then spawns a background task that:
    /// 1. builds the AI message list (skills + system prompt + history + user message),
    /// 2. runs the execution strategy selected by whether the agent has MCP servers,
    /// 3. builds artifacts from any tool results,
    /// 4. synthesizes a final response,
    /// 5. emits `StreamEvent::Complete` (or `StreamEvent::Error` on failure).
    ///
    /// Returns the receiving half of an unbounded channel; the caller drains
    /// it until the channel closes.
    pub async fn process_message_stream(
        &self,
        params: ProcessMessageStreamParams<'_>,
    ) -> Result<mpsc::UnboundedReceiver<StreamEvent>> {
        let ProcessMessageStreamParams {
            a2a_message,
            agent_runtime,
            agent_name,
            context,
            task_id,
        } = params;
        let (tx, rx) = mpsc::unbounded_channel();

        // Clone/own everything the spawned task needs: the task is 'static and
        // cannot borrow from `self` or `params`.
        let ai_service = Arc::clone(&self.ai_service);
        let agent_runtime = agent_runtime.clone();
        let agent_name_string = agent_name.to_string();
        let agent_name_typed = AgentName::new(agent_name);
        let (user_text, user_parts) = Self::extract_message_content(a2a_message);

        let context_id = &a2a_message.context_id;
        // Best-effort history load: a failure degrades to an empty history
        // (with a warning) rather than aborting the whole stream.
        let conversation_history = self
            .context_service
            .load_conversation_history(context_id.as_str())
            .await
            .unwrap_or_else(|e| {
                tracing::warn!(error = %e, context_id = %context_id, "Failed to load conversation history");
                vec![]
            });

        tracing::info!(
            context_id = %context_id,
            history_count = conversation_history.len(),
            "Loaded historical messages for context"
        );

        // Owned copies of the ids for the spawned task: a string form for
        // artifact building and an owned typed form for the execution context.
        let context_id_str = context_id.to_string();
        let context_id_owned = context_id.clone();
        let task_id_str = task_id.to_string();

        let request_ctx = context
            .clone()
            .with_task_id(task_id.clone())
            .with_context_id(context_id.clone());
        let skill_service = Arc::clone(&self.skill_service);
        let execution_step_repo = Arc::clone(&self.execution_step_repo);

        // All processing happens off the caller's task; events flow back
        // through `tx` while the caller consumes the returned `rx`.
        tokio::spawn(async move {
            tracing::info!(
                agent_name = %agent_name_string,
                history_count = conversation_history.len(),
                "Processing streaming message for agent"
            );

            let ai_messages = build_ai_messages(BuildAiMessagesParams {
                agent_runtime: &agent_runtime,
                conversation_history,
                user_text,
                user_parts,
                skill_service: &skill_service,
                request_ctx: &request_ctx,
            })
            .await;

            // Keep an untouched copy for the final synthesis step, since the
            // strategy consumes its own clone below.
            let ai_messages_for_synthesis = ai_messages.clone();

            // Tool availability is inferred from configured MCP servers and
            // drives which execution strategy is selected.
            let has_tools = !agent_runtime.mcp_servers.is_empty();
            tracing::info!(
                mcp_server_count = agent_runtime.mcp_servers.len(),
                has_tools = has_tools,
                "Agent MCP server status"
            );

            let ai_service_for_builder = Arc::clone(&ai_service);

            let strategy = ExecutionStrategySelector::select_strategy(has_tools);

            let execution_context = ExecutionContext {
                ai_service: Arc::clone(&ai_service),
                skill_service: Arc::clone(&skill_service),
                agent_runtime: agent_runtime.clone(),
                agent_name: agent_name_typed.clone(),
                task_id: task_id.clone(),
                context_id: context_id_owned,
                tx: tx.clone(),
                request_ctx: request_ctx.clone(),
                execution_step_repo: Arc::clone(&execution_step_repo),
            };

            let execution_result = match strategy
                .execute(execution_context, ai_messages.clone())
                .await
            {
                Ok(result) => result,
                Err(e) => {
                    tracing::error!(error = %e, "Execution failed");

                    // On failure, mark any in-progress execution steps as
                    // failed so they don't linger in a stale state.
                    let tracking =
                        crate::services::ExecutionTrackingService::new(Arc::clone(&execution_step_repo));
                    if let Err(fail_err) = tracking
                        .fail_in_progress_steps(&task_id, &e.to_string())
                        .await
                    {
                        tracing::error!(error = %fail_err, "Failed to mark steps as failed");
                    }

                    // A send error just means the receiver hung up; log at
                    // trace level and stop.
                    if let Err(send_err) =
                        tx.send(StreamEvent::Error(format!("Execution failed: {e}")))
                    {
                        tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
                    }
                    return;
                },
            };

            let (accumulated_text, tool_calls, tool_results, tools, _iterations) = (
                execution_result.accumulated_text,
                execution_result.tool_calls,
                execution_result.tool_results,
                execution_result.tools,
                execution_result.iterations,
            );

            tracing::info!(
                text_len = accumulated_text.len(),
                tool_call_count = tool_calls.len(),
                tool_result_count = tool_results.len(),
                "Processing complete"
            );

            // Convert raw tool results into A2A artifacts; failure here is
            // terminal for the stream and is reported as an error event.
            let artifacts = match build_artifacts_from_results(
                &tool_results,
                &tool_calls,
                &tools,
                &context_id_str,
                &task_id_str,
            ) {
                Ok(artifacts) => artifacts,
                Err(e) => {
                    tracing::error!(error = %e, "Failed to build artifacts from tool results");
                    if let Err(send_err) =
                        tx.send(StreamEvent::Error(format!("Artifact building failed: {e}")))
                    {
                        tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
                    }
                    return;
                },
            };

            // Produce the final user-facing text from the accumulated output,
            // tool calls/results, and artifacts.
            let final_text = synthesize_final_response(SynthesizeFinalResponseParams {
                tool_calls: &tool_calls,
                tool_results: &tool_results,
                artifacts: &artifacts,
                accumulated_text: &accumulated_text,
                ai_service: ai_service_for_builder,
                agent_runtime: &agent_runtime,
                ai_messages_for_synthesis,
                tx: tx.clone(),
                request_ctx: request_ctx.clone(),
                skill_service: Arc::clone(&skill_service),
            })
            .await;

            tracing::info!(artifact_count = artifacts.len(), "Sending Complete event");

            for (idx, artifact) in artifacts.iter().enumerate() {
                tracing::info!(
                    artifact_index = idx + 1,
                    total_artifacts = artifacts.len(),
                    artifact_id = %artifact.id,
                    "Complete artifact"
                );
            }

            let send_result = tx.send(StreamEvent::Complete {
                full_text: final_text,
                artifacts: artifacts.clone(),
            });

            if send_result.is_err() {
                tracing::error!("Failed to send Complete event, channel closed");
            } else {
                tracing::info!(artifact_count = artifacts.len(), "Sent Complete event");
            }
        });

        Ok(rx)
    }
}
206
/// Borrowed inputs for [`build_ai_messages`], grouped into a struct so the
/// call site stays readable instead of passing six positional arguments.
struct BuildAiMessagesParams<'a> {
    // Runtime configuration of the agent; `skills` and `system_prompt` drive
    // which system messages are injected.
    agent_runtime: &'a AgentRuntimeInfo,
    // Prior messages for this context, inserted between the system messages
    // and the new user message.
    conversation_history: Vec<AiMessage>,
    // Plain-text content extracted from the incoming A2A message.
    user_text: String,
    // Structured content parts accompanying the user text.
    user_parts: Vec<systemprompt_models::AiContentPart>,
    // Service used to load skill content by id.
    skill_service: &'a Arc<crate::services::SkillService>,
    // Request-scoped context forwarded to skill loading.
    request_ctx: &'a RequestContext,
}
215
216async fn build_ai_messages(params: BuildAiMessagesParams<'_>) -> Vec<AiMessage> {
217 let BuildAiMessagesParams {
218 agent_runtime,
219 conversation_history,
220 user_text,
221 user_parts,
222 skill_service,
223 request_ctx,
224 } = params;
225 let mut ai_messages = Vec::new();
226
227 if !agent_runtime.skills.is_empty() {
228 tracing::info!(
229 skill_count = agent_runtime.skills.len(),
230 skills = ?agent_runtime.skills,
231 "Loading skills for agent"
232 );
233
234 let mut skills_prompt = String::from(
235 "# Your Skills\n\nYou have the following skills that define your capabilities and \
236 writing style:\n\n",
237 );
238
239 for skill_id in &agent_runtime.skills {
240 match skill_service.load_skill(skill_id, request_ctx).await {
241 Ok(skill_content) => {
242 tracing::info!(
243 skill_id = %skill_id,
244 content_len = skill_content.len(),
245 "Loaded skill"
246 );
247 skills_prompt.push_str(&format!(
248 "## {} Skill\n\n{}\n\n---\n\n",
249 skill_id, skill_content
250 ));
251 },
252 Err(e) => {
253 tracing::warn!(skill_id = %skill_id, error = %e, "Failed to load skill");
254 },
255 }
256 }
257
258 ai_messages.push(AiMessage {
259 role: MessageRole::System,
260 content: skills_prompt,
261 parts: Vec::new(),
262 });
263
264 tracing::info!("Skills injected into agent context");
265 }
266
267 if let Some(system_prompt) = &agent_runtime.system_prompt {
268 ai_messages.push(AiMessage {
269 role: MessageRole::System,
270 content: system_prompt.clone(),
271 parts: Vec::new(),
272 });
273 }
274
275 ai_messages.extend(conversation_history);
276
277 ai_messages.push(AiMessage {
278 role: MessageRole::User,
279 content: user_text,
280 parts: user_parts,
281 });
282
283 ai_messages
284}