// liquidwar7core 0.2.0
//
// Liquidwar7 core logic library: low-level building blocks that are game-engine agnostic.
// Documentation
// Copyright (C) 2025 Christian Mauduit <ufoot@ufoot.org>

//! Chunked network transfer utilities.
//!
//! Provides functions for splitting large data into chunks (server-side)
//! and reassembling chunks back into complete data (client-side).
//! Useful for transferring game states over WebSocket where buffer
//! limits may cause issues with large messages.

use std::collections::HashMap;

/// Default chunk size in bytes (50KB, i.e. 50,000 bytes).
///
/// Godot's WebSocketMultiplayerPeer has a default buffer size of 64KB.
/// We use 50KB chunks to stay safely under that limit without requiring
/// custom buffer configuration.
///
/// Suitable as the `chunk_size` argument of [`split_into_chunks`].
pub const DEFAULT_CHUNK_SIZE: usize = 50_000;

/// Cuts `data` into the fewest chunks that each fit within `chunk_size`,
/// balancing their lengths.
///
/// The chunk count is `ceil(data.len() / chunk_size)`. Bytes are then spread
/// as evenly as possible: every chunk gets `data.len() / count` bytes and the
/// first `data.len() % count` chunks get one more. Sizes therefore differ by
/// at most one byte and no tiny trailing chunk is produced: 100001 bytes with
/// a 50000-byte limit yields `[33334, 33334, 33333]`, not `[50000, 50000, 1]`.
///
/// Returns an empty `Vec` when `data` is empty or `chunk_size` is zero.
/// Concatenating the returned chunks in order reproduces `data`.
///
/// # Example
/// ```
/// use liquidwar7core::chunks::split_into_chunks;
///
/// // Length is an exact multiple of the limit: every chunk is full.
/// let data = vec![1u8; 150_000];
/// let chunks = split_into_chunks(&data, 50_000);
/// assert_eq!(chunks.len(), 3);
/// assert_eq!(chunks[0].len(), 50_000);
///
/// // One byte over two full chunks: three balanced chunks instead.
/// let data = vec![1u8; 100_001];
/// let chunks = split_into_chunks(&data, 50_000);
/// assert_eq!(chunks.len(), 3);
/// assert_eq!(chunks[0].len(), 33334);  // base + 1
/// assert_eq!(chunks[1].len(), 33334);  // base + 1
/// assert_eq!(chunks[2].len(), 33333);  // base
/// ```
pub fn split_into_chunks(data: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
    let len = data.len();
    if len == 0 || chunk_size == 0 {
        return Vec::new();
    }

    // When everything fits in one chunk this gives count == 1, so the general
    // formula below hands back the whole input as a single chunk.
    let count = len.div_ceil(chunk_size);
    let (quota, extra) = (len / count, len % count);

    // Peel chunks off the front of the not-yet-consumed tail.
    let mut rest = data;
    (0..count)
        .map(|i| {
            // The first `extra` chunks absorb the division remainder.
            let take = quota + usize::from(i < extra);
            let (head, tail) = rest.split_at(take);
            rest = tail;
            head.to_vec()
        })
        .collect()
}

/// Manages chunk reassembly for a single message.
///
/// Chunks may be added in any order. They are stored sparsely, keyed by
/// their index, so memory use follows the chunks actually received rather
/// than the announced total. A bogus, very large `total_chunks` therefore
/// cannot trigger a huge up-front allocation.
#[derive(Debug)]
pub struct ChunkAssembler {
    /// Chunks received so far, keyed by chunk index (absent = not yet received).
    chunks: std::collections::BTreeMap<usize, Vec<u8>>,
    /// Total chunks expected.
    total_count: usize,
}

impl ChunkAssembler {
    /// Creates a new assembler expecting `total_chunks` chunks.
    ///
    /// No storage is reserved here, whatever the value of `total_chunks`.
    pub fn new(total_chunks: usize) -> Self {
        Self {
            chunks: std::collections::BTreeMap::new(),
            total_count: total_chunks,
        }
    }

    /// Adds a chunk at the given index.
    ///
    /// Returns `true` if all chunks have been received and data is ready.
    /// Returns `false` without storing anything when `index` is not below
    /// the expected total. Ignores duplicate chunks: the first payload
    /// received for an index is kept.
    pub fn add_chunk(&mut self, index: usize, data: Vec<u8>) -> bool {
        if index >= self.total_count {
            return false;
        }

        // `or_insert` leaves an existing entry untouched, so a duplicate
        // index simply drops the new payload.
        self.chunks.entry(index).or_insert(data);

        self.is_complete()
    }

    /// Returns `true` if all chunks have been received.
    pub fn is_complete(&self) -> bool {
        // Only indices below `total_count` are ever stored, so holding
        // `total_count` distinct entries means every index is present.
        self.chunks.len() == self.total_count
    }

