Skip to main content

drasi_host_sdk/
lifecycle.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Plugin lifecycle management — reusable runtime lifecycle for all Drasi hosts.
16//!
17//! The [`PluginLifecycleManager`] owns the mutable [`PluginRegistry`] and provides
18//! operations to load plugins at runtime. It emits [`PluginEvent`]s
19//! through a broadcast channel so host applications can react to plugin changes.
20//!
21//! This lives in `host-sdk` so it is available to any Drasi host implementation,
22//! not just `drasi-server`.
23
24use std::collections::HashMap;
25use std::path::Path;
26use std::sync::Arc;
27
28use tokio::sync::{broadcast, RwLock};
29
30use crate::callbacks::{self, CallbackContext};
31use crate::loader::{load_plugin_from_path, plugin_kind_from_filename, LoadedPlugin};
32use crate::plugin_registry::PluginRegistry;
33use crate::plugin_types::{PluginCategory, PluginEvent, PluginKindEntry, PluginStatus};
34
35use drasi_plugin_sdk::{
36    BootstrapPluginDescriptor, ReactionPluginDescriptor, SourcePluginDescriptor,
37};
38
39/// Tracks the runtime state of a single loaded plugin library.
40#[derive(Debug)]
41pub struct LoadedPluginState {
42    pub plugin_id: String,
43    pub status: PluginStatus,
44    pub kinds: Vec<PluginKindEntry>,
45    pub metadata_info: Option<String>,
46}
47
48/// Manages plugin loading and registration at the host-sdk level.
49///
50/// The `PluginLifecycleManager` is the reusable core that any Drasi host can use
51/// to manage plugin lifecycles. It owns the `PluginRegistry` (via `Arc<RwLock>`)
52/// and emits `PluginEvent`s through a broadcast channel.
53pub struct PluginLifecycleManager {
54    registry: Arc<RwLock<PluginRegistry>>,
55    loaded_plugins: RwLock<HashMap<String, LoadedPluginState>>,
56    event_tx: broadcast::Sender<PluginEvent>,
57}
58
59impl PluginLifecycleManager {
60    /// Create a new lifecycle manager with the given registry.
61    pub fn new(registry: Arc<RwLock<PluginRegistry>>) -> Self {
62        let (event_tx, _) = broadcast::channel(64);
63        Self {
64            registry,
65            loaded_plugins: RwLock::new(HashMap::new()),
66            event_tx,
67        }
68    }
69
70    /// Get a reference to the shared plugin registry.
71    pub fn registry(&self) -> &Arc<RwLock<PluginRegistry>> {
72        &self.registry
73    }
74
75    /// Subscribe to plugin lifecycle events.
76    pub fn subscribe(&self) -> broadcast::Receiver<PluginEvent> {
77        self.event_tx.subscribe()
78    }
79
80    /// Register descriptors from an already-loaded plugin into the registry.
81    ///
82    /// This takes ownership of the `LoadedPlugin`, wraps each descriptor proxy
83    /// in `Arc`, and registers them with plugin identity metadata. Use this when
84    /// the caller has already called `load_plugin_from_path` and wants to register
85    /// the results.
86    pub async fn register_loaded_plugin(
87        &self,
88        plugin_id: &str,
89        mut loaded: LoadedPlugin,
90    ) -> Vec<PluginKindEntry> {
91        let mut kinds = Vec::new();
92        let metadata_info = loaded.metadata_info.take();
93
94        // Take ownership of proxy vecs via mem::take, so Drop finds them empty.
95        let sources = std::mem::take(&mut loaded.source_plugins);
96        let reactions = std::mem::take(&mut loaded.reaction_plugins);
97        let bootstraps = std::mem::take(&mut loaded.bootstrap_plugins);
98
99        let mut reg = self.registry.write().await;
100
101        for source in sources {
102            kinds.push(PluginKindEntry {
103                category: PluginCategory::Source,
104                kind: SourcePluginDescriptor::kind(&source).to_string(),
105                config_version: SourcePluginDescriptor::config_version(&source).to_string(),
106                config_schema_name: SourcePluginDescriptor::config_schema_name(&source).to_string(),
107            });
108            reg.register_source_with_metadata(Arc::new(source), plugin_id);
109        }
110
111        for reaction in reactions {
112            kinds.push(PluginKindEntry {
113                category: PluginCategory::Reaction,
114                kind: ReactionPluginDescriptor::kind(&reaction).to_string(),
115                config_version: ReactionPluginDescriptor::config_version(&reaction).to_string(),
116                config_schema_name: ReactionPluginDescriptor::config_schema_name(&reaction)
117                    .to_string(),
118            });
119            reg.register_reaction_with_metadata(Arc::new(reaction), plugin_id);
120        }
121
122        for bootstrap in bootstraps {
123            kinds.push(PluginKindEntry {
124                category: PluginCategory::Bootstrap,
125                kind: BootstrapPluginDescriptor::kind(&bootstrap).to_string(),
126                config_version: BootstrapPluginDescriptor::config_version(&bootstrap).to_string(),
127                config_schema_name: BootstrapPluginDescriptor::config_schema_name(&bootstrap)
128                    .to_string(),
129            });
130            reg.register_bootstrapper_with_metadata(Arc::new(bootstrap), plugin_id);
131        }
132
133        drop(reg);
134
135        // Track state
136        let state = LoadedPluginState {
137            plugin_id: plugin_id.to_string(),
138            status: PluginStatus::Loaded,
139            kinds: kinds.clone(),
140            metadata_info,
141        };
142
143        self.loaded_plugins
144            .write()
145            .await
146            .insert(plugin_id.to_string(), state);
147
148        // Emit event
149        let _ = self.event_tx.send(PluginEvent::Loaded {
150            plugin_id: plugin_id.to_string(),
151            version: String::new(),
152            kinds: kinds.clone(),
153        });
154
155        kinds
156    }
157
158    /// Load a single plugin from a file path and register its descriptors.
159    ///
160    /// This performs `dlopen`, metadata validation, `drasi_plugin_init()`, and
161    /// descriptor registration. The plugin's library handle is intentionally
162    /// leaked (never `dlclose`d) following the existing safety model.
163    ///
164    /// Returns the plugin ID and the kinds it provides.
165    pub async fn load_plugin(
166        &self,
167        path: &Path,
168        callback_context: Option<Arc<CallbackContext>>,
169    ) -> anyhow::Result<(String, Vec<PluginKindEntry>)> {
170        // Derive plugin_id from filename
171        let plugin_id = path
172            .file_name()
173            .and_then(|f| f.to_str())
174            .and_then(plugin_kind_from_filename)
175            .unwrap_or_else(|| {
176                path.file_stem()
177                    .and_then(|s| s.to_str())
178                    .unwrap_or("unknown")
179                    .to_string()
180            });
181
182        // Load the plugin using host-sdk loader
183        let (log_cb, log_ctx, lifecycle_cb, lifecycle_ctx) = match &callback_context {
184            Some(ctx) => {
185                let raw = ctx.clone().into_raw();
186                (
187                    callbacks::default_log_callback_fn(),
188                    raw,
189                    callbacks::default_lifecycle_callback_fn(),
190                    raw,
191                )
192            }
193            None => (
194                callbacks::default_log_callback_fn(),
195                std::ptr::null_mut(),
196                callbacks::default_lifecycle_callback_fn(),
197                std::ptr::null_mut(),
198            ),
199        };
200
201        let loaded = load_plugin_from_path(path, log_ctx, log_cb, lifecycle_ctx, lifecycle_cb)?;
202
203        let kinds = self.register_loaded_plugin(&plugin_id, loaded).await;
204
205        Ok((plugin_id, kinds))
206    }
207
208    /// Update a plugin's status (for use by the orchestrator layer).
209    pub async fn set_plugin_status(&self, plugin_id: &str, status: PluginStatus) {
210        let mut plugins = self.loaded_plugins.write().await;
211        if let Some(state) = plugins.get_mut(plugin_id) {
212            state.status = status;
213        }
214    }
215
216    /// Get the status of a loaded plugin.
217    pub async fn get_plugin_status(&self, plugin_id: &str) -> Option<PluginStatus> {
218        self.loaded_plugins
219            .read()
220            .await
221            .get(plugin_id)
222            .map(|s| s.status)
223    }
224
225    /// List all loaded plugins and their states.
226    pub async fn list_plugins(&self) -> Vec<(String, PluginStatus, Vec<PluginKindEntry>)> {
227        self.loaded_plugins
228            .read()
229            .await
230            .values()
231            .map(|s| (s.plugin_id.clone(), s.status, s.kinds.clone()))
232            .collect()
233    }
234}
235
236#[cfg(test)]
237mod tests {
238    use super::*;
239
240    #[tokio::test]
241    async fn test_lifecycle_manager_creation() {
242        let registry = Arc::new(RwLock::new(PluginRegistry::new()));
243        let manager = PluginLifecycleManager::new(registry.clone());
244
245        assert!(manager.list_plugins().await.is_empty());
246    }
247
248    #[tokio::test]
249    async fn test_lifecycle_manager_subscribe() {
250        let registry = Arc::new(RwLock::new(PluginRegistry::new()));
251        let manager = PluginLifecycleManager::new(registry);
252
253        let _rx = manager.subscribe();
254    }
255
256    #[tokio::test]
257    async fn test_set_plugin_status() {
258        let registry = Arc::new(RwLock::new(PluginRegistry::new()));
259        let manager = PluginLifecycleManager::new(registry);
260
261        // Insert a fake plugin state directly for testing
262        {
263            let mut plugins = manager.loaded_plugins.write().await;
264            plugins.insert(
265                "test-plugin".to_string(),
266                LoadedPluginState {
267                    plugin_id: "test-plugin".to_string(),
268                    status: PluginStatus::Loaded,
269                    kinds: vec![],
270                    metadata_info: None,
271                },
272            );
273        }
274
275        assert_eq!(
276            manager.get_plugin_status("test-plugin").await,
277            Some(PluginStatus::Loaded)
278        );
279
280        manager
281            .set_plugin_status("test-plugin", PluginStatus::Active)
282            .await;
283
284        assert_eq!(
285            manager.get_plugin_status("test-plugin").await,
286            Some(PluginStatus::Active)
287        );
288    }
289
290    #[tokio::test]
291    async fn test_get_plugin_status_nonexistent() {
292        let registry = Arc::new(RwLock::new(PluginRegistry::new()));
293        let manager = PluginLifecycleManager::new(registry);
294
295        assert_eq!(manager.get_plugin_status("nonexistent").await, None);
296    }
297
298    #[tokio::test]
299    async fn test_set_plugin_status_nonexistent_is_noop() {
300        let registry = Arc::new(RwLock::new(PluginRegistry::new()));
301        let manager = PluginLifecycleManager::new(registry);
302
303        // Setting status on a nonexistent plugin should not panic
304        manager
305            .set_plugin_status("nonexistent", PluginStatus::Active)
306            .await;
307
308        // Still no plugins
309        assert!(manager.list_plugins().await.is_empty());
310    }
311}