use super::{PublicFileSerializable, PublicNodeSerializable};
use crate::{
WNFS_VERSION, error::FsError, is_readable_wnfs_version, traits::Id, utils::OnceCellDebug,
};
use anyhow::{Result, anyhow, bail};
use async_once_cell::OnceCell;
use chrono::{DateTime, Utc};
use futures::{AsyncRead, AsyncReadExt};
use std::{cmp::Ordering, collections::BTreeSet, io::SeekFrom};
use tokio::io::AsyncSeekExt;
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use wnfs_common::{
BlockStore, Cid, Link, Metadata, NodeType, Storable,
utils::{Arc, CondSend},
};
use wnfs_unixfs_file::{builder::FileBuilder, unixfs::UnixFsFile};
/// A file in the public (unencrypted) part of a WNFS file system.
///
/// Content bytes live in a UnixFS file DAG referenced by `userland`;
/// history is tracked by linking each revision to its predecessors'
/// CIDs via `previous`.
pub struct PublicFile {
    // Cache of the CID this node was last serialized under, if any.
    // Cleared whenever the node is mutated into a new revision.
    persisted_as: OnceCell<Cid>,
    // Node metadata (creation/modification times etc.).
    pub(crate) metadata: Metadata,
    // Link to the UnixFS file holding the actual content bytes.
    pub(crate) userland: Link<UnixFsFile>,
    // CIDs of the previous revision(s); more than one entry means this
    // node is the result of a merge.
    pub(crate) previous: BTreeSet<Cid>,
}
impl PublicFile {
pub fn new(time: DateTime<Utc>) -> Self {
Self {
persisted_as: OnceCell::new(),
metadata: Metadata::new(time),
userland: Link::from(UnixFsFile::empty()),
previous: BTreeSet::new(),
}
}
pub fn new_rc(time: DateTime<Utc>) -> Arc<Self> {
Arc::new(Self::new(time))
}
pub async fn with_content(
time: DateTime<Utc>,
content: Vec<u8>,
store: &impl BlockStore,
) -> Result<Self> {
let content_cid = FileBuilder::new()
.content_bytes(content)
.build()?
.store(store)
.await?;
Ok(Self {
persisted_as: OnceCell::new(),
metadata: Metadata::new(time),
userland: Link::from_cid(content_cid),
previous: BTreeSet::new(),
})
}
pub async fn with_content_rc(
time: DateTime<Utc>,
content: Vec<u8>,
store: &impl BlockStore,
) -> Result<Arc<Self>> {
Ok(Arc::new(Self::with_content(time, content, store).await?))
}
pub async fn with_content_streaming<'a>(
time: DateTime<Utc>,
content: impl AsyncRead + CondSend + 'a,
store: &'a impl BlockStore,
) -> Result<Self> {
let content_cid = FileBuilder::new()
.content_reader(FuturesAsyncReadCompatExt::compat(content))
.build()?
.store(store)
.await?;
Ok(Self {
persisted_as: OnceCell::new(),
metadata: Metadata::new(time),
userland: Link::from_cid(content_cid),
previous: BTreeSet::new(),
})
}
pub async fn with_content_streaming_rc<'a>(
time: DateTime<Utc>,
content: impl AsyncRead + CondSend + 'a,
store: &'a impl BlockStore,
) -> Result<Arc<Self>> {
Ok(Arc::new(
Self::with_content_streaming(time, content, store).await?,
))
}
pub fn copy_content_from(&mut self, other: &Self, time: DateTime<Utc>) {
self.metadata.upsert_mtime(time);
self.userland = other.userland.clone();
}
pub async fn stream_content<'a>(
&'a self,
byte_offset: u64,
store: &'a impl BlockStore,
) -> Result<impl AsyncRead + CondSend + 'a> {
let mut reader = self
.userland
.resolve_value(store)
.await?
.clone()
.into_content_reader(store, None)?;
reader.seek(SeekFrom::Start(byte_offset)).await?;
Ok(TokioAsyncReadCompatExt::compat(reader))
}
pub async fn read_at<'a>(
&'a self,
byte_offset: u64,
len_limit: Option<usize>,
store: &'a impl BlockStore,
) -> Result<Vec<u8>> {
let size = self.size(store).await?;
let mut reader = self.stream_content(byte_offset, store).await?;
if let Some(len) = len_limit {
let len = std::cmp::min(len as u64, size - byte_offset) as usize;
let mut buffer = vec![0; len];
reader.read_exact(&mut buffer).await?;
Ok(buffer)
} else {
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
Ok(buffer)
}
}
pub async fn size(&self, store: &impl BlockStore) -> Result<u64> {
self.userland
.resolve_value(store)
.await?
.filesize()
.ok_or_else(|| anyhow!("Missing size on dag-pb node"))
}
pub async fn get_content(&self, store: &impl BlockStore) -> Result<Vec<u8>> {
self.read_at(0, None, store).await
}
pub fn prepare_next_revision(self: &mut Arc<Self>) -> &mut Self {
let Some(previous_cid) = self.persisted_as.get().cloned() else {
return Arc::make_mut(self);
};
let cloned = Arc::make_mut(self);
cloned.persisted_as = OnceCell::new();
cloned.previous = [previous_cid].into_iter().collect();
cloned
}
pub(crate) async fn prepare_next_merge<'a>(
self: &'a mut Arc<Self>,
store: &impl BlockStore,
) -> Result<&'a mut Self> {
if self.previous.len() > 1 {
let cloned = Arc::make_mut(self);
cloned.persisted_as = OnceCell::new();
return Ok(cloned);
}
let previous_cid = self.store(store).await?;
let cloned = Arc::make_mut(self);
cloned.persisted_as = OnceCell::new();
cloned.previous = BTreeSet::from([previous_cid]);
Ok(cloned)
}
pub async fn set_content(
&mut self,
content: Vec<u8>,
time: DateTime<Utc>,
store: &impl BlockStore,
) -> Result<()> {
let content_cid = FileBuilder::new()
.content_bytes(content)
.build()?
.store(store)
.await?;
self.metadata.upsert_mtime(time);
self.userland = Link::from_cid(content_cid);
Ok(())
}
pub async fn get_raw_content_cid(&self, store: &impl BlockStore) -> Cid {
let content_cid: Result<Cid> = self.userland.resolve_cid(store).await;
content_cid.unwrap()
}
pub fn get_previous(&self) -> &BTreeSet<Cid> {
&self.previous
}
pub fn get_metadata(&self) -> &Metadata {
&self.metadata
}
pub fn get_metadata_mut(&mut self) -> &mut Metadata {
&mut self.metadata
}
pub fn get_metadata_mut_rc(self: &mut Arc<Self>) -> &mut Metadata {
self.prepare_next_revision().get_metadata_mut()
}
pub async fn merge(
self: &mut Arc<Self>,
other: &Arc<Self>,
store: &impl BlockStore,
) -> Result<bool> {
let our_cid = self.store(store).await?;
let other_cid = other.store(store).await?;
if our_cid == other_cid {
return Ok(false); }
let our_content_cid = self.userland.resolve_cid(store).await?;
let other_content_cid = other.userland.resolve_cid(store).await?;
let file = self.prepare_next_merge(store).await?;
if other.previous.len() > 1 {
file.previous.extend(other.previous.iter().cloned());
} else {
file.previous.insert(other.store(store).await?);
}
match our_content_cid
.hash()
.digest()
.cmp(other_content_cid.hash().digest())
{
Ordering::Greater => {
file.userland.clone_from(&other.userland);
file.metadata.clone_from(&other.metadata);
}
Ordering::Equal => {
file.metadata.tie_break_with(&other.metadata)?;
}
Ordering::Less => {
}
}
Ok(true)
}
}
impl std::fmt::Debug for PublicFile {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Render CIDs as display strings so the output stays readable.
        let persisted_as = OnceCellDebug(self.persisted_as.get().map(|cid| format!("{cid}")));
        let previous: Vec<String> = self.previous.iter().map(|cid| format!("{cid}")).collect();
        f.debug_struct("PublicFile")
            .field("persisted_as", &persisted_as)
            .field("metadata", &self.metadata)
            .field("userland", &self.userland)
            .field("previous", &previous)
            .finish()
    }
}
impl Storable for PublicFile {
    type Serializable = PublicNodeSerializable;

