use bytes::{BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use std::fmt;
use std::path::Path;
use std::sync::Arc;
use uuid::Uuid;
use crate::types::{ContentType, Timestamp};
use crate::{McpError as Error, Result};
/// A message whose payload is stored in a [`Bytes`] buffer.
///
/// The derived `Clone` clones every field, including the boxed `lazy_json`
/// value when one is present.
#[derive(Debug, Clone)]
pub struct ZeroCopyMessage {
/// Identifier of the message, held behind an [`Arc`].
pub id: Arc<MessageId>,
/// Raw payload bytes.
pub payload: Bytes,
/// Raw JSON view of `payload`; the constructors in this file leave it `None`
/// and `parse_json_lazy` fills it in on first use.
pub lazy_json: Option<Box<RawValue>>,
/// Bookkeeping data recorded when the message was constructed.
pub metadata: MessageMetadata,
}
/// Identifier attached to a message.
///
/// Values can be built through the `From` impls for `String`, `&str`, `i64`
/// and `Uuid` defined further down in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MessageId {
/// Textual identifier stored as a shared string slice.
String(Arc<str>),
/// Signed 64-bit numeric identifier.
Number(i64),
/// UUID identifier.
Uuid(Uuid),
}
/// Bookkeeping data carried alongside a message payload.
#[derive(Debug, Clone)]
pub struct MessageMetadata {
/// Timestamp recorded for the message; the constructors in this file use
/// `Timestamp::now()`.
pub created_at: Timestamp,
/// Content type of the payload; the constructors in this file use
/// `ContentType::Json`.
pub content_type: ContentType,
/// Size of the payload in bytes.
pub size: usize,
/// Optional correlation identifier; the constructors in this file leave it
/// `None`.
pub correlation_id: Option<Arc<str>>,
}
impl ZeroCopyMessage {
#[inline]
pub fn from_bytes(id: MessageId, payload: Bytes) -> Self {
let size = payload.len();
Self {
id: Arc::new(id),
payload: payload.clone(),
lazy_json: None,
metadata: MessageMetadata {
created_at: Timestamp::now(),
content_type: ContentType::Json,
size,
correlation_id: None,
},
}
}
pub fn from_json<T: Serialize>(id: MessageId, value: &T) -> Result<Self> {
let mut buffer = BytesMut::with_capacity(1024);
serde_json::to_writer((&mut buffer).writer(), value)
.map_err(|e| Error::serialization(e.to_string()))?;
let payload = buffer.freeze();
let size = payload.len();
Ok(Self {
id: Arc::new(id),
payload,
lazy_json: None,
metadata: MessageMetadata {
created_at: Timestamp::now(),
content_type: ContentType::Json,
size,
correlation_id: None,
},
})
}
#[inline]
pub fn parse_json_lazy(&mut self) -> Result<&RawValue> {
if self.lazy_json.is_none() {
let raw: Box<RawValue> = serde_json::from_slice(&self.payload)
.map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))?;
self.lazy_json = Some(raw);
}
Ok(self.lazy_json.as_ref().unwrap())
}
#[inline]
pub fn deserialize<T: for<'de> Deserialize<'de>>(&self) -> Result<T> {
#[cfg(feature = "simd")]
{
sonic_rs::from_slice(&self.payload)
.map_err(|e| Error::serialization(format!("SIMD deserialize error: {e}")))
}
#[cfg(not(feature = "simd"))]
{
serde_json::from_slice(&self.payload)
.map_err(|e| Error::serialization(format!("Deserialization error: {e}")))
}
}
#[inline]
pub fn payload_slice(&self) -> &[u8] {
&self.payload
}
#[inline]
pub fn cheap_clone(&self) -> Self {
Self {
id: Arc::clone(&self.id),
payload: self.payload.clone(), lazy_json: self.lazy_json.clone(),
metadata: self.metadata.clone(),
}
}
}
/// A fixed-size pool of reusable [`BytesMut`] buffers.
#[derive(Debug)]
pub struct BufferPool {
    /// Queue holding the buffers that are currently idle.
    buffers: crossbeam::queue::ArrayQueue<BytesMut>,
    /// Capacity requested for every buffer this pool allocates.
    capacity: usize,
}

impl BufferPool {
    /// Creates a pool with room for `size` buffers and fills it with buffers
    /// of `capacity` bytes each.
    ///
    /// NOTE(review): crossbeam documents `ArrayQueue::new` as panicking on a
    /// zero capacity — confirm callers never pass `size == 0`.
    pub fn new(size: usize, capacity: usize) -> Self {
        let buffers = crossbeam::queue::ArrayQueue::<BytesMut>::new(size);
        std::iter::repeat_with(|| BytesMut::with_capacity(capacity))
            .take(size)
            // A failed push hands the buffer back; it is dropped on purpose.
            .for_each(|fresh| drop(buffers.push(fresh)));
        Self { buffers, capacity }
    }

