Skip to main content

obs_core/observer/
workers.rs

1//! Per-tier worker pool — bounded `tokio::sync::mpsc` channels with
2//! one drain task per tier. Spec 11 § 4.
3//!
4//! AUDIT is special: bounded blocking + spool fallback (spec 11 § 6.4)
5//! — see [`crate::audit_spool`].
6
7use std::sync::{
8    Arc,
9    atomic::{AtomicBool, AtomicU64, Ordering},
10};
11
12use bytes::BytesMut;
13use obs_proto::obs::v1::{ObsEnvelope, Tier};
14use tokio::{
15    runtime::Handle,
16    sync::{Mutex as AsyncMutex, mpsc},
17    task::JoinHandle,
18};
19
20use crate::{
21    config::QueuesConfig,
22    registry::{SchemaRegistry, ScrubbedEnvelope},
23    sink::Sink,
24};
25
/// Shared counters for the per-tier workers, surfaced as
/// `ObsSinkDropped` self-events.
///
/// Every field is an independent [`AtomicU64`] that starts at zero
/// (via `Default`), so the struct can be shared behind an `Arc` and
/// updated without locking.
///
/// NOTE(review): the `channel_full_*` fields were originally described
/// as "bytes dropped", but the writer in this module
/// (`note_channel_full`) adds one per failed send. Confirm the intended
/// unit against the other writers, if any.
#[derive(Debug, Default)]
pub struct WorkerCounters {
    /// Drops recorded at emit-time mpsc send (LOG).
    pub channel_full_log: AtomicU64,
    /// Drops recorded at emit-time mpsc send (METRIC).
    pub channel_full_metric: AtomicU64,
    /// Drops recorded at emit-time mpsc send (TRACE).
    pub channel_full_trace: AtomicU64,
    /// Drops recorded at emit-time mpsc send (AUDIT).
    pub channel_full_audit: AtomicU64,
    /// Total envelopes the drain tasks have processed.
    pub delivered: AtomicU64,
}
40
41/// Single-tier worker handle.
42pub struct TierWorker {
43    sender: parking_lot::Mutex<Option<mpsc::Sender<ObsEnvelope>>>,
44    join: AsyncMutex<Option<JoinHandle<()>>>,
45    shutdown: Arc<AtomicBool>,
46    sink: Arc<dyn Sink>,
47}
48
49impl std::fmt::Debug for TierWorker {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.debug_struct("TierWorker")
52            .field("alive", &self.sender.lock().is_some())
53            .finish()
54    }
55}
56
57impl TierWorker {
58    /// Spawn a worker that drains a bounded mpsc channel and delivers
59    /// each envelope to `sink` after running it through the per-tier
60    /// scrubber.
61    pub fn spawn(
62        capacity: usize,
63        sink: Arc<dyn Sink>,
64        registry: Arc<SchemaRegistry>,
65        counters: Arc<WorkerCounters>,
66        tier: Tier,
67    ) -> Self {
68        let (tx, mut rx) = mpsc::channel::<ObsEnvelope>(capacity.max(1));
69        let shutdown = Arc::new(AtomicBool::new(false));
70        let shutdown_in = Arc::clone(&shutdown);
71        let sink_in = Arc::clone(&sink);
72        let registry_in = registry;
73        let counters_in = counters;
74        let join = tokio::spawn(async move {
75            let mut scratch = BytesMut::with_capacity(4096);
76            // Drain the channel until the sender side is dropped. The
77            // observer's `shutdown()` drops the sender, which makes
78            // `rx.recv()` return None and the loop exit cleanly.
79            while let Some(env) = rx.recv().await {
80                deliver_one(&env, &registry_in, &mut scratch, &sink_in);
81                counters_in.delivered.fetch_add(1, Ordering::Relaxed);
82                if shutdown_in.load(Ordering::Relaxed) && rx.is_empty() {
83                    break;
84                }
85            }
86            // Final non-blocking drain (in case shutdown raced with
87            // an in-flight send).
88            while let Ok(env) = rx.try_recv() {
89                deliver_one(&env, &registry_in, &mut scratch, &sink_in);
90                counters_in.delivered.fetch_add(1, Ordering::Relaxed);
91            }
92            sink_in.flush().await;
93            let _ = tier;
94        });
95        Self {
96            sender: parking_lot::Mutex::new(Some(tx)),
97            join: AsyncMutex::new(Some(join)),
98            shutdown,
99            sink,
100        }
101    }
102
103    /// Try to enqueue `env` on this tier's channel. The error variant
104    /// returns the original envelope so the caller can spool it.
105    /// Allow `result_large_err` because an envelope is a large struct
106    /// and boxing it would defeat the spec's no-allocation contract on
107    /// the hot path.
108    #[allow(clippy::result_large_err)]
109    pub fn try_send(&self, env: ObsEnvelope) -> Result<(), ObsEnvelope> {
110        let guard = self.sender.lock();
111        let Some(sender) = guard.as_ref() else {
112            return Err(env);
113        };
114        match sender.try_send(env) {
115            Ok(()) => Ok(()),
116            Err(mpsc::error::TrySendError::Full(env) | mpsc::error::TrySendError::Closed(env)) => {
117                Err(env)
118            }
119        }
120    }
121
122    /// Bounded blocking send used by the AUDIT tier. The future
123    /// resolves when the envelope is enqueued or the timeout elapses.
124    /// See [`Self::try_send`] for the rationale on `result_large_err`.
125    ///
126    /// Currently the AUDIT path uses a sync `try_send` busy-wait loop
127    /// with `std::thread::sleep` instead of this async helper, so the
128    /// `#[allow(dead_code)]` keeps the helper around for callers that
129    /// want a future-shaped variant.
130    #[allow(clippy::result_large_err, dead_code)]
131    pub async fn send_with_timeout(
132        &self,
133        env: ObsEnvelope,
134        timeout: std::time::Duration,
135    ) -> Result<(), ObsEnvelope> {
136        let sender = match self.sender.lock().as_ref() {
137            Some(s) => s.clone(),
138            None => return Err(env),
139        };
140        let cloned = env.clone();
141        match tokio::time::timeout(timeout, sender.send(env)).await {
142            Ok(Ok(())) => Ok(()),
143            Ok(Err(mpsc::error::SendError(env))) => Err(env),
144            Err(_) => Err(cloned),
145        }
146    }
147
148    /// Drain in-flight envelopes and return when the worker is idle.
149    pub async fn flush(&self) {
150        // The mpsc backpressure plus the worker's `try_recv` drain
151        // make a `flush` call best-effort: yield once to let the
152        // worker poll, then call sink.flush() directly.
153        tokio::task::yield_now().await;
154        self.sink.flush().await;
155    }
156
157    /// Shut down the worker: drop the sender (so the receiver's
158    /// `recv().await` returns `None` and the loop exits), wait for the
159    /// task to finish, then call `Sink::shutdown`.
160    pub async fn shutdown(&self) {
161        self.shutdown.store(true, Ordering::SeqCst);
162        // Drop the sender so rx.recv() returns None.
163        self.sender.lock().take();
164        let mut guard = self.join.lock().await;
165        if let Some(join) = guard.take() {
166            let _ = join.await;
167        }
168        self.sink.shutdown().await;
169    }
170
171    /// Borrow the underlying sink.
172    #[allow(dead_code)]
173    pub fn sink(&self) -> &Arc<dyn Sink> {
174        &self.sink
175    }
176}
177
178fn deliver_one(
179    env: &ObsEnvelope,
180    registry: &Arc<SchemaRegistry>,
181    scratch: &mut BytesMut,
182    sink: &Arc<dyn Sink>,
183) {
184    scratch.clear();
185    let scrubbed = match ScrubbedEnvelope::scrub(env, registry, scratch) {
186        Ok(s) => s,
187        Err(_) => {
188            // Spec 14 § 8 last row — the unscrubbed envelope is never
189            // passed to a sink. Drop and increment a counter (the
190            // counter itself is surfaced via ObsSinkFailed in a later
191            // milestone; for now we silently drop).
192            return;
193        }
194    };
195    sink.deliver(scrubbed);
196}
197
198/// Adapter: schedule a per-tier worker with a bounded queue, returning
199/// the worker handle. Wired by `StandardObserverBuilder::build`.
200pub fn spawn_tier_worker(
201    tier: Tier,
202    cfg: &QueuesConfig,
203    sink: Arc<dyn Sink>,
204    registry: Arc<SchemaRegistry>,
205    counters: Arc<WorkerCounters>,
206) -> Option<TierWorker> {
207    let cap = match tier {
208        Tier::Log => cfg.log,
209        Tier::Metric => cfg.metric,
210        Tier::Trace => cfg.trace,
211        Tier::Audit => cfg.log, /* AUDIT capacity comes from AuditConfig; this caller passes its */
212        // own
213        _ => return None,
214    } as usize;
215    if Handle::try_current().is_err() {
216        // No tokio runtime → fall back to in-emit-thread synchronous
217        // delivery; do not spawn a worker.
218        return None;
219    }
220    Some(TierWorker::spawn(cap, sink, registry, counters, tier))
221}
222
223/// Increment the per-tier `channel_full_*` counter when an emit-thread
224/// `try_send` fails with full / closed.
225pub fn note_channel_full(counters: &WorkerCounters, tier: Tier) {
226    let target = match tier {
227        Tier::Log => &counters.channel_full_log,
228        Tier::Metric => &counters.channel_full_metric,
229        Tier::Trace => &counters.channel_full_trace,
230        Tier::Audit => &counters.channel_full_audit,
231        _ => return,
232    };
233    target.fetch_add(1, Ordering::Relaxed);
234}