use super::{store, Action, RawAction};
use crate::{
fields::{
depth::Incremental, Collection, Intent, List, Load, LocalField, SparseField, Store,
Strategy, Value,
},
index::{reader, writer, FieldReader, FieldWriter},
object::{self, ObjectError, StreamChunks},
ChunkPointer, Digest,
};
use itertools::Itertools;
use parking_lot::RwLock;
use scc::ebr::{Arc, AtomicArc, Barrier, Tag};
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::BTreeMap,
io,
ops::{Bound, RangeBounds},
sync::atomic::Ordering,
};
/// Outcome of testing a stream entry against a requested index range.
/// NOTE(review): not referenced in the visible portion of this file —
/// presumably consumed by the range-iteration logic of `StreamReader`
/// defined elsewhere; confirm before changing variants.
enum RangeAction {
    /// Entry falls before/outside the range; keep scanning.
    Skip,
    /// Entry falls inside the range; yield it.
    Take,
    /// Entry falls past the range's end; stop iterating.
    End,
}
/// Shared state behind a [`Stream`]: two generations of an ordered map from
/// a user-supplied `Index` to chunk lists.
struct StreamInner<Index: 'static + Ord + Sync + Send + Clone> {
    /// Entries written since the last `commit()`; merged into `base` on commit.
    current: RwLock<BTreeMap<Index, Arc<StreamChunks>>>,
    /// Committed entries (also where restored records are inserted on load).
    base: RwLock<BTreeMap<Index, Arc<StreamChunks>>>,
    /// Highest index — NOTE(review): never read or written in the visible
    /// code; presumably maintained by StreamWriter elsewhere. Confirm.
    max: AtomicArc<Index>,
    /// Lowest index — NOTE(review): same caveat as `max`.
    min: AtomicArc<Index>,
}
impl<Index> Default for StreamInner<Index>
where
Index: 'static + Ord + Send + Sync + Clone,
{
fn default() -> Self {
Self {
current: RwLock::default(),
base: RwLock::default(),
max: AtomicArc::default(),
min: AtomicArc::default(),
}
}
}
/// An ordered, chunked data stream keyed by `Index`.
///
/// Cloning is cheap: every clone shares the same [`StreamInner`] through
/// the `Arc`, so all handles observe the same pending/committed state.
#[derive(Clone)]
pub struct Stream<Index>
where
    Index: 'static + Ord + Send + Sync + Clone,
{
    inner: Arc<StreamInner<Index>>,
}
impl<Index> Default for Stream<Index>
where
Index: 'static + Ord + Send + Sync + Clone,
{
fn default() -> Self {
Self {
inner: Arc::new(StreamInner::default()),
}
}
}
impl<Index: Send + Sync + Ord + Clone> Stream<Index> {
    /// Moves all uncommitted entries from `current` into `base`.
    ///
    /// Both locks are taken exactly once. `BTreeMap::append` moves every
    /// entry out of `current` (leaving it empty) and overwrites duplicate
    /// keys in `base` — the same end state as the previous per-entry
    /// clone-and-insert loop, without cloning or re-locking per entry.
    /// The unused ebr `Barrier` the loop constructed has been removed.
    pub fn commit(&self) {
        let mut current = self.inner.current.write();
        self.inner.base.write().append(&mut current);
    }

