1use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
2use sqlx::Row;
3use std::env;
4use std::path::{Path, PathBuf};
5use tap_msg::didcomm::PlainMessage;
6use tracing::{debug, info};
7
8use super::error::StorageError;
9use super::models::{
10 Customer, CustomerIdentifier, CustomerRelationship, DecisionLogEntry, DecisionStatus,
11 DecisionType, Delivery, DeliveryStatus, DeliveryType, IdentifierType, Message,
12 MessageDirection, Received, ReceivedStatus, SchemaType, SourceType, Transaction,
13 TransactionStatus, TransactionType,
14};
15
16#[derive(Clone, Debug)]
54pub struct Storage {
55 pool: SqlitePool,
56 db_path: PathBuf,
57}
58
59impl Storage {
60 pub async fn new_with_did(
78 agent_did: &str,
79 tap_root: Option<PathBuf>,
80 ) -> Result<Self, StorageError> {
81 let root_dir = match tap_root {
82 Some(root) => root,
83 None => {
84 if let Ok(tap_home) = env::var("TAP_HOME") {
85 PathBuf::from(tap_home)
86 } else if let Ok(tap_root) = env::var("TAP_ROOT") {
87 PathBuf::from(tap_root)
88 } else if let Ok(test_dir) = env::var("TAP_TEST_DIR") {
89 PathBuf::from(test_dir).join(".tap")
90 } else {
91 dirs::home_dir()
92 .ok_or_else(|| {
93 StorageError::Migration(
94 "Could not determine home directory".to_string(),
95 )
96 })?
97 .join(".tap")
98 }
99 }
100 };
101
102 let sanitized_did = agent_did.replace([':', '/', '\\'], "_").replace("..", "_");
104 let db_path = root_dir.join(&sanitized_did).join("transactions.db");
105
106 Self::new(Some(db_path)).await
107 }
108
109 pub async fn new_in_memory() -> Result<Self, StorageError> {
112 info!("Initializing in-memory storage for testing");
113
114 let db_url = "sqlite://:memory:";
116
117 let pool = SqlitePoolOptions::new()
119 .max_connections(1) .connect(db_url)
121 .await?;
122
123 sqlx::migrate!("./migrations")
125 .run(&pool)
126 .await
127 .map_err(|e| StorageError::Migration(e.to_string()))?;
128
129 Ok(Storage {
130 pool,
131 db_path: PathBuf::from(":memory:"),
132 })
133 }
134
135 pub async fn new(path: Option<PathBuf>) -> Result<Self, StorageError> {
151 let db_path = path.unwrap_or_else(|| {
152 env::var("TAP_NODE_DB_PATH")
153 .unwrap_or_else(|_| "tap-node.db".to_string())
154 .into()
155 });
156
157 info!("Initializing storage at: {:?}", db_path);
158
159 if let Some(parent) = db_path.parent() {
161 std::fs::create_dir_all(parent)?;
162 }
163
164 let db_url = format!("sqlite://{}?mode=rwc", db_path.display());
166
167 let pool = SqlitePoolOptions::new()
169 .max_connections(10)
170 .connect(&db_url)
171 .await?;
172
173 sqlx::query("PRAGMA journal_mode = WAL")
175 .execute(&pool)
176 .await?;
177 sqlx::query("PRAGMA synchronous = NORMAL")
178 .execute(&pool)
179 .await?;
180
181 sqlx::migrate!("./migrations")
183 .run(&pool)
184 .await
185 .map_err(|e| StorageError::Migration(e.to_string()))?;
186
187 Ok(Storage { pool, db_path })
188 }
189
190 pub fn db_path(&self) -> &Path {
192 &self.db_path
193 }
194
195 pub fn default_logs_dir(tap_root: Option<PathBuf>) -> PathBuf {
205 let root_dir = match tap_root {
206 Some(root) => root,
207 None => {
208 if let Ok(tap_home) = env::var("TAP_HOME") {
209 PathBuf::from(tap_home)
210 } else if let Ok(tap_root) = env::var("TAP_ROOT") {
211 PathBuf::from(tap_root)
212 } else if let Ok(test_dir) = env::var("TAP_TEST_DIR") {
213 PathBuf::from(test_dir).join(".tap")
214 } else {
215 dirs::home_dir()
216 .expect("Could not determine home directory")
217 .join(".tap")
218 }
219 }
220 };
221
222 root_dir.join("logs")
223 }
224
225 pub async fn update_message_status(
236 &self,
237 message_id: &str,
238 status: &str,
239 ) -> Result<(), StorageError> {
240 debug!("Updating message {} status to {}", message_id, status);
241
242 sqlx::query(
243 r#"
244 UPDATE messages
245 SET status = ?1
246 WHERE message_id = ?2
247 "#,
248 )
249 .bind(status)
250 .bind(message_id)
251 .execute(&self.pool)
252 .await?;
253
254 Ok(())
255 }
256
257 pub async fn update_transaction_status(
268 &self,
269 transaction_id: &str,
270 status: &str,
271 ) -> Result<(), StorageError> {
272 debug!(
273 "Updating transaction {} status to {}",
274 transaction_id, status
275 );
276
277 sqlx::query(
278 r#"
279 UPDATE transactions
280 SET status = ?1
281 WHERE reference_id = ?2
282 "#,
283 )
284 .bind(status)
285 .bind(transaction_id)
286 .execute(&self.pool)
287 .await?;
288
289 Ok(())
290 }
291
292 pub async fn get_transaction_by_id(
304 &self,
305 reference_id: &str,
306 ) -> Result<Option<Transaction>, StorageError> {
307 let result = sqlx::query_as::<_, (
308 i64,
309 String,
310 String,
311 Option<String>,
312 Option<String>,
313 Option<String>,
314 String,
315 String,
316 serde_json::Value,
317 String,
318 String,
319 )>(
320 r#"
321 SELECT id, type, reference_id, from_did, to_did, thread_id, message_type, status, message_json, created_at, updated_at
322 FROM transactions WHERE reference_id = ?1
323 "#,
324 )
325 .bind(reference_id)
326 .fetch_optional(&self.pool)
327 .await?;
328
329 if let Some((
330 id,
331 tx_type,
332 reference_id,
333 from_did,
334 to_did,
335 thread_id,
336 message_type,
337 status,
338 message_json,
339 created_at,
340 updated_at,
341 )) = result
342 {
343 Ok(Some(Transaction {
344 id,
345 transaction_type: TransactionType::try_from(tx_type.as_str())
346 .map_err(StorageError::InvalidTransactionType)?,
347 reference_id,
348 from_did,
349 to_did,
350 thread_id,
351 message_type,
352 status: TransactionStatus::try_from(status.as_str())
353 .map_err(StorageError::InvalidTransactionType)?,
354 message_json,
355 created_at,
356 updated_at,
357 }))
358 } else {
359 Ok(None)
360 }
361 }
362
363 pub async fn get_transaction_by_thread_id(
375 &self,
376 thread_id: &str,
377 ) -> Result<Option<Transaction>, StorageError> {
378 let result = sqlx::query_as::<_, (
379 i64,
380 String,
381 String,
382 Option<String>,
383 Option<String>,
384 Option<String>,
385 String,
386 String,
387 serde_json::Value,
388 String,
389 String,
390 )>(
391 r#"
392 SELECT id, type, reference_id, from_did, to_did, thread_id, message_type, status, message_json, created_at, updated_at
393 FROM transactions WHERE thread_id = ?1
394 "#,
395 )
396 .bind(thread_id)
397 .fetch_optional(&self.pool)
398 .await?;
399
400 if let Some((
401 id,
402 tx_type,
403 reference_id,
404 from_did,
405 to_did,
406 thread_id,
407 message_type,
408 status,
409 message_json,
410 created_at,
411 updated_at,
412 )) = result
413 {
414 Ok(Some(Transaction {
415 id,
416 transaction_type: TransactionType::try_from(tx_type.as_str())
417 .map_err(StorageError::InvalidTransactionType)?,
418 reference_id,
419 from_did,
420 to_did,
421 thread_id,
422 message_type,
423 status: TransactionStatus::try_from(status.as_str())
424 .map_err(StorageError::InvalidTransactionType)?,
425 message_json,
426 created_at,
427 updated_at,
428 }))
429 } else {
430 Ok(None)
431 }
432 }
433
434 pub async fn is_agent_authorized_for_transaction(
450 &self,
451 transaction_id: &str,
452 agent_did: &str,
453 ) -> Result<bool, StorageError> {
454 let tx_result = sqlx::query_scalar::<_, i64>(
456 r#"
457 SELECT id FROM transactions WHERE reference_id = ?1
458 "#,
459 )
460 .bind(transaction_id)
461 .fetch_optional(&self.pool)
462 .await?;
463
464 let tx_internal_id = match tx_result {
465 Some(id) => id,
466 None => return Ok(false), };
468
469 let count: i64 = sqlx::query_scalar(
471 r#"
472 SELECT COUNT(*) FROM transaction_agents
473 WHERE transaction_id = ?1 AND agent_did = ?2
474 "#,
475 )
476 .bind(tx_internal_id)
477 .bind(agent_did)
478 .fetch_one(&self.pool)
479 .await?;
480
481 Ok(count > 0)
482 }
483
484 pub async fn insert_transaction_agent(
497 &self,
498 transaction_id: &str,
499 agent_did: &str,
500 agent_role: &str,
501 ) -> Result<(), StorageError> {
502 let tx_result = sqlx::query_scalar::<_, i64>(
504 r#"
505 SELECT id FROM transactions WHERE reference_id = ?1
506 "#,
507 )
508 .bind(transaction_id)
509 .fetch_optional(&self.pool)
510 .await?;
511
512 let tx_internal_id = match tx_result {
513 Some(id) => id,
514 None => {
515 return Err(StorageError::NotFound(format!(
516 "Transaction {} not found",
517 transaction_id
518 )))
519 }
520 };
521
522 sqlx::query(
524 r#"
525 INSERT INTO transaction_agents (transaction_id, agent_did, agent_role, status)
526 VALUES (?1, ?2, ?3, 'pending')
527 ON CONFLICT(transaction_id, agent_did) DO UPDATE SET
528 agent_role = excluded.agent_role,
529 updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
530 "#,
531 )
532 .bind(tx_internal_id)
533 .bind(agent_did)
534 .bind(agent_role)
535 .execute(&self.pool)
536 .await?;
537
538 Ok(())
539 }
540
541 pub async fn update_transaction_agent_status(
554 &self,
555 transaction_id: &str,
556 agent_did: &str,
557 status: &str,
558 ) -> Result<(), StorageError> {
559 let tx_result = sqlx::query_scalar::<_, i64>(
561 r#"
562 SELECT id FROM transactions WHERE reference_id = ?1
563 "#,
564 )
565 .bind(transaction_id)
566 .fetch_optional(&self.pool)
567 .await?;
568
569 let tx_internal_id = match tx_result {
570 Some(id) => id,
571 None => {
572 return Err(StorageError::NotFound(format!(
573 "Transaction {} not found",
574 transaction_id
575 )))
576 }
577 };
578
579 let result = sqlx::query(
581 r#"
582 UPDATE transaction_agents
583 SET status = ?1
584 WHERE transaction_id = ?2 AND agent_did = ?3
585 "#,
586 )
587 .bind(status)
588 .bind(tx_internal_id)
589 .bind(agent_did)
590 .execute(&self.pool)
591 .await?;
592
593 if result.rows_affected() == 0 {
594 return Err(StorageError::NotFound(format!(
595 "Agent {} not found for transaction {}",
596 agent_did, transaction_id
597 )));
598 }
599
600 Ok(())
601 }
602
603 pub async fn get_transaction_agents(
614 &self,
615 transaction_id: &str,
616 ) -> Result<Vec<(String, String, String)>, StorageError> {
617 let tx_result = sqlx::query_scalar::<_, i64>(
619 r#"
620 SELECT id FROM transactions WHERE reference_id = ?1
621 "#,
622 )
623 .bind(transaction_id)
624 .fetch_optional(&self.pool)
625 .await?;
626
627 let tx_internal_id = match tx_result {
628 Some(id) => id,
629 None => {
630 return Err(StorageError::NotFound(format!(
631 "Transaction {} not found",
632 transaction_id
633 )))
634 }
635 };
636
637 let agents = sqlx::query_as::<_, (String, String, String)>(
639 r#"
640 SELECT agent_did, agent_role, status
641 FROM transaction_agents
642 WHERE transaction_id = ?1
643 ORDER BY created_at
644 "#,
645 )
646 .bind(tx_internal_id)
647 .fetch_all(&self.pool)
648 .await?;
649
650 Ok(agents)
651 }
652
653 pub async fn are_all_agents_authorized(
665 &self,
666 transaction_id: &str,
667 ) -> Result<bool, StorageError> {
668 let tx_result = sqlx::query_scalar::<_, i64>(
670 r#"
671 SELECT id FROM transactions WHERE reference_id = ?1
672 "#,
673 )
674 .bind(transaction_id)
675 .fetch_optional(&self.pool)
676 .await?;
677
678 let tx_internal_id = match tx_result {
679 Some(id) => id,
680 None => return Ok(false), };
682
683 let non_authorized_count: i64 = sqlx::query_scalar(
685 r#"
686 SELECT COUNT(*) FROM transaction_agents
687 WHERE transaction_id = ?1 AND status != 'authorized'
688 "#,
689 )
690 .bind(tx_internal_id)
691 .fetch_one(&self.pool)
692 .await?;
693
694 Ok(non_authorized_count == 0)
697 }
698
699 pub async fn insert_transaction(&self, message: &PlainMessage) -> Result<(), StorageError> {
715 let message_type = message.type_.clone();
716 let message_json = serde_json::to_value(message)?;
717
718 let message_type_lower = message.type_.to_lowercase();
720 let tx_type = if message_type_lower.contains("transfer") {
721 TransactionType::Transfer
722 } else if message_type_lower.contains("payment") {
723 TransactionType::Payment
724 } else {
725 return Err(StorageError::InvalidTransactionType(
726 message_type.to_string(),
727 ));
728 };
729
730 let reference_id = message.id.clone();
732 let from_did = message.from.clone();
733 let to_did = message.to.first().cloned();
734 let thread_id = message.thid.clone();
735
736 debug!("Inserting transaction: {} ({})", reference_id, tx_type);
737
738 let result = sqlx::query(
739 r#"
740 INSERT INTO transactions (type, reference_id, from_did, to_did, thread_id, message_type, message_json)
741 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
742 "#,
743 )
744 .bind(tx_type.to_string())
745 .bind(&reference_id)
746 .bind(from_did)
747 .bind(to_did)
748 .bind(thread_id)
749 .bind(message_type.to_string())
750 .bind(sqlx::types::Json(message_json))
751 .execute(&self.pool)
752 .await;
753
754 match result {
755 Ok(_) => {
756 debug!("Successfully inserted transaction: {}", reference_id);
757 Ok(())
758 }
759 Err(sqlx::Error::Database(db_err)) => {
760 if db_err.message().contains("UNIQUE") {
761 Err(StorageError::DuplicateTransaction(reference_id))
762 } else {
763 Err(StorageError::Database(sqlx::Error::Database(db_err)))
764 }
765 }
766 Err(e) => Err(StorageError::Database(e)),
767 }
768 }
769
770 pub async fn list_transactions(
783 &self,
784 limit: u32,
785 offset: u32,
786 ) -> Result<Vec<Transaction>, StorageError> {
787 let rows = sqlx::query_as::<_, (
788 i64,
789 String,
790 String,
791 Option<String>,
792 Option<String>,
793 Option<String>,
794 String,
795 String,
796 serde_json::Value,
797 String,
798 String,
799 )>(
800 r#"
801 SELECT id, type, reference_id, from_did, to_did, thread_id, message_type, status, message_json, created_at, updated_at
802 FROM transactions
803 ORDER BY created_at DESC
804 LIMIT ?1 OFFSET ?2
805 "#,
806 )
807 .bind(limit)
808 .bind(offset)
809 .fetch_all(&self.pool)
810 .await?;
811
812 let mut transactions = Vec::new();
813 for (
814 id,
815 tx_type,
816 reference_id,
817 from_did,
818 to_did,
819 thread_id,
820 message_type,
821 status,
822 message_json,
823 created_at,
824 updated_at,
825 ) in rows
826 {
827 transactions.push(Transaction {
828 id,
829 transaction_type: TransactionType::try_from(tx_type.as_str())
830 .map_err(StorageError::InvalidTransactionType)?,
831 reference_id,
832 from_did,
833 to_did,
834 thread_id,
835 message_type,
836 status: TransactionStatus::try_from(status.as_str())
837 .map_err(StorageError::InvalidTransactionType)?,
838 message_json,
839 created_at,
840 updated_at,
841 });
842 }
843
844 Ok(transactions)
845 }
846
847 pub async fn log_message(
862 &self,
863 message: &PlainMessage,
864 direction: MessageDirection,
865 ) -> Result<(), StorageError> {
866 let message_json = serde_json::to_value(message)?;
867 let message_id = message.id.clone();
868 let message_type = message.type_.clone();
869 let from_did = message.from.clone();
870 let to_did = message.to.first().cloned();
871 let thread_id = message.thid.clone();
872 let parent_thread_id = message.pthid.clone();
873
874 debug!(
875 "Logging {} message: {} ({})",
876 direction, message_id, message_type
877 );
878
879 let result = sqlx::query(
880 r#"
881 INSERT INTO messages (message_id, message_type, from_did, to_did, thread_id, parent_thread_id, direction, message_json)
882 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
883 "#,
884 )
885 .bind(&message_id)
886 .bind(message_type)
887 .bind(from_did)
888 .bind(to_did)
889 .bind(thread_id)
890 .bind(parent_thread_id)
891 .bind(direction.to_string())
892 .bind(sqlx::types::Json(message_json))
893 .execute(&self.pool)
894 .await;
895
896 match result {
897 Ok(_) => {
898 debug!("Successfully logged message: {}", message_id);
899 Ok(())
900 }
901 Err(sqlx::Error::Database(db_err)) => {
902 if db_err.message().contains("UNIQUE") {
903 debug!("Message already logged: {}", message_id);
905 Ok(())
906 } else {
907 Err(StorageError::Database(sqlx::Error::Database(db_err)))
908 }
909 }
910 Err(e) => Err(StorageError::Database(e)),
911 }
912 }
913
914 pub async fn get_message_by_id(
926 &self,
927 message_id: &str,
928 ) -> Result<Option<Message>, StorageError> {
929 let result = sqlx::query_as::<_, (
930 i64,
931 String,
932 String,
933 Option<String>,
934 Option<String>,
935 Option<String>,
936 Option<String>,
937 String,
938 serde_json::Value,
939 String,
940 )>(
941 r#"
942 SELECT id, message_id, message_type, from_did, to_did, thread_id, parent_thread_id, direction, message_json, created_at
943 FROM messages WHERE message_id = ?1
944 "#,
945 )
946 .bind(message_id)
947 .fetch_optional(&self.pool)
948 .await?;
949
950 match result {
951 Some((
952 id,
953 message_id,
954 message_type,
955 from_did,
956 to_did,
957 thread_id,
958 parent_thread_id,
959 direction,
960 message_json,
961 created_at,
962 )) => Ok(Some(Message {
963 id,
964 message_id,
965 message_type,
966 from_did,
967 to_did,
968 thread_id,
969 parent_thread_id,
970 direction: MessageDirection::try_from(direction.as_str())
971 .map_err(StorageError::InvalidTransactionType)?,
972 message_json,
973 created_at,
974 })),
975 None => Ok(None),
976 }
977 }
978
979 pub async fn list_messages(
991 &self,
992 limit: u32,
993 offset: u32,
994 direction: Option<MessageDirection>,
995 ) -> Result<Vec<Message>, StorageError> {
996 let rows = if let Some(dir) = direction {
997 sqlx::query_as::<_, (
998 i64,
999 String,
1000 String,
1001 Option<String>,
1002 Option<String>,
1003 Option<String>,
1004 Option<String>,
1005 String,
1006 serde_json::Value,
1007 String,
1008 )>(
1009 r#"
1010 SELECT id, message_id, message_type, from_did, to_did, thread_id, parent_thread_id, direction, message_json, created_at
1011 FROM messages
1012 WHERE direction = ?1
1013 ORDER BY created_at DESC
1014 LIMIT ?2 OFFSET ?3
1015 "#,
1016 )
1017 .bind(dir.to_string())
1018 .bind(limit)
1019 .bind(offset)
1020 .fetch_all(&self.pool)
1021 .await?
1022 } else {
1023 sqlx::query_as::<_, (
1024 i64,
1025 String,
1026 String,
1027 Option<String>,
1028 Option<String>,
1029 Option<String>,
1030 Option<String>,
1031 String,
1032 serde_json::Value,
1033 String,
1034 )>(
1035 r#"
1036 SELECT id, message_id, message_type, from_did, to_did, thread_id, parent_thread_id, direction, message_json, created_at
1037 FROM messages
1038 ORDER BY created_at DESC
1039 LIMIT ?1 OFFSET ?2
1040 "#,
1041 )
1042 .bind(limit)
1043 .bind(offset)
1044 .fetch_all(&self.pool)
1045 .await?
1046 };
1047
1048 let mut messages = Vec::new();
1049 for (
1050 id,
1051 message_id,
1052 message_type,
1053 from_did,
1054 to_did,
1055 thread_id,
1056 parent_thread_id,
1057 direction,
1058 message_json,
1059 created_at,
1060 ) in rows
1061 {
1062 messages.push(Message {
1063 id,
1064 message_id,
1065 message_type,
1066 from_did,
1067 to_did,
1068 thread_id,
1069 parent_thread_id,
1070 direction: MessageDirection::try_from(direction.as_str())
1071 .map_err(StorageError::InvalidTransactionType)?,
1072 message_json,
1073 created_at,
1074 });
1075 }
1076
1077 Ok(messages)
1078 }
1079
1080 pub async fn create_delivery(
1095 &self,
1096 message_id: &str,
1097 message_text: &str,
1098 recipient_did: &str,
1099 delivery_url: Option<&str>,
1100 delivery_type: DeliveryType,
1101 ) -> Result<i64, StorageError> {
1102 let result = sqlx::query(
1103 r#"
1104 INSERT INTO deliveries (message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count)
1105 VALUES (?1, ?2, ?3, ?4, ?5, 'pending', 0)
1106 "#,
1107 )
1108 .bind(message_id)
1109 .bind(message_text)
1110 .bind(recipient_did)
1111 .bind(delivery_url)
1112 .bind(delivery_type.to_string())
1113 .execute(&self.pool)
1114 .await?;
1115
1116 Ok(result.last_insert_rowid())
1117 }
1118
1119 pub async fn update_delivery_status(
1133 &self,
1134 delivery_id: i64,
1135 status: DeliveryStatus,
1136 http_status_code: Option<i32>,
1137 error_message: Option<&str>,
1138 ) -> Result<(), StorageError> {
1139 let now = chrono::Utc::now().to_rfc3339();
1140 let delivered_at = if status == DeliveryStatus::Success {
1141 Some(now.clone())
1142 } else {
1143 None
1144 };
1145
1146 sqlx::query(
1147 r#"
1148 UPDATE deliveries
1149 SET status = ?1, last_http_status_code = ?2, error_message = ?3, updated_at = ?4, delivered_at = ?5
1150 WHERE id = ?6
1151 "#,
1152 )
1153 .bind(status.to_string())
1154 .bind(http_status_code)
1155 .bind(error_message)
1156 .bind(now)
1157 .bind(delivered_at)
1158 .bind(delivery_id)
1159 .execute(&self.pool)
1160 .await?;
1161
1162 Ok(())
1163 }
1164
1165 pub async fn increment_delivery_retry_count(
1176 &self,
1177 delivery_id: i64,
1178 ) -> Result<(), StorageError> {
1179 sqlx::query(
1180 r#"
1181 UPDATE deliveries
1182 SET retry_count = retry_count + 1, updated_at = ?1
1183 WHERE id = ?2
1184 "#,
1185 )
1186 .bind(chrono::Utc::now().to_rfc3339())
1187 .bind(delivery_id)
1188 .execute(&self.pool)
1189 .await?;
1190
1191 Ok(())
1192 }
1193
1194 pub async fn get_delivery_by_id(
1206 &self,
1207 delivery_id: i64,
1208 ) -> Result<Option<Delivery>, StorageError> {
1209 let result = sqlx::query_as::<
1210 _,
1211 (
1212 i64,
1213 String,
1214 String,
1215 String,
1216 Option<String>,
1217 String,
1218 String,
1219 i32,
1220 Option<i32>,
1221 Option<String>,
1222 String,
1223 String,
1224 Option<String>,
1225 ),
1226 >(
1227 r#"
1228 SELECT id, message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count,
1229 last_http_status_code, error_message, created_at, updated_at, delivered_at
1230 FROM deliveries WHERE id = ?1
1231 "#,
1232 )
1233 .bind(delivery_id)
1234 .fetch_optional(&self.pool)
1235 .await?;
1236
1237 match result {
1238 Some((
1239 id,
1240 message_id,
1241 message_text,
1242 recipient_did,
1243 delivery_url,
1244 delivery_type,
1245 status,
1246 retry_count,
1247 last_http_status_code,
1248 error_message,
1249 created_at,
1250 updated_at,
1251 delivered_at,
1252 )) => Ok(Some(Delivery {
1253 id,
1254 message_id,
1255 message_text,
1256 recipient_did,
1257 delivery_url,
1258 delivery_type: DeliveryType::try_from(delivery_type.as_str())
1259 .map_err(StorageError::InvalidTransactionType)?,
1260 status: DeliveryStatus::try_from(status.as_str())
1261 .map_err(StorageError::InvalidTransactionType)?,
1262 retry_count,
1263 last_http_status_code,
1264 error_message,
1265 created_at,
1266 updated_at,
1267 delivered_at,
1268 })),
1269 None => Ok(None),
1270 }
1271 }
1272
1273 pub async fn get_deliveries_for_message(
1284 &self,
1285 message_id: &str,
1286 ) -> Result<Vec<Delivery>, StorageError> {
1287 let rows = sqlx::query_as::<
1288 _,
1289 (
1290 i64,
1291 String,
1292 String,
1293 String,
1294 Option<String>,
1295 String,
1296 String,
1297 i32,
1298 Option<i32>,
1299 Option<String>,
1300 String,
1301 String,
1302 Option<String>,
1303 ),
1304 >(
1305 r#"
1306 SELECT id, message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count,
1307 last_http_status_code, error_message, created_at, updated_at, delivered_at
1308 FROM deliveries WHERE message_id = ?1
1309 ORDER BY created_at ASC
1310 "#,
1311 )
1312 .bind(message_id)
1313 .fetch_all(&self.pool)
1314 .await?;
1315
1316 let mut deliveries = Vec::new();
1317 for (
1318 id,
1319 message_id,
1320 message_text,
1321 recipient_did,
1322 delivery_url,
1323 delivery_type,
1324 status,
1325 retry_count,
1326 last_http_status_code,
1327 error_message,
1328 created_at,
1329 updated_at,
1330 delivered_at,
1331 ) in rows
1332 {
1333 deliveries.push(Delivery {
1334 id,
1335 message_id,
1336 message_text,
1337 recipient_did,
1338 delivery_url,
1339 delivery_type: DeliveryType::try_from(delivery_type.as_str())
1340 .map_err(StorageError::InvalidTransactionType)?,
1341 status: DeliveryStatus::try_from(status.as_str())
1342 .map_err(StorageError::InvalidTransactionType)?,
1343 retry_count,
1344 last_http_status_code,
1345 error_message,
1346 created_at,
1347 updated_at,
1348 delivered_at,
1349 });
1350 }
1351
1352 Ok(deliveries)
1353 }
1354
1355 pub async fn get_pending_deliveries(
1367 &self,
1368 max_retry_count: i32,
1369 limit: u32,
1370 ) -> Result<Vec<Delivery>, StorageError> {
1371 let rows = sqlx::query_as::<
1372 _,
1373 (
1374 i64,
1375 String,
1376 String,
1377 String,
1378 Option<String>,
1379 String,
1380 String,
1381 i32,
1382 Option<i32>,
1383 Option<String>,
1384 String,
1385 String,
1386 Option<String>,
1387 ),
1388 >(
1389 r#"
1390 SELECT id, message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count,
1391 last_http_status_code, error_message, created_at, updated_at, delivered_at
1392 FROM deliveries
1393 WHERE status = 'pending' AND retry_count < ?1
1394 ORDER BY created_at ASC
1395 LIMIT ?2
1396 "#,
1397 )
1398 .bind(max_retry_count)
1399 .bind(limit)
1400 .fetch_all(&self.pool)
1401 .await?;
1402
1403 let mut deliveries = Vec::new();
1404 for (
1405 id,
1406 message_id,
1407 message_text,
1408 recipient_did,
1409 delivery_url,
1410 delivery_type,
1411 status,
1412 retry_count,
1413 last_http_status_code,
1414 error_message,
1415 created_at,
1416 updated_at,
1417 delivered_at,
1418 ) in rows
1419 {
1420 deliveries.push(Delivery {
1421 id,
1422 message_id,
1423 message_text,
1424 recipient_did,
1425 delivery_url,
1426 delivery_type: DeliveryType::try_from(delivery_type.as_str())
1427 .map_err(StorageError::InvalidTransactionType)?,
1428 status: DeliveryStatus::try_from(status.as_str())
1429 .map_err(StorageError::InvalidTransactionType)?,
1430 retry_count,
1431 last_http_status_code,
1432 error_message,
1433 created_at,
1434 updated_at,
1435 delivered_at,
1436 });
1437 }
1438
1439 Ok(deliveries)
1440 }
1441
1442 pub async fn get_failed_deliveries_for_recipient(
1455 &self,
1456 recipient_did: &str,
1457 limit: u32,
1458 offset: u32,
1459 ) -> Result<Vec<Delivery>, StorageError> {
1460 let rows = sqlx::query_as::<
1461 _,
1462 (
1463 i64,
1464 String,
1465 String,
1466 String,
1467 Option<String>,
1468 String,
1469 String,
1470 i32,
1471 Option<i32>,
1472 Option<String>,
1473 String,
1474 String,
1475 Option<String>,
1476 ),
1477 >(
1478 r#"
1479 SELECT id, message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count,
1480 last_http_status_code, error_message, created_at, updated_at, delivered_at
1481 FROM deliveries
1482 WHERE recipient_did = ?1 AND status = 'failed'
1483 ORDER BY updated_at DESC
1484 LIMIT ?2 OFFSET ?3
1485 "#,
1486 )
1487 .bind(recipient_did)
1488 .bind(limit)
1489 .bind(offset)
1490 .fetch_all(&self.pool)
1491 .await?;
1492
1493 let mut deliveries = Vec::new();
1494 for (
1495 id,
1496 message_id,
1497 message_text,
1498 recipient_did,
1499 delivery_url,
1500 delivery_type,
1501 status,
1502 retry_count,
1503 last_http_status_code,
1504 error_message,
1505 created_at,
1506 updated_at,
1507 delivered_at,
1508 ) in rows
1509 {
1510 deliveries.push(Delivery {
1511 id,
1512 message_id,
1513 message_text,
1514 recipient_did,
1515 delivery_url,
1516 delivery_type: DeliveryType::try_from(delivery_type.as_str())
1517 .map_err(StorageError::InvalidTransactionType)?,
1518 status: DeliveryStatus::try_from(status.as_str())
1519 .map_err(StorageError::InvalidTransactionType)?,
1520 retry_count,
1521 last_http_status_code,
1522 error_message,
1523 created_at,
1524 updated_at,
1525 delivered_at,
1526 });
1527 }
1528
1529 Ok(deliveries)
1530 }
1531
1532 pub async fn get_deliveries_by_recipient(
1545 &self,
1546 recipient_did: &str,
1547 limit: u32,
1548 offset: u32,
1549 ) -> Result<Vec<Delivery>, StorageError> {
1550 let rows = sqlx::query_as::<
1551 _,
1552 (
1553 i64,
1554 String,
1555 String,
1556 String,
1557 Option<String>,
1558 String,
1559 String,
1560 i32,
1561 Option<i32>,
1562 Option<String>,
1563 String,
1564 String,
1565 Option<String>,
1566 ),
1567 >(
1568 r#"
1569 SELECT id, message_id, message_text, recipient_did, delivery_url, delivery_type, status, retry_count,
1570 last_http_status_code, error_message, created_at, updated_at, delivered_at
1571 FROM deliveries
1572 WHERE recipient_did = ?1
1573 ORDER BY created_at DESC
1574 LIMIT ?2 OFFSET ?3
1575 "#,
1576 )
1577 .bind(recipient_did)
1578 .bind(limit)
1579 .bind(offset)
1580 .fetch_all(&self.pool)
1581 .await?;
1582
1583 let mut deliveries = Vec::new();
1584 for (
1585 id,
1586 message_id,
1587 message_text,
1588 recipient_did,
1589 delivery_url,
1590 delivery_type,
1591 status,
1592 retry_count,
1593 last_http_status_code,
1594 error_message,
1595 created_at,
1596 updated_at,
1597 delivered_at,
1598 ) in rows
1599 {
1600 deliveries.push(Delivery {
1601 id,
1602 message_id,
1603 message_text,
1604 recipient_did,
1605 delivery_url,
1606 delivery_type: delivery_type
1607 .parse::<DeliveryType>()
1608 .unwrap_or(DeliveryType::Internal),
1609 status: status
1610 .parse::<DeliveryStatus>()
1611 .unwrap_or(DeliveryStatus::Pending),
1612 retry_count,
1613 last_http_status_code,
1614 error_message,
1615 created_at,
1616 updated_at,
1617 delivered_at,
1618 });
1619 }
1620
1621 Ok(deliveries)
1622 }
1623
1624 pub async fn get_deliveries_for_thread(
1637 &self,
1638 thread_id: &str,
1639 limit: u32,
1640 offset: u32,
1641 ) -> Result<Vec<Delivery>, StorageError> {
1642 let rows = sqlx::query_as::<
1643 _,
1644 (
1645 i64,
1646 String,
1647 String,
1648 String,
1649 Option<String>,
1650 String,
1651 String,
1652 i32,
1653 Option<i32>,
1654 Option<String>,
1655 String,
1656 String,
1657 Option<String>,
1658 ),
1659 >(
1660 r#"
1661 SELECT d.id, d.message_id, d.message_text, d.recipient_did, d.delivery_url,
1662 d.delivery_type, d.status, d.retry_count, d.last_http_status_code,
1663 d.error_message, d.created_at, d.updated_at, d.delivered_at
1664 FROM deliveries d
1665 INNER JOIN messages m ON d.message_id = m.message_id
1666 WHERE m.thread_id = ?1
1667 ORDER BY d.created_at ASC
1668 LIMIT ?2 OFFSET ?3
1669 "#,
1670 )
1671 .bind(thread_id)
1672 .bind(limit)
1673 .bind(offset)
1674 .fetch_all(&self.pool)
1675 .await?;
1676
1677 let mut deliveries = Vec::new();
1678 for (
1679 id,
1680 message_id,
1681 message_text,
1682 recipient_did,
1683 delivery_url,
1684 delivery_type,
1685 status,
1686 retry_count,
1687 last_http_status_code,
1688 error_message,
1689 created_at,
1690 updated_at,
1691 delivered_at,
1692 ) in rows
1693 {
1694 deliveries.push(Delivery {
1695 id,
1696 message_id,
1697 message_text,
1698 recipient_did,
1699 delivery_url,
1700 delivery_type: delivery_type
1701 .parse::<DeliveryType>()
1702 .unwrap_or(DeliveryType::Internal),
1703 status: status
1704 .parse::<DeliveryStatus>()
1705 .unwrap_or(DeliveryStatus::Pending),
1706 retry_count,
1707 last_http_status_code,
1708 error_message,
1709 created_at,
1710 updated_at,
1711 delivered_at,
1712 });
1713 }
1714
1715 Ok(deliveries)
1716 }
1717
1718 pub async fn create_received(
1733 &self,
1734 raw_message: &str,
1735 source_type: SourceType,
1736 source_identifier: Option<&str>,
1737 ) -> Result<i64, StorageError> {
1738 let message_id =
1740 if let Ok(json_value) = serde_json::from_str::<serde_json::Value>(raw_message) {
1741 json_value
1742 .get("id")
1743 .and_then(|v| v.as_str())
1744 .map(|s| s.to_string())
1745 } else {
1746 None
1747 };
1748
1749 let result = sqlx::query(
1750 r#"
1751 INSERT INTO received (message_id, raw_message, source_type, source_identifier)
1752 VALUES (?1, ?2, ?3, ?4)
1753 "#,
1754 )
1755 .bind(message_id)
1756 .bind(raw_message)
1757 .bind(source_type.to_string())
1758 .bind(source_identifier)
1759 .execute(&self.pool)
1760 .await?;
1761
1762 Ok(result.last_insert_rowid())
1763 }
1764
1765 pub async fn update_received_status(
1779 &self,
1780 received_id: i64,
1781 status: ReceivedStatus,
1782 processed_message_id: Option<&str>,
1783 error_message: Option<&str>,
1784 ) -> Result<(), StorageError> {
1785 let now = chrono::Utc::now().to_rfc3339();
1786
1787 sqlx::query(
1788 r#"
1789 UPDATE received
1790 SET status = ?1, processed_at = ?2, processed_message_id = ?3, error_message = ?4
1791 WHERE id = ?5
1792 "#,
1793 )
1794 .bind(status.to_string())
1795 .bind(&now)
1796 .bind(processed_message_id)
1797 .bind(error_message)
1798 .bind(received_id)
1799 .execute(&self.pool)
1800 .await?;
1801
1802 Ok(())
1803 }
1804
1805 pub async fn get_received_by_id(
1817 &self,
1818 received_id: i64,
1819 ) -> Result<Option<Received>, StorageError> {
1820 let result = sqlx::query_as::<
1821 _,
1822 (
1823 i64,
1824 Option<String>,
1825 String,
1826 String,
1827 Option<String>,
1828 String,
1829 Option<String>,
1830 String,
1831 Option<String>,
1832 Option<String>,
1833 ),
1834 >(
1835 r#"
1836 SELECT id, message_id, raw_message, source_type, source_identifier,
1837 status, error_message, received_at, processed_at, processed_message_id
1838 FROM received WHERE id = ?1
1839 "#,
1840 )
1841 .bind(received_id)
1842 .fetch_optional(&self.pool)
1843 .await?;
1844
1845 match result {
1846 Some((
1847 id,
1848 message_id,
1849 raw_message,
1850 source_type,
1851 source_identifier,
1852 status,
1853 error_message,
1854 received_at,
1855 processed_at,
1856 processed_message_id,
1857 )) => Ok(Some(Received {
1858 id,
1859 message_id,
1860 raw_message,
1861 source_type: SourceType::try_from(source_type.as_str())
1862 .map_err(StorageError::InvalidTransactionType)?,
1863 source_identifier,
1864 status: ReceivedStatus::try_from(status.as_str())
1865 .map_err(StorageError::InvalidTransactionType)?,
1866 error_message,
1867 received_at,
1868 processed_at,
1869 processed_message_id,
1870 })),
1871 None => Ok(None),
1872 }
1873 }
1874
1875 pub async fn get_pending_received(&self, limit: u32) -> Result<Vec<Received>, StorageError> {
1886 let rows = sqlx::query_as::<
1887 _,
1888 (
1889 i64,
1890 Option<String>,
1891 String,
1892 String,
1893 Option<String>,
1894 String,
1895 Option<String>,
1896 String,
1897 Option<String>,
1898 Option<String>,
1899 ),
1900 >(
1901 r#"
1902 SELECT id, message_id, raw_message, source_type, source_identifier,
1903 status, error_message, received_at, processed_at, processed_message_id
1904 FROM received
1905 WHERE status = 'pending'
1906 ORDER BY received_at ASC
1907 LIMIT ?1
1908 "#,
1909 )
1910 .bind(limit)
1911 .fetch_all(&self.pool)
1912 .await?;
1913
1914 let mut received_messages = Vec::new();
1915 for (
1916 id,
1917 message_id,
1918 raw_message,
1919 source_type,
1920 source_identifier,
1921 status,
1922 error_message,
1923 received_at,
1924 processed_at,
1925 processed_message_id,
1926 ) in rows
1927 {
1928 received_messages.push(Received {
1929 id,
1930 message_id,
1931 raw_message,
1932 source_type: SourceType::try_from(source_type.as_str())
1933 .map_err(StorageError::InvalidTransactionType)?,
1934 source_identifier,
1935 status: ReceivedStatus::try_from(status.as_str())
1936 .map_err(StorageError::InvalidTransactionType)?,
1937 error_message,
1938 received_at,
1939 processed_at,
1940 processed_message_id,
1941 });
1942 }
1943
1944 Ok(received_messages)
1945 }
1946
1947 pub async fn list_received(
1961 &self,
1962 limit: u32,
1963 offset: u32,
1964 source_type: Option<SourceType>,
1965 status: Option<ReceivedStatus>,
1966 ) -> Result<Vec<Received>, StorageError> {
1967 let mut query = "SELECT id, message_id, raw_message, source_type, source_identifier, status, error_message, received_at, processed_at, processed_message_id FROM received WHERE 1=1".to_string();
1968 let mut bind_values: Vec<String> = Vec::new();
1969
1970 if let Some(st) = source_type {
1971 query.push_str(" AND source_type = ?");
1972 bind_values.push(st.to_string());
1973 }
1974
1975 if let Some(s) = status {
1976 query.push_str(" AND status = ?");
1977 bind_values.push(s.to_string());
1978 }
1979
1980 query.push_str(" ORDER BY received_at DESC LIMIT ? OFFSET ?");
1981
1982 let mut sqlx_query = sqlx::query_as::<
1984 _,
1985 (
1986 i64,
1987 Option<String>,
1988 String,
1989 String,
1990 Option<String>,
1991 String,
1992 Option<String>,
1993 String,
1994 Option<String>,
1995 Option<String>,
1996 ),
1997 >(&query);
1998
1999 for value in bind_values {
2000 sqlx_query = sqlx_query.bind(value);
2001 }
2002
2003 let rows = sqlx_query
2004 .bind(limit)
2005 .bind(offset)
2006 .fetch_all(&self.pool)
2007 .await?;
2008
2009 let mut received_messages = Vec::new();
2010 for (
2011 id,
2012 message_id,
2013 raw_message,
2014 source_type,
2015 source_identifier,
2016 status,
2017 error_message,
2018 received_at,
2019 processed_at,
2020 processed_message_id,
2021 ) in rows
2022 {
2023 received_messages.push(Received {
2024 id,
2025 message_id,
2026 raw_message,
2027 source_type: SourceType::try_from(source_type.as_str())
2028 .map_err(StorageError::InvalidTransactionType)?,
2029 source_identifier,
2030 status: ReceivedStatus::try_from(status.as_str())
2031 .map_err(StorageError::InvalidTransactionType)?,
2032 error_message,
2033 received_at,
2034 processed_at,
2035 processed_message_id,
2036 });
2037 }
2038
2039 Ok(received_messages)
2040 }
2041
2042 pub async fn upsert_customer(&self, customer: &Customer) -> Result<(), StorageError> {
2046 sqlx::query(
2047 r#"
2048 INSERT INTO customers (
2049 id, agent_did, schema_type, given_name, family_name, display_name,
2050 legal_name, lei_code, mcc_code, address_country, address_locality,
2051 postal_code, street_address, profile, ivms101_data, verified_at,
2052 created_at, updated_at
2053 ) VALUES (
2054 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18
2055 ) ON CONFLICT(id) DO UPDATE SET
2056 agent_did = excluded.agent_did,
2057 schema_type = excluded.schema_type,
2058 given_name = excluded.given_name,
2059 family_name = excluded.family_name,
2060 display_name = excluded.display_name,
2061 legal_name = excluded.legal_name,
2062 lei_code = excluded.lei_code,
2063 mcc_code = excluded.mcc_code,
2064 address_country = excluded.address_country,
2065 address_locality = excluded.address_locality,
2066 postal_code = excluded.postal_code,
2067 street_address = excluded.street_address,
2068 profile = excluded.profile,
2069 ivms101_data = excluded.ivms101_data,
2070 verified_at = excluded.verified_at,
2071 updated_at = excluded.updated_at
2072 "#,
2073 )
2074 .bind(&customer.id)
2075 .bind(&customer.agent_did)
2076 .bind(customer.schema_type.to_string())
2077 .bind(&customer.given_name)
2078 .bind(&customer.family_name)
2079 .bind(&customer.display_name)
2080 .bind(&customer.legal_name)
2081 .bind(&customer.lei_code)
2082 .bind(&customer.mcc_code)
2083 .bind(&customer.address_country)
2084 .bind(&customer.address_locality)
2085 .bind(&customer.postal_code)
2086 .bind(&customer.street_address)
2087 .bind(serde_json::to_string(&customer.profile)?)
2088 .bind(
2089 customer
2090 .ivms101_data
2091 .as_ref()
2092 .map(serde_json::to_string)
2093 .transpose()?,
2094 )
2095 .bind(&customer.verified_at)
2096 .bind(&customer.created_at)
2097 .bind(&customer.updated_at)
2098 .execute(&self.pool)
2099 .await?;
2100
2101 Ok(())
2102 }
2103
2104 pub async fn get_customer(&self, customer_id: &str) -> Result<Option<Customer>, StorageError> {
2106 let row = sqlx::query(
2107 r#"
2108 SELECT id, agent_did, schema_type, given_name, family_name, display_name,
2109 legal_name, lei_code, mcc_code, address_country, address_locality,
2110 postal_code, street_address, profile, ivms101_data, verified_at,
2111 created_at, updated_at
2112 FROM customers
2113 WHERE id = ?1
2114 "#,
2115 )
2116 .bind(customer_id)
2117 .fetch_optional(&self.pool)
2118 .await?;
2119
2120 match row {
2121 Some(row) => Ok(Some(Customer {
2122 id: row.get("id"),
2123 agent_did: row.get("agent_did"),
2124 schema_type: SchemaType::try_from(row.get::<String, _>("schema_type").as_str())
2125 .map_err(StorageError::InvalidTransactionType)?,
2126 given_name: row.get("given_name"),
2127 family_name: row.get("family_name"),
2128 display_name: row.get("display_name"),
2129 legal_name: row.get("legal_name"),
2130 lei_code: row.get("lei_code"),
2131 mcc_code: row.get("mcc_code"),
2132 address_country: row.get("address_country"),
2133 address_locality: row.get("address_locality"),
2134 postal_code: row.get("postal_code"),
2135 street_address: row.get("street_address"),
2136 profile: serde_json::from_str(&row.get::<String, _>("profile"))?,
2137 ivms101_data: row
2138 .get::<Option<String>, _>("ivms101_data")
2139 .map(|v| serde_json::from_str(&v))
2140 .transpose()?,
2141 verified_at: row.get("verified_at"),
2142 created_at: row.get("created_at"),
2143 updated_at: row.get("updated_at"),
2144 })),
2145 None => Ok(None),
2146 }
2147 }
2148
2149 pub async fn get_customer_by_identifier(
2151 &self,
2152 identifier: &str,
2153 ) -> Result<Option<Customer>, StorageError> {
2154 let row = sqlx::query_as::<_, (String,)>(
2155 r#"
2156 SELECT customer_id
2157 FROM customer_identifiers
2158 WHERE id = ?1
2159 "#,
2160 )
2161 .bind(identifier)
2162 .fetch_optional(&self.pool)
2163 .await?;
2164
2165 match row {
2166 Some((customer_id,)) => self.get_customer(&customer_id).await,
2167 None => Ok(None),
2168 }
2169 }
2170
2171 pub async fn list_customers(
2173 &self,
2174 agent_did: &str,
2175 limit: u32,
2176 offset: u32,
2177 ) -> Result<Vec<Customer>, StorageError> {
2178 let rows = sqlx::query(
2179 r#"
2180 SELECT id, agent_did, schema_type, given_name, family_name, display_name,
2181 legal_name, lei_code, mcc_code, address_country, address_locality,
2182 postal_code, street_address, profile, ivms101_data, verified_at,
2183 created_at, updated_at
2184 FROM customers
2185 WHERE agent_did = ?1
2186 ORDER BY updated_at DESC
2187 LIMIT ?2 OFFSET ?3
2188 "#,
2189 )
2190 .bind(agent_did)
2191 .bind(limit)
2192 .bind(offset)
2193 .fetch_all(&self.pool)
2194 .await?;
2195
2196 let mut customers = Vec::new();
2197 for row in rows {
2198 customers.push(Customer {
2199 id: row.get("id"),
2200 agent_did: row.get("agent_did"),
2201 schema_type: SchemaType::try_from(row.get::<String, _>("schema_type").as_str())
2202 .map_err(StorageError::InvalidTransactionType)?,
2203 given_name: row.get("given_name"),
2204 family_name: row.get("family_name"),
2205 display_name: row.get("display_name"),
2206 legal_name: row.get("legal_name"),
2207 lei_code: row.get("lei_code"),
2208 mcc_code: row.get("mcc_code"),
2209 address_country: row.get("address_country"),
2210 address_locality: row.get("address_locality"),
2211 postal_code: row.get("postal_code"),
2212 street_address: row.get("street_address"),
2213 profile: serde_json::from_str(&row.get::<String, _>("profile"))?,
2214 ivms101_data: row
2215 .get::<Option<String>, _>("ivms101_data")
2216 .map(|v| serde_json::from_str(&v))
2217 .transpose()?,
2218 verified_at: row.get("verified_at"),
2219 created_at: row.get("created_at"),
2220 updated_at: row.get("updated_at"),
2221 });
2222 }
2223
2224 Ok(customers)
2225 }
2226
2227 pub async fn add_customer_identifier(
2229 &self,
2230 identifier: &CustomerIdentifier,
2231 ) -> Result<(), StorageError> {
2232 sqlx::query(
2233 r#"
2234 INSERT INTO customer_identifiers (
2235 id, customer_id, identifier_type, verified, verification_method,
2236 verified_at, created_at
2237 ) VALUES (
2238 ?1, ?2, ?3, ?4, ?5, ?6, ?7
2239 ) ON CONFLICT(id, customer_id) DO UPDATE SET
2240 verified = excluded.verified,
2241 verification_method = excluded.verification_method,
2242 verified_at = excluded.verified_at
2243 "#,
2244 )
2245 .bind(&identifier.id)
2246 .bind(&identifier.customer_id)
2247 .bind(identifier.identifier_type.to_string())
2248 .bind(identifier.verified)
2249 .bind(&identifier.verification_method)
2250 .bind(&identifier.verified_at)
2251 .bind(&identifier.created_at)
2252 .execute(&self.pool)
2253 .await?;
2254
2255 Ok(())
2256 }
2257
2258 pub async fn get_customer_identifiers(
2260 &self,
2261 customer_id: &str,
2262 ) -> Result<Vec<CustomerIdentifier>, StorageError> {
2263 let rows = sqlx::query_as::<
2264 _,
2265 (
2266 String,
2267 String,
2268 String,
2269 bool,
2270 Option<String>,
2271 Option<String>,
2272 String,
2273 ),
2274 >(
2275 r#"
2276 SELECT id, customer_id, identifier_type, verified, verification_method,
2277 verified_at, created_at
2278 FROM customer_identifiers
2279 WHERE customer_id = ?1
2280 "#,
2281 )
2282 .bind(customer_id)
2283 .fetch_all(&self.pool)
2284 .await?;
2285
2286 let mut identifiers = Vec::new();
2287 for (
2288 id,
2289 customer_id,
2290 identifier_type,
2291 verified,
2292 verification_method,
2293 verified_at,
2294 created_at,
2295 ) in rows
2296 {
2297 identifiers.push(CustomerIdentifier {
2298 id,
2299 customer_id,
2300 identifier_type: IdentifierType::try_from(identifier_type.as_str())
2301 .map_err(StorageError::InvalidTransactionType)?,
2302 verified,
2303 verification_method,
2304 verified_at,
2305 created_at,
2306 });
2307 }
2308
2309 Ok(identifiers)
2310 }
2311
2312 pub async fn add_customer_relationship(
2314 &self,
2315 relationship: &CustomerRelationship,
2316 ) -> Result<(), StorageError> {
2317 sqlx::query(
2318 r#"
2319 INSERT INTO customer_relationships (
2320 id, customer_id, relationship_type, related_identifier,
2321 proof, confirmed_at, created_at
2322 ) VALUES (
2323 ?1, ?2, ?3, ?4, ?5, ?6, ?7
2324 ) ON CONFLICT(customer_id, relationship_type, related_identifier) DO UPDATE SET
2325 proof = excluded.proof,
2326 confirmed_at = excluded.confirmed_at
2327 "#,
2328 )
2329 .bind(&relationship.id)
2330 .bind(&relationship.customer_id)
2331 .bind(&relationship.relationship_type)
2332 .bind(&relationship.related_identifier)
2333 .bind(
2334 relationship
2335 .proof
2336 .as_ref()
2337 .map(serde_json::to_string)
2338 .transpose()?,
2339 )
2340 .bind(&relationship.confirmed_at)
2341 .bind(&relationship.created_at)
2342 .execute(&self.pool)
2343 .await?;
2344
2345 Ok(())
2346 }
2347
2348 pub async fn get_customer_relationships(
2350 &self,
2351 customer_id: &str,
2352 ) -> Result<Vec<CustomerRelationship>, StorageError> {
2353 let rows = sqlx::query_as::<
2354 _,
2355 (
2356 String,
2357 String,
2358 String,
2359 String,
2360 Option<String>,
2361 Option<String>,
2362 String,
2363 ),
2364 >(
2365 r#"
2366 SELECT id, customer_id, relationship_type, related_identifier,
2367 proof, confirmed_at, created_at
2368 FROM customer_relationships
2369 WHERE customer_id = ?1
2370 "#,
2371 )
2372 .bind(customer_id)
2373 .fetch_all(&self.pool)
2374 .await?;
2375
2376 let mut relationships = Vec::new();
2377 for (
2378 id,
2379 customer_id,
2380 relationship_type,
2381 related_identifier,
2382 proof,
2383 confirmed_at,
2384 created_at,
2385 ) in rows
2386 {
2387 relationships.push(CustomerRelationship {
2388 id,
2389 customer_id,
2390 relationship_type,
2391 related_identifier,
2392 proof: proof.map(|v| serde_json::from_str(&v)).transpose()?,
2393 confirmed_at,
2394 created_at,
2395 });
2396 }
2397
2398 Ok(relationships)
2399 }
2400
2401 pub async fn search_customers(
2403 &self,
2404 agent_did: &str,
2405 query: &str,
2406 limit: u32,
2407 ) -> Result<Vec<Customer>, StorageError> {
2408 let escaped_query = query
2410 .replace('\\', "\\\\")
2411 .replace('%', "\\%")
2412 .replace('_', "\\_");
2413 let search_pattern = format!("%{}%", escaped_query);
2414
2415 let rows = sqlx::query(
2416 r#"
2417 SELECT DISTINCT c.id, c.agent_did, c.schema_type, c.given_name, c.family_name, c.display_name,
2418 c.legal_name, c.lei_code, c.mcc_code, c.address_country, c.address_locality,
2419 c.postal_code, c.street_address, c.profile, c.ivms101_data, c.verified_at,
2420 c.created_at, c.updated_at
2421 FROM customers c
2422 LEFT JOIN customer_identifiers ci ON c.id = ci.customer_id
2423 WHERE c.agent_did = ?1
2424 AND (
2425 c.given_name LIKE ?2 ESCAPE '\'
2426 OR c.family_name LIKE ?2 ESCAPE '\'
2427 OR c.display_name LIKE ?2 ESCAPE '\'
2428 OR c.legal_name LIKE ?2 ESCAPE '\'
2429 OR ci.id LIKE ?2 ESCAPE '\'
2430 )
2431 ORDER BY c.updated_at DESC
2432 LIMIT ?3
2433 "#,
2434 )
2435 .bind(agent_did)
2436 .bind(&search_pattern)
2437 .bind(limit)
2438 .fetch_all(&self.pool)
2439 .await?;
2440
2441 let mut customers = Vec::new();
2442 for row in rows {
2443 customers.push(Customer {
2444 id: row.get("id"),
2445 agent_did: row.get("agent_did"),
2446 schema_type: SchemaType::try_from(row.get::<String, _>("schema_type").as_str())
2447 .map_err(StorageError::InvalidTransactionType)?,
2448 given_name: row.get("given_name"),
2449 family_name: row.get("family_name"),
2450 display_name: row.get("display_name"),
2451 legal_name: row.get("legal_name"),
2452 lei_code: row.get("lei_code"),
2453 mcc_code: row.get("mcc_code"),
2454 address_country: row.get("address_country"),
2455 address_locality: row.get("address_locality"),
2456 postal_code: row.get("postal_code"),
2457 street_address: row.get("street_address"),
2458 profile: serde_json::from_str(&row.get::<String, _>("profile"))?,
2459 ivms101_data: row
2460 .get::<Option<String>, _>("ivms101_data")
2461 .map(|v| serde_json::from_str(&v))
2462 .transpose()?,
2463 verified_at: row.get("verified_at"),
2464 created_at: row.get("created_at"),
2465 updated_at: row.get("updated_at"),
2466 });
2467 }
2468
2469 Ok(customers)
2470 }
2471
2472 pub async fn insert_decision(
2490 &self,
2491 transaction_id: &str,
2492 agent_did: &str,
2493 decision_type: DecisionType,
2494 context_json: &serde_json::Value,
2495 ) -> Result<i64, StorageError> {
2496 debug!(
2497 "Inserting decision for transaction {} agent {} type {}",
2498 transaction_id, agent_did, decision_type
2499 );
2500
2501 let context_str = serde_json::to_string(context_json)?;
2502
2503 let result = sqlx::query(
2504 r#"
2505 INSERT INTO decision_log (transaction_id, agent_did, decision_type, context_json)
2506 VALUES (?1, ?2, ?3, ?4)
2507 "#,
2508 )
2509 .bind(transaction_id)
2510 .bind(agent_did)
2511 .bind(decision_type.to_string())
2512 .bind(&context_str)
2513 .execute(&self.pool)
2514 .await?;
2515
2516 Ok(result.last_insert_rowid())
2517 }
2518
2519 pub async fn update_decision_status(
2533 &self,
2534 decision_id: i64,
2535 status: DecisionStatus,
2536 resolution: Option<&str>,
2537 resolution_detail: Option<&serde_json::Value>,
2538 ) -> Result<(), StorageError> {
2539 debug!("Updating decision {} status to {}", decision_id, status);
2540
2541 let now = chrono::Utc::now().to_rfc3339();
2542 let resolution_detail_str = resolution_detail.map(serde_json::to_string).transpose()?;
2543
2544 let delivered_at = if status == DecisionStatus::Delivered {
2545 Some(now.clone())
2546 } else {
2547 None
2548 };
2549
2550 let resolved_at = if status == DecisionStatus::Resolved {
2551 Some(now)
2552 } else {
2553 None
2554 };
2555
2556 sqlx::query(
2557 r#"
2558 UPDATE decision_log
2559 SET status = ?1,
2560 resolution = COALESCE(?2, resolution),
2561 resolution_detail = COALESCE(?3, resolution_detail),
2562 delivered_at = COALESCE(?4, delivered_at),
2563 resolved_at = COALESCE(?5, resolved_at)
2564 WHERE id = ?6
2565 "#,
2566 )
2567 .bind(status.to_string())
2568 .bind(resolution)
2569 .bind(resolution_detail_str.as_deref())
2570 .bind(delivered_at)
2571 .bind(resolved_at)
2572 .bind(decision_id)
2573 .execute(&self.pool)
2574 .await?;
2575
2576 Ok(())
2577 }
2578
2579 pub async fn list_decisions(
2593 &self,
2594 agent_did: Option<&str>,
2595 status: Option<DecisionStatus>,
2596 since_id: Option<i64>,
2597 limit: u32,
2598 ) -> Result<Vec<DecisionLogEntry>, StorageError> {
2599 let mut query = String::from(
2600 "SELECT id, transaction_id, agent_did, decision_type, context_json, \
2601 status, resolution, resolution_detail, created_at, delivered_at, resolved_at \
2602 FROM decision_log WHERE 1=1",
2603 );
2604 let mut bind_values: Vec<String> = Vec::new();
2605 let mut bind_i64: Option<i64> = None;
2606
2607 if let Some(did) = agent_did {
2608 query.push_str(" AND agent_did = ?");
2609 bind_values.push(did.to_string());
2610 }
2611
2612 if let Some(s) = status {
2613 query.push_str(" AND status = ?");
2614 bind_values.push(s.to_string());
2615 }
2616
2617 if let Some(id) = since_id {
2618 query.push_str(" AND id > ?");
2619 bind_i64 = Some(id);
2620 }
2621
2622 query.push_str(" ORDER BY id ASC LIMIT ?");
2623
2624 let mut sqlx_query = sqlx::query(&query);
2625
2626 for value in &bind_values {
2627 sqlx_query = sqlx_query.bind(value);
2628 }
2629
2630 if let Some(id) = bind_i64 {
2631 sqlx_query = sqlx_query.bind(id);
2632 }
2633
2634 sqlx_query = sqlx_query.bind(limit);
2635
2636 let rows = sqlx_query.fetch_all(&self.pool).await?;
2637
2638 let mut entries = Vec::new();
2639 for row in rows {
2640 entries.push(DecisionLogEntry {
2641 id: row.get("id"),
2642 transaction_id: row.get("transaction_id"),
2643 agent_did: row.get("agent_did"),
2644 decision_type: DecisionType::try_from(
2645 row.get::<String, _>("decision_type").as_str(),
2646 )
2647 .map_err(StorageError::InvalidTransactionType)?,
2648 context_json: serde_json::from_str(&row.get::<String, _>("context_json"))?,
2649 status: DecisionStatus::try_from(row.get::<String, _>("status").as_str())
2650 .map_err(StorageError::InvalidTransactionType)?,
2651 resolution: row.get("resolution"),
2652 resolution_detail: row
2653 .get::<Option<String>, _>("resolution_detail")
2654 .map(|v| serde_json::from_str(&v))
2655 .transpose()?,
2656 created_at: row.get("created_at"),
2657 delivered_at: row.get("delivered_at"),
2658 resolved_at: row.get("resolved_at"),
2659 });
2660 }
2661
2662 Ok(entries)
2663 }
2664
2665 pub async fn expire_decisions_for_transaction(
2679 &self,
2680 transaction_id: &str,
2681 ) -> Result<u64, StorageError> {
2682 debug!(
2683 "Expiring pending decisions for transaction {}",
2684 transaction_id
2685 );
2686
2687 let result = sqlx::query(
2688 r#"
2689 UPDATE decision_log
2690 SET status = 'expired'
2691 WHERE transaction_id = ?1
2692 AND status IN ('pending', 'delivered')
2693 "#,
2694 )
2695 .bind(transaction_id)
2696 .execute(&self.pool)
2697 .await?;
2698
2699 Ok(result.rows_affected())
2700 }
2701
2702 pub async fn resolve_decisions_for_transaction(
2718 &self,
2719 transaction_id: &str,
2720 action: &str,
2721 decision_type: Option<DecisionType>,
2722 ) -> Result<u64, StorageError> {
2723 debug!(
2724 "Resolving decisions for transaction {} with action: {}",
2725 transaction_id, action
2726 );
2727
2728 let result = if let Some(dt) = decision_type {
2729 sqlx::query(
2730 r#"
2731 UPDATE decision_log
2732 SET status = 'resolved',
2733 resolution = ?1,
2734 resolved_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
2735 WHERE transaction_id = ?2
2736 AND decision_type = ?3
2737 AND status IN ('pending', 'delivered')
2738 "#,
2739 )
2740 .bind(action)
2741 .bind(transaction_id)
2742 .bind(dt.to_string())
2743 .execute(&self.pool)
2744 .await?
2745 } else {
2746 sqlx::query(
2747 r#"
2748 UPDATE decision_log
2749 SET status = 'resolved',
2750 resolution = ?1,
2751 resolved_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
2752 WHERE transaction_id = ?2
2753 AND status IN ('pending', 'delivered')
2754 "#,
2755 )
2756 .bind(action)
2757 .bind(transaction_id)
2758 .execute(&self.pool)
2759 .await?
2760 };
2761
2762 Ok(result.rows_affected())
2763 }
2764
2765 pub async fn get_decision_by_id(
2777 &self,
2778 decision_id: i64,
2779 ) -> Result<Option<DecisionLogEntry>, StorageError> {
2780 let row = sqlx::query(
2781 r#"
2782 SELECT id, transaction_id, agent_did, decision_type, context_json,
2783 status, resolution, resolution_detail, created_at, delivered_at, resolved_at
2784 FROM decision_log WHERE id = ?1
2785 "#,
2786 )
2787 .bind(decision_id)
2788 .fetch_optional(&self.pool)
2789 .await?;
2790
2791 match row {
2792 Some(row) => Ok(Some(DecisionLogEntry {
2793 id: row.get("id"),
2794 transaction_id: row.get("transaction_id"),
2795 agent_did: row.get("agent_did"),
2796 decision_type: DecisionType::try_from(
2797 row.get::<String, _>("decision_type").as_str(),
2798 )
2799 .map_err(StorageError::InvalidTransactionType)?,
2800 context_json: serde_json::from_str(&row.get::<String, _>("context_json"))?,
2801 status: DecisionStatus::try_from(row.get::<String, _>("status").as_str())
2802 .map_err(StorageError::InvalidTransactionType)?,
2803 resolution: row.get("resolution"),
2804 resolution_detail: row
2805 .get::<Option<String>, _>("resolution_detail")
2806 .map(|v| serde_json::from_str(&v))
2807 .transpose()?,
2808 created_at: row.get("created_at"),
2809 delivered_at: row.get("delivered_at"),
2810 resolved_at: row.get("resolved_at"),
2811 })),
2812 None => Ok(None),
2813 }
2814 }
2815}
2816
2817#[cfg(test)]
2818mod tests {
2819 use super::*;
2820 use tap_msg::message::transfer::Transfer;
2821 use tap_msg::message::Party;
2822 use tempfile::tempdir;
2823
2824 #[tokio::test]
2825 async fn test_storage_creation() {
2826 let dir = tempdir().unwrap();
2827 let db_path = dir.path().join("test.db");
2828
2829 let _storage = Storage::new(Some(db_path)).await.unwrap();
2830 }
2832
2833 #[tokio::test]
2834 async fn test_storage_with_did() {
2835 let _ = env_logger::builder().is_test(true).try_init();
2836
2837 let dir = tempdir().unwrap();
2838 let tap_root = dir.path().to_path_buf();
2839 let agent_did = "did:web:example.com";
2840
2841 let storage = Storage::new_with_did(agent_did, Some(tap_root.clone()))
2842 .await
2843 .unwrap();
2844
2845 let expected_path = tap_root.join("did_web_example.com").join("transactions.db");
2847 assert!(
2848 expected_path.exists(),
2849 "Database file not created at expected path"
2850 );
2851
2852 let messages = storage.list_messages(10, 0, None).await.unwrap();
2854 assert_eq!(messages.len(), 0);
2855 }
2856
2857 #[tokio::test]
2858 async fn test_default_logs_dir() {
2859 let dir = tempdir().unwrap();
2860 let tap_root = dir.path().to_path_buf();
2861
2862 let logs_dir = Storage::default_logs_dir(Some(tap_root.clone()));
2863 assert_eq!(logs_dir, tap_root.join("logs"));
2864
2865 let default_logs = Storage::default_logs_dir(None);
2867 assert!(default_logs.to_string_lossy().contains(".tap/logs"));
2868 }
2869
2870 #[tokio::test]
2871 async fn test_insert_and_retrieve_transaction() {
2872 let _ = env_logger::builder().is_test(true).try_init();
2873
2874 let dir = tempdir().unwrap();
2875 let db_path = dir.path().join("test.db");
2876 let storage = Storage::new(Some(db_path)).await.unwrap();
2877
2878 let transfer_body = Transfer {
2880 transaction_id: Some("test_transfer_123".to_string()),
2881 originator: Some(Party::new("did:example:originator")),
2882 beneficiary: Some(Party::new("did:example:beneficiary")),
2883 asset: "eip155:1/erc20:0x0000000000000000000000000000000000000000"
2884 .parse()
2885 .unwrap(),
2886 amount: "1000000000000000000".to_string(),
2887 agents: vec![],
2888 memo: None,
2889 settlement_id: None,
2890 expiry: None,
2891 transaction_value: None,
2892 connection_id: None,
2893 metadata: Default::default(),
2894 };
2895
2896 let message_id = "test_message_123";
2897 let message = PlainMessage {
2898 id: message_id.to_string(),
2899 typ: "application/didcomm-plain+json".to_string(),
2900 type_: "https://tap-protocol.io/messages/transfer/1.0".to_string(),
2901 body: serde_json::to_value(&transfer_body).unwrap(),
2902 from: "did:example:sender".to_string(),
2903 to: vec!["did:example:receiver".to_string()],
2904 thid: None,
2905 pthid: None,
2906 extra_headers: Default::default(),
2907 attachments: None,
2908 created_time: None,
2909 expires_time: None,
2910 from_prior: None,
2911 };
2912
2913 storage.insert_transaction(&message).await.unwrap();
2915
2916 let retrieved = storage.get_transaction_by_id(message_id).await.unwrap();
2918 assert!(retrieved.is_some(), "Transaction not found");
2919
2920 let tx = retrieved.unwrap();
2921 assert_eq!(tx.reference_id, message_id);
2922 assert_eq!(tx.transaction_type, TransactionType::Transfer);
2923 assert_eq!(tx.status, TransactionStatus::Pending);
2924 }
2925
2926 #[tokio::test]
2927 async fn test_log_and_retrieve_messages() {
2928 let _ = env_logger::builder().is_test(true).try_init();
2929
2930 let dir = tempdir().unwrap();
2931 let db_path = dir.path().join("test.db");
2932 let storage = Storage::new(Some(db_path)).await.unwrap();
2933
2934 let connect_message = PlainMessage {
2936 id: "msg_connect_123".to_string(),
2937 typ: "application/didcomm-plain+json".to_string(),
2938 type_: "https://tap-protocol.io/messages/connect/1.0".to_string(),
2939 body: serde_json::json!({"constraints": ["test"]}),
2940 from: "did:example:alice".to_string(),
2941 to: vec!["did:example:bob".to_string()],
2942 thid: Some("thread_123".to_string()),
2943 pthid: None,
2944 extra_headers: Default::default(),
2945 attachments: None,
2946 created_time: None,
2947 expires_time: None,
2948 from_prior: None,
2949 };
2950
2951 let authorize_message = PlainMessage {
2952 id: "msg_auth_123".to_string(),
2953 typ: "application/didcomm-plain+json".to_string(),
2954 type_: "https://tap-protocol.io/messages/authorize/1.0".to_string(),
2955 body: serde_json::json!({"transaction_id": "test_transfer_123"}),
2956 from: "did:example:bob".to_string(),
2957 to: vec!["did:example:alice".to_string()],
2958 thid: Some("thread_123".to_string()),
2959 pthid: None,
2960 extra_headers: Default::default(),
2961 attachments: None,
2962 created_time: None,
2963 expires_time: None,
2964 from_prior: None,
2965 };
2966
2967 storage
2969 .log_message(&connect_message, MessageDirection::Incoming)
2970 .await
2971 .unwrap();
2972 storage
2973 .log_message(&authorize_message, MessageDirection::Outgoing)
2974 .await
2975 .unwrap();
2976
2977 let retrieved = storage.get_message_by_id("msg_connect_123").await.unwrap();
2979 assert!(retrieved.is_some());
2980 let msg = retrieved.unwrap();
2981 assert_eq!(msg.message_id, "msg_connect_123");
2982 assert_eq!(msg.direction, MessageDirection::Incoming);
2983
2984 let all_messages = storage.list_messages(10, 0, None).await.unwrap();
2986 assert_eq!(all_messages.len(), 2);
2987
2988 let incoming_messages = storage
2990 .list_messages(10, 0, Some(MessageDirection::Incoming))
2991 .await
2992 .unwrap();
2993 assert_eq!(incoming_messages.len(), 1);
2994 assert_eq!(incoming_messages[0].message_id, "msg_connect_123");
2995
2996 storage
2998 .log_message(&connect_message, MessageDirection::Incoming)
2999 .await
3000 .unwrap();
3001 let all_messages_after = storage.list_messages(10, 0, None).await.unwrap();
3002 assert_eq!(all_messages_after.len(), 2); }
3004
3005 #[tokio::test]
3010 async fn test_insert_decision() {
3011 let storage = Storage::new_in_memory().await.unwrap();
3012
3013 let context = serde_json::json!({
3014 "transaction_state": "Received",
3015 "pending_agents": ["did:key:z6MkAgent1"],
3016 "transaction": {
3017 "type": "transfer",
3018 "asset": "eip155:1/slip44:60",
3019 "amount": "100"
3020 }
3021 });
3022
3023 let id = storage
3024 .insert_decision(
3025 "txn-001",
3026 "did:key:z6MkAgent1",
3027 DecisionType::AuthorizationRequired,
3028 &context,
3029 )
3030 .await
3031 .unwrap();
3032
3033 assert!(id > 0);
3034
3035 let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
3037 assert_eq!(entry.transaction_id, "txn-001");
3038 assert_eq!(entry.agent_did, "did:key:z6MkAgent1");
3039 assert_eq!(entry.decision_type, DecisionType::AuthorizationRequired);
3040 assert_eq!(entry.status, DecisionStatus::Pending);
3041 assert!(entry.resolution.is_none());
3042 assert!(entry.resolution_detail.is_none());
3043 assert!(entry.delivered_at.is_none());
3044 assert!(entry.resolved_at.is_none());
3045 assert_eq!(entry.context_json["transaction"]["type"], "transfer");
3046 }
3047
3048 #[tokio::test]
3049 async fn test_insert_multiple_decision_types() {
3050 let storage = Storage::new_in_memory().await.unwrap();
3051
3052 let context = serde_json::json!({"transaction_id": "txn-002"});
3053
3054 let id1 = storage
3055 .insert_decision(
3056 "txn-002",
3057 "did:key:z6MkAgent1",
3058 DecisionType::AuthorizationRequired,
3059 &context,
3060 )
3061 .await
3062 .unwrap();
3063
3064 let id2 = storage
3065 .insert_decision(
3066 "txn-002",
3067 "did:key:z6MkAgent1",
3068 DecisionType::PolicySatisfactionRequired,
3069 &context,
3070 )
3071 .await
3072 .unwrap();
3073
3074 let id3 = storage
3075 .insert_decision(
3076 "txn-003",
3077 "did:key:z6MkAgent1",
3078 DecisionType::SettlementRequired,
3079 &context,
3080 )
3081 .await
3082 .unwrap();
3083
3084 assert!(id1 < id2);
3085 assert!(id2 < id3);
3086
3087 let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3088 let e2 = storage.get_decision_by_id(id2).await.unwrap().unwrap();
3089 let e3 = storage.get_decision_by_id(id3).await.unwrap().unwrap();
3090
3091 assert_eq!(e1.decision_type, DecisionType::AuthorizationRequired);
3092 assert_eq!(e2.decision_type, DecisionType::PolicySatisfactionRequired);
3093 assert_eq!(e3.decision_type, DecisionType::SettlementRequired);
3094 }
3095
3096 #[tokio::test]
3097 async fn test_update_decision_status_to_delivered() {
3098 let storage = Storage::new_in_memory().await.unwrap();
3099
3100 let context = serde_json::json!({"info": "test"});
3101 let id = storage
3102 .insert_decision(
3103 "txn-010",
3104 "did:key:z6MkAgent1",
3105 DecisionType::AuthorizationRequired,
3106 &context,
3107 )
3108 .await
3109 .unwrap();
3110
3111 storage
3113 .update_decision_status(id, DecisionStatus::Delivered, None, None)
3114 .await
3115 .unwrap();
3116
3117 let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
3118 assert_eq!(entry.status, DecisionStatus::Delivered);
3119 assert!(entry.delivered_at.is_some());
3120 assert!(entry.resolved_at.is_none());
3121 assert!(entry.resolution.is_none());
3122 }
3123
3124 #[tokio::test]
3125 async fn test_update_decision_status_to_resolved() {
3126 let storage = Storage::new_in_memory().await.unwrap();
3127
3128 let context = serde_json::json!({"info": "test"});
3129 let id = storage
3130 .insert_decision(
3131 "txn-011",
3132 "did:key:z6MkAgent1",
3133 DecisionType::AuthorizationRequired,
3134 &context,
3135 )
3136 .await
3137 .unwrap();
3138
3139 storage
3141 .update_decision_status(id, DecisionStatus::Delivered, None, None)
3142 .await
3143 .unwrap();
3144
3145 let detail = serde_json::json!({"settlement_address": "eip155:1:0xABC"});
3147 storage
3148 .update_decision_status(
3149 id,
3150 DecisionStatus::Resolved,
3151 Some("authorize"),
3152 Some(&detail),
3153 )
3154 .await
3155 .unwrap();
3156
3157 let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
3158 assert_eq!(entry.status, DecisionStatus::Resolved);
3159 assert_eq!(entry.resolution.as_deref(), Some("authorize"));
3160 assert!(entry.resolved_at.is_some());
3161 assert!(entry.delivered_at.is_some());
3162 assert_eq!(
3163 entry.resolution_detail.unwrap()["settlement_address"],
3164 "eip155:1:0xABC"
3165 );
3166 }
3167
3168 #[tokio::test]
3169 async fn test_list_decisions_all() {
3170 let storage = Storage::new_in_memory().await.unwrap();
3171
3172 let context = serde_json::json!({"info": "test"});
3173
3174 storage
3176 .insert_decision(
3177 "txn-020",
3178 "did:key:z6MkAgent1",
3179 DecisionType::AuthorizationRequired,
3180 &context,
3181 )
3182 .await
3183 .unwrap();
3184 storage
3185 .insert_decision(
3186 "txn-021",
3187 "did:key:z6MkAgent2",
3188 DecisionType::SettlementRequired,
3189 &context,
3190 )
3191 .await
3192 .unwrap();
3193 storage
3194 .insert_decision(
3195 "txn-022",
3196 "did:key:z6MkAgent1",
3197 DecisionType::PolicySatisfactionRequired,
3198 &context,
3199 )
3200 .await
3201 .unwrap();
3202
3203 let all = storage.list_decisions(None, None, None, 100).await.unwrap();
3205 assert_eq!(all.len(), 3);
3206
3207 assert!(all[0].id < all[1].id);
3209 assert!(all[1].id < all[2].id);
3210 }
3211
3212 #[tokio::test]
3213 async fn test_list_decisions_by_agent() {
3214 let storage = Storage::new_in_memory().await.unwrap();
3215
3216 let context = serde_json::json!({"info": "test"});
3217
3218 storage
3219 .insert_decision(
3220 "txn-030",
3221 "did:key:z6MkAgent1",
3222 DecisionType::AuthorizationRequired,
3223 &context,
3224 )
3225 .await
3226 .unwrap();
3227 storage
3228 .insert_decision(
3229 "txn-031",
3230 "did:key:z6MkAgent2",
3231 DecisionType::AuthorizationRequired,
3232 &context,
3233 )
3234 .await
3235 .unwrap();
3236 storage
3237 .insert_decision(
3238 "txn-032",
3239 "did:key:z6MkAgent1",
3240 DecisionType::SettlementRequired,
3241 &context,
3242 )
3243 .await
3244 .unwrap();
3245
3246 let agent1 = storage
3247 .list_decisions(Some("did:key:z6MkAgent1"), None, None, 100)
3248 .await
3249 .unwrap();
3250 assert_eq!(agent1.len(), 2);
3251
3252 let agent2 = storage
3253 .list_decisions(Some("did:key:z6MkAgent2"), None, None, 100)
3254 .await
3255 .unwrap();
3256 assert_eq!(agent2.len(), 1);
3257 }
3258
3259 #[tokio::test]
3260 async fn test_list_decisions_by_status() {
3261 let storage = Storage::new_in_memory().await.unwrap();
3262
3263 let context = serde_json::json!({"info": "test"});
3264
3265 let id1 = storage
3266 .insert_decision(
3267 "txn-040",
3268 "did:key:z6MkAgent1",
3269 DecisionType::AuthorizationRequired,
3270 &context,
3271 )
3272 .await
3273 .unwrap();
3274 let _id2 = storage
3275 .insert_decision(
3276 "txn-041",
3277 "did:key:z6MkAgent1",
3278 DecisionType::AuthorizationRequired,
3279 &context,
3280 )
3281 .await
3282 .unwrap();
3283
3284 storage
3286 .update_decision_status(id1, DecisionStatus::Delivered, None, None)
3287 .await
3288 .unwrap();
3289
3290 let pending = storage
3291 .list_decisions(None, Some(DecisionStatus::Pending), None, 100)
3292 .await
3293 .unwrap();
3294 assert_eq!(pending.len(), 1);
3295 assert_eq!(pending[0].transaction_id, "txn-041");
3296
3297 let delivered = storage
3298 .list_decisions(None, Some(DecisionStatus::Delivered), None, 100)
3299 .await
3300 .unwrap();
3301 assert_eq!(delivered.len(), 1);
3302 assert_eq!(delivered[0].transaction_id, "txn-040");
3303 }
3304
3305 #[tokio::test]
3306 async fn test_list_decisions_since_id() {
3307 let storage = Storage::new_in_memory().await.unwrap();
3308
3309 let context = serde_json::json!({"info": "test"});
3310
3311 let id1 = storage
3312 .insert_decision(
3313 "txn-050",
3314 "did:key:z6MkAgent1",
3315 DecisionType::AuthorizationRequired,
3316 &context,
3317 )
3318 .await
3319 .unwrap();
3320 let id2 = storage
3321 .insert_decision(
3322 "txn-051",
3323 "did:key:z6MkAgent1",
3324 DecisionType::AuthorizationRequired,
3325 &context,
3326 )
3327 .await
3328 .unwrap();
3329 let _id3 = storage
3330 .insert_decision(
3331 "txn-052",
3332 "did:key:z6MkAgent1",
3333 DecisionType::AuthorizationRequired,
3334 &context,
3335 )
3336 .await
3337 .unwrap();
3338
3339 let since = storage
3341 .list_decisions(None, None, Some(id1), 100)
3342 .await
3343 .unwrap();
3344 assert_eq!(since.len(), 2);
3345 assert_eq!(since[0].id, id2);
3346 }
3347
3348 #[tokio::test]
3349 async fn test_list_decisions_with_limit() {
3350 let storage = Storage::new_in_memory().await.unwrap();
3351
3352 let context = serde_json::json!({"info": "test"});
3353
3354 for i in 0..5 {
3355 storage
3356 .insert_decision(
3357 &format!("txn-06{}", i),
3358 "did:key:z6MkAgent1",
3359 DecisionType::AuthorizationRequired,
3360 &context,
3361 )
3362 .await
3363 .unwrap();
3364 }
3365
3366 let limited = storage.list_decisions(None, None, None, 3).await.unwrap();
3367 assert_eq!(limited.len(), 3);
3368 }
3369
3370 #[tokio::test]
3371 async fn test_expire_decisions_for_transaction() {
3372 let storage = Storage::new_in_memory().await.unwrap();
3373
3374 let context = serde_json::json!({"info": "test"});
3375
3376 let id1 = storage
3378 .insert_decision(
3379 "txn-070",
3380 "did:key:z6MkAgent1",
3381 DecisionType::AuthorizationRequired,
3382 &context,
3383 )
3384 .await
3385 .unwrap();
3386 let id2 = storage
3387 .insert_decision(
3388 "txn-070",
3389 "did:key:z6MkAgent2",
3390 DecisionType::AuthorizationRequired,
3391 &context,
3392 )
3393 .await
3394 .unwrap();
3395 let id3 = storage
3396 .insert_decision(
3397 "txn-071",
3398 "did:key:z6MkAgent1",
3399 DecisionType::AuthorizationRequired,
3400 &context,
3401 )
3402 .await
3403 .unwrap();
3404
3405 storage
3407 .update_decision_status(id2, DecisionStatus::Delivered, None, None)
3408 .await
3409 .unwrap();
3410
3411 let expired_count = storage
3413 .expire_decisions_for_transaction("txn-070")
3414 .await
3415 .unwrap();
3416 assert_eq!(expired_count, 2);
3417
3418 let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3420 assert_eq!(e1.status, DecisionStatus::Expired);
3421
3422 let e2 = storage.get_decision_by_id(id2).await.unwrap().unwrap();
3423 assert_eq!(e2.status, DecisionStatus::Expired);
3424
3425 let e3 = storage.get_decision_by_id(id3).await.unwrap().unwrap();
3427 assert_eq!(e3.status, DecisionStatus::Pending);
3428 }
3429
3430 #[tokio::test]
3431 async fn test_expire_does_not_affect_resolved() {
3432 let storage = Storage::new_in_memory().await.unwrap();
3433
3434 let context = serde_json::json!({"info": "test"});
3435
3436 let id1 = storage
3437 .insert_decision(
3438 "txn-080",
3439 "did:key:z6MkAgent1",
3440 DecisionType::AuthorizationRequired,
3441 &context,
3442 )
3443 .await
3444 .unwrap();
3445 let id2 = storage
3446 .insert_decision(
3447 "txn-080",
3448 "did:key:z6MkAgent2",
3449 DecisionType::AuthorizationRequired,
3450 &context,
3451 )
3452 .await
3453 .unwrap();
3454
3455 storage
3457 .update_decision_status(id1, DecisionStatus::Resolved, Some("authorize"), None)
3458 .await
3459 .unwrap();
3460
3461 let expired_count = storage
3463 .expire_decisions_for_transaction("txn-080")
3464 .await
3465 .unwrap();
3466 assert_eq!(expired_count, 1); let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3469 assert_eq!(e1.status, DecisionStatus::Resolved);
3470
3471 let e2 = storage.get_decision_by_id(id2).await.unwrap().unwrap();
3472 assert_eq!(e2.status, DecisionStatus::Expired);
3473 }
3474
3475 #[tokio::test]
3476 async fn test_get_decision_by_id_not_found() {
3477 let storage = Storage::new_in_memory().await.unwrap();
3478
3479 let result = storage.get_decision_by_id(99999).await.unwrap();
3480 assert!(result.is_none());
3481 }
3482
3483 #[tokio::test]
3484 async fn test_list_decisions_combined_filters() {
3485 let storage = Storage::new_in_memory().await.unwrap();
3486
3487 let context = serde_json::json!({"info": "test"});
3488
3489 let id1 = storage
3490 .insert_decision(
3491 "txn-090",
3492 "did:key:z6MkAgent1",
3493 DecisionType::AuthorizationRequired,
3494 &context,
3495 )
3496 .await
3497 .unwrap();
3498 storage
3499 .insert_decision(
3500 "txn-091",
3501 "did:key:z6MkAgent2",
3502 DecisionType::AuthorizationRequired,
3503 &context,
3504 )
3505 .await
3506 .unwrap();
3507 storage
3508 .insert_decision(
3509 "txn-092",
3510 "did:key:z6MkAgent1",
3511 DecisionType::SettlementRequired,
3512 &context,
3513 )
3514 .await
3515 .unwrap();
3516
3517 storage
3519 .update_decision_status(id1, DecisionStatus::Delivered, None, None)
3520 .await
3521 .unwrap();
3522
3523 let results = storage
3525 .list_decisions(
3526 Some("did:key:z6MkAgent1"),
3527 Some(DecisionStatus::Pending),
3528 None,
3529 100,
3530 )
3531 .await
3532 .unwrap();
3533 assert_eq!(results.len(), 1);
3534 assert_eq!(results[0].transaction_id, "txn-092");
3535 }
3536
3537 #[tokio::test]
3538 async fn test_resolve_decisions_for_transaction() {
3539 let storage = Storage::new_in_memory().await.unwrap();
3540
3541 let context = serde_json::json!({"info": "test"});
3542
3543 let id1 = storage
3545 .insert_decision(
3546 "txn-100",
3547 "did:key:z6MkAgent1",
3548 DecisionType::AuthorizationRequired,
3549 &context,
3550 )
3551 .await
3552 .unwrap();
3553 let id2 = storage
3554 .insert_decision(
3555 "txn-100",
3556 "did:key:z6MkAgent1",
3557 DecisionType::SettlementRequired,
3558 &context,
3559 )
3560 .await
3561 .unwrap();
3562
3563 let resolved = storage
3565 .resolve_decisions_for_transaction(
3566 "txn-100",
3567 "authorize",
3568 Some(DecisionType::AuthorizationRequired),
3569 )
3570 .await
3571 .unwrap();
3572 assert_eq!(resolved, 1);
3573
3574 let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3575 assert_eq!(e1.status, DecisionStatus::Resolved);
3576 assert_eq!(e1.resolution.as_deref(), Some("authorize"));
3577
3578 let e2 = storage.get_decision_by_id(id2).await.unwrap().unwrap();
3580 assert_eq!(e2.status, DecisionStatus::Pending);
3581 }
3582
3583 #[tokio::test]
3584 async fn test_resolve_decisions_all_types() {
3585 let storage = Storage::new_in_memory().await.unwrap();
3586
3587 let context = serde_json::json!({"info": "test"});
3588
3589 let id1 = storage
3590 .insert_decision(
3591 "txn-101",
3592 "did:key:z6MkAgent1",
3593 DecisionType::AuthorizationRequired,
3594 &context,
3595 )
3596 .await
3597 .unwrap();
3598 let id2 = storage
3599 .insert_decision(
3600 "txn-101",
3601 "did:key:z6MkAgent1",
3602 DecisionType::SettlementRequired,
3603 &context,
3604 )
3605 .await
3606 .unwrap();
3607
3608 let resolved = storage
3610 .resolve_decisions_for_transaction("txn-101", "reject", None)
3611 .await
3612 .unwrap();
3613 assert_eq!(resolved, 2);
3614
3615 let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3616 assert_eq!(e1.status, DecisionStatus::Resolved);
3617 assert_eq!(e1.resolution.as_deref(), Some("reject"));
3618
3619 let e2 = storage.get_decision_by_id(id2).await.unwrap().unwrap();
3620 assert_eq!(e2.status, DecisionStatus::Resolved);
3621 assert_eq!(e2.resolution.as_deref(), Some("reject"));
3622 }
3623
3624 #[tokio::test]
3625 async fn test_resolve_does_not_affect_already_resolved() {
3626 let storage = Storage::new_in_memory().await.unwrap();
3627
3628 let context = serde_json::json!({"info": "test"});
3629
3630 let id1 = storage
3631 .insert_decision(
3632 "txn-102",
3633 "did:key:z6MkAgent1",
3634 DecisionType::AuthorizationRequired,
3635 &context,
3636 )
3637 .await
3638 .unwrap();
3639
3640 storage
3642 .update_decision_status(id1, DecisionStatus::Resolved, Some("authorize"), None)
3643 .await
3644 .unwrap();
3645
3646 let resolved = storage
3648 .resolve_decisions_for_transaction("txn-102", "reject", None)
3649 .await
3650 .unwrap();
3651 assert_eq!(resolved, 0); let e1 = storage.get_decision_by_id(id1).await.unwrap().unwrap();
3654 assert_eq!(e1.resolution.as_deref(), Some("authorize")); }
3656}