use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread::{self, JoinHandle},
};
use crate::ipc::{EventScope, Receiver, Sender, channel};
/// Priority class carried by a [`SaturationRequest`].
///
/// `SaturatorHandle::submit_request` uses it to pick a queue: `High` goes
/// through `submit`, `Low` through `submit_background`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RequestPriority {
    /// Sent on the queue the worker polls first. This is the default.
    #[default]
    High,
    /// Sent on the queue the worker polls only when the high queue is empty.
    Low,
}
/// A payload bundled with the routing information needed to enqueue it.
///
/// Consumed by `SaturatorHandle::submit_request`.
#[derive(Debug)]
pub struct SaturationRequest<T> {
    /// The work payload; it is what the processor callback eventually receives.
    pub data: T,
    /// Selects the high- or low-priority queue.
    pub priority: RequestPriority,
    /// Optional scope that is incremented on submission and decremented once
    /// the item has been processed.
    pub scope: Option<EventScope>,
}
/// Handle used to feed work to the saturator worker thread and to stop it.
///
/// The handle returned by `spawn_saturator` owns the worker's join handle;
/// clones share the queues and the shutdown flag but carry no join handle.
pub struct SaturatorHandle<T> {
    /// Sending side of the queue the worker polls first.
    high_tx: Sender<WorkItem<T>>,
    /// Sending side of the queue the worker polls when the high queue is empty.
    low_tx: Sender<WorkItem<T>>,
    /// Stop flag shared between every clone of the handle and the worker.
    shutdown: Arc<AtomicBool>,
    /// `Some` only on the handle created by `spawn_saturator`; `None` on clones.
    worker: Option<JoinHandle<()>>,
}
/// Envelope that travels through the queues to the worker.
struct WorkItem<T> {
    /// Payload passed to the processor callback.
    data: T,
    /// Scope to decrement after the item has been processed, if any.
    scope: Option<EventScope>,
}
impl<T: Send + 'static> SaturatorHandle<T> {
pub fn submit(&self, work: T, scope: Option<&EventScope>) {
if let Some(s) = scope {
s.increment();
}
let _ = self.high_tx.send(WorkItem {
data: work,
scope: scope.cloned(),
});
}
pub fn submit_background(&self, work: T, scope: Option<&EventScope>) {
if let Some(s) = scope {
s.increment();
}
let _ = self.low_tx.send(WorkItem {
data: work,
scope: scope.cloned(),
});
}
pub fn submit_request(&self, request: SaturationRequest<T>) {
let scope = request.scope.as_ref();
match request.priority {
RequestPriority::High => self.submit(request.data, scope),
RequestPriority::Low => self.submit_background(request.data, scope),
}
}
pub fn shutdown(&self) {
self.shutdown.store(true, Ordering::Release);
}
#[must_use]
pub fn is_shutting_down(&self) -> bool {
self.shutdown.load(Ordering::Acquire)
}
}
impl<T> Clone for SaturatorHandle<T> {
    /// Produces a handle that shares both queues and the shutdown flag with
    /// `self` but holds no join handle for the worker thread.
    fn clone(&self) -> Self {
        let high_tx = Clone::clone(&self.high_tx);
        let low_tx = Clone::clone(&self.low_tx);
        let shutdown = Arc::clone(&self.shutdown);
        Self {
            high_tx,
            low_tx,
            shutdown,
            // Only the handle created by `spawn_saturator` carries the worker.
            worker: None,
        }
    }
}
impl<T> Drop for SaturatorHandle<T> {
    /// Stops and joins the worker when the handle that owns it is dropped.
    ///
    /// Handles without a join handle (clones) do nothing here, so dropping a
    /// clone never sets the shutdown flag.
    fn drop(&mut self) {
        let Some(thread) = self.worker.take() else {
            return;
        };
        self.shutdown.store(true, Ordering::Release);
        // A panic inside the worker surfaces as `Err` here and is deliberately
        // ignored rather than re-raised from `drop`.
        let _ = thread.join();
    }
}
/// Starts the saturator worker thread and returns the handle that owns it.
///
/// Two queues are created, one per priority. The spawned thread runs
/// `worker_loop`, which feeds each received payload to `processor` and hands
/// the result to `on_complete`; both callbacks run on the worker thread.
///
/// The returned handle holds the thread's join handle, so dropping it sets the
/// shutdown flag and joins the worker.
pub fn spawn_saturator<T, F, R, C>(processor: F, on_complete: C) -> SaturatorHandle<T>
where
    T: Send + 'static,
    F: Fn(T) -> R + Send + Sync + 'static,
    R: Send + 'static,
    C: Fn(R) + Send + Sync + 'static,
{
    let (high_tx, high_rx) = channel();
    let (low_tx, low_rx) = channel();
    let shutdown = Arc::new(AtomicBool::new(false));

    let worker = {
        // Everything created in this scope is moved into the worker thread.
        let stop_flag = Arc::clone(&shutdown);
        let processor = Arc::new(processor);
        let on_complete = Arc::new(on_complete);
        thread::spawn(move || worker_loop(high_rx, low_rx, stop_flag, processor, on_complete))
    };

    SaturatorHandle {
        high_tx,
        low_tx,
        shutdown,
        worker: Some(worker),
    }
}
/// Body of the worker thread: polls both queues until shutdown is requested.
///
/// Each pass first checks `shutdown`. While the flag is clear, one item is
/// taken from `high_rx` if one is ready, otherwise one from `low_rx`; when
/// neither queue has an item the thread yields and polls again. Once the flag
/// is observed, everything still available on `high_rx` is processed and the
/// function returns. Items left on `low_rx` at that point are not processed.
// The receivers and `Arc`s are moved in by the spawning closure, so they are
// taken by value even though the body only borrows them.
#[allow(clippy::needless_pass_by_value)]
#[cfg_attr(coverage_nightly, coverage(off))]
fn worker_loop<T, F, R, C>(
    high_rx: Receiver<WorkItem<T>>,
    low_rx: Receiver<WorkItem<T>>,
    shutdown: Arc<AtomicBool>,
    processor: Arc<F>,
    on_complete: Arc<C>,
) where
    T: Send + 'static,
    F: Fn(T) -> R + Send + Sync + 'static,
    R: Send + 'static,
    C: Fn(R) + Send + Sync + 'static,
{
    while !shutdown.load(Ordering::Acquire) {
        // High-priority work always wins; the low queue is consulted only
        // when the high queue has nothing ready.
        let next = high_rx.try_recv().or_else(|_| low_rx.try_recv());
        match next {
            Ok(item) => process_item(item, &processor, &on_complete),
            Err(_) => thread::yield_now(),
        }
    }

    // Shutdown requested: finish the high-priority backlog, then stop.
    while let Ok(item) = high_rx.try_recv() {
        process_item(item, &processor, &on_complete);
    }
}
fn process_item<T, F, R, C>(item: WorkItem<T>, processor: &Arc<F>, on_complete: &Arc<C>)
where
F: Fn(T) -> R,
C: Fn(R),
{
let result = processor(item.data);
on_complete(result);
if let Some(scope) = item.scope {
scope.decrement();
}
}
/// Configuration options for a saturator.
#[derive(Clone, Debug)]
pub struct SaturatorConfig {
    /// Defaults to `true`.
    ///
    /// NOTE(review): nothing in the visible code reads this field; presumably
    /// it is meant to control whether queued work is processed when shutdown
    /// is requested — confirm against the callers.
    pub drain_on_shutdown: bool,
}
impl Default for SaturatorConfig {
    /// Returns a configuration with `drain_on_shutdown` set to `true`.
    fn default() -> Self {
        let drain_on_shutdown = true;
        Self { drain_on_shutdown }
    }
}