1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
use crate::config::Configurable;
use crate::log::LogPlugin;
use crate::{config, plugin::Plugin};
use crate::{
    config::env,
    error::Result,
    plugin::{component::ComponentRef, PluginRef},
};
use anyhow::Context;
use dashmap::DashMap;
use std::any::Any;
use std::{
    any,
    collections::HashSet,
    future::Future,
    path::{Path, PathBuf},
    sync::Arc,
};
use toml::Table;

/// Name-keyed registry: a [`DashMap`] from a `String` key to a value of type `T`.
pub type Registry<T> = DashMap<String, T>;
/// Unsized closure type for a scheduled task: it is called at most once with the
/// shared [`App`] and returns a boxed, `Send` future resolving to `Result<String>`.
pub type Scheduler = dyn FnOnce(Arc<App>) -> Box<dyn Future<Output = Result<String>> + Send>;

/// The built application: a read-side view over the registered components.
pub struct App {
    /// Components, keyed by the `std::any::type_name` of their concrete type.
    components: Registry<ComponentRef>,
}

/// Mutable builder that collects plugins, components, configuration and
/// scheduled tasks before the [`App`] is produced.
pub struct AppBuilder {
    /// The `tracing_subscriber` registry created in `Default::default()`.
    // NOTE(review): presumably consumed by the log plugin — confirm in `crate::log`.
    pub(crate) tracing_registry: tracing_subscriber::Registry,
    /// Plugins, keyed by the string returned from `Plugin::name()`.
    pub(crate) plugin_registry: Registry<PluginRef>,
    /// Components, keyed by the `std::any::type_name` of their concrete type.
    components: Registry<ComponentRef>,
    /// Path of config file
    pub(crate) config_path: PathBuf,
    /// Configuration read from `config_path`
    config: Table,
    /// Scheduled tasks registered via `add_scheduler`; they are popped from the
    /// end of this vector when the application is scheduled.
    schedulers: Vec<Box<Scheduler>>,
}

impl App {
    #[allow(clippy::new_ret_no_self)]
    pub fn new() -> AppBuilder {
        AppBuilder::default()
    }

    /// Get the component of the specified type
    pub fn get_component<T>(&self) -> Option<Arc<T>>
    where
        T: Any + Send + Sync,
    {
        let component_name = std::any::type_name::<T>();
        let pair = self.components.get(component_name)?;
        let component_ref = pair.value().clone();
        component_ref.downcast::<T>()
    }

    pub fn get_components(&self) -> Vec<String> {
        self.components.iter().map(|e| e.key().clone()).collect()
    }
}

// NOTE(review): `Send`/`Sync` are asserted by hand here. `AppBuilder` stores
// `Vec<Box<Scheduler>>`, and neither the `Scheduler` alias nor `add_scheduler`
// requires the closure to be `Send` or `Sync`, so the compiler cannot derive these
// impls on its own. No safety argument is recorded in this file — TODO confirm the
// invariant that makes sharing or moving the builder across threads sound.
unsafe impl Send for AppBuilder {}
unsafe impl Sync for AppBuilder {}

impl AppBuilder {
    /// Register a plugin under the name returned by `plugin.name()`.
    ///
    /// Returns `&mut Self` so calls can be chained.
    ///
    /// # Panics
    ///
    /// Panics if a plugin with the same name has already been registered.
    pub fn add_plugin<T: Plugin>(&mut self, plugin: T) -> &mut Self {
        let plugin_name = plugin.name().to_string();
        if self.plugin_registry.contains_key(plugin_name.as_str()) {
            panic!("Error adding plugin {plugin_name}: plugin was already added in application")
        }
        // Log only after the duplicate check so a rejected plugin is never reported as added.
        log::debug!("added plugin: {}", plugin_name);
        self.plugin_registry
            .insert(plugin_name, PluginRef::new(plugin));
        self
    }

