blvm_sdk/composition/
lifecycle.rs1use crate::composition::registry::ModuleRegistry;
6use crate::composition::types::*;
7use blvm_node::module::manager::ModuleManager;
8use blvm_node::module::traits::{
9 ModuleMetadata as RefModuleMetadata, ModuleState as RefModuleState,
10};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15fn module_state_to_status(s: RefModuleState) -> ModuleStatus {
16 match s {
17 RefModuleState::Running => ModuleStatus::Running,
18 RefModuleState::Stopped => ModuleStatus::Stopped,
19 RefModuleState::Initializing => ModuleStatus::Initializing,
20 RefModuleState::Stopping => ModuleStatus::Stopping,
21 RefModuleState::Error(msg) => ModuleStatus::Error(msg),
22 }
23}
24
25pub struct ModuleLifecycle {
27 pub(crate) registry: ModuleRegistry,
29 module_manager: Option<Arc<Mutex<ModuleManager>>>,
31 status_cache: HashMap<String, ModuleStatus>,
33}
34
35impl ModuleLifecycle {
36 pub fn new(registry: ModuleRegistry) -> Self {
38 Self {
39 registry,
40 module_manager: None,
41 status_cache: HashMap::new(),
42 }
43 }
44
45 pub fn with_module_manager(mut self, manager: Arc<Mutex<ModuleManager>>) -> Self {
47 self.module_manager = Some(manager);
48 self
49 }
50
51 pub async fn start_module(
53 &mut self,
54 name: &str,
55 config: Option<&HashMap<String, serde_json::Value>>,
56 ) -> Result<()> {
57 let info = self.registry.get_module(name, None)?;
58
59 let config_map: HashMap<String, String> = config
60 .map(|c| {
61 c.iter()
62 .map(|(k, v)| {
63 let s = match v {
64 serde_json::Value::String(s) => s.clone(),
65 _ => v.to_string(),
66 };
67 (k.clone(), s)
68 })
69 .collect()
70 })
71 .unwrap_or_default();
72
73 if let Some(ref manager) = self.module_manager {
74 let metadata: RefModuleMetadata = info.clone().into();
75
76 let binary_path = info.binary_path.as_ref().ok_or_else(|| {
77 CompositionError::ModuleNotFound(format!("Module {name} has no binary path"))
78 })?;
79
80 let mut mgr = manager.lock().await;
81 mgr.load_module(&info.name, binary_path, metadata, config_map)
82 .await
83 .map_err(CompositionError::from)?;
84
85 self.status_cache
86 .insert(name.to_string(), ModuleStatus::Running);
87 } else {
88 self.status_cache
89 .insert(name.to_string(), ModuleStatus::Running);
90 }
91
92 Ok(())
93 }
94
95 pub async fn stop_module(&mut self, name: &str) -> Result<()> {
97 let _info = self.registry.get_module(name, None)?;
98
99 if let Some(ref manager) = self.module_manager {
100 let mut mgr = manager.lock().await;
101 mgr.unload_module(name)
102 .await
103 .map_err(CompositionError::from)?;
104 }
105
106 self.status_cache
107 .insert(name.to_string(), ModuleStatus::Stopped);
108 Ok(())
109 }
110
111 pub async fn restart_module(
113 &mut self,
114 name: &str,
115 config: Option<&HashMap<String, serde_json::Value>>,
116 ) -> Result<()> {
117 self.stop_module(name).await?;
118 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
119 self.start_module(name, config).await
120 }
121
122 pub async fn get_module_status(&self, name: &str) -> Result<ModuleStatus> {
124 let _ = self.registry.get_module(name, None)?;
125
126 if let Some(ref manager) = self.module_manager {
127 if let Some(state) = manager.lock().await.get_module_state(name).await {
128 return Ok(module_state_to_status(state));
129 }
130 }
131
132 Ok(self
133 .status_cache
134 .get(name)
135 .cloned()
136 .unwrap_or(ModuleStatus::NotInstalled))
137 }
138
139 pub async fn health_check(&self, name: &str) -> Result<ModuleHealth> {
141 let status = self.get_module_status(name).await?;
142 match status {
143 ModuleStatus::Running => Ok(ModuleHealth::Healthy),
144 ModuleStatus::Error(msg) => Ok(ModuleHealth::Unhealthy(msg)),
145 ModuleStatus::Stopped | ModuleStatus::NotInstalled => Ok(ModuleHealth::Unknown),
146 _ => Ok(ModuleHealth::Degraded),
147 }
148 }
149
150 pub fn registry(&self) -> &ModuleRegistry {
152 &self.registry
153 }
154
155 pub fn registry_mut(&mut self) -> &mut ModuleRegistry {
157 &mut self.registry
158 }
159}