use hashbrown::HashMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;
pub struct DispatcherBuilder<ResourceId> {
resource_locks: HashMap<ResourceId, tokio::sync::lock::Lock<()>>,
}
impl<ResourceId> DispatcherBuilder<ResourceId>
where
ResourceId: super::ResourceIdTrait,
{
pub fn new() -> Self {
DispatcherBuilder {
resource_locks: HashMap::new(),
}
}
pub fn with_resource_id(mut self, resource_id: ResourceId) -> Self {
self.register_resource_id(resource_id);
self
}
pub fn register_resource_id(&mut self, resource_id: ResourceId) {
self.resource_locks
.insert(resource_id.clone(), tokio::sync::lock::Lock::new(()));
}
pub fn build(self) -> Dispatcher<ResourceId> {
return Dispatcher {
next_task_id: std::sync::atomic::AtomicUsize::new(0),
dispatch_lock: tokio::sync::lock::Lock::new(()),
resource_locks: self.resource_locks,
should_terminate: std::sync::atomic::AtomicBool::new(false),
};
}
}
pub struct Dispatcher<ResourceId>
where
ResourceId: super::ResourceIdTrait,
{
next_task_id: std::sync::atomic::AtomicUsize,
dispatch_lock: tokio::sync::lock::Lock<()>,
resource_locks: HashMap<ResourceId, tokio::sync::lock::Lock<()>>,
should_terminate: std::sync::atomic::AtomicBool,
}
impl<ResourceId> Dispatcher<ResourceId>
where
ResourceId: super::ResourceIdTrait,
{
pub(super) fn dispatch_lock(&self) -> &tokio::sync::lock::Lock<()> {
&self.dispatch_lock
}
pub(super) fn resource_locks(&self) -> &HashMap<ResourceId, tokio::sync::lock::Lock<()>> {
&self.resource_locks
}
pub(super) fn take_task_id(&self) -> usize {
self.next_task_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
pub fn end_game_loop(&self) {
self.should_terminate.swap(true, Ordering::Release);
}
pub fn enter_game_loop<F, FutureT>(self, f: F)
where
F: Fn(Arc<Dispatcher<ResourceId>>) -> FutureT + Send + Sync + 'static,
FutureT: futures::future::Future<Item = (), Error = ()> + Send + 'static,
{
let dispatcher = Arc::new(self);
let dispatcher_clone = dispatcher.clone();
let loop_future = futures::future::loop_fn((), move |_| {
let dispatcher_clone2 = dispatcher_clone.clone();
(f)(dispatcher_clone.clone()).map(move |_| {
return if dispatcher_clone2.should_terminate.load(Ordering::Acquire) {
futures::future::Loop::Break(())
} else {
futures::future::Loop::Continue(())
};
})
});
debug!("Calling tokio run");
tokio::run(loop_future);
}
}