    /// Reports whether the plugin registry holds an entry keyed by the type name of `T`.
    ///
    /// NOTE(review): `add_plugin` stores plugins under `plugin.name()`, while this
    /// lookup uses `std::any::type_name::<T>()`. The two agree only if `name()`
    /// yields the type name — confirm against the `Plugin` trait.
    pub fn is_plugin_added<T: Plugin>(&self) -> bool {
        let type_name = any::type_name::<T>();
        self.plugin_registry.contains_key(type_name)
    }

    /// Set the path of the main configuration file.
    ///
    /// When this is never called the builder keeps its default,
    /// `./config/app.toml`.
    ///
    /// Besides the main file, the application is documented to read an
    /// environment-specific file from the same directory (for example
    /// `./config/app-dev.toml`), selected through the `spring_ENV` environment
    /// variable. Values from the environment file take priority and overwrite
    /// the corresponding items of the main file.
    ///
    /// NOTE(review): the loading logic and the exact variable name live in the
    /// `config` / `config::env` modules, not in this method — confirm there.
    pub fn config_file(&mut self, config_path: &str) -> &mut Self {
        self.config_path = PathBuf::from(config_path);
        self
    }

    /// Deserialize the configuration section selected by the plugin's `config_prefix`.
    ///
    /// The section is looked up in the loaded configuration by
    /// `plugin.config_prefix()`. If the key is absent, or its value is not a
    /// table, an empty table is deserialized instead, so `T` is built purely from
    /// its own defaults when it allows that.
    ///
    /// # Errors
    ///
    /// Returns an error, with the prefix added as context, when the section
    /// cannot be deserialized into `T`.
    pub fn get_config<T>(&self, plugin: &impl Configurable) -> Result<T>
    where
        T: serde::de::DeserializeOwned,
    {
        let prefix = plugin.config_prefix();
        // One clone is needed because deserialization consumes the table while
        // `self.config` stays borrowed.
        let table = match self.config.get(prefix) {
            Some(toml::Value::Table(table)) => table.clone(),
            _ => Table::new(),
        };
        // `table` is already owned here; it is moved in rather than cloned again.
        Ok(T::deserialize(table).with_context(|| {
            format!(
                "Failed to deserialize the configuration of prefix \"{}\"",
                prefix
            )
        })?)
    }

    /// Register a component, keyed by `std::any::type_name::<T>()`.
    ///
    /// At most one component per concrete type can be registered. Returns
    /// `&mut Self` so calls can be chained.
    ///
    /// # Panics
    ///
    /// Panics if a component of the same type has already been registered.
    pub fn add_component<T>(&mut self, component: T) -> &mut Self
    where
        T: any::Any + Send + Sync,
    {
        let component_name = std::any::type_name::<T>();
        if self.components.contains_key(component_name) {
            panic!("Error adding component {component_name}: component was already added in application")
        }
        // Log only after the duplicate check so a rejected component is never reported as added.
        log::debug!("added component: {}", component_name);
        self.components
            .insert(component_name.to_string(), ComponentRef::new(component));
        self
    }

    /// Look up the component registered for type `T`.
    ///
    /// The registry key is `std::any::type_name::<T>()`. Returns `None` when no
    /// entry exists under that key or when the stored reference cannot be
    /// downcast to `T`.
    pub fn get_component<T>(&self) -> Option<Arc<T>>
    where
        T: Any + Send + Sync,
    {
        let key = std::any::type_name::<T>();
        self.components
            .get(key)
            .and_then(|entry| entry.value().clone().downcast::<T>())
    }

    /// Add a scheduled task
    pub fn add_scheduler<T>(&mut self, scheduler: T) -> &mut Self
    where
        T: FnOnce(Arc<App>) -> Box<dyn Future<Output = Result<String>> + Send> + 'static,
    {
        self.schedulers.push(Box::new(scheduler));
        self
    }

