1#[cfg(feature = "sqlx-storage")]
7use async_trait::async_trait;
8#[cfg(feature = "sqlx-storage")]
9use serde_json;
10#[cfg(feature = "sqlx-storage")]
11use sqlx::{Row, SqlitePool};
12
13#[cfg(feature = "sqlx-storage")]
14use crate::adapter::business::push_notification::{
15 PushNotificationRegistry, PushNotificationSender,
16};
17
18#[cfg(feature = "sqlx-storage")]
19#[cfg(feature = "http-client")]
20use crate::adapter::business::push_notification::HttpPushNotificationSender;
21#[cfg(feature = "sqlx-storage")]
22#[cfg(not(feature = "http-client"))]
23use crate::adapter::business::push_notification::NoopPushNotificationSender;
24
25#[cfg(feature = "sqlx-storage")]
26use crate::domain::{
27 A2AError, ContextId, Message, Task, TaskId, TaskPushNotificationConfig, TaskState, TaskStatus,
28 VersionedTask,
29};
30#[cfg(feature = "sqlx-storage")]
31use crate::port::{
32 AsyncNotificationManager, AsyncPushNotifier, AsyncTaskLifecycle, AsyncTaskQuery,
33 AsyncTaskVersioning,
34};
35
36#[cfg(feature = "sqlx-storage")]
37use std::sync::Arc;
38
/// Task storage backed by a SQLite database accessed through `sqlx`.
///
/// Only compiled when the `sqlx-storage` feature is enabled.
#[cfg(feature = "sqlx-storage")]
pub struct SqlxTaskStorage {
    /// Connection pool every query of this storage is executed against.
    pool: SqlitePool,
    /// Shared registry that push-notification configs are registered with.
    push_notification_registry: Arc<PushNotificationRegistry>,
}
53
54#[cfg(feature = "sqlx-storage")]
55use super::database_config::DatabaseType;
56
57#[cfg(feature = "sqlx-storage")]
58impl SqlxTaskStorage {
59 fn validate_url(database_url: &str) -> Result<(), A2AError> {
64 match DatabaseType::from_url(database_url) {
65 Some(DatabaseType::Sqlite) => Ok(()),
66 Some(db_type) => Err(A2AError::DatabaseError(format!(
67 "{db_type} database detected from URL '{database_url}', but SqlxTaskStorage \
68 currently only supports SQLite. For {db_type} support, see the project roadmap."
69 ))),
70 None => Err(A2AError::DatabaseError(format!(
71 "Unrecognized database URL scheme in '{database_url}'. \
72 Expected a URL starting with sqlite:, e.g. 'sqlite::memory:' or 'sqlite:data.db'"
73 ))),
74 }
75 }
76
77 pub async fn new(database_url: &str) -> Result<Self, A2AError> {
82 Self::validate_url(database_url)?;
83
84 let pool = SqlitePool::connect(database_url).await.map_err(|e| {
85 A2AError::DatabaseError(format!("Failed to connect to database: {}", e))
86 })?;
87
88 Self::run_base_migrations(&pool).await?;
90
91 #[cfg(feature = "http-client")]
93 let push_sender = HttpPushNotificationSender::new();
94 #[cfg(not(feature = "http-client"))]
95 let push_sender = NoopPushNotificationSender::default();
96
97 let push_registry = PushNotificationRegistry::new(push_sender);
98
99 Ok(Self {
100 pool,
101 push_notification_registry: Arc::new(push_registry),
102 })
103 }
104
105 pub async fn with_push_sender(
109 database_url: &str,
110 push_sender: impl PushNotificationSender + 'static,
111 ) -> Result<Self, A2AError> {
112 Self::validate_url(database_url)?;
113
114 let pool = SqlitePool::connect(database_url).await.map_err(|e| {
115 A2AError::DatabaseError(format!("Failed to connect to database: {}", e))
116 })?;
117
118 Self::run_base_migrations(&pool).await?;
120
121 let push_registry = PushNotificationRegistry::new(push_sender);
122
123 Ok(Self {
124 pool,
125 push_notification_registry: Arc::new(push_registry),
126 })
127 }
128
129 pub async fn with_migrations(
133 database_url: &str,
134 additional_migrations: &[&str],
135 ) -> Result<Self, A2AError> {
136 Self::validate_url(database_url)?;
137
138 let pool = SqlitePool::connect(database_url).await.map_err(|e| {
139 A2AError::DatabaseError(format!("Failed to connect to database: {}", e))
140 })?;
141
142 Self::run_base_migrations(&pool).await?;
144
145 Self::run_additional_migrations(&pool, additional_migrations).await?;
147
148 #[cfg(feature = "http-client")]
150 let push_sender = HttpPushNotificationSender::new();
151 #[cfg(not(feature = "http-client"))]
152 let push_sender = NoopPushNotificationSender::default();
153
154 let push_registry = PushNotificationRegistry::new(push_sender);
155
156 Ok(Self {
157 pool,
158 push_notification_registry: Arc::new(push_registry),
159 })
160 }
161
162 async fn run_base_migrations(pool: &SqlitePool) -> Result<(), A2AError> {
164 sqlx::query(include_str!("../../../migrations/001_initial_schema.sql"))
165 .execute(pool)
166 .await
167 .map_err(|e| A2AError::DatabaseError(format!("Migration 001 failed: {}", e)))?;
168
169 sqlx::query(include_str!(
170 "../../../migrations/002_v030_push_configs.sql"
171 ))
172 .execute(pool)
173 .await
174 .map_err(|e| A2AError::DatabaseError(format!("Migration 002 failed: {}", e)))?;
175
176 if let Err(e) = sqlx::query(include_str!("../../../migrations/003_task_version.sql"))
180 .execute(pool)
181 .await
182 {
183 let msg = e.to_string();
184 if !msg.contains("duplicate column name") {
185 return Err(A2AError::DatabaseError(format!(
186 "Migration 003 failed: {msg}"
187 )));
188 }
189 }
190
191 Ok(())
192 }
193
194 async fn run_additional_migrations(
196 pool: &SqlitePool,
197 migrations: &[&str],
198 ) -> Result<(), A2AError> {
199 for (i, migration_sql) in migrations.iter().enumerate() {
200 sqlx::query(migration_sql)
201 .execute(pool)
202 .await
203 .map_err(|e| {
204 A2AError::DatabaseError(format!("Additional migration {} failed: {}", i + 1, e))
205 })?;
206 }
207 Ok(())
208 }
209
210 fn row_to_task(row: &sqlx::sqlite::SqliteRow) -> Result<Task, A2AError> {
212 let task_id: String = row
213 .try_get("id")
214 .map_err(|e| A2AError::DatabaseError(format!("Failed to get task_id: {}", e)))?;
215 let context_id: String = row
216 .try_get("context_id")
217 .map_err(|e| A2AError::DatabaseError(format!("Failed to get context_id: {}", e)))?;
218 let status_state: String = row
219 .try_get("status_state")
220 .map_err(|e| A2AError::DatabaseError(format!("Failed to get status_state: {}", e)))?;
221 let status_message_json: Option<String> = row
222 .try_get("status_message")
223 .map_err(|e| A2AError::DatabaseError(format!("Failed to get status_message: {}", e)))?;
224 let metadata_json: Option<String> = row
225 .try_get("metadata")
226 .map_err(|e| A2AError::DatabaseError(format!("Failed to get metadata: {}", e)))?;
227 let artifacts_json: Option<String> = row
228 .try_get("artifacts")
229 .map_err(|e| A2AError::DatabaseError(format!("Failed to get artifacts: {}", e)))?;
230
231 let state = match status_state.as_str() {
233 "submitted" => TaskState::Submitted,
234 "working" => TaskState::Working,
235 "input-required" => TaskState::InputRequired,
236 "completed" => TaskState::Completed,
237 "canceled" => TaskState::Canceled,
238 "failed" => TaskState::Failed,
239 "rejected" => TaskState::Rejected,
240 "auth-required" => TaskState::AuthRequired,
241 "unknown" => TaskState::Unknown,
242 _ => TaskState::Unknown,
243 };
244
245 let status_message = if let Some(msg_str) = status_message_json {
247 Some(serde_json::from_str(&msg_str).map_err(|e| {
248 A2AError::DatabaseError(format!("Failed to parse status message: {}", e))
249 })?)
250 } else {
251 None
252 };
253
254 let metadata =
256 if let Some(meta_str) = metadata_json {
257 Some(serde_json::from_str(&meta_str).map_err(|e| {
258 A2AError::DatabaseError(format!("Failed to parse metadata: {}", e))
259 })?)
260 } else {
261 None
262 };
263
264 let artifacts = if let Some(artifacts_str) = artifacts_json {
266 Some(serde_json::from_str(&artifacts_str).map_err(|e| {
267 A2AError::DatabaseError(format!("Failed to parse artifacts: {}", e))
268 })?)
269 } else {
270 None
271 };
272
273 let now = chrono::Utc::now();
274 let task_status = TaskStatus {
275 state: ::buffa::EnumValue::from(state),
276 message: status_message.into(),
277 timestamp: ::buffa::MessageField::some(::buffa_types::google::protobuf::Timestamp {
278 seconds: now.timestamp(),
279 nanos: now.timestamp_subsec_nanos() as i32,
280 ..Default::default()
281 }),
282 ..Default::default()
283 };
284
285 let task = Task {
286 id: task_id.clone(),
287 context_id,
288 status: ::buffa::MessageField::some(task_status),
289 history: Vec::new(),
290 metadata: metadata.into(),
291 artifacts: artifacts.unwrap_or_default(),
292 ..Default::default()
293 };
294
295 Ok(task)
296 }
297
298 async fn load_task_history(
300 &self,
301 task_id: &str,
302 limit: Option<u32>,
303 ) -> Result<Vec<Message>, A2AError> {
304 let query_str = if let Some(limit) = limit {
305 format!(
306 "SELECT timestamp, status_state, message FROM task_history WHERE task_id = ? ORDER BY timestamp DESC LIMIT {}",
307 limit
308 )
309 } else {
310 "SELECT timestamp, status_state, message FROM task_history WHERE task_id = ? ORDER BY timestamp DESC".to_string()
311 };
312
313 let query = sqlx::query(&query_str);
314
315 let rows = query
316 .bind(task_id)
317 .fetch_all(&self.pool)
318 .await
319 .map_err(|e| A2AError::DatabaseError(format!("Failed to load task history: {}", e)))?;
320
321 let mut history = Vec::new();
322 for row in rows {
323 let message_json: Option<String> = row.try_get("message").map_err(|e| {
324 A2AError::DatabaseError(format!("Failed to get message from history: {}", e))
325 })?;
326
327 if let Some(msg_str) = message_json {
328 let message: Message = serde_json::from_str(&msg_str).map_err(|e| {
329 A2AError::DatabaseError(format!("Failed to parse message from history: {}", e))
330 })?;
331 history.push(message);
332 }
333 }
334
335 history.reverse();
337 Ok(history)
338 }
339
340 async fn add_to_history(
342 &self,
343 task_id: &str,
344 state: TaskState,
345 message: Option<Message>,
346 ) -> Result<(), A2AError> {
347 let state_str = match state {
348 TaskState::Submitted => "submitted",
349 TaskState::Working => "working",
350 TaskState::InputRequired => "input-required",
351 TaskState::Completed => "completed",
352 TaskState::Canceled => "canceled",
353 TaskState::Failed => "failed",
354 TaskState::Rejected => "rejected",
355 TaskState::AuthRequired => "auth-required",
356 TaskState::Unknown => "unknown",
357 };
358
359 let message_json = if let Some(msg) = message {
360 Some(serde_json::to_string(&msg).map_err(|e| {
361 A2AError::DatabaseError(format!("Failed to serialize message: {}", e))
362 })?)
363 } else {
364 None
365 };
366
367 sqlx::query("INSERT INTO task_history (task_id, status_state, message) VALUES (?, ?, ?)")
368 .bind(task_id)
369 .bind(state_str)
370 .bind(message_json)
371 .execute(&self.pool)
372 .await
373 .map_err(|e| A2AError::DatabaseError(format!("Failed to add task history: {}", e)))?;
374
375 Ok(())
376 }
377
378 pub fn push_notifier(&self) -> Arc<dyn AsyncPushNotifier> {
385 self.push_notification_registry.clone()
386 }
387}
388
389#[cfg(feature = "sqlx-storage")]
390#[async_trait]
391impl AsyncTaskLifecycle for SqlxTaskStorage {
392 async fn create(&self, id: &TaskId, context_id: &ContextId) -> Result<Task, A2AError> {
393 let task_id = id.as_str();
394 let context_id = context_id.as_str();
395 let existing = sqlx::query("SELECT id FROM tasks WHERE id = ?")
397 .bind(task_id)
398 .fetch_optional(&self.pool)
399 .await
400 .map_err(|e| {
401 A2AError::DatabaseError(format!("Failed to check existing task: {}", e))
402 })?;
403
404 if existing.is_some() {
405 return Err(A2AError::TaskNotFound(format!(
406 "Task {} already exists",
407 task_id
408 )));
409 }
410
411 let task = Task::new(task_id.to_string(), context_id.to_string());
413
414 let metadata_json = task
416 .metadata
417 .as_option()
418 .map(|m| serde_json::to_string(m).unwrap_or_default());
419 let artifacts_json = serde_json::to_string(&task.artifacts).unwrap_or_default();
420 let status_message_str = task
421 .status
422 .as_option()
423 .and_then(|s| s.message.as_option())
424 .map(|m| serde_json::to_string(m).unwrap_or_default());
425
426 sqlx::query("INSERT INTO tasks (id, context_id, status_state, status_message, metadata, artifacts) VALUES (?, ?, ?, ?, ?, ?)")
428 .bind(&task.id)
429 .bind(&task.context_id)
430 .bind("submitted")
431 .bind(status_message_str)
432 .bind(metadata_json)
433 .bind(artifacts_json)
434 .execute(&self.pool)
435 .await
436 .map_err(|e| A2AError::DatabaseError(format!("Failed to create task: {}", e)))?;
437
438 self.add_to_history(task_id, TaskState::Submitted, None)
440 .await?;
441
442 Ok(task)
443 }
444
445 async fn update_status(
446 &self,
447 id: &TaskId,
448 state: TaskState,
449 message: Option<Message>,
450 ) -> Result<Task, A2AError> {
451 let task_id = id.as_str();
452 let state_str = match state {
454 TaskState::Submitted => "submitted",
455 TaskState::Working => "working",
456 TaskState::InputRequired => "input-required",
457 TaskState::Completed => "completed",
458 TaskState::Canceled => "canceled",
459 TaskState::Failed => "failed",
460 TaskState::Rejected => "rejected",
461 TaskState::AuthRequired => "auth-required",
462 TaskState::Unknown => "unknown",
463 };
464
465 let result =
467 sqlx::query("UPDATE tasks SET status_state = ?, version = version + 1 WHERE id = ?")
468 .bind(state_str)
469 .bind(task_id)
470 .execute(&self.pool)
471 .await
472 .map_err(|e| {
473 A2AError::DatabaseError(format!("Failed to update task status: {}", e))
474 })?;
475
476 if result.rows_affected() == 0 {
477 return Err(A2AError::TaskNotFound(task_id.to_string()));
478 }
479
480 self.add_to_history(task_id, state, message).await?;
482
483 self.get(id, None).await
487 }
488
489 async fn exists(&self, id: &TaskId) -> Result<bool, A2AError> {
490 let task_id = id.as_str();
491 let row = sqlx::query("SELECT id FROM tasks WHERE id = ?")
492 .bind(task_id)
493 .fetch_optional(&self.pool)
494 .await
495 .map_err(|e| {
496 A2AError::DatabaseError(format!("Failed to check task existence: {}", e))
497 })?;
498
499 Ok(row.is_some())
500 }
501
502 async fn get(&self, id: &TaskId, history_length: Option<u32>) -> Result<Task, A2AError> {
503 let task_id = id.as_str();
504 let row = sqlx::query("SELECT * FROM tasks WHERE id = ?")
506 .bind(task_id)
507 .fetch_optional(&self.pool)
508 .await
509 .map_err(|e| A2AError::DatabaseError(format!("Failed to get task: {}", e)))?;
510
511 let Some(row) = row else {
512 return Err(A2AError::TaskNotFound(task_id.to_string()));
513 };
514
515 let mut task = Self::row_to_task(&row)?;
516
517 if history_length.is_some() || history_length.is_none() {
519 let history = self.load_task_history(task_id, history_length).await?;
520 task.history = history;
521 }
522
523 Ok(task)
524 }
525
526 async fn cancel(&self, id: &TaskId) -> Result<Task, A2AError> {
527 let task_id = id.as_str();
528 let task = self.get(id, None).await?;
530
531 if task.status.state != TaskState::Working {
533 return Err(A2AError::TaskNotCancelable(format!(
534 "Task {} is in state {:?} and cannot be canceled",
535 task_id, task.status.state
536 )));
537 }
538
539 let mut cancel_message = Message::agent_text(
541 format!("Task {} canceled.", task_id),
542 uuid::Uuid::new_v4().to_string(),
543 );
544 cancel_message.task_id = task_id.to_string();
545 cancel_message.context_id = task.context_id.clone();
546
547 sqlx::query("UPDATE tasks SET status_state = ?, version = version + 1 WHERE id = ?")
549 .bind("canceled")
550 .bind(task_id)
551 .execute(&self.pool)
552 .await
553 .map_err(|e| A2AError::DatabaseError(format!("Failed to cancel task: {}", e)))?;
554
555 self.add_to_history(task_id, TaskState::Canceled, Some(cancel_message))
557 .await?;
558
559 self.get(id, None).await
562 }
563}
564
#[cfg(feature = "sqlx-storage")]
impl SqlxTaskStorage {
    /// Reads the `version` column of the task with id `task_id`.
    ///
    /// Returns `Ok(None)` when no such task exists.
    ///
    /// # Errors
    ///
    /// Returns `A2AError::DatabaseError` when the query fails, the column
    /// cannot be read, or the stored value is negative.
    async fn current_version(&self, task_id: &str) -> Result<Option<u64>, A2AError> {
        let row = sqlx::query("SELECT version FROM tasks WHERE id = ?")
            .bind(task_id)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| A2AError::DatabaseError(format!("Failed to read task version: {}", e)))?;

