1use async_trait::async_trait;
70#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
71use std::collections::HashMap;
72use std::fmt::{self, Debug};
73use std::sync::Arc;
74
75use crate::error::{Error, Result};
76
77#[async_trait]
79pub trait MessageSender: Send + Sync + Debug {
80 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()>;
82}
83
84pub struct NodeMessageSender {
86 #[allow(dead_code)]
88 send_callback: Arc<dyn Fn(String, Vec<String>) -> Result<()> + Send + Sync>,
89}
90
91impl Debug for NodeMessageSender {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 f.debug_struct("NodeMessageSender")
94 .field("send_callback", &"<function>")
95 .finish()
96 }
97}
98
99impl NodeMessageSender {
100 pub fn new<F>(callback: F) -> Self
102 where
103 F: Fn(String, Vec<String>) -> Result<()> + Send + Sync + 'static,
104 {
105 Self {
106 send_callback: Arc::new(callback),
107 }
108 }
109}
110
111#[async_trait]
112impl MessageSender for NodeMessageSender {
113 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
114 (self.send_callback)(packed_message, recipient_dids)
116 .map_err(|e| Error::Dispatch(format!("Failed to send message: {}", e)))
117 }
118}
119
/// Message sender that targets an HTTP endpoint.
///
/// Each recipient DID is mapped to
/// `{base_url}/api/messages/{percent-encoded DID}`.
#[derive(Debug)]
pub struct HttpMessageSender {
    /// Base URL the per-recipient path is appended to.
    base_url: String,
    /// HTTP client; only present when the `reqwest` feature is enabled.
    #[cfg(feature = "reqwest")]
    client: reqwest::Client,
    /// Request timeout in milliseconds, as passed to `with_options`.
    // Nothing in this block reads the value after construction, hence the
    // lint exemption.
    #[allow(dead_code)]
    timeout_ms: u64,
    /// Retry budget consulted by the `send` implementations.
    max_retries: u32,
}

impl HttpMessageSender {
    /// Creates a sender with a 30 000 ms timeout and a retry budget of 3.
    pub fn new(base_url: String) -> Self {
        Self::with_options(base_url, 30000, 3)
    }

    /// Creates a sender with an explicit timeout (milliseconds) and retry
    /// budget.
    ///
    /// With the `reqwest` feature a client is built with that timeout and the
    /// user agent `TAP-Node/0.1`; if the builder fails, a default client is
    /// used instead.
    pub fn with_options(base_url: String, timeout_ms: u64, max_retries: u32) -> Self {
        #[cfg(feature = "reqwest")]
        {
            let client = reqwest::Client::builder()
                .timeout(std::time::Duration::from_millis(timeout_ms))
                .user_agent("TAP-Node/0.1")
                .build()
                .unwrap_or_default();

            Self {
                base_url,
                client,
                timeout_ms,
                max_retries,
            }
        }

        #[cfg(not(feature = "reqwest"))]
        {
            Self {
                base_url,
                timeout_ms,
                max_retries,
            }
        }
    }

    /// Builds the delivery URL for `recipient_did`.
    ///
    /// Trailing slashes on the base URL are dropped so the result never
    /// contains `//` before `api`.
    fn get_endpoint_url(&self, recipient_did: &str) -> String {
        let encoded_did = self.url_encode(recipient_did);
        format!(
            "{}/api/messages/{}",
            self.base_url.trim_end_matches('/'),
            encoded_did
        )
    }

