apiary_runtime/
behavioral.rs1use std::collections::HashMap;
8use std::sync::{Arc, Mutex};
9
10use apiary_core::types::TaskId;
11
12use crate::bee::BeePool;
13
/// Verdict returned by [`AbandonmentTracker::record_failure`] after a task fails.
#[derive(Clone, Debug, PartialEq)]
pub enum AbandonmentDecision {
    /// The task's failure count is still below the trial limit; try it again.
    Retry,
    /// The task's failure count has reached the trial limit; give up on it.
    Abandon,
}
22
23#[derive(Debug)]
28pub struct AbandonmentTracker {
29 trial_counts: Arc<Mutex<HashMap<TaskId, u32>>>,
31 trial_limit: u32,
33}
34
35impl AbandonmentTracker {
36 pub fn new(trial_limit: u32) -> Self {
38 Self {
39 trial_counts: Arc::new(Mutex::new(HashMap::new())),
40 trial_limit,
41 }
42 }
43
44 pub fn record_failure(&self, task_id: &TaskId) -> AbandonmentDecision {
55 let mut counts = self.trial_counts.lock().unwrap();
56 let count = counts.entry(task_id.clone()).or_insert(0);
57 *count += 1;
58 if *count >= self.trial_limit {
59 AbandonmentDecision::Abandon
60 } else {
61 AbandonmentDecision::Retry
62 }
63 }
64
65 pub fn record_success(&self, task_id: &TaskId) {
67 let mut counts = self.trial_counts.lock().unwrap();
68 counts.remove(task_id);
69 }
70
71 pub fn get_count(&self, task_id: &TaskId) -> u32 {
73 let counts = self.trial_counts.lock().unwrap();
74 counts.get(task_id).copied().unwrap_or(0)
75 }
76
77 pub fn clear(&self) {
79 let mut counts = self.trial_counts.lock().unwrap();
80 counts.clear();
81 }
82}
83
84impl Default for AbandonmentTracker {
85 fn default() -> Self {
86 Self::new(3)
87 }
88}
89
/// Load band that a colony temperature falls into, as classified by
/// [`ColonyThermometer::regulation`].
#[derive(Clone, Debug, PartialEq)]
pub enum TemperatureRegulation {
    /// Temperature below 0.3.
    Cold,
    /// Temperature from 0.3 up to and including 0.7.
    Ideal,
    /// Temperature above 0.7, up to and including 0.85.
    Warm,
    /// Temperature above 0.85, up to and including 0.95.
    Hot,
    /// Temperature above 0.95, or any value that fails the band comparisons (NaN).
    Critical,
}

