Skip to main content

oxigdal_edge/runtime/
mod.rs

1//! Lightweight edge runtime for resource-constrained devices
2
3pub mod executor;
4pub mod scheduler;
5
6use crate::cache::{Cache, CacheConfig};
7use crate::compression::{AdaptiveCompressor, CompressionLevel};
8use crate::error::{EdgeError, Result};
9use crate::resource::{HealthStatus, ResourceConstraints, ResourceManager};
10use crate::sync::{SyncManager, SyncStrategy};
11use futures::Future;
12use parking_lot::RwLock;
13use serde::{Deserialize, Serialize};
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18pub use executor::Executor;
19pub use scheduler::Scheduler;
20
21/// Runtime operation mode
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23pub enum RuntimeMode {
24    /// Online mode with cloud connectivity
25    Online,
26    /// Offline mode with local processing only
27    Offline,
28    /// Hybrid mode that adapts based on connectivity
29    Hybrid,
30}
31
32/// Edge runtime configuration
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct EdgeConfig {
35    /// Runtime mode
36    pub mode: RuntimeMode,
37    /// Resource constraints
38    pub constraints: ResourceConstraints,
39    /// Cache configuration
40    pub cache_config: CacheConfig,
41    /// Compression level
42    pub compression_level: CompressionLevel,
43    /// Sync strategy
44    pub sync_strategy: SyncStrategy,
45    /// Data directory
46    pub data_dir: PathBuf,
47    /// Enable metrics collection
48    pub enable_metrics: bool,
49    /// Heartbeat interval in seconds
50    pub heartbeat_interval_secs: u64,
51}
52
53impl Default for EdgeConfig {
54    fn default() -> Self {
55        Self {
56            mode: RuntimeMode::Hybrid,
57            constraints: ResourceConstraints::default(),
58            cache_config: CacheConfig::default(),
59            compression_level: CompressionLevel::Balanced,
60            sync_strategy: SyncStrategy::Incremental,
61            data_dir: PathBuf::from(".oxigdal_edge"),
62            enable_metrics: true,
63            heartbeat_interval_secs: 60,
64        }
65    }
66}
67
68impl EdgeConfig {
69    /// Create minimal configuration for embedded devices
70    pub fn minimal() -> Self {
71        Self {
72            mode: RuntimeMode::Offline,
73            constraints: ResourceConstraints::minimal(),
74            cache_config: CacheConfig::minimal(),
75            compression_level: CompressionLevel::Fast,
76            sync_strategy: SyncStrategy::Manual,
77            data_dir: PathBuf::from(".edge_minimal"),
78            enable_metrics: false,
79            heartbeat_interval_secs: 1, // 1 second for faster tests
80        }
81    }
82
83    /// Create configuration for offline-first mode
84    pub fn offline_first() -> Self {
85        Self {
86            mode: RuntimeMode::Offline,
87            constraints: ResourceConstraints::moderate(),
88            cache_config: CacheConfig::offline_first(),
89            compression_level: CompressionLevel::Balanced,
90            sync_strategy: SyncStrategy::Batch,
91            data_dir: PathBuf::from(".edge_offline"),
92            enable_metrics: true,
93            heartbeat_interval_secs: 5, // 5 seconds for faster tests
94        }
95    }
96
97    /// Validate configuration
98    pub fn validate(&self) -> Result<()> {
99        self.constraints.validate()?;
100
101        if self.heartbeat_interval_secs == 0 {
102            return Err(EdgeError::invalid_config(
103                "heartbeat_interval_secs must be > 0",
104            ));
105        }
106
107        Ok(())
108    }
109}
110
111/// Edge runtime state
112#[derive(Debug, Clone, Copy, PartialEq, Eq)]
113pub enum RuntimeState {
114    /// Runtime is initializing
115    Initializing,
116    /// Runtime is running
117    Running,
118    /// Runtime is paused
119    Paused,
120    /// Runtime is shutting down
121    ShuttingDown,
122    /// Runtime is stopped
123    Stopped,
124}
125
126/// Edge runtime for resource-constrained devices
127pub struct EdgeRuntime {
128    config: EdgeConfig,
129    state: Arc<RwLock<RuntimeState>>,
130    resource_manager: Arc<ResourceManager>,
131    cache: Arc<Cache>,
132    compressor: Arc<AdaptiveCompressor>,
133    sync_manager: Option<Arc<SyncManager>>,
134    executor: Arc<Executor>,
135    scheduler: Arc<Scheduler>,
136}
137
138impl EdgeRuntime {
139    /// Create new edge runtime
140    pub async fn new(config: EdgeConfig) -> Result<Self> {
141        config.validate()?;
142
143        // Create data directory
144        std::fs::create_dir_all(&config.data_dir)
145            .map_err(|e| EdgeError::runtime(format!("Failed to create data dir: {}", e)))?;
146
147        // Initialize resource manager
148        let resource_manager = Arc::new(ResourceManager::new(config.constraints.clone())?);
149
150        // Initialize cache
151        let cache = Arc::new(Cache::new(config.cache_config.clone())?);
152
153        // Initialize compressor
154        let compressor = Arc::new(AdaptiveCompressor::new(config.compression_level));
155
156        // Initialize sync manager if not in offline mode
157        let sync_manager = if config.mode != RuntimeMode::Offline {
158            Some(Arc::new(SyncManager::new(
159                config.sync_strategy,
160                Arc::clone(&cache),
161            )?))
162        } else {
163            None
164        };
165
166        // Initialize executor
167        let executor = Arc::new(Executor::new(Arc::clone(&resource_manager)));
168
169        // Initialize scheduler
170        let scheduler = Arc::new(Scheduler::new(
171            Arc::clone(&resource_manager),
172            config.heartbeat_interval_secs,
173        ));
174
175        Ok(Self {
176            config,
177            state: Arc::new(RwLock::new(RuntimeState::Initializing)),
178            resource_manager,
179            cache,
180            compressor,
181            sync_manager,
182            executor,
183            scheduler,
184        })
185    }
186
187    /// Start the runtime
188    pub async fn start(&self) -> Result<()> {
189        {
190            let mut state = self.state.write();
191            if *state != RuntimeState::Initializing && *state != RuntimeState::Stopped {
192                return Err(EdgeError::runtime("Runtime already started"));
193            }
194            *state = RuntimeState::Running;
195        } // Guard dropped here automatically
196
197        // Start scheduler
198        self.scheduler.start().await?;
199
200        // Start sync manager if available
201        if let Some(sync_manager) = &self.sync_manager {
202            sync_manager.start().await?;
203        }
204
205        Ok(())
206    }
207
208    /// Stop the runtime
209    pub async fn stop(&self) -> Result<()> {
210        {
211            let mut state = self.state.write();
212            *state = RuntimeState::ShuttingDown;
213        } // Guard dropped here automatically
214
215        // Stop sync manager
216        if let Some(sync_manager) = &self.sync_manager {
217            sync_manager.stop().await?;
218        }
219
220        // Stop scheduler
221        self.scheduler.stop().await?;
222
223        {
224            let mut state = self.state.write();
225            *state = RuntimeState::Stopped;
226        } // Guard dropped here automatically
227
228        Ok(())
229    }
230
231    /// Pause the runtime
232    pub async fn pause(&self) -> Result<()> {
233        let mut state = self.state.write();
234        if *state != RuntimeState::Running {
235            return Err(EdgeError::runtime("Runtime not running"));
236        }
237
238        *state = RuntimeState::Paused;
239        Ok(())
240    }
241
242    /// Resume the runtime
243    pub async fn resume(&self) -> Result<()> {
244        let mut state = self.state.write();
245        if *state != RuntimeState::Paused {
246            return Err(EdgeError::runtime("Runtime not paused"));
247        }
248
249        *state = RuntimeState::Running;
250        Ok(())
251    }
252
253    /// Get current runtime state
254    pub fn state(&self) -> RuntimeState {
255        *self.state.read()
256    }
257
258    /// Execute a task
259    pub async fn execute<F, T>(&self, task: F) -> Result<T>
260    where
261        F: Future<Output = Result<T>> + Send + 'static,
262        T: Send + 'static,
263    {
264        if *self.state.read() != RuntimeState::Running {
265            return Err(EdgeError::runtime("Runtime not running"));
266        }
267
268        self.executor.execute(task).await
269    }
270
271    /// Get cache reference
272    pub fn cache(&self) -> &Arc<Cache> {
273        &self.cache
274    }
275
276    /// Get compressor reference
277    pub fn compressor(&self) -> &Arc<AdaptiveCompressor> {
278        &self.compressor
279    }
280
281    /// Get resource manager reference
282    pub fn resource_manager(&self) -> &Arc<ResourceManager> {
283        &self.resource_manager
284    }
285
286    /// Get sync manager reference
287    pub fn sync_manager(&self) -> Option<&Arc<SyncManager>> {
288        self.sync_manager.as_ref()
289    }
290
291    /// Check health status
292    pub fn health(&self) -> HealthStatus {
293        self.resource_manager.health_check()
294    }
295
296    /// Get configuration
297    pub fn config(&self) -> &EdgeConfig {
298        &self.config
299    }
300
301    /// Wait for healthy state
302    pub async fn wait_healthy(&self, timeout: Duration) -> Result<()> {
303        let start = std::time::Instant::now();
304
305        loop {
306            if self.health() == HealthStatus::Healthy {
307                return Ok(());
308            }
309
310            if start.elapsed() > timeout {
311                return Err(EdgeError::timeout("Health check timeout"));
312            }
313
314            tokio::time::sleep(Duration::from_millis(100)).await;
315        }
316    }
317}
318
319#[cfg(test)]
320mod tests {
321    use super::*;
322
323    #[tokio::test]
324    async fn test_runtime_creation() {
325        let config = EdgeConfig::minimal();
326        let runtime = EdgeRuntime::new(config).await;
327        assert!(runtime.is_ok());
328    }
329
330    #[tokio::test]
331    async fn test_runtime_lifecycle() -> Result<()> {
332        let config = EdgeConfig::minimal();
333        let runtime = EdgeRuntime::new(config).await?;
334
335        assert_eq!(runtime.state(), RuntimeState::Initializing);
336
337        runtime.start().await?;
338        assert_eq!(runtime.state(), RuntimeState::Running);
339
340        runtime.pause().await?;
341        assert_eq!(runtime.state(), RuntimeState::Paused);
342
343        runtime.resume().await?;
344        assert_eq!(runtime.state(), RuntimeState::Running);
345
346        runtime.stop().await?;
347        assert_eq!(runtime.state(), RuntimeState::Stopped);
348
349        Ok(())
350    }
351
352    #[tokio::test]
353    async fn test_runtime_execute() -> Result<()> {
354        let config = EdgeConfig::minimal();
355        let runtime = EdgeRuntime::new(config).await?;
356        runtime.start().await?;
357
358        let result = runtime.execute(async { Ok(42) }).await?;
359
360        assert_eq!(result, 42);
361
362        runtime.stop().await?;
363        Ok(())
364    }
365
366    #[tokio::test]
367    async fn test_runtime_health() -> Result<()> {
368        let config = EdgeConfig::minimal();
369        let runtime = EdgeRuntime::new(config).await?;
370
371        assert_eq!(runtime.health(), HealthStatus::Healthy);
372
373        Ok(())
374    }
375
376    #[tokio::test]
377    async fn test_config_validation() {
378        let mut config = EdgeConfig::minimal();
379        config.heartbeat_interval_secs = 0;
380
381        assert!(config.validate().is_err());
382    }
383}