use crate::sim::WeakSimWorld;
use crate::sim::state::FileId;
use futures::io::{AsyncRead, AsyncSeek, AsyncWrite};
use moonpool_core::StorageFile;
use std::io::{self, SeekFrom};
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll};
use super::futures::{SetLenFuture, SyncFuture};
use super::sim_shutdown_error;
/// File handle whose operations are carried out by the simulation world.
///
/// Every operation upgrades `sim` and addresses the file through `file_id`.
/// Reads and writes complete over several polls; the two `pending_*` slots
/// remember the operation that is currently in flight so a later poll can
/// pick it up again.
#[derive(Debug)]
pub struct SimStorageFile {
/// Weak reference to the simulation world; upgraded at the start of each operation.
sim: WeakSimWorld,
/// Identifier passed to the simulation to select this file.
file_id: FileId,
/// In-flight read, stored as `(op_seq, offset, len)`: the operation sequence
/// number returned when the read was scheduled, the file offset it was
/// scheduled at, and the number of bytes requested.
pending_read: Mutex<Option<(u64, u64, usize)>>,
/// In-flight write, stored as `(op_seq, bytes_written)`: the operation
/// sequence number returned when the write was scheduled and the length of
/// the buffer that was handed to the simulation.
pending_write: Mutex<Option<(u64, usize)>>,
}
impl SimStorageFile {
    /// Builds a handle for `file_id` inside the world referenced by `sim`.
    ///
    /// The handle starts idle: neither a read nor a write is in flight.
    pub(crate) fn new(sim: WeakSimWorld, file_id: FileId) -> Self {
        let pending_read = Mutex::new(None);
        let pending_write = Mutex::new(None);
        Self {
            sim,
            file_id,
            pending_read,
            pending_write,
        }
    }
}
impl StorageFile for SimStorageFile {
    /// Awaits a [`SyncFuture`] for this file.
    async fn sync_all(&self) -> io::Result<()> {
        let world = self.sim.clone();
        SyncFuture::new(world, self.file_id).await
    }

    /// Awaits a [`SyncFuture`] for this file.
    ///
    /// This is the same future `sync_all` uses; the two calls are not
    /// distinguished here.
    async fn sync_data(&self) -> io::Result<()> {
        let world = self.sim.clone();
        SyncFuture::new(world, self.file_id).await
    }

    /// Returns the file size reported by the simulation.
    ///
    /// # Errors
    ///
    /// Returns the shutdown error when the simulation world can no longer be
    /// upgraded. A failure from the size lookup is wrapped with
    /// [`io::Error::other`], keeping only its message.
    async fn size(&self) -> io::Result<u64> {
        let Ok(world) = self.sim.upgrade() else {
            return Err(sim_shutdown_error());
        };
        match world.file_size(self.file_id) {
            Ok(bytes) => Ok(bytes),
            Err(err) => Err(io::Error::other(err.to_string())),
        }
    }

    /// Awaits a [`SetLenFuture`] that sets the file length to `size`.
    async fn set_len(&self, size: u64) -> io::Result<()> {
        let world = self.sim.clone();
        SetLenFuture::new(world, self.file_id, size).await
    }
}
impl AsyncRead for SimStorageFile {
    /// Reads from the file's current position in two phases.
    ///
    /// * With no read in flight, the call schedules a read with the
    ///   simulation, records `(op_seq, offset, len)` in `pending_read`,
    ///   registers the task's waker and returns `Poll::Pending`.
    /// * With a read in flight, the call re-registers the waker and returns
    ///   `Poll::Pending` until the simulation reports the operation complete.
    ///   The poll that observes completion clears the slot, copies the bytes
    ///   into `buf` starting at the recorded offset, and moves the file
    ///   position to just past the bytes read.
    ///
    /// Returns `Ok(0)` without scheduling anything when the position is at or
    /// beyond the end of the file, or when `buf` is empty.
    ///
    /// # Errors
    ///
    /// Returns the shutdown error when the simulation world can no longer be
    /// upgraded, and propagates failures from the simulation calls.
    ///
    /// # Panics
    ///
    /// Panics if the `pending_read` mutex is poisoned.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;

        // Copy the slot out so the lock is released before calling into the simulation.
        let pending = *self
            .pending_read
            .lock()
            .expect("Mutex poisoned: prior task panicked");