    /// Takes an idle buffer from the pool, or allocates a new one when the
    /// pool is empty.
    #[inline]
    pub fn acquire(&self) -> BytesMut {
        match self.buffers.pop() {
            Some(pooled) => pooled,
            None => BytesMut::with_capacity(self.capacity),
        }
    }

    /// Clears `buffer` and offers it back to the pool.
    ///
    /// When the queue rejects the push, the buffer is simply dropped.
    #[inline]
    pub fn release(&self, mut buffer: BytesMut) {
        buffer.clear();
        drop(self.buffers.push(buffer));
    }
}
/// A set of message payloads packed back to back into one buffer.
#[derive(Debug)]
pub struct MessageBatch {
    /// Concatenated payload bytes of every message added so far.
    pub buffer: Bytes,
    /// `(offset, length)` of each message inside `buffer`, in insertion order.
    pub messages: Vec<(usize, usize)>,
    /// Identifier of each message, parallel to `messages`.
    pub ids: Vec<Arc<MessageId>>,
}

impl MessageBatch {
    /// Creates an empty batch whose index vectors have room for `capacity`
    /// messages.
    pub fn new(capacity: usize) -> Self {
        Self {
            buffer: Bytes::new(),
            messages: Vec::with_capacity(capacity),
            ids: Vec::with_capacity(capacity),
        }
    }

    /// Appends `payload` to the batch buffer and records its position and id.
    ///
    /// The combined buffer is allocated once at its final size, so the
    /// existing bytes are copied a single time per call. The previous code
    /// copied them into an exactly sized buffer and then had to grow it again
    /// for the new payload. Each call is still linear in the current batch
    /// size.
    pub fn add(&mut self, id: MessageId, payload: Bytes) {
        let offset = self.buffer.len();
        let length = payload.len();

        let mut combined = BytesMut::with_capacity(offset + length);
        combined.extend_from_slice(&self.buffer);
        combined.extend_from_slice(&payload);
        self.buffer = combined.freeze();

        self.messages.push((offset, length));
        self.ids.push(Arc::new(id));
    }

    /// Returns the payload stored at `index`, or `None` when `index` is out
    /// of range. The returned value is a slice of the batch buffer.
    #[inline]
    pub fn get(&self, index: usize) -> Option<Bytes> {
        self.messages
            .get(index)
            .map(|&(offset, length)| self.buffer.slice(offset..offset + length))
    }

