Skip to main content

obs_core/observer/
workers.rs

//! Per-tier worker pool — bounded `tokio::sync::mpsc` channels with
//! one drain task per tier. Spec 11 § 4.
//!
//! AUDIT is special: bounded blocking + spool fallback (spec 11 § 6.4)
//! — see [`crate::audit_spool`].
6
7use std::sync::{
8    Arc,
9    atomic::{AtomicBool, AtomicU64, Ordering},
10};
11
12use bytes::BytesMut;
13use obs_proto::obs::v1::{ObsEnvelope, Tier};
14use tokio::{
15    runtime::Handle,
16    sync::{Mutex as AsyncMutex, mpsc, oneshot},
17    task::JoinHandle,
18};
19
20use crate::{
21    config::QueuesConfig,
22    registry::{SchemaRegistry, ScrubbedEnvelope},
23    sink::Sink,
24};
25
/// Per-tier counters surfaced as `ObsSinkDropped` self-events.
///
/// Every counter counts envelopes (events), not bytes. Each failed
/// emit-time `try_send` bumps the matching `channel_full_*` counter by one
/// (see `note_channel_full`). Each envelope a worker pulls off its queue
/// bumps `delivered` by one.
#[derive(Debug, Default)]
pub struct WorkerCounters {
    /// Envelopes dropped at emit time because the LOG tier's mpsc channel
    /// was full or closed.
    pub channel_full_log: AtomicU64,
    /// Envelopes dropped at emit time because the METRIC tier's mpsc
    /// channel was full or closed.
    pub channel_full_metric: AtomicU64,
    /// Envelopes dropped at emit time because the TRACE tier's mpsc
    /// channel was full or closed.
    pub channel_full_trace: AtomicU64,
    /// Envelopes dropped at emit time because the AUDIT tier's mpsc
    /// channel was full or closed.
    pub channel_full_audit: AtomicU64,
    /// Total envelopes processed by the tier workers.
    ///
    /// NOTE: this also counts envelopes that `deliver_one` discarded
    /// because scrubbing failed, so it is an upper bound on what actually
    /// reached a sink.
    pub delivered: AtomicU64,
}
40
41/// Single-tier worker handle.
42pub struct TierWorker {
43    sender: parking_lot::Mutex<Option<mpsc::Sender<WorkerMsg>>>,
44    join: AsyncMutex<Option<JoinHandle<()>>>,
45    shutdown: Arc<AtomicBool>,
46    sink: Arc<dyn Sink>,
47}
48
49impl std::fmt::Debug for TierWorker {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.debug_struct("TierWorker")
52            .field("alive", &self.sender.lock().is_some())
53            .finish()
54    }
55}
56
57impl TierWorker {
58    /// Spawn a worker that drains a bounded mpsc channel and delivers
59    /// each envelope to `sink` after running it through the per-tier
60    /// scrubber.
61    pub fn spawn(
62        capacity: usize,
63        sink: Arc<dyn Sink>,
64        registry: Arc<SchemaRegistry>,
65        counters: Arc<WorkerCounters>,
66        tier: Tier,
67    ) -> Self {
68        let (tx, mut rx) = mpsc::channel::<WorkerMsg>(capacity.max(1));
69        let shutdown = Arc::new(AtomicBool::new(false));
70        let shutdown_in = Arc::clone(&shutdown);
71        let sink_in = Arc::clone(&sink);
72        let registry_in = registry;
73        let counters_in = counters;
74        let join = tokio::spawn(async move {
75            let mut scratch = BytesMut::with_capacity(4096);
76            // Drain the channel until the sender side is dropped. The
77            // observer's `shutdown()` drops the sender, which makes
78            // `rx.recv()` return None and the loop exit cleanly.
79            while let Some(msg) = rx.recv().await {
80                if let Some(env) = msg.envelope.as_ref() {
81                    deliver_one(env, &registry_in, &mut scratch, &sink_in);
82                    counters_in.delivered.fetch_add(1, Ordering::Relaxed);
83                    if shutdown_in.load(Ordering::Relaxed) && rx.is_empty() {
84                        break;
85                    }
86                } else if let Some(ack) = msg.flush {
87                    sink_in.flush().await;
88                    let _ = ack.send(());
89                }
90            }
91            // Final non-blocking drain (in case shutdown raced with
92            // an in-flight send).
93            while let Ok(msg) = rx.try_recv() {
94                if let Some(env) = msg.envelope.as_ref() {
95                    deliver_one(env, &registry_in, &mut scratch, &sink_in);
96                    counters_in.delivered.fetch_add(1, Ordering::Relaxed);
97                } else if let Some(ack) = msg.flush {
98                    sink_in.flush().await;
99                    let _ = ack.send(());
100                }
101            }
102            sink_in.flush().await;
103            let _ = tier;
104        });
105        Self {
106            sender: parking_lot::Mutex::new(Some(tx)),
107            join: AsyncMutex::new(Some(join)),
108            shutdown,
109            sink,
110        }
111    }
112
113    /// Try to enqueue `env` on this tier's channel. The error variant
114    /// returns the original envelope so the caller can spool it.
115    /// Allow `result_large_err` because an envelope is a large struct
116    /// and boxing it would defeat the spec's no-allocation contract on
117    /// the hot path.
118    #[allow(clippy::result_large_err)]
119    pub fn try_send(&self, env: ObsEnvelope) -> Result<(), ObsEnvelope> {
120        let guard = self.sender.lock();
121        let Some(sender) = guard.as_ref() else {
122            return Err(env);
123        };
124        match sender.try_send(WorkerMsg::envelope(env)) {
125            Ok(()) => Ok(()),
126            Err(mpsc::error::TrySendError::Full(msg) | mpsc::error::TrySendError::Closed(msg)) => {
127                if let Some(env) = msg.into_envelope() {
128                    Err(env)
129                } else {
130                    Ok(())
131                }
132            }
133        }
134    }
135
136    /// Bounded blocking send used by the AUDIT tier. The future
137    /// resolves when the envelope is enqueued or the timeout elapses.
138    /// See [`Self::try_send`] for the rationale on `result_large_err`.
139    ///
140    /// Currently the AUDIT path uses a sync `try_send` busy-wait loop
141    /// with `std::thread::sleep` instead of this async helper, so the
142    /// `#[allow(dead_code)]` keeps the helper around for callers that
143    /// want a future-shaped variant.
144    #[allow(clippy::result_large_err, dead_code)]
145    pub async fn send_with_timeout(
146        &self,
147        env: ObsEnvelope,
148        timeout: std::time::Duration,
149    ) -> Result<(), ObsEnvelope> {
150        let sender = match self.sender.lock().as_ref() {
151            Some(s) => s.clone(),
152            None => return Err(env),
153        };
154        let cloned = env.clone();
155        match tokio::time::timeout(timeout, sender.send(WorkerMsg::envelope(env))).await {
156            Ok(Ok(())) => Ok(()),
157            Ok(Err(mpsc::error::SendError(msg))) => {
158                if let Some(env) = msg.into_envelope() {
159                    Err(env)
160                } else {
161                    Ok(())
162                }
163            }
164            Err(_) => Err(cloned),
165        }
166    }
167
168    /// Drain in-flight envelopes and return when the worker is idle.
169    pub async fn flush(&self) {
170        let sender = {
171            let guard = self.sender.lock();
172            guard.as_ref().cloned()
173        };
174        let Some(sender) = sender else {
175            self.sink.flush().await;
176            return;
177        };
178        let (tx, rx) = oneshot::channel();
179        if sender.send(WorkerMsg::flush(tx)).await.is_ok() {
180            let _ = rx.await;
181        } else {
182            self.sink.flush().await;
183        }
184    }
185
186    /// Shut down the worker: drop the sender (so the receiver's
187    /// `recv().await` returns `None` and the loop exits), wait for the
188    /// task to finish, then call `Sink::shutdown`.
189    pub async fn shutdown(&self) {
190        self.shutdown.store(true, Ordering::SeqCst);
191        // Drop the sender so rx.recv() returns None.
192        self.sender.lock().take();
193        let mut guard = self.join.lock().await;
194        if let Some(join) = guard.take() {
195            let _ = join.await;
196        }
197        self.sink.shutdown().await;
198    }
199
200    /// Borrow the underlying sink.
201    #[allow(dead_code)]
202    pub fn sink(&self) -> &Arc<dyn Sink> {
203        &self.sink
204    }
205}
206
207struct WorkerMsg {
208    envelope: Option<ObsEnvelope>,
209    flush: Option<oneshot::Sender<()>>,
210}
211
212impl WorkerMsg {
213    fn envelope(env: ObsEnvelope) -> Self {
214        Self {
215            envelope: Some(env),
216            flush: None,
217        }
218    }
219
220    fn flush(ack: oneshot::Sender<()>) -> Self {
221        Self {
222            envelope: None,
223            flush: Some(ack),
224        }
225    }
226
227    fn into_envelope(self) -> Option<ObsEnvelope> {
228        self.envelope
229    }
230}
231
232fn deliver_one(
233    env: &ObsEnvelope,
234    registry: &Arc<SchemaRegistry>,
235    scratch: &mut BytesMut,
236    sink: &Arc<dyn Sink>,
237) {
238    scratch.clear();
239    let scrubbed = match ScrubbedEnvelope::scrub(env, registry, scratch) {
240        Ok(s) => s,
241        Err(_) => {
242            // Spec 14 § 8 last row — the unscrubbed envelope is never
243            // passed to a sink. Drop and increment a counter (the
244            // counter itself is surfaced via ObsSinkFailed in a later
245            // milestone; for now we silently drop).
246            return;
247        }
248    };
249    sink.deliver(scrubbed);
250}
251
252/// Adapter: schedule a per-tier worker with a bounded queue, returning
253/// the worker handle. Wired by `StandardObserverBuilder::build`.
254pub fn spawn_tier_worker(
255    tier: Tier,
256    cfg: &QueuesConfig,
257    sink: Arc<dyn Sink>,
258    registry: Arc<SchemaRegistry>,
259    counters: Arc<WorkerCounters>,
260) -> Option<TierWorker> {
261    let cap = match tier {
262        Tier::Log => cfg.log,
263        Tier::Metric => cfg.metric,
264        Tier::Trace => cfg.trace,
265        Tier::Audit => cfg.log, /* AUDIT capacity comes from AuditConfig; this caller passes its */
266        // own
267        _ => return None,
268    } as usize;
269    if Handle::try_current().is_err() {
270        // No tokio runtime → fall back to in-emit-thread synchronous
271        // delivery; do not spawn a worker.
272        return None;
273    }
274    Some(TierWorker::spawn(cap, sink, registry, counters, tier))
275}
276
277/// Increment the per-tier `channel_full_*` counter when an emit-thread
278/// `try_send` fails with full / closed.
279pub fn note_channel_full(counters: &WorkerCounters, tier: Tier) {
280    let target = match tier {
281        Tier::Log => &counters.channel_full_log,
282        Tier::Metric => &counters.channel_full_metric,
283        Tier::Trace => &counters.channel_full_trace,
284        Tier::Audit => &counters.channel_full_audit,
285        _ => return,
286    };
287    target.fetch_add(1, Ordering::Relaxed);
288}