use log::debug;
use once_cell::sync::OnceCell;
use rand::Rng;
use tokio::runtime::{Builder, Handle};
use tokio::sync::oneshot::{channel, Sender};
/// A lazily created pool of single-threaded Tokio runtimes, organized as
/// `shards` groups of `thread_per_shard` runtimes each.
///
/// Each runtime is driven by its own dedicated OS thread. Nothing is spawned
/// until the pool is first needed; the pool is then built once and reused.
pub(crate) struct OffloadRuntime {
/// Number of shards; a caller-supplied hash picks the shard.
shards: usize,
/// Number of runtimes (and backing threads) in every shard.
thread_per_shard: usize,
/// Flat, shard-major list of runtimes, filled in on first use.
///
/// Each entry pairs a runtime `Handle` with the `Sender` half of the oneshot
/// channel whose receiver the runtime's thread is blocked on.
pools: OnceCell<Box<[(Handle, Sender<()>)]>>,
}
impl OffloadRuntime {
    /// Creates a pool description with `shards` shards of `thread_per_shard`
    /// threads each.
    ///
    /// No threads or runtimes are created here; they are built lazily by the
    /// first call to [`OffloadRuntime::get_runtime`].
    ///
    /// # Panics
    ///
    /// Panics if `shards` or `thread_per_shard` is zero.
    pub fn new(shards: usize, thread_per_shard: usize) -> Self {
        assert!(shards != 0, "shards must be non-zero");
        assert!(thread_per_shard != 0, "thread_per_shard must be non-zero");
        OffloadRuntime {
            shards,
            thread_per_shard,
            pools: OnceCell::new(),
        }
    }

    /// Builds every runtime in the pool and starts one OS thread per runtime.
    ///
    /// The returned slice is laid out shard-major: the runtimes of shard `s`
    /// occupy indices `s * thread_per_shard .. (s + 1) * thread_per_shard`.
    ///
    /// # Panics
    ///
    /// Panics if a Tokio runtime cannot be built or an OS thread cannot be
    /// spawned.
    fn init_pools(&self) -> Box<[(Handle, Sender<()>)]> {
        let threads = self.shards * self.thread_per_shard;
        (0..threads)
            .map(|_| {
                let rt = Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("failed to build offload tokio runtime");
                let handle = rt.handle().clone();
                // The thread drives the runtime by blocking on `rx`, so it keeps
                // running until the paired `tx` is used or dropped. `tx` is
                // therefore stored next to the handle to keep the thread alive.
                let (tx, rx) = channel::<()>();
                std::thread::Builder::new()
                    .name("Offload thread".to_string())
                    .spawn(move || {
                        debug!("Offload thread started");
                        rt.block_on(rx)
                    })
                    .expect("failed to spawn offload thread");
                (handle, tx)
            })
            .collect()
    }

    /// Returns a handle to one runtime in the shard selected by `hash`.
    ///
    /// The shard is `hash % shards`; the thread inside that shard is chosen
    /// uniformly at random, so equal hashes always land in the same shard but
    /// not necessarily on the same thread. The pool is initialized on the
    /// first call.
    ///
    /// # Panics
    ///
    /// Panics if pool initialization fails (see `init_pools`).
    pub fn get_runtime(&self, hash: u64) -> &Handle {
        let pools = self.pools.get_or_init(|| self.init_pools());

        // Take the remainder in u64 *before* narrowing: `hash as usize` would
        // drop the upper 32 bits of the hash on 32-bit targets. The result is
        // strictly less than `self.shards`, so the cast back to usize is lossless.
        let shard = (hash % self.shards as u64) as usize;
        let thread_in_shard = rand::thread_rng().gen_range(0..self.thread_per_shard);

        // Shard-major layout, e.g. thread_per_shard = 2:
        // [[th0, th1], [th2, th3], ...] -> shard 1, thread 1 is 1 * 2 + 1 = 3.
        &pools[shard * self.thread_per_shard + thread_in_shard].0
    }
}