tracing_gcloud_layer/
runtime.rs1use super::google_logger::{GoogleLogger, LogMapper};
2use crate::{GoogleWriterConfig, google_writer::GoogleWriterHandle};
3use serde_json::Value;
4use std::sync::Arc;
5use tokio::{
6 sync::{RwLock, mpsc, oneshot},
7 task::JoinHandle,
8 time::sleep,
9};
10
11enum ControlMessage {
13 Flush(oneshot::Sender<()>),
14 Shutdown(oneshot::Sender<()>),
15}
16
17#[derive(Debug)]
23pub struct GoogleWriterRuntime<M: LogMapper> {
24 handle: GoogleWriterHandle,
25 control_tx: mpsc::Sender<ControlMessage>,
26 shutdown_handle: Option<JoinHandle<()>>,
27 _marker: std::marker::PhantomData<M>,
28}
29
30impl<M: LogMapper + Send + Sync + 'static> GoogleWriterRuntime<M> {
31 pub fn new(google_logger: GoogleLogger<M>, config: GoogleWriterConfig) -> Self {
39 let (tx, mut rx) = mpsc::unbounded_channel::<Value>();
40 let (control_tx, mut control_rx) = mpsc::channel::<ControlMessage>(8);
41
42 let logger = Arc::new(RwLock::new(google_logger));
43 let logger_clone = logger.clone();
44
45 let handle_task = tokio::spawn(async move {
46 let mut buffer = Vec::with_capacity(config.max_batch);
47
48 loop {
49 tokio::select! {
50 maybe_entry = rx.recv() => {
51 match maybe_entry {
52 Some(entry) => {
53 buffer.push(entry);
54 if buffer.len() >= config.max_batch {
55 Self::flush_batch(&logger_clone, std::mem::take(&mut buffer)).await;
56 }
57 }
58 None => {
59 if !buffer.is_empty() {
61 Self::flush_batch(&logger_clone, buffer).await;
62 }
63 break;
64 }
65 }
66 }
67
68 Some(ctrl) = control_rx.recv() => {
69 match ctrl {
70 ControlMessage::Flush(reply) => {
71 if !buffer.is_empty() {
72 Self::flush_batch(&logger_clone, std::mem::take(&mut buffer)).await;
73 }
74 let _ = reply.send(());
75 }
76 ControlMessage::Shutdown(reply) => {
77 if !buffer.is_empty() {
78 Self::flush_batch(&logger_clone, buffer).await;
79 }
80 let _ = reply.send(());
81 break;
82 }
83 }
84 }
85
86 _ = sleep(config.max_delay), if !buffer.is_empty() => {
87 Self::flush_batch(&logger_clone, std::mem::take(&mut buffer)).await;
88 }
89 }
90 }
91
92 tracing::debug!("Background task shut down cleanly.");
93 });
94
95 Self {
96 handle: GoogleWriterHandle { sender: tx },
97 control_tx,
98 shutdown_handle: Some(handle_task),
99 _marker: std::marker::PhantomData,
100 }
101 }
102
103 pub fn writer(&self) -> GoogleWriterHandle {
107 self.handle.clone()
108 }
109
110 pub async fn flush_and_wait(&self) {
112 let (tx, rx) = oneshot::channel();
113 let _ = self.control_tx.send(ControlMessage::Flush(tx)).await;
114 let _ = rx.await;
115 }
116
117 pub async fn shutdown(mut self) {
121 if let Some(handle) = self.shutdown_handle.take() {
122 let (tx, rx) = oneshot::channel();
123 let _ = self.control_tx.send(ControlMessage::Shutdown(tx)).await;
124 let _ = rx.await;
125 if let Err(err) = handle.await {
126 tracing::error!("Shutdown task panicked: {:?}", err);
127 }
128 }
129 }
130
131 async fn flush_batch(logger: &Arc<RwLock<GoogleLogger<M>>>, batch: Vec<Value>) {
133 let mut guard = logger.write().await;
134 if let Err(err) = guard.write_logs(batch).await {
135 tracing::error!("Failed to write log batch: {err}");
136 }
137 }
138}