1use crate::banner;
2use crate::config::env::Env;
3use crate::config::toml::TomlConfigRegistry;
4use crate::config::ConfigRegistry;
5use crate::log::{BoxLayer, LogPlugin};
6use crate::plugin::component::ComponentRef;
7use crate::plugin::{service, ComponentRegistry, MutableComponentRegistry, Plugin};
8use crate::{
9 error::Result,
10 plugin::{component::DynComponentRef, PluginRef},
11};
12use dashmap::DashMap;
13use once_cell::sync::Lazy;
14use std::any::{Any, TypeId};
15use std::str::FromStr;
16use std::sync::RwLock;
17use std::{collections::HashSet, future::Future, path::Path, sync::Arc};
18use tracing_subscriber::Layer;
19
20type Registry<T> = DashMap<TypeId, T>;
21type Scheduler<T> = dyn FnOnce(Arc<App>) -> Box<dyn Future<Output = Result<T>> + Send>;
22
23#[derive(Default)]
25pub struct App {
26 env: Env,
27 components: Registry<DynComponentRef>,
29 config: TomlConfigRegistry,
30}
31
32pub struct AppBuilder {
38 pub(crate) env: Env,
39 pub(crate) layers: Vec<BoxLayer>,
41 pub(crate) plugin_registry: Registry<PluginRef>,
43 components: Registry<DynComponentRef>,
45 config: TomlConfigRegistry,
47 schedulers: Vec<Box<Scheduler<String>>>,
49 shutdown_hooks: Vec<Box<Scheduler<String>>>,
50}
51
52impl App {
53 #[allow(clippy::new_ret_no_self)]
55 pub fn new() -> AppBuilder {
56 AppBuilder::default()
57 }
58
59 pub fn get_env(&self) -> Env {
62 self.env
63 }
64
65 pub fn global() -> Arc<App> {
71 GLOBAL_APP
72 .read()
73 .expect("GLOBAL_APP RwLock poisoned")
74 .clone()
75 }
76
77 fn set_global(app: Arc<App>) {
78 let mut global_app = GLOBAL_APP.write().expect("GLOBAL_APP RwLock poisoned");
79 *global_app = app;
80 }
81}
82
83static GLOBAL_APP: Lazy<RwLock<Arc<App>>> = Lazy::new(|| RwLock::new(Arc::new(App::default())));
84
85unsafe impl Send for AppBuilder {}
86unsafe impl Sync for AppBuilder {}
87
88impl AppBuilder {
89 #[inline]
92 pub fn get_env(&self) -> Env {
93 self.env
94 }
95
96 pub fn add_plugin<T: Plugin>(&mut self, plugin: T) -> &mut Self {
98 log::debug!("added plugin: {}", plugin.name());
99 if plugin.immediately() {
100 plugin.immediately_build(self);
101 return self;
102 }
103 let plugin_id = TypeId::of::<T>();
104 if self.plugin_registry.contains_key(&plugin_id) {
105 let plugin_name = plugin.name();
106 panic!("Error adding plugin {plugin_name}: plugin was already added in application")
107 }
108 self.plugin_registry
109 .insert(plugin_id, PluginRef::new(plugin));
110 self
111 }
112
113 #[inline]
115 pub fn is_plugin_added<T: Plugin>(&self) -> bool {
116 self.plugin_registry.contains_key(&TypeId::of::<T>())
117 }
118
119 pub fn use_config_file(&mut self, config_path: &str) -> &mut Self {
128 self.config = TomlConfigRegistry::new(Path::new(config_path), self.env)
129 .expect("config file load failed");
130 self
131 }
132
133 pub fn use_config_str(&mut self, toml_content: &str) -> &mut Self {
138 self.config =
139 TomlConfigRegistry::from_str(toml_content).expect("config content parse failed");
140 self
141 }
142
143 pub fn add_layer<L>(&mut self, layer: L) -> &mut Self
145 where
146 L: Layer<tracing_subscriber::Registry> + Send + Sync + 'static,
147 {
148 self.layers.push(Box::new(layer));
149 self
150 }
151
152 pub fn add_scheduler<T>(&mut self, scheduler: T) -> &mut Self
154 where
155 T: FnOnce(Arc<App>) -> Box<dyn Future<Output = Result<String>> + Send> + 'static,
156 {
157 self.schedulers.push(Box::new(scheduler));
158 self
159 }
160
161 pub fn add_shutdown_hook<T>(&mut self, hook: T) -> &mut Self
163 where
164 T: FnOnce(Arc<App>) -> Box<dyn Future<Output = Result<String>> + Send> + 'static,
165 {
166 self.shutdown_hooks.push(Box::new(hook));
167 self
168 }
169
170 pub async fn run(&mut self) {
177 match self.inner_run().await {
178 Err(e) => {
179 log::error!("{:?}", e);
180 }
181 _ => { }
182 }
183 }
184
185 async fn inner_run(&mut self) -> Result<()> {
186 banner::print_banner(self);
188
189 self.build_plugins().await;
191
192 service::auto_inject_service(self)?;
194
195 self.schedule().await
197 }
198
199 pub async fn build(&mut self) -> Result<Arc<App>> {
202 self.build_plugins().await;
204
205 service::auto_inject_service(self)?;
207
208 Ok(self.build_app())
209 }
210
211 async fn build_plugins(&mut self) {
212 LogPlugin.immediately_build(self);
213
214 let registry = std::mem::take(&mut self.plugin_registry);
215 let mut to_register = registry
216 .iter()
217 .map(|e| e.value().to_owned())
218 .collect::<Vec<_>>();
219 let mut registered: HashSet<String> = HashSet::new();
220
221 while !to_register.is_empty() {
222 let mut progress = false;
223 let mut next_round = vec![];
224
225 for plugin in to_register {
226 let deps = plugin.dependencies();
227 if deps.iter().all(|dep| registered.contains(*dep)) {
228 plugin.build(self).await;
229 registered.insert(plugin.name().to_string());
230 log::info!("{} plugin registered", plugin.name());
231 progress = true;
232 } else {
233 next_round.push(plugin);
234 }
235 }
236
237 if !progress {
238 panic!("Cyclic dependency detected or missing dependencies for some plugins");
239 }
240
241 to_register = next_round;
242 }
243 self.plugin_registry = registry;
244 }
245
246 async fn schedule(&mut self) -> Result<()> {
247 let app = self.build_app();
248
249 let schedulers = std::mem::take(&mut self.schedulers);
250 let mut handles = vec![];
251 for task in schedulers {
252 let poll_future = task(app.clone());
253 let poll_future = Box::into_pin(poll_future);
254 handles.push(tokio::spawn(poll_future));
255 }
256
257 while let Some(handle) = handles.pop() {
258 match handle.await? {
259 Err(e) => log::error!("{:?}", e),
260 Ok(msg) => log::info!("scheduled result: {}", msg),
261 }
262 }
263
264 while let Some(hook) = self.shutdown_hooks.pop() {
266 let result = Box::into_pin(hook(app.clone())).await?;
267 log::info!("shutdown result: {result}");
268 }
269 Ok(())
270 }
271
272 fn build_app(&mut self) -> Arc<App> {
273 let components = std::mem::take(&mut self.components);
274 let config = std::mem::take(&mut self.config);
275 let app = Arc::new(App {
276 env: self.env,
277 components,
278 config,
279 });
280 App::set_global(app.clone());
281 app
282 }
283}
284
285impl Default for AppBuilder {
286 fn default() -> Self {
287 let env = Env::init();
288 let config = TomlConfigRegistry::new(Path::new("./config/app.toml"), env)
289 .expect("toml config load failed");
290 Self {
291 env,
292 config,
293 layers: Default::default(),
294 plugin_registry: Default::default(),
295 components: Default::default(),
296 schedulers: Default::default(),
297 shutdown_hooks: Default::default(),
298 }
299 }
300}
301
302impl ConfigRegistry for App {
303 fn get_config<T>(&self) -> Result<T>
304 where
305 T: serde::de::DeserializeOwned + crate::config::Configurable,
306 {
307 self.config.get_config::<T>()
308 }
309}
310
311impl ConfigRegistry for AppBuilder {
312 fn get_config<T>(&self) -> Result<T>
313 where
314 T: serde::de::DeserializeOwned + crate::config::Configurable,
315 {
316 self.config.get_config::<T>()
317 }
318}
319
320macro_rules! impl_component_registry {
321 ($ty:ident) => {
322 impl ComponentRegistry for $ty {
323 fn get_component_ref<T>(&self) -> Option<ComponentRef<T>>
324 where
325 T: Any + Send + Sync,
326 {
327 let component_id = TypeId::of::<T>();
328 let pair = self.components.get(&component_id)?;
329 let component_ref = pair.value().clone();
330 component_ref.downcast::<T>()
331 }
332
333 fn get_component<T>(&self) -> Option<T>
334 where
335 T: Clone + Send + Sync + 'static,
336 {
337 let component_ref = self.get_component_ref();
338 component_ref.map(|arc| T::clone(&arc))
339 }
340
341 fn has_component<T>(&self) -> bool
342 where
343 T: Any + Send + Sync,
344 {
345 let component_id = TypeId::of::<T>();
346 self.components.contains_key(&component_id)
347 }
348 }
349 };
350}
351
352impl_component_registry!(App);
353impl_component_registry!(AppBuilder);
354
355impl MutableComponentRegistry for AppBuilder {
356 fn add_component<C>(&mut self, component: C) -> &mut Self
358 where
359 C: Clone + Any + Send + Sync,
360 {
361 let component_id = TypeId::of::<C>();
362 let component_name = std::any::type_name::<C>();
363 log::debug!("added component: {}", component_name);
364 if self.components.contains_key(&component_id) {
365 panic!("Error adding component {component_name}: component was already added in application")
366 }
367 self.components
368 .insert(component_id, DynComponentRef::new(component));
369 self
370 }
371}
372
#[cfg(test)]
mod tests {
    use crate::plugin::{ComponentRegistry, MutableComponentRegistry};
    use crate::App;

