Skip to main content

a2a_rs/adapter/storage/
sqlx_storage.rs

//! SQLx-based task storage implementation
//!
//! This module provides a persistent storage solution using SQLx. Only SQLite
//! is currently supported; PostgreSQL and MySQL URLs are rejected with an error.
5
6#[cfg(feature = "sqlx-storage")]
7use async_trait::async_trait;
8#[cfg(feature = "sqlx-storage")]
9use serde_json;
10#[cfg(feature = "sqlx-storage")]
11use sqlx::{Row, SqlitePool};
12
13#[cfg(feature = "sqlx-storage")]
14use crate::adapter::business::push_notification::{
15    PushNotificationRegistry, PushNotificationSender,
16};
17
18#[cfg(feature = "sqlx-storage")]
19#[cfg(feature = "http-client")]
20use crate::adapter::business::push_notification::HttpPushNotificationSender;
21#[cfg(feature = "sqlx-storage")]
22#[cfg(not(feature = "http-client"))]
23use crate::adapter::business::push_notification::NoopPushNotificationSender;
24
25#[cfg(feature = "sqlx-storage")]
26use crate::domain::{
27    A2AError, ContextId, Message, Task, TaskId, TaskPushNotificationConfig, TaskState, TaskStatus,
28    VersionedTask,
29};
30#[cfg(feature = "sqlx-storage")]
31use crate::port::{
32    AsyncNotificationManager, AsyncPushNotifier, AsyncTaskLifecycle, AsyncTaskQuery,
33    AsyncTaskVersioning,
34};
35
36#[cfg(feature = "sqlx-storage")]
37use std::sync::Arc;
38
#[cfg(feature = "sqlx-storage")]
/// SQLx-based task storage for persistent storage.
///
/// Persistence-only: streaming fan-out lives in
/// [`InMemoryStreamingHandler`](crate::adapter::InMemoryStreamingHandler) and
/// push-webhook delivery behind the [`AsyncPushNotifier`] port (handed out via
/// [`push_notifier`](Self::push_notifier)). The store still owns push-config
/// CRUD ([`AsyncNotificationManager`]) — that is config persistence.
///
/// The pool is a [`SqlitePool`], so only SQLite databases can back this store.
pub struct SqlxTaskStorage {
    /// SQLite connection pool used by every query issued by this store.
    pool: SqlitePool,
    /// Push notification registry (config store + delivery backend), shared
    /// behind an `Arc` so it can also be handed out by `push_notifier`.
    push_notification_registry: Arc<PushNotificationRegistry>,
}
53
54#[cfg(feature = "sqlx-storage")]
55use super::database_config::DatabaseType;
56
57#[cfg(feature = "sqlx-storage")]
58impl SqlxTaskStorage {
59    /// Validate that the database URL is a supported SQLite URL.
60    ///
61    /// Returns an error if the URL points to a different database type
62    /// or if the required feature is not enabled.
63    fn validate_url(database_url: &str) -> Result<(), A2AError> {
64        match DatabaseType::from_url(database_url) {
65            Some(DatabaseType::Sqlite) => Ok(()),
66            Some(db_type) => Err(A2AError::DatabaseError(format!(
67                "{db_type} database detected from URL '{database_url}', but SqlxTaskStorage \
68                 currently only supports SQLite. For {db_type} support, see the project roadmap."
69            ))),
70            None => Err(A2AError::DatabaseError(format!(
71                "Unrecognized database URL scheme in '{database_url}'. \
72                 Expected a URL starting with sqlite:, e.g. 'sqlite::memory:' or 'sqlite:data.db'"
73            ))),
74        }
75    }
76
77    /// Create a new SQLx task storage with the given database URL.
78    ///
79    /// Currently only SQLite URLs are supported (e.g. `sqlite::memory:`, `sqlite:data.db`).
80    /// Passing a PostgreSQL or MySQL URL will return an error.
81    pub async fn new(database_url: &str) -> Result<Self, A2AError> {
82        Self::validate_url(database_url)?;
83
84        let pool = SqlitePool::connect(database_url).await.map_err(|e| {
85            A2AError::DatabaseError(format!("Failed to connect to database: {}", e))
86        })?;
87
88        // Run base migrations
89        Self::run_base_migrations(&pool).await?;
90
91        // Use the appropriate push notification sender based on available features
92        #[cfg(feature = "http-client")]
93        let push_sender = HttpPushNotificationSender::new();
94        #[cfg(not(feature = "http-client"))]
95        let push_sender = NoopPushNotificationSender::default();
96
97        let push_registry = PushNotificationRegistry::new(push_sender);
98
99        Ok(Self {
100            pool,
101            push_notification_registry: Arc::new(push_registry),
102        })
103    }
104
105    /// Create a new SQLx task storage with a custom push notification sender.
106    ///
107    /// Currently only SQLite URLs are supported.
108    pub async fn with_push_sender(
109        database_url: &str,
110        push_sender: impl PushNotificationSender + 'static,
111    ) -> Result<Self, A2AError> {
112        Self::validate_url(database_url)?;
113
114        let pool = SqlitePool::connect(database_url).await.map_err(|e| {
115            A2AError::DatabaseError(format!("Failed to connect to database: {}", e))
116        })?;
117
118        // Run migrations
119        Self::run_base_migrations(&pool).await?;
120
121        let push_registry = PushNotificationRegistry::new(push_sender);
122
123        Ok(Self {
124            pool,
125            push_notification_registry: Arc::new(push_registry),
126        })
127    }
128
129    /// Create a new SQLx task storage with additional migrations.
130    ///
131    /// Currently only SQLite URLs are supported.
132    pub async fn with_migrations(
133        database_url: &str,
134        additional_migrations: &[&str],
135    ) -> Result<Self, A2AError> {
136        Self::validate_url(database_url)?;
137
138        let pool = SqlitePool::connect(database_url).await.map_err(|e| {
139            A2AError::DatabaseError(format!("Failed to connect to database: {}", e))
140        })?;
141
142        // Run base migrations
143        Self::run_base_migrations(&pool).await?;
144
145        // Run additional migrations
146        Self::run_additional_migrations(&pool, additional_migrations).await?;
147
148        // Use the appropriate push notification sender based on available features
149        #[cfg(feature = "http-client")]
150        let push_sender = HttpPushNotificationSender::new();
151        #[cfg(not(feature = "http-client"))]
152        let push_sender = NoopPushNotificationSender::default();
153
154        let push_registry = PushNotificationRegistry::new(push_sender);
155
156        Ok(Self {
157            pool,
158            push_notification_registry: Arc::new(push_registry),
159        })
160    }
161
162    /// Run base A2A framework migrations (SQLite dialect).
163    async fn run_base_migrations(pool: &SqlitePool) -> Result<(), A2AError> {
164        sqlx::query(include_str!("../../../migrations/001_initial_schema.sql"))
165            .execute(pool)
166            .await
167            .map_err(|e| A2AError::DatabaseError(format!("Migration 001 failed: {}", e)))?;
168
169        sqlx::query(include_str!(
170            "../../../migrations/002_v030_push_configs.sql"
171        ))
172        .execute(pool)
173        .await
174        .map_err(|e| A2AError::DatabaseError(format!("Migration 002 failed: {}", e)))?;
175
176        // Migration 003 is an `ALTER TABLE ADD COLUMN`, which SQLite cannot
177        // express idempotently. Since base migrations re-run on every `new()`,
178        // tolerate the "duplicate column name" error on an already-migrated DB.
179        if let Err(e) = sqlx::query(include_str!("../../../migrations/003_task_version.sql"))
180            .execute(pool)
181            .await
182        {
183            let msg = e.to_string();
184            if !msg.contains("duplicate column name") {
185                return Err(A2AError::DatabaseError(format!(
186                    "Migration 003 failed: {msg}"
187                )));
188            }
189        }
190
191        Ok(())
192    }
193
194    /// Run additional migrations provided by the application
195    async fn run_additional_migrations(
196        pool: &SqlitePool,
197        migrations: &[&str],
198    ) -> Result<(), A2AError> {
199        for (i, migration_sql) in migrations.iter().enumerate() {
200            sqlx::query(migration_sql)
201                .execute(pool)
202                .await
203                .map_err(|e| {
204                    A2AError::DatabaseError(format!("Additional migration {} failed: {}", i + 1, e))
205                })?;
206        }
207        Ok(())
208    }
209
210    /// Convert database row to Task
211    fn row_to_task(row: &sqlx::sqlite::SqliteRow) -> Result<Task, A2AError> {
212        let task_id: String = row
213            .try_get("id")
214            .map_err(|e| A2AError::DatabaseError(format!("Failed to get task_id: {}", e)))?;
215        let context_id: String = row
216            .try_get("context_id")
217            .map_err(|e| A2AError::DatabaseError(format!("Failed to get context_id: {}", e)))?;
218        let status_state: String = row
219            .try_get("status_state")
220            .map_err(|e| A2AError::DatabaseError(format!("Failed to get status_state: {}", e)))?;
221        let status_message_json: Option<String> = row
222            .try_get("status_message")
223            .map_err(|e| A2AError::DatabaseError(format!("Failed to get status_message: {}", e)))?;
224        let metadata_json: Option<String> = row
225            .try_get("metadata")
226            .map_err(|e| A2AError::DatabaseError(format!("Failed to get metadata: {}", e)))?;
227        let artifacts_json: Option<String> = row
228            .try_get("artifacts")
229            .map_err(|e| A2AError::DatabaseError(format!("Failed to get artifacts: {}", e)))?;
230
231        // Parse task state
232        let state = match status_state.as_str() {
233            "submitted" => TaskState::Submitted,
234            "working" => TaskState::Working,
235            "input-required" => TaskState::InputRequired,
236            "completed" => TaskState::Completed,
237            "canceled" => TaskState::Canceled,
238            "failed" => TaskState::Failed,
239            "rejected" => TaskState::Rejected,
240            "auth-required" => TaskState::AuthRequired,
241            "unknown" => TaskState::Unknown,
242            _ => TaskState::Unknown,
243        };
244
245        // Parse status message
246        let status_message = if let Some(msg_str) = status_message_json {
247            Some(serde_json::from_str(&msg_str).map_err(|e| {
248                A2AError::DatabaseError(format!("Failed to parse status message: {}", e))
249            })?)
250        } else {
251            None
252        };
253
254        // Parse metadata
255        let metadata =
256            if let Some(meta_str) = metadata_json {
257                Some(serde_json::from_str(&meta_str).map_err(|e| {
258                    A2AError::DatabaseError(format!("Failed to parse metadata: {}", e))
259                })?)
260            } else {
261                None
262            };
263
264        // Parse artifacts
265        let artifacts = if let Some(artifacts_str) = artifacts_json {
266            Some(serde_json::from_str(&artifacts_str).map_err(|e| {
267                A2AError::DatabaseError(format!("Failed to parse artifacts: {}", e))
268            })?)
269        } else {
270            None
271        };
272
273        let now = chrono::Utc::now();
274        let task_status = TaskStatus {
275            state: ::buffa::EnumValue::from(state),
276            message: status_message.into(),
277            timestamp: ::buffa::MessageField::some(::buffa_types::google::protobuf::Timestamp {
278                seconds: now.timestamp(),
279                nanos: now.timestamp_subsec_nanos() as i32,
280                ..Default::default()
281            }),
282            ..Default::default()
283        };
284
285        let task = Task {
286            id: task_id.clone(),
287            context_id,
288            status: ::buffa::MessageField::some(task_status),
289            history: Vec::new(),
290            metadata: metadata.into(),
291            artifacts: artifacts.unwrap_or_default(),
292            ..Default::default()
293        };
294
295        Ok(task)
296    }
297
298    /// Load task history from database
299    async fn load_task_history(
300        &self,
301        task_id: &str,
302        limit: Option<u32>,
303    ) -> Result<Vec<Message>, A2AError> {
304        let query_str = if let Some(limit) = limit {
305            format!(
306                "SELECT timestamp, status_state, message FROM task_history WHERE task_id = ? ORDER BY timestamp DESC LIMIT {}",
307                limit
308            )
309        } else {
310            "SELECT timestamp, status_state, message FROM task_history WHERE task_id = ? ORDER BY timestamp DESC".to_string()
311        };
312
313        let query = sqlx::query(&query_str);
314
315        let rows = query
316            .bind(task_id)
317            .fetch_all(&self.pool)
318            .await
319            .map_err(|e| A2AError::DatabaseError(format!("Failed to load task history: {}", e)))?;
320
321        let mut history = Vec::new();
322        for row in rows {
323            let message_json: Option<String> = row.try_get("message").map_err(|e| {
324                A2AError::DatabaseError(format!("Failed to get message from history: {}", e))
325            })?;
326
327            if let Some(msg_str) = message_json {
328                let message: Message = serde_json::from_str(&msg_str).map_err(|e| {
329                    A2AError::DatabaseError(format!("Failed to parse message from history: {}", e))
330                })?;
331                history.push(message);
332            }
333        }
334
335        // Reverse to get chronological order
336        history.reverse();
337        Ok(history)
338    }
339
340    /// Add entry to task history
341    async fn add_to_history(
342        &self,
343        task_id: &str,
344        state: TaskState,
345        message: Option<Message>,
346    ) -> Result<(), A2AError> {
347        let state_str = match state {
348            TaskState::Submitted => "submitted",
349            TaskState::Working => "working",
350            TaskState::InputRequired => "input-required",
351            TaskState::Completed => "completed",
352            TaskState::Canceled => "canceled",
353            TaskState::Failed => "failed",
354            TaskState::Rejected => "rejected",
355            TaskState::AuthRequired => "auth-required",
356            TaskState::Unknown => "unknown",
357        };
358
359        let message_json = if let Some(msg) = message {
360            Some(serde_json::to_string(&msg).map_err(|e| {
361                A2AError::DatabaseError(format!("Failed to serialize message: {}", e))
362            })?)
363        } else {
364            None
365        };
366
367        sqlx::query("INSERT INTO task_history (task_id, status_state, message) VALUES (?, ?, ?)")
368            .bind(task_id)
369            .bind(state_str)
370            .bind(message_json)
371            .execute(&self.pool)
372            .await
373            .map_err(|e| A2AError::DatabaseError(format!("Failed to add task history: {}", e)))?;
374
375        Ok(())
376    }
377
378    /// Hand out this store's push-notification registry as an
379    /// [`AsyncPushNotifier`].
380    ///
381    /// The returned notifier shares the same config registry the store writes to
382    /// via [`AsyncNotificationManager::set_config`], so a config registered on
383    /// the store is immediately visible to the notifier at the composition edge.
384    pub fn push_notifier(&self) -> Arc<dyn AsyncPushNotifier> {
385        self.push_notification_registry.clone()
386    }
387}
388
389#[cfg(feature = "sqlx-storage")]
390#[async_trait]
391impl AsyncTaskLifecycle for SqlxTaskStorage {
392    async fn create(&self, id: &TaskId, context_id: &ContextId) -> Result<Task, A2AError> {
393        let task_id = id.as_str();
394        let context_id = context_id.as_str();
395        // Check if task already exists
396        let existing = sqlx::query("SELECT id FROM tasks WHERE id = ?")
397            .bind(task_id)
398            .fetch_optional(&self.pool)
399            .await
400            .map_err(|e| {
401                A2AError::DatabaseError(format!("Failed to check existing task: {}", e))
402            })?;
403
404        if existing.is_some() {
405            return Err(A2AError::TaskNotFound(format!(
406                "Task {} already exists",
407                task_id
408            )));
409        }
410
411        // Create new task
412        let task = Task::new(task_id.to_string(), context_id.to_string());
413
414        // Convert metadata and artifacts to JSON strings
415        let metadata_json = task
416            .metadata
417            .as_option()
418            .map(|m| serde_json::to_string(m).unwrap_or_default());
419        let artifacts_json = serde_json::to_string(&task.artifacts).unwrap_or_default();
420        let status_message_str = task
421            .status
422            .as_option()
423            .and_then(|s| s.message.as_option())
424            .map(|m| serde_json::to_string(m).unwrap_or_default());
425
426        // Insert into database
427        sqlx::query("INSERT INTO tasks (id, context_id, status_state, status_message, metadata, artifacts) VALUES (?, ?, ?, ?, ?, ?)")
428            .bind(&task.id)
429            .bind(&task.context_id)
430            .bind("submitted")
431            .bind(status_message_str)
432            .bind(metadata_json)
433            .bind(artifacts_json)
434            .execute(&self.pool)
435            .await
436            .map_err(|e| A2AError::DatabaseError(format!("Failed to create task: {}", e)))?;
437
438        // Add initial history entry
439        self.add_to_history(task_id, TaskState::Submitted, None)
440            .await?;
441
442        Ok(task)
443    }
444
445    async fn update_status(
446        &self,
447        id: &TaskId,
448        state: TaskState,
449        message: Option<Message>,
450    ) -> Result<Task, A2AError> {
451        let task_id = id.as_str();
452        // Convert state to string
453        let state_str = match state {
454            TaskState::Submitted => "submitted",
455            TaskState::Working => "working",
456            TaskState::InputRequired => "input-required",
457            TaskState::Completed => "completed",
458            TaskState::Canceled => "canceled",
459            TaskState::Failed => "failed",
460            TaskState::Rejected => "rejected",
461            TaskState::AuthRequired => "auth-required",
462            TaskState::Unknown => "unknown",
463        };
464
465        // Update task in database (bump the optimistic-concurrency version)
466        let result =
467            sqlx::query("UPDATE tasks SET status_state = ?, version = version + 1 WHERE id = ?")
468                .bind(state_str)
469                .bind(task_id)
470                .execute(&self.pool)
471                .await
472                .map_err(|e| {
473                    A2AError::DatabaseError(format!("Failed to update task status: {}", e))
474                })?;
475
476        if result.rows_affected() == 0 {
477            return Err(A2AError::TaskNotFound(task_id.to_string()));
478        }
479
480        // Add to history
481        self.add_to_history(task_id, state, message).await?;
482
483        // Persistence only: announcing the change to streaming subscribers is
484        // the orchestration layer's job (see `TaskStatusBroadcast`), not a side
485        // effect of the mutator.
486        self.get(id, None).await
487    }
488
489    async fn exists(&self, id: &TaskId) -> Result<bool, A2AError> {
490        let task_id = id.as_str();
491        let row = sqlx::query("SELECT id FROM tasks WHERE id = ?")
492            .bind(task_id)
493            .fetch_optional(&self.pool)
494            .await
495            .map_err(|e| {
496                A2AError::DatabaseError(format!("Failed to check task existence: {}", e))
497            })?;
498
499        Ok(row.is_some())
500    }
501
502    async fn get(&self, id: &TaskId, history_length: Option<u32>) -> Result<Task, A2AError> {
503        let task_id = id.as_str();
504        // Get task from database
505        let row = sqlx::query("SELECT * FROM tasks WHERE id = ?")
506            .bind(task_id)
507            .fetch_optional(&self.pool)
508            .await
509            .map_err(|e| A2AError::DatabaseError(format!("Failed to get task: {}", e)))?;
510
511        let Some(row) = row else {
512            return Err(A2AError::TaskNotFound(task_id.to_string()));
513        };
514
515        let mut task = Self::row_to_task(&row)?;
516
517        // Load history
518        if history_length.is_some() || history_length.is_none() {
519            let history = self.load_task_history(task_id, history_length).await?;
520            task.history = history;
521        }
522
523        Ok(task)
524    }
525
526    async fn cancel(&self, id: &TaskId) -> Result<Task, A2AError> {
527        let task_id = id.as_str();
528        // Get current task
529        let task = self.get(id, None).await?;
530
531        // Only working tasks can be canceled
532        if task.status.state != TaskState::Working {
533            return Err(A2AError::TaskNotCancelable(format!(
534                "Task {} is in state {:?} and cannot be canceled",
535                task_id, task.status.state
536            )));
537        }
538
539        // Create a cancellation message
540        let mut cancel_message = Message::agent_text(
541            format!("Task {} canceled.", task_id),
542            uuid::Uuid::new_v4().to_string(),
543        );
544        cancel_message.task_id = task_id.to_string();
545        cancel_message.context_id = task.context_id.clone();
546
547        // Update task status (bump the optimistic-concurrency version)
548        sqlx::query("UPDATE tasks SET status_state = ?, version = version + 1 WHERE id = ?")
549            .bind("canceled")
550            .bind(task_id)
551            .execute(&self.pool)
552            .await
553            .map_err(|e| A2AError::DatabaseError(format!("Failed to cancel task: {}", e)))?;
554
555        // Add to history with cancellation message
556        self.add_to_history(task_id, TaskState::Canceled, Some(cancel_message))
557            .await?;
558
559        // Persistence only: the orchestration layer announces the cancellation
560        // to streaming subscribers (see `TaskStatusBroadcast`).
561        self.get(id, None).await
562    }
563}
564
565#[cfg(feature = "sqlx-storage")]
566impl SqlxTaskStorage {
567    /// Read the current stored version of a task, or `None` if it doesn't exist.
568    async fn current_version(&self, task_id: &str) -> Result<Option<u64>, A2AError> {
569        let row = sqlx::query("SELECT version FROM tasks WHERE id = ?")
570            .bind(task_id)
571            .fetch_optional(&self.pool)
572            .await
573            .map_err(|e| A2AError::DatabaseError(format!("Failed to read task version: {}", e)))?;
574        match row {
575            Some(row) => {
576                let v: i64 = row.try_get("version").map_err(|e| {
577                    A2AError::DatabaseError(format!("Failed to get version column: {}", e))
578                })?;
579                Ok(Some(v as u64))
580            }
581            None => Ok(None),
582        }
583    }
584}
585
586#[cfg(feature = "sqlx-storage")]
587#[async_trait]
588impl AsyncTaskVersioning for SqlxTaskStorage {
589    async fn version(&self, id: &TaskId) -> Result<u64, A2AError> {
590        self.current_version(id.as_str())
591            .await?
592            .ok_or_else(|| A2AError::TaskNotFound(id.as_str().to_string()))
593    }
594
595    async fn get_versioned(
596        &self,
597        id: &TaskId,
598        history_length: Option<u32>,
599    ) -> Result<VersionedTask, A2AError> {
600        let task = self.get(id, history_length).await?;
601        let version = self.version(id).await?;
602        Ok(VersionedTask::new(task, version))
603    }
604
605    async fn update_status_checked(
606        &self,
607        id: &TaskId,
608        expected: u64,
609        state: TaskState,
610        message: Option<Message>,
611    ) -> Result<VersionedTask, A2AError> {
612        let task_id = id.as_str();
613        let state_str = match state {
614            TaskState::Submitted => "submitted",
615            TaskState::Working => "working",
616            TaskState::InputRequired => "input-required",
617            TaskState::Completed => "completed",
618            TaskState::Canceled => "canceled",
619            TaskState::Failed => "failed",
620            TaskState::Rejected => "rejected",
621            TaskState::AuthRequired => "auth-required",
622            TaskState::Unknown => "unknown",
623        };
624
625        // Conditional update: SQLite applies it atomically, so the row count
626        // tells us whether the version matched without a separate lock.
627        let result = sqlx::query(
628            "UPDATE tasks SET status_state = ?, version = version + 1 WHERE id = ? AND version = ?",
629        )
630        .bind(state_str)
631        .bind(task_id)
632        .bind(expected as i64)
633        .execute(&self.pool)
634        .await
635        .map_err(|e| A2AError::DatabaseError(format!("Failed to update task status: {}", e)))?;
636
637        if result.rows_affected() == 0 {
638            // No row matched: either the task is gone or the version moved on.
639            return match self.current_version(task_id).await? {
640                Some(actual) => Err(A2AError::VersionConflict {
641                    id: task_id.to_string(),
642                    expected,
643                    actual,
644                }),
645                None => Err(A2AError::TaskNotFound(task_id.to_string())),
646            };
647        }
648
649        self.add_to_history(task_id, state, message).await?;
650        let task = self.get(id, None).await?;
651        Ok(VersionedTask::new(task, expected + 1))
652    }
653}
654
655#[cfg(feature = "sqlx-storage")]
656#[async_trait]
657impl AsyncTaskQuery for SqlxTaskStorage {
658    async fn list(
659        &self,
660        params: &crate::domain::ListTasksParams,
661    ) -> Result<crate::domain::ListTasksResult, A2AError> {
662        use crate::domain::ListTasksResult;
663
664        // Build WHERE clause conditions
665        let mut where_conditions = Vec::new();
666
667        // Filter by context_id
668        if params.context_id.is_some() {
669            where_conditions.push("context_id = ?".to_string());
670        }
671
672        // Filter by status
673        if params.status.is_some() {
674            where_conditions.push("status_state = ?".to_string());
675        }
676
677        // Filter by status_timestamp_after
678        let timestamp_str = if let Some(status_timestamp_after) = &params.status_timestamp_after {
679            // Parse ISO 8601 string
680            let timestamp =
681                chrono::DateTime::parse_from_rfc3339(status_timestamp_after).map_err(|e| {
682                    A2AError::DatabaseError(format!(
683                        "Invalid timestamp value: {} ({})",
684                        status_timestamp_after, e
685                    ))
686                })?;
687            where_conditions.push("updated_at >= ?".to_string());
688            Some(
689                timestamp
690                    .with_timezone(&chrono::Utc)
691                    .format("%Y-%m-%d %H:%M:%S")
692                    .to_string(),
693            )
694        } else {
695            None
696        };
697
698        // Build WHERE clause
699        let where_clause = if where_conditions.is_empty() {
700            String::new()
701        } else {
702            format!(" WHERE {}", where_conditions.join(" AND "))
703        };
704
705        // First, get total count with same filters
706        let count_query = format!("SELECT COUNT(*) as count FROM tasks{}", where_clause);
707        let mut count_q = sqlx::query(&count_query);
708
709        // Bind parameters for count query
710        if let Some(ref context_id) = params.context_id {
711            count_q = count_q.bind(context_id);
712        }
713        if let Some(ref status) = params.status {
714            let state_str = match *status {
715                crate::domain::TaskState::Submitted => "submitted",
716                crate::domain::TaskState::Working => "working",
717                crate::domain::TaskState::InputRequired => "input-required",
718                crate::domain::TaskState::Completed => "completed",
719                crate::domain::TaskState::Canceled => "canceled",
720                crate::domain::TaskState::Failed => "failed",
721                crate::domain::TaskState::Rejected => "rejected",
722                crate::domain::TaskState::AuthRequired => "auth-required",
723                crate::domain::TaskState::Unknown => "unknown",
724            };
725            count_q = count_q.bind(state_str);
726        }
727        if let Some(ref ts) = timestamp_str {
728            count_q = count_q.bind(ts);
729        }
730
731        let count_row = count_q
732            .fetch_one(&self.pool)
733            .await
734            .map_err(|e| A2AError::DatabaseError(format!("Failed to count tasks: {}", e)))?;
735
736        let total_size: i32 = count_row
737            .try_get("count")
738            .map_err(|e| A2AError::DatabaseError(format!("Failed to get count: {}", e)))?;
739
740        // Handle pagination
741        let page_size = params.page_size.unwrap_or(50).clamp(1, 100);
742        let offset = if let Some(ref token) = params.page_token {
743            token.parse::<i32>().unwrap_or(0)
744        } else {
745            0
746        };
747
748        // Build main query with LIMIT and OFFSET
749        let main_query = format!(
750            "SELECT * FROM tasks{} ORDER BY updated_at DESC LIMIT ? OFFSET ?",
751            where_clause
752        );
753
754        let mut main_q = sqlx::query(&main_query);
755
756        // Bind parameters for main query
757        if let Some(ref context_id) = params.context_id {
758            main_q = main_q.bind(context_id);
759        }
760        if let Some(ref status) = params.status {
761            let state_str = match *status {
762                crate::domain::TaskState::Submitted => "submitted",
763                crate::domain::TaskState::Working => "working",
764                crate::domain::TaskState::InputRequired => "input-required",
765                crate::domain::TaskState::Completed => "completed",
766                crate::domain::TaskState::Canceled => "canceled",
767                crate::domain::TaskState::Failed => "failed",
768                crate::domain::TaskState::Rejected => "rejected",
769                crate::domain::TaskState::AuthRequired => "auth-required",
770                crate::domain::TaskState::Unknown => "unknown",
771            };
772            main_q = main_q.bind(state_str);
773        }
774        if let Some(ref ts) = timestamp_str {
775            main_q = main_q.bind(ts);
776        }
777
778        // Bind LIMIT and OFFSET
779        main_q = main_q.bind(page_size).bind(offset);
780
781        let rows = main_q
782            .fetch_all(&self.pool)
783            .await
784            .map_err(|e| A2AError::DatabaseError(format!("Failed to list tasks: {}", e)))?;
785
786        // Convert rows to tasks
787        let mut tasks: Vec<Task> = rows
788            .iter()
789            .filter_map(|row| Self::row_to_task(row).ok())
790            .collect();
791
792        // Load history for each task if requested
793        let history_length = params.history_length.unwrap_or(0);
794        for task in &mut tasks {
795            if history_length > 0 {
796                let history = self
797                    .load_task_history(&task.id, Some(history_length as u32))
798                    .await?;
799                task.history = history;
800            } else {
801                task.history.clear();
802            }
803
804            // Remove artifacts if not requested
805            if !params.include_artifacts.unwrap_or(false) {
806                task.artifacts.clear();
807            }
808        }
809
810        // Generate next page token
811        let has_more = offset + page_size < total_size;
812        let next_page_token = if has_more {
813            (offset + page_size).to_string()
814        } else {
815            String::new()
816        };
817
818        Ok(ListTasksResult {
819            tasks,
820            total_size,
821            page_size,
822            next_page_token,
823        })
824    }
825}
826
#[cfg(feature = "sqlx-storage")]
impl SqlxTaskStorage {
    /// Decodes one `push_notification_configs` row into a
    /// [`TaskPushNotificationConfig`].
    ///
    /// The row must expose the `id`, `url`, `token` and `authentication`
    /// columns. `task_id` is left at its default value; callers overwrite it
    /// with the task id of the request they are serving.
    ///
    /// # Errors
    ///
    /// Returns [`A2AError::DatabaseError`] when `id` or `url` cannot be read.
    /// `token` and `authentication` are best-effort: a value that is NULL,
    /// unreadable, or (for `authentication`) not valid JSON is treated as
    /// absent rather than reported.
    fn push_config_from_row(
        row: &sqlx::sqlite::SqliteRow,
    ) -> Result<TaskPushNotificationConfig, A2AError> {
        let id: String = row
            .try_get("id")
            .map_err(|e| A2AError::DatabaseError(format!("Failed to get config id: {}", e)))?;
        let url: String = row
            .try_get("url")
            .map_err(|e| A2AError::DatabaseError(format!("Failed to get url: {}", e)))?;
        let token: Option<String> = row.try_get("token").ok().flatten();
        let auth_json: Option<String> = row.try_get("authentication").ok().flatten();
        let auth_info = auth_json.and_then(|raw| serde_json::from_str(&raw).ok());

        Ok(TaskPushNotificationConfig {
            id,
            url,
            token: token.unwrap_or_default(),
            authentication: auth_info.into(),
            tenant: String::new(),
            ..Default::default()
        })
    }
}

