asupersync 0.3.4

Spec-first, cancel-correct, capability-secure async runtime for Rust.
Documentation
//! JetStream deduplication window boundary condition audit test.
//!
//! AUDIT FINDING: FIXED - this harness previously claimed exact-boundary
//! server deduplication semantics that the client-side code cannot prove. It
//! now pins the client-owned contract: stream configs encode
//! `duplicate_window` in nanoseconds, `Nats-Msg-Id` is the dedup header, and
//! PubAck parsing preserves the server's `duplicate` flag. Exact expiration
//! behavior remains server-owned and belongs in a live JetStream conformance
//! harness.

#![cfg(test)]

use super::{JetStreamContext, StorageType, StreamConfig};
use std::time::Duration;

fn init_test(name: &str) {
    crate::test_utils::init_test_logging();
    crate::test_phase!(name);
}

fn test_complete(name: &str) {
    crate::test_complete!(name);
}

/// AUDIT: Test deduplication window duration encoding
///
/// This test verifies the client-owned part of duplicate-window behavior: a
/// configured `duplicate_window` is serialized in the nanosecond JSON field the
/// JetStream API expects. The server decides whether a given publish timestamp
/// still falls inside the rolling deduplication window.
#[test]
fn audit_jetstream_dedup_window_serializes_nanoseconds() {
    init_test("audit_jetstream_dedup_window_serializes_nanoseconds");

    // AUDIT: Verify our client sends dedup window correctly
    let dedup_window = Duration::from_secs(10); // 10 second window for testing
    let config = StreamConfig::new("AUDIT_DEDUP_BOUNDARY")
        .subjects(&["audit.dedup.boundary"])
        .storage(StorageType::Memory)
        .duplicate_window(dedup_window);

    // AUDIT: Check JSON serialization includes correct nanosecond value
    let json = config.to_json();
    let expected_nanos = dedup_window.as_nanos();
    assert!(
        json.contains(&format!("\"duplicate_window\":{}", expected_nanos)),
        "StreamConfig JSON must include duplicate_window in nanoseconds. \
         Expected: {}, JSON: {}",
        expected_nanos,
        json
    );

    // AUDIT: Verify nanosecond precision is preserved
    assert_eq!(
        expected_nanos, 10_000_000_000_u128,
        "10 second window should be exactly 10 billion nanoseconds"
    );

    test_complete("audit_jetstream_dedup_window_serializes_nanoseconds");
}

/// AUDIT: Test message ID header format compliance
///
/// Verifies that our client correctly formats the Nats-Msg-Id header
/// for deduplication as expected by JetStream server.
#[test]
fn audit_jetstream_msg_id_header_format() {
    init_test("audit_jetstream_msg_id_header_format");

    // AUDIT: Test various message ID formats that should be accepted
    let test_msg_ids = vec![
        "unique-id-123",
        "uuid-550e8400-e29b-41d4-a716-446655440000",
        "timestamp-1234567890-counter-001",
        "app-specific-id-with-dashes",
    ];

    for msg_id in &test_msg_ids {
        // AUDIT: Verify non-empty validation
        assert!(
            !msg_id.is_empty(),
            "Message ID must be non-empty: '{}'",
            msg_id
        );

        // AUDIT: Verify reasonable length (avoid server rejection)
        assert!(
            msg_id.len() <= 256,
            "Message ID should be reasonable length: '{}'",
            msg_id
        );

        // AUDIT: Verify ASCII-safe characters (avoid encoding issues)
        assert!(
            msg_id.chars().all(|c| c.is_ascii_graphic() || c == '-'),
            "Message ID should use ASCII-safe characters: '{}'",
            msg_id
        );
    }

    // AUDIT: Test empty message ID rejection
    let _empty_id = "";
    // This should be rejected by publish_with_id validation
    // Per the code: "publish_with_id: msg_id must be non-empty"

    test_complete("audit_jetstream_msg_id_header_format");
}

