Skip to main content

execgo_runtime/
ledger.rs

1use crate::{
2    error::{AppError, AppResult},
3    types::{ResourceCapacity, RuntimeResourcesResponse, TaskResourceReservation},
4};
5
6#[derive(Debug, Clone)]
7pub struct ResourceLedger {
8    capacity: ResourceCapacity,
9}
10
11impl ResourceLedger {
12    pub fn new(capacity: ResourceCapacity) -> Self {
13        Self { capacity }
14    }
15
16    pub fn capacity(&self) -> &ResourceCapacity {
17        &self.capacity
18    }
19
20    pub fn ensure_within_capacity(&self, reservation: &TaskResourceReservation) -> AppResult<()> {
21        if reservation.task_slots > self.capacity.task_slots {
22            return Err(AppError::InsufficientResources(format!(
23                "task requires {} task slots but runtime capacity is {}",
24                reservation.task_slots, self.capacity.task_slots
25            )));
26        }
27
28        if let (Some(requested), Some(capacity)) =
29            (reservation.memory_bytes, self.capacity.memory_bytes)
30        {
31            if requested > capacity {
32                return Err(AppError::InsufficientResources(format!(
33                    "task requires {requested} memory_bytes but runtime capacity is {capacity}"
34                )));
35            }
36        }
37
38        if let (Some(requested), Some(capacity)) = (reservation.pids, self.capacity.pids) {
39            if requested > capacity {
40                return Err(AppError::InsufficientResources(format!(
41                    "task requires {requested} pids but runtime capacity is {capacity}"
42                )));
43            }
44        }
45
46        Ok(())
47    }
48
49    pub fn can_reserve(
50        &self,
51        currently_reserved: &ResourceCapacity,
52        reservation: &TaskResourceReservation,
53    ) -> bool {
54        if currently_reserved
55            .task_slots
56            .saturating_add(reservation.task_slots)
57            > self.capacity.task_slots
58        {
59            return false;
60        }
61
62        if let (Some(reserved), Some(requested), Some(capacity)) = (
63            currently_reserved.memory_bytes,
64            reservation.memory_bytes,
65            self.capacity.memory_bytes,
66        ) {
67            if reserved.saturating_add(requested) > capacity {
68                return false;
69            }
70        }
71
72        if let (Some(reserved), Some(requested), Some(capacity)) = (
73            currently_reserved.pids,
74            reservation.pids,
75            self.capacity.pids,
76        ) {
77            if reserved.saturating_add(requested) > capacity {
78                return false;
79            }
80        }
81
82        true
83    }
84
85    pub fn reserved_capacity<'a, I>(&self, reservations: I) -> ResourceCapacity
86    where
87        I: IntoIterator<Item = &'a TaskResourceReservation>,
88    {
89        let mut task_slots = 0u64;
90        let mut memory = if self.capacity.memory_bytes.is_some() {
91            Some(0u64)
92        } else {
93            None
94        };
95        let mut pids = if self.capacity.pids.is_some() {
96            Some(0u64)
97        } else {
98            None
99        };
100
101        for reservation in reservations {
102            task_slots = task_slots.saturating_add(reservation.task_slots);
103            if let Some(value) = reservation.memory_bytes {
104                let current = memory.unwrap_or(0);
105                memory = Some(current.saturating_add(value));
106            }
107            if let Some(value) = reservation.pids {
108                let current = pids.unwrap_or(0);
109                pids = Some(current.saturating_add(value));
110            }
111        }
112
113        ResourceCapacity {
114            task_slots,
115            memory_bytes: memory,
116            pids,
117        }
118    }
119
120    pub fn available_capacity(&self, reserved: &ResourceCapacity) -> ResourceCapacity {
121        ResourceCapacity {
122            task_slots: self.capacity.task_slots.saturating_sub(reserved.task_slots),
123            memory_bytes: self
124                .capacity
125                .memory_bytes
126                .map(|capacity| capacity.saturating_sub(reserved.memory_bytes.unwrap_or(0))),
127            pids: self
128                .capacity
129                .pids
130                .map(|capacity| capacity.saturating_sub(reserved.pids.unwrap_or(0))),
131        }
132    }
133
134    pub fn empty_snapshot(&self, runtime_id: String) -> RuntimeResourcesResponse {
135        let reserved = self.reserved_capacity(std::iter::empty::<&TaskResourceReservation>());
136        RuntimeResourcesResponse {
137            runtime_id,
138            capacity: self.capacity.clone(),
139            available: self.available_capacity(&reserved),
140            reserved,
141            active_reservations: Vec::new(),
142            accepted_waiting_tasks: 0,
143        }
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150
151    #[test]
152    fn detects_capacity_overflow() {
153        let ledger = ResourceLedger::new(ResourceCapacity {
154            task_slots: 2,
155            memory_bytes: Some(100),
156            pids: Some(8),
157        });
158        let reserved = ResourceCapacity {
159            task_slots: 1,
160            memory_bytes: Some(50),
161            pids: Some(2),
162        };
163        let reservation = TaskResourceReservation {
164            task_slots: 2,
165            memory_bytes: Some(60),
166            pids: Some(1),
167        };
168        assert!(!ledger.can_reserve(&reserved, &reservation));
169    }
170}