    /// Iterates over `(id, payload)` pairs in insertion order.
    pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, Bytes)> + '_ {
        self.ids
            .iter()
            .zip(self.messages.iter())
            .map(move |(id, &(offset, length))| (id, self.buffer.slice(offset..offset + length)))
    }
}
pub mod fast {
/// Reports whether `bytes` is valid UTF-8.
///
/// With the `simd` feature enabled, inputs of 64 bytes or more are checked
/// with `simdutf8`; everything else goes through `std::str::from_utf8`.
#[inline]
pub fn validate_utf8_fast(bytes: &[u8]) -> bool {
    #[cfg(feature = "simd")]
    {
        if bytes.len() >= 64 {
            return simdutf8::basic::from_utf8(bytes).is_ok();
        }
    }
    std::str::from_utf8(bytes).is_ok()
}
/// Finds the end offsets of top-level JSON objects and arrays in `bytes`.
///
/// The scan tracks bracket depth while skipping string contents and escaped
/// characters. Each time the depth returns to zero, the index just past the
/// closing bracket is recorded. Top-level scalars produce no boundary.
///
/// Closing brackets that appear when no container is open are ignored.
/// Previously they drove the depth counter negative, so every later value in
/// the input was missed.
#[inline]
pub fn find_json_boundaries(bytes: &[u8]) -> Vec<usize> {
    let mut boundaries = Vec::new();
    let mut depth: usize = 0;
    let mut in_string = false;
    let mut escaped = false;
    for (i, &byte) in bytes.iter().enumerate() {
        // The byte following a backslash inside a string is never structural.
        if escaped {
            escaped = false;
            continue;
        }
        match byte {
            b'\\' if in_string => escaped = true,
            b'"' => in_string = !in_string,
            b'{' | b'[' if !in_string => depth += 1,
            b'}' | b']' if !in_string && depth > 0 => {
                depth -= 1;
                if depth == 0 {
                    boundaries.push(i + 1);
                }
            }
            _ => {}
        }
    }
    boundaries
}
/// Reports whether `bytes` parses as a JSON value, using `sonic_rs`.
#[cfg(feature = "simd")]
#[inline]
pub fn validate_json_fast(bytes: &[u8]) -> bool {
    let parsed: Result<sonic_rs::Value, _> = sonic_rs::from_slice(bytes);
    parsed.is_ok()
}

/// Reports whether `bytes` parses as a JSON value, using `serde_json`.
#[cfg(not(feature = "simd"))]
#[inline]
pub fn validate_json_fast(bytes: &[u8]) -> bool {
    let parsed: Result<serde_json::Value, _> = serde_json::from_slice(bytes);
    parsed.is_ok()
}
}
#[cfg(feature = "mmap")]
pub mod mmap {
use super::*;
use memmap2::{Mmap, MmapOptions};
use std::fs::File;
use std::io;
use std::ops::Deref;
/// A message backed by a window into a memory-mapped file.
#[derive(Debug)]
pub struct MmapMessage {
    /// Identifier of the message.
    pub id: Arc<MessageId>,
    /// Mapping of the whole file the message lives in.
    pub mmap: Arc<Mmap>,
    /// Start of the message within the mapping, in bytes.
    pub offset: usize,
    /// Length of the message, in bytes.
    pub length: usize,
    /// Bookkeeping data recorded when the message was created.
    pub metadata: MessageMetadata,
}

impl MmapMessage {
    /// Maps `path` and exposes `length` bytes starting at `offset`.
    ///
    /// When `length` is `None`, or larger than what remains after `offset`,
    /// the message extends to the end of the file.
    ///
    /// # Errors
    ///
    /// Returns an I/O error when the file cannot be opened, inspected or
    /// mapped, and `InvalidInput` when `offset` is not inside the file (which
    /// includes every offset into an empty file) or when the file size does
    /// not fit in `usize`.
    pub fn from_file(
        id: MessageId,
        path: &Path,
        offset: usize,
        length: Option<usize>,
    ) -> io::Result<Self> {
        let file = File::open(path)?;
        // A plain `as usize` cast would silently truncate on 32-bit targets.
        let file_size = usize::try_from(file.metadata()?.len()).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "File size exceeds addressable range",
            )
        })?;
        if offset >= file_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Offset exceeds file size",
            ));
        }
        let remaining = file_size - offset;
        let actual_length = length.map_or(remaining, |requested| requested.min(remaining));

        // SAFETY: the file is opened read-only and mapped read-only. As with
        // any file mapping, the file must not be truncated or modified while
        // the map is alive; this function cannot enforce that.
        let mmap = unsafe { MmapOptions::new().map(&file)? };

        Ok(Self {
            id: Arc::new(id),
            mmap: Arc::new(mmap),
            offset,
            length: actual_length,
            metadata: MessageMetadata {
                created_at: Timestamp::now(),
                content_type: ContentType::Json,
                size: actual_length,
                correlation_id: None,
            },
        })
    }

    /// Runs [`Self::from_file`] on Tokio's blocking thread pool.
    ///
    /// # Errors
    ///
    /// Propagates the error from `from_file`, or reports a failed join of the
    /// blocking task as an I/O error.
    pub async fn from_file_async(
        id: MessageId,
        path: &std::path::Path,
        offset: usize,
        length: Option<usize>,
    ) -> std::io::Result<Self> {
        let path = path.to_path_buf();
        tokio::task::spawn_blocking(move || Self::from_file(id, &path, offset, length))
            .await
            .map_err(|join_err| {
                std::io::Error::other(format!("Async mmap operation failed: {}", join_err))
            })?
    }

    /// Borrows the message bytes from the mapping.
    ///
    /// Indexing panics if `offset + length` lies outside the mapping, which
    /// can only happen when the public fields were changed after
    /// construction.
    #[inline]
    pub fn data(&self) -> &[u8] {
        &self.mmap[self.offset..self.offset + self.length]
    }

    /// Copies the message bytes into an owned [`Bytes`] buffer.
    #[inline]
    pub fn to_bytes(&self) -> Bytes {
        Bytes::copy_from_slice(self.data())
    }

    /// Deserializes the message bytes as JSON into `T`.
    ///
    /// # Errors
    ///
    /// Returns a serialization error when parsing fails.
    pub fn parse_json<T>(&self) -> Result<T>
    where
        T: for<'de> Deserialize<'de>,
    {
        serde_json::from_slice(self.data())
            .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))
    }

    /// Views the message bytes as UTF-8 text.
    ///
    /// # Errors
    ///
    /// Returns a serialization error when the bytes are not valid UTF-8.
    pub fn as_str(&self) -> Result<&str> {
        std::str::from_utf8(self.data())
            .map_err(|e| Error::serialization(format!("Invalid UTF-8: {}", e)))
    }
}
/// A cache of file mappings keyed by path.
#[derive(Debug)]
pub struct MmapPool {
    /// Cached mappings, one per path.
    maps: dashmap::DashMap<std::path::PathBuf, Arc<Mmap>>,
    /// Number of entries above which new mappings are no longer cached.
    max_size: usize,
}

impl MmapPool {
    /// Creates an empty pool that caches at most `max_size` mappings.
    pub fn new(max_size: usize) -> Self {
        let maps = dashmap::DashMap::new();
        Self { maps, max_size }
    }

