use {
crate::{
prolly::{
node::Node,
roller::{Config as RollerConfig, Roller},
},
storage::StorageWrite,
value::{Key, Value},
Addr, Error,
},
std::mem,
};
/// Builds a prolly tree bottom-up from a batch of key/value pairs, writing every
/// node it produces to the storage `S`.
pub struct CursorCreate<'s, S> {
    /// Lowest level of the tree under construction; branch levels are created
    /// above it lazily as node boundaries are found.
    leaf: Leaf<'s, S>,
}
impl<'s, S> CursorCreate<'s, S> {
    /// Creates a builder that uses `RollerConfig::default()` to find node boundaries.
    pub fn new(storage: &'s S) -> Self {
        Self::with_roller(storage, RollerConfig::default())
    }
    /// Creates a builder that uses `roller_config` to find node boundaries.
    pub fn with_roller(storage: &'s S, roller_config: RollerConfig) -> Self {
        Self {
            leaf: Leaf::new(storage, roller_config),
        }
    }
}
impl<'s, S> CursorCreate<'s, S>
where
    S: StorageWrite,
{
    /// Sorts `kvs`, writes them into a new tree and returns the address of the
    /// root node.
    ///
    /// NOTE(review): pairs are sorted as `(Key, Value)` tuples and duplicate keys
    /// are not removed, so a duplicated key ends up in the tree more than once —
    /// confirm callers never pass duplicates.
    ///
    /// # Errors
    /// Propagates errors from serializing pairs/nodes and from writing nodes to
    /// the storage.
    ///
    /// # Panics
    /// Panics if `kvs` is empty: the builder has no node to write in that case.
    pub async fn with_kvs(mut self, mut kvs: Vec<(Key, Value)>) -> Result<Addr, Error> {
        // Fail fast with a clear message instead of hitting the "missing parent and
        // has empty buffer" `unreachable!` inside `Leaf::flush`.
        assert!(
            !kvs.is_empty(),
            "CursorCreate::with_kvs requires at least one key/value pair"
        );
        kvs.sort_unstable();
        for kv in kvs {
            self.leaf.push(kv).await?;
        }
        self.leaf.flush().await
    }
}
struct Leaf<'s, S> {
storage: &'s S,
roller_config: RollerConfig,
roller: Roller,
buffer: Vec<(Key, Value)>,
parent: Option<Branch<'s, S>>,
}
impl<'s, S> Leaf<'s, S> {
pub fn new(storage: &'s S, roller_config: RollerConfig) -> Self {
Self {
storage,
roller_config,
roller: Roller::with_config(roller_config),
buffer: Vec::new(),
parent: None,
}
}
}
impl<'s, S> Leaf<'s, S>
where
S: StorageWrite,
{
pub async fn flush(&mut self) -> Result<Addr, Error> {
if self.buffer.is_empty() {
match self.parent.take() {
None => unreachable!("CursorCreate leaf missing parent and has empty buffer"),
Some(mut parent) => parent.flush(None).await,
}
} else {
let kvs = mem::replace(&mut self.buffer, Vec::new());
let (node_key, node_addr, node_bytes) = {
let node = Node::<_, _, Addr>::Leaf(kvs);
let (node_addr, node_bytes) = node.as_bytes()?;
(node.into_key_unchecked(), node_addr, node_bytes)
};
self.storage.write(node_addr.clone(), &*node_bytes).await?;
match self.parent.take() {
None => Ok(node_addr),
Some(mut parent) => parent.flush(Some((node_key, node_addr))).await,
}
}
}
pub async fn push(&mut self, kv: (Key, Value)) -> Result<(), Error> {
let boundary = self.roller.roll_bytes(&crate::value::serialize(&kv)?);
self.buffer.push(kv);
if boundary {
let is_first_kv = self.buffer.is_empty() && self.parent.is_none();
if is_first_kv {
log::warn!(
"writing key & value that exceeds block size, this is highly inefficient"
);
}
let (node_key, node_addr) = {
let kvs = mem::replace(&mut self.buffer, Vec::new());
let node = Node::<_, _, Addr>::Leaf(kvs);
let (node_addr, node_bytes) = node.as_bytes()?;
self.storage.write(node_addr.clone(), &*node_bytes).await?;
(node.into_key_unchecked(), node_addr)
};
let storage = &self.storage;
let roller_config = &self.roller_config;
self.parent
.get_or_insert_with(|| Branch::new(storage, *roller_config))
.push((node_key, node_addr.into()))
.await?;
}
Ok(())
}
}
struct Branch<'s, S> {
storage: &'s S,
roller_config: RollerConfig,
roller: Roller,
buffer: Vec<(Key, Addr)>,
parent: Option<Box<Branch<'s, S>>>,
}
impl<'s, S> Branch<'s, S> {
pub fn new(storage: &'s S, roller_config: RollerConfig) -> Self {
Self {
storage,
roller_config,
roller: Roller::with_config(roller_config.clone()),
buffer: Vec::new(),
parent: None,
}
}
}
impl<'s, S> Branch<'s, S>
where
S: StorageWrite,
{
#[async_recursion::async_recursion]
pub async fn flush(&mut self, kv: Option<(Key, Addr)>) -> Result<Addr, Error> {
if let Some(kv) = kv {
self.buffer.push(kv);
}
if self.buffer.is_empty() {
match self.parent.take() {
None => unreachable!("CursorCreate branch missing parent and has empty buffer"),
Some(mut parent) => parent.flush(None).await,
}
} else {
let kvs = mem::replace(&mut self.buffer, Vec::new());
let (node_key, node_addr, node_bytes) = {
let node = Node::<_, Value, _>::Branch(kvs);
let (node_addr, node_bytes) = node.as_bytes()?;
(node.into_key_unchecked(), node_addr, node_bytes)
};
self.storage.write(node_addr.clone(), &*node_bytes).await?;
match self.parent.take() {
None => Ok(node_addr),
Some(mut parent) => parent.flush(Some((node_key, node_addr))).await,
}
}
}
#[async_recursion::async_recursion]
pub async fn push(&mut self, kv: (Key, Addr)) -> Result<(), Error> {
let boundary = self.roller.roll_bytes(&crate::value::serialize(&kv)?);
self.buffer.push(kv);
if boundary {
let first_kv = self.buffer.is_empty() && self.parent.is_none();
if first_kv {
log::warn!(
"writing key & value that exceeds block size, this is highly inefficient"
);
}
let (node_key, node_addr) = {
let kvs = mem::replace(&mut self.buffer, Vec::new());
let node = Node::<_, Value, _>::Branch(kvs);
let (node_addr, node_bytes) = node.as_bytes()?;
self.storage.write(node_addr.clone(), &*node_bytes).await?;
(node.into_key_unchecked(), node_addr)
};
let storage = &self.storage;
let roller_config = &self.roller_config;
self.parent
.get_or_insert_with(|| Box::new(Branch::new(storage, roller_config.clone())))
.push((node_key, node_addr.into()))
.await?;
}
Ok(())
}
}
#[cfg(test)]
pub mod test {
    use {super::*, crate::storage::Memory};
    const TEST_PATTERN: u32 = (1 << 8) - 1;
    /// Builds trees of several sizes and checks that building the same content
    /// twice (into fresh storages) yields the same root address.
    #[tokio::test]
    async fn poc() {
        let mut env_builder = env_logger::builder();
        env_builder.is_test(true);
        if std::env::var("RUST_LOG").is_err() {
            env_builder.filter(Some("fixity"), log::LevelFilter::Debug);
        }
        let _ = env_builder.try_init();
        let contents = vec![(0..20), (0..200), (0..2_000)];
        for range in contents {
            let make_content = || {
                range
                    .clone()
                    .map(|i| (i, i * 10))
                    .map(|(k, v)| (Key::from(k), Value::from(v)))
                    .collect::<Vec<_>>()
            };
            let first_storage = Memory::new();
            let first = CursorCreate::with_roller(
                &first_storage,
                RollerConfig::with_pattern(TEST_PATTERN),
            )
            .with_kvs(make_content())
            .await
            .unwrap();
            let second_storage = Memory::new();
            let second = CursorCreate::with_roller(
                &second_storage,
                RollerConfig::with_pattern(TEST_PATTERN),
            )
            .with_kvs(make_content())
            .await
            .unwrap();
            // Compared through `Debug`, the only trait the previous `dbg!` relied on.
            assert_eq!(format!("{:?}", first), format!("{:?}", second));
        }
    }
}