use orb::AsyncRuntime;
use orb::io::AsyncBufStream;
use orb::prelude::*;
use orb_smol::SmolRT;
use std::future::Future;
use std::io;
use std::sync::{Arc, Mutex};
#[derive(Debug)]
enum MockReadBehavior {
Chunked(Vec<Vec<u8>>),
Randomized { data: Vec<u8>, pos: usize },
}
#[derive(Debug)]
struct MockReadStream {
read_behavior: MockReadBehavior,
}
impl MockReadStream {
fn new_chunked_reader(chunks: Vec<Vec<u8>>) -> Self {
Self { read_behavior: MockReadBehavior::Chunked(chunks) }
}
fn new_chunked_reader_deterministic(chunks: Vec<Vec<u8>>) -> Self {
Self { read_behavior: MockReadBehavior::Chunked(chunks) }
}
fn new_randomized_reader(data: Vec<u8>) -> Self {
Self { read_behavior: MockReadBehavior::Randomized { data, pos: 0 } }
}
}
impl AsyncRead for MockReadStream {
fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = io::Result<usize>> + Send {
async move {
match &mut self.read_behavior {
MockReadBehavior::Chunked(chunks) => {
if chunks.is_empty() {
return Ok(0);
}
let chunk = chunks.remove(0);
let n = std::cmp::min(buf.len(), chunk.len());
buf[..n].copy_from_slice(&chunk[..n]);
Ok(n)
}
MockReadBehavior::Randomized { data, pos } => {
if *pos >= data.len() {
return Ok(0); }
let remaining = data.len() - *pos;
let max_read = std::cmp::min(buf.len(), remaining);
if max_read == 0 {
return Ok(0);
}
let read_size = fastrand::usize(1..=max_read);
buf[..read_size].copy_from_slice(&data[*pos..*pos + read_size]);
*pos += read_size;
Ok(read_size)
}
}
}
}
}
#[derive(Debug)]
struct MockWriteStream {
write_buffer: Arc<Mutex<Vec<u8>>>,
deterministic: bool, }
impl MockWriteStream {
fn new(write_buffer: Arc<Mutex<Vec<u8>>>, deterministic: bool) -> Self {
Self { write_buffer, deterministic }
}
}
impl AsyncRead for MockWriteStream {
fn read(&mut self, _buf: &mut [u8]) -> impl Future<Output = io::Result<usize>> + Send {
async move {
Ok(0)
}
}
}
impl AsyncWrite for MockWriteStream {
fn write(&mut self, buf: &[u8]) -> impl Future<Output = io::Result<usize>> + Send {
async move {
if buf.is_empty() {
return Ok(0);
}
let n = if self.deterministic {
buf.len()
} else {
if fastrand::bool() { fastrand::usize(1..=buf.len()) } else { buf.len() }
};
self.write_buffer.lock().unwrap().extend_from_slice(&buf[..n]);
Ok(n)
}
}
}
#[test]
fn test_async_read_exact_fixed_chunks() {
SmolRT::current().block_on(async {
let data_size = 2048;
let mut source_data = vec![0u8; data_size];
for i in 0..data_size {
source_data[i] = (i % 256) as u8;
}
let chunks = vec![
source_data[0..512].to_vec(),
source_data[512..1024].to_vec(),
source_data[1024..1536].to_vec(),
source_data[1536..2048].to_vec(),
];
let mut read_stream = MockReadStream::new_chunked_reader_deterministic(chunks);
let mut out = vec![0u8; data_size];
read_stream.read_exact(&mut out).await.unwrap();
assert_eq!(out, source_data);
});
}
#[test]
fn test_async_read_bypass_fixed() {
SmolRT::current().block_on(async {
let data_size = 300; let mut source_data = vec![0u8; data_size];
for i in 0..data_size {
source_data[i] = (i % 256) as u8;
}
let mut read_stream =
MockReadStream::new_chunked_reader_deterministic(vec![source_data.clone()]);
let mut out = vec![0u8; data_size];
read_stream.read_exact(&mut out).await.unwrap();
assert_eq!(out, source_data);
});
}
#[test]
fn test_async_read_multiple_reads_fixed() {
SmolRT::current().block_on(async {
let chunk1_data = vec![1u8; 100];
let chunk2_data = vec![2u8; 100];
let chunks = vec![chunk1_data.clone(), chunk2_data.clone()];
let mut read_stream = MockReadStream::new_chunked_reader_deterministic(chunks);
let mut out1 = vec![0u8; 100];
read_stream.read_exact(&mut out1).await.unwrap();
assert_eq!(out1, chunk1_data);
let mut out2 = vec![0u8; 100];
read_stream.read_exact(&mut out2).await.unwrap();
assert_eq!(out2, chunk2_data);
});
}
#[test]
fn test_async_write_all_buffering_deterministic() {
SmolRT::current().block_on(async {
let data_handle = Arc::new(Mutex::new(Vec::new()));
let mock_stream = MockWriteStream::new(data_handle.clone(), true); let mut writer = AsyncBufStream::new(mock_stream, 8);
writer.write_all(b"hello").await.unwrap();
{
assert!(data_handle.lock().unwrap().is_empty()); }
writer.write_all(b" wo").await.unwrap(); {
assert!(data_handle.lock().unwrap().is_empty()); }
writer.write_all(b"rld").await.unwrap(); {
assert_eq!(*data_handle.lock().unwrap(), b"hello wo");
}
writer.flush().await.unwrap();
{
assert_eq!(*data_handle.lock().unwrap(), b"hello world");
}
});
}
#[test]
fn test_async_write_bypass_deterministic() {
SmolRT::current().block_on(async {
let data_handle = Arc::new(Mutex::new(Vec::new()));
let mock_stream = MockWriteStream::new(data_handle.clone(), true); let mut writer = AsyncBufStream::new(mock_stream, 8);
writer.write_all(b"abc").await.unwrap();
{
assert!(data_handle.lock().unwrap().is_empty()); }
writer.write_all(b"this is a long line").await.unwrap();
{
assert_eq!(*data_handle.lock().unwrap(), b"abcthis is a long line");
}
writer.flush().await.unwrap();
{
assert_eq!(*data_handle.lock().unwrap(), b"abcthis is a long line");
}
});
}
#[test]
fn test_async_read_exact_random_chunks() {
SmolRT::current().block_on(async {
let data_size = fastrand::usize(1024..4096);
let mut source_data = vec![0u8; data_size];
fastrand::fill(&mut source_data);
let mut chunks = Vec::new();
let mut remaining_data = &source_data[..];
while !remaining_data.is_empty() {
let chunk_size = fastrand::usize(1..128).min(remaining_data.len());
chunks.push(remaining_data[..chunk_size].to_vec());
remaining_data = &remaining_data[chunk_size..];
}
let mut read_stream = MockReadStream::new_chunked_reader(chunks);
let mut out = vec![0u8; data_size];
read_stream.read_exact(&mut out).await.unwrap();
assert_eq!(out, source_data);
});
}
#[test]
fn test_async_read_bypass_random() {
SmolRT::current().block_on(async {
let data_size = fastrand::usize(257..512);
let mut source_data = vec![0u8; data_size];
fastrand::fill(&mut source_data);
let mut read_stream = MockReadStream::new_chunked_reader(vec![source_data.clone()]);
let mut out = vec![0u8; data_size];
read_stream.read_exact(&mut out).await.unwrap();
assert_eq!(out, source_data);
});
}
#[test]
fn test_async_read_multiple_reads_random() {
SmolRT::current().block_on(async {
let chunk1_size = fastrand::usize(64..128);
let mut chunk1_data = vec![0u8; chunk1_size];
fastrand::fill(&mut chunk1_data);
let chunk2_size = fastrand::usize(64..128);
let mut chunk2_data = vec![0u8; chunk2_size];
fastrand::fill(&mut chunk2_data);
let chunks = vec![chunk1_data.clone(), chunk2_data.clone()];
let mut read_stream = MockReadStream::new_chunked_reader(chunks);
let mut out1 = vec![0u8; chunk1_size];
read_stream.read_exact(&mut out1).await.unwrap();
assert_eq!(out1, chunk1_data);
let mut out2 = vec![0u8; chunk2_size];
read_stream.read_exact(&mut out2).await.unwrap();
assert_eq!(out2, chunk2_data);
});
}
#[test]
fn test_random_read_sizes_and_returns() {
SmolRT::current().block_on(async {
let data_size = fastrand::usize(8192..16384);
let mut source_data = vec![0u8; data_size];
fastrand::fill(&mut source_data);
let mut read_stream = MockReadStream::new_randomized_reader(source_data.clone());
let mut result_data = Vec::with_capacity(data_size);
while result_data.len() < data_size {
let read_size = fastrand::usize(1..=512);
let mut temp_buf = vec![0u8; read_size];
match read_stream.read(&mut temp_buf).await {
Ok(0) => break, Ok(n) => {
result_data.extend_from_slice(&temp_buf[..n]);
}
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => panic!("Read failed: {}", e),
}
}
assert_eq!(result_data.len(), data_size);
assert_eq!(result_data, source_data);
});
}
#[test]
fn test_async_write_all_buffering_random() {
SmolRT::current().block_on(async {
let data_handle = Arc::new(Mutex::new(Vec::new()));
let mock_stream = MockWriteStream::new(data_handle.clone(), false); let mut writer = AsyncBufStream::new(mock_stream, 8);
writer.write_all(b"hello").await.unwrap();
{
assert!(data_handle.lock().unwrap().is_empty()); }
writer.write_all(b" wo").await.unwrap(); {
assert!(data_handle.lock().unwrap().is_empty()); }
writer.write_all(b"rld").await.unwrap(); {
assert_eq!(*data_handle.lock().unwrap(), b"hello wo");
}
writer.flush().await.unwrap();
{
assert_eq!(*data_handle.lock().unwrap(), b"hello world");
}
});
}
#[test]
fn test_async_write_bypass_random() {
SmolRT::current().block_on(async {
let data_handle = Arc::new(Mutex::new(Vec::new()));
let mock_stream = MockWriteStream::new(data_handle.clone(), false); let mut writer = AsyncBufStream::new(mock_stream, 8);
writer.write_all(b"abc").await.unwrap();
{
assert!(data_handle.lock().unwrap().is_empty()); }
writer.write_all(b"this is a long line").await.unwrap();
writer.flush().await.unwrap();
{
assert_eq!(*data_handle.lock().unwrap(), b"abcthis is a long line");
}
});
}