Skip to main content

systemprompt_agent/services/a2a_server/processing/message/stream_processor/
processing.rs

1use anyhow::Result;
2use std::sync::Arc;
3use tokio::sync::mpsc;
4
5use super::StreamProcessor;
6use super::helpers::{
7    SynthesizeFinalResponseParams, build_artifacts_from_results, synthesize_final_response,
8};
9use crate::models::AgentRuntimeInfo;
10use crate::services::a2a_server::processing::message::{ProcessMessageStreamParams, StreamEvent};
11use crate::services::a2a_server::processing::strategies::{
12    ExecutionContext, ExecutionStrategySelector,
13};
14use systemprompt_identifiers::AgentName;
15use systemprompt_models::{AiMessage, MessageRole, RequestContext};
16
17impl StreamProcessor {
18    pub async fn process_message_stream(
19        &self,
20        params: ProcessMessageStreamParams<'_>,
21    ) -> Result<mpsc::UnboundedReceiver<StreamEvent>> {
22        let ProcessMessageStreamParams {
23            a2a_message,
24            agent_runtime,
25            agent_name,
26            context,
27            task_id,
28        } = params;
29        let (tx, rx) = mpsc::unbounded_channel();
30
31        let ai_service = Arc::clone(&self.ai_service);
32        let agent_runtime = agent_runtime.clone();
33        let agent_name_string = agent_name.to_string();
34        let agent_name_typed = AgentName::new(agent_name);
35        let (user_text, user_parts) = Self::extract_message_content(a2a_message);
36
37        let context_id = &a2a_message.context_id;
38        let conversation_history = self
39            .context_service
40            .load_conversation_history(context_id)
41            .await
42            .unwrap_or_else(|e| {
43                tracing::warn!(error = %e, context_id = %context_id, "Failed to load conversation history");
44                vec![]
45            });
46
47        tracing::info!(
48            context_id = %context_id,
49            history_count = conversation_history.len(),
50            "Loaded historical messages for context"
51        );
52
53        let context_id_for_artifacts = context_id.clone();
54        let context_id_owned = context_id.clone();
55        let task_id_for_artifacts = task_id.clone();
56
57        let request_ctx = context
58            .clone()
59            .with_task_id(task_id.clone())
60            .with_context_id(context_id.clone());
61        let skill_service = Arc::clone(&self.skill_service);
62        let execution_step_repo = Arc::clone(&self.execution_step_repo);
63
64        tokio::spawn(async move {
65            tracing::info!(
66                agent_name = %agent_name_string,
67                history_count = conversation_history.len(),
68                "Processing streaming message for agent"
69            );
70
71            let ai_messages = build_ai_messages(BuildAiMessagesParams {
72                agent_runtime: &agent_runtime,
73                conversation_history,
74                user_text,
75                user_parts,
76                skill_service: &skill_service,
77                request_ctx: &request_ctx,
78            })
79            .await;
80
81            let ai_messages_for_synthesis = ai_messages.clone();
82
83            let has_tools = !agent_runtime.mcp_servers.is_empty();
84            tracing::info!(
85                mcp_server_count = agent_runtime.mcp_servers.len(),
86                has_tools = has_tools,
87                "Agent MCP server status"
88            );
89
90            let ai_service_for_builder = Arc::clone(&ai_service);
91
92            let strategy = ExecutionStrategySelector::select_strategy(has_tools);
93
94            let execution_context = ExecutionContext {
95                ai_service: Arc::clone(&ai_service),
96                skill_service: Arc::clone(&skill_service),
97                agent_runtime: agent_runtime.clone(),
98                agent_name: agent_name_typed.clone(),
99                task_id: task_id.clone(),
100                context_id: context_id_owned,
101                tx: tx.clone(),
102                request_ctx: request_ctx.clone(),
103                execution_step_repo: Arc::clone(&execution_step_repo),
104            };
105
106            let execution_result = match strategy
107                .execute(execution_context, ai_messages.clone())
108                .await
109            {
110                Ok(result) => result,
111                Err(e) => {
112                    tracing::error!(error = %e, "Execution failed");
113
114                    let tracking = crate::services::ExecutionTrackingService::new(Arc::clone(
115                        &execution_step_repo,
116                    ));
117                    if let Err(fail_err) = tracking
118                        .fail_in_progress_steps(&task_id, &e.to_string())
119                        .await
120                    {
121                        tracing::error!(error = %fail_err, "Failed to mark steps as failed");
122                    }
123
124                    if let Err(send_err) =
125                        tx.send(StreamEvent::Error(format!("Execution failed: {e}")))
126                    {
127                        tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
128                    }
129                    return;
130                },
131            };
132
133            let (accumulated_text, tool_calls, tool_results, tools, _iterations) = (
134                execution_result.accumulated_text,
135                execution_result.tool_calls,
136                execution_result.tool_results,
137                execution_result.tools,
138                execution_result.iterations,
139            );
140
141            tracing::info!(
142                text_len = accumulated_text.len(),
143                tool_call_count = tool_calls.len(),
144                tool_result_count = tool_results.len(),
145                "Processing complete"
146            );
147
148            let artifacts = match build_artifacts_from_results(
149                &tool_results,
150                &tool_calls,
151                &tools,
152                &context_id_for_artifacts,
153                &task_id_for_artifacts,
154            ) {
155                Ok(artifacts) => artifacts,
156                Err(e) => {
157                    tracing::error!(error = %e, "Failed to build artifacts from tool results");
158                    if let Err(send_err) =
159                        tx.send(StreamEvent::Error(format!("Artifact building failed: {e}")))
160                    {
161                        tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
162                    }
163                    return;
164                },
165            };
166
167            let final_text = synthesize_final_response(SynthesizeFinalResponseParams {
168                tool_calls: &tool_calls,
169                tool_results: &tool_results,
170                artifacts: &artifacts,
171                accumulated_text: &accumulated_text,
172                ai_service: ai_service_for_builder,
173                agent_runtime: &agent_runtime,
174                ai_messages_for_synthesis,
175                tx: tx.clone(),
176                request_ctx: request_ctx.clone(),
177                skill_service: Arc::clone(&skill_service),
178            })
179            .await;
180
181            tracing::info!(artifact_count = artifacts.len(), "Sending Complete event");
182
183            for (idx, artifact) in artifacts.iter().enumerate() {
184                tracing::info!(
185                    artifact_index = idx + 1,
186                    total_artifacts = artifacts.len(),
187                    artifact_id = %artifact.id,
188                    "Complete artifact"
189                );
190            }
191
192            let send_result = tx.send(StreamEvent::Complete {
193                full_text: final_text,
194                artifacts: artifacts.clone(),
195            });
196
197            if send_result.is_err() {
198                tracing::error!("Failed to send Complete event, channel closed");
199            } else {
200                tracing::info!(artifact_count = artifacts.len(), "Sent Complete event");
201            }
202        });
203
204        Ok(rx)
205    }
206}
207
208struct BuildAiMessagesParams<'a> {
209    agent_runtime: &'a AgentRuntimeInfo,
210    conversation_history: Vec<AiMessage>,
211    user_text: String,
212    user_parts: Vec<systemprompt_models::AiContentPart>,
213    skill_service: &'a Arc<crate::services::SkillService>,
214    request_ctx: &'a RequestContext,
215}
216
217async fn build_ai_messages(params: BuildAiMessagesParams<'_>) -> Vec<AiMessage> {
218    let BuildAiMessagesParams {
219        agent_runtime,
220        conversation_history,
221        user_text,
222        user_parts,
223        skill_service,
224        request_ctx,
225    } = params;
226    let mut ai_messages = Vec::new();
227
228    if !agent_runtime.skills.is_empty() {
229        tracing::info!(
230            skill_count = agent_runtime.skills.len(),
231            skills = ?agent_runtime.skills,
232            "Loading skills for agent"
233        );
234
235        let mut skills_prompt = String::from(
236            "# Your Skills\n\nYou have the following skills that define your capabilities and \
237             writing style:\n\n",
238        );
239
240        for skill_id in &agent_runtime.skills {
241            let skill_id_typed = systemprompt_identifiers::SkillId::new(skill_id);
242            match skill_service.load_skill(&skill_id_typed, request_ctx).await {
243                Ok(skill_content) => {
244                    tracing::info!(
245                        skill_id = %skill_id,
246                        content_len = skill_content.len(),
247                        "Loaded skill"
248                    );
249                    skills_prompt.push_str(&format!(
250                        "## {} Skill\n\n{}\n\n---\n\n",
251                        skill_id, skill_content
252                    ));
253                },
254                Err(e) => {
255                    tracing::warn!(skill_id = %skill_id, error = %e, "Failed to load skill");
256                },
257            }
258        }
259
260        ai_messages.push(AiMessage {
261            role: MessageRole::System,
262            content: skills_prompt,
263            parts: Vec::new(),
264        });
265
266        tracing::info!("Skills injected into agent context");
267    }
268
269    if let Some(system_prompt) = &agent_runtime.system_prompt {
270        ai_messages.push(AiMessage {
271            role: MessageRole::System,
272            content: system_prompt.clone(),
273            parts: Vec::new(),
274        });
275    }
276
277    ai_messages.extend(conversation_history);
278
279    ai_messages.push(AiMessage {
280        role: MessageRole::User,
281        content: user_text,
282        parts: user_parts,
283    });
284
285    ai_messages
286}