1use bytes::Bytes;
2use futures::Sink;
3use std::sync::Arc;
4use tower::Service;
5
6pub mod error;
7pub mod events;
8mod module;
9mod modules_registry;
10pub mod pattern;
11mod req;
12mod response;
13
14pub mod modules {
15 pub use super::module::*;
16 pub use super::modules_registry::*;
17 pub use super::req::*;
18 pub use super::response::*;
19}
20
21use crate::core::pattern::Pattern;
22use error::*;
23use modules::*;
24
25#[derive(Default)]
26pub struct Modular {
27 modules: Arc<ModulesRegistry<Bytes, Bytes>>,
28 events: Arc<events::EventsManager<Bytes>>,
29}
30
31impl Modular {
32 pub fn register_module<S, Request>(&self, name: &str, svc: S) -> Result<(), RegistryError>
33 where
34 S: Service<Request> + Send + 'static,
35 Request: From<ModuleRequest<Bytes>> + Send + 'static,
36 S::Response: Into<ModuleResponse<Bytes>> + Send + 'static,
37 S::Error: Into<ModuleError> + Send + 'static,
38 S::Future: Send + Sync + 'static,
39 {
40 self.modules.register(name, svc)?;
41
42 Ok(())
43 }
44
45 pub fn register_or_replace_module<S, Request>(&self, name: &str, svc: S)
46 where
47 S: Service<Request> + Send + 'static,
48 Request: From<ModuleRequest<Bytes>> + Send + 'static,
49 S::Response: Into<ModuleResponse<Bytes>> + Send + 'static,
50 S::Error: Into<ModuleError> + Send + 'static,
51 S::Future: Send + Sync + 'static,
52 {
53 self.modules.register_or_replace(name, svc);
54 }
55
56 pub fn remove_module(&self, name: &str) {
57 self.modules.remove(name);
58 }
59
60 pub fn get_module(&self, name: &str) -> Option<Module<Bytes, Bytes>> {
61 self.modules.get(name)
62 }
63
64 pub fn subscribe<S, Err>(&self, name: &str, sink: S) -> Result<(), SubscribeError>
65 where
66 S: Sink<(String, Bytes), Error = Err> + Send + Sync + 'static,
67 {
68 let pattern = Pattern::parse(name).map_err(SubscribeError::InvalidPattern)?;
69 self.events.subscribe(pattern, sink);
70
71 Ok(())
72 }
73
74 pub fn publish_event<E: Into<Bytes>>(&self, path: &str, event: E) {
75 if path.starts_with("$.sys.") {
76 return;
77 }
78
79 self.publish_event_inner(path, event.into());
80 }
81
82 fn publish_event_inner(&self, path: &str, event: Bytes) {
83 self.events.publish(path, event);
84 }
85}