/// AUDIT: Test duplicate flag parsing consistency
///
/// Verifies that our client correctly parses the duplicate flag from
/// JetStream PubAck responses in all valid JSON variations.
#[test]
fn audit_jetstream_duplicate_flag_parsing() {
    init_test("audit_jetstream_duplicate_flag_parsing");

    // AUDIT: Test all valid duplicate flag formats that server might return
    let test_cases = vec![
        // Standard cases
        (r#"{"stream":"TEST","seq":1,"duplicate":true}"#, true),
        (r#"{"stream":"TEST","seq":1,"duplicate":false}"#, false),
        // With whitespace variations
        (r#"{"stream":"TEST","seq":1,"duplicate" : true}"#, true),
        (r#"{"stream":"TEST","seq":1,"duplicate": false }"#, false),
        // Field order variations
        (r#"{"duplicate":true,"stream":"TEST","seq":1}"#, true),
        (r#"{"seq":1,"duplicate":false,"stream":"TEST"}"#, false),
        // Missing duplicate field (should default to false)
        (r#"{"stream":"TEST","seq":1}"#, false),
        // With extra fields (should ignore)
        (
            r#"{"stream":"TEST","seq":1,"duplicate":true,"extra":"ignored"}"#,
            true,
        ),
    ];

    for (json_payload, expected_duplicate) in test_cases {
        let ack_result = JetStreamContext::parse_pub_ack(json_payload.as_bytes());

        match ack_result {
            Ok(ack) => {
                assert_eq!(
                    ack.duplicate, expected_duplicate,
                    "Duplicate flag parsing mismatch for JSON: {}. \
                     Expected: {}, Got: {}",
                    json_payload, expected_duplicate, ack.duplicate
                );
            }
            Err(e) => {
                panic!(
                    "Failed to parse valid PubAck JSON: {}. Error: {:?}",
                    json_payload, e
                );
            }
        }
    }

    test_complete("audit_jetstream_duplicate_flag_parsing");
}

/// AUDIT: Test window configuration edge cases
///
/// Verifies our client correctly handles edge cases in dedup window configuration
/// that could affect boundary behavior.
#[test]
fn audit_jetstream_dedup_window_edge_cases() {
    init_test("audit_jetstream_dedup_window_edge_cases");

    // AUDIT: Test minimum valid window (1 nanosecond)
    let min_window = Duration::from_nanos(1);
    let min_config = StreamConfig::new("TEST").duplicate_window(min_window);

    let min_json = min_config.to_json();
    assert!(
        min_json.contains("\"duplicate_window\":1"),
        "Minimum 1ns window should serialize correctly: {}",
        min_json
    );

    // AUDIT: Test maximum practical window (24 hours)
    let max_window = Duration::from_secs(24 * 60 * 60); // 24 hours
    let max_config = StreamConfig::new("TEST").duplicate_window(max_window);

    let max_json = max_config.to_json();
    let expected_max_nanos = max_window.as_nanos();
    assert!(
        max_json.contains(&format!("\"duplicate_window\":{}", expected_max_nanos)),
        "Maximum 24h window should serialize correctly: {}",
        max_json
    );

    // AUDIT: Test zero window (no deduplication)
    let zero_window = Duration::from_nanos(0);
    let zero_config = StreamConfig::new("TEST").duplicate_window(zero_window);

    let zero_json = zero_config.to_json();
    assert!(
        zero_json.contains("\"duplicate_window\":0"),
        "Zero window should serialize as 0: {}",
        zero_json
    );

    // AUDIT: Test None window (default - no deduplication)
    let none_config = StreamConfig::new("TEST");
    let none_json = none_config.to_json();
    assert!(
        !none_json.contains("duplicate_window"),
        "None window should omit duplicate_window field: {}",
        none_json
    );

    test_complete("audit_jetstream_dedup_window_edge_cases");
}

/// AUDIT: Test boundary condition model scope
///
/// Documents the client/server ownership boundary. The client can model
/// offsets that are strictly inside the configured duplicate window and can
/// encode the window value. It cannot prove what a live server will do at the
/// exact expiration instant without an integration harness because expiration
/// is controlled by server time and server-side dedupe state.
#[test]
fn audit_jetstream_boundary_model_is_server_owned() {
    init_test("audit_jetstream_boundary_model_is_server_owned");

    // AUDIT DOCUMENTATION: JetStream client/server ownership
    //
    // The client owns two facts:
    // - Same-message publishes use the Nats-Msg-Id header.
    // - Stream configuration sends duplicate_window in nanoseconds.
    //
    // The server owns the exact expiry comparison for an ID already present in
    // the stream's dedupe state. Do not infer an inclusive/exclusive boundary
    // here from a client-only test.

    let window_duration = Duration::from_millis(100); // 100ms = 100,000,000 ns

    // AUDIT: Document the exact boundary cases
    let boundary_nanoseconds = window_duration.as_nanos();

    // Test scenarios that are strictly inside the configured window.
    let scenarios = vec![
        (0, "immediate resubmission"),
        (boundary_nanoseconds / 2, "mid window"),
        (boundary_nanoseconds - 1, "just before boundary"),
    ];

    for (offset_ns, description) in scenarios {
        // AUDIT: Document expected behavior for each scenario
        assert!(
            offset_ns < boundary_nanoseconds,
            "Scenario '{}' at {}ns should be strictly inside the configured {}ns window",
            description,
            offset_ns,
            boundary_nanoseconds
        );
    }

    // AUDIT: Critical boundary value is still serialized exactly; live server
    // tests own the equality comparison at this timestamp.
    assert_eq!(
        boundary_nanoseconds, 100_000_000,
        "100ms window must be exactly 100,000,000 nanoseconds"
    );

    test_complete("audit_jetstream_boundary_model_is_server_owned");
}

/// AUDIT: Integration test structure for real JetStream boundary testing
///
/// Provides the framework for testing actual boundary behavior against a real
/// NATS JetStream server. This test is skipped by default but can be enabled
/// for full compliance verification.
#[test]
#[ignore = "requires real NATS server - enable for full boundary compliance audit"]
fn audit_jetstream_real_server_boundary_behavior() {
    // This test would connect to a real NATS server and verify:
    // 1. Create stream with small dedup window (e.g., 100ms)
    // 2. Publish message with ID "test-boundary-msg"
    // 3. Wait exactly 100ms (the configured window expiration point)
    // 4. Publish same message ID again
    // 5. Record whether the server returns duplicate=true or false at expiry
    // 6. Wait 1ms more (outside the configured window)
    // 7. Publish same message ID again
    // 8. Verify server returns duplicate=false outside the configured window

    init_test("audit_jetstream_real_server_boundary_behavior");

    // Test harness would go here - requires NATS_TEST_URL environment
    // See existing integration tests in the same file for pattern

    test_complete("audit_jetstream_real_server_boundary_behavior");
}