spring_boot/
app.rs

1use crate::config::Configurable;
2use crate::log::LogPlugin;
3use crate::{config, plugin::Plugin};
4use crate::{
5    config::env,
6    error::Result,
7    plugin::{component::ComponentRef, PluginRef},
8};
9use anyhow::Context;
10use dashmap::DashMap;
11use std::any::Any;
12use std::{
13    any,
14    collections::HashSet,
15    future::Future,
16    path::{Path, PathBuf},
17    sync::Arc,
18};
19use toml::Table;
20
/// Concurrent, `String`-keyed map ([`DashMap`]) used for the plugin and component registries.
21pub type Registry<T> = DashMap<String, T>;
/// A one-shot task: called once with the built [`App`], it returns a boxed `Send` future
/// that resolves to a `Result<String>`.
22pub type Scheduler = dyn FnOnce(Arc<App>) -> Box<dyn Future<Output = Result<String>> + Send>;
23
/// The built application handed to schedulers; it owns the registered components.
24pub struct App {
25    /// Components, keyed by the `std::any::type_name` of their concrete type.
26    components: Registry<ComponentRef>,
27}
28
/// Collects plugins, components, configuration and schedulers, then builds and runs an [`App`].
29pub struct AppBuilder {
    /// Tracing subscriber registry; initialised with `tracing_subscriber::registry()`.
30    pub(crate) tracing_registry: tracing_subscriber::Registry,
31    /// Plugins, keyed by the name each plugin reports.
32    pub(crate) plugin_registry: Registry<PluginRef>,
33    /// Components, keyed by the `std::any::type_name` of their concrete type.
34    components: Registry<ComponentRef>,
35    /// Path of the configuration file (defaults to `./config/app.toml`).
36    pub(crate) config_path: PathBuf,
37    /// Configuration read from `config_path`; empty until the builder is run.
38    config: Table,
39    /// Tasks registered through `add_scheduler`, executed when the builder is run.
40    schedulers: Vec<Box<Scheduler>>,
41}
42
43impl App {
44    #[allow(clippy::new_ret_no_self)]
45    pub fn new() -> AppBuilder {
46        AppBuilder::default()
47    }
48
49    /// Get the component of the specified type
50    pub fn get_component<T>(&self) -> Option<Arc<T>>
51    where
52        T: Any + Send + Sync,
53    {
54        let component_name = std::any::type_name::<T>();
55        let pair = self.components.get(component_name)?;
56        let component_ref = pair.value().clone();
57        component_ref.downcast::<T>()
58    }
59
60    pub fn get_components(&self) -> Vec<String> {
61        self.components.iter().map(|e| e.key().clone()).collect()
62    }
63}
64
// NOTE(review): these manual impls carry no SAFETY justification. `AppBuilder` stores
// `Box<Scheduler>` values, and neither the `Scheduler` alias nor `add_scheduler` requires the
// closure itself to be `Send`/`Sync`; presumably these impls exist to work around that.
// TODO confirm that no thread-unsafe closure can be registered, or replace these impls with
// `Send` bounds on the scheduler type.
65unsafe impl Send for AppBuilder {}
66unsafe impl Sync for AppBuilder {}
67
68impl AppBuilder {
69    /// add plugin
70    pub fn add_plugin<T: Plugin>(&mut self, plugin: T) -> &mut Self {
71        log::debug!("added plugin: {}", plugin.name());
72        let plugin_name = plugin.name().to_string();
73        if self.plugin_registry.contains_key(plugin.name()) {
74            panic!("Error adding plugin {plugin_name}: plugin was already added in application")
75        }
76        self.plugin_registry
77            .insert(plugin_name, PluginRef::new(plugin));
78        self
79    }
80
81    /// Returns `true` if the [`Plugin`] has already been added.
82    pub fn is_plugin_added<T: Plugin>(&self) -> bool {
83        self.plugin_registry.contains_key(any::type_name::<T>())
84    }
85
86    /// The path of the configuration file, default is `./config/app.toml`.
87    /// The application automatically reads the environment configuration file
88    /// in the same directory according to the `spring_ENV` environment variable,
89    /// such as `./config/app-dev.toml`.
90    /// The environment configuration file has a higher priority and will
91    /// overwrite the configuration items of the main configuration file.
92    ///
93    /// For specific supported environments, see the [Env](./config/env) enum.
94    pub fn config_file(&mut self, config_path: &str) -> &mut Self {
95        self.config_path = Path::new(config_path).to_path_buf();
96        self
97    }
98
99    /// Get the configuration items of the plugin according to the plugin's `config_prefix`
100    pub fn get_config<T>(&self, plugin: &impl Configurable) -> Result<T>
101    where
102        T: serde::de::DeserializeOwned,
103    {
104        let prefix = plugin.config_prefix();
105        let table = match self.config.get(prefix) {
106            Some(toml::Value::Table(table)) => table.to_owned(),
107            _ => Table::new(),
108        };
109        Ok(T::deserialize(table.to_owned()).with_context(|| {
110            format!(
111                "Failed to deserialize the configuration of prefix \"{}\"",
112                prefix
113            )
114        })?)
115    }
116
117    /// Add component to the registry
118    pub fn add_component<T>(&mut self, component: T) -> &mut Self
119    where
120        T: Clone + any::Any + Send + Sync,
121    {
122        let component_name = std::any::type_name::<T>();
123        log::debug!("added component: {}", component_name);
124        if self.components.contains_key(component_name) {
125            panic!("Error adding component {component_name}: component was already added in application")
126        }
127        let component_name = component_name.to_string();
128        self.components
129            .insert(component_name, ComponentRef::new(component));
130        self
131    }
132
133    /// Get the component of the specified type
134    pub fn get_component<T>(&self) -> Option<Arc<T>>
135    where
136        T: Any + Send + Sync,
137    {
138        let component_name = std::any::type_name::<T>();
139        let pair = self.components.get(component_name)?;
140        let component_ref = pair.value().clone();
141        component_ref.downcast::<T>()
142    }
143
144    /// Add a scheduled task
145    pub fn add_scheduler<T>(&mut self, scheduler: T) -> &mut Self
146    where
147        T: FnOnce(Arc<App>) -> Box<dyn Future<Output = Result<String>> + Send> + 'static,
148    {
149        self.schedulers.push(Box::new(scheduler));
150        self
151    }
152
153    /// Running
154    pub async fn run(&mut self) {
155        match self.inner_run().await {
156            Err(e) => {
157                log::error!("{:?}", e);
158            }
159            Ok(_app) => {}
160        }
161    }
162
163    async fn inner_run(&mut self) -> Result<Arc<App>> {
164        // 1. read env variable
165        let env = env::init()?;
166
167        // 2. load yaml config
168        self.config = self.load_config(env)?;
169
170        // 3. build plugin
171        self.build_plugins().await;
172
173        // 4. schedule
174        self.schedule().await
175    }
176
177    fn load_config(&mut self, env: env::Env) -> Result<Table> {
178        config::load_config(&self.config_path, env)
179    }
180
181    async fn build_plugins(&mut self) {
182        LogPlugin.build(self);
183
184        let registry = std::mem::take(&mut self.plugin_registry);
185        let mut to_register = registry
186            .iter()
187            .map(|e| e.value().to_owned())
188            .collect::<Vec<_>>();
189        let mut registered: HashSet<String> = HashSet::new();
190
191        while !to_register.is_empty() {
192            let mut progress = false;
193            let mut next_round = vec![];
194
195            for plugin in to_register {
196                let deps = plugin.dependencies();
197                if deps.iter().all(|dep| registered.contains(*dep)) {
198                    plugin.build(self).await;
199                    registered.insert(plugin.name().to_string());
200                    log::info!("{} plugin registered", plugin.name());
201                    progress = true;
202                } else {
203                    next_round.push(plugin);
204                }
205            }
206
207            if !progress {
208                panic!("Cyclic dependency detected or missing dependencies for some plugins");
209            }
210
211            to_register = next_round;
212        }
213        self.plugin_registry = registry;
214    }
215
216    async fn schedule(&mut self) -> Result<Arc<App>> {
217        let app = self.build_app();
218        while let Some(task) = self.schedulers.pop() {
219            let poll_future = task(app.clone());
220            let poll_future = Box::into_pin(poll_future);
221            match tokio::spawn(poll_future).await? {
222                Err(e) => log::error!("{}", e),
223                Ok(msg) => log::info!("scheduled result: {}", msg),
224            }
225        }
226        Ok(app)
227    }
228
229    fn build_app(&mut self) -> Arc<App> {
230        let components = std::mem::take(&mut self.components);
231        Arc::new(App { components })
232    }
233}
234
235impl Default for AppBuilder {
236    fn default() -> Self {
237        Self {
238            tracing_registry: tracing_subscriber::registry(),
239            plugin_registry: Default::default(),
240            config_path: Path::new("./config/app.toml").to_path_buf(),
241            config: Default::default(),
242            components: Default::default(),
243            schedulers: Default::default(),
244        }
245    }
246}