Skip to main content

codetether_agent/a2a/
server.rs

1//! A2A Server - serve as an A2A agent
2
3use super::types::*;
4use crate::session::{Session, SessionEvent};
5use crate::telemetry::record_persistent;
6use anyhow::Result;
7use axum::{
8    Router,
9    extract::State,
10    http::StatusCode,
11    response::Json,
12    routing::{get, post},
13};
14use dashmap::DashMap;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use crate::bus::BusMessage;
19use tokio::sync::mpsc;
20use uuid::Uuid;
21
22/// A2A Server state
23#[derive(Clone)]
24pub struct A2AServer {
25    tasks: Arc<DashMap<String, Task>>,
26    agent_card: AgentCard,
27    /// Optional bus for emitting session events for SSE streaming
28    bus: Option<Arc<crate::bus::AgentBus>>,
29}
30
31impl A2AServer {
32    /// Create a new A2A server without a bus
33    pub fn new(agent_card: AgentCard) -> Self {
34        Self {
35            tasks: Arc::new(DashMap::new()),
36            agent_card,
37            bus: None,
38        }
39    }
40
41    /// Create a new A2A server with a bus for event streaming
42    pub fn with_bus(agent_card: AgentCard, bus: Arc<crate::bus::AgentBus>) -> Self {
43        Self {
44            tasks: Arc::new(DashMap::new()),
45            agent_card,
46            bus: Some(bus),
47        }
48    }
49
50    /// Create the router for A2A endpoints
51    pub fn router(self) -> Router {
52        Router::new()
53            .route("/.well-known/agent.json", get(get_agent_card))
54            .route("/.well-known/agent-card.json", get(get_agent_card))
55            .route("/", post(handle_rpc))
56            .with_state(self)
57    }
58
59    /// Get the agent card for this server
60    #[allow(dead_code)]
61    pub fn card(&self) -> &AgentCard {
62        &self.agent_card
63    }
64
65    /// Create a default agent card
66    pub fn default_card(url: &str) -> AgentCard {
67        AgentCard {
68            name: "CodeTether Agent".to_string(),
69            description: "A2A-native AI coding agent for the CodeTether ecosystem".to_string(),
70            url: url.to_string(),
71            version: env!("CARGO_PKG_VERSION").to_string(),
72            protocol_version: "0.3.0".to_string(),
73            preferred_transport: None,
74            additional_interfaces: vec![],
75            capabilities: AgentCapabilities {
76                streaming: true,
77                push_notifications: false,
78                state_transition_history: true,
79                extensions: vec![],
80            },
81            skills: vec![
82                AgentSkill {
83                    id: "code".to_string(),
84                    name: "Code Generation".to_string(),
85                    description: "Write, edit, and refactor code".to_string(),
86                    tags: vec!["code".to_string(), "programming".to_string()],
87                    examples: vec![
88                        "Write a function to parse JSON".to_string(),
89                        "Refactor this code to use async/await".to_string(),
90                    ],
91                    input_modes: vec!["text/plain".to_string()],
92                    output_modes: vec!["text/plain".to_string()],
93                },
94                AgentSkill {
95                    id: "debug".to_string(),
96                    name: "Debugging".to_string(),
97                    description: "Debug and fix code issues".to_string(),
98                    tags: vec!["debug".to_string(), "fix".to_string()],
99                    examples: vec![
100                        "Why is this function returning undefined?".to_string(),
101                        "Fix the null pointer exception".to_string(),
102                    ],
103                    input_modes: vec!["text/plain".to_string()],
104                    output_modes: vec!["text/plain".to_string()],
105                },
106                AgentSkill {
107                    id: "explain".to_string(),
108                    name: "Code Explanation".to_string(),
109                    description: "Explain code and concepts".to_string(),
110                    tags: vec!["explain".to_string(), "learn".to_string()],
111                    examples: vec![
112                        "Explain how this algorithm works".to_string(),
113                        "What does this regex do?".to_string(),
114                    ],
115                    input_modes: vec!["text/plain".to_string()],
116                    output_modes: vec!["text/plain".to_string()],
117                },
118            ],
119            default_input_modes: vec!["text/plain".to_string(), "application/json".to_string()],
120            default_output_modes: vec!["text/plain".to_string(), "application/json".to_string()],
121            provider: Some(AgentProvider {
122                organization: "CodeTether".to_string(),
123                url: "https://codetether.run".to_string(),
124            }),
125            icon_url: None,
126            documentation_url: None,
127            security_schemes: Default::default(),
128            security: vec![],
129            supports_authenticated_extended_card: false,
130            signatures: vec![],
131        }
132    }
133}
134
135/// Get agent card handler
136async fn get_agent_card(State(server): State<A2AServer>) -> Json<AgentCard> {
137    Json(server.agent_card.clone())
138}
139
140fn emit_a2a_inbound(server: &A2AServer, task_id: &str, message: &Message) {
141    emit_a2a_message(
142        server,
143        task_id,
144        "remote-a2a",
145        &server.agent_card.name,
146        message,
147    );
148}
149
150fn emit_a2a_outbound(server: &A2AServer, task_id: &str, message: &Message) {
151    emit_a2a_message(
152        server,
153        task_id,
154        &server.agent_card.name,
155        "remote-a2a",
156        message,
157    );
158}
159
160fn emit_a2a_message(server: &A2AServer, task_id: &str, from: &str, to: &str, message: &Message) {
161    let Some(bus) = server.bus.as_ref() else {
162        return;
163    };
164    let handle = bus.handle("a2a");
165    handle.send_with_correlation(
166        format!("task.{task_id}"),
167        BusMessage::AgentMessage {
168            from: from.to_string(),
169            to: to.to_string(),
170            parts: message.parts.clone(),
171        },
172        Some(task_id.to_string()),
173    );
174}
175
176async fn configure_a2a_session(session: &mut Session) {
177    let configured_model = std::env::var("CODETETHER_DEFAULT_MODEL")
178        .ok()
179        .map(|value| value.trim().to_string())
180        .filter(|value| !value.is_empty());
181
182    let configured_model = match configured_model {
183        Some(model) => Some(model),
184        None => match crate::config::Config::load().await {
185            Ok(config) => config
186                .default_model
187                .filter(|value| !value.trim().is_empty()),
188            Err(e) => {
189                tracing::debug!(error = %e, "Failed to load config for A2A session model");
190                None
191            }
192        },
193    };
194
195    if let Some(model) = configured_model {
196        session.metadata.model = Some(model);
197    }
198}
199
200fn record_a2a_message_telemetry(
201    tool_name: &str,
202    task_id: &str,
203    blocking: bool,
204    prompt: &str,
205    duration: Duration,
206    success: bool,
207    output: Option<String>,
208    error: Option<String>,
209) {
210    let record = crate::telemetry::A2AMessageRecord {
211        tool_name: tool_name.to_string(),
212        task_id: task_id.to_string(),
213        blocking,
214        prompt: prompt.to_string(),
215        duration_ms: duration.as_millis() as u64,
216        success,
217        output,
218        error,
219        timestamp: chrono::Utc::now(),
220    };
221    let _ = record_persistent(
222        "a2a_message",
223        &serde_json::to_value(&record).unwrap_or_default(),
224    );
225}
226
227/// Handle JSON-RPC requests
228async fn handle_rpc(
229    State(server): State<A2AServer>,
230    Json(request): Json<JsonRpcRequest>,
231) -> Result<Json<JsonRpcResponse>, (StatusCode, Json<JsonRpcResponse>)> {
232    let request_id = request.id.clone();
233    let response = match request.method.as_str() {
234        "message/send" => handle_message_send(&server, request).await,
235        "message/stream" => handle_message_stream(&server, request).await,
236        "tasks/get" => handle_tasks_get(&server, request).await,
237        "tasks/cancel" => handle_tasks_cancel(&server, request).await,
238        _ => Err(JsonRpcError::method_not_found(&request.method)),
239    };
240
241    match response {
242        Ok(result) => Ok(Json(JsonRpcResponse {
243            jsonrpc: "2.0".to_string(),
244            id: request_id.clone(),
245            result: Some(result),
246            error: None,
247        })),
248        Err(error) => Err((
249            StatusCode::OK,
250            Json(JsonRpcResponse {
251                jsonrpc: "2.0".to_string(),
252                id: request_id,
253                result: None,
254                error: Some(error),
255            }),
256        )),
257    }
258}
259
260async fn handle_message_send(
261    server: &A2AServer,
262    request: JsonRpcRequest,
263) -> Result<serde_json::Value, JsonRpcError> {
264    let params: MessageSendParams = serde_json::from_value(request.params)
265        .map_err(|e| JsonRpcError::invalid_params(format!("Invalid parameters: {}", e)))?;
266
267    // Create a new task
268    let task_id = params
269        .message
270        .task_id
271        .clone()
272        .unwrap_or_else(|| Uuid::new_v4().to_string());
273
274    let task = Task {
275        id: task_id.clone(),
276        context_id: params.message.context_id.clone(),
277        status: TaskStatus {
278            state: TaskState::Working,
279            message: Some(params.message.clone()),
280            timestamp: Some(chrono::Utc::now().to_rfc3339()),
281        },
282        artifacts: vec![],
283        history: vec![params.message.clone()],
284        metadata: std::collections::HashMap::new(),
285    };
286
287    server.tasks.insert(task_id.clone(), task.clone());
288    emit_a2a_inbound(server, &task_id, &params.message);
289
290    // Extract prompt text from message parts
291    let prompt: String = params
292        .message
293        .parts
294        .iter()
295        .filter_map(|p| match p {
296            Part::Text { text } => Some(text.as_str()),
297            _ => None,
298        })
299        .collect::<Vec<_>>()
300        .join("\n");
301
302    if prompt.is_empty() {
303        // Update task to failed
304        if let Some(mut t) = server.tasks.get_mut(&task_id) {
305            t.status.state = TaskState::Failed;
306            t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
307        }
308        return Err(JsonRpcError::invalid_params("No text content in message"));
309    }
310
311    // Determine if blocking (default true for message/send)
312    let blocking = params
313        .configuration
314        .as_ref()
315        .and_then(|c| c.blocking)
316        .unwrap_or(true);
317
318    if blocking {
319        // Synchronous execution: create session, run prompt, return completed task
320        let mut session = Session::new().await.map_err(|e| {
321            JsonRpcError::internal_error(format!("Failed to create session: {}", e))
322        })?;
323        configure_a2a_session(&mut session).await;
324        let started_at = Instant::now();
325
326        match session.prompt(&prompt).await {
327            Ok(result) => {
328                let result_text = result.text;
329                let response_message = Message {
330                    message_id: Uuid::new_v4().to_string(),
331                    role: MessageRole::Agent,
332                    parts: vec![Part::Text {
333                        text: result_text.clone(),
334                    }],
335                    context_id: params.message.context_id.clone(),
336                    task_id: Some(task_id.clone()),
337                    metadata: std::collections::HashMap::new(),
338                    extensions: vec![],
339                };
340
341                let artifact = Artifact {
342                    artifact_id: Uuid::new_v4().to_string(),
343                    parts: vec![Part::Text {
344                        text: result_text.clone(),
345                    }],
346                    name: Some("response".to_string()),
347                    description: None,
348                    metadata: std::collections::HashMap::new(),
349                    extensions: vec![],
350                };
351
352                emit_a2a_outbound(server, &task_id, &response_message);
353
354                if let Some(mut t) = server.tasks.get_mut(&task_id) {
355                    t.status.state = TaskState::Completed;
356                    t.status.message = Some(response_message.clone());
357                    t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
358                    t.artifacts.push(artifact.clone());
359                    t.history.push(response_message);
360
361                    let status_event = TaskStatusUpdateEvent {
362                        id: task_id.clone(),
363                        status: t.status.clone(),
364                        is_final: true,
365                        metadata: std::collections::HashMap::new(),
366                    };
367                    let artifact_event = TaskArtifactUpdateEvent {
368                        id: task_id.clone(),
369                        artifact,
370                        metadata: std::collections::HashMap::new(),
371                    };
372                    tracing::debug!(
373                        task_id = %task_id,
374                        event = ?StreamEvent::StatusUpdate(status_event),
375                        "Task completed"
376                    );
377                    tracing::debug!(
378                        task_id = %task_id,
379                        event = ?StreamEvent::ArtifactUpdate(artifact_event),
380                        "Artifact produced"
381                    );
382                }
383
384                record_a2a_message_telemetry(
385                    "a2a_message_send",
386                    &task_id,
387                    true,
388                    &prompt,
389                    started_at.elapsed(),
390                    true,
391                    Some(result_text),
392                    None,
393                );
394            }
395            Err(e) => {
396                let error_message = Message {
397                    message_id: Uuid::new_v4().to_string(),
398                    role: MessageRole::Agent,
399                    parts: vec![Part::Text {
400                        text: format!("Error: {}", e),
401                    }],
402                    context_id: params.message.context_id.clone(),
403                    task_id: Some(task_id.clone()),
404                    metadata: std::collections::HashMap::new(),
405                    extensions: vec![],
406                };
407
408                emit_a2a_outbound(server, &task_id, &error_message);
409
410                if let Some(mut t) = server.tasks.get_mut(&task_id) {
411                    t.status.state = TaskState::Failed;
412                    t.status.message = Some(error_message);
413                    t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
414                }
415
416                record_a2a_message_telemetry(
417                    "a2a_message_send",
418                    &task_id,
419                    true,
420                    &prompt,
421                    started_at.elapsed(),
422                    false,
423                    None,
424                    Some(e.to_string()),
425                );
426            }
427        }
428    } else {
429        // Async execution: spawn background task, return immediately with Working state
430        let tasks = server.tasks.clone();
431        let context_id = params.message.context_id.clone();
432        let spawn_task_id = task_id.clone();
433
434        tokio::spawn(async move {
435            let task_id = spawn_task_id;
436            let started_at = Instant::now();
437            let mut session = match Session::new().await {
438                Ok(s) => s,
439                Err(e) => {
440                    tracing::error!("Failed to create session for task {}: {}", task_id, e);
441                    if let Some(mut t) = tasks.get_mut(&task_id) {
442                        t.status.state = TaskState::Failed;
443                        t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
444                    }
445                    record_a2a_message_telemetry(
446                        "a2a_message_send",
447                        &task_id,
448                        false,
449                        &prompt,
450                        started_at.elapsed(),
451                        false,
452                        None,
453                        Some(e.to_string()),
454                    );
455                    return;
456                }
457            };
458            configure_a2a_session(&mut session).await;
459
460            match session.prompt(&prompt).await {
461                Ok(result) => {
462                    let result_text = result.text;
463                    let response_message = Message {
464                        message_id: Uuid::new_v4().to_string(),
465                        role: MessageRole::Agent,
466                        parts: vec![Part::Text {
467                            text: result_text.clone(),
468                        }],
469                        context_id,
470                        task_id: Some(task_id.clone()),
471                        metadata: std::collections::HashMap::new(),
472                        extensions: vec![],
473                    };
474
475                    let artifact = Artifact {
476                        artifact_id: Uuid::new_v4().to_string(),
477                        parts: vec![Part::Text {
478                            text: result_text.clone(),
479                        }],
480                        name: Some("response".to_string()),
481                        description: None,
482                        metadata: std::collections::HashMap::new(),
483                        extensions: vec![],
484                    };
485
486                    if let Some(mut t) = tasks.get_mut(&task_id) {
487                        t.status.state = TaskState::Completed;
488                        t.status.message = Some(response_message.clone());
489                        t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
490                        t.artifacts.push(artifact);
491                        t.history.push(response_message);
492                    }
493
494                    record_a2a_message_telemetry(
495                        "a2a_message_send",
496                        &task_id,
497                        false,
498                        &prompt,
499                        started_at.elapsed(),
500                        true,
501                        Some(result_text),
502                        None,
503                    );
504                }
505                Err(e) => {
506                    tracing::error!("Task {} failed: {}", task_id, e);
507                    if let Some(mut t) = tasks.get_mut(&task_id) {
508                        t.status.state = TaskState::Failed;
509                        t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
510                    }
511                    record_a2a_message_telemetry(
512                        "a2a_message_send",
513                        &task_id,
514                        false,
515                        &prompt,
516                        started_at.elapsed(),
517                        false,
518                        None,
519                        Some(e.to_string()),
520                    );
521                }
522            }
523        });
524    }
525
526    // Return current task state wrapped in SendMessageResponse
527    let task = server
528        .tasks
529        .get(&task_id)
530        .ok_or_else(|| JsonRpcError::internal_error(format!("Task disappeared: {}", task_id)))?;
531    let response = SendMessageResponse::Task(task.value().clone());
532    serde_json::to_value(response)
533        .map_err(|e| JsonRpcError::internal_error(format!("Serialization error: {}", e)))
534}
535
536async fn handle_message_stream(
537    server: &A2AServer,
538    request: JsonRpcRequest,
539) -> Result<serde_json::Value, JsonRpcError> {
540    // message/stream submits the task for async processing.
541    // The client should poll tasks/get for status updates.
542    // True SSE streaming requires a dedicated endpoint outside JSON-RPC.
543
544    let params: MessageSendParams = serde_json::from_value(request.params)
545        .map_err(|e| JsonRpcError::invalid_params(format!("Invalid parameters: {}", e)))?;
546
547    let task_id = params
548        .message
549        .task_id
550        .clone()
551        .unwrap_or_else(|| Uuid::new_v4().to_string());
552
553    let task = Task {
554        id: task_id.clone(),
555        context_id: params.message.context_id.clone(),
556        status: TaskStatus {
557            state: TaskState::Working,
558            message: Some(params.message.clone()),
559            timestamp: Some(chrono::Utc::now().to_rfc3339()),
560        },
561        artifacts: vec![],
562        history: vec![params.message.clone()],
563        metadata: std::collections::HashMap::new(),
564    };
565
566    server.tasks.insert(task_id.clone(), task.clone());
567    emit_a2a_inbound(server, &task_id, &params.message);
568
569    // Extract prompt
570    let prompt: String = params
571        .message
572        .parts
573        .iter()
574        .filter_map(|p| match p {
575            Part::Text { text } => Some(text.as_str()),
576            _ => None,
577        })
578        .collect::<Vec<_>>()
579        .join("\n");
580
581    if prompt.is_empty() {
582        if let Some(mut t) = server.tasks.get_mut(&task_id) {
583            t.status.state = TaskState::Failed;
584            t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
585        }
586        return Err(JsonRpcError::invalid_params("No text content in message"));
587    }
588
589    // Spawn async processing with event streaming
590    let tasks = server.tasks.clone();
591    let context_id = params.message.context_id.clone();
592    let spawn_task_id = task_id.clone();
593    let bus = server.bus.clone();
594
595    tokio::spawn(async move {
596        let task_id = spawn_task_id;
597        let started_at = Instant::now();
598
599        // Create a channel for session events
600        let (event_tx, mut event_rx) = mpsc::channel::<SessionEvent>(256);
601
602        let mut session = match Session::new().await {
603            Ok(s) => s,
604            Err(e) => {
605                tracing::error!(
606                    "Failed to create session for stream task {}: {}",
607                    task_id,
608                    e
609                );
610                if let Some(mut t) = tasks.get_mut(&task_id) {
611                    t.status.state = TaskState::Failed;
612                    t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
613                }
614                record_a2a_message_telemetry(
615                    "a2a_message_stream",
616                    &task_id,
617                    false,
618                    &prompt,
619                    started_at.elapsed(),
620                    false,
621                    None,
622                    Some(e.to_string()),
623                );
624                return;
625            }
626        };
627        configure_a2a_session(&mut session).await;
628
629        // Spawn a task to forward session events to the bus
630        let bus_clone = bus.clone();
631        let task_id_clone = task_id.clone();
632        tokio::spawn(async move {
633            while let Some(event) = event_rx.recv().await {
634                let event_data = match &event {
635                    SessionEvent::Thinking => {
636                        serde_json::json!({ "type": "thinking" })
637                    }
638                    SessionEvent::ToolCallStart { name, arguments } => {
639                        serde_json::json!({
640                            "type": "tool_call_start",
641                            "name": name,
642                            "arguments": arguments
643                        })
644                    }
645                    SessionEvent::ToolCallComplete {
646                        name,
647                        output,
648                        success,
649                        duration_ms,
650                    } => {
651                        serde_json::json!({
652                            "type": "tool_call_complete",
653                            "name": name,
654                            "output": output.chars().take(500).collect::<String>(),
655                            "success": success,
656                            "duration_ms": duration_ms
657                        })
658                    }
659                    SessionEvent::TextChunk(text) => {
660                        serde_json::json!({ "type": "text_chunk", "text": text })
661                    }
662                    SessionEvent::TextComplete(text) => {
663                        serde_json::json!({ "type": "text_complete", "text": text })
664                    }
665                    SessionEvent::ThinkingComplete(thought) => {
666                        serde_json::json!({ "type": "thinking_complete", "thought": thought })
667                    }
668                    SessionEvent::UsageReport {
669                        prompt_tokens,
670                        completion_tokens,
671                        duration_ms,
672                        model,
673                    } => {
674                        serde_json::json!({
675                            "type": "usage_report",
676                            "prompt_tokens": prompt_tokens,
677                            "completion_tokens": completion_tokens,
678                            "duration_ms": duration_ms,
679                            "model": model
680                        })
681                    }
682                    SessionEvent::Done => {
683                        serde_json::json!({ "type": "done" })
684                    }
685                    SessionEvent::Error(err) => {
686                        serde_json::json!({ "type": "error", "error": err })
687                    }
688                    SessionEvent::SessionSync(_) => {
689                        continue; // Don't emit session sync to SSE
690                    }
691                    // New non-exhaustive variants (context management, RLM
692                    // progress, token estimates). They are handled by the
693                    // dedicated SessionBus subscribers rather than SSE.
694                    _ => continue,
695                };
696
697                // Emit to bus for SSE subscribers
698                if let Some(ref bus) = bus_clone {
699                    let handle = bus.handle("a2a-stream");
700                    handle.send(
701                        format!("task.{}", task_id_clone),
702                        crate::bus::BusMessage::TaskUpdate {
703                            task_id: task_id_clone.clone(),
704                            state: crate::a2a::types::TaskState::Working,
705                            message: Some(serde_json::to_string(&event_data).unwrap_or_default()),
706                        },
707                    );
708                }
709            }
710        });
711
712        // Use prompt_with_events for streaming
713        let registry = match crate::provider::ProviderRegistry::from_vault().await {
714            Ok(r) => Arc::new(r),
715            Err(e) => {
716                tracing::error!("Failed to load provider registry: {}", e);
717                if let Some(mut t) = tasks.get_mut(&task_id) {
718                    t.status.state = TaskState::Failed;
719                    t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
720                }
721                return;
722            }
723        };
724
725        match session
726            .prompt_with_events(&prompt, event_tx, registry)
727            .await
728        {
729            Ok(result) => {
730                let result_text = result.text;
731                let response_message = Message {
732                    message_id: Uuid::new_v4().to_string(),
733                    role: MessageRole::Agent,
734                    parts: vec![Part::Text {
735                        text: result_text.clone(),
736                    }],
737                    context_id,
738                    task_id: Some(task_id.clone()),
739                    metadata: std::collections::HashMap::new(),
740                    extensions: vec![],
741                };
742
743                let artifact = Artifact {
744                    artifact_id: Uuid::new_v4().to_string(),
745                    parts: vec![Part::Text {
746                        text: result_text.clone(),
747                    }],
748                    name: Some("response".to_string()),
749                    description: None,
750                    metadata: std::collections::HashMap::new(),
751                    extensions: vec![],
752                };
753
754                if let Some(mut t) = tasks.get_mut(&task_id) {
755                    t.status.state = TaskState::Completed;
756                    t.status.message = Some(response_message.clone());
757                    t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
758                    t.artifacts.push(artifact.clone());
759                    t.history.push(response_message);
760
761                    // Emit streaming events for SSE consumers
762                    let status_event = TaskStatusUpdateEvent {
763                        id: task_id.clone(),
764                        status: t.status.clone(),
765                        is_final: true,
766                        metadata: std::collections::HashMap::new(),
767                    };
768                    let artifact_event = TaskArtifactUpdateEvent {
769                        id: task_id.clone(),
770                        artifact,
771                        metadata: std::collections::HashMap::new(),
772                    };
773                    tracing::debug!(
774                        task_id = %task_id,
775                        event = ?StreamEvent::StatusUpdate(status_event),
776                        "Task completed"
777                    );
778                    tracing::debug!(
779                        task_id = %task_id,
780                        event = ?StreamEvent::ArtifactUpdate(artifact_event),
781                        "Artifact produced"
782                    );
783                }
784
785                record_a2a_message_telemetry(
786                    "a2a_message_stream",
787                    &task_id,
788                    false,
789                    &prompt,
790                    started_at.elapsed(),
791                    true,
792                    Some(result_text),
793                    None,
794                );
795            }
796            Err(e) => {
797                tracing::error!("Stream task {} failed: {}", task_id, e);
798                if let Some(mut t) = tasks.get_mut(&task_id) {
799                    t.status.state = TaskState::Failed;
800                    t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
801                }
802                record_a2a_message_telemetry(
803                    "a2a_message_stream",
804                    &task_id,
805                    false,
806                    &prompt,
807                    started_at.elapsed(),
808                    false,
809                    None,
810                    Some(e.to_string()),
811                );
812            }
813        }
814    });
815
816    // Return task in Working state — client polls tasks/get for completion
817    let response = SendMessageResponse::Task(task);
818    serde_json::to_value(response)
819        .map_err(|e| JsonRpcError::internal_error(format!("Serialization error: {}", e)))
820}
821
822async fn handle_tasks_get(
823    server: &A2AServer,
824    request: JsonRpcRequest,
825) -> Result<serde_json::Value, JsonRpcError> {
826    let params: TaskQueryParams = serde_json::from_value(request.params)
827        .map_err(|e| JsonRpcError::invalid_params(format!("Invalid parameters: {}", e)))?;
828
829    let task = server.tasks.get(&params.id).ok_or_else(|| JsonRpcError {
830        code: TASK_NOT_FOUND,
831        message: format!("Task not found: {}", params.id),
832        data: None,
833    })?;
834
835    serde_json::to_value(task.value().clone())
836        .map_err(|e| JsonRpcError::internal_error(format!("Serialization error: {}", e)))
837}
838
839async fn handle_tasks_cancel(
840    server: &A2AServer,
841    request: JsonRpcRequest,
842) -> Result<serde_json::Value, JsonRpcError> {
843    let params: TaskQueryParams = serde_json::from_value(request.params)
844        .map_err(|e| JsonRpcError::invalid_params(format!("Invalid parameters: {}", e)))?;
845
846    let mut task = server
847        .tasks
848        .get_mut(&params.id)
849        .ok_or_else(|| JsonRpcError {
850            code: TASK_NOT_FOUND,
851            message: format!("Task not found: {}", params.id),
852            data: None,
853        })?;
854
855    if !task.status.state.is_active() {
856        return Err(JsonRpcError {
857            code: TASK_NOT_CANCELABLE,
858            message: "Task is already in a terminal state".to_string(),
859            data: None,
860        });
861    }
862
863    task.status.state = TaskState::Cancelled;
864    task.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
865
866    serde_json::to_value(task.value().clone())
867        .map_err(|e| JsonRpcError::internal_error(format!("Serialization error: {}", e)))
868}