modular_rs/core/
modules_registry.rs1use crate::core::error::ModuleError;
2use crate::core::module::{Module, ModuleService};
3use crate::core::req::ModuleRequest;
4use crate::core::response::ModuleResponse;
5use parking_lot::RwLock;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tower::Service;
11
12pub(crate) type BoxModuleService<Req, Resp> =
13 tower::util::BoxService<ModuleRequest<Req>, ModuleResponse<Resp>, ModuleError>;
14
15#[derive(thiserror::Error, Debug)]
16pub enum RegistryError {
17 #[error("module already exists")]
18 AlreadyExists,
19}
20
21pub struct ModulesRegistry<Req, Resp> {
22 modules: RwLock<HashMap<String, Arc<Mutex<BoxModuleService<Req, Resp>>>>>,
23}
24
25impl<Req, Resp> Default for ModulesRegistry<Req, Resp> {
26 fn default() -> Self {
27 Self {
28 modules: Default::default(),
29 }
30 }
31}
32
33impl<Request, Response> ModulesRegistry<Request, Response>
34where
35 Request: Send + Sync + 'static,
36 Response: Send + Sync + 'static,
37{
38 pub fn register<S, Req>(&self, name: &str, svc: S) -> Result<(), RegistryError>
39 where
40 S: Service<Req> + Send + 'static,
41 Req: From<ModuleRequest<Request>> + Send + 'static,
42 S::Response: Into<ModuleResponse<Response>> + Send + 'static,
43 S::Error: Into<ModuleError> + Send + 'static,
44 S::Future: Send + Sync + 'static,
45 {
46 let mut modules = self.modules.write();
47 let svc = BoxModuleService::new(ModuleService(svc, Default::default()));
48
49 match modules.entry(name.to_string()) {
50 Entry::Occupied(_) => {
51 return Err(RegistryError::AlreadyExists);
52 }
53 Entry::Vacant(entry) => {
54 entry.insert(Arc::new(Mutex::new(svc)));
55 }
56 }
57
58 Ok(())
59 }
60
61 pub fn register_or_replace<S, Req>(&self, name: &str, svc: S)
62 where
63 S: Service<Req> + Send + 'static,
64 Req: From<ModuleRequest<Request>> + Send + 'static,
65 S::Response: Into<ModuleResponse<Response>> + Send + 'static,
66 S::Error: Into<ModuleError> + Send + 'static,
67 S::Future: Send + Sync + 'static,
68 {
69 let svc = BoxModuleService::new(ModuleService(svc, Default::default()));
70
71 let mut modules = self.modules.write();
72
73 let entry = modules.entry(name.to_string());
74
75 let mut existing = match entry {
76 Entry::Occupied(entry) => entry,
77 Entry::Vacant(entry) => {
78 entry.insert(Arc::new(Mutex::new(svc)));
79 return;
80 }
81 };
82
83 let existing = existing.get_mut();
84 debug_assert!(Arc::strong_count(existing) == 1);
85
86 *Arc::get_mut(existing).unwrap() = Mutex::new(svc)
87 }
88
89 pub fn get(&self, name: &str) -> Option<Module<Request, Response>> {
90 let modules = self.modules.read();
91 modules.get(name).map(|m| Module(Arc::downgrade(m)))
92 }
93
94 pub fn remove(&self, name: &str) {
95 let mut modules = self.modules.write();
96 modules.remove(name);
97 }
98}