Skip to main content

alun_core/
plugin.rs

1use async_trait::async_trait;
2use crate::error::Result;
3
4/// 插件生命周期:start/stop 对称
5///
6/// 极简设计——只定义 start / stop 两个生命周期方法。
7/// Rust 版增强:异步化 + 依赖声明 + 拓扑排序启动。
8///
9/// # 示例
10///
11/// ```ignore
12/// use alun_core::plugin::Plugin;
13///
14/// struct DbPlugin { pool: PgPool }
15///
16/// #[async_trait]
17/// impl Plugin for DbPlugin {
18///     fn name(&self) -> &str { "pg" }
19///
20///     async fn start(&self) -> Result<()> {
21///         sqlx::query("SELECT 1").fetch_one(&self.pool).await?;
22///         Ok(())
23///     }
24///
25///     async fn stop(&self) -> Result<()> {
26///         self.pool.close().await;
27///         Ok(())
28///     }
29/// }
30/// ```
31#[async_trait]
32pub trait Plugin: Send + Sync {
33    /// 插件唯一名称(用于注册、日志、依赖解析)
34    fn name(&self) -> &str;
35
36    /// 启动插件:验证连接、初始化资源
37    ///
38    /// 在 `PluginManager::start_all()` 中被调用,按拓扑顺序依次执行。
39    /// 若返回 `Err`,则启动流程中止,后续插件不会执行。
40    async fn start(&self) -> Result<()>;
41
42    /// 关闭插件:释放资源
43    ///
44    /// 在 `PluginManager::stop_all()` 中被调用,按逆序执行。
45    /// 即使返回 `Err`,也会继续关闭其余插件(仅记录日志)。
46    async fn stop(&self) -> Result<()>;
47
48    /// 依赖的其他插件名称(用于拓扑排序,保证启动顺序)
49    ///
50    /// 默认返回空数组(无依赖)。
51    fn depends_on(&self) -> &[&str] {
52        &[]
53    }
54}
55
56use std::collections::{HashMap, HashSet, VecDeque};
57use tracing::{info, error};
58
59/// 插件管理器:负责注册、拓扑排序启动、逆序关闭
60///
61/// # 示例
62///
63/// ```ignore
64/// let mgr = PluginManager::new()
65///     .add(db_plugin)
66///     .add(cache_plugin);
67/// mgr.start_all().await?;
68/// // ... 运行应用 ...
69/// mgr.stop_all().await;
70/// ```
71pub struct PluginManager {
72    plugins: Vec<Box<dyn Plugin>>,
73}
74
75impl PluginManager {
76    /// 创建空的插件管理器
77    pub fn new() -> Self {
78        Self { plugins: Vec::new() }
79    }
80
81    /// 手动注册插件(链式调用)
82    ///
83    /// # 示例
84    ///
85    /// ```ignore
86    /// PluginManager::new()
87    ///     .add(DbPlugin::new(db_config))
88    ///     .add(CachePlugin::new(cache_config))
89    /// ```
90    pub fn add<P: Plugin + 'static>(mut self, plugin: P) -> Self {
91        self.plugins.push(Box::new(plugin));
92        self
93    }
94
95    /// 编译期自动发现的插件批量注册
96    ///
97    /// 与 `#[plugin]` 宏配合使用,将 linkme 收集到的插件批量加入管理器。
98    pub fn add_discovered(mut self, plugins: Vec<Box<dyn Plugin>>) -> Self {
99        self.plugins.extend(plugins);
100        self
101    }
102
103    /// 拓扑排序后依次启动所有插件
104    ///
105    /// 若任一插件启动失败,流程立即中止并返回错误。
106    pub async fn start_all(&self) -> Result<()> {
107        let ordered = self.topological_sort()?;
108
109        for plugin in ordered {
110            info!("启动插件: {}", plugin.name());
111            if let Err(e) = plugin.start().await {
112                error!("插件 {} 启动失败: {}", plugin.name(), e);
113                return Err(e);
114            }
115        }
116
117        Ok(())
118    }
119
120    /// 逆序关闭所有插件(后启动的先关闭)
121    ///
122    /// 即使某个插件关闭失败,也会继续关闭其余插件(仅记录错误日志)。
123    pub async fn stop_all(&self) {
124        for plugin in self.plugins.iter().rev() {
125            info!("停止插件: {}", plugin.name());
126            if let Err(e) = plugin.stop().await {
127                error!("插件 {} 停止异常: {}", plugin.name(), e);
128            }
129        }
130    }
131
132    /// Kahn 算法拓扑排序,检测循环依赖
133    ///
134    /// 返回按依赖顺序排列的插件引用。若存在循环依赖则返回 `Config` 错误。
135    fn topological_sort(&self) -> Result<Vec<&Box<dyn Plugin>>> {
136        let name_to_idx: HashMap<&str, usize> = self.plugins
137            .iter()
138            .enumerate()
139            .map(|(i, p)| (p.name(), i))
140            .collect();
141
142        let count = self.plugins.len();
143        let mut in_degree = vec![0usize; count];
144        let mut adj = vec![Vec::new(); count];
145
146        for (i, plugin) in self.plugins.iter().enumerate() {
147            for dep in plugin.depends_on() {
148                if let Some(&j) = name_to_idx.get(dep) {
149                    adj[j].push(i);
150                    in_degree[i] += 1;
151                }
152            }
153        }
154
155        let mut queue: VecDeque<usize> = (0..count)
156            .filter(|&i| in_degree[i] == 0)
157            .collect();
158
159        let mut sorted = Vec::with_capacity(count);
160        while let Some(u) = queue.pop_front() {
161            sorted.push(&self.plugins[u]);
162            for &v in &adj[u] {
163                in_degree[v] -= 1;
164                if in_degree[v] == 0 {
165                    queue.push_back(v);
166                }
167            }
168        }
169
170        if sorted.len() != count {
171            // 检测循环依赖
172            let cycle: Vec<&str> = self.plugins.iter()
173                .enumerate()
174                .filter(|(i, _)| in_degree[*i] > 0)
175                .map(|(_, p)| p.name())
176                .collect();
177            return Err(crate::error::Error::Config(
178                format!("插件循环依赖: {:?}", cycle)
179            ));
180        }
181
182        Ok(sorted)
183    }
184
185    /// 确保插件之间没有 name 冲突
186    pub fn check_duplicate_names(&self) -> std::result::Result<(), String> {
187        let mut seen = HashSet::new();
188        for p in &self.plugins {
189            if !seen.insert(p.name()) {
190                return Err(format!("插件名重复: {}", p.name()));
191            }
192        }
193        Ok(())
194    }
195}
196
197impl Default for PluginManager {
198    fn default() -> Self {
199        Self::new()
200    }
201}
202
203#[cfg(test)]
204mod tests {
205    use super::*;
206    use async_trait::async_trait;
207    use parking_lot::Mutex;
208    use std::sync::Arc;
209
210    struct TestPlugin {
211        name: &'static str,
212        deps: &'static [&'static str],
213        order: Arc<Mutex<Vec<String>>>,
214    }
215
216    #[async_trait]
217    impl Plugin for TestPlugin {
218        fn name(&self) -> &str { self.name }
219        async fn start(&self) -> Result<()> {
220            self.order.lock().push(format!("start:{}", self.name));
221            Ok(())
222        }
223        async fn stop(&self) -> Result<()> {
224            self.order.lock().push(format!("stop:{}", self.name));
225            Ok(())
226        }
227        fn depends_on(&self) -> &[&str] { self.deps }
228    }
229
230    #[tokio::test]
231    async fn test_topological_start() {
232        let order = Arc::new(Mutex::new(Vec::new()));
233
234        let mgr = PluginManager::new()
235            .add(TestPlugin {
236                name: "c", deps: &["a", "b"],
237                order: order.clone(),
238            })
239            .add(TestPlugin {
240                name: "b", deps: &["a"],
241                order: order.clone(),
242            })
243            .add(TestPlugin {
244                name: "a", deps: &[],
245                order: order.clone(),
246            });
247
248        mgr.start_all().await.unwrap();
249
250        let log = order.lock();
251        assert_eq!(log[0], "start:a");
252        assert_eq!(log[1], "start:b");
253        assert_eq!(log[2], "start:c");
254    }
255
256    #[tokio::test]
257    async fn test_cycle_detection() {
258        let order = Arc::new(Mutex::new(Vec::new()));
259
260        let mgr = PluginManager::new()
261            .add(TestPlugin {
262                name: "x", deps: &["y"],
263                order: order.clone(),
264            })
265            .add(TestPlugin {
266                name: "y", deps: &["x"],
267                order: order.clone(),
268            });
269
270        let result = mgr.start_all().await;
271        assert!(result.is_err());
272    }
273}