Skip to main content

apiary_runtime/
behavioral.rs

//! Behavioral model components: Task Abandonment and Colony Temperature.
//!
//! These behaviors govern resource management and failure recovery in Apiary.
//! - [`AbandonmentTracker`]: Counts failures per task and decides when to stop retrying.
//! - [`ColonyThermometer`]: Measures system health as a composite temperature metric.
6
7use std::collections::HashMap;
8use std::sync::{Arc, Mutex};
9
10use apiary_core::types::TaskId;
11
12use crate::bee::BeePool;
13
/// Decision made by the abandonment tracker after a task failure.
///
/// This is a field-less tag, so it is `Copy` and can be compared, hashed and
/// passed by value without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AbandonmentDecision {
    /// Retry the task (possibly on a different node).
    Retry,
    /// Abandon the task — query fails with diagnostic error.
    Abandon,
}
22
23/// Tracker for task failures that decides when to abandon failing tasks.
24///
25/// Tasks that repeatedly fail are abandoned rather than retried indefinitely.
26/// This prevents the colony from wasting effort on unrecoverable work.
27#[derive(Debug)]
28pub struct AbandonmentTracker {
29    /// Map of task ID to failure count.
30    trial_counts: Arc<Mutex<HashMap<TaskId, u32>>>,
31    /// Maximum number of attempts before abandoning (default: 3).
32    trial_limit: u32,
33}
34
35impl AbandonmentTracker {
36    /// Create a new abandonment tracker with the given trial limit.
37    pub fn new(trial_limit: u32) -> Self {
38        Self {
39            trial_counts: Arc::new(Mutex::new(HashMap::new())),
40            trial_limit,
41        }
42    }
43
44    /// Record a task failure and return the abandonment decision.
45    ///
46    /// # Arguments
47    ///
48    /// * `task_id` — The ID of the failed task
49    ///
50    /// # Returns
51    ///
52    /// * `AbandonmentDecision::Retry` — Try again (possibly on a different node)
53    /// * `AbandonmentDecision::Abandon` — Give up with diagnostic error
54    pub fn record_failure(&self, task_id: &TaskId) -> AbandonmentDecision {
55        let mut counts = self.trial_counts.lock().unwrap();
56        let count = counts.entry(task_id.clone()).or_insert(0);
57        *count += 1;
58        if *count >= self.trial_limit {
59            AbandonmentDecision::Abandon
60        } else {
61            AbandonmentDecision::Retry
62        }
63    }
64
65    /// Record a task success and clear its failure count.
66    pub fn record_success(&self, task_id: &TaskId) {
67        let mut counts = self.trial_counts.lock().unwrap();
68        counts.remove(task_id);
69    }
70
71    /// Get the current failure count for a task (0 if not tracked).
72    pub fn get_count(&self, task_id: &TaskId) -> u32 {
73        let counts = self.trial_counts.lock().unwrap();
74        counts.get(task_id).copied().unwrap_or(0)
75    }
76
77    /// Clear all tracked failures.
78    pub fn clear(&self) {
79        let mut counts = self.trial_counts.lock().unwrap();
80        counts.clear();
81    }
82}
83
84impl Default for AbandonmentTracker {
85    fn default() -> Self {
86        Self::new(3)
87    }
88}
89
/// Temperature regulation state based on colony temperature reading.
///
/// The ranges below are the ones applied by `ColonyThermometer::regulation`;
/// each upper bound is inclusive.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TemperatureRegulation {
    /// System underutilized (below 0.3) — run background maintenance.
    Cold,
    /// Normal operation (0.3 up to and including 0.7).
    Ideal,
    /// Reduce non-essential work (above 0.7, up to and including 0.85) — defer compaction.
    Warm,
    /// Throttle writes, pause maintenance (above 0.85, up to and including 0.95).
    Hot,
    /// Reject new work (above 0.95), focus on completing in-progress tasks.
    Critical,
}