    /// Percent-encodes `text` for use as a single URL path segment.
    ///
    /// Every byte outside the RFC 3986 "unreserved" set (`A-Z a-z 0-9 - . _ ~`)
    /// is escaped. This includes `%` itself, so a DID that already contains a
    /// percent-encoded sequence (e.g. `did:web:localhost%3A8443`) survives one
    /// round of decoding on the receiving side unchanged.
    fn url_encode(&self, text: &str) -> String {
        const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

        let mut encoded = String::with_capacity(text.len());
        for byte in text.bytes() {
            if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
                encoded.push(char::from(byte));
            } else {
                encoded.push('%');
                encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
            }
        }
        encoded
    }
}
215
/// Message sender that targets a WebSocket endpoint.
///
/// Each recipient DID is mapped to
/// `{base_url}/ws/messages/{percent-encoded DID}`, with an `http(s)` scheme
/// rewritten to `ws(s)`.
#[derive(Debug)]
pub struct WebSocketMessageSender {
    /// Base URL the per-recipient path is appended to.
    base_url: String,
    /// Per-recipient channel feeding the task that owns the socket.
    #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
    connections: std::sync::Mutex<HashMap<String, tokio::sync::mpsc::Sender<String>>>,
    /// Per-recipient handle of the spawned connection task.
    #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
    task_handles: std::sync::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>,
}

impl WebSocketMessageSender {
    /// Creates a sender for `base_url` with default options.
    pub fn new(base_url: String) -> Self {
        Self::with_options(base_url)
    }

    /// Creates a sender for `base_url`; the connection tables start empty.
    pub fn with_options(base_url: String) -> Self {
        #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
        {
            Self {
                base_url,
                connections: std::sync::Mutex::new(HashMap::new()),
                task_handles: std::sync::Mutex::new(HashMap::new()),
            }
        }

        #[cfg(not(all(not(target_arch = "wasm32"), feature = "websocket")))]
        {
            Self { base_url }
        }
    }

    /// Builds the WebSocket URL for `recipient_did`.
    ///
    /// Only a leading `https://` / `http://` scheme is rewritten to
    /// `wss://` / `ws://`; any later occurrence of those strings (for example
    /// inside a path or query) is left untouched. Other schemes pass through.
    fn get_endpoint_url(&self, recipient_did: &str) -> String {
        let ws_base_url = if let Some(rest) = self.base_url.strip_prefix("https://") {
            format!("wss://{}", rest)
        } else if let Some(rest) = self.base_url.strip_prefix("http://") {
            format!("ws://{}", rest)
        } else {
            self.base_url.clone()
        };

        let encoded_did = self.url_encode(recipient_did);
        format!(
            "{}/ws/messages/{}",
            ws_base_url.trim_end_matches('/'),
            encoded_did
        )
    }

