modular_rs/core/
modules_registry.rs

1use crate::core::error::ModuleError;
2use crate::core::module::{Module, ModuleService};
3use crate::core::req::ModuleRequest;
4use crate::core::response::ModuleResponse;
5use parking_lot::RwLock;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tower::Service;
11
12pub(crate) type BoxModuleService<Req, Resp> =
13    tower::util::BoxService<ModuleRequest<Req>, ModuleResponse<Resp>, ModuleError>;
14
15#[derive(thiserror::Error, Debug)]
16pub enum RegistryError {
17    #[error("module already exists")]
18    AlreadyExists,
19}
20
21pub struct ModulesRegistry<Req, Resp> {
22    modules: RwLock<HashMap<String, Arc<Mutex<BoxModuleService<Req, Resp>>>>>,
23}
24
25impl<Req, Resp> Default for ModulesRegistry<Req, Resp> {
26    fn default() -> Self {
27        Self {
28            modules: Default::default(),
29        }
30    }
31}
32
33impl<Request, Response> ModulesRegistry<Request, Response>
34where
35    Request: Send + Sync + 'static,
36    Response: Send + Sync + 'static,
37{
38    pub fn register<S, Req>(&self, name: &str, svc: S) -> Result<(), RegistryError>
39    where
40        S: Service<Req> + Send + 'static,
41        Req: From<ModuleRequest<Request>> + Send + 'static,
42        S::Response: Into<ModuleResponse<Response>> + Send + 'static,
43        S::Error: Into<ModuleError> + Send + 'static,
44        S::Future: Send + Sync + 'static,
45    {
46        let mut modules = self.modules.write();
47        let svc = BoxModuleService::new(ModuleService(svc, Default::default()));
48
49        match modules.entry(name.to_string()) {
50            Entry::Occupied(_) => {
51                return Err(RegistryError::AlreadyExists);
52            }
53            Entry::Vacant(entry) => {
54                entry.insert(Arc::new(Mutex::new(svc)));
55            }
56        }
57
58        Ok(())
59    }
60
61    pub fn register_or_replace<S, Req>(&self, name: &str, svc: S)
62    where
63        S: Service<Req> + Send + 'static,
64        Req: From<ModuleRequest<Request>> + Send + 'static,
65        S::Response: Into<ModuleResponse<Response>> + Send + 'static,
66        S::Error: Into<ModuleError> + Send + 'static,
67        S::Future: Send + Sync + 'static,
68    {
69        let svc = BoxModuleService::new(ModuleService(svc, Default::default()));
70
71        let mut modules = self.modules.write();
72
73        let entry = modules.entry(name.to_string());
74
75        let mut existing = match entry {
76            Entry::Occupied(entry) => entry,
77            Entry::Vacant(entry) => {
78                entry.insert(Arc::new(Mutex::new(svc)));
79                return;
80            }
81        };
82
83        let existing = existing.get_mut();
84        debug_assert!(Arc::strong_count(existing) == 1);
85
86        *Arc::get_mut(existing).unwrap() = Mutex::new(svc)
87    }
88
89    pub fn get(&self, name: &str) -> Option<Module<Request, Response>> {
90        let modules = self.modules.read();
91        modules.get(name).map(|m| Module(Arc::downgrade(m)))
92    }
93
94    pub fn remove(&self, name: &str) {
95        let mut modules = self.modules.write();
96        modules.remove(name);
97    }
98}