    /// Returns the number of distinct chunks received.
    pub fn received(&self) -> usize {
        self.chunks.len()
    }

    /// Returns the total number of chunks expected.
    pub fn total(&self) -> usize {
        self.total_count
    }

    /// Assembles and returns the complete data, concatenated in index order.
    ///
    /// Returns `None` if not all chunks have been received.
    pub fn assemble(self) -> Option<Vec<u8>> {
        if !self.is_complete() {
            return None;
        }

        // Size the output once instead of growing it chunk by chunk.
        let total_len: usize = self.chunks.values().map(Vec::len).sum();
        let mut result = Vec::with_capacity(total_len);
        // The map iterates in ascending key order, i.e. chunk index order.
        for chunk in self.chunks.into_values() {
            result.extend_from_slice(&chunk);
        }
        Some(result)
    }
}

/// Manages multiple in-flight message reassemblies.
///
/// Useful when messages may arrive interleaved or out of order. Each message
/// ID gets its own [`ChunkAssembler`]; when one message completes, every
/// pending message with a smaller ID is discarded.
#[derive(Debug, Default)]
pub struct MultiChunkReceiver {
    /// Pending assemblers by message ID.
    pending: HashMap<i32, ChunkAssembler>,
    /// Last completed message ID (for stale cleanup).
    ///
    /// Recorded on every completion; nothing in this type reads it back.
    last_completed_id: Option<i32>,
}

impl MultiChunkReceiver {
    /// Creates a new receiver with no pending messages.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds a chunk for a message.
    ///
    /// Returns `Some(data)` when all chunks for the message have been received.
    /// Returns `None` if more chunks are still needed, or if the chunk is
    /// rejected because:
    ///
    /// - `chunk_index` is not below `total_chunks` (this includes
    ///   `total_chunks == 0`), or
    /// - `total_chunks` disagrees with the total recorded when the first
    ///   chunk of `msg_id` was accepted.
    ///
    /// A rejected chunk never creates or modifies a pending entry.
    ///
    /// Automatically cleans up stale incomplete messages (older than completed ones).
    pub fn add_chunk(
        &mut self,
        msg_id: i32,
        chunk_index: usize,
        total_chunks: usize,
        data: Vec<u8>,
    ) -> Option<Vec<u8>> {
        // Validate before touching the map: a malformed first chunk must not
        // leave behind an assembler that can never complete.
        if chunk_index >= total_chunks {
            return None;
        }

        // Get or create assembler
        let assembler = self
            .pending
            .entry(msg_id)
            .or_insert_with(|| ChunkAssembler::new(total_chunks));

        // Later chunks must agree with the total announced by the first one.
        if assembler.total() != total_chunks {
            return None;
        }

        // Add chunk; `false` means the message is still incomplete.
        if !assembler.add_chunk(chunk_index, data) {
            return None;
        }

        // Complete - remove and assemble
        let assembler = self.pending.remove(&msg_id)?;
        let result = assembler.assemble();

        // Track completion and cleanup stale
        self.last_completed_id = Some(msg_id);
        self.cleanup_stale(msg_id);

        result
    }

    /// Removes incomplete messages older than the given ID.
    ///
    /// Entries whose ID is greater than or equal to `current_id` are kept.
    fn cleanup_stale(&mut self, current_id: i32) {
        self.pending.retain(|&id, _| id >= current_id);
    }

