Skip to main content

obs_core/observer/
workers.rs

1//! Per-tier worker pool — bounded `tokio::sync::mpsc` channels with
2//! one drain task per tier. Spec 11 § 4.
3//!
4//! AUDIT is special: bounded blocking + spool fallback (spec 11 § 6.4)
5//! — see [`crate::audit_spool`].
6
7use std::sync::{
8    Arc,
9    atomic::{AtomicBool, AtomicU64, Ordering},
10};
11
12use bytes::BytesMut;
13use obs_proto::obs::v1::ObsEnvelope;
14use obs_types::Tier;
15use tokio::{
16    runtime::Handle,
17    sync::{Mutex as AsyncMutex, mpsc},
18    task::JoinHandle,
19};
20
21use crate::{
22    config::QueuesConfig,
23    registry::{SchemaRegistry, ScrubbedEnvelope},
24    sink::Sink,
25};
26
/// Per-tier counters surfaced as `ObsSinkDropped` self-events.
///
/// All fields are plain atomics so emit threads and worker tasks can
/// bump them through a shared `Arc` without locking. `Default` starts
/// every counter at zero.
///
/// The `channel_full_*` fields are incremented by one per failed
/// emit-time send in `note_channel_full`, so they count envelopes, not
/// bytes. NOTE(review): earlier docs described them as "bytes dropped";
/// confirm no other call site adds byte sizes to these fields.
#[derive(Debug, Default)]
pub struct WorkerCounters {
    /// Envelopes rejected at the emit-time mpsc send (LOG).
    pub channel_full_log: AtomicU64,
    /// Envelopes rejected at the emit-time mpsc send (METRIC).
    pub channel_full_metric: AtomicU64,
    /// Envelopes rejected at the emit-time mpsc send (TRACE).
    pub channel_full_trace: AtomicU64,
    /// Envelopes rejected at the emit-time mpsc send (AUDIT).
    pub channel_full_audit: AtomicU64,
    /// Total envelopes a worker pulled off its channel and handed to
    /// the delivery step.
    pub delivered: AtomicU64,
}
41
42/// Single-tier worker handle.
43pub struct TierWorker {
44    sender: parking_lot::Mutex<Option<mpsc::Sender<ObsEnvelope>>>,
45    join: AsyncMutex<Option<JoinHandle<()>>>,
46    shutdown: Arc<AtomicBool>,
47    sink: Arc<dyn Sink>,
48}
49
50impl std::fmt::Debug for TierWorker {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("TierWorker")
53            .field("alive", &self.sender.lock().is_some())
54            .finish()
55    }
56}
57
58impl TierWorker {
59    /// Spawn a worker that drains a bounded mpsc channel and delivers
60    /// each envelope to `sink` after running it through the per-tier
61    /// scrubber.
62    pub fn spawn(
63        capacity: usize,
64        sink: Arc<dyn Sink>,
65        registry: Arc<SchemaRegistry>,
66        counters: Arc<WorkerCounters>,
67        tier: Tier,
68    ) -> Self {
69        let (tx, mut rx) = mpsc::channel::<ObsEnvelope>(capacity.max(1));
70        let shutdown = Arc::new(AtomicBool::new(false));
71        let shutdown_in = Arc::clone(&shutdown);
72        let sink_in = Arc::clone(&sink);
73        let registry_in = registry;
74        let counters_in = counters;
75        let join = tokio::spawn(async move {
76            let mut scratch = BytesMut::with_capacity(4096);
77            // Drain the channel until the sender side is dropped. The
78            // observer's `shutdown()` drops the sender, which makes
79            // `rx.recv()` return None and the loop exit cleanly.
80            while let Some(env) = rx.recv().await {
81                deliver_one(&env, &registry_in, &mut scratch, &sink_in);
82                counters_in.delivered.fetch_add(1, Ordering::Relaxed);
83                if shutdown_in.load(Ordering::Relaxed) && rx.is_empty() {
84                    break;
85                }
86            }
87            // Final non-blocking drain (in case shutdown raced with
88            // an in-flight send).
89            while let Ok(env) = rx.try_recv() {
90                deliver_one(&env, &registry_in, &mut scratch, &sink_in);
91                counters_in.delivered.fetch_add(1, Ordering::Relaxed);
92            }
93            sink_in.flush().await;
94            let _ = tier;
95        });
96        Self {
97            sender: parking_lot::Mutex::new(Some(tx)),
98            join: AsyncMutex::new(Some(join)),
99            shutdown,
100            sink,
101        }
102    }
103
104    /// Try to enqueue `env` on this tier's channel. The error variant
105    /// returns the original envelope so the caller can spool it.
106    /// Allow `result_large_err` because an envelope is a large struct
107    /// and boxing it would defeat the spec's no-allocation contract on
108    /// the hot path.
109    #[allow(clippy::result_large_err)]
110    pub fn try_send(&self, env: ObsEnvelope) -> Result<(), ObsEnvelope> {
111        let guard = self.sender.lock();
112        let Some(sender) = guard.as_ref() else {
113            return Err(env);
114        };
115        match sender.try_send(env) {
116            Ok(()) => Ok(()),
117            Err(mpsc::error::TrySendError::Full(env) | mpsc::error::TrySendError::Closed(env)) => {
118                Err(env)
119            }
120        }
121    }
122
123    /// Bounded blocking send used by the AUDIT tier. The future
124    /// resolves when the envelope is enqueued or the timeout elapses.
125    /// See [`Self::try_send`] for the rationale on `result_large_err`.
126    ///
127    /// Currently the AUDIT path uses a sync `try_send` busy-wait loop
128    /// with `std::thread::sleep` instead of this async helper, so the
129    /// `#[allow(dead_code)]` keeps the helper around for callers that
130    /// want a future-shaped variant.
131    #[allow(clippy::result_large_err, dead_code)]
132    pub async fn send_with_timeout(
133        &self,
134        env: ObsEnvelope,
135        timeout: std::time::Duration,
136    ) -> Result<(), ObsEnvelope> {
137        let sender = match self.sender.lock().as_ref() {
138            Some(s) => s.clone(),
139            None => return Err(env),
140        };
141        let cloned = env.clone();
142        match tokio::time::timeout(timeout, sender.send(env)).await {
143            Ok(Ok(())) => Ok(()),
144            Ok(Err(mpsc::error::SendError(env))) => Err(env),
145            Err(_) => Err(cloned),
146        }
147    }
148
149    /// Drain in-flight envelopes and return when the worker is idle.
150    pub async fn flush(&self) {
151        // The mpsc backpressure plus the worker's `try_recv` drain
152        // make a `flush` call best-effort: yield once to let the
153        // worker poll, then call sink.flush() directly.
154        tokio::task::yield_now().await;
155        self.sink.flush().await;
156    }
157
158    /// Shut down the worker: drop the sender (so the receiver's
159    /// `recv().await` returns `None` and the loop exits), wait for the
160    /// task to finish, then call `Sink::shutdown`.
161    pub async fn shutdown(&self) {
162        self.shutdown.store(true, Ordering::SeqCst);
163        // Drop the sender so rx.recv() returns None.
164        self.sender.lock().take();
165        let mut guard = self.join.lock().await;
166        if let Some(join) = guard.take() {
167            let _ = join.await;
168        }
169        self.sink.shutdown().await;
170    }
171
172    /// Borrow the underlying sink.
173    #[allow(dead_code)]
174    pub fn sink(&self) -> &Arc<dyn Sink> {
175        &self.sink
176    }
177}
178
179fn deliver_one(
180    env: &ObsEnvelope,
181    registry: &Arc<SchemaRegistry>,
182    scratch: &mut BytesMut,
183    sink: &Arc<dyn Sink>,
184) {
185    scratch.clear();
186    let scrubbed = match ScrubbedEnvelope::scrub(env, registry, scratch) {
187        Ok(s) => s,
188        Err(_) => {
189            // Spec 14 § 8 last row — the unscrubbed envelope is never
190            // passed to a sink. Drop and increment a counter (the
191            // counter itself is surfaced via ObsSinkFailed in a later
192            // milestone; for now we silently drop).
193            return;
194        }
195    };
196    sink.deliver(scrubbed);
197}
198
199/// Adapter: schedule a per-tier worker with a bounded queue, returning
200/// the worker handle. Wired by `StandardObserverBuilder::build`.
201pub fn spawn_tier_worker(
202    tier: Tier,
203    cfg: &QueuesConfig,
204    sink: Arc<dyn Sink>,
205    registry: Arc<SchemaRegistry>,
206    counters: Arc<WorkerCounters>,
207) -> Option<TierWorker> {
208    let cap = match tier {
209        Tier::Log => cfg.log,
210        Tier::Metric => cfg.metric,
211        Tier::Trace => cfg.trace,
212        Tier::Audit => cfg.log, /* AUDIT capacity comes from AuditConfig; this caller passes its */
213        // own
214        _ => return None,
215    } as usize;
216    if Handle::try_current().is_err() {
217        // No tokio runtime → fall back to in-emit-thread synchronous
218        // delivery; do not spawn a worker.
219        return None;
220    }
221    Some(TierWorker::spawn(cap, sink, registry, counters, tier))
222}
223
224/// Increment the per-tier `channel_full_*` counter when an emit-thread
225/// `try_send` fails with full / closed.
226pub fn note_channel_full(counters: &WorkerCounters, tier: Tier) {
227    let target = match tier {
228        Tier::Log => &counters.channel_full_log,
229        Tier::Metric => &counters.channel_full_metric,
230        Tier::Trace => &counters.channel_full_trace,
231        Tier::Audit => &counters.channel_full_audit,
232        _ => return,
233    };
234    target.fetch_add(1, Ordering::Relaxed);
235}