    /// Returns the cached mapping for `path`, or maps the file on a miss.
    ///
    /// A new mapping is stored only while the pool holds fewer than
    /// `max_size` entries; otherwise it is returned without being cached.
    ///
    /// # Errors
    ///
    /// Returns an I/O error when the file cannot be opened or mapped.
    pub fn get_or_create(&self, path: &Path) -> io::Result<Arc<Mmap>> {
        let cached = self.maps.get(path).map(|entry| Arc::clone(&*entry));
        if let Some(existing) = cached {
            return Ok(existing);
        }

        let file = File::open(path)?;
        let mapped = Arc::new(unsafe { MmapOptions::new().map(&file)? });

        let has_room = self.maps.len() < self.max_size;
        if has_room {
            self.maps.insert(path.to_path_buf(), Arc::clone(&mapped));
        }
        Ok(mapped)
    }

    /// Drops every cached mapping held by the pool.
    pub fn clear(&self) {
        self.maps.clear();
    }

    /// Number of mappings currently cached.
    pub fn size(&self) -> usize {
        self.maps.len()
    }
}
/// The non-empty lines of a memory-mapped file, indexed for random access.
#[derive(Debug)]
pub struct MmapBatch {
    /// Mapping of the whole file.
    mmap: Arc<Mmap>,
    /// `(offset, length)` of every non-empty line.
    messages: Vec<(usize, usize)>,
    /// Id of every non-empty line, parallel to `messages`.
    ids: Vec<Arc<MessageId>>,
}

impl MmapBatch {
    /// Maps `path` and indexes each non-empty line, splitting on `\n`.
    ///
    /// Every indexed line gets a `MessageId::Number` equal to its zero-based
    /// line index in the file, so blank lines leave gaps in the numbering.
    ///
    /// # Errors
    ///
    /// Returns an I/O error when the file cannot be opened or mapped.
    pub fn from_jsonl_file(path: &Path) -> io::Result<Self> {
        let file = File::open(path)?;
        let mmap = unsafe { MmapOptions::new().map(&file)? };

        let mut messages = Vec::new();
        let mut ids = Vec::new();
        let mut start = 0;
        for (line_no, line) in mmap.split(|&b| b == b'\n').enumerate() {
            let len = line.len();
            if len != 0 {
                messages.push((start, len));
                ids.push(Arc::new(MessageId::Number(line_no as i64)));
            }
            // Step over the line and the newline that ended it.
            start += len + 1;
        }

        Ok(Self {
            mmap: Arc::new(mmap),
            messages,
            ids,
        })
    }

    /// Returns the bytes of the message at `index`, or `None` when `index`
    /// is out of range.
    #[inline]
    pub fn get(&self, index: usize) -> Option<&[u8]> {
        let &(offset, length) = self.messages.get(index)?;
        Some(&self.mmap[offset..offset + length])
    }

