rustauth_core/
outbound.rs1use std::future::Future;
2use std::pin::Pin;
3
4use crate::context::AuthContext;
5use crate::error::RustAuthError;
6use crate::options::BackgroundTaskFuture;
7
8pub type OutboundSendFuture =
10 Pin<Box<dyn Future<Output = Result<(), RustAuthError>> + Send + 'static>>;
11
12pub fn ready_outbound(result: Result<(), RustAuthError>) -> OutboundSendFuture {
14 Box::pin(async move { result })
15}
16
17pub fn dispatch_outbound(context: &AuthContext, send: OutboundSendFuture) {
19 let logger = context.logger.clone();
20 let task: BackgroundTaskFuture = Box::pin(async move {
21 if let Err(error) = send.await {
22 logger.error("outbound delivery failed", &[&error.to_string()]);
23 }
24 });
25 if context.background_tasks.is_some() {
26 context.run_background_task(task);
27 } else {
28 tokio::spawn(task);
29 }
30}
31
32#[cfg(test)]
33mod tests {
34 use std::sync::atomic::{AtomicUsize, Ordering};
35 use std::sync::Arc;
36 use std::time::Duration;
37
38 use crate::context::create_auth_context;
39 use crate::error::RustAuthError;
40 use crate::options::{
41 AdvancedOptions, BackgroundTaskFuture, BackgroundTaskRunner, RustAuthOptions,
42 };
43
44 use super::dispatch_outbound;
45
46 #[derive(Default)]
47 struct CountingBackgroundRunner {
48 calls: AtomicUsize,
49 }
50
51 impl CountingBackgroundRunner {
52 fn calls(&self) -> usize {
53 self.calls.load(Ordering::SeqCst)
54 }
55 }
56
57 impl BackgroundTaskRunner for CountingBackgroundRunner {
58 fn spawn(&self, task: BackgroundTaskFuture) {
59 self.calls.fetch_add(1, Ordering::SeqCst);
60 tokio::spawn(task);
61 }
62 }
63
64 #[tokio::test]
65 async fn dispatch_outbound_spawns_without_awaiting_sender() -> Result<(), RustAuthError> {
66 let counting = Arc::new(CountingBackgroundRunner::default());
67 let runner: Arc<dyn BackgroundTaskRunner> =
68 Arc::clone(&counting) as Arc<dyn BackgroundTaskRunner>;
69 let context = create_auth_context(
70 RustAuthOptions::default()
71 .advanced(AdvancedOptions::default().background_tasks(runner)),
72 )?;
73
74 let started = Arc::new(AtomicUsize::new(0));
75 let finished = Arc::new(AtomicUsize::new(0));
76 let started_for_send = Arc::clone(&started);
77 let finished_for_send = Arc::clone(&finished);
78
79 dispatch_outbound(
80 &context,
81 Box::pin(async move {
82 started_for_send.fetch_add(1, Ordering::SeqCst);
83 tokio::time::sleep(Duration::from_millis(50)).await;
84 finished_for_send.fetch_add(1, Ordering::SeqCst);
85 Ok(())
86 }),
87 );
88
89 assert_eq!(counting.calls(), 1);
90 assert_eq!(started.load(Ordering::SeqCst), 0);
91
92 tokio::time::sleep(Duration::from_millis(75)).await;
93 assert_eq!(started.load(Ordering::SeqCst), 1);
94 assert_eq!(finished.load(Ordering::SeqCst), 1);
95 Ok(())
96 }
97}