1use crate::customer::CustomerManager;
10use crate::error::Result;
11use crate::event::{EventSubscriber, NodeEvent};
12use crate::storage::Storage;
13use async_trait::async_trait;
14use serde_json::{json, Value};
15use std::sync::Arc;
16use tap_msg::message::{transfer::Transfer, update_party::UpdateParty};
17
18pub struct CustomerEventHandler {
20 storage: Arc<Storage>,
21 agent_did: String,
22}
23
24impl CustomerEventHandler {
25 pub fn new(storage: Arc<Storage>, agent_did: String) -> Self {
27 Self { storage, agent_did }
28 }
29}
30
31#[async_trait]
32impl EventSubscriber for CustomerEventHandler {
33 async fn handle_event(&self, event: NodeEvent) {
34 let result: Result<()> = match &event {
35 NodeEvent::MessageReceived { message, .. } | NodeEvent::MessageSent { message, .. } => {
36 match message.type_.as_str() {
38 "https://tap.rsvp/schema/1.0#Transfer" => {
39 self.handle_transfer_message(message).await
40 }
41 "https://tap.rsvp/schema/1.0#UpdateParty" => {
42 self.handle_update_party_message(message).await
43 }
44 "https://tap.rsvp/schema/1.0#ConfirmRelationship" => {
45 self.handle_confirm_relationship_message(message).await
46 }
47 _ => Ok(()),
48 }
49 }
50 NodeEvent::TransactionCreated {
51 transaction,
52 agent_did,
53 } => {
54 log::debug!("Handling TransactionCreated event for agent: {}", agent_did);
55
56 if let Ok(plain_message) = serde_json::from_value::<tap_msg::didcomm::PlainMessage>(
59 transaction.message_json.clone(),
60 ) {
61 if let Ok(transfer) = serde_json::from_value::<Transfer>(plain_message.body) {
62 let manager = CustomerManager::new(self.storage.clone());
63
64 if let Some(originator) = &transfer.originator {
66 match manager
67 .extract_customer_from_party(
68 originator,
69 &self.agent_did,
70 "originator",
71 )
72 .await
73 {
74 Ok(customer_id) => {
75 log::debug!(
76 "Created/updated originator customer: {}",
77 customer_id
78 )
79 }
80 Err(e) => log::error!("Failed to extract originator: {}", e),
81 }
82 }
83
84 if let Some(beneficiary) = &transfer.beneficiary {
86 match manager
87 .extract_customer_from_party(
88 beneficiary,
89 &self.agent_did,
90 "beneficiary",
91 )
92 .await
93 {
94 Ok(customer_id) => log::debug!(
95 "Created/updated beneficiary customer: {}",
96 customer_id
97 ),
98 Err(e) => log::error!("Failed to extract beneficiary: {}", e),
99 }
100 }
101 }
102 }
103 Ok(())
104 }
105 _ => Ok(()),
106 };
107
108 if let Err(e) = result {
109 log::error!("Customer event handler error: {}", e);
110 }
111 }
112}
113
114impl CustomerEventHandler {
115 async fn handle_transfer_message(
116 &self,
117 message: &tap_msg::didcomm::PlainMessage,
118 ) -> Result<()> {
119 if let Ok(transfer) = serde_json::from_value::<Transfer>(message.body.clone()) {
121 let manager = CustomerManager::new(self.storage.clone());
122
123 if let Some(originator) = &transfer.originator {
125 let customer_id = manager
126 .extract_customer_from_party(originator, &self.agent_did, "originator")
127 .await?;
128
129 log::debug!("Extracted originator customer: {}", customer_id);
130 }
131
132 if let Some(beneficiary) = &transfer.beneficiary {
134 let customer_id = manager
135 .extract_customer_from_party(beneficiary, &self.agent_did, "beneficiary")
136 .await?;
137
138 log::debug!("Extracted beneficiary customer: {}", customer_id);
139 }
140
141 for agent in &transfer.agents {
143 for agent_for in &agent.for_parties.0 {
144 if let Ok(Some(customer)) = self
146 .storage
147 .get_customer_by_identifier(&agent.id)
148 .await
149 .map_err(|e| crate::error::Error::Storage(e.to_string()))
150 {
151 let _ = manager
152 .add_relationship(&customer.id, "acts_for", agent_for, None)
153 .await;
154 }
155 }
156 }
157 }
158
159 Ok(())
160 }
161
162 async fn handle_update_party_message(
163 &self,
164 message: &tap_msg::didcomm::PlainMessage,
165 ) -> Result<()> {
166 if let Ok(update_party) = serde_json::from_value::<UpdateParty>(message.body.clone()) {
168 let manager = CustomerManager::new(self.storage.clone());
169
170 let customer_id = manager
172 .extract_customer_from_party(
173 &update_party.party,
174 &self.agent_did,
175 &update_party.party_type,
176 )
177 .await?;
178
179 if let Some(profile_data) = extract_schema_org_data(&update_party.party) {
181 manager
182 .update_customer_profile(&customer_id, profile_data)
183 .await?;
184 }
185
186 log::debug!(
187 "Updated {} customer: {}",
188 update_party.party_type,
189 customer_id
190 );
191 }
192
193 Ok(())
194 }
195
196 async fn handle_confirm_relationship_message(
197 &self,
198 message: &tap_msg::didcomm::PlainMessage,
199 ) -> Result<()> {
200 if let Some(body) = message.body.as_object() {
202 if let (Some(agent_id), Some(for_id)) = (
203 body.get("@id").and_then(|v| v.as_str()),
204 body.get("for").and_then(|v| v.as_str()),
205 ) {
206 let manager = CustomerManager::new(self.storage.clone());
207
208 if let Ok(Some(customer)) = self
210 .storage
211 .get_customer_by_identifier(agent_id)
212 .await
213 .map_err(|e| crate::error::Error::Storage(e.to_string()))
214 {
215 let proof = json!({
217 "type": "ConfirmRelationship",
218 "message_id": message.id,
219 "from": message.from,
220 "timestamp": message.created_time
221 });
222
223 manager
224 .add_relationship(&customer.id, "confirmed_acts_for", for_id, Some(proof))
225 .await?;
226
227 log::debug!("Confirmed relationship: {} acts for {}", agent_id, for_id);
228 }
229 }
230 }
231
232 Ok(())
233 }
234}
235
236fn extract_schema_org_data(party: &tap_msg::message::Party) -> Option<Value> {
238 party
243 .metadata
244 .get("name")
245 .or_else(|| party.metadata.get("https://schema.org/name"))
246 .map(|name| {
247 json!({
248 "@context": "https://schema.org",
249 "@type": "Person",
250 "name": name
251 })
252 })
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use tap_msg::message::Party;
    use tempfile::tempdir;

    /// Builds a party whose metadata holds only a `name` entry.
    fn named_party(id: &str, name: &str) -> Party {
        let mut metadata = HashMap::new();
        metadata.insert("name".to_string(), json!(name));
        Party::with_metadata(id, metadata)
    }

    /// A received Transfer message must leave both the originator and the
    /// beneficiary retrievable from storage by identifier.
    #[tokio::test]
    async fn test_customer_extraction_from_transfer() {
        let workdir = tempdir().unwrap();
        let database = workdir.path().join("test.db");
        let storage = Arc::new(Storage::new(Some(database)).await.unwrap());

        let handler = CustomerEventHandler::new(storage.clone(), "did:key:agent".to_string());

        let transfer = Transfer {
            asset: "eip155:1/slip44:60".parse().unwrap(),
            originator: Some(named_party("did:key:alice", "Alice")),
            beneficiary: Some(named_party("bob@example.com", "Bob")),
            amount: "100".to_string(),
            agents: vec![],
            memo: None,
            settlement_id: None,
            expiry: None,
            transaction_value: None,
            connection_id: None,
            transaction_id: Some("tx-123".to_string()),
            metadata: Default::default(),
        };

        let message = tap_msg::didcomm::PlainMessage {
            id: "msg-123".to_string(),
            typ: "application/didcomm-plain+json".to_string(),
            type_: "https://tap.rsvp/schema/1.0#Transfer".to_string(),
            body: serde_json::to_value(&transfer).unwrap(),
            from: "did:key:sender".to_string(),
            to: vec!["did:key:receiver".to_string()],
            thid: None,
            pthid: None,
            extra_headers: Default::default(),
            attachments: None,
            created_time: None,
            expires_time: None,
            from_prior: None,
        };

        handler
            .handle_event(NodeEvent::MessageReceived {
                message,
                source: "test".to_string(),
            })
            .await;

        // The DID is stored as given.
        let alice = storage
            .get_customer_by_identifier("did:key:alice")
            .await
            .unwrap();
        assert!(alice.is_some());

        // The e-mail address is looked up under its `mailto:` form.
        let bob = storage
            .get_customer_by_identifier("mailto:bob@example.com")
            .await
            .unwrap();
        assert!(bob.is_some());
    }
}