use super::schema;
use super::{default_limit, error_text_response, success_text_response, ToolHandler};
use crate::error::{Error, Result};
use crate::mcp::protocol::{CallToolResult, Tool};
use crate::tap_integration::TapIntegration;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tap_msg::message::TapMessage;
use tap_node::customer::CustomerManager;
use tap_node::storage::models::{Customer, SchemaType};
use tracing::{debug, error};
/// Tool handler that lists the customers stored for an agent.
pub struct ListCustomersTool {
// Shared integration handle; used to resolve the storage of the requested agent.
tap_integration: Arc<TapIntegration>,
}
/// Arguments deserialized from the JSON payload of a customer-listing call.
#[derive(Debug, Deserialize)]
struct ListCustomersParams {
/// Identifier of the agent whose storage and customers are queried.
agent_did: String,
/// Maximum number of entries returned; filled by `default_limit` when omitted.
#[serde(default = "default_limit")]
limit: u32,
/// Number of entries skipped before the page starts; 0 when omitted.
#[serde(default)]
offset: u32,
}
/// JSON payload returned by the customer-listing tool.
#[derive(Debug, Serialize)]
struct ListCustomersResponse {
/// The requested page of customers.
customers: Vec<CustomerInfo>,
/// Number of customers collected before pagination was applied.
total: usize,
}
/// One customer entry in a customer-listing response.
#[derive(Debug, Serialize)]
struct CustomerInfo {
/// Customer identifier; serialized under the key `@id`.
#[serde(rename = "@id")]
id: String,
/// Profile fields and name/address columns of the customer, keyed by field name.
metadata: HashMap<String, serde_json::Value>,
/// Length of `transaction_ids`.
transaction_count: usize,
/// Reference IDs of stored transfers that mention this customer.
transaction_ids: Vec<String>,
}
impl ListCustomersTool {
pub fn new(tap_integration: Arc<TapIntegration>) -> Self {
Self { tap_integration }
}
fn tap_integration(&self) -> &TapIntegration {
&self.tap_integration
}
}
#[async_trait::async_trait]
impl ToolHandler for ListCustomersTool {
async fn handle(&self, arguments: Option<Value>) -> Result<CallToolResult> {
let params: ListCustomersParams = match arguments {
Some(args) => serde_json::from_value(args)
.map_err(|e| Error::invalid_parameter(format!("Invalid parameters: {}", e)))?,
None => {
return Ok(error_text_response(
"Missing required parameters".to_string(),
))
}
};
debug!(
"Listing customers for agent {} with limit={}, offset={}",
params.agent_did, params.limit, params.offset
);
let storage = match self
.tap_integration()
.storage_for_agent(¶ms.agent_did)
.await
{
Ok(storage) => storage,
Err(e) => {
error!(
"Failed to get storage for agent {}: {}",
params.agent_did, e
);
return Ok(error_text_response(format!(
"Failed to get storage for agent {}: {}",
params.agent_did, e
)));
}
};
let all_customers = match storage.list_customers(¶ms.agent_did, 1000, 0).await {
Ok(customers) => customers,
Err(e) => {
error!("Failed to list customers: {}", e);
return Ok(error_text_response(format!(
"Failed to list customers: {}",
e
)));
}
};
let mut customers: Vec<CustomerInfo> = Vec::new();
for customer in all_customers {
let mut metadata = HashMap::new();
if let Some(profile) = customer.profile.as_object() {
for (key, value) in profile {
if key != "@context" && key != "@type" && key != "identifier" {
metadata.insert(key.clone(), value.clone());
}
}
}
if let Some(given_name) = &customer.given_name {
metadata.insert(
"givenName".to_string(),
serde_json::Value::String(given_name.clone()),
);
}
if let Some(family_name) = &customer.family_name {
metadata.insert(
"familyName".to_string(),
serde_json::Value::String(family_name.clone()),
);
}
if let Some(display_name) = &customer.display_name {
metadata.insert(
"name".to_string(),
serde_json::Value::String(display_name.clone()),
);
}
if let Some(country) = &customer.address_country {
metadata.insert(
"addressCountry".to_string(),
serde_json::Value::String(country.clone()),
);
}
if let Some(locality) = &customer.address_locality {
metadata.insert(
"addressLocality".to_string(),
serde_json::Value::String(locality.clone()),
);
}
if let Some(postal_code) = &customer.postal_code {
metadata.insert(
"postalCode".to_string(),
serde_json::Value::String(postal_code.clone()),
);
}
let mut transaction_ids = Vec::new();
if let Ok(transactions) = storage.list_transactions(1000, 0).await {
for transaction in transactions {
if let Ok(tap_message) =
serde_json::from_value::<TapMessage>(transaction.message_json.clone())
{
let mut is_involved = false;
if let TapMessage::Transfer(ref transfer) = tap_message {
if let Some(originator) = &transfer.originator {
if originator.id == customer.id {
is_involved = true;
}
}
if let Some(ref beneficiary) = transfer.beneficiary {
if beneficiary.id == customer.id {
is_involved = true;
}
}
for agent in &transfer.agents {
if agent.for_parties().contains(&customer.id) {
is_involved = true;
}
}
}
if is_involved {
transaction_ids.push(transaction.reference_id);
}
}
}
}
customers.push(CustomerInfo {
id: customer.id,
metadata,
transaction_count: transaction_ids.len(),
transaction_ids,
});
}
let total = customers.len();
customers.sort_by(|a, b| a.id.cmp(&b.id));
let paginated_customers: Vec<CustomerInfo> = customers
.into_iter()
.skip(params.offset as usize)
.take(params.limit as usize)
.collect();
let response = ListCustomersResponse {
customers: paginated_customers,
total,
};
let response_json = serde_json::to_string_pretty(&response)
.map_err(|e| Error::tool_execution(format!("Failed to serialize response: {}", e)))?;
Ok(success_text_response(response_json))
}
fn get_definition(&self) -> Tool {
Tool {
name: "tap_list_customers".to_string(),
description: "Lists customers (parties) that a specific agent acts on behalf of. Includes metadata about each party and transaction history.".to_string(),
input_schema: schema::list_customers_schema(),
}
}
}
/// Tool handler that lists the counterparties of a party across stored transfers.
pub struct ListConnectionsTool {
// Shared integration handle; used to enumerate agents and resolve their storage.
tap_integration: Arc<TapIntegration>,
}
/// Arguments deserialized from the JSON payload of a connection-listing call.
#[derive(Debug, Deserialize)]
struct ListConnectionsParams {
/// Identifier of the party whose counterparties are collected.
party_id: String,
/// Maximum number of entries returned; filled by `default_limit` when omitted.
#[serde(default = "default_limit")]
limit: u32,
/// Number of entries skipped before the page starts; 0 when omitted.
#[serde(default)]
offset: u32,
}
/// JSON payload returned by the connection-listing tool.
#[derive(Debug, Serialize)]
struct ListConnectionsResponse {
/// The requested page of counterparties.
connections: Vec<ConnectionInfo>,
/// Number of distinct counterparties found before pagination was applied.
total: usize,
}
/// One counterparty entry in a connection-listing response.
#[derive(Debug, Serialize)]
struct ConnectionInfo {
/// Counterparty identifier; serialized under the key `@id`.
#[serde(rename = "@id")]
id: String,
/// Metadata copied from the counterparty's originator/beneficiary entries in transfers.
metadata: HashMap<String, serde_json::Value>,
/// Number of transfers recorded for this counterparty.
transaction_count: usize,
/// Reference IDs of the transfers recorded for this counterparty.
transaction_ids: Vec<String>,
/// Roles ("originator", "beneficiary") the counterparty held in those transfers.
roles: Vec<String>, }
impl ListConnectionsTool {
pub fn new(tap_integration: Arc<TapIntegration>) -> Self {
Self { tap_integration }
}
fn tap_integration(&self) -> &TapIntegration {
&self.tap_integration
}
}
#[async_trait::async_trait]
impl ToolHandler for ListConnectionsTool {
    /// Handles a `tap_list_connections` call.
    ///
    /// Walks the stored transactions of every agent returned by
    /// `list_agents`, and for each transfer that involves `party_id` (as
    /// originator, as beneficiary, or as a party an agent acts for) records
    /// the other parties as connections. Connections are sorted by ID and the
    /// requested page is returned as pretty-printed JSON; `total` counts
    /// connections before paging.
    ///
    /// Agents whose storage or transactions cannot be read are skipped with a
    /// debug log entry.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the arguments fail to deserialize or the response
    /// cannot be serialized. Missing arguments and a failing agent listing
    /// are reported as `Ok` carrying an error text response.
    async fn handle(&self, arguments: Option<Value>) -> Result<CallToolResult> {
        let params: ListConnectionsParams = match arguments {
            Some(args) => serde_json::from_value(args)
                .map_err(|e| Error::invalid_parameter(format!("Invalid parameters: {}", e)))?,
            None => {
                return Ok(error_text_response(
                    "Missing required parameters".to_string(),
                ))
            }
        };

        debug!(
            "Listing connections for party {} with limit={}, offset={}",
            params.party_id, params.limit, params.offset
        );

        let agent_infos = match self.tap_integration().list_agents().await {
            Ok(agents) => agents,
            Err(e) => {
                error!("Failed to list agents: {}", e);
                return Ok(error_text_response(format!("Failed to list agents: {}", e)));
            }
        };

        let mut connections: HashMap<String, ConnectionInfo> = HashMap::new();
        // (counterparty ID, transaction reference ID) pairs already counted.
        // Every agent's storage is scanned, so a transfer may be encountered
        // more than once; each pair must contribute to the counters only once.
        let mut counted: HashSet<(String, String)> = HashSet::new();

        for agent_info in agent_infos {
            let storage = match self
                .tap_integration()
                .storage_for_agent(&agent_info.id)
                .await
            {
                Ok(storage) => storage,
                Err(e) => {
                    debug!("Failed to get storage for agent {}: {}", agent_info.id, e);
                    continue;
                }
            };

            let transactions = match storage.list_transactions(1000, 0).await {
                Ok(transactions) => transactions,
                Err(e) => {
                    debug!(
                        "Failed to get transactions for agent {}: {}",
                        agent_info.id, e
                    );
                    continue;
                }
            };

            for transaction in transactions {
                // Only transfers are considered; other or undecodable messages
                // are skipped.
                let transfer = match serde_json::from_value::<TapMessage>(transaction.message_json)
                {
                    Ok(TapMessage::Transfer(transfer)) => transfer,
                    _ => continue,
                };
                let reference_id = transaction.reference_id;

                let mut party_is_involved = false;
                let mut counterparties: HashSet<String> = HashSet::new();

                // Party is the originator: the beneficiary is its counterparty.
                if let Some(originator) = &transfer.originator {
                    if originator.id == params.party_id {
                        party_is_involved = true;
                        if let Some(beneficiary) = &transfer.beneficiary {
                            counterparties.insert(beneficiary.id.clone());
                        }
                    }
                }

                // Party is the beneficiary: the originator is its counterparty.
                if let Some(beneficiary) = &transfer.beneficiary {
                    if beneficiary.id == params.party_id {
                        party_is_involved = true;
                        if let Some(originator) = &transfer.originator {
                            counterparties.insert(originator.id.clone());
                        }
                    }
                }

                // Party is represented by an agent: every party represented by
                // the other agents is a counterparty.
                for agent in &transfer.agents {
                    if !agent.for_parties().contains(&params.party_id) {
                        continue;
                    }
                    party_is_involved = true;
                    for other_agent in &transfer.agents {
                        if other_agent.id == agent.id {
                            continue;
                        }
                        for other_party in other_agent.for_parties() {
                            if other_party != &params.party_id {
                                counterparties.insert(other_party.clone());
                            }
                        }
                    }
                }

                if !party_is_involved {
                    continue;
                }

                for counterparty_id in counterparties {
                    let connection = connections
                        .entry(counterparty_id.clone())
                        .or_insert_with(|| ConnectionInfo {
                            id: counterparty_id.clone(),
                            metadata: HashMap::new(),
                            transaction_count: 0,
                            transaction_ids: Vec::new(),
                            roles: Vec::new(),
                        });

                    // `insert` returns false when the pair was already seen.
                    if counted.insert((counterparty_id.clone(), reference_id.clone())) {
                        connection.transaction_count += 1;
                        connection.transaction_ids.push(reference_id.clone());
                    }

                    if let Some(originator) = &transfer.originator {
                        if counterparty_id == originator.id {
                            if !connection.roles.iter().any(|role| role == "originator") {
                                connection.roles.push("originator".to_string());
                            }
                            for (key, value) in &originator.metadata {
                                connection.metadata.insert(key.clone(), value.clone());
                            }
                        }
                    }

                    if let Some(beneficiary) = &transfer.beneficiary {
                        if counterparty_id == beneficiary.id {
                            if !connection.roles.iter().any(|role| role == "beneficiary") {
                                connection.roles.push("beneficiary".to_string());
                            }
                            for (key, value) in &beneficiary.metadata {
                                connection.metadata.insert(key.clone(), value.clone());
                            }
                        }
                    }
                }
            }
        }

        let total = connections.len();
        let mut connection_list: Vec<ConnectionInfo> = connections.into_values().collect();
        connection_list.sort_by(|a, b| a.id.cmp(&b.id));

        let paginated_connections: Vec<ConnectionInfo> = connection_list
            .into_iter()
            .skip(params.offset as usize)
            .take(params.limit as usize)
            .collect();

        let response = ListConnectionsResponse {
            connections: paginated_connections,
            total,
        };
        let response_json = serde_json::to_string_pretty(&response)
            .map_err(|e| Error::tool_execution(format!("Failed to serialize response: {}", e)))?;
        Ok(success_text_response(response_json))
    }

