pub mod agent;
#[cfg(feature = "storage")]
pub mod customer;
pub mod error;
pub mod event;
pub mod message;
#[cfg(feature = "storage")]
pub mod state_machine;
pub mod storage;
#[cfg(feature = "storage")]
pub mod validation;
pub use error::{Error, Result};
pub use event::logger::{EventLogger, EventLoggerConfig, LogDestination};
pub use event::{EventSubscriber, NodeEvent};
pub use message::sender::{
HttpPlainMessageSender, HttpPlainMessageSenderWithTracking, NodePlainMessageSender,
PlainMessageSender, WebSocketPlainMessageSender,
};
#[cfg(feature = "storage")]
pub use storage::{
models::{DeliveryStatus, DeliveryType},
Storage,
};
use std::sync::Arc;
use tap_agent::{Agent, TapAgent};
use tap_msg::didcomm::PlainMessage;
use crate::message::processor::PlainMessageProcessor;
use crate::message::{
CompositePlainMessageProcessor, CompositePlainMessageRouter, PlainMessageProcessorType,
PlainMessageRouterType,
};
use agent::AgentRegistry;
use event::EventBus;
use tap_agent::did::MultiResolver;
use async_trait::async_trait;
/// Extension trait that adds a message-serialization helper to agent types.
///
/// In this file it is implemented for `TapAgent` and used by
/// `TapNode::dispatch_message`.
#[async_trait]
pub trait TapAgentExt {
/// Serializes `message` and returns the resulting string.
///
/// # Errors
///
/// Implementations report failures through the crate-level `Result`.
async fn send_serialized_message(&self, message: &PlainMessage) -> Result<String>;
}
#[async_trait]
impl TapAgentExt for TapAgent {
async fn send_serialized_message(&self, message: &PlainMessage) -> Result<String> {
let json_value =
serde_json::to_value(message).map_err(|e| Error::Serialization(e.to_string()))?;
let serialized =
serde_json::to_string(&json_value).map_err(|e| Error::Serialization(e.to_string()))?;
Ok(serialized)
}
}
#[derive(Debug, Clone, Default)]
pub struct NodeConfig {
pub debug: bool,
pub max_agents: Option<usize>,
pub enable_message_logging: bool,
pub log_message_content: bool,
pub processor_pool: Option<ProcessorPoolConfig>,
pub event_logger: Option<EventLoggerConfig>,
#[cfg(feature = "storage")]
pub storage_path: Option<std::path::PathBuf>,
#[cfg(feature = "storage")]
pub agent_did: Option<String>,
#[cfg(feature = "storage")]
pub tap_root: Option<std::path::PathBuf>,
pub decision_mode: state_machine::fsm::DecisionMode,
}
/// A node that owns a registry of agents and moves messages between them and
/// external recipients.
///
/// The struct derives `Clone`; the registry, event bus, resolver and the
/// storage handles are held behind `Arc`.
#[derive(Clone)]
pub struct TapNode {
// Registry of the agents hosted by this node.
agents: Arc<AgentRegistry>,
// Bus on which node events are published and to which handlers subscribe.
event_bus: Arc<EventBus>,
// Processor chain applied in `process_plain_message` before delivery.
incoming_processor: CompositePlainMessageProcessor,
// Processor chain applied in `send_message` before packing.
outgoing_processor: CompositePlainMessageProcessor,
// Router consulted when no recipient in `to` accepted an incoming message.
router: CompositePlainMessageRouter,
// Resolver handed to `verify_jws` for signed incoming messages.
resolver: Arc<MultiResolver>,
// Set by `start`; `None` until then.
processor_pool: Option<ProcessorPool>,
// Configuration the node was created with.
config: NodeConfig,
// Node-level storage; `None` until `init_storage` or `set_storage` runs.
#[cfg(feature = "storage")]
storage: Option<Arc<storage::Storage>>,
// Per-agent storage manager, created in `new` from `config.tap_root`.
#[cfg(feature = "storage")]
agent_storage_manager: Option<Arc<storage::AgentStorageManager>>,
// Transaction state processor; `None` until `init_storage` or `set_storage` runs.
#[cfg(feature = "storage")]
state_processor: Option<Arc<state_machine::StandardTransactionProcessor>>,
}
impl TapNode {
/// Builds a node from `config`.
///
/// The incoming and outgoing processor chains both consist of the logging,
/// validation, trust-ping and default stages, in that order. The router
/// contains only the default router. With the `storage` feature the
/// per-agent storage manager is created here, while node-level storage and
/// the state processor stay unset until `init_storage` / `set_storage`.
///
/// # Panics
///
/// When `config.event_logger` is set, the logger is subscribed by blocking
/// on the current Tokio runtime (`block_in_place` + `Handle::current`).
/// Per the Tokio documentation this panics outside a runtime and on a
/// current-thread runtime.
pub fn new(config: NodeConfig) -> Self {
    let agents = Arc::new(AgentRegistry::new(config.max_agents));
    let event_bus = Arc::new(EventBus::new());
    let router = CompositePlainMessageRouter::new(vec![PlainMessageRouterType::Default(
        DefaultPlainMessageRouter::new(),
    )]);

    // One stage list, cloned for the incoming chain and moved into the outgoing one.
    let stages = vec![
        PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor),
        PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor),
        PlainMessageProcessorType::TrustPing(TrustPingProcessor::with_event_bus(
            event_bus.clone(),
        )),
        PlainMessageProcessorType::Default(DefaultPlainMessageProcessor),
    ];
    let incoming_processor = CompositePlainMessageProcessor::new(stages.clone());
    let outgoing_processor = CompositePlainMessageProcessor::new(stages);

    let node = Self {
        agents,
        event_bus,
        incoming_processor,
        outgoing_processor,
        router,
        resolver: Arc::new(MultiResolver::default()),
        processor_pool: None,
        #[cfg(feature = "storage")]
        storage: None,
        #[cfg(feature = "storage")]
        agent_storage_manager: Some(Arc::new(storage::AgentStorageManager::new(
            config.tap_root.clone(),
        ))),
        #[cfg(feature = "storage")]
        state_processor: None,
        // Moved last so the fields above can still read from `config`.
        config,
    };

    // `new` is synchronous, so the async subscription is driven to completion
    // on the ambient runtime before the node is returned.
    if let Some(logger_config) = node.config.event_logger.as_ref() {
        let logger = Arc::new(EventLogger::new(logger_config.clone()));
        let bus = node.event_bus.clone();
        tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                bus.subscribe(logger).await;
            })
        });
    }

    node
}
/// Opens the node-level storage described by the configuration and attaches it.
///
/// * If `config.agent_did` is set, storage is opened with
///   `Storage::new_with_did(agent_did, config.tap_root)`.
/// * Otherwise it is opened with `Storage::new(config.storage_path)`; a
///   missing path is passed through as `None`.
///
/// The opened storage is then attached through [`TapNode::set_storage`],
/// which subscribes the storage-backed event handlers and installs the
/// transaction state processor, so both entry points share one wiring path.
///
/// # Errors
///
/// Returns `Error::Storage` (after logging) if the storage cannot be opened,
/// or whatever `set_storage` returns.
#[cfg(feature = "storage")]
pub async fn init_storage(&mut self) -> Result<()> {
    let opened = match &self.config.agent_did {
        Some(agent_did) => {
            storage::Storage::new_with_did(agent_did, self.config.tap_root.clone())
                .await
                .map_err(|e| {
                    log::error!("Failed to initialize storage with DID: {}", e);
                    Error::Storage(e.to_string())
                })
        }
        None => storage::Storage::new(self.config.storage_path.clone())
            .await
            .map_err(|e| {
                log::error!("Failed to initialize storage: {}", e);
                Error::Storage(e.to_string())
            }),
    };
    let storage = opened?;
    self.set_storage(storage).await
}
/// Creates a processor pool from `config` and stores it on the node,
/// replacing any pool set earlier. Always returns `Ok(())`.
pub async fn start(&mut self, config: ProcessorPoolConfig) -> Result<()> {
    self.processor_pool = Some(ProcessorPool::new(config));
    Ok(())
}
/// Handles an incoming JSON message, tagging it as coming from an internal
/// source with no source identifier.
///
/// Equivalent to calling `receive_message_from_source` with
/// `SourceType::Internal` and `None`.
pub async fn receive_message(&self, message: serde_json::Value) -> Result<()> {
    let source = storage::SourceType::Internal;
    self.receive_message_from_source(message, source, None).await
}
/// Handles an incoming JSON message that may be signed, encrypted or plain.
///
/// The message is classified from its top-level keys:
/// * signed: has `payload` and either `signatures` or `signature`;
/// * encrypted: has both `protected` and `recipients`;
/// * otherwise it is parsed as a `PlainMessage`.
///
/// The signed check runs first, so a message matching both shapes is treated
/// as signed.
///
/// With the `storage` feature, a "received" record holding the raw JSON is
/// created in the storage of each registered agent the message concerns, and
/// each record is marked `Processed` or `Failed` once handling finishes.
/// Storage failures are logged and never fail the call.
///
/// # Parameters
///
/// * `message` - the raw message as JSON.
/// * `source_type` / `source_identifier` - stored with each received record.
///
/// # Errors
///
/// * `Error::Serialization` if the message cannot be parsed as the detected kind.
/// * `Error::Verification` if signature verification fails.
/// * `Error::Processing` if no registered agent could handle an encrypted message.
/// * Any error returned by `process_plain_message`.
///
/// Parse and verification errors return before any received record is created.
pub async fn receive_message_from_source(
&self,
message: serde_json::Value,
source_type: storage::SourceType,
source_identifier: Option<&str>,
) -> Result<()> {
// Raw text kept for the received records; `None` if re-serialization fails.
let raw_message = serde_json::to_string(&message).ok();
// (agent DID, received-record id) pairs whose status is updated at the end.
#[cfg(feature = "storage")]
let mut received_ids: Vec<(String, i64)> = Vec::new();
use tap_agent::{verify_jws, Jwe, Jws};
let is_encrypted =
message.get("protected").is_some() && message.get("recipients").is_some();
let is_signed = message.get("payload").is_some()
&& (message.get("signatures").is_some() || message.get("signature").is_some());
// Creates one received record in `agent_did`'s storage. Returns the record
// id, or `None` (after logging a warning) if storage is unavailable or the
// insert fails.
#[cfg(feature = "storage")]
async fn store_received_for_agent(
storage_manager: &storage::AgentStorageManager,
agent_did: &str,
raw_message: &str,
source_type: &storage::SourceType,
source_identifier: Option<&str>,
) -> Option<i64> {
match storage_manager.get_agent_storage(agent_did).await {
Ok(agent_storage) => {
match agent_storage
.create_received(raw_message, source_type.clone(), source_identifier)
.await
{
Ok(id) => {
log::debug!(
"Created received record {} for agent {} for incoming message",
id,
agent_did
);
Some(id)
}
Err(e) => {
log::warn!(
"Failed to create received record for agent {}: {}",
agent_did,
e
);
None
}
}
}
Err(e) => {
log::warn!("Failed to get storage for agent {}: {}", agent_did, e);
None
}
}
}
if is_signed {
// Signed path: parse, verify the signature, then handle the recovered plain message.
let jws: Jws = serde_json::from_value(message)
.map_err(|e| Error::Serialization(format!("Failed to parse JWS: {}", e)))?;
let plain_message = verify_jws(&jws, &*self.resolver)
.await
.map_err(|e| Error::Verification(format!("JWS verification failed: {}", e)))?;
#[cfg(feature = "storage")]
{
if let Some(ref storage_manager) = self.agent_storage_manager {
// "{}" stands in for the raw text when it could not be serialized.
let empty_msg = "{}".to_string();
let raw_msg = raw_message.as_ref().unwrap_or(&empty_msg);
// Record the message for every recipient that is registered on this node.
for recipient_did in &plain_message.to {
if self.agents.has_agent(recipient_did) {
if let Some(id) = store_received_for_agent(
storage_manager,
recipient_did,
raw_msg,
&source_type,
source_identifier,
)
.await
{
received_ids.push((recipient_did.clone(), id));
}
}
}
// Nothing recorded for a recipient: fall back to the sender if it is registered.
if received_ids.is_empty() && self.agents.has_agent(&plain_message.from) {
if let Some(id) = store_received_for_agent(
storage_manager,
&plain_message.from,
raw_msg,
&source_type,
source_identifier,
)
.await
{
received_ids.push((plain_message.from.clone(), id));
}
}
}
}
let result = self.process_plain_message(plain_message).await;
#[cfg(feature = "storage")]
{
// Mirror the processing outcome onto every received record created above.
if let Some(ref storage_manager) = self.agent_storage_manager {
let status = if result.is_ok() {
storage::ReceivedStatus::Processed
} else {
storage::ReceivedStatus::Failed
};
let error_msg = result.as_ref().err().map(|e| e.to_string());
for (agent_did, received_id) in &received_ids {
if let Ok(agent_storage) =
storage_manager.get_agent_storage(agent_did).await
{
// Best effort: a failed status update is deliberately ignored.
let _ = agent_storage
.update_received_status(
*received_id,
status.clone(),
None, error_msg.as_deref(),
)
.await;
}
}
}
}
result
} else if is_encrypted {
// Encrypted path: the node does not decrypt; the envelope is handed to each
// registered agent named in the recipient headers.
let jwe: Jwe = serde_json::from_value(message.clone())
.map_err(|e| Error::Serialization(format!("Failed to parse JWE: {}", e)))?;
#[cfg(feature = "storage")]
{
if let Some(ref storage_manager) = self.agent_storage_manager {
let empty_msg = "{}".to_string();
let raw_msg = raw_message.as_ref().unwrap_or(&empty_msg);
for recipient in &jwe.recipients {
// The agent DID is taken as the part of the key id before the first '#'.
if let Some(did) = recipient.header.kid.split('#').next() {
if self.agents.has_agent(did) {
if let Some(id) = store_received_for_agent(
storage_manager,
did,
raw_msg,
&source_type,
source_identifier,
)
.await
{
received_ids.push((did.to_string(), id));
}
}
}
}
}
}
// Set once at least one agent accepts the encrypted message.
let mut processed = false;
for recipient in &jwe.recipients {
if let Some(did) = recipient.header.kid.split('#').next() {
if let Ok(agent) = self.agents.get_agent(did).await {
match agent.receive_encrypted_message(&message).await {
Ok(_) => {
processed = true;
log::debug!(
"Agent {} successfully processed encrypted message",
did
);
}
Err(e) => {
// A single agent failing is not fatal; the other recipients are still tried.
log::debug!(
"Agent {} couldn't process encrypted message: {}",
did,
e
);
}
}
}
}
}
let result = if !processed {
Err(Error::Processing(
"No agent could process the encrypted message".to_string(),
))
} else {
Ok(())
};
#[cfg(feature = "storage")]
{
// Mirror the overall outcome onto every received record created above.
if let Some(ref storage_manager) = self.agent_storage_manager {
let status = if result.is_ok() {
storage::ReceivedStatus::Processed
} else {
storage::ReceivedStatus::Failed
};
let error_msg = result.as_ref().err().map(|e| e.to_string());
for (agent_did, received_id) in &received_ids {
if let Ok(agent_storage) =
storage_manager.get_agent_storage(agent_did).await
{
let _ = agent_storage
.update_received_status(
*received_id,
status.clone(),
None, error_msg.as_deref(),
)
.await;
}
}
}
}
result
} else {
// Plain path: same bookkeeping as the signed path, without verification.
let plain_message: PlainMessage = serde_json::from_value(message).map_err(|e| {
Error::Serialization(format!("Failed to parse PlainMessage: {}", e))
})?;
#[cfg(feature = "storage")]
{
if let Some(ref storage_manager) = self.agent_storage_manager {
let empty_msg = "{}".to_string();
let raw_msg = raw_message.as_ref().unwrap_or(&empty_msg);
for recipient_did in &plain_message.to {
if self.agents.has_agent(recipient_did) {
if let Some(id) = store_received_for_agent(
storage_manager,
recipient_did,
raw_msg,
&source_type,
source_identifier,
)
.await
{
received_ids.push((recipient_did.clone(), id));
}
}
}
// Nothing recorded for a recipient: fall back to the sender if it is registered.
if received_ids.is_empty() && self.agents.has_agent(&plain_message.from) {
if let Some(id) = store_received_for_agent(
storage_manager,
&plain_message.from,
raw_msg,
&source_type,
source_identifier,
)
.await
{
received_ids.push((plain_message.from.clone(), id));
}
}
}
}
let result = self.process_plain_message(plain_message).await;
#[cfg(feature = "storage")]
{
if let Some(ref storage_manager) = self.agent_storage_manager {
let status = if result.is_ok() {
storage::ReceivedStatus::Processed
} else {
storage::ReceivedStatus::Failed
};
let error_msg = result.as_ref().err().map(|e| e.to_string());
for (agent_did, received_id) in &received_ids {
if let Ok(agent_storage) =
storage_manager.get_agent_storage(agent_did).await
{
let _ = agent_storage
.update_received_status(
*received_id,
status.clone(),
None, error_msg.as_deref(),
)
.await;
}
}
}
}
result
}
}
/// Validates, records and delivers an already-unpacked plain message.
///
/// Steps, in order (the first three only with the `storage` feature):
/// 1. If node-level storage is attached, run the standard validator. A
///    rejection publishes a "message rejected" event and ends the call with
///    `Error::Validation`; acceptance publishes a "message accepted" event.
/// 2. If a state processor is attached, let it process the message; its
///    errors are only logged.
/// 3. If the agent storage manager is present, log the message (and, for
///    transaction-type messages, insert a transaction) into agent storage.
/// 4. Run the incoming processor chain; if it yields `None` the call ends
///    with `Ok(())`.
/// 5. Hand the message to every recipient in `to` that is a registered
///    agent, keeping a delivery record per attempt when storage is enabled.
/// 6. If no recipient accepted it, ask the router for a target and try that
///    agent instead.
///
/// # Errors
///
/// Returns `Error::Validation` on rejection, or an error from the incoming
/// processor chain. Storage, delivery and routing failures are logged and do
/// not fail the call.
async fn process_plain_message(&self, message: PlainMessage) -> Result<()> {
// Step 1: validation (only when node-level storage is attached).
#[cfg(feature = "storage")]
{
if let Some(ref storage) = self.storage {
// A fresh validator is built for every message.
let validator_config = validation::StandardValidatorConfig {
max_timestamp_drift_secs: 60,
storage: storage.clone(),
};
let validator = validation::create_standard_validator(validator_config).await;
use crate::validation::{MessageValidator, ValidationResult};
match validator.validate(&message).await {
ValidationResult::Accept => {
// Only the first recipient (or an empty string) is reported in the event.
self.event_bus
.publish_message_accepted(
message.id.clone(),
message.type_.clone(),
message.from.clone(),
message.to.first().cloned().unwrap_or_default(),
)
.await;
}
ValidationResult::Reject(reason) => {
self.event_bus
.publish_message_rejected(
message.id.clone(),
reason.clone(),
message.from.clone(),
message.to.first().cloned().unwrap_or_default(),
)
.await;
return Err(Error::Validation(reason));
}
}
}
}
// Step 2: transaction state machine; failures are logged and processing continues.
#[cfg(feature = "storage")]
{
if let Some(ref state_processor) = self.state_processor {
use crate::state_machine::TransactionStateProcessor;
if let Err(e) = state_processor.process_message(&message).await {
log::warn!("State processor error: {}", e);
}
}
}
// Step 3: persist the message in per-agent storage.
#[cfg(feature = "storage")]
{
if let Some(ref storage_manager) = self.agent_storage_manager {
// A message counts as a transaction when its type contains "transfer" or
// "payment", compared case-insensitively.
let message_type_lower = message.type_.to_lowercase();
let is_transaction = message_type_lower.contains("transfer")
|| message_type_lower.contains("payment");
log::debug!(
"Message type: {}, is_transaction: {}",
message.type_,
is_transaction
);
if is_transaction {
// Transactions are stored for every agent returned by `extract_transaction_agents`.
let involved_agents = self.extract_transaction_agents(&message);
if involved_agents.is_empty() {
log::warn!("No registered agents found for transaction: {}", message.id);
} else {
log::debug!(
"Storing transaction {} in {} agent databases",
message.id,
involved_agents.len()
);
for agent_did in &involved_agents {
if let Ok(agent_storage) =
storage_manager.get_agent_storage(agent_did).await
{
match agent_storage
.log_message(&message, storage::MessageDirection::Incoming)
.await
{
Ok(_) => log::debug!(
"Logged incoming message to agent {}: {}",
agent_did,
message.id
),
Err(e) => log::warn!(
"Failed to log incoming message for agent {}: {}",
agent_did,
e
),
}
match agent_storage.insert_transaction(&message).await {
Ok(()) => {
log::debug!(
"Stored transaction for agent {}: {}",
agent_did,
message.id
);
// Read the stored row back so the event carries the transaction record.
if let Ok(Some(tx)) =
agent_storage.get_transaction_by_id(&message.id).await
{
self.event_bus
.publish_event(NodeEvent::TransactionCreated {
transaction: tx,
agent_did: agent_did.clone(),
})
.await;
}
}
Err(e) => log::warn!(
"Failed to store transaction for agent {}: {}",
agent_did,
e
),
}
} else {
log::warn!("Failed to get storage for agent: {}", agent_did);
}
}
}
} else {
// Non-transaction messages are logged for each registered recipient.
let mut logged_to_any = false;
for recipient_did in &message.to {
if self.agents.has_agent(recipient_did) {
if let Ok(agent_storage) =
storage_manager.get_agent_storage(recipient_did).await
{
match agent_storage
.log_message(&message, storage::MessageDirection::Incoming)
.await
{
Ok(_) => {
log::debug!(
"Logged incoming message to recipient {}: {}",
recipient_did,
message.id
);
logged_to_any = true;
}
Err(e) => log::warn!(
"Failed to log incoming message for recipient {}: {}",
recipient_did,
e
),
}
} else {
log::warn!(
"Failed to get storage for recipient: {}",
recipient_did
);
}
}
}
// No recipient log succeeded: fall back to the agent chosen by
// `determine_message_agent`, and finally to node-level storage.
if !logged_to_any {
match self.determine_message_agent(&message) {
Ok(agent_did) => {
if let Ok(agent_storage) =
storage_manager.get_agent_storage(&agent_did).await
{
match agent_storage
.log_message(
&message,
storage::MessageDirection::Incoming,
)
.await
{
Ok(_) => log::debug!(
"Logged incoming message to fallback agent {}: {}",
agent_did,
message.id
),
Err(e) => log::warn!(
"Failed to log incoming message for fallback agent {}: {}",
agent_did,
e
),
}
} else {
log::warn!(
"Failed to get storage for fallback agent: {}",
agent_did
);
}
}
Err(e) => {
log::warn!(
"Failed to determine fallback agent for message storage: {}",
e
);
if let Some(ref storage) = self.storage {
// Best effort: the result of this last-resort log is ignored.
let _ = storage
.log_message(&message, storage::MessageDirection::Incoming)
.await;
}
}
}
}
}
}
}
// Step 4: incoming processor chain; `None` means the message was dropped.
let processed_message = match self.incoming_processor.process_incoming(message).await? {
Some(msg) => msg,
None => return Ok(()), };
// Step 5: direct delivery to each registered recipient.
let mut delivery_success = false;
for recipient_did in &processed_message.to {
match self.agents.get_agent(recipient_did).await {
Ok(agent) => {
// Delivery record in the recipient's storage; `None` if it could not be created.
#[cfg(feature = "storage")]
let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager
{
if let Ok(agent_storage) =
storage_manager.get_agent_storage(recipient_did).await
{
let message_text = serde_json::to_string(&processed_message)
.unwrap_or_else(|_| "Failed to serialize message".to_string());
match agent_storage
.create_delivery(
&processed_message.id,
&message_text,
recipient_did,
None, storage::models::DeliveryType::Internal,
)
.await
{
Ok(id) => {
log::debug!(
"Created internal delivery record {} for message {} to {}",
id,
processed_message.id,
recipient_did
);
Some(id)
}
Err(e) => {
log::warn!("Failed to create internal delivery record: {}", e);
None
}
}
} else {
None
}
} else {
None
};
match agent.receive_plain_message(processed_message.clone()).await {
Ok(_) => {
log::debug!(
"Successfully delivered message to agent: {}",
recipient_did
);
delivery_success = true;
#[cfg(feature = "storage")]
if let (Some(delivery_id), Some(ref storage_manager)) =
(delivery_id, &self.agent_storage_manager)
{
if let Ok(agent_storage) =
storage_manager.get_agent_storage(recipient_did).await
{
if let Err(e) = agent_storage
.update_delivery_status(
delivery_id,
storage::models::DeliveryStatus::Success,
None, None, )
.await
{
log::warn!("Failed to update internal delivery record to success: {}", e);
}
}
}
}
Err(e) => {
// The agent rejected the message: record the failure and try the next recipient.
log::warn!("Agent {} failed to process message: {}", recipient_did, e);
#[cfg(feature = "storage")]
if let (Some(delivery_id), Some(ref storage_manager)) =
(delivery_id, &self.agent_storage_manager)
{
if let Ok(agent_storage) =
storage_manager.get_agent_storage(recipient_did).await
{
if let Err(e2) = agent_storage
.update_delivery_status(
delivery_id,
storage::models::DeliveryStatus::Failed,
None, Some(&e.to_string()), )
.await
{
log::warn!("Failed to update internal delivery record to failed: {}", e2);
}
}
}
}
}
}
Err(e) => {
// The recipient is not hosted on this node; skipped here.
log::debug!(
"No registered agent found for recipient {}: {}",
recipient_did,
e
);
}
}
}
// Step 6: nobody accepted the message, so ask the router for a target agent.
if !delivery_success {
let target_did = match self.router.route_message(&processed_message).await {
Ok(did) => did,
Err(e) => {
// Routing failure is logged and treated as a non-error outcome.
log::warn!("Unable to route message and no recipients processed: {}", e);
return Ok(());
}
};
let agent = match self.agents.get_agent(&target_did).await {
Ok(a) => a,
Err(e) => {
log::warn!("Failed to get agent for dispatch: {}", e);
return Ok(());
}
};
// Delivery record in the routed target's storage; `None` if it could not be created.
#[cfg(feature = "storage")]
let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager {
if let Ok(agent_storage) = storage_manager.get_agent_storage(&target_did).await {
let message_text = serde_json::to_string(&processed_message)
.unwrap_or_else(|_| "Failed to serialize message".to_string());
match agent_storage
.create_delivery(
&processed_message.id,
&message_text,
&target_did,
None, storage::models::DeliveryType::Internal,
)
.await
{
Ok(id) => {
log::debug!(
"Created internal delivery record {} for routed message {} to {}",
id,
processed_message.id,
target_did
);
Some(id)
}
Err(e) => {
log::warn!(
"Failed to create internal delivery record for routing: {}",
e
);
None
}
}
} else {
None
}
} else {
None
};
match agent.receive_plain_message(processed_message).await {
Ok(_) => {
log::debug!("Successfully routed message to agent: {}", target_did);
#[cfg(feature = "storage")]
if let (Some(delivery_id), Some(ref storage_manager)) =
(delivery_id, &self.agent_storage_manager)
{
if let Ok(agent_storage) =
storage_manager.get_agent_storage(&target_did).await
{
if let Err(e) = agent_storage
.update_delivery_status(
delivery_id,
storage::models::DeliveryStatus::Success,
None, None, )
.await
{
log::warn!(
"Failed to update routed delivery record to success: {}",
e
);
}
}
}
}
Err(e) => {
// The routed agent's failure is logged and recorded, but not returned to the caller.
log::warn!("Agent failed to process message: {}", e);
#[cfg(feature = "storage")]
if let (Some(delivery_id), Some(ref storage_manager)) =
(delivery_id, &self.agent_storage_manager)
{
if let Ok(agent_storage) =
storage_manager.get_agent_storage(&target_did).await
{
if let Err(e2) = agent_storage
.update_delivery_status(
delivery_id,
storage::models::DeliveryStatus::Failed,
None, Some(&e.to_string()), )
.await
{
log::warn!(
"Failed to update routed delivery record to failed: {}",
e2
);
}
}
}
}
}
}
Ok(())
}
/// Serializes `message` through the agent registered as `target_did` and
/// publishes the resulting bytes on the event bus as an agent message.
///
/// # Errors
///
/// Fails if the agent is not found in the registry or serialization fails.
pub async fn dispatch_message(&self, target_did: String, message: PlainMessage) -> Result<()> {
    let recipient = self.agents.get_agent(&target_did).await?;
    let payload = recipient
        .send_serialized_message(&message)
        .await?
        .into_bytes();
    self.event_bus
        .publish_agent_message(target_did, payload)
        .await;
    Ok(())
}
pub async fn send_message(&self, sender_did: String, message: PlainMessage) -> Result<String> {
#[cfg(feature = "storage")]
{
if let Some(ref storage_manager) = self.agent_storage_manager {
let message_type_lower = message.type_.to_lowercase();
let is_transaction = message_type_lower.contains("transfer")
|| message_type_lower.contains("payment");
log::debug!(
"Message type: {}, is_transaction: {}",
message.type_,
is_transaction
);
if is_transaction {
let involved_agents = self.extract_transaction_agents(&message);
if involved_agents.is_empty() {
log::warn!(
"No registered agents found for outgoing transaction: {}",
message.id
);
} else {
log::debug!(
"Storing outgoing transaction {} in {} agent databases",
message.id,
involved_agents.len()
);
for agent_did in &involved_agents {
if let Ok(agent_storage) =
storage_manager.get_agent_storage(agent_did).await
{
match agent_storage
.log_message(&message, storage::MessageDirection::Outgoing)
.await
{
Ok(_) => log::debug!(
"Logged outgoing message to agent {}: {}",
agent_did,
message.id
),
Err(e) => log::warn!(
"Failed to log outgoing message for agent {}: {}",
agent_did,
e
),
}
match agent_storage.insert_transaction(&message).await {
Ok(()) => {
log::debug!(
"Stored outgoing transaction for agent {}: {}",
agent_did,
message.id
);
if let Ok(Some(tx)) =
agent_storage.get_transaction_by_id(&message.id).await
{
self.event_bus
.publish_event(NodeEvent::TransactionCreated {
transaction: tx,
agent_did: agent_did.clone(),
})
.await;
}
}
Err(e) => log::warn!(
"Failed to store outgoing transaction for agent {}: {}",
agent_did,
e
),
}
} else {
log::warn!("Failed to get storage for agent: {}", agent_did);
}
}
}
} else {
if let Ok(sender_storage) = storage_manager.get_agent_storage(&sender_did).await
{
match sender_storage
.log_message(&message, storage::MessageDirection::Outgoing)
.await
{
Ok(_) => log::debug!(
"Logged outgoing message for agent {}: {}",
sender_did,
message.id
),
Err(e) => log::warn!(
"Failed to log outgoing message for agent {}: {}",
sender_did,
e
),
}
} else {
log::warn!("Failed to get storage for sender agent: {}", sender_did);
if let Some(ref storage) = self.storage {
let _ = storage
.log_message(&message, storage::MessageDirection::Outgoing)
.await;
}
}
}
}
}
let processed_message = match self.outgoing_processor.process_outgoing(message).await? {
Some(msg) => msg,
None => {
return Err(Error::MessageDropped(
"PlainMessage dropped during processing".to_string(),
))
}
};
let agent = self.agents.get_agent(&sender_did).await?;
let key_manager = agent.key_manager();
use tap_agent::message::SecurityMode;
let security_mode = if processed_message.type_.contains("credential")
|| processed_message.type_.contains("transfer")
|| processed_message.type_.contains("payment")
{
SecurityMode::AuthCrypt
} else {
SecurityMode::Signed
};
let sender_kid = agent.get_signing_kid().await?;
let recipient_kid =
if processed_message.to.len() == 1 && security_mode == SecurityMode::AuthCrypt {
Some(agent.get_encryption_kid(&processed_message.to[0]).await?)
} else {
None
};
use tap_agent::message_packing::{PackOptions, Packable};
let pack_options = PackOptions {
security_mode,
sender_kid: Some(sender_kid),
recipient_kid,
};
let packed = processed_message.pack(&**key_manager, pack_options).await?;
let mut delivery_errors = Vec::new();
for recipient_did in &processed_message.to {
log::debug!("Processing delivery to recipient: {}", recipient_did);
let is_internal_recipient = self.agents.get_agent(recipient_did).await.is_ok();
if is_internal_recipient {
log::debug!("Delivering message internally to agent: {}", recipient_did);
#[cfg(feature = "storage")]
let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager {
if let Ok(sender_storage) = storage_manager.get_agent_storage(&sender_did).await
{
match sender_storage
.create_delivery(
&processed_message.id,
&packed, recipient_did,
None, storage::models::DeliveryType::Internal,
)
.await
{
Ok(id) => {
log::debug!(
"Created internal delivery record {} for message {} to {}",
id,
processed_message.id,
recipient_did
);
Some(id)
}
Err(e) => {
log::warn!("Failed to create internal delivery record: {}", e);
None
}
}
} else {
None
}
} else {
None
};
let message_value = match serde_json::from_str::<serde_json::Value>(&packed) {
Ok(val) => val,
Err(e) => {
log::error!("Failed to parse packed message as JSON: {}", e);
continue;
}
};
match self
.receive_message_from_source(
message_value,
storage::SourceType::Internal,
Some(&sender_did),
)
.await
{
Ok(_) => {
log::debug!(
"Successfully delivered message internally to: {}",
recipient_did
);
#[cfg(feature = "storage")]
if let (Some(delivery_id), Some(ref storage_manager)) =
(delivery_id, &self.agent_storage_manager)
{
if let Ok(sender_storage) =
storage_manager.get_agent_storage(&sender_did).await
{
if let Err(e) = sender_storage
.update_delivery_status(
delivery_id,
storage::models::DeliveryStatus::Success,
None, None, )
.await
{
log::warn!("Failed to update internal delivery status: {}", e);
}
}
}
}
Err(e) => {
log::error!(
"Failed to deliver message internally to {}: {}",
recipient_did,
e
);
#[cfg(feature = "storage")]
if let (Some(delivery_id), Some(ref storage_manager)) =
(delivery_id, &self.agent_storage_manager)
{
if let Ok(sender_storage) =
storage_manager.get_agent_storage(&sender_did).await
{
if let Err(e2) = sender_storage
.update_delivery_status(
delivery_id,
storage::models::DeliveryStatus::Failed,
None, Some(&format!("Internal delivery failed: {}", e)),
)
.await
{
log::warn!("Failed to update internal delivery status: {}", e2);
}
}
}
delivery_errors.push((recipient_did.clone(), e));
continue; }
}
} else {
log::debug!("Attempting external delivery to: {}", recipient_did);
let sender_agent = self.agents.get_agent(&sender_did).await?;
let endpoint = match sender_agent.get_service_endpoint(recipient_did).await? {
Some(ep) => ep,
None => {
log::warn!(
"No service endpoint found for {}, delivery failed",
recipient_did
);
#[cfg(feature = "storage")]
if let Some(ref storage_manager) = self.agent_storage_manager {
if let Ok(sender_storage) =
storage_manager.get_agent_storage(&sender_did).await
{
if let Ok(delivery_id) = sender_storage
.create_delivery(
&processed_message.id,
&packed,
recipient_did,
None,
storage::models::DeliveryType::Https,
)
.await
{
let _ = sender_storage
.update_delivery_status(
delivery_id,
storage::models::DeliveryStatus::Failed,
None,
Some("No service endpoint found for recipient"),
)
.await;
}
}
}
delivery_errors.push((
recipient_did.clone(),
Error::Dispatch(format!(
"No service endpoint found for recipient: {}",
recipient_did
)),
));
continue; }
};
#[cfg(feature = "storage")]
let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager {
if let Ok(sender_storage) = storage_manager.get_agent_storage(&sender_did).await
{
match sender_storage
.create_delivery(
&processed_message.id,
&packed, recipient_did,
Some(&endpoint),
storage::models::DeliveryType::Https,
)
.await
{
Ok(id) => {
log::debug!(
"Created external delivery record {} for message {} to {} at {}",
id,
processed_message.id,
recipient_did,
endpoint
);
Some(id)
}
Err(e) => {
log::warn!("Failed to create external delivery record: {}", e);
None
}
}
} else {
None
}
} else {
None
};
match sender_agent.send_to_endpoint(&packed, &endpoint).await {
Ok(status_code) => {
log::debug!(
"Successfully delivered message {} to {} at {} (HTTP {})",
processed_message.id,
recipient_did,
endpoint,
status_code
);
#[cfg(feature = "storage")]
if let (Some(delivery_id), Some(ref storage_manager)) =
(delivery_id, &self.agent_storage_manager)
{
if let Ok(sender_storage) =
storage_manager.get_agent_storage(&sender_did).await
{
if let Err(e) = sender_storage
.update_delivery_status(
delivery_id,
storage::models::DeliveryStatus::Success,
Some(status_code as i32),
None,
)
.await
{
log::warn!(
"Failed to update external delivery status to success: {}",
e
);
}
}
}
}
Err(e) => {
log::error!(
"Failed to deliver message {} to {} at {}: {}",
processed_message.id,
recipient_did,
endpoint,
e
);
#[cfg(feature = "storage")]
if let (Some(delivery_id), Some(ref storage_manager)) =
(delivery_id, &self.agent_storage_manager)
{
if let Ok(sender_storage) =
storage_manager.get_agent_storage(&sender_did).await
{
let error_msg = e.to_string();
let http_status_code = if error_msg.contains("status:") {
error_msg
.split("status:")
.nth(1)
.and_then(|s| s.split_whitespace().next())
.and_then(|s| s.parse::<i32>().ok())
} else {
None
};
if let Err(e2) = sender_storage
.update_delivery_status(
delivery_id,
storage::models::DeliveryStatus::Failed,
http_status_code,
Some(&error_msg),
)
.await
{
log::warn!(
"Failed to update external delivery status to failed: {}",
e2
);
}
if let Err(e2) = sender_storage
.increment_delivery_retry_count(delivery_id)
.await
{
log::warn!("Failed to increment retry count: {}", e2);
}
}
}
delivery_errors.push((
recipient_did.clone(),
Error::Dispatch(format!(
"HTTP delivery failed for {}: {}",
recipient_did, e
)),
));
continue; }
}
}
}
if !delivery_errors.is_empty() && delivery_errors.len() == processed_message.to.len() {
return Err(Error::Dispatch(format!(
"Failed to deliver message to all recipients: {:?}",
delivery_errors
)));
}
if !delivery_errors.is_empty() {
log::warn!(
"Message delivered to {}/{} recipients. Failures: {:?}",
processed_message.to.len() - delivery_errors.len(),
processed_message.to.len(),
delivery_errors
);
}
self.event_bus
.publish_agent_message(sender_did, packed.clone().into_bytes())
.await;
Ok(packed)
}
/// Registers `agent` with the node under the DID it reports.
///
/// With the `storage` feature, the agent's storage is ensured first and a
/// customer event handler backed by that storage is subscribed to the event
/// bus. A storage failure is only logged; registration still proceeds.
/// Afterwards the agent is added to the registry and an "agent registered"
/// event is published.
///
/// # Errors
///
/// Returns the registry's error if the agent cannot be added.
pub async fn register_agent(&self, agent: Arc<TapAgent>) -> Result<()> {
    let agent_did = agent.get_agent_did().to_string();

    #[cfg(feature = "storage")]
    {
        if let Some(ref storage_manager) = self.agent_storage_manager {
            match storage_manager.ensure_agent_storage(&agent_did).await {
                Err(e) => {
                    log::warn!(
                        "Failed to initialize storage for agent {}: {}",
                        agent_did,
                        e
                    );
                }
                Ok(_) => {
                    log::info!("Initialized storage for agent: {}", agent_did);
                    if let Ok(agent_storage) =
                        storage_manager.get_agent_storage(&agent_did).await
                    {
                        let handler =
                            Arc::new(event::customer_handler::CustomerEventHandler::new(
                                agent_storage,
                                agent_did.clone(),
                            ));
                        self.event_bus.subscribe(handler).await;
                        log::debug!(
                            "Registered customer event handler for agent: {}",
                            agent_did
                        );
                    }
                }
            }
        }
    }

    self.agents.register_agent(agent_did.clone(), agent).await?;
    self.event_bus.publish_agent_registered(agent_did).await;
    Ok(())
}
/// Removes the agent registered as `did` and publishes an
/// "agent unregistered" event.
///
/// # Errors
///
/// Returns the registry's error if removal fails; no event is published then.
pub async fn unregister_agent(&self, did: &str) -> Result<()> {
    self.agents.unregister_agent(did).await?;
    let removed_did = did.to_string();
    self.event_bus.publish_agent_unregistered(removed_did).await;
    Ok(())
}
/// Returns the DIDs reported by the agent registry.
pub fn list_agents(&self) -> Vec<String> {
    let registry = &self.agents;
    registry.get_all_dids()
}
pub fn agents(&self) -> &Arc<AgentRegistry> {
&self.agents
}
pub fn event_bus(&self) -> &Arc<EventBus> {
&self.event_bus
}
pub fn resolver(&self) -> &Arc<MultiResolver> {
&self.resolver
}
pub fn processor_pool_mut(&mut self) -> &mut Option<ProcessorPool> {
&mut self.processor_pool
}
pub fn config(&self) -> &NodeConfig {
&self.config
}
/// Replaces the decision mode stored in the node's configuration.
///
/// The new value is read the next time storage is attached through
/// `init_storage` / `set_storage`; a state processor that was already
/// created received a clone of the previous value.
///
/// Gated on the `storage` feature because the `state_machine` module that
/// defines `DecisionMode` is only compiled with that feature.
#[cfg(feature = "storage")]
pub fn set_decision_mode(&mut self, mode: state_machine::fsm::DecisionMode) {
    self.config.decision_mode = mode;
}
/// Borrows the node-level storage, if it has been attached.
#[cfg(feature = "storage")]
pub fn storage(&self) -> Option<&Arc<storage::Storage>> {
    Option::as_ref(&self.storage)
}
/// Borrows the per-agent storage manager, if present.
#[cfg(feature = "storage")]
pub fn agent_storage_manager(&self) -> Option<&Arc<storage::AgentStorageManager>> {
    Option::as_ref(&self.agent_storage_manager)
}
/// Attaches `storage` as the node-level storage.
///
/// Subscribes, in this order, a message-status handler, a transaction-state
/// handler (both backed by the storage) and a transaction-audit handler to
/// the event bus, then creates the transaction state processor from the
/// storage, the event bus, the agent registry and the configured decision
/// mode. Finally the storage and processor are stored on the node. Always
/// returns `Ok(())`.
///
/// Each call subscribes a fresh set of handlers; this function does not
/// remove previously subscribed ones.
#[cfg(feature = "storage")]
pub async fn set_storage(&mut self, storage: storage::Storage) -> Result<()> {
    use event::handlers::{
        MessageStatusHandler, TransactionAuditHandler, TransactionStateHandler,
    };

    let shared = Arc::new(storage);

    let status_handler = Arc::new(MessageStatusHandler::new(shared.clone()));
    self.event_bus.subscribe(status_handler).await;

    let state_handler = Arc::new(TransactionStateHandler::new(shared.clone()));
    self.event_bus.subscribe(state_handler).await;

    let audit_handler = Arc::new(TransactionAuditHandler::new());
    self.event_bus.subscribe(audit_handler).await;

    let processor = state_machine::StandardTransactionProcessor::new(
        shared.clone(),
        self.event_bus.clone(),
        self.agents.clone(),
        self.config.decision_mode.clone(),
    );

    self.storage = Some(shared);
    self.state_processor = Some(Arc::new(processor));
    Ok(())
}
/// Chooses the DID under which `message` should be stored.
///
/// Candidates are tried in this order, and the first match wins:
/// 1. the first entry of `message.to` that is a registered agent,
/// 2. `message.from`, if it is a registered agent,
/// 3. the first entry of `message.to`, registered or not.
///
/// # Errors
///
/// Returns `Error::Storage` when none of the above applies, i.e. the sender
/// is not registered and `message.to` is empty.
#[cfg(feature = "storage")]
fn determine_message_agent(&self, message: &PlainMessage) -> Result<String> {
    let sender = &message.from;

    message
        .to
        .iter()
        .find(|&recipient| self.agents.has_agent(recipient))
        // Each fallback is evaluated lazily, only if the previous step found nothing.
        .or_else(|| Some(sender).filter(|&did| self.agents.has_agent(did)))
        .or_else(|| message.to.first())
        .cloned()
        .ok_or_else(|| Error::Storage("Cannot determine agent for message storage".to_string()))
}
/// Collects the registered agents that take part in the transaction
/// described by `message`.
///
/// Candidate DIDs are gathered from the sender, every recipient and, based on
/// a case-insensitive substring match on the message type:
/// - type contains `"transfer"`: the body is parsed as a `Transfer` and its
///   originator, beneficiary and listed agents are added;
/// - otherwise, type contains `"payment"`: the body is parsed as a `Payment`
///   and its merchant, customer and listed agents are added.
///
/// A body that fails to parse is logged as a warning and contributes nothing.
/// Candidates are de-duplicated and then reduced to those the agent registry
/// reports as registered. The candidates live in a `HashSet`, so the order of
/// the returned DIDs is unspecified.
#[cfg(feature = "storage")]
fn extract_transaction_agents(&self, message: &PlainMessage) -> Vec<String> {
    use std::collections::HashSet;

    log::debug!("Extracting transaction agents for message: {}", message.id);

    let mut candidates: HashSet<String> = HashSet::new();

    // Envelope participants: sender first, then every recipient.
    candidates.insert(message.from.clone());
    log::debug!("Added sender: {}", message.from);
    message.to.iter().for_each(|recipient| {
        candidates.insert(recipient.clone());
        log::debug!("Added recipient: {}", recipient);
    });

    let kind = message.type_.to_lowercase();
    log::debug!("Message type: {}", kind);

    // "transfer" takes precedence: a type containing both words is treated
    // as a transfer only.
    let is_transfer = kind.contains("transfer");
    let is_payment = !is_transfer && kind.contains("payment");

    if is_transfer {
        match serde_json::from_value::<tap_msg::message::Transfer>(message.body.clone()) {
            Ok(transfer) => {
                if let Some(originator) = transfer.originator.as_ref() {
                    candidates.insert(originator.id.clone());
                    log::debug!("Added originator: {}", originator.id);
                }
                if let Some(beneficiary) = transfer.beneficiary.as_ref() {
                    candidates.insert(beneficiary.id.clone());
                    log::debug!("Added beneficiary: {}", beneficiary.id);
                }
                transfer.agents.iter().for_each(|participant| {
                    candidates.insert(participant.id.clone());
                    log::debug!("Added agent: {}", participant.id);
                });
            }
            Err(_) => {
                log::warn!("Failed to parse Transfer message body");
            }
        }
    } else if is_payment {
        match serde_json::from_value::<tap_msg::message::Payment>(message.body.clone()) {
            Ok(payment) => {
                candidates.insert(payment.merchant.id.clone());
                log::debug!("Added merchant: {}", payment.merchant.id);
                if let Some(customer) = payment.customer.as_ref() {
                    candidates.insert(customer.id.clone());
                    log::debug!("Added customer: {}", customer.id);
                }
                payment.agents.iter().for_each(|participant| {
                    candidates.insert(participant.id.clone());
                    log::debug!("Added agent: {}", participant.id);
                });
            }
            Err(_) => {
                log::warn!("Failed to parse Payment message body");
            }
        }
    }

    log::debug!("Total unique agents found: {}", candidates.len());

    // Keep only the DIDs this node actually hosts.
    let mut registered_agents = Vec::new();
    for did in candidates {
        let is_registered = self.agents.has_agent(&did);
        log::debug!("Agent {} registered: {}", did, is_registered);
        if is_registered {
            registered_agents.push(did);
        }
    }

    log::debug!(
        "Registered agents involved in transaction: {:?}",
        registered_agents
    );
    registered_agents
}
}
use message::processor::DefaultPlainMessageProcessor;
use message::processor::LoggingPlainMessageProcessor;
use message::processor::ValidationPlainMessageProcessor;
use message::processor_pool::{ProcessorPool, ProcessorPoolConfig};
use message::router::DefaultPlainMessageRouter;
use message::trust_ping_processor::TrustPingProcessor;
use message::RouterAsyncExt;