#![cfg(test)]
use crate::{
connection::{BUFFER_SIZE, write_connection::WriteConnection},
test_utils::mock_socket::TestWriteHalf,
};
#[tokio::test]
async fn write() {
const WRITE_LEN: usize =
BUFFER_SIZE +
(BUFFER_SIZE - 1) +
2 +
1;
let mut write_conn = WriteConnection::new(TestWriteHalf::new(WRITE_LEN), 1);
let item: Vec<u8> = vec![0u8; BUFFER_SIZE];
#[cfg(feature = "std")]
write_conn.write(&item, vec![]).await.unwrap();
#[cfg(not(feature = "std"))]
write_conn.write(&item).await.unwrap();
assert_eq!(write_conn.buffer.len(), BUFFER_SIZE * 3);
assert_eq!(write_conn.pos, 0); }
#[tokio::test]
async fn enqueue_and_flush() {
let mut write_conn = WriteConnection::new(TestWriteHalf::new(5), 1);
#[cfg(feature = "std")]
{
write_conn.enqueue(&42u32, vec![]).unwrap();
write_conn.enqueue(&3u32, vec![]).unwrap();
}
#[cfg(not(feature = "std"))]
{
write_conn.enqueue(&42u32).unwrap();
write_conn.enqueue(&3u32).unwrap();
}
assert_eq!(write_conn.pos, 5);
write_conn.flush().await.unwrap();
assert_eq!(write_conn.pos, 0); }
#[tokio::test]
async fn enqueue_null_terminators() {
let mut write_conn = WriteConnection::new(TestWriteHalf::new(4), 1);
#[cfg(feature = "std")]
{
write_conn.enqueue(&1u32, vec![]).unwrap();
assert_eq!(write_conn.buffer[write_conn.pos - 1], b'\0');
write_conn.enqueue(&2u32, vec![]).unwrap();
assert_eq!(write_conn.buffer[write_conn.pos - 1], b'\0');
}
#[cfg(not(feature = "std"))]
{
write_conn.enqueue(&1u32).unwrap();
assert_eq!(write_conn.buffer[write_conn.pos - 1], b'\0');
write_conn.enqueue(&2u32).unwrap();
assert_eq!(write_conn.buffer[write_conn.pos - 1], b'\0');
}
write_conn.flush().await.unwrap();
}
#[tokio::test]
async fn enqueue_buffer_extension() {
let mut write_conn = WriteConnection::new(TestWriteHalf::new(0), 1);
let initial_len = write_conn.buffer.len();
let large_item: Vec<u8> = vec![0u8; BUFFER_SIZE];
#[cfg(feature = "std")]
write_conn.enqueue(&large_item, vec![]).unwrap();
#[cfg(not(feature = "std"))]
write_conn.enqueue(&large_item).unwrap();
assert!(write_conn.buffer.len() > initial_len);
}
#[tokio::test]
async fn flush_empty_buffer() {
let mut write_conn = WriteConnection::new(TestWriteHalf::new(0), 1);
write_conn.flush().await.unwrap();
assert_eq!(write_conn.pos, 0);
}
#[tokio::test]
async fn multiple_flushes() {
let mut write_conn = WriteConnection::new(TestWriteHalf::new(2), 1);
#[cfg(feature = "std")]
write_conn.enqueue(&1u32, vec![]).unwrap();
#[cfg(not(feature = "std"))]
write_conn.enqueue(&1u32).unwrap();
write_conn.flush().await.unwrap();
assert_eq!(write_conn.pos, 0);
write_conn.flush().await.unwrap();
assert_eq!(write_conn.pos, 0);
}
#[tokio::test]
async fn enqueue_after_flush() {
let mut write_conn = WriteConnection::new(TestWriteHalf::new(2), 1);
#[cfg(feature = "std")]
write_conn.enqueue(&1u32, vec![]).unwrap();
#[cfg(not(feature = "std"))]
write_conn.enqueue(&1u32).unwrap();
write_conn.flush().await.unwrap();
#[cfg(feature = "std")]
write_conn.enqueue(&2u32, vec![]).unwrap();
#[cfg(not(feature = "std"))]
write_conn.enqueue(&2u32).unwrap();
assert_eq!(write_conn.pos, 2);
write_conn.flush().await.unwrap();
assert_eq!(write_conn.pos, 0);
}
#[tokio::test]
async fn call_pipelining() {
use super::super::super::Call;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct TestMethod {
name: &'static str,
value: u32,
}
let mut write_conn = WriteConnection::new(TestWriteHalf::new(0), 1);
let call1 = Call::new(TestMethod {
name: "method1",
value: 1,
});
#[cfg(feature = "std")]
write_conn.enqueue_call(&call1, vec![]).unwrap();
#[cfg(not(feature = "std"))]
write_conn.enqueue_call(&call1).unwrap();
let call2 = Call::new(TestMethod {
name: "method2",
value: 2,
});
#[cfg(feature = "std")]
write_conn.enqueue_call(&call2, vec![]).unwrap();
#[cfg(not(feature = "std"))]
write_conn.enqueue_call(&call2).unwrap();
let call3 = Call::new(TestMethod {
name: "method3",
value: 3,
});
#[cfg(feature = "std")]
write_conn.enqueue_call(&call3, vec![]).unwrap();
#[cfg(not(feature = "std"))]
write_conn.enqueue_call(&call3).unwrap();
assert!(write_conn.pos > 0);
let buffer = &write_conn.buffer[..write_conn.pos];
let mut null_positions = [0usize; 3];
let mut null_count = 0;
for (i, &byte) in buffer.iter().enumerate() {
if byte == b'\0' {
assert!(null_count < 3, "Found more than 3 null terminators");
null_positions[null_count] = i;
null_count += 1;
}
}
assert_eq!(null_count, 3);
for i in 0..null_count {
let pos = null_positions[i];
assert!(
pos > 0,
"Null terminator at position {pos} should not be at start"
);
let preceding_byte = buffer[pos - 1];
assert!(
preceding_byte == b'}' || preceding_byte == b'"' || preceding_byte.is_ascii_digit(),
"Null terminator at position {pos} should be after valid JSON ending, found byte: {preceding_byte}"
);
}
assert_eq!(null_positions[2], write_conn.pos - 1);
}
#[tokio::test]
async fn pipelining_vs_individual_sends() {
use super::super::super::Call;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct TestMethod {
operation: &'static str,
id: u32,
}
use crate::test_utils::mock_socket::CountingWriteHalf;
let counting_write = CountingWriteHalf::new();
let mut write_conn_individual = WriteConnection::new(counting_write, 1);
for i in 1..=3 {
let call = Call::new(TestMethod {
operation: "fetch",
id: i,
});
#[cfg(feature = "std")]
write_conn_individual
.send_call(&call, vec![])
.await
.unwrap();
#[cfg(not(feature = "std"))]
write_conn_individual.send_call(&call).await.unwrap();
}
assert_eq!(write_conn_individual.socket.count(), 3);
let counting_write = CountingWriteHalf::new();
let mut write_conn_pipelined = WriteConnection::new(counting_write, 2);
for i in 1..=3 {
let call = Call::new(TestMethod {
operation: "fetch",
id: i,
});
#[cfg(feature = "std")]
write_conn_pipelined.enqueue_call(&call, vec![]).unwrap();
#[cfg(not(feature = "std"))]
write_conn_pipelined.enqueue_call(&call).unwrap();
}
write_conn_pipelined.flush().await.unwrap();
assert_eq!(write_conn_pipelined.socket.count(), 1);
}