        let Some(row) = row else {
            return Ok(None);
        };

        let raw: i64 = row.try_get("version").map_err(|e| {
            A2AError::DatabaseError(format!("Failed to get version column: {}", e))
        })?;

        // A plain `as u64` cast would turn a negative value into a huge
        // version number; report it as an error instead.
        let version = u64::try_from(raw).map_err(|_| {
            A2AError::DatabaseError(format!(
                "Invalid negative version {} stored for task {}",
                raw, task_id
            ))
        })?;

        Ok(Some(version))
    }
}
585
586#[cfg(feature = "sqlx-storage")]
587#[async_trait]
588impl AsyncTaskVersioning for SqlxTaskStorage {
589 async fn version(&self, id: &TaskId) -> Result<u64, A2AError> {
590 self.current_version(id.as_str())
591 .await?
592 .ok_or_else(|| A2AError::TaskNotFound(id.as_str().to_string()))
593 }
594
595 async fn get_versioned(
596 &self,
597 id: &TaskId,
598 history_length: Option<u32>,
599 ) -> Result<VersionedTask, A2AError> {
600 let task = self.get(id, history_length).await?;
601 let version = self.version(id).await?;
602 Ok(VersionedTask::new(task, version))
603 }
604
605 async fn update_status_checked(
606 &self,
607 id: &TaskId,
608 expected: u64,
609 state: TaskState,
610 message: Option<Message>,
611 ) -> Result<VersionedTask, A2AError> {
612 let task_id = id.as_str();
613 let state_str = match state {
614 TaskState::Submitted => "submitted",
615 TaskState::Working => "working",
616 TaskState::InputRequired => "input-required",
617 TaskState::Completed => "completed",
618 TaskState::Canceled => "canceled",
619 TaskState::Failed => "failed",
620 TaskState::Rejected => "rejected",
621 TaskState::AuthRequired => "auth-required",
622 TaskState::Unknown => "unknown",
623 };
624
625 let result = sqlx::query(
628 "UPDATE tasks SET status_state = ?, version = version + 1 WHERE id = ? AND version = ?",
629 )
630 .bind(state_str)
631 .bind(task_id)
632 .bind(expected as i64)
633 .execute(&self.pool)
634 .await
635 .map_err(|e| A2AError::DatabaseError(format!("Failed to update task status: {}", e)))?;
636
637 if result.rows_affected() == 0 {
638 return match self.current_version(task_id).await? {
640 Some(actual) => Err(A2AError::VersionConflict {
641 id: task_id.to_string(),
642 expected,
643 actual,
644 }),
645 None => Err(A2AError::TaskNotFound(task_id.to_string())),
646 };
647 }
648
649 self.add_to_history(task_id, state, message).await?;
650 let task = self.get(id, None).await?;
651 Ok(VersionedTask::new(task, expected + 1))
652 }
653}
654
655#[cfg(feature = "sqlx-storage")]
656#[async_trait]
657impl AsyncTaskQuery for SqlxTaskStorage {
658 async fn list(
659 &self,
660 params: &crate::domain::ListTasksParams,
661 ) -> Result<crate::domain::ListTasksResult, A2AError> {
662 use crate::domain::ListTasksResult;
663
664 let mut where_conditions = Vec::new();
666
667 if params.context_id.is_some() {
669 where_conditions.push("context_id = ?".to_string());
670 }
671
672 if params.status.is_some() {
674 where_conditions.push("status_state = ?".to_string());
675 }
676
677 let timestamp_str = if let Some(status_timestamp_after) = ¶ms.status_timestamp_after {
679 let timestamp =
681 chrono::DateTime::parse_from_rfc3339(status_timestamp_after).map_err(|e| {
682 A2AError::DatabaseError(format!(
683 "Invalid timestamp value: {} ({})",
684 status_timestamp_after, e
685 ))
686 })?;
687 where_conditions.push("updated_at >= ?".to_string());
688 Some(
689 timestamp
690 .with_timezone(&chrono::Utc)
691 .format("%Y-%m-%d %H:%M:%S")
692 .to_string(),
693 )
694 } else {
695 None
696 };
697
698 let where_clause = if where_conditions.is_empty() {
700 String::new()
701 } else {
702 format!(" WHERE {}", where_conditions.join(" AND "))
703 };
704
705 let count_query = format!("SELECT COUNT(*) as count FROM tasks{}", where_clause);
707 let mut count_q = sqlx::query(&count_query);
708
709 if let Some(ref context_id) = params.context_id {
711 count_q = count_q.bind(context_id);
712 }
713 if let Some(ref status) = params.status {
714 let state_str = match *status {
715 crate::domain::TaskState::Submitted => "submitted",
716 crate::domain::TaskState::Working => "working",
717 crate::domain::TaskState::InputRequired => "input-required",
718 crate::domain::TaskState::Completed => "completed",
719 crate::domain::TaskState::Canceled => "canceled",
720 crate::domain::TaskState::Failed => "failed",
721 crate::domain::TaskState::Rejected => "rejected",
722 crate::domain::TaskState::AuthRequired => "auth-required",
723 crate::domain::TaskState::Unknown => "unknown",
724 };
725 count_q = count_q.bind(state_str);
726 }
727 if let Some(ref ts) = timestamp_str {
728 count_q = count_q.bind(ts);
729 }
730
731 let count_row = count_q
732 .fetch_one(&self.pool)
733 .await
734 .map_err(|e| A2AError::DatabaseError(format!("Failed to count tasks: {}", e)))?;
735
736 let total_size: i32 = count_row
737 .try_get("count")
738 .map_err(|e| A2AError::DatabaseError(format!("Failed to get count: {}", e)))?;
739
740 let page_size = params.page_size.unwrap_or(50).clamp(1, 100);
742 let offset = if let Some(ref token) = params.page_token {
743 token.parse::<i32>().unwrap_or(0)
744 } else {
745 0
746 };
747
748 let main_query = format!(
750 "SELECT * FROM tasks{} ORDER BY updated_at DESC LIMIT ? OFFSET ?",
751 where_clause
752 );
753
754 let mut main_q = sqlx::query(&main_query);
755
756 if let Some(ref context_id) = params.context_id {
758 main_q = main_q.bind(context_id);
759 }
760 if let Some(ref status) = params.status {
761 let state_str = match *status {
762 crate::domain::TaskState::Submitted => "submitted",
763 crate::domain::TaskState::Working => "working",
764 crate::domain::TaskState::InputRequired => "input-required",
765 crate::domain::TaskState::Completed => "completed",
766 crate::domain::TaskState::Canceled => "canceled",
767 crate::domain::TaskState::Failed => "failed",
768 crate::domain::TaskState::Rejected => "rejected",
769 crate::domain::TaskState::AuthRequired => "auth-required",
770 crate::domain::TaskState::Unknown => "unknown",
771 };
772 main_q = main_q.bind(state_str);
773 }
774 if let Some(ref ts) = timestamp_str {
775 main_q = main_q.bind(ts);
776 }
777
778 main_q = main_q.bind(page_size).bind(offset);
780
781 let rows = main_q
782 .fetch_all(&self.pool)
783 .await
784 .map_err(|e| A2AError::DatabaseError(format!("Failed to list tasks: {}", e)))?;
785
786 let mut tasks: Vec<Task> = rows
788 .iter()
789 .filter_map(|row| Self::row_to_task(row).ok())
790 .collect();
791
792 let history_length = params.history_length.unwrap_or(0);
794 for task in &mut tasks {
795 if history_length > 0 {
796 let history = self
797 .load_task_history(&task.id, Some(history_length as u32))
798 .await?;
799 task.history = history;
800 } else {
801 task.history.clear();
802 }
803
804 if !params.include_artifacts.unwrap_or(false) {
806 task.artifacts.clear();
807 }
808 }
809
810 let has_more = offset + page_size < total_size;
812 let next_page_token = if has_more {
813 (offset + page_size).to_string()
814 } else {
815 String::new()
816 };
817
818 Ok(ListTasksResult {
819 tasks,
820 total_size,
821 page_size,
822 next_page_token,
823 })
824 }
825}
826
827#[cfg(feature = "sqlx-storage")]
828#[async_trait]
829impl AsyncNotificationManager for SqlxTaskStorage {
830 async fn get_config(
831 &self,
832 params: &crate::domain::GetTaskPushNotificationConfigParams,
833 ) -> Result<crate::domain::TaskPushNotificationConfig, A2AError> {
834 let row = match params.push_notification_config_id.as_ref() {
839 Some(config_id) => sqlx::query(
840 "SELECT id, task_id, url, token, authentication FROM push_notification_configs WHERE task_id = ? AND id = ?"
841 )
842 .bind(¶ms.id)
843 .bind(config_id),
844 None => sqlx::query(
845 "SELECT id, task_id, url, token, authentication FROM push_notification_configs WHERE task_id = ? ORDER BY id LIMIT 1"
846 )
847 .bind(¶ms.id),
848 }
849 .fetch_optional(&self.pool)
850 .await
851 .map_err(|e| A2AError::DatabaseError(format!("Failed to get push config: {}", e)))?;
852
853 if let Some(row) = row {
854 let id: String = row
855 .try_get("id")
856 .map_err(|e| A2AError::DatabaseError(format!("Failed to get config id: {}", e)))?;
857 let url: String = row
858 .try_get("url")
859 .map_err(|e| A2AError::DatabaseError(format!("Failed to get url: {}", e)))?;
860 let token: Option<String> = row.try_get("token").ok();
861 let auth_json: Option<String> = row.try_get("authentication").ok();
862
863 let auth_info = if let Some(auth_str) = auth_json {
864 serde_json::from_str(&auth_str).ok()
865 } else {
866 None
867 };
868
869 Ok(crate::domain::TaskPushNotificationConfig {
870 task_id: params.id.clone(),
871 id,
872 url,
873 token: token.unwrap_or_default(),
874 authentication: auth_info.into(),
875 tenant: "".to_string(),
876 ..Default::default()
877 })
878 } else {
879 Err(A2AError::TaskNotFound(format!(
880 "Push notification config not found for task {}{}",
881 params.id,
882 params
883 .push_notification_config_id
884 .as_ref()
885 .map(|id| format!(" with id {}", id))
886 .unwrap_or_default()
887 )))
888 }
889 }
890
891 async fn list_configs(
892 &self,
893 params: &crate::domain::ListTaskPushNotificationConfigsParams,
894 ) -> Result<Vec<crate::domain::TaskPushNotificationConfig>, A2AError> {
895 let rows = sqlx::query(
897 "SELECT id, task_id, url, token, authentication FROM push_notification_configs WHERE task_id = ?"
898 )
899 .bind(¶ms.id)
900 .fetch_all(&self.pool)
901 .await
902 .map_err(|e| A2AError::DatabaseError(format!("Failed to list push configs: {}", e)))?;
903
904 let configs: Vec<crate::domain::TaskPushNotificationConfig> = rows
905 .iter()
906 .filter_map(|row| {
907 let id: String = row.try_get("id").ok()?;
908 let url: String = row.try_get("url").ok()?;
909 let token: Option<String> = row.try_get("token").ok().flatten();
910 let auth_json: Option<String> = row.try_get("authentication").ok().flatten();
911
912 let auth_info = if let Some(auth_str) = auth_json {
913 serde_json::from_str(&auth_str).ok()
914 } else {
915 None
916 };
917
918 Some(crate::domain::TaskPushNotificationConfig {
919 task_id: params.id.clone(),
920 id,
921 url,
922 token: token.unwrap_or_default(),
923 authentication: auth_info.into(),
924 tenant: "".to_string(),
925 ..Default::default()
926 })
927 })
928 .collect();
929
930 Ok(configs)
931 }
932
933 async fn delete_config(
934 &self,
935 params: &crate::domain::DeleteTaskPushNotificationConfigParams,
936 ) -> Result<(), A2AError> {
937 let query = if params.push_notification_config_id.is_empty() {
941 sqlx::query("DELETE FROM push_notification_configs WHERE task_id = ?").bind(¶ms.id)
942 } else {
943 sqlx::query("DELETE FROM push_notification_configs WHERE task_id = ? AND id = ?")
944 .bind(¶ms.id)
945 .bind(¶ms.push_notification_config_id)
946 };
947 let _result = query
948 .execute(&self.pool)
949 .await
950 .map_err(|e| A2AError::DatabaseError(format!("Failed to delete push config: {}", e)))?;
951
952 Ok(())
954 }
955
956 async fn set_config(
957 &self,
958 config: &TaskPushNotificationConfig,
959 ) -> Result<TaskPushNotificationConfig, A2AError> {
960 let config_id = if config.id.is_empty() {
962 uuid::Uuid::new_v4().to_string()
963 } else {
964 config.id.clone()
965 };
966
967 let auth_json = config
969 .authentication
970 .as_option()
971 .map(|auth| serde_json::to_string(auth).unwrap_or_default());
972
973 sqlx::query(
975 "INSERT OR REPLACE INTO push_notification_configs (id, task_id, url, token, authentication) VALUES (?, ?, ?, ?, ?)",
976 )
977 .bind(&config_id)
978 .bind(&config.task_id)
979 .bind(&config.url)
980 .bind(&config.token)
981 .bind(auth_json)
982 .execute(&self.pool)
983 .await
984 .map_err(|e| {
985 A2AError::DatabaseError(format!("Failed to set push notification config: {}", e))
986 })?;
987
988 self.push_notification_registry
990 .register(&config.task_id, config.clone())
991 .await?;
992
993 let mut result_config = config.clone();
995 result_config.id = config_id;
996 Ok(result_config)
997 }
998}
999
#[cfg(feature = "sqlx-storage")]
impl Clone for SqlxTaskStorage {
    /// Produces another handle to the same storage: the pool handle is cloned
    /// and the push-notification registry is shared through its `Arc`.
    fn clone(&self) -> Self {
        let pool = self.pool.clone();
        let push_notification_registry = Arc::clone(&self.push_notification_registry);
        Self {
            pool,
            push_notification_registry,
        }
    }
}
1008}