    // Converts this node to its on-disk form, resolving the userland
    // link to a plain CID and stamping the current WNFS version.
    async fn to_serializable(&self, store: &impl BlockStore) -> Result<Self::Serializable> {
        Ok(PublicNodeSerializable::File(PublicFileSerializable {
            version: WNFS_VERSION,
            metadata: self.metadata.clone(),
            userland: self.userland.resolve_cid(store).await?,
            previous: self.previous.iter().cloned().collect(),
        }))
    }

    // Reconstructs a file from its on-disk form, rejecting non-file
    // nodes and unreadable format versions. `cid`, when known, seeds
    // the `persisted_as` cache.
    async fn from_serializable(
        cid: Option<&Cid>,
        serializable: Self::Serializable,
    ) -> Result<Self> {
        let PublicNodeSerializable::File(serializable) = serializable else {
            // The only other public node variant is a directory.
            bail!(FsError::UnexpectedNodeType(NodeType::PublicDirectory));
        };
        if !is_readable_wnfs_version(&serializable.version) {
            bail!(FsError::UnexpectedVersion(serializable.version))
        }
        Ok(Self {
            persisted_as: cid.cloned().map(OnceCell::new_with).unwrap_or_default(),
            metadata: serializable.metadata,
            userland: Link::from_cid(serializable.userland),
            previous: serializable.previous.iter().cloned().collect(),
        })
    }

