mod common;
use lambda_simulator::{ShutdownReason, Simulator};
use mock_collector::{MockServer, Protocol as MockProtocol, ServerHandle};
use opentelemetry_lambda_extension::{FlushConfig, FlushManager, FlushReason, FlushStrategy};
use serial_test::serial;
use std::time::Duration;
#[test]
fn test_end_strategy_flushes_on_invocation_end() {
let config = FlushConfig {
strategy: FlushStrategy::End,
interval: Duration::from_millis(100),
max_batch_entries: 100,
max_batch_bytes: 1024,
};
let manager = FlushManager::new(config);
let reason = manager.should_flush(None, 5, false);
assert!(
reason.is_none(),
"End strategy should not flush mid-invocation"
);
let reason = manager.should_flush_on_invocation_end(5);
assert_eq!(
reason,
Some(FlushReason::InvocationEnd),
"End strategy should flush at invocation end"
);
}
#[test]
fn test_default_strategy_with_infrequent_pattern_flushes_on_end() {
let config = FlushConfig {
strategy: FlushStrategy::Default,
interval: Duration::from_millis(100),
max_batch_entries: 100,
max_batch_bytes: 1024,
};
let manager = FlushManager::new(config);
assert!(
manager.is_infrequent_pattern(),
"Empty history should be considered infrequent"
);
let reason = manager.should_flush_on_invocation_end(5);
assert_eq!(
reason,
Some(FlushReason::InvocationEnd),
"Default strategy with infrequent pattern should flush at invocation end"
);
}
#[test]
fn test_continuous_strategy_periodic_flush() {
let config = FlushConfig {
strategy: FlushStrategy::Continuous,
interval: Duration::from_millis(10),
max_batch_entries: 100,
max_batch_bytes: 1024,
};
let mut manager = FlushManager::new(config);
let reason = manager.should_flush(None, 1, false);
assert_eq!(
reason,
Some(FlushReason::Continuous),
"Continuous strategy should flush when interval elapsed"
);
manager.record_flush();
let reason = manager.should_flush(None, 1, false);
assert!(
reason.is_none(),
"Should not flush immediately after recording flush"
);
std::thread::sleep(Duration::from_millis(15));
let reason = manager.should_flush(None, 1, false);
assert_eq!(
reason,
Some(FlushReason::Continuous),
"Should flush after interval elapsed"
);
}
#[test]
fn test_periodic_strategy_flush() {
let config = FlushConfig {
strategy: FlushStrategy::Periodic,
interval: Duration::from_millis(10),
max_batch_entries: 100,
max_batch_bytes: 1024,
};
let mut manager = FlushManager::new(config);
let reason = manager.should_flush(None, 1, false);
assert_eq!(
reason,
Some(FlushReason::Periodic),
"Periodic strategy should flush when interval elapsed"
);
manager.record_flush();
std::thread::sleep(Duration::from_millis(15));
let reason = manager.should_flush(None, 1, false);
assert_eq!(
reason,
Some(FlushReason::Periodic),
"Periodic strategy should flush after interval"
);
}
#[test]
fn test_no_export_after_next_architecture() {
let config = FlushConfig {
strategy: FlushStrategy::End,
interval: Duration::from_millis(100),
max_batch_entries: 100,
max_batch_bytes: 1024,
};
let manager = FlushManager::new(config);
let reason = manager.should_flush(None, 10, false);
assert!(
reason.is_none(),
"End strategy should not flush during invocation (after /next returns)"
);
let reason = manager.should_flush_on_invocation_end(10);
assert_eq!(
reason,
Some(FlushReason::InvocationEnd),
"Flush decision should be made at invocation end, before calling /next"
);
}
#[test]
fn test_shutdown_always_flushes() {
let config = FlushConfig {
strategy: FlushStrategy::End,
interval: Duration::from_secs(60),
max_batch_entries: 100,
max_batch_bytes: 1024,
};
let manager = FlushManager::new(config);
let reason = manager.should_flush(None, 0, true);
assert_eq!(
reason,
Some(FlushReason::Shutdown),
"Shutdown should always trigger flush"
);
let reason = manager.should_flush(None, 10, true);
assert_eq!(
reason,
Some(FlushReason::Shutdown),
"Shutdown should take priority over other reasons"
);
}
#[test]
fn test_buffer_full_triggers_flush() {
let config = FlushConfig {
strategy: FlushStrategy::End, interval: Duration::from_secs(60),
max_batch_entries: 10,
max_batch_bytes: 1024,
};
let manager = FlushManager::new(config);
let reason = manager.should_flush(None, 5, false);
assert!(reason.is_none(), "Should not flush when buffer is not full");
let reason = manager.should_flush(None, 10, false);
assert_eq!(
reason,
Some(FlushReason::BufferFull),
"Should flush when buffer reaches limit"
);
let reason = manager.should_flush(None, 15, false);
assert_eq!(
reason,
Some(FlushReason::BufferFull),
"Should flush when buffer exceeds limit"
);
}
#[test]
fn test_deadline_approaching_triggers_flush() {
let config = FlushConfig {
strategy: FlushStrategy::End, interval: Duration::from_secs(60),
max_batch_entries: 100,
max_batch_bytes: 1024,
};
let manager = FlushManager::new(config);
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let approaching_deadline = now_ms + 200;
let reason = manager.should_flush(Some(approaching_deadline), 1, false);
assert_eq!(
reason,
Some(FlushReason::Deadline),
"Should flush when deadline is approaching"
);
let distant_deadline = now_ms + 60000;
let reason = manager.should_flush(Some(distant_deadline), 1, false);
assert!(
reason.is_none(),
"Should not flush when deadline is far away"
);
}
#[test]
fn test_infrequent_pattern_detection() {
let config = FlushConfig::default();
let manager = FlushManager::new(config);
assert!(
manager.is_infrequent_pattern(),
"Empty history should be infrequent"
);
let mut manager = FlushManager::new(FlushConfig::default());
manager.record_invocation();
assert!(
manager.is_infrequent_pattern(),
"Single invocation should be infrequent"
);
}
#[test]
fn test_frequent_pattern_detection() {
let config = FlushConfig::default();
let mut manager = FlushManager::new(config);
for _ in 0..5 {
manager.record_invocation();
}
let avg = manager.average_invocation_interval();
assert!(avg.is_some(), "Should have calculated average interval");
assert!(
avg.unwrap() < Duration::from_secs(120),
"Average interval should be under 120s"
);
assert!(
!manager.is_infrequent_pattern(),
"Quick successive invocations should be frequent pattern"
);
}
#[test]
fn test_default_strategy_adapts_to_pattern() {
let config = FlushConfig {
strategy: FlushStrategy::Default,
interval: Duration::from_millis(10),
max_batch_entries: 100,
max_batch_bytes: 1024,
};
let mut manager = FlushManager::new(config);
assert!(manager.is_infrequent_pattern());
let reason = manager.should_flush_on_invocation_end(1);
assert_eq!(reason, Some(FlushReason::InvocationEnd));
for _ in 0..5 {
manager.record_invocation();
}
assert!(!manager.is_infrequent_pattern());
let reason = manager.should_flush_on_invocation_end(1);
assert!(
reason.is_none(),
"Frequent pattern should not flush at invocation end"
);
std::thread::sleep(Duration::from_millis(15));
let reason = manager.should_flush(None, 1, false);
assert_eq!(
reason,
Some(FlushReason::Continuous),
"Default with frequent pattern should use Continuous"
);
}
#[test]
fn test_timeout_escalation_to_continuous() {
let config = FlushConfig {
strategy: FlushStrategy::End,
interval: Duration::from_millis(10),
max_batch_entries: 100,
max_batch_bytes: 1024,
};
let mut manager = FlushManager::new(config);
let reason = manager.should_flush(None, 1, false);
assert!(
reason.is_none(),
"End strategy should not flush mid-invocation"
);
for i in 0..20 {
manager.record_flush_timeout();
assert_eq!(
manager.consecutive_timeout_count(),
i + 1,
"Timeout count should increment"
);
}
let reason = manager.should_flush(None, 1, false);
assert_eq!(
reason,
Some(FlushReason::Continuous),
"After 20 timeouts, should escalate to Continuous"
);
}
#[test]
fn test_successful_flush_resets_timeout_count() {
let config = FlushConfig::default();
let mut manager = FlushManager::new(config);
manager.record_flush_timeout();
manager.record_flush_timeout();
manager.record_flush_timeout();
assert_eq!(manager.consecutive_timeout_count(), 3);
manager.record_flush();
assert_eq!(
manager.consecutive_timeout_count(),
0,
"Successful flush should reset timeout count"
);
}
#[test]
fn test_no_inflight_export_during_freeze() {
let config = FlushConfig {
strategy: FlushStrategy::End,
interval: Duration::from_secs(60),
max_batch_entries: 100,
max_batch_bytes: 1024,
};
let manager = FlushManager::new(config);
let reason = manager.should_flush_on_invocation_end(10);
assert_eq!(
reason,
Some(FlushReason::InvocationEnd),
"Flush must complete before signalling readiness"
);
let reason = manager.should_flush(None, 10, false);
assert!(
reason.is_none(),
"No flush should occur during invocation when freeze might happen"
);
}
#[test]
fn test_connection_reuse_within_invocation() {
let config = opentelemetry_lambda_extension::ExporterConfig {
endpoint: Some("http://localhost:4318".to_string()),
timeout: Duration::from_secs(2),
..Default::default()
};
let exporter = opentelemetry_lambda_extension::OtlpExporter::new(config).unwrap();
assert!(
exporter.has_endpoint(),
"Exporter should be configured with endpoint"
);
}
#[test]
fn test_stale_connection_retry_logic() {
let config = FlushConfig::default();
let mut manager = FlushManager::new(config);
manager.record_flush_timeout();
assert_eq!(manager.consecutive_timeout_count(), 1);
manager.record_flush_timeout();
assert_eq!(manager.consecutive_timeout_count(), 2);
manager.record_flush();
assert_eq!(
manager.consecutive_timeout_count(),
0,
"Successful flush should reset timeout count after stale connection recovery"
);
}
#[test]
fn test_export_timeout_reasonable() {
use opentelemetry_lambda_extension::ExporterConfig;
let default_config = ExporterConfig::default();
assert!(
default_config.timeout >= Duration::from_millis(100),
"Timeout should be at least 100ms to allow for network latency"
);
assert!(
default_config.timeout <= Duration::from_secs(5),
"Timeout should not exceed 5s to avoid blocking indefinitely"
);
assert_eq!(
default_config.timeout,
Duration::from_millis(500),
"Default timeout should be 500ms"
);
let custom_config = ExporterConfig {
timeout: Duration::from_secs(2),
..Default::default()
};
assert_eq!(custom_config.timeout, Duration::from_secs(2));
}
#[test]
fn test_invocation_history_bounded() {
let config = FlushConfig::default();
let mut manager = FlushManager::new(config);
for _ in 0..30 {
manager.record_invocation();
}
assert_eq!(
manager.invocation_history_len(),
20,
"History should be bounded to 20 entries"
);
}
#[tokio::test]
#[serial]
async fn test_simulator_shutdown_triggers_extension_shutdown() {
use opentelemetry_lambda_extension::{
Compression, Config, ExporterConfig, Protocol, ReceiverConfig, RuntimeBuilder,
TelemetryApiConfig,
};
let collector = create_test_collector().await;
let collector_endpoint = format!("http://{}", collector.addr());
let simulator = create_test_simulator().await;
let runtime_api_base = simulator.runtime_api_url().replace("http://", "");
let config = Config {
exporter: ExporterConfig {
endpoint: Some(collector_endpoint),
protocol: Protocol::Http,
timeout: Duration::from_secs(2),
compression: Compression::None,
..Default::default()
},
telemetry_api: TelemetryApiConfig {
enabled: false,
..Default::default()
},
receiver: ReceiverConfig {
http_enabled: true,
http_port: 4330,
..Default::default()
},
flush: FlushConfig {
strategy: FlushStrategy::End,
..Default::default()
},
..Default::default()
};
let extension_runtime_api = runtime_api_base.clone();
unsafe {
std::env::set_var(
"AWS_LAMBDA_RUNTIME_API",
format!("http://{}", extension_runtime_api),
);
}
let extension_handle = tokio::spawn(async move {
let runtime = RuntimeBuilder::new().config(config).build();
runtime.run().await
});
simulator
.wait_for(
|| async { simulator.extension_count().await >= 1 },
Duration::from_secs(5),
)
.await
.expect("Extension did not register");
let extensions = simulator.get_registered_extensions().await;
assert!(!extensions.is_empty(), "Should have registered extension");
simulator.graceful_shutdown(ShutdownReason::Spindown).await;
let result = tokio::time::timeout(Duration::from_secs(5), extension_handle)
.await
.expect("Extension should exit within timeout");
assert!(
result.is_ok(),
"Extension should exit cleanly after shutdown: {:?}",
result
);
collector
.shutdown()
.await
.expect("Collector shutdown failed");
}
async fn create_test_collector() -> ServerHandle {
MockServer::builder()
.protocol(MockProtocol::HttpBinary)
.start()
.await
.expect("Failed to start mock collector")
}
async fn create_test_simulator() -> Simulator {
Simulator::builder()
.function_name("adaptive-export-test")
.extension_ready_timeout(Duration::from_secs(5))
.build()
.await
.expect("Failed to start simulator")
}
#[test]
fn test_time_until_next_flush() {
let config = FlushConfig {
strategy: FlushStrategy::Continuous,
interval: Duration::from_millis(100),
max_batch_entries: 100,
max_batch_bytes: 1024,
};
let mut manager = FlushManager::new(config);
assert_eq!(
manager.time_until_next_flush(),
Duration::ZERO,
"Should be zero before first flush"
);
manager.record_flush();
let remaining = manager.time_until_next_flush();
assert!(
remaining <= Duration::from_millis(100),
"Should be at most interval"
);
assert!(
remaining > Duration::ZERO,
"Should be positive after recording flush"
);
std::thread::sleep(Duration::from_millis(110));
assert_eq!(
manager.time_until_next_flush(),
Duration::ZERO,
"Should be zero after interval elapsed"
);
}