use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use bytes::BytesMut;
use obs_proto::obs::v1::ObsEnvelope;
use obs_types::Tier;
use tokio::{
runtime::Handle,
sync::{Mutex as AsyncMutex, mpsc},
task::JoinHandle,
};
use crate::{
config::QueuesConfig,
registry::{SchemaRegistry, ScrubbedEnvelope},
sink::Sink,
};
/// Lock-free counters shared between producers and the per-tier worker tasks.
///
/// All fields are plain atomics so they can be bumped through a shared
/// reference (typically behind an `Arc`).
#[derive(Debug, Default)]
pub struct WorkerCounters {
/// Bumped by `note_channel_full` when called with `Tier::Log`.
pub channel_full_log: AtomicU64,
/// Bumped by `note_channel_full` when called with `Tier::Metric`.
pub channel_full_metric: AtomicU64,
/// Bumped by `note_channel_full` when called with `Tier::Trace`.
pub channel_full_trace: AtomicU64,
/// Bumped by `note_channel_full` when called with `Tier::Audit`.
pub channel_full_audit: AtomicU64,
/// Bumped by a worker task once for every envelope it pulls off its queue
/// and passes to `deliver_one`. The increment is unconditional, so it also
/// counts envelopes that `deliver_one` dropped because scrubbing failed.
pub delivered: AtomicU64,
}
/// Handle to a background task that receives `ObsEnvelope`s over a bounded
/// channel and forwards them to a `Sink`.
pub struct TierWorker {
// Producer half of the queue. `None` once `shutdown` has taken it, after
// which `try_send` / `send_with_timeout` hand the envelope back as `Err`.
sender: parking_lot::Mutex<Option<mpsc::Sender<ObsEnvelope>>>,
// Join handle of the spawned worker task; taken (and awaited) by `shutdown`.
// Kept behind an async mutex because the guard is held across that await.
join: AsyncMutex<Option<JoinHandle<()>>>,
// Set to `true` by `shutdown`; the worker task polls it between envelopes.
shutdown: Arc<AtomicBool>,
// The sink the worker delivers to; also used directly by `flush`/`shutdown`.
sink: Arc<dyn Sink>,
}
/// Compact `Debug` view: the only thing reported is whether the worker still
/// holds its sender half (`alive`), i.e. whether `shutdown` has taken it yet.
impl std::fmt::Debug for TierWorker {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Take the snapshot first so the lock is not held while formatting.
        let alive = self.sender.lock().is_some();
        let mut builder = f.debug_struct("TierWorker");
        builder.field("alive", &alive);
        builder.finish()
    }
}
impl TierWorker {
pub fn spawn(
capacity: usize,
sink: Arc<dyn Sink>,
registry: Arc<SchemaRegistry>,
counters: Arc<WorkerCounters>,
tier: Tier,
) -> Self {
let (tx, mut rx) = mpsc::channel::<ObsEnvelope>(capacity.max(1));
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_in = Arc::clone(&shutdown);
let sink_in = Arc::clone(&sink);
let registry_in = registry;
let counters_in = counters;
let join = tokio::spawn(async move {
let mut scratch = BytesMut::with_capacity(4096);
while let Some(env) = rx.recv().await {
deliver_one(&env, ®istry_in, &mut scratch, &sink_in);
counters_in.delivered.fetch_add(1, Ordering::Relaxed);
if shutdown_in.load(Ordering::Relaxed) && rx.is_empty() {
break;
}
}
while let Ok(env) = rx.try_recv() {
deliver_one(&env, ®istry_in, &mut scratch, &sink_in);
counters_in.delivered.fetch_add(1, Ordering::Relaxed);
}
sink_in.flush().await;
let _ = tier;
});
Self {
sender: parking_lot::Mutex::new(Some(tx)),
join: AsyncMutex::new(Some(join)),
shutdown,
sink,
}
}
#[allow(clippy::result_large_err)]
pub fn try_send(&self, env: ObsEnvelope) -> Result<(), ObsEnvelope> {
let guard = self.sender.lock();
let Some(sender) = guard.as_ref() else {
return Err(env);
};
match sender.try_send(env) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Full(env) | mpsc::error::TrySendError::Closed(env)) => {
Err(env)
}
}
}
#[allow(clippy::result_large_err, dead_code)]
pub async fn send_with_timeout(
&self,
env: ObsEnvelope,
timeout: std::time::Duration,
) -> Result<(), ObsEnvelope> {
let sender = match self.sender.lock().as_ref() {
Some(s) => s.clone(),
None => return Err(env),
};
let cloned = env.clone();
match tokio::time::timeout(timeout, sender.send(env)).await {
Ok(Ok(())) => Ok(()),
Ok(Err(mpsc::error::SendError(env))) => Err(env),
Err(_) => Err(cloned),
}
}
pub async fn flush(&self) {
tokio::task::yield_now().await;
self.sink.flush().await;
}
pub async fn shutdown(&self) {
self.shutdown.store(true, Ordering::SeqCst);
self.sender.lock().take();
let mut guard = self.join.lock().await;
if let Some(join) = guard.take() {
let _ = join.await;
}
self.sink.shutdown().await;
}
#[allow(dead_code)]
pub fn sink(&self) -> &Arc<dyn Sink> {
&self.sink
}
}
/// Scrubs one envelope and forwards the result to `sink`.
///
/// `scratch` is cleared and then handed to `ScrubbedEnvelope::scrub` together
/// with `env` and `registry` (presumably as the buffer the scrubbed form is
/// written into — confirm against `ScrubbedEnvelope::scrub`). If scrubbing
/// fails, the envelope is dropped without any report and the sink is not called.
fn deliver_one(
    env: &ObsEnvelope,
    registry: &Arc<SchemaRegistry>,
    scratch: &mut BytesMut,
    sink: &Arc<dyn Sink>,
) {
    scratch.clear();
    if let Ok(scrubbed) = ScrubbedEnvelope::scrub(env, registry, scratch) {
        sink.deliver(scrubbed);
    }
}
/// Checked front door for [`TierWorker::spawn`].
///
/// Picks the queue capacity for `tier` from `cfg` and spawns the worker.
/// Returns `None` when `tier` is not one of Log/Metric/Trace/Audit, or when
/// there is no current Tokio runtime to spawn onto.
pub fn spawn_tier_worker(
    tier: Tier,
    cfg: &QueuesConfig,
    sink: Arc<dyn Sink>,
    registry: Arc<SchemaRegistry>,
    counters: Arc<WorkerCounters>,
) -> Option<TierWorker> {
    let capacity = match tier {
        // NOTE(review): Audit is sized from the `log` setting — confirm this
        // is deliberate and not a missing dedicated audit capacity.
        Tier::Log | Tier::Audit => cfg.log,
        Tier::Metric => cfg.metric,
        Tier::Trace => cfg.trace,
        _ => return None,
    } as usize;

    // Only spawn when a runtime is actually available on this thread.
    Handle::try_current()
        .ok()
        .map(|_runtime| TierWorker::spawn(capacity, sink, registry, counters, tier))
}
/// Records one "queue was full" event for `tier`.
///
/// Increments the matching `channel_full_*` counter with relaxed ordering.
/// Tiers other than Log/Metric/Trace/Audit are ignored.
pub fn note_channel_full(counters: &WorkerCounters, tier: Tier) {
    let slot = match tier {
        Tier::Log => Some(&counters.channel_full_log),
        Tier::Metric => Some(&counters.channel_full_metric),
        Tier::Trace => Some(&counters.channel_full_trace),
        Tier::Audit => Some(&counters.channel_full_audit),
        _ => None,
    };
    if let Some(counter) = slot {
        counter.fetch_add(1, Ordering::Relaxed);
    }
}