1use std::sync::{
8 Arc,
9 atomic::{AtomicBool, AtomicU64, Ordering},
10};
11
12use bytes::BytesMut;
13use obs_proto::obs::v1::{ObsEnvelope, Tier};
14use tokio::{
15 runtime::Handle,
16 sync::{Mutex as AsyncMutex, mpsc, oneshot},
17 task::JoinHandle,
18};
19
20use crate::{
21 config::QueuesConfig,
22 registry::{SchemaRegistry, ScrubbedEnvelope},
23 sink::Sink,
24};
25
/// Shared atomic counters describing tier-worker activity.
///
/// All fields are plain `AtomicU64`s so they can be bumped through a shared
/// reference (typically behind an `Arc`) without any locking.
#[derive(Debug, Default)]
pub struct WorkerCounters {
    /// Incremented by `note_channel_full` for `Tier::Log`.
    pub channel_full_log: AtomicU64,
    /// Incremented by `note_channel_full` for `Tier::Metric`.
    pub channel_full_metric: AtomicU64,
    /// Incremented by `note_channel_full` for `Tier::Trace`.
    pub channel_full_trace: AtomicU64,
    /// Incremented by `note_channel_full` for `Tier::Audit`.
    pub channel_full_audit: AtomicU64,
    /// Incremented by the worker task once per envelope it has passed to
    /// `deliver_one`. The increment is unconditional, so envelopes whose
    /// scrub step failed are counted as well.
    pub delivered: AtomicU64,
}
40
/// Handle to a background task that drains one bounded channel into a sink.
pub struct TierWorker {
    /// Producer half of the worker channel. Set to `None` by `shutdown`, after
    /// which `try_send` / `send_with_timeout` reject every envelope.
    sender: parking_lot::Mutex<Option<mpsc::Sender<WorkerMsg>>>,
    /// Join handle of the spawned task; taken (and awaited) by `shutdown`.
    join: AsyncMutex<Option<JoinHandle<()>>>,
    /// Flag raised by `shutdown`; the worker task reads it after each envelope.
    shutdown: Arc<AtomicBool>,
    /// Sink shared with the worker task; also used directly by `flush` (when
    /// the channel is unavailable), by `shutdown`, and exposed through `sink`.
    sink: Arc<dyn Sink>,
}
48
49impl std::fmt::Debug for TierWorker {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("TierWorker")
52 .field("alive", &self.sender.lock().is_some())
53 .finish()
54 }
55}
56
57impl TierWorker {
58 pub fn spawn(
62 capacity: usize,
63 sink: Arc<dyn Sink>,
64 registry: Arc<SchemaRegistry>,
65 counters: Arc<WorkerCounters>,
66 tier: Tier,
67 ) -> Self {
68 let (tx, mut rx) = mpsc::channel::<WorkerMsg>(capacity.max(1));
69 let shutdown = Arc::new(AtomicBool::new(false));
70 let shutdown_in = Arc::clone(&shutdown);
71 let sink_in = Arc::clone(&sink);
72 let registry_in = registry;
73 let counters_in = counters;
74 let join = tokio::spawn(async move {
75 let mut scratch = BytesMut::with_capacity(4096);
76 while let Some(msg) = rx.recv().await {
80 if let Some(env) = msg.envelope.as_ref() {
81 deliver_one(env, ®istry_in, &mut scratch, &sink_in);
82 counters_in.delivered.fetch_add(1, Ordering::Relaxed);
83 if shutdown_in.load(Ordering::Relaxed) && rx.is_empty() {
84 break;
85 }
86 } else if let Some(ack) = msg.flush {
87 sink_in.flush().await;
88 let _ = ack.send(());
89 }
90 }
91 while let Ok(msg) = rx.try_recv() {
94 if let Some(env) = msg.envelope.as_ref() {
95 deliver_one(env, ®istry_in, &mut scratch, &sink_in);
96 counters_in.delivered.fetch_add(1, Ordering::Relaxed);
97 } else if let Some(ack) = msg.flush {
98 sink_in.flush().await;
99 let _ = ack.send(());
100 }
101 }
102 sink_in.flush().await;
103 let _ = tier;
104 });
105 Self {
106 sender: parking_lot::Mutex::new(Some(tx)),
107 join: AsyncMutex::new(Some(join)),
108 shutdown,
109 sink,
110 }
111 }
112
113 #[allow(clippy::result_large_err)]
119 pub fn try_send(&self, env: ObsEnvelope) -> Result<(), ObsEnvelope> {
120 let guard = self.sender.lock();
121 let Some(sender) = guard.as_ref() else {
122 return Err(env);
123 };
124 match sender.try_send(WorkerMsg::envelope(env)) {
125 Ok(()) => Ok(()),
126 Err(mpsc::error::TrySendError::Full(msg) | mpsc::error::TrySendError::Closed(msg)) => {
127 if let Some(env) = msg.into_envelope() {
128 Err(env)
129 } else {
130 Ok(())
131 }
132 }
133 }
134 }
135
136 #[allow(clippy::result_large_err, dead_code)]
145 pub async fn send_with_timeout(
146 &self,
147 env: ObsEnvelope,
148 timeout: std::time::Duration,
149 ) -> Result<(), ObsEnvelope> {
150 let sender = match self.sender.lock().as_ref() {
151 Some(s) => s.clone(),
152 None => return Err(env),
153 };
154 let cloned = env.clone();
155 match tokio::time::timeout(timeout, sender.send(WorkerMsg::envelope(env))).await {
156 Ok(Ok(())) => Ok(()),
157 Ok(Err(mpsc::error::SendError(msg))) => {
158 if let Some(env) = msg.into_envelope() {
159 Err(env)
160 } else {
161 Ok(())
162 }
163 }
164 Err(_) => Err(cloned),
165 }
166 }
167
168 pub async fn flush(&self) {
170 let sender = {
171 let guard = self.sender.lock();
172 guard.as_ref().cloned()
173 };
174 let Some(sender) = sender else {
175 self.sink.flush().await;
176 return;
177 };
178 let (tx, rx) = oneshot::channel();
179 if sender.send(WorkerMsg::flush(tx)).await.is_ok() {
180 let _ = rx.await;
181 } else {
182 self.sink.flush().await;
183 }
184 }
185
186 pub async fn shutdown(&self) {
190 self.shutdown.store(true, Ordering::SeqCst);
191 self.sender.lock().take();
193 let mut guard = self.join.lock().await;
194 if let Some(join) = guard.take() {
195 let _ = join.await;
196 }
197 self.sink.shutdown().await;
198 }
199
200 #[allow(dead_code)]
202 pub fn sink(&self) -> &Arc<dyn Sink> {
203 &self.sink
204 }
205}
206
/// Message carried on the worker channel.
///
/// The constructors populate exactly one of the two fields; the worker checks
/// `envelope` first and only looks at `flush` when no envelope is present.
struct WorkerMsg {
    /// Envelope to scrub and deliver to the sink.
    envelope: Option<ObsEnvelope>,
    /// Acknowledgement channel signalled after the sink has been flushed.
    flush: Option<oneshot::Sender<()>>,
}
211
212impl WorkerMsg {
213 fn envelope(env: ObsEnvelope) -> Self {
214 Self {
215 envelope: Some(env),
216 flush: None,
217 }
218 }
219
220 fn flush(ack: oneshot::Sender<()>) -> Self {
221 Self {
222 envelope: None,
223 flush: Some(ack),
224 }
225 }
226
227 fn into_envelope(self) -> Option<ObsEnvelope> {
228 self.envelope
229 }
230}
231
232fn deliver_one(
233 env: &ObsEnvelope,
234 registry: &Arc<SchemaRegistry>,
235 scratch: &mut BytesMut,
236 sink: &Arc<dyn Sink>,
237) {
238 scratch.clear();
239 let scrubbed = match ScrubbedEnvelope::scrub(env, registry, scratch) {
240 Ok(s) => s,
241 Err(_) => {
242 return;
247 }
248 };
249 sink.deliver(scrubbed);
250}
251
252pub fn spawn_tier_worker(
255 tier: Tier,
256 cfg: &QueuesConfig,
257 sink: Arc<dyn Sink>,
258 registry: Arc<SchemaRegistry>,
259 counters: Arc<WorkerCounters>,
260) -> Option<TierWorker> {
261 let cap = match tier {
262 Tier::Log => cfg.log,
263 Tier::Metric => cfg.metric,
264 Tier::Trace => cfg.trace,
265 Tier::Audit => cfg.log, _ => return None,
268 } as usize;
269 if Handle::try_current().is_err() {
270 return None;
273 }
274 Some(TierWorker::spawn(cap, sink, registry, counters, tier))
275}
276
277pub fn note_channel_full(counters: &WorkerCounters, tier: Tier) {
280 let target = match tier {
281 Tier::Log => &counters.channel_full_log,
282 Tier::Metric => &counters.channel_full_metric,
283 Tier::Trace => &counters.channel_full_trace,
284 Tier::Audit => &counters.channel_full_audit,
285 _ => return,
286 };
287 target.fetch_add(1, Ordering::Relaxed);
288}