1use super::types::*;
4use crate::session::{Session, SessionEvent};
5use crate::telemetry::record_persistent;
6use anyhow::Result;
7use axum::{
8 Router,
9 extract::State,
10 http::StatusCode,
11 response::Json,
12 routing::{get, post},
13};
14use dashmap::DashMap;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use crate::bus::BusMessage;
19use tokio::sync::mpsc;
20use uuid::Uuid;
21
/// State shared by the A2A HTTP handlers.
///
/// The type is `Clone`; the task table sits behind an `Arc`, so every clone
/// works on the same set of tasks.
#[derive(Clone)]
pub struct A2AServer {
    /// Task table; the handlers in this file key entries by task id.
    tasks: Arc<DashMap<String, Task>>,
    /// Agent card returned by the well-known discovery routes.
    agent_card: AgentCard,
    /// Optional agent bus. When `None`, no bus messages are emitted.
    bus: Option<Arc<crate::bus::AgentBus>>,
}
30
31impl A2AServer {
32 pub fn new(agent_card: AgentCard) -> Self {
34 Self {
35 tasks: Arc::new(DashMap::new()),
36 agent_card,
37 bus: None,
38 }
39 }
40
41 pub fn with_bus(agent_card: AgentCard, bus: Arc<crate::bus::AgentBus>) -> Self {
43 Self {
44 tasks: Arc::new(DashMap::new()),
45 agent_card,
46 bus: Some(bus),
47 }
48 }
49
50 pub fn router(self) -> Router {
52 Router::new()
53 .route("/.well-known/agent.json", get(get_agent_card))
54 .route("/.well-known/agent-card.json", get(get_agent_card))
55 .route("/", post(handle_rpc))
56 .with_state(self)
57 }
58
59 #[allow(dead_code)]
61 pub fn card(&self) -> &AgentCard {
62 &self.agent_card
63 }
64
65 pub fn default_card(url: &str) -> AgentCard {
67 AgentCard {
68 name: "CodeTether Agent".to_string(),
69 description: "A2A-native AI coding agent for the CodeTether ecosystem".to_string(),
70 url: url.to_string(),
71 version: env!("CARGO_PKG_VERSION").to_string(),
72 protocol_version: "0.3.0".to_string(),
73 preferred_transport: None,
74 additional_interfaces: vec![],
75 capabilities: AgentCapabilities {
76 streaming: true,
77 push_notifications: false,
78 state_transition_history: true,
79 extensions: vec![],
80 },
81 skills: vec![
82 AgentSkill {
83 id: "code".to_string(),
84 name: "Code Generation".to_string(),
85 description: "Write, edit, and refactor code".to_string(),
86 tags: vec!["code".to_string(), "programming".to_string()],
87 examples: vec![
88 "Write a function to parse JSON".to_string(),
89 "Refactor this code to use async/await".to_string(),
90 ],
91 input_modes: vec!["text/plain".to_string()],
92 output_modes: vec!["text/plain".to_string()],
93 },
94 AgentSkill {
95 id: "debug".to_string(),
96 name: "Debugging".to_string(),
97 description: "Debug and fix code issues".to_string(),
98 tags: vec!["debug".to_string(), "fix".to_string()],
99 examples: vec![
100 "Why is this function returning undefined?".to_string(),
101 "Fix the null pointer exception".to_string(),
102 ],
103 input_modes: vec!["text/plain".to_string()],
104 output_modes: vec!["text/plain".to_string()],
105 },
106 AgentSkill {
107 id: "explain".to_string(),
108 name: "Code Explanation".to_string(),
109 description: "Explain code and concepts".to_string(),
110 tags: vec!["explain".to_string(), "learn".to_string()],
111 examples: vec![
112 "Explain how this algorithm works".to_string(),
113 "What does this regex do?".to_string(),
114 ],
115 input_modes: vec!["text/plain".to_string()],
116 output_modes: vec!["text/plain".to_string()],
117 },
118 ],
119 default_input_modes: vec!["text/plain".to_string(), "application/json".to_string()],
120 default_output_modes: vec!["text/plain".to_string(), "application/json".to_string()],
121 provider: Some(AgentProvider {
122 organization: "CodeTether".to_string(),
123 url: "https://codetether.run".to_string(),
124 }),
125 icon_url: None,
126 documentation_url: None,
127 security_schemes: Default::default(),
128 security: vec![],
129 supports_authenticated_extended_card: false,
130 signatures: vec![],
131 }
132 }
133}
134
135async fn get_agent_card(State(server): State<A2AServer>) -> Json<AgentCard> {
137 Json(server.agent_card.clone())
138}
139
140fn emit_a2a_inbound(server: &A2AServer, task_id: &str, message: &Message) {
141 emit_a2a_message(
142 server,
143 task_id,
144 "remote-a2a",
145 &server.agent_card.name,
146 message,
147 );
148}
149
150fn emit_a2a_outbound(server: &A2AServer, task_id: &str, message: &Message) {
151 emit_a2a_message(
152 server,
153 task_id,
154 &server.agent_card.name,
155 "remote-a2a",
156 message,
157 );
158}
159
160fn emit_a2a_message(server: &A2AServer, task_id: &str, from: &str, to: &str, message: &Message) {
161 let Some(bus) = server.bus.as_ref() else {
162 return;
163 };
164 let handle = bus.handle("a2a");
165 handle.send_with_correlation(
166 format!("task.{task_id}"),
167 BusMessage::AgentMessage {
168 from: from.to_string(),
169 to: to.to_string(),
170 parts: message.parts.clone(),
171 },
172 Some(task_id.to_string()),
173 );
174}
175
176async fn configure_a2a_session(session: &mut Session) {
177 let configured_model = std::env::var("CODETETHER_DEFAULT_MODEL")
178 .ok()
179 .map(|value| value.trim().to_string())
180 .filter(|value| !value.is_empty());
181
182 let configured_model = match configured_model {
183 Some(model) => Some(model),
184 None => match crate::config::Config::load().await {
185 Ok(config) => config
186 .default_model
187 .filter(|value| !value.trim().is_empty()),
188 Err(e) => {
189 tracing::debug!(error = %e, "Failed to load config for A2A session model");
190 None
191 }
192 },
193 };
194
195 if let Some(model) = configured_model {
196 session.metadata.model = Some(model);
197 }
198}
199
200fn record_a2a_message_telemetry(
201 tool_name: &str,
202 task_id: &str,
203 blocking: bool,
204 prompt: &str,
205 duration: Duration,
206 success: bool,
207 output: Option<String>,
208 error: Option<String>,
209) {
210 let record = crate::telemetry::A2AMessageRecord {
211 tool_name: tool_name.to_string(),
212 task_id: task_id.to_string(),
213 blocking,
214 prompt: prompt.to_string(),
215 duration_ms: duration.as_millis() as u64,
216 success,
217 output,
218 error,
219 timestamp: chrono::Utc::now(),
220 };
221 let _ = record_persistent(
222 "a2a_message",
223 &serde_json::to_value(&record).unwrap_or_default(),
224 );
225}
226
227async fn handle_rpc(
229 State(server): State<A2AServer>,
230 Json(request): Json<JsonRpcRequest>,
231) -> Result<Json<JsonRpcResponse>, (StatusCode, Json<JsonRpcResponse>)> {
232 let request_id = request.id.clone();
233 let response = match request.method.as_str() {
234 "message/send" => handle_message_send(&server, request).await,
235 "message/stream" => handle_message_stream(&server, request).await,
236 "tasks/get" => handle_tasks_get(&server, request).await,
237 "tasks/cancel" => handle_tasks_cancel(&server, request).await,
238 _ => Err(JsonRpcError::method_not_found(&request.method)),
239 };
240
241 match response {
242 Ok(result) => Ok(Json(JsonRpcResponse {
243 jsonrpc: "2.0".to_string(),
244 id: request_id.clone(),
245 result: Some(result),
246 error: None,
247 })),
248 Err(error) => Err((
249 StatusCode::OK,
250 Json(JsonRpcResponse {
251 jsonrpc: "2.0".to_string(),
252 id: request_id,
253 result: None,
254 error: Some(error),
255 }),
256 )),
257 }
258}
259
260async fn handle_message_send(
261 server: &A2AServer,
262 request: JsonRpcRequest,
263) -> Result<serde_json::Value, JsonRpcError> {
264 let params: MessageSendParams = serde_json::from_value(request.params)
265 .map_err(|e| JsonRpcError::invalid_params(format!("Invalid parameters: {}", e)))?;
266
267 let task_id = params
269 .message
270 .task_id
271 .clone()
272 .unwrap_or_else(|| Uuid::new_v4().to_string());
273
274 let task = Task {
275 id: task_id.clone(),
276 context_id: params.message.context_id.clone(),
277 status: TaskStatus {
278 state: TaskState::Working,
279 message: Some(params.message.clone()),
280 timestamp: Some(chrono::Utc::now().to_rfc3339()),
281 },
282 artifacts: vec![],
283 history: vec![params.message.clone()],
284 metadata: std::collections::HashMap::new(),
285 };
286
287 server.tasks.insert(task_id.clone(), task.clone());
288 emit_a2a_inbound(server, &task_id, ¶ms.message);
289
290 let prompt: String = params
292 .message
293 .parts
294 .iter()
295 .filter_map(|p| match p {
296 Part::Text { text } => Some(text.as_str()),
297 _ => None,
298 })
299 .collect::<Vec<_>>()
300 .join("\n");
301
302 if prompt.is_empty() {
303 if let Some(mut t) = server.tasks.get_mut(&task_id) {
305 t.status.state = TaskState::Failed;
306 t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
307 }
308 return Err(JsonRpcError::invalid_params("No text content in message"));
309 }
310
311 let blocking = params
313 .configuration
314 .as_ref()
315 .and_then(|c| c.blocking)
316 .unwrap_or(true);
317
318 if blocking {
319 let mut session = Session::new().await.map_err(|e| {
321 JsonRpcError::internal_error(format!("Failed to create session: {}", e))
322 })?;
323 configure_a2a_session(&mut session).await;
324 let started_at = Instant::now();
325
326 match session.prompt(&prompt).await {
327 Ok(result) => {
328 let result_text = result.text;
329 let response_message = Message {
330 message_id: Uuid::new_v4().to_string(),
331 role: MessageRole::Agent,
332 parts: vec![Part::Text {
333 text: result_text.clone(),
334 }],
335 context_id: params.message.context_id.clone(),
336 task_id: Some(task_id.clone()),
337 metadata: std::collections::HashMap::new(),
338 extensions: vec![],
339 };
340
341 let artifact = Artifact {
342 artifact_id: Uuid::new_v4().to_string(),
343 parts: vec![Part::Text {
344 text: result_text.clone(),
345 }],
346 name: Some("response".to_string()),
347 description: None,
348 metadata: std::collections::HashMap::new(),
349 extensions: vec![],
350 };
351
352 emit_a2a_outbound(server, &task_id, &response_message);
353
354 if let Some(mut t) = server.tasks.get_mut(&task_id) {
355 t.status.state = TaskState::Completed;
356 t.status.message = Some(response_message.clone());
357 t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
358 t.artifacts.push(artifact.clone());
359 t.history.push(response_message);
360
361 let status_event = TaskStatusUpdateEvent {
362 id: task_id.clone(),
363 status: t.status.clone(),
364 is_final: true,
365 metadata: std::collections::HashMap::new(),
366 };
367 let artifact_event = TaskArtifactUpdateEvent {
368 id: task_id.clone(),
369 artifact,
370 metadata: std::collections::HashMap::new(),
371 };
372 tracing::debug!(
373 task_id = %task_id,
374 event = ?StreamEvent::StatusUpdate(status_event),
375 "Task completed"
376 );
377 tracing::debug!(
378 task_id = %task_id,
379 event = ?StreamEvent::ArtifactUpdate(artifact_event),
380 "Artifact produced"
381 );
382 }
383
384 record_a2a_message_telemetry(
385 "a2a_message_send",
386 &task_id,
387 true,
388 &prompt,
389 started_at.elapsed(),
390 true,
391 Some(result_text),
392 None,
393 );
394 }
395 Err(e) => {
396 let error_message = Message {
397 message_id: Uuid::new_v4().to_string(),
398 role: MessageRole::Agent,
399 parts: vec![Part::Text {
400 text: format!("Error: {}", e),
401 }],
402 context_id: params.message.context_id.clone(),
403 task_id: Some(task_id.clone()),
404 metadata: std::collections::HashMap::new(),
405 extensions: vec![],
406 };
407
408 emit_a2a_outbound(server, &task_id, &error_message);
409
410 if let Some(mut t) = server.tasks.get_mut(&task_id) {
411 t.status.state = TaskState::Failed;
412 t.status.message = Some(error_message);
413 t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
414 }
415
416 record_a2a_message_telemetry(
417 "a2a_message_send",
418 &task_id,
419 true,
420 &prompt,
421 started_at.elapsed(),
422 false,
423 None,
424 Some(e.to_string()),
425 );
426 }
427 }
428 } else {
429 let tasks = server.tasks.clone();
431 let context_id = params.message.context_id.clone();
432 let spawn_task_id = task_id.clone();
433
434 tokio::spawn(async move {
435 let task_id = spawn_task_id;
436 let started_at = Instant::now();
437 let mut session = match Session::new().await {
438 Ok(s) => s,
439 Err(e) => {
440 tracing::error!("Failed to create session for task {}: {}", task_id, e);
441 if let Some(mut t) = tasks.get_mut(&task_id) {
442 t.status.state = TaskState::Failed;
443 t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
444 }
445 record_a2a_message_telemetry(
446 "a2a_message_send",
447 &task_id,
448 false,
449 &prompt,
450 started_at.elapsed(),
451 false,
452 None,
453 Some(e.to_string()),
454 );
455 return;
456 }
457 };
458 configure_a2a_session(&mut session).await;
459
460 match session.prompt(&prompt).await {
461 Ok(result) => {
462 let result_text = result.text;
463 let response_message = Message {
464 message_id: Uuid::new_v4().to_string(),
465 role: MessageRole::Agent,
466 parts: vec![Part::Text {
467 text: result_text.clone(),
468 }],
469 context_id,
470 task_id: Some(task_id.clone()),
471 metadata: std::collections::HashMap::new(),
472 extensions: vec![],
473 };
474
475 let artifact = Artifact {
476 artifact_id: Uuid::new_v4().to_string(),
477 parts: vec![Part::Text {
478 text: result_text.clone(),
479 }],
480 name: Some("response".to_string()),
481 description: None,
482 metadata: std::collections::HashMap::new(),
483 extensions: vec![],
484 };
485
486 if let Some(mut t) = tasks.get_mut(&task_id) {
487 t.status.state = TaskState::Completed;
488 t.status.message = Some(response_message.clone());
489 t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
490 t.artifacts.push(artifact);
491 t.history.push(response_message);
492 }
493
494 record_a2a_message_telemetry(
495 "a2a_message_send",
496 &task_id,
497 false,
498 &prompt,
499 started_at.elapsed(),
500 true,
501 Some(result_text),
502 None,
503 );
504 }
505 Err(e) => {
506 tracing::error!("Task {} failed: {}", task_id, e);
507 if let Some(mut t) = tasks.get_mut(&task_id) {
508 t.status.state = TaskState::Failed;
509 t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
510 }
511 record_a2a_message_telemetry(
512 "a2a_message_send",
513 &task_id,
514 false,
515 &prompt,
516 started_at.elapsed(),
517 false,
518 None,
519 Some(e.to_string()),
520 );
521 }
522 }
523 });
524 }
525
526 let task = server
528 .tasks
529 .get(&task_id)
530 .ok_or_else(|| JsonRpcError::internal_error(format!("Task disappeared: {}", task_id)))?;
531 let response = SendMessageResponse::Task(task.value().clone());
532 serde_json::to_value(response)
533 .map_err(|e| JsonRpcError::internal_error(format!("Serialization error: {}", e)))
534}
535
536async fn handle_message_stream(
537 server: &A2AServer,
538 request: JsonRpcRequest,
539) -> Result<serde_json::Value, JsonRpcError> {
540 let params: MessageSendParams = serde_json::from_value(request.params)
545 .map_err(|e| JsonRpcError::invalid_params(format!("Invalid parameters: {}", e)))?;
546
547 let task_id = params
548 .message
549 .task_id
550 .clone()
551 .unwrap_or_else(|| Uuid::new_v4().to_string());
552
553 let task = Task {
554 id: task_id.clone(),
555 context_id: params.message.context_id.clone(),
556 status: TaskStatus {
557 state: TaskState::Working,
558 message: Some(params.message.clone()),
559 timestamp: Some(chrono::Utc::now().to_rfc3339()),
560 },
561 artifacts: vec![],
562 history: vec![params.message.clone()],
563 metadata: std::collections::HashMap::new(),
564 };
565
566 server.tasks.insert(task_id.clone(), task.clone());
567 emit_a2a_inbound(server, &task_id, ¶ms.message);
568
569 let prompt: String = params
571 .message
572 .parts
573 .iter()
574 .filter_map(|p| match p {
575 Part::Text { text } => Some(text.as_str()),
576 _ => None,
577 })
578 .collect::<Vec<_>>()
579 .join("\n");
580
581 if prompt.is_empty() {
582 if let Some(mut t) = server.tasks.get_mut(&task_id) {
583 t.status.state = TaskState::Failed;
584 t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
585 }
586 return Err(JsonRpcError::invalid_params("No text content in message"));
587 }
588
589 let tasks = server.tasks.clone();
591 let context_id = params.message.context_id.clone();
592 let spawn_task_id = task_id.clone();
593 let bus = server.bus.clone();
594
595 tokio::spawn(async move {
596 let task_id = spawn_task_id;
597 let started_at = Instant::now();
598
599 let (event_tx, mut event_rx) = mpsc::channel::<SessionEvent>(256);
601
602 let mut session = match Session::new().await {
603 Ok(s) => s,
604 Err(e) => {
605 tracing::error!(
606 "Failed to create session for stream task {}: {}",
607 task_id,
608 e
609 );
610 if let Some(mut t) = tasks.get_mut(&task_id) {
611 t.status.state = TaskState::Failed;
612 t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
613 }
614 record_a2a_message_telemetry(
615 "a2a_message_stream",
616 &task_id,
617 false,
618 &prompt,
619 started_at.elapsed(),
620 false,
621 None,
622 Some(e.to_string()),
623 );
624 return;
625 }
626 };
627 configure_a2a_session(&mut session).await;
628
629 let bus_clone = bus.clone();
631 let task_id_clone = task_id.clone();
632 tokio::spawn(async move {
633 while let Some(event) = event_rx.recv().await {
634 let event_data = match &event {
635 SessionEvent::Thinking => {
636 serde_json::json!({ "type": "thinking" })
637 }
638 SessionEvent::ToolCallStart { name, arguments } => {
639 serde_json::json!({
640 "type": "tool_call_start",
641 "name": name,
642 "arguments": arguments
643 })
644 }
645 SessionEvent::ToolCallComplete {
646 name,
647 output,
648 success,
649 duration_ms,
650 } => {
651 serde_json::json!({
652 "type": "tool_call_complete",
653 "name": name,
654 "output": output.chars().take(500).collect::<String>(),
655 "success": success,
656 "duration_ms": duration_ms
657 })
658 }
659 SessionEvent::TextChunk(text) => {
660 serde_json::json!({ "type": "text_chunk", "text": text })
661 }
662 SessionEvent::TextComplete(text) => {
663 serde_json::json!({ "type": "text_complete", "text": text })
664 }
665 SessionEvent::ThinkingComplete(thought) => {
666 serde_json::json!({ "type": "thinking_complete", "thought": thought })
667 }
668 SessionEvent::UsageReport {
669 prompt_tokens,
670 completion_tokens,
671 duration_ms,
672 model,
673 } => {
674 serde_json::json!({
675 "type": "usage_report",
676 "prompt_tokens": prompt_tokens,
677 "completion_tokens": completion_tokens,
678 "duration_ms": duration_ms,
679 "model": model
680 })
681 }
682 SessionEvent::Done => {
683 serde_json::json!({ "type": "done" })
684 }
685 SessionEvent::Error(err) => {
686 serde_json::json!({ "type": "error", "error": err })
687 }
688 SessionEvent::SessionSync(_) => {
689 continue; }
691 _ => continue,
695 };
696
697 if let Some(ref bus) = bus_clone {
699 let handle = bus.handle("a2a-stream");
700 handle.send(
701 format!("task.{}", task_id_clone),
702 crate::bus::BusMessage::TaskUpdate {
703 task_id: task_id_clone.clone(),
704 state: crate::a2a::types::TaskState::Working,
705 message: Some(serde_json::to_string(&event_data).unwrap_or_default()),
706 },
707 );
708 }
709 }
710 });
711
712 let registry = match crate::provider::ProviderRegistry::from_vault().await {
714 Ok(r) => Arc::new(r),
715 Err(e) => {
716 tracing::error!("Failed to load provider registry: {}", e);
717 if let Some(mut t) = tasks.get_mut(&task_id) {
718 t.status.state = TaskState::Failed;
719 t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
720 }
721 return;
722 }
723 };
724
725 match session
726 .prompt_with_events(&prompt, event_tx, registry)
727 .await
728 {
729 Ok(result) => {
730 let result_text = result.text;
731 let response_message = Message {
732 message_id: Uuid::new_v4().to_string(),
733 role: MessageRole::Agent,
734 parts: vec![Part::Text {
735 text: result_text.clone(),
736 }],
737 context_id,
738 task_id: Some(task_id.clone()),
739 metadata: std::collections::HashMap::new(),
740 extensions: vec![],
741 };
742
743 let artifact = Artifact {
744 artifact_id: Uuid::new_v4().to_string(),
745 parts: vec![Part::Text {
746 text: result_text.clone(),
747 }],
748 name: Some("response".to_string()),
749 description: None,
750 metadata: std::collections::HashMap::new(),
751 extensions: vec![],
752 };
753
754 if let Some(mut t) = tasks.get_mut(&task_id) {
755 t.status.state = TaskState::Completed;
756 t.status.message = Some(response_message.clone());
757 t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
758 t.artifacts.push(artifact.clone());
759 t.history.push(response_message);
760
761 let status_event = TaskStatusUpdateEvent {
763 id: task_id.clone(),
764 status: t.status.clone(),
765 is_final: true,
766 metadata: std::collections::HashMap::new(),
767 };
768 let artifact_event = TaskArtifactUpdateEvent {
769 id: task_id.clone(),
770 artifact,
771 metadata: std::collections::HashMap::new(),
772 };
773 tracing::debug!(
774 task_id = %task_id,
775 event = ?StreamEvent::StatusUpdate(status_event),
776 "Task completed"
777 );
778 tracing::debug!(
779 task_id = %task_id,
780 event = ?StreamEvent::ArtifactUpdate(artifact_event),
781 "Artifact produced"
782 );
783 }
784
785 record_a2a_message_telemetry(
786 "a2a_message_stream",
787 &task_id,
788 false,
789 &prompt,
790 started_at.elapsed(),
791 true,
792 Some(result_text),
793 None,
794 );
795 }
796 Err(e) => {
797 tracing::error!("Stream task {} failed: {}", task_id, e);
798 if let Some(mut t) = tasks.get_mut(&task_id) {
799 t.status.state = TaskState::Failed;
800 t.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
801 }
802 record_a2a_message_telemetry(
803 "a2a_message_stream",
804 &task_id,
805 false,
806 &prompt,
807 started_at.elapsed(),
808 false,
809 None,
810 Some(e.to_string()),
811 );
812 }
813 }
814 });
815
816 let response = SendMessageResponse::Task(task);
818 serde_json::to_value(response)
819 .map_err(|e| JsonRpcError::internal_error(format!("Serialization error: {}", e)))
820}
821
822async fn handle_tasks_get(
823 server: &A2AServer,
824 request: JsonRpcRequest,
825) -> Result<serde_json::Value, JsonRpcError> {
826 let params: TaskQueryParams = serde_json::from_value(request.params)
827 .map_err(|e| JsonRpcError::invalid_params(format!("Invalid parameters: {}", e)))?;
828
829 let task = server.tasks.get(¶ms.id).ok_or_else(|| JsonRpcError {
830 code: TASK_NOT_FOUND,
831 message: format!("Task not found: {}", params.id),
832 data: None,
833 })?;
834
835 serde_json::to_value(task.value().clone())
836 .map_err(|e| JsonRpcError::internal_error(format!("Serialization error: {}", e)))
837}
838
839async fn handle_tasks_cancel(
840 server: &A2AServer,
841 request: JsonRpcRequest,
842) -> Result<serde_json::Value, JsonRpcError> {
843 let params: TaskQueryParams = serde_json::from_value(request.params)
844 .map_err(|e| JsonRpcError::invalid_params(format!("Invalid parameters: {}", e)))?;
845
846 let mut task = server
847 .tasks
848 .get_mut(¶ms.id)
849 .ok_or_else(|| JsonRpcError {
850 code: TASK_NOT_FOUND,
851 message: format!("Task not found: {}", params.id),
852 data: None,
853 })?;
854
855 if !task.status.state.is_active() {
856 return Err(JsonRpcError {
857 code: TASK_NOT_CANCELABLE,
858 message: "Task is already in a terminal state".to_string(),
859 data: None,
860 });
861 }
862
863 task.status.state = TaskState::Cancelled;
864 task.status.timestamp = Some(chrono::Utc::now().to_rfc3339());
865
866 serde_json::to_value(task.value().clone())
867 .map_err(|e| JsonRpcError::internal_error(format!("Serialization error: {}", e)))
868}