1pub mod executor;
4pub mod scheduler;
5
6use crate::cache::{Cache, CacheConfig};
7use crate::compression::{AdaptiveCompressor, CompressionLevel};
8use crate::error::{EdgeError, Result};
9use crate::resource::{HealthStatus, ResourceConstraints, ResourceManager};
10use crate::sync::{SyncManager, SyncStrategy};
11use futures::Future;
12use parking_lot::RwLock;
13use serde::{Deserialize, Serialize};
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17
18pub use executor::Executor;
19pub use scheduler::Scheduler;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23pub enum RuntimeMode {
24 Online,
26 Offline,
28 Hybrid,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct EdgeConfig {
35 pub mode: RuntimeMode,
37 pub constraints: ResourceConstraints,
39 pub cache_config: CacheConfig,
41 pub compression_level: CompressionLevel,
43 pub sync_strategy: SyncStrategy,
45 pub data_dir: PathBuf,
47 pub enable_metrics: bool,
49 pub heartbeat_interval_secs: u64,
51}
52
53impl Default for EdgeConfig {
54 fn default() -> Self {
55 Self {
56 mode: RuntimeMode::Hybrid,
57 constraints: ResourceConstraints::default(),
58 cache_config: CacheConfig::default(),
59 compression_level: CompressionLevel::Balanced,
60 sync_strategy: SyncStrategy::Incremental,
61 data_dir: PathBuf::from(".oxigdal_edge"),
62 enable_metrics: true,
63 heartbeat_interval_secs: 60,
64 }
65 }
66}
67
68impl EdgeConfig {
69 pub fn minimal() -> Self {
71 Self {
72 mode: RuntimeMode::Offline,
73 constraints: ResourceConstraints::minimal(),
74 cache_config: CacheConfig::minimal(),
75 compression_level: CompressionLevel::Fast,
76 sync_strategy: SyncStrategy::Manual,
77 data_dir: PathBuf::from(".edge_minimal"),
78 enable_metrics: false,
79 heartbeat_interval_secs: 1, }
81 }
82
83 pub fn offline_first() -> Self {
85 Self {
86 mode: RuntimeMode::Offline,
87 constraints: ResourceConstraints::moderate(),
88 cache_config: CacheConfig::offline_first(),
89 compression_level: CompressionLevel::Balanced,
90 sync_strategy: SyncStrategy::Batch,
91 data_dir: PathBuf::from(".edge_offline"),
92 enable_metrics: true,
93 heartbeat_interval_secs: 5, }
95 }
96
97 pub fn validate(&self) -> Result<()> {
99 self.constraints.validate()?;
100
101 if self.heartbeat_interval_secs == 0 {
102 return Err(EdgeError::invalid_config(
103 "heartbeat_interval_secs must be > 0",
104 ));
105 }
106
107 Ok(())
108 }
109}
110
/// Lifecycle state of an [`EdgeRuntime`], as reported by `EdgeRuntime::state`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RuntimeState {
    /// State of a freshly constructed runtime that has not been started yet.
    Initializing,
    /// Set by `start` and `resume`; the only state in which `execute` accepts tasks.
    Running,
    /// Set by `pause`; left again through `resume`.
    Paused,
    /// Set at the beginning of `stop`, while components are being stopped.
    ShuttingDown,
    /// Set once `stop` has completed successfully; `start` may be called again.
    Stopped,
}
125
126pub struct EdgeRuntime {
128 config: EdgeConfig,
129 state: Arc<RwLock<RuntimeState>>,
130 resource_manager: Arc<ResourceManager>,
131 cache: Arc<Cache>,
132 compressor: Arc<AdaptiveCompressor>,
133 sync_manager: Option<Arc<SyncManager>>,
134 executor: Arc<Executor>,
135 scheduler: Arc<Scheduler>,
136}
137
138impl EdgeRuntime {
139 pub async fn new(config: EdgeConfig) -> Result<Self> {
141 config.validate()?;
142
143 std::fs::create_dir_all(&config.data_dir)
145 .map_err(|e| EdgeError::runtime(format!("Failed to create data dir: {}", e)))?;
146
147 let resource_manager = Arc::new(ResourceManager::new(config.constraints.clone())?);
149
150 let cache = Arc::new(Cache::new(config.cache_config.clone())?);
152
153 let compressor = Arc::new(AdaptiveCompressor::new(config.compression_level));
155
156 let sync_manager = if config.mode != RuntimeMode::Offline {
158 Some(Arc::new(SyncManager::new(
159 config.sync_strategy,
160 Arc::clone(&cache),
161 )?))
162 } else {
163 None
164 };
165
166 let executor = Arc::new(Executor::new(Arc::clone(&resource_manager)));
168
169 let scheduler = Arc::new(Scheduler::new(
171 Arc::clone(&resource_manager),
172 config.heartbeat_interval_secs,
173 ));
174
175 Ok(Self {
176 config,
177 state: Arc::new(RwLock::new(RuntimeState::Initializing)),
178 resource_manager,
179 cache,
180 compressor,
181 sync_manager,
182 executor,
183 scheduler,
184 })
185 }
186
187 pub async fn start(&self) -> Result<()> {
189 {
190 let mut state = self.state.write();
191 if *state != RuntimeState::Initializing && *state != RuntimeState::Stopped {
192 return Err(EdgeError::runtime("Runtime already started"));
193 }
194 *state = RuntimeState::Running;
195 } self.scheduler.start().await?;
199
200 if let Some(sync_manager) = &self.sync_manager {
202 sync_manager.start().await?;
203 }
204
205 Ok(())
206 }
207
208 pub async fn stop(&self) -> Result<()> {
210 {
211 let mut state = self.state.write();
212 *state = RuntimeState::ShuttingDown;
213 } if let Some(sync_manager) = &self.sync_manager {
217 sync_manager.stop().await?;
218 }
219
220 self.scheduler.stop().await?;
222
223 {
224 let mut state = self.state.write();
225 *state = RuntimeState::Stopped;
226 } Ok(())
229 }
230
231 pub async fn pause(&self) -> Result<()> {
233 let mut state = self.state.write();
234 if *state != RuntimeState::Running {
235 return Err(EdgeError::runtime("Runtime not running"));
236 }
237
238 *state = RuntimeState::Paused;
239 Ok(())
240 }
241
242 pub async fn resume(&self) -> Result<()> {
244 let mut state = self.state.write();
245 if *state != RuntimeState::Paused {
246 return Err(EdgeError::runtime("Runtime not paused"));
247 }
248
249 *state = RuntimeState::Running;
250 Ok(())
251 }
252
253 pub fn state(&self) -> RuntimeState {
255 *self.state.read()
256 }
257
258 pub async fn execute<F, T>(&self, task: F) -> Result<T>
260 where
261 F: Future<Output = Result<T>> + Send + 'static,
262 T: Send + 'static,
263 {
264 if *self.state.read() != RuntimeState::Running {
265 return Err(EdgeError::runtime("Runtime not running"));
266 }
267
268 self.executor.execute(task).await
269 }
270
271 pub fn cache(&self) -> &Arc<Cache> {
273 &self.cache
274 }
275
276 pub fn compressor(&self) -> &Arc<AdaptiveCompressor> {
278 &self.compressor
279 }
280
281 pub fn resource_manager(&self) -> &Arc<ResourceManager> {
283 &self.resource_manager
284 }
285
286 pub fn sync_manager(&self) -> Option<&Arc<SyncManager>> {
288 self.sync_manager.as_ref()
289 }
290
291 pub fn health(&self) -> HealthStatus {
293 self.resource_manager.health_check()
294 }
295
296 pub fn config(&self) -> &EdgeConfig {
298 &self.config
299 }
300
301 pub async fn wait_healthy(&self, timeout: Duration) -> Result<()> {
303 let start = std::time::Instant::now();
304
305 loop {
306 if self.health() == HealthStatus::Healthy {
307 return Ok(());
308 }
309
310 if start.elapsed() > timeout {
311 return Err(EdgeError::timeout("Health check timeout"));
312 }
313
314 tokio::time::sleep(Duration::from_millis(100)).await;
315 }
316 }
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    /// A runtime can be constructed from the minimal preset.
    #[tokio::test]
    async fn test_runtime_creation() {
        let created = EdgeRuntime::new(EdgeConfig::minimal()).await;
        assert!(created.is_ok());
    }

    /// Walks the runtime through start, pause, resume and stop, checking the
    /// reported state after every transition.
    #[tokio::test]
    async fn test_runtime_lifecycle() -> Result<()> {
        let rt = EdgeRuntime::new(EdgeConfig::minimal()).await?;
        assert_eq!(rt.state(), RuntimeState::Initializing);

        rt.start().await?;
        assert_eq!(rt.state(), RuntimeState::Running);

        rt.pause().await?;
        assert_eq!(rt.state(), RuntimeState::Paused);

        rt.resume().await?;
        assert_eq!(rt.state(), RuntimeState::Running);

        rt.stop().await?;
        assert_eq!(rt.state(), RuntimeState::Stopped);

        Ok(())
    }

    /// A started runtime executes a task and hands back its value.
    #[tokio::test]
    async fn test_runtime_execute() -> Result<()> {
        let rt = EdgeRuntime::new(EdgeConfig::minimal()).await?;
        rt.start().await?;

        let answer = rt.execute(async { Ok(42) }).await?;
        assert_eq!(answer, 42);

        rt.stop().await?;
        Ok(())
    }

    /// A freshly built runtime reports itself as healthy.
    #[tokio::test]
    async fn test_runtime_health() -> Result<()> {
        let rt = EdgeRuntime::new(EdgeConfig::minimal()).await?;
        assert_eq!(rt.health(), HealthStatus::Healthy);
        Ok(())
    }

    /// A zero heartbeat interval is rejected by validation.
    #[tokio::test]
    async fn test_config_validation() {
        let mut cfg = EdgeConfig::minimal();
        cfg.heartbeat_interval_secs = 0;

        let verdict = cfg.validate();
        assert!(verdict.is_err());
    }
}