    /// Running
    pub async fn run(&mut self) {
        match self.inner_run().await {
            Err(e) => {
                log::error!("{:?}", e);
            }
            Ok(_app) => {}
        }
    }

    /// Runs the startup sequence in a fixed order: initialise the environment,
    /// load the configuration, build the log plugin, build the registered
    /// plugins, then run the scheduled tasks.
    ///
    /// Returns the built [`App`], or the first error raised by environment
    /// initialisation, configuration loading or scheduling.
    async fn inner_run(&mut self) -> Result<Arc<App>> {
        // 1. read env variable
        let env = env::init()?;

        // 2. load the TOML config (stored as a `toml::Table`)
        self.config = config::load_config(self, env)?;

        // 3. load LogPlugin
        LogPlugin.build(self);

        // 4. build plugin
        self.build_plugins().await;

        // 5. schedule
        self.schedule().await
    }

    /// Build every registered plugin, respecting declared dependencies.
    ///
    /// Plugins are processed in rounds. In each round a plugin is built once
    /// all names in its `dependencies()` have already been built; otherwise it
    /// is deferred to the next round.
    ///
    /// The plugin registry is moved out of `self` for the duration of the build
    /// (so `self.plugin_registry` is empty while plugins are building) and is
    /// put back afterwards.
    ///
    /// # Panics
    ///
    /// Panics when a full round builds nothing, i.e. the remaining plugins have
    /// a dependency cycle or depend on a plugin that was never registered.
    async fn build_plugins(&mut self) {
        let registry = std::mem::take(&mut self.plugin_registry);
        let mut pending: Vec<_> = registry
            .iter()
            .map(|entry| entry.value().to_owned())
            .collect();
        let mut built: HashSet<String> = HashSet::new();

        while !pending.is_empty() {
            let round_size = pending.len();
            let mut deferred = vec![];

            for plugin in pending {
                let deps = plugin.dependencies();
                let blocked = deps.iter().any(|dep| !built.contains(*dep));
                if blocked {
                    deferred.push(plugin);
                    continue;
                }
                plugin.build(self).await;
                built.insert(plugin.name().to_string());
                log::info!("{} plugin registered", plugin.name());
            }

            // If every plugin of this round was deferred, nothing can ever make progress.
            if deferred.len() == round_size {
                panic!("Cyclic dependency detected or missing dependencies for some plugins");
            }

            pending = deferred;
        }
        self.plugin_registry = registry;
    }

    /// Build the [`App`] and run the queued scheduled tasks.
    ///
    /// Tasks are popped from the end of the queue, so the most recently added
    /// task runs first. Each one is spawned on the tokio runtime and awaited
    /// before the next is started. A task's own `Err` result is only logged;
    /// its `Ok` message is logged at info level.
    ///
    /// # Errors
    ///
    /// Returns early if joining a spawned task fails. Tasks not yet popped
    /// stay in the queue in that case.
    async fn schedule(&mut self) -> Result<Arc<App>> {
        let app = self.build_app();
        while let Some(task) = self.schedulers.pop() {
            let future = Box::into_pin(task(Arc::clone(&app)));
            let outcome = tokio::spawn(future).await?;
            match outcome {
                Ok(msg) => log::info!("scheduled result: {}", msg),
                Err(e) => log::error!("{}", e),
            }
        }
        Ok(app)
    }

    /// Move the component registry out of the builder into a new shared [`App`].
    ///
    /// Afterwards the builder's own component registry is left empty.
    fn build_app(&mut self) -> Arc<App> {
        Arc::new(App {
            components: std::mem::take(&mut self.components),
        })
    }
}

impl Default for AppBuilder {
    fn default() -> Self {
        Self {
            tracing_registry: tracing_subscriber::registry(),
            plugin_registry: Default::default(),
            config_path: Path::new("./config/app.toml").to_path_buf(),
            config: Default::default(),
            components: Default::default(),
            schedulers: Default::default(),
        }
    }
}