    /// Percent-encodes `text` for use as a single URL path segment.
    ///
    /// Every byte outside the RFC 3986 "unreserved" set (`A-Z a-z 0-9 - . _ ~`)
    /// is escaped, including `%`, so DIDs that already contain percent-encoded
    /// sequences survive one round of decoding unchanged.
    fn url_encode(&self, text: &str) -> String {
        const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

        let mut encoded = String::with_capacity(text.len());
        for byte in text.bytes() {
            if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
                encoded.push(char::from(byte));
            } else {
                encoded.push('%');
                encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
            }
        }
        encoded
    }

    /// Returns a channel to the task that owns the socket for `recipient`,
    /// connecting first if there is no live connection.
    ///
    /// A cached channel is reused only while its task is still running. Once
    /// the task has exited (socket closed, read error, failed write) the
    /// stale entry is evicted and a fresh connection is opened.
    ///
    /// # Errors
    /// `Error::Dispatch` if the connection attempt fails or does not complete
    /// within 30 seconds.
    #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
    async fn ensure_connection(
        &self,
        recipient: &str,
    ) -> Result<tokio::sync::mpsc::Sender<String>> {
        use futures::sink::SinkExt;
        use futures::stream::StreamExt;
        use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};

        let mut evicted_stale = false;
        {
            let mut connections = self.connections.lock().unwrap();
            if let Some(connection) = connections.get(recipient).cloned() {
                // `is_closed` turns true once the task dropped its receiver.
                if !connection.is_closed() {
                    return Ok(connection);
                }
                connections.remove(recipient);
                evicted_stale = true;
            }
        }

        if evicted_stale {
            log::debug!(
                "Cached WebSocket connection to {} is closed, reconnecting",
                recipient
            );
            // Locks are never nested: the connections guard is released above.
            let mut task_handles = self.task_handles.lock().unwrap();
            task_handles.remove(recipient);
        }

        let endpoint = self.get_endpoint_url(recipient);
        log::info!(
            "Creating new WebSocket connection to {} at {}",
            recipient,
            endpoint
        );

        let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(100);

        let (ws_stream, _) = match tokio::time::timeout(
            std::time::Duration::from_millis(30000),
            connect_async(&endpoint),
        )
        .await
        {
            Ok(Ok(stream)) => stream,
            Ok(Err(e)) => {
                return Err(Error::Dispatch(format!(
                    "Failed to connect to WebSocket endpoint {}: {}",
                    endpoint, e
                )));
            }
            Err(_) => {
                return Err(Error::Dispatch(format!(
                    "Connection to WebSocket endpoint {} timed out",
                    endpoint
                )));
            }
        };

        log::debug!("WebSocket connection established to {}", recipient);

        let (mut write, mut read) = ws_stream.split();

        let recipient_clone = recipient.to_string();
        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    outgoing = rx.recv() => {
                        match outgoing {
                            Some(message) => {
                                log::debug!("Sending message to {} via WebSocket", recipient_clone);
                                if let Err(e) = write.send(Message::Text(message)).await {
                                    log::error!("Failed to send WebSocket message to {}: {}", recipient_clone, e);
                                    // End the task so the closed channel
                                    // triggers a reconnect on the next send.
                                    break;
                                }
                            }
                            None => {
                                // Every sender is gone: nothing can be queued
                                // any more, so release the socket.
                                log::debug!("All senders for {} dropped, closing WebSocket", recipient_clone);
                                break;
                            }
                        }
                    }

                    result = read.next() => {
                        match result {
                            Some(Ok(message)) => {
                                if let Message::Text(text) = message {
                                    log::debug!("Received WebSocket message from {}: {}", recipient_clone, text);
                                }
                            }
                            Some(Err(e)) => {
                                log::error!("WebSocket error from {}: {}", recipient_clone, e);
                                break;
                            }
                            None => {
                                log::info!("WebSocket connection to {} closed", recipient_clone);
                                break;
                            }
                        }
                    }
                }
            }

            log::info!("WebSocket connection to {} terminated", recipient_clone);
        });

        {
            let mut connections = self.connections.lock().unwrap();
            connections.insert(recipient.to_string(), tx.clone());
        }

        {
            let mut task_handles = self.task_handles.lock().unwrap();
            task_handles.insert(recipient.to_string(), handle);
        }

        Ok(tx)
    }
}
430
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
#[async_trait]
impl MessageSender for WebSocketMessageSender {
    /// Queues `packed_message` on the WebSocket connection of every recipient.
    ///
    /// Recipients are processed in order; a failure for one of them does not
    /// stop the remaining ones from being tried.
    ///
    /// # Errors
    /// `Error::Dispatch` when the recipient list is empty, or when at least
    /// one recipient could not be served (the message lists each failure).
    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
        if recipient_dids.is_empty() {
            return Err(Error::Dispatch("No recipients specified".to_string()));
        }

        // One "did: reason" entry per recipient that could not be served.
        let mut failed: Vec<String> = Vec::new();

        for did in recipient_dids.iter() {
            log::info!("Sending message to {} via WebSocket", did);

            let outcome = match self.ensure_connection(did).await {
                Ok(channel) => channel
                    .send(packed_message.clone())
                    .await
                    .map_err(|e| format!("Failed to send message to WebSocket task: {}", e)),
                Err(e) => Err(format!("Failed to establish WebSocket connection: {}", e)),
            };

            if let Err(reason) = outcome {
                log::error!("{}", reason);
                failed.push(format!("{}: {}", did, reason));
            }
        }

        if failed.is_empty() {
            return Ok(());
        }

