use crate::{
backends::{Backend, BackendError},
chunks::RawChunkPointer,
crypto::{CryptoError, Key, SealedHeader},
deserialize_from_slice,
index::{FieldReader, IndexExt, TransactionList},
object::{
AEADReader, AEADWriter, BlockBuffer, BufferedSink, DeserializeStream, ObjectId, Pool,
PoolRef, Stream, Writer,
},
tree::RootIndex,
};
use serde::{de::DeserializeOwned, Serialize};
use std::{
io,
mem::{size_of, take},
num::NonZeroUsize,
sync::Arc,
};
/// Errors that can be returned while opening or committing the root index.
///
/// Every variant wraps exactly one underlying error as its `source` field.
/// Because each field is marked `#[from]`, the corresponding `From`
/// conversion is generated, which is what lets the functions in this module
/// propagate these error types with `?`.
#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
/// An I/O operation failed.
#[error("IO error")]
Io {
#[from]
source: io::Error,
},
/// A cryptographic operation reported a [`CryptoError`].
#[error("Crypto error")]
Crypto {
#[from]
source: CryptoError,
},
/// The storage backend reported a [`BackendError`].
#[error("Backend error: {source}")]
Backend {
#[from]
source: BackendError,
},
/// MessagePack decoding through `rmp_serde` failed.
#[error("Decoding error: {source}")]
Decode {
#[from]
source: rmp_serde::decode::Error,
},
/// MessagePack encoding through `rmp_serde` failed.
#[error("Encoding error: {source}")]
Encode {
#[from]
source: rmp_serde::encode::Error,
},
/// The object layer reported a `crate::object::ObjectError`.
#[error("Internal error: {source}")]
Internal {
#[from]
source: crate::object::ObjectError,
},
/// Any other failure that was surfaced as an `anyhow::Error`.
#[error("anyhow")]
Anyhow {
#[from]
source: anyhow::Error,
},
}
/// Shorthand for a `std::result::Result` whose error type is this module's [`Error`].
pub(crate) type Result<T> = std::result::Result<T, Error>;
/// Reads the root object from `backend` and reconstructs the [`RootIndex`] from it.
///
/// The steps, in order:
///
/// 1. Ask `crypto` for the root object id and fetch that object with
///    `Backend::read_fresh`.
/// 2. Copy the first `size_of::<SealedHeader>()` bytes of the object and open
///    them with `crypto.open_root`.
/// 3. Build a reader pool whose constructor creates root `AEADReader`s keyed
///    with the header's index key.
/// 4. Decode the transaction stream descriptor referenced by the header's
///    root pointer, then read one `TransactionList` from that stream.
/// 5. Create the index, load every listed transaction into it, and finally
///    call `preload` and `keep_warm` on the backend for the index's objects.
///
/// `buffer` is first lent to the descriptor parser as scratch space and then
/// moved into the stream reader.
///
/// # Errors
///
/// Returns the first error produced by any of the steps above.
///
/// # Panics
///
/// `copy_from_slice` panics if `head` returns a slice whose length is not
/// exactly `size_of::<SealedHeader>()`.
pub(crate) fn open<CustomData>(
    mut buffer: BlockBuffer,
    backend: Arc<dyn Backend>,
    crypto: Key,
) -> Result<RootIndex<CustomData>>
where
    CustomData: Serialize + DeserializeOwned + Send + Sync + 'static,
{
    const HEADER_LEN: usize = size_of::<SealedHeader>();

    let root_id = crypto.root_object_id()?;
    let root_object = backend.read_fresh(&root_id)?;

    // The sealed header occupies the head of the root object.
    let mut sealed = [0u8; HEADER_LEN];
    sealed.copy_from_slice(root_object.head(HEADER_LEN));
    let header = crypto.open_root(sealed.into())?;

    let reader_backend = backend.clone();
    let index_key = header.key.index_key()?;
    let pool = Pool::with_constructor(1, move || {
        AEADReader::for_root(reader_backend.clone(), index_key.clone())
    });

    let (shadow_root, stream_ptr) = parse_transactions_stream(
        &header.root_ptr,
        root_id,
        root_object.as_inner(),
        &mut buffer,
        pool.lease()?,
    )?;
    let stream_objects = stream_ptr.objects();

    // Keep the stream in its own scope: it owns a reader leased from `pool`,
    // and that lease has to be dropped before the index starts loading.
    let transaction_list = {
        let mut stream =
            DeserializeStream::new(stream_ptr.open_with_buffer(pool.lease()?, buffer));
        stream.read_next::<TransactionList>()?
    };

    let mut index = RootIndex::<CustomData>::new(shadow_root, stream_objects, header.key);
    index.load_all_from(&transaction_list, &pool)?;

    let index_objects = index.objects();
    backend.preload(&index_objects)?;
    backend.keep_warm(&index_objects)?;

    Ok(index)
}
/// Writes the current state of `index` to `backend` and updates the root object head.
///
/// The steps, in order:
///
/// 1. Derive the root object id and the index key from a clone of `index.key`.
/// 2. Create a root `AEADWriter` pool. The writer is handed the object list
///    currently stored in `index.objects`, which is taken out and leaves the
///    default value behind.
/// 3. Commit the index fields through a `BufferedSink`, serialize the
///    resulting `(commit_id, field, stream)` entries as a `TransactionList`
///    into the same sink, and close the sink with `clear` to obtain the stream.
/// 4. Call `keep_warm` for the stream's objects and store them back into
///    `index.objects`.
/// 5. Serialize the stream descriptor, write it, and record the object of the
///    resulting pointer in `index.shadow_root`.
/// 6. Seal that pointer with the key and flush it as the head of the root object.
///
/// # Errors
///
/// Returns the first error produced by any step. If that happens after the
/// writer was created but before step 4 completes, `index.objects` is left
/// holding the default value it was replaced with in step 2.
pub(crate) fn commit<CustomData>(
    index: &RootIndex<CustomData>,
    backend: Arc<dyn Backend>,
) -> Result<()>
where
    CustomData: Serialize + DeserializeOwned + Send + Sync + 'static,
{
    let crypto = index.key.clone();
    let root = crypto.root_object_id()?;
    let index_key = crypto.index_key()?;

    // `1` is a non-zero literal, so this `unwrap` can never fail.
    let pool_size = NonZeroUsize::new(1).unwrap();
    let mut writer = Pool::new(
        pool_size,
        AEADWriter::for_root(
            backend.clone(),
            index_key.clone(),
            size_of::<SealedHeader>() as u64,
            take(&mut index.objects.write()),
        ),
    )?;

    let stream = {
        let mut sink = BufferedSink::new(writer.clone());
        let (commit_id, fields) = index.commit(&mut sink, &mut writer, Vec::new(), index_key)?;

        // Tag every committed field stream with the id of this commit.
        let transactions: TransactionList = fields
            .into_iter()
            .map(|(field, field_stream)| (commit_id, field, field_stream))
            .collect();

        crate::serialize_to_writer(&mut sink, &transactions)?;
        sink.clear()?
    };

    let objects_written = stream.objects();
    backend.keep_warm(&objects_written)?;
    *index.objects.write() = objects_written;

    let stream_buf = crate::serialize_to_vec(&stream)?;
    let root_ptr = writer.write(&stream_buf)?.into_raw();
    *index.shadow_root.write() = root_ptr.object;

    writer
        .lease()?
        .flush_root_head(root, &crypto.seal_root(&root_ptr)?)?;
    Ok(())
}
/// Decodes the transaction stream descriptor embedded in the root object.
///
/// * `root_ptr` - pointer to the descriptor; its `object` field is returned
///   as the shadow root id.
/// * `root_object` - id passed to `override_root_id` together with the shadow
///   root id before decrypting.
/// * `raw` - bytes handed to `decrypt_decompress` as its input.
/// * `buffer` - scratch space handed to `decrypt_decompress`.
/// * `reader` - leased reader used for decryption; it is dropped when this
///   function returns.
///
/// Returns the shadow root id and the deserialized [`Stream`].
///
/// # Errors
///
/// Fails if decryption/decompression or deserialization fails.
fn parse_transactions_stream(
    root_ptr: &RawChunkPointer,
    root_object: ObjectId,
    raw: &[u8],
    buffer: &mut [u8],
    mut reader: PoolRef<AEADReader>,
) -> Result<(ObjectId, Stream)> {
    let shadow_root = root_ptr.object;
    reader.override_root_id(shadow_root, root_object);

    let plaintext = reader.decrypt_decompress(buffer, raw, &root_ptr.clone().into())?;
    let stream: Stream = deserialize_from_slice(plaintext)?;

    Ok((shadow_root, stream))
}