Skip to main content

durable_execution_sdk/
termination.rs

1//! Graceful termination management for Lambda timeout detection.
2//!
3//! This module provides [`TerminationManager`] which proactively detects
4//! approaching Lambda timeouts and signals the runtime to flush checkpoints
5//! and return a PENDING response before the Lambda hard-kills the process.
6//!
7//! # Requirements
8//!
9//! - 5.1: wait_for_timeout resolves when deadline - safety_margin is reached
10//! - 5.2: On timeout, flush batcher and return PENDING
11//! - 5.3: Handler completes normally when no timeout
12//! - 5.4: Default safety margin is 5 seconds
13//! - 5.5: Immediate resolution when remaining < margin
14
15use std::time::{Duration, SystemTime};
16
/// Safety margin used when none is configured, in milliseconds (5 s): how long
/// before the Lambda deadline the termination signal is meant to fire.
const DEFAULT_SAFETY_MARGIN_MS: u64 = 5_000;
19
/// Manages graceful termination before Lambda timeout.
///
/// Created from a Lambda context, this struct records how much time
/// remains before the Lambda deadline and provides an async signal that
/// fires before the hard timeout, giving the runtime time to flush
/// pending checkpoints.
///
/// The type is a small plain-data value (two `u64`s), so it is `Copy` and
/// can be logged with `{:?}`.
#[derive(Debug, Clone, Copy)]
pub struct TerminationManager {
    /// Milliseconds left until the Lambda deadline, captured once when the
    /// manager is constructed.
    remaining_time_ms: u64,
    /// Milliseconds subtracted from `remaining_time_ms`; the timeout signal
    /// fires this long before the recorded deadline.
    safety_margin_ms: u64,
}
30
31impl TerminationManager {
32    /// Creates a new manager from the Lambda context deadline.
33    ///
34    /// Calculates `remaining_time_ms` from `ctx.deadline() - now`.
35    /// The safety margin defaults to 5000ms (5 seconds).
36    pub fn from_lambda_context(ctx: &lambda_runtime::Context) -> Self {
37        let deadline = ctx.deadline();
38        let now = SystemTime::now();
39
40        let remaining_time_ms = deadline
41            .duration_since(now)
42            .unwrap_or(Duration::ZERO)
43            .as_millis() as u64;
44
45        Self {
46            remaining_time_ms,
47            safety_margin_ms: DEFAULT_SAFETY_MARGIN_MS,
48        }
49    }
50
51    /// Returns a future that resolves when the timeout margin is reached.
52    ///
53    /// If the remaining time is already less than the safety margin,
54    /// this resolves immediately.
55    pub async fn wait_for_timeout(&self) {
56        let effective_ms = self.remaining_ms();
57        if effective_ms == 0 {
58            return;
59        }
60        tokio::time::sleep(Duration::from_millis(effective_ms)).await;
61    }
62
63    /// Returns the remaining time before the safety margin in milliseconds.
64    ///
65    /// Returns 0 if the remaining time is already less than the safety margin.
66    pub fn remaining_ms(&self) -> u64 {
67        self.remaining_time_ms.saturating_sub(self.safety_margin_ms)
68    }
69}
70
#[cfg(test)]
mod tests {
    use super::*;

    /// Helper to create a TerminationManager with specific values for testing.
    fn make_manager(remaining_time_ms: u64, safety_margin_ms: u64) -> TerminationManager {
        TerminationManager {
            remaining_time_ms,
            safety_margin_ms,
        }
    }

    /// Helper to build a Lambda context whose `deadline` field holds `deadline`
    /// expressed as milliseconds since the Unix epoch.
    fn context_with_deadline(deadline: SystemTime) -> lambda_runtime::Context {
        let mut ctx = lambda_runtime::Context::default();
        ctx.deadline = deadline
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        ctx
    }

    #[test]
    fn test_remaining_ms_normal() {
        let mgr = make_manager(10000, 5000);
        assert_eq!(mgr.remaining_ms(), 5000);
    }

    #[test]
    fn test_remaining_ms_less_than_margin() {
        let mgr = make_manager(3000, 5000);
        assert_eq!(mgr.remaining_ms(), 0);
    }

    #[test]
    fn test_remaining_ms_equal_to_margin() {
        let mgr = make_manager(5000, 5000);
        assert_eq!(mgr.remaining_ms(), 0);
    }

    #[test]
    fn test_remaining_ms_zero_remaining() {
        let mgr = make_manager(0, 5000);
        assert_eq!(mgr.remaining_ms(), 0);
    }

    #[test]
    fn test_default_safety_margin() {
        // Requirement 5.4: the default safety margin is 5 seconds.
        assert_eq!(DEFAULT_SAFETY_MARGIN_MS, 5000);
    }

