use super::{Route, Shutdown};
use rand::distributions::Uniform;
use std::{any::TypeId, collections::HashMap};
use tokio::sync::RwLock;
pub type Depth = usize;
pub type ScopeId = usize;
pub type PartitionedScopes = RwLock<HashMap<ScopeId, Scope>>;
lazy_static::lazy_static! {
pub static ref PROMETHEUS_REGISTRY: prometheus::Registry = {
prometheus::Registry::new_custom(Some("Overclock".into()), None).expect("PROMETHEUS_REGISTRY to be created")
};
pub static ref OVERCLOCK_PARTITIONS: usize = {
if let Ok(p) = std::env::var("OVERCLOCK_PARTITIONS") {
p.parse().expect("Invalid OVERCLOCK_PARTITIONS env")
} else {
num_cpus::get() * 10
}
};
pub static ref SCOPE_ID_RANGE: Uniform<usize> = Uniform::from(100usize..usize::MAX);
pub static ref SCOPES: Vec<PartitionedScopes> = {
let mut scopes = Vec::with_capacity(*OVERCLOCK_PARTITIONS);
for _ in 0..*OVERCLOCK_PARTITIONS {
scopes.push(RwLock::default());
}
scopes
};
pub static ref VISIBLE_DATA: RwLock<HashMap<TypeId, Vec<(Depth,ScopeId)>>> = {
let data_map = HashMap::new();
RwLock::new(data_map)
};
}
pub trait Resource: Clone + Send + Sync + 'static {}
impl<T> Resource for T where T: Clone + Send + Sync + 'static {}
pub struct Data<T: Resource> {
pub(crate) resource: Option<T>,
pub(crate) subscribers: HashMap<ScopeId, Subscriber<T>>,
}
impl<T: Resource> Data<T> {
pub fn with_resource(resource: T) -> Self {
Self {
resource: Some(resource),
subscribers: HashMap::new(),
}
}
pub fn with_subscriber(scope_id: ScopeId, subscriber: Subscriber<T>) -> Self {
let mut subscribers = HashMap::new();
subscribers.insert(scope_id, subscriber);
Self {
resource: None,
subscribers,
}
}
}
pub enum Subscriber<T: Resource> {
OneCopy(tokio::sync::oneshot::Sender<anyhow::Result<T>>),
LinkedCopy(
Option<tokio::sync::oneshot::Sender<anyhow::Result<T>>>,
Box<dyn Shutdown>,
bool,
),
DynCopy(super::ResourceRef, Box<dyn Route<super::Event<T>>>),
}
impl<T: Resource> Subscriber<T> {
pub fn one_copy(one_shot: tokio::sync::oneshot::Sender<anyhow::Result<T>>) -> Self {
Self::OneCopy(one_shot)
}
pub fn linked_copy(
one_shot: tokio::sync::oneshot::Sender<anyhow::Result<T>>,
shutdown_handle: Box<dyn Shutdown>,
hard_link: bool,
) -> Self {
Self::LinkedCopy(Some(one_shot), shutdown_handle, hard_link)
}
pub fn dyn_copy(res_ref: super::ResourceRef, boxed_route: Box<dyn Route<super::Event<T>>>) -> Self {
Self::DynCopy(res_ref, boxed_route)
}
}
pub(crate) struct Cleanup<T: Resource> {
_marker: std::marker::PhantomData<T>,
resource_scope_id: ScopeId,
}
impl<T: Resource> Cleanup<T> {
pub(crate) fn new(resource_scope_id: ScopeId) -> Self {
Self {
_marker: std::marker::PhantomData::<T>,
resource_scope_id,
}
}
}
#[async_trait::async_trait]
pub trait CleanupFromOther: Send + Sync {
async fn cleanup_from_other(self: Box<Self>, my_scope_id: ScopeId);
}
#[async_trait::async_trait]
impl<T: Resource> CleanupFromOther for Cleanup<T> {
async fn cleanup_from_other(self: Box<Self>, my_scope_id: ScopeId) {
let resource_scopes_index = self.resource_scope_id % *OVERCLOCK_PARTITIONS;
let mut lock = SCOPES[resource_scopes_index].write().await;
if let Some(resource_scope) = lock.get_mut(&self.resource_scope_id) {
if let Some(data) = resource_scope.data_and_subscribers.get_mut::<Data<T>>() {
data.subscribers.remove(&my_scope_id);
}
};
drop(lock);
}
}
#[async_trait::async_trait]
pub trait CleanupSelf: Send + Sync {
async fn cleanup_self(
self: Box<Self>,
publisher_scope_id: ScopeId,
data_and_subscribers: &mut anymap::Map<dyn core::any::Any + Send + Sync>,
);
}
pub struct CleanupData<T: Resource> {
_marker: std::marker::PhantomData<T>,
}
impl<T: Resource> CleanupData<T> {
pub(crate) fn new() -> Self {
Self {
_marker: std::marker::PhantomData::<T>,
}
}
}
#[async_trait::async_trait]
impl<T: Resource> CleanupSelf for CleanupData<T> {
async fn cleanup_self(
self: Box<Self>,
publisher_scope_id: ScopeId,
data_and_subscribers: &mut anymap::Map<dyn std::any::Any + Send + Sync>,
) {
if let Some(mut data) = data_and_subscribers.remove::<Data<T>>() {
for (_sub_scope_id, subscriber) in data.subscribers.drain() {
match subscriber {
Subscriber::OneCopy(one_sender) => {
one_sender.send(Err(anyhow::Error::msg("Cleanup"))).ok();
}
Subscriber::LinkedCopy(mut one_sender_opt, shutdown_handle, _) => {
if let Some(one_sender) = one_sender_opt.take() {
one_sender.send(Err(anyhow::Error::msg("Cleanup"))).ok();
}
shutdown_handle.shutdown().await;
}
Subscriber::DynCopy(res_ref, route) => {
let dropped = super::Event::Dropped(publisher_scope_id, res_ref);
route.send_msg(dropped).await.ok();
}
}
}
}
}
}
pub struct Scope {
pub(crate) parent_id: Option<ScopeId>,
pub(crate) shutdown_handle: Box<dyn Shutdown>,
pub(crate) cleanup: HashMap<(TypeId, ScopeId), Box<dyn CleanupFromOther>>,
pub(crate) cleanup_data: HashMap<TypeId, Box<dyn CleanupSelf>>,
pub(crate) data_and_subscribers: anymap::Map<dyn core::any::Any + Send + Sync>,
pub(crate) active_directories: HashMap<String, ScopeId>,
pub(crate) router: anymap::Map<dyn core::any::Any + Send + Sync>,
}
impl Scope {
pub fn new(parent_id: Option<ScopeId>, shutdown_handle: Box<dyn Shutdown>) -> Self {
Self {
parent_id,
shutdown_handle,
cleanup: HashMap::new(),
cleanup_data: HashMap::new(),
data_and_subscribers: anymap::Map::new(),
active_directories: HashMap::new(),
router: anymap::Map::new(),
}
}
pub async fn lookup<T: Resource>(resource_scope_id: ScopeId) -> Option<T> {
let resource_scopes_index = resource_scope_id % *OVERCLOCK_PARTITIONS;
let mut lock = SCOPES[resource_scopes_index].write().await;
if let Some(resource_scope) = lock.get_mut(&resource_scope_id) {
if let Some(data) = resource_scope.data_and_subscribers.get::<Data<T>>() {
data.resource.clone()
} else {
None
}
} else {
None
}
}
}