pub mod executor;
pub mod scheduler;
use crate::cache::{Cache, CacheConfig};
use crate::compression::{AdaptiveCompressor, CompressionLevel};
use crate::error::{EdgeError, Result};
use crate::resource::{HealthStatus, ResourceConstraints, ResourceManager};
use crate::sync::{SyncManager, SyncStrategy};
use futures::Future;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
pub use executor::Executor;
pub use scheduler::Scheduler;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RuntimeMode {
Online,
Offline,
Hybrid,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeConfig {
pub mode: RuntimeMode,
pub constraints: ResourceConstraints,
pub cache_config: CacheConfig,
pub compression_level: CompressionLevel,
pub sync_strategy: SyncStrategy,
pub data_dir: PathBuf,
pub enable_metrics: bool,
pub heartbeat_interval_secs: u64,
}
impl Default for EdgeConfig {
fn default() -> Self {
Self {
mode: RuntimeMode::Hybrid,
constraints: ResourceConstraints::default(),
cache_config: CacheConfig::default(),
compression_level: CompressionLevel::Balanced,
sync_strategy: SyncStrategy::Incremental,
data_dir: PathBuf::from(".oxigdal_edge"),
enable_metrics: true,
heartbeat_interval_secs: 60,
}
}
}
impl EdgeConfig {
pub fn minimal() -> Self {
Self {
mode: RuntimeMode::Offline,
constraints: ResourceConstraints::minimal(),
cache_config: CacheConfig::minimal(),
compression_level: CompressionLevel::Fast,
sync_strategy: SyncStrategy::Manual,
data_dir: PathBuf::from(".edge_minimal"),
enable_metrics: false,
heartbeat_interval_secs: 1, }
}
pub fn offline_first() -> Self {
Self {
mode: RuntimeMode::Offline,
constraints: ResourceConstraints::moderate(),
cache_config: CacheConfig::offline_first(),
compression_level: CompressionLevel::Balanced,
sync_strategy: SyncStrategy::Batch,
data_dir: PathBuf::from(".edge_offline"),
enable_metrics: true,
heartbeat_interval_secs: 5, }
}
pub fn validate(&self) -> Result<()> {
self.constraints.validate()?;
if self.heartbeat_interval_secs == 0 {
return Err(EdgeError::invalid_config(
"heartbeat_interval_secs must be > 0",
));
}
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeState {
Initializing,
Running,
Paused,
ShuttingDown,
Stopped,
}
pub struct EdgeRuntime {
config: EdgeConfig,
state: Arc<RwLock<RuntimeState>>,
resource_manager: Arc<ResourceManager>,
cache: Arc<Cache>,
compressor: Arc<AdaptiveCompressor>,
sync_manager: Option<Arc<SyncManager>>,
executor: Arc<Executor>,
scheduler: Arc<Scheduler>,
}
impl EdgeRuntime {
pub async fn new(config: EdgeConfig) -> Result<Self> {
config.validate()?;
std::fs::create_dir_all(&config.data_dir)
.map_err(|e| EdgeError::runtime(format!("Failed to create data dir: {}", e)))?;
let resource_manager = Arc::new(ResourceManager::new(config.constraints.clone())?);
let cache = Arc::new(Cache::new(config.cache_config.clone())?);
let compressor = Arc::new(AdaptiveCompressor::new(config.compression_level));
let sync_manager = if config.mode != RuntimeMode::Offline {
Some(Arc::new(SyncManager::new(
config.sync_strategy,
Arc::clone(&cache),
)?))
} else {
None
};
let executor = Arc::new(Executor::new(Arc::clone(&resource_manager)));
let scheduler = Arc::new(Scheduler::new(
Arc::clone(&resource_manager),
config.heartbeat_interval_secs,
));
Ok(Self {
config,
state: Arc::new(RwLock::new(RuntimeState::Initializing)),
resource_manager,
cache,
compressor,
sync_manager,
executor,
scheduler,
})
}
pub async fn start(&self) -> Result<()> {
{
let mut state = self.state.write();
if *state != RuntimeState::Initializing && *state != RuntimeState::Stopped {
return Err(EdgeError::runtime("Runtime already started"));
}
*state = RuntimeState::Running;
}
self.scheduler.start().await?;
if let Some(sync_manager) = &self.sync_manager {
sync_manager.start().await?;
}
Ok(())
}
pub async fn stop(&self) -> Result<()> {
{
let mut state = self.state.write();
*state = RuntimeState::ShuttingDown;
}
if let Some(sync_manager) = &self.sync_manager {
sync_manager.stop().await?;
}
self.scheduler.stop().await?;
{
let mut state = self.state.write();
*state = RuntimeState::Stopped;
}
Ok(())
}
pub async fn pause(&self) -> Result<()> {
let mut state = self.state.write();
if *state != RuntimeState::Running {
return Err(EdgeError::runtime("Runtime not running"));
}
*state = RuntimeState::Paused;
Ok(())
}
pub async fn resume(&self) -> Result<()> {
let mut state = self.state.write();
if *state != RuntimeState::Paused {
return Err(EdgeError::runtime("Runtime not paused"));
}
*state = RuntimeState::Running;
Ok(())
}
pub fn state(&self) -> RuntimeState {
*self.state.read()
}
pub async fn execute<F, T>(&self, task: F) -> Result<T>
where
F: Future<Output = Result<T>> + Send + 'static,
T: Send + 'static,
{
if *self.state.read() != RuntimeState::Running {
return Err(EdgeError::runtime("Runtime not running"));
}
self.executor.execute(task).await
}
pub fn cache(&self) -> &Arc<Cache> {
&self.cache
}
pub fn compressor(&self) -> &Arc<AdaptiveCompressor> {
&self.compressor
}
pub fn resource_manager(&self) -> &Arc<ResourceManager> {
&self.resource_manager
}
pub fn sync_manager(&self) -> Option<&Arc<SyncManager>> {
self.sync_manager.as_ref()
}
pub fn health(&self) -> HealthStatus {
self.resource_manager.health_check()
}
pub fn config(&self) -> &EdgeConfig {
&self.config
}
pub async fn wait_healthy(&self, timeout: Duration) -> Result<()> {
let start = std::time::Instant::now();
loop {
if self.health() == HealthStatus::Healthy {
return Ok(());
}
if start.elapsed() > timeout {
return Err(EdgeError::timeout("Health check timeout"));
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_runtime_creation() {
let config = EdgeConfig::minimal();
let runtime = EdgeRuntime::new(config).await;
assert!(runtime.is_ok());
}
#[tokio::test]
async fn test_runtime_lifecycle() -> Result<()> {
let config = EdgeConfig::minimal();
let runtime = EdgeRuntime::new(config).await?;
assert_eq!(runtime.state(), RuntimeState::Initializing);
runtime.start().await?;
assert_eq!(runtime.state(), RuntimeState::Running);
runtime.pause().await?;
assert_eq!(runtime.state(), RuntimeState::Paused);
runtime.resume().await?;
assert_eq!(runtime.state(), RuntimeState::Running);
runtime.stop().await?;
assert_eq!(runtime.state(), RuntimeState::Stopped);
Ok(())
}
#[tokio::test]
async fn test_runtime_execute() -> Result<()> {
let config = EdgeConfig::minimal();
let runtime = EdgeRuntime::new(config).await?;
runtime.start().await?;
let result = runtime.execute(async { Ok(42) }).await?;
assert_eq!(result, 42);
runtime.stop().await?;
Ok(())
}
#[tokio::test]
async fn test_runtime_health() -> Result<()> {
let config = EdgeConfig::minimal();
let runtime = EdgeRuntime::new(config).await?;
assert_eq!(runtime.health(), HealthStatus::Healthy);
Ok(())
}
#[tokio::test]
async fn test_config_validation() {
let mut config = EdgeConfig::minimal();
config.heartbeat_interval_secs = 0;
assert!(config.validate().is_err());
}
}