1use async_trait::async_trait;
69#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
70use std::collections::HashMap;
71use std::fmt::{self, Debug};
72use std::sync::Arc;
73
74use crate::error::{Error, Result};
75use crate::storage::{
76 models::{DeliveryStatus, DeliveryType},
77 Storage,
78};
79
80#[async_trait]
82pub trait PlainMessageSender: Send + Sync + Debug {
83 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()>;
85}
86
87pub struct NodePlainMessageSender {
89 send_callback: Arc<dyn Fn(String, Vec<String>) -> Result<()> + Send + Sync>,
91}
92
93impl Debug for NodePlainMessageSender {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 f.debug_struct("NodePlainMessageSender")
96 .field("send_callback", &"<function>")
97 .finish()
98 }
99}
100
101impl NodePlainMessageSender {
102 pub fn new<F>(callback: F) -> Self
104 where
105 F: Fn(String, Vec<String>) -> Result<()> + Send + Sync + 'static,
106 {
107 Self {
108 send_callback: Arc::new(callback),
109 }
110 }
111}
112
113#[async_trait]
114impl PlainMessageSender for NodePlainMessageSender {
115 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
116 (self.send_callback)(packed_message, recipient_dids)
118 .map_err(|e| Error::Dispatch(format!("Failed to send message: {}", e)))
119 }
120}
121
122#[derive(Debug)]
147pub struct HttpPlainMessageSender {
148 base_url: String,
150 #[cfg(feature = "reqwest")]
152 client: reqwest::Client,
153 #[allow(dead_code)] timeout_ms: u64,
156 max_retries: u32,
158}
159
160impl HttpPlainMessageSender {
161 pub fn new(base_url: String) -> Self {
163 Self::with_options(base_url, 30000, 3) }
165
166 pub fn with_options(base_url: String, timeout_ms: u64, max_retries: u32) -> Self {
168 #[cfg(feature = "reqwest")]
169 {
170 let client = reqwest::Client::builder()
172 .timeout(std::time::Duration::from_millis(timeout_ms))
173 .user_agent("TAP-Node/0.1")
174 .build()
175 .unwrap_or_default();
176
177 Self {
178 base_url,
179 client,
180 timeout_ms,
181 max_retries,
182 }
183 }
184
185 #[cfg(not(feature = "reqwest"))]
186 {
187 Self {
188 base_url,
189 timeout_ms,
190 max_retries,
191 }
192 }
193 }
194
195 pub fn get_endpoint_url(&self, recipient_did: &str) -> String {
197 let encoded_did = self.url_encode(recipient_did);
203 format!(
204 "{}/api/messages/{}",
205 self.base_url.trim_end_matches('/'),
206 encoded_did
207 )
208 }
209
210 fn url_encode(&self, text: &str) -> String {
212 use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
213 utf8_percent_encode(text, NON_ALPHANUMERIC).to_string()
214 }
215}
216
217#[derive(Debug)]
249pub struct WebSocketPlainMessageSender {
250 base_url: String,
252 #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
254 connections: std::sync::Mutex<HashMap<String, tokio::sync::mpsc::Sender<String>>>,
255 #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
257 task_handles: std::sync::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>,
258}
259
260impl WebSocketPlainMessageSender {
261 pub fn new(base_url: String) -> Self {
263 Self::with_options(base_url)
264 }
265
266 pub fn with_options(base_url: String) -> Self {
268 #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
269 {
270 Self {
271 base_url,
272 connections: std::sync::Mutex::new(HashMap::new()),
273 task_handles: std::sync::Mutex::new(HashMap::new()),
274 }
275 }
276
277 #[cfg(not(all(not(target_arch = "wasm32"), feature = "websocket")))]
278 {
279 Self { base_url }
280 }
281 }
282
283 fn get_endpoint_url(&self, recipient_did: &str) -> String {
285 let ws_base_url = if self.base_url.starts_with("https://") {
290 self.base_url.replace("https://", "wss://")
291 } else if self.base_url.starts_with("http://") {
292 self.base_url.replace("http://", "ws://")
293 } else {
294 self.base_url.clone()
295 };
296
297 let encoded_did = self.url_encode(recipient_did);
299 format!(
300 "{}/ws/messages/{}",
301 ws_base_url.trim_end_matches('/'),
302 encoded_did
303 )
304 }
305
306 fn url_encode(&self, text: &str) -> String {
308 use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
309 utf8_percent_encode(text, NON_ALPHANUMERIC).to_string()
310 }
311
312 #[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
314 async fn ensure_connection(
315 &self,
316 recipient: &str,
317 ) -> Result<tokio::sync::mpsc::Sender<String>> {
318 use futures::sink::SinkExt;
319 use futures::stream::StreamExt;
320 use tokio_tungstenite::connect_async;
321 use tokio_tungstenite::tungstenite::protocol::Message;
322
323 {
325 let connections = self.connections.lock().unwrap();
327 if let Some(connection) = connections.get(recipient) {
328 return Ok(connection.clone());
329 }
330 }
331
332 let endpoint = self.get_endpoint_url(recipient);
334 log::info!(
335 "Creating new WebSocket connection to {} at {}",
336 recipient,
337 endpoint
338 );
339
340 let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(100);
342
343 let (ws_stream, _) = match tokio::time::timeout(
345 std::time::Duration::from_millis(30000),
346 connect_async(&endpoint),
347 )
348 .await
349 {
350 Ok(Ok(stream)) => stream,
351 Ok(Err(e)) => {
352 return Err(Error::Dispatch(format!(
353 "Failed to connect to WebSocket endpoint {}: {}",
354 endpoint, e
355 )));
356 }
357 Err(_) => {
358 return Err(Error::Dispatch(format!(
359 "Connection to WebSocket endpoint {} timed out",
360 endpoint
361 )));
362 }
363 };
364
365 log::debug!("WebSocket connection established to {}", recipient);
366
367 let (mut write, mut read) = ws_stream.split();
369
370 let recipient_clone = recipient.to_string();
374 let handle = tokio::spawn(async move {
375 loop {
377 tokio::select! {
378 Some(message) = rx.recv() => {
380 log::debug!("Sending message to {} via WebSocket", recipient_clone);
381 if let Err(e) = write.send(Message::Text(message)).await {
382 log::error!("Failed to send WebSocket message to {}: {}", recipient_clone, e);
383 }
385 }
386
387 result = read.next() => {
389 match result {
390 Some(Ok(message)) => {
391 if let Message::Text(text) = message {
393 log::debug!("Received WebSocket message from {}: {}", recipient_clone, text);
394 }
395 }
396 Some(Err(e)) => {
397 log::error!("WebSocket error from {}: {}", recipient_clone, e);
398 break;
400 }
401 None => {
402 log::info!("WebSocket connection to {} closed", recipient_clone);
404 break;
405 }
406 }
407 }
408 }
409 }
410
411 log::info!("WebSocket connection to {} terminated", recipient_clone);
413 });
414
415 {
417 let mut connections = self.connections.lock().unwrap();
419 connections.insert(recipient.to_string(), tx.clone());
420 }
421
422 {
423 let mut task_handles = self.task_handles.lock().unwrap();
425 task_handles.insert(recipient.to_string(), handle);
426 }
427
428 Ok(tx)
429 }
430}
431
432#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
433#[async_trait]
434impl PlainMessageSender for WebSocketPlainMessageSender {
435 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
436 if recipient_dids.is_empty() {
437 return Err(Error::Dispatch("No recipients specified".to_string()));
438 }
439
440 let mut failures = Vec::new();
442
443 for recipient in &recipient_dids {
445 log::info!("Sending message to {} via WebSocket", recipient);
446
447 match self.ensure_connection(recipient).await {
449 Ok(sender) => {
450 if let Err(e) = sender.send(packed_message.clone()).await {
452 let err_msg = format!("Failed to send message to WebSocket task: {}", e);
453 log::error!("{}", err_msg);
454 failures.push((recipient.clone(), err_msg));
455 }
456 }
457 Err(e) => {
458 let err_msg = format!("Failed to establish WebSocket connection: {}", e);
459 log::error!("{}", err_msg);
460 failures.push((recipient.clone(), err_msg));
461 }
462 }
463 }
464
465 if !failures.is_empty() {
467 let failure_messages = failures
468 .iter()
469 .map(|(did, err)| format!("{}: {}", did, err))
470 .collect::<Vec<_>>()
471 .join("; ");
472
473 return Err(Error::Dispatch(format!(
474 "Failed to send message to some recipients via WebSocket: {}",
475 failure_messages
476 )));
477 }
478
479 Ok(())
480 }
481}
482
483#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
485#[async_trait(?Send)]
486impl PlainMessageSender for WebSocketPlainMessageSender {
487 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
488 use wasm_bindgen::prelude::*;
489 use wasm_bindgen_futures::JsFuture;
490 use web_sys::{MessageEvent, WebSocket};
491
492 if recipient_dids.is_empty() {
493 return Err(Error::Dispatch("No recipients specified".to_string()));
494 }
495
496 let mut failures = Vec::new();
498
499 let window = web_sys::window().ok_or_else(|| {
501 Error::Dispatch("Could not get window object in WASM environment".to_string())
502 })?;
503
504 for recipient in &recipient_dids {
506 let endpoint = self.get_endpoint_url(recipient);
507 log::info!(
508 "Sending message to {} via WebSocket at {} (WASM)",
509 recipient,
510 endpoint
511 );
512
513 let (resolve, reject) = js_sys::Promise::new_resolver();
515 let promise_resolver = resolve.clone();
516 let promise_rejecter = reject.clone();
517
518 let ws = match WebSocket::new(&endpoint) {
520 Ok(ws) => ws,
521 Err(err) => {
522 let err_msg = format!("Failed to create WebSocket: {:?}", err);
523 log::error!("{}", err_msg);
524 failures.push((recipient.clone(), err_msg));
525 continue;
526 }
527 };
528
529 let onopen_callback = Closure::once(Box::new(move |_: web_sys::Event| {
531 promise_resolver.resolve(&JsValue::from(true));
532 }) as Box<dyn FnOnce(web_sys::Event)>);
533
534 let onerror_callback = Closure::once(Box::new(move |e: web_sys::Event| {
535 let err_msg = format!("WebSocket error: {:?}", e);
536 promise_rejecter.reject(&JsValue::from_str(&err_msg));
537 }) as Box<dyn FnOnce(web_sys::Event)>);
538
539 let message_clone = packed_message.clone();
540 let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
541 if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
542 log::debug!("Received message: {}", String::from(txt));
543 }
544 }) as Box<dyn FnMut(MessageEvent)>);
545
546 ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
548 ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
549 ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
550
551 match JsFuture::from(js_sys::Promise::race(&js_sys::Array::of2(
553 &js_sys::Promise::resolve(&promise_resolver),
554 &js_sys::Promise::new(&mut |resolve, _| {
555 let timeout_closure = Closure::once_into_js(move || {
556 resolve.call0(&JsValue::NULL).unwrap();
557 });
558 window
559 .set_timeout_with_callback_and_timeout_and_arguments_0(
560 timeout_closure.as_ref().unchecked_ref(),
561 30000, )
563 .unwrap();
564 }),
565 )))
566 .await
567 {
568 Ok(_) => {
569 if let Err(err) = ws.send_with_str(&message_clone) {
571 let err_msg = format!("Failed to send WebSocket message: {:?}", err);
572 log::error!("{}", err_msg);
573 failures.push((recipient.clone(), err_msg));
574 }
575 }
576 Err(err) => {
577 let err_msg = format!("WebSocket connection failed: {:?}", err);
578 log::error!("{}", err_msg);
579 failures.push((recipient.clone(), err_msg));
580 }
581 }
582
583 onopen_callback.forget();
585 onerror_callback.forget();
586 onmessage_callback.forget();
587 }
588
589 if !failures.is_empty() {
591 let failure_messages = failures
592 .iter()
593 .map(|(did, err)| format!("{}: {}", did, err))
594 .collect::<Vec<_>>()
595 .join("; ");
596
597 return Err(Error::Dispatch(format!(
598 "Failed to send message to some recipients via WebSocket: {}",
599 failure_messages
600 )));
601 }
602
603 Ok(())
604 }
605}
606
607#[cfg(not(any(
609 all(not(target_arch = "wasm32"), feature = "websocket"),
610 all(target_arch = "wasm32", feature = "wasm")
611)))]
612#[async_trait]
613impl PlainMessageSender for WebSocketPlainMessageSender {
614 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
615 for recipient in &recipient_dids {
617 let endpoint = self.get_endpoint_url(recipient);
618 log::info!(
619 "Would send message to {} via WebSocket at {} (WebSocket not available)",
620 recipient,
621 endpoint
622 );
623 log::debug!("PlainMessage content: {}", packed_message);
624 }
625
626 log::warn!("WebSocket sender is running without WebSocket features enabled. No actual WebSocket connections will be made.");
627 Ok(())
628 }
629}
630
631#[cfg(all(not(target_arch = "wasm32"), feature = "reqwest"))]
632#[async_trait]
633impl PlainMessageSender for HttpPlainMessageSender {
634 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
635 if recipient_dids.is_empty() {
636 return Err(Error::Dispatch("No recipients specified".to_string()));
637 }
638
639 let mut failures = Vec::new();
641
642 for recipient in &recipient_dids {
644 let endpoint = self.get_endpoint_url(recipient);
645 log::info!("Sending message to {} via HTTP at {}", recipient, endpoint);
646
647 let mut attempt = 0;
649 let mut success = false;
650 let mut last_error = None;
651
652 while attempt < self.max_retries && !success {
653 attempt += 1;
654
655 if attempt > 1 {
657 let backoff_ms = 100 * (2_u64.pow(attempt - 1));
658 tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
659 }
660
661 match self
663 .client
664 .post(&endpoint)
665 .header("Content-Type", "application/didcomm-message+json")
666 .body(packed_message.clone())
667 .send()
668 .await
669 {
670 Ok(response) => {
671 if response.status().is_success() {
673 log::debug!("Successfully sent message to {}", recipient);
674 success = true;
675 } else {
676 let status = response.status();
677 let body = response.text().await.unwrap_or_default();
678 log::warn!(
679 "Failed to send message to {} (attempt {}/{}): HTTP {} - {}",
680 recipient,
681 attempt,
682 self.max_retries,
683 status,
684 body
685 );
686 last_error = Some(format!("HTTP error: {} - {}", status, body));
687
688 if status.as_u16() == 404 || status.as_u16() == 400 {
690 break; }
692 }
693 }
694 Err(err) => {
695 log::warn!(
696 "Failed to send message to {} (attempt {}/{}): {}",
697 recipient,
698 attempt,
699 self.max_retries,
700 err
701 );
702 last_error = Some(format!("Request error: {}", err));
703 }
704 }
705 }
706
707 if !success {
708 failures.push((
710 recipient.clone(),
711 last_error.unwrap_or_else(|| "Unknown error".to_string()),
712 ));
713 }
714 }
715
716 if !failures.is_empty() {
718 let failure_messages = failures
719 .iter()
720 .map(|(did, err)| format!("{}: {}", did, err))
721 .collect::<Vec<_>>()
722 .join("; ");
723
724 return Err(Error::Dispatch(format!(
725 "Failed to send message to some recipients: {}",
726 failure_messages
727 )));
728 }
729
730 Ok(())
731 }
732}
733
734#[cfg(all(not(target_arch = "wasm32"), not(feature = "reqwest")))]
735#[async_trait]
736impl PlainMessageSender for HttpPlainMessageSender {
737 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
738 for recipient in &recipient_dids {
742 let endpoint = self.get_endpoint_url(recipient);
743 log::info!(
744 "Would send message to {} via HTTP at {} (reqwest not available)",
745 recipient,
746 endpoint
747 );
748 log::debug!("PlainMessage content: {}", packed_message);
749 }
750
751 log::warn!("HTTP sender is running without reqwest feature enabled. No actual HTTP requests will be made.");
752 Ok(())
753 }
754}
755
756#[cfg(all(target_arch = "wasm32", feature = "wasm"))]
758#[async_trait(?Send)]
759impl PlainMessageSender for HttpPlainMessageSender {
760 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
761 use wasm_bindgen::prelude::*;
762 use wasm_bindgen_futures::JsFuture;
763 use web_sys::{Request, RequestInit, RequestMode, Response};
764
765 if recipient_dids.is_empty() {
766 return Err(Error::Dispatch("No recipients specified".to_string()));
767 }
768
769 let mut failures = Vec::new();
771
772 let window = web_sys::window().ok_or_else(|| {
774 Error::Dispatch("Could not get window object in WASM environment".to_string())
775 })?;
776
777 for recipient in &recipient_dids {
779 let endpoint = self.get_endpoint_url(recipient);
780 log::info!(
781 "Sending message to {} via HTTP at {} (WASM)",
782 recipient,
783 endpoint
784 );
785
786 let mut attempt = 0;
788 let mut success = false;
789 let mut last_error = None;
790
791 while attempt < self.max_retries && !success {
792 attempt += 1;
793
794 if attempt > 1 {
796 let backoff_ms = 100 * (2_u64.pow(attempt - 1));
797 let promise = js_sys::Promise::new(&mut |resolve, _| {
799 let closure = Closure::once_into_js(move || {
800 resolve.call0(&JsValue::NULL).unwrap();
801 });
802 window
803 .set_timeout_with_callback_and_timeout_and_arguments_0(
804 closure.as_ref().unchecked_ref(),
805 backoff_ms as i32,
806 )
807 .unwrap();
808 });
809
810 let _ = JsFuture::from(promise).await;
811 }
812
813 let mut opts = RequestInit::new();
815 opts.method("POST");
816 opts.mode(RequestMode::Cors);
817 opts.body(Some(&JsValue::from_str(&packed_message)));
818
819 let request = match Request::new_with_str_and_init(&endpoint, &opts) {
820 Ok(req) => req,
821 Err(err) => {
822 let err_msg = format!("Failed to create request: {:?}", err);
823 log::warn!("{}", err_msg);
824 last_error = Some(err_msg);
825 continue;
826 }
827 };
828
829 if let Err(err) = request
831 .headers()
832 .set("Content-Type", "application/didcomm-message+json")
833 {
834 let err_msg = format!("Failed to set headers: {:?}", err);
835 log::warn!("{}", err_msg);
836 last_error = Some(err_msg);
837 continue;
838 }
839
840 let resp_promise = window.fetch_with_request(&request);
842 let resp_jsvalue = match JsFuture::from(resp_promise).await {
843 Ok(val) => val,
844 Err(err) => {
845 let err_msg = format!("Fetch error: {:?}", err);
846 log::warn!(
847 "Failed to send message to {} (attempt {}/{}): {}",
848 recipient,
849 attempt,
850 self.max_retries,
851 err_msg
852 );
853 last_error = Some(err_msg);
854 continue;
855 }
856 };
857
858 let response: Response = match resp_jsvalue.dyn_into() {
860 Ok(resp) => resp,
861 Err(err) => {
862 let err_msg = format!("Failed to convert response: {:?}", err);
863 log::warn!("{}", err_msg);
864 last_error = Some(err_msg);
865 continue;
866 }
867 };
868
869 if response.ok() {
871 log::debug!("Successfully sent message to {}", recipient);
872 success = true;
873 } else {
874 let status = response.status();
875
876 let body_promise = response.text();
878 let body = match JsFuture::from(body_promise).await {
879 Ok(text_jsval) => text_jsval.as_string().unwrap_or_default(),
880 Err(_) => String::from("[Could not read response body]"),
881 };
882
883 let err_msg = format!("HTTP error: {} - {}", status, body);
884 log::warn!(
885 "Failed to send message to {} (attempt {}/{}): {}",
886 recipient,
887 attempt,
888 self.max_retries,
889 err_msg
890 );
891 last_error = Some(err_msg);
892
893 if status == 404 || status == 400 {
895 break; }
897 }
898 }
899
900 if !success {
901 failures.push((
902 recipient.clone(),
903 last_error.unwrap_or_else(|| "Unknown error".to_string()),
904 ));
905 }
906 }
907
908 if !failures.is_empty() {
910 let failure_messages = failures
911 .iter()
912 .map(|(did, err)| format!("{}: {}", did, err))
913 .collect::<Vec<_>>()
914 .join("; ");
915
916 return Err(Error::Dispatch(format!(
917 "Failed to send message to some recipients: {}",
918 failure_messages
919 )));
920 }
921
922 Ok(())
923 }
924}
925
926#[cfg(all(target_arch = "wasm32", not(feature = "wasm")))]
928#[async_trait(?Send)]
929impl PlainMessageSender for HttpPlainMessageSender {
930 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
931 for recipient in &recipient_dids {
933 let endpoint = self.get_endpoint_url(recipient);
934 log::info!(
935 "Would send message to {} via HTTP at {} (WASM without web-sys)",
936 recipient,
937 endpoint
938 );
939 log::debug!("PlainMessage content: {}", packed_message);
940 }
941
942 log::warn!("HTTP sender is running in WASM without the web-sys feature enabled. No actual HTTP requests will be made.");
943 Ok(())
944 }
945}
946
947#[derive(Debug)]
982pub struct HttpPlainMessageSenderWithTracking {
983 http_sender: HttpPlainMessageSender,
985 storage: Arc<Storage>,
987}
988
989impl HttpPlainMessageSenderWithTracking {
990 pub fn new(base_url: String, storage: Arc<Storage>) -> Self {
992 Self {
993 http_sender: HttpPlainMessageSender::new(base_url),
994 storage,
995 }
996 }
997
998 pub fn with_options(
1000 base_url: String,
1001 timeout_ms: u64,
1002 max_retries: u32,
1003 storage: Arc<Storage>,
1004 ) -> Self {
1005 Self {
1006 http_sender: HttpPlainMessageSender::with_options(base_url, timeout_ms, max_retries),
1007 storage,
1008 }
1009 }
1010}
1011
1012#[async_trait]
1013impl PlainMessageSender for HttpPlainMessageSenderWithTracking {
1014 async fn send(&self, packed_message: String, recipient_dids: Vec<String>) -> Result<()> {
1015 if recipient_dids.is_empty() {
1016 return Err(Error::Dispatch("No recipients specified".to_string()));
1017 }
1018
1019 let message_id = format!("msg_{}", uuid::Uuid::new_v4());
1022
1023 let mut delivery_ids = Vec::new();
1025 for recipient in &recipient_dids {
1026 let delivery_url = Some(self.http_sender.get_endpoint_url(recipient));
1027 match self
1028 .storage
1029 .create_delivery(
1030 &message_id,
1031 &packed_message,
1032 recipient,
1033 delivery_url.as_deref(),
1034 DeliveryType::Https,
1035 )
1036 .await
1037 {
1038 Ok(delivery_id) => {
1039 delivery_ids.push((recipient.clone(), delivery_id));
1040 log::debug!(
1041 "Created delivery record {} for message {} to {}",
1042 delivery_id,
1043 message_id,
1044 recipient
1045 );
1046 }
1047 Err(e) => {
1048 log::error!("Failed to create delivery record for {}: {}", recipient, e);
1049 delivery_ids.push((recipient.clone(), -1)); }
1052 }
1053 }
1054
1055 let delivery_result = self
1057 .http_sender
1058 .send(packed_message, recipient_dids.clone())
1059 .await;
1060
1061 for (_recipient, delivery_id) in delivery_ids {
1063 if delivery_id == -1 {
1064 continue; }
1066
1067 match &delivery_result {
1068 Ok(_) => {
1069 if let Err(e) = self
1071 .storage
1072 .update_delivery_status(
1073 delivery_id,
1074 DeliveryStatus::Success,
1075 Some(200), None,
1077 )
1078 .await
1079 {
1080 log::error!(
1081 "Failed to update delivery record {} to success: {}",
1082 delivery_id,
1083 e
1084 );
1085 } else {
1086 log::debug!("Updated delivery record {} to success", delivery_id);
1087 }
1088 }
1089 Err(e) => {
1090 let error_msg = e.to_string();
1092 let http_status_code = if error_msg.contains("HTTP error: ") {
1093 error_msg
1095 .split("HTTP error: ")
1096 .nth(1)
1097 .and_then(|s| s.split(' ').next())
1098 .and_then(|s| s.parse::<i32>().ok())
1099 } else {
1100 None
1101 };
1102
1103 if let Err(e) = self
1104 .storage
1105 .update_delivery_status(
1106 delivery_id,
1107 DeliveryStatus::Failed,
1108 http_status_code,
1109 Some(&error_msg),
1110 )
1111 .await
1112 {
1113 log::error!(
1114 "Failed to update delivery record {} to failed: {}",
1115 delivery_id,
1116 e
1117 );
1118 } else {
1119 log::debug!(
1120 "Updated delivery record {} to failed with error: {}",
1121 delivery_id,
1122 error_msg
1123 );
1124 }
1125
1126 if let Err(e) = self
1128 .storage
1129 .increment_delivery_retry_count(delivery_id)
1130 .await
1131 {
1132 log::error!(
1133 "Failed to increment retry count for delivery record {}: {}",
1134 delivery_id,
1135 e
1136 );
1137 }
1138 }
1139 }
1140 }
1141
1142 delivery_result
1143 }
1144}