        if let Some((op_seq, offset, len)) = pending {
            if !sim.is_storage_op_complete(self.file_id, op_seq) {
                // Still in flight: make sure the latest waker is the one that gets notified.
                sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
                return Poll::Pending;
            }

            *self
                .pending_read
                .lock()
                .expect("Mutex poisoned: prior task panicked") = None;

            // The caller may poll again with a smaller buffer than the one
            // the read was scheduled for.
            let bytes_to_read = buf.len().min(len);
            if bytes_to_read == 0 {
                return Poll::Ready(Ok(0));
            }

            let bytes_read =
                sim.read_from_file(self.file_id, offset, &mut buf[..bytes_to_read])?;
            let new_position = offset + bytes_read as u64;
            sim.set_file_position(self.file_id, new_position)?;
            return Poll::Ready(Ok(bytes_read));
        }

        let position = sim.file_position(self.file_id)?;
        let file_size = sim.file_size(self.file_id)?;
        if position >= file_size {
            // At or past end of file.
            return Poll::Ready(Ok(0));
        }

        // The remaining byte count is only an upper bound for the request, so
        // when it does not fit in `usize` (possible where `usize` is narrower
        // than `u64`) clamp it instead of panicking.
        let remaining_in_file = usize::try_from(file_size - position).unwrap_or(usize::MAX);
        let len = buf.len().min(remaining_in_file);
        if len == 0 {
            return Poll::Ready(Ok(0));
        }

        let op_seq = sim.schedule_read(self.file_id, position, len)?;
        *self
            .pending_read
            .lock()
            .expect("Mutex poisoned: prior task panicked") = Some((op_seq, position, len));
        sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
        Poll::Pending
    }
}
impl AsyncWrite for SimStorageFile {
    /// Writes `buf` at the file's current position in two phases.
    ///
    /// With no write in flight, a non-empty `buf` is copied and handed to the
    /// simulation, `(op_seq, buf.len())` is stored in `pending_write`, the
    /// task's waker is registered and `Poll::Pending` is returned. An empty
    /// `buf` returns `Ok(0)` immediately.
    ///
    /// With a write in flight, the call keeps re-registering the waker until
    /// the simulation reports completion; that poll clears the slot, advances
    /// the file position by the recorded byte count and returns it.
    ///
    /// # Errors
    ///
    /// Returns the shutdown error when the simulation world can no longer be
    /// upgraded, and propagates failures from the simulation calls.
    ///
    /// # Panics
    ///
    /// Panics if the `pending_write` mutex is poisoned.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;

        // Snapshot the slot; the guard is dropped at the end of this statement.
        let in_flight = *self
            .pending_write
            .lock()
            .expect("Mutex poisoned: prior task panicked");

        match in_flight {
            Some((op_seq, written)) if sim.is_storage_op_complete(self.file_id, op_seq) => {
                *self
                    .pending_write
                    .lock()
                    .expect("Mutex poisoned: prior task panicked") = None;

                // NOTE(review): the position is re-read here rather than
                // recorded when the write was scheduled — confirm nothing can
                // move it while the write is in flight.
                let advanced = sim.file_position(self.file_id)? + written as u64;
                sim.set_file_position(self.file_id, advanced)?;
                Poll::Ready(Ok(written))
            }
            Some((op_seq, _)) => {
                sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
                Poll::Pending
            }
            None => {
                if buf.is_empty() {
                    return Poll::Ready(Ok(0));
                }

                let start = sim.file_position(self.file_id)?;
                let op_seq = sim.schedule_write(self.file_id, start, buf.to_vec())?;
                let record = Some((op_seq, buf.len()));
                *self
                    .pending_write
                    .lock()
                    .expect("Mutex poisoned: prior task panicked") = record;
                sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Always ready; this handle performs no work on flush.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Always ready; this handle performs no work on close.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}
impl AsyncSeek for SimStorageFile {
    /// Moves the file position and returns the new absolute offset.
    ///
    /// The seek resolves immediately; it never returns `Poll::Pending`.
    /// Relative seeks are clamped rather than rejected: a target below zero
    /// becomes `0` and a target above `u64::MAX` becomes `u64::MAX`.
    ///
    /// # Errors
    ///
    /// Returns the shutdown error when the simulation world can no longer be
    /// upgraded, and propagates failures from reading the current position or
    /// size and from storing the new position.
    fn poll_seek(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<io::Result<u64>> {
        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;

        // Both values are fetched up front, whichever variant is requested,
        // so lookup failures surface for every kind of seek.
        let current_position = sim.file_position(self.file_id)?;
        let file_size = sim.file_size(self.file_id)?;

        let target = match pos {
            SeekFrom::Start(absolute) => absolute,
            SeekFrom::End(delta) => file_size.saturating_add_signed(delta),
            SeekFrom::Current(delta) => current_position.saturating_add_signed(delta),
        };

        sim.set_file_position(self.file_id, target)?;
        Poll::Ready(Ok(target))
    }
}