        Err(Error::Dispatch(format!(
            "Failed to send message to some recipients via WebSocket: {}",
            failed.join("; ")
        )))
    }
}
481
#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
#[async_trait(?Send)]
impl MessageSender for WebSocketMessageSender {
    /// Opens one browser WebSocket per recipient and sends `packed_message`
    /// once the socket reports `open`.
    ///
    /// The wait for `open` is bounded by a 30 second timer; an `error` event
    /// or the timer firing first counts as a connection failure. Incoming
    /// frames are only logged.
    ///
    /// # Errors
    /// `Error::Dispatch` when the recipient list is empty, when no `window`
    /// object is available, or when at least one recipient could not be
    /// served (the message lists each failure).
    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
        use wasm_bindgen::prelude::*;
        use wasm_bindgen_futures::JsFuture;
        use web_sys::{MessageEvent, WebSocket};

        /// Upper bound for the connection handshake, in milliseconds.
        const CONNECT_TIMEOUT_MS: i32 = 30000;

        if recipient_dids.is_empty() {
            return Err(Error::Dispatch("No recipients specified".to_string()));
        }

        let mut failures = Vec::new();

        let window = web_sys::window().ok_or_else(|| {
            Error::Dispatch("Could not get window object in WASM environment".to_string())
        })?;

        for recipient in &recipient_dids {
            let endpoint = self.get_endpoint_url(recipient);
            log::info!(
                "Sending message to {} via WebSocket at {} (WASM)",
                recipient,
                endpoint
            );

            let ws = match WebSocket::new(&endpoint) {
                Ok(ws) => ws,
                Err(err) => {
                    let err_msg = format!("Failed to create WebSocket: {:?}", err);
                    log::error!("{}", err_msg);
                    failures.push((recipient.clone(), err_msg));
                    continue;
                }
            };

            // Incoming frames are only logged. The handler may fire any number
            // of times, so it is handed over to the JS side for good.
            let onmessage_callback = Closure::wrap(Box::new(|e: MessageEvent| {
                if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
                    log::debug!("Received message: {}", String::from(txt));
                }
            }) as Box<dyn FnMut(MessageEvent)>);
            ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
            onmessage_callback.forget();

            // Settles when the socket reports `open` (resolve) or `error`
            // (reject). The handlers are installed synchronously, before the
            // browser can dispatch either event.
            let opened = js_sys::Promise::new(&mut |resolve, reject| {
                let onopen_callback = Closure::once_into_js(move |_: web_sys::Event| {
                    let _ = resolve.call1(&JsValue::NULL, &JsValue::from(true));
                });
                let onerror_callback = Closure::once_into_js(move |e: web_sys::Event| {
                    let err_msg = format!("WebSocket error: {:?}", e);
                    let _ = reject.call1(&JsValue::NULL, &JsValue::from_str(&err_msg));
                });
                ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
                ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
            });

            // Rejects after the timeout so a stalled handshake is reported as
            // a failure instead of being mistaken for an open socket.
            let timed_out = js_sys::Promise::new(&mut |_, reject| {
                let reject_now = reject.clone();
                let timeout_closure = Closure::once_into_js(move || {
                    let _ = reject.call1(
                        &JsValue::NULL,
                        &JsValue::from_str("WebSocket connection timed out"),
                    );
                });
                let scheduled = window.set_timeout_with_callback_and_timeout_and_arguments_0(
                    timeout_closure.as_ref().unchecked_ref(),
                    CONNECT_TIMEOUT_MS,
                );
                if scheduled.is_err() {
                    let _ = reject_now.call1(
                        &JsValue::NULL,
                        &JsValue::from_str("Could not schedule WebSocket connection timeout"),
                    );
                }
            });

            let connected = js_sys::Promise::race(&js_sys::Array::of2(&opened, &timed_out));

