veda_rs/
runtime_manager.rs

//! Runtime manager that coordinates scheduler, telemetry, and feedback loop.
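//!
//! # Example
//!
//! A minimal usage sketch, not a verbatim API reference: the `veda_rs::…` paths
//! and the `Config::default()` constructor are assumed here for illustration;
//! substitute however `Config` is actually built in this crate.
//!
//! ```ignore
//! use veda_rs::config::Config;
//! use veda_rs::runtime_manager::RuntimeManager;
//!
//! let manager = RuntimeManager::new(Config::default())
//!     .expect("failed to build runtime");
//! let runtime = manager.runtime();
//! // ... submit work through `runtime` ...
//! // Dropping `manager` signals the feedback thread and joins it.
//! ```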

use crate::config::Config;
use crate::error::Result;
use crate::runtime::Runtime;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

#[cfg(feature = "telemetry")]
use crate::telemetry::FeedbackController;
#[cfg(feature = "telemetry")]
use crate::telemetry::feedback::FeedbackConfig;

/// Runtime manager that owns the [`Runtime`] and, when the `telemetry` feature
/// is enabled, drives adaptive scheduling through a background feedback thread.
pub struct RuntimeManager {
    runtime: Arc<Runtime>,
    #[cfg(feature = "telemetry")]
    feedback_handle: Option<thread::JoinHandle<()>>,
    #[cfg(feature = "telemetry")]
    feedback_shutdown: Arc<AtomicBool>,
}

impl RuntimeManager {
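    /// Builds the runtime and, if telemetry is enabled both at compile time
    /// (`telemetry` feature) and in the config, spawns the `veda-feedback`
    /// background thread.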
    pub fn new(config: Config) -> Result<Self> {
        let runtime = Arc::new(Runtime::new(config)?);

        #[cfg(feature = "telemetry")]
        let (feedback_handle, feedback_shutdown) = {
            let shutdown = Arc::new(AtomicBool::new(false));
            let handle = if runtime.config().enable_telemetry {
                let metrics = runtime.pool.metrics.clone();
                let shutdown_clone = shutdown.clone();

                let feedback_config = FeedbackConfig {
                    min_task_rate: 10.0,
                    max_latency_ns: 100_000_000, // 100ms
                    max_power_watts: None,
                    update_interval: Duration::from_millis(100),
                    history_size: 100,
                };

                let controller = FeedbackController::new(metrics, feedback_config);

                Some(
                    thread::Builder::new()
                        .name("veda-feedback".to_string())
                        .spawn(move || {
                            feedback_loop(controller, shutdown_clone);
                        })
                        .expect("Failed to spawn feedback thread"),
                )
            } else {
                None
            };
            (handle, shutdown)
        };

        Ok(Self {
            runtime,
            #[cfg(feature = "telemetry")]
            feedback_handle,
            #[cfg(feature = "telemetry")]
            feedback_shutdown,
        })
    }

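    /// Returns a shared handle to the underlying [`Runtime`].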
    pub fn runtime(&self) -> &Arc<Runtime> {
        &self.runtime
    }
}

impl Drop for RuntimeManager {
    fn drop(&mut self) {
        #[cfg(feature = "telemetry")]
        {
            // Signal the feedback thread to stop, then join it so it never
            // outlives the runtime it observes.
            self.feedback_shutdown.store(true, Ordering::Release);
            if let Some(handle) = self.feedback_handle.take() {
                let _ = handle.join();
            }
        }
    }
}

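/// Polls the feedback controller and adjusts backpressure limits until
/// `shutdown` is flagged by `RuntimeManager`'s `Drop` impl.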
#[cfg(feature = "telemetry")]
fn feedback_loop(controller: FeedbackController, shutdown: Arc<AtomicBool>) {
    use crate::telemetry::feedback::FeedbackAction;
    use crate::util::{BackpressureController, BackpressureConfig};

    // Create a backpressure controller for adaptive rate limiting.
    let backpressure = BackpressureController::new(BackpressureConfig {
        max_queue_size: 10_000,
        target_latency_ms: 100,
        rate_limit_per_sec: None,
        backoff_factor: 0.5,
    });

    while !shutdown.load(Ordering::Acquire) {
        let action = controller.update();

        match action {
            FeedbackAction::IncreaseParallelism { reason } => {
                // Increase the max queue size to allow more parallel work; the
                // floor guards against an empty queue collapsing the limit to zero.
                let new_max = (backpressure.queue_size() * 2).clamp(1_000, 50_000);
                backpressure.set_max_queue_size(new_max);

                if cfg!(debug_assertions) {
                    eprintln!(
                        "[VEDA Feedback] Increasing parallelism: {} (new max queue: {})",
                        reason, new_max
                    );
                }
            }
            FeedbackAction::ReduceLoad { reason } => {
                // Reduce the max queue size to throttle admission.
                let new_max = (backpressure.queue_size() * 3 / 4).max(1_000);
                backpressure.set_max_queue_size(new_max);

                if cfg!(debug_assertions) {
                    eprintln!(
                        "[VEDA Feedback] Reducing load: {} (new max queue: {})",
                        reason, new_max
                    );
                }
            }
            FeedbackAction::OptimizeResources { reason } => {
                // Use the metrics delta to decide which direction to adjust.
                if let Some(delta) = controller.compute_delta() {
                    if delta.utilization_change < -0.1 {
                        // Utilization is dropping; tighten the queue slightly.
                        let new_max = (backpressure.queue_size() * 9 / 10).max(1_000);
                        backpressure.set_max_queue_size(new_max);
                    }
                }

                if cfg!(debug_assertions) {
                    eprintln!("[VEDA Feedback] Optimizing resources: {}", reason);
                }
            }
            FeedbackAction::None => {}
        }

        thread::sleep(Duration::from_millis(100));
    }
}