pub struct TapNode { /* private fields */ }
Expand description
§The TAP Node
The TAP Node is the core component responsible for coordinating message processing, routing, and delivery to TAP Agents. It serves as a central hub for all TAP communications and transaction coordination.
§Core Responsibilities
- Agent Management: Registration and deregistration of TAP Agents
- PlainMessage Processing: Processing incoming and outgoing messages through middleware chains
- PlainMessage Routing: Determining the appropriate recipient for each message
- Event Publishing: Broadcasting node events to subscribers
- Scalability: Managing concurrent message processing through worker pools
§Lifecycle
- Create a node with appropriate configuration
- Register one or more agents with the node
- Start the processor pool (if high throughput is required)
- Process incoming/outgoing messages
- Publish and respond to events
§Thread Safety
The TapNode
is designed to be thread-safe and can be shared across multiple
threads using an Arc<TapNode>
. All internal mutability is handled through
appropriate synchronization primitives.
Implementations§
Source§impl TapNode
impl TapNode
Sourcepub fn new(config: NodeConfig) -> Self
pub fn new(config: NodeConfig) -> Self
Create a new TAP node with the given configuration
Sourcepub async fn init_storage(&mut self) -> Result<()>
pub async fn init_storage(&mut self) -> Result<()>
Initialize storage asynchronously
Sourcepub async fn start(&mut self, config: ProcessorPoolConfig) -> Result<()>
pub async fn start(&mut self, config: ProcessorPoolConfig) -> Result<()>
Start the node
Sourcepub async fn receive_message(&self, message: Value) -> Result<()>
pub async fn receive_message(&self, message: Value) -> Result<()>
Receive and process an incoming message
This method handles the complete lifecycle of an incoming message:
- Determining the message type (plain, signed, or encrypted)
- Verifying signatures or routing to agents for decryption
- Processing the resulting plain messages through the pipeline
- Routing and dispatching to the appropriate agents
§Parameters
message
- The message as a JSON Value (can be plain, JWS, or JWE)
§Returns
Ok(())
if the message was successfully processedErr(Error)
if there was an error during processing
Sourcepub async fn receive_message_from_source(
&self,
message: Value,
source_type: SourceType,
source_identifier: Option<&str>,
) -> Result<()>
pub async fn receive_message_from_source( &self, message: Value, source_type: SourceType, source_identifier: Option<&str>, ) -> Result<()>
Receive and process an incoming message with source information
This method handles the complete lifecycle of an incoming message with tracking of where the message came from.
§Parameters
message
- The message as a JSON Value (can be plain, JWS, or JWE)source_type
- The type of source (https, internal, websocket, etc.)source_identifier
- Optional identifier for the source (URL, agent DID, etc.)
§Returns
Ok(())
if the message was successfully processedErr(Error)
if there was an error during processing
Sourcepub async fn dispatch_message(
&self,
target_did: String,
message: PlainMessage,
) -> Result<()>
pub async fn dispatch_message( &self, target_did: String, message: PlainMessage, ) -> Result<()>
Dispatch a message to an agent by DID
Sourcepub async fn send_message(
&self,
sender_did: String,
message: PlainMessage,
) -> Result<String>
pub async fn send_message( &self, sender_did: String, message: PlainMessage, ) -> Result<String>
Send a message to an agent
This method now includes comprehensive delivery tracking and actual message delivery. For internal recipients (registered agents), messages are delivered directly. For external recipients, messages are delivered via HTTP with tracking.
Sourcepub async fn register_agent(&self, agent: Arc<TapAgent>) -> Result<()>
pub async fn register_agent(&self, agent: Arc<TapAgent>) -> Result<()>
Register a new agent with the node
This method registers an agent with the TAP Node and automatically initializes DID-specific storage for the agent. The storage directory structure follows:
~/.tap/{sanitized_did}/transactions.db
(default){tap_root}/{sanitized_did}/transactions.db
(if custom TAP root is configured)
§Storage Initialization
When an agent is registered, a dedicated SQLite database is created for that agent’s DID. This ensures transaction isolation between different agents while maintaining a consistent storage structure. If storage initialization fails, the agent registration continues but a warning is logged.
§Arguments
agent
- The TapAgent to register with the node
§Returns
Ok(())
if the agent was successfully registeredErr(Error)
if agent registration fails
Sourcepub async fn unregister_agent(&self, did: &str) -> Result<()>
pub async fn unregister_agent(&self, did: &str) -> Result<()>
Unregister an agent from the node
Sourcepub fn list_agents(&self) -> Vec<String>
pub fn list_agents(&self) -> Vec<String>
Get a list of registered agent DIDs
Sourcepub fn agents(&self) -> &Arc<AgentRegistry>
pub fn agents(&self) -> &Arc<AgentRegistry>
Get a reference to the agent registry
Sourcepub fn resolver(&self) -> &Arc<MultiResolver>
pub fn resolver(&self) -> &Arc<MultiResolver>
Get a reference to the resolver
Sourcepub fn processor_pool_mut(&mut self) -> &mut Option<ProcessorPool>
pub fn processor_pool_mut(&mut self) -> &mut Option<ProcessorPool>
Get a mutable reference to the processor pool
This is a reference to Option<ProcessorPool>
to allow starting the pool after node creation
Sourcepub fn config(&self) -> &NodeConfig
pub fn config(&self) -> &NodeConfig
Get the node configuration
Sourcepub fn agent_storage_manager(&self) -> Option<&Arc<AgentStorageManager>>
pub fn agent_storage_manager(&self) -> Option<&Arc<AgentStorageManager>>
Get a reference to the agent storage manager (if available)
Sourcepub async fn set_storage(&mut self, storage: Storage) -> Result<()>
pub async fn set_storage(&mut self, storage: Storage) -> Result<()>
Set storage for testing purposes This allows injecting in-memory databases for complete test isolation
Trait Implementations§
Auto Trait Implementations§
impl Freeze for TapNode
impl !RefUnwindSafe for TapNode
impl Send for TapNode
impl Sync for TapNode
impl Unpin for TapNode
impl !UnwindSafe for TapNode
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more