            match JsFuture::from(connected).await {
                Ok(_) => {
                    if let Err(err) = ws.send_with_str(&packed_message) {
                        let err_msg = format!("Failed to send WebSocket message: {:?}", err);
                        log::error!("{}", err_msg);
                        failures.push((recipient.clone(), err_msg));
                        let _ = ws.close();
                    }
                    // NOTE(review): after a successful send the socket is left
                    // open, as before; decide whether it should be closed or
                    // cached for reuse.
                }
                Err(err) => {
                    let err_msg = format!("WebSocket connection failed: {:?}", err);
                    log::error!("{}", err_msg);
                    failures.push((recipient.clone(), err_msg));
                    // Release the half-open socket.
                    let _ = ws.close();
                }
            }
        }

        if !failures.is_empty() {
            let failure_messages = failures
                .iter()
                .map(|(did, err)| format!("{}: {}", did, err))
                .collect::<Vec<_>>()
                .join("; ");

            return Err(Error::Dispatch(format!(
                "Failed to send message to some recipients via WebSocket: {}",
                failure_messages
            )));
        }

        Ok(())
    }
}
605
606#[cfg(not(any(
608 all(not(target_arch = "wasm32"), feature = "websocket"),
609 all(target_arch = "wasm32", feature = "wasm")
610)))]
611#[async_trait]
612impl MessageSender for WebSocketMessageSender {
613 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
614 for recipient in &recipient_dids {
616 let endpoint = self.get_endpoint_url(recipient);
617 log::info!(
618 "Would send message to {} via WebSocket at {} (WebSocket not available)",
619 recipient,
620 endpoint
621 );
622 log::debug!("Message content: {}", packed_message);
623 }
624
625 log::warn!("WebSocket sender is running without WebSocket features enabled. No actual WebSocket connections will be made.");
626 Ok(())
627 }
628}
629
/// Delay in milliseconds before HTTP delivery attempt number `attempt`
/// (1-based).
///
/// The delay doubles with each attempt (200 ms before the second, 400 ms
/// before the third, ...) and is capped at 30 seconds. The computation cannot
/// overflow for any `attempt`.
#[allow(dead_code)] // only called when the native `reqwest` sender is compiled in
fn http_retry_backoff_ms(attempt: u32) -> u64 {
    const BASE_BACKOFF_MS: u64 = 100;
    const MAX_BACKOFF_MS: u64 = 30_000;

    let factor = 1_u64
        .checked_shl(attempt.saturating_sub(1))
        .unwrap_or(u64::MAX);
    BASE_BACKOFF_MS.saturating_mul(factor).min(MAX_BACKOFF_MS)
}

#[cfg(all(not(target_arch = "wasm32"), feature = "reqwest"))]
#[async_trait]
impl MessageSender for HttpMessageSender {
    /// POSTs `packed_message` to the endpoint of every recipient.
    ///
    /// Each recipient gets up to `max_retries` attempts (always at least
    /// one), with a capped exponential back-off between them. HTTP 400 and
    /// 404 responses end the attempts for that recipient immediately. A
    /// failure for one recipient does not stop the others from being tried.
    ///
    /// # Errors
    /// `Error::Dispatch` when the recipient list is empty, or when at least
    /// one recipient could not be served (the message lists each failure).
    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
        if recipient_dids.is_empty() {
            return Err(Error::Dispatch("No recipients specified".to_string()));
        }

        // A retry budget of zero still means "try once".
        let max_attempts = self.max_retries.max(1);

        let mut failures = Vec::new();

        for recipient in &recipient_dids {
            let endpoint = self.get_endpoint_url(recipient);
            log::info!("Sending message to {} via HTTP at {}", recipient, endpoint);

            let mut attempt = 0;
            let mut success = false;
            let mut last_error = None;

            while attempt < max_attempts && !success {
                attempt += 1;

                if attempt > 1 {
                    let backoff_ms = http_retry_backoff_ms(attempt);
                    tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
                }

                match self
                    .client
                    .post(&endpoint)
                    .header("Content-Type", "application/didcomm-message+json")
                    .body(packed_message.clone())
                    .send()
                    .await
                {
                    Ok(response) => {
                        if response.status().is_success() {
                            log::debug!("Successfully sent message to {}", recipient);
                            success = true;
                        } else {
                            // Read the status first: `text()` consumes the response.
                            let status = response.status();
                            let body = response.text().await.unwrap_or_default();
                            log::warn!(
                                "Failed to send message to {} (attempt {}/{}): HTTP {} - {}",
                                recipient,
                                attempt,
                                max_attempts,
                                status,
                                body
                            );
                            last_error = Some(format!("HTTP error: {} - {}", status, body));

                            // Retrying a bad request or a missing endpoint
                            // cannot succeed.
                            if status.as_u16() == 404 || status.as_u16() == 400 {
                                break;
                            }
                        }
                    }
                    Err(err) => {
                        log::warn!(
                            "Failed to send message to {} (attempt {}/{}): {}",
                            recipient,
                            attempt,
                            max_attempts,
                            err
                        );
                        last_error = Some(format!("Request error: {}", err));
                    }
                }
            }

            if !success {
                failures.push((
                    recipient.clone(),
                    last_error.unwrap_or_else(|| "Unknown error".to_string()),
                ));
            }
        }

