eventbus_core/stream/
subscription.rs1use std::sync::{
2 atomic::{AtomicBool, Ordering},
3 Arc, Mutex,
4};
5
6use tokio::sync::watch;
7use tokio::task::JoinHandle;
8
9use super::observer::{ErrorObserver, ErrorScope};
10use crate::{EventBusError, Subscription};
11
12#[must_use = "subscription is idle until bound; call `.close().await` for graceful shutdown"]
13pub struct StreamSubscription {
14 name: String,
15 closed: AtomicBool,
16 close_tx: watch::Sender<bool>,
17 task: Mutex<Option<JoinHandle<Result<(), EventBusError>>>>,
18 observer: Option<Arc<dyn ErrorObserver>>,
19}
20
21impl StreamSubscription {
22 pub(crate) fn new(
23 name: String,
24 close_tx: watch::Sender<bool>,
25 task: JoinHandle<Result<(), EventBusError>>,
26 observer: Option<Arc<dyn ErrorObserver>>,
27 ) -> Self {
28 Self {
29 name,
30 closed: AtomicBool::new(false),
31 close_tx,
32 task: Mutex::new(Some(task)),
33 observer,
34 }
35 }
36
37 pub fn name(&self) -> &str {
38 &self.name
39 }
40
41 pub fn is_running(&self) -> bool {
45 !self.closed.load(Ordering::Acquire)
46 }
47
48 fn begin_shutdown(
49 &self,
50 ) -> Result<Option<JoinHandle<Result<(), EventBusError>>>, EventBusError> {
51 if self.closed.swap(true, Ordering::AcqRel) {
52 return Ok(None);
53 }
54
55 let _ = self.close_tx.send(true);
56 let mut guard = self
57 .task
58 .lock()
59 .map_err(|_| EventBusError::Internal("subscription task mutex poisoned".into()))?;
60 Ok(guard.take())
61 }
62
63 pub async fn close(&self) -> Result<(), EventBusError> {
64 let Some(task) = self.begin_shutdown()? else {
65 return Ok(());
66 };
67
68 task.await
69 .map_err(|err| EventBusError::source("subscription task failed", err))?
70 }
71
72 pub async fn abort(&self) -> Result<(), EventBusError> {
76 let Some(task) = self.begin_shutdown()? else {
77 return Ok(());
78 };
79 task.abort();
80 match task.await {
81 Ok(r) => r,
82 Err(err) if err.is_cancelled() => Ok(()),
83 Err(err) => Err(EventBusError::source("subscription task aborted", err)),
84 }
85 }
86}
87
88impl Subscription for StreamSubscription {
89 fn name(&self) -> &str {
90 StreamSubscription::name(self)
91 }
92
93 fn close(self: std::sync::Arc<Self>) -> crate::BoxFuture<'static, Result<(), EventBusError>> {
94 Box::pin(async move {
95 (*self).close().await
99 })
100 }
101}
102
103impl Drop for StreamSubscription {
113 fn drop(&mut self) {
114 if self.closed.swap(true, Ordering::AcqRel) {
115 return;
116 }
117
118 let _ = self.close_tx.send(true);
119 if let Some(obs) = self.observer.as_ref() {
120 obs.on_error(
121 ErrorScope::Drop,
122 &EventBusError::Internal(format!(
123 "subscription `{}` dropped without close()",
124 self.name
125 )),
126 );
127 }
128 if let Ok(mut guard) = self.task.lock() {
129 let _ = guard.take();
130 }
131 }
132}