tracing_gcloud_layer/
runtime.rs

1use super::google_logger::{GoogleLogger, LogMapper};
2use crate::{GoogleWriterConfig, google_writer::GoogleWriterHandle};
3use serde_json::Value;
4use std::sync::Arc;
5use tokio::{
6    sync::{RwLock, mpsc, oneshot},
7    task::JoinHandle,
8    time::sleep,
9};
10
11/// Messages sent from the runtime to the background worker
12enum ControlMessage {
13    Flush(oneshot::Sender<()>),
14    Shutdown(oneshot::Sender<()>),
15}
16
17/// Runtime that manages a background task for batching and sending logs to Google Cloud.
18///
19/// `GoogleWriterRuntime` owns the background task, handles shutdown, and provides a
20/// [`GoogleWriterHandle`] for synchronous log writing. It is generic over a type implementing [`LogMapper`],
21/// which is responsible for mapping structured log entries to the format expected by Google Cloud Logging.
22#[derive(Debug)]
23pub struct GoogleWriterRuntime<M: LogMapper> {
24    handle: GoogleWriterHandle,
25    control_tx: mpsc::Sender<ControlMessage>,
26    shutdown_handle: Option<JoinHandle<()>>,
27    _marker: std::marker::PhantomData<M>,
28}
29
30impl<M: LogMapper + Send + Sync + 'static> GoogleWriterRuntime<M> {
31    /// Creates a new `GoogleWriterRuntime` and spawns the background batching task.
32    ///
33    /// Logs are received via an unbounded channel, buffered, and flushed when:
34    /// - the buffer reaches `config.max_batch` entries, or
35    /// - `config.max_delay` elapses since the last flush.
36    ///
37    /// The background task also flushes any remaining logs on shutdown.
38    pub fn new(google_logger: GoogleLogger<M>, config: GoogleWriterConfig) -> Self {
39        let (tx, mut rx) = mpsc::unbounded_channel::<Value>();
40        let (control_tx, mut control_rx) = mpsc::channel::<ControlMessage>(8);
41
42        let logger = Arc::new(RwLock::new(google_logger));
43        let logger_clone = logger.clone();
44
45        let handle_task = tokio::spawn(async move {
46            let mut buffer = Vec::with_capacity(config.max_batch);
47
48            loop {
49                tokio::select! {
50                    maybe_entry = rx.recv() => {
51                        match maybe_entry {
52                            Some(entry) => {
53                                buffer.push(entry);
54                                if buffer.len() >= config.max_batch {
55                                    Self::flush_batch(&logger_clone, std::mem::take(&mut buffer)).await;
56                                }
57                            }
58                            None => {
59                                // Sender dropped, flush remaining logs
60                                if !buffer.is_empty() {
61                                    Self::flush_batch(&logger_clone, buffer).await;
62                                }
63                                break;
64                            }
65                        }
66                    }
67
68                    Some(ctrl) = control_rx.recv() => {
69                        match ctrl {
70                            ControlMessage::Flush(reply) => {
71                                if !buffer.is_empty() {
72                                    Self::flush_batch(&logger_clone, std::mem::take(&mut buffer)).await;
73                                }
74                                let _ = reply.send(());
75                            }
76                            ControlMessage::Shutdown(reply) => {
77                                if !buffer.is_empty() {
78                                    Self::flush_batch(&logger_clone, buffer).await;
79                                }
80                                let _ = reply.send(());
81                                break;
82                            }
83                        }
84                    }
85
86                    _ = sleep(config.max_delay), if !buffer.is_empty() => {
87                        Self::flush_batch(&logger_clone, std::mem::take(&mut buffer)).await;
88                    }
89                }
90            }
91
92            tracing::debug!("Background task shut down cleanly.");
93        });
94
95        Self {
96            handle: GoogleWriterHandle { sender: tx },
97            control_tx,
98            shutdown_handle: Some(handle_task),
99            _marker: std::marker::PhantomData,
100        }
101    }
102
103    /// Returns a handle that implements [`std::io::Write`] for sending log entries.
104    ///
105    /// This handle can be cloned and shared across threads for synchronous logging.
106    pub fn writer(&self) -> GoogleWriterHandle {
107        self.handle.clone()
108    }
109
110    /// Flushes all buffered logs **and waits** until the background task completes the flush.
111    pub async fn flush_and_wait(&self) {
112        let (tx, rx) = oneshot::channel();
113        let _ = self.control_tx.send(ControlMessage::Flush(tx)).await;
114        let _ = rx.await;
115    }
116
117    /// Shuts down the background task, flushing any remaining logs.
118    ///
119    /// This method waits for the task to complete and logs any panics encountered during shutdown.
120    pub async fn shutdown(mut self) {
121        if let Some(handle) = self.shutdown_handle.take() {
122            let (tx, rx) = oneshot::channel();
123            let _ = self.control_tx.send(ControlMessage::Shutdown(tx)).await;
124            let _ = rx.await;
125            if let Err(err) = handle.await {
126                tracing::error!("Shutdown task panicked: {:?}", err);
127            }
128        }
129    }
130
131    /// Flushes a batch of logs to Google Cloud Logging.
132    async fn flush_batch(logger: &Arc<RwLock<GoogleLogger<M>>>, batch: Vec<Value>) {
133        let mut guard = logger.write().await;
134        if let Err(err) = guard.write_logs(batch).await {
135            tracing::error!("Failed to write log batch: {err}");
136        }
137    }
138}