        if !failures.is_empty() {
            let failure_messages = failures
                .iter()
                .map(|(did, err)| format!("{}: {}", did, err))
                .collect::<Vec<_>>()
                .join("; ");

            return Err(Error::Dispatch(format!(
                "Failed to send message to some recipients: {}",
                failure_messages
            )));
        }

        Ok(())
    }
}
732
733#[cfg(all(not(target_arch = "wasm32"), not(feature = "reqwest")))]
734#[async_trait]
735impl MessageSender for HttpMessageSender {
736 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
737 for recipient in &recipient_dids {
741 let endpoint = self.get_endpoint_url(recipient);
742 log::info!(
743 "Would send message to {} via HTTP at {} (reqwest not available)",
744 recipient,
745 endpoint
746 );
747 log::debug!("Message content: {}", packed_message);
748 }
749
750 log::warn!("HTTP sender is running without reqwest feature enabled. No actual HTTP requests will be made.");
751 Ok(())
752 }
753}
754
#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
#[async_trait(?Send)]
impl MessageSender for HttpMessageSender {
    /// POSTs `packed_message` to the endpoint of every recipient using the
    /// browser `fetch` API.
    ///
    /// Each recipient gets up to `max_retries` attempts (always at least
    /// one), with a capped exponential back-off between them. HTTP 400 and
    /// 404 responses end the attempts for that recipient immediately. A
    /// failure for one recipient does not stop the others from being tried.
    ///
    /// # Errors
    /// `Error::Dispatch` when the recipient list is empty, when no `window`
    /// object is available, or when at least one recipient could not be
    /// served (the message lists each failure).
    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
        use wasm_bindgen::prelude::*;
        use wasm_bindgen_futures::JsFuture;
        use web_sys::{Request, RequestInit, RequestMode, Response};

        const BASE_BACKOFF_MS: u64 = 100;
        const MAX_BACKOFF_MS: u64 = 30_000;

        if recipient_dids.is_empty() {
            return Err(Error::Dispatch("No recipients specified".to_string()));
        }

        // A retry budget of zero still means "try once".
        let max_attempts = self.max_retries.max(1);

        let mut failures = Vec::new();

        let window = web_sys::window().ok_or_else(|| {
            Error::Dispatch("Could not get window object in WASM environment".to_string())
        })?;