    /// Returns the number of pending (incomplete) messages.
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }

    /// Clears all pending messages.
    pub fn clear(&mut self) {
        self.pending.clear();
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Joins chunks back into one buffer, preserving their order.
    fn concat(parts: Vec<Vec<u8>>) -> Vec<u8> {
        parts.into_iter().flatten().collect()
    }

    /// Collects the length of every chunk.
    fn sizes(parts: &[Vec<u8>]) -> Vec<usize> {
        parts.iter().map(Vec::len).collect()
    }

    #[test]
    fn test_split_into_chunks_exact() {
        // 150 bytes with a 50-byte limit: three full chunks, contents in order.
        let input: Vec<u8> = (0u8..150).collect();
        let parts = split_into_chunks(&input, 50);

        let expected: Vec<Vec<u8>> = vec![
            (0u8..50).collect(),
            (50u8..100).collect(),
            (100u8..150).collect(),
        ];
        assert_eq!(parts, expected);
    }

    #[test]
    fn test_split_into_chunks_smoothed() {
        // 120 bytes with a 50-byte limit: balanced 40/40/40, not 50/50/20.
        let input: Vec<u8> = (0u8..120).collect();
        let parts = split_into_chunks(&input, 50);

        assert_eq!(sizes(&parts), vec![40usize, 40, 40]);

        // Nothing lost or reordered.
        assert_eq!(concat(parts), input);
    }

    #[test]
    fn test_split_into_chunks_smoothed_with_remainder() {
        // 100001 bytes with a 50000-byte limit: the first two chunks take the
        // extra bytes left over by the division.
        let input: Vec<u8> = (0u32..100_001).map(|n| (n % 256) as u8).collect();
        let parts = split_into_chunks(&input, 50_000);

        assert_eq!(sizes(&parts), vec![33334usize, 33334, 33333]);

        // Nothing lost or reordered.
        assert_eq!(concat(parts), input);
    }

    #[test]
    fn test_split_into_chunks_small() {
        // Input shorter than the limit comes back as one untouched chunk.
        let input: Vec<u8> = vec![1, 2, 3, 4, 5];
        let parts = split_into_chunks(&input, 50);

        assert_eq!(parts, vec![input.clone()]);
    }

    #[test]
    fn test_split_into_chunks_empty() {
        let input: Vec<u8> = Vec::new();
        assert!(split_into_chunks(&input, 50).is_empty());
    }

    #[test]
    fn test_chunk_assembler_in_order() {
        let mut asm = ChunkAssembler::new(3);

        // (index, payload, whether this chunk completes the message)
        let steps: Vec<(usize, Vec<u8>, bool)> = vec![
            (0, vec![1, 2], false),
            (1, vec![3, 4], false),
            (2, vec![5, 6], true),
        ];
        for (index, payload, completes) in steps {
            assert_eq!(asm.add_chunk(index, payload), completes);
        }

        assert_eq!(asm.assemble(), Some(vec![1u8, 2, 3, 4, 5, 6]));
    }

    #[test]
    fn test_chunk_assembler_out_of_order() {
        let mut asm = ChunkAssembler::new(3);

        // Last chunk first, middle chunk last.
        let steps: Vec<(usize, Vec<u8>, bool)> = vec![
            (2, vec![5, 6], false),
            (0, vec![1, 2], false),
            (1, vec![3, 4], true),
        ];
        for (index, payload, completes) in steps {
            assert_eq!(asm.add_chunk(index, payload), completes);
        }

        assert_eq!(asm.assemble(), Some(vec![1u8, 2, 3, 4, 5, 6]));
    }

    #[test]
    fn test_chunk_assembler_duplicate_ignored() {
        let mut asm = ChunkAssembler::new(2);

        // The second delivery of index 0 must not replace the first payload.
        let steps: Vec<(usize, Vec<u8>, bool)> = vec![
            (0, vec![1, 2], false),
            (0, vec![99, 99], false),
            (1, vec![3, 4], true),
        ];
        for (index, payload, completes) in steps {
            assert_eq!(asm.add_chunk(index, payload), completes);
        }

        assert_eq!(asm.assemble(), Some(vec![1u8, 2, 3, 4]));
    }

    #[test]
    fn test_chunk_assembler_incomplete() {
        let mut asm = ChunkAssembler::new(3);

        // Index 1 never arrives.
        let delivered: Vec<(usize, Vec<u8>)> = vec![(0, vec![1, 2]), (2, vec![5, 6])];
        for (index, payload) in delivered {
            asm.add_chunk(index, payload);
        }

        assert!(!asm.is_complete());
        assert_eq!(asm.assemble(), None);
    }

    #[test]
    fn test_multi_chunk_receiver_single_message() {
        let mut rx = MultiChunkReceiver::new();

        assert_eq!(rx.add_chunk(1, 0, 3, vec![1, 2]), None);
        assert_eq!(rx.add_chunk(1, 1, 3, vec![3, 4]), None);

        let assembled = rx.add_chunk(1, 2, 3, vec![5, 6]);
        assert_eq!(assembled, Some(vec![1u8, 2, 3, 4, 5, 6]));
        assert_eq!(rx.pending_count(), 0);
    }

    #[test]
    fn test_multi_chunk_receiver_interleaved() {
        let mut rx = MultiChunkReceiver::new();

        // Open message 1, then message 2, leaving both half done.
        rx.add_chunk(1, 0, 2, vec![1, 2]);
        rx.add_chunk(2, 0, 2, vec![10, 20]);

        // Finishing message 2 delivers it...
        let second = rx.add_chunk(2, 1, 2, vec![30, 40]);
        assert_eq!(second, Some(vec![10u8, 20, 30, 40]));

        // ...and drops the older, still incomplete message 1 (id < 2).
        assert_eq!(rx.pending_count(), 0);
    }

    #[test]
    fn test_roundtrip_large_data() {
        let original: Vec<u8> = (0u32..150_000).map(|n| (n % 256) as u8).collect();

        let mut rx = MultiChunkReceiver::new();

        // Feed every chunk through the receiver; only the last call's result
        // is kept, which is the one expected to carry the assembled data.
        let outcome = split_into_chunks(&original, 50_000)
            .into_iter()
            .enumerate()
            .fold(None, |_, (index, part)| rx.add_chunk(42, index, 3, part));

        assert_eq!(outcome.unwrap(), original);
    }
}