    /// Returns a writer that appends chunks to this stream through `writer`.
    ///
    /// Fix: the parameter was declared as `index: W` while the body
    /// initialized the struct with the undeclared name `writer`, which
    /// cannot compile; the parameter is now named `writer`.
    pub fn writer<W: object::Writer>(&self, writer: W) -> StreamWriter<'_, Index, W> {
        StreamWriter {
            stream: &self.inner,
            barrier: Barrier::new(),
            writer,
        }
    }

    /// Returns a reader over this stream backed by a freshly allocated
    /// `BLOCK_SIZE` scratch buffer.
    pub fn reader<R: object::Reader>(&self, reader: R) -> StreamReader<'_, Index, R, Box<[u8]>> {
        self.reader_with_buffer(reader, vec![0; crate::BLOCK_SIZE].into_boxed_slice())
    }

    /// Returns a reader over this stream that reuses the caller-supplied
    /// `buffer` as scratch space, avoiding a per-reader allocation.
    pub fn reader_with_buffer<R: object::Reader, B: AsRef<[u8]> + AsMut<[u8]> + 'static>(
        &self,
        reader: R,
        buffer: B,
    ) -> StreamReader<'_, Index, R, B> {
        StreamReader {
            stream: &self.inner,
            barrier: Barrier::new(),
            reader,
            buffer,
        }
    }

    /// Discards all entries, both pending (`current`) and committed (`base`).
    pub fn clear(&self) {
        self.inner.current.write().clear();
        self.inner.base.write().clear();
    }
}
impl<Idx> crate::Index for Stream<Idx>
where
    Idx: 'static + Ord + Send + Sync + Clone + DeserializeOwned + Serialize,
{
    /// The stream persists itself as a single "root" field.
    fn store_all(&mut self) -> anyhow::Result<Vec<Intent<Box<dyn Store>>>> {
        let field: Box<dyn Store> = Box::new(LocalField::for_field(self));
        Ok(vec![Intent::new("root", field)])
    }

    /// Loading mirrors storing: one "root" field restores the stream.
    fn load_all(&mut self) -> anyhow::Result<Vec<Intent<Box<dyn Load>>>> {
        let field: Box<dyn Load> = Box::new(LocalField::for_field(self));
        Ok(vec![Intent::new("root", field)])
    }
}
impl<Index> Store for LocalField<Stream<Index>>
where
    Index: 'static + Ord + Send + Sync + Clone + Serialize,
{
    /// Writes every pending `(index, chunk pointers)` pair into the index
    /// transaction, then folds the pending entries into the committed base.
    ///
    /// Fix: removed an unused ebr `Barrier` that was constructed and never
    /// used — it guarded nothing and was pure dead code.
    #[inline(always)]
    fn store(
        &mut self,
        transaction: &mut writer::Transaction<'_>,
        _object: &mut dyn object::Writer,
    ) {
        // The read guard is dropped when the loop ends, before commit()
        // takes the write lock on the same map.
        for (k, v) in self.field.inner.current.read().iter() {
            transaction.write_next((k, v.read().to_vec()));
        }

        // Mark the just-stored entries as committed.
        self.field.commit();
    }
}
impl<Index> Collection for LocalField<Stream<Index>>
where
    Index: 'static + Ord + Send + Sync + Clone + DeserializeOwned,
{
    type Depth = Incremental;
    type Key = Index;
    type Serialized = (Index, Vec<ChunkPointer>);
    type Item = (Index, Vec<ChunkPointer>);

    /// The key of a serialized record is its index component.
    #[inline(always)]
    fn key(from: &Self::Serialized) -> &Self::Key {
        let (index, _) = from;
        index
    }

    /// Records are stored fully inline; no object-store access is needed.
    #[inline(always)]
    fn load(from: Self::Serialized, _object: &mut dyn crate::object::Reader) -> Self::Item {
        from
    }

    /// Restored records land directly in the committed `base` map.
    #[inline(always)]
    fn insert(&mut self, record: Self::Item) {
        let (index, chunks) = record;
        self.field
            .inner
            .base
            .write()
            .insert(index, Arc::new(RwLock::new(chunks)));
    }
}
/// Returns the largest input length whose worst-case LZ4-compressed size
/// still fits in `output_len` bytes.
///
/// Inverts an `input * 1.1 + 16 + 4` worst-case estimate — NOTE(review):
/// confirm the 16/4 overhead constants against the framing/header code
/// elsewhere in the crate.
///
/// Fix: the original computed `output_len - 16 - 4`, which underflows for
/// `output_len < 20` (panic in debug builds, wrap to a huge value in
/// release builds). `saturating_sub` makes tiny outputs yield 0 instead.
fn lz4_max_input_size_for(output_len: usize) -> usize {
    (output_len.saturating_sub(16 + 4) as f64 / 1.1) as usize
}
#[cfg(test)]
mod test {
    /// Smoke test: write 8 MiB through a stream writer, then open a reader.
    ///
    /// Fix: renamed from a profane placeholder (`faszom`) and silenced the
    /// unused-binding warning on the reader. Read-back verification is
    /// still TODO.
    #[test]
    fn stream_write_then_open_reader() {
        use crate::{backends::Directory, fields::Stream, Infinitree, Key};
        use std::io::Write;

        let tree = Infinitree::<Stream<usize>>::empty(
            Directory::new("test_data").unwrap(),
            Key::from_credentials("username", "password").unwrap(),
        );

        let index = tree.index();
        let mut writer = index.writer(tree.object_writer().unwrap());

        let buffer = vec![123; 8 * 1024 * 1024];
        assert_eq!(writer.write(&buffer).unwrap(), 8 * 1024 * 1024);
        drop(writer);

        // Opening a reader must not panic; actual read-back is not yet asserted.
        let _reader = index.reader(tree.object_reader().unwrap());
    }
}