// dasein_agentic_core/distributed/allocation.rs

use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;

22#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum AllocationReason {
26 Overloaded { queue_depth: usize },
28 SpecializedTask { capability: String },
30 Deadline { remaining_ms: u64 },
32 TrafficBurst { requests_per_sec: f64 },
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct AllocationRequest {
39 pub id: String,
41 pub from_supervisor: String,
43 pub reason: AllocationReason,
45 pub executors_needed: usize,
47 pub max_duration_ms: u64,
49 pub priority: i32,
51 pub created_at: DateTime<Utc>,
53}
54
55impl AllocationRequest {
56 pub fn new(from_supervisor: impl Into<String>, executors_needed: usize) -> Self {
58 Self {
59 id: Uuid::new_v4().to_string(),
60 from_supervisor: from_supervisor.into(),
61 reason: AllocationReason::Overloaded { queue_depth: 10 },
62 executors_needed,
63 max_duration_ms: 60_000,
64 priority: 0,
65 created_at: Utc::now(),
66 }
67 }
68
69 #[must_use]
71 pub fn reason(mut self, reason: AllocationReason) -> Self {
72 self.reason = reason;
73 self
74 }
75
76 #[must_use]
78 pub fn priority(mut self, priority: i32) -> Self {
79 self.priority = priority;
80 self
81 }
82
83 #[must_use]
85 pub fn duration_ms(mut self, ms: u64) -> Self {
86 self.max_duration_ms = ms;
87 self
88 }
89
90 #[must_use]
92 pub fn duration_secs(self, secs: u64) -> Self {
93 self.duration_ms(secs * 1000)
94 }
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct AllocationOffer {
100 pub id: String,
102 pub from_supervisor: String,
104 pub to_supervisor: String,
106 pub request_id: String,
108 pub executor_ids: Vec<String>,
110 pub max_duration_ms: u64,
112 pub revocable: bool,
114}
115
116impl AllocationOffer {
117 #[must_use]
119 pub fn new(
120 from_supervisor: impl Into<String>,
121 to_supervisor: impl Into<String>,
122 request_id: impl Into<String>,
123 executor_ids: Vec<String>,
124 ) -> Self {
125 Self {
126 id: Uuid::new_v4().to_string(),
127 from_supervisor: from_supervisor.into(),
128 to_supervisor: to_supervisor.into(),
129 request_id: request_id.into(),
130 executor_ids,
131 max_duration_ms: 60_000,
132 revocable: true,
133 }
134 }
135
136 #[must_use]
138 pub fn duration_ms(mut self, ms: u64) -> Self {
139 self.max_duration_ms = ms;
140 self
141 }
142
143 #[must_use]
145 pub fn non_revocable(mut self) -> Self {
146 self.revocable = false;
147 self
148 }
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct AllocationGrant {
154 pub lease_id: String,
156 pub from_supervisor: String,
158 pub to_supervisor: String,
160 pub executor_ids: Vec<String>,
162 pub granted_at: DateTime<Utc>,
164 pub expires_at: DateTime<Utc>,
166 pub revocable: bool,
168}
169
170impl AllocationGrant {
171 pub fn new(
173 from_supervisor: impl Into<String>,
174 to_supervisor: impl Into<String>,
175 executor_ids: Vec<String>,
176 ) -> Self {
177 let now = Utc::now();
178 Self {
179 lease_id: Uuid::new_v4().to_string(),
180 from_supervisor: from_supervisor.into(),
181 to_supervisor: to_supervisor.into(),
182 executor_ids,
183 granted_at: now,
184 expires_at: now + Duration::seconds(60),
185 revocable: true,
186 }
187 }
188
189 #[must_use]
191 pub fn duration_secs(mut self, secs: i64) -> Self {
192 self.expires_at = self.granted_at + Duration::seconds(secs);
193 self
194 }
195
196 #[must_use]
198 pub fn is_expired(&self) -> bool {
199 Utc::now() > self.expires_at
200 }
201
202 #[must_use]
204 pub fn remaining_ms(&self) -> i64 {
205 (self.expires_at - Utc::now()).num_milliseconds().max(0)
206 }
207}
208
209#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct AllocationRelease {
212 pub lease_id: String,
214 pub reason: ReleaseReason,
216 pub released_at: DateTime<Utc>,
218}
219
220#[derive(Debug, Clone, Serialize, Deserialize)]
221#[serde(rename_all = "snake_case")]
222pub enum ReleaseReason {
223 Expired,
225 ReturnedEarly,
227 Revoked,
229 Error { message: String },
231}
232
233#[derive(Debug)]
235pub struct AllocationManager {
236 supervisor_id: String,
238 lent_grants: Arc<RwLock<HashMap<String, AllocationGrant>>>,
240 borrowed_grants: Arc<RwLock<HashMap<String, AllocationGrant>>>,
242 max_lend: usize,
244 max_borrow: usize,
246}
247
248impl AllocationManager {
249 pub fn new(supervisor_id: impl Into<String>) -> Self {
251 Self {
252 supervisor_id: supervisor_id.into(),
253 lent_grants: Arc::new(RwLock::new(HashMap::new())),
254 borrowed_grants: Arc::new(RwLock::new(HashMap::new())),
255 max_lend: 4,
256 max_borrow: 4,
257 }
258 }
259
260 #[must_use]
262 pub fn max_lend(mut self, n: usize) -> Self {
263 self.max_lend = n;
264 self
265 }
266
267 #[must_use]
269 pub fn max_borrow(mut self, n: usize) -> Self {
270 self.max_borrow = n;
271 self
272 }
273
274 pub async fn record_lent(&self, grant: AllocationGrant) {
276 self.lent_grants
277 .write()
278 .await
279 .insert(grant.lease_id.clone(), grant);
280 }
281
282 pub async fn record_borrowed(&self, grant: AllocationGrant) {
284 self.borrowed_grants
285 .write()
286 .await
287 .insert(grant.lease_id.clone(), grant);
288 }
289
290 pub async fn remove_grant(&self, lease_id: &str) {
292 self.lent_grants.write().await.remove(lease_id);
293 self.borrowed_grants.write().await.remove(lease_id);
294 }
295
296 pub async fn lent_count(&self) -> usize {
298 self.lent_grants
299 .read()
300 .await
301 .values()
302 .map(|g| g.executor_ids.len())
303 .sum()
304 }
305
306 pub async fn borrowed_count(&self) -> usize {
308 self.borrowed_grants
309 .read()
310 .await
311 .values()
312 .map(|g| g.executor_ids.len())
313 .sum()
314 }
315
316 pub async fn can_lend(&self, count: usize) -> bool {
318 self.lent_count().await + count <= self.max_lend
319 }
320
321 pub async fn can_borrow(&self, count: usize) -> bool {
323 self.borrowed_count().await + count <= self.max_borrow
324 }
325
326 pub async fn get_expired_grants(&self) -> Vec<AllocationGrant> {
328 let mut expired = Vec::new();
329
330 for grant in self.lent_grants.read().await.values() {
331 if grant.is_expired() {
332 expired.push(grant.clone());
333 }
334 }
335
336 for grant in self.borrowed_grants.read().await.values() {
337 if grant.is_expired() {
338 expired.push(grant.clone());
339 }
340 }
341
342 expired
343 }
344
345 pub async fn cleanup_expired(&self) -> Vec<String> {
347 let mut removed = Vec::new();
348
349 {
350 let mut lent = self.lent_grants.write().await;
351 lent.retain(|id, grant| {
352 if grant.is_expired() {
353 removed.push(id.clone());
354 false
355 } else {
356 true
357 }
358 });
359 }
360
361 {
362 let mut borrowed = self.borrowed_grants.write().await;
363 borrowed.retain(|id, grant| {
364 if grant.is_expired() {
365 removed.push(id.clone());
366 false
367 } else {
368 true
369 }
370 });
371 }
372
373 removed
374 }
375
376 pub async fn borrowed_executor_ids(&self) -> Vec<String> {
378 self.borrowed_grants
379 .read()
380 .await
381 .values()
382 .flat_map(|g| g.executor_ids.clone())
383 .collect()
384 }
385
386 pub async fn lent_executor_ids(&self) -> Vec<String> {
388 self.lent_grants
389 .read()
390 .await
391 .values()
392 .flat_map(|g| g.executor_ids.clone())
393 .collect()
394 }
395
396 #[must_use]
398 pub fn create_offer(
399 &self,
400 request: &AllocationRequest,
401 executor_ids: Vec<String>,
402 ) -> AllocationOffer {
403 AllocationOffer::new(
404 &self.supervisor_id,
405 &request.from_supervisor,
406 &request.id,
407 executor_ids,
408 )
409 }
410
411 #[must_use]
413 pub fn create_release(lease_id: impl Into<String>, reason: ReleaseReason) -> AllocationRelease {
414 AllocationRelease {
415 lease_id: lease_id.into(),
416 reason,
417 released_at: Utc::now(),
418 }
419 }
420}
421
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder calls keep the constructor arguments and convert seconds to milliseconds.
    #[test]
    fn test_allocation_request() {
        let overloaded = AllocationReason::Overloaded { queue_depth: 15 };
        let request = AllocationRequest::new("sup-code", 2)
            .reason(overloaded)
            .priority(1)
            .duration_secs(120);

        assert_eq!(request.from_supervisor, "sup-code");
        assert_eq!(request.executors_needed, 2);
        assert_eq!(request.max_duration_ms, 120_000);
    }

    /// A grant with a 60-second lease is live immediately after creation.
    #[test]
    fn test_allocation_grant() {
        let executors = vec!["exe-001".into()];
        let lease = AllocationGrant::new("sup-data", "sup-code", executors).duration_secs(60);

        assert!(!lease.is_expired());
        assert!(lease.remaining_ms() > 0);
    }

    /// An empty manager has room to lend and borrow; a recorded grant is counted.
    #[tokio::test]
    async fn test_allocation_manager() {
        let mgr = AllocationManager::new("sup-001").max_lend(4).max_borrow(4);

        assert!(mgr.can_lend(2).await);
        assert!(mgr.can_borrow(2).await);

        let lease = AllocationGrant::new("sup-001", "sup-002", vec!["exe-001".into()]);
        mgr.record_lent(lease).await;

        assert_eq!(mgr.lent_count().await, 1);
    }

    /// Offer builders keep the sender and clear the revocable flag.
    #[test]
    fn test_allocation_offer() {
        let executors = vec!["exe-001".into()];
        let proposal = AllocationOffer::new("sup-data", "sup-code", "req-001", executors)
            .duration_ms(120_000)
            .non_revocable();

        assert_eq!(proposal.from_supervisor, "sup-data");
        assert!(!proposal.revocable);
    }

    /// A release built by hand keeps its lease id and reason.
    #[test]
    fn test_allocation_release() {
        let released = AllocationRelease {
            lease_id: "lease-001".to_string(),
            reason: ReleaseReason::Expired,
            released_at: Utc::now(),
        };

        assert_eq!(released.lease_id, "lease-001");
        assert!(matches!(released.reason, ReleaseReason::Expired));
    }
}