sockudo-adapter 4.7.0

Connection adapters and horizontal scaling for Sockudo
use super::*;
use std::collections::HashSet;
use std::sync::atomic::Ordering;
use std::time::Instant;
use tokio::time::{Duration, sleep, timeout};

/// Checks that request cleanup removes an entry that was added to
/// `pending_requests` *after* `start_request_cleanup` was called.
///
/// The inserted request has its `start_time` backdated by one second and the
/// adapter's `requests_timeout` is set to `1`. The test then polls the map
/// every 20 ms and fails if the entry is still present after 2 s.
#[tokio::test]
async fn request_cleanup_operates_on_live_pending_requests() {
    const REQUEST_ID: &str = "expired";

    let adapter = HorizontalAdapter::new();
    adapter.requests_timeout.store(1, Ordering::Relaxed);
    // Cleanup is started before the insert on purpose: the entry below only
    // exists once cleanup is already running.
    adapter.start_request_cleanup();

    let stale_request = PendingRequest {
        start_time: Instant::now() - Duration::from_secs(1),
        app_id: "app".to_string(),
        responses: Vec::new(),
        notify: Arc::new(Notify::new()),
    };
    adapter
        .pending_requests
        .insert(REQUEST_ID.to_string(), stale_request);

    // Poll until the entry is gone; the surrounding `timeout` bounds the wait.
    let wait_for_eviction = async {
        while adapter.pending_requests.contains_key(REQUEST_ID) {
            sleep(Duration::from_millis(20)).await;
        }
    };

    timeout(Duration::from_secs(2), wait_for_eviction)
        .await
        .expect("cleanup should remove expired live requests");
}

/// Checks that a task blocked in `wait_for_request_responses` finishes because
/// `process_response` delivered a response, not because its own wait expired.
///
/// Timeline used by the test:
/// - the waiter is spawned with a 40 ms duration argument,
/// - after 10 ms a single response for the same request id is processed,
/// - the waiter must then complete within a further 20 ms, returning exactly
///   one response and `timed_out == false`.
#[tokio::test]
async fn send_request_wakes_on_notify_without_polling_delay() {
    const REQUEST_ID: &str = "req-1";

    let adapter = Arc::new(HorizontalAdapter::new());
    let pending = PendingRequest {
        start_time: Instant::now(),
        app_id: "app".to_string(),
        responses: Vec::new(),
        notify: Arc::new(Notify::new()),
    };
    adapter
        .pending_requests
        .insert(REQUEST_ID.to_string(), pending);

    // The spawned task needs its own handle to the adapter; keep the clone
    // scoped to this block so the outer `adapter` stays usable below.
    let waiter = {
        let task_adapter = Arc::clone(&adapter);
        tokio::spawn(async move {
            task_adapter
                .wait_for_request_responses(
                    REQUEST_ID,
                    Instant::now(),
                    Duration::from_millis(40),
                    1,
                )
                .await
        })
    };

    // Let the waiter get scheduled before the response is delivered.
    sleep(Duration::from_millis(10)).await;

    let response = ResponseBody {
        request_id: REQUEST_ID.to_string(),
        node_id: "node-2".to_string(),
        app_id: "app".to_string(),
        members: AHashMap::new(),
        channels_with_sockets_count: AHashMap::new(),
        socket_ids: vec!["socket-1".to_string()],
        sockets_count: 1,
        exists: true,
        channels: HashSet::new(),
        members_count: 0,
        responses_received: 0,
        expected_responses: 0,
        complete: true,
    };
    adapter.process_response(response).await.unwrap();

    // Outer `expect`: the 20 ms guard elapsed. Inner `unwrap`: the task failed to join.
    let (responses, timed_out) = timeout(Duration::from_millis(20), waiter)
        .await
        .expect("notify should wake waiter before timeout")
        .unwrap();

    assert_eq!(responses.len(), 1);
    assert!(!timed_out, "waiter should complete via notify, not timeout");
}

/// Checks that each of the eight `RequestType` variants listed below reports
/// `is_fire_and_forget()` as `true`.
#[test]
fn test_is_fire_and_forget_true_for_all_eight() {
    // Expands to one `assert!` per variant, so every variant is checked by its
    // own assertion on the fully spelled-out expression.
    macro_rules! assert_fire_and_forget {
        ($($variant:ident),+ $(,)?) => {
            $( assert!(RequestType::$variant.is_fire_and_forget()); )+
        };
    }

    assert_fire_and_forget!(
        PresenceMemberJoined,
        PresenceMemberLeft,
        PresenceMemberUpdated,
        Heartbeat,
        NodeDead,
        PresenceStateSync,
        ChannelCountUpdate,
        ChannelCountSync,
    );
}

/// Checks that each of the twelve `RequestType` variants listed below reports
/// `is_fire_and_forget()` as `false`.
#[test]
fn test_is_fire_and_forget_false_for_all_query_types() {
    // Expands to one negated `assert!` per variant, so every variant is
    // checked by its own assertion on the fully spelled-out expression.
    macro_rules! assert_not_fire_and_forget {
        ($($variant:ident),+ $(,)?) => {
            $( assert!(!RequestType::$variant.is_fire_and_forget()); )+
        };
    }

    assert_not_fire_and_forget!(
        ChannelMembers,
        ChannelSockets,
        ChannelSocketsCount,
        SocketExistsInChannel,
        TerminateUserConnections,
        ChannelsWithSocketsCount,
        Sockets,
        Channels,
        SocketsCount,
        ChannelMembersCount,
        CountUserConnectionsInChannel,
        BatchChannelSocketsCount,
    );
}