systemprompt_agent/services/a2a_server/processing/message/stream_processor/
mod.rs1mod helpers;
2mod processing;
3
4use anyhow::{Result, anyhow};
5use base64::Engine;
6use std::sync::Arc;
7
8use crate::models::a2a::{FilePart, Message, Part};
9use crate::repository::execution::ExecutionStepRepository;
10use crate::services::{ContextService, SkillService};
11use systemprompt_models::{
12 AiContentPart, AiProvider, is_supported_audio, is_supported_image, is_supported_text,
13 is_supported_video,
14};
15
16#[allow(missing_debug_implementations)]
17pub struct StreamProcessor {
18 pub ai_service: Arc<dyn AiProvider>,
19 pub context_service: ContextService,
20 pub skill_service: Arc<SkillService>,
21 pub execution_step_repo: Arc<ExecutionStepRepository>,
22}
23
24impl StreamProcessor {
25 pub fn extract_message_text(message: &Message) -> Result<String> {
26 for part in &message.parts {
27 if let Part::Text(text_part) = part {
28 return Ok(text_part.text.clone());
29 }
30 }
31 Err(anyhow!("No text content found in message"))
32 }
33
34 pub fn extract_message_content(message: &Message) -> (String, Vec<AiContentPart>) {
35 let mut text_content = String::new();
36 let mut content_parts = Vec::new();
37
38 for part in &message.parts {
39 match part {
40 Part::Text(text_part) => {
41 if text_content.is_empty() {
42 text_content.clone_from(&text_part.text);
43 }
44 content_parts.push(AiContentPart::text(&text_part.text));
45 },
46 Part::File(file_part) => {
47 if let Some(content_part) = Self::file_to_content_part(file_part) {
48 content_parts.push(content_part);
49 }
50 },
51 Part::Data(_) => {},
52 }
53 }
54
55 (text_content, content_parts)
56 }
57
58 fn file_to_content_part(file_part: &FilePart) -> Option<AiContentPart> {
59 let mime_type = file_part.file.mime_type.as_deref()?;
60 let file_name = file_part.file.name.as_deref().unwrap_or("unnamed");
61
62 let bytes = file_part.file.bytes.as_deref()?;
63
64 if is_supported_image(mime_type) {
65 return Some(AiContentPart::image(mime_type, bytes));
66 }
67
68 if is_supported_audio(mime_type) {
69 return Some(AiContentPart::audio(mime_type, bytes));
70 }
71
72 if is_supported_video(mime_type) {
73 return Some(AiContentPart::video(mime_type, bytes));
74 }
75
76 if is_supported_text(mime_type) {
77 return Self::decode_text_file(bytes, file_name, mime_type);
78 }
79
80 tracing::warn!(
81 file_name = %file_name,
82 mime_type = %mime_type,
83 "Unsupported file type - file will not be sent to AI"
84 );
85 None
86 }
87
88 fn decode_text_file(bytes: &str, file_name: &str, mime_type: &str) -> Option<AiContentPart> {
89 let decoded = base64::engine::general_purpose::STANDARD
90 .decode(bytes)
91 .map_err(|e| {
92 tracing::warn!(
93 file_name = %file_name,
94 mime_type = %mime_type,
95 error = %e,
96 "Failed to decode base64 text file"
97 );
98 e
99 })
100 .ok()?;
101
102 let text_content = String::from_utf8(decoded)
103 .map_err(|e| {
104 tracing::warn!(
105 file_name = %file_name,
106 mime_type = %mime_type,
107 error = %e,
108 "Failed to decode text file as UTF-8"
109 );
110 e
111 })
112 .ok()?;
113
114 let formatted = format!("[File: {file_name} ({mime_type})]\n{text_content}");
115 Some(AiContentPart::text(formatted))
116 }
117}