use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, JoinHandle},
time::Instant,
};
use std::time::Duration;
use crate::{api::Api, error::MitaError};
pub type Payload = serde_json::Value;
pub struct MitaWorker {
tx: Sender<Payload>,
handles: Vec<JoinHandle<()>>,
stop_flag: Arc<AtomicBool>,
}
impl MitaWorker {
pub fn new(
api_url: impl Into<String>,
password: impl Into<String>,
threads: usize,
queue_cap: usize,
verbose: bool,
) -> Self {
let (tx, rx) = bounded::<Payload>(queue_cap);
let url = api_url.into();
let pwd = password.into();
let stop_flag = Arc::new(AtomicBool::new(false));
let mut handles = Vec::with_capacity(threads);
for _ in 0..threads {
handles.push(Self::spawn_one(
rx.clone(),
stop_flag.clone(),
url.clone(),
pwd.clone(),
verbose,
));
}
Self {
tx,
handles,
stop_flag,
}
}
pub fn submit(&self, payload: Payload) -> Result<(), crossbeam_channel::SendError<Payload>> {
self.tx.send(payload)
}
pub fn stop(&self) {
self.stop_flag.store(true, Ordering::SeqCst);
drop(self.tx.clone()); }
pub fn join(self) {
for h in self.handles {
let _ = h.join();
}
}
fn spawn_one(
rx: Receiver<Payload>,
stop: Arc<AtomicBool>,
url: String,
password: String,
verbose: bool,
) -> JoinHandle<()> {
thread::spawn(move || {
let api = Api::new(url);
while api.auth(&password).is_err() {
eprintln!("[MitaWorker] auth failed, retry in 3s");
thread::sleep(std::time::Duration::from_secs(3));
}
while !stop.load(Ordering::SeqCst) {
match rx.recv_timeout(Duration::from_secs(1)) {
Ok(payload) => {
MitaWorker::handle_payload(&api, &password, payload, verbose);
}
Err(RecvTimeoutError::Timeout) => {
continue;
}
Err(RecvTimeoutError::Disconnected) => {
break;
}
}
}
})
}
fn handle_payload(api: &Api, password: &str, payload: Payload, verbose: bool) {
println!("[MitaWorker] received payload = {}", payload);
let t0 = Instant::now();
match api.push(&payload) {
Ok(()) => {
if verbose {
println!("[MitaWorker] pushed in {:.3}s", t0.elapsed().as_secs_f64());
}
}
Err(MitaError::Auth) => {
if api.auth(password).is_ok() {
let _ = api.push(&payload);
} else {
eprintln!("[MitaWorker] auth retry failed");
}
}
Err(MitaError::Net(e)) => {
eprintln!("[MitaWorker] connection error: {e}");
}
Err(MitaError::Config(reason)) => {
eprintln!("[MitaWorker] config error: {reason}");
}
Err(MitaError::QueueClosed) => {
eprintln!("[MitaWorker] queue closed");
}
}
}
}