use std::sync::Arc;
use bytes::Bytes;
use system_interface::fs::FileIoExt;
use tokio::sync::RwLock;
use wasmtime_wasi_io::poll::Pollable;
use wasmtime_wasi_io::streams::{InputStream, OutputStream, StreamError, StreamResult};
use crate::storage::VfsStorage;
pub struct RealFileInputStream {
file: Arc<cap_std::fs::File>,
position: u64,
closed: bool,
}
impl RealFileInputStream {
pub fn new(file: Arc<cap_std::fs::File>, offset: u64) -> Self {
Self {
file,
position: offset,
closed: false,
}
}
}
#[async_trait::async_trait]
impl Pollable for RealFileInputStream {
async fn ready(&mut self) {
}
}
#[async_trait::async_trait]
impl InputStream for RealFileInputStream {
fn read(&mut self, size: usize) -> StreamResult<Bytes> {
if self.closed {
return Err(StreamError::Closed);
}
if size == 0 {
return Ok(Bytes::new());
}
let mut buf = vec![0u8; size];
match self.file.read_at(&mut buf, self.position) {
Ok(0) => {
self.closed = true;
Err(StreamError::Closed)
}
Ok(n) => {
buf.truncate(n);
self.position += n as u64;
Ok(Bytes::from(buf))
}
Err(e) => Err(StreamError::LastOperationFailed(e.into())),
}
}
async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
self.read(size)
}
}
pub struct RealFileOutputStream {
file: Arc<cap_std::fs::File>,
position: u64,
append: bool,
closed: bool,
}
impl RealFileOutputStream {
pub fn write_at(file: Arc<cap_std::fs::File>, offset: u64) -> Self {
Self {
file,
position: offset,
append: false,
closed: false,
}
}
pub fn append(file: Arc<cap_std::fs::File>) -> Self {
Self {
file,
position: 0, append: true,
closed: false,
}
}
}
#[async_trait::async_trait]
impl Pollable for RealFileOutputStream {
async fn ready(&mut self) {
}
}
#[async_trait::async_trait]
impl OutputStream for RealFileOutputStream {
fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
if self.closed {
return Err(StreamError::Closed);
}
if bytes.is_empty() {
return Ok(());
}
let result = if self.append {
match self.file.metadata() {
Ok(meta) => {
let len = meta.len();
self.file.write_at(&bytes, len)
}
Err(e) => Err(e),
}
} else {
self.file.write_at(&bytes, self.position)
};
match result {
Ok(n) => {
if !self.append {
self.position += n as u64;
}
Ok(())
}
Err(e) => Err(StreamError::LastOperationFailed(e.into())),
}
}
fn flush(&mut self) -> StreamResult<()> {
if self.closed {
return Err(StreamError::Closed);
}
Ok(())
}
fn check_write(&mut self) -> StreamResult<usize> {
if self.closed {
return Err(StreamError::Closed);
}
Ok(64 * 1024) }
async fn blocking_write_and_flush(&mut self, bytes: Bytes) -> StreamResult<()> {
self.write(bytes)?;
self.flush()
}
}
pub struct VfsInputStream<S: VfsStorage + Clone + 'static> {
storage: S,
path: String,
position: u64,
closed: bool,
}
impl<S: VfsStorage + Clone + 'static> VfsInputStream<S> {
pub fn new(storage: S, path: String, offset: u64) -> Self {
Self {
storage,
path,
position: offset,
closed: false,
}
}
fn read_sync(&mut self, size: usize) -> StreamResult<Bytes> {
if size == 0 {
return Ok(Bytes::new());
}
let storage = self.storage.clone();
let path = self.path.clone();
let position = self.position;
let result = std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| format!("Failed to create runtime: {e}"))?;
rt.block_on(async { storage.read_at(&path, position, size as u64).await })
.map_err(|e| format!("VFS read failed: {:?}", e))
})
.join()
.map_err(|_| StreamError::trap("Thread panicked during VFS read"))?;
match result {
Ok(data) => {
if data.is_empty() {
self.closed = true;
Err(StreamError::Closed)
} else {
self.position += data.len() as u64;
Ok(Bytes::from(data))
}
}
Err(e) => Err(StreamError::LastOperationFailed(wasmtime::Error::msg(
e.to_string(),
))),
}
}
}
#[async_trait::async_trait]
impl<S: VfsStorage + Clone + 'static> Pollable for VfsInputStream<S> {
async fn ready(&mut self) {
}
}
#[async_trait::async_trait]
impl<S: VfsStorage + Clone + 'static> InputStream for VfsInputStream<S> {
fn read(&mut self, size: usize) -> StreamResult<Bytes> {
if self.closed {
return Err(StreamError::Closed);
}
self.read_sync(size)
}
async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
self.read(size)
}
}
pub struct VfsOutputStream<S: VfsStorage + Clone + 'static> {
storage: S,
path: String,
buffer: Arc<RwLock<Vec<u8>>>,
position: Option<u64>,
closed: bool,
}
impl<S: VfsStorage + Clone + 'static> VfsOutputStream<S> {
pub fn write_at(storage: S, path: String, offset: u64) -> Self {
Self {
storage,
path,
buffer: Arc::new(RwLock::new(Vec::new())),
position: Some(offset),
closed: false,
}
}
pub fn append(storage: S, path: String) -> Self {
Self {
storage,
path,
buffer: Arc::new(RwLock::new(Vec::new())),
position: None, closed: false,
}
}
fn flush_sync(&mut self) -> StreamResult<()> {
let buffer_data = if let Ok(mut buf) = self.buffer.try_write() {
std::mem::take(&mut *buf)
} else {
let mut buf = self.buffer.blocking_write();
std::mem::take(&mut *buf)
};
if buffer_data.is_empty() {
return Ok(());
}
let storage = self.storage.clone();
let path = self.path.clone();
let position = self.position;
let data_len = buffer_data.len() as u64;
let result = std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| format!("Failed to create runtime: {e}"))?;
rt.block_on(async {
let write_position = match position {
Some(pos) => pos,
None => {
match storage.stat(&path).await {
Ok(meta) => meta.size,
Err(_) => 0,
}
}
};
storage
.write_at(&path, write_position, &buffer_data)
.await
.map(|()| write_position)
})
.map_err(|e| format!("VFS write failed: {:?}", e))
})
.join()
.map_err(|_| StreamError::trap("Thread panicked during VFS write"))?;
match result {
Ok(write_position) => {
self.position = Some(write_position + data_len);
Ok(())
}
Err(e) => Err(StreamError::LastOperationFailed(wasmtime::Error::msg(
e.to_string(),
))),
}
}
}
#[async_trait::async_trait]
impl<S: VfsStorage + Clone + 'static> Pollable for VfsOutputStream<S> {
async fn ready(&mut self) {
}
}
#[async_trait::async_trait]
impl<S: VfsStorage + Clone + 'static> OutputStream for VfsOutputStream<S> {
fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
if self.closed {
return Err(StreamError::Closed);
}
if bytes.is_empty() {
return Ok(());
}
if let Ok(mut buf) = self.buffer.try_write() {
buf.extend_from_slice(&bytes);
} else {
let mut buf = self.buffer.blocking_write();
buf.extend_from_slice(&bytes);
}
let buf_len = if let Ok(buf) = self.buffer.try_read() {
buf.len()
} else {
let buf = self.buffer.blocking_read();
buf.len()
};
if buf_len >= 64 * 1024 {
self.flush_sync()?;
}
Ok(())
}
fn flush(&mut self) -> StreamResult<()> {
if self.closed {
return Err(StreamError::Closed);
}
self.flush_sync()
}
fn check_write(&mut self) -> StreamResult<usize> {
if self.closed {
return Err(StreamError::Closed);
}
Ok(64 * 1024) }
async fn blocking_write_and_flush(&mut self, bytes: Bytes) -> StreamResult<()> {
self.write(bytes)?;
self.flush()
}
}
impl<S: VfsStorage + Clone + 'static> Drop for VfsOutputStream<S> {
fn drop(&mut self) {
let _ = self.flush_sync();
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::storage::{ArcStorage, InMemoryStorage};
#[tokio::test]
async fn test_vfs_input_stream_read() {
let storage = ArcStorage::new(Arc::new(InMemoryStorage::new()));
storage.write("/test.txt", b"hello world").await.unwrap();
let mut stream = VfsInputStream::new(storage.clone(), "/test.txt".to_string(), 0);
let result = stream.read(100).unwrap();
assert_eq!(&*result, b"hello world");
let err = stream.read(100).unwrap_err();
assert!(matches!(err, StreamError::Closed));
}
#[tokio::test]
async fn test_vfs_input_stream_read_at_offset() {
let storage = ArcStorage::new(Arc::new(InMemoryStorage::new()));
storage.write("/test.txt", b"hello world").await.unwrap();
let mut stream = VfsInputStream::new(storage.clone(), "/test.txt".to_string(), 6);
let result = stream.read(100).unwrap();
assert_eq!(&*result, b"world");
}
#[tokio::test]
async fn test_vfs_input_stream_partial_read() {
let storage = ArcStorage::new(Arc::new(InMemoryStorage::new()));
storage.write("/test.txt", b"hello world").await.unwrap();
let mut stream = VfsInputStream::new(storage.clone(), "/test.txt".to_string(), 0);
let result = stream.read(5).unwrap();
assert_eq!(&*result, b"hello");
let result = stream.read(100).unwrap();
assert_eq!(&*result, b" world");
}
#[tokio::test]
async fn test_vfs_output_stream_write() {
let storage = ArcStorage::new(Arc::new(InMemoryStorage::new()));
storage.write("/test.txt", b"").await.unwrap();
let mut stream = VfsOutputStream::write_at(storage.clone(), "/test.txt".to_string(), 0);
stream.write(Bytes::from("hello")).unwrap();
stream.flush().unwrap();
let content = storage.read("/test.txt").await.unwrap();
assert_eq!(&content, b"hello");
}
#[tokio::test]
async fn test_vfs_output_stream_write_at_offset() {
let storage = ArcStorage::new(Arc::new(InMemoryStorage::new()));
storage.write("/test.txt", b"XXXXX world").await.unwrap();
let mut stream = VfsOutputStream::write_at(storage.clone(), "/test.txt".to_string(), 0);
stream.write(Bytes::from("hello")).unwrap();
stream.flush().unwrap();
let content = storage.read("/test.txt").await.unwrap();
assert_eq!(&content, b"hello world");
}
#[tokio::test]
async fn test_vfs_output_stream_append() {
let storage = ArcStorage::new(Arc::new(InMemoryStorage::new()));
storage.write("/test.txt", b"hello").await.unwrap();
let mut stream = VfsOutputStream::append(storage.clone(), "/test.txt".to_string());
stream.write(Bytes::from(" world")).unwrap();
stream.flush().unwrap();
let content = storage.read("/test.txt").await.unwrap();
assert_eq!(&content, b"hello world");
}
#[tokio::test]
async fn test_vfs_output_stream_append_to_nonexistent() {
let storage = ArcStorage::new(Arc::new(InMemoryStorage::new()));
let mut stream = VfsOutputStream::append(storage.clone(), "/new.txt".to_string());
stream.write(Bytes::from("new content")).unwrap();
stream.flush().unwrap();
let content = storage.read("/new.txt").await.unwrap();
assert_eq!(&content, b"new content");
}
#[tokio::test]
async fn test_vfs_output_stream_multiple_appends() {
let storage = ArcStorage::new(Arc::new(InMemoryStorage::new()));
storage.write("/test.txt", b"").await.unwrap();
let mut stream = VfsOutputStream::append(storage.clone(), "/test.txt".to_string());
stream.write(Bytes::from("one")).unwrap();
stream.flush().unwrap();
stream.write(Bytes::from("two")).unwrap();
stream.flush().unwrap();
stream.write(Bytes::from("three")).unwrap();
stream.flush().unwrap();
let content = storage.read("/test.txt").await.unwrap();
assert_eq!(&content, b"onetwothree");
}
#[tokio::test]
async fn test_vfs_output_stream_drop_flushes() {
let storage = ArcStorage::new(Arc::new(InMemoryStorage::new()));
storage.write("/test.txt", b"").await.unwrap();
{
let mut stream = VfsOutputStream::write_at(storage.clone(), "/test.txt".to_string(), 0);
stream.write(Bytes::from("hello")).unwrap();
}
let content = storage.read("/test.txt").await.unwrap();
assert_eq!(&content, b"hello");
}
#[tokio::test]
async fn test_vfs_streams_closed_error() {
let storage = ArcStorage::new(Arc::new(InMemoryStorage::new()));
storage.write("/test.txt", b"hello").await.unwrap();
let mut input = VfsInputStream::new(storage.clone(), "/test.txt".to_string(), 0);
let _ = input.read(100).unwrap(); let _ = input.read(100);
let err = input.read(100).unwrap_err();
assert!(matches!(err, StreamError::Closed));
}
}