durable_execution_sdk/
termination.rs1use std::time::{Duration, SystemTime};
16
17const DEFAULT_SAFETY_MARGIN_MS: u64 = 5000;
19
20pub struct TerminationManager {
27 remaining_time_ms: u64,
28 safety_margin_ms: u64,
29}
30
31impl TerminationManager {
32 pub fn from_lambda_context(ctx: &lambda_runtime::Context) -> Self {
37 let deadline = ctx.deadline();
38 let now = SystemTime::now();
39
40 let remaining_time_ms = deadline
41 .duration_since(now)
42 .unwrap_or(Duration::ZERO)
43 .as_millis() as u64;
44
45 Self {
46 remaining_time_ms,
47 safety_margin_ms: DEFAULT_SAFETY_MARGIN_MS,
48 }
49 }
50
51 pub async fn wait_for_timeout(&self) {
56 let effective_ms = self.remaining_ms();
57 if effective_ms == 0 {
58 return;
59 }
60 tokio::time::sleep(Duration::from_millis(effective_ms)).await;
61 }
62
63 pub fn remaining_ms(&self) -> u64 {
67 self.remaining_time_ms.saturating_sub(self.safety_margin_ms)
68 }
69}
70
71#[cfg(test)]
72mod tests {
73 use super::*;
74
75 fn make_manager(remaining_time_ms: u64, safety_margin_ms: u64) -> TerminationManager {
77 TerminationManager {
78 remaining_time_ms,
79 safety_margin_ms,
80 }
81 }
82
83 #[test]
84 fn test_remaining_ms_normal() {
85 let mgr = make_manager(10000, 5000);
86 assert_eq!(mgr.remaining_ms(), 5000);
87 }
88
89 #[test]
90 fn test_remaining_ms_less_than_margin() {
91 let mgr = make_manager(3000, 5000);
92 assert_eq!(mgr.remaining_ms(), 0);
93 }
94
95 #[test]
96 fn test_remaining_ms_equal_to_margin() {
97 let mgr = make_manager(5000, 5000);
98 assert_eq!(mgr.remaining_ms(), 0);
99 }
100
101 #[test]
102 fn test_remaining_ms_zero_remaining() {
103 let mgr = make_manager(0, 5000);
104 assert_eq!(mgr.remaining_ms(), 0);
105 }
106
107 #[test]
108 fn test_default_safety_margin() {
109 assert_eq!(DEFAULT_SAFETY_MARGIN_MS, 5000);
110 }
111
112 #[tokio::test]
113 async fn test_wait_for_timeout_fires_at_deadline_minus_margin() {
114 let mgr = make_manager(5100, 5000);
116 assert_eq!(mgr.remaining_ms(), 100);
117
118 let start = tokio::time::Instant::now();
119 tokio::time::pause();
120 mgr.wait_for_timeout().await;
121 let elapsed = start.elapsed();
122
123 assert!(
125 elapsed.as_millis() >= 100,
126 "Expected >= 100ms, got {}ms",
127 elapsed.as_millis()
128 );
129 }
130
131 #[tokio::test]
132 async fn test_wait_for_timeout_immediate_when_remaining_less_than_margin() {
133 let mgr = make_manager(3000, 5000);
134 assert_eq!(mgr.remaining_ms(), 0);
135
136 let start = tokio::time::Instant::now();
137 tokio::time::pause();
138 mgr.wait_for_timeout().await;
139 let elapsed = start.elapsed();
140
141 assert!(
143 elapsed.as_millis() < 10,
144 "Expected immediate resolution, got {}ms",
145 elapsed.as_millis()
146 );
147 }
148
149 #[tokio::test]
150 async fn test_wait_for_timeout_immediate_when_zero_remaining() {
151 let mgr = make_manager(0, 5000);
152
153 let start = tokio::time::Instant::now();
154 tokio::time::pause();
155 mgr.wait_for_timeout().await;
156 let elapsed = start.elapsed();
157
158 assert!(
159 elapsed.as_millis() < 10,
160 "Expected immediate resolution, got {}ms",
161 elapsed.as_millis()
162 );
163 }
164
165 #[test]
166 fn test_from_lambda_context_uses_default_margin() {
167 use std::time::{Duration, SystemTime};
168
169 let mut ctx = lambda_runtime::Context::default();
171 let deadline = SystemTime::now() + Duration::from_secs(10);
172 ctx.deadline = deadline
173 .duration_since(SystemTime::UNIX_EPOCH)
174 .unwrap()
175 .as_millis() as u64;
176
177 let mgr = TerminationManager::from_lambda_context(&ctx);
178
179 assert_eq!(mgr.safety_margin_ms, DEFAULT_SAFETY_MARGIN_MS);
180 assert!(
183 mgr.remaining_ms() >= 4900 && mgr.remaining_ms() <= 5100,
184 "Expected ~5000ms, got {}ms",
185 mgr.remaining_ms()
186 );
187 }
188
189 #[test]
190 fn test_from_lambda_context_past_deadline() {
191 use std::time::{Duration, SystemTime};
192
193 let mut ctx = lambda_runtime::Context::default();
195 let deadline = SystemTime::now() - Duration::from_secs(1);
196 ctx.deadline = deadline
198 .duration_since(SystemTime::UNIX_EPOCH)
199 .unwrap()
200 .as_millis() as u64;
201
202 let mgr = TerminationManager::from_lambda_context(&ctx);
203
204 assert_eq!(mgr.remaining_ms(), 0);
205 }
206
207 #[tokio::test]
211 async fn test_pending_output_on_simulated_timeout() {
212 use crate::lambda::DurableExecutionInvocationOutput;
213
214 tokio::time::pause();
215
216 let mgr = make_manager(5050, 5000); let handler_future = async {
222 tokio::time::sleep(Duration::from_secs(30)).await;
223 "handler_completed"
224 };
225
226 let output = tokio::select! {
227 result = handler_future => {
228 panic!("Handler should not complete before timeout, got: {}", result);
230 }
231 _ = mgr.wait_for_timeout() => {
232 DurableExecutionInvocationOutput::pending()
234 }
235 };
236
237 assert!(output.is_pending(), "Expected PENDING output on timeout");
238 }
239
240 #[tokio::test]
243 async fn test_handler_completes_before_timeout() {
244 tokio::time::pause();
245
246 let mgr = make_manager(15000, 5000); let handler_future = async {
251 tokio::time::sleep(Duration::from_millis(10)).await;
252 "handler_result"
253 };
254
255 let result = tokio::select! {
256 result = handler_future => {
257 result
258 }
259 _ = mgr.wait_for_timeout() => {
260 panic!("Timeout should not fire before handler completes");
261 }
262 };
263
264 assert_eq!(result, "handler_result");
265 }
266}