celers_core/
time_limit.rs

1//! Time Limits for Task Execution
2//!
3//! This module provides time limit enforcement for task execution:
4//!
5//! - **Soft Time Limit**: Warning before the task is killed, allowing graceful cleanup
6//! - **Hard Time Limit**: Force kill after this duration
7//!
8//! # Example
9//!
10//! ```rust
11//! use celers_core::time_limit::{TimeLimit, TimeLimitConfig, TimeLimitExceeded};
12//! use std::time::Duration;
13//!
14//! // Create a time limit config
15//! let config = TimeLimitConfig::new()
16//!     .with_soft_limit(Duration::from_secs(30))
17//!     .with_hard_limit(Duration::from_secs(60));
18//!
19//! assert_eq!(config.soft_limit(), Some(Duration::from_secs(30)));
20//! assert_eq!(config.hard_limit(), Some(Duration::from_secs(60)));
21//! ```
22
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::sync::{Arc, RwLock};
26use std::time::{Duration, Instant};
27
28/// Time limit configuration for a task
29#[derive(Debug, Clone, Serialize, Deserialize, Default)]
30pub struct TimeLimitConfig {
31    /// Soft time limit in seconds (warning before kill)
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub soft_seconds: Option<u64>,
34    /// Hard time limit in seconds (force kill)
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub hard_seconds: Option<u64>,
37}
38
39impl TimeLimitConfig {
40    /// Create a new time limit configuration with no limits
41    #[must_use]
42    pub fn new() -> Self {
43        Self::default()
44    }
45
46    /// Set the soft time limit
47    #[must_use]
48    pub fn with_soft_limit(mut self, duration: Duration) -> Self {
49        self.soft_seconds = Some(duration.as_secs());
50        self
51    }
52
53    /// Set the hard time limit
54    #[must_use]
55    pub fn with_hard_limit(mut self, duration: Duration) -> Self {
56        self.hard_seconds = Some(duration.as_secs());
57        self
58    }
59
60    /// Set both soft and hard limits
61    #[must_use]
62    pub fn with_limits(mut self, soft: Duration, hard: Duration) -> Self {
63        self.soft_seconds = Some(soft.as_secs());
64        self.hard_seconds = Some(hard.as_secs());
65        self
66    }
67
68    /// Get the soft limit as Duration
69    pub fn soft_limit(&self) -> Option<Duration> {
70        self.soft_seconds.map(Duration::from_secs)
71    }
72
73    /// Get the hard limit as Duration
74    pub fn hard_limit(&self) -> Option<Duration> {
75        self.hard_seconds.map(Duration::from_secs)
76    }
77
78    /// Check if any time limit is configured
79    #[inline]
80    #[must_use]
81    pub const fn has_limits(&self) -> bool {
82        self.soft_seconds.is_some() || self.hard_seconds.is_some()
83    }
84
85    /// Merge with another config, taking non-None values from the other
86    #[must_use]
87    pub fn merge(&self, other: &TimeLimitConfig) -> TimeLimitConfig {
88        TimeLimitConfig {
89            soft_seconds: other.soft_seconds.or(self.soft_seconds),
90            hard_seconds: other.hard_seconds.or(self.hard_seconds),
91        }
92    }
93}
94
95/// Error type for time limit exceeded
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub enum TimeLimitExceeded {
98    /// Soft time limit exceeded (warning)
99    SoftLimitExceeded {
100        /// Task ID
101        task_id: String,
102        /// Elapsed time in seconds
103        elapsed_seconds: u64,
104        /// Configured soft limit in seconds
105        limit_seconds: u64,
106    },
107    /// Hard time limit exceeded (force kill)
108    HardLimitExceeded {
109        /// Task ID
110        task_id: String,
111        /// Elapsed time in seconds
112        elapsed_seconds: u64,
113        /// Configured hard limit in seconds
114        limit_seconds: u64,
115    },
116}
117
118impl std::fmt::Display for TimeLimitExceeded {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        match self {
121            Self::SoftLimitExceeded {
122                task_id,
123                elapsed_seconds,
124                limit_seconds,
125            } => {
126                write!(
127                    f,
128                    "Soft time limit exceeded for task {task_id}: {elapsed_seconds}s elapsed (limit: {limit_seconds}s)"
129                )
130            }
131            Self::HardLimitExceeded {
132                task_id,
133                elapsed_seconds,
134                limit_seconds,
135            } => {
136                write!(
137                    f,
138                    "Hard time limit exceeded for task {task_id}: {elapsed_seconds}s elapsed (limit: {limit_seconds}s)"
139                )
140            }
141        }
142    }
143}
144
145impl std::error::Error for TimeLimitExceeded {}
146
/// Outcome of checking a running task against its configured limits.
///
/// The enum carries no data, so it is `Copy` and supports full equality
/// (`Eq`) in addition to `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeLimitStatus {
    /// No limit configured or within limits
    Ok,
    /// Soft limit exceeded (warning)
    SoftLimitExceeded,
    /// Hard limit exceeded (force kill)
    HardLimitExceeded,
}
157
158/// Time limit tracker for a running task
159#[derive(Debug, Clone)]
160pub struct TimeLimit {
161    /// Task ID being tracked
162    task_id: String,
163    /// Time limit configuration
164    config: TimeLimitConfig,
165    /// When the task started
166    started_at: Instant,
167    /// Whether soft limit warning has been emitted
168    soft_limit_warned: bool,
169}
170
171impl TimeLimit {
172    /// Create a new time limit tracker
173    pub fn new(task_id: impl Into<String>, config: TimeLimitConfig) -> Self {
174        Self {
175            task_id: task_id.into(),
176            config,
177            started_at: Instant::now(),
178            soft_limit_warned: false,
179        }
180    }
181
182    /// Create with specific start time (for testing)
183    pub fn with_start_time(
184        task_id: impl Into<String>,
185        config: TimeLimitConfig,
186        started_at: Instant,
187    ) -> Self {
188        Self {
189            task_id: task_id.into(),
190            config,
191            started_at,
192            soft_limit_warned: false,
193        }
194    }
195
196    /// Get elapsed time since task started
197    #[must_use]
198    pub fn elapsed(&self) -> Duration {
199        self.started_at.elapsed()
200    }
201
202    /// Get elapsed time in seconds
203    #[inline]
204    #[must_use]
205    pub fn elapsed_seconds(&self) -> u64 {
206        self.elapsed().as_secs()
207    }
208
209    /// Check current time limit status
210    #[must_use]
211    pub fn check(&self) -> TimeLimitStatus {
212        let elapsed = self.elapsed();
213
214        // Check hard limit first
215        if let Some(hard_limit) = self.config.hard_limit() {
216            if elapsed >= hard_limit {
217                return TimeLimitStatus::HardLimitExceeded;
218            }
219        }
220
221        // Check soft limit
222        if let Some(soft_limit) = self.config.soft_limit() {
223            if elapsed >= soft_limit {
224                return TimeLimitStatus::SoftLimitExceeded;
225            }
226        }
227
228        TimeLimitStatus::Ok
229    }
230
231    /// Check and return error if limit exceeded
232    #[must_use]
233    pub fn check_exceeded(&self) -> Option<TimeLimitExceeded> {
234        let elapsed_seconds = self.elapsed_seconds();
235
236        // Check hard limit first
237        if let Some(limit_seconds) = self.config.hard_seconds {
238            if elapsed_seconds >= limit_seconds {
239                return Some(TimeLimitExceeded::HardLimitExceeded {
240                    task_id: self.task_id.clone(),
241                    elapsed_seconds,
242                    limit_seconds,
243                });
244            }
245        }
246
247        // Check soft limit
248        if let Some(limit_seconds) = self.config.soft_seconds {
249            if elapsed_seconds >= limit_seconds {
250                return Some(TimeLimitExceeded::SoftLimitExceeded {
251                    task_id: self.task_id.clone(),
252                    elapsed_seconds,
253                    limit_seconds,
254                });
255            }
256        }
257
258        None
259    }
260
261    /// Check if soft limit was already warned
262    #[inline]
263    #[must_use]
264    pub const fn soft_limit_warned(&self) -> bool {
265        self.soft_limit_warned
266    }
267
268    /// Mark soft limit as warned
269    pub fn mark_soft_limit_warned(&mut self) {
270        self.soft_limit_warned = true;
271    }
272
273    /// Get remaining time until soft limit
274    #[must_use]
275    pub fn time_until_soft_limit(&self) -> Option<Duration> {
276        self.config.soft_limit().and_then(|limit| {
277            let elapsed = self.elapsed();
278            if elapsed < limit {
279                Some(limit - elapsed)
280            } else {
281                None
282            }
283        })
284    }
285
286    /// Get remaining time until hard limit
287    #[must_use]
288    pub fn time_until_hard_limit(&self) -> Option<Duration> {
289        self.config.hard_limit().and_then(|limit| {
290            let elapsed = self.elapsed();
291            if elapsed < limit {
292                Some(limit - elapsed)
293            } else {
294                None
295            }
296        })
297    }
298
299    /// Get the task ID
300    #[inline]
301    #[must_use]
302    pub fn task_id(&self) -> &str {
303        &self.task_id
304    }
305
306    /// Get the configuration
307    #[inline]
308    #[must_use]
309    pub fn config(&self) -> &TimeLimitConfig {
310        &self.config
311    }
312}
313
314/// Per-task time limit manager
315///
316/// Manages time limits for multiple task types, allowing different
317/// limits per task name.
318#[derive(Debug, Default)]
319pub struct TaskTimeLimits {
320    /// Per-task time limits (`task_name` -> config)
321    limits: HashMap<String, TimeLimitConfig>,
322    /// Default time limit for tasks without specific configuration
323    default_config: Option<TimeLimitConfig>,
324}
325
326impl TaskTimeLimits {
327    /// Create a new task time limits manager
328    #[must_use]
329    pub fn new() -> Self {
330        Self::default()
331    }
332
333    /// Create with a default time limit for all tasks
334    #[must_use]
335    pub fn with_default(config: TimeLimitConfig) -> Self {
336        Self {
337            limits: HashMap::new(),
338            default_config: Some(config),
339        }
340    }
341
342    /// Set time limit for a specific task type
343    pub fn set_task_limit(&mut self, task_name: impl Into<String>, config: TimeLimitConfig) {
344        self.limits.insert(task_name.into(), config);
345    }
346
347    /// Remove time limit for a specific task type
348    pub fn remove_task_limit(&mut self, task_name: &str) {
349        self.limits.remove(task_name);
350    }
351
352    /// Get time limit configuration for a task
353    #[must_use]
354    pub fn get_limit(&self, task_name: &str) -> Option<&TimeLimitConfig> {
355        self.limits.get(task_name).or(self.default_config.as_ref())
356    }
357
358    /// Check if a task type has time limits configured
359    #[must_use]
360    pub fn has_limit(&self, task_name: &str) -> bool {
361        self.limits.contains_key(task_name) || self.default_config.is_some()
362    }
363
364    /// Create a time limit tracker for a task
365    #[must_use]
366    pub fn create_tracker(&self, task_id: &str, task_name: &str) -> Option<TimeLimit> {
367        self.get_limit(task_name)
368            .filter(|c| c.has_limits())
369            .map(|config| TimeLimit::new(task_id, config.clone()))
370    }
371
372    /// Set the default time limit configuration
373    pub fn set_default(&mut self, config: TimeLimitConfig) {
374        self.default_config = Some(config);
375    }
376
377    /// Clear all configurations
378    pub fn clear(&mut self) {
379        self.limits.clear();
380        self.default_config = None;
381    }
382}
383
384/// Thread-safe per-worker time limits manager
385#[derive(Debug, Clone, Default)]
386pub struct WorkerTimeLimits {
387    inner: Arc<RwLock<TaskTimeLimits>>,
388}
389
390impl WorkerTimeLimits {
391    /// Create a new worker time limits manager
392    #[must_use]
393    pub fn new() -> Self {
394        Self::default()
395    }
396
397    /// Create with a default time limit
398    #[must_use]
399    pub fn with_default(config: TimeLimitConfig) -> Self {
400        Self {
401            inner: Arc::new(RwLock::new(TaskTimeLimits::with_default(config))),
402        }
403    }
404
405    /// Set time limit for a specific task type
406    pub fn set_task_limit(&self, task_name: impl Into<String>, config: TimeLimitConfig) {
407        if let Ok(mut guard) = self.inner.write() {
408            guard.set_task_limit(task_name, config);
409        }
410    }
411
412    /// Remove time limit for a specific task type
413    pub fn remove_task_limit(&self, task_name: &str) {
414        if let Ok(mut guard) = self.inner.write() {
415            guard.remove_task_limit(task_name);
416        }
417    }
418
419    /// Create a time limit tracker for a task
420    #[must_use]
421    pub fn create_tracker(&self, task_id: &str, task_name: &str) -> Option<TimeLimit> {
422        if let Ok(guard) = self.inner.read() {
423            guard.create_tracker(task_id, task_name)
424        } else {
425            None
426        }
427    }
428
429    /// Check if a task type has time limits configured
430    #[must_use]
431    pub fn has_limit(&self, task_name: &str) -> bool {
432        if let Ok(guard) = self.inner.read() {
433            guard.has_limit(task_name)
434        } else {
435            false
436        }
437    }
438
439    /// Set the default time limit configuration
440    pub fn set_default(&self, config: TimeLimitConfig) {
441        if let Ok(mut guard) = self.inner.write() {
442            guard.set_default(config);
443        }
444    }
445}
446
447/// Serializable time limit configuration for config files
448#[derive(Debug, Clone, Serialize, Deserialize, Default)]
449pub struct TimeLimitSettings {
450    /// Default soft time limit in seconds
451    #[serde(default, skip_serializing_if = "Option::is_none")]
452    pub default_soft_limit: Option<u64>,
453    /// Default hard time limit in seconds
454    #[serde(default, skip_serializing_if = "Option::is_none")]
455    pub default_hard_limit: Option<u64>,
456    /// Per-task time limits (`task_name` -> config)
457    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
458    pub task_limits: HashMap<String, TimeLimitConfig>,
459}
460
461impl TimeLimitSettings {
462    /// Create a new empty settings
463    #[must_use]
464    pub fn new() -> Self {
465        Self::default()
466    }
467
468    /// Create a `TaskTimeLimits` from settings
469    #[must_use]
470    pub fn into_task_time_limits(self) -> TaskTimeLimits {
471        let default_config =
472            if self.default_soft_limit.is_some() || self.default_hard_limit.is_some() {
473                Some(TimeLimitConfig {
474                    soft_seconds: self.default_soft_limit,
475                    hard_seconds: self.default_hard_limit,
476                })
477            } else {
478                None
479            };
480
481        TaskTimeLimits {
482            limits: self.task_limits,
483            default_config,
484        }
485    }
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Returns an instant `seconds` in the past.
    ///
    /// Limits are stored in whole seconds, so sub-second limits collapse to
    /// zero and would make "exceeded" tests pass vacuously. Backdating the
    /// start time exercises real one-second-or-more limits without sleeping.
    fn started_ago(seconds: u64) -> Instant {
        Instant::now()
            .checked_sub(Duration::from_secs(seconds))
            .expect("monotonic clock should be able to represent an instant a few seconds ago")
    }

    #[test]
    fn test_time_limit_config() {
        let config = TimeLimitConfig::new()
            .with_soft_limit(Duration::from_secs(30))
            .with_hard_limit(Duration::from_secs(60));

        assert_eq!(config.soft_limit(), Some(Duration::from_secs(30)));
        assert_eq!(config.hard_limit(), Some(Duration::from_secs(60)));
        assert!(config.has_limits());
    }

    #[test]
    fn test_time_limit_config_no_limits() {
        let config = TimeLimitConfig::new();
        assert!(!config.has_limits());
        assert_eq!(config.soft_limit(), None);
        assert_eq!(config.hard_limit(), None);
    }

    #[test]
    fn test_time_limit_tracker() {
        let config = TimeLimitConfig::new()
            .with_soft_limit(Duration::from_secs(5))
            .with_hard_limit(Duration::from_secs(10));

        let tracker = TimeLimit::new("task-123", config);
        assert_eq!(tracker.task_id(), "task-123");
        assert_eq!(tracker.check(), TimeLimitStatus::Ok);
    }

    #[test]
    fn test_time_limit_soft_exceeded() {
        let config = TimeLimitConfig::new().with_soft_limit(Duration::from_secs(1));

        // A task that has only just started is still within the limit
        let fresh = TimeLimit::new("task-122", config.clone());
        assert_eq!(fresh.check(), TimeLimitStatus::Ok);

        // A task that started two seconds ago is past the one-second soft limit
        let tracker = TimeLimit::with_start_time("task-123", config, started_ago(2));
        assert_eq!(tracker.check(), TimeLimitStatus::SoftLimitExceeded);
        assert!(tracker.time_until_soft_limit().is_none());
    }

    #[test]
    fn test_time_limit_hard_exceeded() {
        let config = TimeLimitConfig::new()
            .with_soft_limit(Duration::from_secs(1))
            .with_hard_limit(Duration::from_secs(2));

        // Between the two limits only the soft limit is reported
        let between = TimeLimit::with_start_time("task-122", config.clone(), started_ago(1));
        assert_eq!(between.check(), TimeLimitStatus::SoftLimitExceeded);

        // Past both limits the hard limit takes precedence
        let tracker = TimeLimit::with_start_time("task-123", config, started_ago(3));
        assert_eq!(tracker.check(), TimeLimitStatus::HardLimitExceeded);
    }

    #[test]
    fn test_time_limit_exceeded_error() {
        let config = TimeLimitConfig::new().with_soft_limit(Duration::from_secs(1));

        let tracker = TimeLimit::with_start_time("task-123", config, started_ago(2));

        match tracker.check_exceeded() {
            Some(TimeLimitExceeded::SoftLimitExceeded {
                task_id,
                elapsed_seconds,
                limit_seconds,
            }) => {
                assert_eq!(task_id, "task-123");
                assert!(elapsed_seconds >= 2);
                assert_eq!(limit_seconds, 1);
            }
            other => panic!("expected soft limit error, got {other:?}"),
        }
    }

    #[test]
    fn test_task_time_limits() {
        let mut limits = TaskTimeLimits::new();

        limits.set_task_limit(
            "slow.task",
            TimeLimitConfig::new()
                .with_soft_limit(Duration::from_secs(60))
                .with_hard_limit(Duration::from_secs(120)),
        );

        limits.set_task_limit(
            "fast.task",
            TimeLimitConfig::new().with_hard_limit(Duration::from_secs(10)),
        );

        assert!(limits.has_limit("slow.task"));
        assert!(limits.has_limit("fast.task"));
        assert!(!limits.has_limit("unknown.task"));

        let slow_config = limits.get_limit("slow.task").unwrap();
        assert_eq!(slow_config.soft_seconds, Some(60));
        assert_eq!(slow_config.hard_seconds, Some(120));
    }

    #[test]
    fn test_task_time_limits_default() {
        let limits = TaskTimeLimits::with_default(
            TimeLimitConfig::new().with_hard_limit(Duration::from_secs(300)),
        );

        // Unknown task should get default limit
        assert!(limits.has_limit("any.task"));
        let config = limits.get_limit("any.task").unwrap();
        assert_eq!(config.hard_seconds, Some(300));
    }

    #[test]
    fn test_create_tracker() {
        let mut limits = TaskTimeLimits::new();
        limits.set_task_limit(
            "my.task",
            TimeLimitConfig::new().with_hard_limit(Duration::from_secs(60)),
        );

        let tracker = limits.create_tracker("task-id-123", "my.task");
        assert!(tracker.is_some());

        let tracker = limits.create_tracker("task-id-456", "unknown.task");
        assert!(tracker.is_none());
    }

    #[test]
    fn test_time_remaining() {
        let config = TimeLimitConfig::new()
            .with_soft_limit(Duration::from_secs(30))
            .with_hard_limit(Duration::from_secs(60));

        let tracker = TimeLimit::new("task-123", config);

        let soft_remaining = tracker.time_until_soft_limit();
        assert!(soft_remaining.is_some());
        assert!(soft_remaining.unwrap() <= Duration::from_secs(30));

        let hard_remaining = tracker.time_until_hard_limit();
        assert!(hard_remaining.is_some());
        assert!(hard_remaining.unwrap() <= Duration::from_secs(60));
    }

    #[test]
    fn test_config_merge() {
        let base = TimeLimitConfig::new()
            .with_soft_limit(Duration::from_secs(30))
            .with_hard_limit(Duration::from_secs(60));

        let override_config = TimeLimitConfig {
            soft_seconds: Some(15),
            hard_seconds: None,
        };

        let merged = base.merge(&override_config);
        assert_eq!(merged.soft_seconds, Some(15)); // Overridden
        assert_eq!(merged.hard_seconds, Some(60)); // From base
    }

    #[test]
    fn test_soft_limit_warned() {
        let config = TimeLimitConfig::new().with_soft_limit(Duration::from_secs(30));

        let mut tracker = TimeLimit::new("task-123", config);
        assert!(!tracker.soft_limit_warned());

        tracker.mark_soft_limit_warned();
        assert!(tracker.soft_limit_warned());
    }

    #[test]
    fn test_time_limit_settings_serialization() {
        let mut settings = TimeLimitSettings::new();
        settings.default_soft_limit = Some(30);
        settings.default_hard_limit = Some(60);
        settings.task_limits.insert(
            "slow.task".to_string(),
            TimeLimitConfig {
                soft_seconds: Some(120),
                hard_seconds: Some(300),
            },
        );

        let json = serde_json::to_string(&settings).unwrap();
        let parsed: TimeLimitSettings = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.default_soft_limit, Some(30));
        assert_eq!(parsed.default_hard_limit, Some(60));
        assert!(parsed.task_limits.contains_key("slow.task"));
    }

    #[test]
    fn test_worker_time_limits_thread_safe() {
        let limits = WorkerTimeLimits::new();
        limits.set_task_limit(
            "my.task",
            TimeLimitConfig::new().with_hard_limit(Duration::from_secs(60)),
        );

        let limits_clone = limits.clone();

        // Spawn multiple threads to test thread safety
        let handles: Vec<_> = (0..4)
            .map(|i| {
                let l = limits_clone.clone();
                thread::spawn(move || {
                    for _ in 0..10 {
                        let _ = l.has_limit("my.task");
                        let _ = l.create_tracker(&format!("task-{i}"), "my.task");
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert!(limits.has_limit("my.task"));
    }

    #[test]
    fn test_into_task_time_limits() {
        let mut settings = TimeLimitSettings::new();
        settings.default_soft_limit = Some(30);
        settings.default_hard_limit = Some(60);
        settings.task_limits.insert(
            "custom.task".to_string(),
            TimeLimitConfig {
                soft_seconds: Some(10),
                hard_seconds: Some(20),
            },
        );

        let limits = settings.into_task_time_limits();

        // Default should be applied
        let default = limits.get_limit("any.task").unwrap();
        assert_eq!(default.soft_seconds, Some(30));
        assert_eq!(default.hard_seconds, Some(60));

        // Custom should override
        let custom = limits.get_limit("custom.task").unwrap();
        assert_eq!(custom.soft_seconds, Some(10));
        assert_eq!(custom.hard_seconds, Some(20));
    }
}