    /// Iterates over `(id, bytes)` pairs in file order.
    pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, &[u8])> + '_ {
        self.ids
            .iter()
            .zip(self.messages.iter())
            .map(move |(id, &(offset, length))| {
                (id, &self.mmap.deref()[offset..offset + length])
            })
    }

    /// Number of indexed messages.
    pub fn len(&self) -> usize {
        self.messages.len()
    }

    /// Whether the batch holds no messages.
    pub fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }

    /// Runs [`Self::from_jsonl_file`] on Tokio's blocking thread pool.
    ///
    /// # Errors
    ///
    /// Propagates the error from `from_jsonl_file`, or reports a failed join
    /// of the blocking task as an I/O error.
    pub async fn from_jsonl_file_async(path: &std::path::Path) -> std::io::Result<Self> {
        let owned_path = path.to_path_buf();
        let joined =
            tokio::task::spawn_blocking(move || Self::from_jsonl_file(&owned_path)).await;
        match joined {
            Ok(outcome) => outcome,
            Err(join_err) => Err(std::io::Error::other(format!(
                "Async JSONL batch operation failed: {}",
                join_err
            ))),
        }
    }
}
}
#[cfg(not(feature = "mmap"))]
pub mod mmap {
use super::*;
use std::fs;
use std::io;
/// Fallback for builds without the `mmap` feature: the message bytes are
/// read from the file into memory.
#[derive(Debug)]
pub struct MmapMessage {
    /// Identifier of the message.
    pub id: Arc<MessageId>,
    /// Bytes of the message, copied out of the file contents.
    pub data: Bytes,
    /// Bookkeeping data recorded when the message was created.
    pub metadata: MessageMetadata,
}

impl MmapMessage {
    /// Reads `path` and keeps `length` bytes starting at `offset`.
    ///
    /// When `length` is `None`, or larger than what remains after `offset`,
    /// the message extends to the end of the file.
    ///
    /// # Errors
    ///
    /// Returns an I/O error when the file cannot be read, and `InvalidInput`
    /// when `offset` is not inside the file (which includes every offset into
    /// an empty file).
    pub fn from_file(
        id: MessageId,
        path: &Path,
        offset: usize,
        length: Option<usize>,
    ) -> io::Result<Self> {
        let contents = fs::read(path)?;
        let file_size = contents.len();
        if offset >= file_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Offset exceeds file size",
            ));
        }
        let remaining = file_size - offset;
        let actual_length = length.map_or(remaining, |requested| requested.min(remaining));

        // Copy only the requested window so the rest of the file contents can
        // be freed when `contents` goes out of scope.
        let data = Bytes::copy_from_slice(&contents[offset..offset + actual_length]);

        Ok(Self {
            id: Arc::new(id),
            // Freshly built and owned: moved in, no clone needed.
            data,
            metadata: MessageMetadata {
                created_at: Timestamp::now(),
                content_type: ContentType::Json,
                size: actual_length,
                correlation_id: None,
            },
        })
    }

    /// Borrows the message bytes.
    #[inline]
    pub fn data(&self) -> &[u8] {
        &self.data
    }

    /// Returns a handle to the message bytes by cloning the stored `Bytes`.
    #[inline]
    pub fn to_bytes(&self) -> Bytes {
        self.data.clone()
    }

    /// Deserializes the message bytes as JSON into `T`.
    ///
    /// # Errors
    ///
    /// Returns a serialization error when parsing fails.
    pub fn parse_json<T>(&self) -> Result<T>
    where
        T: for<'de> Deserialize<'de>,
    {
        serde_json::from_slice(&self.data)
            .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))
    }

    /// Views the message bytes as UTF-8 text.
    ///
    /// # Errors
    ///
    /// Returns a serialization error when the bytes are not valid UTF-8.
    pub fn as_str(&self) -> Result<&str> {
        std::str::from_utf8(&self.data)
            .map_err(|e| Error::serialization(format!("Invalid UTF-8: {}", e)))
    }

    /// Runs [`Self::from_file`] on Tokio's blocking thread pool.
    ///
    /// # Errors
    ///
    /// Propagates the error from `from_file`, or reports a failed join of the
    /// blocking task as an I/O error.
    pub async fn from_file_async(
        id: MessageId,
        path: &std::path::Path,
        offset: usize,
        length: Option<usize>,
    ) -> std::io::Result<Self> {
        let path = path.to_path_buf();
        tokio::task::spawn_blocking(move || Self::from_file(id, &path, offset, length))
            .await
            .map_err(|join_err| {
                std::io::Error::other(format!("Async file operation failed: {}", join_err))
            })?
    }
}
/// Fallback pool for builds without the `mmap` feature: caches whole file
/// contents keyed by path.
#[derive(Debug)]
pub struct MmapPool {
    /// Cached file contents, one entry per path.
    cache: dashmap::DashMap<std::path::PathBuf, Bytes>,
    /// Number of entries above which new contents are no longer cached.
    max_size: usize,
}

impl MmapPool {
    /// Creates an empty pool that caches at most `max_size` files.
    pub fn new(max_size: usize) -> Self {
        let cache = dashmap::DashMap::new();
        Self { cache, max_size }
    }

    /// Returns the cached contents of `path`, or reads the file on a miss.
    ///
    /// Freshly read contents are stored only while the pool holds fewer than
    /// `max_size` entries; otherwise they are returned without being cached.
    ///
    /// # Errors
    ///
    /// Returns an I/O error when the file cannot be read.
    pub fn get_or_create(&self, path: &Path) -> io::Result<Bytes> {
        let cached = self.cache.get(path).map(|entry| Bytes::clone(&entry));
        if let Some(existing) = cached {
            return Ok(existing);
        }

        let bytes = Bytes::from(fs::read(path)?);

        let has_room = self.cache.len() < self.max_size;
        if has_room {
            self.cache.insert(path.to_path_buf(), bytes.clone());
        }
        Ok(bytes)
    }

    /// Drops every cached entry held by the pool.
    pub fn clear(&self) {
        self.cache.clear();
    }

    /// Number of files currently cached.
    pub fn size(&self) -> usize {
        self.cache.len()
    }
}
/// Fallback batch for builds without the `mmap` feature: the file is read
/// into memory and its non-empty lines are indexed.
#[derive(Debug)]
pub struct MmapBatch {
    /// Contents of the whole file.
    data: Bytes,
    /// `(offset, length)` of every non-empty line.
    messages: Vec<(usize, usize)>,
    /// Id of every non-empty line, parallel to `messages`.
    ids: Vec<Arc<MessageId>>,
}

impl MmapBatch {
    /// Reads `path` and indexes each non-empty line, splitting on `\n`.
    ///
    /// Every indexed line gets a `MessageId::Number` equal to its zero-based
    /// line index in the file, so blank lines leave gaps in the numbering.
    ///
    /// # Errors
    ///
    /// Returns an I/O error when the file cannot be read.
    pub fn from_jsonl_file(path: &Path) -> io::Result<Self> {
        let contents = fs::read(path)?;

        let mut messages = Vec::new();
        let mut ids = Vec::new();
        let mut start = 0;
        for (line_no, line) in contents.split(|&b| b == b'\n').enumerate() {
            let len = line.len();
            if len != 0 {
                messages.push((start, len));
                ids.push(Arc::new(MessageId::Number(line_no as i64)));
            }
            // Step over the line and the newline that ended it.
            start += len + 1;
        }

        Ok(Self {
            data: Bytes::from(contents),
            messages,
            ids,
        })
    }

