1use crate::fd_table::FdTableManager;
2use crate::pipe_manager::PipeManager;
3use crate::process_table::{ProcessStatus, ProcessTable};
4use crate::pty::PtyManager;
5use crate::socket_table::{SocketState, SocketTable};
6use secure_exec_bridge::queue_tracker::{register_limit, QueueGauge, TrackedLimit};
7use std::collections::BTreeMap;
8use std::error::Error;
9use std::fmt;
10use std::sync::Arc;
11use vfs::posix::usage::RootFilesystemResourceLimits;
12
13pub use vfs::posix::usage::{
14 measure_filesystem_usage, FileSystemUsage, DEFAULT_MAX_FILESYSTEM_BYTES,
15 DEFAULT_MAX_INODE_COUNT,
16};
17
/// Default cap on process-table entries (running and exited).
pub const DEFAULT_MAX_PROCESSES: usize = 256;
/// Default cap on open file descriptors summed across all fd tables.
pub const DEFAULT_MAX_OPEN_FDS: usize = 256;
/// Default cap on live pipes.
pub const DEFAULT_MAX_PIPES: usize = 128;
/// Default cap on live PTYs.
pub const DEFAULT_MAX_PTYS: usize = 128;
/// Default cap on live sockets.
pub const DEFAULT_MAX_SOCKETS: usize = 256;
/// Default cap on sockets in a state that counts as a connection.
pub const DEFAULT_MAX_CONNECTIONS: usize = 256;
/// Default cap on bytes buffered across all sockets (4 MiB).
pub const DEFAULT_MAX_SOCKET_BUFFERED_BYTES: usize = 4 << 20;
/// Default cap on queued datagrams across all sockets.
pub const DEFAULT_MAX_SOCKET_DATAGRAM_QUEUE_LEN: usize = 1024;
/// Default blocking-read limit, in milliseconds.
pub const DEFAULT_BLOCKING_READ_TIMEOUT_MS: u64 = 5000;
/// Default maximum length accepted by a single `pread` (64 MiB).
pub const DEFAULT_MAX_PREAD_BYTES: usize = 64 << 20;
/// Default maximum size accepted by a single fd write (64 MiB).
pub const DEFAULT_MAX_FD_WRITE_BYTES: usize = 64 << 20;
/// Default maximum argv payload for a spawned process (1 MiB).
pub const DEFAULT_MAX_PROCESS_ARGV_BYTES: usize = 1 << 20;
/// Default maximum environment payload for a spawned process (1 MiB).
pub const DEFAULT_MAX_PROCESS_ENV_BYTES: usize = 1 << 20;
/// Default maximum number of entries in one directory listing.
pub const DEFAULT_MAX_READDIR_ENTRIES: usize = 4096;
/// Default virtual CPU count.
pub const DEFAULT_VIRTUAL_CPU_COUNT: usize = 1;
/// Default WASM memory cap, in bytes (128 MiB).
pub const DEFAULT_MAX_WASM_MEMORY_BYTES: u64 = 128 << 20;
34
/// Point-in-time usage counters, as collected by [`ResourceAccountant::snapshot`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ResourceSnapshot {
    /// Processes whose status is `Running`.
    pub running_processes: usize,
    /// Processes whose status is `Exited` and that are still listed in the process table.
    pub exited_processes: usize,
    /// Number of fd tables.
    pub fd_tables: usize,
    /// Open descriptors summed across every fd table.
    pub open_fds: usize,
    /// Live pipes.
    pub pipes: usize,
    /// Bytes buffered across all pipes.
    pub pipe_buffered_bytes: usize,
    /// Live PTYs.
    pub ptys: usize,
    /// Bytes buffered on the input side of all PTYs.
    pub pty_buffered_input_bytes: usize,
    /// Bytes buffered on the output side of all PTYs.
    pub pty_buffered_output_bytes: usize,
    /// Live sockets.
    pub sockets: usize,
    /// Sockets reported as listeners.
    pub socket_listeners: usize,
    /// Sockets reported as connections.
    pub socket_connections: usize,
    /// Bytes buffered across all sockets.
    pub socket_buffered_bytes: usize,
    /// Datagrams queued across all sockets.
    pub socket_datagram_queue_len: usize,
}
52
/// Configurable limits for a VM. A `None` entry means that the limit is not enforced.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ResourceLimits {
    /// Virtual CPU count.
    pub virtual_cpu_count: Option<usize>,
    /// Maximum process-table entries (running plus exited).
    pub max_processes: Option<usize>,
    /// Maximum open descriptors summed across all fd tables.
    pub max_open_fds: Option<usize>,
    /// Maximum live pipes.
    pub max_pipes: Option<usize>,
    /// Maximum live PTYs.
    pub max_ptys: Option<usize>,
    /// Maximum live sockets.
    pub max_sockets: Option<usize>,
    /// Maximum sockets in a state that counts as a connection.
    pub max_connections: Option<usize>,
    /// Maximum bytes buffered across all sockets.
    pub max_socket_buffered_bytes: Option<usize>,
    /// Maximum datagrams queued across all sockets.
    pub max_socket_datagram_queue_len: Option<usize>,
    /// Maximum total filesystem size, in bytes.
    pub max_filesystem_bytes: Option<u64>,
    /// Maximum number of filesystem inodes.
    pub max_inode_count: Option<usize>,
    /// Blocking-read limit in milliseconds (enforced outside this module — TODO confirm).
    pub max_blocking_read_ms: Option<u64>,
    /// Maximum length of a single `pread`.
    pub max_pread_bytes: Option<usize>,
    /// Maximum size of a single fd write.
    pub max_fd_write_bytes: Option<usize>,
    /// Maximum argv payload of a spawned process.
    pub max_process_argv_bytes: Option<usize>,
    /// Maximum merged environment payload of a spawned process.
    pub max_process_env_bytes: Option<usize>,
    /// Maximum entries in one directory listing.
    pub max_readdir_entries: Option<usize>,
    /// WASM fuel budget (not consulted in this module).
    pub max_wasm_fuel: Option<u64>,
    /// WASM memory cap in bytes (not consulted in this module).
    pub max_wasm_memory_bytes: Option<u64>,
    /// WASM stack cap in bytes (not consulted in this module).
    pub max_wasm_stack_bytes: Option<usize>,
}
76
77impl Default for ResourceLimits {
78 fn default() -> Self {
79 Self {
80 virtual_cpu_count: Some(DEFAULT_VIRTUAL_CPU_COUNT),
81 max_processes: Some(DEFAULT_MAX_PROCESSES),
82 max_open_fds: Some(DEFAULT_MAX_OPEN_FDS),
83 max_pipes: Some(DEFAULT_MAX_PIPES),
84 max_ptys: Some(DEFAULT_MAX_PTYS),
85 max_sockets: Some(DEFAULT_MAX_SOCKETS),
86 max_connections: Some(DEFAULT_MAX_CONNECTIONS),
87 max_socket_buffered_bytes: Some(DEFAULT_MAX_SOCKET_BUFFERED_BYTES),
88 max_socket_datagram_queue_len: Some(DEFAULT_MAX_SOCKET_DATAGRAM_QUEUE_LEN),
89 max_filesystem_bytes: Some(DEFAULT_MAX_FILESYSTEM_BYTES),
90 max_inode_count: Some(DEFAULT_MAX_INODE_COUNT),
91 max_blocking_read_ms: Some(DEFAULT_BLOCKING_READ_TIMEOUT_MS),
92 max_pread_bytes: Some(DEFAULT_MAX_PREAD_BYTES),
93 max_fd_write_bytes: Some(DEFAULT_MAX_FD_WRITE_BYTES),
94 max_process_argv_bytes: Some(DEFAULT_MAX_PROCESS_ARGV_BYTES),
95 max_process_env_bytes: Some(DEFAULT_MAX_PROCESS_ENV_BYTES),
96 max_readdir_entries: Some(DEFAULT_MAX_READDIR_ENTRIES),
97 max_wasm_fuel: None,
98 max_wasm_memory_bytes: Some(DEFAULT_MAX_WASM_MEMORY_BYTES),
101 max_wasm_stack_bytes: None,
102 }
103 }
104}
105
/// A rejected resource request, tagged with a POSIX errno name such as `EAGAIN`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ResourceError {
    /// Errno name reported to the caller.
    code: &'static str,
    /// Human-readable explanation.
    message: String,
}
111
112impl RootFilesystemResourceLimits for ResourceLimits {
113 fn max_filesystem_bytes(&self) -> Option<u64> {
114 self.max_filesystem_bytes
115 }
116
117 fn max_inode_count(&self) -> Option<usize> {
118 self.max_inode_count
119 }
120}
121
122impl ResourceError {
123 pub fn code(&self) -> &'static str {
124 self.code
125 }
126
127 fn exhausted(message: impl Into<String>) -> Self {
128 Self {
129 code: "EAGAIN",
130 message: message.into(),
131 }
132 }
133
134 fn file_table_full(message: impl Into<String>) -> Self {
135 Self {
136 code: "ENFILE",
137 message: message.into(),
138 }
139 }
140
141 fn filesystem_full(message: impl Into<String>) -> Self {
142 Self {
143 code: "ENOSPC",
144 message: message.into(),
145 }
146 }
147
148 fn invalid_input(message: impl Into<String>) -> Self {
149 Self {
150 code: "EINVAL",
151 message: message.into(),
152 }
153 }
154
155 fn out_of_memory(message: impl Into<String>) -> Self {
156 Self {
157 code: "ENOMEM",
158 message: message.into(),
159 }
160 }
161}
162
163impl fmt::Display for ResourceError {
164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165 write!(f, "{}: {}", self.code, self.message)
166 }
167}
168
169impl Error for ResourceError {}
170
171#[derive(Debug, Clone, Default)]
172struct ResourceGauges {
177 processes: Option<Arc<QueueGauge>>,
178 open_fds: Option<Arc<QueueGauge>>,
179 pipes: Option<Arc<QueueGauge>>,
180 ptys: Option<Arc<QueueGauge>>,
181 sockets: Option<Arc<QueueGauge>>,
182 connections: Option<Arc<QueueGauge>>,
183 socket_buffered_bytes: Option<Arc<QueueGauge>>,
184 socket_datagram_queue_len: Option<Arc<QueueGauge>>,
185 filesystem_bytes: Option<Arc<QueueGauge>>,
186 inodes: Option<Arc<QueueGauge>>,
187}
188
189fn register_resource_gauge(name: TrackedLimit, limit: Option<usize>) -> Option<Arc<QueueGauge>> {
190 limit.map(|capacity| register_limit(name, capacity))
191}
192
193fn register_resource_gauge_u64(name: TrackedLimit, limit: Option<u64>) -> Option<Arc<QueueGauge>> {
194 limit.map(|capacity| register_limit(name, usize_saturating_from_u64(capacity)))
195}
196
/// Converts `value` to `usize`, clamping to `usize::MAX` on targets where it does not fit.
fn usize_saturating_from_u64(value: u64) -> usize {
    match usize::try_from(value) {
        Ok(converted) => converted,
        Err(_) => usize::MAX,
    }
}
200
201impl ResourceGauges {
202 fn new(limits: &ResourceLimits) -> Self {
203 Self {
204 processes: register_resource_gauge(TrackedLimit::VmProcesses, limits.max_processes),
205 open_fds: register_resource_gauge(TrackedLimit::VmOpenFds, limits.max_open_fds),
206 pipes: register_resource_gauge(TrackedLimit::VmPipes, limits.max_pipes),
207 ptys: register_resource_gauge(TrackedLimit::VmPtys, limits.max_ptys),
208 sockets: register_resource_gauge(TrackedLimit::VmSockets, limits.max_sockets),
209 connections: register_resource_gauge(
210 TrackedLimit::VmConnections,
211 limits.max_connections,
212 ),
213 socket_buffered_bytes: register_resource_gauge(
214 TrackedLimit::VmSocketBufferedBytes,
215 limits.max_socket_buffered_bytes,
216 ),
217 socket_datagram_queue_len: register_resource_gauge(
218 TrackedLimit::VmSocketDatagramQueueLen,
219 limits.max_socket_datagram_queue_len,
220 ),
221 filesystem_bytes: register_resource_gauge_u64(
222 TrackedLimit::VmFilesystemBytes,
223 limits.max_filesystem_bytes,
224 ),
225 inodes: register_resource_gauge(TrackedLimit::VmInodes, limits.max_inode_count),
226 }
227 }
228}
229
230pub struct ResourceAccountant {
231 limits: ResourceLimits,
232 gauges: ResourceGauges,
233}
234
235impl ResourceAccountant {
236 pub fn new(limits: ResourceLimits) -> Self {
237 let gauges = ResourceGauges::new(&limits);
238 Self { limits, gauges }
239 }
240
241 fn observe_resource_gauges(&self, snapshot: &ResourceSnapshot) {
244 if let Some(gauge) = &self.gauges.processes {
245 gauge.observe_depth(snapshot.running_processes + snapshot.exited_processes);
246 }
247 if let Some(gauge) = &self.gauges.open_fds {
248 gauge.observe_depth(snapshot.open_fds);
249 }
250 if let Some(gauge) = &self.gauges.pipes {
251 gauge.observe_depth(snapshot.pipes);
252 }
253 if let Some(gauge) = &self.gauges.ptys {
254 gauge.observe_depth(snapshot.ptys);
255 }
256 if let Some(gauge) = &self.gauges.sockets {
257 gauge.observe_depth(snapshot.sockets);
258 }
259 if let Some(gauge) = &self.gauges.connections {
260 gauge.observe_depth(snapshot.socket_connections);
261 }
262 if let Some(gauge) = &self.gauges.socket_buffered_bytes {
263 gauge.observe_depth(snapshot.socket_buffered_bytes);
264 }
265 if let Some(gauge) = &self.gauges.socket_datagram_queue_len {
266 gauge.observe_depth(snapshot.socket_datagram_queue_len);
267 }
268 }
269
270 pub fn limits(&self) -> &ResourceLimits {
271 &self.limits
272 }
273
274 pub fn snapshot(
275 &self,
276 processes: &ProcessTable,
277 fd_tables: &FdTableManager,
278 pipes: &PipeManager,
279 ptys: &PtyManager,
280 sockets: &SocketTable,
281 ) -> ResourceSnapshot {
282 let process_list = processes.list_processes();
283 let running_processes = process_list
284 .values()
285 .filter(|process| process.status == ProcessStatus::Running)
286 .count();
287 let exited_processes = process_list
288 .values()
289 .filter(|process| process.status == ProcessStatus::Exited)
290 .count();
291 let socket_snapshot = sockets.snapshot();
292
293 let snapshot = ResourceSnapshot {
294 running_processes,
295 exited_processes,
296 fd_tables: fd_tables.len(),
297 open_fds: fd_tables.total_open_fds(),
298 pipes: pipes.pipe_count(),
299 pipe_buffered_bytes: pipes.buffered_bytes(),
300 ptys: ptys.pty_count(),
301 pty_buffered_input_bytes: ptys.buffered_input_bytes(),
302 pty_buffered_output_bytes: ptys.buffered_output_bytes(),
303 sockets: socket_snapshot.sockets,
304 socket_listeners: socket_snapshot.listeners,
305 socket_connections: socket_snapshot.connections,
306 socket_buffered_bytes: socket_snapshot.buffered_bytes,
307 socket_datagram_queue_len: socket_snapshot.datagram_queue_len,
308 };
309 self.observe_resource_gauges(&snapshot);
310 snapshot
311 }
312
313 pub fn check_process_spawn(
314 &self,
315 snapshot: &ResourceSnapshot,
316 additional_fds: usize,
317 ) -> Result<(), ResourceError> {
318 if let Some(limit) = self.limits.max_processes {
319 if snapshot.running_processes + snapshot.exited_processes >= limit {
320 return Err(ResourceError::exhausted("maximum process limit reached"));
321 }
322 }
323
324 self.check_open_fds(snapshot, additional_fds)
325 }
326
327 pub fn check_process_argv_bytes(
328 &self,
329 command: &str,
330 args: &[String],
331 ) -> Result<(), ResourceError> {
332 if let Some(limit) = self.limits.max_process_argv_bytes {
333 let total = argv_payload_bytes(command, args);
334 if total > limit {
335 return Err(ResourceError::invalid_input(format!(
336 "process argv payload {total} bytes exceeds configured limit {limit}"
337 )));
338 }
339 }
340
341 Ok(())
342 }
343
344 pub fn check_process_env_bytes(
345 &self,
346 inherited_env: &BTreeMap<String, String>,
347 overrides: &BTreeMap<String, String>,
348 ) -> Result<(), ResourceError> {
349 if let Some(limit) = self.limits.max_process_env_bytes {
350 let total = merged_env_payload_bytes(inherited_env, overrides);
351 if total > limit {
352 return Err(ResourceError::invalid_input(format!(
353 "process environment payload {total} bytes exceeds configured limit {limit}"
354 )));
355 }
356 }
357
358 Ok(())
359 }
360
361 pub fn check_pipe_allocation(&self, snapshot: &ResourceSnapshot) -> Result<(), ResourceError> {
362 if let Some(limit) = self.limits.max_pipes {
363 if snapshot.pipes >= limit {
364 return Err(ResourceError::exhausted("maximum pipe count reached"));
365 }
366 }
367
368 self.check_open_fds(snapshot, 2)
369 }
370
371 pub fn check_pty_allocation(&self, snapshot: &ResourceSnapshot) -> Result<(), ResourceError> {
372 if let Some(limit) = self.limits.max_ptys {
373 if snapshot.ptys >= limit {
374 return Err(ResourceError::exhausted("maximum PTY count reached"));
375 }
376 }
377
378 self.check_open_fds(snapshot, 2)
379 }
380
381 pub fn check_socket_allocation(
382 &self,
383 snapshot: &ResourceSnapshot,
384 ) -> Result<(), ResourceError> {
385 if let Some(limit) = self.limits.max_sockets {
386 if snapshot.sockets >= limit {
387 return Err(ResourceError::exhausted("maximum socket count reached"));
388 }
389 }
390
391 Ok(())
392 }
393
394 pub fn check_socket_state_transition(
395 &self,
396 snapshot: &ResourceSnapshot,
397 current: SocketState,
398 next: SocketState,
399 ) -> Result<(), ResourceError> {
400 if !current.counts_as_connection() && next.counts_as_connection() {
401 if let Some(limit) = self.limits.max_connections {
402 if snapshot.socket_connections >= limit {
403 return Err(ResourceError::exhausted("maximum connection count reached"));
404 }
405 }
406 }
407
408 Ok(())
409 }
410
411 pub fn check_socket_buffer_growth(
412 &self,
413 snapshot: &ResourceSnapshot,
414 additional_bytes: usize,
415 ) -> Result<(), ResourceError> {
416 if let Some(limit) = self.limits.max_socket_buffered_bytes {
417 if snapshot
418 .socket_buffered_bytes
419 .saturating_add(additional_bytes)
420 > limit
421 {
422 return Err(ResourceError::exhausted(
423 "maximum socket buffered byte limit reached",
424 ));
425 }
426 }
427
428 Ok(())
429 }
430
431 pub fn check_socket_datagram_enqueue(
432 &self,
433 snapshot: &ResourceSnapshot,
434 additional_bytes: usize,
435 ) -> Result<(), ResourceError> {
436 self.check_socket_buffer_growth(snapshot, additional_bytes)?;
437 if let Some(limit) = self.limits.max_socket_datagram_queue_len {
438 if snapshot.socket_datagram_queue_len.saturating_add(1) > limit {
439 return Err(ResourceError::exhausted(
440 "maximum socket datagram queue length reached",
441 ));
442 }
443 }
444
445 Ok(())
446 }
447
448 pub fn check_pread_length(&self, length: usize) -> Result<(), ResourceError> {
449 if let Some(limit) = self.limits.max_pread_bytes {
450 if length > limit {
451 return Err(ResourceError::invalid_input(format!(
452 "pread length {length} exceeds configured limit {limit}"
453 )));
454 }
455 }
456
457 Ok(())
458 }
459
460 pub fn check_fd_write_size(&self, size: usize) -> Result<(), ResourceError> {
461 if let Some(limit) = self.limits.max_fd_write_bytes {
462 if size > limit {
463 return Err(ResourceError::invalid_input(format!(
464 "write size {size} exceeds configured limit {limit}"
465 )));
466 }
467 }
468
469 Ok(())
470 }
471
472 pub fn check_fd_allocation(
473 &self,
474 snapshot: &ResourceSnapshot,
475 additional_fds: usize,
476 ) -> Result<(), ResourceError> {
477 self.check_open_fds(snapshot, additional_fds)
478 }
479
480 pub fn max_readdir_entries(&self) -> Option<usize> {
481 self.limits.max_readdir_entries
482 }
483
484 pub fn check_readdir_entries(&self, entries: usize) -> Result<(), ResourceError> {
485 if let Some(limit) = self.limits.max_readdir_entries {
486 if entries > limit {
487 return Err(ResourceError::out_of_memory(format!(
488 "directory listing with {entries} entries exceeds configured limit {limit}"
489 )));
490 }
491 }
492
493 Ok(())
494 }
495
496 fn check_open_fds(
497 &self,
498 snapshot: &ResourceSnapshot,
499 additional_fds: usize,
500 ) -> Result<(), ResourceError> {
501 if let Some(limit) = self.limits.max_open_fds {
502 if snapshot.open_fds.saturating_add(additional_fds) > limit {
503 return Err(ResourceError::file_table_full(
504 "maximum open file descriptor limit reached",
505 ));
506 }
507 }
508
509 Ok(())
510 }
511
512 pub fn check_filesystem_usage(
513 &self,
514 _usage: &FileSystemUsage,
515 resulting_bytes: u64,
516 resulting_inodes: usize,
517 ) -> Result<(), ResourceError> {
518 if let Some(limit) = self.limits.max_filesystem_bytes {
519 if resulting_bytes > limit {
520 return Err(ResourceError::filesystem_full(
521 "maximum filesystem size limit reached",
522 ));
523 }
524 }
525
526 if let Some(limit) = self.limits.max_inode_count {
527 if resulting_inodes > limit {
528 return Err(ResourceError::filesystem_full(
529 "maximum inode count limit reached",
530 ));
531 }
532 }
533
534 if let Some(gauge) = &self.gauges.filesystem_bytes {
538 gauge.observe_depth(usize_saturating_from_u64(resulting_bytes));
539 }
540 if let Some(gauge) = &self.gauges.inodes {
541 gauge.observe_depth(resulting_inodes);
542 }
543 Ok(())
544 }
545}
546
/// Bytes counted for an argv payload.
///
/// Each string, the command included, contributes its length plus one byte (presumably a
/// NUL terminator, as in a C `argv`). All additions saturate.
fn argv_payload_bytes(command: &str, args: &[String]) -> usize {
    let args_bytes: usize = args.iter().map(|arg| arg.len().saturating_add(1)).sum();
    command.len().saturating_add(1).saturating_add(args_bytes)
}
555
/// Bytes counted for one environment entry: key and value lengths plus two extra bytes
/// (presumably `=` and a NUL terminator, as in a C `envp` entry). All additions saturate.
fn env_entry_payload_bytes(key: &str, value: &str) -> usize {
    const SEPARATOR_AND_TERMINATOR: usize = 2;
    key.len()
        .saturating_add(value.len())
        .saturating_add(SEPARATOR_AND_TERMINATOR)
}
562
563fn merged_env_payload_bytes(
564 inherited_env: &BTreeMap<String, String>,
565 overrides: &BTreeMap<String, String>,
566) -> usize {
567 let mut total = inherited_env
568 .iter()
569 .map(|(key, value)| env_entry_payload_bytes(key, value))
570 .sum::<usize>();
571
572 for (key, value) in overrides {
573 if let Some(previous) = inherited_env.get(key) {
574 total = total.saturating_sub(env_entry_payload_bytes(key, previous));
575 }
576 total = total.saturating_add(env_entry_payload_bytes(key, value));
577 }
578
579 total
580}
581
#[cfg(test)]
mod gauge_tests {
    use super::*;
    use secure_exec_bridge::queue_tracker::{
        set_limit_warning_handler, LimitWarning, TrackedLimit,
    };
    use std::sync::{Arc, Mutex};