    /// Returns the tool descriptor: name, description and input schema.
    fn get_definition(&self) -> Tool {
        Tool {
            name: "tap_list_connections".to_string(),
            description: "Lists all counterparties (connections) of a specific party. Includes metadata about each counterparty and transaction history.".to_string(),
            input_schema: schema::list_connections_schema(),
        }
    }
}
/// Tool handler that returns the stored profile and IVMS101 data of one customer.
pub struct GetCustomerDetailsTool {
// Shared integration handle; used to resolve the storage of the requested agent.
tap_integration: Arc<TapIntegration>,
}
/// Arguments deserialized from the JSON payload of a customer-details call.
#[derive(Debug, Deserialize)]
struct GetCustomerDetailsParams {
/// Identifier of the agent whose storage is queried.
agent_did: String,
/// Identifier of the customer to look up.
customer_id: String,
}
/// JSON payload returned by the customer-details tool.
#[derive(Debug, Serialize)]
struct GetCustomerDetailsResponse {
/// The `profile` value of the serialized customer record, or the whole
/// serialized record when it has no `profile` key; `None` when no customer is available.
customer: Option<serde_json::Value>,
/// The `ivms101_data` value of the serialized customer record, when present.
ivms101_data: Option<serde_json::Value>,
}
impl GetCustomerDetailsTool {
pub fn new(tap_integration: Arc<TapIntegration>) -> Self {
Self { tap_integration }
}
fn tap_integration(&self) -> &TapIntegration {
&self.tap_integration
}
}
#[async_trait::async_trait]
impl ToolHandler for GetCustomerDetailsTool {
async fn handle(&self, arguments: Option<Value>) -> Result<CallToolResult> {
let params: GetCustomerDetailsParams = match arguments {
Some(args) => serde_json::from_value(args)
.map_err(|e| Error::invalid_parameter(format!("Invalid parameters: {}", e)))?,
None => {
return Ok(error_text_response(
"Missing required parameters".to_string(),
))
}
};
debug!(
"Getting customer details for customer {} via agent {}",
params.customer_id, params.agent_did
);
let storage = match self
.tap_integration()
.storage_for_agent(¶ms.agent_did)
.await
{
Ok(storage) => storage,
Err(e) => {
error!(
"Failed to get storage for agent {}: {}",
params.agent_did, e
);
return Ok(error_text_response(format!(
"Failed to get storage for agent {}: {}",
params.agent_did, e
)));
}
};
let customer = match storage.get_customer(¶ms.customer_id).await {
Ok(customer) => customer,
Err(e) => {
debug!("Failed to get customer {}: {}", params.customer_id, e);
None
}
};
let response = if let Some(customer) = customer {
let customer_json = serde_json::to_value(&customer).map_err(|e| {
Error::tool_execution(format!("Failed to serialize customer: {}", e))
})?;
let profile = customer_json.get("profile").cloned();
let ivms101 = customer_json.get("ivms101_data").cloned();
GetCustomerDetailsResponse {
customer: Some(profile.unwrap_or(customer_json)),
ivms101_data: ivms101,
}
} else {
GetCustomerDetailsResponse {
customer: None,
ivms101_data: None,
}
};
let response_json = serde_json::to_string_pretty(&response)
.map_err(|e| Error::tool_execution(format!("Failed to serialize response: {}", e)))?;
Ok(success_text_response(response_json))
}
fn get_definition(&self) -> Tool {
Tool {
name: "tap_get_customer_details".to_string(),
description: "Gets detailed information about a specific customer including their profile and IVMS101 data if available.".to_string(),
input_schema: schema::get_customer_details_schema(),
}
}
}
/// Tool handler that generates IVMS101 data for a stored customer.
pub struct GenerateIvms101Tool {
// Shared integration handle; used to resolve the storage of the requested agent.
tap_integration: Arc<TapIntegration>,
}
/// Arguments deserialized from the JSON payload of an IVMS101-generation call.
#[derive(Debug, Deserialize)]
struct GenerateIvms101Params {
/// Identifier of the agent whose storage is used.
agent_did: String,
/// Identifier of the customer the data is generated for.
customer_id: String,
}
impl GenerateIvms101Tool {
pub fn new(tap_integration: Arc<TapIntegration>) -> Self {
Self { tap_integration }
}
fn tap_integration(&self) -> &TapIntegration {
&self.tap_integration
}
}
#[async_trait::async_trait]
impl ToolHandler for GenerateIvms101Tool {
    /// Handles a `tap_generate_ivms101` call.
    ///
    /// Resolves the agent's storage, wraps it in a [`CustomerManager`] and
    /// returns the result of `generate_ivms101_data` for the customer as
    /// pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the arguments fail to deserialize or the generated
    /// data cannot be serialized. Missing arguments, storage failures and
    /// generation failures are reported as `Ok` carrying an error text
    /// response.
    async fn handle(&self, arguments: Option<Value>) -> Result<CallToolResult> {
        let params: GenerateIvms101Params = match arguments {
            Some(args) => serde_json::from_value(args)
                .map_err(|e| Error::invalid_parameter(format!("Invalid parameters: {}", e)))?,
            None => {
                return Ok(error_text_response(
                    "Missing required parameters".to_string(),
                ))
            }
        };

        debug!(
            "Generating IVMS101 data for customer {} via agent {}",
            params.customer_id, params.agent_did
        );

        let storage = match self
            .tap_integration()
            .storage_for_agent(&params.agent_did)
            .await
        {
            Ok(storage) => storage,
            Err(e) => {
                error!(
                    "Failed to get storage for agent {}: {}",
                    params.agent_did, e
                );
                return Ok(error_text_response(format!(
                    "Failed to get storage for agent {}: {}",
                    params.agent_did, e
                )));
            }
        };

        let generated = CustomerManager::new(storage)
            .generate_ivms101_data(&params.customer_id)
            .await;

        match generated {
            Ok(ivms_data) => {
                let response_json = serde_json::to_string_pretty(&ivms_data).map_err(|e| {
                    Error::tool_execution(format!("Failed to serialize IVMS101 data: {}", e))
                })?;
                Ok(success_text_response(response_json))
            }
            Err(e) => {
                error!("Failed to generate IVMS101 data: {}", e);
                Ok(error_text_response(format!(
                    "Failed to generate IVMS101 data: {}",
                    e
                )))
            }
        }
    }

