// tracing_cloudwatch/guard.rs
use tokio::sync::oneshot;
2
3#[derive(Debug)]
4pub(crate) struct ShutdownSignal {
5 ack_tx: oneshot::Sender<()>,
6}
7
8impl ShutdownSignal {
9 pub(crate) fn new() -> (Self, oneshot::Receiver<()>) {
10 let (ack_tx, ack_rx) = oneshot::channel();
11 (Self { ack_tx }, ack_rx)
12 }
13
14 pub(crate) fn ack(self) {
15 let _ = self.ack_tx.send(());
16 }
17}
18
19pub struct CloudWatchWorkerGuard {
29 shutdown_tx: Option<oneshot::Sender<ShutdownSignal>>,
30}
31
32impl CloudWatchWorkerGuard {
33 pub(crate) fn new(shutdown_tx: oneshot::Sender<ShutdownSignal>) -> Self {
34 Self {
35 shutdown_tx: Some(shutdown_tx),
36 }
37 }
38
39 fn take_shutdown_tx(&mut self) -> Option<oneshot::Sender<ShutdownSignal>> {
40 self.shutdown_tx.take()
41 }
42
43 pub async fn shutdown(mut self) {
46 let shutdown_tx = match self.take_shutdown_tx() {
47 Some(value) => value,
48 None => return,
49 };
50
51 let (shutdown_signal, ack_rx) = ShutdownSignal::new();
52
53 if shutdown_tx.send(shutdown_signal).is_err() {
54 return;
55 }
56
57 _ = ack_rx.await;
58 }
59}
60
61impl Drop for CloudWatchWorkerGuard {
62 fn drop(&mut self) {
63 let shutdown_tx = match self.take_shutdown_tx() {
64 Some(value) => value,
65 None => return,
66 };
67
68 let (shutdown_signal, _ack_rx) = ShutdownSignal::new();
69 let _ = shutdown_tx.send(shutdown_signal);
70 }
71}
72
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio::time::{Duration, sleep};

    /// `shutdown()` must not resolve before the worker has acknowledged the
    /// shutdown signal.
    #[tokio::test(flavor = "current_thread")]
    async fn shutdown_waits_for_ack() {
        let (shutdown_tx, shutdown_rx) = oneshot::channel::<ShutdownSignal>();
        let guard = CloudWatchWorkerGuard::new(shutdown_tx);

        // Flipped by the worker right before it acknowledges, so observing it
        // as `true` after `shutdown()` proves that `shutdown()` really waited.
        let acked = Arc::new(AtomicBool::new(false));
        let acked_by_worker = Arc::clone(&acked);

        // Stand-in worker: receive the request, simulate some flush work, ack.
        let worker = tokio::spawn(async move {
            let signal = shutdown_rx.await.unwrap();
            sleep(Duration::from_millis(20)).await;
            acked_by_worker.store(true, Ordering::SeqCst);
            signal.ack();
        });

        guard.shutdown().await;
        assert!(
            acked.load(Ordering::SeqCst),
            "shutdown() returned before the worker acknowledged"
        );
        worker.await.unwrap();
    }
}