1use std::sync::{
8 Arc,
9 atomic::{AtomicBool, AtomicU64, Ordering},
10};
11
12use bytes::BytesMut;
13use obs_proto::obs::v1::{ObsEnvelope, Tier};
14use tokio::{
15 runtime::Handle,
16 sync::{Mutex as AsyncMutex, mpsc},
17 task::JoinHandle,
18};
19
20use crate::{
21 config::QueuesConfig,
22 registry::{SchemaRegistry, ScrubbedEnvelope},
23 sink::Sink,
24};
25
/// Lock-free counters shared between the tier workers and the code that feeds them.
///
/// Every field is an [`AtomicU64`], so a single instance can be shared behind an `Arc`
/// and updated without additional locking. `Default` starts every counter at zero.
#[derive(Debug, Default)]
pub struct WorkerCounters {
    /// Incremented by `note_channel_full` for `Tier::Log`.
    pub channel_full_log: AtomicU64,
    /// Incremented by `note_channel_full` for `Tier::Metric`.
    pub channel_full_metric: AtomicU64,
    /// Incremented by `note_channel_full` for `Tier::Trace`.
    pub channel_full_trace: AtomicU64,
    /// Incremented by `note_channel_full` for `Tier::Audit`.
    pub channel_full_audit: AtomicU64,
    /// Incremented by the worker task once per envelope it takes off its channel.
    ///
    /// NOTE(review): the worker bumps this after every `deliver_one` call, including
    /// calls where scrubbing failed and nothing reached the sink — confirm that
    /// "processed" rather than "successfully delivered" is the intended meaning.
    pub delivered: AtomicU64,
}
40
41pub struct TierWorker {
43 sender: parking_lot::Mutex<Option<mpsc::Sender<ObsEnvelope>>>,
44 join: AsyncMutex<Option<JoinHandle<()>>>,
45 shutdown: Arc<AtomicBool>,
46 sink: Arc<dyn Sink>,
47}
48
49impl std::fmt::Debug for TierWorker {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("TierWorker")
52 .field("alive", &self.sender.lock().is_some())
53 .finish()
54 }
55}
56
57impl TierWorker {
58 pub fn spawn(
62 capacity: usize,
63 sink: Arc<dyn Sink>,
64 registry: Arc<SchemaRegistry>,
65 counters: Arc<WorkerCounters>,
66 tier: Tier,
67 ) -> Self {
68 let (tx, mut rx) = mpsc::channel::<ObsEnvelope>(capacity.max(1));
69 let shutdown = Arc::new(AtomicBool::new(false));
70 let shutdown_in = Arc::clone(&shutdown);
71 let sink_in = Arc::clone(&sink);
72 let registry_in = registry;
73 let counters_in = counters;
74 let join = tokio::spawn(async move {
75 let mut scratch = BytesMut::with_capacity(4096);
76 while let Some(env) = rx.recv().await {
80 deliver_one(&env, ®istry_in, &mut scratch, &sink_in);
81 counters_in.delivered.fetch_add(1, Ordering::Relaxed);
82 if shutdown_in.load(Ordering::Relaxed) && rx.is_empty() {
83 break;
84 }
85 }
86 while let Ok(env) = rx.try_recv() {
89 deliver_one(&env, ®istry_in, &mut scratch, &sink_in);
90 counters_in.delivered.fetch_add(1, Ordering::Relaxed);
91 }
92 sink_in.flush().await;
93 let _ = tier;
94 });
95 Self {
96 sender: parking_lot::Mutex::new(Some(tx)),
97 join: AsyncMutex::new(Some(join)),
98 shutdown,
99 sink,
100 }
101 }
102
103 #[allow(clippy::result_large_err)]
109 pub fn try_send(&self, env: ObsEnvelope) -> Result<(), ObsEnvelope> {
110 let guard = self.sender.lock();
111 let Some(sender) = guard.as_ref() else {
112 return Err(env);
113 };
114 match sender.try_send(env) {
115 Ok(()) => Ok(()),
116 Err(mpsc::error::TrySendError::Full(env) | mpsc::error::TrySendError::Closed(env)) => {
117 Err(env)
118 }
119 }
120 }
121
122 #[allow(clippy::result_large_err, dead_code)]
131 pub async fn send_with_timeout(
132 &self,
133 env: ObsEnvelope,
134 timeout: std::time::Duration,
135 ) -> Result<(), ObsEnvelope> {
136 let sender = match self.sender.lock().as_ref() {
137 Some(s) => s.clone(),
138 None => return Err(env),
139 };
140 let cloned = env.clone();
141 match tokio::time::timeout(timeout, sender.send(env)).await {
142 Ok(Ok(())) => Ok(()),
143 Ok(Err(mpsc::error::SendError(env))) => Err(env),
144 Err(_) => Err(cloned),
145 }
146 }
147
148 pub async fn flush(&self) {
150 tokio::task::yield_now().await;
154 self.sink.flush().await;
155 }
156
157 pub async fn shutdown(&self) {
161 self.shutdown.store(true, Ordering::SeqCst);
162 self.sender.lock().take();
164 let mut guard = self.join.lock().await;
165 if let Some(join) = guard.take() {
166 let _ = join.await;
167 }
168 self.sink.shutdown().await;
169 }
170
171 #[allow(dead_code)]
173 pub fn sink(&self) -> &Arc<dyn Sink> {
174 &self.sink
175 }
176}
177
178fn deliver_one(
179 env: &ObsEnvelope,
180 registry: &Arc<SchemaRegistry>,
181 scratch: &mut BytesMut,
182 sink: &Arc<dyn Sink>,
183) {
184 scratch.clear();
185 let scrubbed = match ScrubbedEnvelope::scrub(env, registry, scratch) {
186 Ok(s) => s,
187 Err(_) => {
188 return;
193 }
194 };
195 sink.deliver(scrubbed);
196}
197
198pub fn spawn_tier_worker(
201 tier: Tier,
202 cfg: &QueuesConfig,
203 sink: Arc<dyn Sink>,
204 registry: Arc<SchemaRegistry>,
205 counters: Arc<WorkerCounters>,
206) -> Option<TierWorker> {
207 let cap = match tier {
208 Tier::Log => cfg.log,
209 Tier::Metric => cfg.metric,
210 Tier::Trace => cfg.trace,
211 Tier::Audit => cfg.log, _ => return None,
214 } as usize;
215 if Handle::try_current().is_err() {
216 return None;
219 }
220 Some(TierWorker::spawn(cap, sink, registry, counters, tier))
221}
222
223pub fn note_channel_full(counters: &WorkerCounters, tier: Tier) {
226 let target = match tier {
227 Tier::Log => &counters.channel_full_log,
228 Tier::Metric => &counters.channel_full_metric,
229 Tier::Trace => &counters.channel_full_trace,
230 Tier::Audit => &counters.channel_full_audit,
231 _ => return,
232 };
233 target.fetch_add(1, Ordering::Relaxed);
234}