use crate::{LoggerImpl, LoggerStatus, Message, Service};
use crossbeam::channel::Sender;
use std::any::Any;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
struct PerishableMessage {
pub message: Message,
pub lives: usize,
pub sender: Sender<PerishableMessage>,
}
pub struct Queued {
service: Arc<dyn Service + Send + Sync>,
max_retries: usize,
workers: Vec<JoinHandle<()>>,
sender: Option<Sender<PerishableMessage>>,
}
impl Queued {
pub fn new(
service: Box<dyn Service + Send + Sync>,
max_retries: usize,
worker_count: usize,
) -> Box<Self> {
let service: Arc<dyn Service + Send + Sync> = Arc::from(service);
let (sender, receiver) = crossbeam::channel::unbounded::<PerishableMessage>();
let mut workers = Vec::with_capacity(worker_count);
for _ in 0..worker_count {
let worker_receiver = receiver.clone();
let worker_service = service.clone();
workers.push(thread::spawn(move || {
while let Ok( mut message) = worker_receiver.recv() {
let result = worker_service.work(&message.message);
if result.is_ok() {
continue;
}
let error = result.unwrap_err();
if message.lives == 0 {
worker_service.fallback(&error, &message.message);
continue;
}
message.lives -= 1;
let sender = message.sender.clone();
let result = sender.send(message);
if let Err(err) = result {
worker_service.fallback(&error, &err.0.message);
}
}
}));
}
Box::new(Queued {
service,
max_retries,
workers,
sender: Some(sender),
})
}
pub fn get_service(&self) -> &dyn Service {
self.service.as_ref()
}
pub fn take_service(self) -> Arc<dyn Service + Send + Sync> {
self.service.clone()
}
}
impl LoggerImpl for Queued {
fn status(&self) -> LoggerStatus {
self.service.status()
}
fn log(&self, message: Message) {
let sender = self
.sender
.as_ref()
.expect("AsyncLogger integrity violation: log() called after drop() initialization.");
let _ = sender.send(PerishableMessage {
message,
lives: self.max_retries,
sender: sender.clone(),
});
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl Drop for Queued {
fn drop(&mut self) {
self.sender = None;
for worker in self.workers.drain(..) {
let _ = worker.join();
}
}
}