Struct Storage

Source
pub struct Storage { /* private fields */ }
Expand description

Storage backend for TAP transactions and message audit trail

This struct provides the main interface for storing and retrieving TAP data from a SQLite database. It maintains two core tables (other methods documented below also use auxiliary tables such as transaction_agents):

  • transactions: For Transfer and Payment messages requiring business logic
  • messages: For complete audit trail of all messages

It uses sqlx’s built-in connection pooling for efficient concurrent access and provides a native async API.

§Example

use tap_node::storage::{Storage, MessageDirection};
use std::path::PathBuf;

// Create storage with default path
let storage = Storage::new(None).await?;

// Create storage with DID-based path
let agent_did = "did:web:example.com";
let storage_with_did = Storage::new_with_did(agent_did, None).await?;

// Create storage with custom TAP root
let custom_root = PathBuf::from("/custom/tap/root");
let storage_custom = Storage::new_with_did(agent_did, Some(custom_root)).await?;

// Query transactions
let transactions = storage.list_transactions(10, 0).await?;

// Query audit trail
let all_messages = storage.list_messages(20, 0, None).await?;
let incoming_only = storage.list_messages(10, 0, Some(MessageDirection::Incoming)).await?;

Implementations§

Source§

impl Storage

Source

pub async fn new_with_did( agent_did: &str, tap_root: Option<PathBuf>, ) -> Result<Self, StorageError>

Create a new Storage instance with an agent DID

This will initialize a SQLite database in the TAP directory structure:

  • Default: ~/.tap/{did}/transactions.db
  • Custom root: {tap_root}/{did}/transactions.db
§Arguments
  • agent_did - The DID of the agent this storage is for
  • tap_root - Optional custom root directory (defaults to ~/.tap)
§Errors

Returns StorageError if:

  • Database initialization fails
  • Migrations fail to run
  • Connection pool cannot be created
Source

pub async fn new_in_memory() -> Result<Self, StorageError>

Create a new in-memory storage instance for testing. This provides complete isolation between tests with no file system dependencies.

Source

pub async fn new(path: Option<PathBuf>) -> Result<Self, StorageError>

Create a new Storage instance

This will initialize a SQLite database at the specified path (or default location), run any pending migrations, and set up a connection pool.

§Arguments
  • path - Optional path to the database file. If None, the TAP_NODE_DB_PATH environment variable is used if set; otherwise the path defaults to ./tap-node.db
§Errors

Returns StorageError if:

  • Database initialization fails
  • Migrations fail to run
  • Connection pool cannot be created
Source

pub fn db_path(&self) -> &Path

Get the database path

Source

pub fn default_logs_dir(tap_root: Option<PathBuf>) -> PathBuf

Get the default logs directory

Returns the default directory for log files:

  • Default: ~/.tap/logs
  • Custom root: {tap_root}/logs
§Arguments
  • tap_root - Optional custom root directory (defaults to ~/.tap)
Source

pub async fn update_message_status( &self, message_id: &str, status: &str, ) -> Result<(), StorageError>

Update the status of a message in the messages table

§Arguments
  • message_id - The ID of the message to update
  • status - The new status (accepted, rejected, pending)
§Errors

Returns StorageError if the database update fails

Source

pub async fn update_transaction_status( &self, transaction_id: &str, status: &str, ) -> Result<(), StorageError>

Update the status of a transaction in the transactions table

§Arguments
  • transaction_id - The reference ID of the transaction to update
  • status - The new status (pending, confirmed, failed, cancelled, reverted)
§Errors

Returns StorageError if the database update fails

Source

pub async fn get_transaction_by_id( &self, reference_id: &str, ) -> Result<Option<Transaction>, StorageError>

Get a transaction by its reference ID

§Arguments
  • reference_id - The reference ID of the transaction
§Returns
  • Ok(Some(Transaction)) if found
  • Ok(None) if not found
  • Err(StorageError) on database error
Source

pub async fn get_transaction_by_thread_id( &self, thread_id: &str, ) -> Result<Option<Transaction>, StorageError>

