pub struct Storage { /* private fields */ }
Storage backend for TAP transactions and message audit trail
This struct provides the main interface for storing and retrieving TAP data from a SQLite database. It maintains two separate tables:
- transactions: For Transfer and Payment messages requiring business logic
- messages: For complete audit trail of all messages
It uses sqlx’s built-in connection pooling for efficient concurrent access and provides a native async API.
§Example
use tap_node::storage::{Storage, MessageDirection};
use std::path::PathBuf;
// Create storage with default path
let storage = Storage::new(None).await?;
// Create storage with DID-based path
let agent_did = "did:web:example.com";
let storage_with_did = Storage::new_with_did(agent_did, None).await?;
// Create storage with custom TAP root
let custom_root = PathBuf::from("/custom/tap/root");
let storage_custom = Storage::new_with_did(agent_did, Some(custom_root)).await?;
// Query transactions
let transactions = storage.list_transactions(10, 0).await?;
// Query audit trail
let all_messages = storage.list_messages(20, 0, None).await?;
let incoming_only = storage.list_messages(10, 0, Some(MessageDirection::Incoming)).await?;
Implementations§
impl Storage
pub async fn new_with_did(
    agent_did: &str,
    tap_root: Option<PathBuf>,
) -> Result<Self, StorageError>
Create a new Storage instance with an agent DID
This will initialize a SQLite database in the TAP directory structure:
- Default: ~/.tap/{did}/transactions.db
- Custom root: {tap_root}/{did}/transactions.db
§Arguments
- agent_did - The DID of the agent this storage is for
- tap_root - Optional custom root directory (defaults to ~/.tap)
§Errors
Returns StorageError if:
- Database initialization fails
- Migrations fail to run
- Connection pool cannot be created
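The directory layout above can be sketched with plain path joins. This is an illustration only: `sketch_did_db_path` and its `home` parameter are hypothetical and not part of tap_node, and the sketch assumes the DID is used verbatim as a directory name, which this page does not confirm.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper (not part of tap_node): joins the pieces of the
/// documented layout, `{root}/{did}/transactions.db`.
/// `home` stands in for the user's home directory so the sketch is deterministic.
fn sketch_did_db_path(home: &Path, tap_root: Option<&Path>, agent_did: &str) -> PathBuf {
    // Fall back to `{home}/.tap` when no custom root is given.
    let root = match tap_root {
        Some(custom) => custom.to_path_buf(),
        None => home.join(".tap"),
    };
    // Assumption: the DID is used as-is for the directory name.
    root.join(agent_did).join("transactions.db")
}
```

With `home` set to `/home/alice` and no custom root, the sketch yields `/home/alice/.tap/did:web:example.com/transactions.db` for the DID used in the example at the top of this page.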
pub async fn new_in_memory() -> Result<Self, StorageError>
Create a new in-memory storage instance for testing
This provides complete isolation between tests with no file system dependencies.
pub async fn new(path: Option<PathBuf>) -> Result<Self, StorageError>
Create a new Storage instance
This will initialize a SQLite database at the specified path (or default location), run any pending migrations, and set up a connection pool.
§Arguments
- path - Optional path to the database file. If None, uses the TAP_NODE_DB_PATH env var or defaults to ./tap-node.db
§Errors
Returns StorageError if:
- Database initialization fails
- Migrations fail to run
- Connection pool cannot be created
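The fallback order described for `path` (explicit argument, then `TAP_NODE_DB_PATH`, then `./tap-node.db`) can be sketched as below. Both function names are hypothetical; the environment value is passed in as a parameter so the core logic stays deterministic, and the crate's actual resolution code may differ.

```rust
use std::env;
use std::path::PathBuf;

/// Hypothetical helper: picks the database path in the documented order.
/// `env_value` is whatever `TAP_NODE_DB_PATH` held, if it was set.
fn sketch_resolve_db_path(explicit: Option<PathBuf>, env_value: Option<String>) -> PathBuf {
    explicit
        // Only consult the environment when no explicit path was given.
        .or_else(|| env_value.map(PathBuf::from))
        // Documented default when neither source provides a path.
        .unwrap_or_else(|| PathBuf::from("./tap-node.db"))
}

/// Hypothetical wrapper that reads the real environment variable.
fn sketch_resolve_db_path_from_env(explicit: Option<PathBuf>) -> PathBuf {
    sketch_resolve_db_path(explicit, env::var("TAP_NODE_DB_PATH").ok())
}
```

Keeping the environment lookup in a thin wrapper makes the precedence rule easy to test without mutating process-wide state.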
pub fn default_logs_dir(tap_root: Option<PathBuf>) -> PathBuf
Get the default logs directory
Returns the default directory for log files:
- Default: ~/.tap/logs
- Custom root: {tap_root}/logs
§Arguments
- tap_root - Optional custom root directory (defaults to ~/.tap)
pub async fn update_message_status(
    &self,
    message_id: &str,
    status: &str,
) -> Result<(), StorageError>
pub async fn update_transaction_status(
    &self,
    transaction_id: &str,
    status: &str,
) -> Result<(), StorageError>
pub async fn get_transaction_by_id(
    &self,
    reference_id: &str,
) -> Result<Option<Transaction>, StorageError>
pub async fn get_transaction_by_thread_id(
    &self,
    thread_id: &str,
) -> Result<Option<Transaction>, StorageError>
Check if an agent is authorized for a transaction
This checks the transaction_agents table to see if the given agent is associated with the transaction.
§Arguments
- transaction_id - The reference ID of the transaction
- agent_did - The DID of the agent to check
§Returns
- Ok(true) if the agent is authorized
- Ok(false) if the agent is not authorized or the transaction doesn’t exist
- Err(StorageError) on database error
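A caller has to treat the three outcomes listed above differently. The sketch below shows one way to do that. `SketchStorageError` and `SketchDecision` are stand-ins invented for this illustration; they are not the crate's types, and reporting a database error as "unavailable" is a design assumption rather than documented behavior.

```rust
/// Stand-in for the crate's error type (hypothetical).
#[derive(Debug)]
struct SketchStorageError(String);

/// Hypothetical caller-side decision derived from the check.
#[derive(Debug, PartialEq)]
enum SketchDecision {
    Allow,
    Deny,
    StorageUnavailable,
}

fn sketch_decide(check: Result<bool, SketchStorageError>) -> SketchDecision {
    match check {
        Ok(true) => SketchDecision::Allow,
        // Ok(false) covers both "not authorized" and "no such transaction",
        // so the caller cannot tell the two apart from this value alone.
        Ok(false) => SketchDecision::Deny,
        Err(err) => {
            // A database error is not a denial; surface it separately.
            eprintln!("authorization check failed: {}", err.0);
            SketchDecision::StorageUnavailable
        }
    }
}
```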
pub async fn insert_transaction_agent(
    &self,
    transaction_id: &str,
    agent_did: &str,
    agent_role: &str,
) -> Result<(), StorageError>
pub async fn update_transaction_agent_status(
    &self,
    transaction_id: &str,
    agent_did: &str,
    status: &str,
) -> Result<(), StorageError>
pub async fn get_transaction_agents(
    &self,
    transaction_id: &str,
) -> Result<Vec<(String, String, String)>, StorageError>
pub async fn insert_transaction(
    &self,
    message: &PlainMessage,
) -> Result<(), StorageError>
Insert a new transaction from a TAP message
This method extracts transaction details from a Transfer or Payment message and stores them in the database with a ‘pending’ status.
§Arguments
- message - The DIDComm PlainMessage containing a Transfer or Payment body
§Errors
Returns StorageError if:
- The message is not a Transfer or Payment type
- Database insertion fails
- The transaction already exists (duplicate reference_id)
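The insert semantics described above (new rows start as 'pending', and a repeated reference_id is rejected) can be modelled in memory as follows. `SketchTransactions` and `SketchError` are hypothetical types for illustration; the crate stores rows in SQLite, and how it reports the duplicate is not shown on this page.

```rust
use std::collections::HashMap;

/// Hypothetical error for the in-memory model.
#[derive(Debug, PartialEq)]
enum SketchError {
    Duplicate(String),
}

/// In-memory stand-in for the transactions table, keyed by reference_id.
#[derive(Default)]
struct SketchTransactions {
    status_by_reference_id: HashMap<String, String>,
}

impl SketchTransactions {
    fn insert(&mut self, reference_id: &str) -> Result<(), SketchError> {
        // Reject a second insert with the same reference_id.
        if self.status_by_reference_id.contains_key(reference_id) {
            return Err(SketchError::Duplicate(reference_id.to_string()));
        }
        // New transactions start in the 'pending' status.
        self.status_by_reference_id
            .insert(reference_id.to_string(), "pending".to_string());
        Ok(())
    }
}
```

Callers that may see the same message twice can match on the duplicate case and treat it as already recorded instead of failing.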
pub async fn list_transactions(
    &self,
    limit: u32,
    offset: u32,
) -> Result<Vec<Transaction>, StorageError>
pub async fn log_message(
    &self,
    message: &PlainMessage,
    direction: MessageDirection,
) -> Result<(), StorageError>
Log an incoming or outgoing message to the audit trail
This method stores any DIDComm message for audit purposes, regardless of type.
§Arguments
- message - The DIDComm PlainMessage to log
- direction - Whether the message is incoming or outgoing
§Errors
Returns StorageError if:
- Database insertion fails
- The message already exists (duplicate message_id)
pub async fn get_message_by_id(
    &self,
    message_id: &str,
) -> Result<Option<Message>, StorageError>
pub async fn list_messages(
    &self,
    limit: u32,
    offset: u32,
    direction: Option<MessageDirection>,
) -> Result<Vec<Message>, StorageError>
pub async fn create_delivery(
    &self,
    message_id: &str,
    message_text: &str,
    recipient_did: &str,
    delivery_url: Option<&str>,
    delivery_type: DeliveryType,
) -> Result<i64, StorageError>
Create a new delivery record
§Arguments
- message_id - The ID of the message being delivered
- message_text - The full message text being delivered
- recipient_did - The DID of the recipient
- delivery_url - Optional URL where the message is being delivered
- delivery_type - The type of delivery (https, internal, return_path, pickup)
§Returns
- Ok(i64) - The ID of the created delivery record
- Err(StorageError) on database error
pub async fn update_delivery_status(
    &self,
    delivery_id: i64,
    status: DeliveryStatus,
    http_status_code: Option<i32>,
    error_message: Option<&str>,
) -> Result<(), StorageError>
Update delivery status
§Arguments
- delivery_id - The ID of the delivery record
- status - The new status (pending, success, failed)
- http_status_code - Optional HTTP status code from delivery attempt
- error_message - Optional error message if delivery failed
§Returns
- Ok(()) on success
- Err(StorageError) on database error
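One way a caller might derive the three update arguments from a delivery attempt is sketched below. Everything here is an assumption made for illustration: `SketchDeliveryStatus` is a stand-in for the crate's `DeliveryStatus`, and the rule that only 2xx codes count as success is a common convention, not something this page states.

```rust
/// Stand-in for the crate's DeliveryStatus (hypothetical variants).
#[derive(Debug, PartialEq)]
enum SketchDeliveryStatus {
    Pending,
    Success,
    Failed,
}

/// Hypothetical mapping from an attempt's outcome to
/// (status, http_status_code, error_message).
fn sketch_delivery_update(
    http_status_code: Option<i32>,
    transport_error: Option<&str>,
) -> (SketchDeliveryStatus, Option<i32>, Option<String>) {
    match (http_status_code, transport_error) {
        // A transport-level failure wins, whatever code was seen.
        (code, Some(err)) => (SketchDeliveryStatus::Failed, code, Some(err.to_string())),
        // Assumption: any 2xx response counts as delivered.
        (Some(code), None) if (200..300).contains(&code) => {
            (SketchDeliveryStatus::Success, Some(code), None)
        }
        // Any other response code is recorded as a failure.
        (Some(code), None) => (
            SketchDeliveryStatus::Failed,
            Some(code),
            Some(format!("HTTP {}", code)),
        ),
        // No attempt has produced a result yet.
        (None, None) => (SketchDeliveryStatus::Pending, None, None),
    }
}
```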
pub async fn increment_delivery_retry_count(
    &self,
    delivery_id: i64,
) -> Result<(), StorageError>
pub async fn get_delivery_by_id(
    &self,
    delivery_id: i64,
) -> Result<Option<Delivery>, StorageError>
pub async fn get_deliveries_for_message(
    &self,
    message_id: &str,
) -> Result<Vec<Delivery>, StorageError>
pub async fn get_pending_deliveries(
    &self,
    max_retry_count: i32,
    limit: u32,
) -> Result<Vec<Delivery>, StorageError>
pub async fn get_failed_deliveries_for_recipient(
    &self,
    recipient_did: &str,
    limit: u32,
    offset: u32,
) -> Result<Vec<Delivery>, StorageError>
pub async fn get_deliveries_by_recipient(
    &self,
    recipient_did: &str,
    limit: u32,
    offset: u32,
) -> Result<Vec<Delivery>, StorageError>
pub async fn get_deliveries_for_thread(
    &self,
    thread_id: &str,
    limit: u32,
    offset: u32,
) -> Result<Vec<Delivery>, StorageError>
Get all deliveries for messages in a specific thread
§Arguments
- thread_id - The thread ID to search for
- limit - Maximum number of deliveries to return
- offset - Number of deliveries to skip (for pagination)
§Returns
- Ok(Vec<Delivery>) - List of deliveries for messages in the thread
- Err(StorageError) on database error
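The limit and offset parameters lend themselves to a simple paging loop: request a page, advance the offset by the page size, and stop once a page comes back short. The sketch below models this over an in-memory slice of IDs. Both function names are hypothetical, and a real caller would await the storage method where `sketch_fetch_page` is called.

```rust
/// Hypothetical stand-in for one paged query: skip `offset` rows, return at most `limit`.
fn sketch_fetch_page(rows: &[i64], limit: u32, offset: u32) -> Vec<i64> {
    rows.iter()
        .skip(offset as usize)
        .take(limit as usize)
        .copied()
        .collect()
}

/// Walks every page until a short (or empty) page signals the end.
fn sketch_fetch_all(rows: &[i64], page_size: u32) -> Vec<i64> {
    let mut all = Vec::new();
    let mut offset = 0u32;
    loop {
        let page = sketch_fetch_page(rows, page_size, offset);
        let fetched = page.len() as u32;
        all.extend(page);
        // A page shorter than requested means there is nothing left;
        // the `fetched == 0` check also stops a zero-sized page from looping forever.
        if fetched == 0 || fetched < page_size {
            break;
        }
        offset += page_size;
    }
    all
}
```

When the row count is an exact multiple of the page size, the loop issues one extra query that returns an empty page before stopping.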
pub async fn create_received(
    &self,
    raw_message: &str,
    source_type: SourceType,
    source_identifier: Option<&str>,
) -> Result<i64, StorageError>
Create a new received message record
This records a raw incoming message (JWE, JWS, or plain JSON) before processing.
§Arguments
- raw_message - The raw message content as received
- source_type - The type of source (https, internal, websocket, etc.)
- source_identifier - Optional identifier for the source (URL, agent DID, etc.)
§Returns
- Ok(i64) - The ID of the created record
- Err(StorageError) on database error
pub async fn update_received_status(
    &self,
    received_id: i64,
    status: ReceivedStatus,
    processed_message_id: Option<&str>,
    error_message: Option<&str>,
) -> Result<(), StorageError>
Update the status of a received message
§Arguments
- received_id - The ID of the received record
- status - The new status (processed, failed)
- processed_message_id - Optional ID of the processed message in the messages table
- error_message - Optional error message if processing failed
§Returns
- Ok(()) on success
- Err(StorageError) on database error
pub async fn get_received_by_id(
    &self,
    received_id: i64,
) -> Result<Option<Received>, StorageError>
pub async fn get_pending_received(
    &self,
    limit: u32,
) -> Result<Vec<Received>, StorageError>
pub async fn list_received(
    &self,
    limit: u32,
    offset: u32,
    source_type: Option<SourceType>,
    status: Option<ReceivedStatus>,
) -> Result<Vec<Received>, StorageError>
List received messages with optional filtering
§Arguments
- limit - Maximum number of messages to return
- offset - Number of messages to skip (for pagination)
- source_type - Optional filter by source type
- status - Optional filter by status
§Returns
- Ok(Vec<Received>) - List of received messages
- Err(StorageError) on database error
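The optional filters can be read as "None means no restriction", applied before limit and offset. The in-memory sketch below illustrates that reading. All types and variant names here are hypothetical stand-ins loosely based on the values mentioned on this page, and the sketch keeps input order, whereas the crate's actual ordering is not documented here.

```rust
/// Hypothetical stand-ins for SourceType and ReceivedStatus.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SketchSourceType {
    Https,
    Internal,
    WebSocket,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum SketchReceivedStatus {
    Pending,
    Processed,
    Failed,
}

struct SketchReceived {
    id: i64,
    source_type: SketchSourceType,
    status: SketchReceivedStatus,
}

/// Sample rows used to exercise the filter logic.
fn sketch_sample_rows() -> Vec<SketchReceived> {
    use SketchReceivedStatus::*;
    use SketchSourceType::*;
    vec![
        SketchReceived { id: 1, source_type: Https, status: Pending },
        SketchReceived { id: 2, source_type: Internal, status: Processed },
        SketchReceived { id: 3, source_type: Https, status: Failed },
        SketchReceived { id: 4, source_type: WebSocket, status: Processed },
        SketchReceived { id: 5, source_type: Https, status: Processed },
    ]
}

/// Returns the IDs of matching rows; a `None` filter matches everything.
fn sketch_list_received(
    rows: &[SketchReceived],
    limit: u32,
    offset: u32,
    source_type: Option<SketchSourceType>,
    status: Option<SketchReceivedStatus>,
) -> Vec<i64> {
    rows.iter()
        .filter(|row| source_type.map_or(true, |wanted| row.source_type == wanted))
        .filter(|row| status.map_or(true, |wanted| row.status == wanted))
        // Pagination applies to the filtered rows, as with WHERE before LIMIT/OFFSET.
        .skip(offset as usize)
        .take(limit as usize)
        .map(|row| row.id)
        .collect()
}
```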
pub async fn upsert_customer(
    &self,
    customer: &Customer,
) -> Result<(), StorageError>
Create or update a customer record
pub async fn get_customer(
    &self,
    customer_id: &str,
) -> Result<Option<Customer>, StorageError>
Get a customer by ID
pub async fn get_customer_by_identifier(
    &self,
    identifier: &str,
) -> Result<Option<Customer>, StorageError>
Get a customer by identifier
pub async fn list_customers(
    &self,
    agent_did: &str,
    limit: u32,
    offset: u32,
) -> Result<Vec<Customer>, StorageError>
List customers for an agent
pub async fn add_customer_identifier(
    &self,
    identifier: &CustomerIdentifier,
) -> Result<(), StorageError>
Add an identifier to a customer
pub async fn get_customer_identifiers(
    &self,
    customer_id: &str,
) -> Result<Vec<CustomerIdentifier>, StorageError>
Get identifiers for a customer
pub async fn add_customer_relationship(
    &self,
    relationship: &CustomerRelationship,
) -> Result<(), StorageError>
Add a customer relationship
pub async fn get_customer_relationships(
    &self,
    customer_id: &str,
) -> Result<Vec<CustomerRelationship>, StorageError>
Get relationships for a customer
pub async fn search_customers(
    &self,
    agent_did: &str,
    query: &str,
    limit: u32,
) -> Result<Vec<Customer>, StorageError>
Search customers by name or identifier
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Storage
impl !RefUnwindSafe for Storage
impl Send for Storage
impl Sync for Storage
impl Unpin for Storage
impl !UnwindSafe for Storage
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.