    /// Returns the tool descriptor: name, description and input schema.
    fn get_definition(&self) -> Tool {
        Tool {
            name: "tap_generate_ivms101".to_string(),
            description: "Generates IVMS101 compliant data for a customer based on their stored profile information.".to_string(),
            input_schema: schema::generate_ivms101_schema(),
        }
    }
}
/// Tool handler that updates the stored profile of an existing customer.
pub struct UpdateCustomerProfileTool {
// Shared integration handle; used to resolve the storage of the requested agent.
tap_integration: Arc<TapIntegration>,
}
/// Arguments deserialized from the JSON payload of a profile-update call.
#[derive(Debug, Deserialize)]
struct UpdateCustomerProfileParams {
/// Identifier of the agent whose storage is used.
agent_did: String,
/// Identifier of the customer being updated.
customer_id: String,
/// Profile data handed unchanged to `CustomerManager::update_customer_profile`.
profile_data: Value,
}
impl UpdateCustomerProfileTool {
pub fn new(tap_integration: Arc<TapIntegration>) -> Self {
Self { tap_integration }
}
fn tap_integration(&self) -> &TapIntegration {
&self.tap_integration
}
}
#[async_trait::async_trait]
impl ToolHandler for UpdateCustomerProfileTool {
    /// Handles a `tap_update_customer_profile` call.
    ///
    /// Resolves the agent's storage, wraps it in a [`CustomerManager`] and
    /// forwards `profile_data` to `update_customer_profile` for the customer.
    ///
    /// # Errors
    ///
    /// Returns `Err` only when the arguments fail to deserialize. Missing
    /// arguments, storage failures and update failures are reported as `Ok`
    /// carrying an error text response.
    async fn handle(&self, arguments: Option<Value>) -> Result<CallToolResult> {
        let params: UpdateCustomerProfileParams = match arguments {
            Some(args) => serde_json::from_value(args)
                .map_err(|e| Error::invalid_parameter(format!("Invalid parameters: {}", e)))?,
            None => {
                return Ok(error_text_response(
                    "Missing required parameters".to_string(),
                ))
            }
        };

        debug!(
            "Updating profile for customer {} via agent {}",
            params.customer_id, params.agent_did
        );

        let storage = match self
            .tap_integration()
            .storage_for_agent(&params.agent_did)
            .await
        {
            Ok(storage) => storage,
            Err(e) => {
                error!(
                    "Failed to get storage for agent {}: {}",
                    params.agent_did, e
                );
                return Ok(error_text_response(format!(
                    "Failed to get storage for agent {}: {}",
                    params.agent_did, e
                )));
            }
        };

        let outcome = CustomerManager::new(storage)
            .update_customer_profile(&params.customer_id, params.profile_data)
            .await;

        match outcome {
            Ok(_) => Ok(success_text_response(format!(
                "Successfully updated profile for customer {}",
                params.customer_id
            ))),
            Err(e) => {
                error!("Failed to update customer profile: {}", e);
                Ok(error_text_response(format!(
                    "Failed to update customer profile: {}",
                    e
                )))
            }
        }
    }

