1use std::collections::{HashMap, HashSet};
2
3use bsv::remittance::types::PeerMessage;
4use bsv::wallet::interfaces::{InternalizeActionArgs, InternalizeOutput, Payment, WalletInterface};
5use bsv::wallet::types::BooleanDefaultTrue;
6use bsv::primitives::public_key::PublicKey;
7use futures_util::future::join_all;
8
9use crate::client::MessageBoxClient;
10use crate::error::MessageBoxError;
11use crate::client::check_status_error;
12use crate::types::{AcknowledgeMessageParams, FailedRecipient, ListMessagesParams, ListMessagesResponse, MessagePayment, MessagePaymentOutput, SendListParams, SendListResult, SentRecipient, SendMessageParams, SendMessageRequest, SendMessageResponse, ServerPeerMessage};
13use crate::encryption;
14
15pub(crate) fn dedup_messages(results: Vec<Vec<PeerMessage>>) -> Vec<PeerMessage> {
21 let mut seen = HashSet::new();
22 let mut out = Vec::new();
23 for host_messages in results {
24 for msg in host_messages {
25 if seen.insert(msg.message_id.clone()) {
26 out.push(msg);
27 }
28 }
29 }
30 out
31}
32
/// Message body that wraps an inner `message` together with an optional `payment`.
///
/// `list_messages_from_host` tries to parse every listed body into this shape.
/// Both fields are optional, so a JSON object lacking either key still parses.
#[derive(serde::Deserialize)]
pub(crate) struct WrappedMessageBody {
    /// Inner message; used as the plain body when present.
    pub message: Option<serde_json::Value>,
    /// Payment attached to the message, if any.
    pub payment: Option<ServerPayment>,
}
41
/// Payment section of a wrapped message body.
#[derive(serde::Deserialize)]
pub(crate) struct ServerPayment {
    /// Transaction bytes; handed to the wallet's `internalize_action` when payments are accepted.
    pub tx: Option<Vec<u8>>,
    /// Per-output payment details; treated as empty when absent.
    pub outputs: Option<Vec<ServerPaymentOutput>>,
    /// Description for the internalized action; a default is substituted when absent.
    pub description: Option<String>,
}
48
/// One output entry of a [`ServerPayment`], deserialized from camelCase keys.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ServerPaymentOutput {
    /// Index of the output in the transaction; `0` is assumed when absent.
    pub output_index: Option<u32>,
    /// Protocol tag; outputs tagged with anything other than "wallet payment" are ignored.
    pub protocol: Option<String>,
    /// Derivation prefix bytes for the payment key; empty when absent.
    pub derivation_prefix: Option<Vec<u8>>,
    /// Derivation suffix bytes for the payment key; empty when absent.
    pub derivation_suffix: Option<Vec<u8>>,
    /// Sender identity key as a string; outputs without a parseable key are ignored.
    pub sender_identity_key: Option<String>,
}
64
65impl<W: WalletInterface + Clone + 'static + Send + Sync> MessageBoxClient<W> {
66 #[allow(clippy::too_many_arguments)]
78 pub async fn send_message(
79 &self,
80 recipient: &str,
81 message_box: &str,
82 body: &str,
83 skip_encryption: bool,
84 check_permissions: bool,
85 message_id: Option<&str>,
86 override_host: Option<&str>,
87 ) -> Result<String, MessageBoxError> {
88 self.assert_initialized().await?;
89 let host = match override_host {
90 Some(h) => h.to_string(),
91 None => self.resolve_host_for_recipient(recipient).await?,
92 };
93 self.send_message_to_host(
94 &host,
95 recipient,
96 message_box,
97 body,
98 skip_encryption,
99 check_permissions,
100 message_id,
101 None,
102 )
103 .await
104 }
105
106 #[allow(clippy::too_many_arguments)]
119 pub(crate) async fn send_message_to_host(
120 &self,
121 host: &str,
122 recipient: &str,
123 message_box: &str,
124 body: &str,
125 skip_encryption: bool,
126 check_permissions: bool,
127 message_id: Option<&str>,
128 payment: Option<MessagePayment>,
129 ) -> Result<String, MessageBoxError> {
130 let wire_body = if skip_encryption {
132 body.to_string()
133 } else {
134 encryption::encrypt_body(
135 self.wallet(),
136 body,
137 recipient,
138 self.originator(),
139 )
140 .await?
141 };
142
143 let resolved_message_id = if let Some(id) = message_id {
147 id.to_string()
148 } else {
149 encryption::generate_message_id(
150 self.wallet(),
151 body,
152 recipient,
153 self.originator(),
154 )
155 .await?
156 };
157
158 let payment = if check_permissions && payment.is_none() {
161 let quote = self.get_message_box_quote(recipient, message_box, None).await?;
162 if quote.delivery_fee > 0 || quote.recipient_fee > 0 {
163 let p = self.create_message_payment(recipient, "e, None).await?;
164 Some(p)
165 } else {
166 None
167 }
168 } else {
169 payment
170 };
171
172 let request = SendMessageRequest {
174 message: SendMessageParams {
175 recipient: recipient.to_string(),
176 message_box: message_box.to_string(),
177 body: wire_body,
178 message_id: resolved_message_id.clone(),
179 },
180 payment,
181 };
182
183 let body_bytes = serde_json::to_vec(&request)?;
184 let url = format!("{host}/sendMessage");
185 let response = self.post_json(&url, body_bytes).await?;
186 check_status_error(&response.body)?;
187
188 if let Ok(resp) = serde_json::from_slice::<SendMessageResponse>(&response.body) {
190 if let Some(server_id) = resp.message_id {
191 return Ok(server_id);
192 }
193 }
194 Ok(resolved_message_id)
195 }
196
197 async fn create_message_payment(
207 &self,
208 recipient: &str,
209 quote: &crate::types::MessageBoxQuote,
210 description: Option<&str>,
211 ) -> Result<MessagePayment, MessageBoxError> {
212 use bsv::wallet::interfaces::{
213 CreateActionArgs, CreateActionOptions, CreateActionOutput, GetPublicKeyArgs,
214 };
215 use bsv::wallet::types::{BooleanDefaultTrue, Counterparty, CounterpartyType, Protocol};
216 use bsv::primitives::public_key::PublicKey;
217 use bsv::primitives::utils::from_hex;
218 use bsv::script::templates::{P2PKH, ScriptTemplateLock};
219 use bsv::wallet::proto_wallet::ProtoWallet;
220 use base64::Engine;
221
222 let desc = description.unwrap_or("MessageBox delivery fee");
223 let sender_identity_key = self.get_identity_key().await?;
224
225 let mut output_index: u32 = 0;
226 let mut outputs = Vec::new();
227 let mut payment_outputs = Vec::new();
228
229 if quote.delivery_fee > 0 {
231 let prefix_bytes: Vec<u8> = (0..32).map(|_| rand::random::<u8>()).collect();
233 let suffix_bytes: Vec<u8> = (0..32).map(|_| rand::random::<u8>()).collect();
234 let prefix = base64::engine::general_purpose::STANDARD.encode(&prefix_bytes);
235 let suffix = base64::engine::general_purpose::STANDARD.encode(&suffix_bytes);
236
237 let agent_pk = PublicKey::from_string("e.delivery_agent_identity_key)
238 .map_err(|e| MessageBoxError::Wallet(format!("agent key: {e}")))?;
239
240 let delivery_key = self
242 .wallet()
243 .get_public_key(
244 GetPublicKeyArgs {
245 identity_key: false,
246 protocol_id: Some(Protocol {
247 security_level: 2,
248 protocol: "3241645161d8".to_string(),
249 }),
250 key_id: Some(format!("{prefix} {suffix}")),
251 counterparty: Some(Counterparty {
252 counterparty_type: CounterpartyType::Other,
253 public_key: Some(agent_pk),
254 }),
255 privileged: false,
256 privileged_reason: None,
257 for_self: None,
258 seek_permission: None,
259 },
260 self.originator(),
261 )
262 .await
263 .map_err(|e| MessageBoxError::Wallet(e.to_string()))?;
264
265 let hash_vec = delivery_key.public_key.to_hash();
266 let mut hash = [0u8; 20];
267 hash.copy_from_slice(&hash_vec);
268 let lock = P2PKH::from_public_key_hash(hash)
269 .lock()
270 .map_err(|e| MessageBoxError::Wallet(format!("P2PKH lock: {e}")))?;
271 let lock_bytes = from_hex(&lock.to_hex())
272 .map_err(|e| MessageBoxError::Wallet(format!("hex decode: {e}")))?;
273
274 outputs.push(CreateActionOutput {
275 locking_script: Some(lock_bytes),
276 satoshis: quote.delivery_fee as u64,
277 output_description: "MessageBox server delivery fee".to_string(),
278 basket: None,
279 custom_instructions: None,
280 tags: vec![],
281 });
282
283 payment_outputs.push(MessagePaymentOutput {
285 output_index,
286 derivation_prefix: prefix.as_bytes().to_vec(),
287 derivation_suffix: suffix.as_bytes().to_vec(),
288 sender_identity_key: sender_identity_key.clone(),
289 });
290 output_index += 1;
291 }
292
293 if quote.recipient_fee > 0 {
295 let prefix_bytes: Vec<u8> = (0..32).map(|_| rand::random::<u8>()).collect();
296 let suffix_bytes: Vec<u8> = (0..32).map(|_| rand::random::<u8>()).collect();
297 let prefix = base64::engine::general_purpose::STANDARD.encode(&prefix_bytes);
298 let suffix = base64::engine::general_purpose::STANDARD.encode(&suffix_bytes);
299
300 let anyone_wallet = ProtoWallet::anyone();
304
305 let recipient_pk = PublicKey::from_string(recipient)
306 .map_err(|e| MessageBoxError::Wallet(format!("recipient key: {e}")))?;
307
308 let recv_key = anyone_wallet
310 .get_public_key(
311 GetPublicKeyArgs {
312 identity_key: false,
313 protocol_id: Some(Protocol {
314 security_level: 2,
315 protocol: "3241645161d8".to_string(),
316 }),
317 key_id: Some(format!("{prefix} {suffix}")),
318 counterparty: Some(Counterparty {
319 counterparty_type: CounterpartyType::Other,
320 public_key: Some(recipient_pk),
321 }),
322 privileged: false,
323 privileged_reason: None,
324 for_self: None,
325 seek_permission: None,
326 },
327 None,
328 )
329 .await
330 .map_err(|e| MessageBoxError::Wallet(e.to_string()))?;
331
332 let hash_vec2 = recv_key.public_key.to_hash();
333 let mut hash2 = [0u8; 20];
334 hash2.copy_from_slice(&hash_vec2);
335 let lock_recv = P2PKH::from_public_key_hash(hash2)
336 .lock()
337 .map_err(|e| MessageBoxError::Wallet(format!("P2PKH lock: {e}")))?;
338 let lock_recv_bytes = from_hex(&lock_recv.to_hex())
339 .map_err(|e| MessageBoxError::Wallet(format!("hex decode: {e}")))?;
340
341 outputs.push(CreateActionOutput {
342 locking_script: Some(lock_recv_bytes),
343 satoshis: quote.recipient_fee as u64,
344 output_description: "Recipient message fee".to_string(),
345 basket: None,
346 custom_instructions: None,
347 tags: vec![],
348 });
349
350 let anyone_id = anyone_wallet
352 .get_public_key(
353 GetPublicKeyArgs {
354 identity_key: true,
355 protocol_id: None,
356 key_id: None,
357 counterparty: None,
358 privileged: false,
359 privileged_reason: None,
360 for_self: None,
361 seek_permission: None,
362 },
363 None,
364 )
365 .await
366 .map_err(|e| MessageBoxError::Wallet(e.to_string()))?;
367
368 payment_outputs.push(MessagePaymentOutput {
369 output_index,
370 derivation_prefix: prefix.as_bytes().to_vec(),
371 derivation_suffix: suffix.as_bytes().to_vec(),
372 sender_identity_key: anyone_id.public_key.to_der_hex(),
373 });
374 }
375
376 let create_result = self
377 .wallet()
378 .create_action(
379 CreateActionArgs {
380 description: desc.to_string(),
381 input_beef: None,
382 inputs: vec![],
383 outputs,
384 lock_time: None,
385 version: None,
386 labels: vec!["messagebox".to_string()],
387 options: Some(CreateActionOptions {
388 randomize_outputs: BooleanDefaultTrue(Some(false)),
389 ..Default::default()
390 }),
391 reference: None,
392 },
393 self.originator(),
394 )
395 .await
396 .map_err(|e| MessageBoxError::Wallet(e.to_string()))?;
397
398 let tx = create_result
399 .tx
400 .ok_or_else(|| MessageBoxError::Wallet("create_action returned no tx".to_string()))?;
401
402 Ok(MessagePayment {
403 tx,
404 outputs: payment_outputs,
405 })
406 }
407
408
409 pub async fn send_message_to_recipients(
416 &self,
417 params: &SendListParams,
418 override_host: Option<&str>,
419 ) -> Result<SendListResult, MessageBoxError> {
420 self.assert_initialized().await?;
421
422 let skip_enc = params.skip_encryption.unwrap_or(false);
423
424 let recipient_refs: Vec<&str> = params.recipients.iter().map(|s| s.as_str()).collect();
425 let multi_quote = self
426 .get_message_box_quote_multi(&recipient_refs, ¶ms.message_box, override_host)
427 .await?;
428
429 let blocked: Vec<String> = multi_quote.blocked_recipients.clone();
431 let sendable: Vec<&crate::types::RecipientQuote> = multi_quote
432 .quotes_by_recipient
433 .iter()
434 .filter(|rq| rq.status != "blocked")
435 .collect();
436
437 let mut recipient_hosts: HashMap<String, String> = HashMap::new();
439 for rq in &sendable {
440 let host = if let Some(h) = override_host {
441 h.to_string()
442 } else {
443 self.resolve_host_for_recipient(&rq.recipient).await.unwrap_or_else(|_| self.host().to_string())
444 };
445 recipient_hosts.insert(rq.recipient.clone(), host);
446 }
447
448 let needs_payment = sendable
450 .iter()
451 .any(|rq| rq.delivery_fee > 0 || rq.recipient_fee > 0);
452
453 let batch_payment = if needs_payment {
454 let pairs_for_payment: Vec<(String, i64, i64, String)> = sendable
456 .iter()
457 .map(|rq| {
458 let host = recipient_hosts.get(&rq.recipient).cloned().unwrap_or_else(|| self.host().to_string());
459 let agent_key = multi_quote.delivery_agent_identity_key_by_host.get(&host).cloned().unwrap_or_default();
460 (rq.recipient.clone(), rq.delivery_fee, rq.recipient_fee, agent_key)
461 })
462 .collect();
463
464 match self.create_message_payment_batch_from_tuples(&pairs_for_payment, None).await {
465 Ok(p) => Some(p),
466 Err(e) => {
467 let failed_entries: Vec<FailedRecipient> = sendable
469 .iter()
470 .map(|rq| FailedRecipient {
471 recipient: rq.recipient.clone(),
472 error: e.to_string(),
473 })
474 .collect();
475 return Ok(SendListResult {
476 status: "error".to_string(),
477 description: "Batch payment creation failed".to_string(),
478 sent: vec![],
479 blocked,
480 failed: failed_entries,
481 totals: None,
482 });
483 }
484 }
485 } else {
486 None
487 };
488
489 let mut sent: Vec<SentRecipient> = Vec::new();
491 let mut failed: Vec<FailedRecipient> = Vec::new();
492
493 for rq in &sendable {
494 let host = recipient_hosts
495 .get(&rq.recipient)
496 .cloned()
497 .unwrap_or_else(|| self.host().to_string());
498
499 match self
500 .send_message_to_host(
501 &host,
502 &rq.recipient,
503 ¶ms.message_box,
504 ¶ms.body,
505 skip_enc,
506 false, None,
508 batch_payment.clone(),
509 )
510 .await
511 {
512 Ok(msg_id) => sent.push(SentRecipient {
513 recipient: rq.recipient.clone(),
514 message_id: msg_id,
515 }),
516 Err(e) => failed.push(FailedRecipient {
517 recipient: rq.recipient.clone(),
518 error: e.to_string(),
519 }),
520 }
521 }
522
523 Ok(SendListResult {
524 status: "success".to_string(),
525 description: format!("Sent to {} recipients", sent.len()),
526 sent,
527 blocked,
528 failed,
529 totals: multi_quote.totals,
530 })
531 }
532
/// Builds one payment whose outputs cover the fees of several recipients.
///
/// Each tuple is `(recipient, delivery_fee, recipient_fee, agent_key)`. For every tuple,
/// in order:
/// * a delivery-fee output is added when `delivery_fee > 0` and `agent_key` is non-empty,
///   locked to a key derived by this client's wallet with the agent as counterparty;
/// * a recipient-fee output is added when `recipient_fee > 0`, locked to a key derived by
///   `ProtoWallet::anyone()` with the recipient as counterparty.
///
/// When no output results, an empty payment (no transaction bytes, no outputs) is
/// returned without calling the wallet's `create_action`.
///
/// `description` labels the created action; "MessageBox batch delivery fee" is used
/// when `None`.
///
/// # Errors
/// Returns `MessageBoxError::Wallet` when a key cannot be parsed or derived, when a
/// locking script cannot be built, or when `create_action` fails or returns no transaction.
async fn create_message_payment_batch_from_tuples(
    &self,
    tuples: &[(String, i64, i64, String)],
    description: Option<&str>,
) -> Result<MessagePayment, MessageBoxError> {
    use bsv::wallet::interfaces::{
        CreateActionArgs, CreateActionOptions, CreateActionOutput, GetPublicKeyArgs,
    };
    use bsv::wallet::types::{BooleanDefaultTrue, Counterparty, CounterpartyType, Protocol};
    use bsv::primitives::public_key::PublicKey;
    use bsv::primitives::utils::from_hex;
    use bsv::script::templates::{P2PKH, ScriptTemplateLock};
    use bsv::wallet::proto_wallet::ProtoWallet;
    use base64::Engine;

    let desc = description.unwrap_or("MessageBox batch delivery fee");
    let sender_identity_key = self.get_identity_key().await?;
    // The "anyone" wallet derives all recipient-fee keys; its identity key is looked up
    // once and recorded as the sender of every recipient-fee output.
    let anyone_wallet = ProtoWallet::anyone();
    let anyone_id = anyone_wallet
        .get_public_key(
            GetPublicKeyArgs {
                identity_key: true,
                protocol_id: None,
                key_id: None,
                counterparty: None,
                privileged: false,
                privileged_reason: None,
                for_self: None,
                seek_permission: None,
            },
            None,
        )
        .await
        .map_err(|e| MessageBoxError::Wallet(e.to_string()))?;
    let anyone_id_hex = anyone_id.public_key.to_der_hex();

    let mut outputs: Vec<CreateActionOutput> = Vec::new();
    let mut payment_outputs: Vec<MessagePaymentOutput> = Vec::new();

    for (recipient, delivery_fee, recipient_fee, agent_key) in tuples {
        // Delivery fee: skipped silently when no agent key is known for the host.
        if *delivery_fee > 0 && !agent_key.is_empty() {
            // Fresh random derivation material: 32 random bytes each, base64-encoded.
            let prefix_bytes: Vec<u8> = (0..32).map(|_| rand::random::<u8>()).collect();
            let suffix_bytes: Vec<u8> = (0..32).map(|_| rand::random::<u8>()).collect();
            let prefix = base64::engine::general_purpose::STANDARD.encode(&prefix_bytes);
            let suffix = base64::engine::general_purpose::STANDARD.encode(&suffix_bytes);

            let agent_pk = PublicKey::from_string(agent_key)
                .map_err(|e| MessageBoxError::Wallet(format!("agent key: {e}")))?;

            let key = self
                .wallet()
                .get_public_key(
                    GetPublicKeyArgs {
                        identity_key: false,
                        protocol_id: Some(Protocol {
                            security_level: 2,
                            // NOTE(review): presumably the wallet payment protocol id — confirm.
                            protocol: "3241645161d8".to_string(),
                        }),
                        key_id: Some(format!("{prefix} {suffix}")),
                        counterparty: Some(Counterparty {
                            counterparty_type: CounterpartyType::Other,
                            public_key: Some(agent_pk),
                        }),
                        privileged: false,
                        privileged_reason: None,
                        for_self: None,
                        seek_permission: None,
                    },
                    self.originator(),
                )
                .await
                .map_err(|e| MessageBoxError::Wallet(e.to_string()))?;

            // Build a P2PKH locking script for the derived key.
            let hash_vec = key.public_key.to_hash();
            let mut hash = [0u8; 20];
            hash.copy_from_slice(&hash_vec);
            let lock = P2PKH::from_public_key_hash(hash)
                .lock()
                .map_err(|e| MessageBoxError::Wallet(format!("P2PKH lock: {e}")))?;
            let lock_bytes = from_hex(&lock.to_hex())
                .map_err(|e| MessageBoxError::Wallet(format!("hex decode: {e}")))?;

            // The index is the output's position in `outputs` at the time it is pushed.
            let output_index = outputs.len() as u32;
            outputs.push(CreateActionOutput {
                locking_script: Some(lock_bytes),
                satoshis: *delivery_fee as u64,
                output_description: format!("Delivery fee for {}", recipient),
                basket: None,
                custom_instructions: None,
                tags: vec![],
            });
            payment_outputs.push(MessagePaymentOutput {
                output_index,
                derivation_prefix: prefix.as_bytes().to_vec(),
                derivation_suffix: suffix.as_bytes().to_vec(),
                sender_identity_key: sender_identity_key.clone(),
            });
        }

        // Recipient fee: key derived by the "anyone" wallet for the recipient.
        if *recipient_fee > 0 {
            let prefix_bytes: Vec<u8> = (0..32).map(|_| rand::random::<u8>()).collect();
            let suffix_bytes: Vec<u8> = (0..32).map(|_| rand::random::<u8>()).collect();
            let prefix = base64::engine::general_purpose::STANDARD.encode(&prefix_bytes);
            let suffix = base64::engine::general_purpose::STANDARD.encode(&suffix_bytes);

            let recipient_pk = PublicKey::from_string(recipient)
                .map_err(|e| MessageBoxError::Wallet(format!("recipient key: {e}")))?;

            let key = anyone_wallet
                .get_public_key(
                    GetPublicKeyArgs {
                        identity_key: false,
                        protocol_id: Some(Protocol {
                            security_level: 2,
                            protocol: "3241645161d8".to_string(),
                        }),
                        key_id: Some(format!("{prefix} {suffix}")),
                        counterparty: Some(Counterparty {
                            counterparty_type: CounterpartyType::Other,
                            public_key: Some(recipient_pk),
                        }),
                        privileged: false,
                        privileged_reason: None,
                        for_self: None,
                        seek_permission: None,
                    },
                    None,
                )
                .await
                .map_err(|e| MessageBoxError::Wallet(e.to_string()))?;

            let hash_vec = key.public_key.to_hash();
            let mut hash = [0u8; 20];
            hash.copy_from_slice(&hash_vec);
            let lock = P2PKH::from_public_key_hash(hash)
                .lock()
                .map_err(|e| MessageBoxError::Wallet(format!("P2PKH lock: {e}")))?;
            let lock_bytes = from_hex(&lock.to_hex())
                .map_err(|e| MessageBoxError::Wallet(format!("hex decode: {e}")))?;

            let output_index = outputs.len() as u32;
            outputs.push(CreateActionOutput {
                locking_script: Some(lock_bytes),
                satoshis: *recipient_fee as u64,
                output_description: format!("Recipient fee for {}", recipient),
                basket: None,
                custom_instructions: None,
                tags: vec![],
            });
            payment_outputs.push(MessagePaymentOutput {
                output_index,
                derivation_prefix: prefix.as_bytes().to_vec(),
                derivation_suffix: suffix.as_bytes().to_vec(),
                sender_identity_key: anyone_id_hex.clone(),
            });
        }
    }

    // Nothing to pay for: return an empty payment instead of creating an action.
    if outputs.is_empty() {
        return Ok(MessagePayment { tx: vec![], outputs: vec![] });
    }

    let create_result = self
        .wallet()
        .create_action(
            CreateActionArgs {
                description: desc.to_string(),
                input_beef: None,
                inputs: vec![],
                outputs,
                lock_time: None,
                version: None,
                labels: vec!["messagebox".to_string()],
                options: Some(CreateActionOptions {
                    // NOTE(review): presumably keeps transaction output order equal to
                    // the `output_index` values recorded above — confirm.
                    randomize_outputs: BooleanDefaultTrue(Some(false)),
                    ..Default::default()
                }),
                reference: None,
            },
            self.originator(),
        )
        .await
        .map_err(|e| MessageBoxError::Wallet(e.to_string()))?;

    let tx = create_result
        .tx
        .ok_or_else(|| MessageBoxError::Wallet("create_action returned no tx".to_string()))?;

    Ok(MessagePayment {
        tx,
        outputs: payment_outputs,
    })
}
736
737 pub async fn list_messages_lite(
744 &self,
745 message_box: &str,
746 override_host: Option<&str>,
747 ) -> Result<Vec<ServerPeerMessage>, MessageBoxError> {
748 self.assert_initialized().await?;
749
750 let host = override_host.unwrap_or_else(|| self.host());
751 let params = ListMessagesParams {
752 message_box: message_box.to_string(),
753 };
754 let body_bytes = serde_json::to_vec(¶ms)?;
755 let url = format!("{host}/listMessages");
756 let response = self.post_json(&url, body_bytes).await?;
757 check_status_error(&response.body)?;
758
759 let mut list_response: ListMessagesResponse =
760 serde_json::from_slice(&response.body)?;
761
762 for msg in &mut list_response.messages {
765 msg.body = encryption::try_decrypt_message(
766 self.wallet(),
767 &msg.body,
768 &msg.sender,
769 None, )
771 .await;
772 }
773
774 Ok(list_response.messages)
775 }
776
777 pub async fn list_messages(
793 &self,
794 message_box: &str,
795 accept_payments: bool,
796 override_host: Option<&str>,
797 ) -> Result<Vec<PeerMessage>, MessageBoxError> {
798 self.assert_initialized().await?;
799
800 if let Some(host) = override_host {
802 return self.list_messages_from_host(host, message_box, accept_payments).await;
803 }
804
805 let identity_key = self.get_identity_key().await?;
807 let ads = self.query_advertisements(Some(&identity_key), None).await.unwrap_or_default();
808
809 let mut host_set: HashSet<String> = ads.into_iter().map(|ad| ad.host).collect();
811 host_set.insert(self.host().to_string());
812
813 if host_set.len() == 1 {
814 return self.list_messages_from_host(self.host(), message_box, accept_payments).await;
816 }
817
818 let futures: Vec<_> = host_set
820 .iter()
821 .map(|h| self.list_messages_from_host(h, message_box, accept_payments))
822 .collect();
823
824 let outcomes = join_all(futures).await;
825 let successful: Vec<Vec<PeerMessage>> = outcomes
826 .into_iter()
827 .filter_map(|r| r.ok())
828 .collect();
829
830 if successful.is_empty() {
831 return Err(MessageBoxError::Http(0, format!("list_messages: all {} hosts failed", host_set.len())));
832 }
833
834 Ok(dedup_messages(successful))
835 }
836
837 async fn list_messages_from_host(
842 &self,
843 host: &str,
844 message_box: &str,
845 accept_payments: bool,
846 ) -> Result<Vec<PeerMessage>, MessageBoxError> {
847 let identity_key = self.get_identity_key().await?;
849
850 let params = ListMessagesParams {
851 message_box: message_box.to_string(),
852 };
853 let body_bytes = serde_json::to_vec(¶ms)?;
854 let url = format!("{host}/listMessages");
855 let response = self.post_json(&url, body_bytes).await?;
856 check_status_error(&response.body)?;
857
858 let list_response: ListMessagesResponse = serde_json::from_slice(&response.body)?;
859
860 let mut result = Vec::with_capacity(list_response.messages.len());
861 for msg in list_response.messages {
862 let plain_body: String = if let Ok(wrapped) = serde_json::from_str::<WrappedMessageBody>(&msg.body) {
864 if accept_payments {
866 if let Some(payment) = &wrapped.payment {
867 if let Some(tx_bytes) = &payment.tx {
868 let description = payment
869 .description
870 .clone()
871 .unwrap_or_else(|| "Server delivery fee".to_string());
872
873 let outputs: Vec<InternalizeOutput> = payment
876 .outputs
877 .as_deref()
878 .unwrap_or(&[])
879 .iter()
880 .filter_map(|o| {
881 if o.protocol.as_deref() != Some("wallet payment") && o.protocol.is_some() {
883 return None;
884 }
885 let sender_pk = o.sender_identity_key
887 .as_deref()
888 .and_then(|k| PublicKey::from_string(k).ok())?;
889 Some(InternalizeOutput::WalletPayment {
890 output_index: o.output_index.unwrap_or(0),
891 payment: Payment {
892 derivation_prefix: o.derivation_prefix.clone().unwrap_or_default(),
893 derivation_suffix: o.derivation_suffix.clone().unwrap_or_default(),
894 sender_identity_key: sender_pk,
895 },
896 })
897 })
898 .collect();
899
900 let args = InternalizeActionArgs {
901 tx: tx_bytes.clone(),
902 description,
903 labels: vec!["server-delivery-fee".to_string()],
904 seek_permission: BooleanDefaultTrue(Some(false)),
905 outputs,
906 };
907 let _ = self.wallet().internalize_action(args, self.originator()).await;
909 }
910 }
911 }
912
913 match wrapped.message {
915 Some(serde_json::Value::String(s)) => s,
916 Some(v) => v.to_string(),
917 None => msg.body.clone(),
918 }
919 } else {
920 msg.body.clone()
922 };
923
924 let decrypted = encryption::try_decrypt_message(
926 self.wallet(),
927 &plain_body,
928 &msg.sender,
929 self.originator(),
930 )
931 .await;
932
933 result.push(PeerMessage {
934 message_id: msg.message_id,
935 sender: msg.sender,
936 recipient: identity_key.clone(),
937 message_box: message_box.to_string(),
938 body: decrypted,
939 });
940 }
941
942 Ok(result)
943 }
944
945 pub async fn acknowledge_message(
951 &self,
952 message_ids: Vec<String>,
953 override_host: Option<&str>,
954 ) -> Result<(), MessageBoxError> {
955 self.assert_initialized().await?;
956
957 if let Some(host) = override_host {
958 return self.acknowledge_message_on_host(host, &message_ids).await;
959 }
960
961 let identity_key = self.get_identity_key().await?;
963 let ads = self.query_advertisements(Some(&identity_key), None).await.unwrap_or_default();
964
965 let mut host_set: HashSet<String> = ads.into_iter().map(|ad| ad.host).collect();
966 host_set.insert(self.host().to_string());
967
968 if host_set.len() == 1 {
969 return self.acknowledge_message_on_host(self.host(), &message_ids).await;
970 }
971
972 let futures: Vec<_> = host_set
974 .iter()
975 .map(|h| self.acknowledge_message_on_host(h, &message_ids))
976 .collect();
977
978 let outcomes = join_all(futures).await;
979 let any_ok = outcomes.iter().any(|r| r.is_ok());
980
981 if any_ok {
982 Ok(())
983 } else {
984 Err(MessageBoxError::Http(0, format!("acknowledge_message: all {} hosts failed", host_set.len())))
985 }
986 }
987
988 async fn acknowledge_message_on_host(
990 &self,
991 host: &str,
992 message_ids: &[String],
993 ) -> Result<(), MessageBoxError> {
994 let params = AcknowledgeMessageParams {
995 message_ids: message_ids.to_vec(),
996 };
997 let body_bytes = serde_json::to_vec(¶ms)?;
998 let url = format!("{host}/acknowledgeMessage");
999 let response = self.post_json(&url, body_bytes).await?;
1000 check_status_error(&response.body)?;
1001 Ok(())
1002 }
1003}
1004
1005#[cfg(test)]
1010mod tests {
1011 use crate::encryption::generate_message_id;
1012 use crate::types::{
1013 AcknowledgeMessageParams, ListMessagesResponse, SendMessageParams, SendMessageRequest,
1014 };
1015 use bsv::primitives::private_key::PrivateKey;
1016 use bsv::wallet::error::WalletError;
1017 use bsv::wallet::interfaces::*;
1018 use bsv::wallet::proto_wallet::ProtoWallet;
1019 use std::sync::Arc;
1020
/// Test wallet: a `ProtoWallet` behind an `Arc`, so the wrapper itself is `Clone`.
#[derive(Clone)]
struct ArcWallet(Arc<ProtoWallet>);
1024
1025 impl ArcWallet {
1026 fn new() -> Self {
1027 let key = PrivateKey::from_random().expect("random key");
1028 ArcWallet(Arc::new(ProtoWallet::new(key)))
1029 }
1030 }
1031
// `WalletInterface` for the test wallet: every method forwards its arguments
// unchanged to the wrapped `ProtoWallet` and returns that call's result.
#[async_trait::async_trait]
impl WalletInterface for ArcWallet {
    async fn create_action(&self, args: CreateActionArgs, orig: Option<&str>) -> Result<CreateActionResult, WalletError> { self.0.create_action(args, orig).await }
    async fn sign_action(&self, args: SignActionArgs, orig: Option<&str>) -> Result<SignActionResult, WalletError> { self.0.sign_action(args, orig).await }
    async fn abort_action(&self, args: AbortActionArgs, orig: Option<&str>) -> Result<AbortActionResult, WalletError> { self.0.abort_action(args, orig).await }
    async fn list_actions(&self, args: ListActionsArgs, orig: Option<&str>) -> Result<ListActionsResult, WalletError> { self.0.list_actions(args, orig).await }
    async fn internalize_action(&self, args: InternalizeActionArgs, orig: Option<&str>) -> Result<InternalizeActionResult, WalletError> { self.0.internalize_action(args, orig).await }
    async fn list_outputs(&self, args: ListOutputsArgs, orig: Option<&str>) -> Result<ListOutputsResult, WalletError> { self.0.list_outputs(args, orig).await }
    async fn relinquish_output(&self, args: RelinquishOutputArgs, orig: Option<&str>) -> Result<RelinquishOutputResult, WalletError> { self.0.relinquish_output(args, orig).await }
    async fn get_public_key(&self, args: GetPublicKeyArgs, orig: Option<&str>) -> Result<GetPublicKeyResult, WalletError> { self.0.get_public_key(args, orig).await }
    async fn reveal_counterparty_key_linkage(&self, args: RevealCounterpartyKeyLinkageArgs, orig: Option<&str>) -> Result<RevealCounterpartyKeyLinkageResult, WalletError> { self.0.reveal_counterparty_key_linkage(args, orig).await }
    async fn reveal_specific_key_linkage(&self, args: RevealSpecificKeyLinkageArgs, orig: Option<&str>) -> Result<RevealSpecificKeyLinkageResult, WalletError> { self.0.reveal_specific_key_linkage(args, orig).await }
    async fn encrypt(&self, args: EncryptArgs, orig: Option<&str>) -> Result<EncryptResult, WalletError> { self.0.encrypt(args, orig).await }
    async fn decrypt(&self, args: DecryptArgs, orig: Option<&str>) -> Result<DecryptResult, WalletError> { self.0.decrypt(args, orig).await }
    async fn create_hmac(&self, args: CreateHmacArgs, orig: Option<&str>) -> Result<CreateHmacResult, WalletError> { self.0.create_hmac(args, orig).await }
    async fn verify_hmac(&self, args: VerifyHmacArgs, orig: Option<&str>) -> Result<VerifyHmacResult, WalletError> { self.0.verify_hmac(args, orig).await }
    async fn create_signature(&self, args: CreateSignatureArgs, orig: Option<&str>) -> Result<CreateSignatureResult, WalletError> { self.0.create_signature(args, orig).await }
    async fn verify_signature(&self, args: VerifySignatureArgs, orig: Option<&str>) -> Result<VerifySignatureResult, WalletError> { self.0.verify_signature(args, orig).await }
    async fn acquire_certificate(&self, args: AcquireCertificateArgs, orig: Option<&str>) -> Result<Certificate, WalletError> { self.0.acquire_certificate(args, orig).await }
    async fn list_certificates(&self, args: ListCertificatesArgs, orig: Option<&str>) -> Result<ListCertificatesResult, WalletError> { self.0.list_certificates(args, orig).await }
    async fn prove_certificate(&self, args: ProveCertificateArgs, orig: Option<&str>) -> Result<ProveCertificateResult, WalletError> { self.0.prove_certificate(args, orig).await }
    async fn relinquish_certificate(&self, args: RelinquishCertificateArgs, orig: Option<&str>) -> Result<RelinquishCertificateResult, WalletError> { self.0.relinquish_certificate(args, orig).await }
    async fn discover_by_identity_key(&self, args: DiscoverByIdentityKeyArgs, orig: Option<&str>) -> Result<DiscoverCertificatesResult, WalletError> { self.0.discover_by_identity_key(args, orig).await }
    async fn discover_by_attributes(&self, args: DiscoverByAttributesArgs, orig: Option<&str>) -> Result<DiscoverCertificatesResult, WalletError> { self.0.discover_by_attributes(args, orig).await }
    async fn is_authenticated(&self, orig: Option<&str>) -> Result<AuthenticatedResult, WalletError> { self.0.is_authenticated(orig).await }
    async fn wait_for_authentication(&self, orig: Option<&str>) -> Result<AuthenticatedResult, WalletError> { self.0.wait_for_authentication(orig).await }
    async fn get_height(&self, orig: Option<&str>) -> Result<GetHeightResult, WalletError> { self.0.get_height(orig).await }
    async fn get_header_for_height(&self, args: GetHeaderArgs, orig: Option<&str>) -> Result<GetHeaderResult, WalletError> { self.0.get_header_for_height(args, orig).await }
    async fn get_network(&self, orig: Option<&str>) -> Result<GetNetworkResult, WalletError> { self.0.get_network(orig).await }
    async fn get_version(&self, orig: Option<&str>) -> Result<GetVersionResult, WalletError> { self.0.get_version(orig).await }
}
1063
/// The send request must serialize as `{"message": {...}}` with camelCase keys and no
/// snake_case field names leaking into the JSON.
#[test]
fn test_send_message_request_format() {
    let message = SendMessageParams {
        recipient: "03abc123".to_string(),
        message_box: "payment_inbox".to_string(),
        body: r#"{"encryptedMessage":"abc=="}"#.to_string(),
        message_id: "deadbeef01234567".to_string(),
    };
    let request = SendMessageRequest {
        message,
        payment: None,
    };
    let encoded = serde_json::to_string(&request).unwrap();

    assert!(encoded.starts_with(r#"{"message":"#), "must have message wrapper");

    let required = [
        ("\"recipient\"", "camelCase recipient"),
        ("\"messageBox\"", "camelCase messageBox"),
        ("\"messageId\"", "camelCase messageId"),
        ("\"payment_inbox\"", "messageBox value preserved"),
    ];
    for (needle, why) in required {
        assert!(encoded.contains(needle), "{}", why);
    }

    for forbidden in ["message_box", "message_id"] {
        assert!(!encoded.contains(forbidden), "no snake_case leakage");
    }
}
1090
/// Verifies that `AcknowledgeMessageParams` serializes to exactly
/// `{"messageIds":[...]}`: a single camelCase key holding the ids in order.
#[test]
fn test_acknowledge_request_format() {
    let params = AcknowledgeMessageParams {
        message_ids: vec!["id1".to_string(), "id2".to_string()],
    };
    // `serde_json::to_string` takes its value by reference.
    let json = serde_json::to_string(&params).unwrap();
    assert_eq!(json, r#"{"messageIds":["id1","id2"]}"#);
}
1100
/// Parses a sample list-messages payload into `ListMessagesResponse` and
/// checks the status plus the id, body and sender of its only message.
#[test]
fn test_list_messages_response_parsing() {
    let payload = r#"{
        "status": "success",
        "messages": [
            {
                "messageId": "abc123",
                "body": "hello world",
                "sender": "03xyz",
                "created_at": "2024-01-01T00:00:00Z",
                "updated_at": "2024-01-01T00:01:00Z"
            }
        ]
    }"#;
    let parsed: ListMessagesResponse = serde_json::from_str(payload).unwrap();

    assert_eq!(parsed.status, "success");
    assert_eq!(parsed.messages.len(), 1);

    // Exactly one message is expected, so inspect it through a single borrow.
    let only = &parsed.messages[0];
    assert_eq!(only.message_id, "abc123");
    assert_eq!(only.body, "hello world");
    assert_eq!(only.sender, "03xyz");
}
1123
1124 #[tokio::test]
1130 async fn test_message_id_is_64_hex_chars() {
1131 let wallet = ArcWallet::new();
1132 let other = ArcWallet::new();
1134 let other_pk = other
1135 .get_public_key(
1136 GetPublicKeyArgs {
1137 identity_key: true,
1138 protocol_id: None,
1139 key_id: None,
1140 counterparty: None,
1141 privileged: false,
1142 privileged_reason: None,
1143 for_self: None,
1144 seek_permission: None,
1145 },
1146 None,
1147 )
1148 .await
1149 .expect("get_public_key")
1150 .public_key
1151 .to_der_hex();
1152
1153 let id = generate_message_id(&wallet, "test body", &other_pk, None)
1154 .await
1155 .expect("generate_message_id");
1156
1157 assert_eq!(id.len(), 64, "HMAC hex must be 64 chars (32 bytes)");
1158 assert!(
1159 id.chars().all(|c| c.is_ascii_hexdigit()),
1160 "all characters must be hex"
1161 );
1162 assert!(
1163 id.chars().all(|c| !c.is_uppercase()),
1164 "hex must be lowercase"
1165 );
1166 }
1167
/// A JSON object carrying both `message` and `payment` must deserialize into
/// `WrappedMessageBody` with both fields populated and the message text intact.
#[test]
fn list_messages_parses_wrapped_body() {
    use super::WrappedMessageBody;

    let WrappedMessageBody { message, payment } = serde_json::from_str::<WrappedMessageBody>(
        r#"{"message": "hello world", "payment": {"tx": [1,2,3]}}"#,
    )
    .unwrap();

    assert!(message.is_some(), "message sub-field must be present");
    assert!(payment.is_some(), "payment sub-field must be present");

    // The inner message is an arbitrary JSON value; here it must be a string.
    let inner = message.unwrap();
    assert_eq!(inner.as_str().unwrap(), "hello world");
}
1184
/// Text that is not JSON must fail to deserialize as `WrappedMessageBody`.
#[test]
fn list_messages_plain_body_passthrough() {
    use super::WrappedMessageBody;

    let attempt: Result<WrappedMessageBody, _> = serde_json::from_str("plain body text");
    assert!(attempt.is_err(), "plain text must not parse as wrapped body");
}
1194
/// An explicit `"payment": null` must deserialize cleanly, leaving `payment`
/// as `None` while `message` is still populated.
#[test]
fn list_messages_missing_payment_no_crash() {
    use super::WrappedMessageBody;

    let WrappedMessageBody { message, payment } = serde_json::from_str::<WrappedMessageBody>(
        r#"{"message": "the content", "payment": null}"#,
    )
    .unwrap();

    assert!(message.is_some(), "message present");
    assert!(payment.is_none(), "payment is none when null");
}
1204
/// `dedup_messages` must collapse messages sharing a `message_id` across
/// per-host result lists, keeping the first occurrence and its position.
#[test]
fn test_list_messages_dedup_by_id() {
    use super::dedup_messages;
    use bsv::remittance::types::PeerMessage;

    // Only the id and body vary between fixtures; the other fields are fixed.
    let make = |id: &str, body: &str| PeerMessage {
        message_id: id.to_string(),
        sender: "03sender".to_string(),
        recipient: "03me".to_string(),
        message_box: "inbox".to_string(),
        body: body.to_string(),
    };

    // Two host result lists: the second repeats "id-1" with a different body.
    let results = vec![
        vec![make("id-1", "first"), make("id-2", "second")],
        vec![make("id-1", "duplicate")],
    ];
    let deduped = dedup_messages(results);

    assert_eq!(deduped.len(), 2, "must deduplicate to 2 unique messages");

    let first = deduped.iter().find(|m| m.message_id == "id-1").unwrap();
    assert_eq!(first.body, "first", "first-seen must win on deduplication");

    // Surviving messages keep the order in which they were first encountered.
    let ids: Vec<&str> = deduped.iter().map(|m| m.message_id.as_str()).collect();
    assert_eq!(ids, ["id-1", "id-2"], "first-seen order must be preserved");
}
1242
1243 #[tokio::test]
1245 async fn test_message_id_deterministic() {
1246 let wallet = ArcWallet::new();
1247 let other = ArcWallet::new();
1248 let other_pk = other
1249 .get_public_key(
1250 GetPublicKeyArgs {
1251 identity_key: true,
1252 protocol_id: None,
1253 key_id: None,
1254 counterparty: None,
1255 privileged: false,
1256 privileged_reason: None,
1257 for_self: None,
1258 seek_permission: None,
1259 },
1260 None,
1261 )
1262 .await
1263 .expect("get_public_key")
1264 .public_key
1265 .to_der_hex();
1266
1267 let id1 = generate_message_id(&wallet, "same body", &other_pk, None)
1268 .await
1269 .expect("first call");
1270 let id2 = generate_message_id(&wallet, "same body", &other_pk, None)
1271 .await
1272 .expect("second call");
1273
1274 assert_eq!(id1, id2, "same inputs must produce the same HMAC");
1275 }
1276}