#[cfg(feature = "sqlx-storage")]
#[async_trait]
impl AsyncNotificationManager for SqlxTaskStorage {
    /// Fetches a single push notification config for the task `params.id`.
    ///
    /// When `params.push_notification_config_id` is supplied, only the config
    /// with that id is considered; otherwise the task's config with the lowest
    /// id is returned (single-config-per-task convenience, matching the
    /// in-memory adapter and the v1.0.0 single-config helpers).
    ///
    /// Note: `push_notification_config_id` filtering requires migration 002 to
    /// be applied.
    ///
    /// # Errors
    ///
    /// * [`A2AError::DatabaseError`] if the query fails or the row's `id` / `url`
    ///   cannot be read.
    /// * [`A2AError::TaskNotFound`] if no matching config exists.
    async fn get_config(
        &self,
        params: &crate::domain::GetTaskPushNotificationConfigParams,
    ) -> Result<crate::domain::TaskPushNotificationConfig, A2AError> {
        let row = match params.push_notification_config_id.as_ref() {
            Some(config_id) => sqlx::query(
                "SELECT id, task_id, url, token, authentication FROM push_notification_configs WHERE task_id = ? AND id = ?"
            )
            .bind(&params.id)
            .bind(config_id),
            None => sqlx::query(
                "SELECT id, task_id, url, token, authentication FROM push_notification_configs WHERE task_id = ? ORDER BY id LIMIT 1"
            )
            .bind(&params.id),
        }
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| A2AError::DatabaseError(format!("Failed to get push config: {}", e)))?;