impl TemperatureRegulation {
    /// Convert regulation state to a human-readable string.
    ///
    /// Returns the lowercase variant name: `"cold"`, `"ideal"`, `"warm"`,
    /// `"hot"` or `"critical"`.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Cold => "cold",
            Self::Ideal => "ideal",
            Self::Warm => "warm",
            Self::Hot => "hot",
            Self::Critical => "critical",
        }
    }
}
117
118/// Colony thermometer measures system health as a composite temperature metric.
119///
120/// Temperature is a value from 0.0 to 1.0 calculated from:
121/// - CPU utilization (busy bees / total bees)
122/// - Memory pressure (average memory utilization across bees)
123/// - Queue pressure (queued tasks / capacity)
124pub struct ColonyThermometer {
125    /// Target temperature (default 0.5).
126    setpoint: f64,
127}
128
129impl ColonyThermometer {
130    /// Create a new thermometer with the given setpoint.
131    pub fn new(setpoint: f64) -> Self {
132        Self { setpoint }
133    }
134
135    /// Measure the current colony temperature from the bee pool state.
136    ///
137    /// Returns a value from 0.0 to 1.0 representing system load.
138    pub async fn measure(&self, bee_pool: &BeePool) -> f64 {
139        let status = bee_pool.status().await;
140        let total_bees = status.len() as f64;
141
142        if total_bees == 0.0 {
143            return 0.0;
144        }
145
146        // CPU utilization: fraction of busy bees
147        let busy_count = status.iter().filter(|s| s.state != "idle").count() as f64;
148        let cpu_util = busy_count / total_bees;
149
150        // Memory pressure: average memory utilization across all bees
151        let memory_pressure = {
152            let sum: f64 = status
153                .iter()
154                .map(|s| s.memory_used as f64 / s.memory_budget as f64)
155                .sum();
156            sum / total_bees
157        };
158
159        // Queue pressure: queued tasks / (total bees * 2)
160        let queue_size = bee_pool.queue_size().await as f64;
161        let queue_capacity = total_bees * 2.0;
162        let queue_pressure = if queue_capacity > 0.0 {
163            (queue_size / queue_capacity).min(1.0)
164        } else {
165            0.0
166        };
167
168        // Weighted composite: CPU and memory are most important
169        let temperature = 0.4 * cpu_util + 0.4 * memory_pressure + 0.2 * queue_pressure;
170        temperature.min(1.0)
171    }
172
173    /// Determine the appropriate regulation state for the given temperature.
174    pub fn regulation(&self, temperature: f64) -> TemperatureRegulation {
175        match temperature {
176            t if t < 0.3 => TemperatureRegulation::Cold,
177            t if t <= 0.7 => TemperatureRegulation::Ideal,
178            t if t <= 0.85 => TemperatureRegulation::Warm,
179            t if t <= 0.95 => TemperatureRegulation::Hot,
180            _ => TemperatureRegulation::Critical,
181        }
182    }
183
184    /// Get the setpoint.
185    pub fn setpoint(&self) -> f64 {
186        self.setpoint
187    }
188}
189
190impl Default for ColonyThermometer {
191    fn default() -> Self {
192        Self::new(0.5)
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use apiary_core::config::NodeConfig;
    use std::time::Duration;

    /// Node configuration shared by the pool-backed tests: 1 MiB per bee and
    /// a cache directory inside `scratch`. Callers set `cores` themselves.
    fn scratch_config(scratch: &tempfile::TempDir) -> NodeConfig {
        let mut config = NodeConfig::detect("local://test");
        config.memory_per_bee = 1024 * 1024;
        config.cache_dir = scratch.path().to_path_buf();
        config
    }

    /// With a limit of 3, failures one and two retry and the third abandons.
    #[test]
    fn test_abandonment_tracker_retry_then_abandon() {
        let tracker = AbandonmentTracker::new(3);
        let task_id = TaskId::generate();

        // First and second failure: retry
        for expected_count in 1..=2 {
            assert_eq!(tracker.record_failure(&task_id), AbandonmentDecision::Retry);
            assert_eq!(tracker.get_count(&task_id), expected_count);
        }

        // Third failure: abandon (the count is kept)
        let decision = tracker.record_failure(&task_id);
        assert_eq!(decision, AbandonmentDecision::Abandon);
        assert_eq!(tracker.get_count(&task_id), 3);
    }

    /// A success removes the task's failure count.
    #[test]
    fn test_abandonment_tracker_success_clears_count() {
        let tracker = AbandonmentTracker::new(3);
        let task_id = TaskId::generate();

        for _ in 0..2 {
            tracker.record_failure(&task_id);
        }
        assert_eq!(tracker.get_count(&task_id), 2);

        tracker.record_success(&task_id);
        assert_eq!(tracker.get_count(&task_id), 0);
    }

    /// Failures of one task do not touch another task's count.
    #[test]
    fn test_abandonment_tracker_independent_tasks() {
        let tracker = AbandonmentTracker::new(2);
        let (task1, task2) = (TaskId::generate(), TaskId::generate());

        tracker.record_failure(&task1);
        assert_eq!(tracker.get_count(&task1), 1);
        assert_eq!(tracker.get_count(&task2), 0);

        tracker.record_failure(&task2);
        assert_eq!(tracker.get_count(&task1), 1);
        assert_eq!(tracker.get_count(&task2), 1);
    }

    /// `clear` drops the counts of every tracked task.
    #[test]
    fn test_abandonment_tracker_clear() {
        let tracker = AbandonmentTracker::new(3);
        let (task1, task2) = (TaskId::generate(), TaskId::generate());

        tracker.record_failure(&task1);
        tracker.record_failure(&task2);
        tracker.clear();

        assert_eq!(tracker.get_count(&task1), 0);
        assert_eq!(tracker.get_count(&task2), 0);
    }

    /// Each regulation band, including both edges of the inclusive bounds.
    #[test]
    fn test_temperature_regulation_classification() {
        let thermo = ColonyThermometer::default();

        let cases = [
            (0.0, TemperatureRegulation::Cold),
            (0.2, TemperatureRegulation::Cold),
            (0.3, TemperatureRegulation::Ideal),
            (0.5, TemperatureRegulation::Ideal),
            (0.7, TemperatureRegulation::Ideal),
            (0.75, TemperatureRegulation::Warm),
            (0.85, TemperatureRegulation::Warm),
            (0.90, TemperatureRegulation::Hot),
            (0.95, TemperatureRegulation::Hot),
            (0.96, TemperatureRegulation::Critical),
            (1.0, TemperatureRegulation::Critical),
        ];
        for (temperature, expected) in &cases {
            assert_eq!(thermo.regulation(*temperature), *expected);
        }
    }

    /// A freshly created pool with no submitted work reads as cold.
    #[tokio::test]
    async fn test_colony_temperature_idle() {
        let tmp = tempfile::TempDir::new().unwrap();
        let mut config = scratch_config(&tmp);
        config.cores = 4;

        let pool = BeePool::new(&config);
        let thermo = ColonyThermometer::default();

        // All idle, no queue, no memory use → near zero
        let temp = thermo.measure(&pool).await;
        assert!(temp < 0.1, "Expected low temperature, got {temp}");
        assert_eq!(thermo.regulation(temp), TemperatureRegulation::Cold);
    }

    /// Two sleeping tasks on a two-core pool raise the reading.
    #[tokio::test]
    async fn test_colony_temperature_all_busy() {
        let tmp = tempfile::TempDir::new().unwrap();
        let mut config = scratch_config(&tmp);
        config.cores = 2;

        let pool = Arc::new(BeePool::new(&config));
        let thermo = ColonyThermometer::default();

        // Submit 2 long-running tasks to keep both bees busy
        let _h1 = pool
            .submit(|| {
                std::thread::sleep(Duration::from_millis(200));
                Ok(vec![])
            })
            .await;
        let _h2 = pool
            .submit(|| {
                std::thread::sleep(Duration::from_millis(200));
                Ok(vec![])
            })
            .await;

        // Give tasks time to start
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Both bees busy → CPU util = 1.0, which alone contributes 0.4;
        // the assertion checks the looser bound of 0.3.
        let temp = thermo.measure(&pool).await;
        assert!(temp >= 0.3, "Expected elevated temperature, got {temp}");
    }

    /// `as_str` yields the lowercase variant name.
    #[test]
    fn test_temperature_regulation_as_str() {
        let named = [
            (TemperatureRegulation::Cold, "cold"),
            (TemperatureRegulation::Ideal, "ideal"),
            (TemperatureRegulation::Warm, "warm"),
            (TemperatureRegulation::Hot, "hot"),
            (TemperatureRegulation::Critical, "critical"),
        ];
        for (state, label) in &named {
            assert_eq!(state.as_str(), *label);
        }
    }
}