use crate::{buffer::tip::Buffer, Blob, Error, RwLock};
use commonware_utils::StableBuf;
use std::{num::NonZeroUsize, sync::Arc};
/// A [`Blob`] wrapper that pairs an underlying blob with a shared, lock-protected [`Buffer`].
///
/// The `buffer` field lives behind an `Arc`, so every clone of a `Write` refers to the same
/// buffer state rather than receiving an independent copy.
#[derive(Clone)]
pub struct Write<B: Blob> {
/// The wrapped blob that receives every read and write the buffer does not absorb.
blob: B,
/// Buffer state guarded by an `RwLock`: read-locked by `size` and `read_at`, write-locked by
/// `write_at`, `resize` and `sync`.
buffer: Arc<RwLock<Buffer>>,
}
impl<B: Blob> Write<B> {
    /// Wraps `blob` in a new [`Write`].
    ///
    /// `size` and `capacity` are forwarded unchanged to `Buffer::new`; presumably `size` is the
    /// current length of `blob` and `capacity` the maximum number of buffered bytes (confirm
    /// against `Buffer::new`).
    pub fn new(blob: B, size: u64, capacity: NonZeroUsize) -> Self {
        let tip = Buffer::new(size, capacity);
        let buffer = Arc::new(RwLock::new(tip));
        Self { blob, buffer }
    }

    /// Returns the size reported by the buffer, taking a read lock for the duration of the call.
    // NOTE(review): `len_without_is_empty` targets methods named `len`; this allow looks
    // unnecessary on `size` -- confirm before removing.
    #[allow(clippy::len_without_is_empty)]
    pub async fn size(&self) -> u64 {
        self.buffer.read().await.size()
    }
}
impl<B: Blob> Blob for Write<B> {
async fn read_at(
&self,
buf: impl Into<StableBuf> + Send,
offset: u64,
) -> Result<StableBuf, Error> {
let mut buf = buf.into();
let buf_len = buf.len();
let end_offset = offset
.checked_add(buf_len as u64)
.ok_or(Error::OffsetOverflow)?;
let buffer = self.buffer.read().await;
if end_offset > buffer.size() {
return Err(Error::BlobInsufficientLength);
}
let remaining = buffer.extract(buf.as_mut(), offset);
if remaining == 0 {
return Ok(buf);
}
let blob_part = self.blob.read_at(vec![0u8; remaining], offset).await?;
buf.as_mut()[..remaining].copy_from_slice(blob_part.as_ref());
Ok(buf)
}
async fn write_at(&self, buf: impl Into<StableBuf> + Send, offset: u64) -> Result<(), Error> {
let buf = buf.into();
let buf_len = buf.len();
let end_offset = offset
.checked_add(buf_len as u64)
.ok_or(Error::OffsetOverflow)?;
let mut buffer = self.buffer.write().await;
if buffer.merge(buf.as_ref(), offset) {
return Ok(());
}
if buffer.offset < end_offset {
if let Some((old_buf, old_offset)) = buffer.take() {
self.blob.write_at(old_buf, old_offset).await?;
if buffer.merge(buf.as_ref(), offset) {
return Ok(());
}
}
}
self.blob.write_at(buf, offset).await?;
buffer.offset = buffer.offset.max(end_offset);
Ok(())
}
async fn resize(&self, len: u64) -> Result<(), Error> {
let mut buffer = self.buffer.write().await;
if let Some((buf, offset)) = buffer.resize(len) {
self.blob.write_at(buf, offset).await?;
}
self.blob.resize(len).await?;
Ok(())
}
async fn sync(&self) -> Result<(), Error> {
let mut buffer = self.buffer.write().await;
if let Some((buf, offset)) = buffer.take() {
self.blob.write_at(buf, offset).await?;
}
self.blob.sync().await
}
}