    /// Returns the tool descriptor: name, description and input schema.
    fn get_definition(&self) -> Tool {
        Tool {
            name: "tap_update_customer_profile".to_string(),
            description: "Updates the schema.org profile data for a customer. The profile_data should be a JSON object with schema.org fields.".to_string(),
            input_schema: schema::update_customer_profile_schema(),
        }
    }
}
/// Tool handler that creates a customer record, or updates it when it already exists.
pub struct CreateCustomerTool {
// Shared integration handle; used to resolve the storage of the requested agent.
tap_integration: Arc<TapIntegration>,
}
/// Arguments deserialized from the JSON payload of a customer-creation call.
#[derive(Debug, Deserialize)]
struct CreateCustomerParams {
/// Identifier of the agent that owns the customer record.
agent_did: String,
/// Identifier of the customer to create or update.
customer_id: String,
/// Profile fields; read for name/address columns and merged into the stored profile.
profile_data: Value,
}
impl CreateCustomerTool {
pub fn new(tap_integration: Arc<TapIntegration>) -> Self {
Self { tap_integration }
}
fn tap_integration(&self) -> &TapIntegration {
&self.tap_integration
}
}
#[async_trait::async_trait]
impl ToolHandler for CreateCustomerTool {
    /// Handles a `tap_create_customer` call.
    ///
    /// When the customer already exists, `profile_data` is forwarded to
    /// `CustomerManager::update_customer_profile`. Otherwise a new
    /// [`Customer`] is built and upserted:
    ///
    /// * the stored profile starts as a schema.org `Person` with the customer
    ///   ID as `identifier`, and every key of `profile_data` (when it is a
    ///   JSON object) is written over it;
    /// * the schema type is `Organization` only when `profile_data["@type"]`
    ///   is the string `"Organization"`, otherwise `Person`;
    /// * the display name is `givenName`, followed by `familyName` when both
    ///   are present; it stays unset without a `givenName`.
    ///
    /// # Errors
    ///
    /// Returns `Err` only when the arguments fail to deserialize. Missing
    /// arguments and storage failures are reported as `Ok` carrying an error
    /// text response.
    async fn handle(&self, arguments: Option<Value>) -> Result<CallToolResult> {
        let params: CreateCustomerParams = match arguments {
            Some(args) => serde_json::from_value(args)
                .map_err(|e| Error::invalid_parameter(format!("Invalid parameters: {}", e)))?,
            None => {
                return Ok(error_text_response(
                    "Missing required parameters".to_string(),
                ))
            }
        };

        debug!(
            "Creating customer {} via agent {}",
            params.customer_id, params.agent_did
        );

        let storage = match self
            .tap_integration()
            .storage_for_agent(&params.agent_did)
            .await
        {
            Ok(storage) => storage,
            Err(e) => {
                error!(
                    "Failed to get storage for agent {}: {}",
                    params.agent_did, e
                );
                return Ok(error_text_response(format!(
                    "Failed to get storage for agent {}: {}",
                    params.agent_did, e
                )));
            }
        };

        let existing = match storage.get_customer(&params.customer_id).await {
            Ok(existing) => existing,
            Err(e) => {
                error!("Failed to check existing customer: {}", e);
                return Ok(error_text_response(format!(
                    "Failed to check existing customer: {}",
                    e
                )));
            }
        };

        // Existing customer: delegate to the profile update path. The manager
        // is only needed here, so it takes ownership of the storage handle.
        if existing.is_some() {
            let customer_manager = CustomerManager::new(storage);
            return match customer_manager
                .update_customer_profile(&params.customer_id, params.profile_data)
                .await
            {
                Ok(_) => Ok(success_text_response(format!(
                    "Successfully updated existing customer {}",
                    params.customer_id
                ))),
                Err(e) => {
                    error!("Failed to update customer: {}", e);
                    Ok(error_text_response(format!(
                        "Failed to update customer: {}",
                        e
                    )))
                }
            };
        }

        // Reads a top-level string field of `profile_data`; non-string or
        // absent values yield `None`.
        let text_field = |key: &str| -> Option<String> {
            params
                .profile_data
                .get(key)
                .and_then(Value::as_str)
                .map(String::from)
        };

        let given_name = text_field("givenName");
        let family_name = text_field("familyName");
        let display_name = given_name.as_ref().map(|given| match &family_name {
            Some(family) => format!("{} {}", given, family),
            None => given.clone(),
        });

        // Defaults first; caller-supplied keys (including "@type") override.
        let mut profile = json!({
            "@context": "https://schema.org",
            "@type": "Person",
            "identifier": params.customer_id.clone(),
        });
        if let (Value::Object(profile_obj), Value::Object(data_obj)) =
            (&mut profile, &params.profile_data)
        {
            for (key, value) in data_obj {
                profile_obj.insert(key.clone(), value.clone());
            }
        }

        let schema_type = match params.profile_data.get("@type").and_then(Value::as_str) {
            Some("Organization") => SchemaType::Organization,
            _ => SchemaType::Person,
        };

        // One timestamp so a freshly created record has equal created/updated
        // values.
        let now = chrono::Utc::now().to_rfc3339();

        let customer = Customer {
            id: params.customer_id.clone(),
            agent_did: params.agent_did.clone(),
            schema_type,
            given_name,
            family_name,
            display_name,
            legal_name: text_field("legalName"),
            lei_code: text_field("leiCode"),
            mcc_code: text_field("mccCode"),
            address_country: text_field("addressCountry"),
            address_locality: text_field("addressLocality"),
            postal_code: text_field("postalCode"),
            street_address: text_field("streetAddress"),
            profile,
            ivms101_data: None,
            verified_at: None,
            created_at: now.clone(),
            updated_at: now,
        };

        match storage.upsert_customer(&customer).await {
            Ok(_) => {
                debug!("Created new customer {}", params.customer_id);
                Ok(success_text_response(format!(
                    "Successfully created customer {}",
                    params.customer_id
                )))
            }
            Err(e) => {
                error!("Failed to create customer: {}", e);
                Ok(error_text_response(format!(
                    "Failed to create customer: {}",
                    e
                )))
            }
        }
    }

