use super::*;
use anyhow::{Context, Result, anyhow};
use std::fmt::Write;
use std::time::Duration;
type TestResult = Result<()>;
/// Fills each of the three scratch buffers, checks their lengths, and then
/// verifies that `clear` leaves all of them empty.
#[test]
fn test_scratch_buffers() {
    let mut scratch = ScratchBuffers::new();

    for digit in [1, 2] {
        scratch.digit_buffer.push(digit);
    }
    assert_eq!(scratch.digit_buffer.len(), 2);

    scratch.byte_buffer.extend_from_slice(b"hello");
    assert_eq!(scratch.byte_buffer.len(), 5);

    scratch.string_buffer.push_str("world");
    assert_eq!(scratch.string_buffer.len(), 5);

    // A single `clear` call is expected to reset every buffer.
    scratch.clear();
    assert!(scratch.digit_buffer.is_empty());
    assert!(scratch.byte_buffer.is_empty());
    assert!(scratch.string_buffer.is_empty());
}
/// Sends records with sequence ids 2, 1, 3 (in that order) and expects
/// `recv_ordered` to hand them back as "first", "second", "third".
#[test]
fn test_sequence_ring_ordered() -> TestResult {
    let mut ring = SequenceRing::new(10, 5);
    let sender = ring.sender();

    // Deliberately out of sequence order: (sequence id, payload, error context).
    let outgoing = [
        (2, "second", "sending second record"),
        (1, "first", "sending first record"),
        (3, "third", "sending third record"),
    ];
    for (sequence_id, payload, what) in outgoing {
        sender
            .send(SequencedRecord::new(sequence_id, payload))
            .context(what)?;
    }

    // The ring must release the payloads in ascending sequence order.
    let incoming = [
        ("first", "receiving first record"),
        ("second", "receiving second record"),
        ("third", "receiving third record"),
    ];
    for (expected, what) in incoming {
        assert_eq!(ring.recv_ordered().context(what)?, Some(expected));
    }

    Ok(())
}
/// Submits the integers 1 through 5 to a doubling worker pool and checks that
/// the results come back in submission order before shutting the pool down.
#[test]
fn test_worker_pool() -> TestResult {
    let mut pool = WorkerPool::new(
        2,
        10,
        5,
        |input: i32, _buffers: &mut ScratchBuffers| -> i32 { input * 2 },
    );

    for item in 1..=5 {
        pool.submit(item).context("submitting work item")?;
    }

    for item in 1..=5_i32 {
        let doubled = pool
            .recv_ordered()
            .context("receiving result")?
            .context("worker pool returned None")?;
        assert_eq!(doubled, item * 2);
    }

    pool.shutdown()
        .map_err(|error| anyhow!(error))
        .context("shutting down worker pool")?;

    Ok(())
}
/// Smoke test for `StreamingProcessor`: memory pressure flips on after a
/// 900 KiB usage update, and one processed record shows up in the stats.
#[test]
fn test_streaming_processor() {
    let mut processor = StreamingProcessor::new(1);
    assert!(!processor.is_memory_pressure());

    // 900 KiB of reported usage is expected to trip the pressure flag.
    processor.update_memory_usage(900 * 1024);
    assert!(processor.is_memory_pressure());

    processor.record_processed(1024);
    let snapshot = processor.stats();
    assert_eq!(snapshot.records_processed, 1);
    assert_eq!(snapshot.bytes_processed, 1024);
}
/// Checks that results come back in submission order for 1 and 2 workers,
/// even when some inputs (multiples of 3) take slightly longer to process.
///
/// # Errors
///
/// Returns an error if submitting, receiving, or shutting down the pool fails.
/// The underlying receive error is kept as the source of the returned error
/// instead of being replaced by a generic message.
#[test]
fn test_deterministic_ordering() -> TestResult {
    let test_data: Vec<i32> = (1..=10).collect();
    let expected: Vec<i32> = test_data.iter().map(|x| x * x).collect();

    for num_workers in [1, 2] {
        let mut pool = WorkerPool::new(
            num_workers,
            10,
            5,
            |input: i32, _buffers: &mut ScratchBuffers| -> i32 {
                // Delay every third input so completion order can differ from
                // submission order.
                if input % 3 == 0 {
                    std::thread::sleep(Duration::from_micros(1));
                }
                input * input
            },
        );

        for &input in &test_data {
            pool.submit(input).context("submitting input")?;
        }

        let mut results = Vec::with_capacity(test_data.len());
        for _ in 0..test_data.len() {
            // Propagate the real error rather than discarding it.
            let Some(result) = pool
                .recv_ordered()
                .context("receiving ordered result")?
            else {
                // Stop early; the comparison below then reports what was missing.
                break;
            };
            results.push(result);
        }

        pool.shutdown()
            .map_err(|error| anyhow!(error))
            .context("shutting down worker pool")?;

        assert_eq!(results, expected);
    }

    Ok(())
}
/// `SequencedRecord::new` must store the given sequence id and payload in the
/// `sequence_id` and `data` fields.
#[test]
fn test_sequenced_record_creation() {
    let created = SequencedRecord::new(42, "test data");

    assert_eq!(created.data, "test data");
    assert_eq!(created.sequence_id, 42);
}
/// A non-blocking receive on a ring that has been sent nothing is expected to
/// return an error.
#[test]
fn test_sequence_ring_empty() {
    let mut empty_ring = SequenceRing::<&str>::new(10, 5);

    assert!(empty_ring.try_recv_ordered().is_err());
}
/// With a ring built via `SequenceRing::new(2, 1)`, two `try_send` calls must
/// succeed and the third must fail.
#[test]
fn test_sequence_ring_channel_capacity() -> TestResult {
    // Keep the ring bound for the whole test so it is not dropped while sending.
    let ring = SequenceRing::new(2, 1);
    let sender = ring.sender();

    for (sequence_id, payload) in [(1, "first"), (2, "second")] {
        sender.try_send(SequencedRecord::new(sequence_id, payload))?;
    }

    // One more record than the first two must be rejected.
    let overflow = sender.try_send(SequencedRecord::new(3, "third"));
    assert!(overflow.is_err());

    Ok(())
}
/// Checks `SequenceRing::stats` before and after draining five in-order
/// records: `next_sequence_id` goes from 1 to 6 and the reorder buffer is
/// reported as empty both times.
#[test]
fn test_sequence_ring_stats() -> TestResult {
    let mut ring = SequenceRing::new(10, 5);
    let sender = ring.sender();

    for sequence_id in 1..=5 {
        sender.send(SequencedRecord::new(
            sequence_id,
            format!("record_{sequence_id}"),
        ))?;
    }

    // Nothing has been received yet.
    let before = ring.stats();
    assert_eq!(before.next_sequence_id, 1);
    assert_eq!(before.reorder_buffer_size, 0);

    for _ in 0..5 {
        assert!(ring.recv_ordered()?.is_some());
    }

    // All five records have been consumed.
    let after = ring.stats();
    assert_eq!(after.next_sequence_id, 6);
    assert_eq!(after.reorder_buffer_size, 0);

    Ok(())
}
/// A processor built with `with_default_limit` must report a 256 MiB maximum
/// and signal memory pressure once 210 MiB of usage has been recorded.
#[test]
fn test_streaming_processor_with_default_limit() {
    const MIB: usize = 1024 * 1024;

    let mut processor = StreamingProcessor::with_default_limit();
    assert!(!processor.is_memory_pressure());

    processor.update_memory_usage(210 * 1024 * 1024);
    assert!(processor.is_memory_pressure());

    assert_eq!(processor.stats().max_memory_bytes, 256 * MIB);
}
/// Applies two 512 KiB usage updates to a `StreamingProcessor::new(1)` and
/// checks the running total, the pressure flag, and the reported maximum.
#[test]
fn test_streaming_processor_memory_tracking() {
    let mut processor = StreamingProcessor::new(1);
    assert!(!processor.is_memory_pressure());

    // First half: usage is tracked but pressure is not yet reported.
    processor.update_memory_usage(512 * 1024);
    let halfway = processor.stats();
    assert_eq!(halfway.current_memory_bytes, 512 * 1024);
    assert!(!processor.is_memory_pressure());

    // Second half: usage reaches the configured maximum.
    processor.update_memory_usage(512 * 1024);
    assert!(processor.is_memory_pressure());

    let full = processor.stats();
    assert_eq!(full.current_memory_bytes, 1024 * 1024);
    assert_eq!(full.max_memory_bytes, 1024 * 1024);
}
/// Two `record_processed` calls (1024 and 2048 bytes) must be reflected as
/// two records and 3072 bytes in the stats.
#[test]
fn test_streaming_processor_record_tracking() {
    let mut processor = StreamingProcessor::with_default_limit();

    for record_size in [1024, 2048] {
        processor.record_processed(record_size);
    }

    let totals = processor.stats();
    assert_eq!(totals.records_processed, 2);
    assert_eq!(totals.bytes_processed, 3072);
}
/// `ScratchBuffers::new` must start with an empty digit buffer and with at
/// least 1024 bytes / 512 bytes reserved for the byte and string buffers.
#[test]
fn test_scratch_buffers_initial_capacities() {
    let fresh = ScratchBuffers::new();

    let byte_cap = fresh.byte_buffer.capacity();
    let string_cap = fresh.string_buffer.capacity();

    assert_eq!(fresh.digit_buffer.len(), 0);
    assert!(byte_cap >= 1024);
    assert!(string_cap >= 512);
}
/// `ScratchBuffers::default` must satisfy the same checks that are applied to
/// `ScratchBuffers::new`: empty digit buffer, byte capacity of at least 1024,
/// string capacity of at least 512.
#[test]
fn test_scratch_buffers_default_trait() {
    let defaulted = ScratchBuffers::default();

    let byte_cap = defaulted.byte_buffer.capacity();
    let string_cap = defaulted.string_buffer.capacity();

    assert_eq!(defaulted.digit_buffer.len(), 0);
    assert!(byte_cap >= 1024);
    assert!(string_cap >= 512);
}
/// After the buffers have grown, `clear` must reset their lengths to zero
/// without changing any of the three capacities.
#[test]
fn test_scratch_buffers_clear_preserves_capacity() {
    let mut scratch = ScratchBuffers::new();

    // Push enough data into every buffer that each one has had to grow.
    scratch.byte_buffer.extend_from_slice(&[0xAA; 4096]);
    scratch.string_buffer.push_str(&"x".repeat(2048));
    for digit in 0..64_u8 {
        scratch.digit_buffer.push(digit);
    }

    let caps_before = (
        scratch.byte_buffer.capacity(),
        scratch.string_buffer.capacity(),
        scratch.digit_buffer.capacity(),
    );

    scratch.clear();

    assert_eq!(scratch.byte_buffer.len(), 0);
    assert_eq!(scratch.string_buffer.len(), 0);
    assert_eq!(scratch.digit_buffer.len(), 0);

    let caps_after = (
        scratch.byte_buffer.capacity(),
        scratch.string_buffer.capacity(),
        scratch.digit_buffer.capacity(),
    );
    assert_eq!(caps_after, caps_before);
}
/// `ensure_byte_capacity` must grow the byte buffer to at least the requested
/// size, never below its starting capacity, and must leave the capacity
/// unchanged when the request is already satisfied.
#[test]
fn test_scratch_buffers_ensure_byte_capacity_growth() {
    let mut scratch = ScratchBuffers::new();
    let original_cap = scratch.byte_buffer.capacity();

    scratch.ensure_byte_capacity(8192);
    let cap_after_grow = scratch.byte_buffer.capacity();
    assert!(cap_after_grow >= 8192);
    // The starting capacity is now actually checked instead of being discarded.
    assert!(cap_after_grow >= original_cap);

    // A smaller request must not shrink or reallocate the buffer.
    scratch.ensure_byte_capacity(1024);
    assert_eq!(scratch.byte_buffer.capacity(), cap_after_grow);

    // Requesting exactly the current capacity must also leave it unchanged.
    scratch.ensure_byte_capacity(cap_after_grow);
    assert_eq!(scratch.byte_buffer.capacity(), cap_after_grow);
}
/// `ensure_string_capacity` must grow the string buffer to at least the
/// requested size and leave the capacity unchanged for a smaller request.
#[test]
fn test_scratch_buffers_ensure_string_capacity_growth() {
    let mut scratch = ScratchBuffers::new();

    scratch.ensure_string_capacity(4096);
    let grown_cap = scratch.string_buffer.capacity();
    assert!(grown_cap >= 4096);

    // Asking for less than is already reserved must change nothing.
    scratch.ensure_string_capacity(2048);
    assert_eq!(scratch.string_buffer.capacity(), grown_cap);
}
/// Runs 100 fill-then-clear cycles on one `ScratchBuffers` and checks that
/// every cycle sees non-empty buffers after filling and empty ones after
/// clearing.
#[test]
fn test_scratch_buffers_multiple_reuse_cycles() {
    let mut scratch = ScratchBuffers::new();

    for cycle in 0..100 {
        // Fill phase: one digit, one byte record, one formatted string.
        let digit = u8::try_from(cycle % 10).unwrap();
        scratch.digit_buffer.push(digit);

        let record = format!("record-{cycle}");
        scratch.byte_buffer.extend_from_slice(record.as_bytes());

        write!(scratch.string_buffer, "output-{cycle}").unwrap();

        assert!(!scratch.byte_buffer.is_empty());
        assert!(!scratch.string_buffer.is_empty());
        assert!(!scratch.digit_buffer.is_empty());

        // Reset phase: everything must be empty again for the next cycle.
        scratch.clear();
        assert_eq!(scratch.byte_buffer.len(), 0);
        assert_eq!(scratch.string_buffer.len(), 0);
        assert_eq!(scratch.digit_buffer.len(), 0);
    }

    // The backing allocations must still be present after all the cycles.
    assert!(scratch.byte_buffer.capacity() > 0);
    assert!(scratch.string_buffer.capacity() > 0);
}
/// Zero-length appends, a `clear` on empty buffers, and zero-capacity
/// requests must all complete without panicking and leave the buffers empty.
#[test]
fn test_scratch_buffers_zero_length_operations() {
    let mut scratch = ScratchBuffers::new();

    // Appending nothing must leave both buffers empty.
    scratch.string_buffer.push_str("");
    scratch.byte_buffer.extend_from_slice(b"");
    assert_eq!(scratch.byte_buffer.len(), 0);
    assert_eq!(scratch.string_buffer.len(), 0);

    // Clearing buffers that are already empty.
    scratch.clear();
    assert_eq!(scratch.byte_buffer.len(), 0);

    // Requests for zero capacity only need to return normally.
    scratch.ensure_byte_capacity(0);
    scratch.ensure_string_capacity(0);
}
/// Loads 1 MiB into the byte buffer and 512 KiB into the string buffer, then
/// checks that `clear` empties both while their capacities stay at least as
/// large as the data they held.
#[test]
fn test_scratch_buffers_large_allocation() {
    const BYTE_LEN: usize = 1_048_576;
    const STRING_LEN: usize = 524_288;

    let mut scratch = ScratchBuffers::new();

    let payload = vec![0x42_u8; BYTE_LEN];
    scratch.byte_buffer.extend_from_slice(&payload);
    assert_eq!(scratch.byte_buffer.len(), BYTE_LEN);

    let text: String = "A".repeat(STRING_LEN);
    scratch.string_buffer.push_str(&text);
    assert_eq!(scratch.string_buffer.len(), STRING_LEN);

    scratch.clear();

    assert_eq!(scratch.byte_buffer.len(), 0);
    assert_eq!(scratch.string_buffer.len(), 0);
    // Capacity must survive the clear.
    assert!(scratch.byte_buffer.capacity() >= BYTE_LEN);
    assert!(scratch.string_buffer.capacity() >= STRING_LEN);
}
/// The digit buffer is expected to report `spilled() == false` with 32
/// elements and `spilled() == true` once a 33rd is pushed; `clear` must then
/// empty it.
#[test]
fn test_scratch_buffers_digit_buffer_stack_to_heap() {
    let mut scratch = ScratchBuffers::new();

    for digit in 0..32_u8 {
        scratch.digit_buffer.push(digit);
    }
    let digits = &scratch.digit_buffer;
    assert_eq!(digits.len(), 32);
    assert!(!digits.spilled());

    // One element past 32 is expected to trigger the spill.
    scratch.digit_buffer.push(32);
    let digits = &scratch.digit_buffer;
    assert_eq!(digits.len(), 33);
    assert!(digits.spilled());

    scratch.clear();
    assert_eq!(scratch.digit_buffer.len(), 0);
}
/// Moves a `ScratchBuffers` into a spawned thread, fills and clears it there,
/// and checks that the byte-buffer length returned from the thread is zero.
#[test]
fn test_scratch_buffers_send_across_threads() {
    let scratch = ScratchBuffers::new();

    let worker = std::thread::spawn(move || {
        // Take ownership inside the thread so the value really crosses over.
        let mut owned = scratch;
        owned.byte_buffer.extend_from_slice(b"thread");
        owned.clear();
        owned.byte_buffer.len()
    });

    let remaining = worker.join().unwrap();
    assert_eq!(remaining, 0);
}
/// Growing a partially filled buffer via `ensure_*_capacity` must reach the
/// requested capacity while keeping the existing contents.
#[test]
fn test_scratch_buffers_ensure_capacity_after_partial_fill() {
    let mut scratch = ScratchBuffers::new();

    // Byte buffer: 512 bytes present, then grown to at least 4096.
    scratch.byte_buffer.extend_from_slice(&[1; 512]);
    scratch.ensure_byte_capacity(4096);
    let bytes = &scratch.byte_buffer;
    assert!(bytes.capacity() >= 4096);
    assert_eq!(bytes.len(), 512);

    // String buffer: "abc" present, then grown to at least 2048.
    scratch.string_buffer.push_str("abc");
    scratch.ensure_string_capacity(2048);
    let text = &scratch.string_buffer;
    assert!(text.capacity() >= 2048);
    assert_eq!(text, "abc");
}
/// Subtracting more memory than is currently tracked (+100, then -500) must
/// leave `current_memory_bytes` at zero.
#[test]
fn test_streaming_processor_saturating_underflow() {
    let mut processor = StreamingProcessor::new(1);

    for delta in [100, -500] {
        processor.update_memory_usage(delta);
    }

    let tracked = processor.stats().current_memory_bytes;
    assert_eq!(tracked, 0);
}
/// With `StreamingProcessor::new(100)`, each 50 MiB usage update must move
/// `memory_utilization_percent` to 50 and then to 100.
#[test]
fn test_streaming_processor_utilization_percent() {
    let mut processor = StreamingProcessor::new(100);

    // Each step adds another 50 MiB of reported usage.
    for expected_percent in [50, 100] {
        processor.update_memory_usage(50 * 1024 * 1024);
        let snapshot = processor.stats();
        assert_eq!(snapshot.memory_utilization_percent, expected_percent);
    }
}
/// Pins the memory-pressure boundary for `StreamingProcessor::new(1)`: usage
/// at exactly 80% of 1,048,576 bytes is not pressure, one byte more is.
#[test]
fn test_streaming_processor_pressure_boundary() {
    const LIMIT_BYTES: usize = 1_048_576;
    const EIGHTY_PERCENT: usize = LIMIT_BYTES * 80 / 100;

    let mut processor = StreamingProcessor::new(1);

    // Exactly at the 80% mark: no pressure expected yet.
    processor.update_memory_usage(EIGHTY_PERCENT.cast_signed());
    assert!(!processor.is_memory_pressure());

    // One more byte must push the processor over the boundary.
    processor.update_memory_usage(1);
    assert!(processor.is_memory_pressure());
}