use std::sync::Arc;
use bsv::auth::utils::create_nonce;
use bsv::primitives::public_key::PublicKey;
use bsv::primitives::utils::from_hex;
use bsv::remittance::types::PeerMessage;
use bsv::script::templates::{P2PKH, ScriptTemplateLock};
use bsv::wallet::interfaces::{
CreateActionArgs, CreateActionOptions, CreateActionOutput, GetPublicKeyArgs,
InternalizeActionArgs, InternalizeOutput, Payment, SignActionArgs, WalletInterface,
};
use bsv::wallet::types::{BooleanDefaultTrue, Counterparty, CounterpartyType, Protocol};
use crate::client::MessageBoxClient;
use crate::error::MessageBoxError;
use crate::types::{IncomingPayment, PaymentCustomInstructions, PaymentToken};
impl<W: WalletInterface + Clone + 'static + Send + Sync> MessageBoxClient<W> {
pub async fn create_payment_token(
&self,
recipient: &str,
amount: u64,
) -> Result<PaymentToken, MessageBoxError> {
let prefix = create_nonce(self.wallet())
.await
.map_err(|e| MessageBoxError::Auth(format!("create_nonce prefix: {e}")))?;
let suffix = create_nonce(self.wallet())
.await
.map_err(|e| MessageBoxError::Auth(format!("create_nonce suffix: {e}")))?;
let pk_result = self
.wallet()
.get_public_key(
GetPublicKeyArgs {
identity_key: false,
protocol_id: Some(Protocol {
security_level: 2,
protocol: "3241645161d8".to_string(),
}),
key_id: Some(format!("{prefix} {suffix}")),
counterparty: Some(Counterparty {
counterparty_type: CounterpartyType::Other,
public_key: Some(
PublicKey::from_string(recipient)
.map_err(|e| MessageBoxError::Wallet(e.to_string()))?,
),
}),
privileged: false,
privileged_reason: None,
for_self: None,
seek_permission: None,
},
self.originator(),
)
.await
.map_err(|e| MessageBoxError::Wallet(e.to_string()))?;
let hash_vec = pk_result.public_key.to_hash();
let mut hash = [0u8; 20];
hash.copy_from_slice(&hash_vec);
let lock_script = P2PKH::from_public_key_hash(hash)
.lock()
.map_err(|e| MessageBoxError::Wallet(format!("P2PKH lock error: {e}")))?;
let locking_script_bytes = from_hex(&lock_script.to_hex())
.map_err(|e| MessageBoxError::Wallet(format!("hex decode locking script: {e}")))?;
let custom_instructions = PaymentCustomInstructions {
derivation_prefix: prefix.clone(),
derivation_suffix: suffix.clone(),
payee: Some(recipient.to_string()),
};
let create_result = self
.wallet()
.create_action(
CreateActionArgs {
description: "PeerPay payment".to_string(),
input_beef: None,
inputs: vec![],
outputs: vec![CreateActionOutput {
locking_script: Some(locking_script_bytes),
satoshis: amount,
output_description: "Payment for PeerPay transaction".to_string(),
basket: None,
custom_instructions: Some(
serde_json::to_string(&custom_instructions)
.map_err(MessageBoxError::Json)?,
),
tags: vec![],
}],
lock_time: None,
version: None,
labels: vec!["peerpay".to_string()],
options: Some(CreateActionOptions {
randomize_outputs: BooleanDefaultTrue(Some(false)),
sign_and_process: BooleanDefaultTrue(None),
accept_delayed_broadcast: BooleanDefaultTrue(None),
..Default::default()
}),
reference: None,
},
self.originator(),
)
.await
.map_err(|e| MessageBoxError::Wallet(e.to_string()))?;
let tx = if let Some(tx_bytes) = create_result.tx {
tx_bytes
} else if let Some(signable) = create_result.signable_transaction {
let sign_result = self
.wallet()
.sign_action(
SignActionArgs {
reference: signable.reference,
spends: std::collections::HashMap::new(),
options: None,
},
self.originator(),
)
.await
.map_err(|e| MessageBoxError::Wallet(e.to_string()))?;
sign_result
.tx
.ok_or_else(|| MessageBoxError::Wallet("sign_action returned no tx".to_string()))?
} else {
return Err(MessageBoxError::Wallet(
"create_action returned neither tx nor signable_transaction".to_string(),
));
};
Ok(PaymentToken {
custom_instructions,
transaction: tx,
amount,
output_index: None,
})
}
pub async fn send_payment(
&self,
recipient: &str,
amount: u64,
) -> Result<String, MessageBoxError> {
let token = self.create_payment_token(recipient, amount).await?;
let token_json = serde_json::to_string(&token)?;
self.send_message(recipient, "payment_inbox", &token_json, false, false, None, None).await
}
pub async fn send_live_payment(
&self,
recipient: &str,
amount: u64,
) -> Result<String, MessageBoxError> {
let token = self.create_payment_token(recipient, amount).await?;
let token_json = serde_json::to_string(&token)?;
self.send_live_message(recipient, "payment_inbox", &token_json, false, false, None, None).await
}
pub async fn listen_for_live_payments(
&self,
on_payment: Arc<dyn Fn(IncomingPayment) + Send + Sync>,
) -> Result<(), MessageBoxError> {
let wrapper: Arc<dyn Fn(PeerMessage) + Send + Sync> = Arc::new(move |msg: PeerMessage| {
if let Ok(token) = serde_json::from_str::<PaymentToken>(&msg.body) {
let incoming = IncomingPayment {
token,
sender: msg.sender,
message_id: msg.message_id,
};
on_payment(incoming);
}
});
self.listen_for_live_messages("payment_inbox", wrapper, None).await
}
pub async fn accept_payment(&self, payment: &IncomingPayment) -> Result<(), MessageBoxError> {
use base64::{engine::general_purpose::STANDARD, Engine};
let sender_pk = PublicKey::from_string(&payment.sender)
.map_err(|e| MessageBoxError::Wallet(format!("invalid sender key: {e}")))?;
let prefix_bytes = STANDARD
.decode(&payment.token.custom_instructions.derivation_prefix)
.map_err(|e| MessageBoxError::Wallet(format!("base64 decode prefix: {e}")))?;
let suffix_bytes = STANDARD
.decode(&payment.token.custom_instructions.derivation_suffix)
.map_err(|e| MessageBoxError::Wallet(format!("base64 decode suffix: {e}")))?;
self.wallet()
.internalize_action(
InternalizeActionArgs {
tx: payment.token.transaction.clone(),
description: "PeerPay Payment".to_string(),
labels: vec!["peerpay".to_string()],
seek_permission: BooleanDefaultTrue(Some(true)),
outputs: vec![InternalizeOutput::WalletPayment {
output_index: payment.token.output_index.unwrap_or(0),
payment: Payment {
derivation_prefix: prefix_bytes,
derivation_suffix: suffix_bytes,
sender_identity_key: sender_pk,
},
}],
},
self.originator(),
)
.await
.map_err(|e| MessageBoxError::Wallet(e.to_string()))?;
self.acknowledge_message(vec![payment.message_id.clone()], None).await?;
Ok(())
}
pub async fn reject_payment(&self, payment: &IncomingPayment) -> Result<(), MessageBoxError> {
if payment.token.amount < 2000 {
return self.acknowledge_message(vec![payment.message_id.clone()], None).await;
}
self.accept_payment(payment).await?;
if let Err(e) = self.send_payment(&payment.sender, payment.token.amount - 1000).await {
if Self::is_401_error(&e) { return Ok(()); }
return Err(e);
}
if let Err(e) = self.acknowledge_message(vec![payment.message_id.clone()], None).await {
if Self::is_401_error(&e) { return Ok(()); }
return Err(e);
}
Ok(())
}
fn is_401_error(e: &MessageBoxError) -> bool {
matches!(e, MessageBoxError::Http(401, _))
|| matches!(e, MessageBoxError::Auth(msg) if msg.contains("401"))
}
pub async fn list_incoming_payments(
&self,
) -> Result<Vec<IncomingPayment>, MessageBoxError> {
let messages = self.list_messages("payment_inbox", false, None).await?;
let payments = messages
.into_iter()
.filter_map(|msg| {
serde_json::from_str::<PaymentToken>(&msg.body)
.ok()
.map(|token| IncomingPayment {
token,
sender: msg.sender,
message_id: msg.message_id,
})
})
.collect();
Ok(payments)
}
pub async fn acknowledge_notification(
&self,
message: &PeerMessage,
) -> Result<bool, MessageBoxError> {
self.acknowledge_message(vec![message.message_id.clone()], None).await?;
let parsed = serde_json::from_str::<crate::http_ops::WrappedMessageBody>(&message.body);
let payment_data = parsed.ok().and_then(|w| w.payment);
if let Some(payment) = payment_data {
if let (Some(tx_bytes), Some(outputs)) = (&payment.tx, &payment.outputs) {
let description = payment
.description
.clone()
.unwrap_or_else(|| "MessageBox recipient payment".to_string());
let internalize_outputs: Vec<InternalizeOutput> = outputs
.iter()
.filter_map(|o| {
let sender_pk = o.sender_identity_key
.as_deref()
.and_then(|k| bsv::primitives::public_key::PublicKey::from_string(k).ok())?;
Some(InternalizeOutput::WalletPayment {
output_index: o.output_index.unwrap_or(0),
payment: Payment {
derivation_prefix: o.derivation_prefix.clone().unwrap_or_default(),
derivation_suffix: o.derivation_suffix.clone().unwrap_or_default(),
sender_identity_key: sender_pk,
},
})
})
.collect();
if internalize_outputs.is_empty() {
return Ok(false);
}
let args = InternalizeActionArgs {
tx: tx_bytes.clone(),
description,
labels: vec!["notification-payment".to_string()],
seek_permission: bsv::wallet::types::BooleanDefaultTrue(Some(false)),
outputs: internalize_outputs,
};
match self.wallet().internalize_action(args, self.originator()).await {
Ok(_) => return Ok(true),
Err(_) => return Ok(false),
}
}
}
Ok(false)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{IncomingPayment, PaymentCustomInstructions, PaymentToken, ServerPeerMessage};
use bsv::primitives::private_key::PrivateKey;
use bsv::remittance::types::PeerMessage;
use bsv::wallet::error::WalletError;
use bsv::wallet::interfaces::*;
use bsv::wallet::proto_wallet::ProtoWallet;
use std::sync::Arc;
#[derive(Clone)]
struct ArcWallet(Arc<ProtoWallet>);
impl ArcWallet {
fn new() -> Self {
let key = PrivateKey::from_random().expect("random key");
ArcWallet(Arc::new(ProtoWallet::new(key)))
}
async fn identity_hex(&self) -> String {
self.get_public_key(
GetPublicKeyArgs {
identity_key: true,
protocol_id: None,
key_id: None,
counterparty: None,
privileged: false,
privileged_reason: None,
for_self: None,
seek_permission: None,
},
None,
)
.await
.expect("get_public_key")
.public_key
.to_der_hex()
}
}
#[async_trait::async_trait]
impl WalletInterface for ArcWallet {
async fn create_action(&self, args: CreateActionArgs, orig: Option<&str>) -> Result<CreateActionResult, WalletError> { self.0.create_action(args, orig).await }
async fn sign_action(&self, args: SignActionArgs, orig: Option<&str>) -> Result<SignActionResult, WalletError> { self.0.sign_action(args, orig).await }
async fn abort_action(&self, args: AbortActionArgs, orig: Option<&str>) -> Result<AbortActionResult, WalletError> { self.0.abort_action(args, orig).await }
async fn list_actions(&self, args: ListActionsArgs, orig: Option<&str>) -> Result<ListActionsResult, WalletError> { self.0.list_actions(args, orig).await }
async fn internalize_action(&self, args: InternalizeActionArgs, orig: Option<&str>) -> Result<InternalizeActionResult, WalletError> { self.0.internalize_action(args, orig).await }
async fn list_outputs(&self, args: ListOutputsArgs, orig: Option<&str>) -> Result<ListOutputsResult, WalletError> { self.0.list_outputs(args, orig).await }
async fn relinquish_output(&self, args: RelinquishOutputArgs, orig: Option<&str>) -> Result<RelinquishOutputResult, WalletError> { self.0.relinquish_output(args, orig).await }
async fn get_public_key(&self, args: GetPublicKeyArgs, orig: Option<&str>) -> Result<GetPublicKeyResult, WalletError> { self.0.get_public_key(args, orig).await }
async fn reveal_counterparty_key_linkage(&self, args: RevealCounterpartyKeyLinkageArgs, orig: Option<&str>) -> Result<RevealCounterpartyKeyLinkageResult, WalletError> { self.0.reveal_counterparty_key_linkage(args, orig).await }
async fn reveal_specific_key_linkage(&self, args: RevealSpecificKeyLinkageArgs, orig: Option<&str>) -> Result<RevealSpecificKeyLinkageResult, WalletError> { self.0.reveal_specific_key_linkage(args, orig).await }
async fn encrypt(&self, args: EncryptArgs, orig: Option<&str>) -> Result<EncryptResult, WalletError> { self.0.encrypt(args, orig).await }
async fn decrypt(&self, args: DecryptArgs, orig: Option<&str>) -> Result<DecryptResult, WalletError> { self.0.decrypt(args, orig).await }
async fn create_hmac(&self, args: CreateHmacArgs, orig: Option<&str>) -> Result<CreateHmacResult, WalletError> { self.0.create_hmac(args, orig).await }
async fn verify_hmac(&self, args: VerifyHmacArgs, orig: Option<&str>) -> Result<VerifyHmacResult, WalletError> { self.0.verify_hmac(args, orig).await }
async fn create_signature(&self, args: CreateSignatureArgs, orig: Option<&str>) -> Result<CreateSignatureResult, WalletError> { self.0.create_signature(args, orig).await }
async fn verify_signature(&self, args: VerifySignatureArgs, orig: Option<&str>) -> Result<VerifySignatureResult, WalletError> { self.0.verify_signature(args, orig).await }
async fn acquire_certificate(&self, args: AcquireCertificateArgs, orig: Option<&str>) -> Result<Certificate, WalletError> { self.0.acquire_certificate(args, orig).await }
async fn list_certificates(&self, args: ListCertificatesArgs, orig: Option<&str>) -> Result<ListCertificatesResult, WalletError> { self.0.list_certificates(args, orig).await }
async fn prove_certificate(&self, args: ProveCertificateArgs, orig: Option<&str>) -> Result<ProveCertificateResult, WalletError> { self.0.prove_certificate(args, orig).await }
async fn relinquish_certificate(&self, args: RelinquishCertificateArgs, orig: Option<&str>) -> Result<RelinquishCertificateResult, WalletError> { self.0.relinquish_certificate(args, orig).await }
async fn discover_by_identity_key(&self, args: DiscoverByIdentityKeyArgs, orig: Option<&str>) -> Result<DiscoverCertificatesResult, WalletError> { self.0.discover_by_identity_key(args, orig).await }
async fn discover_by_attributes(&self, args: DiscoverByAttributesArgs, orig: Option<&str>) -> Result<DiscoverCertificatesResult, WalletError> { self.0.discover_by_attributes(args, orig).await }
async fn is_authenticated(&self, orig: Option<&str>) -> Result<AuthenticatedResult, WalletError> { self.0.is_authenticated(orig).await }
async fn wait_for_authentication(&self, orig: Option<&str>) -> Result<AuthenticatedResult, WalletError> { self.0.wait_for_authentication(orig).await }
async fn get_height(&self, orig: Option<&str>) -> Result<GetHeightResult, WalletError> { self.0.get_height(orig).await }
async fn get_header_for_height(&self, args: GetHeaderArgs, orig: Option<&str>) -> Result<GetHeaderResult, WalletError> { self.0.get_header_for_height(args, orig).await }
async fn get_network(&self, orig: Option<&str>) -> Result<GetNetworkResult, WalletError> { self.0.get_network(orig).await }
async fn get_version(&self, orig: Option<&str>) -> Result<GetVersionResult, WalletError> { self.0.get_version(orig).await }
}
#[tokio::test]
async fn create_payment_token_uses_create_nonce() {
let sender = ArcWallet::new();
let recipient = ArcWallet::new();
let recipient_pk = recipient.identity_hex().await;
let client = crate::client::MessageBoxClient::new(
"https://example.com".to_string(),
sender,
None,
bsv::services::overlay_tools::Network::Mainnet,
);
let result = client.create_payment_token(&recipient_pk, 1000).await;
match &result {
Err(e) => {
let msg = e.to_string();
assert!(!msg.contains("create_nonce prefix:"), "should not fail at nonce step");
println!("create_payment_token expected error: {msg}");
}
Ok(_) => {
println!("create_payment_token succeeded unexpectedly — wallet may have funds");
}
}
}
#[allow(dead_code)]
fn send_payment_compile_check(client: &crate::client::MessageBoxClient<ArcWallet>) {
let _ = client.send_payment("03abc", 1000);
}
#[test]
fn reject_payment_threshold_below_2000() {
assert!(1999_u64 < 2000, "below threshold: only ack path");
assert!(2000_u64 >= 2000, "at threshold: accept + refund path");
let amount: u64 = 3000;
let refund = amount - 1000;
assert_eq!(refund, 2000, "refund is amount minus 1000 sat fee");
}
#[test]
fn list_incoming_payments_skips_unparseable() {
let messages = vec![
ServerPeerMessage {
message_id: "msg1".to_string(),
body: "not valid json".to_string(),
sender: "03sender1".to_string(),
created_at: "2024-01-01T00:00:00Z".to_string(),
updated_at: "2024-01-01T00:00:00Z".to_string(),
acknowledged: None,
},
ServerPeerMessage {
message_id: "msg2".to_string(),
body: r#"{"customInstructions":{"derivationPrefix":"p","derivationSuffix":"s"},"transaction":[1,2,3],"amount":1000}"#.to_string(),
sender: "03sender2".to_string(),
created_at: "2024-01-01T00:00:00Z".to_string(),
updated_at: "2024-01-01T00:00:00Z".to_string(),
acknowledged: None,
},
ServerPeerMessage {
message_id: "msg3".to_string(),
body: r#"{"foo":"bar"}"#.to_string(),
sender: "03sender3".to_string(),
created_at: "2024-01-01T00:00:00Z".to_string(),
updated_at: "2024-01-01T00:00:00Z".to_string(),
acknowledged: None,
},
];
let payments: Vec<IncomingPayment> = messages
.into_iter()
.filter_map(|msg| {
serde_json::from_str::<PaymentToken>(&msg.body)
.ok()
.map(|token| IncomingPayment {
token,
sender: msg.sender,
message_id: msg.message_id,
})
})
.collect();
assert_eq!(payments.len(), 1, "only valid payment token should be included");
assert_eq!(payments[0].message_id, "msg2");
assert_eq!(payments[0].sender, "03sender2");
assert_eq!(payments[0].token.amount, 1000);
}
#[test]
fn accept_payment_base64_round_trip() {
use base64::{engine::general_purpose::STANDARD, Engine};
let prefix = "dGVzdC1wcmVmaXg="; let suffix = "dGVzdC1zdWZmaXg=";
let prefix_bytes = STANDARD.decode(prefix).unwrap();
let suffix_bytes = STANDARD.decode(suffix).unwrap();
assert_eq!(prefix_bytes, b"test-prefix");
assert_eq!(suffix_bytes, b"test-suffix");
let re_encoded = STANDARD.encode(&prefix_bytes);
assert_eq!(re_encoded, prefix, "round-trip must produce original base64");
}
#[test]
fn incoming_payment_round_trip() {
let token = PaymentToken {
custom_instructions: PaymentCustomInstructions {
derivation_prefix: "pfx".to_string(),
derivation_suffix: "sfx".to_string(),
payee: Some("03recipient".to_string()),
},
transaction: vec![0xde, 0xad, 0xbe, 0xef],
amount: 5000,
output_index: None,
};
let incoming = IncomingPayment {
token: token.clone(),
sender: "03sender_key".to_string(),
message_id: "abc123".to_string(),
};
assert_eq!(incoming.sender, "03sender_key");
assert_eq!(incoming.message_id, "abc123");
assert_eq!(incoming.token.amount, 5000);
assert_eq!(incoming.token.transaction, vec![0xde, 0xad, 0xbe, 0xef]);
assert_eq!(incoming.token.custom_instructions.derivation_prefix, "pfx");
assert_eq!(incoming.token.custom_instructions.derivation_suffix, "sfx");
assert_eq!(incoming.token.output_index, None);
}
#[allow(dead_code)]
fn send_live_payment_compile_check(client: &crate::client::MessageBoxClient<ArcWallet>) {
let _fut = client.send_live_payment("03abc", 1000);
}
#[test]
fn listen_for_live_payments_callback_parses_token() {
use std::sync::Mutex as StdMutex;
let received = Arc::new(StdMutex::new(Vec::<IncomingPayment>::new()));
let received_clone = received.clone();
let msg = PeerMessage {
message_id: "msg-live-1".to_string(),
sender: "03sender".to_string(),
recipient: "03recipient".to_string(),
message_box: "payment_inbox".to_string(),
body: r#"{"customInstructions":{"derivationPrefix":"p","derivationSuffix":"s"},"transaction":[1,2],"amount":500}"#.to_string(),
};
if let Ok(token) = serde_json::from_str::<PaymentToken>(&msg.body) {
let incoming = IncomingPayment {
token,
sender: msg.sender.clone(),
message_id: msg.message_id.clone(),
};
received_clone.lock().unwrap().push(incoming);
}
let payments = received.lock().unwrap();
assert_eq!(payments.len(), 1, "one valid payment should be parsed");
assert_eq!(payments[0].token.amount, 500);
assert_eq!(payments[0].sender, "03sender");
assert_eq!(payments[0].message_id, "msg-live-1");
}
#[test]
fn listen_for_live_payments_callback_skips_non_payment() {
use std::sync::Mutex as StdMutex;
let received = Arc::new(StdMutex::new(Vec::<IncomingPayment>::new()));
let received_clone = received.clone();
let msg = PeerMessage {
message_id: "msg-bad-1".to_string(),
sender: "03sender".to_string(),
recipient: "03recipient".to_string(),
message_box: "payment_inbox".to_string(),
body: r#"{"not":"a payment token"}"#.to_string(),
};
if let Ok(token) = serde_json::from_str::<PaymentToken>(&msg.body) {
let incoming = IncomingPayment {
token,
sender: msg.sender.clone(),
message_id: msg.message_id.clone(),
};
received_clone.lock().unwrap().push(incoming);
}
let payments = received.lock().unwrap();
assert_eq!(payments.len(), 0, "non-payment message must be silently skipped");
}
}