Skip to main content

eventbus_core/stream/
subscription.rs

1use std::sync::{
2    atomic::{AtomicBool, Ordering},
3    Arc, Mutex,
4};
5
6use tokio::sync::watch;
7use tokio::task::JoinHandle;
8
9use super::observer::{ErrorObserver, ErrorScope};
10use crate::{EventBusError, Subscription};
11
12#[must_use = "subscription is idle until bound; call `.close().await` for graceful shutdown"]
13pub struct StreamSubscription {
14    name: String,
15    closed: AtomicBool,
16    close_tx: watch::Sender<bool>,
17    task: Mutex<Option<JoinHandle<Result<(), EventBusError>>>>,
18    observer: Option<Arc<dyn ErrorObserver>>,
19}
20
21impl StreamSubscription {
22    pub(crate) fn new(
23        name: String,
24        close_tx: watch::Sender<bool>,
25        task: JoinHandle<Result<(), EventBusError>>,
26        observer: Option<Arc<dyn ErrorObserver>>,
27    ) -> Self {
28        Self {
29            name,
30            closed: AtomicBool::new(false),
31            close_tx,
32            task: Mutex::new(Some(task)),
33            observer,
34        }
35    }
36
37    pub fn name(&self) -> &str {
38        &self.name
39    }
40
41    /// Returns `true` until [`StreamSubscription::close`] has been invoked
42    /// (or the subscription was dropped). Useful for control planes that
43    /// need to skip already-shutdown subscriptions without racing on close.
44    pub fn is_running(&self) -> bool {
45        !self.closed.load(Ordering::Acquire)
46    }
47
48    fn begin_shutdown(
49        &self,
50    ) -> Result<Option<JoinHandle<Result<(), EventBusError>>>, EventBusError> {
51        if self.closed.swap(true, Ordering::AcqRel) {
52            return Ok(None);
53        }
54
55        let _ = self.close_tx.send(true);
56        let mut guard = self
57            .task
58            .lock()
59            .map_err(|_| EventBusError::Internal("subscription task mutex poisoned".into()))?;
60        Ok(guard.take())
61    }
62
63    pub async fn close(&self) -> Result<(), EventBusError> {
64        let Some(task) = self.begin_shutdown()? else {
65            return Ok(());
66        };
67
68        task.await
69            .map_err(|err| EventBusError::source("subscription task failed", err))?
70    }
71
72    /// Abort the background task without waiting for graceful drain. Returns
73    /// `Ok(())` if the abort was acknowledged or the task was already done;
74    /// surfaces the task's last error if it had one.
75    pub async fn abort(&self) -> Result<(), EventBusError> {
76        let Some(task) = self.begin_shutdown()? else {
77            return Ok(());
78        };
79        task.abort();
80        match task.await {
81            Ok(r) => r,
82            Err(err) if err.is_cancelled() => Ok(()),
83            Err(err) => Err(EventBusError::source("subscription task aborted", err)),
84        }
85    }
86}
87
88impl Subscription for StreamSubscription {
89    fn name(&self) -> &str {
90        StreamSubscription::name(self)
91    }
92
93    fn close(self: std::sync::Arc<Self>) -> crate::BoxFuture<'static, Result<(), EventBusError>> {
94        Box::pin(async move {
95            // Deref the Arc to call the inherent &self method, which already
96            // handles the close handshake (begin_shutdown -> JoinHandle::await).
97            // The Arc keeps the subscription alive until close completes.
98            (*self).close().await
99        })
100    }
101}
102
103/// Dropping a [`StreamSubscription`] is fire-and-forget: it signals the
104/// background task to exit but does not await it, and **delivery errors
105/// raised after the close signal are silently discarded**. To surface those
106/// errors, call [`StreamSubscription::close`] explicitly and await the
107/// returned `Result`.
108///
109/// When the subscription is dropped without `close()` having been called,
110/// the configured [`ErrorObserver`] (if any) is notified via
111/// [`ErrorScope::Drop`] so leaked subscriptions are observable.
112impl Drop for StreamSubscription {
113    fn drop(&mut self) {
114        if self.closed.swap(true, Ordering::AcqRel) {
115            return;
116        }
117
118        let _ = self.close_tx.send(true);
119        if let Some(obs) = self.observer.as_ref() {
120            obs.on_error(
121                ErrorScope::Drop,
122                &EventBusError::Internal(format!(
123                    "subscription `{}` dropped without close()",
124                    self.name
125                )),
126            );
127        }
128        if let Ok(mut guard) = self.task.lock() {
129            let _ = guard.take();
130        }
131    }
132}