1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
//! Job runner for futures and async functions

pub mod vars;
pub mod worker;

use std::collections::HashMap;
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};

use worker::WorkerProcessor;
pub use worker::{Worker, WorkerState};

/// Job runner for futures and async functions
///
/// A `BackgroundRunner` is the handle through which workers are submitted
/// (see `spawn_worker`) and through which the current per-worker
/// information can be read back (see `get_worker_info`).
pub struct BackgroundRunner {
	/// Sending half of the unbounded channel on which boxed workers are
	/// queued; the receiving half is handed to the `WorkerProcessor` in `new`.
	send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
	/// Table of `WorkerInfo` entries behind a std mutex. A clone of this `Arc`
	/// is also given to the `WorkerProcessor` in `new`.
	/// NOTE(review): the `usize` key is presumably a worker/task id — confirm
	/// against `worker.rs`.
	worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}

/// Information recorded about a single worker, as returned (per worker key)
/// by `BackgroundRunner::get_worker_info`.
///
/// The type is cloneable and (de)serializable through serde.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct WorkerInfo {
	/// Name of the worker.
	pub name: String,
	/// Status fields associated with the worker (see `WorkerStatus`).
	pub status: WorkerStatus,
	/// State of the worker (see `WorkerState` in the `worker` module).
	pub state: WorkerState,
	/// Error counter.
	/// NOTE(review): presumably the total number of errors so far — confirm.
	pub errors: usize,
	/// Consecutive-error counter.
	/// NOTE(review): presumably reset when the worker succeeds — confirm.
	pub consecutive_errors: usize,
	/// Last error, if any.
	/// NOTE(review): presumably `(message, timestamp)`; the unit of the `u64`
	/// is not visible here — confirm against the code that fills it in.
	pub last_error: Option<(String, u64)>,
}

/// WorkerStatus is a struct returned by the worker with a bunch of canonical
/// fields to indicate their status to CLI users. All fields are optional.
///
/// "Optional" here means `Option<_>` for the scalar fields; `freeform` is a
/// `Vec` and is simply empty when unused. `Default` yields all `None` and an
/// empty `freeform`.
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
pub struct WorkerStatus {
	/// NOTE(review): presumably the worker's current throttling
	/// ("tranquility") setting — confirm with the workers that set it.
	pub tranquility: Option<u32>,
	/// Free-text description of progress, if the worker reports one.
	pub progress: Option<String>,
	/// NOTE(review): presumably the number of items waiting to be processed —
	/// confirm.
	pub queue_length: Option<u64>,
	/// NOTE(review): presumably a count of errors that persist across
	/// retries — confirm.
	pub persistent_errors: Option<u64>,
	/// Additional free-form status lines.
	pub freeform: Vec<String>,
}

impl BackgroundRunner {
	/// Build a runner together with the task that drives its workers.
	///
	/// A `WorkerProcessor` is created with the receiving end of the worker
	/// channel, the given `stop_signal`, and a handle to the shared worker
	/// info table; its `run()` future is spawned on the tokio runtime.
	///
	/// Returns the runner (wrapped in an `Arc`) and the `JoinHandle` of the
	/// spawned processor task, which completes once `run()` returns.
	///
	/// # Panics
	///
	/// `tokio::spawn` panics when called outside of a tokio runtime.
	pub fn new(stop_signal: watch::Receiver<bool>) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
		// Shared table: one handle stays in the runner, one goes to the processor.
		let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));

		// Channel over which newly spawned workers reach the processor.
		let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();

		let mut processor =
			WorkerProcessor::new(worker_out, stop_signal, Arc::clone(&worker_info));
		let processor_task = tokio::spawn(async move {
			processor.run().await;
		});

		let runner = Self {
			send_worker,
			worker_info,
		};
		(Arc::new(runner), processor_task)
	}

	/// Return a copy of the current worker info table.
	///
	/// The mutex is held only for the duration of the clone.
	///
	/// # Panics
	///
	/// Panics if the mutex protecting the table is poisoned.
	pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> {
		let table = self.worker_info.lock().unwrap();
		(*table).clone()
	}

	/// Box `worker` and queue it on the channel read by the worker processor.
	///
	/// # Panics
	///
	/// Panics with "Could not put worker in queue" if the send fails.
	pub fn spawn_worker<W: Worker + 'static>(&self, worker: W) {
		let boxed: Box<dyn Worker> = Box::new(worker);
		// `.ok()` discards the error (and the returned worker) so that no
		// `Debug` bound on the payload is needed for `expect`.
		let sent = self.send_worker.send(boxed).ok();
		sent.expect("Could not put worker in queue");
	}
}