        for recipient in &recipient_dids {
            let endpoint = self.get_endpoint_url(recipient);
            log::info!(
                "Sending message to {} via HTTP at {} (WASM)",
                recipient,
                endpoint
            );

            let mut attempt = 0;
            let mut success = false;
            let mut last_error = None;

            while attempt < max_attempts && !success {
                attempt += 1;

                if attempt > 1 {
                    // Doubles per attempt, overflow-safe and capped, so the
                    // conversion to the `i32` the timer API expects is lossless.
                    let factor = 1_u64.checked_shl(attempt - 1).unwrap_or(u64::MAX);
                    let capped_ms = BASE_BACKOFF_MS.saturating_mul(factor).min(MAX_BACKOFF_MS);
                    let backoff_ms = i32::try_from(capped_ms).unwrap_or(i32::MAX);

                    let promise = js_sys::Promise::new(&mut |resolve, _| {
                        let resolve_now = resolve.clone();
                        let closure = Closure::once_into_js(move || {
                            let _ = resolve.call0(&JsValue::NULL);
                        });
                        let scheduled = window
                            .set_timeout_with_callback_and_timeout_and_arguments_0(
                                closure.as_ref().unchecked_ref(),
                                backoff_ms,
                            );
                        if scheduled.is_err() {
                            // Without a timer the promise would never settle;
                            // skip the delay instead of hanging or panicking.
                            let _ = resolve_now.call0(&JsValue::NULL);
                        }
                    });

                    let _ = JsFuture::from(promise).await;
                }

                let mut opts = RequestInit::new();
                opts.method("POST");
                opts.mode(RequestMode::Cors);
                opts.body(Some(&JsValue::from_str(&packed_message)));

                let request = match Request::new_with_str_and_init(&endpoint, &opts) {
                    Ok(req) => req,
                    Err(err) => {
                        let err_msg = format!("Failed to create request: {:?}", err);
                        log::warn!("{}", err_msg);
                        last_error = Some(err_msg);
                        continue;
                    }
                };

                if let Err(err) = request
                    .headers()
                    .set("Content-Type", "application/didcomm-message+json")
                {
                    let err_msg = format!("Failed to set headers: {:?}", err);
                    log::warn!("{}", err_msg);
                    last_error = Some(err_msg);
                    continue;
                }

                let resp_promise = window.fetch_with_request(&request);
                let resp_jsvalue = match JsFuture::from(resp_promise).await {
                    Ok(val) => val,
                    Err(err) => {
                        let err_msg = format!("Fetch error: {:?}", err);
                        log::warn!(
                            "Failed to send message to {} (attempt {}/{}): {}",
                            recipient,
                            attempt,
                            max_attempts,
                            err_msg
                        );
                        last_error = Some(err_msg);
                        continue;
                    }
                };

                let response: Response = match resp_jsvalue.dyn_into() {
                    Ok(resp) => resp,
                    Err(err) => {
                        let err_msg = format!("Failed to convert response: {:?}", err);
                        log::warn!("{}", err_msg);
                        last_error = Some(err_msg);
                        continue;
                    }
                };

                if response.ok() {
                    log::debug!("Successfully sent message to {}", recipient);
                    success = true;
                } else {
                    let status = response.status();

                    let body_promise = response.text();
                    let body = match JsFuture::from(body_promise).await {
                        Ok(text_jsval) => text_jsval.as_string().unwrap_or_default(),
                        Err(_) => String::from("[Could not read response body]"),
                    };

                    let err_msg = format!("HTTP error: {} - {}", status, body);
                    log::warn!(
                        "Failed to send message to {} (attempt {}/{}): {}",
                        recipient,
                        attempt,
                        max_attempts,
                        err_msg
                    );
                    last_error = Some(err_msg);

                    // Retrying a bad request or a missing endpoint cannot
                    // succeed.
                    if status == 404 || status == 400 {
                        break;
                    }
                }
            }

            if !success {
                failures.push((
                    recipient.clone(),
                    last_error.unwrap_or_else(|| "Unknown error".to_string()),
                ));
            }
        }

        if !failures.is_empty() {
            let failure_messages = failures
                .iter()
                .map(|(did, err)| format!("{}: {}", did, err))
                .collect::<Vec<_>>()
                .join("; ");

            return Err(Error::Dispatch(format!(
                "Failed to send message to some recipients: {}",
                failure_messages
            )));
        }

        Ok(())
    }
}
924
#[cfg(all(target_arch = "wasm32", not(feature = "wasm")))]
#[async_trait(?Send)]
impl MessageSender for HttpMessageSender {
    /// Stand-in used on `wasm32` targets built without the `wasm` feature.
    ///
    /// Nothing is transmitted: the would-be endpoint and the message content
    /// are logged for every recipient, a warning is emitted, and `Ok(())` is
    /// returned.
    async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
        recipient_dids.iter().for_each(|did| {
            let url = self.get_endpoint_url(did);
            log::info!(
                "Would send message to {} via HTTP at {} (WASM without web-sys)",
                did,
                url
            );
            log::debug!("Message content: {}", packed_message);
        });

        log::warn!("HTTP sender is running in WASM without the web-sys feature enabled. No actual HTTP requests will be made.");
        Ok(())
    }
}