#![cfg(test)]
use super::{JetStreamContext, StorageType, StreamConfig};
use std::time::Duration;
/// Shared prologue for the tests in this file.
///
/// Calls `crate::test_utils::init_test_logging()` and then invokes the
/// `crate::test_phase!` macro with `name`. Every test in this file passes its
/// own function name as `name`.
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
/// Shared epilogue for the tests in this file.
///
/// Invokes the `crate::test_complete!` macro with `name`. Every test in this
/// file calls it as its last statement, passing its own function name.
fn test_complete(name: &str) {
crate::test_complete!(name);
}
/// Checks that a 10-second `duplicate_window` is written into the
/// `StreamConfig` JSON as an integer count of nanoseconds.
///
/// The serialized number is matched exactly. A plain substring search for
/// `"duplicate_window":10000000000` would also accept a longer number that
/// merely starts with those digits (for example `100000000000`).
#[test]
fn audit_jetstream_dedup_window_serializes_nanoseconds() {
    /// Returns `true` when `json` contains `"duplicate_window":<nanos>` and the
    /// number ends right there, i.e. it is not followed by another digit, a
    /// decimal point, or an exponent marker.
    fn has_exact_duplicate_window(json: &str, nanos: u128) -> bool {
        let needle = format!("\"duplicate_window\":{}", nanos);
        json.match_indices(needle.as_str()).any(|(start, hit)| {
            let rest = &json[start + hit.len()..];
            !rest.starts_with(|c: char| c.is_ascii_digit() || matches!(c, '.' | 'e' | 'E'))
        })
    }

    init_test("audit_jetstream_dedup_window_serializes_nanoseconds");

    let dedup_window = Duration::from_secs(10);
    let config = StreamConfig::new("AUDIT_DEDUP_BOUNDARY")
        .subjects(&["audit.dedup.boundary"])
        .storage(StorageType::Memory)
        .duplicate_window(dedup_window);
    let json = config.to_json();
    let expected_nanos = dedup_window.as_nanos();

    assert!(
        has_exact_duplicate_window(&json, expected_nanos),
        "StreamConfig JSON must include duplicate_window in nanoseconds. \
         Expected: {}, JSON: {}",
        expected_nanos,
        json
    );

    // Guards the unit conversion itself, independently of the serializer.
    assert_eq!(
        expected_nanos, 10_000_000_000_u128,
        "10 second window should be exactly 10 billion nanoseconds"
    );

    test_complete("audit_jetstream_dedup_window_serializes_nanoseconds");
}
/// Sanity-checks a fixed set of sample message IDs.
///
/// Each sample must be non-empty, at most 256 bytes long, and made up solely
/// of ASCII graphic characters. Only the literals listed here are inspected;
/// no client code is exercised by this test.
#[test]
fn audit_jetstream_msg_id_header_format() {
    init_test("audit_jetstream_msg_id_header_format");

    // The list is fixed, so a plain array is enough; no heap-allocated `Vec` is needed.
    let test_msg_ids = [
        "unique-id-123",
        "uuid-550e8400-e29b-41d4-a716-446655440000",
        "timestamp-1234567890-counter-001",
        "app-specific-id-with-dashes",
    ];

    for msg_id in &test_msg_ids {
        assert!(
            !msg_id.is_empty(),
            "Message ID must be non-empty: '{}'",
            msg_id
        );
        assert!(
            msg_id.len() <= 256,
            "Message ID should be reasonable length: '{}'",
            msg_id
        );
        // `is_ascii_graphic` covers U+0021..=U+007E, which already includes '-',
        // so no separate dash check is required.
        assert!(
            msg_id.chars().all(|c| c.is_ascii_graphic()),
            "Message ID should use ASCII-safe characters: '{}'",
            msg_id
        );
    }

    test_complete("audit_jetstream_msg_id_header_format");
}
/// Feeds a table of PubAck JSON payloads to `JetStreamContext::parse_pub_ack`
/// and checks the resulting `duplicate` flag.
///
/// The table covers `true`/`false`, whitespace around the colon and value,
/// different key orderings, an absent `duplicate` key (expected `false`), and
/// an extra unrelated key. Every payload is expected to parse successfully.
#[test]
fn audit_jetstream_duplicate_flag_parsing() {
    const TEST_NAME: &str = "audit_jetstream_duplicate_flag_parsing";
    init_test(TEST_NAME);

    // (payload, expected value of `ack.duplicate`)
    let cases: [(&str, bool); 8] = [
        (r#"{"stream":"TEST","seq":1,"duplicate":true}"#, true),
        (r#"{"stream":"TEST","seq":1,"duplicate":false}"#, false),
        (r#"{"stream":"TEST","seq":1,"duplicate" : true}"#, true),
        (r#"{"stream":"TEST","seq":1,"duplicate": false }"#, false),
        (r#"{"duplicate":true,"stream":"TEST","seq":1}"#, true),
        (r#"{"seq":1,"duplicate":false,"stream":"TEST"}"#, false),
        (r#"{"stream":"TEST","seq":1}"#, false),
        (
            r#"{"stream":"TEST","seq":1,"duplicate":true,"extra":"ignored"}"#,
            true,
        ),
    ];

    for &(payload, want_duplicate) in &cases {
        // A parse failure on any of these payloads is itself a test failure.
        let ack = JetStreamContext::parse_pub_ack(payload.as_bytes()).unwrap_or_else(|err| {
            panic!(
                "Failed to parse valid PubAck JSON: {}. Error: {:?}",
                payload, err
            )
        });

        assert_eq!(
            ack.duplicate, want_duplicate,
            "Duplicate flag parsing mismatch for JSON: {}. \
             Expected: {}, Got: {}",
            payload, want_duplicate, ack.duplicate
        );
    }

    test_complete(TEST_NAME);
}
/// Checks how `StreamConfig::to_json` serializes `duplicate_window` for a
/// 1 ns window, a 24 h window, a zero window, and a config with no window set.
///
/// Present values are matched exactly. A plain substring search for
/// `"duplicate_window":1` would also accept `10`, `1000`, and so on, which
/// would hide a unit-conversion error in the serializer.
#[test]
fn audit_jetstream_dedup_window_edge_cases() {
    /// Returns `true` when `json` contains `"duplicate_window":<nanos>` and the
    /// number ends right there, i.e. it is not followed by another digit, a
    /// decimal point, or an exponent marker.
    fn has_exact_duplicate_window(json: &str, nanos: u128) -> bool {
        let needle = format!("\"duplicate_window\":{}", nanos);
        json.match_indices(needle.as_str()).any(|(start, hit)| {
            let rest = &json[start + hit.len()..];
            !rest.starts_with(|c: char| c.is_ascii_digit() || matches!(c, '.' | 'e' | 'E'))
        })
    }

    init_test("audit_jetstream_dedup_window_edge_cases");

    // Smallest non-zero window: one nanosecond.
    let min_window = Duration::from_nanos(1);
    let min_config = StreamConfig::new("TEST").duplicate_window(min_window);
    let min_json = min_config.to_json();
    assert!(
        has_exact_duplicate_window(&min_json, 1),
        "Minimum 1ns window should serialize correctly: {}",
        min_json
    );

    // Large window: 24 hours expressed in nanoseconds.
    let max_window = Duration::from_secs(24 * 60 * 60);
    let max_config = StreamConfig::new("TEST").duplicate_window(max_window);
    let max_json = max_config.to_json();
    let expected_max_nanos = max_window.as_nanos();
    assert!(
        has_exact_duplicate_window(&max_json, expected_max_nanos),
        "Maximum 24h window should serialize correctly: {}",
        max_json
    );

    // An explicit zero window must still be emitted, as the literal 0.
    let zero_window = Duration::from_nanos(0);
    let zero_config = StreamConfig::new("TEST").duplicate_window(zero_window);
    let zero_json = zero_config.to_json();
    assert!(
        has_exact_duplicate_window(&zero_json, 0),
        "Zero window should serialize as 0: {}",
        zero_json
    );

    // With no window configured the key must be absent altogether.
    let none_config = StreamConfig::new("TEST");
    let none_json = none_config.to_json();
    assert!(
        !none_json.contains("duplicate_window"),
        "None window should omit duplicate_window field: {}",
        none_json
    );

    test_complete("audit_jetstream_dedup_window_edge_cases");
}
/// Arithmetic check on a 100 ms window expressed in nanoseconds.
///
/// Confirms that three sample offsets (0, half the window, one nanosecond
/// before the end) are strictly smaller than the window length, and that
/// 100 ms equals 100,000,000 ns. No client or server code is called.
#[test]
fn audit_jetstream_boundary_model_is_server_owned() {
    const TEST_NAME: &str = "audit_jetstream_boundary_model_is_server_owned";
    init_test(TEST_NAME);

    let window_ns = Duration::from_millis(100).as_nanos();

    // (offset into the window in nanoseconds, label used in the failure message)
    let offsets = [
        (0, "immediate resubmission"),
        (window_ns / 2, "mid window"),
        (window_ns - 1, "just before boundary"),
    ];

    for &(offset, label) in &offsets {
        assert!(
            offset < window_ns,
            "Scenario '{}' at {}ns should be strictly inside the configured {}ns window",
            label,
            offset,
            window_ns
        );
    }

    assert_eq!(
        window_ns, 100_000_000,
        "100ms window must be exactly 100,000,000 nanoseconds"
    );

    test_complete(TEST_NAME);
}
/// Placeholder for a boundary audit against a live server.
///
/// Ignored by default (see the `#[ignore]` reason). The body currently only
/// runs the shared prologue and epilogue and performs no checks of its own.
#[test]
#[ignore = "requires real NATS server - enable for full boundary compliance audit"]
fn audit_jetstream_real_server_boundary_behavior() {
    const TEST_NAME: &str = "audit_jetstream_real_server_boundary_behavior";
    init_test(TEST_NAME);
    test_complete(TEST_NAME);
}