Skip to main content

tap_node/message/
sender.rs

1//! PlainMessage sender implementations for TAP Node.
2//!
3//! This module provides functionality for sending TAP messages to recipients
4//! using various transport mechanisms.
5//!
6//! # Overview
7//!
8//! The message sender system in TAP Node implements different strategies for
9//! delivering DIDComm messages to their recipients. The primary implementations are:
10//!
11//! - `NodePlainMessageSender`: A flexible sender that uses a callback function for delivery
12//! - `HttpPlainMessageSender`: Sends messages over HTTP with robust error handling and retries
13//!
14//! # Cross-platform Support
15//!
16//! The `HttpPlainMessageSender` implementation supports multiple platforms:
17//!
18//! - Native environments (using reqwest)
19//! - WASM environments (using web-sys)
20//! - Fallback implementations for environments without these libraries
21//!
22//! # Usage with TapNode
23//!
24//! ```no_run
25//! # use std::sync::Arc;
26//! # use tap_node::{NodeConfig, TapNode, HttpPlainMessageSender, PlainMessageSender};
27//! # use tap_msg::didcomm::PlainMessage;
28//! # use serde_json::json;
29//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
30//! // Create a TapNode
31//! let node = TapNode::new(NodeConfig::default());
32//!
33//! // Create a sample message
34//! let message = PlainMessage {
35//!     id: "test-123".to_string(),
36//!     typ: "https://tap.rsvp/schema/1.0#transfer".to_string(),
37//!     type_: "https://tap.rsvp/schema/1.0#transfer".to_string(),
38//!     body: json!({"amount": "100.00"}),
39//!     from: "did:example:sender".to_string(),
40//!     to: vec!["did:example:recipient".to_string()],
41//!     created_time: Some(chrono::Utc::now().timestamp() as u64),
42//!     expires_time: None,
43//!     thid: None,
44//!     pthid: None,
45//!     attachments: None,
46//!     from_prior: None,
47//!     extra_headers: Default::default(),
48//! };
49//!
50//! // Pack a message using the node's send_message method
51//! let packed_message = node.send_message(
52//!     "did:example:sender".to_string(),
53//!     message
54//! ).await?;
55//!
56//! // Create an HTTP sender for external dispatch
57//! let sender = HttpPlainMessageSender::new("https://recipient-endpoint.example.com".to_string());
58//!
59//! // Send the packed message to the recipient node
60//! sender.send(
61//!     packed_message,
62//!     vec!["did:example:recipient".to_string()]
63//! ).await?;
64//! # Ok(())
65//! # }
66//! ```
67
68use async_trait::async_trait;
69#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
70use std::collections::HashMap;
71use std::fmt::{self, Debug};
72use std::sync::Arc;
73
74use crate::error::{Error, Result};
75use crate::storage::{
76    models::{DeliveryStatus, DeliveryType},
77    Storage,
78};
79
80/// PlainMessage sender trait for sending packed messages to recipients
81#[async_trait]
82pub trait PlainMessageSender: Send + Sync + Debug {
83    /// Send a packed message to one or more recipients
84    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()>;
85}
86
87/// Node message sender implementation
88pub struct NodePlainMessageSender {
89    /// Callback function for sending messages
90    send_callback: Arc<dyn Fn(String, Vec<String>) -> Result<()> + Send + Sync>,
91}
92
93impl Debug for NodePlainMessageSender {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        f.debug_struct("NodePlainMessageSender")
96            .field("send_callback", &"<function>")
97            .finish()
98    }
99}
100
101impl NodePlainMessageSender {
102    /// Create a new NodePlainMessageSender with the given callback
103    pub fn new<F>(callback: F) -> Self
104    where
105        F: Fn(String, Vec<String>) -> Result<()> + Send + Sync + 'static,
106    {
107        Self {
108            send_callback: Arc::new(callback),
109        }
110    }
111}
112
113#[async_trait]
114impl PlainMessageSender for NodePlainMessageSender {
115    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
116        // Call the callback function with the packed message and recipient DIDs
117        (self.send_callback)(packed_message, recipient_dids)
118            .map_err(|e| Error::Dispatch(format!("Failed to send message: {}", e)))
119    }
120}
121
122/// HTTP message sender implementation for sending messages over HTTP
123///
124/// This sender allows TAP nodes to send messages to other TAP nodes over HTTP,
125/// handling the necessary encoding, content types, and error handling.
126///
127/// # HTTP Endpoint Structure
128///
129/// PlainMessages are sent to endpoints derived from the recipient's DID, using a
130/// configurable base URL.
131///
132/// # Error Handling
133///
134/// The sender includes built-in error handling for common HTTP issues:
135/// - Connection timeouts
136/// - Request failures
137/// - Invalid responses
138/// - Rate limiting
139///
140/// # Configuration
141///
142/// The sender can be configured with:
143/// - Base URL for the HTTP endpoint
144/// - Timeout settings
145/// - Retry policies
146#[derive(Debug)]
147pub struct HttpPlainMessageSender {
148    /// Base URL for the HTTP endpoint
149    base_url: String,
150    /// HTTP client (only in native environments)
151    #[cfg(feature = "reqwest")]
152    client: reqwest::Client,
153    /// Timeout for HTTP requests in milliseconds
154    #[allow(dead_code)] // Used for future timeout configuration
155    timeout_ms: u64,
156    /// Maximum number of retries
157    max_retries: u32,
158}
159
160impl HttpPlainMessageSender {
161    /// Create a new HttpPlainMessageSender with the given base URL
162    pub fn new(base_url: String) -> Self {
163        Self::with_options(base_url, 30000, 3) // Default 30 second timeout, 3 retries
164    }
165
166    /// Create a new HttpPlainMessageSender with custom options
167    pub fn with_options(base_url: String, timeout_ms: u64, max_retries: u32) -> Self {
168        #[cfg(feature = "reqwest")]
169        {
170            // Create a reqwest client with appropriate settings
171            let client = reqwest::Client::builder()
172                .timeout(std::time::Duration::from_millis(timeout_ms))
173                .user_agent("TAP-Node/0.1")
174                .build()
175                .unwrap_or_default();
176
177            Self {
178                base_url,
179                client,
180                timeout_ms,
181                max_retries,
182            }
183        }
184
185        #[cfg(not(feature = "reqwest"))]
186        {
187            Self {
188                base_url,
189                timeout_ms,
190                max_retries,
191            }
192        }
193    }
194
195    /// Helper to construct the endpoint URL for a recipient
196    pub fn get_endpoint_url(&self, recipient_did: &str) -> String {
197        // In a production implementation, this would map DID to HTTP endpoint
198        // This could involve DID resolution or a lookup table
199
200        // For now, we'll use a simple convention:
201        // Append the DID to the base URL, with proper URL encoding
202        let encoded_did = self.url_encode(recipient_did);
203        format!(
204            "{}/api/messages/{}",
205            self.base_url.trim_end_matches('/'),
206            encoded_did
207        )
208    }
209
210    /// URL-encode a string for safe use in URLs
211    fn url_encode(&self, text: &str) -> String {
212        use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
213        utf8_percent_encode(text, NON_ALPHANUMERIC).to_string()
214    }
215}
216
217/// WebSocket message sender implementation for sending messages over WebSockets
218///
219/// This sender enables real-time bidirectional communication between TAP nodes,
220/// providing a persistent connection that can be used for both sending and receiving
221/// messages. WebSockets are particularly useful for scenarios requiring:
222///
223/// - Low-latency message delivery
224/// - Bidirectional communication
225/// - Connection state awareness
226/// - Reduced overhead compared to repeated HTTP requests
227///
228/// # Connection Management
229///
230/// The WebSocket sender manages a pool of connections to recipient endpoints,
231/// keeping them alive and reconnecting as needed. This makes it suitable for
232/// high-frequency message exchanges between known parties.
233///
234/// # Error Handling
235///
236/// The sender handles various WebSocket-specific error conditions:
237/// - Connection failures
238/// - PlainMessage delivery failures
239/// - Connection drops and reconnection
240/// - Protocol errors
241///
242/// # Cross-platform Support
243///
244/// Like the HTTP sender, the WebSocket sender supports:
245/// - Native environments (using tokio_tungstenite)
246/// - WASM environments (using web-sys WebSocket API)
247/// - Fallback implementations for environments without these libraries
248#[derive(Debug)]
249pub struct WebSocketPlainMessageSender {
250    /// Base URL for WebSocket endpoints (ws:// or wss://)
251    base_url: String,
252    /// Active connections (only in native environments)
253    #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
254    connections: std::sync::Mutex<HashMap<String, tokio::sync::mpsc::Sender<String>>>,
255    /// WebSocket task handles (only in native environments)
256    #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
257    task_handles: std::sync::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>,
258}
259
260impl WebSocketPlainMessageSender {
261    /// Create a new WebSocketPlainMessageSender with the given base URL
262    pub fn new(base_url: String) -> Self {
263        Self::with_options(base_url)
264    }
265
266    /// Create a new WebSocketPlainMessageSender with custom options
267    pub fn with_options(base_url: String) -> Self {
268        #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
269        {
270            Self {
271                base_url,
272                connections: std::sync::Mutex::new(HashMap::new()),
273                task_handles: std::sync::Mutex::new(HashMap::new()),
274            }
275        }
276
277        #[cfg(not(all(not(target_arch = "wasm32"), feature = "websocket")))]
278        {
279            Self { base_url }
280        }
281    }
282
283    /// Helper to construct the WebSocket endpoint URL for a recipient
284    fn get_endpoint_url(&self, recipient_did: &str) -> String {
285        // In a production implementation, this would map DID to WebSocket endpoint
286        // This could involve DID resolution or a lookup table
287
288        // Convert http(s):// to ws(s)://
289        let ws_base_url = if self.base_url.starts_with("https://") {
290            self.base_url.replace("https://", "wss://")
291        } else if self.base_url.starts_with("http://") {
292            self.base_url.replace("http://", "ws://")
293        } else {
294            self.base_url.clone()
295        };
296
297        // Append the DID to the base URL, with proper URL encoding
298        let encoded_did = self.url_encode(recipient_did);
299        format!(
300            "{}/ws/messages/{}",
301            ws_base_url.trim_end_matches('/'),
302            encoded_did
303        )
304    }
305
306    /// URL-encode a string for safe use in URLs
307    fn url_encode(&self, text: &str) -> String {
308        use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
309        utf8_percent_encode(text, NON_ALPHANUMERIC).to_string()
310    }
311
312    /// Ensures a connection exists for the given recipient
313    #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
314    async fn ensure_connection(
315        &self,
316        recipient: &str,
317    ) -> Result<tokio::sync::mpsc::Sender<String>> {
318        use futures::sink::SinkExt;
319        use futures::stream::StreamExt;
320        use tokio_tungstenite::connect_async;
321        use tokio_tungstenite::tungstenite::protocol::Message;
322
323        // Check if we already have an active connection and return it if we do
324        {
325            // Scope the lock to ensure it's released before any await points
326            let connections = self.connections.lock().unwrap();
327            if let Some(connection) = connections.get(recipient) {
328                return Ok(connection.clone());
329            }
330        }
331
332        // Otherwise, create a new connection
333        let endpoint = self.get_endpoint_url(recipient);
334        log::info!(
335            "Creating new WebSocket connection to {} at {}",
336            recipient,
337            endpoint
338        );
339
340        // Create a channel for sending messages to the WebSocket task
341        let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(100);
342
343        // Connect to the WebSocket with default timeout (30 seconds)
344        let (ws_stream, _) = match tokio::time::timeout(
345            std::time::Duration::from_millis(30000),
346            connect_async(&endpoint),
347        )
348        .await
349        {
350            Ok(Ok(stream)) => stream,
351            Ok(Err(e)) => {
352                return Err(Error::Dispatch(format!(
353                    "Failed to connect to WebSocket endpoint {}: {}",
354                    endpoint, e
355                )));
356            }
357            Err(_) => {
358                return Err(Error::Dispatch(format!(
359                    "Connection to WebSocket endpoint {} timed out",
360                    endpoint
361                )));
362            }
363        };
364
365        log::debug!("WebSocket connection established to {}", recipient);
366
367        // Split the WebSocket stream
368        let (mut write, mut read) = ws_stream.split();
369
370        // Create a task that will:
371        // 1. Listen for messages from the channel and send them to the WebSocket
372        // 2. Listen for messages from the WebSocket and handle them
373        let recipient_clone = recipient.to_string();
374        let handle = tokio::spawn(async move {
375            // Process messages from the channel to send over WebSocket
376            loop {
377                tokio::select! {
378                    // Handle outgoing messages
379                    Some(message) = rx.recv() => {
380                        log::debug!("Sending message to {} via WebSocket", recipient_clone);
381                        if let Err(e) = write.send(Message::Text(message)).await {
382                            log::error!("Failed to send WebSocket message to {}: {}", recipient_clone, e);
383                            // Try to reconnect? For now we'll just log the error
384                        }
385                    }
386
387                    // Handle incoming messages
388                    result = read.next() => {
389                        match result {
390                            Some(Ok(message)) => {
391                                // Process incoming message - for now just log it
392                                if let Message::Text(text) = message {
393                                    log::debug!("Received WebSocket message from {}: {}", recipient_clone, text);
394                                }
395                            }
396                            Some(Err(e)) => {
397                                log::error!("WebSocket error from {}: {}", recipient_clone, e);
398                                // Connection likely dropped, exit the loop
399                                break;
400                            }
401                            None => {
402                                // WebSocket closed
403                                log::info!("WebSocket connection to {} closed", recipient_clone);
404                                break;
405                            }
406                        }
407                    }
408                }
409            }
410
411            // WebSocket loop ended - clean up and possibly reconnect
412            log::info!("WebSocket connection to {} terminated", recipient_clone);
413        });
414
415        // Store the sender and task handle (using separate scopes to avoid holding multiple locks)
416        {
417            // Get mutable access to the connections map
418            let mut connections = self.connections.lock().unwrap();
419            connections.insert(recipient.to_string(), tx.clone());
420        }
421
422        {
423            // Get mutable access to the task handles map
424            let mut task_handles = self.task_handles.lock().unwrap();
425            task_handles.insert(recipient.to_string(), handle);
426        }
427
428        Ok(tx)
429    }
430}
431
432#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
433#[async_trait]
434impl PlainMessageSender for WebSocketPlainMessageSender {
435    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
436        if recipient_dids.is_empty() {
437            return Err(Error::Dispatch("No recipients specified".to_string()));
438        }
439
440        // Track failures to report them at the end
441        let mut failures = Vec::new();
442
443        // Send the message to each recipient
444        for recipient in &recipient_dids {
445            log::info!("Sending message to {} via WebSocket", recipient);
446
447            // Ensure we have a connection
448            match self.ensure_connection(recipient).await {
449                Ok(sender) => {
450                    // Send the message through the channel to the WebSocket task
451                    if let Err(e) = sender.send(packed_message.clone()).await {
452                        let err_msg = format!("Failed to send message to WebSocket task: {}", e);
453                        log::error!("{}", err_msg);
454                        failures.push((recipient.clone(), err_msg));
455                    }
456                }
457                Err(e) => {
458                    let err_msg = format!("Failed to establish WebSocket connection: {}", e);
459                    log::error!("{}", err_msg);
460                    failures.push((recipient.clone(), err_msg));
461                }
462            }
463        }
464
465        // Report failures if any
466        if !failures.is_empty() {
467            let failure_messages = failures
468                .iter()
469                .map(|(did, err)| format!("{}: {}", did, err))
470                .collect::<Vec<_>>()
471                .join("; ");
472
473            return Err(Error::Dispatch(format!(
474                "Failed to send message to some recipients via WebSocket: {}",
475                failure_messages
476            )));
477        }
478
479        Ok(())
480    }
481}
482
483// Specific implementation for WASM environments with web-sys feature
484#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
485#[async_trait(?Send)]
486impl PlainMessageSender for WebSocketPlainMessageSender {
487    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
488        use wasm_bindgen::prelude::*;
489        use wasm_bindgen_futures::JsFuture;
490        use web_sys::{MessageEvent, WebSocket};
491
492        if recipient_dids.is_empty() {
493            return Err(Error::Dispatch("No recipients specified".to_string()));
494        }
495
496        // Track failures to report them at the end
497        let mut failures = Vec::new();
498
499        // Get the window object
500        let window = web_sys::window().ok_or_else(|| {
501            Error::Dispatch("Could not get window object in WASM environment".to_string())
502        })?;
503
504        // Send the message to each recipient
505        for recipient in &recipient_dids {
506            let endpoint = self.get_endpoint_url(recipient);
507            log::info!(
508                "Sending message to {} via WebSocket at {} (WASM)",
509                recipient,
510                endpoint
511            );
512
513            // Create a promise to set up a WebSocket connection and send the message
514            let (resolve, reject) = js_sys::Promise::new_resolver();
515            let promise_resolver = resolve.clone();
516            let promise_rejecter = reject.clone();
517
518            // Create a new WebSocket
519            let ws = match WebSocket::new(&endpoint) {
520                Ok(ws) => ws,
521                Err(err) => {
522                    let err_msg = format!("Failed to create WebSocket: {:?}", err);
523                    log::error!("{}", err_msg);
524                    failures.push((recipient.clone(), err_msg));
525                    continue;
526                }
527            };
528
529            // Set up event handlers
530            let onopen_callback = Closure::once(Box::new(move |_: web_sys::Event| {
531                promise_resolver.resolve(&JsValue::from(true));
532            }) as Box<dyn FnOnce(web_sys::Event)>);
533
534            let onerror_callback = Closure::once(Box::new(move |e: web_sys::Event| {
535                let err_msg = format!("WebSocket error: {:?}", e);
536                promise_rejecter.reject(&JsValue::from_str(&err_msg));
537            }) as Box<dyn FnOnce(web_sys::Event)>);
538
539            let message_clone = packed_message.clone();
540            let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
541                if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
542                    log::debug!("Received message: {}", String::from(txt));
543                }
544            }) as Box<dyn FnMut(MessageEvent)>);
545
546            // Register event handlers
547            ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
548            ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
549            ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
550
551            // Wait for the connection to be established
552            match JsFuture::from(js_sys::Promise::race(&js_sys::Array::of2(
553                &js_sys::Promise::resolve(&promise_resolver),
554                &js_sys::Promise::new(&mut |resolve, _| {
555                    let timeout_closure = Closure::once_into_js(move || {
556                        resolve.call0(&JsValue::NULL).unwrap();
557                    });
558                    window
559                        .set_timeout_with_callback_and_timeout_and_arguments_0(
560                            timeout_closure.as_ref().unchecked_ref(),
561                            30000, // Default 30 second timeout
562                        )
563                        .unwrap();
564                }),
565            )))
566            .await
567            {
568                Ok(_) => {
569                    // Connection established, send the message
570                    if let Err(err) = ws.send_with_str(&message_clone) {
571                        let err_msg = format!("Failed to send WebSocket message: {:?}", err);
572                        log::error!("{}", err_msg);
573                        failures.push((recipient.clone(), err_msg));
574                    }
575                }
576                Err(err) => {
577                    let err_msg = format!("WebSocket connection failed: {:?}", err);
578                    log::error!("{}", err_msg);
579                    failures.push((recipient.clone(), err_msg));
580                }
581            }
582
583            // Keep the callbacks alive
584            onopen_callback.forget();
585            onerror_callback.forget();
586            onmessage_callback.forget();
587        }
588
589        // Report failures if any
590        if !failures.is_empty() {
591            let failure_messages = failures
592                .iter()
593                .map(|(did, err)| format!("{}: {}", did, err))
594                .collect::<Vec<_>>()
595                .join("; ");
596
597            return Err(Error::Dispatch(format!(
598                "Failed to send message to some recipients via WebSocket: {}",
599                failure_messages
600            )));
601        }
602
603        Ok(())
604    }
605}
606
607// Fallback implementation for environments without WebSocket support
608#[cfg(not(any(
609    all(not(target_arch = "wasm32"), feature = "websocket"),
610    all(target_arch = "wasm32", feature = "wasm")
611)))]
612#[async_trait]
613impl PlainMessageSender for WebSocketPlainMessageSender {
614    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
615        // This is a fallback implementation when neither tokio_tungstenite nor web-sys is available
616        for recipient in &recipient_dids {
617            let endpoint = self.get_endpoint_url(recipient);
618            log::info!(
619                "Would send message to {} via WebSocket at {} (WebSocket not available)",
620                recipient,
621                endpoint
622            );
623            log::debug!("PlainMessage content: {}", packed_message);
624        }
625
626        log::warn!("WebSocket sender is running without WebSocket features enabled. No actual WebSocket connections will be made.");
627        Ok(())
628    }
629}
630
631#[cfg(all(not(target_arch = "wasm32"), feature = "reqwest"))]
632#[async_trait]
633impl PlainMessageSender for HttpPlainMessageSender {
634    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
635        if recipient_dids.is_empty() {
636            return Err(Error::Dispatch("No recipients specified".to_string()));
637        }
638
639        // Track failures to report them at the end
640        let mut failures = Vec::new();
641
642        // Send the message to each recipient
643        for recipient in &recipient_dids {
644            let endpoint = self.get_endpoint_url(recipient);
645            log::info!("Sending message to {} via HTTP at {}", recipient, endpoint);
646
647            // Retry logic
648            let mut attempt = 0;
649            let mut success = false;
650            let mut last_error = None;
651
652            while attempt < self.max_retries && !success {
653                attempt += 1;
654
655                // Exponential backoff for retries
656                if attempt > 1 {
657                    let backoff_ms = 100 * (2_u64.pow(attempt - 1));
658                    tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
659                }
660
661                // Make the HTTP request
662                match self
663                    .client
664                    .post(&endpoint)
665                    .header("Content-Type", "application/didcomm-message+json")
666                    .body(packed_message.clone())
667                    .send()
668                    .await
669                {
670                    Ok(response) => {
671                        // Check if the response was successful (2xx status code)
672                        if response.status().is_success() {
673                            log::debug!("Successfully sent message to {}", recipient);
674                            success = true;
675                        } else {
676                            let status = response.status();
677                            let body = response.text().await.unwrap_or_default();
678                            log::warn!(
679                                "Failed to send message to {} (attempt {}/{}): HTTP {} - {}",
680                                recipient,
681                                attempt,
682                                self.max_retries,
683                                status,
684                                body
685                            );
686                            last_error = Some(format!("HTTP error: {} - {}", status, body));
687
688                            // Don't retry certain status codes
689                            if status.as_u16() == 404 || status.as_u16() == 400 {
690                                break; // Don't retry not found or bad request
691                            }
692                        }
693                    }
694                    Err(err) => {
695                        log::warn!(
696                            "Failed to send message to {} (attempt {}/{}): {}",
697                            recipient,
698                            attempt,
699                            self.max_retries,
700                            err
701                        );
702                        last_error = Some(format!("Request error: {}", err));
703                    }
704                }
705            }
706
707            if !success {
708                // Record the failure
709                failures.push((
710                    recipient.clone(),
711                    last_error.unwrap_or_else(|| "Unknown error".to_string()),
712                ));
713            }
714        }
715
716        // Report failures if any
717        if !failures.is_empty() {
718            let failure_messages = failures
719                .iter()
720                .map(|(did, err)| format!("{}: {}", did, err))
721                .collect::<Vec<_>>()
722                .join("; ");
723
724            return Err(Error::Dispatch(format!(
725                "Failed to send message to some recipients: {}",
726                failure_messages
727            )));
728        }
729
730        Ok(())
731    }
732}
733
734#[cfg(all(not(target_arch = "wasm32"), not(feature = "reqwest")))]
735#[async_trait]
736impl PlainMessageSender for HttpPlainMessageSender {
737    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
738        // This is a fallback implementation when reqwest is not available
739        // In a production environment, reqwest should always be available in the native configuration
740
741        for recipient in &recipient_dids {
742            let endpoint = self.get_endpoint_url(recipient);
743            log::info!(
744                "Would send message to {} via HTTP at {} (reqwest not available)",
745                recipient,
746                endpoint
747            );
748            log::debug!("PlainMessage content: {}", packed_message);
749        }
750
751        log::warn!("HTTP sender is running without reqwest feature enabled. No actual HTTP requests will be made.");
752        Ok(())
753    }
754}
755
756// Specific implementation for WASM environments with web-sys feature
757#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
758#[async_trait(?Send)]
759impl PlainMessageSender for HttpPlainMessageSender {
760    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
761        use wasm_bindgen::prelude::*;
762        use wasm_bindgen_futures::JsFuture;
763        use web_sys::{Request, RequestInit, RequestMode, Response};
764
765        if recipient_dids.is_empty() {
766            return Err(Error::Dispatch("No recipients specified".to_string()));
767        }
768
769        // Track failures to report them at the end
770        let mut failures = Vec::new();
771
772        // Get the window object
773        let window = web_sys::window().ok_or_else(|| {
774            Error::Dispatch("Could not get window object in WASM environment".to_string())
775        })?;
776
777        // Send the message to each recipient
778        for recipient in &recipient_dids {
779            let endpoint = self.get_endpoint_url(recipient);
780            log::info!(
781                "Sending message to {} via HTTP at {} (WASM)",
782                recipient,
783                endpoint
784            );
785
786            // Retry logic
787            let mut attempt = 0;
788            let mut success = false;
789            let mut last_error = None;
790
791            while attempt < self.max_retries && !success {
792                attempt += 1;
793
794                // Exponential backoff for retries
795                if attempt > 1 {
796                    let backoff_ms = 100 * (2_u64.pow(attempt - 1));
797                    // In WASM, we need to use a Promise-based sleep
798                    let promise = js_sys::Promise::new(&mut |resolve, _| {
799                        let closure = Closure::once_into_js(move || {
800                            resolve.call0(&JsValue::NULL).unwrap();
801                        });
802                        window
803                            .set_timeout_with_callback_and_timeout_and_arguments_0(
804                                closure.as_ref().unchecked_ref(),
805                                backoff_ms as i32,
806                            )
807                            .unwrap();
808                    });
809
810                    let _ = JsFuture::from(promise).await;
811                }
812
813                // Initialize a new Request
814                let mut opts = RequestInit::new();
815                opts.method("POST");
816                opts.mode(RequestMode::Cors);
817                opts.body(Some(&JsValue::from_str(&packed_message)));
818
819                let request = match Request::new_with_str_and_init(&endpoint, &opts) {
820                    Ok(req) => req,
821                    Err(err) => {
822                        let err_msg = format!("Failed to create request: {:?}", err);
823                        log::warn!("{}", err_msg);
824                        last_error = Some(err_msg);
825                        continue;
826                    }
827                };
828
829                // Set headers
830                if let Err(err) = request
831                    .headers()
832                    .set("Content-Type", "application/didcomm-message+json")
833                {
834                    let err_msg = format!("Failed to set headers: {:?}", err);
835                    log::warn!("{}", err_msg);
836                    last_error = Some(err_msg);
837                    continue;
838                }
839
840                // Perform the fetch
841                let resp_promise = window.fetch_with_request(&request);
842                let resp_jsvalue = match JsFuture::from(resp_promise).await {
843                    Ok(val) => val,
844                    Err(err) => {
845                        let err_msg = format!("Fetch error: {:?}", err);
846                        log::warn!(
847                            "Failed to send message to {} (attempt {}/{}): {}",
848                            recipient,
849                            attempt,
850                            self.max_retries,
851                            err_msg
852                        );
853                        last_error = Some(err_msg);
854                        continue;
855                    }
856                };
857
858                // Convert the response to a Response object
859                let response: Response = match resp_jsvalue.dyn_into() {
860                    Ok(resp) => resp,
861                    Err(err) => {
862                        let err_msg = format!("Failed to convert response: {:?}", err);
863                        log::warn!("{}", err_msg);
864                        last_error = Some(err_msg);
865                        continue;
866                    }
867                };
868
869                // Check the status
870                if response.ok() {
871                    log::debug!("Successfully sent message to {}", recipient);
872                    success = true;
873                } else {
874                    let status = response.status();
875
876                    // Try to get the response body as text
877                    let body_promise = response.text();
878                    let body = match JsFuture::from(body_promise).await {
879                        Ok(text_jsval) => text_jsval.as_string().unwrap_or_default(),
880                        Err(_) => String::from("[Could not read response body]"),
881                    };
882
883                    let err_msg = format!("HTTP error: {} - {}", status, body);
884                    log::warn!(
885                        "Failed to send message to {} (attempt {}/{}): {}",
886                        recipient,
887                        attempt,
888                        self.max_retries,
889                        err_msg
890                    );
891                    last_error = Some(err_msg);
892
893                    // Don't retry certain status codes
894                    if status == 404 || status == 400 {
895                        break; // Don't retry not found or bad request
896                    }
897                }
898            }
899
900            if !success {
901                failures.push((
902                    recipient.clone(),
903                    last_error.unwrap_or_else(|| "Unknown error".to_string()),
904                ));
905            }
906        }
907
908        // Report failures if any
909        if !failures.is_empty() {
910            let failure_messages = failures
911                .iter()
912                .map(|(did, err)| format!("{}: {}", did, err))
913                .collect::<Vec<_>>()
914                .join("; ");
915
916            return Err(Error::Dispatch(format!(
917                "Failed to send message to some recipients: {}",
918                failure_messages
919            )));
920        }
921
922        Ok(())
923    }
924}
925
926// Fallback implementation for WASM environments without web-sys feature
927#[cfg(all(target_arch = "wasm32", not(feature = "wasm")))]
928#[async_trait(?Send)]
929impl PlainMessageSender for HttpPlainMessageSender {
930    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
931        // This is a fallback implementation when web-sys is not available in WASM
932        for recipient in &recipient_dids {
933            let endpoint = self.get_endpoint_url(recipient);
934            log::info!(
935                "Would send message to {} via HTTP at {} (WASM without web-sys)",
936                recipient,
937                endpoint
938            );
939            log::debug!("PlainMessage content: {}", packed_message);
940        }
941
942        log::warn!("HTTP sender is running in WASM without the web-sys feature enabled. No actual HTTP requests will be made.");
943        Ok(())
944    }
945}
946
947/// HTTP message sender with delivery tracking
948///
949/// This sender extends HttpPlainMessageSender with delivery tracking capabilities,
950/// recording delivery attempts, success/failure status, HTTP status codes, and retry counts
951/// in the database for monitoring and automatic retry processing.
952///
953/// # Features
954/// - All capabilities of HttpPlainMessageSender
955/// - Delivery record creation before sending
956/// - Status updates after delivery attempts
957/// - Retry count tracking
958/// - HTTP status code recording
959/// - Error message logging
960///
961/// # Usage
962///
963/// ```no_run
964/// # use std::sync::Arc;
965/// # use tap_node::{HttpPlainMessageSenderWithTracking, PlainMessageSender, Storage};
966/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
967/// // Create storage instance
968/// let storage = Arc::new(Storage::new(None).await?);
969///
970/// // Create sender with tracking
971/// let sender = HttpPlainMessageSenderWithTracking::new(
972///     "https://recipient.example.com".to_string(),
973///     storage
974/// );
975///
976/// // Send message - delivery will be tracked automatically
977/// sender.send("packed_message".to_string(), vec!["did:example:recipient".to_string()]).await?;
978/// # Ok(())
979/// # }
980/// ```
981#[derive(Debug)]
982pub struct HttpPlainMessageSenderWithTracking {
983    /// The underlying HTTP sender
984    http_sender: HttpPlainMessageSender,
985    /// Storage for tracking deliveries
986    storage: Arc<Storage>,
987}
988
989impl HttpPlainMessageSenderWithTracking {
990    /// Create a new HttpPlainMessageSenderWithTracking
991    pub fn new(base_url: String, storage: Arc<Storage>) -> Self {
992        Self {
993            http_sender: HttpPlainMessageSender::new(base_url),
994            storage,
995        }
996    }
997
998    /// Create a new HttpPlainMessageSenderWithTracking with custom options
999    pub fn with_options(
1000        base_url: String,
1001        timeout_ms: u64,
1002        max_retries: u32,
1003        storage: Arc<Storage>,
1004    ) -> Self {
1005        Self {
1006            http_sender: HttpPlainMessageSender::with_options(base_url, timeout_ms, max_retries),
1007            storage,
1008        }
1009    }
1010}
1011
1012#[async_trait]
1013impl PlainMessageSender for HttpPlainMessageSenderWithTracking {
1014    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
1015        if recipient_dids.is_empty() {
1016            return Err(Error::Dispatch("No recipients specified".to_string()));
1017        }
1018
1019        // Extract message ID from packed message for tracking
1020        // This is a simplified approach - in production you might want a more robust way to get the message ID
1021        let message_id = format!("msg_{}", uuid::Uuid::new_v4());
1022
1023        // Create delivery records for each recipient before attempting delivery
1024        let mut delivery_ids = Vec::new();
1025        for recipient in &recipient_dids {
1026            let delivery_url = Some(self.http_sender.get_endpoint_url(recipient));
1027            match self
1028                .storage
1029                .create_delivery(
1030                    &message_id,
1031                    &packed_message,
1032                    recipient,
1033                    delivery_url.as_deref(),
1034                    DeliveryType::Https,
1035                )
1036                .await
1037            {
1038                Ok(delivery_id) => {
1039                    delivery_ids.push((recipient.clone(), delivery_id));
1040                    log::debug!(
1041                        "Created delivery record {} for message {} to {}",
1042                        delivery_id,
1043                        message_id,
1044                        recipient
1045                    );
1046                }
1047                Err(e) => {
1048                    log::error!("Failed to create delivery record for {}: {}", recipient, e);
1049                    // Continue with delivery attempt even if we can't track it
1050                    delivery_ids.push((recipient.clone(), -1)); // Use -1 to indicate no tracking
1051                }
1052            }
1053        }
1054
1055        // Now attempt delivery using the underlying HTTP sender
1056        let delivery_result = self
1057            .http_sender
1058            .send(packed_message, recipient_dids.clone())
1059            .await;
1060
1061        // Update delivery records based on the result
1062        for (_recipient, delivery_id) in delivery_ids {
1063            if delivery_id == -1 {
1064                continue; // Skip if we couldn't create the delivery record
1065            }
1066
1067            match &delivery_result {
1068                Ok(_) => {
1069                    // Delivery successful
1070                    if let Err(e) = self
1071                        .storage
1072                        .update_delivery_status(
1073                            delivery_id,
1074                            DeliveryStatus::Success,
1075                            Some(200), // Assume 200 for successful delivery
1076                            None,
1077                        )
1078                        .await
1079                    {
1080                        log::error!(
1081                            "Failed to update delivery record {} to success: {}",
1082                            delivery_id,
1083                            e
1084                        );
1085                    } else {
1086                        log::debug!("Updated delivery record {} to success", delivery_id);
1087                    }
1088                }
1089                Err(e) => {
1090                    // Delivery failed - extract HTTP status code if possible
1091                    let error_msg = e.to_string();
1092                    let http_status_code = if error_msg.contains("HTTP error: ") {
1093                        // Try to extract status code from error message
1094                        error_msg
1095                            .split("HTTP error: ")
1096                            .nth(1)
1097                            .and_then(|s| s.split(' ').next())
1098                            .and_then(|s| s.parse::<i32>().ok())
1099                    } else {
1100                        None
1101                    };
1102
1103                    if let Err(e) = self
1104                        .storage
1105                        .update_delivery_status(
1106                            delivery_id,
1107                            DeliveryStatus::Failed,
1108                            http_status_code,
1109                            Some(&error_msg),
1110                        )
1111                        .await
1112                    {
1113                        log::error!(
1114                            "Failed to update delivery record {} to failed: {}",
1115                            delivery_id,
1116                            e
1117                        );
1118                    } else {
1119                        log::debug!(
1120                            "Updated delivery record {} to failed with error: {}",
1121                            delivery_id,
1122                            error_msg
1123                        );
1124                    }
1125
1126                    // Increment retry count for future retry processing
1127                    if let Err(e) = self
1128                        .storage
1129                        .increment_delivery_retry_count(delivery_id)
1130                        .await
1131                    {
1132                        log::error!(
1133                            "Failed to increment retry count for delivery record {}: {}",
1134                            delivery_id,
1135                            e
1136                        );
1137                    }
1138                }
1139            }
1140        }
1141
1142        delivery_result
1143    }
1144}