1use crate::config::Configurable;
2use crate::log::LogPlugin;
3use crate::{config, plugin::Plugin};
4use crate::{
5 config::env,
6 error::Result,
7 plugin::{component::ComponentRef, PluginRef},
8};
9use anyhow::Context;
10use dashmap::DashMap;
11use std::any::Any;
12use std::{
13 any,
14 collections::HashSet,
15 future::Future,
16 path::{Path, PathBuf},
17 sync::Arc,
18};
19use toml::Table;
20
/// Concurrent map from a `String` key to a value of type `T`.
pub type Registry<T> = DashMap<String, T>;
/// One-shot callback that receives the shared [`App`] and returns a boxed,
/// `Send` future resolving to `Result<String>`.
///
/// Note that the `+ Send` bound applies to the returned future only; the
/// callback itself carries no `Send`/`Sync` bound.
pub type Scheduler = dyn FnOnce(Arc<App>) -> Box<dyn Future<Output = Result<String>> + Send>;
23
/// Application handle that owns the component registry.
///
/// Instances are created by `AppBuilder::build_app`, which moves the builder's
/// components into the `App` and wraps it in an `Arc`.
pub struct App {
    /// Registered components, keyed by the component's type name.
    components: Registry<ComponentRef>,
}
28
/// Builder that collects plugins, components, configuration and schedulers
/// before an [`App`] is assembled and run.
pub struct AppBuilder {
    /// Tracing subscriber registry; `Default` initialises it with
    /// `tracing_subscriber::registry()`.
    pub(crate) tracing_registry: tracing_subscriber::Registry,
    /// Plugins registered through `add_plugin`, keyed by plugin name.
    pub(crate) plugin_registry: Registry<PluginRef>,
    /// Components registered through `add_component`, keyed by type name.
    components: Registry<ComponentRef>,
    /// Path of the configuration file; `Default` sets it to `./config/app.toml`.
    pub(crate) config_path: PathBuf,
    /// Configuration table; replaced with the loaded configuration in `inner_run`.
    config: Table,
    /// Scheduler callbacks queued by `add_scheduler` and consumed by `schedule`.
    schedulers: Vec<Box<Scheduler>>,
}
42
43impl App {
44 #[allow(clippy::new_ret_no_self)]
45 pub fn new() -> AppBuilder {
46 AppBuilder::default()
47 }
48
49 pub fn get_component<T>(&self) -> Option<Arc<T>>
51 where
52 T: Any + Send + Sync,
53 {
54 let component_name = std::any::type_name::<T>();
55 let pair = self.components.get(component_name)?;
56 let component_ref = pair.value().clone();
57 component_ref.downcast::<T>()
58 }
59
60 pub fn get_components(&self) -> Vec<String> {
61 self.components.iter().map(|e| e.key().clone()).collect()
62 }
63}
64
// NOTE(review): these impls assert thread-safety by hand and carry no safety
// justification. `Scheduler` is a `dyn FnOnce(..)` without a `Send`/`Sync`
// bound, and `add_scheduler` accepts any `'static` closure, so nothing visible
// here guarantees that the stored closures may be moved or shared across
// threads. Confirm (and document) the invariant that makes this sound.
unsafe impl Send for AppBuilder {}
unsafe impl Sync for AppBuilder {}
67
68impl AppBuilder {
69 pub fn add_plugin<T: Plugin>(&mut self, plugin: T) -> &mut Self {
71 log::debug!("added plugin: {}", plugin.name());
72 let plugin_name = plugin.name().to_string();
73 if self.plugin_registry.contains_key(plugin.name()) {
74 panic!("Error adding plugin {plugin_name}: plugin was already added in application")
75 }
76 self.plugin_registry
77 .insert(plugin_name, PluginRef::new(plugin));
78 self
79 }
80
81 pub fn is_plugin_added<T: Plugin>(&self) -> bool {
83 self.plugin_registry.contains_key(any::type_name::<T>())
84 }
85
86 pub fn config_file(&mut self, config_path: &str) -> &mut Self {
95 self.config_path = Path::new(config_path).to_path_buf();
96 self
97 }
98
99 pub fn get_config<T>(&self, plugin: &impl Configurable) -> Result<T>
101 where
102 T: serde::de::DeserializeOwned,
103 {
104 let prefix = plugin.config_prefix();
105 let table = match self.config.get(prefix) {
106 Some(toml::Value::Table(table)) => table.to_owned(),
107 _ => Table::new(),
108 };
109 Ok(T::deserialize(table.to_owned()).with_context(|| {
110 format!(
111 "Failed to deserialize the configuration of prefix \"{}\"",
112 prefix
113 )
114 })?)
115 }
116
117 pub fn add_component<T>(&mut self, component: T) -> &mut Self
119 where
120 T: Clone + any::Any + Send + Sync,
121 {
122 let component_name = std::any::type_name::<T>();
123 log::debug!("added component: {}", component_name);
124 if self.components.contains_key(component_name) {
125 panic!("Error adding component {component_name}: component was already added in application")
126 }
127 let component_name = component_name.to_string();
128 self.components
129 .insert(component_name, ComponentRef::new(component));
130 self
131 }
132
133 pub fn get_component<T>(&self) -> Option<Arc<T>>
135 where
136 T: Any + Send + Sync,
137 {
138 let component_name = std::any::type_name::<T>();
139 let pair = self.components.get(component_name)?;
140 let component_ref = pair.value().clone();
141 component_ref.downcast::<T>()
142 }
143
144 pub fn add_scheduler<T>(&mut self, scheduler: T) -> &mut Self
146 where
147 T: FnOnce(Arc<App>) -> Box<dyn Future<Output = Result<String>> + Send> + 'static,
148 {
149 self.schedulers.push(Box::new(scheduler));
150 self
151 }
152
153 pub async fn run(&mut self) {
155 match self.inner_run().await {
156 Err(e) => {
157 log::error!("{:?}", e);
158 }
159 Ok(_app) => {}
160 }
161 }
162
163 async fn inner_run(&mut self) -> Result<Arc<App>> {
164 let env = env::init()?;
166
167 self.config = self.load_config(env)?;
169
170 self.build_plugins().await;
172
173 self.schedule().await
175 }
176
177 fn load_config(&mut self, env: env::Env) -> Result<Table> {
178 config::load_config(&self.config_path, env)
179 }
180
181 async fn build_plugins(&mut self) {
182 LogPlugin.build(self);
183
184 let registry = std::mem::take(&mut self.plugin_registry);
185 let mut to_register = registry
186 .iter()
187 .map(|e| e.value().to_owned())
188 .collect::<Vec<_>>();
189 let mut registered: HashSet<String> = HashSet::new();
190
191 while !to_register.is_empty() {
192 let mut progress = false;
193 let mut next_round = vec![];
194
195 for plugin in to_register {
196 let deps = plugin.dependencies();
197 if deps.iter().all(|dep| registered.contains(*dep)) {
198 plugin.build(self).await;
199 registered.insert(plugin.name().to_string());
200 log::info!("{} plugin registered", plugin.name());
201 progress = true;
202 } else {
203 next_round.push(plugin);
204 }
205 }
206
207 if !progress {
208 panic!("Cyclic dependency detected or missing dependencies for some plugins");
209 }
210
211 to_register = next_round;
212 }
213 self.plugin_registry = registry;
214 }
215
216 async fn schedule(&mut self) -> Result<Arc<App>> {
217 let app = self.build_app();
218 while let Some(task) = self.schedulers.pop() {
219 let poll_future = task(app.clone());
220 let poll_future = Box::into_pin(poll_future);
221 match tokio::spawn(poll_future).await? {
222 Err(e) => log::error!("{}", e),
223 Ok(msg) => log::info!("scheduled result: {}", msg),
224 }
225 }
226 Ok(app)
227 }
228
229 fn build_app(&mut self) -> Arc<App> {
230 let components = std::mem::take(&mut self.components);
231 Arc::new(App { components })
232 }
233}
234
235impl Default for AppBuilder {
236 fn default() -> Self {
237 Self {
238 tracing_registry: tracing_subscriber::registry(),
239 plugin_registry: Default::default(),
240 config_path: Path::new("./config/app.toml").to_path_buf(),
241 config: Default::default(),
242 components: Default::default(),
243 schedulers: Default::default(),
244 }
245 }
246}