modular_rs/core/
mod.rs

1use bytes::Bytes;
2use futures::Sink;
3use std::sync::Arc;
4use tower::Service;
5
6pub mod error;
7pub mod events;
8mod module;
9mod modules_registry;
10pub mod pattern;
11mod req;
12mod response;
13
14pub mod modules {
15    pub use super::module::*;
16    pub use super::modules_registry::*;
17    pub use super::req::*;
18    pub use super::response::*;
19}
20
21use crate::core::pattern::Pattern;
22use error::*;
23use modules::*;
24
25#[derive(Default)]
26pub struct Modular {
27    modules: Arc<ModulesRegistry<Bytes, Bytes>>,
28    events: Arc<events::EventsManager<Bytes>>,
29}
30
31impl Modular {
32    pub fn register_module<S, Request>(&self, name: &str, svc: S) -> Result<(), RegistryError>
33    where
34        S: Service<Request> + Send + 'static,
35        Request: From<ModuleRequest<Bytes>> + Send + 'static,
36        S::Response: Into<ModuleResponse<Bytes>> + Send + 'static,
37        S::Error: Into<ModuleError> + Send + 'static,
38        S::Future: Send + Sync + 'static,
39    {
40        self.modules.register(name, svc)?;
41
42        Ok(())
43    }
44
45    pub fn register_or_replace_module<S, Request>(&self, name: &str, svc: S)
46    where
47        S: Service<Request> + Send + 'static,
48        Request: From<ModuleRequest<Bytes>> + Send + 'static,
49        S::Response: Into<ModuleResponse<Bytes>> + Send + 'static,
50        S::Error: Into<ModuleError> + Send + 'static,
51        S::Future: Send + Sync + 'static,
52    {
53        self.modules.register_or_replace(name, svc);
54    }
55
56    pub fn remove_module(&self, name: &str) {
57        self.modules.remove(name);
58    }
59
60    pub fn get_module(&self, name: &str) -> Option<Module<Bytes, Bytes>> {
61        self.modules.get(name)
62    }
63
64    pub fn subscribe<S, Err>(&self, name: &str, sink: S) -> Result<(), SubscribeError>
65    where
66        S: Sink<(String, Bytes), Error = Err> + Send + Sync + 'static,
67    {
68        let pattern = Pattern::parse(name).map_err(SubscribeError::InvalidPattern)?;
69        self.events.subscribe(pattern, sink);
70
71        Ok(())
72    }
73
74    pub fn publish_event<E: Into<Bytes>>(&self, path: &str, event: E) {
75        if path.starts_with("$.sys.") {
76            return;
77        }
78
79        self.publish_event_inner(path, event.into());
80    }
81
82    fn publish_event_inner(&self, path: &str, event: Bytes) {
83        self.events.publish(path, event);
84    }
85}