use std::time::{Duration, SystemTime};
const DEFAULT_SAFETY_MARGIN_MS: u64 = 5000;
pub struct TerminationManager {
remaining_time_ms: u64,
safety_margin_ms: u64,
}
impl TerminationManager {
pub fn from_lambda_context(ctx: &lambda_runtime::Context) -> Self {
let deadline = ctx.deadline();
let now = SystemTime::now();
let remaining_time_ms = deadline
.duration_since(now)
.unwrap_or(Duration::ZERO)
.as_millis() as u64;
Self {
remaining_time_ms,
safety_margin_ms: DEFAULT_SAFETY_MARGIN_MS,
}
}
pub async fn wait_for_timeout(&self) {
let effective_ms = self.remaining_ms();
if effective_ms == 0 {
return;
}
tokio::time::sleep(Duration::from_millis(effective_ms)).await;
}
pub fn remaining_ms(&self) -> u64 {
self.remaining_time_ms.saturating_sub(self.safety_margin_ms)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_manager(remaining_time_ms: u64, safety_margin_ms: u64) -> TerminationManager {
TerminationManager {
remaining_time_ms,
safety_margin_ms,
}
}
#[test]
fn test_remaining_ms_normal() {
let mgr = make_manager(10000, 5000);
assert_eq!(mgr.remaining_ms(), 5000);
}
#[test]
fn test_remaining_ms_less_than_margin() {
let mgr = make_manager(3000, 5000);
assert_eq!(mgr.remaining_ms(), 0);
}
#[test]
fn test_remaining_ms_equal_to_margin() {
let mgr = make_manager(5000, 5000);
assert_eq!(mgr.remaining_ms(), 0);
}
#[test]
fn test_remaining_ms_zero_remaining() {
let mgr = make_manager(0, 5000);
assert_eq!(mgr.remaining_ms(), 0);
}
#[test]
fn test_default_safety_margin() {
assert_eq!(DEFAULT_SAFETY_MARGIN_MS, 5000);
}
#[tokio::test]
async fn test_wait_for_timeout_fires_at_deadline_minus_margin() {
let mgr = make_manager(5100, 5000);
assert_eq!(mgr.remaining_ms(), 100);
let start = tokio::time::Instant::now();
tokio::time::pause();
mgr.wait_for_timeout().await;
let elapsed = start.elapsed();
assert!(
elapsed.as_millis() >= 100,
"Expected >= 100ms, got {}ms",
elapsed.as_millis()
);
}
#[tokio::test]
async fn test_wait_for_timeout_immediate_when_remaining_less_than_margin() {
let mgr = make_manager(3000, 5000);
assert_eq!(mgr.remaining_ms(), 0);
let start = tokio::time::Instant::now();
tokio::time::pause();
mgr.wait_for_timeout().await;
let elapsed = start.elapsed();
assert!(
elapsed.as_millis() < 10,
"Expected immediate resolution, got {}ms",
elapsed.as_millis()
);
}
#[tokio::test]
async fn test_wait_for_timeout_immediate_when_zero_remaining() {
let mgr = make_manager(0, 5000);
let start = tokio::time::Instant::now();
tokio::time::pause();
mgr.wait_for_timeout().await;
let elapsed = start.elapsed();
assert!(
elapsed.as_millis() < 10,
"Expected immediate resolution, got {}ms",
elapsed.as_millis()
);
}
#[test]
fn test_from_lambda_context_uses_default_margin() {
use std::time::{Duration, SystemTime};
let mut ctx = lambda_runtime::Context::default();
let deadline = SystemTime::now() + Duration::from_secs(10);
ctx.deadline = deadline
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let mgr = TerminationManager::from_lambda_context(&ctx);
assert_eq!(mgr.safety_margin_ms, DEFAULT_SAFETY_MARGIN_MS);
assert!(
mgr.remaining_ms() >= 4900 && mgr.remaining_ms() <= 5100,
"Expected ~5000ms, got {}ms",
mgr.remaining_ms()
);
}
#[test]
fn test_from_lambda_context_past_deadline() {
use std::time::{Duration, SystemTime};
let mut ctx = lambda_runtime::Context::default();
let deadline = SystemTime::now() - Duration::from_secs(1);
ctx.deadline = deadline
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let mgr = TerminationManager::from_lambda_context(&ctx);
assert_eq!(mgr.remaining_ms(), 0);
}
#[tokio::test]
async fn test_pending_output_on_simulated_timeout() {
use crate::lambda::DurableExecutionInvocationOutput;
tokio::time::pause();
let mgr = make_manager(5050, 5000);
let handler_future = async {
tokio::time::sleep(Duration::from_secs(30)).await;
"handler_completed"
};
let output = tokio::select! {
result = handler_future => {
panic!("Handler should not complete before timeout, got: {}", result);
}
_ = mgr.wait_for_timeout() => {
DurableExecutionInvocationOutput::pending()
}
};
assert!(output.is_pending(), "Expected PENDING output on timeout");
}
#[tokio::test]
async fn test_handler_completes_before_timeout() {
tokio::time::pause();
let mgr = make_manager(15000, 5000);
let handler_future = async {
tokio::time::sleep(Duration::from_millis(10)).await;
"handler_result"
};
let result = tokio::select! {
result = handler_future => {
result
}
_ = mgr.wait_for_timeout() => {
panic!("Timeout should not fire before handler completes");
}
};
assert_eq!(result, "handler_result");
}
}