veda_rs/
runtime_manager.rs1use crate::config::Config;
4use crate::error::Result;
5use crate::runtime::Runtime;
6use std::sync::Arc;
7use std::thread;
8use std::time::Duration;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11#[cfg(feature = "telemetry")]
12use crate::telemetry::{Metrics, FeedbackController};
13#[cfg(feature = "telemetry")]
14use crate::telemetry::feedback::FeedbackConfig;
15
16pub struct RuntimeManager {
18 runtime: Arc<Runtime>,
19 #[cfg(feature = "telemetry")]
20 feedback_handle: Option<thread::JoinHandle<()>>,
21 #[cfg(feature = "telemetry")]
22 feedback_shutdown: Arc<AtomicBool>,
23}
24
25impl RuntimeManager {
26 pub fn new(config: Config) -> Result<Self> {
27 let runtime = Arc::new(Runtime::new(config)?);
28
29 #[cfg(feature = "telemetry")]
30 let (feedback_handle, feedback_shutdown) = {
31 let shutdown = Arc::new(AtomicBool::new(false));
32 let handle = if runtime.config().enable_telemetry {
33 let metrics = runtime.pool.metrics.clone();
34 let shutdown_clone = shutdown.clone();
35
36 let feedback_config = FeedbackConfig {
37 min_task_rate: 10.0,
38 max_latency_ns: 100_000_000, max_power_watts: None,
40 update_interval: Duration::from_millis(100),
41 history_size: 100,
42 };
43
44 let controller = FeedbackController::new(metrics, feedback_config);
45
46 Some(thread::Builder::new()
47 .name("veda-feedback".to_string())
48 .spawn(move || {
49 feedback_loop(controller, shutdown_clone);
50 })
51 .expect("Failed to spawn feedback thread"))
52 } else {
53 None
54 };
55 (handle, shutdown)
56 };
57
58 Ok(Self {
59 runtime,
60 #[cfg(feature = "telemetry")]
61 feedback_handle,
62 #[cfg(feature = "telemetry")]
63 feedback_shutdown,
64 })
65 }
66
67 pub fn runtime(&self) -> &Arc<Runtime> {
68 &self.runtime
69 }
70}
71
72impl Drop for RuntimeManager {
73 fn drop(&mut self) {
74 #[cfg(feature = "telemetry")]
75 {
76 self.feedback_shutdown.store(true, Ordering::Release);
77 if let Some(handle) = self.feedback_handle.take() {
78 let _ = handle.join();
79 }
80 }
81 }
82}
83
84#[cfg(feature = "telemetry")]
85fn feedback_loop(controller: FeedbackController, shutdown: Arc<AtomicBool>) {
86 use crate::util::{BackpressureController, BackpressureConfig};
87
88 let backpressure = BackpressureController::new(BackpressureConfig {
90 max_queue_size: 10_000,
91 target_latency_ms: 100,
92 rate_limit_per_sec: None,
93 backoff_factor: 0.5,
94 });
95
96 while !shutdown.load(Ordering::Acquire) {
97 let action = controller.update();
98
99 match action {
100 crate::telemetry::feedback::FeedbackAction::IncreaseParallelism { reason } => {
101 let current_max = backpressure.queue_size() * 2;
103 backpressure.set_max_queue_size(current_max.min(50_000));
104
105 if cfg!(debug_assertions) {
106 eprintln!("[VEDA Feedback] Increasing parallelism: {} (new max queue: {})",
107 reason, current_max);
108 }
109 }
110 crate::telemetry::feedback::FeedbackAction::ReduceLoad { reason } => {
111 let current_max = (backpressure.queue_size() * 3 / 4).max(1000);
113 backpressure.set_max_queue_size(current_max);
114
115 if cfg!(debug_assertions) {
116 eprintln!("[VEDA Feedback] Reducing load: {} (new max queue: {})",
117 reason, current_max);
118 }
119 }
120 crate::telemetry::feedback::FeedbackAction::OptimizeResources { reason } => {
121 if let Some(delta) = controller.compute_delta() {
123 if delta.utilization_change < -0.1 {
124 let current_max = (backpressure.queue_size() * 9 / 10).max(1000);
126 backpressure.set_max_queue_size(current_max);
127 }
128 }
129
130 if cfg!(debug_assertions) {
131 eprintln!("[VEDA Feedback] Optimizing resources: {}", reason);
132 }
133 }
134 crate::telemetry::feedback::FeedbackAction::None => {}
135 }
136
137 thread::sleep(Duration::from_millis(100));
138 }
139}