tap_node/message/
sender.rs

1//! Message sender implementations for TAP Node.
2//!
3//! This module provides functionality for sending TAP messages to recipients
4//! using various transport mechanisms.
5//!
6//! # Overview
7//!
8//! The message sender system in TAP Node implements different strategies for
9//! delivering DIDComm messages to their recipients. The primary implementations are:
10//!
11//! - `NodeMessageSender`: A flexible sender that uses a callback function for delivery
12//! - `HttpMessageSender`: Sends messages over HTTP with robust error handling and retries
13//!
14//! # Cross-platform Support
15//!
16//! The `HttpMessageSender` implementation supports multiple platforms:
17//!
18//! - Native environments (using reqwest)
19//! - WASM environments (using web-sys)
20//! - Fallback implementations for environments without these libraries
21//!
22//! # Usage with TapNode
23//!
24//! ```no_run
25//! # use std::sync::Arc;
26//! # use tap_node::{NodeConfig, TapNode, HttpMessageSender, MessageSender};
27//! # use tap_msg::didcomm::Message;
28//! # use serde_json::json;
29//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
30//! // Create a TapNode
31//! let node = TapNode::new(NodeConfig::default());
32//!
33//! // Create a sample message
34//! let message = Message {
35//!     id: "test-123".to_string(),
36//!     typ: "https://tap.rsvp/schema/1.0#transfer".to_string(),
37//!     type_: "https://tap.rsvp/schema/1.0#transfer".to_string(),
38//!     body: json!({"amount": "100.00"}),
39//!     from: Some("did:example:sender".to_string()),
40//!     to: Some(vec!["did:example:recipient".to_string()]),
41//!     created_time: Some(chrono::Utc::now().timestamp() as u64),
42//!     expires_time: None,
43//!     thid: None,
44//!     pthid: None,
45//!     attachments: None,
46//!     from_prior: None,
47//!     extra_headers: Default::default(),
48//! };
49//!
50//! // Pack a message using the node's send_message method
51//! let packed_message = node.send_message(
52//!     "did:example:sender",
53//!     "did:example:recipient",
54//!     message
55//! ).await?;
56//!
57//! // Create an HTTP sender for external dispatch
58//! let sender = HttpMessageSender::new("https://recipient-endpoint.example.com".to_string());
59//!
60//! // Send the packed message to the recipient node
61//! sender.send(
62//!     packed_message,
63//!     vec!["did:example:recipient".to_string()]
64//! ).await?;
65//! # Ok(())
66//! # }
67//! ```
68
69use async_trait::async_trait;
70#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
71use std::collections::HashMap;
72use std::fmt::{self, Debug};
73use std::sync::Arc;
74
75use crate::error::{Error, Result};
76
77/// Message sender trait for sending packed messages to recipients
78#[async_trait]
79pub trait MessageSender: Send + Sync + Debug {
80    /// Send a packed message to one or more recipients
81    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()>;
82}
83
84/// Node message sender implementation
85pub struct NodeMessageSender {
86    /// Callback function for sending messages
87    #[allow(dead_code)]
88    send_callback: Arc<dyn Fn(String, Vec<String>) -> Result<()> + Send + Sync>,
89}
90
91impl Debug for NodeMessageSender {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        f.debug_struct("NodeMessageSender")
94            .field("send_callback", &"<function>")
95            .finish()
96    }
97}
98
99impl NodeMessageSender {
100    /// Create a new NodeMessageSender with the given callback
101    pub fn new<F>(callback: F) -> Self
102    where
103        F: Fn(String, Vec<String>) -> Result<()> + Send + Sync + 'static,
104    {
105        Self {
106            send_callback: Arc::new(callback),
107        }
108    }
109}
110
111#[async_trait]
112impl MessageSender for NodeMessageSender {
113    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
114        // Call the callback function with the packed message and recipient DIDs
115        (self.send_callback)(packed_message, recipient_dids)
116            .map_err(|e| Error::Dispatch(format!("Failed to send message: {}", e)))
117    }
118}
119
120/// HTTP message sender implementation for sending messages over HTTP
121///
122/// This sender allows TAP nodes to send messages to other TAP nodes over HTTP,
123/// handling the necessary encoding, content types, and error handling.
124///
125/// # HTTP Endpoint Structure
126///
127/// Messages are sent to endpoints derived from the recipient's DID, using a
128/// configurable base URL.
129///
130/// # Error Handling
131///
132/// The sender includes built-in error handling for common HTTP issues:
133/// - Connection timeouts
134/// - Request failures
135/// - Invalid responses
136/// - Rate limiting
137///
138/// # Configuration
139///
140/// The sender can be configured with:
141/// - Base URL for the HTTP endpoint
142/// - Timeout settings
143/// - Retry policies
144#[derive(Debug)]
145pub struct HttpMessageSender {
146    /// Base URL for the HTTP endpoint
147    base_url: String,
148    /// HTTP client (only in native environments)
149    #[cfg(feature = "reqwest")]
150    client: reqwest::Client,
151    /// Timeout for HTTP requests in milliseconds
152    #[allow(dead_code)]
153    timeout_ms: u64,
154    /// Maximum number of retries
155    max_retries: u32,
156}
157
158impl HttpMessageSender {
159    /// Create a new HttpMessageSender with the given base URL
160    pub fn new(base_url: String) -> Self {
161        Self::with_options(base_url, 30000, 3) // Default 30 second timeout, 3 retries
162    }
163
164    /// Create a new HttpMessageSender with custom options
165    pub fn with_options(base_url: String, timeout_ms: u64, max_retries: u32) -> Self {
166        #[cfg(feature = "reqwest")]
167        {
168            // Create a reqwest client with appropriate settings
169            let client = reqwest::Client::builder()
170                .timeout(std::time::Duration::from_millis(timeout_ms))
171                .user_agent("TAP-Node/0.1")
172                .build()
173                .unwrap_or_default();
174
175            Self {
176                base_url,
177                client,
178                timeout_ms,
179                max_retries,
180            }
181        }
182
183        #[cfg(not(feature = "reqwest"))]
184        {
185            Self {
186                base_url,
187                timeout_ms,
188                max_retries,
189            }
190        }
191    }
192
193    /// Helper to construct the endpoint URL for a recipient
194    fn get_endpoint_url(&self, recipient_did: &str) -> String {
195        // In a production implementation, this would map DID to HTTP endpoint
196        // This could involve DID resolution or a lookup table
197
198        // For now, we'll use a simple convention:
199        // Append the DID to the base URL, with proper URL encoding
200        let encoded_did = self.url_encode(recipient_did);
201        format!(
202            "{}/api/messages/{}",
203            self.base_url.trim_end_matches('/'),
204            encoded_did
205        )
206    }
207
208    /// Simple URL encoding function
209    fn url_encode(&self, text: &str) -> String {
210        // Simple encoding of common URL-unsafe characters
211        // In a real implementation, you would use a proper URL encoding library
212        text.replace(':', "%3A").replace('/', "%2F")
213    }
214}
215
216/// WebSocket message sender implementation for sending messages over WebSockets
217///
218/// This sender enables real-time bidirectional communication between TAP nodes,
219/// providing a persistent connection that can be used for both sending and receiving
220/// messages. WebSockets are particularly useful for scenarios requiring:
221///
222/// - Low-latency message delivery
223/// - Bidirectional communication
224/// - Connection state awareness
225/// - Reduced overhead compared to repeated HTTP requests
226///
227/// # Connection Management
228///
229/// The WebSocket sender manages a pool of connections to recipient endpoints,
230/// keeping them alive and reconnecting as needed. This makes it suitable for
231/// high-frequency message exchanges between known parties.
232///
233/// # Error Handling
234///
235/// The sender handles various WebSocket-specific error conditions:
236/// - Connection failures
237/// - Message delivery failures
238/// - Connection drops and reconnection
239/// - Protocol errors
240///
241/// # Cross-platform Support
242///
243/// Like the HTTP sender, the WebSocket sender supports:
244/// - Native environments (using tokio_tungstenite)
245/// - WASM environments (using web-sys WebSocket API)
246/// - Fallback implementations for environments without these libraries
247#[derive(Debug)]
248pub struct WebSocketMessageSender {
249    /// Base URL for WebSocket endpoints (ws:// or wss://)
250    base_url: String,
251    /// Active connections (only in native environments)
252    #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
253    connections: std::sync::Mutex<HashMap<String, tokio::sync::mpsc::Sender<String>>>,
254    /// WebSocket task handles (only in native environments)
255    #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
256    task_handles: std::sync::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>,
257}
258
259impl WebSocketMessageSender {
260    /// Create a new WebSocketMessageSender with the given base URL
261    pub fn new(base_url: String) -> Self {
262        Self::with_options(base_url)
263    }
264
265    /// Create a new WebSocketMessageSender with custom options
266    pub fn with_options(base_url: String) -> Self {
267        #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
268        {
269            Self {
270                base_url,
271                connections: std::sync::Mutex::new(HashMap::new()),
272                task_handles: std::sync::Mutex::new(HashMap::new()),
273            }
274        }
275
276        #[cfg(not(all(not(target_arch = "wasm32"), feature = "websocket")))]
277        {
278            Self { base_url }
279        }
280    }
281
282    /// Helper to construct the WebSocket endpoint URL for a recipient
283    fn get_endpoint_url(&self, recipient_did: &str) -> String {
284        // In a production implementation, this would map DID to WebSocket endpoint
285        // This could involve DID resolution or a lookup table
286
287        // Convert http(s):// to ws(s)://
288        let ws_base_url = if self.base_url.starts_with("https://") {
289            self.base_url.replace("https://", "wss://")
290        } else if self.base_url.starts_with("http://") {
291            self.base_url.replace("http://", "ws://")
292        } else {
293            self.base_url.clone()
294        };
295
296        // Append the DID to the base URL, with proper URL encoding
297        let encoded_did = self.url_encode(recipient_did);
298        format!(
299            "{}/ws/messages/{}",
300            ws_base_url.trim_end_matches('/'),
301            encoded_did
302        )
303    }
304
305    /// Simple URL encoding function
306    fn url_encode(&self, text: &str) -> String {
307        // Simple encoding of common URL-unsafe characters
308        // In a real implementation, you would use a proper URL encoding library
309        text.replace(':', "%3A").replace('/', "%2F")
310    }
311
312    /// Ensures a connection exists for the given recipient
313    #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
314    async fn ensure_connection(
315        &self,
316        recipient: &str,
317    ) -> Result<tokio::sync::mpsc::Sender<String>> {
318        use futures::sink::SinkExt;
319        use futures::stream::StreamExt;
320        use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
321
322        // Check if we already have an active connection and return it if we do
323        {
324            // Scope the lock to ensure it's released before any await points
325            let connections = self.connections.lock().unwrap();
326            if let Some(connection) = connections.get(recipient) {
327                return Ok(connection.clone());
328            }
329        }
330
331        // Otherwise, create a new connection
332        let endpoint = self.get_endpoint_url(recipient);
333        log::info!(
334            "Creating new WebSocket connection to {} at {}",
335            recipient,
336            endpoint
337        );
338
339        // Create a channel for sending messages to the WebSocket task
340        let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(100);
341
342        // Connect to the WebSocket with default timeout (30 seconds)
343        let (ws_stream, _) = match tokio::time::timeout(
344            std::time::Duration::from_millis(30000),
345            connect_async(&endpoint),
346        )
347        .await
348        {
349            Ok(Ok(stream)) => stream,
350            Ok(Err(e)) => {
351                return Err(Error::Dispatch(format!(
352                    "Failed to connect to WebSocket endpoint {}: {}",
353                    endpoint, e
354                )));
355            }
356            Err(_) => {
357                return Err(Error::Dispatch(format!(
358                    "Connection to WebSocket endpoint {} timed out",
359                    endpoint
360                )));
361            }
362        };
363
364        log::debug!("WebSocket connection established to {}", recipient);
365
366        // Split the WebSocket stream
367        let (mut write, mut read) = ws_stream.split();
368
369        // Create a task that will:
370        // 1. Listen for messages from the channel and send them to the WebSocket
371        // 2. Listen for messages from the WebSocket and handle them
372        let recipient_clone = recipient.to_string();
373        let handle = tokio::spawn(async move {
374            // Process messages from the channel to send over WebSocket
375            loop {
376                tokio::select! {
377                    // Handle outgoing messages
378                    Some(message) = rx.recv() => {
379                        log::debug!("Sending message to {} via WebSocket", recipient_clone);
380                        if let Err(e) = write.send(Message::Text(message)).await {
381                            log::error!("Failed to send WebSocket message to {}: {}", recipient_clone, e);
382                            // Try to reconnect? For now we'll just log the error
383                        }
384                    }
385
386                    // Handle incoming messages
387                    result = read.next() => {
388                        match result {
389                            Some(Ok(message)) => {
390                                // Process incoming message - for now just log it
391                                if let Message::Text(text) = message {
392                                    log::debug!("Received WebSocket message from {}: {}", recipient_clone, text);
393                                }
394                            }
395                            Some(Err(e)) => {
396                                log::error!("WebSocket error from {}: {}", recipient_clone, e);
397                                // Connection likely dropped, exit the loop
398                                break;
399                            }
400                            None => {
401                                // WebSocket closed
402                                log::info!("WebSocket connection to {} closed", recipient_clone);
403                                break;
404                            }
405                        }
406                    }
407                }
408            }
409
410            // WebSocket loop ended - clean up and possibly reconnect
411            log::info!("WebSocket connection to {} terminated", recipient_clone);
412        });
413
414        // Store the sender and task handle (using separate scopes to avoid holding multiple locks)
415        {
416            // Get mutable access to the connections map
417            let mut connections = self.connections.lock().unwrap();
418            connections.insert(recipient.to_string(), tx.clone());
419        }
420
421        {
422            // Get mutable access to the task handles map
423            let mut task_handles = self.task_handles.lock().unwrap();
424            task_handles.insert(recipient.to_string(), handle);
425        }
426
427        Ok(tx)
428    }
429}
430
431#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
432#[async_trait]
433impl MessageSender for WebSocketMessageSender {
434    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
435        if recipient_dids.is_empty() {
436            return Err(Error::Dispatch("No recipients specified".to_string()));
437        }
438
439        // Track failures to report them at the end
440        let mut failures = Vec::new();
441
442        // Send the message to each recipient
443        for recipient in &recipient_dids {
444            log::info!("Sending message to {} via WebSocket", recipient);
445
446            // Ensure we have a connection
447            match self.ensure_connection(recipient).await {
448                Ok(sender) => {
449                    // Send the message through the channel to the WebSocket task
450                    if let Err(e) = sender.send(packed_message.clone()).await {
451                        let err_msg = format!("Failed to send message to WebSocket task: {}", e);
452                        log::error!("{}", err_msg);
453                        failures.push((recipient.clone(), err_msg));
454                    }
455                }
456                Err(e) => {
457                    let err_msg = format!("Failed to establish WebSocket connection: {}", e);
458                    log::error!("{}", err_msg);
459                    failures.push((recipient.clone(), err_msg));
460                }
461            }
462        }
463
464        // Report failures if any
465        if !failures.is_empty() {
466            let failure_messages = failures
467                .iter()
468                .map(|(did, err)| format!("{}: {}", did, err))
469                .collect::<Vec<_>>()
470                .join("; ");
471
472            return Err(Error::Dispatch(format!(
473                "Failed to send message to some recipients via WebSocket: {}",
474                failure_messages
475            )));
476        }
477
478        Ok(())
479    }
480}
481
482// Specific implementation for WASM environments with web-sys feature
483#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
484#[async_trait(?Send)]
485impl MessageSender for WebSocketMessageSender {
486    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
487        use wasm_bindgen::prelude::*;
488        use wasm_bindgen_futures::JsFuture;
489        use web_sys::{MessageEvent, WebSocket};
490
491        if recipient_dids.is_empty() {
492            return Err(Error::Dispatch("No recipients specified".to_string()));
493        }
494
495        // Track failures to report them at the end
496        let mut failures = Vec::new();
497
498        // Get the window object
499        let window = web_sys::window().ok_or_else(|| {
500            Error::Dispatch("Could not get window object in WASM environment".to_string())
501        })?;
502
503        // Send the message to each recipient
504        for recipient in &recipient_dids {
505            let endpoint = self.get_endpoint_url(recipient);
506            log::info!(
507                "Sending message to {} via WebSocket at {} (WASM)",
508                recipient,
509                endpoint
510            );
511
512            // Create a promise to set up a WebSocket connection and send the message
513            let (resolve, reject) = js_sys::Promise::new_resolver();
514            let promise_resolver = resolve.clone();
515            let promise_rejecter = reject.clone();
516
517            // Create a new WebSocket
518            let ws = match WebSocket::new(&endpoint) {
519                Ok(ws) => ws,
520                Err(err) => {
521                    let err_msg = format!("Failed to create WebSocket: {:?}", err);
522                    log::error!("{}", err_msg);
523                    failures.push((recipient.clone(), err_msg));
524                    continue;
525                }
526            };
527
528            // Set up event handlers
529            let onopen_callback = Closure::once(Box::new(move |_: web_sys::Event| {
530                promise_resolver.resolve(&JsValue::from(true));
531            }) as Box<dyn FnOnce(web_sys::Event)>);
532
533            let onerror_callback = Closure::once(Box::new(move |e: web_sys::Event| {
534                let err_msg = format!("WebSocket error: {:?}", e);
535                promise_rejecter.reject(&JsValue::from_str(&err_msg));
536            }) as Box<dyn FnOnce(web_sys::Event)>);
537
538            let message_clone = packed_message.clone();
539            let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
540                if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
541                    log::debug!("Received message: {}", String::from(txt));
542                }
543            }) as Box<dyn FnMut(MessageEvent)>);
544
545            // Register event handlers
546            ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
547            ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
548            ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
549
550            // Wait for the connection to be established
551            match JsFuture::from(js_sys::Promise::race(&js_sys::Array::of2(
552                &js_sys::Promise::resolve(&promise_resolver),
553                &js_sys::Promise::new(&mut |resolve, _| {
554                    let timeout_closure = Closure::once_into_js(move || {
555                        resolve.call0(&JsValue::NULL).unwrap();
556                    });
557                    window
558                        .set_timeout_with_callback_and_timeout_and_arguments_0(
559                            timeout_closure.as_ref().unchecked_ref(),
560                            30000, // Default 30 second timeout
561                        )
562                        .unwrap();
563                }),
564            )))
565            .await
566            {
567                Ok(_) => {
568                    // Connection established, send the message
569                    if let Err(err) = ws.send_with_str(&message_clone) {
570                        let err_msg = format!("Failed to send WebSocket message: {:?}", err);
571                        log::error!("{}", err_msg);
572                        failures.push((recipient.clone(), err_msg));
573                    }
574                }
575                Err(err) => {
576                    let err_msg = format!("WebSocket connection failed: {:?}", err);
577                    log::error!("{}", err_msg);
578                    failures.push((recipient.clone(), err_msg));
579                }
580            }
581
582            // Keep the callbacks alive
583            onopen_callback.forget();
584            onerror_callback.forget();
585            onmessage_callback.forget();
586        }
587
588        // Report failures if any
589        if !failures.is_empty() {
590            let failure_messages = failures
591                .iter()
592                .map(|(did, err)| format!("{}: {}", did, err))
593                .collect::<Vec<_>>()
594                .join("; ");
595
596            return Err(Error::Dispatch(format!(
597                "Failed to send message to some recipients via WebSocket: {}",
598                failure_messages
599            )));
600        }
601
602        Ok(())
603    }
604}
605
606// Fallback implementation for environments without WebSocket support
607#[cfg(not(any(
608    all(not(target_arch = "wasm32"), feature = "websocket"),
609    all(target_arch = "wasm32", feature = "wasm")
610)))]
611#[async_trait]
612impl MessageSender for WebSocketMessageSender {
613    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
614        // This is a fallback implementation when neither tokio_tungstenite nor web-sys is available
615        for recipient in &recipient_dids {
616            let endpoint = self.get_endpoint_url(recipient);
617            log::info!(
618                "Would send message to {} via WebSocket at {} (WebSocket not available)",
619                recipient,
620                endpoint
621            );
622            log::debug!("Message content: {}", packed_message);
623        }
624
625        log::warn!("WebSocket sender is running without WebSocket features enabled. No actual WebSocket connections will be made.");
626        Ok(())
627    }
628}
629
630#[cfg(all(not(target_arch = "wasm32"), feature = "reqwest"))]
631#[async_trait]
632impl MessageSender for HttpMessageSender {
633    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
634        if recipient_dids.is_empty() {
635            return Err(Error::Dispatch("No recipients specified".to_string()));
636        }
637
638        // Track failures to report them at the end
639        let mut failures = Vec::new();
640
641        // Send the message to each recipient
642        for recipient in &recipient_dids {
643            let endpoint = self.get_endpoint_url(recipient);
644            log::info!("Sending message to {} via HTTP at {}", recipient, endpoint);
645
646            // Retry logic
647            let mut attempt = 0;
648            let mut success = false;
649            let mut last_error = None;
650
651            while attempt < self.max_retries && !success {
652                attempt += 1;
653
654                // Exponential backoff for retries
655                if attempt > 1 {
656                    let backoff_ms = 100 * (2_u64.pow(attempt - 1));
657                    tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
658                }
659
660                // Make the HTTP request
661                match self
662                    .client
663                    .post(&endpoint)
664                    .header("Content-Type", "application/didcomm-message+json")
665                    .body(packed_message.clone())
666                    .send()
667                    .await
668                {
669                    Ok(response) => {
670                        // Check if the response was successful (2xx status code)
671                        if response.status().is_success() {
672                            log::debug!("Successfully sent message to {}", recipient);
673                            success = true;
674                        } else {
675                            let status = response.status();
676                            let body = response.text().await.unwrap_or_default();
677                            log::warn!(
678                                "Failed to send message to {} (attempt {}/{}): HTTP {} - {}",
679                                recipient,
680                                attempt,
681                                self.max_retries,
682                                status,
683                                body
684                            );
685                            last_error = Some(format!("HTTP error: {} - {}", status, body));
686
687                            // Don't retry certain status codes
688                            if status.as_u16() == 404 || status.as_u16() == 400 {
689                                break; // Don't retry not found or bad request
690                            }
691                        }
692                    }
693                    Err(err) => {
694                        log::warn!(
695                            "Failed to send message to {} (attempt {}/{}): {}",
696                            recipient,
697                            attempt,
698                            self.max_retries,
699                            err
700                        );
701                        last_error = Some(format!("Request error: {}", err));
702                    }
703                }
704            }
705
706            if !success {
707                // Record the failure
708                failures.push((
709                    recipient.clone(),
710                    last_error.unwrap_or_else(|| "Unknown error".to_string()),
711                ));
712            }
713        }
714
715        // Report failures if any
716        if !failures.is_empty() {
717            let failure_messages = failures
718                .iter()
719                .map(|(did, err)| format!("{}: {}", did, err))
720                .collect::<Vec<_>>()
721                .join("; ");
722
723            return Err(Error::Dispatch(format!(
724                "Failed to send message to some recipients: {}",
725                failure_messages
726            )));
727        }
728
729        Ok(())
730    }
731}
732
733#[cfg(all(not(target_arch = "wasm32"), not(feature = "reqwest")))]
734#[async_trait]
735impl MessageSender for HttpMessageSender {
736    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
737        // This is a fallback implementation when reqwest is not available
738        // In a production environment, reqwest should always be available in the native configuration
739
740        for recipient in &recipient_dids {
741            let endpoint = self.get_endpoint_url(recipient);
742            log::info!(
743                "Would send message to {} via HTTP at {} (reqwest not available)",
744                recipient,
745                endpoint
746            );
747            log::debug!("Message content: {}", packed_message);
748        }
749
750        log::warn!("HTTP sender is running without reqwest feature enabled. No actual HTTP requests will be made.");
751        Ok(())
752    }
753}
754
755// Specific implementation for WASM environments with web-sys feature
756#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
757#[async_trait(?Send)]
758impl MessageSender for HttpMessageSender {
759    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
760        use wasm_bindgen::prelude::*;
761        use wasm_bindgen_futures::JsFuture;
762        use web_sys::{Request, RequestInit, RequestMode, Response};
763
764        if recipient_dids.is_empty() {
765            return Err(Error::Dispatch("No recipients specified".to_string()));
766        }
767
768        // Track failures to report them at the end
769        let mut failures = Vec::new();
770
771        // Get the window object
772        let window = web_sys::window().ok_or_else(|| {
773            Error::Dispatch("Could not get window object in WASM environment".to_string())
774        })?;
775
776        // Send the message to each recipient
777        for recipient in &recipient_dids {
778            let endpoint = self.get_endpoint_url(recipient);
779            log::info!(
780                "Sending message to {} via HTTP at {} (WASM)",
781                recipient,
782                endpoint
783            );
784
785            // Retry logic
786            let mut attempt = 0;
787            let mut success = false;
788            let mut last_error = None;
789
790            while attempt < self.max_retries && !success {
791                attempt += 1;
792
793                // Exponential backoff for retries
794                if attempt > 1 {
795                    let backoff_ms = 100 * (2_u64.pow(attempt - 1));
796                    // In WASM, we need to use a Promise-based sleep
797                    let promise = js_sys::Promise::new(&mut |resolve, _| {
798                        let closure = Closure::once_into_js(move || {
799                            resolve.call0(&JsValue::NULL).unwrap();
800                        });
801                        window
802                            .set_timeout_with_callback_and_timeout_and_arguments_0(
803                                closure.as_ref().unchecked_ref(),
804                                backoff_ms as i32,
805                            )
806                            .unwrap();
807                    });
808
809                    let _ = JsFuture::from(promise).await;
810                }
811
812                // Initialize a new Request
813                let mut opts = RequestInit::new();
814                opts.method("POST");
815                opts.mode(RequestMode::Cors);
816                opts.body(Some(&JsValue::from_str(&packed_message)));
817
818                let request = match Request::new_with_str_and_init(&endpoint, &opts) {
819                    Ok(req) => req,
820                    Err(err) => {
821                        let err_msg = format!("Failed to create request: {:?}", err);
822                        log::warn!("{}", err_msg);
823                        last_error = Some(err_msg);
824                        continue;
825                    }
826                };
827
828                // Set headers
829                if let Err(err) = request
830                    .headers()
831                    .set("Content-Type", "application/didcomm-message+json")
832                {
833                    let err_msg = format!("Failed to set headers: {:?}", err);
834                    log::warn!("{}", err_msg);
835                    last_error = Some(err_msg);
836                    continue;
837                }
838
839                // Perform the fetch
840                let resp_promise = window.fetch_with_request(&request);
841                let resp_jsvalue = match JsFuture::from(resp_promise).await {
842                    Ok(val) => val,
843                    Err(err) => {
844                        let err_msg = format!("Fetch error: {:?}", err);
845                        log::warn!(
846                            "Failed to send message to {} (attempt {}/{}): {}",
847                            recipient,
848                            attempt,
849                            self.max_retries,
850                            err_msg
851                        );
852                        last_error = Some(err_msg);
853                        continue;
854                    }
855                };
856
857                // Convert the response to a Response object
858                let response: Response = match resp_jsvalue.dyn_into() {
859                    Ok(resp) => resp,
860                    Err(err) => {
861                        let err_msg = format!("Failed to convert response: {:?}", err);
862                        log::warn!("{}", err_msg);
863                        last_error = Some(err_msg);
864                        continue;
865                    }
866                };
867
868                // Check the status
869                if response.ok() {
870                    log::debug!("Successfully sent message to {}", recipient);
871                    success = true;
872                } else {
873                    let status = response.status();
874
875                    // Try to get the response body as text
876                    let body_promise = response.text();
877                    let body = match JsFuture::from(body_promise).await {
878                        Ok(text_jsval) => text_jsval.as_string().unwrap_or_default(),
879                        Err(_) => String::from("[Could not read response body]"),
880                    };
881
882                    let err_msg = format!("HTTP error: {} - {}", status, body);
883                    log::warn!(
884                        "Failed to send message to {} (attempt {}/{}): {}",
885                        recipient,
886                        attempt,
887                        self.max_retries,
888                        err_msg
889                    );
890                    last_error = Some(err_msg);
891
892                    // Don't retry certain status codes
893                    if status == 404 || status == 400 {
894                        break; // Don't retry not found or bad request
895                    }
896                }
897            }
898
899            if !success {
900                failures.push((
901                    recipient.clone(),
902                    last_error.unwrap_or_else(|| "Unknown error".to_string()),
903                ));
904            }
905        }
906
907        // Report failures if any
908        if !failures.is_empty() {
909            let failure_messages = failures
910                .iter()
911                .map(|(did, err)| format!("{}: {}", did, err))
912                .collect::<Vec<_>>()
913                .join("; ");
914
915            return Err(Error::Dispatch(format!(
916                "Failed to send message to some recipients: {}",
917                failure_messages
918            )));
919        }
920
921        Ok(())
922    }
923}
924
925// Fallback implementation for WASM environments without web-sys feature
926#[cfg(all(target_arch = "wasm32", not(feature = "wasm")))]
927#[async_trait(?Send)]
928impl MessageSender for HttpMessageSender {
929    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
930        // This is a fallback implementation when web-sys is not available in WASM
931        for recipient in &recipient_dids {
932            let endpoint = self.get_endpoint_url(recipient);
933            log::info!(
934                "Would send message to {} via HTTP at {} (WASM without web-sys)",
935                recipient,
936                endpoint
937            );
938            log::debug!("Message content: {}", packed_message);
939        }
940
941        log::warn!("HTTP sender is running in WASM without the web-sys feature enabled. No actual HTTP requests will be made.");
942        Ok(())
943    }
944}