1use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::{RwLock, broadcast, mpsc};
11use tracing::{debug, error, info, warn};
12
13use super::loader::{DynamicPlugin, PluginLoadError, PluginLoader};
14use super::registry::{PluginInfo, PluginRegistry, PluginVersion};
15use super::state::{StateManager, StateSnapshot};
16use super::watcher::{PluginWatcher, WatchConfig, WatchEventKind};
17use crate::{PluginContext, PluginResult};
18
19#[derive(Debug, Clone)]
21pub struct HotReloadConfig {
22 pub base: mofa_kernel::plugin::HotReloadConfig,
24 pub watch_config: WatchConfig,
26 pub shutdown_timeout: Duration,
28 pub parallel_reload: bool,
30}
31
32impl Default for HotReloadConfig {
33 fn default() -> Self {
34 Self {
35 base: mofa_kernel::plugin::HotReloadConfig::default(),
36 watch_config: WatchConfig::default(),
37 shutdown_timeout: Duration::from_secs(30),
38 parallel_reload: false,
39 }
40 }
41}
42
43impl HotReloadConfig {
44 pub fn new() -> Self {
46 Self::default()
47 }
48
49 pub fn with_base(mut self, base: mofa_kernel::plugin::HotReloadConfig) -> Self {
51 self.base = base;
52 self
53 }
54
55 pub fn with_strategy(mut self, strategy: mofa_kernel::plugin::ReloadStrategy) -> Self {
57 self.base.strategy = strategy;
58 self
59 }
60
61 pub fn with_preserve_state(mut self, enabled: bool) -> Self {
63 self.base.preserve_state = enabled;
64 self
65 }
66
67 pub fn with_auto_rollback(mut self, enabled: bool) -> Self {
69 self.base.auto_rollback = enabled;
70 self
71 }
72
73 pub fn with_max_attempts(mut self, max: u32) -> Self {
75 self.base.max_reload_attempts = max;
76 self
77 }
78
79 pub fn with_reload_cooldown(mut self, cooldown: Duration) -> Self {
81 self.base.reload_cooldown = cooldown;
82 self
83 }
84
85 pub fn with_watch_config(mut self, watch_config: WatchConfig) -> Self {
87 self.watch_config = watch_config;
88 self
89 }
90
91 pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self {
93 self.shutdown_timeout = timeout;
94 self
95 }
96
97 pub fn with_parallel_reload(mut self, enabled: bool) -> Self {
99 self.parallel_reload = enabled;
100 self
101 }
102}
103
104pub use mofa_kernel::plugin::{ReloadEvent, ReloadStrategy};
106
/// Outcome of a manually requested plugin reload.
#[derive(Debug)]
pub struct ReloadResult {
    /// Identifier of the plugin the reload was requested for.
    pub plugin_id: String,
    /// Whether the reload succeeded.
    pub success: bool,
    /// Failure description, if any.
    pub error: Option<String>,
    /// Wall-clock time spent on the reload.
    pub duration: Duration,
    /// Whether state preservation was in effect for this reload.
    pub state_preserved: bool,
    /// Number of attempts made.
    pub attempts: u32,
}
123
124#[derive(Debug, thiserror::Error)]
126pub enum ReloadError {
127 #[error("Plugin not found: {0}")]
128 PluginNotFound(String),
129
130 #[error("Load error: {0}")]
131 LoadError(#[from] PluginLoadError),
132
133 #[error("State preservation failed: {0}")]
134 StateError(String),
135
136 #[error("Initialization failed: {0}")]
137 InitError(String),
138
139 #[error("Max reload attempts exceeded")]
140 MaxAttemptsExceeded,
141
142 #[error("Plugin still in use")]
143 PluginInUse,
144
145 #[error("Watch error: {0}")]
146 WatchError(String),
147
148 #[error("Internal error: {0}")]
149 Internal(String),
150}
151
152struct LoadedPlugin {
154 plugin: DynamicPlugin,
156 info: PluginInfo,
158 reload_attempts: u32,
160 last_reload: Option<std::time::Instant>,
162}
163
164pub struct HotReloadManager {
166 config: HotReloadConfig,
168 loader: Arc<PluginLoader>,
170 watcher: Arc<RwLock<PluginWatcher>>,
172 registry: Arc<PluginRegistry>,
174 state_manager: Arc<StateManager>,
176 loaded_plugins: Arc<RwLock<HashMap<String, LoadedPlugin>>>,
178 event_tx: broadcast::Sender<ReloadEvent>,
180 shutdown_tx: Option<mpsc::Sender<()>>,
182 running: Arc<RwLock<bool>>,
184 plugin_context: Option<PluginContext>,
186}
187
188impl HotReloadManager {
189 pub fn new(config: HotReloadConfig) -> Self {
191 let (event_tx, _) = broadcast::channel(1024);
192
193 Self {
194 loader: Arc::new(PluginLoader::new()),
195 watcher: Arc::new(RwLock::new(PluginWatcher::new(config.watch_config.clone()))),
196 registry: Arc::new(PluginRegistry::new()),
197 state_manager: Arc::new(StateManager::new()),
198 loaded_plugins: Arc::new(RwLock::new(HashMap::new())),
199 event_tx,
200 shutdown_tx: None,
201 running: Arc::new(RwLock::new(false)),
202 plugin_context: None,
203 config,
204 }
205 }
206
207 pub fn with_context(mut self, context: PluginContext) -> Self {
209 self.plugin_context = Some(context);
210 self
211 }
212
213 pub fn subscribe(&self) -> broadcast::Receiver<ReloadEvent> {
215 self.event_tx.subscribe()
216 }
217
218 pub fn registry(&self) -> Arc<PluginRegistry> {
220 self.registry.clone()
221 }
222
223 pub fn state_manager(&self) -> Arc<StateManager> {
225 self.state_manager.clone()
226 }
227
228 pub async fn add_watch_path<P: AsRef<Path>>(&self, path: P) -> Result<(), ReloadError> {
230 let path = path.as_ref();
231 info!("Adding watch path: {:?}", path);
232
233 let mut watcher = self.watcher.write().await;
234 watcher
235 .watch(path)
236 .await
237 .map_err(|e| ReloadError::WatchError(e.to_string()))?;
238
239 Ok(())
240 }
241
242 pub async fn start(&mut self) -> Result<(), ReloadError> {
244 info!("Starting hot-reload manager");
245
246 {
247 let mut running = self.running.write().await;
248 if *running {
249 return Err(ReloadError::Internal("Already running".to_string()));
250 }
251 *running = true;
252 }
253
254 {
256 let mut watcher = self.watcher.write().await;
257 watcher
258 .start()
259 .await
260 .map_err(|e| ReloadError::WatchError(e.to_string()))?;
261 }
262
263 let existing = {
265 let watcher = self.watcher.read().await;
266 watcher.scan_existing().await
267 };
268
269 for path in existing {
270 if let Err(e) = self.load_plugin(&path).await {
271 warn!("Failed to load existing plugin {:?}: {}", path, e);
272 }
273 }
274
275 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
277 self.shutdown_tx = Some(shutdown_tx);
278
279 self.spawn_event_processor(shutdown_rx).await;
280
281 info!("Hot-reload manager started");
282 Ok(())
283 }
284
285 pub async fn stop(&mut self) -> Result<(), ReloadError> {
287 info!("Stopping hot-reload manager");
288
289 if let Some(tx) = self.shutdown_tx.take() {
291 let _ = tx.send(()).await;
292 }
293
294 {
296 let mut watcher = self.watcher.write().await;
297 watcher.stop().await;
298 }
299
300 let plugin_ids: Vec<String> = {
302 let plugins = self.loaded_plugins.read().await;
303 plugins.keys().cloned().collect()
304 };
305
306 for plugin_id in plugin_ids {
307 if let Err(e) = self.unload_plugin(&plugin_id).await {
308 warn!("Failed to unload plugin {}: {}", plugin_id, e);
309 }
310 }
311
312 {
313 let mut running = self.running.write().await;
314 *running = false;
315 }
316
317 info!("Hot-reload manager stopped");
318 Ok(())
319 }
320
321 async fn spawn_event_processor(&self, mut shutdown_rx: mpsc::Receiver<()>) {
323 let watcher = self.watcher.clone();
324 let event_tx = self.event_tx.clone();
325 let config = self.config.clone();
326 let loader = self.loader.clone();
327 let registry = self.registry.clone();
328 let state_manager = self.state_manager.clone();
329 let loaded_plugins = self.loaded_plugins.clone();
330 let running = self.running.clone();
331 let plugin_context = self.plugin_context.clone();
332
333 tokio::spawn(async move {
334 let mut event_rx = {
336 let mut w = watcher.write().await;
337 match w.take_event_receiver() {
338 Some(rx) => rx,
339 None => {
340 error!("Failed to get event receiver");
341 return;
342 }
343 }
344 };
345
346 let mut pending_reloads: HashMap<PathBuf, std::time::Instant> = HashMap::new();
347
348 loop {
349 tokio::select! {
350 Some(watch_event) = event_rx.recv() => {
351 debug!("Received watch event: {:?}", watch_event);
352
353 match watch_event.kind {
354 WatchEventKind::Created => {
355 let _ = event_tx.send(ReloadEvent::PluginDiscovered {
357 path: watch_event.path.clone(),
358 });
359
360 if let Err(e) = Self::handle_load(
362 &watch_event.path,
363 &loader,
364 ®istry,
365 &loaded_plugins,
366 &event_tx,
367 plugin_context.as_ref(),
368 ).await {
369 warn!("Failed to load new plugin: {}", e);
370 }
371 }
372
373 WatchEventKind::Modified => {
374 match config.base.strategy {
376 ReloadStrategy::Immediate => {
377 Self::handle_reload(
378 &watch_event.path,
379 &loader,
380 ®istry,
381 &state_manager,
382 &loaded_plugins,
383 &event_tx,
384 &config,
385 plugin_context.as_ref(),
386 ).await;
387 }
388 ReloadStrategy::Debounced(duration) => {
389 pending_reloads.insert(
390 watch_event.path.clone(),
391 std::time::Instant::now() + duration,
392 );
393 }
394 ReloadStrategy::Manual => {
395 debug!("Manual reload mode - ignoring change");
396 }
397 ReloadStrategy::OnIdle => {
398 pending_reloads.insert(
399 watch_event.path.clone(),
400 std::time::Instant::now() + Duration::from_secs(5),
401 );
402 }
403 }
404 }
405
406 WatchEventKind::Removed => {
407 if let Some(info) = registry.get_by_path(&watch_event.path).await {
409 let _ = event_tx.send(ReloadEvent::PluginRemoved {
410 plugin_id: info.id.clone(),
411 path: watch_event.path.clone(),
412 });
413
414 let mut plugins = loaded_plugins.write().await;
416 if let Some(mut entry) = plugins.remove(&info.id) {
417 let _ = entry.plugin.plugin_mut().unload().await;
418 }
419 let _ = registry.unregister(&info.id).await;
420 }
421 }
422
423 WatchEventKind::Renamed { from, to } => {
424 if let Some(info) = registry.get_by_path(&from).await {
426 let _ = registry.unregister(&info.id).await;
427 }
428
429 if let Err(e) = Self::handle_load(
430 &to,
431 &loader,
432 ®istry,
433 &loaded_plugins,
434 &event_tx,
435 plugin_context.as_ref(),
436 ).await {
437 warn!("Failed to load renamed plugin: {}", e);
438 }
439 }
440 }
441 }
442
443 _ = tokio::time::sleep(Duration::from_millis(100)) => {
444 let now = std::time::Instant::now();
446 let ready: Vec<PathBuf> = pending_reloads
447 .iter()
448 .filter(|(_, time)| &now >= time)
449 .map(|(path, _)| path.clone())
450 .collect();
451
452 for path in ready {
453 pending_reloads.remove(&path);
454 Self::handle_reload(
455 &path,
456 &loader,
457 ®istry,
458 &state_manager,
459 &loaded_plugins,
460 &event_tx,
461 &config,
462 plugin_context.as_ref(),
463 ).await;
464 }
465 }
466
467 _ = shutdown_rx.recv() => {
468 info!("Event processor shutting down");
469 return;
470 }
471 }
472
473 if !*running.read().await {
475 return;
476 }
477 }
478 });
479 }
480
481 async fn handle_load(
483 path: &Path,
484 loader: &PluginLoader,
485 registry: &PluginRegistry,
486 loaded_plugins: &RwLock<HashMap<String, LoadedPlugin>>,
487 event_tx: &broadcast::Sender<ReloadEvent>,
488 context: Option<&PluginContext>,
489 ) -> Result<(), ReloadError> {
490 info!("Loading plugin from {:?}", path);
491
492 let library = loader.load_library(path).await?;
494
495 let mut dynamic_plugin = loader.create_plugin(path).await?;
497
498 if let Some(ctx) = context {
500 dynamic_plugin
501 .plugin_mut()
502 .load(ctx)
503 .await
504 .map_err(|e| ReloadError::InitError(e.to_string()))?;
505 dynamic_plugin
506 .plugin_mut()
507 .init_plugin()
508 .await
509 .map_err(|e| ReloadError::InitError(e.to_string()))?;
510 dynamic_plugin
511 .plugin_mut()
512 .start()
513 .await
514 .map_err(|e| ReloadError::InitError(e.to_string()))?;
515 }
516
517 let metadata = library.metadata();
518 let plugin_id = metadata.id.clone();
519
520 let mut info = PluginInfo::new(
522 &plugin_id,
523 &metadata.name,
524 PluginVersion::parse(&metadata.version).unwrap_or_default(),
525 )
526 .with_library_path(path)
527 .with_description(&metadata.description);
528
529 for cap in &metadata.capabilities {
530 info = info.with_capability(cap);
531 }
532 for dep in &metadata.dependencies {
533 info = info.with_dependency(dep);
534 }
535
536 info.mark_loaded();
537
538 registry
540 .register(info.clone())
541 .await
542 .map_err(ReloadError::Internal)?;
543
544 let entry = LoadedPlugin {
546 plugin: dynamic_plugin,
547 info,
548 reload_attempts: 0,
549 last_reload: None,
550 };
551
552 let mut plugins = loaded_plugins.write().await;
553 plugins.insert(plugin_id.clone(), entry);
554
555 let _ = event_tx.send(ReloadEvent::ReloadCompleted {
556 plugin_id,
557 path: path.to_path_buf(),
558 success: true,
559 duration: Duration::from_millis(0),
560 });
561
562 Ok(())
563 }
564
565 async fn handle_reload(
567 path: &Path,
568 loader: &PluginLoader,
569 registry: &PluginRegistry,
570 state_manager: &StateManager,
571 loaded_plugins: &RwLock<HashMap<String, LoadedPlugin>>,
572 event_tx: &broadcast::Sender<ReloadEvent>,
573 config: &HotReloadConfig,
574 context: Option<&PluginContext>,
575 ) {
576 let start = std::time::Instant::now();
577
578 let plugin_id = match registry.get_by_path(path).await {
580 Some(info) => info.id,
581 None => {
582 if let Err(e) =
584 Self::handle_load(path, loader, registry, loaded_plugins, event_tx, context)
585 .await
586 {
587 warn!("Failed to load plugin: {}", e);
588 }
589 return;
590 }
591 };
592
593 info!("Reloading plugin: {}", plugin_id);
594
595 let _ = event_tx.send(ReloadEvent::ReloadStarted {
596 plugin_id: plugin_id.clone(),
597 path: path.to_path_buf(),
598 });
599
600 let saved_state = if config.base.preserve_state {
602 let plugins = loaded_plugins.read().await;
603 if let Some(entry) = plugins.get(&plugin_id) {
604 let stats = entry.plugin.plugin().stats();
606 let mut snapshot = StateSnapshot::new(&plugin_id, &entry.info.version.to_string());
607 for (key, value) in stats {
608 snapshot.data.insert(key, value);
609 }
610 let _ = state_manager.save_snapshot(snapshot.clone()).await;
611
612 let _ = event_tx.send(ReloadEvent::StatePreserved {
613 plugin_id: plugin_id.clone(),
614 });
615
616 Some(snapshot)
617 } else {
618 None
619 }
620 } else {
621 None
622 };
623
624 {
626 let mut plugins = loaded_plugins.write().await;
627 if let Some(mut entry) = plugins.remove(&plugin_id) {
628 let _ = entry.plugin.plugin_mut().stop().await;
629 let _ = entry.plugin.plugin_mut().unload().await;
630 }
631 }
632
633 let library = match loader.reload_library(path).await {
635 Ok(lib) => lib,
636 Err(e) => {
637 error!("Failed to reload library: {}", e);
638
639 let _ = event_tx.send(ReloadEvent::ReloadFailed {
640 plugin_id: plugin_id.clone(),
641 path: path.to_path_buf(),
642 error: e.to_string(),
643 attempt: 1,
644 });
645
646 if config.base.auto_rollback
648 && let Some(_snapshot) = saved_state
649 {
650 let _ = event_tx.send(ReloadEvent::RollbackTriggered {
651 plugin_id: plugin_id.clone(),
652 reason: "Library reload failed".to_string(),
653 });
654 }
655
656 return;
657 }
658 };
659
660 let mut dynamic_plugin = match loader.create_plugin(path).await {
662 Ok(p) => p,
663 Err(e) => {
664 error!("Failed to create plugin instance: {}", e);
665
666 let _ = event_tx.send(ReloadEvent::ReloadFailed {
667 plugin_id: plugin_id.clone(),
668 path: path.to_path_buf(),
669 error: e.to_string(),
670 attempt: 1,
671 });
672
673 return;
674 }
675 };
676
677 if let Some(ctx) = context {
679 if let Err(e) = dynamic_plugin.plugin_mut().load(ctx).await {
680 error!("Failed to load plugin: {}", e);
681 return;
682 }
683 if let Err(e) = dynamic_plugin.plugin_mut().init_plugin().await {
684 error!("Failed to initialize plugin: {}", e);
685 return;
686 }
687 if let Err(e) = dynamic_plugin.plugin_mut().start().await {
688 error!("Failed to start plugin: {}", e);
689 return;
690 }
691 }
692
693 let metadata = library.metadata();
695 let mut info = PluginInfo::new(
696 &plugin_id,
697 &metadata.name,
698 PluginVersion::parse(&metadata.version).unwrap_or_default(),
699 )
700 .with_library_path(path);
701
702 info.mark_reloaded();
703
704 let _ = registry.update(info.clone()).await;
705
706 let entry = LoadedPlugin {
708 plugin: dynamic_plugin,
709 info,
710 reload_attempts: 0,
711 last_reload: Some(std::time::Instant::now()),
712 };
713
714 let mut plugins = loaded_plugins.write().await;
715 plugins.insert(plugin_id.clone(), entry);
716
717 if let Some(_snapshot) = saved_state {
719 let _ = event_tx.send(ReloadEvent::StateRestored {
720 plugin_id: plugin_id.clone(),
721 });
722 }
723
724 let duration = start.elapsed();
725 info!("Plugin {} reloaded in {:?}", plugin_id, duration);
726
727 let _ = event_tx.send(ReloadEvent::ReloadCompleted {
728 plugin_id,
729 path: path.to_path_buf(),
730 success: true,
731 duration,
732 });
733 }
734
735 pub async fn load_plugin<P: AsRef<Path>>(&self, path: P) -> Result<String, ReloadError> {
737 let path = path.as_ref();
738
739 Self::handle_load(
740 path,
741 &self.loader,
742 &self.registry,
743 &self.loaded_plugins,
744 &self.event_tx,
745 self.plugin_context.as_ref(),
746 )
747 .await?;
748
749 let info =
751 self.registry.get_by_path(path).await.ok_or_else(|| {
752 ReloadError::Internal("Plugin not registered after load".to_string())
753 })?;
754
755 Ok(info.id)
756 }
757
758 pub async fn unload_plugin(&self, plugin_id: &str) -> Result<(), ReloadError> {
760 info!("Unloading plugin: {}", plugin_id);
761
762 if self.config.base.preserve_state {
764 let plugins = self.loaded_plugins.read().await;
765 if let Some(entry) = plugins.get(plugin_id) {
766 let stats = entry.plugin.plugin().stats();
767 let mut snapshot = StateSnapshot::new(plugin_id, &entry.info.version.to_string());
768 for (key, value) in stats {
769 snapshot.data.insert(key, value);
770 }
771 let _ = self.state_manager.save_snapshot(snapshot).await;
772 }
773 }
774
775 let mut plugins = self.loaded_plugins.write().await;
777 if let Some(mut entry) = plugins.remove(plugin_id) {
778 entry
779 .plugin
780 .plugin_mut()
781 .stop()
782 .await
783 .map_err(|e| ReloadError::Internal(e.to_string()))?;
784 entry
785 .plugin
786 .plugin_mut()
787 .unload()
788 .await
789 .map_err(|e| ReloadError::Internal(e.to_string()))?;
790 } else {
791 return Err(ReloadError::PluginNotFound(plugin_id.to_string()));
792 }
793
794 self.registry
796 .unregister(plugin_id)
797 .await
798 .map_err(ReloadError::Internal)?;
799
800 Ok(())
801 }
802
803 pub async fn reload_plugin(&self, plugin_id: &str) -> Result<ReloadResult, ReloadError> {
805 let info = self
806 .registry
807 .get(plugin_id)
808 .await
809 .ok_or_else(|| ReloadError::PluginNotFound(plugin_id.to_string()))?;
810
811 let path = info
812 .library_path
813 .ok_or_else(|| ReloadError::Internal("No library path".to_string()))?;
814
815 let start = std::time::Instant::now();
816
817 Self::handle_reload(
818 &path,
819 &self.loader,
820 &self.registry,
821 &self.state_manager,
822 &self.loaded_plugins,
823 &self.event_tx,
824 &self.config,
825 self.plugin_context.as_ref(),
826 )
827 .await;
828
829 Ok(ReloadResult {
830 plugin_id: plugin_id.to_string(),
831 success: true,
832 error: None,
833 duration: start.elapsed(),
834 state_preserved: self.config.base.preserve_state,
835 attempts: 1,
836 })
837 }
838
839 pub async fn get_plugin_info(&self, plugin_id: &str) -> Option<PluginInfo> {
841 self.registry.get(plugin_id).await
842 }
843
844 pub async fn execute(&self, plugin_id: &str, input: String) -> PluginResult<String> {
846 let mut plugins = self.loaded_plugins.write().await;
847 let entry = plugins
848 .get_mut(plugin_id)
849 .ok_or_else(|| anyhow::anyhow!("Plugin {} not found", plugin_id))?;
850
851 entry.plugin.plugin_mut().execute(input).await
852 }
853
854 pub async fn list_plugins(&self) -> Vec<String> {
856 let plugins = self.loaded_plugins.read().await;
857 plugins.keys().cloned().collect()
858 }
859
860 pub async fn is_running(&self) -> bool {
862 *self.running.read().await
863 }
864}
865
866impl Default for HotReloadManager {
867 fn default() -> Self {
868 Self::new(HotReloadConfig::default())
869 }
870}