    /// Components of several shapes (unit, tuple, named-field and generic
    /// structs) can be added to the builder and read back from the built
    /// `App`; a generic instantiation that was never added is reported absent.
    #[tokio::test]
    async fn test_component_registry() {
        #[derive(Clone)]
        struct UnitComponent;

        #[derive(Clone)]
        struct TupleComponent(i32, i32);

        #[derive(Clone)]
        struct StructComponent {
            x: i32,
            y: i32,
        }

        #[derive(Clone)]
        struct Point<T> {
            x: T,
            y: T,
        }

        let mut builder = App::new();
        builder
            .add_component(UnitComponent)
            .add_component(TupleComponent(1, 2))
            .add_component(StructComponent { x: 3, y: 4 })
            .add_component(Point { x: 5i64, y: 6i64 });
        let built = builder.build().await;
        let app = built.expect("app build failed");

        let _ = app.get_expect_component::<UnitComponent>();

        let tuple = app.get_expect_component::<TupleComponent>();
        assert_eq!(tuple.0, 1);
        assert_eq!(tuple.1, 2);

        let named = app.get_expect_component::<StructComponent>();
        assert_eq!(named.x, 3);
        assert_eq!(named.y, 4);

        let point = app.get_expect_component::<Point<i64>>();
        assert_eq!(point.x, 5);
        assert_eq!(point.y, 6);

        // `Point<i32>` is a distinct type from `Point<i64>` and was never added.
        let missing = app.get_component::<Point<i32>>();
        assert!(missing.is_none());
    }
}