        match row {
            Some(row) => Ok(TaskPushNotificationConfig {
                task_id: params.id.clone(),
                ..Self::push_config_from_row(&row)?
            }),
            None => Err(A2AError::TaskNotFound(format!(
                "Push notification config not found for task {}{}",
                params.id,
                params
                    .push_notification_config_id
                    .as_ref()
                    .map(|id| format!(" with id {}", id))
                    .unwrap_or_default()
            ))),
        }
    }

    /// Lists every push notification config stored for the task `params.id`.
    ///
    /// Results are ordered by config id so repeated calls return a stable
    /// order and the first entry is the one `get_config` falls back to when no
    /// config id is given. Rows whose `id` or `url` cannot be decoded are
    /// skipped (best-effort listing) instead of failing the whole call.
    ///
    /// # Errors
    ///
    /// Returns [`A2AError::DatabaseError`] if the query itself fails.
    async fn list_configs(
        &self,
        params: &crate::domain::ListTaskPushNotificationConfigsParams,
    ) -> Result<Vec<crate::domain::TaskPushNotificationConfig>, A2AError> {
        let rows = sqlx::query(
            "SELECT id, task_id, url, token, authentication FROM push_notification_configs WHERE task_id = ? ORDER BY id"
        )
        .bind(&params.id)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| A2AError::DatabaseError(format!("Failed to list push configs: {}", e)))?;

        let configs = rows
            .iter()
            .filter_map(|row| Self::push_config_from_row(row).ok())
            .map(|config| TaskPushNotificationConfig {
                task_id: params.id.clone(),
                ..config
            })
            .collect();

        Ok(configs)
    }

    /// Deletes push notification config(s) for the task `params.id`.
    ///
    /// A non-empty `params.push_notification_config_id` removes only that
    /// config; an empty one removes every config of the task
    /// (single-config-per-task convenience, matching the in-memory adapter).
    /// The call is idempotent: deleting a config that does not exist is not an
    /// error (v1.0.0 spec behavior).
    ///
    /// # Errors
    ///
    /// Returns [`A2AError::DatabaseError`] if the `DELETE` statement fails.
    async fn delete_config(
        &self,
        params: &crate::domain::DeleteTaskPushNotificationConfigParams,
    ) -> Result<(), A2AError> {
        let query = if params.push_notification_config_id.is_empty() {
            sqlx::query("DELETE FROM push_notification_configs WHERE task_id = ?").bind(&params.id)
        } else {
            sqlx::query("DELETE FROM push_notification_configs WHERE task_id = ? AND id = ?")
                .bind(&params.id)
                .bind(&params.push_notification_config_id)
        };

        // The number of affected rows is deliberately ignored: zero rows is a
        // valid outcome for an idempotent delete.
        // NOTE(review): `set_config` also registers the config with
        // `push_notification_registry`, but nothing is removed from the
        // registry here — confirm whether that is intended.
        query
            .execute(&self.pool)
            .await
            .map_err(|e| A2AError::DatabaseError(format!("Failed to delete push config: {}", e)))?;

        Ok(())
    }

    /// Creates or replaces a push notification config and registers it with the
    /// push notification registry.
    ///
    /// If `config.id` is empty a fresh UUID v4 is generated. The row is written
    /// with SQLite's `INSERT OR REPLACE`, so an existing row that conflicts with
    /// the new one is replaced. The config handed to the registry and the one
    /// returned to the caller both carry the final id.
    ///
    /// # Errors
    ///
    /// * [`A2AError::DatabaseError`] if the authentication block cannot be
    ///   serialized to JSON or the insert fails.
    /// * Any error returned by the registry's `register` call.
    async fn set_config(
        &self,
        config: &TaskPushNotificationConfig,
    ) -> Result<TaskPushNotificationConfig, A2AError> {
        let config_id = if config.id.is_empty() {
            uuid::Uuid::new_v4().to_string()
        } else {
            config.id.clone()
        };

        // Fail loudly on a serialization error: persisting an empty string
        // would silently read back as "no authentication" later on.
        let auth_json = config
            .authentication
            .as_option()
            .map(|auth| {
                serde_json::to_string(auth).map_err(|e| {
                    A2AError::DatabaseError(format!(
                        "Failed to serialize push notification authentication: {}",
                        e
                    ))
                })
            })
            .transpose()?;

        sqlx::query(
            "INSERT OR REPLACE INTO push_notification_configs (id, task_id, url, token, authentication) VALUES (?, ?, ?, ?, ?)",
        )
        .bind(&config_id)
        .bind(&config.task_id)
        .bind(&config.url)
        .bind(&config.token)
        .bind(auth_json)
        .execute(&self.pool)
        .await
        .map_err(|e| {
            A2AError::DatabaseError(format!("Failed to set push notification config: {}", e))
        })?;

        // Apply the final id before registering so the registry, the database
        // row and the returned value all agree on it.
        let mut stored_config = config.clone();
        stored_config.id = config_id;

        self.push_notification_registry
            .register(&stored_config.task_id, stored_config.clone())
            .await?;

        Ok(stored_config)
    }
}
999
#[cfg(feature = "sqlx-storage")]
impl Clone for SqlxTaskStorage {
    /// Returns another handle to this storage.
    ///
    /// The push notification registry is shared through its `Arc`, and the
    /// pool field is duplicated with `SqlitePool::clone`.
    fn clone(&self) -> Self {
        let pool = self.pool.clone();
        let push_notification_registry = Arc::clone(&self.push_notification_registry);

        Self {
            pool,
            push_notification_registry,
        }
    }
}