    /// Builds an accountant from the default limits with `tweak` applied.
    fn accountant_with(tweak: impl FnOnce(&mut ResourceLimits)) -> ResourceAccountant {
        let mut limits = ResourceLimits::default();
        tweak(&mut limits);
        ResourceAccountant::new(limits)
    }

    #[test]
    fn resource_gauges_track_usage_and_warn_on_approach() {
        let warnings: Arc<Mutex<Vec<LimitWarning>>> = Arc::new(Mutex::new(Vec::new()));
        let recorder = Arc::clone(&warnings);
        // The handler must be installed before the accountant registers its gauges.
        set_limit_warning_handler(Box::new(move |warning| {
            if warning.name != TrackedLimit::VmOpenFds {
                return;
            }
            recorder.lock().expect("sink mutex").push(warning.clone());
        }));

        let accountant = accountant_with(|limits| limits.max_open_fds = Some(10));
        accountant.observe_resource_gauges(&ResourceSnapshot {
            open_fds: 9,
            ..ResourceSnapshot::default()
        });

        let open_fds_gauge = accountant
            .gauges
            .open_fds
            .as_ref()
            .expect("open_fds gauge registered when the limit is set");
        assert_eq!(open_fds_gauge.depth(), 9);
        assert_eq!(open_fds_gauge.capacity(), 10);
        assert_eq!(open_fds_gauge.high_water(), 9);

        let warned = warnings
            .lock()
            .unwrap()
            .iter()
            .any(|warning| warning.name == TrackedLimit::VmOpenFds);
        assert!(warned, "open_fds at 90% of cap must emit an approach warning");
    }

    #[test]
    fn unset_limit_registers_no_gauge() {
        let accountant = accountant_with(|limits| limits.max_ptys = None);
        assert!(
            accountant.gauges.ptys.is_none(),
            "an unbounded (None) limit must not register a gauge"
        );
    }

    #[test]
    fn filesystem_gauge_not_latched_by_rejected_write() {
        let accountant = accountant_with(|limits| {
            limits.max_filesystem_bytes = Some(1000);
            limits.max_inode_count = Some(100);
        });
        let usage = FileSystemUsage::default();

        assert!(accountant.check_filesystem_usage(&usage, 2000, 0).is_err());
        let bytes_gauge = accountant
            .gauges
            .filesystem_bytes
            .as_ref()
            .expect("filesystem_bytes gauge registered");
        assert_eq!(
            bytes_gauge.depth(),
            0,
            "a rejected over-limit write must not bump the gauge"
        );

        accountant
            .check_filesystem_usage(&usage, 500, 7)
            .expect("under-limit write is accepted");
        assert_eq!(bytes_gauge.depth(), 500);
        let inode_gauge = accountant.gauges.inodes.as_ref().unwrap();
        assert_eq!(
            inode_gauge.depth(),
            7,
            "inode gauge tracks the accepted value"
        );
    }
}