Skip to main content

tap_node/
customer.rs

1//! Customer management module for TAP Node
2//!
3//! This module provides customer data management functionality, including:
4//! - Automatic extraction of party information from TAP messages
5//! - Schema.org JSON-LD profile storage
6//! - Multiple identifier support (DIDs, email, phone, URLs)
7//! - Relationship tracking for TAIP-9 compliance
8//! - IVMS101 data caching for Travel Rule compliance
9
10use crate::error::{Error, Result};
11use crate::storage::{
12    Customer, CustomerIdentifier, CustomerRelationship, IdentifierType, SchemaType, Storage,
13};
14use chrono::Utc;
15use serde_json::{json, Value};
16use std::sync::Arc;
17use tap_ivms101::{
18    builder::{GeographicAddressBuilder, NaturalPersonBuilder, NaturalPersonNameBuilder},
19    message::Person,
20    types::AddressType,
21};
22use tap_msg::message::Party;
23use tap_msg::utils::NameHashable;
24use uuid::Uuid;
25
26/// Customer manager handles all customer-related operations
27pub struct CustomerManager {
28    storage: Arc<Storage>,
29}
30
31impl CustomerManager {
32    /// Generate name hash from IVMS101 Person data using TAIP-12 standard
33    pub fn generate_name_hash_from_ivms101(&self, person: &Person) -> Option<String> {
34        person
35            .get_full_name()
36            .map(|name| Customer::hash_name(&name))
37    }
38
39    /// Create a new customer manager
40    pub fn new(storage: Arc<Storage>) -> Self {
41        Self { storage }
42    }
43
44    /// Get a reference to the storage
45    pub fn get_storage(&self) -> &Arc<Storage> {
46        &self.storage
47    }
48
49    /// Extract and create/update customer from a Party object
50    pub async fn extract_customer_from_party(
51        &self,
52        party: &Party,
53        agent_did: &str,
54        _role: &str, // "originator", "beneficiary", etc.
55    ) -> Result<String> {
56        // Determine customer ID and primary identifier
57        let (customer_id, primary_identifier) = self.determine_customer_id(&party.id);
58
59        // Check if customer exists
60        let existing = self
61            .storage
62            .get_customer(&customer_id)
63            .await
64            .map_err(|e| Error::Storage(e.to_string()))?;
65
66        let mut profile = json!({
67            "@context": "https://schema.org",
68            "@type": "Person",
69            "identifier": party.id.clone(),
70        });
71
72        // Add metadata fields to profile
73        for (key, value) in &party.metadata {
74            // Map common fields
75            match key.as_str() {
76                "name" | "https://schema.org/name" => {
77                    profile["name"] = value.clone();
78                }
79                "givenName" | "https://schema.org/givenName" => {
80                    profile["givenName"] = value.clone();
81                }
82                "familyName" | "https://schema.org/familyName" => {
83                    profile["familyName"] = value.clone();
84                }
85                "addressCountry" | "https://schema.org/addressCountry" => {
86                    profile["addressCountry"] = value.clone();
87                }
88                "nameHash" => {
89                    // Preserve existing name hash from party metadata
90                    profile["nameHash"] = value.clone();
91                }
92                _ => {
93                    // Add other metadata as-is
94                    profile[key] = value.clone();
95                }
96            }
97        }
98
99        // Extract structured data from profile
100        let (given_name, family_name, display_name, address_country) =
101            self.extract_structured_data(&profile);
102
103        let now = Utc::now().to_rfc3339();
104
105        let mut customer = Customer {
106            id: customer_id.clone(),
107            agent_did: agent_did.to_string(),
108            schema_type: SchemaType::Person, // Default to Person, can be updated later
109            given_name,
110            family_name,
111            display_name: display_name.or_else(|| {
112                party
113                    .metadata
114                    .get("name")
115                    .and_then(|v| v.as_str())
116                    .map(String::from)
117            }),
118            legal_name: None,
119            lei_code: None,
120            mcc_code: None,
121            address_country,
122            address_locality: None,
123            postal_code: None,
124            street_address: None,
125            profile,
126            ivms101_data: None,
127            verified_at: None,
128            created_at: existing
129                .as_ref()
130                .map(|c| c.created_at.clone())
131                .unwrap_or_else(|| now.clone()),
132            updated_at: now,
133        };
134
135        // Generate and add name hash if not already present
136        if customer.get_name_hash().is_none() {
137            customer.add_name_hash_to_profile();
138        }
139
140        // Upsert customer
141        self.storage
142            .upsert_customer(&customer)
143            .await
144            .map_err(|e| Error::Storage(e.to_string()))?;
145
146        // Add identifier
147        let identifier = CustomerIdentifier {
148            id: primary_identifier.clone(),
149            customer_id: customer_id.clone(),
150            identifier_type: self.determine_identifier_type(&primary_identifier),
151            verified: false,
152            verification_method: None,
153            verified_at: None,
154            created_at: Utc::now().to_rfc3339(),
155        };
156        self.storage
157            .add_customer_identifier(&identifier)
158            .await
159            .map_err(|e| Error::Storage(e.to_string()))?;
160
161        // Extract additional identifiers from the id string
162        self.extract_additional_identifiers(&customer_id, &party.id)
163            .await?;
164
165        Ok(customer_id)
166    }
167
168    /// Update customer with schema.org data
169    pub async fn update_customer_profile(
170        &self,
171        customer_id: &str,
172        profile_data: Value,
173    ) -> Result<()> {
174        let mut customer = self
175            .storage
176            .get_customer(customer_id)
177            .await
178            .map_err(|e| Error::Storage(e.to_string()))?
179            .ok_or_else(|| Error::Storage("Customer not found".to_string()))?;
180
181        // Merge profile data
182        if let Value::Object(ref mut existing_map) = customer.profile {
183            if let Value::Object(new_map) = profile_data {
184                for (key, value) in new_map {
185                    existing_map.insert(key, value);
186                }
187            }
188        }
189
190        // Re-extract structured data
191        let (given_name, family_name, display_name, address_country) =
192            self.extract_structured_data(&customer.profile);
193
194        customer.given_name = given_name.or(customer.given_name);
195        customer.family_name = family_name.or(customer.family_name);
196        customer.display_name = display_name.or(customer.display_name);
197        customer.address_country = address_country.or(customer.address_country);
198        customer.updated_at = Utc::now().to_rfc3339();
199
200        // Regenerate name hash if names have changed
201        customer.add_name_hash_to_profile();
202
203        self.storage
204            .upsert_customer(&customer)
205            .await
206            .map_err(|e| Error::Storage(e.to_string()))?;
207        Ok(())
208    }
209
210    /// Generate IVMS101 data from customer profile
211    pub async fn generate_ivms101_data(&self, customer_id: &str) -> Result<Value> {
212        let customer = self
213            .storage
214            .get_customer(customer_id)
215            .await
216            .map_err(|e| Error::Storage(e.to_string()))?
217            .ok_or_else(|| Error::Storage("Customer not found".to_string()))?;
218
219        let person = match customer.schema_type {
220            SchemaType::Person => {
221                // Build natural person
222                let mut person_builder = NaturalPersonBuilder::new();
223
224                // Add name
225                if customer.family_name.is_some() || customer.given_name.is_some() {
226                    let name = NaturalPersonNameBuilder::new()
227                        .legal_name(
228                            customer.family_name.as_deref().unwrap_or("Unknown"),
229                            customer.given_name.as_deref().unwrap_or(""),
230                        )
231                        .build()
232                        .map_err(|e| Error::Storage(format!("Failed to build name: {}", e)))?;
233                    person_builder = person_builder.name(name);
234                }
235
236                // Add address only if we have street address (required field)
237                if customer.address_country.is_some() && customer.street_address.is_some() {
238                    let mut address_builder = GeographicAddressBuilder::new()
239                        .address_type(AddressType::Home)
240                        .country(customer.address_country.as_deref().unwrap_or(""))
241                        .street_name(customer.street_address.as_deref().unwrap_or(""));
242
243                    if let Some(postal) = &customer.postal_code {
244                        address_builder = address_builder.post_code(postal);
245                    }
246                    if let Some(town) = &customer.address_locality {
247                        address_builder = address_builder.town_name(town);
248                    }
249
250                    let address = address_builder
251                        .build()
252                        .map_err(|e| Error::Storage(format!("Failed to build address: {}", e)))?;
253                    person_builder = person_builder.add_address(address);
254                }
255
256                let natural_person = person_builder.build().map_err(|e| {
257                    Error::Storage(format!("Failed to build natural person: {}", e))
258                })?;
259
260                Person::NaturalPerson(natural_person)
261            }
262            SchemaType::Organization => {
263                // For organizations, we'll use LegalPerson (not implemented in tap-ivms101 yet)
264                // For now, return empty JSON
265                return Ok(json!({}));
266            }
267            _ => return Ok(json!({})),
268        };
269
270        // Serialize the person to JSON
271        let ivms101_json = serde_json::to_value(&person)
272            .map_err(|e| Error::Storage(format!("Failed to serialize IVMS101: {}", e)))?;
273
274        // Cache the generated IVMS101 data
275        let mut customer = customer;
276        customer.ivms101_data = Some(ivms101_json.clone());
277        customer.updated_at = Utc::now().to_rfc3339();
278        self.storage
279            .upsert_customer(&customer)
280            .await
281            .map_err(|e| Error::Storage(e.to_string()))?;
282
283        Ok(ivms101_json)
284    }
285
286    /// Update customer data from IVMS101 data
287    pub async fn update_customer_from_ivms101(
288        &self,
289        customer_id: &str,
290        ivms101_data: &Value,
291    ) -> Result<()> {
292        let mut customer = self
293            .storage
294            .get_customer(customer_id)
295            .await
296            .map_err(|e| Error::Storage(e.to_string()))?
297            .ok_or_else(|| Error::Storage("Customer not found".to_string()))?;
298
299        // Parse IVMS101 data and update customer fields
300        if let Some(natural_person) = ivms101_data.get("naturalPerson") {
301            // Update from natural person data
302            if let Some(name) = natural_person.get("name") {
303                if let Some(name_identifiers) =
304                    name.get("nameIdentifiers").and_then(|v| v.as_array())
305                {
306                    if let Some(first_name_id) = name_identifiers.first() {
307                        if let Some(primary) = first_name_id
308                            .get("primaryIdentifier")
309                            .and_then(|v| v.as_str())
310                        {
311                            customer.family_name = Some(primary.to_string());
312                        }
313                        if let Some(secondary) = first_name_id
314                            .get("secondaryIdentifier")
315                            .and_then(|v| v.as_str())
316                        {
317                            customer.given_name = Some(secondary.to_string());
318                        }
319                    }
320                }
321            }
322
323            // Update address from IVMS101
324            if let Some(addresses) = natural_person
325                .get("geographicAddress")
326                .and_then(|v| v.as_array())
327            {
328                if let Some(first_addr) = addresses.first() {
329                    if let Some(street) = first_addr.get("streetName").and_then(|v| v.as_str()) {
330                        customer.street_address = Some(street.to_string());
331                    }
332                    if let Some(postal) = first_addr.get("postCode").and_then(|v| v.as_str()) {
333                        customer.postal_code = Some(postal.to_string());
334                    }
335                    if let Some(town) = first_addr.get("townName").and_then(|v| v.as_str()) {
336                        customer.address_locality = Some(town.to_string());
337                    }
338                    if let Some(country) = first_addr.get("country").and_then(|v| v.as_str()) {
339                        customer.address_country = Some(country.to_string());
340                    }
341                }
342            }
343        }
344
345        // Store the IVMS101 data
346        customer.ivms101_data = Some(ivms101_data.clone());
347        customer.updated_at = Utc::now().to_rfc3339();
348
349        // Regenerate name hash if names have changed
350        customer.add_name_hash_to_profile();
351
352        self.storage
353            .upsert_customer(&customer)
354            .await
355            .map_err(|e| Error::Storage(e.to_string()))?;
356        Ok(())
357    }
358
359    /// Add a verified relationship
360    pub async fn add_relationship(
361        &self,
362        customer_id: &str,
363        relationship_type: &str,
364        related_identifier: &str,
365        proof: Option<Value>,
366    ) -> Result<()> {
367        let relationship = CustomerRelationship {
368            id: Uuid::new_v4().to_string(),
369            customer_id: customer_id.to_string(),
370            relationship_type: relationship_type.to_string(),
371            related_identifier: related_identifier.to_string(),
372            proof,
373            confirmed_at: Some(Utc::now().to_rfc3339()),
374            created_at: Utc::now().to_rfc3339(),
375        };
376
377        self.storage
378            .add_customer_relationship(&relationship)
379            .await
380            .map_err(|e| Error::Storage(e.to_string()))?;
381        Ok(())
382    }
383
384    // Helper methods
385
386    fn determine_customer_id(&self, account: &str) -> (String, String) {
387        // If it's a DID, use it as the customer ID
388        if account.starts_with("did:") {
389            (account.to_string(), account.to_string())
390        } else if account.contains('@') {
391            // Email address - create a stable ID
392            let id = format!("customer:{}", Uuid::new_v4());
393            (id, format!("mailto:{}", account))
394        } else if account.starts_with("http://") || account.starts_with("https://") {
395            // URL - create did:web
396            let domain = account
397                .trim_start_matches("https://")
398                .trim_start_matches("http://");
399            let did_web = format!("did:web:{}", domain.replace('/', ":"));
400            (did_web.clone(), did_web)
401        } else if account.starts_with('+')
402            || account.chars().all(|c| c.is_ascii_digit() || c == '-')
403        {
404            // Phone number
405            let id = format!("customer:{}", Uuid::new_v4());
406            (id, format!("tel:{}", account))
407        } else {
408            // Generic identifier
409            let id = format!("customer:{}", Uuid::new_v4());
410            (id, account.to_string())
411        }
412    }
413
414    fn determine_identifier_type(&self, identifier: &str) -> IdentifierType {
415        if identifier.starts_with("did:") {
416            IdentifierType::Did
417        } else if identifier.starts_with("mailto:") {
418            IdentifierType::Email
419        } else if identifier.starts_with("tel:") || identifier.starts_with("sms:") {
420            IdentifierType::Phone
421        } else if identifier.starts_with("http://") || identifier.starts_with("https://") {
422            IdentifierType::Url
423        } else if identifier.contains(':') && identifier.contains('/') {
424            // Likely a CAIP account identifier
425            IdentifierType::Account
426        } else {
427            IdentifierType::Other
428        }
429    }
430
431    async fn extract_additional_identifiers(&self, customer_id: &str, account: &str) -> Result<()> {
432        // If account contains multiple identifiers (e.g., "did:key:xyz, email:user@example.com")
433        if account.contains(',') {
434            for part in account.split(',') {
435                let trimmed = part.trim();
436                if !trimmed.is_empty() {
437                    let identifier = CustomerIdentifier {
438                        id: trimmed.to_string(),
439                        customer_id: customer_id.to_string(),
440                        identifier_type: self.determine_identifier_type(trimmed),
441                        verified: false,
442                        verification_method: None,
443                        verified_at: None,
444                        created_at: Utc::now().to_rfc3339(),
445                    };
446                    let _ = self.storage.add_customer_identifier(&identifier).await;
447                }
448            }
449        }
450        Ok(())
451    }
452
453    fn extract_structured_data(
454        &self,
455        profile: &Value,
456    ) -> (
457        Option<String>,
458        Option<String>,
459        Option<String>,
460        Option<String>,
461    ) {
462        let mut given_name = None;
463        let mut family_name = None;
464        let mut display_name = None;
465        let mut address_country = None;
466
467        if let Value::Object(map) = profile {
468            // Extract name components
469            if let Some(Value::String(gn)) = map.get("givenName") {
470                given_name = Some(gn.clone());
471            }
472            if let Some(Value::String(fn_)) = map.get("familyName") {
473                family_name = Some(fn_.clone());
474            }
475            if let Some(Value::String(name)) = map.get("name") {
476                display_name = Some(name.clone());
477            }
478
479            // Extract address
480            if let Some(Value::Object(addr)) = map.get("address") {
481                if let Some(Value::String(country)) = addr.get("addressCountry") {
482                    address_country = Some(country.clone());
483                }
484            } else if let Some(Value::String(country)) = map.get("addressCountry") {
485                address_country = Some(country.clone());
486            }
487        }
488
489        (given_name, family_name, display_name, address_country)
490    }
491}
492
493#[cfg(test)]
494mod tests {
495    use super::*;
496    use std::collections::HashMap;
497    use tempfile::tempdir;
498
499    #[tokio::test]
500    async fn test_extract_customer_from_party() {
501        let dir = tempdir().unwrap();
502        let db_path = dir.path().join("test.db");
503        let storage = Arc::new(Storage::new(Some(db_path)).await.unwrap());
504
505        let manager = CustomerManager::new(storage.clone());
506
507        let mut metadata = HashMap::new();
508        metadata.insert("name".to_string(), json!("Alice Smith"));
509        let party = Party::with_metadata(
510            "did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK",
511            metadata,
512        );
513
514        let customer_id = manager
515            .extract_customer_from_party(&party, "did:key:agent", "originator")
516            .await
517            .unwrap();
518
519        // Verify customer was created
520        let customer = storage.get_customer(&customer_id).await.unwrap().unwrap();
521        assert_eq!(customer.display_name, Some("Alice Smith".to_string()));
522        assert_eq!(customer.agent_did, "did:key:agent");
523
524        // Verify identifier was created
525        let identifiers = storage
526            .get_customer_identifiers(&customer_id)
527            .await
528            .unwrap();
529        assert_eq!(identifiers.len(), 1);
530        assert_eq!(identifiers[0].identifier_type, IdentifierType::Did);
531    }
532
533    #[tokio::test]
534    async fn test_email_identifier() {
535        let dir = tempdir().unwrap();
536        let db_path = dir.path().join("test.db");
537        let storage = Arc::new(Storage::new(Some(db_path)).await.unwrap());
538
539        let manager = CustomerManager::new(storage.clone());
540
541        let mut metadata = HashMap::new();
542        metadata.insert("name".to_string(), json!("Alice"));
543        let party = Party::with_metadata("alice@example.com", metadata);
544
545        let customer_id = manager
546            .extract_customer_from_party(&party, "did:key:agent", "beneficiary")
547            .await
548            .unwrap();
549
550        // Verify identifier is mailto format
551        let identifiers = storage
552            .get_customer_identifiers(&customer_id)
553            .await
554            .unwrap();
555        assert_eq!(identifiers.len(), 1);
556        assert_eq!(identifiers[0].id, "mailto:alice@example.com");
557        assert_eq!(identifiers[0].identifier_type, IdentifierType::Email);
558    }
559}