//! Streaming message processing for the A2A server.
//!
//! File: systemprompt_agent/services/a2a_server/processing/message/stream_processor/processing.rs

1use anyhow::Result;
2use std::sync::Arc;
3use tokio::sync::mpsc;
4
5use super::helpers::{build_artifacts_from_results, synthesize_final_response};
6use super::StreamProcessor;
7use crate::models::a2a::Message;
8use crate::models::AgentRuntimeInfo;
9use crate::services::a2a_server::processing::message::StreamEvent;
10use crate::services::a2a_server::processing::strategies::{
11    ExecutionContext, ExecutionStrategySelector,
12};
13use systemprompt_identifiers::{AgentName, TaskId};
14use systemprompt_models::{AiMessage, MessageRole, RequestContext};
15
impl StreamProcessor {
    /// Processes an inbound A2A message as an asynchronous stream of events.
    ///
    /// Loads the conversation history for the message's context, then spawns a
    /// detached background task that: builds the AI prompt, runs the selected
    /// execution strategy (tool-enabled when the agent has MCP servers
    /// configured, plain completion otherwise), converts tool results into
    /// artifacts, synthesizes the final response, and emits `StreamEvent`s on
    /// the returned channel. The stream terminates with either
    /// `StreamEvent::Complete` or `StreamEvent::Error`.
    ///
    /// # Arguments
    /// * `a2a_message` - Incoming message; its `context_id` scopes history.
    /// * `agent_runtime` - Agent runtime info (skills, MCP servers, prompt).
    /// * `agent_name` - Agent identifier, used for logging and typed naming.
    /// * `context` - Request context, enriched here with task/context ids.
    /// * `task_id` - Task this stream belongs to.
    ///
    /// # Errors
    /// As written, this always returns `Ok`; failures inside the spawned task
    /// are reported via `StreamEvent::Error` on the channel instead.
    pub async fn process_message_stream(
        &self,
        a2a_message: &Message,
        agent_runtime: &AgentRuntimeInfo,
        agent_name: &str,
        context: &RequestContext,
        task_id: TaskId,
    ) -> Result<mpsc::UnboundedReceiver<StreamEvent>> {
        // Unbounded so the producer task never blocks on a slow consumer.
        let (tx, rx) = mpsc::unbounded_channel();

        // Clone everything the spawned 'static task needs before the move.
        let ai_service = self.ai_service.clone();
        let agent_runtime = agent_runtime.clone();
        let agent_name_string = agent_name.to_string();
        let agent_name_typed = AgentName::new(agent_name);
        let (user_text, user_parts) = Self::extract_message_content(a2a_message);

        let context_id = &a2a_message.context_id;
        // History load is best-effort: on failure we log a warning and
        // continue with an empty history rather than aborting the stream.
        let conversation_history = self
            .context_service
            .load_conversation_history(context_id.as_str())
            .await
            .unwrap_or_else(|e| {
                tracing::warn!(error = %e, context_id = %context_id, "Failed to load conversation history");
                vec![]
            });

        tracing::info!(
            context_id = %context_id,
            history_count = conversation_history.len(),
            "Loaded historical messages for context"
        );

        let context_id_str = context_id.to_string();
        let context_id_owned = context_id.clone();
        let task_id_str = task_id.to_string();

        // Thread task/context ids into the request context so downstream
        // services operate under the same identifiers.
        let request_ctx = context
            .clone()
            .with_task_id(task_id.clone())
            .with_context_id(context_id.clone());
        let skill_service = self.skill_service.clone();
        let execution_step_repo = self.execution_step_repo.clone();

        // All heavy work runs detached; the caller only consumes `rx`.
        tokio::spawn(async move {
            tracing::info!(
                agent_name = %agent_name_string,
                history_count = conversation_history.len(),
                "Processing streaming message for agent"
            );

            // Assemble system prompt(s) + history + the new user turn.
            let ai_messages = build_ai_messages(
                &agent_runtime,
                conversation_history,
                user_text,
                user_parts,
                &skill_service,
                &request_ctx,
            )
            .await;

            // Keep a copy for the final synthesis pass below.
            let ai_messages_for_synthesis = ai_messages.clone();

            // Tool availability is inferred from configured MCP servers.
            let has_tools = !agent_runtime.mcp_servers.is_empty();
            tracing::info!(
                mcp_server_count = agent_runtime.mcp_servers.len(),
                has_tools = has_tools,
                "Agent MCP server status"
            );

            let ai_service_for_builder = ai_service.clone();

            // Strategy selection: tool-calling vs. plain execution.
            let selector = ExecutionStrategySelector::new();
            let strategy = selector.select_strategy(has_tools);

            let execution_context = ExecutionContext {
                ai_service: ai_service.clone(),
                skill_service: skill_service.clone(),
                agent_runtime: agent_runtime.clone(),
                agent_name: agent_name_typed.clone(),
                task_id: task_id.clone(),
                context_id: context_id_owned,
                tx: tx.clone(),
                request_ctx: request_ctx.clone(),
                execution_step_repo: execution_step_repo.clone(),
            };

            let execution_result = match strategy
                .execute(execution_context, ai_messages.clone())
                .await
            {
                Ok(result) => result,
                Err(e) => {
                    tracing::error!(error = %e, "Execution failed");

                    // Mark any steps still in progress as failed so tracking
                    // doesn't leave them looking hung.
                    let tracking =
                        crate::services::ExecutionTrackingService::new(execution_step_repo.clone());
                    if let Err(fail_err) = tracking
                        .fail_in_progress_steps(&task_id, &e.to_string())
                        .await
                    {
                        tracing::error!(error = %fail_err, "Failed to mark steps as failed");
                    }

                    // A send failure only means the receiver was dropped.
                    if let Err(send_err) =
                        tx.send(StreamEvent::Error(format!("Execution failed: {e}")))
                    {
                        tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
                    }
                    return;
                },
            };

            // Pull the pieces we need out of the result; the iteration count
            // is intentionally unused here.
            let (accumulated_text, tool_calls, tool_results, tools, _iterations) = (
                execution_result.accumulated_text,
                execution_result.tool_calls,
                execution_result.tool_results,
                execution_result.tools,
                execution_result.iterations,
            );

            tracing::info!(
                text_len = accumulated_text.len(),
                tool_call_count = tool_calls.len(),
                tool_result_count = tool_results.len(),
                "Processing complete"
            );

            // Convert tool outputs into artifacts; failure aborts the stream
            // with an Error event.
            let artifacts = match build_artifacts_from_results(
                &tool_results,
                &tool_calls,
                &tools,
                &context_id_str,
                &task_id_str,
            ) {
                Ok(artifacts) => artifacts,
                Err(e) => {
                    tracing::error!(error = %e, "Failed to build artifacts from tool results");
                    if let Err(send_err) =
                        tx.send(StreamEvent::Error(format!("Artifact building failed: {e}")))
                    {
                        tracing::trace!(error = %send_err, "Failed to send error event, channel closed");
                    }
                    return;
                },
            };

            // Produce the user-facing final text from the run's outputs.
            let final_text = synthesize_final_response(
                &tool_calls,
                &tool_results,
                &artifacts,
                &accumulated_text,
                ai_service_for_builder,
                &agent_runtime,
                ai_messages_for_synthesis,
                tx.clone(),
                request_ctx.clone(),
                skill_service.clone(),
            )
            .await;

            tracing::info!(artifact_count = artifacts.len(), "Sending Complete event");

            for (idx, artifact) in artifacts.iter().enumerate() {
                tracing::info!(
                    artifact_index = idx + 1,
                    total_artifacts = artifacts.len(),
                    artifact_id = %artifact.id,
                    "Complete artifact"
                );
            }

            // Terminal event; an Err here only means the receiver is gone.
            let send_result = tx.send(StreamEvent::Complete {
                full_text: final_text.clone(),
                artifacts: artifacts.clone(),
            });

            if send_result.is_err() {
                tracing::error!("Failed to send Complete event, channel closed");
            } else {
                tracing::info!(artifact_count = artifacts.len(), "Sent Complete event");
            }
        });

        Ok(rx)
    }
}
203
204async fn build_ai_messages(
205    agent_runtime: &AgentRuntimeInfo,
206    conversation_history: Vec<AiMessage>,
207    user_text: String,
208    user_parts: Vec<systemprompt_models::AiContentPart>,
209    skill_service: &Arc<crate::services::SkillService>,
210    request_ctx: &RequestContext,
211) -> Vec<AiMessage> {
212    let mut ai_messages = Vec::new();
213
214    if !agent_runtime.skills.is_empty() {
215        tracing::info!(
216            skill_count = agent_runtime.skills.len(),
217            skills = ?agent_runtime.skills,
218            "Loading skills for agent"
219        );
220
221        let mut skills_prompt = String::from(
222            "# Your Skills\n\nYou have the following skills that define your capabilities and \
223             writing style:\n\n",
224        );
225
226        for skill_id in &agent_runtime.skills {
227            match skill_service.load_skill(skill_id, request_ctx).await {
228                Ok(skill_content) => {
229                    tracing::info!(
230                        skill_id = %skill_id,
231                        content_len = skill_content.len(),
232                        "Loaded skill"
233                    );
234                    skills_prompt.push_str(&format!(
235                        "## {} Skill\n\n{}\n\n---\n\n",
236                        skill_id, skill_content
237                    ));
238                },
239                Err(e) => {
240                    tracing::warn!(skill_id = %skill_id, error = %e, "Failed to load skill");
241                },
242            }
243        }
244
245        ai_messages.push(AiMessage {
246            role: MessageRole::System,
247            content: skills_prompt,
248            parts: Vec::new(),
249        });
250
251        tracing::info!("Skills injected into agent context");
252    }
253
254    if let Some(system_prompt) = &agent_runtime.system_prompt {
255        ai_messages.push(AiMessage {
256            role: MessageRole::System,
257            content: system_prompt.clone(),
258            parts: Vec::new(),
259        });
260    }
261
262    ai_messages.extend(conversation_history);
263
264    ai_messages.push(AiMessage {
265        role: MessageRole::User,
266        content: user_text,
267        parts: user_parts,
268    });
269
270    ai_messages
271}