    // Exposes the CID cache so `store` can skip re-serializing a node
    // that was already persisted.
    fn persisted_as(&self) -> Option<&OnceCell<Cid>> {
        Some(&self.persisted_as)
    }
}
impl Id for PublicFile {
    fn get_id(&self) -> String {
        // The id is the in-memory address of the metadata field, so two
        // distinct (even cloned) instances get distinct ids.
        let metadata_ref = &self.metadata;
        format!("{metadata_ref:p}")
    }
}
impl PartialEq for PublicFile {
    /// Equality compares metadata, content link, and history; the
    /// `persisted_as` CID cache is deliberately ignored.
    fn eq(&self, other: &Self) -> bool {
        if self.metadata != other.metadata {
            return false;
        }
        if self.userland != other.userland {
            return false;
        }
        self.previous == other.previous
    }
}
impl Clone for PublicFile {
    fn clone(&self) -> Self {
        // Carry the cached CID (if any) over into a fresh cell; the
        // clone starts out equal to the original, so the cache is
        // still valid for it.
        let persisted_as = match self.persisted_as.get() {
            Some(cid) => OnceCell::new_with(cid.to_owned()),
            None => OnceCell::new(),
        };
        Self {
            persisted_as,
            metadata: self.metadata.clone(),
            userland: self.userland.clone(),
            previous: self.previous.clone(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use wnfs_common::MemoryBlockStore;

    #[async_std::test]
    async fn previous_links_get_set() {
        let store = &MemoryBlockStore::default();
        let file = &mut PublicFile::new_rc(Utc::now());

        // Persisting then revising should record the persisted CID as
        // the sole previous link.
        let old_cid = &file.store(store).await.unwrap();
        let revised = file.prepare_next_revision();
        let previous_links: Vec<_> = revised.previous.iter().collect();
        assert_eq!(previous_links, vec![old_cid]);
    }

    #[async_std::test]
    async fn prepare_next_revision_shortcuts_if_possible() {
        let store = &MemoryBlockStore::default();
        let file = &mut PublicFile::new_rc(Utc::now());

        let old_cid = &file.store(store).await.unwrap();
        let revised = file.prepare_next_revision();

        // Revising an unpersisted revision again must not advance the
        // history: the previous link stays the last *persisted* CID.
        let revised_again = &mut Arc::new(revised.clone());
        let latest = revised_again.prepare_next_revision();
        let previous_links: Vec<_> = latest.previous.iter().collect();
        assert_eq!(previous_links, vec![old_cid]);
    }
}
#[cfg(test)]
mod snapshot_tests {
    use super::*;
    use chrono::TimeZone;
    use testresult::TestResult;
    use wnfs_common::utils::SnapshotBlockStore;

    #[async_std::test]
    async fn test_simple_file() -> TestResult {
        let store = &SnapshotBlockStore::default();
        // Fixed timestamp (Unix epoch) keeps the snapshot stable.
        let epoch = Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap();

        let file = &mut PublicFile::new_rc(epoch);
        let cid = file.store(store).await?;

        let snapshot = store.get_block_snapshot(&cid).await?;
        insta::assert_json_snapshot!(snapshot);
        Ok(())
    }

    #[async_std::test]
    async fn test_file_with_previous_links() -> TestResult {
        let store = &SnapshotBlockStore::default();
        // Fixed timestamp (Unix epoch) keeps the snapshot stable.
        let epoch = Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap();

        // Persist once, then write content in a new revision so the
        // serialized node carries a previous link.
        let file = &mut PublicFile::new_rc(epoch);
        let _ = file.store(store).await?;
        let revised = file.prepare_next_revision();
        revised
            .set_content(b"Hello, World!".to_vec(), epoch, store)
            .await?;
        let cid = revised.store(store).await?;

        let snapshot = store.get_block_snapshot(&cid).await?;
        insta::assert_json_snapshot!(snapshot);
        Ok(())
    }
}