Skip to main content

tracing_cloudwatch/
guard.rs

1use tokio::sync::oneshot;
2
3#[derive(Debug)]
4pub(crate) struct ShutdownSignal {
5    ack_tx: oneshot::Sender<()>,
6}
7
8impl ShutdownSignal {
9    pub(crate) fn new() -> (Self, oneshot::Receiver<()>) {
10        let (ack_tx, ack_rx) = oneshot::channel();
11        (Self { ack_tx }, ack_rx)
12    }
13
14    pub(crate) fn ack(self) {
15        let _ = self.ack_tx.send(());
16    }
17}
18
19/// Guard returned when creating a CloudWatch layer
20///
21/// When this guard is dropped a shutdown signal will be
22/// sent to the CloudWatch logging worker to flush logs and
23/// stop processing any more logs.
24///
25/// This is used to ensure buffered logs are flushed on panic
26/// or graceful shutdown. Use [`CloudWatchWorkerGuard::shutdown`]
27/// to explicitly wait for completion.
28pub struct CloudWatchWorkerGuard {
29    shutdown_tx: Option<oneshot::Sender<ShutdownSignal>>,
30}
31
32impl CloudWatchWorkerGuard {
33    pub(crate) fn new(shutdown_tx: oneshot::Sender<ShutdownSignal>) -> Self {
34        Self {
35            shutdown_tx: Some(shutdown_tx),
36        }
37    }
38
39    fn take_shutdown_tx(&mut self) -> Option<oneshot::Sender<ShutdownSignal>> {
40        self.shutdown_tx.take()
41    }
42
43    /// Trigger a graceful shutdown and wait for the worker to finish
44    /// draining and flushing queued logs.
45    pub async fn shutdown(mut self) {
46        let shutdown_tx = match self.take_shutdown_tx() {
47            Some(value) => value,
48            None => return,
49        };
50
51        let (shutdown_signal, ack_rx) = ShutdownSignal::new();
52
53        if shutdown_tx.send(shutdown_signal).is_err() {
54            return;
55        }
56
57        _ = ack_rx.await;
58    }
59}
60
61impl Drop for CloudWatchWorkerGuard {
62    fn drop(&mut self) {
63        let shutdown_tx = match self.take_shutdown_tx() {
64            Some(value) => value,
65            None => return,
66        };
67
68        let (shutdown_signal, _ack_rx) = ShutdownSignal::new();
69        let _ = shutdown_tx.send(shutdown_signal);
70    }
71}
72
73#[cfg(test)]
74mod tests {
75    use super::*;
76    use tokio::time::{Duration, sleep};
77
78    #[tokio::test(flavor = "current_thread")]
79    async fn shutdown_waits_for_ack() {
80        let (shutdown_tx, shutdown_rx) = oneshot::channel::<ShutdownSignal>();
81        let guard = CloudWatchWorkerGuard::new(shutdown_tx);
82
83        let worker = tokio::spawn(async move {
84            let signal = shutdown_rx.await.unwrap();
85            sleep(Duration::from_millis(20)).await;
86            signal.ack();
87        });
88
89        guard.shutdown().await;
90        worker.await.unwrap();
91    }
92}