use crate::error::{EdgeError, Result};
use crate::resource::ResourceManager;
use parking_lot::RwLock;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::task::JoinHandle;
pub type ScheduledTask = Box<dyn Fn() -> Result<()> + Send + Sync>;
pub struct Scheduler {
resource_manager: Arc<ResourceManager>,
heartbeat_interval: Duration,
running: Arc<AtomicBool>,
handle: Arc<RwLock<Option<JoinHandle<()>>>>,
}
impl Scheduler {
pub fn new(resource_manager: Arc<ResourceManager>, heartbeat_interval_secs: u64) -> Self {
Self {
resource_manager,
heartbeat_interval: Duration::from_secs(heartbeat_interval_secs),
running: Arc::new(AtomicBool::new(false)),
handle: Arc::new(RwLock::new(None)),
}
}
pub async fn start(&self) -> Result<()> {
if self.running.load(Ordering::Relaxed) {
return Err(EdgeError::runtime("Scheduler already running"));
}
self.running.store(true, Ordering::Relaxed);
let resource_manager = Arc::clone(&self.resource_manager);
let heartbeat_interval = self.heartbeat_interval;
let running = Arc::clone(&self.running);
let handle = tokio::spawn(async move {
while running.load(Ordering::Relaxed) {
Self::heartbeat(&resource_manager);
tokio::time::sleep(heartbeat_interval).await;
}
});
let mut handle_lock = self.handle.write();
*handle_lock = Some(handle);
Ok(())
}
pub async fn stop(&self) -> Result<()> {
if !self.running.load(Ordering::Relaxed) {
return Ok(());
}
self.running.store(false, Ordering::Relaxed);
let handle = {
let mut handle_lock = self.handle.write();
handle_lock.take()
};
if let Some(handle) = handle {
let timeout_duration = Duration::from_secs(5);
match tokio::time::timeout(timeout_duration, handle).await {
Ok(_) => {}
Err(_) => {
tracing::warn!("Scheduler stop timed out after {:?}", timeout_duration);
}
}
}
Ok(())
}
fn heartbeat(resource_manager: &ResourceManager) {
let cpu_usage = Self::sample_cpu();
resource_manager.record_cpu_sample(cpu_usage);
let metrics = resource_manager.metrics();
tracing::debug!(
memory_bytes = metrics.memory_bytes,
cpu_percent = metrics.cpu_percent,
active_ops = metrics.active_operations,
"Heartbeat"
);
}
fn sample_cpu() -> f64 {
0.0
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::Relaxed)
}
}
impl Drop for Scheduler {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::resource::ResourceConstraints;
#[tokio::test]
async fn test_scheduler_lifecycle() -> Result<()> {
let constraints = ResourceConstraints::minimal();
let manager = Arc::new(ResourceManager::new(constraints)?);
let scheduler = Scheduler::new(manager, 1);
assert!(!scheduler.is_running());
scheduler.start().await?;
assert!(scheduler.is_running());
tokio::time::sleep(Duration::from_millis(100)).await;
scheduler.stop().await?;
assert!(!scheduler.is_running());
Ok(())
}
#[tokio::test]
async fn test_scheduler_heartbeat() -> Result<()> {
let constraints = ResourceConstraints::minimal();
let manager = Arc::new(ResourceManager::new(constraints)?);
let scheduler = Scheduler::new(manager, 1);
scheduler.start().await?;
tokio::time::sleep(Duration::from_millis(250)).await; scheduler.stop().await?;
Ok(())
}
}