Get a transaction by thread ID

§Arguments
  • thread_id - The thread ID to search for
§Returns
  • Ok(Some(Transaction)) if found
  • Ok(None) if not found
  • Err(StorageError) on database error
Source

pub async fn is_agent_authorized_for_transaction( &self, transaction_id: &str, agent_did: &str, ) -> Result<bool, StorageError>

Check if an agent is authorized for a transaction

This checks the transaction_agents table to see if the given agent is associated with the transaction.

§Arguments
  • transaction_id - The reference ID of the transaction
  • agent_did - The DID of the agent to check
§Returns
  • Ok(true) if the agent is authorized
  • Ok(false) if the agent is not authorized or the transaction doesn’t exist
  • Err(StorageError) on database error
Source

pub async fn insert_transaction_agent( &self, transaction_id: &str, agent_did: &str, agent_role: &str, ) -> Result<(), StorageError>

Insert a transaction agent

§Arguments
  • transaction_id - The reference ID of the transaction
  • agent_did - The DID of the agent
  • agent_role - The role of the agent (sender, receiver, compliance, other)
§Returns
  • Ok(()) on success
  • Err(StorageError) on database error
Source

pub async fn update_transaction_agent_status( &self, transaction_id: &str, agent_did: &str, status: &str, ) -> Result<(), StorageError>

Update transaction agent status

§Arguments
  • transaction_id - The reference ID of the transaction
  • agent_did - The DID of the agent
  • status - The new status (pending, authorized, rejected, cancelled)
§Returns
  • Ok(()) on success
  • Err(StorageError) on database error
Source

pub async fn get_transaction_agents( &self, transaction_id: &str, ) -> Result<Vec<(String, String, String)>, StorageError>

Get all agents for a transaction

§Arguments
  • transaction_id - The reference ID of the transaction
§Returns
  • Ok(Vec<(agent_did, agent_role, status)>) on success
  • Err(StorageError) on database error
Source

pub async fn are_all_agents_authorized( &self, transaction_id: &str, ) -> Result<bool, StorageError>

Check if all agents have authorized the transaction

§Arguments
  • transaction_id - The reference ID of the transaction
§Returns
  • Ok(true) if all agents have authorized
  • Ok(false) if any agent hasn’t authorized or has rejected/cancelled
  • Err(StorageError) on database error
Source

pub async fn insert_transaction( &self, message: &PlainMessage, ) -> Result<(), StorageError>

Insert a new transaction from a TAP message

This method extracts transaction details from a Transfer or Payment message and stores them in the database with a ‘pending’ status.

§Arguments
  • message - The DIDComm PlainMessage containing a Transfer or Payment body
§Errors

Returns StorageError if:

  • The message is not a Transfer or Payment type
  • Database insertion fails
  • The transaction already exists (duplicate reference_id)
Source

pub async fn list_transactions( &self, limit: u32, offset: u32, ) -> Result<Vec<Transaction>, StorageError>

List transactions with pagination

Retrieves transactions ordered by creation time (newest first).

§Arguments
  • limit - Maximum number of transactions to return
  • offset - Number of transactions to skip (for pagination)
§Returns

A vector of transactions ordered by creation time descending

Source

pub async fn log_message( &self, message: &PlainMessage, direction: MessageDirection, ) -> Result<(), StorageError>

Log an incoming or outgoing message to the audit trail

This method stores any DIDComm message for audit purposes, regardless of type.

§Arguments
  • message - The DIDComm PlainMessage to log
  • direction - Whether the message is incoming or outgoing
§Errors

Returns StorageError if:

  • Database insertion fails
  • The message already exists (duplicate message_id)
Source

pub async fn get_message_by_id( &self, message_id: &str, ) -> Result<Option<Message>, StorageError>

Retrieve a message by its ID

§Arguments
  • message_id - The unique message ID
§Returns
  • Ok(Some(Message)) if found
  • Ok(None) if not found
  • Err(StorageError) on database error