    #[tokio::test]
    async fn test_wait_for_timeout_fires_at_deadline_minus_margin() {
        // Pause before taking `start` so the measurement uses only virtual
        // time; with the clock paused the runtime auto-advances to the timer.
        tokio::time::pause();

        // 100ms remaining after margin
        let mgr = make_manager(5100, 5000);
        assert_eq!(mgr.remaining_ms(), 100);

        let start = tokio::time::Instant::now();
        mgr.wait_for_timeout().await;
        let elapsed = start.elapsed();

        // Should have waited ~100ms
        assert!(
            elapsed.as_millis() >= 100,
            "Expected >= 100ms, got {}ms",
            elapsed.as_millis()
        );
    }

    #[tokio::test]
    async fn test_wait_for_timeout_immediate_when_remaining_less_than_margin() {
        tokio::time::pause();

        let mgr = make_manager(3000, 5000);
        assert_eq!(mgr.remaining_ms(), 0);

        let start = tokio::time::Instant::now();
        mgr.wait_for_timeout().await;
        let elapsed = start.elapsed();

        // Should resolve immediately (< 10ms)
        assert!(
            elapsed.as_millis() < 10,
            "Expected immediate resolution, got {}ms",
            elapsed.as_millis()
        );
    }

    #[tokio::test]
    async fn test_wait_for_timeout_immediate_when_zero_remaining() {
        tokio::time::pause();

        let mgr = make_manager(0, 5000);

        let start = tokio::time::Instant::now();
        mgr.wait_for_timeout().await;
        let elapsed = start.elapsed();

        assert!(
            elapsed.as_millis() < 10,
            "Expected immediate resolution, got {}ms",
            elapsed.as_millis()
        );
    }

    #[test]
    fn test_from_lambda_context_uses_default_margin() {
        // Deadline 10 seconds from now.
        let ctx = context_with_deadline(SystemTime::now() + Duration::from_secs(10));

        let mgr = TerminationManager::from_lambda_context(&ctx);

        assert_eq!(mgr.safety_margin_ms, DEFAULT_SAFETY_MARGIN_MS);
        // remaining_ms should be approximately 5000 (10000 - 5000)
        // Allow some tolerance for test execution time
        assert!(
            mgr.remaining_ms() >= 4900 && mgr.remaining_ms() <= 5100,
            "Expected ~5000ms, got {}ms",
            mgr.remaining_ms()
        );
    }

    #[test]
    fn test_from_lambda_context_past_deadline() {
        // Deadline one second in the past: `duration_since(now)` fails inside
        // `from_lambda_context`, so the remaining time clamps to 0.
        let ctx = context_with_deadline(SystemTime::now() - Duration::from_secs(1));

        let mgr = TerminationManager::from_lambda_context(&ctx);

        assert_eq!(mgr.remaining_ms(), 0);
    }

    /// Integration test: verifies that when the termination manager fires before
    /// the handler completes, the select! pattern returns PENDING output.
    /// This simulates the exact behavior of run_durable_handler on timeout.
    #[tokio::test]
    async fn test_pending_output_on_simulated_timeout() {
        use crate::lambda::DurableExecutionInvocationOutput;

        tokio::time::pause();

        // Termination manager with a 50ms effective timeout (5050 - 5000);
        // kept small so the test stays fast.
        let mgr = make_manager(5050, 5000);

        // Simulate a handler that takes much longer than the timeout
        let handler_future = async {
            tokio::time::sleep(Duration::from_secs(30)).await;
            "handler_completed"
        };

        let output = tokio::select! {
            result = handler_future => {
                // Handler completed — this should NOT happen
                panic!("Handler should not complete before timeout, got: {}", result);
            }
            _ = mgr.wait_for_timeout() => {
                // Timeout fired — return PENDING
                DurableExecutionInvocationOutput::pending()
            }
        };

        assert!(output.is_pending(), "Expected PENDING output on timeout");
    }

    /// Integration test: verifies that when the handler completes before the
    /// termination manager fires, the handler result is returned normally.
    #[tokio::test]
    async fn test_handler_completes_before_timeout() {
        tokio::time::pause();

        // Termination manager with plenty of time (10000ms effective)
        let mgr = make_manager(15000, 5000);

        // Simulate a handler that completes quickly
        let handler_future = async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            "handler_result"
        };

        let result = tokio::select! {
            result = handler_future => {
                result
            }
            _ = mgr.wait_for_timeout() => {
                panic!("Timeout should not fire before handler completes");
            }
        };

        assert_eq!(result, "handler_result");
    }
}
266}