Skip to main content

dasein_agentic_core/distributed/
allocation.rs

//! Dynamic executor allocation between supervisors.
//!
//! # Quick Start
//!
//! ```rust
//! // Request help
//! let request = AllocationRequest::new("sup-code", 2)
//!     .reason(AllocationReason::Overloaded { queue_depth: 15 });
//!
//! // Grant executors (executor IDs are owned `String`s)
//! let grant = AllocationGrant::new(
//!     "sup-data",
//!     "sup-code",
//!     vec!["exe-001".into(), "exe-002".into()],
//! )
//! .duration_secs(60);
//! ```
14
15use chrono::{DateTime, Duration, Utc};
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::sync::Arc;
19use tokio::sync::RwLock;
20use uuid::Uuid;
21
22/// Reason for requesting allocation.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum AllocationReason {
26    /// Queue is too deep
27    Overloaded { queue_depth: usize },
28    /// Need specific capability
29    SpecializedTask { capability: String },
30    /// Urgent deadline
31    Deadline { remaining_ms: u64 },
32    /// Traffic burst
33    TrafficBurst { requests_per_sec: f64 },
34}
35
36/// Request for executor allocation.
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct AllocationRequest {
39    /// Request ID
40    pub id: String,
41    /// Requesting supervisor
42    pub from_supervisor: String,
43    /// Reason
44    pub reason: AllocationReason,
45    /// Number of executors needed
46    pub executors_needed: usize,
47    /// Max duration in ms
48    pub max_duration_ms: u64,
49    /// Priority (higher = more urgent)
50    pub priority: i32,
51    /// Created at
52    pub created_at: DateTime<Utc>,
53}
54
55impl AllocationRequest {
56    /// Create a new allocation request.
57    pub fn new(from_supervisor: impl Into<String>, executors_needed: usize) -> Self {
58        Self {
59            id: Uuid::new_v4().to_string(),
60            from_supervisor: from_supervisor.into(),
61            reason: AllocationReason::Overloaded { queue_depth: 10 },
62            executors_needed,
63            max_duration_ms: 60_000,
64            priority: 0,
65            created_at: Utc::now(),
66        }
67    }
68
69    /// Set reason.
70    #[must_use]
71    pub fn reason(mut self, reason: AllocationReason) -> Self {
72        self.reason = reason;
73        self
74    }
75
76    /// Set priority.
77    #[must_use]
78    pub fn priority(mut self, priority: i32) -> Self {
79        self.priority = priority;
80        self
81    }
82
83    /// Set max duration.
84    #[must_use]
85    pub fn duration_ms(mut self, ms: u64) -> Self {
86        self.max_duration_ms = ms;
87        self
88    }
89
90    /// Set duration in seconds.
91    #[must_use]
92    pub fn duration_secs(self, secs: u64) -> Self {
93        self.duration_ms(secs * 1000)
94    }
95}
96
97/// Offer of executors.
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct AllocationOffer {
100    /// Offer ID
101    pub id: String,
102    /// Offering supervisor
103    pub from_supervisor: String,
104    /// To supervisor
105    pub to_supervisor: String,
106    /// Request ID being answered
107    pub request_id: String,
108    /// Available executor IDs
109    pub executor_ids: Vec<String>,
110    /// Max duration offered
111    pub max_duration_ms: u64,
112    /// Can be revoked early
113    pub revocable: bool,
114}
115
116impl AllocationOffer {
117    /// Create a new offer responding to an allocation request.
118    #[must_use]
119    pub fn new(
120        from_supervisor: impl Into<String>,
121        to_supervisor: impl Into<String>,
122        request_id: impl Into<String>,
123        executor_ids: Vec<String>,
124    ) -> Self {
125        Self {
126            id: Uuid::new_v4().to_string(),
127            from_supervisor: from_supervisor.into(),
128            to_supervisor: to_supervisor.into(),
129            request_id: request_id.into(),
130            executor_ids,
131            max_duration_ms: 60_000,
132            revocable: true,
133        }
134    }
135
136    /// Set duration.
137    #[must_use]
138    pub fn duration_ms(mut self, ms: u64) -> Self {
139        self.max_duration_ms = ms;
140        self
141    }
142
143    /// Set non-revocable.
144    #[must_use]
145    pub fn non_revocable(mut self) -> Self {
146        self.revocable = false;
147        self
148    }
149}
150
151/// Granted allocation (lease).
152#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct AllocationGrant {
154    /// Lease ID
155    pub lease_id: String,
156    /// From supervisor (lender)
157    pub from_supervisor: String,
158    /// To supervisor (borrower)
159    pub to_supervisor: String,
160    /// Executor IDs
161    pub executor_ids: Vec<String>,
162    /// Granted at
163    pub granted_at: DateTime<Utc>,
164    /// Expires at
165    pub expires_at: DateTime<Utc>,
166    /// Is revocable
167    pub revocable: bool,
168}
169
170impl AllocationGrant {
171    /// Create a new grant.
172    pub fn new(
173        from_supervisor: impl Into<String>,
174        to_supervisor: impl Into<String>,
175        executor_ids: Vec<String>,
176    ) -> Self {
177        let now = Utc::now();
178        Self {
179            lease_id: Uuid::new_v4().to_string(),
180            from_supervisor: from_supervisor.into(),
181            to_supervisor: to_supervisor.into(),
182            executor_ids,
183            granted_at: now,
184            expires_at: now + Duration::seconds(60),
185            revocable: true,
186        }
187    }
188
189    /// Set duration in seconds.
190    #[must_use]
191    pub fn duration_secs(mut self, secs: i64) -> Self {
192        self.expires_at = self.granted_at + Duration::seconds(secs);
193        self
194    }
195
196    /// Check if expired.
197    #[must_use]
198    pub fn is_expired(&self) -> bool {
199        Utc::now() > self.expires_at
200    }
201
202    /// Time remaining in ms.
203    #[must_use]
204    pub fn remaining_ms(&self) -> i64 {
205        (self.expires_at - Utc::now()).num_milliseconds().max(0)
206    }
207}
208
209/// Release of allocated executors.
210#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct AllocationRelease {
212    /// Lease ID
213    pub lease_id: String,
214    /// Reason for release
215    pub reason: ReleaseReason,
216    /// Released at
217    pub released_at: DateTime<Utc>,
218}
219
220#[derive(Debug, Clone, Serialize, Deserialize)]
221#[serde(rename_all = "snake_case")]
222pub enum ReleaseReason {
223    /// Lease expired
224    Expired,
225    /// Returned early by borrower
226    ReturnedEarly,
227    /// Revoked by lender
228    Revoked,
229    /// Error
230    Error { message: String },
231}
232
233/// Manages allocations for a supervisor.
234#[derive(Debug)]
235pub struct AllocationManager {
236    /// Supervisor ID
237    supervisor_id: String,
238    /// Active grants where we are the lender
239    lent_grants: Arc<RwLock<HashMap<String, AllocationGrant>>>,
240    /// Active grants where we are the borrower
241    borrowed_grants: Arc<RwLock<HashMap<String, AllocationGrant>>>,
242    /// Max executors we can lend
243    max_lend: usize,
244    /// Max executors we can borrow
245    max_borrow: usize,
246}
247
248impl AllocationManager {
249    /// Create a new allocation manager.
250    pub fn new(supervisor_id: impl Into<String>) -> Self {
251        Self {
252            supervisor_id: supervisor_id.into(),
253            lent_grants: Arc::new(RwLock::new(HashMap::new())),
254            borrowed_grants: Arc::new(RwLock::new(HashMap::new())),
255            max_lend: 4,
256            max_borrow: 4,
257        }
258    }
259
260    /// Set max lend.
261    #[must_use]
262    pub fn max_lend(mut self, n: usize) -> Self {
263        self.max_lend = n;
264        self
265    }
266
267    /// Set max borrow.
268    #[must_use]
269    pub fn max_borrow(mut self, n: usize) -> Self {
270        self.max_borrow = n;
271        self
272    }
273
274    /// Record a grant where we are the lender.
275    pub async fn record_lent(&self, grant: AllocationGrant) {
276        self.lent_grants
277            .write()
278            .await
279            .insert(grant.lease_id.clone(), grant);
280    }
281
282    /// Record a grant where we are the borrower.
283    pub async fn record_borrowed(&self, grant: AllocationGrant) {
284        self.borrowed_grants
285            .write()
286            .await
287            .insert(grant.lease_id.clone(), grant);
288    }
289
290    /// Remove a grant.
291    pub async fn remove_grant(&self, lease_id: &str) {
292        self.lent_grants.write().await.remove(lease_id);
293        self.borrowed_grants.write().await.remove(lease_id);
294    }
295
296    /// Get count of lent executors.
297    pub async fn lent_count(&self) -> usize {
298        self.lent_grants
299            .read()
300            .await
301            .values()
302            .map(|g| g.executor_ids.len())
303            .sum()
304    }
305
306    /// Get count of borrowed executors.
307    pub async fn borrowed_count(&self) -> usize {
308        self.borrowed_grants
309            .read()
310            .await
311            .values()
312            .map(|g| g.executor_ids.len())
313            .sum()
314    }
315
316    /// Can we lend more?
317    pub async fn can_lend(&self, count: usize) -> bool {
318        self.lent_count().await + count <= self.max_lend
319    }
320
321    /// Can we borrow more?
322    pub async fn can_borrow(&self, count: usize) -> bool {
323        self.borrowed_count().await + count <= self.max_borrow
324    }
325
326    /// Get expired grants.
327    pub async fn get_expired_grants(&self) -> Vec<AllocationGrant> {
328        let mut expired = Vec::new();
329
330        for grant in self.lent_grants.read().await.values() {
331            if grant.is_expired() {
332                expired.push(grant.clone());
333            }
334        }
335
336        for grant in self.borrowed_grants.read().await.values() {
337            if grant.is_expired() {
338                expired.push(grant.clone());
339            }
340        }
341
342        expired
343    }
344
345    /// Clean up expired grants.
346    pub async fn cleanup_expired(&self) -> Vec<String> {
347        let mut removed = Vec::new();
348
349        {
350            let mut lent = self.lent_grants.write().await;
351            lent.retain(|id, grant| {
352                if grant.is_expired() {
353                    removed.push(id.clone());
354                    false
355                } else {
356                    true
357                }
358            });
359        }
360
361        {
362            let mut borrowed = self.borrowed_grants.write().await;
363            borrowed.retain(|id, grant| {
364                if grant.is_expired() {
365                    removed.push(id.clone());
366                    false
367                } else {
368                    true
369                }
370            });
371        }
372
373        removed
374    }
375
376    /// Get all borrowed executor IDs.
377    pub async fn borrowed_executor_ids(&self) -> Vec<String> {
378        self.borrowed_grants
379            .read()
380            .await
381            .values()
382            .flat_map(|g| g.executor_ids.clone())
383            .collect()
384    }
385
386    /// Get all lent executor IDs.
387    pub async fn lent_executor_ids(&self) -> Vec<String> {
388        self.lent_grants
389            .read()
390            .await
391            .values()
392            .flat_map(|g| g.executor_ids.clone())
393            .collect()
394    }
395
396    /// Create an offer for a request. Used by supervisors to respond to allocation requests.
397    #[must_use]
398    pub fn create_offer(
399        &self,
400        request: &AllocationRequest,
401        executor_ids: Vec<String>,
402    ) -> AllocationOffer {
403        AllocationOffer::new(
404            &self.supervisor_id,
405            &request.from_supervisor,
406            &request.id,
407            executor_ids,
408        )
409    }
410
411    /// Create a release notification when returning executors.
412    #[must_use]
413    pub fn create_release(lease_id: impl Into<String>, reason: ReleaseReason) -> AllocationRelease {
414        AllocationRelease {
415            lease_id: lease_id.into(),
416            reason,
417            released_at: Utc::now(),
418        }
419    }
420}
421
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder calls on a request show up in its public fields.
    #[test]
    fn test_allocation_request() {
        let request = AllocationRequest::new("sup-code", 2)
            .reason(AllocationReason::Overloaded { queue_depth: 15 })
            .priority(1)
            .duration_secs(120);

        assert_eq!(request.from_supervisor, "sup-code");
        assert_eq!(request.executors_needed, 2);
        assert_eq!(request.max_duration_ms, 120_000);
    }

    /// A grant with a 60 second lease is live right after creation.
    #[test]
    fn test_allocation_grant() {
        let executors = vec![String::from("exe-001")];
        let lease = AllocationGrant::new("sup-data", "sup-code", executors).duration_secs(60);

        assert!(!lease.is_expired());
        assert!(lease.remaining_ms() > 0);
    }

    /// A fresh manager has capacity, and a recorded grant is counted.
    #[tokio::test]
    async fn test_allocation_manager() {
        let mgr = AllocationManager::new("sup-001").max_lend(4).max_borrow(4);

        assert!(mgr.can_lend(2).await);
        assert!(mgr.can_borrow(2).await);

        let lease = AllocationGrant::new("sup-001", "sup-002", vec![String::from("exe-001")]);
        mgr.record_lent(lease).await;

        assert_eq!(mgr.lent_count().await, 1);
    }

    /// Builder calls on an offer show up in its public fields.
    #[test]
    fn test_allocation_offer() {
        let executors = vec![String::from("exe-001")];
        let proposal = AllocationOffer::new("sup-data", "sup-code", "req-001", executors)
            .duration_ms(120_000)
            .non_revocable();

        assert_eq!(proposal.from_supervisor, "sup-data");
        assert!(!proposal.revocable);
    }

    /// A release built by hand keeps the lease ID and reason it was given.
    #[test]
    fn test_allocation_release() {
        let notice = AllocationRelease {
            lease_id: "lease-001".to_string(),
            reason: ReleaseReason::Expired,
            released_at: Utc::now(),
        };

        assert_eq!(notice.lease_id, "lease-001");
        assert!(matches!(notice.reason, ReleaseReason::Expired));
    }
}