vibe_code/node.rs

//! Defines `VibeNode`, a self-contained processing unit.
//!
//! A `VibeNode` is like a single worker in the system's factory. It has its own
//! task queue and a pool of threads to execute those tasks. It's designed to be
//! self-managing, automatically adjusting the number of active threads based on
//! its current workload (pressure).

use crate::queue::{QueueError, VibeQueue};
use crate::signals::{NodeId, SystemSignal};
use crate::task::{Task, TaskExecutionOutcome};
use crate::types::{LocalStats, NodeError};
use crate::utils::elapsed_ns;
use std::sync::{
    Arc, Mutex,
    atomic::{AtomicBool, AtomicUsize, Ordering},
    mpsc,
};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// A qualitative measure of a node's current workload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PressureLevel {
    /// The task queue is empty.
    Empty,
    /// The task queue has a few tasks.
    Low,
    /// The task queue is at a healthy, normal level.
    Normal,
    /// The task queue is nearly full, indicating high load.
    High,
    /// The task queue is at maximum capacity.
    Full,
}

/// A processing unit that executes tasks.
///
/// Each `VibeNode` contains a task queue and a dynamic pool of worker threads.
/// It reports its status (e.g., overloaded, idle) to the central system, which
/// helps with load balancing.
pub struct VibeNode {
    /// A unique identifier for this node.
    pub node_id: NodeId,
    /// The queue that holds tasks waiting to be executed by this node.
    pub task_queue: VibeQueue<Task>,
    /// Handles to the worker threads managed by this node.
    worker_threads_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The number of currently active worker threads.
    active_thread_count: Arc<AtomicUsize>,
    /// The minimum number of worker threads to keep alive.
    pub min_threads: usize,
    /// The maximum number of worker threads this node can spawn.
    pub max_threads: usize,
    /// A channel to send signals (like "I'm overloaded!") to the central system.
    signal_tx: mpsc::Sender<SystemSignal>,
    /// Performance statistics for this specific node.
    local_stats: Arc<LocalStats>,
    /// A flag to indicate that the node is in the process of shutting down.
    is_shutting_down: Arc<AtomicBool>,
    /// The target number of worker threads, adjusted dynamically based on load.
    desired_thread_count: Arc<AtomicUsize>,
    /// The timestamp of the last scaling event (adding or removing a thread).
    last_scaling_time: Arc<Mutex<u64>>,
    /// The timestamp of the last time this node's queue was full.
    last_self_overload_time: Arc<Mutex<u64>>,
    /// A cooldown period (in nanoseconds) to prevent scaling down threads too aggressively.
    pub scale_down_cooldown: u64,
    /// A percentage (0-100) representing the current queue load.
    pressure: Arc<AtomicUsize>,
}

impl VibeNode {
    /// Creates and initializes a new `VibeNode`.
    ///
    /// This sets up the task queue, thread limits, and spawns the minimum
    /// number of worker threads to start processing tasks.
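    ///
    /// # Example
    ///
    /// A minimal usage sketch (marked `ignore`, so it is not compiled as a
    /// doctest), assuming `NodeId` can be constructed from an integer id and
    /// that the system side holds the receiving half of the signal channel:
    ///
    /// ```ignore
    /// use std::sync::mpsc;
    ///
    /// let (signal_tx, _signal_rx) = mpsc::channel();
    /// // Queue capacity of 128 tasks, scaling between 2 and 8 worker threads,
    /// // with the default scale-down cooldown.
    /// let node = VibeNode::new(NodeId(0), 128, 2, 8, signal_tx, None)
    ///     .expect("valid node configuration");
    /// assert_eq!(node.active_threads(), 2);
    /// ```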
    pub fn new(
        node_id: NodeId,
        queue_capacity: usize,
        min_threads: usize,
        max_threads: usize,
        signal_tx: mpsc::Sender<SystemSignal>,
        scale_down_cooldown_override: Option<u64>,
    ) -> Result<Self, String> {
        if min_threads == 0 {
            return Err("min_threads cannot be 0".to_string());
        }
        if max_threads < min_threads {
            return Err("max_threads cannot be less than min_threads".to_string());
        }

        let task_queue = VibeQueue::new_with_signal(node_id, queue_capacity, signal_tx.clone());
        let pressure_arc = Arc::new(AtomicUsize::new(0));
        const NANOS_PER_SEC: u64 = 1_000_000_000;

        let node = Self {
            node_id,
            task_queue,
            worker_threads_handles: Arc::new(Mutex::new(Vec::new())),
            active_thread_count: Arc::new(AtomicUsize::new(0)),
            min_threads,
            max_threads,
            signal_tx,
            local_stats: Arc::new(LocalStats::new()),
            is_shutting_down: Arc::new(AtomicBool::new(false)),
            desired_thread_count: Arc::new(AtomicUsize::new(min_threads)),
            last_scaling_time: Arc::new(Mutex::new(elapsed_ns())),
            last_self_overload_time: Arc::new(Mutex::new(
                elapsed_ns().saturating_sub(3600 * NANOS_PER_SEC),
            )),
            scale_down_cooldown: scale_down_cooldown_override.unwrap_or(5 * NANOS_PER_SEC),
            pressure: pressure_arc,
        };

        // Spawn the initial set of worker threads.
        for _ in 0..node.min_threads {
            node.spawn_worker_thread(false);
        }
        node.update_pressure();

        Ok(node)
    }

    /// Calculates and updates the node's pressure based on queue fullness.
    fn update_pressure(&self) {
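        // Pressure is the queue fill ratio, `len / capacity`, scaled to a 0-100
        // percentage. A backlog with no live workers (or a zero-capacity queue)
        // counts as full pressure.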
        let q = self.task_queue.len() as f64;
        let c = self.active_thread_count.load(Ordering::Relaxed) as f64;
        let k = self.task_queue.capacity() as f64;

        let pressure_float = if c > 0.0 && k > 0.0 {
            (q / k) * 100.0
        } else if q > 0.0 {
            100.0
        } else {
            0.0
        };
        self.pressure
            .store(pressure_float.clamp(0.0, 100.0) as usize, Ordering::Relaxed);
    }

    /// Returns the current pressure of the node (a percentage from 0 to 100).
    pub fn get_pressure(&self) -> usize {
        self.pressure.load(Ordering::Relaxed)
    }

    /// Returns the maximum possible pressure value (always 100).
    pub fn max_pressure(&self) -> usize {
        100
    }

    /// Returns a qualitative `PressureLevel` based on the current numeric pressure.
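    /// For example, a queue that is 40% full maps to `PressureLevel::Normal`,
    /// while one that is 80% full maps to `PressureLevel::High`.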
    pub fn get_pressure_level(&self) -> PressureLevel {
        match self.get_pressure() {
            0 => PressureLevel::Empty,
            1..=25 => PressureLevel::Low,
            26..=75 => PressureLevel::Normal,
            76..=99 => PressureLevel::High,
            _ => PressureLevel::Full,
        }
    }

    /// Spawns a new worker thread, provided the node is below its maximum thread
    /// count and is not shutting down.
    fn spawn_worker_thread(&self, triggered_by_overload: bool) {
        if self.active_threads() >= self.max_threads {
            return;
        }
        if self.is_shutting_down.load(Ordering::Relaxed) {
            return;
        }

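        // Account for the new worker up front so that concurrent pressure and
        // scaling decisions already see the updated thread count.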
        self.active_thread_count.fetch_add(1, Ordering::SeqCst);
        self.desired_thread_count
            .store(self.active_threads(), Ordering::SeqCst);

        let now = elapsed_ns();
        *self
            .last_scaling_time
            .lock()
            .expect("Mutex should not be poisoned") = now;
        if triggered_by_overload {
            *self
                .last_self_overload_time
                .lock()
                .expect("Mutex should not be poisoned") = now;
        }

        self.update_pressure();

        let worker_context = self.clone_for_worker();
        let handle = thread::Builder::new()
            .name(format!(
                "vibe-node-{}-worker-{}",
                self.node_id.0,
                self.active_threads()
            ))
            .spawn(move || worker_context.run_loop())
            .expect("Failed to spawn worker thread");

        self.worker_threads_handles
            .lock()
            .expect("Mutex should not be poisoned")
            .push(handle);
    }

    /// Submits a task to this node's queue.
    ///
    /// This is an internal method called by the system's task router. It may
    /// trigger spawning a new worker thread if the queue pressure becomes high.
    pub(crate) fn submit_task(&self, task: Task) -> Result<(), NodeError> {
        if self.is_shutting_down.load(Ordering::Relaxed) {
            return Err(NodeError::NodeShuttingDown);
        }
        self.local_stats.task_submitted();
        match self.task_queue.enqueue(task) {
            Ok(()) => {
                self.update_pressure();
                if self.get_pressure_level() == PressureLevel::High
                    || self.get_pressure_level() == PressureLevel::Full
                {
                    self.spawn_worker_thread(true);
                }
                Ok(())
            }
            Err(QueueError::Full) => {
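                // The queue rejected the task: record the overload, try to add a
                // worker so the backlog drains faster, and report the failure to
                // the caller.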
                *self
                    .last_self_overload_time
                    .lock()
                    .expect("Mutex should not be poisoned") = elapsed_ns();
                self.spawn_worker_thread(true);
                Err(NodeError::QueueFull)
            }
            Err(e) => Err(NodeError::from(e)),
        }
    }

    /// Begins the shutdown process for the node.
    ///
    /// This closes the task queue to new submissions and waits for all existing
    /// worker threads to finish their current tasks and exit gracefully.
    pub fn shutdown(&self) {
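        // The compare-and-swap flips `is_shutting_down` exactly once, so repeated
        // or concurrent shutdown calls (including the one from `Drop`) are no-ops.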
        if self
            .is_shutting_down
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
        {
            self.task_queue.close();
            self.desired_thread_count.store(0, Ordering::SeqCst);
            let mut workers = self
                .worker_threads_handles
                .lock()
                .expect("Mutex should not be poisoned");
            for handle in workers.drain(..) {
                let _ = handle.join();
            }
        }
    }

    /// Clones the necessary context for a new worker thread.
    ///
    /// This bundles all the shared data (`Arc`s) that a worker needs to operate.
    fn clone_for_worker(&self) -> WorkerContext {
        WorkerContext {
            node_id: self.node_id,
            active_thread_count: Arc::clone(&self.active_thread_count),
            desired_thread_count: Arc::clone(&self.desired_thread_count),
            min_threads: self.min_threads,
            max_threads: self.max_threads,
            task_queue: self.task_queue.clone(),
            last_scaling_time: Arc::clone(&self.last_scaling_time),
            last_self_overload_time: Arc::clone(&self.last_self_overload_time),
            scale_down_cooldown: self.scale_down_cooldown,
            node_pressure_atomic: Arc::clone(&self.pressure),
            is_shutting_down: Arc::clone(&self.is_shutting_down),
            signal_tx: self.signal_tx.clone(),
            local_stats: Arc::clone(&self.local_stats),
        }
    }

    /// Returns the node's unique ID.
    pub fn id(&self) -> NodeId {
        self.node_id
    }

    /// Returns the current number of active worker threads.
    pub fn active_threads(&self) -> usize {
        self.active_thread_count.load(Ordering::Relaxed)
    }

    /// Returns the desired number of worker threads.
    pub fn desired_threads(&self) -> usize {
        self.desired_thread_count.load(Ordering::Relaxed)
    }
}

/// Contains the shared state and logic for a single worker thread.
struct WorkerContext {
    node_id: NodeId,
    active_thread_count: Arc<AtomicUsize>,
    desired_thread_count: Arc<AtomicUsize>,
    min_threads: usize,
    max_threads: usize,
    task_queue: VibeQueue<Task>,
    last_scaling_time: Arc<Mutex<u64>>,
    last_self_overload_time: Arc<Mutex<u64>>,
    scale_down_cooldown: u64,
    node_pressure_atomic: Arc<AtomicUsize>,
    is_shutting_down: Arc<AtomicBool>,
    signal_tx: mpsc::Sender<SystemSignal>,
    local_stats: Arc<LocalStats>,
}

impl WorkerContext {
    /// The main loop for a worker thread.
    ///
    /// The worker continuously pulls tasks from the queue, processes them, and
    /// checks if it should scale down or retire.
    fn run_loop(self) {
        let mut retired_by_choice = false;
        loop {
            if self.is_shutting_down.load(Ordering::Relaxed) {
                break;
            }

            if let Some(task) = self.task_queue.dequeue() {
                self.update_pressure_from_context();
                let _ = self.signal_tx.send(SystemSignal::TaskDequeuedByWorker {
                    node_id: self.node_id,
                    task_id: task.id,
                });
                self.process_task(task);
            } else {
                if self.is_shutting_down.load(Ordering::Relaxed) {
                    break;
                }

                self.consider_scaling_down();
                if self.check_and_attempt_self_retirement() {
                    retired_by_choice = true;
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
        }

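        // A worker that retired itself has already decremented the active count in
        // `check_and_attempt_self_retirement`; only exits caused by shutdown need
        // to release their slot here.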
        if !retired_by_choice {
            self.active_thread_count.fetch_sub(1, Ordering::SeqCst);
            self.update_pressure_from_context();
        }
    }

    /// Executes a single task and records the outcome.
    fn process_task(&self, task: Task) {
        let task_id = task.id;
        let start_time_ns = elapsed_ns();

        let outcome = task.run();

        let duration_ns = elapsed_ns().saturating_sub(start_time_ns);

        let was_logically_successful = outcome == TaskExecutionOutcome::Success;
        self.local_stats
            .record_task_outcome(duration_ns, was_logically_successful);

        if !self.is_shutting_down.load(Ordering::Relaxed) {
            let signal = SystemSignal::TaskProcessed {
                node_id: self.node_id,
                task_id,
                duration_micros: duration_ns / 1000,
            };
            let _ = self.signal_tx.send(signal);
        }
    }

    /// Checks whether the node is idle enough to lower the desired thread count.
    fn consider_scaling_down(&self) {
        let current_desired = self.desired_thread_count.load(Ordering::SeqCst);
        if current_desired <= self.min_threads {
            return;
        }

        let now = elapsed_ns();
        let last_scale_time = *self
            .last_scaling_time
            .lock()
            .expect("Mutex should not be poisoned");
        let last_overload_time = *self
            .last_self_overload_time
            .lock()
            .expect("Mutex should not be poisoned");

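        // Respect both cooldowns: if the node scaled recently, or its queue
        // overflowed recently, keep the current capacity.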
        if now.saturating_sub(last_scale_time) < self.scale_down_cooldown {
            return;
        }
        if now.saturating_sub(last_overload_time) < self.scale_down_cooldown {
            return;
        }

        let pressure = self.get_pressure_from_context();
        let pressure_level = self.get_pressure_level_from_pressure(pressure);

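        // Only the *desired* count is lowered here, via CAS so that a single idle
        // worker wins the decrement; the surplus thread actually exits later
        // through `check_and_attempt_self_retirement`.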
        if (pressure_level == PressureLevel::Empty || pressure_level == PressureLevel::Low)
            && self
                .desired_thread_count
                .compare_exchange(
                    current_desired,
                    current_desired - 1,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                )
                .is_ok()
        {
            *self
                .last_scaling_time
                .lock()
                .expect("Mutex should not be poisoned") = now;
        }
    }

    /// Checks if this worker thread is now superfluous and can shut down.
    fn check_and_attempt_self_retirement(&self) -> bool {
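        // Retire only when the active count is strictly above both the configured
        // minimum and the desired count; the CAS below ensures at most one surplus
        // worker exits per decrement.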
        let current_active = self.active_thread_count.load(Ordering::SeqCst);
        if current_active <= self.min_threads
            || current_active <= self.desired_thread_count.load(Ordering::SeqCst)
        {
            return false;
        }

        if self
            .active_thread_count
            .compare_exchange(
                current_active,
                current_active - 1,
                Ordering::SeqCst,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            *self
                .last_scaling_time
                .lock()
                .expect("Mutex should not be poisoned") = elapsed_ns();
            self.update_pressure_from_context();
            true
        } else {
            false
        }
    }

    /// Helper for a worker to calculate the node's current pressure.
    fn get_pressure_from_context(&self) -> usize {
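        // Mirrors `VibeNode::update_pressure`: the queue fill ratio as a 0-100
        // percentage, treating a backlog with no live workers as full pressure.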
        let q = self.task_queue.len() as f64;
        let c = self.active_thread_count.load(Ordering::Relaxed) as f64;
        let k = self.task_queue.capacity() as f64;
        let pressure_float = if c > 0.0 && k > 0.0 {
            (q / k) * 100.0
        } else if q > 0.0 {
            100.0
        } else {
            0.0
        };
        pressure_float.clamp(0.0, 100.0) as usize
    }

    /// Helper to get a `PressureLevel` enum from a numeric pressure value.
    fn get_pressure_level_from_pressure(&self, pressure: usize) -> PressureLevel {
        match pressure {
            0 => PressureLevel::Empty,
            1..=25 => PressureLevel::Low,
            26..=75 => PressureLevel::Normal,
            76..=99 => PressureLevel::High,
            _ => PressureLevel::Full,
        }
    }

    /// Updates the node's shared pressure atomic from the worker's context.
    fn update_pressure_from_context(&self) {
        let pressure = self.get_pressure_from_context();
        self.node_pressure_atomic.store(pressure, Ordering::Relaxed);
    }
}

impl Drop for VibeNode {
    /// Ensures the node is properly shut down when it goes out of scope.
    fn drop(&mut self) {
        if !self.is_shutting_down.load(Ordering::Relaxed) {
            self.shutdown();
        }
    }
}

impl From<QueueError> for NodeError {
    /// Converts a `QueueError` into a `NodeError`.
    fn from(qe: QueueError) -> Self {
        match qe {
            QueueError::Full => NodeError::QueueFull,
            QueueError::Closed => NodeError::QueueClosed,
            QueueError::SendError => NodeError::SignalSendError,
            QueueError::Empty => NodeError::QueueClosed,
        }
    }
}