impl TemperatureRegulation {
    /// Returns the band's lowercase label, e.g. `"ideal"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            TemperatureRegulation::Cold => "cold",
            TemperatureRegulation::Ideal => "ideal",
            TemperatureRegulation::Warm => "warm",
            TemperatureRegulation::Hot => "hot",
            TemperatureRegulation::Critical => "critical",
        }
    }
}
117
118pub struct ColonyThermometer {
125 setpoint: f64,
127}
128
129impl ColonyThermometer {
130 pub fn new(setpoint: f64) -> Self {
132 Self { setpoint }
133 }
134
135 pub async fn measure(&self, bee_pool: &BeePool) -> f64 {
139 let status = bee_pool.status().await;
140 let total_bees = status.len() as f64;
141
142 if total_bees == 0.0 {
143 return 0.0;
144 }
145
146 let busy_count = status.iter().filter(|s| s.state != "idle").count() as f64;
148 let cpu_util = busy_count / total_bees;
149
150 let memory_pressure = {
152 let sum: f64 = status
153 .iter()
154 .map(|s| s.memory_used as f64 / s.memory_budget as f64)
155 .sum();
156 sum / total_bees
157 };
158
159 let queue_size = bee_pool.queue_size().await as f64;
161 let queue_capacity = total_bees * 2.0;
162 let queue_pressure = if queue_capacity > 0.0 {
163 (queue_size / queue_capacity).min(1.0)
164 } else {
165 0.0
166 };
167
168 let temperature = 0.4 * cpu_util + 0.4 * memory_pressure + 0.2 * queue_pressure;
170 temperature.min(1.0)
171 }
172
173 pub fn regulation(&self, temperature: f64) -> TemperatureRegulation {
175 match temperature {
176 t if t < 0.3 => TemperatureRegulation::Cold,
177 t if t <= 0.7 => TemperatureRegulation::Ideal,
178 t if t <= 0.85 => TemperatureRegulation::Warm,
179 t if t <= 0.95 => TemperatureRegulation::Hot,
180 _ => TemperatureRegulation::Critical,
181 }
182 }
183
184 pub fn setpoint(&self) -> f64 {
186 self.setpoint
187 }
188}
189
190impl Default for ColonyThermometer {
191 fn default() -> Self {
192 Self::new(0.5)
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use apiary_core::config::NodeConfig;
    use std::time::{Duration, Instant};

    #[test]
    fn test_abandonment_tracker_retry_then_abandon() {
        let tracker = AbandonmentTracker::new(3);
        let task_id = TaskId::generate();

        assert_eq!(tracker.record_failure(&task_id), AbandonmentDecision::Retry);
        assert_eq!(tracker.get_count(&task_id), 1);

        assert_eq!(tracker.record_failure(&task_id), AbandonmentDecision::Retry);
        assert_eq!(tracker.get_count(&task_id), 2);

        assert_eq!(
            tracker.record_failure(&task_id),
            AbandonmentDecision::Abandon
        );
        assert_eq!(tracker.get_count(&task_id), 3);
    }

    #[test]
    fn test_abandonment_tracker_keeps_abandoning_past_limit() {
        let tracker = AbandonmentTracker::new(2);
        let task_id = TaskId::generate();

        tracker.record_failure(&task_id);
        tracker.record_failure(&task_id);
        assert_eq!(
            tracker.record_failure(&task_id),
            AbandonmentDecision::Abandon
        );
        assert_eq!(tracker.get_count(&task_id), 3);
    }

    #[test]
    fn test_abandonment_tracker_default_limit_is_three() {
        let tracker = AbandonmentTracker::default();
        let task_id = TaskId::generate();

        assert_eq!(tracker.record_failure(&task_id), AbandonmentDecision::Retry);
        assert_eq!(tracker.record_failure(&task_id), AbandonmentDecision::Retry);
        assert_eq!(
            tracker.record_failure(&task_id),
            AbandonmentDecision::Abandon
        );
    }

    #[test]
    fn test_abandonment_tracker_limit_one_abandons_immediately() {
        let tracker = AbandonmentTracker::new(1);
        let task_id = TaskId::generate();

        assert_eq!(
            tracker.record_failure(&task_id),
            AbandonmentDecision::Abandon
        );
        assert_eq!(tracker.get_count(&task_id), 1);
    }

    #[test]
    fn test_abandonment_tracker_success_clears_count() {
        let tracker = AbandonmentTracker::new(3);
        let task_id = TaskId::generate();

        tracker.record_failure(&task_id);
        tracker.record_failure(&task_id);
        assert_eq!(tracker.get_count(&task_id), 2);

        tracker.record_success(&task_id);
        assert_eq!(tracker.get_count(&task_id), 0);
    }

    #[test]
    fn test_abandonment_tracker_count_restarts_after_success() {
        let tracker = AbandonmentTracker::new(2);
        let task_id = TaskId::generate();

        tracker.record_failure(&task_id);
        tracker.record_success(&task_id);

        // The earlier failure must not count towards the limit any more.
        assert_eq!(tracker.record_failure(&task_id), AbandonmentDecision::Retry);
        assert_eq!(tracker.get_count(&task_id), 1);
    }

    #[test]
    fn test_abandonment_tracker_success_on_unknown_task_is_noop() {
        let tracker = AbandonmentTracker::new(3);
        let failing = TaskId::generate();
        let unknown = TaskId::generate();

        tracker.record_failure(&failing);
        tracker.record_success(&unknown);

        assert_eq!(tracker.get_count(&unknown), 0);
        assert_eq!(tracker.get_count(&failing), 1);
    }

    #[test]
    fn test_abandonment_tracker_independent_tasks() {
        let tracker = AbandonmentTracker::new(2);
        let task1 = TaskId::generate();
        let task2 = TaskId::generate();

        tracker.record_failure(&task1);
        assert_eq!(tracker.get_count(&task1), 1);
        assert_eq!(tracker.get_count(&task2), 0);

        tracker.record_failure(&task2);
        assert_eq!(tracker.get_count(&task1), 1);
        assert_eq!(tracker.get_count(&task2), 1);
    }

    #[test]
    fn test_abandonment_tracker_clear() {
        let tracker = AbandonmentTracker::new(3);
        let task1 = TaskId::generate();
        let task2 = TaskId::generate();

        tracker.record_failure(&task1);
        tracker.record_failure(&task2);
        tracker.clear();

        assert_eq!(tracker.get_count(&task1), 0);
        assert_eq!(tracker.get_count(&task2), 0);
    }

    #[test]
    fn test_temperature_regulation_classification() {
        let thermo = ColonyThermometer::default();

        assert_eq!(thermo.regulation(0.0), TemperatureRegulation::Cold);
        assert_eq!(thermo.regulation(0.2), TemperatureRegulation::Cold);
        assert_eq!(thermo.regulation(0.3), TemperatureRegulation::Ideal);
        assert_eq!(thermo.regulation(0.5), TemperatureRegulation::Ideal);
        assert_eq!(thermo.regulation(0.7), TemperatureRegulation::Ideal);
        assert_eq!(thermo.regulation(0.75), TemperatureRegulation::Warm);
        assert_eq!(thermo.regulation(0.85), TemperatureRegulation::Warm);
        assert_eq!(thermo.regulation(0.90), TemperatureRegulation::Hot);
        assert_eq!(thermo.regulation(0.95), TemperatureRegulation::Hot);
        assert_eq!(thermo.regulation(0.96), TemperatureRegulation::Critical);
        assert_eq!(thermo.regulation(1.0), TemperatureRegulation::Critical);
    }

    #[test]
    fn test_temperature_regulation_nan_is_critical() {
        // NaN fails every band comparison and falls through to the last arm.
        let thermo = ColonyThermometer::default();
        assert_eq!(thermo.regulation(f64::NAN), TemperatureRegulation::Critical);
    }

    #[test]
    fn test_thermometer_setpoint() {
        assert_eq!(ColonyThermometer::default().setpoint(), 0.5);
        assert_eq!(ColonyThermometer::new(0.65).setpoint(), 0.65);
    }

    #[tokio::test]
    async fn test_colony_temperature_idle() {
        let tmp = tempfile::TempDir::new().unwrap();
        let mut config = NodeConfig::detect("local://test");
        config.cores = 4;
        config.memory_per_bee = 1024 * 1024;
        config.cache_dir = tmp.path().to_path_buf();

        let pool = BeePool::new(&config);
        let thermo = ColonyThermometer::default();

        let temp = thermo.measure(&pool).await;
        assert!(temp < 0.1, "Expected low temperature, got {temp}");
        assert_eq!(thermo.regulation(temp), TemperatureRegulation::Cold);
    }

    #[tokio::test]
    async fn test_colony_temperature_all_busy() {
        let tmp = tempfile::TempDir::new().unwrap();
        let mut config = NodeConfig::detect("local://test");
        config.cores = 2;
        config.memory_per_bee = 1024 * 1024;
        config.cache_dir = tmp.path().to_path_buf();

        let pool = Arc::new(BeePool::new(&config));
        let thermo = ColonyThermometer::default();

        let _h1 = pool
            .submit(|| {
                std::thread::sleep(Duration::from_millis(200));
                Ok(vec![])
            })
            .await;
        let _h2 = pool
            .submit(|| {
                std::thread::sleep(Duration::from_millis(200));
                Ok(vec![])
            })
            .await;

        // Poll instead of sleeping a fixed interval before a single
        // measurement. Workers may need a variable amount of time to pick the
        // jobs up, which could make a one-shot check flaky. The 10ms poll
        // interval is well below the jobs' 200ms runtime.
        let deadline = Instant::now() + Duration::from_secs(2);
        let mut temp = thermo.measure(&pool).await;
        while temp < 0.3 && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(10)).await;
            temp = thermo.measure(&pool).await;
        }
        assert!(temp >= 0.3, "Expected elevated temperature, got {temp}");
    }

    #[test]
    fn test_temperature_regulation_as_str() {
        assert_eq!(TemperatureRegulation::Cold.as_str(), "cold");
        assert_eq!(TemperatureRegulation::Ideal.as_str(), "ideal");
        assert_eq!(TemperatureRegulation::Warm.as_str(), "warm");
        assert_eq!(TemperatureRegulation::Hot.as_str(), "hot");
        assert_eq!(TemperatureRegulation::Critical.as_str(), "critical");
    }
}