Skip to main content

tap_node/storage/
db.rs

1use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
2use sqlx::Row;
3use std::env;
4use std::path::{Path, PathBuf};
5use tap_msg::didcomm::PlainMessage;
6use tracing::{debug, info};
7
8use super::error::StorageError;
9use super::models::{
10    Customer, CustomerIdentifier, CustomerRelationship, DecisionLogEntry, DecisionStatus,
11    DecisionType, Delivery, DeliveryStatus, DeliveryType, IdentifierType, Message,
12    MessageDirection, Received, ReceivedStatus, SchemaType, SourceType, Transaction,
13    TransactionStatus, TransactionType,
14};
15
16/// Storage backend for TAP transactions and message audit trail
17///
18/// This struct provides the main interface for storing and retrieving TAP data
19/// from a SQLite database. It maintains two separate tables:
20/// - `transactions`: For Transfer and Payment messages requiring business logic
21/// - `messages`: For complete audit trail of all messages
22///
23/// It uses sqlx's built-in connection pooling for efficient concurrent access
24/// and provides a native async API.
25///
26/// # Example
27///
28/// ```no_run
29/// use tap_node::storage::{Storage, MessageDirection};
30/// use std::path::PathBuf;
31///
32/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33/// // Create storage with default path
34/// let storage = Storage::new(None).await?;
35///
36/// // Create storage with DID-based path
37/// let agent_did = "did:web:example.com";
38/// let storage_with_did = Storage::new_with_did(agent_did, None).await?;
39///
40/// // Create storage with custom TAP root
41/// let custom_root = PathBuf::from("/custom/tap/root");
42/// let storage_custom = Storage::new_with_did(agent_did, Some(custom_root)).await?;
43///
44/// // Query transactions
45/// let transactions = storage.list_transactions(10, 0).await?;
46///
47/// // Query audit trail
48/// let all_messages = storage.list_messages(20, 0, None).await?;
49/// let incoming_only = storage.list_messages(10, 0, Some(MessageDirection::Incoming)).await?;
50/// # Ok(())
51/// # }
52/// ```
53#[derive(Clone, Debug)]
54pub struct Storage {
55    pool: SqlitePool,
56    db_path: PathBuf,
57}
58
59impl Storage {
60    /// Create a new Storage instance with an agent DID
61    ///
62    /// This will initialize a SQLite database in the TAP directory structure:
63    /// - Default: ~/.tap/{did}/transactions.db
64    /// - Custom root: {tap_root}/{did}/transactions.db
65    ///
66    /// # Arguments
67    ///
68    /// * `agent_did` - The DID of the agent this storage is for
69    /// * `tap_root` - Optional custom root directory (defaults to ~/.tap)
70    ///
71    /// # Errors
72    ///
73    /// Returns `StorageError` if:
74    /// - Database initialization fails
75    /// - Migrations fail to run
76    /// - Connection pool cannot be created
77    pub async fn new_with_did(
78        agent_did: &str,
79        tap_root: Option<PathBuf>,
80    ) -> Result<Self, StorageError> {
81        let root_dir = match tap_root {
82            Some(root) => root,
83            None => {
84                if let Ok(tap_home) = env::var("TAP_HOME") {
85                    PathBuf::from(tap_home)
86                } else if let Ok(tap_root) = env::var("TAP_ROOT") {
87                    PathBuf::from(tap_root)
88                } else if let Ok(test_dir) = env::var("TAP_TEST_DIR") {
89                    PathBuf::from(test_dir).join(".tap")
90                } else {
91                    dirs::home_dir()
92                        .ok_or_else(|| {
93                            StorageError::Migration(
94                                "Could not determine home directory".to_string(),
95                            )
96                        })?
97                        .join(".tap")
98                }
99            }
100        };
101
102        // Sanitize the DID for use as a directory name (prevent path traversal)
103        let sanitized_did = agent_did.replace([':', '/', '\\'], "_").replace("..", "_");
104        let db_path = root_dir.join(&sanitized_did).join("transactions.db");
105
106        Self::new(Some(db_path)).await
107    }
108
109    /// Create a new in-memory storage instance for testing
110    /// This provides complete isolation between tests with no file system dependencies
111    pub async fn new_in_memory() -> Result<Self, StorageError> {
112        info!("Initializing in-memory storage for testing");
113
114        // Use SQLite in-memory database
115        let db_url = "sqlite://:memory:";
116
117        // Create connection pool
118        let pool = SqlitePoolOptions::new()
119            .max_connections(1) // In-memory databases don't benefit from multiple connections
120            .connect(db_url)
121            .await?;
122
123        // Run migrations
124        sqlx::migrate!("./migrations")
125            .run(&pool)
126            .await
127            .map_err(|e| StorageError::Migration(e.to_string()))?;
128
129        Ok(Storage {
130            pool,
131            db_path: PathBuf::from(":memory:"),
132        })
133    }
134
135    /// Create a new Storage instance
136    ///
137    /// This will initialize a SQLite database at the specified path (or default location),
138    /// run any pending migrations, and set up a connection pool.
139    ///
140    /// # Arguments
141    ///
142    /// * `path` - Optional path to the database file. If None, uses `TAP_NODE_DB_PATH` env var or defaults to `./tap-node.db`
143    ///
144    /// # Errors
145    ///
146    /// Returns `StorageError` if:
147    /// - Database initialization fails
148    /// - Migrations fail to run
149    /// - Connection pool cannot be created
150    pub async fn new(path: Option<PathBuf>) -> Result<Self, StorageError> {
151        let db_path = path.unwrap_or_else(|| {
152            env::var("TAP_NODE_DB_PATH")
153                .unwrap_or_else(|_| "tap-node.db".to_string())
154                .into()
155        });
156
157        info!("Initializing storage at: {:?}", db_path);
158
159        // Create parent directory if it doesn't exist
160        if let Some(parent) = db_path.parent() {
161            std::fs::create_dir_all(parent)?;
162        }
163
164        // Create connection URL for SQLite with create mode
165        let db_url = format!("sqlite://{}?mode=rwc", db_path.display());
166
167        // Create connection pool with optimizations
168        let pool = SqlitePoolOptions::new()
169            .max_connections(10)
170            .connect(&db_url)
171            .await?;
172
173        // Enable WAL mode and other optimizations
174        sqlx::query("PRAGMA journal_mode = WAL")
175            .execute(&pool)
176            .await?;
177        sqlx::query("PRAGMA synchronous = NORMAL")
178            .execute(&pool)
179            .await?;
180
181        // Run migrations
182        sqlx::migrate!("./migrations")
183            .run(&pool)
184            .await
185            .map_err(|e| StorageError::Migration(e.to_string()))?;
186
187        Ok(Storage { pool, db_path })
188    }
189
190    /// Get the database path
191    pub fn db_path(&self) -> &Path {
192        &self.db_path
193    }
194
195    /// Get the default logs directory
196    ///
197    /// Returns the default directory for log files:
198    /// - Default: ~/.tap/logs
199    /// - Custom root: {tap_root}/logs
200    ///
201    /// # Arguments
202    ///
203    /// * `tap_root` - Optional custom root directory (defaults to ~/.tap)
204    pub fn default_logs_dir(tap_root: Option<PathBuf>) -> PathBuf {
205        let root_dir = match tap_root {
206            Some(root) => root,
207            None => {
208                if let Ok(tap_home) = env::var("TAP_HOME") {
209                    PathBuf::from(tap_home)
210                } else if let Ok(tap_root) = env::var("TAP_ROOT") {
211                    PathBuf::from(tap_root)
212                } else if let Ok(test_dir) = env::var("TAP_TEST_DIR") {
213                    PathBuf::from(test_dir).join(".tap")
214                } else {
215                    dirs::home_dir()
216                        .expect("Could not determine home directory")
217                        .join(".tap")
218                }
219            }
220        };
221
222        root_dir.join("logs")
223    }
224
225    /// Update the status of a message in the messages table
226    ///
227    /// # Arguments
228    ///
229    /// * `message_id` - The ID of the message to update
230    /// * `status` - The new status (accepted, rejected, pending)
231    ///
232    /// # Errors
233    ///
234    /// Returns `StorageError` if the database update fails
235    pub async fn update_message_status(
236        &self,
237        message_id: &str,
238        status: &str,
239    ) -> Result<(), StorageError> {
240        debug!("Updating message {} status to {}", message_id, status);
241
242        sqlx::query(
243            r#"
244            UPDATE messages 
245            SET status = ?1 
246            WHERE message_id = ?2
247            "#,
248        )
249        .bind(status)
250        .bind(message_id)
251        .execute(&self.pool)
252        .await?;
253
254        Ok(())
255    }
256
257    /// Update the status of a transaction in the transactions table
258    ///
259    /// # Arguments
260    ///
261    /// * `transaction_id` - The reference ID of the transaction to update
262    /// * `status` - The new status (pending, confirmed, failed, cancelled, reverted)
263    ///
264    /// # Errors
265    ///
266    /// Returns `StorageError` if the database update fails
267    pub async fn update_transaction_status(
268        &self,
269        transaction_id: &str,
270        status: &str,
271    ) -> Result<(), StorageError> {
272        debug!(
273            "Updating transaction {} status to {}",
274            transaction_id, status
275        );
276
277        sqlx::query(
278            r#"
279            UPDATE transactions 
280            SET status = ?1 
281            WHERE reference_id = ?2
282            "#,
283        )
284        .bind(status)
285        .bind(transaction_id)
286        .execute(&self.pool)
287        .await?;
288
289        Ok(())
290    }
291
292    /// Get a transaction by its reference ID
293    ///
294    /// # Arguments
295    ///
296    /// * `reference_id` - The reference ID of the transaction
297    ///
298    /// # Returns
299    ///
300    /// * `Ok(Some(Transaction))` if found
301    /// * `Ok(None)` if not found
302    /// * `Err(StorageError)` on database error
303    pub async fn get_transaction_by_id(
304        &self,
305        reference_id: &str,
306    ) -> Result<Option<Transaction>, StorageError> {
307        let result = sqlx::query_as::<_, (
308            i64,
309            String,
310            String,
311            Option<String>,
312            Option<String>,
313            Option<String>,
314            String,
315            String,
316            serde_json::Value,
317            String,
318            String,
319        )>(
320            r#"
321            SELECT id, type, reference_id, from_did, to_did, thread_id, message_type, status, message_json, created_at, updated_at
322            FROM transactions WHERE reference_id = ?1
323            "#,
324        )
325        .bind(reference_id)
326        .fetch_optional(&self.pool)
327        .await?;
328
329        if let Some((
330            id,
331            tx_type,
332            reference_id,
333            from_did,
334            to_did,
335            thread_id,
336            message_type,
337            status,
338            message_json,
339            created_at,
340            updated_at,
341        )) = result
342        {
343            Ok(Some(Transaction {
344                id,
345                transaction_type: TransactionType::try_from(tx_type.as_str())
346                    .map_err(StorageError::InvalidTransactionType)?,
347                reference_id,
348                from_did,
349                to_did,
350                thread_id,
351                message_type,
352                status: TransactionStatus::try_from(status.as_str())
353                    .map_err(StorageError::InvalidTransactionType)?,
354                message_json,
355                created_at,
356                updated_at,
357            }))
358        } else {
359            Ok(None)
360        }
361    }
362
363    /// Get a transaction by thread ID
364    ///
365    /// # Arguments
366    ///
367    /// * `thread_id` - The thread ID to search for
368    ///
369    /// # Returns
370    ///
371    /// * `Ok(Some(Transaction))` if found
372    /// * `Ok(None)` if not found
373    /// * `Err(StorageError)` on database error
374    pub async fn get_transaction_by_thread_id(
375        &self,
376        thread_id: &str,
377    ) -> Result<Option<Transaction>, StorageError> {
378        let result = sqlx::query_as::<_, (
379            i64,
380            String,
381            String,
382            Option<String>,
383            Option<String>,
384            Option<String>,
385            String,
386            String,
387            serde_json::Value,
388            String,
389            String,
390        )>(
391            r#"
392            SELECT id, type, reference_id, from_did, to_did, thread_id, message_type, status, message_json, created_at, updated_at
393            FROM transactions WHERE thread_id = ?1
394            "#,
395        )
396        .bind(thread_id)
397        .fetch_optional(&self.pool)
398        .await?;
399
400        if let Some((
401            id,
402            tx_type,
403            reference_id,
404            from_did,
405            to_did,
406            thread_id,
407            message_type,
408            status,
409            message_json,
410            created_at,
411            updated_at,
412        )) = result
413        {
414            Ok(Some(Transaction {
415                id,
416                transaction_type: TransactionType::try_from(tx_type.as_str())
417                    .map_err(StorageError::InvalidTransactionType)?,
418                reference_id,
419                from_did,
420                to_did,
421                thread_id,
422                message_type,
423                status: TransactionStatus::try_from(status.as_str())
424                    .map_err(StorageError::InvalidTransactionType)?,
425                message_json,
426                created_at,
427                updated_at,
428            }))
429        } else {
430            Ok(None)
431        }
432    }
433
434    /// Check if an agent is authorized for a transaction
435    ///
436    /// This checks the transaction_agents table to see if the given agent
437    /// is associated with the transaction.
438    ///
439    /// # Arguments
440    ///
441    /// * `transaction_id` - The reference ID of the transaction
442    /// * `agent_did` - The DID of the agent to check
443    ///
444    /// # Returns
445    ///
446    /// * `Ok(true)` if the agent is authorized
447    /// * `Ok(false)` if the agent is not authorized or transaction doesn't exist
448    /// * `Err(StorageError)` on database error
449    pub async fn is_agent_authorized_for_transaction(
450        &self,
451        transaction_id: &str,
452        agent_did: &str,
453    ) -> Result<bool, StorageError> {
454        // First get the transaction's internal ID
455        let tx_result = sqlx::query_scalar::<_, i64>(
456            r#"
457            SELECT id FROM transactions WHERE reference_id = ?1
458            "#,
459        )
460        .bind(transaction_id)
461        .fetch_optional(&self.pool)
462        .await?;
463
464        let tx_internal_id = match tx_result {
465            Some(id) => id,
466            None => return Ok(false), // Transaction doesn't exist
467        };
468
469        // Check if agent is in transaction_agents table
470        let count: i64 = sqlx::query_scalar(
471            r#"
472            SELECT COUNT(*) FROM transaction_agents 
473            WHERE transaction_id = ?1 AND agent_did = ?2
474            "#,
475        )
476        .bind(tx_internal_id)
477        .bind(agent_did)
478        .fetch_one(&self.pool)
479        .await?;
480
481        Ok(count > 0)
482    }
483
484    /// Insert a transaction agent
485    ///
486    /// # Arguments
487    ///
488    /// * `transaction_id` - The reference ID of the transaction
489    /// * `agent_did` - The DID of the agent
490    /// * `agent_role` - The role of the agent (sender, receiver, compliance, other)
491    ///
492    /// # Returns
493    ///
494    /// * `Ok(())` on success
495    /// * `Err(StorageError)` on database error
496    pub async fn insert_transaction_agent(
497        &self,
498        transaction_id: &str,
499        agent_did: &str,
500        agent_role: &str,
501    ) -> Result<(), StorageError> {
502        // First get the transaction's internal ID
503        let tx_result = sqlx::query_scalar::<_, i64>(
504            r#"
505            SELECT id FROM transactions WHERE reference_id = ?1
506            "#,
507        )
508        .bind(transaction_id)
509        .fetch_optional(&self.pool)
510        .await?;
511
512        let tx_internal_id = match tx_result {
513            Some(id) => id,
514            None => {
515                return Err(StorageError::NotFound(format!(
516                    "Transaction {} not found",
517                    transaction_id
518                )))
519            }
520        };
521
522        // Insert the agent
523        sqlx::query(
524            r#"
525            INSERT INTO transaction_agents (transaction_id, agent_did, agent_role, status)
526            VALUES (?1, ?2, ?3, 'pending')
527            ON CONFLICT(transaction_id, agent_did) DO UPDATE SET
528                agent_role = excluded.agent_role,
529                updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
530            "#,
531        )
532        .bind(tx_internal_id)
533        .bind(agent_did)
534        .bind(agent_role)
535        .execute(&self.pool)
536        .await?;
537
538        Ok(())
539    }
540
541    /// Update transaction agent status
542    ///
543    /// # Arguments
544    ///
545    /// * `transaction_id` - The reference ID of the transaction
546    /// * `agent_did` - The DID of the agent
547    /// * `status` - The new status (pending, authorized, rejected, cancelled)
548    ///
549    /// # Returns
550    ///
551    /// * `Ok(())` on success
552    /// * `Err(StorageError)` on database error
553    pub async fn update_transaction_agent_status(
554        &self,
555        transaction_id: &str,
556        agent_did: &str,
557        status: &str,
558    ) -> Result<(), StorageError> {
559        // First get the transaction's internal ID
560        let tx_result = sqlx::query_scalar::<_, i64>(
561            r#"
562            SELECT id FROM transactions WHERE reference_id = ?1
563            "#,
564        )
565        .bind(transaction_id)
566        .fetch_optional(&self.pool)
567        .await?;
568
569        let tx_internal_id = match tx_result {
570            Some(id) => id,
571            None => {
572                return Err(StorageError::NotFound(format!(
573                    "Transaction {} not found",
574                    transaction_id
575                )))
576            }
577        };
578
579        // Update the agent status
580        let result = sqlx::query(
581            r#"
582            UPDATE transaction_agents 
583            SET status = ?1 
584            WHERE transaction_id = ?2 AND agent_did = ?3
585            "#,
586        )
587        .bind(status)
588        .bind(tx_internal_id)
589        .bind(agent_did)
590        .execute(&self.pool)
591        .await?;
592
593        if result.rows_affected() == 0 {
594            return Err(StorageError::NotFound(format!(
595                "Agent {} not found for transaction {}",
596                agent_did, transaction_id
597            )));
598        }
599
600        Ok(())
601    }
602
603    /// Get all agents for a transaction
604    ///
605    /// # Arguments
606    ///
607    /// * `transaction_id` - The reference ID of the transaction
608    ///
609    /// # Returns
610    ///
611    /// * `Ok(Vec<(agent_did, agent_role, status)>)` on success
612    /// * `Err(StorageError)` on database error
613    pub async fn get_transaction_agents(
614        &self,
615        transaction_id: &str,
616    ) -> Result<Vec<(String, String, String)>, StorageError> {
617        // First get the transaction's internal ID
618        let tx_result = sqlx::query_scalar::<_, i64>(
619            r#"
620            SELECT id FROM transactions WHERE reference_id = ?1
621            "#,
622        )
623        .bind(transaction_id)
624        .fetch_optional(&self.pool)
625        .await?;
626
627        let tx_internal_id = match tx_result {
628            Some(id) => id,
629            None => {
630                return Err(StorageError::NotFound(format!(
631                    "Transaction {} not found",
632                    transaction_id
633                )))
634            }
635        };
636
637        // Get all agents
638        let agents = sqlx::query_as::<_, (String, String, String)>(
639            r#"
640            SELECT agent_did, agent_role, status
641            FROM transaction_agents
642            WHERE transaction_id = ?1
643            ORDER BY created_at
644            "#,
645        )
646        .bind(tx_internal_id)
647        .fetch_all(&self.pool)
648        .await?;
649
650        Ok(agents)
651    }
652
653    /// Check if all agents have authorized the transaction
654    ///
655    /// # Arguments
656    ///
657    /// * `transaction_id` - The reference ID of the transaction
658    ///
659    /// # Returns
660    ///
661    /// * `Ok(true)` if all agents have authorized
662    /// * `Ok(false)` if any agent hasn't authorized or has rejected/cancelled
663    /// * `Err(StorageError)` on database error
664    pub async fn are_all_agents_authorized(
665        &self,
666        transaction_id: &str,
667    ) -> Result<bool, StorageError> {
668        // First get the transaction's internal ID
669        let tx_result = sqlx::query_scalar::<_, i64>(
670            r#"
671            SELECT id FROM transactions WHERE reference_id = ?1
672            "#,
673        )
674        .bind(transaction_id)
675        .fetch_optional(&self.pool)
676        .await?;
677
678        let tx_internal_id = match tx_result {
679            Some(id) => id,
680            None => return Ok(false), // Transaction doesn't exist
681        };
682
683        // Check if there are any agents not in 'authorized' status
684        let non_authorized_count: i64 = sqlx::query_scalar(
685            r#"
686            SELECT COUNT(*) FROM transaction_agents 
687            WHERE transaction_id = ?1 AND status != 'authorized'
688            "#,
689        )
690        .bind(tx_internal_id)
691        .fetch_one(&self.pool)
692        .await?;
693
694        // If there are no agents, transaction is ready to settle
695        // If there are agents, all must be authorized
696        Ok(non_authorized_count == 0)
697    }
698
699    /// Insert a new transaction from a TAP message
700    ///
701    /// This method extracts transaction details from a Transfer or Payment message
702    /// and stores them in the database with a 'pending' status.
703    ///
704    /// # Arguments
705    ///
706    /// * `message` - The DIDComm PlainMessage containing a Transfer or Payment body
707    ///
708    /// # Errors
709    ///
710    /// Returns `StorageError` if:
711    /// - The message is not a Transfer or Payment type
712    /// - Database insertion fails
713    /// - The transaction already exists (duplicate reference_id)
714    pub async fn insert_transaction(&self, message: &PlainMessage) -> Result<(), StorageError> {
715        let message_type = message.type_.clone();
716        let message_json = serde_json::to_value(message)?;
717
718        // Extract transaction type and use message ID as reference
719        let message_type_lower = message.type_.to_lowercase();
720        let tx_type = if message_type_lower.contains("transfer") {
721            TransactionType::Transfer
722        } else if message_type_lower.contains("payment") {
723            TransactionType::Payment
724        } else {
725            return Err(StorageError::InvalidTransactionType(
726                message_type.to_string(),
727            ));
728        };
729
730        // Use the PlainMessage ID as the reference_id since transaction_id is not serialized
731        let reference_id = message.id.clone();
732        let from_did = message.from.clone();
733        let to_did = message.to.first().cloned();
734        let thread_id = message.thid.clone();
735
736        debug!("Inserting transaction: {} ({})", reference_id, tx_type);
737
738        let result = sqlx::query(
739            r#"
740            INSERT INTO transactions (type, reference_id, from_did, to_did, thread_id, message_type, message_json)
741            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
742            "#,
743        )
744        .bind(tx_type.to_string())
745        .bind(&reference_id)
746        .bind(from_did)
747        .bind(to_did)
748        .bind(thread_id)
749        .bind(message_type.to_string())
750        .bind(sqlx::types::Json(message_json))
751        .execute(&self.pool)
752        .await;
753
754        match result {
755            Ok(_) => {
756                debug!("Successfully inserted transaction: {}", reference_id);
757                Ok(())
758            }
759            Err(sqlx::Error::Database(db_err)) => {
760                if db_err.message().contains("UNIQUE") {
761                    Err(StorageError::DuplicateTransaction(reference_id))
762                } else {
763                    Err(StorageError::Database(sqlx::Error::Database(db_err)))
764                }
765            }
766            Err(e) => Err(StorageError::Database(e)),
767        }
768    }
769
770    /// List transactions with pagination
771    ///
772    /// Retrieves transactions ordered by creation time (newest first).
773    ///
774    /// # Arguments
775    ///
776    /// * `limit` - Maximum number of transactions to return
777    /// * `offset` - Number of transactions to skip (for pagination)
778    ///
779    /// # Returns
780    ///
781    /// A vector of transactions ordered by creation time descending
782    pub async fn list_transactions(
783        &self,
784        limit: u32,
785        offset: u32,
786    ) -> Result<Vec<Transaction>, StorageError> {
787        let rows = sqlx::query_as::<_, (
788            i64,
789            String,
790            String,
791            Option<String>,
792            Option<String>,
793            Option<String>,
794            String,
795            String,
796            serde_json::Value,
797            String,
798            String,
799        )>(
800            r#"
801            SELECT id, type, reference_id, from_did, to_did, thread_id, message_type, status, message_json, created_at, updated_at
802            FROM transactions
803            ORDER BY created_at DESC
804            LIMIT ?1 OFFSET ?2
805            "#,
806        )
807        .bind(limit)
808        .bind(offset)
809        .fetch_all(&self.pool)
810        .await?;
811
812        let mut transactions = Vec::new();
813        for (
814            id,
815            tx_type,
816            reference_id,
817            from_did,
818            to_did,
819            thread_id,
820            message_type,
821            status,
822            message_json,
823            created_at,
824            updated_at,
825        ) in rows
826        {
827            transactions.push(Transaction {
828                id,
829                transaction_type: TransactionType::try_from(tx_type.as_str())
830                    .map_err(StorageError::InvalidTransactionType)?,
831                reference_id,
832                from_did,
833                to_did,
834                thread_id,
835                message_type,
836                status: TransactionStatus::try_from(status.as_str())
837                    .map_err(StorageError::InvalidTransactionType)?,
838                message_json,
839                created_at,
840                updated_at,
841            });
842        }
843
844        Ok(transactions)
845    }
846
847    /// Log an incoming or outgoing message to the audit trail
848    ///
849    /// This method stores any DIDComm message for audit purposes, regardless of type.
850    ///
851    /// # Arguments
852    ///
853    /// * `message` - The DIDComm PlainMessage to log
854    /// * `direction` - Whether the message is incoming or outgoing
855    ///
856    /// # Errors
857    ///
858    /// Returns `StorageError` if:
859    /// - Database insertion fails
860    /// - The message already exists (duplicate message_id)
861    pub async fn log_message(
862        &self,
863        message: &PlainMessage,
864        direction: MessageDirection,
865    ) -> Result<(), StorageError> {
866        let message_json = serde_json::to_value(message)?;
867        let message_id = message.id.clone();
868        let message_type = message.type_.clone();
869        let from_did = message.from.clone();
870        let to_did = message.to.first().cloned();
871        let thread_id = message.thid.clone();
872        let parent_thread_id = message.pthid.clone();
873
874        debug!(
875            "Logging {} message: {} ({})",
876            direction, message_id, message_type
877        );
878
879        let result = sqlx::query(
880            r#"
881            INSERT INTO messages (message_id, message_type, from_did, to_did, thread_id, parent_thread_id, direction, message_json)
882            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
883            "#,
884        )
885        .bind(&message_id)
886        .bind(message_type)
887        .bind(from_did)
888        .bind(to_did)
889        .bind(thread_id)
890        .bind(parent_thread_id)
891        .bind(direction.to_string())
892        .bind(sqlx::types::Json(message_json))
893        .execute(&self.pool)
894        .await;
895
896        match result {
897            Ok(_) => {
898                debug!("Successfully logged message: {}", message_id);
899                Ok(())
900            }
901            Err(sqlx::Error::Database(db_err)) => {
902                if db_err.message().contains("UNIQUE") {
903                    // Message already logged, this is fine
904                    debug!("Message already logged: {}", message_id);
905                    Ok(())
906                } else {
907                    Err(StorageError::Database(sqlx::Error::Database(db_err)))
908                }
909            }
910            Err(e) => Err(StorageError::Database(e)),
911        }
912    }
913
914    /// Retrieve a message by its ID
915    ///
916    /// # Arguments
917    ///
918    /// * `message_id` - The unique message ID
919    ///
920    /// # Returns
921    ///
922    /// * `Ok(Some(Message))` if found
923    /// * `Ok(None)` if not found
924    /// * `Err(StorageError)` on database error
925    pub async fn get_message_by_id(
926        &self,
927        message_id: &str,
928    ) -> Result<Option<Message>, StorageError> {
929        let result = sqlx::query_as::<_, (
930            i64,
931            String,
932            String,
933            Option<String>,
934            Option<String>,
935            Option<String>,
936            Option<String>,
937            String,
938            serde_json::Value,
939            String,
940        )>(
941            r#"
942            SELECT id, message_id, message_type, from_did, to_did, thread_id, parent_thread_id, direction, message_json, created_at
943            FROM messages WHERE message_id = ?1
944            "#,
945        )
946        .bind(message_id)
947        .fetch_optional(&self.pool)
948        .await?;
949
950        match result {
951            Some((
952                id,
953                message_id,
954                message_type,
955                from_did,
956                to_did,
957                thread_id,
958                parent_thread_id,
959                direction,
960                message_json,
961                created_at,
962            )) => Ok(Some(Message {
963                id,
964                message_id,
965                message_type,
966                from_did,
967                to_did,
968                thread_id,
969                parent_thread_id,
970                direction: MessageDirection::try_from(direction.as_str())
971                    .map_err(StorageError::InvalidTransactionType)?,
972                message_json,
973                created_at,
974            })),
975            None => Ok(None),
976        }
977    }
978
979    /// List messages with pagination and optional filtering
980    ///
981    /// # Arguments
982    ///
983    /// * `limit` - Maximum number of messages to return
984    /// * `offset` - Number of messages to skip (for pagination)
985    /// * `direction` - Optional filter by message direction
986    ///
987    /// # Returns
988    ///
989    /// A vector of messages ordered by creation time descending
990    pub async fn list_messages(
991        &self,
992        limit: u32,
993        offset: u32,
994        direction: Option<MessageDirection>,
995    ) -> Result<Vec<Message>, StorageError> {
996        let rows = if let Some(dir) = direction {
997            sqlx::query_as::<_, (
998                i64,
999                String,
1000                String,
1001                Option<String>,
1002                Option<String>,
1003                Option<String>,
1004                Option<String>,
1005                String,
1006                serde_json::Value,
1007                String,
1008            )>(
1009                r#"
1010                SELECT id, message_id, message_type, from_did, to_did, thread_id, parent_thread_id, direction, message_json, created_at
1011                FROM messages
1012                WHERE direction = ?1
1013                ORDER BY created_at DESC
1014                LIMIT ?2 OFFSET ?3
1015                "#,
1016            )
1017            .bind(dir.to_string())
1018            .bind(limit)
1019            .bind(offset)
1020            .fetch_all(&self.pool)
1021            .await?
1022        } else {
1023            sqlx::query_as::<_, (
1024                i64,
1025                String,
1026                String,
1027                Option<String>,
1028                Option<String>,
1029                Option<String>,
1030                Option<String>,
1031                String,
1032                serde_json::Value,
1033                String,
1034            )>(
1035                r#"
1036                SELECT id, message_id, message_type, from_did, to_did, thread_id, parent_thread_id, direction, message_json, created_at
1037                FROM messages
1038                ORDER BY created_at DESC
1039                LIMIT ?1 OFFSET ?2
1040                "#,
1041            )
1042            .bind(limit)
1043            .bind(offset)
1044            .fetch_all(&self.pool)
1045            .await?
1046        };
1047
1048        let mut messages = Vec::new();
1049        for (
1050            id,
1051            message_id,
1052            message_type,
1053            from_did,
1054            to_did,
1055            thread_id,
1056            parent_thread_id,
1057            direction,
1058            message_json,
1059            created_at,
1060        ) in rows
1061        {
1062            messages.push(Message {
1063                id,
1064                message_id,
1065                message_type,
1066                from_did,
1067                to_did,
1068                thread_id,
1069                parent_thread_id,
1070                direction: MessageDirection::try_from(direction.as_str())
1071                    .map_err(StorageError::InvalidTransactionType)?,
1072                message_json,
1073                created_at,
1074            });
1075        }
1076
1077        Ok(messages)
1078    }
1079
1080    /// Create a new delivery record
1081    ///
1082    /// # Arguments
1083    ///
1084    /// * `message_id` - The ID of the message being delivered
1085    /// * `message_text` - The full message text being delivered
1086    /// * `recipient_did` - The DID of the recipient
1087    /// * `delivery_url` - Optional URL where the message is being delivered
1088    /// * `delivery_type` - The type of delivery (https, internal, return_path, pickup)
1089    ///
1090    /// # Returns
1091    ///
1092    /// * `Ok(i64)` - The ID of the created delivery record
1093    /// * `Err(StorageError)` on database error
1094    pub async fn create_delivery(
1095        &self,
1096        message_id: &str,
1097        message_text: &str,
1098        recipient_did: &str,
1099        delivery_url: Option<&str>,
1100        delivery_type: DeliveryType,
1101    ) -> Result<i64, StorageError> {
1102        let result = sqlx::query(
1103            r#"
1104            INSERT INTO deliveries (message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count)
1105            VALUES (?1, ?2, ?3, ?4, ?5, 'pending', 0)
1106            "#,
1107        )
1108        .bind(message_id)
1109        .bind(message_text)
1110        .bind(recipient_did)
1111        .bind(delivery_url)
1112        .bind(delivery_type.to_string())
1113        .execute(&self.pool)
1114        .await?;
1115
1116        Ok(result.last_insert_rowid())
1117    }
1118
1119    /// Update delivery status
1120    ///
1121    /// # Arguments
1122    ///
1123    /// * `delivery_id` - The ID of the delivery record
1124    /// * `status` - The new status (pending, success, failed)
1125    /// * `http_status_code` - Optional HTTP status code from delivery attempt
1126    /// * `error_message` - Optional error message if delivery failed
1127    ///
1128    /// # Returns
1129    ///
1130    /// * `Ok(())` on success
1131    /// * `Err(StorageError)` on database error
1132    pub async fn update_delivery_status(
1133        &self,
1134        delivery_id: i64,
1135        status: DeliveryStatus,
1136        http_status_code: Option<i32>,
1137        error_message: Option<&str>,
1138    ) -> Result<(), StorageError> {
1139        let now = chrono::Utc::now().to_rfc3339();
1140        let delivered_at = if status == DeliveryStatus::Success {
1141            Some(now.clone())
1142        } else {
1143            None
1144        };
1145
1146        sqlx::query(
1147            r#"
1148            UPDATE deliveries 
1149            SET status = ?1, last_http_status_code = ?2, error_message = ?3, updated_at = ?4, delivered_at = ?5
1150            WHERE id = ?6
1151            "#,
1152        )
1153        .bind(status.to_string())
1154        .bind(http_status_code)
1155        .bind(error_message)
1156        .bind(now)
1157        .bind(delivered_at)
1158        .bind(delivery_id)
1159        .execute(&self.pool)
1160        .await?;
1161
1162        Ok(())
1163    }
1164
1165    /// Increment retry count for a delivery
1166    ///
1167    /// # Arguments
1168    ///
1169    /// * `delivery_id` - The ID of the delivery record
1170    ///
1171    /// # Returns
1172    ///
1173    /// * `Ok(())` on success
1174    /// * `Err(StorageError)` on database error
1175    pub async fn increment_delivery_retry_count(
1176        &self,
1177        delivery_id: i64,
1178    ) -> Result<(), StorageError> {
1179        sqlx::query(
1180            r#"
1181            UPDATE deliveries 
1182            SET retry_count = retry_count + 1, updated_at = ?1
1183            WHERE id = ?2
1184            "#,
1185        )
1186        .bind(chrono::Utc::now().to_rfc3339())
1187        .bind(delivery_id)
1188        .execute(&self.pool)
1189        .await?;
1190
1191        Ok(())
1192    }
1193
1194    /// Get delivery record by ID
1195    ///
1196    /// # Arguments
1197    ///
1198    /// * `delivery_id` - The ID of the delivery record
1199    ///
1200    /// # Returns
1201    ///
1202    /// * `Ok(Some(Delivery))` if found
1203    /// * `Ok(None)` if not found
1204    /// * `Err(StorageError)` on database error
1205    pub async fn get_delivery_by_id(
1206        &self,
1207        delivery_id: i64,
1208    ) -> Result<Option<Delivery>, StorageError> {
1209        let result = sqlx::query_as::<
1210            _,
1211            (
1212                i64,
1213                String,
1214                String,
1215                String,
1216                Option<String>,
1217                String,
1218                String,
1219                i32,
1220                Option<i32>,
1221                Option<String>,
1222                String,
1223                String,
1224                Option<String>,
1225            ),
1226        >(
1227            r#"
1228            SELECT id, message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count, 
1229                   last_http_status_code, error_message, created_at, updated_at, delivered_at
1230            FROM deliveries WHERE id = ?1
1231            "#,
1232        )
1233        .bind(delivery_id)
1234        .fetch_optional(&self.pool)
1235        .await?;
1236
1237        match result {
1238            Some((
1239                id,
1240                message_id,
1241                message_text,
1242                recipient_did,
1243                delivery_url,
1244                delivery_type,
1245                status,
1246                retry_count,
1247                last_http_status_code,
1248                error_message,
1249                created_at,
1250                updated_at,
1251                delivered_at,
1252            )) => Ok(Some(Delivery {
1253                id,
1254                message_id,
1255                message_text,
1256                recipient_did,
1257                delivery_url,
1258                delivery_type: DeliveryType::try_from(delivery_type.as_str())
1259                    .map_err(StorageError::InvalidTransactionType)?,
1260                status: DeliveryStatus::try_from(status.as_str())
1261                    .map_err(StorageError::InvalidTransactionType)?,
1262                retry_count,
1263                last_http_status_code,
1264                error_message,
1265                created_at,
1266                updated_at,
1267                delivered_at,
1268            })),
1269            None => Ok(None),
1270        }
1271    }
1272
1273    /// Get all deliveries for a message
1274    ///
1275    /// # Arguments
1276    ///
1277    /// * `message_id` - The ID of the message
1278    ///
1279    /// # Returns
1280    ///
1281    /// * `Ok(Vec<Delivery>)` - List of deliveries for the message
1282    /// * `Err(StorageError)` on database error
1283    pub async fn get_deliveries_for_message(
1284        &self,
1285        message_id: &str,
1286    ) -> Result<Vec<Delivery>, StorageError> {
1287        let rows = sqlx::query_as::<
1288            _,
1289            (
1290                i64,
1291                String,
1292                String,
1293                String,
1294                Option<String>,
1295                String,
1296                String,
1297                i32,
1298                Option<i32>,
1299                Option<String>,
1300                String,
1301                String,
1302                Option<String>,
1303            ),
1304        >(
1305            r#"
1306            SELECT id, message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count, 
1307                   last_http_status_code, error_message, created_at, updated_at, delivered_at
1308            FROM deliveries WHERE message_id = ?1
1309            ORDER BY created_at ASC
1310            "#,
1311        )
1312        .bind(message_id)
1313        .fetch_all(&self.pool)
1314        .await?;
1315
1316        let mut deliveries = Vec::new();
1317        for (
1318            id,
1319            message_id,
1320            message_text,
1321            recipient_did,
1322            delivery_url,
1323            delivery_type,
1324            status,
1325            retry_count,
1326            last_http_status_code,
1327            error_message,
1328            created_at,
1329            updated_at,
1330            delivered_at,
1331        ) in rows
1332        {
1333            deliveries.push(Delivery {
1334                id,
1335                message_id,
1336                message_text,
1337                recipient_did,
1338                delivery_url,
1339                delivery_type: DeliveryType::try_from(delivery_type.as_str())
1340                    .map_err(StorageError::InvalidTransactionType)?,
1341                status: DeliveryStatus::try_from(status.as_str())
1342                    .map_err(StorageError::InvalidTransactionType)?,
1343                retry_count,
1344                last_http_status_code,
1345                error_message,
1346                created_at,
1347                updated_at,
1348                delivered_at,
1349            });
1350        }
1351
1352        Ok(deliveries)
1353    }
1354
1355    /// Get pending deliveries for retry processing
1356    ///
1357    /// # Arguments
1358    ///
1359    /// * `max_retry_count` - Maximum retry count to include
1360    /// * `limit` - Maximum number of deliveries to return
1361    ///
1362    /// # Returns
1363    ///
1364    /// * `Ok(Vec<Delivery>)` - List of pending deliveries
1365    /// * `Err(StorageError)` on database error
1366    pub async fn get_pending_deliveries(
1367        &self,
1368        max_retry_count: i32,
1369        limit: u32,
1370    ) -> Result<Vec<Delivery>, StorageError> {
1371        let rows = sqlx::query_as::<
1372            _,
1373            (
1374                i64,
1375                String,
1376                String,
1377                String,
1378                Option<String>,
1379                String,
1380                String,
1381                i32,
1382                Option<i32>,
1383                Option<String>,
1384                String,
1385                String,
1386                Option<String>,
1387            ),
1388        >(
1389            r#"
1390            SELECT id, message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count, 
1391                   last_http_status_code, error_message, created_at, updated_at, delivered_at
1392            FROM deliveries 
1393            WHERE status = 'pending' AND retry_count < ?1
1394            ORDER BY created_at ASC
1395            LIMIT ?2
1396            "#,
1397        )
1398        .bind(max_retry_count)
1399        .bind(limit)
1400        .fetch_all(&self.pool)
1401        .await?;
1402
1403        let mut deliveries = Vec::new();
1404        for (
1405            id,
1406            message_id,
1407            message_text,
1408            recipient_did,
1409            delivery_url,
1410            delivery_type,
1411            status,
1412            retry_count,
1413            last_http_status_code,
1414            error_message,
1415            created_at,
1416            updated_at,
1417            delivered_at,
1418        ) in rows
1419        {
1420            deliveries.push(Delivery {
1421                id,
1422                message_id,
1423                message_text,
1424                recipient_did,
1425                delivery_url,
1426                delivery_type: DeliveryType::try_from(delivery_type.as_str())
1427                    .map_err(StorageError::InvalidTransactionType)?,
1428                status: DeliveryStatus::try_from(status.as_str())
1429                    .map_err(StorageError::InvalidTransactionType)?,
1430                retry_count,
1431                last_http_status_code,
1432                error_message,
1433                created_at,
1434                updated_at,
1435                delivered_at,
1436            });
1437        }
1438
1439        Ok(deliveries)
1440    }
1441
1442    /// Get failed deliveries for a specific recipient
1443    ///
1444    /// # Arguments
1445    ///
1446    /// * `recipient_did` - The DID of the recipient
1447    /// * `limit` - Maximum number of deliveries to return
1448    /// * `offset` - Number of deliveries to skip (for pagination)
1449    ///
1450    /// # Returns
1451    ///
1452    /// * `Ok(Vec<Delivery>)` - List of failed deliveries
1453    /// * `Err(StorageError)` on database error
1454    pub async fn get_failed_deliveries_for_recipient(
1455        &self,
1456        recipient_did: &str,
1457        limit: u32,
1458        offset: u32,
1459    ) -> Result<Vec<Delivery>, StorageError> {
1460        let rows = sqlx::query_as::<
1461            _,
1462            (
1463                i64,
1464                String,
1465                String,
1466                String,
1467                Option<String>,
1468                String,
1469                String,
1470                i32,
1471                Option<i32>,
1472                Option<String>,
1473                String,
1474                String,
1475                Option<String>,
1476            ),
1477        >(
1478            r#"
1479            SELECT id, message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count, 
1480                   last_http_status_code, error_message, created_at, updated_at, delivered_at
1481            FROM deliveries 
1482            WHERE recipient_did = ?1 AND status = 'failed'
1483            ORDER BY updated_at DESC
1484            LIMIT ?2 OFFSET ?3
1485            "#,
1486        )
1487        .bind(recipient_did)
1488        .bind(limit)
1489        .bind(offset)
1490        .fetch_all(&self.pool)
1491        .await?;
1492
1493        let mut deliveries = Vec::new();
1494        for (
1495            id,
1496            message_id,
1497            message_text,
1498            recipient_did,
1499            delivery_url,
1500            delivery_type,
1501            status,
1502            retry_count,
1503            last_http_status_code,
1504            error_message,
1505            created_at,
1506            updated_at,
1507            delivered_at,
1508        ) in rows
1509        {
1510            deliveries.push(Delivery {
1511                id,
1512                message_id,
1513                message_text,
1514                recipient_did,
1515                delivery_url,
1516                delivery_type: DeliveryType::try_from(delivery_type.as_str())
1517                    .map_err(StorageError::InvalidTransactionType)?,
1518                status: DeliveryStatus::try_from(status.as_str())
1519                    .map_err(StorageError::InvalidTransactionType)?,
1520                retry_count,
1521                last_http_status_code,
1522                error_message,
1523                created_at,
1524                updated_at,
1525                delivered_at,
1526            });
1527        }
1528
1529        Ok(deliveries)
1530    }
1531
1532    /// Get all deliveries for a specific recipient
1533    ///
1534    /// # Arguments
1535    ///
1536    /// * `recipient_did` - The DID of the recipient
1537    /// * `limit` - Maximum number of deliveries to return
1538    /// * `offset` - Number of deliveries to skip (for pagination)
1539    ///
1540    /// # Returns
1541    ///
1542    /// * `Ok(Vec<Delivery>)` - List of deliveries
1543    /// * `Err(StorageError)` on database error
1544    pub async fn get_deliveries_by_recipient(
1545        &self,
1546        recipient_did: &str,
1547        limit: u32,
1548        offset: u32,
1549    ) -> Result<Vec<Delivery>, StorageError> {
1550        let rows = sqlx::query_as::<
1551            _,
1552            (
1553                i64,
1554                String,
1555                String,
1556                String,
1557                Option<String>,
1558                String,
1559                String,
1560                i32,
1561                Option<i32>,
1562                Option<String>,
1563                String,
1564                String,
1565                Option<String>,
1566            ),
1567        >(
1568            r#"
1569            SELECT id, message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count, 
1570                   last_http_status_code, error_message, created_at, updated_at, delivered_at
1571            FROM deliveries 
1572            WHERE recipient_did = ?1
1573            ORDER BY created_at DESC
1574            LIMIT ?2 OFFSET ?3
1575            "#,
1576        )
1577        .bind(recipient_did)
1578        .bind(limit)
1579        .bind(offset)
1580        .fetch_all(&self.pool)
1581        .await?;
1582
1583        let mut deliveries = Vec::new();
1584        for (
1585            id,
1586            message_id,
1587            message_text,
1588            recipient_did,
1589            delivery_url,
1590            delivery_type,
1591            status,
1592            retry_count,
1593            last_http_status_code,
1594            error_message,
1595            created_at,
1596            updated_at,
1597            delivered_at,
1598        ) in rows
1599        {
1600            deliveries.push(Delivery {
1601                id,
1602                message_id,
1603                message_text,
1604                recipient_did,
1605                delivery_url,
1606                delivery_type: delivery_type
1607                    .parse::<DeliveryType>()
1608                    .unwrap_or(DeliveryType::Internal),
1609                status: status
1610                    .parse::<DeliveryStatus>()
1611                    .unwrap_or(DeliveryStatus::Pending),
1612                retry_count,
1613                last_http_status_code,
1614                error_message,
1615                created_at,
1616                updated_at,
1617                delivered_at,
1618            });
1619        }
1620
1621        Ok(deliveries)
1622    }
1623
1624    /// Get all deliveries for messages in a specific thread
1625    ///
1626    /// # Arguments
1627    ///
1628    /// * `thread_id` - The thread ID to search for
1629    /// * `limit` - Maximum number of deliveries to return
1630    /// * `offset` - Number of deliveries to skip (for pagination)
1631    ///
1632    /// # Returns
1633    ///
1634    /// * `Ok(Vec<Delivery>)` - List of deliveries for messages in the thread
1635    /// * `Err(StorageError)` on database error
1636    pub async fn get_deliveries_for_thread(
1637        &self,
1638        thread_id: &str,
1639        limit: u32,
1640        offset: u32,
1641    ) -> Result<Vec<Delivery>, StorageError> {
1642        let rows = sqlx::query_as::<
1643            _,
1644            (
1645                i64,
1646                String,
1647                String,
1648                String,
1649                Option<String>,
1650                String,
1651                String,
1652                i32,
1653                Option<i32>,
1654                Option<String>,
1655                String,
1656                String,
1657                Option<String>,
1658            ),
1659        >(
1660            r#"
1661            SELECT d.id, d.message_id, d.message_text, d.recipient_did, d.delivery_url, 
1662                   d.delivery_type, d.status, d.retry_count, d.last_http_status_code, 
1663                   d.error_message, d.created_at, d.updated_at, d.delivered_at
1664            FROM deliveries d
1665            INNER JOIN messages m ON d.message_id = m.message_id
1666            WHERE m.thread_id = ?1
1667            ORDER BY d.created_at ASC
1668            LIMIT ?2 OFFSET ?3
1669            "#,
1670        )
1671        .bind(thread_id)
1672        .bind(limit)
1673        .bind(offset)
1674        .fetch_all(&self.pool)
1675        .await?;
1676
1677        let mut deliveries = Vec::new();
1678        for (
1679            id,
1680            message_id,
1681            message_text,
1682            recipient_did,
1683            delivery_url,
1684            delivery_type,
1685            status,
1686            retry_count,
1687            last_http_status_code,
1688            error_message,
1689            created_at,
1690            updated_at,
1691            delivered_at,
1692        ) in rows
1693        {
1694            deliveries.push(Delivery {
1695                id,
1696                message_id,
1697                message_text,
1698                recipient_did,
1699                delivery_url,
1700                delivery_type: delivery_type
1701                    .parse::<DeliveryType>()
1702                    .unwrap_or(DeliveryType::Internal),
1703                status: status
1704                    .parse::<DeliveryStatus>()
1705                    .unwrap_or(DeliveryStatus::Pending),
1706                retry_count,
1707                last_http_status_code,
1708                error_message,
1709                created_at,
1710                updated_at,
1711                delivered_at,
1712            });
1713        }
1714
1715        Ok(deliveries)
1716    }
1717
1718    /// Create a new received message record
1719    ///
1720    /// This records a raw incoming message (JWE, JWS, or plain JSON) before processing.
1721    ///
1722    /// # Arguments
1723    ///
1724    /// * `raw_message` - The raw message content as received
1725    /// * `source_type` - The type of source (https, internal, websocket, etc.)
1726    /// * `source_identifier` - Optional identifier for the source (URL, agent DID, etc.)
1727    ///
1728    /// # Returns
1729    ///
1730    /// * `Ok(i64)` - The ID of the created record
1731    /// * `Err(StorageError)` on database error
1732    pub async fn create_received(
1733        &self,
1734        raw_message: &str,
1735        source_type: SourceType,
1736        source_identifier: Option<&str>,
1737    ) -> Result<i64, StorageError> {
1738        // Try to extract message ID from the raw message
1739        let message_id =
1740            if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(raw_message) {
1741                json_value
1742                    .get("id")
1743                    .and_then(|v| v.as_str())
1744                    .map(|s| s.to_string())
1745            } else {
1746                None
1747            };
1748
1749        let result = sqlx::query(
1750            r#"
1751            INSERT INTO received (message_id, raw_message, source_type, source_identifier)
1752            VALUES (?1, ?2, ?3, ?4)
1753            "#,
1754        )
1755        .bind(message_id)
1756        .bind(raw_message)
1757        .bind(source_type.to_string())
1758        .bind(source_identifier)
1759        .execute(&self.pool)
1760        .await?;
1761
1762        Ok(result.last_insert_rowid())
1763    }
1764
1765    /// Update the status of a received message
1766    ///
1767    /// # Arguments
1768    ///
1769    /// * `received_id` - The ID of the received record
1770    /// * `status` - The new status (processed, failed)
1771    /// * `processed_message_id` - Optional ID of the processed message in the messages table
1772    /// * `error_message` - Optional error message if processing failed
1773    ///
1774    /// # Returns
1775    ///
1776    /// * `Ok(())` on success
1777    /// * `Err(StorageError)` on database error
1778    pub async fn update_received_status(
1779        &self,
1780        received_id: i64,
1781        status: ReceivedStatus,
1782        processed_message_id: Option<&str>,
1783        error_message: Option<&str>,
1784    ) -> Result<(), StorageError> {
1785        let now = chrono::Utc::now().to_rfc3339();
1786
1787        sqlx::query(
1788            r#"
1789            UPDATE received 
1790            SET status = ?1, processed_at = ?2, processed_message_id = ?3, error_message = ?4
1791            WHERE id = ?5
1792            "#,
1793        )
1794        .bind(status.to_string())
1795        .bind(&now)
1796        .bind(processed_message_id)
1797        .bind(error_message)
1798        .bind(received_id)
1799        .execute(&self.pool)
1800        .await?;
1801
1802        Ok(())
1803    }
1804
1805    /// Get a received message by ID
1806    ///
1807    /// # Arguments
1808    ///
1809    /// * `received_id` - The ID of the received record
1810    ///
1811    /// # Returns
1812    ///
1813    /// * `Ok(Some(Received))` if found
1814    /// * `Ok(None)` if not found
1815    /// * `Err(StorageError)` on database error
1816    pub async fn get_received_by_id(
1817        &self,
1818        received_id: i64,
1819    ) -> Result<Option<Received>, StorageError> {
1820        let result = sqlx::query_as::<
1821            _,
1822            (
1823                i64,
1824                Option<String>,
1825                String,
1826                String,
1827                Option<String>,
1828                String,
1829                Option<String>,
1830                String,
1831                Option<String>,
1832                Option<String>,
1833            ),
1834        >(
1835            r#"
1836            SELECT id, message_id, raw_message, source_type, source_identifier, 
1837                   status, error_message, received_at, processed_at, processed_message_id
1838            FROM received WHERE id = ?1
1839            "#,
1840        )
1841        .bind(received_id)
1842        .fetch_optional(&self.pool)
1843        .await?;
1844
1845        match result {
1846            Some((
1847                id,
1848                message_id,
1849                raw_message,
1850                source_type,
1851                source_identifier,
1852                status,
1853                error_message,
1854                received_at,
1855                processed_at,
1856                processed_message_id,
1857            )) => Ok(Some(Received {
1858                id,
1859                message_id,
1860                raw_message,
1861                source_type: SourceType::try_from(source_type.as_str())
1862                    .map_err(StorageError::InvalidTransactionType)?,
1863                source_identifier,
1864                status: ReceivedStatus::try_from(status.as_str())
1865                    .map_err(StorageError::InvalidTransactionType)?,
1866                error_message,
1867                received_at,
1868                processed_at,
1869                processed_message_id,
1870            })),
1871            None => Ok(None),
1872        }
1873    }
1874
1875    /// Get pending received messages for processing
1876    ///
1877    /// # Arguments
1878    ///
1879    /// * `limit` - Maximum number of messages to return
1880    ///
1881    /// # Returns
1882    ///
1883    /// * `Ok(Vec<Received>)` - List of pending received messages
1884    /// * `Err(StorageError)` on database error
1885    pub async fn get_pending_received(&self, limit: u32) -> Result<Vec<Received>, StorageError> {
1886        let rows = sqlx::query_as::<
1887            _,
1888            (
1889                i64,
1890                Option<String>,
1891                String,
1892                String,
1893                Option<String>,
1894                String,
1895                Option<String>,
1896                String,
1897                Option<String>,
1898                Option<String>,
1899            ),
1900        >(
1901            r#"
1902            SELECT id, message_id, raw_message, source_type, source_identifier, 
1903                   status, error_message, received_at, processed_at, processed_message_id
1904            FROM received 
1905            WHERE status = 'pending'
1906            ORDER BY received_at ASC
1907            LIMIT ?1
1908            "#,
1909        )
1910        .bind(limit)
1911        .fetch_all(&self.pool)
1912        .await?;
1913
1914        let mut received_messages = Vec::new();
1915        for (
1916            id,
1917            message_id,
1918            raw_message,
1919            source_type,
1920            source_identifier,
1921            status,
1922            error_message,
1923            received_at,
1924            processed_at,
1925            processed_message_id,
1926        ) in rows
1927        {
1928            received_messages.push(Received {
1929                id,
1930                message_id,
1931                raw_message,
1932                source_type: SourceType::try_from(source_type.as_str())
1933                    .map_err(StorageError::InvalidTransactionType)?,
1934                source_identifier,
1935                status: ReceivedStatus::try_from(status.as_str())
1936                    .map_err(StorageError::InvalidTransactionType)?,
1937                error_message,
1938                received_at,
1939                processed_at,
1940                processed_message_id,
1941            });
1942        }
1943
1944        Ok(received_messages)
1945    }
1946
1947    /// List received messages with optional filtering
1948    ///
1949    /// # Arguments
1950    ///
1951    /// * `limit` - Maximum number of messages to return
1952    /// * `offset` - Number of messages to skip (for pagination)
1953    /// * `source_type` - Optional filter by source type
1954    /// * `status` - Optional filter by status
1955    ///
1956    /// # Returns
1957    ///
1958    /// * `Ok(Vec<Received>)` - List of received messages
1959    /// * `Err(StorageError)` on database error
1960    pub async fn list_received(
1961        &self,
1962        limit: u32,
1963        offset: u32,
1964        source_type: Option<SourceType>,
1965        status: Option<ReceivedStatus>,
1966    ) -> Result<Vec<Received>, StorageError> {
1967        let mut query = "SELECT id, message_id, raw_message, source_type, source_identifier, status, error_message, received_at, processed_at, processed_message_id FROM received WHERE 1=1".to_string();
1968        let mut bind_values: Vec<String> = Vec::new();
1969
1970        if let Some(st) = source_type {
1971            query.push_str(" AND source_type = ?");
1972            bind_values.push(st.to_string());
1973        }
1974
1975        if let Some(s) = status {
1976            query.push_str(" AND status = ?");
1977            bind_values.push(s.to_string());
1978        }
1979
1980        query.push_str(" ORDER BY received_at DESC LIMIT ? OFFSET ?");
1981
1982        // Build the query dynamically based on filters
1983        let mut sqlx_query = sqlx::query_as::<
1984            _,
1985            (
1986                i64,
1987                Option<String>,
1988                String,
1989                String,
1990                Option<String>,
1991                String,
1992                Option<String>,
1993                String,
1994                Option<String>,
1995                Option<String>,
1996            ),
1997        >(&query);
1998
1999        for value in bind_values {
2000            sqlx_query = sqlx_query.bind(value);
2001        }
2002
2003        let rows = sqlx_query
2004            .bind(limit)
2005            .bind(offset)
2006            .fetch_all(&self.pool)
2007            .await?;
2008
2009        let mut received_messages = Vec::new();
2010        for (
2011            id,
2012            message_id,
2013            raw_message,
2014            source_type,
2015            source_identifier,
2016            status,
2017            error_message,
2018            received_at,
2019            processed_at,
2020            processed_message_id,
2021        ) in rows
2022        {
2023            received_messages.push(Received {
2024                id,
2025                message_id,
2026                raw_message,
2027                source_type: SourceType::try_from(source_type.as_str())
2028                    .map_err(StorageError::InvalidTransactionType)?,
2029                source_identifier,
2030                status: ReceivedStatus::try_from(status.as_str())
2031                    .map_err(StorageError::InvalidTransactionType)?,
2032                error_message,
2033                received_at,
2034                processed_at,
2035                processed_message_id,
2036            });
2037        }
2038
2039        Ok(received_messages)
2040    }
2041
2042    // Customer Management Methods
2043
2044    /// Create or update a customer record
2045    pub async fn upsert_customer(&self, customer: &Customer) -> Result<(), StorageError> {
2046        sqlx::query(
2047            r#"
2048            INSERT INTO customers (
2049                id, agent_did, schema_type, given_name, family_name, display_name,
2050                legal_name, lei_code, mcc_code, address_country, address_locality,
2051                postal_code, street_address, profile, ivms101_data, verified_at,
2052                created_at, updated_at
2053            ) VALUES (
2054                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18
2055            ) ON CONFLICT(id) DO UPDATE SET
2056                agent_did = excluded.agent_did,
2057                schema_type = excluded.schema_type,
2058                given_name = excluded.given_name,
2059                family_name = excluded.family_name,
2060                display_name = excluded.display_name,
2061                legal_name = excluded.legal_name,
2062                lei_code = excluded.lei_code,
2063                mcc_code = excluded.mcc_code,
2064                address_country = excluded.address_country,
2065                address_locality = excluded.address_locality,
2066                postal_code = excluded.postal_code,
2067                street_address = excluded.street_address,
2068                profile = excluded.profile,
2069                ivms101_data = excluded.ivms101_data,
2070                verified_at = excluded.verified_at,
2071                updated_at = excluded.updated_at
2072            "#,
2073        )
2074        .bind(&customer.id)
2075        .bind(&customer.agent_did)
2076        .bind(customer.schema_type.to_string())
2077        .bind(&customer.given_name)
2078        .bind(&customer.family_name)
2079        .bind(&customer.display_name)
2080        .bind(&customer.legal_name)
2081        .bind(&customer.lei_code)
2082        .bind(&customer.mcc_code)
2083        .bind(&customer.address_country)
2084        .bind(&customer.address_locality)
2085        .bind(&customer.postal_code)
2086        .bind(&customer.street_address)
2087        .bind(serde_json::to_string(&customer.profile)?)
2088        .bind(
2089            customer
2090                .ivms101_data
2091                .as_ref()
2092                .map(serde_json::to_string)
2093                .transpose()?,
2094        )
2095        .bind(&customer.verified_at)
2096        .bind(&customer.created_at)
2097        .bind(&customer.updated_at)
2098        .execute(&self.pool)
2099        .await?;
2100
2101        Ok(())
2102    }
2103
2104    /// Get a customer by ID
2105    pub async fn get_customer(&self, customer_id: &str) -> Result<Option<Customer>, StorageError> {
2106        let row = sqlx::query(
2107            r#"
2108            SELECT id, agent_did, schema_type, given_name, family_name, display_name,
2109                   legal_name, lei_code, mcc_code, address_country, address_locality,
2110                   postal_code, street_address, profile, ivms101_data, verified_at,
2111                   created_at, updated_at
2112            FROM customers
2113            WHERE id = ?1
2114            "#,
2115        )
2116        .bind(customer_id)
2117        .fetch_optional(&self.pool)
2118        .await?;
2119
2120        match row {
2121            Some(row) => Ok(Some(Customer {
2122                id: row.get("id"),
2123                agent_did: row.get("agent_did"),
2124                schema_type: SchemaType::try_from(row.get::<String, _>("schema_type").as_str())
2125                    .map_err(StorageError::InvalidTransactionType)?,
2126                given_name: row.get("given_name"),
2127                family_name: row.get("family_name"),
2128                display_name: row.get("display_name"),
2129                legal_name: row.get("legal_name"),
2130                lei_code: row.get("lei_code"),
2131                mcc_code: row.get("mcc_code"),
2132                address_country: row.get("address_country"),
2133                address_locality: row.get("address_locality"),
2134                postal_code: row.get("postal_code"),
2135                street_address: row.get("street_address"),
2136                profile: serde_json::from_str(&row.get::<String, _>("profile"))?,
2137                ivms101_data: row
2138                    .get::<Option<String>, _>("ivms101_data")
2139                    .map(|v| serde_json::from_str(&v))
2140                    .transpose()?,
2141                verified_at: row.get("verified_at"),
2142                created_at: row.get("created_at"),
2143                updated_at: row.get("updated_at"),
2144            })),
2145            None => Ok(None),
2146        }
2147    }
2148
2149    /// Get a customer by identifier
2150    pub async fn get_customer_by_identifier(
2151        &self,
2152        identifier: &str,
2153    ) -> Result<Option<Customer>, StorageError> {
2154        let row = sqlx::query_as::<_, (String,)>(
2155            r#"
2156            SELECT customer_id
2157            FROM customer_identifiers
2158            WHERE id = ?1
2159            "#,
2160        )
2161        .bind(identifier)
2162        .fetch_optional(&self.pool)
2163        .await?;
2164
2165        match row {
2166            Some((customer_id,)) => self.get_customer(&customer_id).await,
2167            None => Ok(None),
2168        }
2169    }
2170
2171    /// List customers for an agent
2172    pub async fn list_customers(
2173        &self,
2174        agent_did: &str,
2175        limit: u32,
2176        offset: u32,
2177    ) -> Result<Vec<Customer>, StorageError> {
2178        let rows = sqlx::query(
2179            r#"
2180            SELECT id, agent_did, schema_type, given_name, family_name, display_name,
2181                   legal_name, lei_code, mcc_code, address_country, address_locality,
2182                   postal_code, street_address, profile, ivms101_data, verified_at,
2183                   created_at, updated_at
2184            FROM customers
2185            WHERE agent_did = ?1
2186            ORDER BY updated_at DESC
2187            LIMIT ?2 OFFSET ?3
2188            "#,
2189        )
2190        .bind(agent_did)
2191        .bind(limit)
2192        .bind(offset)
2193        .fetch_all(&self.pool)
2194        .await?;
2195
2196        let mut customers = Vec::new();
2197        for row in rows {
2198            customers.push(Customer {
2199                id: row.get("id"),
2200                agent_did: row.get("agent_did"),
2201                schema_type: SchemaType::try_from(row.get::<String, _>("schema_type").as_str())
2202                    .map_err(StorageError::InvalidTransactionType)?,
2203                given_name: row.get("given_name"),
2204                family_name: row.get("family_name"),
2205                display_name: row.get("display_name"),
2206                legal_name: row.get("legal_name"),
2207                lei_code: row.get("lei_code"),
2208                mcc_code: row.get("mcc_code"),
2209                address_country: row.get("address_country"),
2210                address_locality: row.get("address_locality"),
2211                postal_code: row.get("postal_code"),
2212                street_address: row.get("street_address"),
2213                profile: serde_json::from_str(&row.get::<String, _>("profile"))?,
2214                ivms101_data: row
2215                    .get::<Option<String>, _>("ivms101_data")
2216                    .map(|v| serde_json::from_str(&v))
2217                    .transpose()?,
2218                verified_at: row.get("verified_at"),
2219                created_at: row.get("created_at"),
2220                updated_at: row.get("updated_at"),
2221            });
2222        }
2223
2224        Ok(customers)
2225    }
2226
2227    /// Add an identifier to a customer
2228    pub async fn add_customer_identifier(
2229        &self,
2230        identifier: &CustomerIdentifier,
2231    ) -> Result<(), StorageError> {
2232        sqlx::query(
2233            r#"
2234            INSERT INTO customer_identifiers (
2235                id, customer_id, identifier_type, verified, verification_method,
2236                verified_at, created_at
2237            ) VALUES (
2238                ?1, ?2, ?3, ?4, ?5, ?6, ?7
2239            ) ON CONFLICT(id, customer_id) DO UPDATE SET
2240                verified = excluded.verified,
2241                verification_method = excluded.verification_method,
2242                verified_at = excluded.verified_at
2243            "#,
2244        )
2245        .bind(&identifier.id)
2246        .bind(&identifier.customer_id)
2247        .bind(identifier.identifier_type.to_string())
2248        .bind(identifier.verified)
2249        .bind(&identifier.verification_method)
2250        .bind(&identifier.verified_at)
2251        .bind(&identifier.created_at)
2252        .execute(&self.pool)
2253        .await?;
2254
2255        Ok(())
2256    }
2257
2258    /// Get identifiers for a customer
2259    pub async fn get_customer_identifiers(
2260        &self,
2261        customer_id: &str,
2262    ) -> Result<Vec<CustomerIdentifier>, StorageError> {
2263        let rows = sqlx::query_as::<
2264            _,
2265            (
2266                String,
2267                String,
2268                String,
2269                bool,
2270                Option<String>,
2271                Option<String>,
2272                String,
2273            ),
2274        >(
2275            r#"
2276            SELECT id, customer_id, identifier_type, verified, verification_method,
2277                   verified_at, created_at
2278            FROM customer_identifiers
2279            WHERE customer_id = ?1
2280            "#,
2281        )
2282        .bind(customer_id)
2283        .fetch_all(&self.pool)
2284        .await?;
2285
2286        let mut identifiers = Vec::new();
2287        for (
2288            id,
2289            customer_id,
2290            identifier_type,
2291            verified,
2292            verification_method,
2293            verified_at,
2294            created_at,
2295        ) in rows
2296        {
2297            identifiers.push(CustomerIdentifier {
2298                id,
2299                customer_id,
2300                identifier_type: IdentifierType::try_from(identifier_type.as_str())
2301                    .map_err(StorageError::InvalidTransactionType)?,
2302                verified,
2303                verification_method,
2304                verified_at,
2305                created_at,
2306            });
2307        }
2308
2309        Ok(identifiers)
2310    }
2311
2312    /// Add a customer relationship
2313    pub async fn add_customer_relationship(
2314        &self,
2315        relationship: &CustomerRelationship,
2316    ) -> Result<(), StorageError> {
2317        sqlx::query(
2318            r#"
2319            INSERT INTO customer_relationships (
2320                id, customer_id, relationship_type, related_identifier,
2321                proof, confirmed_at, created_at
2322            ) VALUES (
2323                ?1, ?2, ?3, ?4, ?5, ?6, ?7
2324            ) ON CONFLICT(customer_id, relationship_type, related_identifier) DO UPDATE SET
2325                proof = excluded.proof,
2326                confirmed_at = excluded.confirmed_at
2327            "#,
2328        )
2329        .bind(&relationship.id)
2330        .bind(&relationship.customer_id)
2331        .bind(&relationship.relationship_type)
2332        .bind(&relationship.related_identifier)
2333        .bind(
2334            relationship
2335                .proof
2336                .as_ref()
2337                .map(serde_json::to_string)
2338                .transpose()?,
2339        )
2340        .bind(&relationship.confirmed_at)
2341        .bind(&relationship.created_at)
2342        .execute(&self.pool)
2343        .await?;
2344
2345        Ok(())
2346    }
2347
2348    /// Get relationships for a customer
2349    pub async fn get_customer_relationships(
2350        &self,
2351        customer_id: &str,
2352    ) -> Result<Vec<CustomerRelationship>, StorageError> {
2353        let rows = sqlx::query_as::<
2354            _,
2355            (
2356                String,
2357                String,
2358                String,
2359                String,
2360                Option<String>,
2361                Option<String>,
2362                String,
2363            ),
2364        >(
2365            r#"
2366            SELECT id, customer_id, relationship_type, related_identifier,
2367                   proof, confirmed_at, created_at
2368            FROM customer_relationships
2369            WHERE customer_id = ?1
2370            "#,
2371        )
2372        .bind(customer_id)
2373        .fetch_all(&self.pool)
2374        .await?;
2375
2376        let mut relationships = Vec::new();
2377        for (
2378            id,
2379            customer_id,
2380            relationship_type,
2381            related_identifier,
2382            proof,
2383            confirmed_at,
2384            created_at,
2385        ) in rows
2386        {
2387            relationships.push(CustomerRelationship {
2388                id,
2389                customer_id,
2390                relationship_type,
2391                related_identifier,
2392                proof: proof.map(|v| serde_json::from_str(&v)).transpose()?,
2393                confirmed_at,
2394                created_at,
2395            });
2396        }
2397
2398        Ok(relationships)
2399    }
2400
2401    /// Search customers by name or identifier
2402    pub async fn search_customers(
2403        &self,
2404        agent_did: &str,
2405        query: &str,
2406        limit: u32,
2407    ) -> Result<Vec<Customer>, StorageError> {
2408        // Escape LIKE metacharacters to prevent pattern injection
2409        let escaped_query = query
2410            .replace('\\', "\\\\")
2411            .replace('%', "\\%")
2412            .replace('_', "\\_");
2413        let search_pattern = format!("%{}%", escaped_query);
2414
2415        let rows = sqlx::query(
2416            r#"
2417            SELECT DISTINCT c.id, c.agent_did, c.schema_type, c.given_name, c.family_name, c.display_name,
2418                   c.legal_name, c.lei_code, c.mcc_code, c.address_country, c.address_locality,
2419                   c.postal_code, c.street_address, c.profile, c.ivms101_data, c.verified_at,
2420                   c.created_at, c.updated_at
2421            FROM customers c
2422            LEFT JOIN customer_identifiers ci ON c.id = ci.customer_id
2423            WHERE c.agent_did = ?1
2424            AND (
2425                c.given_name LIKE ?2 ESCAPE '\'
2426                OR c.family_name LIKE ?2 ESCAPE '\'
2427                OR c.display_name LIKE ?2 ESCAPE '\'
2428                OR c.legal_name LIKE ?2 ESCAPE '\'
2429                OR ci.id LIKE ?2 ESCAPE '\'
2430            )
2431            ORDER BY c.updated_at DESC
2432            LIMIT ?3
2433            "#,
2434        )
2435        .bind(agent_did)
2436        .bind(&search_pattern)
2437        .bind(limit)
2438        .fetch_all(&self.pool)
2439        .await?;
2440
2441        let mut customers = Vec::new();
2442        for row in rows {
2443            customers.push(Customer {
2444                id: row.get("id"),
2445                agent_did: row.get("agent_did"),
2446                schema_type: SchemaType::try_from(row.get::<String, _>("schema_type").as_str())
2447                    .map_err(StorageError::InvalidTransactionType)?,
2448                given_name: row.get("given_name"),
2449                family_name: row.get("family_name"),
2450                display_name: row.get("display_name"),
2451                legal_name: row.get("legal_name"),
2452                lei_code: row.get("lei_code"),
2453                mcc_code: row.get("mcc_code"),
2454                address_country: row.get("address_country"),
2455                address_locality: row.get("address_locality"),
2456                postal_code: row.get("postal_code"),
2457                street_address: row.get("street_address"),
2458                profile: serde_json::from_str(&row.get::<String, _>("profile"))?,
2459                ivms101_data: row
2460                    .get::<Option<String>, _>("ivms101_data")
2461                    .map(|v| serde_json::from_str(&v))
2462                    .transpose()?,
2463                verified_at: row.get("verified_at"),
2464                created_at: row.get("created_at"),
2465                updated_at: row.get("updated_at"),
2466            });
2467        }
2468
2469        Ok(customers)
2470    }
2471
2472    // -----------------------------------------------------------------------
2473    // Decision log operations
2474    // -----------------------------------------------------------------------
2475
2476    /// Insert a new decision into the decision log
2477    ///
2478    /// # Arguments
2479    ///
2480    /// * `transaction_id` - The transaction requiring a decision
2481    /// * `agent_did` - The DID of the agent this decision is for
2482    /// * `decision_type` - The type of decision required
2483    /// * `context_json` - JSON context about the decision (transaction details, etc.)
2484    ///
2485    /// # Returns
2486    ///
2487    /// * `Ok(i64)` - The ID of the created decision log entry
2488    /// * `Err(StorageError)` on database error
2489    pub async fn insert_decision(
2490        &self,
2491        transaction_id: &str,
2492        agent_did: &str,
2493        decision_type: DecisionType,
2494        context_json: &serde_json::Value,
2495    ) -> Result<i64, StorageError> {
2496        debug!(
2497            "Inserting decision for transaction {} agent {} type {}",
2498            transaction_id, agent_did, decision_type
2499        );
2500
2501        let context_str = serde_json::to_string(context_json)?;
2502
2503        let result = sqlx::query(
2504            r#"
2505            INSERT INTO decision_log (transaction_id, agent_did, decision_type, context_json)
2506            VALUES (?1, ?2, ?3, ?4)
2507            "#,
2508        )
2509        .bind(transaction_id)
2510        .bind(agent_did)
2511        .bind(decision_type.to_string())
2512        .bind(&context_str)
2513        .execute(&self.pool)
2514        .await?;
2515
2516        Ok(result.last_insert_rowid())
2517    }
2518
2519    /// Update the status of a decision log entry
2520    ///
2521    /// # Arguments
2522    ///
2523    /// * `decision_id` - The ID of the decision to update
2524    /// * `status` - The new status
2525    /// * `resolution` - Optional resolution action (e.g., "authorize", "reject")
2526    /// * `resolution_detail` - Optional JSON detail about the resolution
2527    ///
2528    /// # Returns
2529    ///
2530    /// * `Ok(())` on success
2531    /// * `Err(StorageError)` on database error
2532    pub async fn update_decision_status(
2533        &self,
2534        decision_id: i64,
2535        status: DecisionStatus,
2536        resolution: Option<&str>,
2537        resolution_detail: Option<&serde_json::Value>,
2538    ) -> Result<(), StorageError> {
2539        debug!("Updating decision {} status to {}", decision_id, status);
2540
2541        let now = chrono::Utc::now().to_rfc3339();
2542        let resolution_detail_str = resolution_detail.map(serde_json::to_string).transpose()?;
2543
2544        let delivered_at = if status == DecisionStatus::Delivered {
2545            Some(now.clone())
2546        } else {
2547            None
2548        };
2549
2550        let resolved_at = if status == DecisionStatus::Resolved {
2551            Some(now)
2552        } else {
2553            None
2554        };
2555
2556        sqlx::query(
2557            r#"
2558            UPDATE decision_log
2559            SET status = ?1,
2560                resolution = COALESCE(?2, resolution),
2561                resolution_detail = COALESCE(?3, resolution_detail),
2562                delivered_at = COALESCE(?4, delivered_at),
2563                resolved_at = COALESCE(?5, resolved_at)
2564            WHERE id = ?6
2565            "#,
2566        )
2567        .bind(status.to_string())
2568        .bind(resolution)
2569        .bind(resolution_detail_str.as_deref())
2570        .bind(delivered_at)
2571        .bind(resolved_at)
2572        .bind(decision_id)
2573        .execute(&self.pool)
2574        .await?;
2575
2576        Ok(())
2577    }
2578
2579    /// List decisions from the decision log with optional filtering
2580    ///
2581    /// # Arguments
2582    ///
2583    /// * `agent_did` - Optional filter by agent DID
2584    /// * `status` - Optional filter by status
2585    /// * `since_id` - Optional minimum decision ID (for pagination/replay)
2586    /// * `limit` - Maximum number of entries to return
2587    ///
2588    /// # Returns
2589    ///
2590    /// * `Ok(Vec<DecisionLogEntry>)` - Matching decision log entries ordered by id ASC
2591    /// * `Err(StorageError)` on database error
2592    pub async fn list_decisions(
2593        &self,
2594        agent_did: Option<&str>,
2595        status: Option<DecisionStatus>,
2596        since_id: Option<i64>,
2597        limit: u32,
2598    ) -> Result<Vec<DecisionLogEntry>, StorageError> {
2599        let mut query = String::from(
2600            "SELECT id, transaction_id, agent_did, decision_type, context_json, \
2601             status, resolution, resolution_detail, created_at, delivered_at, resolved_at \
2602             FROM decision_log WHERE 1=1",
2603        );
2604        let mut bind_values: Vec<String> = Vec::new();
2605        let mut bind_i64: Option<i64> = None;
2606
2607        if let Some(did) = agent_did {
2608            query.push_str(" AND agent_did = ?");
2609            bind_values.push(did.to_string());
2610        }
2611
2612        if let Some(s) = status {
2613            query.push_str(" AND status = ?");
2614            bind_values.push(s.to_string());
2615        }
2616
2617        if let Some(id) = since_id {
2618            query.push_str(" AND id > ?");
2619            bind_i64 = Some(id);
2620        }
2621
2622        query.push_str(" ORDER BY id ASC LIMIT ?");
2623
2624        let mut sqlx_query = sqlx::query(&query);
2625
2626        for value in &bind_values {
2627            sqlx_query = sqlx_query.bind(value);
2628        }
2629
2630        if let Some(id) = bind_i64 {
2631            sqlx_query = sqlx_query.bind(id);
2632        }
2633
2634        sqlx_query = sqlx_query.bind(limit);
2635
2636        let rows = sqlx_query.fetch_all(&self.pool).await?;
2637
2638        let mut entries = Vec::new();
2639        for row in rows {
2640            entries.push(DecisionLogEntry {
2641                id: row.get("id"),
2642                transaction_id: row.get("transaction_id"),
2643                agent_did: row.get("agent_did"),
2644                decision_type: DecisionType::try_from(
2645                    row.get::<String, _>("decision_type").as_str(),
2646                )
2647                .map_err(StorageError::InvalidTransactionType)?,
2648                context_json: serde_json::from_str(&row.get::<String, _>("context_json"))?,
2649                status: DecisionStatus::try_from(row.get::<String, _>("status").as_str())
2650                    .map_err(StorageError::InvalidTransactionType)?,
2651                resolution: row.get("resolution"),
2652                resolution_detail: row
2653                    .get::<Option<String>, _>("resolution_detail")
2654                    .map(|v| serde_json::from_str(&v))
2655                    .transpose()?,
2656                created_at: row.get("created_at"),
2657                delivered_at: row.get("delivered_at"),
2658                resolved_at: row.get("resolved_at"),
2659            });
2660        }
2661
2662        Ok(entries)
2663    }
2664
2665    /// Expire all pending/delivered decisions for a transaction
2666    ///
2667    /// Called when a transaction reaches a terminal state (Rejected, Cancelled, Reverted)
2668    /// to prevent stale decisions from being acted on.
2669    ///
2670    /// # Arguments
2671    ///
2672    /// * `transaction_id` - The transaction whose decisions should be expired
2673    ///
2674    /// # Returns
2675    ///
2676    /// * `Ok(u64)` - Number of decisions expired
2677    /// * `Err(StorageError)` on database error
2678    pub async fn expire_decisions_for_transaction(
2679        &self,
2680        transaction_id: &str,
2681    ) -> Result<u64, StorageError> {
2682        debug!(
2683            "Expiring pending decisions for transaction {}",
2684            transaction_id
2685        );
2686
2687        let result = sqlx::query(
2688            r#"
2689            UPDATE decision_log
2690            SET status = 'expired'
2691            WHERE transaction_id = ?1
2692            AND status IN ('pending', 'delivered')
2693            "#,
2694        )
2695        .bind(transaction_id)
2696        .execute(&self.pool)
2697        .await?;
2698
2699        Ok(result.rows_affected())
2700    }
2701
2702    /// Resolve pending/delivered decisions for a transaction with a specific action.
2703    ///
2704    /// Optionally filter by decision type. If `decision_type` is `None`, all
2705    /// pending/delivered decisions for the transaction are resolved.
2706    ///
2707    /// # Arguments
2708    ///
2709    /// * `transaction_id` - The transaction whose decisions should be resolved
2710    /// * `action` - The resolution action (e.g., "authorize", "reject", "settle")
2711    /// * `decision_type` - Optional filter for specific decision types
2712    ///
2713    /// # Returns
2714    ///
2715    /// * `Ok(u64)` - Number of decisions resolved
2716    /// * `Err(StorageError)` on database error
2717    pub async fn resolve_decisions_for_transaction(
2718        &self,
2719        transaction_id: &str,
2720        action: &str,
2721        decision_type: Option<DecisionType>,
2722    ) -> Result<u64, StorageError> {
2723        debug!(
2724            "Resolving decisions for transaction {} with action: {}",
2725            transaction_id, action
2726        );
2727
2728        let result = if let Some(dt) = decision_type {
2729            sqlx::query(
2730                r#"
2731                UPDATE decision_log
2732                SET status = 'resolved',
2733                    resolution = ?1,
2734                    resolved_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
2735                WHERE transaction_id = ?2
2736                AND decision_type = ?3
2737                AND status IN ('pending', 'delivered')
2738                "#,
2739            )
2740            .bind(action)
2741            .bind(transaction_id)
2742            .bind(dt.to_string())
2743            .execute(&self.pool)
2744            .await?
2745        } else {
2746            sqlx::query(
2747                r#"
2748                UPDATE decision_log
2749                SET status = 'resolved',
2750                    resolution = ?1,
2751                    resolved_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
2752                WHERE transaction_id = ?2
2753                AND status IN ('pending', 'delivered')
2754                "#,
2755            )
2756            .bind(action)
2757            .bind(transaction_id)
2758            .execute(&self.pool)
2759            .await?
2760        };
2761
2762        Ok(result.rows_affected())
2763    }
2764
2765    /// Get a single decision by ID
2766    ///
2767    /// # Arguments
2768    ///
2769    /// * `decision_id` - The ID of the decision
2770    ///
2771    /// # Returns
2772    ///
2773    /// * `Ok(Some(DecisionLogEntry))` if found
2774    /// * `Ok(None)` if not found
2775    /// * `Err(StorageError)` on database error
2776    pub async fn get_decision_by_id(
2777        &self,
2778        decision_id: i64,
2779    ) -> Result<Option<DecisionLogEntry>, StorageError> {
2780        let row = sqlx::query(
2781            r#"
2782            SELECT id, transaction_id, agent_did, decision_type, context_json,
2783                   status, resolution, resolution_detail, created_at, delivered_at, resolved_at
2784            FROM decision_log WHERE id = ?1
2785            "#,
2786        )
2787        .bind(decision_id)
2788        .fetch_optional(&self.pool)
2789        .await?;
2790
2791        match row {
2792            Some(row) => Ok(Some(DecisionLogEntry {
2793                id: row.get("id"),
2794                transaction_id: row.get("transaction_id"),
2795                agent_did: row.get("agent_did"),
2796                decision_type: DecisionType::try_from(
2797                    row.get::<String, _>("decision_type").as_str(),
2798                )
2799                .map_err(StorageError::InvalidTransactionType)?,
2800                context_json: serde_json::from_str(&row.get::<String, _>("context_json"))?,
2801                status: DecisionStatus::try_from(row.get::<String, _>("status").as_str())
2802                    .map_err(StorageError::InvalidTransactionType)?,
2803                resolution: row.get("resolution"),
2804                resolution_detail: row
2805                    .get::<Option<String>, _>("resolution_detail")
2806                    .map(|v| serde_json::from_str(&v))
2807                    .transpose()?,
2808                created_at: row.get("created_at"),
2809                delivered_at: row.get("delivered_at"),
2810                resolved_at: row.get("resolved_at"),
2811            })),
2812            None => Ok(None),
2813        }
2814    }
2815}
2816
2817#[cfg(test)]
2818mod tests {
2819    use super::*;
2820    use tap_msg::message::transfer::Transfer;
2821    use tap_msg::message::Party;
2822    use tempfile::tempdir;
2823
2824    #[tokio::test]
2825    async fn test_storage_creation() {
2826        let dir = tempdir().unwrap();
2827        let db_path = dir.path().join("test.db");
2828
2829        let _storage = Storage::new(Some(db_path)).await.unwrap();
2830        // Just verify we can create a storage instance
2831    }
2832
2833    #[tokio::test]
2834    async fn test_storage_with_did() {
2835        let _ = env_logger::builder().is_test(true).try_init();
2836
2837        let dir = tempdir().unwrap();
2838        let tap_root = dir.path().to_path_buf();
2839        let agent_did = "did:web:example.com";
2840
2841        let storage = Storage::new_with_did(agent_did, Some(tap_root.clone()))
2842            .await
2843            .unwrap();
2844
2845        // Verify the database was created in the expected location
2846        let expected_path = tap_root.join("did_web_example.com").join("transactions.db");
2847        assert!(
2848            expected_path.exists(),
2849            "Database file not created at expected path"
2850        );
2851
2852        // Test that we can use the storage
2853        let messages = storage.list_messages(10, 0, None).await.unwrap();
2854        assert_eq!(messages.len(), 0);
2855    }
2856
2857    #[tokio::test]
2858    async fn test_default_logs_dir() {
2859        let dir = tempdir().unwrap();
2860        let tap_root = dir.path().to_path_buf();
2861
2862        let logs_dir = Storage::default_logs_dir(Some(tap_root.clone()));
2863        assert_eq!(logs_dir, tap_root.join("logs"));
2864
2865        // Test with no tap_root (should use home dir)
2866        let default_logs = Storage::default_logs_dir(None);
2867        assert!(default_logs.to_string_lossy().contains(".tap/logs"));
2868    }
2869
2870    #[tokio::test]
2871    async fn test_insert_and_retrieve_transaction() {
2872        let _ = env_logger::builder().is_test(true).try_init();
2873
2874        let dir = tempdir().unwrap();
2875        let db_path = dir.path().join("test.db");
2876        let storage = Storage::new(Some(db_path)).await.unwrap();
2877
2878        // Create a test transfer message
2879        let transfer_body = Transfer {
2880            transaction_id: Some("test_transfer_123".to_string()),
2881            originator: Some(Party::new("did:example:originator")),
2882            beneficiary: Some(Party::new("did:example:beneficiary")),
2883            asset: "eip155:1/erc20:0x0000000000000000000000000000000000000000"
2884                .parse()
2885                .unwrap(),
2886            amount: "1000000000000000000".to_string(),
2887            agents: vec![],
2888            memo: None,
2889            settlement_id: None,
2890            expiry: None,
2891            transaction_value: None,
2892            connection_id: None,
2893            metadata: Default::default(),
2894        };
2895
2896        let message_id = "test_message_123";
2897        let message = PlainMessage {
2898            id: message_id.to_string(),
2899            typ: "application/didcomm-plain+json".to_string(),
2900            type_: "https://tap-protocol.io/messages/transfer/1.0".to_string(),
2901            body: serde_json::to_value(&transfer_body).unwrap(),
2902            from: "did:example:sender".to_string(),
2903            to: vec!["did:example:receiver".to_string()],
2904            thid: None,
2905            pthid: None,
2906            extra_headers: Default::default(),
2907            attachments: None,
2908            created_time: None,
2909            expires_time: None,
2910            from_prior: None,
2911        };
2912
2913        // Insert transaction
2914        storage.insert_transaction(&message).await.unwrap();
2915
2916        // Retrieve transaction
2917        let retrieved = storage.get_transaction_by_id(message_id).await.unwrap();
2918        assert!(retrieved.is_some(), "Transaction not found");
2919
2920        let tx = retrieved.unwrap();
2921        assert_eq!(tx.reference_id, message_id);
2922        assert_eq!(tx.transaction_type, TransactionType::Transfer);
2923        assert_eq!(tx.status, TransactionStatus::Pending);
2924    }
2925
2926    #[tokio::test]
2927    async fn test_log_and_retrieve_messages() {
2928        let _ = env_logger::builder().is_test(true).try_init();
2929
2930        let dir = tempdir().unwrap();
2931        let db_path = dir.path().join("test.db");
2932        let storage = Storage::new(Some(db_path)).await.unwrap();
2933
2934        // Create test messages of different types
2935        let connect_message = PlainMessage {
2936            id: "msg_connect_123".to_string(),
2937            typ: "application/didcomm-plain+json".to_string(),
2938            type_: "https://tap-protocol.io/messages/connect/1.0".to_string(),
2939            body: serde_json::json!({"constraints": ["test"]}),
2940            from: "did:example:alice".to_string(),
2941            to: vec!["did:example:bob".to_string()],
2942            thid: Some("thread_123".to_string()),
2943            pthid: None,
2944            extra_headers: Default::default(),
2945            attachments: None,
2946            created_time: None,
2947            expires_time: None,
2948            from_prior: None,
2949        };
2950
2951        let authorize_message = PlainMessage {
2952            id: "msg_auth_123".to_string(),
2953            typ: "application/didcomm-plain+json".to_string(),
2954            type_: "https://tap-protocol.io/messages/authorize/1.0".to_string(),
2955            body: serde_json::json!({"transaction_id": "test_transfer_123"}),
2956            from: "did:example:bob".to_string(),
2957            to: vec!["did:example:alice".to_string()],
2958            thid: Some("thread_123".to_string()),
2959            pthid: None,
2960            extra_headers: Default::default(),
2961            attachments: None,
2962            created_time: None,
2963            expires_time: None,
2964            from_prior: None,
2965        };
2966
2967        // Log messages
2968        storage
2969            .log_message(&connect_message, MessageDirection::Incoming)
2970            .await
2971            .unwrap();
2972        storage
2973            .log_message(&authorize_message, MessageDirection::Outgoing)
2974            .await
2975            .unwrap();
2976
2977        // Retrieve specific message
2978        let retrieved = storage.get_message_by_id("msg_connect_123").await.unwrap();
2979        assert!(retrieved.is_some());
2980        let msg = retrieved.unwrap();
2981        assert_eq!(msg.message_id, "msg_connect_123");
2982        assert_eq!(msg.direction, MessageDirection::Incoming);
2983
2984        // List all messages
2985        let all_messages = storage.list_messages(10, 0, None).await.unwrap();
2986        assert_eq!(all_messages.len(), 2);
2987
2988        // List only incoming messages
2989        let incoming_messages = storage
2990            .list_messages(10, 0, Some(MessageDirection::Incoming))
2991            .await
2992            .unwrap();
2993        assert_eq!(incoming_messages.len(), 1);
2994        assert_eq!(incoming_messages[0].message_id, "msg_connect_123");
2995
2996        // Test duplicate message handling (should not error)
2997        storage
2998            .log_message(&connect_message, MessageDirection::Incoming)
2999            .await
3000            .unwrap();
3001        let all_messages_after = storage.list_messages(10, 0, None).await.unwrap();
3002        assert_eq!(all_messages_after.len(), 2); // Should still be 2, not 3
3003    }
3004
3005    // -----------------------------------------------------------------------
3006    // Decision log tests
3007    // -----------------------------------------------------------------------
3008
3009    #[tokio::test]
3010    async fn test_insert_decision() {
3011        let storage = Storage::new_in_memory().await.unwrap();
3012
3013        let context = serde_json::json!({
3014            "transaction_state": "Received",
3015            "pending_agents": ["did:key:z6MkAgent1"],
3016            "transaction": {
3017                "type": "transfer",
3018                "asset": "eip155:1/slip44:60",
3019                "amount": "100"
3020            }
3021        });
3022
3023        let id = storage
3024            .insert_decision(
3025                "txn-001",
3026                "did:key:z6MkAgent1",
3027                DecisionType::AuthorizationRequired,
3028                &context,
3029            )
3030            .await
3031            .unwrap();
3032
3033        assert!(id > 0);
3034
3035        // Verify the entry was created with correct defaults
3036        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
3037        assert_eq!(entry.transaction_id, "txn-001");
3038        assert_eq!(entry.agent_did, "did:key:z6MkAgent1");
3039        assert_eq!(entry.decision_type, DecisionType::AuthorizationRequired);
3040        assert_eq!(entry.status, DecisionStatus::Pending);
3041        assert!(entry.resolution.is_none());
3042        assert!(entry.resolution_detail.is_none());
3043        assert!(entry.delivered_at.is_none());
3044        assert!(entry.resolved_at.is_none());
3045        assert_eq!(entry.context_json["transaction"]["type"], "transfer");
3046    }
3047
3048    #[tokio::test]
3049    async fn test_insert_multiple_decision_types() {
3050        let storage = Storage::new_in_memory().await.unwrap();
3051
3052        let context = serde_json::json!({"transaction_id": "txn-002"});
3053
3054        let id1 = storage
3055            .insert_decision(
3056                "txn-002",
3057                "did:key:z6MkAgent1",
3058                DecisionType::AuthorizationRequired,
3059                &context,
3060            )
3061            .await
3062            .unwrap();
3063
3064        let id2 = storage
3065            .insert_decision(
3066                "txn-002",
3067                "did:key:z6MkAgent1",
3068                DecisionType::PolicySatisfactionRequired,
3069                &context,
3070            )
3071            .await
3072            .unwrap();
3073
3074        let id3 = storage
3075            .insert_decision(
3076                "txn-003",
3077                "did:key:z6MkAgent1",
3078                DecisionType::SettlementRequired,
3079                &context,
3080            )
3081            .await
3082            .unwrap();
3083
3084        assert!(id1 < id2);
3085        assert!(id2 < id3);
3086
3087        let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3088        let e2 = storage.get_decision_by_id(id2).await.unwrap().unwrap();
3089        let e3 = storage.get_decision_by_id(id3).await.unwrap().unwrap();
3090
3091        assert_eq!(e1.decision_type, DecisionType::AuthorizationRequired);
3092        assert_eq!(e2.decision_type, DecisionType::PolicySatisfactionRequired);
3093        assert_eq!(e3.decision_type, DecisionType::SettlementRequired);
3094    }
3095
3096    #[tokio::test]
3097    async fn test_update_decision_status_to_delivered() {
3098        let storage = Storage::new_in_memory().await.unwrap();
3099
3100        let context = serde_json::json!({"info": "test"});
3101        let id = storage
3102            .insert_decision(
3103                "txn-010",
3104                "did:key:z6MkAgent1",
3105                DecisionType::AuthorizationRequired,
3106                &context,
3107            )
3108            .await
3109            .unwrap();
3110
3111        // Mark as delivered
3112        storage
3113            .update_decision_status(id, DecisionStatus::Delivered, None, None)
3114            .await
3115            .unwrap();
3116
3117        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
3118        assert_eq!(entry.status, DecisionStatus::Delivered);
3119        assert!(entry.delivered_at.is_some());
3120        assert!(entry.resolved_at.is_none());
3121        assert!(entry.resolution.is_none());
3122    }
3123
3124    #[tokio::test]
3125    async fn test_update_decision_status_to_resolved() {
3126        let storage = Storage::new_in_memory().await.unwrap();
3127
3128        let context = serde_json::json!({"info": "test"});
3129        let id = storage
3130            .insert_decision(
3131                "txn-011",
3132                "did:key:z6MkAgent1",
3133                DecisionType::AuthorizationRequired,
3134                &context,
3135            )
3136            .await
3137            .unwrap();
3138
3139        // First deliver
3140        storage
3141            .update_decision_status(id, DecisionStatus::Delivered, None, None)
3142            .await
3143            .unwrap();
3144
3145        // Then resolve with action
3146        let detail = serde_json::json!({"settlement_address": "eip155:1:0xABC"});
3147        storage
3148            .update_decision_status(
3149                id,
3150                DecisionStatus::Resolved,
3151                Some("authorize"),
3152                Some(&detail),
3153            )
3154            .await
3155            .unwrap();
3156
3157        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
3158        assert_eq!(entry.status, DecisionStatus::Resolved);
3159        assert_eq!(entry.resolution.as_deref(), Some("authorize"));
3160        assert!(entry.resolved_at.is_some());
3161        assert!(entry.delivered_at.is_some());
3162        assert_eq!(
3163            entry.resolution_detail.unwrap()["settlement_address"],
3164            "eip155:1:0xABC"
3165        );
3166    }
3167
3168    #[tokio::test]
3169    async fn test_list_decisions_all() {
3170        let storage = Storage::new_in_memory().await.unwrap();
3171
3172        let context = serde_json::json!({"info": "test"});
3173
3174        // Insert 3 decisions
3175        storage
3176            .insert_decision(
3177                "txn-020",
3178                "did:key:z6MkAgent1",
3179                DecisionType::AuthorizationRequired,
3180                &context,
3181            )
3182            .await
3183            .unwrap();
3184        storage
3185            .insert_decision(
3186                "txn-021",
3187                "did:key:z6MkAgent2",
3188                DecisionType::SettlementRequired,
3189                &context,
3190            )
3191            .await
3192            .unwrap();
3193        storage
3194            .insert_decision(
3195                "txn-022",
3196                "did:key:z6MkAgent1",
3197                DecisionType::PolicySatisfactionRequired,
3198                &context,
3199            )
3200            .await
3201            .unwrap();
3202
3203        // List all
3204        let all = storage.list_decisions(None, None, None, 100).await.unwrap();
3205        assert_eq!(all.len(), 3);
3206
3207        // Verify ordering by id ASC
3208        assert!(all[0].id < all[1].id);
3209        assert!(all[1].id < all[2].id);
3210    }
3211
3212    #[tokio::test]
3213    async fn test_list_decisions_by_agent() {
3214        let storage = Storage::new_in_memory().await.unwrap();
3215
3216        let context = serde_json::json!({"info": "test"});
3217
3218        storage
3219            .insert_decision(
3220                "txn-030",
3221                "did:key:z6MkAgent1",
3222                DecisionType::AuthorizationRequired,
3223                &context,
3224            )
3225            .await
3226            .unwrap();
3227        storage
3228            .insert_decision(
3229                "txn-031",
3230                "did:key:z6MkAgent2",
3231                DecisionType::AuthorizationRequired,
3232                &context,
3233            )
3234            .await
3235            .unwrap();
3236        storage
3237            .insert_decision(
3238                "txn-032",
3239                "did:key:z6MkAgent1",
3240                DecisionType::SettlementRequired,
3241                &context,
3242            )
3243            .await
3244            .unwrap();
3245
3246        let agent1 = storage
3247            .list_decisions(Some("did:key:z6MkAgent1"), None, None, 100)
3248            .await
3249            .unwrap();
3250        assert_eq!(agent1.len(), 2);
3251
3252        let agent2 = storage
3253            .list_decisions(Some("did:key:z6MkAgent2"), None, None, 100)
3254            .await
3255            .unwrap();
3256        assert_eq!(agent2.len(), 1);
3257    }
3258
3259    #[tokio::test]
3260    async fn test_list_decisions_by_status() {
3261        let storage = Storage::new_in_memory().await.unwrap();
3262
3263        let context = serde_json::json!({"info": "test"});
3264
3265        let id1 = storage
3266            .insert_decision(
3267                "txn-040",
3268                "did:key:z6MkAgent1",
3269                DecisionType::AuthorizationRequired,
3270                &context,
3271            )
3272            .await
3273            .unwrap();
3274        let _id2 = storage
3275            .insert_decision(
3276                "txn-041",
3277                "did:key:z6MkAgent1",
3278                DecisionType::AuthorizationRequired,
3279                &context,
3280            )
3281            .await
3282            .unwrap();
3283
3284        // Mark first as delivered
3285        storage
3286            .update_decision_status(id1, DecisionStatus::Delivered, None, None)
3287            .await
3288            .unwrap();
3289
3290        let pending = storage
3291            .list_decisions(None, Some(DecisionStatus::Pending), None, 100)
3292            .await
3293            .unwrap();
3294        assert_eq!(pending.len(), 1);
3295        assert_eq!(pending[0].transaction_id, "txn-041");
3296
3297        let delivered = storage
3298            .list_decisions(None, Some(DecisionStatus::Delivered), None, 100)
3299            .await
3300            .unwrap();
3301        assert_eq!(delivered.len(), 1);
3302        assert_eq!(delivered[0].transaction_id, "txn-040");
3303    }
3304
3305    #[tokio::test]
3306    async fn test_list_decisions_since_id() {
3307        let storage = Storage::new_in_memory().await.unwrap();
3308
3309        let context = serde_json::json!({"info": "test"});
3310
3311        let id1 = storage
3312            .insert_decision(
3313                "txn-050",
3314                "did:key:z6MkAgent1",
3315                DecisionType::AuthorizationRequired,
3316                &context,
3317            )
3318            .await
3319            .unwrap();
3320        let id2 = storage
3321            .insert_decision(
3322                "txn-051",
3323                "did:key:z6MkAgent1",
3324                DecisionType::AuthorizationRequired,
3325                &context,
3326            )
3327            .await
3328            .unwrap();
3329        let _id3 = storage
3330            .insert_decision(
3331                "txn-052",
3332                "did:key:z6MkAgent1",
3333                DecisionType::AuthorizationRequired,
3334                &context,
3335            )
3336            .await
3337            .unwrap();
3338
3339        // Get decisions since id1 (should return id2 and id3)
3340        let since = storage
3341            .list_decisions(None, None, Some(id1), 100)
3342            .await
3343            .unwrap();
3344        assert_eq!(since.len(), 2);
3345        assert_eq!(since[0].id, id2);
3346    }
3347
3348    #[tokio::test]
3349    async fn test_list_decisions_with_limit() {
3350        let storage = Storage::new_in_memory().await.unwrap();
3351
3352        let context = serde_json::json!({"info": "test"});
3353
3354        for i in 0..5 {
3355            storage
3356                .insert_decision(
3357                    &format!("txn-06{}", i),
3358                    "did:key:z6MkAgent1",
3359                    DecisionType::AuthorizationRequired,
3360                    &context,
3361                )
3362                .await
3363                .unwrap();
3364        }
3365
3366        let limited = storage.list_decisions(None, None, None, 3).await.unwrap();
3367        assert_eq!(limited.len(), 3);
3368    }
3369
3370    #[tokio::test]
3371    async fn test_expire_decisions_for_transaction() {
3372        let storage = Storage::new_in_memory().await.unwrap();
3373
3374        let context = serde_json::json!({"info": "test"});
3375
3376        // Insert decisions for two transactions
3377        let id1 = storage
3378            .insert_decision(
3379                "txn-070",
3380                "did:key:z6MkAgent1",
3381                DecisionType::AuthorizationRequired,
3382                &context,
3383            )
3384            .await
3385            .unwrap();
3386        let id2 = storage
3387            .insert_decision(
3388                "txn-070",
3389                "did:key:z6MkAgent2",
3390                DecisionType::AuthorizationRequired,
3391                &context,
3392            )
3393            .await
3394            .unwrap();
3395        let id3 = storage
3396            .insert_decision(
3397                "txn-071",
3398                "did:key:z6MkAgent1",
3399                DecisionType::AuthorizationRequired,
3400                &context,
3401            )
3402            .await
3403            .unwrap();
3404
3405        // Mark id2 as delivered (should still be expired)
3406        storage
3407            .update_decision_status(id2, DecisionStatus::Delivered, None, None)
3408            .await
3409            .unwrap();
3410
3411        // Expire decisions for txn-070
3412        let expired_count = storage
3413            .expire_decisions_for_transaction("txn-070")
3414            .await
3415            .unwrap();
3416        assert_eq!(expired_count, 2);
3417
3418        // Verify statuses
3419        let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3420        assert_eq!(e1.status, DecisionStatus::Expired);
3421
3422        let e2 = storage.get_decision_by_id(id2).await.unwrap().unwrap();
3423        assert_eq!(e2.status, DecisionStatus::Expired);
3424
3425        // txn-071 should not be affected
3426        let e3 = storage.get_decision_by_id(id3).await.unwrap().unwrap();
3427        assert_eq!(e3.status, DecisionStatus::Pending);
3428    }
3429
3430    #[tokio::test]
3431    async fn test_expire_does_not_affect_resolved() {
3432        let storage = Storage::new_in_memory().await.unwrap();
3433
3434        let context = serde_json::json!({"info": "test"});
3435
3436        let id1 = storage
3437            .insert_decision(
3438                "txn-080",
3439                "did:key:z6MkAgent1",
3440                DecisionType::AuthorizationRequired,
3441                &context,
3442            )
3443            .await
3444            .unwrap();
3445        let id2 = storage
3446            .insert_decision(
3447                "txn-080",
3448                "did:key:z6MkAgent2",
3449                DecisionType::AuthorizationRequired,
3450                &context,
3451            )
3452            .await
3453            .unwrap();
3454
3455        // Resolve id1
3456        storage
3457            .update_decision_status(id1, DecisionStatus::Resolved, Some("authorize"), None)
3458            .await
3459            .unwrap();
3460
3461        // Expire txn-080
3462        let expired_count = storage
3463            .expire_decisions_for_transaction("txn-080")
3464            .await
3465            .unwrap();
3466        assert_eq!(expired_count, 1); // Only id2 should be expired
3467
3468        let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3469        assert_eq!(e1.status, DecisionStatus::Resolved);
3470
3471        let e2 = storage.get_decision_by_id(id2).await.unwrap().unwrap();
3472        assert_eq!(e2.status, DecisionStatus::Expired);
3473    }
3474
3475    #[tokio::test]
3476    async fn test_get_decision_by_id_not_found() {
3477        let storage = Storage::new_in_memory().await.unwrap();
3478
3479        let result = storage.get_decision_by_id(99999).await.unwrap();
3480        assert!(result.is_none());
3481    }
3482
3483    #[tokio::test]
3484    async fn test_list_decisions_combined_filters() {
3485        let storage = Storage::new_in_memory().await.unwrap();
3486
3487        let context = serde_json::json!({"info": "test"});
3488
3489        let id1 = storage
3490            .insert_decision(
3491                "txn-090",
3492                "did:key:z6MkAgent1",
3493                DecisionType::AuthorizationRequired,
3494                &context,
3495            )
3496            .await
3497            .unwrap();
3498        storage
3499            .insert_decision(
3500                "txn-091",
3501                "did:key:z6MkAgent2",
3502                DecisionType::AuthorizationRequired,
3503                &context,
3504            )
3505            .await
3506            .unwrap();
3507        storage
3508            .insert_decision(
3509                "txn-092",
3510                "did:key:z6MkAgent1",
3511                DecisionType::SettlementRequired,
3512                &context,
3513            )
3514            .await
3515            .unwrap();
3516
3517        // Mark first as delivered
3518        storage
3519            .update_decision_status(id1, DecisionStatus::Delivered, None, None)
3520            .await
3521            .unwrap();
3522
3523        // Filter by agent + status
3524        let results = storage
3525            .list_decisions(
3526                Some("did:key:z6MkAgent1"),
3527                Some(DecisionStatus::Pending),
3528                None,
3529                100,
3530            )
3531            .await
3532            .unwrap();
3533        assert_eq!(results.len(), 1);
3534        assert_eq!(results[0].transaction_id, "txn-092");
3535    }
3536
3537    #[tokio::test]
3538    async fn test_resolve_decisions_for_transaction() {
3539        let storage = Storage::new_in_memory().await.unwrap();
3540
3541        let context = serde_json::json!({"info": "test"});
3542
3543        // Insert auth and settlement decisions for same transaction
3544        let id1 = storage
3545            .insert_decision(
3546                "txn-100",
3547                "did:key:z6MkAgent1",
3548                DecisionType::AuthorizationRequired,
3549                &context,
3550            )
3551            .await
3552            .unwrap();
3553        let id2 = storage
3554            .insert_decision(
3555                "txn-100",
3556                "did:key:z6MkAgent1",
3557                DecisionType::SettlementRequired,
3558                &context,
3559            )
3560            .await
3561            .unwrap();
3562
3563        // Resolve only authorization_required decisions
3564        let resolved = storage
3565            .resolve_decisions_for_transaction(
3566                "txn-100",
3567                "authorize",
3568                Some(DecisionType::AuthorizationRequired),
3569            )
3570            .await
3571            .unwrap();
3572        assert_eq!(resolved, 1);
3573
3574        let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3575        assert_eq!(e1.status, DecisionStatus::Resolved);
3576        assert_eq!(e1.resolution.as_deref(), Some("authorize"));
3577
3578        // Settlement decision should still be pending
3579        let e2 = storage.get_decision_by_id(id2).await.unwrap().unwrap();
3580        assert_eq!(e2.status, DecisionStatus::Pending);
3581    }
3582
3583    #[tokio::test]
3584    async fn test_resolve_decisions_all_types() {
3585        let storage = Storage::new_in_memory().await.unwrap();
3586
3587        let context = serde_json::json!({"info": "test"});
3588
3589        let id1 = storage
3590            .insert_decision(
3591                "txn-101",
3592                "did:key:z6MkAgent1",
3593                DecisionType::AuthorizationRequired,
3594                &context,
3595            )
3596            .await
3597            .unwrap();
3598        let id2 = storage
3599            .insert_decision(
3600                "txn-101",
3601                "did:key:z6MkAgent1",
3602                DecisionType::SettlementRequired,
3603                &context,
3604            )
3605            .await
3606            .unwrap();
3607
3608        // Resolve all decisions (no type filter)
3609        let resolved = storage
3610            .resolve_decisions_for_transaction("txn-101", "reject", None)
3611            .await
3612            .unwrap();
3613        assert_eq!(resolved, 2);
3614
3615        let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3616        assert_eq!(e1.status, DecisionStatus::Resolved);
3617        assert_eq!(e1.resolution.as_deref(), Some("reject"));
3618
3619        let e2 = storage.get_decision_by_id(id2).await.unwrap().unwrap();
3620        assert_eq!(e2.status, DecisionStatus::Resolved);
3621        assert_eq!(e2.resolution.as_deref(), Some("reject"));
3622    }
3623
3624    #[tokio::test]
3625    async fn test_resolve_does_not_affect_already_resolved() {
3626        let storage = Storage::new_in_memory().await.unwrap();
3627
3628        let context = serde_json::json!({"info": "test"});
3629
3630        let id1 = storage
3631            .insert_decision(
3632                "txn-102",
3633                "did:key:z6MkAgent1",
3634                DecisionType::AuthorizationRequired,
3635                &context,
3636            )
3637            .await
3638            .unwrap();
3639
3640        // Resolve manually first
3641        storage
3642            .update_decision_status(id1, DecisionStatus::Resolved, Some("authorize"), None)
3643            .await
3644            .unwrap();
3645
3646        // Try to resolve again via the bulk method
3647        let resolved = storage
3648            .resolve_decisions_for_transaction("txn-102", "reject", None)
3649            .await
3650            .unwrap();
3651        assert_eq!(resolved, 0); // Already resolved, should not change
3652
3653        let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3654        assert_eq!(e1.resolution.as_deref(), Some("authorize")); // Original resolution preserved
3655    }
3656}