#![cfg(feature = "async-tokio")]
#![allow(
clippy::approx_constant,
clippy::useless_vec,
clippy::len_zero,
clippy::unnecessary_cast,
clippy::redundant_closure,
clippy::too_many_arguments,
clippy::type_complexity,
clippy::needless_borrow,
clippy::enum_variant_names,
clippy::upper_case_acronyms,
clippy::inconsistent_digit_grouping,
clippy::unit_cmp,
clippy::assertions_on_constants,
clippy::iter_on_single_items,
clippy::expect_fun_call,
clippy::redundant_pattern_matching,
variant_size_differences,
clippy::absurd_extreme_comparisons,
clippy::nonminimal_bool,
clippy::for_kv_map,
clippy::needless_range_loop,
clippy::single_match,
clippy::collapsible_if,
clippy::needless_return,
clippy::redundant_clone,
clippy::map_entry,
clippy::match_single_binding,
clippy::bool_comparison,
clippy::derivable_impls,
clippy::manual_range_contains,
clippy::needless_borrows_for_generic_args,
clippy::manual_map,
clippy::vec_init_then_push,
clippy::identity_op,
clippy::manual_flatten,
clippy::single_char_pattern,
clippy::search_is_some,
clippy::option_map_unit_fn,
clippy::while_let_on_iterator,
clippy::clone_on_copy,
clippy::box_collection,
clippy::redundant_field_names,
clippy::ptr_arg,
clippy::large_enum_variant,
clippy::match_ref_pats,
clippy::needless_pass_by_value,
clippy::unused_unit,
clippy::let_and_return,
clippy::suspicious_else_formatting,
clippy::manual_strip,
clippy::match_like_matches_macro,
clippy::from_over_into,
clippy::wrong_self_convention,
clippy::inherent_to_string,
clippy::new_without_default,
clippy::unnecessary_wraps,
clippy::field_reassign_with_default,
clippy::manual_find,
clippy::unnecessary_lazy_evaluations,
clippy::should_implement_trait,
clippy::missing_safety_doc,
clippy::unusual_byte_groupings,
clippy::bool_assert_comparison,
clippy::zero_prefixed_literal,
clippy::await_holding_lock,
clippy::manual_saturating_arithmetic,
clippy::explicit_counter_loop,
clippy::needless_lifetimes,
clippy::single_component_path_imports,
clippy::uninlined_format_args,
clippy::iter_cloned_collect,
clippy::manual_str_repeat,
clippy::excessive_precision,
clippy::precedence,
clippy::unnecessary_literal_unwrap
)]
use oxicode::streaming::{AsyncStreamingDecoder, AsyncStreamingEncoder, StreamingConfig};
use oxicode::{
config, decode_from_slice, decode_from_slice_with_config, encode_to_vec,
encode_to_vec_with_config, Decode, Encode,
};
use std::io::Cursor;
/// Test fixture: a small struct mixing an integer, a heap-allocated string and a byte,
/// used as the item type in the struct round-trip tests of this file.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
struct Record {
// Sequence number; the tests fill it with a literal or the loop index.
seq: u64,
// Free-form text label.
label: String,
// One byte of flag bits.
flags: u8,
}
/// Test fixture: an enum with one unit variant and two struct-like variants,
/// used to check that every variant shape survives a streaming round trip.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
enum Command {
// Variant without payload.
Noop,
// Variant carrying two fields of different widths.
Write { addr: u32, value: u16 },
// Variant carrying a single field.
Read { addr: u32 },
}
/// Test fixture: four `f64` fields, used to round-trip floating-point data.
/// NOTE(review): the name suggests a 2x2 matrix; which field maps to which
/// row/column is not established by this file.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
struct Matrix2x2 {
a: f64,
b: f64,
c: f64,
d: f64,
}
/// Round-trips a single `Record` through the async streaming encoder and decoder.
#[tokio::test]
async fn test_async8_struct_record_roundtrip() {
    let record = Record {
        seq: 1_000_000,
        label: String::from("oxicode-record"),
        flags: 0b1010_1010,
    };

    // Encode in a nested scope so the mutable borrow of `bytes` ends before it is moved below.
    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        encoder
            .write_item(&record)
            .await
            .expect("write Record failed");
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let roundtripped = decoder
        .read_item::<Record>()
        .await
        .expect("read Record failed");
    assert_eq!(roundtripped, Some(record));
}
/// Writes one value of every `Command` variant to a stream and checks that
/// `read_all` returns them unchanged and in the same order.
#[tokio::test]
async fn test_async8_enum_command_all_variants_roundtrip() {
    let all_variants = vec![
        Command::Noop,
        Command::Write {
            addr: 0xDEAD,
            value: 0xBEEF,
        },
        Command::Read { addr: 0xCAFE },
    ];

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        for command in all_variants.iter() {
            encoder
                .write_item(command)
                .await
                .expect("write Command failed");
        }
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let roundtripped: Vec<Command> = decoder
        .read_all()
        .await
        .expect("read_all Commands failed");
    assert_eq!(roundtripped, all_variants);
}
/// Round-trips a `Matrix2x2` (fields 1.0, 0.0, 0.0, 1.0) through the async stream.
#[tokio::test]
async fn test_async8_struct_matrix2x2_roundtrip() {
    let matrix = Matrix2x2 {
        a: 1.0,
        b: 0.0,
        c: 0.0,
        d: 1.0,
    };

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        encoder
            .write_item(&matrix)
            .await
            .expect("write Matrix2x2 failed");
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let roundtripped = decoder
        .read_item::<Matrix2x2>()
        .await
        .expect("read Matrix2x2 failed");
    assert_eq!(roundtripped, Some(matrix));
}
/// Streams 500 generated `Record`s and checks both the decoded count and contents.
#[tokio::test]
async fn test_async8_large_vec_struct_roundtrip() {
    let expected = (0u64..500)
        .map(|seq| Record {
            seq,
            label: format!("label-{}", seq),
            // Reduce modulo 256 first so the value always fits in a byte.
            flags: (seq % 256) as u8,
        })
        .collect::<Vec<Record>>();

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        for record in expected.iter() {
            encoder
                .write_item(record)
                .await
                .expect("write Record failed");
        }
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let actual: Vec<Record> = decoder
        .read_all()
        .await
        .expect("read_all Records failed");
    assert_eq!(actual.len(), 500);
    assert_eq!(actual, expected);
}
/// Round-trips the `i64` extremes together with -1, 0 and 1.
#[tokio::test]
async fn test_async8_i64_boundary_values_roundtrip() {
    let boundaries: Vec<i64> = vec![i64::MIN, -1, 0, 1, i64::MAX];

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        for value in boundaries.iter() {
            encoder.write_item(value).await.expect("write i64 failed");
        }
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let roundtripped: Vec<i64> = decoder.read_all().await.expect("read_all i64 failed");
    assert_eq!(roundtripped, boundaries);
}
/// Encodes ten `Record`s with `with_flush_per_item(true)` and checks that a
/// default-configured decoder reads them back unchanged.
#[tokio::test]
async fn test_async8_config_flush_per_item_struct_roundtrip() {
    let flush_each = StreamingConfig::new().with_flush_per_item(true);
    let expected = (0u64..10)
        .map(|seq| Record {
            seq,
            label: format!("flush-{}", seq),
            flags: seq as u8,
        })
        .collect::<Vec<Record>>();

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::with_config(Cursor::new(&mut bytes), flush_each);
        for record in expected.iter() {
            encoder
                .write_item(record)
                .await
                .expect("write with flush_per_item failed");
        }
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let actual: Vec<Record> = decoder.read_all().await.expect("read_all failed");
    assert_eq!(actual, expected);
}
/// Checks that a `u32` encoded with the fixed-int configuration is exactly four
/// bytes long and decodes back to the same value.
///
/// The second half round-trips the same value through the async streaming
/// encoder/decoder, which here is built without that configuration.
#[tokio::test]
async fn test_async8_fixed_int_config_u32_is_4_bytes() {
    let number: u32 = 0x0102_0304;
    // Build the configuration afresh for each call, as the original did.
    let fixed_int = || config::standard().with_fixed_int_encoding();

    let encoded = encode_to_vec_with_config(&number, fixed_int()).expect("fixed-int encode failed");
    assert_eq!(encoded.len(), 4, "fixed-int u32 must be exactly 4 bytes");

    let (sync_decoded, _consumed) = decode_from_slice_with_config::<u32, _>(&encoded, fixed_int())
        .expect("fixed-int decode failed");
    assert_eq!(sync_decoded, number);

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        encoder
            .write_item(&number)
            .await
            .expect("async write u32 failed");
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let streamed = decoder
        .read_item::<u32>()
        .await
        .expect("async read u32 failed");
    assert_eq!(streamed, Some(number));
}
/// Round-trips a `u32` through the synchronous API with the big-endian
/// configuration, then round-trips the same value through the async streaming
/// encoder/decoder (built without that configuration).
#[tokio::test]
async fn test_async8_big_endian_config_roundtrip() {
    let number: u32 = 0x01_02_03_04;
    // Build the configuration afresh for each call, as the original did.
    let big_endian = || config::standard().with_big_endian();

    let encoded =
        encode_to_vec_with_config(&number, big_endian()).expect("big-endian encode failed");
    let (sync_decoded, _consumed) = decode_from_slice_with_config::<u32, _>(&encoded, big_endian())
        .expect("big-endian decode failed");
    assert_eq!(sync_decoded, number, "big-endian roundtrip must match");

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        encoder
            .write_item(&number)
            .await
            .expect("async write failed");
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let streamed = decoder
        .read_item::<u32>()
        .await
        .expect("async read failed");
    assert_eq!(streamed, Some(number));
}
/// Encodes 100 `Record`s with `with_chunk_size(1024)` and checks that the
/// decoder returns them intact and reports more than one processed chunk.
#[tokio::test]
async fn test_async8_small_chunk_size_forces_multiple_chunks_structs() {
    let small_chunks = StreamingConfig::new().with_chunk_size(1024);
    let expected = (0u64..100)
        .map(|seq| Record {
            seq,
            label: format!("chunk-record-{:05}", seq),
            flags: (seq % 256) as u8,
        })
        .collect::<Vec<Record>>();

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::with_config(Cursor::new(&mut bytes), small_chunks);
        for record in expected.iter() {
            encoder
                .write_item(record)
                .await
                .expect("write Record failed");
        }
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let actual: Vec<Record> = decoder.read_all().await.expect("read_all failed");
    assert_eq!(actual, expected);
    assert!(
        decoder.progress().chunks_processed > 1,
        "expected more than one chunk"
    );
}
/// Encodes 3000 `u64` values with `with_chunk_size(1024 * 1024)` and checks that
/// the decoder returns them intact and reports exactly one processed chunk.
#[tokio::test]
async fn test_async8_large_chunk_size_single_chunk_many_u64() {
    let one_big_chunk = StreamingConfig::new().with_chunk_size(1024 * 1024);
    let expected = (0u64..3_000).collect::<Vec<u64>>();

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder =
            AsyncStreamingEncoder::with_config(Cursor::new(&mut bytes), one_big_chunk);
        for value in expected.iter() {
            encoder.write_item(value).await.expect("write u64 failed");
        }
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let actual: Vec<u64> = decoder.read_all().await.expect("read_all failed");
    assert_eq!(actual, expected);
    assert_eq!(
        decoder.progress().chunks_processed,
        1,
        "expected exactly one chunk"
    );
}
/// Round-trips six `u32` values through in-memory cursors and checks that the
/// decoder reports itself finished after `read_all`.
#[tokio::test]
async fn test_async8_cursor_in_memory_encode_decode() {
    let expected: Vec<u32> = vec![7, 14, 21, 28, 35, 42];

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        for value in expected.iter() {
            encoder.write_item(value).await.expect("write u32 failed");
        }
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let actual: Vec<u32> = decoder
        .read_all()
        .await
        .expect("read_all from cursor failed");
    assert_eq!(actual, expected);
    assert!(decoder.is_finished());
}
/// Encodes two unrelated streams (`u32` and `u64` items) into separate buffers,
/// checks that the buffers differ, and decodes each one back independently.
#[tokio::test]
async fn test_async8_two_independent_in_memory_streams() {
    let expected_a: Vec<u32> = vec![1, 2, 3];
    let expected_b: Vec<u64> = vec![100, 200, 300, 400];

    // Stream A.
    let mut bytes_a: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes_a));
        for value in expected_a.iter() {
            encoder
                .write_item(value)
                .await
                .expect("write stream A failed");
        }
        encoder.finish().await.expect("finish stream A failed");
    }

    // Stream B.
    let mut bytes_b: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes_b));
        for value in expected_b.iter() {
            encoder
                .write_item(value)
                .await
                .expect("write stream B failed");
        }
        encoder.finish().await.expect("finish stream B failed");
    }

    assert_ne!(
        bytes_a, bytes_b,
        "separate streams must produce different bytes"
    );

    let mut decoder_a = AsyncStreamingDecoder::new(Cursor::new(bytes_a));
    let actual_a: Vec<u32> = decoder_a
        .read_all()
        .await
        .expect("read_all stream A failed");
    assert_eq!(actual_a, expected_a);

    let mut decoder_b = AsyncStreamingDecoder::new(Cursor::new(bytes_b));
    let actual_b: Vec<u64> = decoder_b
        .read_all()
        .await
        .expect("read_all stream B failed");
    assert_eq!(actual_b, expected_b);
}
/// Checks that `progress().bytes_processed` is non-zero after the first item,
/// is larger after the rest have been read, and that `items_processed` ends at 20.
#[tokio::test]
async fn test_async8_progress_bytes_processed_grows() {
    let inputs = (1u64..=20).collect::<Vec<u64>>();

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        for value in inputs.iter() {
            encoder.write_item(value).await.expect("write u64 failed");
        }
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));

    // Snapshot the counter after a single item.
    let _first = decoder
        .read_item::<u64>()
        .await
        .expect("read first failed");
    let after_first = decoder.progress().bytes_processed;
    assert!(
        after_first > 0,
        "bytes_processed must be > 0 after first read"
    );

    // Snapshot again after draining the stream.
    let _rest: Vec<u64> = decoder.read_all().await.expect("read_all failed");
    let after_all = decoder.progress().bytes_processed;
    assert!(
        after_all > after_first,
        "bytes_processed must grow after reading more items"
    );
    assert_eq!(decoder.progress().items_processed, 20);
}
/// Reads only the first of three items, then uses `get_ref` to confirm the
/// underlying cursor's position has moved past zero.
#[tokio::test]
async fn test_async8_get_ref_after_partial_decode() {
    let inputs: Vec<u32> = vec![10, 20, 30];

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        for value in inputs.iter() {
            encoder.write_item(value).await.expect("write u32 failed");
        }
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let head = decoder
        .read_item::<u32>()
        .await
        .expect("read first failed");
    assert_eq!(head, Some(10));

    let position = decoder.get_ref().position();
    assert!(position > 0, "cursor must have advanced");
}
/// Feeds 50 `u32` values to the encoder's `write_all` in one call and checks
/// the decoded values and the `items_processed` count.
#[tokio::test]
async fn test_async8_write_all_encodes_multiple_items() {
    let expected = (0..50_u32).collect::<Vec<u32>>();

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        let items = expected.iter().copied();
        encoder.write_all(items).await.expect("write_all failed");
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let actual: Vec<u32> = decoder.read_all().await.expect("read_all failed");
    assert_eq!(actual, expected);
    assert_eq!(decoder.progress().items_processed, 50);
}
/// Issues two consecutive `write_all` calls on one encoder and checks that the
/// decoder yields the first batch followed by the second.
#[tokio::test]
async fn test_async8_interleaved_write_batches_decode_correctly() {
    let batch_one = (0..10_u32).collect::<Vec<u32>>();
    let batch_two = (100..110_u32).collect::<Vec<u32>>();

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        encoder
            .write_all(batch_one.iter().copied())
            .await
            .expect("first batch failed");
        encoder
            .write_all(batch_two.iter().copied())
            .await
            .expect("second batch failed");
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    let actual: Vec<u32> = decoder.read_all().await.expect("read_all failed");

    // Expected output is simply the two batches back to back.
    let expected: Vec<u32> = batch_one.into_iter().chain(batch_two).collect();
    assert_eq!(actual, expected);
}
/// Reads items back one `read_item` call at a time, checking each value, then
/// verifies that a further read yields `None` and the decoder is finished.
#[tokio::test]
async fn test_async8_read_items_one_by_one_verify_each() {
    let inputs: Vec<u64> = vec![0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF];

    let mut bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut bytes));
        for value in inputs.iter() {
            encoder.write_item(value).await.expect("write u64 failed");
        }
        encoder.finish().await.expect("finish failed");
    }

    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(bytes));
    for wanted in inputs.iter().copied() {
        let got = decoder.read_item::<u64>().await.expect("read u64 failed");
        assert_eq!(got, Some(wanted), "mismatch at value 0x{:02X}", wanted);
    }

    // One extra read past the last item.
    let past_end = decoder.read_item::<u64>().await.expect("eof read failed");
    assert_eq!(past_end, None);
    assert!(decoder.is_finished());
}
#[tokio::test]
async fn test_async8_decode_all_zeros_returns_error_or_eof() {
let zeros: Vec<u8> = vec![0u8; 32];
let cursor = Cursor::new(zeros);
let mut dec: AsyncStreamingDecoder<Cursor<Vec<u8>>> = AsyncStreamingDecoder::new(cursor);
let result: Result<Option<u32>, _> = dec.read_item().await;
match result {
Ok(None) => {} Ok(Some(_)) => panic!("must not decode a valid item from all-zeros buffer"),
Err(_) => {} }
}
#[tokio::test]
async fn test_async8_truncated_to_single_byte_returns_error() {
let truncated: Vec<u8> = vec![0x4F];
let cursor = Cursor::new(truncated);
let mut dec: AsyncStreamingDecoder<Cursor<Vec<u8>>> = AsyncStreamingDecoder::new(cursor);
let result: Result<Option<u32>, _> = dec.read_item().await;
match result {
Ok(None) => {} Ok(Some(_)) => panic!("must not decode from a single-byte truncated stream"),
Err(_) => {} }
}
#[tokio::test]
async fn test_async8_decode_random_junk_bytes_is_error() {
let junk: Vec<u8> = (1u8..=32).collect();
let cursor = Cursor::new(junk);
let mut dec: AsyncStreamingDecoder<Cursor<Vec<u8>>> = AsyncStreamingDecoder::new(cursor);
let result: Result<Option<u32>, _> = dec.read_item().await;
match result {
Ok(None) => {} Ok(Some(_)) => panic!("must not decode a valid item from junk bytes"),
Err(_) => {} }
}
/// Round-trips one `Record` through the async streaming API, then round-trips
/// the same value through the synchronous `encode_to_vec` / `decode_from_slice`.
///
/// The two round trips are independent: the streamed bytes are not passed to
/// the synchronous decoder.
#[tokio::test]
async fn test_async8_async_encode_sync_decode_integration() {
    let record = Record {
        seq: 42,
        label: String::from("integration-sync-decode"),
        flags: 0xFF,
    };

    // Async streaming round trip.
    let mut streamed_bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut streamed_bytes));
        encoder
            .write_item(&record)
            .await
            .expect("async write failed");
        encoder.finish().await.expect("finish failed");
    }
    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(streamed_bytes));
    let from_stream = decoder
        .read_item::<Record>()
        .await
        .expect("async read failed");
    // Compare by reference so `record` stays available for the sync half.
    assert_eq!(from_stream.as_ref(), Some(&record));

    // Synchronous round trip.
    let plain_bytes = encode_to_vec(&record).expect("sync encode_to_vec failed");
    let (from_slice, _consumed): (Record, _) =
        decode_from_slice(&plain_bytes).expect("sync decode_from_slice failed");
    assert_eq!(from_slice, record);
}
/// Round-trips one `Record` through the synchronous API, then through the async
/// streaming API, and finally checks that the stream holds no further items.
///
/// The two round trips are independent: the synchronously encoded bytes are
/// not passed to the streaming decoder.
#[tokio::test]
async fn test_async8_sync_encode_async_decode_integration() {
    let record = Record {
        seq: 9999,
        label: String::from("integration-async-decode"),
        flags: 0x0F,
    };

    // Synchronous round trip.
    let plain_bytes = encode_to_vec(&record).expect("sync encode_to_vec failed");
    let (from_slice, _consumed): (Record, _) =
        decode_from_slice(&plain_bytes).expect("sync decode_from_slice failed");
    assert_eq!(from_slice, record);

    // Async streaming round trip.
    let mut streamed_bytes: Vec<u8> = Vec::new();
    {
        let mut encoder = AsyncStreamingEncoder::new(Cursor::new(&mut streamed_bytes));
        encoder
            .write_item(&record)
            .await
            .expect("async write failed");
        encoder.finish().await.expect("finish failed");
    }
    let mut decoder = AsyncStreamingDecoder::new(Cursor::new(streamed_bytes));
    let from_stream = decoder
        .read_item::<Record>()
        .await
        .expect("async read failed");
    assert_eq!(from_stream, Some(record));

    // If the decoder has not yet flagged completion, one more read must yield `None`.
    if !decoder.is_finished() {
        let trailing = decoder
            .read_item::<Record>()
            .await
            .expect("eof read failed");
        assert!(trailing.is_none());
    }
}