    /// Returns the tool descriptor: name, description and input schema.
    fn get_definition(&self) -> Tool {
        Tool {
            name: "tap_create_customer".to_string(),
            description: "Creates a new customer profile for an agent. The customer_id should be a DID or unique identifier. The profile_data should be a JSON object with schema.org fields (e.g., givenName, familyName, addressCountry). If a customer with the same ID already exists, their profile will be updated.".to_string(),
            input_schema: schema::create_customer_schema(),
        }
    }
}
/// Tool handler that updates a stored customer from IVMS101 data.
pub struct UpdateCustomerFromIvms101Tool {
// Shared integration handle; used to resolve the storage of the requested agent.
tap_integration: Arc<TapIntegration>,
}
/// Arguments deserialized from the JSON payload of an IVMS101-update call.
#[derive(Debug, Deserialize)]
struct UpdateCustomerFromIvms101Params {
/// Identifier of the agent whose storage is used.
agent_did: String,
/// Identifier of the customer being updated.
customer_id: String,
/// IVMS101 data passed by reference to `CustomerManager::update_customer_from_ivms101`.
ivms101_data: Value,
}
impl UpdateCustomerFromIvms101Tool {
pub fn new(tap_integration: Arc<TapIntegration>) -> Self {
Self { tap_integration }
}
fn tap_integration(&self) -> &TapIntegration {
&self.tap_integration
}
}
#[async_trait::async_trait]
impl ToolHandler for UpdateCustomerFromIvms101Tool {
    /// Handles a `tap_update_customer_from_ivms101` call.
    ///
    /// Resolves the agent's storage, wraps it in a [`CustomerManager`] and
    /// forwards `ivms101_data` to `update_customer_from_ivms101` for the
    /// customer.
    ///
    /// # Errors
    ///
    /// Returns `Err` only when the arguments fail to deserialize. Missing
    /// arguments, storage failures and update failures are reported as `Ok`
    /// carrying an error text response.
    async fn handle(&self, arguments: Option<Value>) -> Result<CallToolResult> {
        let params: UpdateCustomerFromIvms101Params = match arguments {
            Some(args) => serde_json::from_value(args)
                .map_err(|e| Error::invalid_parameter(format!("Invalid parameters: {}", e)))?,
            None => {
                return Ok(error_text_response(
                    "Missing required parameters".to_string(),
                ))
            }
        };

        debug!(
            "Updating customer {} from IVMS101 data via agent {}",
            params.customer_id, params.agent_did
        );

        let storage = match self
            .tap_integration()
            .storage_for_agent(&params.agent_did)
            .await
        {
            Ok(storage) => storage,
            Err(e) => {
                error!(
                    "Failed to get storage for agent {}: {}",
                    params.agent_did, e
                );
                return Ok(error_text_response(format!(
                    "Failed to get storage for agent {}: {}",
                    params.agent_did, e
                )));
            }
        };

        let outcome = CustomerManager::new(storage)
            .update_customer_from_ivms101(&params.customer_id, &params.ivms101_data)
            .await;

        match outcome {
            Ok(_) => Ok(success_text_response(format!(
                "Successfully updated customer {} from IVMS101 data",
                params.customer_id
            ))),
            Err(e) => {
                error!("Failed to update customer from IVMS101: {}", e);
                Ok(error_text_response(format!(
                    "Failed to update customer from IVMS101: {}",
                    e
                )))
            }
        }
    }

    /// Returns the tool descriptor: name, description and input schema.
    fn get_definition(&self) -> Tool {
        Tool {
            name: "tap_update_customer_from_ivms101".to_string(),
            description: "Updates a customer's profile using IVMS101 data. This extracts name, address and other fields from IVMS101 format.".to_string(),
            input_schema: schema::update_customer_from_ivms101_schema(),
        }
    }
}