use crate::queue::{QueueError, VibeQueue};
use crate::signals::{NodeId, SystemSignal};
use crate::task::{Task, TaskExecutionOutcome};
use crate::types::{LocalStats, NodeError};
use crate::utils::elapsed_ns;
use std::sync::{
    Arc, Mutex,
    atomic::{AtomicBool, AtomicUsize, Ordering},
    mpsc,
};
use std::thread::{self, JoinHandle};
use std::time::Duration;

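/// Coarse load bands derived from a node's pressure value (`0..=100`):
/// `Empty` (0), `Low` (1..=25), `Normal` (26..=75), `High` (76..=99),
/// and `Full` (100).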
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PressureLevel {
    Empty,
    Low,
    Normal,
    High,
    Full,
}

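/// A single work node: a bounded task queue drained by a self-scaling pool of
/// worker threads. Workers are spawned up to `max_threads` when pressure hits
/// the `High`/`Full` band or the queue rejects a task, and retire themselves
/// back down toward `min_threads` once pressure stays `Low`/`Empty` past the
/// scale-down cooldown. Activity is reported over `signal_tx` as
/// `SystemSignal`s.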
pub struct VibeNode {
    /// Identifier of this node, used when emitting signals.
    pub node_id: NodeId,
    /// Bounded queue of pending tasks.
    pub task_queue: VibeQueue<Task>,
    /// Join handles for all worker threads spawned by this node.
    worker_threads_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Number of worker threads currently running.
    active_thread_count: Arc<AtomicUsize>,
    /// Lower bound on the worker pool size.
    pub min_threads: usize,
    /// Upper bound on the worker pool size.
    pub max_threads: usize,
    /// Channel used to report `SystemSignal`s.
    signal_tx: mpsc::Sender<SystemSignal>,
    /// Per-node counters (submissions, outcomes, durations).
    local_stats: Arc<LocalStats>,
    /// Set once shutdown begins; workers exit when they observe it.
    is_shutting_down: Arc<AtomicBool>,
    /// Target worker count; workers retire until the active count matches it.
    desired_thread_count: Arc<AtomicUsize>,
    /// Timestamp (ns) of the last scale-up or scale-down.
    last_scaling_time: Arc<Mutex<u64>>,
    /// Timestamp (ns) of the last overload event (high pressure or full queue).
    last_self_overload_time: Arc<Mutex<u64>>,
    /// Cooldown (ns) required after scaling or overload before scaling down.
    pub scale_down_cooldown: u64,
    /// Cached pressure value in `0..=100`, updated on queue activity.
    pressure: Arc<AtomicUsize>,
}

impl VibeNode {
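    /// Creates a node with a queue of `queue_capacity` slots and immediately
    /// spawns `min_threads` workers. Returns an error if `min_threads` is 0 or
    /// `max_threads < min_threads`. `scale_down_cooldown_override` is in
    /// nanoseconds and defaults to 5 seconds.
    ///
    /// Illustrative sketch only (not compiled): it assumes `NodeId` can be
    /// constructed as a public tuple struct, which this module does not show.
    ///
    /// ```ignore
    /// let (tx, _rx) = std::sync::mpsc::channel();
    /// let node = VibeNode::new(NodeId(0), 128, 2, 8, tx, None)?;
    /// assert_eq!(node.active_threads(), 2);
    /// node.shutdown();
    /// ```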
    pub fn new(
        node_id: NodeId,
        queue_capacity: usize,
        min_threads: usize,
        max_threads: usize,
        signal_tx: mpsc::Sender<SystemSignal>,
        scale_down_cooldown_override: Option<u64>,
    ) -> Result<Self, String> {
        if min_threads == 0 {
            return Err("min_threads cannot be 0".to_string());
        }
        if max_threads < min_threads {
            return Err("max_threads cannot be less than min_threads".to_string());
        }

        let task_queue = VibeQueue::new_with_signal(node_id, queue_capacity, signal_tx.clone());
        let pressure_arc = Arc::new(AtomicUsize::new(0));
        const NANOS_PER_SEC: u64 = 1_000_000_000;

        let node = Self {
            node_id,
            task_queue,
            worker_threads_handles: Arc::new(Mutex::new(Vec::new())),
            active_thread_count: Arc::new(AtomicUsize::new(0)),
            min_threads,
            max_threads,
            signal_tx,
            local_stats: Arc::new(LocalStats::new()),
            is_shutting_down: Arc::new(AtomicBool::new(false)),
            desired_thread_count: Arc::new(AtomicUsize::new(min_threads)),
            last_scaling_time: Arc::new(Mutex::new(elapsed_ns())),
            last_self_overload_time: Arc::new(Mutex::new(
                elapsed_ns().saturating_sub(3600 * NANOS_PER_SEC),
            )),
            scale_down_cooldown: scale_down_cooldown_override.unwrap_or(5 * NANOS_PER_SEC),
            pressure: pressure_arc,
        };

        for _ in 0..node.min_threads {
            node.spawn_worker_thread(false);
        }
        node.update_pressure();

        Ok(node)
    }

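    /// Recomputes the cached pressure as `queue_len / queue_capacity * 100`,
    /// clamped to `0..=100`. If there are no active workers or the capacity is
    /// zero, pressure is 100 whenever anything is queued and 0 otherwise.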
    fn update_pressure(&self) {
        let q = self.task_queue.len() as f64;
        let c = self.active_thread_count.load(Ordering::Relaxed) as f64;
        let k = self.task_queue.capacity() as f64;

        let pressure_float = if c > 0.0 && k > 0.0 {
            (q / k) * 100.0
        } else if q > 0.0 {
            100.0
        } else {
            0.0
        };
        self.pressure
            .store(pressure_float.clamp(0.0, 100.0) as usize, Ordering::Relaxed);
    }

    /// Returns the most recently computed pressure value in `0..=100`.
    pub fn get_pressure(&self) -> usize {
        self.pressure.load(Ordering::Relaxed)
    }

    /// The upper bound of the pressure scale.
    pub fn max_pressure(&self) -> usize {
        100
    }

    /// Maps the current pressure value onto a [`PressureLevel`] band.
    pub fn get_pressure_level(&self) -> PressureLevel {
        match self.get_pressure() {
            0 => PressureLevel::Empty,
            1..=25 => PressureLevel::Low,
            26..=75 => PressureLevel::Normal,
            76..=99 => PressureLevel::High,
            _ => PressureLevel::Full,
        }
    }

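    /// Spawns one additional worker thread unless the node is already at
    /// `max_threads` or shutting down. Bumps the active and desired thread
    /// counts, records the scaling time (and the overload time if the spawn
    /// was triggered by overload), and stores the join handle.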
    fn spawn_worker_thread(&self, triggered_by_overload: bool) {
        if self.active_threads() >= self.max_threads {
            return;
        }
        if self.is_shutting_down.load(Ordering::Relaxed) {
            return;
        }

        self.active_thread_count.fetch_add(1, Ordering::SeqCst);
        self.desired_thread_count
            .store(self.active_threads(), Ordering::SeqCst);

        let now = elapsed_ns();
        *self
            .last_scaling_time
            .lock()
            .expect("Mutex should not be poisoned") = now;
        if triggered_by_overload {
            *self
                .last_self_overload_time
                .lock()
                .expect("Mutex should not be poisoned") = now;
        }

        self.update_pressure();

        let worker_context = self.clone_for_worker();
        let handle = thread::Builder::new()
            .name(format!(
                "vibe-node-{}-worker-{}",
                self.node_id.0,
                self.active_threads()
            ))
            .spawn(move || worker_context.run_loop())
            .expect("Failed to spawn worker thread");

        self.worker_threads_handles
            .lock()
            .expect("Mutex should not be poisoned")
            .push(handle);
    }

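    /// Enqueues a task on this node. On success the pressure is refreshed and,
    /// if it lands in the `High` or `Full` band, another worker spawn is
    /// attempted. If the queue is full, the overload timestamp is updated, a
    /// worker spawn is still attempted, and `NodeError::QueueFull` is returned
    /// to the caller.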
    pub(crate) fn submit_task(&self, task: Task) -> Result<(), NodeError> {
        if self.is_shutting_down.load(Ordering::Relaxed) {
            return Err(NodeError::NodeShuttingDown);
        }
        self.local_stats.task_submitted();
        match self.task_queue.enqueue(task) {
            Ok(()) => {
                self.update_pressure();
                if self.get_pressure_level() == PressureLevel::High
                    || self.get_pressure_level() == PressureLevel::Full
                {
                    self.spawn_worker_thread(true);
                }
                Ok(())
            }
            Err(QueueError::Full) => {
                *self
                    .last_self_overload_time
                    .lock()
                    .expect("Mutex should not be poisoned") = elapsed_ns();
                self.spawn_worker_thread(true);
                Err(NodeError::QueueFull)
            }
            Err(e) => Err(NodeError::from(e)),
        }
    }

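    /// Initiates shutdown exactly once (guarded by a compare-and-swap on the
    /// shutdown flag): closes the task queue, drops the desired thread count
    /// to zero, and joins every worker thread. Subsequent calls are no-ops.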
    pub fn shutdown(&self) {
        if self
            .is_shutting_down
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
        {
            self.task_queue.close();
            self.desired_thread_count.store(0, Ordering::SeqCst);
            let mut workers = self
                .worker_threads_handles
                .lock()
                .expect("Mutex should not be poisoned");
            for handle in workers.drain(..) {
                let _ = handle.join();
            }
        }
    }

    /// Builds the shared-state bundle handed to each worker thread.
    fn clone_for_worker(&self) -> WorkerContext {
        WorkerContext {
            node_id: self.node_id,
            active_thread_count: Arc::clone(&self.active_thread_count),
            desired_thread_count: Arc::clone(&self.desired_thread_count),
            min_threads: self.min_threads,
            max_threads: self.max_threads,
            task_queue: self.task_queue.clone(),
            last_scaling_time: Arc::clone(&self.last_scaling_time),
            last_self_overload_time: Arc::clone(&self.last_self_overload_time),
            scale_down_cooldown: self.scale_down_cooldown,
            node_pressure_atomic: Arc::clone(&self.pressure),
            is_shutting_down: Arc::clone(&self.is_shutting_down),
            signal_tx: self.signal_tx.clone(),
            local_stats: Arc::clone(&self.local_stats),
        }
    }

    /// Returns this node's identifier.
    pub fn id(&self) -> NodeId {
        self.node_id
    }

    /// Number of worker threads currently running.
    pub fn active_threads(&self) -> usize {
        self.active_thread_count.load(Ordering::Relaxed)
    }

    /// Target number of worker threads after pending scaling decisions.
    pub fn desired_threads(&self) -> usize {
        self.desired_thread_count.load(Ordering::Relaxed)
    }
}

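/// Per-worker view of the node's shared state. Each worker thread owns one
/// `WorkerContext` and uses it to dequeue tasks, report outcomes, and make
/// its own scaling decisions without holding a reference back to `VibeNode`.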
struct WorkerContext {
    node_id: NodeId,
    active_thread_count: Arc<AtomicUsize>,
    desired_thread_count: Arc<AtomicUsize>,
    min_threads: usize,
    max_threads: usize,
    task_queue: VibeQueue<Task>,
    last_scaling_time: Arc<Mutex<u64>>,
    last_self_overload_time: Arc<Mutex<u64>>,
    scale_down_cooldown: u64,
    node_pressure_atomic: Arc<AtomicUsize>,
    is_shutting_down: Arc<AtomicBool>,
    signal_tx: mpsc::Sender<SystemSignal>,
    local_stats: Arc<LocalStats>,
}

impl WorkerContext {
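    /// Main worker loop: dequeue and process tasks while available; when the
    /// queue is momentarily empty, consider scaling down and possibly retire
    /// this thread, otherwise sleep briefly and poll again. On exit the
    /// active-thread count is decremented unless the thread already retired
    /// itself.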
    fn run_loop(self) {
        let mut retired_by_choice = false;
        loop {
            if self.is_shutting_down.load(Ordering::Relaxed) {
                break;
            }

            if let Some(task) = self.task_queue.dequeue() {
                self.update_pressure_from_context();
                let _ = self.signal_tx.send(SystemSignal::TaskDequeuedByWorker {
                    node_id: self.node_id,
                    task_id: task.id,
                });
                self.process_task(task);
            } else {
                if self.is_shutting_down.load(Ordering::Relaxed) {
                    break;
                }

                self.consider_scaling_down();
                if self.check_and_attempt_self_retirement() {
                    retired_by_choice = true;
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
        }

        if !retired_by_choice {
            self.active_thread_count.fetch_sub(1, Ordering::SeqCst);
            self.update_pressure_from_context();
        }
    }

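    /// Runs one task, records its duration and logical success in the local
    /// stats, and (unless shutting down) reports a `TaskProcessed` signal with
    /// the duration in microseconds.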
    fn process_task(&self, task: Task) {
        let task_id = task.id;
        let start_time_ns = elapsed_ns();

        let outcome = task.run();

        let duration_ns = elapsed_ns().saturating_sub(start_time_ns);

        let was_logically_successful = outcome == TaskExecutionOutcome::Success;
        self.local_stats
            .record_task_outcome(duration_ns, was_logically_successful);

        if !self.is_shutting_down.load(Ordering::Relaxed) {
            let signal = SystemSignal::TaskProcessed {
                node_id: self.node_id,
                task_id,
                duration_micros: duration_ns / 1000,
            };
            let _ = self.signal_tx.send(signal);
        }
    }

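    /// Lowers the desired thread count by one when the pool is above
    /// `min_threads`, the scale-down cooldown has elapsed since both the last
    /// scaling event and the last overload, and pressure is in the `Empty` or
    /// `Low` band. The actual thread exit happens later, in
    /// `check_and_attempt_self_retirement`.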
    fn consider_scaling_down(&self) {
        let current_desired = self.desired_thread_count.load(Ordering::SeqCst);
        if current_desired <= self.min_threads {
            return;
        }

        let now = elapsed_ns();
        let last_scale_time = *self
            .last_scaling_time
            .lock()
            .expect("Mutex should not be poisoned");
        let last_overload_time = *self
            .last_self_overload_time
            .lock()
            .expect("Mutex should not be poisoned");

        if now.saturating_sub(last_scale_time) < self.scale_down_cooldown {
            return;
        }
        if now.saturating_sub(last_overload_time) < self.scale_down_cooldown {
            return;
        }

        let pressure = self.get_pressure_from_context();
        let pressure_level = self.get_pressure_level_from_pressure(pressure);

        if (pressure_level == PressureLevel::Empty || pressure_level == PressureLevel::Low)
            && self
                .desired_thread_count
                .compare_exchange(
                    current_desired,
                    current_desired - 1,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                )
                .is_ok()
        {
            *self
                .last_scaling_time
                .lock()
                .expect("Mutex should not be poisoned") = now;
        }
    }

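    /// Retires this worker if the pool is above both `min_threads` and the
    /// desired thread count. Uses a compare-and-swap on the active-thread
    /// count so at most one worker wins each retirement slot; returns `true`
    /// if this thread should exit its loop.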
    fn check_and_attempt_self_retirement(&self) -> bool {
        let current_active = self.active_thread_count.load(Ordering::SeqCst);
        if current_active <= self.min_threads
            || current_active <= self.desired_thread_count.load(Ordering::SeqCst)
        {
            return false;
        }

        if self
            .active_thread_count
            .compare_exchange(
                current_active,
                current_active - 1,
                Ordering::SeqCst,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            *self
                .last_scaling_time
                .lock()
                .expect("Mutex should not be poisoned") = elapsed_ns();
            self.update_pressure_from_context();
            true
        } else {
            false
        }
    }

    fn get_pressure_from_context(&self) -> usize {
        let q = self.task_queue.len() as f64;
        let c = self.active_thread_count.load(Ordering::Relaxed) as f64;
        let k = self.task_queue.capacity() as f64;
        let pressure_float = if c > 0.0 && k > 0.0 {
            (q / k) * 100.0
        } else if q > 0.0 {
            100.0
        } else {
            0.0
        };
        pressure_float.clamp(0.0, 100.0) as usize
    }

    fn get_pressure_level_from_pressure(&self, pressure: usize) -> PressureLevel {
        match pressure {
            0 => PressureLevel::Empty,
            1..=25 => PressureLevel::Low,
            26..=75 => PressureLevel::Normal,
            76..=99 => PressureLevel::High,
            _ => PressureLevel::Full,
        }
    }

    fn update_pressure_from_context(&self) {
        let pressure = self.get_pressure_from_context();
        self.node_pressure_atomic.store(pressure, Ordering::Relaxed);
    }
}

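// Dropping a node triggers a graceful shutdown if the caller never invoked
// `shutdown()` explicitly, so worker threads are always joined.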
impl Drop for VibeNode {
    fn drop(&mut self) {
        if !self.is_shutting_down.load(Ordering::Relaxed) {
            self.shutdown();
        }
    }
}

impl From<QueueError> for NodeError {
    fn from(qe: QueueError) -> Self {
        match qe {
            QueueError::Full => NodeError::QueueFull,
            QueueError::Closed => NodeError::QueueClosed,
            QueueError::SendError => NodeError::SignalSendError,
            QueueError::Empty => NodeError::QueueClosed,
        }
    }
}