Source

pub async fn list_messages( &self, limit: u32, offset: u32, direction: Option<MessageDirection>, ) -> Result<Vec<Message>, StorageError>

List messages with pagination and optional filtering

§Arguments
  • limit - Maximum number of messages to return
  • offset - Number of messages to skip (for pagination)
  • direction - Optional filter by message direction
§Returns

A vector of messages ordered by creation time descending

Source

pub async fn create_delivery( &self, message_id: &str, message_text: &str, recipient_did: &str, delivery_url: Option<&str>, delivery_type: DeliveryType, ) -> Result<i64, StorageError>

Create a new delivery record

§Arguments
  • message_id - The ID of the message being delivered
  • message_text - The full message text being delivered
  • recipient_did - The DID of the recipient
  • delivery_url - Optional URL where the message is being delivered
  • delivery_type - The type of delivery (https, internal, return_path, pickup)
§Returns
  • Ok(i64) - The ID of the created delivery record
  • Err(StorageError) on database error
Source

pub async fn update_delivery_status( &self, delivery_id: i64, status: DeliveryStatus, http_status_code: Option<i32>, error_message: Option<&str>, ) -> Result<(), StorageError>

Update delivery status

§Arguments
  • delivery_id - The ID of the delivery record
  • status - The new status (pending, success, failed)
  • http_status_code - Optional HTTP status code from delivery attempt
  • error_message - Optional error message if delivery failed
§Returns
  • Ok(()) on success
  • Err(StorageError) on database error
Source

pub async fn increment_delivery_retry_count( &self, delivery_id: i64, ) -> Result<(), StorageError>

Increment retry count for a delivery

§Arguments
  • delivery_id - The ID of the delivery record
§Returns
  • Ok(()) on success
  • Err(StorageError) on database error
Source

pub async fn get_delivery_by_id( &self, delivery_id: i64, ) -> Result<Option<Delivery>, StorageError>

Get delivery record by ID

§Arguments
  • delivery_id - The ID of the delivery record
§Returns
  • Ok(Some(Delivery)) if found
  • Ok(None) if not found
  • Err(StorageError) on database error
Source

pub async fn get_deliveries_for_message( &self, message_id: &str, ) -> Result<Vec<Delivery>, StorageError>

Get all deliveries for a message

§Arguments
  • message_id - The ID of the message
§Returns
  • Ok(Vec<Delivery>) - List of deliveries for the message
  • Err(StorageError) on database error
Source

pub async fn get_pending_deliveries( &self, max_retry_count: i32, limit: u32, ) -> Result<Vec<Delivery>, StorageError>

Get pending deliveries for retry processing

§Arguments
  • max_retry_count - Maximum retry count to include
  • limit - Maximum number of deliveries to return
§Returns
  • Ok(Vec<Delivery>) - List of pending deliveries
  • Err(StorageError) on database error
Source

pub async fn get_failed_deliveries_for_recipient( &self, recipient_did: &str, limit: u32, offset: u32, ) -> Result<Vec<Delivery>, StorageError>

Get failed deliveries for a specific recipient

§Arguments
  • recipient_did - The DID of the recipient
  • limit - Maximum number of deliveries to return
  • offset - Number of deliveries to skip (for pagination)
§Returns
  • Ok(Vec<Delivery>) - List of failed deliveries
  • Err(StorageError) on database error
Source

pub async fn get_deliveries_by_recipient( &self, recipient_did: &str, limit: u32, offset: u32, ) -> Result<Vec<Delivery>, StorageError>

Get all deliveries for a specific recipient

§Arguments
  • recipient_did - The DID of the recipient
  • limit - Maximum number of deliveries to return
  • offset - Number of deliveries to skip (for pagination)
§Returns
  • Ok(Vec<Delivery>) - List of deliveries
  • Err(StorageError) on database error
Source

pub async fn get_deliveries_for_thread( &self, thread_id: &str, limit: u32, offset: u32, ) -> Result<Vec<Delivery>, StorageError>

