Skip to main content

tap_node/event/
customer_handler.rs

1//! Customer event handler for automatic customer data extraction
2//!
3//! This handler listens to TAP message events and automatically:
4//! - Extracts party information from Transfer messages
5//! - Updates customer records from UpdateParty messages
6//! - Manages relationships from ConfirmRelationship messages
7//! - Generates IVMS101 data when needed
8
9use crate::customer::CustomerManager;
10use crate::error::Result;
11use crate::event::{EventSubscriber, NodeEvent};
12use crate::storage::Storage;
13use async_trait::async_trait;
14use serde_json::{json, Value};
15use std::sync::Arc;
16use tap_msg::message::{transfer::Transfer, update_party::UpdateParty};
17
18/// Event handler that automatically extracts and manages customer data
19pub struct CustomerEventHandler {
20    storage: Arc<Storage>,
21    agent_did: String,
22}
23
24impl CustomerEventHandler {
25    /// Create a new customer event handler
26    pub fn new(storage: Arc<Storage>, agent_did: String) -> Self {
27        Self { storage, agent_did }
28    }
29}
30
31#[async_trait]
32impl EventSubscriber for CustomerEventHandler {
33    async fn handle_event(&self, event: NodeEvent) {
34        let result: Result<()> = match &event {
35            NodeEvent::MessageReceived { message, .. } | NodeEvent::MessageSent { message, .. } => {
36                // Handle different message types
37                match message.type_.as_str() {
38                    "https://tap.rsvp/schema/1.0#Transfer" => {
39                        self.handle_transfer_message(message).await
40                    }
41                    "https://tap.rsvp/schema/1.0#UpdateParty" => {
42                        self.handle_update_party_message(message).await
43                    }
44                    "https://tap.rsvp/schema/1.0#ConfirmRelationship" => {
45                        self.handle_confirm_relationship_message(message).await
46                    }
47                    _ => Ok(()),
48                }
49            }
50            NodeEvent::TransactionCreated {
51                transaction,
52                agent_did,
53            } => {
54                log::debug!("Handling TransactionCreated event for agent: {}", agent_did);
55
56                // Extract customer data from new transactions
57                // Parse the message JSON to get the message body
58                if let Ok(plain_message) = serde_json::from_value::<tap_msg::didcomm::PlainMessage>(
59                    transaction.message_json.clone(),
60                ) {
61                    if let Ok(transfer) = serde_json::from_value::<Transfer>(plain_message.body) {
62                        let manager = CustomerManager::new(self.storage.clone());
63
64                        // Extract originator if present
65                        if let Some(originator) = &transfer.originator {
66                            match manager
67                                .extract_customer_from_party(
68                                    originator,
69                                    &self.agent_did,
70                                    "originator",
71                                )
72                                .await
73                            {
74                                Ok(customer_id) => {
75                                    log::debug!(
76                                        "Created/updated originator customer: {}",
77                                        customer_id
78                                    )
79                                }
80                                Err(e) => log::error!("Failed to extract originator: {}", e),
81                            }
82                        }
83
84                        // Extract beneficiary
85                        if let Some(beneficiary) = &transfer.beneficiary {
86                            match manager
87                                .extract_customer_from_party(
88                                    beneficiary,
89                                    &self.agent_did,
90                                    "beneficiary",
91                                )
92                                .await
93                            {
94                                Ok(customer_id) => log::debug!(
95                                    "Created/updated beneficiary customer: {}",
96                                    customer_id
97                                ),
98                                Err(e) => log::error!("Failed to extract beneficiary: {}", e),
99                            }
100                        }
101                    }
102                }
103                Ok(())
104            }
105            _ => Ok(()),
106        };
107
108        if let Err(e) = result {
109            log::error!("Customer event handler error: {}", e);
110        }
111    }
112}
113
114impl CustomerEventHandler {
115    async fn handle_transfer_message(
116        &self,
117        message: &tap_msg::didcomm::PlainMessage,
118    ) -> Result<()> {
119        // Parse the transfer message
120        if let Ok(transfer) = serde_json::from_value::<Transfer>(message.body.clone()) {
121            let manager = CustomerManager::new(self.storage.clone());
122
123            // Extract originator information if present
124            if let Some(originator) = &transfer.originator {
125                let customer_id = manager
126                    .extract_customer_from_party(originator, &self.agent_did, "originator")
127                    .await?;
128
129                log::debug!("Extracted originator customer: {}", customer_id);
130            }
131
132            // Extract beneficiary information
133            if let Some(beneficiary) = &transfer.beneficiary {
134                let customer_id = manager
135                    .extract_customer_from_party(beneficiary, &self.agent_did, "beneficiary")
136                    .await?;
137
138                log::debug!("Extracted beneficiary customer: {}", customer_id);
139            }
140
141            // Extract agent relationships
142            for agent in &transfer.agents {
143                for agent_for in &agent.for_parties.0 {
144                    // Create relationship between agent and party
145                    if let Ok(Some(customer)) = self
146                        .storage
147                        .get_customer_by_identifier(&agent.id)
148                        .await
149                        .map_err(|e| crate::error::Error::Storage(e.to_string()))
150                    {
151                        let _ = manager
152                            .add_relationship(&customer.id, "acts_for", agent_for, None)
153                            .await;
154                    }
155                }
156            }
157        }
158
159        Ok(())
160    }
161
162    async fn handle_update_party_message(
163        &self,
164        message: &tap_msg::didcomm::PlainMessage,
165    ) -> Result<()> {
166        // Parse the UpdateParty message
167        if let Ok(update_party) = serde_json::from_value::<UpdateParty>(message.body.clone()) {
168            let manager = CustomerManager::new(self.storage.clone());
169
170            // Extract the party information
171            let customer_id = manager
172                .extract_customer_from_party(
173                    &update_party.party,
174                    &self.agent_did,
175                    &update_party.party_type,
176                )
177                .await?;
178
179            // If the party has additional schema.org data, update the profile
180            if let Some(profile_data) = extract_schema_org_data(&update_party.party) {
181                manager
182                    .update_customer_profile(&customer_id, profile_data)
183                    .await?;
184            }
185
186            log::debug!(
187                "Updated {} customer: {}",
188                update_party.party_type,
189                customer_id
190            );
191        }
192
193        Ok(())
194    }
195
196    async fn handle_confirm_relationship_message(
197        &self,
198        message: &tap_msg::didcomm::PlainMessage,
199    ) -> Result<()> {
200        // Extract relationship confirmation from the message body
201        if let Some(body) = message.body.as_object() {
202            if let (Some(agent_id), Some(for_id)) = (
203                body.get("@id").and_then(|v| v.as_str()),
204                body.get("for").and_then(|v| v.as_str()),
205            ) {
206                let manager = CustomerManager::new(self.storage.clone());
207
208                // Find the customer for the agent
209                if let Ok(Some(customer)) = self
210                    .storage
211                    .get_customer_by_identifier(agent_id)
212                    .await
213                    .map_err(|e| crate::error::Error::Storage(e.to_string()))
214                {
215                    // Add the confirmed relationship
216                    let proof = json!({
217                        "type": "ConfirmRelationship",
218                        "message_id": message.id,
219                        "from": message.from,
220                        "timestamp": message.created_time
221                    });
222
223                    manager
224                        .add_relationship(&customer.id, "confirmed_acts_for", for_id, Some(proof))
225                        .await?;
226
227                    log::debug!("Confirmed relationship: {} acts for {}", agent_id, for_id);
228                }
229            }
230        }
231
232        Ok(())
233    }
234}
235
236/// Extract schema.org data from a Party object
237fn extract_schema_org_data(party: &tap_msg::message::Party) -> Option<Value> {
238    // In a real implementation, this would parse the party's metadata
239    // to extract schema.org compatible data
240
241    // For now, return a basic schema.org Person object if we have a name in metadata
242    party
243        .metadata
244        .get("name")
245        .or_else(|| party.metadata.get("https://schema.org/name"))
246        .map(|name| {
247            json!({
248                "@context": "https://schema.org",
249                "@type": "Person",
250                "name": name
251            })
252        })
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use tap_msg::message::Party;
    use tempfile::tempdir;

    /// Build a party whose metadata contains only a `name` entry.
    fn named_party(identifier: &str, display_name: &str) -> Party {
        let mut metadata = HashMap::new();
        metadata.insert("name".to_string(), json!(display_name));
        Party::with_metadata(identifier, metadata)
    }

    /// A received Transfer message must produce a customer record for both
    /// the originator and the beneficiary.
    #[tokio::test]
    async fn test_customer_extraction_from_transfer() {
        // Fresh on-disk database in a temporary directory.
        let workspace = tempdir().unwrap();
        let storage = Arc::new(
            Storage::new(Some(workspace.path().join("test.db")))
                .await
                .unwrap(),
        );

        let handler = CustomerEventHandler::new(storage.clone(), "did:key:agent".to_string());

        let transfer = Transfer {
            asset: "eip155:1/slip44:60".parse().unwrap(),
            originator: Some(named_party("did:key:alice", "Alice")),
            beneficiary: Some(named_party("bob@example.com", "Bob")),
            amount: "100".to_string(),
            agents: vec![],
            memo: None,
            settlement_id: None,
            expiry: None,
            transaction_value: None,
            connection_id: None,
            transaction_id: Some("tx-123".to_string()),
            metadata: Default::default(),
        };

        let message = tap_msg::didcomm::PlainMessage {
            id: "msg-123".to_string(),
            typ: "application/didcomm-plain+json".to_string(),
            type_: "https://tap.rsvp/schema/1.0#Transfer".to_string(),
            body: serde_json::to_value(&transfer).unwrap(),
            from: "did:key:sender".to_string(),
            to: vec!["did:key:receiver".to_string()],
            thid: None,
            pthid: None,
            extra_headers: Default::default(),
            attachments: None,
            created_time: None,
            expires_time: None,
            from_prior: None,
        };

        handler
            .handle_event(NodeEvent::MessageReceived {
                message,
                source: "test".to_string(),
            })
            .await;

        // Both parties must now be retrievable; the beneficiary is looked up
        // under the `mailto:` form of its e-mail identifier.
        for identifier in ["did:key:alice", "mailto:bob@example.com"] {
            let record = storage
                .get_customer_by_identifier(identifier)
                .await
                .unwrap();
            assert!(record.is_some());
        }
    }
}