    /// Returns the bytes of the message at `index`, or `None` when `index`
    /// is out of range.
    #[inline]
    pub fn get(&self, index: usize) -> Option<&[u8]> {
        let &(offset, length) = self.messages.get(index)?;
        Some(&self.data[offset..offset + length])
    }

    /// Iterates over `(id, bytes)` pairs in file order.
    pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, &[u8])> + '_ {
        self.ids
            .iter()
            .zip(self.messages.iter())
            .map(move |(id, &(offset, length))| (id, &self.data[offset..offset + length]))
    }

    /// Number of indexed messages.
    pub fn len(&self) -> usize {
        self.messages.len()
    }

    /// Whether the batch holds no messages.
    pub fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }

    /// Runs [`Self::from_jsonl_file`] on Tokio's blocking thread pool.
    ///
    /// # Errors
    ///
    /// Propagates the error from `from_jsonl_file`, or reports a failed join
    /// of the blocking task as an I/O error.
    pub async fn from_jsonl_file_async(path: &std::path::Path) -> std::io::Result<Self> {
        let owned_path = path.to_path_buf();
        let joined =
            tokio::task::spawn_blocking(move || Self::from_jsonl_file(&owned_path)).await;
        match joined {
            Ok(outcome) => outcome,
            Err(join_err) => Err(std::io::Error::other(format!(
                "Async JSONL batch operation failed: {}",
                join_err
            ))),
        }
    }
}
}
impl From<String> for MessageId {
fn from(s: String) -> Self {
Self::String(Arc::from(s))
}
}
impl From<&str> for MessageId {
fn from(s: &str) -> Self {
Self::String(Arc::from(s))
}
}
impl From<i64> for MessageId {
fn from(n: i64) -> Self {
Self::Number(n)
}
}
impl From<Uuid> for MessageId {
fn from(u: Uuid) -> Self {
Self::Uuid(u)
}
}
impl fmt::Display for MessageId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::String(s) => write!(f, "{}", s),
Self::Number(n) => write!(f, "{}", n),
Self::Uuid(u) => write!(f, "{}", u),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
/// `from_bytes` keeps the payload intact and records its length.
#[test]
fn test_zero_copy_message_creation() {
    let payload = Bytes::from(r#"{"test": "data"}"#);
    let expected_size = payload.len();

    let msg = ZeroCopyMessage::from_bytes(MessageId::from("test-1"), payload.clone());

    assert_eq!(msg.payload, payload);
    assert_eq!(msg.metadata.size, expected_size);
}
/// `parse_json_lazy` returns the raw JSON and caches it on the message.
#[test]
fn test_lazy_json_parsing() {
    let mut msg = ZeroCopyMessage::from_bytes(
        MessageId::from("test-2"),
        Bytes::from(r#"{"key": "value", "number": 42}"#),
    );

    let mentions_value = msg.parse_json_lazy().unwrap().get().contains("value");

    assert!(mentions_value);
    assert!(msg.lazy_json.is_some());
}
/// Acquiring past the pool size still yields buffers of the configured
/// capacity, and a released buffer can be acquired again.
#[test]
fn test_buffer_pool() {
    let pool = BufferPool::new(2, 1024);

    // The third acquire exceeds the two pre-filled buffers.
    let first = pool.acquire();
    let second = pool.acquire();
    let third = pool.acquire();
    for buffer in [&first, &second, &third] {
        assert_eq!(buffer.capacity(), 1024);
    }

    pool.release(first);
    let reacquired = pool.acquire();
    assert_eq!(reacquired.capacity(), 1024);
}
/// Payloads added to a batch can be read back by index and by iteration.
#[test]
fn test_message_batch() {
    let mut batch = MessageBatch::new(10);
    for (id, data) in [("msg1", "data1"), ("msg2", "data2"), ("msg3", "data3")] {
        batch.add(MessageId::from(id), Bytes::from(data));
    }
    assert_eq!(batch.messages.len(), 3);

    assert_eq!(batch.get(0).unwrap(), Bytes::from("data1"));
    assert_eq!(batch.get(1).unwrap(), Bytes::from("data2"));

    let mut seen = 0;
    batch.iter().for_each(|(_id, payload)| {
        seen += 1;
        assert!(!payload.is_empty());
    });
    assert_eq!(seen, 3);
}
/// `cheap_clone` shares the id allocation and preserves the payload.
#[test]
fn test_cheap_clone() {
    let original = ZeroCopyMessage::from_bytes(MessageId::from("test"), Bytes::from("data"));
    let copy = original.cheap_clone();

    assert!(Arc::ptr_eq(&original.id, &copy.id));
    assert_eq!(original.payload, copy.payload);
}
/// A whole file loaded through `MmapMessage::from_file` exposes the same
/// bytes, text and JSON that were written to disk.
#[test]
fn test_mmap_message() {
    use std::io::Write;

    // The process id in the name keeps concurrent test processes that share
    // a temp directory from overwriting or deleting each other's fixture.
    let test_file =
        std::env::temp_dir().join(format!("test_mmap_{}.json", std::process::id()));
    let test_data = r#"{"test": "data", "value": 42}"#;
    {
        let mut file = std::fs::File::create(&test_file).unwrap();
        file.write_all(test_data.as_bytes()).unwrap();
        file.sync_all().unwrap();
    }

    let msg = mmap::MmapMessage::from_file(MessageId::from("mmap-test"), &test_file, 0, None)
        .unwrap();
    assert_eq!(msg.data(), test_data.as_bytes());
    assert_eq!(msg.as_str().unwrap(), test_data);

    let value: serde_json::Value = msg.parse_json().unwrap();
    assert_eq!(value["test"], "data");
    assert_eq!(value["value"], 42);

    std::fs::remove_file(test_file).unwrap();
}
/// A three-line JSONL file yields three messages, reachable by index and by
/// iteration.
#[test]
fn test_mmap_batch() {
    use std::io::Write;

    // The process id in the name keeps concurrent test processes that share
    // a temp directory from overwriting or deleting each other's fixture.
    let test_file =
        std::env::temp_dir().join(format!("test_batch_{}.jsonl", std::process::id()));
    {
        let mut file = std::fs::File::create(&test_file).unwrap();
        writeln!(file, r#"{{"id": 1, "name": "first"}}"#).unwrap();
        writeln!(file, r#"{{"id": 2, "name": "second"}}"#).unwrap();
        writeln!(file, r#"{{"id": 3, "name": "third"}}"#).unwrap();
        file.sync_all().unwrap();
    }

    let batch = mmap::MmapBatch::from_jsonl_file(&test_file).unwrap();
    assert_eq!(batch.len(), 3);
    assert!(!batch.is_empty());

    let msg1 = batch.get(0).unwrap();
    let value: serde_json::Value = serde_json::from_slice(msg1).unwrap();
    assert_eq!(value["id"], 1);
    assert_eq!(value["name"], "first");

    let mut count = 0;
    for (_id, data) in batch.iter() {
        let value: serde_json::Value = serde_json::from_slice(data).unwrap();
        assert!(value["id"].is_number());
        assert!(value["name"].is_string());
        count += 1;
    }
    assert_eq!(count, 3);

    std::fs::remove_file(test_file).unwrap();
}
/// The pool caches one entry per path, reuses it on repeat lookups, and is
/// empty again after `clear`.
#[test]
fn test_mmap_pool() {
    use std::io::Write;

    // The process id in the names keeps concurrent test processes that share
    // a temp directory from overwriting or deleting each other's fixtures.
    let temp_dir = std::env::temp_dir();
    let pid = std::process::id();
    let test_file1 = temp_dir.join(format!("pool_test1_{}.json", pid));
    let test_file2 = temp_dir.join(format!("pool_test2_{}.json", pid));
    for (path, contents) in [(&test_file1, b"test1"), (&test_file2, b"test2")] {
        let mut file = std::fs::File::create(path).unwrap();
        file.write_all(contents).unwrap();
        file.sync_all().unwrap();
    }

    let pool = mmap::MmapPool::new(10);
    assert_eq!(pool.size(), 0);

    let _data1 = pool.get_or_create(&test_file1).unwrap();
    assert_eq!(pool.size(), 1);
    let _data2 = pool.get_or_create(&test_file2).unwrap();
    assert_eq!(pool.size(), 2);

    // A repeat lookup must not add a second entry.
    let _data1_again = pool.get_or_create(&test_file1).unwrap();
    assert_eq!(pool.size(), 2);

    pool.clear();
    assert_eq!(pool.size(), 0);

    std::fs::remove_file(test_file1).unwrap();
    std::fs::remove_file(test_file2).unwrap();
}
#[cfg(feature = "mmap")]
mod async_mmap_tests {
use super::MessageId;
use super::mmap::*;
use std::io::Write;
use std::path::Path;
/// Three concurrent `from_file_async` calls on the same file all succeed and
/// finish within the time limits asserted below.
///
/// NOTE(review): the wall-clock limits can fail on a heavily loaded machine;
/// consider relaxing them if this test turns out to be flaky.
#[tokio::test]
async fn test_mmap_message_from_file_async_performance() {
    // The process id in the name keeps concurrent test processes that share
    // a temp directory from overwriting or deleting each other's fixture.
    let test_file =
        std::env::temp_dir().join(format!("async_mmap_test_{}.json", std::process::id()));
    {
        let mut file = std::fs::File::create(&test_file).unwrap();
        let test_data = r#"{"test": "async_data", "large_field": "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."}"#;
        file.write_all(test_data.as_bytes()).unwrap();
        file.sync_all().unwrap();
    }

    let handles = (0..3)
        .map(|i| {
            let test_file = test_file.clone();
            tokio::spawn(async move {
                let start_time = std::time::Instant::now();
                let result = MmapMessage::from_file_async(
                    MessageId::from(format!("async-test-{}", i)),
                    &test_file,
                    0,
                    None,
                )
                .await;
                let duration = start_time.elapsed();
                assert!(
                    duration.as_millis() < 100,
                    "Async mmap took {}ms - should be <100ms",
                    duration.as_millis()
                );
                (i, result)
            })
        })
        .collect::<Vec<_>>();

    let start_time = std::time::Instant::now();
    let results = futures::future::join_all(handles).await;
    let total_duration = start_time.elapsed();
    assert!(
        total_duration.as_millis() < 200,
        "Concurrent async mmap operations took {}ms - should be <200ms",
        total_duration.as_millis()
    );

    for result in results {
        let (i, mmap_result) = result.unwrap();
        let mmap_msg = mmap_result.unwrap();
        assert_eq!(*mmap_msg.id, MessageId::from(format!("async-test-{}", i)));
        assert!(!mmap_msg.data().is_empty());
    }

    std::fs::remove_file(test_file).unwrap();
}
/// Five concurrent `from_jsonl_file_async` calls on the same file each
/// produce a three-message batch within the time limit asserted below.
///
/// NOTE(review): the wall-clock limit can fail on a heavily loaded machine;
/// consider relaxing it if this test turns out to be flaky.
#[tokio::test]
async fn test_mmap_batch_from_jsonl_file_async_concurrency() {
    // The process id in the name keeps concurrent test processes that share
    // a temp directory from overwriting or deleting each other's fixture.
    let test_file =
        std::env::temp_dir().join(format!("async_batch_test_{}.jsonl", std::process::id()));
    {
        let mut file = std::fs::File::create(&test_file).unwrap();
        writeln!(file, r#"{{"id": "msg1", "data": "test1"}}"#).unwrap();
        writeln!(file, r#"{{"id": "msg2", "data": "test2"}}"#).unwrap();
        writeln!(file, r#"{{"id": "msg3", "data": "test3"}}"#).unwrap();
        file.sync_all().unwrap();
    }

    let handles = (0..5)
        .map(|_| {
            let test_file = test_file.clone();
            tokio::spawn(async move {
                let start_time = std::time::Instant::now();
                let result = MmapBatch::from_jsonl_file_async(&test_file).await;
                let duration = start_time.elapsed();
                assert!(
                    duration.as_millis() < 150,
                    "Async batch processing took {}ms - should be <150ms",
                    duration.as_millis()
                );
                result
            })
        })
        .collect::<Vec<_>>();

    let results = futures::future::join_all(handles).await;
    for result in results {
        let batch = result.unwrap().unwrap();
        assert_eq!(batch.len(), 3);
    }

    std::fs::remove_file(test_file).unwrap();
}
/// Opening a file that does not exist surfaces the underlying `NotFound`
/// I/O error through the async wrapper.
#[tokio::test]
async fn test_async_mmap_error_handling() {
    // Build the path under the platform temp directory instead of a
    // hard-coded `/tmp`, so the test does not depend on a Unix layout.
    let missing = std::env::temp_dir()
        .join(format!("does_not_exist_async_{}.json", std::process::id()));
    let non_existent: &Path = missing.as_path();

    let result = MmapMessage::from_file_async(
        MessageId::String("error-test".to_string().into()),
        non_existent,
        0,
        None,
    )
    .await;

    // Check the error kind rather than the OS-specific message text, which
    // differs between platforms.
    let error = result.expect_err("opening a missing file must fail");
    assert_eq!(
        error.kind(),
        std::io::ErrorKind::NotFound,
        "Error should be descriptive: {}",
        error
    );
}
/// The sync and async constructors expose identical bytes for the same file.
#[tokio::test]
async fn test_async_mmap_maintains_functionality() {
    // The process id in the name keeps concurrent test processes that share
    // a temp directory from overwriting or deleting each other's fixture.
    let test_file =
        std::env::temp_dir().join(format!("functionality_test_{}.json", std::process::id()));
    let test_data = r#"{"test": "functionality", "value": 42}"#;
    std::fs::write(&test_file, test_data).unwrap();

    let sync_result = MmapMessage::from_file(
        MessageId::String("sync".to_string().into()),
        &test_file,
        0,
        None,
    )
    .unwrap();
    let async_result = MmapMessage::from_file_async(
        MessageId::String("async".to_string().into()),
        &test_file,
        0,
        None,
    )
    .await
    .unwrap();

    assert_eq!(sync_result.data(), async_result.data());
    assert_eq!(sync_result.data(), test_data.as_bytes());

    std::fs::remove_file(test_file).unwrap();
}
}
}