Get all deliveries for messages in a specific thread

§Arguments
  • thread_id - The thread ID to search for
  • limit - Maximum number of deliveries to return
  • offset - Number of deliveries to skip (for pagination)
§Returns
  • Ok(Vec<Delivery>) - List of deliveries for messages in the thread
  • Err(StorageError) on database error
Source

pub async fn create_received( &self, raw_message: &str, source_type: SourceType, source_identifier: Option<&str>, ) -> Result<i64, StorageError>

Create a new received message record

This records a raw incoming message (JWE, JWS, or plain JSON) before processing.

§Arguments
  • raw_message - The raw message content as received
  • source_type - The type of source (https, internal, websocket, etc.)
  • source_identifier - Optional identifier for the source (URL, agent DID, etc.)
§Returns
  • Ok(i64) - The ID of the created record
  • Err(StorageError) on database error
Source

pub async fn update_received_status( &self, received_id: i64, status: ReceivedStatus, processed_message_id: Option<&str>, error_message: Option<&str>, ) -> Result<(), StorageError>

Update the status of a received message

§Arguments
  • received_id - The ID of the received record
  • status - The new status (processed, failed)
  • processed_message_id - Optional ID of the processed message in the messages table
  • error_message - Optional error message if processing failed
§Returns
  • Ok(()) on success
  • Err(StorageError) on database error
Source

pub async fn get_received_by_id( &self, received_id: i64, ) -> Result<Option<Received>, StorageError>

Get a received message by ID

§Arguments
  • received_id - The ID of the received record
§Returns
  • Ok(Some(Received)) if found
  • Ok(None) if not found
  • Err(StorageError) on database error
Source

pub async fn get_pending_received( &self, limit: u32, ) -> Result<Vec<Received>, StorageError>

Get pending received messages for processing

§Arguments
  • limit - Maximum number of messages to return
§Returns
  • Ok(Vec<Received>) - List of pending received messages
  • Err(StorageError) on database error
Source

pub async fn list_received( &self, limit: u32, offset: u32, source_type: Option<SourceType>, status: Option<ReceivedStatus>, ) -> Result<Vec<Received>, StorageError>

List received messages with optional filtering

§Arguments
  • limit - Maximum number of messages to return
  • offset - Number of messages to skip (for pagination)
  • source_type - Optional filter by source type
  • status - Optional filter by status
§Returns
  • Ok(Vec<Received>) - List of received messages
  • Err(StorageError) on database error
Source

pub async fn upsert_customer( &self, customer: &Customer, ) -> Result<(), StorageError>

Create or update a customer record

Source

pub async fn get_customer( &self, customer_id: &str, ) -> Result<Option<Customer>, StorageError>

Get a customer by ID

Source

pub async fn get_customer_by_identifier( &self, identifier: &str, ) -> Result<Option<Customer>, StorageError>

Get a customer by identifier

Source

pub async fn list_customers( &self, agent_did: &str, limit: u32, offset: u32, ) -> Result<Vec<Customer>, StorageError>

List customers for an agent

Source

pub async fn add_customer_identifier( &self, identifier: &CustomerIdentifier, ) -> Result<(), StorageError>

Add an identifier to a customer

Source

pub async fn get_customer_identifiers( &self, customer_id: &str, ) -> Result<Vec<CustomerIdentifier>, StorageError>

Get identifiers for a customer

Source

pub async fn add_customer_relationship( &self, relationship: &CustomerRelationship, ) -> Result<(), StorageError>

Add a customer relationship

Source

pub async fn get_customer_relationships( &self, customer_id: &str, ) -> Result<Vec<CustomerRelationship>, StorageError>

Get relationships for a customer

Source

pub async fn search_customers( &self, agent_did: &str, query: &str, limit: u32, ) -> Result<Vec<Customer>, StorageError>

Search customers by name or identifier

Trait Implementations§

Source§

impl Clone for Storage

Source§

fn clone(&self) -> Storage

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for Storage

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,