obs_core/observer/
workers.rs1use std::sync::{
8 Arc,
9 atomic::{AtomicBool, AtomicU64, Ordering},
10};
11
12use bytes::BytesMut;
13use obs_proto::obs::v1::ObsEnvelope;
14use obs_types::Tier;
15use tokio::{
16 runtime::Handle,
17 sync::{Mutex as AsyncMutex, mpsc},
18 task::JoinHandle,
19};
20
21use crate::{
22 config::QueuesConfig,
23 registry::{SchemaRegistry, ScrubbedEnvelope},
24 sink::Sink,
25};
26
/// Monotonic counters shared between producers and the per-tier workers.
///
/// Every field is an [`AtomicU64`], so the struct can be shared behind an
/// `Arc` and updated without locking. `Default` starts all counters at zero.
#[derive(Debug, Default)]
pub struct WorkerCounters {
    /// Bumped by [`note_channel_full`] for [`Tier::Log`].
    pub channel_full_log: AtomicU64,
    /// Bumped by [`note_channel_full`] for [`Tier::Metric`].
    pub channel_full_metric: AtomicU64,
    /// Bumped by [`note_channel_full`] for [`Tier::Trace`].
    pub channel_full_trace: AtomicU64,
    /// Bumped by [`note_channel_full`] for [`Tier::Audit`].
    pub channel_full_audit: AtomicU64,
    /// Bumped by the worker task once per envelope it dequeues and passes to
    /// `deliver_one`. The increment happens even when scrubbing fails and
    /// nothing reaches the sink, so this counts processed envelopes.
    pub delivered: AtomicU64,
}
41
42pub struct TierWorker {
44 sender: parking_lot::Mutex<Option<mpsc::Sender<ObsEnvelope>>>,
45 join: AsyncMutex<Option<JoinHandle<()>>>,
46 shutdown: Arc<AtomicBool>,
47 sink: Arc<dyn Sink>,
48}
49
50impl std::fmt::Debug for TierWorker {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("TierWorker")
53 .field("alive", &self.sender.lock().is_some())
54 .finish()
55 }
56}
57
58impl TierWorker {
59 pub fn spawn(
63 capacity: usize,
64 sink: Arc<dyn Sink>,
65 registry: Arc<SchemaRegistry>,
66 counters: Arc<WorkerCounters>,
67 tier: Tier,
68 ) -> Self {
69 let (tx, mut rx) = mpsc::channel::<ObsEnvelope>(capacity.max(1));
70 let shutdown = Arc::new(AtomicBool::new(false));
71 let shutdown_in = Arc::clone(&shutdown);
72 let sink_in = Arc::clone(&sink);
73 let registry_in = registry;
74 let counters_in = counters;
75 let join = tokio::spawn(async move {
76 let mut scratch = BytesMut::with_capacity(4096);
77 while let Some(env) = rx.recv().await {
81 deliver_one(&env, ®istry_in, &mut scratch, &sink_in);
82 counters_in.delivered.fetch_add(1, Ordering::Relaxed);
83 if shutdown_in.load(Ordering::Relaxed) && rx.is_empty() {
84 break;
85 }
86 }
87 while let Ok(env) = rx.try_recv() {
90 deliver_one(&env, ®istry_in, &mut scratch, &sink_in);
91 counters_in.delivered.fetch_add(1, Ordering::Relaxed);
92 }
93 sink_in.flush().await;
94 let _ = tier;
95 });
96 Self {
97 sender: parking_lot::Mutex::new(Some(tx)),
98 join: AsyncMutex::new(Some(join)),
99 shutdown,
100 sink,
101 }
102 }
103
104 #[allow(clippy::result_large_err)]
110 pub fn try_send(&self, env: ObsEnvelope) -> Result<(), ObsEnvelope> {
111 let guard = self.sender.lock();
112 let Some(sender) = guard.as_ref() else {
113 return Err(env);
114 };
115 match sender.try_send(env) {
116 Ok(()) => Ok(()),
117 Err(mpsc::error::TrySendError::Full(env) | mpsc::error::TrySendError::Closed(env)) => {
118 Err(env)
119 }
120 }
121 }
122
123 #[allow(clippy::result_large_err, dead_code)]
132 pub async fn send_with_timeout(
133 &self,
134 env: ObsEnvelope,
135 timeout: std::time::Duration,
136 ) -> Result<(), ObsEnvelope> {
137 let sender = match self.sender.lock().as_ref() {
138 Some(s) => s.clone(),
139 None => return Err(env),
140 };
141 let cloned = env.clone();
142 match tokio::time::timeout(timeout, sender.send(env)).await {
143 Ok(Ok(())) => Ok(()),
144 Ok(Err(mpsc::error::SendError(env))) => Err(env),
145 Err(_) => Err(cloned),
146 }
147 }
148
149 pub async fn flush(&self) {
151 tokio::task::yield_now().await;
155 self.sink.flush().await;
156 }
157
158 pub async fn shutdown(&self) {
162 self.shutdown.store(true, Ordering::SeqCst);
163 self.sender.lock().take();
165 let mut guard = self.join.lock().await;
166 if let Some(join) = guard.take() {
167 let _ = join.await;
168 }
169 self.sink.shutdown().await;
170 }
171
172 #[allow(dead_code)]
174 pub fn sink(&self) -> &Arc<dyn Sink> {
175 &self.sink
176 }
177}
178
179fn deliver_one(
180 env: &ObsEnvelope,
181 registry: &Arc<SchemaRegistry>,
182 scratch: &mut BytesMut,
183 sink: &Arc<dyn Sink>,
184) {
185 scratch.clear();
186 let scrubbed = match ScrubbedEnvelope::scrub(env, registry, scratch) {
187 Ok(s) => s,
188 Err(_) => {
189 return;
194 }
195 };
196 sink.deliver(scrubbed);
197}
198
199pub fn spawn_tier_worker(
202 tier: Tier,
203 cfg: &QueuesConfig,
204 sink: Arc<dyn Sink>,
205 registry: Arc<SchemaRegistry>,
206 counters: Arc<WorkerCounters>,
207) -> Option<TierWorker> {
208 let cap = match tier {
209 Tier::Log => cfg.log,
210 Tier::Metric => cfg.metric,
211 Tier::Trace => cfg.trace,
212 Tier::Audit => cfg.log, _ => return None,
215 } as usize;
216 if Handle::try_current().is_err() {
217 return None;
220 }
221 Some(TierWorker::spawn(cap, sink, registry, counters, tier))
222}
223
224pub fn note_channel_full(counters: &WorkerCounters, tier: Tier) {
227 let target = match tier {
228 Tier::Log => &counters.channel_full_log,
229 Tier::Metric => &counters.channel_full_metric,
230 Tier::Trace => &counters.channel_full_trace,
231 Tier::Audit => &counters.channel_full_audit,
232 _ => return,
233 };
234 target.fetch_add(1, Ordering::Relaxed);
235}