use core::sync::atomic;
use std::{
collections::VecDeque,
io,
sync::{Arc, Mutex},
};
use crate::{block, Blocks};
/// Source of [`Blocks`] storage for a [`Stream`].
///
/// # Safety
///
/// This trait is `unsafe` to implement; implementors must uphold the
/// allocator contract [`Stream`] relies on. NOTE(review): the exact
/// invariants are not visible in this file — document them here.
pub unsafe trait BlocksAllocator {
    /// Concrete blocks type handed out by this allocator.
    type Blocks: Blocks;
    /// Allocates a fresh set of blocks.
    fn alloc(&self) -> io::Result<Self::Blocks>;
    /// Returns blocks to the allocator; on failure the blocks are handed
    /// back to the caller together with the error.
    fn release(&self, blocks: Self::Blocks) -> Result<(), (Self::Blocks, io::Error)>;
    /// Invokes `f` for each previously allocated set of blocks so a
    /// stream can be reconstructed (see [`Stream::load`]).
    fn retrieve(&self, f: impl FnMut(Self::Blocks)) -> io::Result<()>;
}
/// An append-only stream backed by a rotating set of fixed-capacity
/// block streams obtained from a [`BlocksAllocator`].
#[doc(alias = "endlessstream")]
#[derive(Debug)]
pub struct Stream<A: BlocksAllocator> {
    // Global read offset in bytes; only ever grows (see `advance`).
    offset: atomic::AtomicUsize,
    // Versioned view of the block streams: readers pin a snapshot while
    // writers publish a new version (two buffered versions).
    streams: vlock::VLock<BlockStreams<A::Blocks>, 2>,
    // Streams removed by `maybe_shrink`, parked until their last reader
    // drops so the blocks can be handed back to the allocator.
    releasables: Mutex<Vec<Option<Releasable<A::Blocks>>>>,
    allocator: A,
    // Whether appends may span two block streams.
    span_behavior: SpanBehavior,
}
impl<A: BlocksAllocator> Stream<A> {
    /// Creates a stream that never spans a single append across two
    /// block streams ([`SpanBehavior::Never`]).
    #[inline(always)]
    #[must_use]
    pub fn new(allocator: A) -> Self {
        Self::with_span_behavior(allocator, SpanBehavior::Never)
    }
    /// Creates a stream with an explicit [`SpanBehavior`].
    #[inline(always)]
    #[must_use]
    pub fn with_span_behavior(allocator: A, span_behavior: SpanBehavior) -> Self {
        Self {
            offset: 0.into(),
            streams: BlockStreams::new().into(),
            // Pre-sized so `maybe_shrink` can assert it never reallocates.
            releasables: Vec::with_capacity(128).into(),
            allocator,
            span_behavior,
        }
    }
    /// Discards current state and rebuilds it from the blocks the
    /// allocator can `retrieve`, returning stats for the loaded streams.
    /// The read offset is reset to 0.
    pub fn load(&mut self) -> io::Result<Stats> {
        // Reset first so a failed load leaves the stream empty, not stale.
        self.streams = BlockStreams::new().into();
        let mut streams = VecDeque::new();
        self.allocator.retrieve(|blocks| {
            streams.push_back(block::Stream::new(blocks));
        })?;
        for stream in &mut streams {
            stream.load()?;
            stream.initialize().map_err(StreamError::BlockStreamError)?;
        }
        // `try_from` validates that spilled/trailing data lines up
        // between adjacent streams.
        self.streams = BlockStreams::try_from(streams)
            .map_err(|_| StreamError::BrokenSpan)?
            .into();
        *self.offset.get_mut() = 0;
        Ok(self.streams.get_mut().stats(0))
    }
    /// Positions the read offset via the `compare` search callback (see
    /// [`SearchControl`]). The offset only moves forward; returns
    /// whether it was set.
    #[must_use = "are you sure the offset has been set?"]
    pub fn set_offset(&mut self, compare: impl Fn(&[u8], &[u8]) -> SearchControl) -> bool {
        let previous = self.offset.get_mut();
        if let Ok(offset) = self.streams.get_mut().offset_at(compare) {
            if *previous <= offset {
                *previous = offset;
                return true;
            }
        }
        false
    }
    /// Locks the stream a writer would use next, so a version swap does
    /// not race an in-flight append while publishing a new version.
    #[inline(always)]
    fn lock_buffers(
        streams: &BlockStreams<A::Blocks>,
    ) -> Option<block::LockedStream<'_, A::Blocks>> {
        streams
            .pick_ending()
            .zip(streams.pick_empty())
            // If the ending stream is itself the first empty one, the
            // stream to lock is the one after it.
            .map(|(available, empty)| if available == empty { empty + 1 } else { empty })
            .and_then(|index| streams.get(index))
            .map(block::Stream::lock)
    }
    /// Consumes the stream and returns the allocator.
    #[inline(always)]
    #[must_use = "do you want to drop instead?"]
    pub fn into_inner(self) -> A {
        self.allocator
    }
    /// Snapshots shared handles to all current block streams, e.g. for
    /// out-of-band verification.
    #[inline(always)]
    #[must_use]
    pub fn block_streams(&self) -> Vec<ProxyBlockStream<A::Blocks>> {
        self.streams
            .read()
            .streams
            .iter()
            .map(|stream| ProxyBlockStream(Arc::clone(stream)))
            .collect()
    }
    /// Returns usage statistics relative to the current read offset.
    #[inline(always)]
    #[must_use]
    pub fn stats(&self) -> Stats {
        let offset = self.offset.load(atomic::Ordering::Relaxed);
        let streams = self.streams.read();
        streams.stats(offset)
    }
    /// Returns an iterator over chunks starting at the read offset.
    #[inline(always)]
    pub fn iter(&self) -> Iter<'_, A::Blocks> {
        Iter::new(&self.offset, &self.streams)
    }
    /// Advances the read offset to the position captured in `ctx`.
    /// Returns `false` if another advance already moved at least as far.
    ///
    /// # Panics
    ///
    /// Panics if `ctx` points past the end of the stream.
    pub fn advance(&self, ctx: AdvanceContext) -> bool {
        let streams = self.streams.read();
        assert!(
            ctx.0 <= streams.len().1 + streams.removed().1,
            "advance past the end of the stream"
        );
        // `fetch_max` keeps the offset monotonic under concurrent calls.
        self.offset.fetch_max(ctx.0, atomic::Ordering::Relaxed) < ctx.0
    }
    /// Appends `bytes`, possibly spanning two block streams depending on
    /// [`SpanBehavior`]. Returns a context that syncs the dirtied
    /// streams (explicitly via [`AppendContext::sync`], or on drop).
    pub fn append(&self, bytes: &[u8]) -> Result<AppendContext<'_, A::Blocks>, StreamError> {
        block::Stream::<A::Blocks>::verify_append(bytes).map_err(StreamError::BlockStreamError)?;
        let streams = self.streams.read();
        let index = streams.pick_ending().ok_or(StreamError::Unavailable)?;
        if bytes.is_empty() {
            // Nothing to write: a context with no dirty streams.
            return Ok(AppendContext::new(&self.streams));
        }
        // A previous (possibly spanned) append into the prior stream
        // must be fully synced before new data may follow it.
        if streams
            .get(index.saturating_sub(1))
            .is_some_and(block::Stream::is_dirty)
        {
            return Err(StreamError::Dirty);
        }
        macro_rules! acquire_stream {
            ($index:expr) => {{
                let stream = streams.get($index).ok_or(StreamError::Unavailable)?;
                stream.try_lock().map_err(StreamError::BlockStreamError)?
            }};
        }
        let stream = acquire_stream!(index);
        if stream.is_dirty() {
            return Err(StreamError::Dirty);
        }
        let remaining = stream.capacity() - stream.len();
        if bytes.len() <= remaining {
            // Fast path: fits entirely in the current ending stream.
            let written = block::LockedStream::append(&stream, bytes)
                .expect("input is checked prior to writing");
            assert_eq!(written, bytes.len());
            return Ok(AppendContext::new(&self.streams).with_left(index));
        }
        if stream.is_empty() {
            // Does not fit even in a completely empty stream.
            return Err(StreamError::AppendTooLarge(stream.capacity()));
        }
        let empty = acquire_stream!(index + 1);
        assert!(empty.is_empty());
        if bytes.len() > empty.capacity() {
            return Err(StreamError::AppendTooLarge(empty.capacity()));
        }
        match self.span_behavior {
            SpanBehavior::Never => {
                // Leave the current stream short; write the whole append
                // into the next (empty) stream.
                let written = block::LockedStream::append(&empty, bytes)
                    .expect("input is checked prior to writing");
                assert_eq!(written, bytes.len());
                Ok(AppendContext::new(&self.streams).with_right(index + 1))
            }
            SpanBehavior::Sized(limit) if bytes.len() >= limit => {
                // At or above the span limit: same as `Never`.
                let written = block::LockedStream::append(&empty, bytes)
                    .expect("input is checked prior to writing");
                assert_eq!(written, bytes.len());
                Ok(AppendContext::new(&self.streams).with_right(index + 1))
            }
            SpanBehavior::Sized(_) => {
                // Span: fill the current stream to capacity, spill the
                // rest into the next one.
                let written = block::LockedStream::append(&stream, bytes)
                    .expect("input is checked prior to writing");
                assert_eq!(written, remaining);
                let trailing = &bytes[..remaining];
                let spilled = &bytes[remaining..];
                let written =
                    block::LockedStream::append_with_opts(&empty, spilled, true)
                        .expect("input is checked prior to writing");
                assert_eq!(written, spilled.len());
                // Publish the contiguous trailing+spilled bytes so
                // readers see the spanned record as one buffer.
                assert!(
                    unsafe { streams.set_buffer(index, trailing, spilled) },
                    "new buffer write"
                );
                Ok(AppendContext::new(&self.streams)
                    .with_left(index)
                    .with_right(index + 1))
            }
        }
    }
    /// Allocates one more block stream and publishes it to readers.
    pub fn grow(&self) -> io::Result<()> {
        let mut result = Ok(());
        // Built in the predicate closure, consumed in the update
        // closure; the `RefCell` bridges the two.
        let stream = core::cell::RefCell::new(None);
        self.streams.compare_update_default(
            |_| {
                let blocks = match self.allocator.alloc() {
                    Ok(blocks) => blocks,
                    Err(err) => {
                        result = Err(err);
                        return false;
                    }
                };
                // `get_or_insert` keeps the first allocation if the
                // predicate runs more than once.
                let mut stream = stream.borrow_mut();
                let stream = stream.get_or_insert(block::Stream::new(blocks));
                stream
                    .initialize()
                    .expect("empty stream should always initialize");
                assert!(stream.is_empty());
                true
            },
            |current, streams| {
                // Hold the writer-side lock while cloning so an
                // in-flight append cannot be lost in the version swap.
                let _locked = Self::lock_buffers(current);
                streams.clone_from(current);
                streams
                    .append(Arc::new(stream.borrow_mut().take().unwrap()))
                    .expect("appending an empty stream should always succeed");
            },
        );
        result
    }
    /// Removes block streams entirely behind the read offset, parking
    /// them in `releasables` until readers let go. Returns
    /// `(streams_removed, bytes_removed)`.
    pub fn maybe_shrink(&self) -> (usize, usize) {
        let offset = self.offset.load(atomic::Ordering::Relaxed);
        let streams = self.streams.read();
        let (index, _) = streams
            .stream_at(offset)
            .expect("offset should always point to a valid stream");
        if index == streams.removed().0 {
            // Offset is still inside the oldest stream; nothing to drop.
            return (0, 0);
        }
        let mut result = (0, 0);
        let mut release = Vec::with_capacity(index - streams.removed().0);
        let updated = self.streams.compare_update_default(
            |current| index != current.removed().0,
            |current, streams| {
                let _locked = Self::lock_buffers(current);
                streams.clone_from(current);
                let removed = streams.removed();
                for _ in removed.0..index {
                    release.push(streams.remove());
                }
                result = (
                    streams.removed().0 - removed.0,
                    streams.removed().1 - removed.1,
                );
            },
        );
        if !updated {
            return (0, 0);
        }
        let mut releasables = self.releasables.lock().expect("should not poison");
        let capacity = releasables.capacity();
        for stream in &mut release {
            let stream = stream
                .take()
                .expect("index range should always include valid streams");
            // A stream must never be queued for release twice.
            assert!(
                !releasables.iter().any(|item| match item.as_ref() {
                    Some(Releasable::Stream(item)) => Arc::as_ptr(item) == Arc::as_ptr(&stream),
                    _ => false,
                }),
                "removed block streams are supposed to be unique"
            );
            releasables.push(Some(Releasable::Stream(stream)));
        }
        // Guards the `with_capacity(128)` sizing promise made in the
        // constructor.
        debug_assert_eq!(
            releasables.capacity(),
            capacity,
            "too many pending releasables"
        );
        result
    }
    /// Hands the blocks of no-longer-referenced removed streams back to
    /// the allocator. Returns `(released, still_pending, first_error)`;
    /// releasing stops at the first error.
    pub fn try_release(&self) -> (usize, usize, Option<io::Error>) {
        let mut releasables = self.releasables.lock().expect("should not poison");
        // Pass 1: unwrap streams whose only remaining reference is ours.
        for releasable in releasables.iter_mut() {
            if let Some(Releasable::Stream(ref stream)) = releasable {
                if Arc::strong_count(stream) != 1 {
                    continue;
                }
                if let Some(Releasable::Stream(stream)) = releasable.take() {
                    let blocks = Arc::into_inner(stream)
                        .expect("releasable streams are not accessed concurrently")
                        .into_inner();
                    releasable.replace(Releasable::Blocks(blocks));
                }
            }
        }
        // Pass 2: return unwrapped blocks to the allocator.
        let mut error = None;
        for releasable in releasables.iter_mut() {
            match releasable.take() {
                Some(stream @ Releasable::Stream(_)) => {
                    // Still referenced by a reader; keep it pending.
                    releasable.replace(stream);
                }
                Some(Releasable::Blocks(blocks)) => {
                    if let Err((blocks, err)) = self.allocator.release(blocks) {
                        releasable.replace(Releasable::Blocks(blocks));
                        error.replace(err);
                    }
                }
                None => unreachable!(),
            }
            if error.is_some() {
                break;
            }
        }
        let length = releasables.len();
        // Successfully released entries were left as `None`; drop them.
        releasables.retain(Option::is_some);
        (length - releasables.len(), releasables.len(), error)
    }
    /// Like [`Self::try_release`], but first forces a version swap so
    /// stale read snapshots stop pinning removed streams.
    pub fn force_release(&self) -> (usize, usize, Option<io::Error>) {
        self.streams.update_default(|current, streams| {
            let _locked = Self::lock_buffers(current);
            streams.clone_from(current);
        });
        self.try_release()
    }
}
impl<'stream, A: BlocksAllocator> IntoIterator for &'stream Stream<A> {
    type Item = Chunk<'stream, A::Blocks>;
    type IntoIter = Iter<'stream, A::Blocks>;
    /// Enables `for chunk in &stream`; equivalent to [`Stream::iter`].
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
/// Errors surfaced by [`Stream`] operations.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum StreamError {
    /// The append exceeds a single block stream's capacity (payload is
    /// that capacity in bytes).
    AppendTooLarge(usize),
    /// An underlying block stream operation failed.
    BlockStreamError(block::StreamError),
    /// Spilled/trailing data of adjacent streams does not line up.
    BrokenSpan,
    /// A previous append has not been synced yet.
    Dirty,
    /// No block stream is available for the operation.
    Unavailable,
}
impl From<StreamError> for io::Error {
    /// Wraps a [`StreamError`] in an `io::Error` of kind `Other`.
    #[inline(always)]
    fn from(value: StreamError) -> Self {
        let kind = io::ErrorKind::Other;
        Self::new(kind, value)
    }
}
// Marker impl: `StreamError` exposes no underlying `source()`.
impl std::error::Error for StreamError {}
impl core::fmt::Display for StreamError {
    /// Formats every variant with an `endlessstream:` prefix.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::AppendTooLarge(limit) => write!(
                f,
                "endlessstream: append exceeds block stream capacity of {limit} bytes"
            ),
            Self::BlockStreamError(err) => write!(f, "endlessstream: {err}"),
            Self::BrokenSpan => write!(f, "endlessstream: spanned data is broken"),
            Self::Dirty => write!(f, "endlessstream: previous sync has not completed"),
            Self::Unavailable => write!(f, "endlessstream: no block streams available"),
        }
    }
}
/// Shared handle to one of a [`Stream`]'s block streams.
#[derive(Debug)]
pub struct ProxyBlockStream<B>(Arc<block::Stream<B>>);
impl<B: Blocks> ProxyBlockStream<B> {
    /// Verifies the underlying block stream, reporting each
    /// [`block::Inconsistency`] through `error`; returns overall validity.
    #[inline(always)]
    pub fn verify(&self, error: impl FnMut(block::Inconsistency)) -> bool {
        self.0.verify(error)
    }
}
/// Controls whether [`Stream::append`] may split one append across two
/// block streams.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum SpanBehavior {
    /// Never span; an append that does not fit goes wholly to the next
    /// stream.
    Never,
    /// Span appends smaller than this many bytes; larger appends go
    /// wholly to the next stream.
    Sized(usize),
}
/// Verdict returned by a [`Stream::set_offset`] comparison callback,
/// steering the per-stream binary search.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SearchControl {
    /// Target found at this byte position within the probed slice.
    Match(usize),
    /// Continue searching toward older data.
    SearchLeft,
    /// Continue searching toward newer data.
    SearchRight,
}
/// Tracks which block streams an append dirtied so they can be synced
/// (explicitly via [`AppendContext::sync`], or on drop).
#[derive(Debug)]
pub struct AppendContext<'a, B: Blocks> {
    streams: &'a vlock::VLock<BlockStreams<B>, 2>,
    // Stream that received the first (or only) part of the append.
    left: Option<usize>,
    // Stream that received the spilled part, or the whole append when
    // it was redirected instead of spanned.
    right: Option<usize>,
}
impl<'a, B: Blocks> AppendContext<'a, B> {
    /// A context with nothing to sync.
    #[inline(always)]
    #[must_use]
    fn new(streams: &'a vlock::VLock<BlockStreams<B>, 2>) -> Self {
        Self {
            streams,
            left: None,
            right: None,
        }
    }
    /// Records the stream that received the first part of the append.
    #[inline(always)]
    #[must_use]
    fn with_left(mut self, index: usize) -> Self {
        self.left = Some(index);
        self
    }
    /// Records the stream that received the second (or redirected) part.
    #[inline(always)]
    #[must_use]
    fn with_right(mut self, index: usize) -> Self {
        self.right = Some(index);
        self
    }
    /// Whether all dirtied streams have been synced.
    #[inline(always)]
    #[must_use]
    pub fn is_synced(&self) -> bool {
        self.left.is_none() && self.right.is_none()
    }
    /// Syncs the dirtied streams. Each index is cleared only after its
    /// sync succeeds, so a failed call can be retried.
    #[inline]
    pub fn sync(&mut self) -> io::Result<()> {
        if self.is_synced() {
            return Ok(());
        }
        let streams = self.streams.read();
        // The right (spilled) side is synced before the left one —
        // NOTE(review): the ordering appears deliberate; confirm the
        // durability reasoning before changing it.
        self.right.map_or(Ok(()), |index| {
            streams.get(index).expect("stream index is stable").sync()
        })?;
        self.right.take();
        self.left.map_or(Ok(()), |index| {
            streams.get(index).expect("stream index is stable").sync()
        })?;
        self.left.take();
        Ok(())
    }
}
impl<B: Blocks> Drop for AppendContext<'_, B> {
    /// Best-effort sync on drop; errors are ignored. Call
    /// [`AppendContext::sync`] explicitly to observe failures.
    #[inline(always)]
    fn drop(&mut self) {
        let _ = self.sync();
    }
}
/// Iterator over the [`Chunk`]s of a [`Stream`], starting from the
/// stream's read offset.
#[derive(Debug)] #[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct Iter<'a, B> {
    // Shared stream offset; re-checked on every `next` to detect
    // concurrent advances.
    offset: &'a atomic::AtomicUsize,
    streams: &'a vlock::VLock<BlockStreams<B>, 2>,
    // Offset value this iterator last synchronized with.
    state: usize,
    // (absolute stream index, byte offset within it) of the next unread
    // position.
    next: (usize, usize),
    // Bytes yielded since `state` was last synchronized.
    read: usize,
    // When set, the next `next()` call must yield the span buffer that
    // precedes stream `next.0`.
    next_from_buffer: bool,
}
impl<'a, B> Iter<'a, B> {
    fn new(offset: &'a atomic::AtomicUsize, streams: &'a vlock::VLock<BlockStreams<B>, 2>) -> Self {
        let mut iter = Self {
            offset,
            streams,
            state: 0,
            next: (0, 0),
            read: 0,
            next_from_buffer: false,
        };
        // Synchronize with the current stream offset before first use.
        iter.refresh(&streams.read());
        iter
    }
    /// Captures the absolute position this iterator has reached, for
    /// use with [`Stream::advance`].
    #[inline(always)]
    pub fn advance_context(&self) -> AdvanceContext {
        AdvanceContext(self.state + self.read)
    }
    /// Steps back up to `dec` already-read bytes. Returns `false` when
    /// a concurrent offset advance skipped data instead.
    #[inline(always)]
    pub fn rewind(&mut self, dec: usize) -> bool {
        let streams = self.streams.read();
        if !self.refresh(&streams) {
            self.rewind_with_streams(dec, &streams);
            return true;
        }
        false
    }
    /// Moves the read position back by `dec` bytes (saturating) and
    /// recomputes the (stream, offset) cursor from the absolute offset.
    #[inline(always)]
    fn rewind_with_streams(&mut self, dec: usize, streams: &BlockStreams<B>) {
        self.next_from_buffer = false;
        self.read = self.read.saturating_sub(dec);
        self.next = streams
            .stream_at(self.state + self.read)
            .expect("state and length should always be less than or equal to stream length");
    }
    /// Re-synchronizes with the shared offset. Returns `true` when the
    /// offset jumped past data this iterator had not read yet.
    #[inline(always)]
    fn refresh(&mut self, streams: &BlockStreams<B>) -> bool {
        let offset = self.offset.load(atomic::Ordering::Relaxed);
        if offset != self.state {
            // The shared offset is monotonic (`fetch_max`), so it can
            // only have moved forward.
            let diff = offset
                .checked_sub(self.state)
                .expect("offset should always increment");
            let skipped = diff > self.read;
            self.state = offset;
            self.rewind_with_streams(diff, streams);
            return skipped;
        }
        false
    }
}
impl<'a, B> Iterator for Iter<'a, B> {
    type Item = Chunk<'a, B>;
    /// Yields the next readable chunk, [`Chunk::Skipped`] when a
    /// concurrent advance jumped past unread data, or `None` when caught
    /// up with the end of the stream.
    fn next(&mut self) -> Option<Self::Item> {
        let streams = self.streams.read();
        if self.refresh(&streams) {
            return Some(Chunk::Skipped);
        }
        // Yields the span buffer published for stream `$index`, starting
        // `$offset` bytes into it.
        macro_rules! read_from_buffer {
            ($index:expr, $offset:expr) => {{
                self.next_from_buffer = false;
                let pos = $index - streams.removed().0;
                {
                    let buffers = streams.buffers.read();
                    let buffer = buffers[pos].as_ref().unwrap();
                    assert_ne!(buffer.len(), 0);
                    self.read += buffer.len() - $offset;
                }
                Some(Chunk::Buffer(ChunkRef {
                    index_or_pos: pos,
                    offset: $offset,
                    streams,
                }))
            }};
        }
        if self.next_from_buffer {
            assert_eq!(self.next.1, 0);
            // The pending span buffer belongs to the previous stream.
            return read_from_buffer!(self.next.0 - 1, self.next.1);
        }
        let (mut current, mut offset) = self.next;
        while let Some(stream) = streams.get(current) {
            // Count empty streams directly after `current`.
            let mut skip = 0;
            while let Some(next) = streams.get(current + skip + 1) {
                if !next.is_empty() {
                    break;
                }
                skip += 1;
            }
            // If this stream is fully synced and a later stream already
            // holds data, the cursor can hop over the empty gap.
            if !stream.is_dirty()
                && streams
                    .get(current + skip + 1)
                    .is_some_and(|next| !next.is_empty())
            {
                self.next.0 += skip + 1;
                self.next.1 = 0;
            }
            let trailing = stream.trailing();
            // Keep the `trailing` read ordered before the `data` read —
            // NOTE(review): the fence suggests these two loads must not
            // be reordered against a concurrent append; confirm.
            atomic::compiler_fence(atomic::Ordering::SeqCst);
            let data = stream.data();
            if offset < data.len() {
                let read = data.len() - offset;
                self.read += read;
                if self.next.0 == current {
                    self.next.1 += read;
                } else if !trailing.is_empty() {
                    // Cursor hopped past this stream: its span buffer is
                    // what comes next.
                    self.next_from_buffer = true;
                }
                return Some(Chunk::Data(ChunkRef {
                    index_or_pos: current,
                    offset,
                    streams,
                }));
            }
            if !trailing.is_empty() {
                return read_from_buffer!(current, offset - data.len());
            }
            if self.next == (current, offset) {
                // Cursor did not move: end of readable data.
                return None;
            }
            (current, offset) = self.next;
        }
        None
    }
}
impl<B> Clone for Iter<'_, B> {
    // Manual impl: a derived `Clone` would needlessly require `B: Clone`.
    fn clone(&self) -> Self {
        Self {
            offset: self.offset,
            streams: self.streams,
            state: self.state,
            next: self.next,
            read: self.read,
            next_from_buffer: self.next_from_buffer,
        }
    }
}
/// Borrowed location of a chunk inside a pinned snapshot of the streams.
#[doc(hidden)]
#[derive(Debug, Eq, Hash, PartialEq)]
pub struct ChunkRef<'a, B> {
    // Absolute stream index for `Chunk::Data`; buffer position (index
    // minus removed count) for `Chunk::Buffer`.
    index_or_pos: usize,
    // Byte offset into the referenced data/buffer.
    offset: usize,
    // Holding this read ref pins the streams version while the chunk
    // is alive.
    streams: vlock::ReadRef<'a, BlockStreams<B>, 2>,
}
/// One item yielded by [`Iter`].
#[derive(Debug, Eq, Hash, PartialEq)]
pub enum Chunk<'a, B> {
    /// Data was skipped because the stream offset advanced concurrently.
    Skipped,
    /// Bytes stored directly in a block stream.
    Data(ChunkRef<'a, B>),
    /// Bytes from a span buffer joining two adjacent block streams.
    Buffer(ChunkRef<'a, B>),
}
impl<B> Chunk<'_, B> {
    /// Returns `true` when this chunk marks data skipped by a concurrent
    /// offset advance rather than readable bytes.
    #[inline(always)]
    #[must_use]
    pub fn is_skipped(&self) -> bool {
        match self {
            Self::Skipped => true,
            Self::Data(_) | Self::Buffer(_) => false,
        }
    }
}
impl<B: Blocks> Chunk<'_, B> {
    /// Returns the chunk's bytes, or `None` for [`Chunk::Skipped`].
    #[inline(always)]
    #[must_use]
    pub fn bytes(&self) -> Option<BytesRef<'_>> {
        match self {
            Self::Skipped => None,
            // Direct slice into the block stream's data.
            Self::Data(ref cref) => Some(BytesRef(SliceOrBuffer::Slice(
                &cref
                    .streams
                    .get(cref.index_or_pos)
                    .expect("stream index is stable")
                    .data()[cref.offset..],
            ))),
            // Span buffers live behind their own vlock; pin a snapshot
            // and resolve lazily in `Deref`.
            Self::Buffer(ref cref) => Some(BytesRef(SliceOrBuffer::Buffer(
                cref.index_or_pos,
                cref.offset,
                cref.streams.buffers.read(),
            ))),
        }
    }
}
// Backing storage for `BytesRef`: either a direct slice into a block
// stream, or (buffer position, byte offset, pinned buffers snapshot)
// for span data.
#[derive(Debug)]
enum SliceOrBuffer<'a> {
    Slice(&'a [u8]),
    Buffer(
        usize,
        usize,
        vlock::ReadRef<'a, VecDeque<Option<Arc<Vec<u8>>>>, 2>,
    ),
}
/// Byte view of a [`Chunk`]; dereferences to `[u8]`.
#[derive(Debug)] pub struct BytesRef<'a>(SliceOrBuffer<'a>);
impl AsRef<[u8]> for BytesRef<'_> {
    /// Borrows the underlying bytes via `Deref`.
    #[inline(always)]
    fn as_ref(&self) -> &[u8] {
        &**self
    }
}
impl core::borrow::Borrow<[u8]> for BytesRef<'_> {
    /// Borrows the underlying bytes via `Deref`, consistent with
    /// `Eq`/`Ord`/`Hash` which all delegate to the slice.
    #[inline(always)]
    fn borrow(&self) -> &[u8] {
        &**self
    }
}
// `PartialEq` compares the underlying byte slices, so equality is total.
impl Eq for BytesRef<'_> {}
impl core::hash::Hash for BytesRef<'_> {
    /// Hashes exactly like the underlying `[u8]` slice, keeping `Hash`
    /// consistent with `Eq` and `Borrow<[u8]>`.
    #[inline(always)]
    fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
        (**self).hash(state);
    }
}
impl Ord for BytesRef<'_> {
    /// Lexicographic ordering of the underlying byte slices.
    #[inline(always)]
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        (**self).cmp(&**other)
    }
}
impl PartialEq<BytesRef<'_>> for BytesRef<'_> {
    /// Byte-wise equality of the underlying slices.
    #[inline(always)]
    fn eq(&self, other: &BytesRef<'_>) -> bool {
        **self == **other
    }
}
impl PartialOrd<BytesRef<'_>> for BytesRef<'_> {
    /// Canonical `PartialOrd`: defined in terms of `Ord` so the two
    /// orderings can never disagree (clippy
    /// `non_canonical_partial_ord_impl`). Behavior is unchanged — slice
    /// `partial_cmp` always returns `Some` of the total order.
    #[inline(always)]
    fn partial_cmp(&self, other: &BytesRef<'_>) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl core::ops::Deref for BytesRef<'_> {
    type Target = [u8];
    /// Resolves to the underlying bytes; for span buffers this indexes
    /// into the pinned buffers snapshot and skips `offset` bytes.
    #[inline(always)]
    fn deref(&self) -> &[u8] {
        match self.0 {
            SliceOrBuffer::Slice(bytes) => bytes,
            SliceOrBuffer::Buffer(pos, offset, ref buffers) => {
                &buffers[pos].as_deref().unwrap()[offset..]
            }
        }
    }
}
/// Opaque absolute position captured by [`Iter::advance_context`],
/// consumed by [`Stream::advance`].
#[derive(Clone, Copy, Debug)]
#[must_use = "the context does not do anything by itself unless used"]
pub struct AdvanceContext(usize);
/// Per-block-stream statistics in stream order (see [`Stream::stats`]).
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Stats(Vec<BlockStreamStats>);
impl Stats {
    /// Iterates over the per-block-stream statistics.
    #[inline(always)]
    pub fn iter(&self) -> core::slice::Iter<'_, BlockStreamStats> {
        self.0.iter()
    }
    /// Total size in bytes of all underlying blocks.
    #[inline(always)]
    #[must_use]
    pub fn blocks_size(&self) -> u64 {
        self.0.iter().map(BlockStreamStats::blocks_size).sum()
    }
    /// Total data capacity across all block streams.
    #[inline(always)]
    #[must_use]
    pub fn data_capacity(&self) -> usize {
        self.0.iter().map(|s| s.data_capacity).sum()
    }
    /// Total data currently stored across all block streams.
    #[inline(always)]
    #[must_use]
    pub fn data_used(&self) -> usize {
        self.0.iter().map(|s| s.data_used).sum()
    }
    /// Total bytes no longer usable across all block streams.
    #[inline(always)]
    #[must_use]
    pub fn data_wasted(&self) -> usize {
        self.0.iter().map(|s| s.data_wasted).sum()
    }
    /// Total bytes still appendable across all block streams.
    #[inline(always)]
    #[must_use]
    pub fn data_available(&self) -> usize {
        self.0.iter().map(|s| s.data_available).sum()
    }
}
impl IntoIterator for Stats {
    type Item = BlockStreamStats;
    type IntoIter = std::vec::IntoIter<BlockStreamStats>;
    /// Consuming iteration over the per-stream stats.
    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}
impl<'stats> IntoIterator for &'stats Stats {
    type Item = &'stats BlockStreamStats;
    type IntoIter = core::slice::Iter<'stats, BlockStreamStats>;
    /// Borrowing iteration delegates straight to the inner slice.
    fn into_iter(self) -> Self::IntoIter {
        self.0.iter()
    }
}
/// Statistics for a single block stream.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub struct BlockStreamStats {
    /// Number of blocks backing the stream.
    pub block_count: u64,
    /// Block size as a power of two (`block size == 1 << block_shift`).
    pub block_shift: u32,
    /// Payload bytes the stream can hold.
    pub data_capacity: usize,
    /// Payload bytes currently stored.
    pub data_used: usize,
    /// Bytes that can no longer be read or written.
    pub data_wasted: usize,
    /// Bytes still appendable in this stream.
    pub data_available: usize,
}
impl BlockStreamStats {
    /// Total size of the backing blocks in bytes
    /// (`block_count * 2^block_shift`).
    #[inline(always)]
    #[must_use]
    pub fn blocks_size(&self) -> u64 {
        self.block_count << self.block_shift
    }
}
// A removed block stream on its way back to the allocator: kept as
// `Stream` while readers may still hold references, then unwrapped to
// its raw `Blocks` (see `Stream::try_release`).
#[derive(Debug)]
enum Releasable<B> {
    Blocks(B),
    Stream(Arc<block::Stream<B>>),
}
// Ordered collection of block streams plus the span buffers that glue
// adjacent streams together.
#[derive(Debug)] struct BlockStreams<B> {
    streams: VecDeque<Arc<block::Stream<B>>>,
    // buffers[i] joins streams[i]'s trailing bytes with streams[i+1]'s
    // spilled bytes; `None` when streams[i] does not span.
    buffers: vlock::VLock<VecDeque<Option<Arc<Vec<u8>>>>, 2>,
    // (streams, bytes) removed from the front since creation; absolute
    // indices/offsets stay stable by accounting for these.
    removed: (usize, usize),
}
impl<B> BlockStreams<B> {
    /// An empty collection.
    #[inline(always)]
    #[must_use]
    fn new() -> Self {
        Self::default()
    }
    /// `(streams, bytes)` removed from the front so far.
    #[inline(always)]
    #[must_use]
    fn removed(&self) -> (usize, usize) {
        self.removed
    }
    /// `(stream count, payload bytes)` currently held. The front
    /// stream's spilled bytes belong to an already-removed span and are
    /// not counted.
    #[inline(always)]
    #[must_use]
    fn len(&self) -> (usize, usize) {
        let len: usize = self.streams.iter().map(|stream| stream.len()).sum();
        (
            self.streams.len(),
            len - self
                .streams
                .front()
                .map_or(0, |stream| stream.spilled().len()),
        )
    }
    /// Looks up a stream by its stable absolute index.
    #[inline(always)]
    #[must_use]
    fn get(&self, index: usize) -> Option<&block::Stream<B>> {
        if index >= self.removed.0 {
            self.streams.get(index - self.removed.0).map(AsRef::as_ref)
        } else {
            None
        }
    }
    /// Appends a stream, building the span buffer linking it to the
    /// previous stream. Fails (handing the stream back) when trailing
    /// and spilled data do not line up.
    #[inline(always)]
    fn append(&mut self, stream: Arc<block::Stream<B>>) -> Result<(), Arc<block::Stream<B>>> {
        assert!(
            stream.trailing().len() != stream.capacity(),
            "append uninitialized, or with trail too large"
        );
        let buffers = self.buffers.get_mut();
        if let Some(last) = self.streams.back() {
            let buffer = match make_span_buffer(last.trailing(), stream.spilled()) {
                Ok(buffer) => buffer,
                Err(()) => return Err(stream),
            }
            // `replace` must find the back slot empty; a previous value
            // would mean a span buffer is being overwritten.
            .and_then(|buffer| buffers.back_mut().unwrap().replace(Arc::new(buffer)));
            assert_eq!(buffer, None);
        }
        self.streams.push_back(stream);
        buffers.push_back(None);
        Ok(())
    }
    /// Pops the front stream, accounting its bytes (data plus any span
    /// buffer) into `removed`.
    #[inline(always)]
    fn remove(&mut self) -> Option<Arc<block::Stream<B>>> {
        self.buffers.get_mut().pop_front().map(|buffer| {
            let stream = self.streams.pop_front().unwrap();
            self.removed.0 += 1;
            self.removed.1 += stream.data().len();
            self.removed.1 += buffer.map_or(0, |buffer| buffer.len());
            stream
        })
    }
    /// Absolute index of the first stream of the trailing run of empty
    /// streams, or `None` when the last stream holds data.
    #[inline(always)]
    #[must_use]
    fn pick_empty(&self) -> Option<usize> {
        // Walk backward while streams stay empty.
        let mut iter = self.streams.iter().enumerate().rev().peekable();
        while let stream @ Some(_) = iter.next() {
            if !stream.unwrap().1.is_empty() {
                break;
            }
            // Keep walking while the next-older stream is also empty.
            if iter.peek().filter(|next| next.1.is_empty()).is_some() {
                continue;
            }
            return stream
                .filter(|stream| stream.1.is_empty())
                .map(|stream| stream.0 + self.removed.0);
        }
        None
    }
    /// Absolute index of the stream new appends should target: walking
    /// backward, the first non-empty non-full stream, an empty stream
    /// directly after a full one, or the oldest stream when all are
    /// empty. `None` when the newest stream is full.
    #[inline(always)]
    #[must_use]
    fn pick_ending(&self) -> Option<usize> {
        let mut iter = self.streams.iter().enumerate().rev().peekable();
        while let Some(stream) = iter.next() {
            if stream.1.is_full() {
                break;
            }
            if !stream.1.is_empty() {
                return Some(stream.0 + self.removed.0);
            }
            if let Some(next) = iter.peek() {
                if next.1.is_full() {
                    return Some(stream.0 + self.removed.0);
                }
            } else {
                return Some(stream.0 + self.removed.0);
            }
        }
        None
    }
    /// Maps a global byte offset to `(absolute stream index, offset
    /// into that stream's readable bytes)`. The exact end of the stream
    /// maps to the next writable position. Offsets before `removed` or
    /// past the end yield `None`.
    #[inline(always)]
    #[must_use]
    fn stream_at(&self, offset: usize) -> Option<(usize, usize)> {
        if offset < self.removed.1 {
            return None;
        }
        let mut cumulative = self.removed.1;
        let mut last = (0, 0);
        for (pos, stream) in self.streams.iter().enumerate() {
            if stream.is_empty() {
                continue;
            }
            let length = cumulative;
            // A spanning stream contributes its span buffer first.
            if !stream.trailing().is_empty() {
                let buffers = self.buffers.read();
                if let Some(buffer) = buffers[pos].as_deref() {
                    assert_ne!(buffer.len(), 0);
                    cumulative += buffer.len();
                }
            }
            cumulative += stream.data().len();
            if offset < cumulative {
                return Some((pos + self.removed.0, offset - length));
            }
            last = (pos + self.removed.0, cumulative - length);
        }
        if offset == cumulative {
            // Exactly at the end: stay in the last stream if it has
            // room, otherwise point at the start of the next one.
            if self.get(last.0).unwrap().capacity() != last.1 {
                return Some(last);
            }
            return Some((last.0 + 1, 0));
        }
        None
    }
    /// Publishes the span buffer for the stream at absolute `index`.
    /// Returns `false` when both parts are empty (nothing to publish),
    /// `true` once the buffer is set.
    ///
    /// # Safety
    ///
    /// NOTE(review): `unsafe fn` with no stated contract — the only
    /// caller (`Stream::append`) holds append locks on the affected
    /// streams; confirm and document the required invariant.
    #[inline(always)]
    #[must_use]
    unsafe fn set_buffer(&self, index: usize, trailing: &[u8], spilled: &[u8]) -> bool {
        let pos = index - self.removed.0;
        // Only a stream with a successor can span.
        assert!(
            pos < self.streams.len().saturating_sub(1),
            "index too large"
        );
        let buffer = make_span_buffer(trailing, spilled)
            .expect("trailing and spilled should be either empty or non-empty");
        if buffer.is_none() {
            return false;
        }
        // Only publish into a slot that is still empty.
        self.buffers.compare_update_default(
            |current| current[pos].is_none(),
            move |current, buffers| {
                buffers.clone_from(current);
                buffers[pos] = buffer.map(Arc::new);
            },
        )
    }
}
impl<B: Blocks> BlockStreams<B> {
    /// Builds per-stream statistics, attributing bytes behind `offset`
    /// to `data_wasted` and spare room in ending/empty streams to
    /// `data_available`.
    #[inline(always)]
    #[must_use]
    fn stats(&self, offset: usize) -> Stats {
        let ending = self.pick_ending().map(|index| index - self.removed.0);
        let mut cumulative = self.removed.1;
        let mut stats: Vec<BlockStreamStats> = Vec::with_capacity(self.streams.len());
        for (pos, stream) in self.streams.iter().enumerate() {
            let capacity = stream.capacity();
            let length = stream.len();
            cumulative += length;
            if pos == 0 {
                // The front stream's spilled bytes belong to a removed span.
                cumulative -= stream.spilled().len();
            }
            stats.push(BlockStreamStats {
                block_count: stream.block_count(),
                block_shift: stream.block_shift(),
                data_capacity: capacity,
                data_used: length,
                // Stored bytes already consumed by the read offset.
                data_wasted: length.saturating_sub(cumulative.saturating_sub(offset)),
                data_available: 0,
            });
            if ending.is_some_and(|ending| pos >= ending) {
                // At or past the ending stream: spare room is writable.
                stats.last_mut().unwrap().data_available = capacity - length;
            } else {
                // Before the ending stream: spare room can never be used.
                stats.last_mut().unwrap().data_wasted += capacity - length;
            }
        }
        Stats(stats)
    }
    /// Finds the global offset selected by `compare`, binary-searching
    /// each stream's data blocks (and linearly checking span buffers)
    /// against the newest written bytes. Returns `Err(closest offset)`
    /// when nothing matches.
    fn offset_at(&self, compare: impl Fn(&[u8], &[u8]) -> SearchControl) -> Result<usize, usize> {
        if self.streams.is_empty() {
            return Err(self.removed.1);
        }
        // Newest stream that holds data.
        let last_index = self.pick_empty().map_or_else(
            || self.removed.0 + self.streams.len() - 1,
            |index| index.saturating_sub(1),
        );
        let buffers = self.buffers.read();
        // Reference bytes: the last written block, or the preceding
        // span buffer when the stream's data is all spilled.
        let last_bytes = if let Some(stream) = self.get(last_index) {
            if stream.data().is_empty() {
                let buffer = buffers[last_index.saturating_sub(1) - self.removed.0].as_deref();
                if let Some(buffer) = buffer {
                    buffer
                } else {
                    return Err(self.removed.1);
                }
            } else {
                let range = stream
                    .data_range_for(*stream.data_block_range().end())
                    .expect("end block range should be contain valid data range");
                &stream.data()[range]
            }
        } else {
            return Err(self.removed.1);
        };
        let mut offset = self.removed.1;
        for index in self.removed.0..=last_index {
            let stream = self.get(index).unwrap();
            let data = stream.data();
            if !data.is_empty() {
                let block_range = stream.data_block_range();
                let mut position = None;
                let mut start_block = *block_range.start();
                let mut end_block = *block_range.end();
                // Binary search across this stream's data blocks.
                while start_block != end_block {
                    let middle_block = (start_block + end_block) / 2;
                    let (block, range) = match stream.data_range_for(middle_block) {
                        Ok(range) => (middle_block, range),
                        // Mid block has no valid range: snap to the
                        // block `data_range_for` points at instead.
                        Err(block) => (
                            block,
                            stream
                                .data_range_for(block)
                                .expect("block should be smaller than total number of blocks"),
                        ),
                    };
                    assert_ne!(block as u64, stream.block_count());
                    let range_start = range.start;
                    match compare(&data[range], last_bytes) {
                        SearchControl::Match(pos) => {
                            // Remember the match but keep searching left
                            // for an earlier one.
                            position = Some(range_start + pos);
                            end_block = middle_block;
                        }
                        SearchControl::SearchLeft => end_block = middle_block,
                        SearchControl::SearchRight => start_block = block + 1,
                    };
                }
                // Check the block the search converged on.
                match stream.data_range_for(end_block) {
                    Ok(range) => {
                        let range_start = range.start;
                        match compare(&data[range], last_bytes) {
                            SearchControl::Match(pos) => {
                                position = Some(range_start + pos);
                            }
                            SearchControl::SearchLeft | SearchControl::SearchRight => (),
                        }
                    }
                    Err(_) => unreachable!("end block should always be within data range"),
                };
                if let Some(pos) = position {
                    return Ok(offset + pos);
                }
                if end_block != *block_range.end() {
                    // Search ended before this stream's end, so the
                    // target cannot be in any later stream either.
                    return Err(offset);
                }
                offset += data.len();
            }
            // Span buffer between this stream and the next.
            if let Some(buffer) = buffers[index - self.removed.0].as_deref() {
                match compare(buffer, last_bytes) {
                    SearchControl::Match(pos) => return Ok(offset + pos),
                    SearchControl::SearchLeft => return Err(offset),
                    SearchControl::SearchRight => offset += buffer.len(),
                };
            }
        }
        Err(offset)
    }
}
impl<B> Clone for BlockStreams<B> {
    // Manual impl: a derived one would require `B: Clone`; the stream
    // handles are `Arc`s, so the clone is shallow.
    #[inline(always)]
    fn clone(&self) -> Self {
        Self {
            streams: self.streams.clone(),
            buffers: self.buffers.clone(),
            removed: self.removed,
        }
    }
    // `clone_from` reuses existing allocations on repeated version
    // swaps (see `Stream::grow`/`maybe_shrink`/`force_release`).
    #[inline(always)]
    fn clone_from(&mut self, source: &Self) {
        self.streams.clone_from(&source.streams);
        self.buffers.clone_from(&source.buffers);
        self.removed = source.removed;
    }
}
impl<B> Default for BlockStreams<B> {
    /// An empty collection: no streams, no span buffers, nothing removed.
    #[inline(always)]
    fn default() -> Self {
        Self {
            removed: (0, 0),
            streams: VecDeque::new(),
            buffers: VecDeque::new().into(),
        }
    }
}
impl<B> TryFrom<VecDeque<block::Stream<B>>> for BlockStreams<B> {
    type Error = VecDeque<block::Stream<B>>;
    /// Builds the collection front-to-back; on a span mismatch the
    /// remaining streams (including the offender) are handed back.
    fn try_from(mut value: VecDeque<block::Stream<B>>) -> Result<Self, Self::Error> {
        let mut streams = Self::new();
        while let Some(stream) = value.pop_front() {
            if let Err(stream) = streams.append(Arc::new(stream)) {
                // The Arc was never shared here, so unwrapping succeeds.
                value.push_front(Arc::into_inner(stream).unwrap());
                return Err(value);
            }
        }
        Ok(streams)
    }
}
/// Builds the buffer that joins one stream's trailing bytes with the
/// next stream's spilled bytes.
///
/// Returns `Ok(None)` when both parts are empty (no span), the
/// concatenated `trail ++ spill` bytes when both are present, and
/// `Err(())` when exactly one side is empty (a broken span).
#[inline(always)]
fn make_span_buffer(trail: &[u8], spill: &[u8]) -> Result<Option<Vec<u8>>, ()> {
    if trail.is_empty() ^ spill.is_empty() {
        // One-sided span data is inconsistent.
        Err(())
    } else if !trail.is_empty() {
        // Reserve once and copy; avoids the redundant zero-fill of
        // `vec![0; n]` followed by `copy_from_slice`.
        let mut buffer = Vec::with_capacity(trail.len() + spill.len());
        buffer.extend_from_slice(trail);
        buffer.extend_from_slice(spill);
        Ok(Some(buffer))
    } else {
        Ok(None)
    }
}
#[cfg(test)]
mod tests {
use core::{mem, time};
use std::{
io::{Read, Seek, Write},
sync::mpsc,
thread,
};
use super::*;
macro_rules! assert_waits {
($what:expr, $until:expr, $($args:tt)+) => {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(false).unwrap();
$what;
tx.send(true).unwrap();
});
assert!(!rx.recv().unwrap(), $($args)+);
thread::sleep(time::Duration::from_millis(100));
assert!(rx.try_recv().is_err(), $($args)+);
$until;
assert!(rx.recv().unwrap(), $($args)+);
}
}
const FF: usize = 1280;
const FT: usize = 1152;
const FP: usize = 1152;
const FE: usize = 0;
const SF: usize = 1280 - 16;
const SP: usize = 1152 - 16;
const TS: usize = 128 + 16;
macro_rules! blockstreams {
($($kind:tt),+) => {{
let mut streams = VecDeque::new();
let mut iter = (1..).peekable();
$(
streams.push_back(blockstreams!(@$kind &mut iter));
)+
streams
}};
(@ff $iter:expr) => {
test_generate_blockstream($iter, 255, TestGenerateFlags::Empty as u8)
};
(@ft $iter:expr) => {
test_generate_blockstream($iter, 144, TestGenerateFlags::Trailing as u8)
};
(@fp $iter:expr) => {
test_generate_blockstream($iter, 144, TestGenerateFlags::Empty as u8)
};
(@fe $iter:expr) => {
test_generate_blockstream($iter, 0, TestGenerateFlags::Empty as u8)
};
(@sf $iter:expr) => {
test_generate_blockstream($iter, 255, TestGenerateFlags::Spilled as u8)
};
(@st $iter:expr) => {
test_generate_blockstream($iter, 142,
TestGenerateFlags::Spilled as u8 | TestGenerateFlags::Trailing as u8)
};
(@sp $iter:expr) => {
test_generate_blockstream($iter, 142, TestGenerateFlags::Spilled as u8)
};
(@se $iter:expr) => {
test_generate_blockstream($iter, 0, TestGenerateFlags::Spilled as u8)
};
}
#[test]
fn stream_set_offset() {
macro_rules! read_word {
($iter:expr) => {{
let chunk = $iter.next().unwrap();
let bytes = chunk.bytes().unwrap();
u64::from_le_bytes(bytes[..8].try_into().unwrap())
}};
}
let mut stream = Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(fp, ff, fe).into());
stream.load().unwrap();
let case = "offset within first block stream";
let result = stream.set_offset(|bytes, _| test_find_word_in_stream(bytes, 42));
assert!(result, "{case}");
assert_eq!(read_word!(stream.iter()), 42, "{case}");
let case = "offset within second block stream";
let result = stream.set_offset(|bytes, _| test_find_word_in_stream(bytes, 198));
assert!(result, "{case}");
assert_eq!(read_word!(stream.iter()), 198, "{case}");
let case = "offset before current offset is not set";
let result = stream.set_offset(|bytes, _| test_find_word_in_stream(bytes, 42));
assert!(!result, "{case}");
assert_eq!(read_word!(stream.iter()), 198, "{case}");
let case = "offset of unmatched value is not set";
let result = stream.set_offset(|_, _| SearchControl::SearchRight);
assert!(!result, "{case}");
assert_eq!(read_word!(stream.iter()), 198, "{case}");
}
#[test]
fn stream_block_streams() {
let mut stream = Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(fp, fe).into());
stream.load().unwrap();
let block_streams = stream.block_streams();
assert_eq!(block_streams.len(), 2);
assert!(block_streams[0].verify(|_| ()));
assert!(block_streams[1].verify(|_| ()));
}
#[test]
fn stream_stats() {
let case = "basic";
let mut stream = Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(fp, ff, fe).into());
stream.load().unwrap();
assert!(stream.set_offset(|bytes, _| test_find_word_in_stream(bytes, 64)));
let stats = stream.stats();
assert_eq!(stats.blocks_size(), 3 * 2048, "{case}");
assert_eq!(stats.data_capacity(), 3 * 1280, "{case}");
assert_eq!(stats.data_used(), FP + FF + FE, "{case}");
assert_eq!(stats.data_wasted(), 63 * 8 + 128, "{case}");
assert_eq!(stats.data_available(), 1280, "{case}");
}
#[test]
fn stream_iter() {
    // Walks every chunk of an eight-stream layout (including a spanned pair),
    // exercising `rewind` before, inside, and after spanned chunks, then
    // re-reads the whole stream checking word continuity.
    // Fix: the sixth chunk's case label previously read "sixths chunk".
    let mut stream = Stream::<TestMemoryBlocksAllocator>::new(
        blockstreams!(fe, fp, ft, sp, ff, fe, fp, fe).into(),
    );
    stream.load().unwrap();
    assert!(stream.set_offset(|bytes, _| test_find_word_in_stream(bytes, 42)));
    assert_eq!(stream.maybe_shrink(), (1, 0));
    let case = "first chunk";
    let mut iter = stream.iter();
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), (144 - 42) * 8 + 8, "{case}");
    let case = "second chunk";
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 144 * 8, "{case}");
    let case = "rewind before spanned chunk";
    iter.rewind(64);
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 64, "{case}");
    let case = "third chunk";
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 16 * 8 + 16, "{case}");
    let case = "rewind spanned chunk";
    iter.rewind(48);
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 48, "{case}");
    let case = "fourth chunk";
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 142 * 8, "{case}");
    let case = "fifth chunk";
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 160 * 8, "{case}");
    let case = "sixth chunk";
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 144 * 8, "{case}");
    let case = "rewind last chunk";
    iter.rewind(32);
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 32, "{case}");
    let case = "rewind two chunks";
    iter.rewind(728 + 144 * 8);
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 728, "{case}");
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 144 * 8, "{case}");
    let case = "drained";
    // A drained iterator must stay drained on repeated calls.
    assert!(iter.next().is_none(), "{case}");
    assert!(iter.next().is_none(), "{case}");
    let case = "consistent returned data";
    // Re-iterate from the offset: every non-filler word must continue the
    // monotonically increasing sequence starting at 42.
    let mut value = 42;
    for (n, chunk) in stream.iter().enumerate() {
        let bytes = chunk.bytes().expect(case);
        for i in 0..bytes.len() >> 3 {
            let word = u64::from_le_bytes(bytes[i * 8..i * 8 + 8].try_into().unwrap());
            if word != TEST_WORDS_REPEATING[0] {
                assert_eq!(word, value, "{case}: chunk={n} word={i}");
                value += 1;
            }
        }
    }
}
#[test]
fn stream_iter_advance() {
    // Three iterators over the same stream: `iter` drives `advance`, `behind`
    // lags the advanced offset (and so observes skipped chunks), and `ahead`
    // has already consumed past it. Statement order is load-bearing.
    let mut stream =
        Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(ft, sp, ff, fe).into());
    stream.load().unwrap();
    assert!(stream.set_offset(|bytes, _| test_find_word_in_stream(bytes, 114)));
    let mut iter = stream.iter();
    let mut behind = stream.iter();
    let mut ahead = stream.iter();
    let case = "advance within first data chunk";
    // Consume a chunk, rewind part of it, then advance the stream to the
    // iterator's position; the rewound tail must still be readable.
    iter.next().unwrap();
    iter.rewind(188);
    ahead.next().unwrap();
    assert!(stream.advance(iter.advance_context()), "{case}");
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 188, "{case}");
    // `behind` first yields a skipped marker for the data before the offset.
    assert!(behind.next().expect(case).is_skipped(), "{case}");
    let chunk = behind.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 188, "{case}");
    let chunk = ahead.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 16 * 8 + 16, "{case}");
    let case = "advance within spanned chunk";
    iter.next().unwrap();
    iter.rewind(64);
    assert!(stream.advance(iter.advance_context()), "{case}");
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 64, "{case}");
    assert!(behind.next().expect(case).is_skipped(), "{case}");
    let chunk = behind.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 64, "{case}");
    let chunk = ahead.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 142 * 8, "{case}");
    let case = "advance until the end skipping few chunks";
    // Drain `iter`, advance to the end; all three iterators must then report
    // nothing left beyond (for the laggards) a single skip marker.
    while iter.next().is_some() {}
    assert!(stream.advance(iter.advance_context()), "{case}");
    assert!(iter.next().is_none(), "{case}");
    assert!(behind.next().expect(case).is_skipped(), "{case}");
    assert!(behind.next().is_none(), "{case}");
    assert!(ahead.next().expect(case).is_skipped(), "{case}");
    assert!(ahead.next().is_none(), "{case}");
}
#[test]
fn stream_iter_rewind() {
    // `rewind` after the stream offset has advanced: iterators positioned at
    // or past the offset succeed, while one left behind it must refuse.
    let mut stream =
        Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(ft, sp, ff, fe).into());
    stream.load().unwrap();
    assert!(stream.set_offset(|bytes, _| test_find_word_in_stream(bytes, 114)));
    let mut iter = stream.iter();
    let mut behind = stream.iter();
    let mut ahead = stream.iter();
    iter.next().unwrap();
    iter.rewind(128);
    ahead.next().unwrap();
    let case = "rewind after advancing";
    assert!(stream.advance(iter.advance_context()), "{case}");
    // 1024 is more than remains before the offset; `behind` has not consumed
    // anything yet and so cannot rewind, the other two succeed.
    assert!(iter.rewind(1024), "{case}");
    assert!(!behind.rewind(1024), "{case}");
    assert!(ahead.rewind(1024), "{case}");
    // NOTE(review): all three then read exactly 128 bytes, which suggests a
    // successful rewind clamps to the advanced offset — confirm in Iter::rewind.
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 128, "{case}");
    let chunk = behind.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 128, "{case}");
    let chunk = ahead.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 128, "{case}");
}
#[test]
fn stream_iter_shrink() {
    // Shrinking while a chunk is borrowed: bytes already handed out must stay
    // valid, and a reader rewound behind the removed stream sees a skip
    // marker but can still read all remaining data.
    let mut stream =
        Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(fp, fp, ff, fe).into());
    stream.load().unwrap();
    assert!(stream.set_offset(|bytes, _| test_find_word_in_stream(bytes, 114)));
    let case = "shrink while reading";
    let mut iter = stream.iter();
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    // `behind` is cloned from `iter` and rewound into the stream about to be
    // removed.
    let mut behind = iter.clone();
    behind.rewind(64);
    assert!(stream.advance(iter.advance_context()), "{case}");
    // One leading partial stream (FP bytes) becomes removable after advance.
    assert_eq!(stream.maybe_shrink(), (1, FP), "{case}");
    assert_eq!(bytes.len(), FP - 912 + 8, "{case}");
    let case = "read after shrink";
    assert!(behind.next().expect(case).is_skipped(), "{case}");
    // Both iterators must still account for all remaining data.
    let size: usize = iter.map(|chunk| chunk.bytes().expect(case).len()).sum();
    assert_eq!(size, FP + FF, "{case}");
    let size: usize = behind.map(|chunk| chunk.bytes().expect(case).len()).sum();
    assert_eq!(size, FP + FF, "{case}");
}
#[test]
fn stream_advance() {
    // `advance` must reject contexts that point at or before the current
    // stream offset and report `false`.
    let mut stream = Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(fp, ff, fe).into());
    stream.load().unwrap();
    let case = "advance before offset";
    // Capture a context, then move the offset past it: the stale context
    // must be refused.
    let stale = stream.iter().advance_context();
    assert!(stream.set_offset(|bytes, _| test_find_word_in_stream(bytes, 114)));
    assert!(!stream.advance(stale), "{case}");
    let case = "advance at offset";
    let current = stream.iter().advance_context();
    assert!(!stream.advance(current), "{case}");
}
#[test]
#[should_panic(expected = "advance past the end of the stream")]
fn stream_advance_large() {
    // Handing `advance` a context one byte past the end of the data panics.
    let mut stream = Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(fp, ff, fe).into());
    stream.load().unwrap();
    assert!(stream.set_offset(|bytes, _| test_find_word_in_stream(bytes, 114)));
    let past_end = AdvanceContext(*stream.offset.get_mut() + FP - 904 + FF + 1);
    stream.advance(past_end);
}
#[test]
fn stream_load() {
    // `load` discovers the retrieved block streams and resets the read
    // offset, including when called again on an already-loaded stream.
    let mut stream = Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(fp, ff, fe).into());
    let case = "load basic";
    stream.load().expect(case);
    assert_eq!(stream.stats().iter().count(), 3, "{case}");
    let case = "load repeated";
    // Move the offset forward, then re-load: the offset must drop back to 0.
    let mut reader = stream.iter();
    reader.next();
    assert!(stream.advance(reader.advance_context()));
    assert_ne!(*stream.offset.get_mut(), 0, "{case}");
    stream.load().expect(case);
    assert_eq!(*stream.offset.get_mut(), 0, "{case}");
    assert_eq!(stream.stats().iter().count(), 3, "{case}");
    let case = "load from empty";
    let mut stream = Stream::<TestMemoryBlocksAllocator>::new(VecDeque::new().into());
    stream.load().expect(case);
    assert_eq!(stream.stats().iter().count(), 0, "{case}");
}
#[test]
fn stream_append() {
    // End-to-end `append` behaviour: capacity errors, empty appends, block
    // stream boundaries, exhaustion, and growth. Appended data only becomes
    // visible to readers after the returned context is `sync()`ed.
    let mut stream = Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(fp, fe, fe).into());
    stream.load().unwrap();
    let mut iter = stream.iter();
    while iter.next().is_some() {}
    let case = "append larger than the capacity of the following stream fails";
    let err = stream.append(&TEST_BYTES_REPEATING[..1282]).err();
    assert_eq!(
        err.expect(case).to_string(),
        "endlessstream: append exceeds block stream capacity of 1280 bytes",
        "{case}"
    );
    let case = "empty append";
    // A zero-length append is considered synced immediately and yields no chunk.
    let ctx = stream.append(&[]).expect(case);
    assert!(ctx.is_synced(), "{case}");
    assert!(iter.next().is_none(), "{case}");
    let case = "aligned to the end of a block stream";
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..128]).expect(case);
    assert!(!ctx.is_synced(), "{case}");
    // Unsynced data is invisible to the iterator until `sync()` completes.
    assert!(iter.next().is_none(), "{case}");
    ctx.sync().expect(case);
    assert!(ctx.is_synced(), "{case}");
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 128, "{case}");
    let case = "append larger than the capacity of an empty stream fails";
    let err = stream.append(&TEST_BYTES_REPEATING[..1282]).err();
    assert_eq!(
        err.expect(case).to_string(),
        "endlessstream: append exceeds block stream capacity of 1280 bytes",
        "{case}"
    );
    let case = "append to an empty block stream";
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..1000]).expect(case);
    assert!(!ctx.is_synced(), "{case}");
    assert!(iter.next().is_none(), "{case}");
    ctx.sync().expect(case);
    assert!(ctx.is_synced(), "{case}");
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 1000, "{case}");
    let case = "append to the next block stream due to size";
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..500]).expect(case);
    assert!(!ctx.is_synced(), "{case}");
    assert!(iter.next().is_none(), "{case}");
    ctx.sync().expect(case);
    assert!(ctx.is_synced(), "{case}");
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 500, "{case}");
    let case = "append within a block stream";
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..250]).expect(case);
    assert!(!ctx.is_synced(), "{case}");
    assert!(iter.next().is_none(), "{case}");
    ctx.sync().expect(case);
    assert!(ctx.is_synced(), "{case}");
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..250]).expect(case);
    assert!(!ctx.is_synced(), "{case}");
    ctx.sync().expect(case);
    // Two 250-byte appends to the same stream surface as one 500-byte chunk.
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 500, "{case}");
    let case = "append fails because no block streams available";
    let err = stream.append(&TEST_BYTES_REPEATING[..1000]).err();
    assert_eq!(
        err.expect(case).to_string(),
        "endlessstream: no block streams available",
        "{case}"
    );
    let case = "append aligned to the end of the stream";
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..280]).expect(case);
    assert!(!ctx.is_synced(), "{case}");
    assert!(iter.next().is_none(), "{case}");
    ctx.sync().expect(case);
    assert!(ctx.is_synced(), "{case}");
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 280, "{case}");
    // Growing adds a fresh block stream, unblocking full-capacity appends.
    stream.grow().unwrap();
    let case = "append at capacity succeeds after growing the stream";
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..1280]).expect(case);
    assert!(!ctx.is_synced(), "{case}");
    assert!(iter.next().is_none(), "{case}");
    ctx.sync().expect(case);
    assert!(ctx.is_synced(), "{case}");
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 1280, "{case}");
}
#[test]
fn stream_append_sized_span_behavior() {
    // With `SpanBehavior::Sized(130)`, an append of at most 130 bytes that
    // does not fit the current block stream is spanned across two streams
    // (left + right contexts); anything larger goes wholly to the next stream.
    let mut stream = Stream::<TestMemoryBlocksAllocator>::with_span_behavior(
        blockstreams!(fp, fe, fe).into(),
        SpanBehavior::Sized(130),
    );
    stream.load().unwrap();
    let mut iter = stream.iter();
    while iter.next().is_some() {}
    let case = "no span with the append larger than the limit";
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..132]).expect(case);
    // 132 > 130: no span buffer is used and only the right side takes data.
    assert!(stream.streams.read().buffers.read()[0].is_none(), "{case}");
    assert!(ctx.left.is_none(), "{case}");
    assert!(ctx.right.is_some(), "{case}");
    assert!(iter.next().is_none(), "{case}");
    ctx.sync().expect(case);
    assert!(ctx.is_synced(), "{case}");
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 132, "{case}");
    let case = "spanned append";
    // Fill the stream close to capacity so the next 128-byte append must span.
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..1086]).unwrap();
    ctx.sync().unwrap();
    iter.next();
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..128]).expect(case);
    // Both halves of the span exist and a span buffer is in use.
    assert!(stream.streams.read().buffers.read()[1].is_some(), "{case}");
    assert!(ctx.left.is_some(), "{case}");
    assert!(ctx.right.is_some(), "{case}");
    assert!(iter.next().is_none(), "{case}");
    ctx.sync().expect(case);
    assert!(ctx.is_synced(), "{case}");
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 128, "{case}");
    let case = "append after spanned append";
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..64]).expect(case);
    assert!(!ctx.is_synced(), "{case}");
    assert!(iter.next().is_none(), "{case}");
    ctx.sync().expect(case);
    assert!(ctx.is_synced(), "{case}");
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 64, "{case}");
}
#[test]
fn stream_append_concurrent() {
    // Concurrent-access failure modes of `append`: a locked block stream, an
    // unsynced previous append, locked span buffers, and a partially synced
    // spanned append must each produce the corresponding error.
    let mut stream = Stream::<TestMemoryBlocksAllocator>::with_span_behavior(
        blockstreams!(ff, fp, fe).into(),
        SpanBehavior::Sized(130),
    );
    stream.load().unwrap();
    let streams = stream.streams.read();
    let case = "append to a locked block stream fails";
    // Index 1 is the partial stream `append` would target.
    let locked = streams.get(1).unwrap().lock();
    let err = stream.append(&TEST_BYTES_REPEATING[..64]).err();
    assert_eq!(
        err.expect(case).to_string(),
        "endlessstream: blockstream: stream is busy with another operation",
        "{case}"
    );
    drop(locked);
    let case = "append to dirty block stream fails";
    // An append whose context has not been synced blocks further appends.
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..64]).expect(case);
    let err = stream.append(&TEST_BYTES_REPEATING[..64]).err();
    assert_eq!(
        err.expect(case).to_string(),
        "endlessstream: previous sync has not completed",
        "{case}"
    );
    ctx.sync().unwrap();
    let case = "spanned append with locked buffers fails";
    let locked = Stream::<TestMemoryBlocksAllocator>::lock_buffers(&streams).unwrap();
    let err = stream.append(&TEST_BYTES_REPEATING[..128]).err();
    assert_eq!(
        err.expect(case).to_string(),
        "endlessstream: blockstream: stream is busy with another operation",
        "{case}"
    );
    drop(locked);
    let case = "append during partial sync of a spanned append fails";
    let mut iter = stream.iter();
    while iter.next().is_some() {}
    let mut ctx = stream.append(&TEST_BYTES_REPEATING[..128]).expect(case);
    let err = stream.append(&TEST_BYTES_REPEATING[..64]).err();
    assert_eq!(
        err.expect(case).to_string(),
        "endlessstream: previous sync has not completed",
        "{case}"
    );
    assert!(iter.next().is_none(), "{case}");
    // Sync only the right half of the span; appends must still be refused
    // and the data must remain invisible to readers.
    streams
        .get(ctx.right.take().unwrap())
        .unwrap()
        .sync()
        .unwrap();
    let err = stream.append(&TEST_BYTES_REPEATING[..64]).err();
    assert_eq!(
        err.expect(case).to_string(),
        "endlessstream: previous sync has not completed",
        "{case}"
    );
    assert!(iter.next().is_none(), "{case}");
    let case = "append after completing partial sync succeeds";
    ctx.sync().expect(case);
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 128, "{case}");
    stream.append(&TEST_BYTES_REPEATING[..64]).expect(case);
    let chunk = iter.next().expect(case);
    let bytes = chunk.bytes().expect(case);
    assert_eq!(bytes.len(), 64, "{case}");
}
#[test]
fn stream_grow() {
    // `grow` allocates one more block stream (capacity +1280); it must wait
    // for outstanding chunk borrows and locked buffers before completing.
    let mut stream = Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(ff, fe).into());
    stream.load().unwrap();
    let stream = Arc::new(stream);
    let mut iter = stream.iter();
    let chunk = iter.next().unwrap();
    assert_eq!(stream.stats().data_capacity(), 2 * 1280);
    let case = "grow while reading";
    stream.grow().expect(case);
    assert_eq!(stream.stats().data_capacity(), 3 * 1280, "{case}");
    let case = "growing waits for readers";
    let stream_clone = Arc::clone(&stream);
    // NOTE(review): assert_waits! (defined elsewhere in this file) appears to
    // check that the first expression only completes once the second has run.
    assert_waits!(stream_clone.grow().expect(case), drop(chunk), "{case}");
    assert_eq!(stream.stats().data_capacity(), 4 * 1280, "{case}");
    let case = "growing waits for locked buffers";
    let streams = stream.streams.read();
    let locked = Stream::<TestMemoryBlocksAllocator>::lock_buffers(&streams).unwrap();
    let stream_clone = Arc::clone(&stream);
    assert_waits!(stream_clone.grow().expect(case), drop(locked), "{case}");
    assert_eq!(stream.stats().data_capacity(), 5 * 1280, "{case}");
}
#[test]
fn stream_maybe_shrink() {
    // `maybe_shrink` removes fully-consumed leading block streams and returns
    // (streams removed, bytes removed). It must refuse while chunks or
    // buffers are still referenced, keep the last non-full stream, and queue
    // removed streams as releasables.
    let mut stream =
        Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(ft, sp, fp, fp, fe, fe).into());
    stream.load().unwrap();
    let stream = Arc::new(stream);
    let mut iter = stream.iter();
    let chunk = iter.next().unwrap();
    let case = "no shrink when referenced";
    // Nothing has been advanced yet: nothing is removable.
    assert_eq!(stream.maybe_shrink(), (0, 0), "{case}");
    assert_eq!(stream.stats().data_capacity(), 6 * 1280, "{case}");
    assert_eq!(stream.releasables.lock().unwrap().len(), 0, "{case}");
    let case = "no shrink when buffer referenced";
    assert!(stream.advance(iter.advance_context()));
    assert_eq!(stream.maybe_shrink(), (0, 0), "{case}");
    assert_eq!(stream.stats().data_capacity(), 6 * 1280, "{case}");
    assert_eq!(stream.releasables.lock().unwrap().len(), 0, "{case}");
    let case = "shrink first block stream";
    iter.next().unwrap();
    assert!(stream.advance(iter.advance_context()));
    assert_eq!(stream.maybe_shrink(), (1, FT + TS), "{case}");
    assert_eq!(stream.stats().data_capacity(), 5 * 1280, "{case}");
    assert_eq!(stream.releasables.lock().unwrap().len(), 1, "{case}");
    let case = "shrinking waits for readers and removes two block streams";
    while iter.next().is_some() {}
    assert!(stream.advance(iter.advance_context()));
    let stream_clone = Arc::clone(&stream);
    // The first chunk is still borrowed; shrinking must block until dropped.
    assert_waits!(
        assert_eq!(stream_clone.maybe_shrink(), (2, 2 * FP - 16), "{case}"),
        drop(chunk),
        "{case}"
    );
    assert_eq!(stream.stats().data_capacity(), 3 * 1280, "{case}");
    assert_eq!(stream.releasables.lock().unwrap().len(), 3, "{case}");
    let case = "last non-full stream is not removed";
    assert!(iter.next().is_none(), "{case}");
    assert_eq!(stream.maybe_shrink(), (0, 0), "{case}");
    assert_eq!(stream.stats().data_capacity(), 3 * 1280, "{case}");
    assert_eq!(stream.releasables.lock().unwrap().len(), 3, "{case}");
    let case = "shrinking waits for locked buffers and removes last full stream";
    stream.append(&TEST_BYTES_REPEATING[..128]).unwrap();
    iter.next().unwrap();
    assert!(stream.advance(iter.advance_context()));
    assert!(iter.next().is_none(), "{case}");
    let streams = stream.streams.read();
    let locked = Stream::<TestMemoryBlocksAllocator>::lock_buffers(&streams).unwrap();
    let stream_clone = Arc::clone(&stream);
    assert_waits!(
        assert_eq!(stream_clone.maybe_shrink(), (1, FF), "{case}"),
        drop(locked),
        "{case}"
    );
    assert_eq!(stream.stats().data_capacity(), 2 * 1280, "{case}");
    assert_eq!(stream.releasables.lock().unwrap().len(), 4, "{case}");
}
#[test]
fn stream_release() {
    // Shrunk block streams are queued as releasables. `try_release` returns
    // (released, retained, error) and only hands recently removed streams
    // back to the allocator after the stream set has changed (e.g. `grow`);
    // `force_release` returns them immediately.
    let mut stream =
        Stream::<TestMemoryBlocksAllocator>::new(blockstreams!(ft, sp, fp, fp, fe, fe).into());
    stream.load().unwrap();
    let stream = Arc::new(stream);
    let mut iter = stream.iter();
    iter.next().unwrap();
    iter.next().unwrap();
    let case = "recently removed are not released";
    assert!(stream.advance(iter.advance_context()));
    assert_eq!(stream.maybe_shrink(), (1, FT + TS), "{case}");
    assert!(matches!(stream.try_release(), (0, 1, None)), "{case}");
    let case = "removed after changing streams are released";
    stream.grow().unwrap();
    assert!(matches!(stream.try_release(), (1, 0, None)), "{case}");
    let case = "recently removed are released immediately if forced";
    iter.next().unwrap();
    assert!(stream.advance(iter.advance_context()));
    assert_eq!(stream.maybe_shrink(), (1, SP), "{case}");
    assert!(matches!(stream.force_release(), (1, 0, None)), "{case}");
}
#[test]
fn blockstreams_len() {
    // `len()` reports (stream count, total data bytes); layouts with spanned
    // data additionally count the TS bytes between the spanning streams.
    let case = "with spilled at front";
    let bs = BlockStreams::try_from(blockstreams!(sp, fe, ff, fe)).unwrap();
    assert_eq!(bs.len(), (4, SP + FE + FF + FE), "{case}");
    let case = "with spanned data";
    let bs = BlockStreams::try_from(blockstreams!(sp, ft, sf, fe)).unwrap();
    assert_eq!(bs.len(), (4, SP + FT + TS + SF + FE), "{case}");
    let case = "without spilled at front";
    let bs = BlockStreams::try_from(blockstreams!(fp, ft, sf, fe)).unwrap();
    assert_eq!(bs.len(), (4, FP + FT + TS + SF + FE), "{case}");
}
#[test]
fn blockstreams_stats() {
    // `stats(offset)` aggregates block/data sizes for the collection.
    // NOTE(review): the wasted-bytes expressions appear to sum the spill
    // head, the read offset, and per-stream gaps — confirm against
    // BlockStreams::stats before relying on the breakdown.
    let case = "with spilled at front";
    let streams = BlockStreams::try_from(blockstreams!(sp, fe, ft, sp, fe)).unwrap();
    let stats = streams.stats(88);
    assert_eq!(stats.blocks_size(), 5 * 2048, "{case}");
    assert_eq!(stats.data_capacity(), 5 * 1280, "{case}");
    let expected = 16 + SP + FE + FT + TS + SP + FE;
    assert_eq!(stats.data_used(), expected, "{case}");
    assert_eq!(stats.data_wasted(), 16 + 88 + 128 + 1280 + 0, "{case}");
    assert_eq!(stats.data_available(), 1280 - 1152 + 1280, "{case}");
    let case = "with offset from the third block stream";
    let streams = BlockStreams::try_from(blockstreams!(sp, fe, ft, sp, fe)).unwrap();
    let stats = streams.stats(1168);
    assert_eq!(stats.blocks_size(), 5 * 2048, "{case}");
    assert_eq!(stats.data_capacity(), 5 * 1280, "{case}");
    let expected = 16 + SP + FE + FT + TS + SP + FE;
    assert_eq!(stats.data_used(), expected, "{case}");
    assert_eq!(stats.data_wasted(), 16 + 128 + 1280 + 1168 + 0, "{case}");
    assert_eq!(stats.data_available(), 1280 - 1152 + 1280, "{case}");
    let case = "without spilled at front";
    let streams = BlockStreams::try_from(blockstreams!(fp, fe, ft, sp, fe)).unwrap();
    let stats = streams.stats(88);
    assert_eq!(stats.blocks_size(), 5 * 2048, "{case}");
    assert_eq!(stats.data_capacity(), 5 * 1280, "{case}");
    assert_eq!(stats.data_used(), FP + FE + FT + TS + SP + FE, "{case}");
    assert_eq!(stats.data_wasted(), 88 + 128 + 1280 + 0, "{case}");
    assert_eq!(stats.data_available(), 1280 - 1152 + 1280, "{case}");
    let case = "with removed block streams";
    // A removed stream no longer counts toward block size or capacity, but
    // its data still shows in `data_used`.
    let mut streams = BlockStreams::try_from(blockstreams!(fe, fp, fe, ft, sp, fe)).unwrap();
    streams.remove();
    let stats = streams.stats(88);
    assert_eq!(stats.blocks_size(), 5 * 2048, "{case}");
    assert_eq!(stats.data_capacity(), 5 * 1280, "{case}");
    let expected = FE + FP + FE + FT + TS + SP + FE;
    assert_eq!(stats.data_used(), expected, "{case}");
    assert_eq!(stats.data_wasted(), 88 + 128 + 1280 + 0, "{case}");
    assert_eq!(stats.data_available(), 1280 - 1152 + 1280, "{case}");
}
#[test]
fn blockstreams_get() {
    // `get` is index-stable: removed slots yield `None` while the surviving
    // streams keep their original indices.
    let mut streams = BlockStreams::try_from(blockstreams!(fe, ff, fe)).unwrap();
    let case = "within new block streams";
    for (i, want) in [Some(true), Some(false), Some(true), None]
        .iter()
        .copied()
        .enumerate()
    {
        assert_eq!(streams.get(i).map(|s| s.is_empty()), want, "{case}");
    }
    let case = "after removing first block stream";
    streams.remove();
    for (i, want) in [None, Some(false), Some(true), None]
        .iter()
        .copied()
        .enumerate()
    {
        assert_eq!(streams.get(i).map(|s| s.is_empty()), want, "{case}");
    }
    let case = "after removing all block streams";
    streams.remove();
    streams.remove();
    for i in 0..4 {
        assert_eq!(streams.get(i).map(|s| s.is_empty()), None, "{case}");
    }
}
#[test]
fn blockstreams_append() {
    // A spilled stream is only valid as the very first stream (a
    // continuation of data retrieved elsewhere); appending one after a
    // stream without a spanning trail must fail.
    let case = "spill on a first append succeeds";
    let spilled =
        test_generate_blockstream(&mut (1..).peekable(), 255, TestGenerateFlags::Spilled as u8);
    let mut streams = BlockStreams::new();
    streams.append(spilled.into()).expect(case);
    let case = "spill following blockstream without trail fails";
    let empty =
        test_generate_blockstream(&mut (1..).peekable(), 255, TestGenerateFlags::Empty as u8);
    let spilled =
        test_generate_blockstream(&mut (1..).peekable(), 255, TestGenerateFlags::Spilled as u8);
    let mut streams = BlockStreams::new();
    streams.append(empty.into()).unwrap();
    streams.append(spilled.into()).expect_err(case);
}
#[test]
#[should_panic(expected = "append uninitialized, or with trail too large")]
fn blockstreams_append_uninitialized() {
    // Appending a block stream that was never initialized must panic.
    let uninitialized = Arc::new(block::Stream::new(TestMemoryBlocks::new()));
    let mut streams = BlockStreams::new();
    streams.append(uninitialized).unwrap();
}
#[test]
fn blockstreams_remove() {
    // Each `remove` drops the front stream and accumulates the removed byte
    // count; empty streams add no bytes, and removing past the end is a no-op.
    let case = "remove basic";
    let mut streams = BlockStreams::try_from(blockstreams!(ff, ft, sp, fe, fe)).unwrap();
    assert_eq!(streams.removed(), (0, 0), "{case}");
    let expectations = [
        (1, FF),
        (2, FF + FT + TS),
        (3, FF + FT + TS + SP),
        (4, FF + FT + TS + SP),
        (5, FF + FT + TS + SP),
    ];
    for want in expectations {
        streams.remove().expect(case);
        assert_eq!(streams.removed(), want, "{case}");
    }
    let case = "remove nothing";
    assert!(streams.remove().is_none(), "{case}");
    assert_eq!(streams.removed(), (5, FF + FT + TS + SP), "{case}");
}
#[test]
fn blockstreams_pick_empty() {
    // `pick_empty` returns the index of the empty stream that directly
    // follows the last non-empty one; leading/interior empties are skipped,
    // exhausted layouts yield `None`, and removal keeps indices stable.
    let case = "single empty should be picked";
    let candidates = BlockStreams::try_from(blockstreams!(fe)).unwrap();
    assert_eq!(candidates.pick_empty(), Some(0), "{case}");
    let case = "empty following partial stream should be picked";
    let candidates = BlockStreams::try_from(blockstreams!(fp, ff, fp, fp, fe, fe)).unwrap();
    assert_eq!(candidates.pick_empty(), Some(4), "{case}");
    let case = "empty following full stream should be picked";
    let candidates = BlockStreams::try_from(blockstreams!(fp, ff, fp, ff, fe, fe)).unwrap();
    assert_eq!(candidates.pick_empty(), Some(4), "{case}");
    let case = "empty streams in the front should be skipped";
    let candidates = BlockStreams::try_from(blockstreams!(fe, fe, ff, fe, fe)).unwrap();
    assert_eq!(candidates.pick_empty(), Some(3), "{case}");
    let case = "empty streams in the middle should be skipped";
    let candidates = BlockStreams::try_from(blockstreams!(ff, fe, fe, ff, fp, fe, fe)).unwrap();
    assert_eq!(candidates.pick_empty(), Some(5), "{case}");
    let case = "no streams returns nothing";
    let candidates = BlockStreams::<TestMemoryBlocks>::new();
    assert_eq!(candidates.pick_empty(), None, "{case}");
    let case = "single non-empty stream returns nothing";
    let candidates = BlockStreams::try_from(blockstreams!(fp)).unwrap();
    assert_eq!(candidates.pick_empty(), None, "{case}");
    let candidates = BlockStreams::try_from(blockstreams!(ff)).unwrap();
    assert_eq!(candidates.pick_empty(), None, "{case}");
    let case = "no empty streams following non-empty returns nothing";
    let candidates = BlockStreams::try_from(blockstreams!(fp, ff, fe, fe, fp)).unwrap();
    assert_eq!(candidates.pick_empty(), None, "{case}");
    let candidates = BlockStreams::try_from(blockstreams!(fp, ff, fe, fe, ff)).unwrap();
    assert_eq!(candidates.pick_empty(), None, "{case}");
    let case = "removing streams keeps the index stable";
    let mut candidates = BlockStreams::try_from(blockstreams!(ff, fp, ff, fp, fe)).unwrap();
    assert_eq!(candidates.pick_empty(), Some(4), "{case}");
    candidates.remove();
    assert_eq!(candidates.pick_empty(), Some(4), "{case}");
}
#[test]
fn blockstreams_pick_ending() {
    // `pick_ending` selects the stream an append would end on: the last
    // non-empty stream if it can still take data, otherwise the first empty
    // one following it; a full tail yields `None` and removal keeps indices
    // stable.
    let case = "single empty should be picked";
    let candidates = BlockStreams::try_from(blockstreams!(fe)).unwrap();
    assert_eq!(candidates.pick_ending(), Some(0), "{case}");
    let case = "single non-empty should be picked";
    let candidates = BlockStreams::try_from(blockstreams!(fp)).unwrap();
    assert_eq!(candidates.pick_ending(), Some(0), "{case}");
    let case = "last non-empty should be picked";
    let candidates = BlockStreams::try_from(blockstreams!(fp, ff, fp, fp)).unwrap();
    assert_eq!(candidates.pick_ending(), Some(3), "{case}");
    let candidates = BlockStreams::try_from(blockstreams!(fp, ff, fe, fp)).unwrap();
    assert_eq!(candidates.pick_ending(), Some(3), "{case}");
    let candidates = BlockStreams::try_from(blockstreams!(fp, ff, fp, fp, fe, fe)).unwrap();
    assert_eq!(candidates.pick_ending(), Some(3), "{case}");
    let case = "first empty should be picked";
    let candidates = BlockStreams::try_from(blockstreams!(fp, ff, fp, ff, fe)).unwrap();
    assert_eq!(candidates.pick_ending(), Some(4), "{case}");
    let candidates = BlockStreams::try_from(blockstreams!(fp, ff, fp, ff, fe, fe)).unwrap();
    assert_eq!(candidates.pick_ending(), Some(4), "{case}");
    let case = "empty streams in the front should be skipped";
    let candidates = BlockStreams::try_from(blockstreams!(fe, fe, ff, fe, fe)).unwrap();
    assert_eq!(candidates.pick_ending(), Some(3), "{case}");
    let candidates = BlockStreams::try_from(blockstreams!(fe, fe, ff, fp, fe)).unwrap();
    assert_eq!(candidates.pick_ending(), Some(3), "{case}");
    let case = "empty streams in the middle should be skipped";
    let candidates = BlockStreams::try_from(blockstreams!(ff, fp, fe, fe, ff, fp, fe)).unwrap();
    assert_eq!(candidates.pick_ending(), Some(5), "{case}");
    let case = "empty streams returns nothing";
    let candidates = BlockStreams::<TestMemoryBlocks>::new();
    assert_eq!(candidates.pick_ending(), None, "{case}");
    let case = "single full stream returns nothing";
    let candidates = BlockStreams::try_from(blockstreams!(ff)).unwrap();
    assert_eq!(candidates.pick_ending(), None, "{case}");
    let case = "full last stream returns nothing";
    let candidates = BlockStreams::try_from(blockstreams!(fp, ff, fp, ff)).unwrap();
    assert_eq!(candidates.pick_ending(), None, "{case}");
    let candidates = BlockStreams::try_from(blockstreams!(fp, ff, fe, ff)).unwrap();
    assert_eq!(candidates.pick_ending(), None, "{case}");
    let case = "removing streams keeps the index stable";
    let mut candidates = BlockStreams::try_from(blockstreams!(ff, fp, ff, fp, fe)).unwrap();
    assert_eq!(candidates.pick_ending(), Some(3), "{case}");
    candidates.remove();
    assert_eq!(candidates.pick_ending(), Some(3), "{case}");
}
#[test]
fn blockstreams_stream_at() {
    // Maps absolute byte offsets to (stream index, offset within stream)
    // across a layout with spans and interior empty streams.
    // Fix: the sixth-stream case label previously read "sixths stream span".
    let streams =
        BlockStreams::try_from(blockstreams!(ft, sp, fp, fe, fe, ft, sp, fe)).unwrap();
    let case = "zero offset";
    assert_eq!(streams.stream_at(0), Some((0, 0)), "{case}");
    let case = "offset within the first stream";
    assert_eq!(streams.stream_at(1120), Some((0, 1120)), "{case}");
    let case = "offset within the first stream span";
    // Offsets inside the span still resolve to the spanning stream.
    let offset = FT + 40;
    assert_eq!(streams.stream_at(offset), Some((0, FT + 40)), "{case}");
    let case = "offset within the second stream";
    let offset = FT + TS + 16;
    assert_eq!(streams.stream_at(offset), Some((1, 16)), "{case}");
    let case = "offset aligned to the start of the third stream";
    let offset = FT + TS + SP;
    assert_eq!(streams.stream_at(offset), Some((2, 0)), "{case}");
    let case = "offset skipping empty streams";
    let offset = FT + TS + SP + FP + 16;
    assert_eq!(streams.stream_at(offset), Some((5, 16)), "{case}");
    let case = "offset within the sixth stream span";
    let offset = FT + TS + SP + FP + FT + 40;
    assert_eq!(streams.stream_at(offset), Some((5, FT + 40)), "{case}");
    let case = "offset aligned to the end of the last stream";
    let offset = FT + TS + SP + FP + FT + TS + SP;
    assert_eq!(streams.stream_at(offset), Some((6, SP)), "{case}");
    let case = "offset past the end of the last stream";
    let offset = FT + TS + SP + FP + FT + TS + SP + 2;
    assert_eq!(streams.stream_at(offset), None, "{case}");
}
#[test]
fn blockstreams_stream_at_end_aligned() {
    // Offsets landing exactly on a stream boundary: a partial stream keeps
    // the offset on itself, while full and spilled layouts resolve to the
    // start of the next stream.
    let case = "end of a partial stream";
    let bs = BlockStreams::try_from(blockstreams!(fp, fp, fe)).unwrap();
    assert_eq!(bs.stream_at(FP + FP), Some((1, FP)), "{case}");
    let case = "end of a full stream with empty";
    let bs = BlockStreams::try_from(blockstreams!(fp, ff, fe)).unwrap();
    assert_eq!(bs.stream_at(FP + FF), Some((2, 0)), "{case}");
    let case = "end of a full stream without empty";
    let bs = BlockStreams::try_from(blockstreams!(fp, fp, ff)).unwrap();
    assert_eq!(bs.stream_at(FP + FP + FF), Some((3, 0)), "{case}");
    let case = "start of a spilled stream";
    let bs = BlockStreams::try_from(blockstreams!(fp, ft, se)).unwrap();
    assert_eq!(bs.stream_at(FP + FT + TS), Some((2, 0)), "{case}");
}
#[test]
fn blockstreams_stream_at_removed() {
    // After removals, offsets inside removed streams resolve to `None`, while
    // the surviving streams keep their original indices.
    let mut bs =
        BlockStreams::try_from(blockstreams!(ft, sp, fp, fe, fe, ft, sp, fe)).unwrap();
    bs.remove();
    let case = "offset within removed stream";
    assert_eq!(bs.stream_at(0), None, "{case}");
    assert_eq!(bs.stream_at(16), None, "{case}");
    let case = "offset at the start of the first stream";
    assert_eq!(bs.stream_at(FT + TS), Some((1, 0)), "{case}");
    let case = "offset within the first stream";
    assert_eq!(bs.stream_at(FT + TS + 16), Some((1, 16)), "{case}");
    let case = "offset aligned to the start of the second stream";
    assert_eq!(bs.stream_at(FT + TS + SP), Some((2, 0)), "{case}");
    let case = "offset skipping empty streams";
    let at = FT + TS + SP + FP + 16;
    assert_eq!(bs.stream_at(at), Some((5, 16)), "{case}");
    bs.remove();
    bs.remove();
    let case = "zero offset with empty streams at the start";
    let at = FT + TS + SP + FP;
    assert_eq!(bs.stream_at(at - 1), None, "{case}");
    assert_eq!(bs.stream_at(at), Some((5, 0)), "{case}");
}
#[test]
fn blockstreams_offset_at() {
// `offset_at` searches the streams with a caller-supplied comparator
// (`|bytes, last| …` — a candidate buffer plus, apparently, the last
// block's bytes). `Ok(offset)` is a match; `Err(offset)` carries a
// boundary offset when the search falls off either end — TODO confirm
// the exact `Err` contract against the implementation.
let mut streams =
BlockStreams::try_from(blockstreams!(fp, ft, sp, fp, fe, ft, sf, fe)).unwrap();
streams.remove();
// Word 144 lies inside the removed first stream: the search fails with the
// removed-data boundary offset.
let case = "offset of monotonically increasing numbers";
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 144));
assert_eq!(offset, Err(streams.removed().1), "{case}");
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 145));
assert_eq!(offset, Ok(FP), "{case}");
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 146));
assert_eq!(offset, Ok(FP + 8), "{case}");
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 200));
assert_eq!(offset, Ok(FP + 440), "{case}");
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 288));
assert_eq!(offset, Ok(FP + FT - 8), "{case}");
// Crossing from the `ft` stream into the spilled (`sp`) stream skips the
// TS-sized spill prefix.
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 289));
assert_eq!(offset, Ok(FP + FT + TS), "{case}");
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 290));
assert_eq!(offset, Ok(FP + FT + TS + 8), "{case}");
let expected = FP + FT + TS + SP + FP - 16;
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 573));
assert_eq!(offset, Ok(expected), "{case}");
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 574));
assert_eq!(offset, Ok(expected + 8), "{case}");
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 575));
assert_eq!(offset, Ok(expected + 16), "{case}");
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 576));
assert_eq!(offset, Ok(expected + 24), "{case}");
let expected = FP + FT + TS + SP + FP + FT - 8;
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 718));
assert_eq!(offset, Ok(expected), "{case}");
// Word 719 sits just past a trailing/spill boundary, hence the extra TS.
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 719));
assert_eq!(offset, Ok(expected + 8 + TS), "{case}");
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 720));
assert_eq!(offset, Ok(expected + 8 + TS + 8), "{case}");
let expected = FP + FT + TS + SP + FP + FT + TS + SF - 16;
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 875));
assert_eq!(offset, Ok(expected), "{case}");
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 876));
assert_eq!(offset, Ok(expected + 8), "{case}");
// A word past the very end fails with removed-base + total length.
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 877));
assert_eq!(offset, Err(streams.removed().1 + streams.len().1), "{case}");
// The second comparator argument exposes the last block's bytes; search for
// its first and last word.
let case = "offset from the last block";
let offset = streams.offset_at(|bytes, last| {
let word = u64::from_le_bytes(last[..8].try_into().unwrap());
test_find_word_in_stream(bytes, word)
});
let expected = FP + FT + TS + SP + FP + FT + TS + SF - 8 * 10;
assert_eq!(offset, Ok(expected), "{case}");
let offset = streams.offset_at(|bytes, last| {
let word = u64::from_le_bytes(last[last.len() - 8..last.len()].try_into().unwrap());
test_find_word_in_stream(bytes, word)
});
let expected = FP + FT + TS + SP + FP + FT + TS + SF - 8;
assert_eq!(offset, Ok(expected), "{case}");
// The repeating filler word first appears at the start of the `sp` spill.
let case = "offset of a repeating word";
let offset =
streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, TEST_WORDS_REPEATING[0]));
assert_eq!(offset, Ok(FP + FT), "{case}");
let case = "offset from the last block from a buffer";
let streams = BlockStreams::try_from(blockstreams!(fp, ft, se)).unwrap();
let offset = streams.offset_at(|bytes, last| {
let word = u64::from_le_bytes(last[..8].try_into().unwrap());
test_find_word_in_stream(bytes, word)
});
assert_eq!(offset, Ok(FP + FT), "{case}");
// Append four 64-byte records of repeated words; searching for the last
// word of the final block must land on the fourth record.
let case = "offset of repeating increasing numbers";
let streams = BlockStreams::try_from(blockstreams!(fe, fe)).unwrap();
let stream = streams.get(0).unwrap();
stream.append(test_words_as_bytes(&[1_u64; 8])).unwrap();
stream.append(test_words_as_bytes(&[2_u64; 8])).unwrap();
stream.append(test_words_as_bytes(&[3_u64; 8])).unwrap();
stream.append(test_words_as_bytes(&[4_u64; 8])).unwrap();
stream.sync().unwrap();
let offset = streams.offset_at(|bytes, last| {
let word = u64::from_le_bytes(last[last.len() - 8..last.len()].try_into().unwrap());
test_find_word_in_stream(bytes, word)
});
assert_eq!(offset, Ok(3 * 64), "{case}");
// Build a stream whose `data()` is empty (everything spilled/trailing) and
// make sure the search still finds words in the following stream.
let case = "offset within block stream with empty data";
let mut streams = BlockStreams::new();
let mut stream = block::Stream::new(TestMemoryBlocks::new());
stream.initialize().unwrap();
stream
.append_with_opts(test_words_as_bytes(&[1_u64; 8]), true)
.unwrap();
stream.append(test_words_as_bytes(&[2_u64; 162])).unwrap();
stream.sync().unwrap();
assert!(stream.data().is_empty(), "{case}");
streams.append(stream.into()).unwrap();
let mut stream = block::Stream::new(TestMemoryBlocks::new());
stream.initialize().unwrap();
stream
.append_with_opts(test_words_as_bytes(&[2_u64; 10]), true)
.unwrap();
stream.append(test_words_as_bytes(&[3_u64; 8])).unwrap();
stream.append(test_words_as_bytes(&[4_u64; 8])).unwrap();
stream.sync().unwrap();
streams.append(stream.into()).unwrap();
let offset = streams.offset_at(|bytes, _| test_find_word_in_stream(bytes, 4));
assert_eq!(offset, Ok(162 * 8 + 8 * 8), "{case}");
// With nothing searchable the comparator must never run.
let case = "no block streams";
let mut streams = BlockStreams::try_from(blockstreams!(fp)).unwrap();
streams.remove();
let offset = streams.offset_at(|_, _| unreachable!());
assert_eq!(offset, Err(FP), "{case}");
let case = "empty block streams";
let mut streams = BlockStreams::try_from(blockstreams!(fp, fe)).unwrap();
streams.remove();
let offset = streams.offset_at(|_, _| unreachable!());
assert_eq!(offset, Err(FP), "{case}");
// A comparator that always walks left falls off the front with `Err(0)`.
let case = "always search left";
let streams = BlockStreams::try_from(blockstreams!(ft, se, ff)).unwrap();
let offset = streams.offset_at(|_, _| SearchControl::SearchLeft);
assert_eq!(offset, Err(0), "{case}");
let case = "search left within buffer";
let streams = BlockStreams::try_from(blockstreams!(ft, se, ff)).unwrap();
let offset = streams.offset_at(|bytes, _| {
if u64::from_le_bytes(bytes[..8].try_into().unwrap()) > 144 {
SearchControl::SearchLeft
} else {
SearchControl::SearchRight
}
});
assert_eq!(offset, Err(FT), "{case}");
let case = "search left within non-first block stream";
let streams = BlockStreams::try_from(blockstreams!(ft, se, ff)).unwrap();
let offset = streams.offset_at(|bytes, _| {
let word = u64::from_le_bytes(bytes[..8].try_into().unwrap());
if word == TEST_WORDS_REPEATING[0] || word <= 164 {
SearchControl::SearchRight
} else {
SearchControl::SearchLeft
}
});
assert_eq!(offset, Err(FT + TS), "{case}");
}
#[test]
fn blockstreams_set_buffer() {
// `set_buffer(index, trailing, spilled)` installs a span buffer (the
// concatenation of a stream's trailing bytes and the next stream's spilled
// bytes) at `index`. The macro peeks directly at the internal `buffers`
// storage to verify placement.
macro_rules! get_buffer {
($streams:expr, $pos:expr) => {{
$streams.buffers.get_mut().get_mut($pos).unwrap().as_ref()
}};
}
let mut streams = BlockStreams::try_from(blockstreams!(fp, ft, st, sp, fp)).unwrap();
// Both parts empty: nothing is stored and the call reports no update.
let case = "set with empty data";
let updated = unsafe { streams.set_buffer(0, &[], &[]) };
assert!(!updated, "{case}");
let buffer = get_buffer!(streams, 0);
assert!(buffer.is_none(), "{case}");
// The stored buffer is trailing bytes followed by spilled bytes.
let case = "set on an empty index";
let updated = unsafe { streams.set_buffer(0, &[0x01, 0x02], &[0x03, 0x04]) };
assert!(updated, "{case}");
let buffer = get_buffer!(streams, 0).expect(case);
assert_eq!(buffer.as_slice(), &[0x01, 0x02, 0x03, 0x04], "{case}");
// Index 1 was populated when the streams were built; it must be kept.
let case = "no overwrite on non-empty index";
let updated = unsafe { streams.set_buffer(1, &[0x01, 0x02], &[0x03, 0x04]) };
assert!(!updated, "{case}");
let buffer = get_buffer!(streams, 1).expect(case);
assert_eq!(buffer.as_slice(), &TEST_BYTES_REPEATING[..144], "{case}");
// Appending a stream grows the buffer table; earlier entries stay intact.
let case = "set after append";
let mut stream = block::Stream::new(TestMemoryBlocks::new());
stream.initialize().unwrap();
streams.append(Arc::new(stream)).unwrap();
let updated = unsafe { streams.set_buffer(3, &[0x05, 0x06], &[0x07, 0x08]) };
assert!(updated, "{case}");
let buffer = get_buffer!(streams, 0).expect(case);
assert_eq!(buffer.as_slice(), &[0x01, 0x02, 0x03, 0x04], "{case}");
let buffer = get_buffer!(streams, 1).expect(case);
assert_eq!(buffer.as_slice(), &TEST_BYTES_REPEATING[..144], "{case}");
let buffer = get_buffer!(streams, 2).expect(case);
assert_eq!(buffer.as_slice(), &TEST_BYTES_REPEATING[..144], "{case}");
let buffer = get_buffer!(streams, 3).expect(case);
assert_eq!(buffer.as_slice(), &[0x05, 0x06, 0x07, 0x08], "{case}");
let buffer = get_buffer!(streams, 4);
assert!(buffer.is_none(), "{case}");
// After a removal the table shifts down by one; external indices (here 4)
// are still relative to the original stream numbering.
let case = "set after remove";
streams.remove();
let updated = unsafe { streams.set_buffer(4, &[0x09, 0x10], &[0x11, 0x12]) };
assert!(updated, "{case}");
let buffer = get_buffer!(streams, 0).expect(case);
assert_eq!(buffer.as_slice(), &TEST_BYTES_REPEATING[..144], "{case}");
let buffer = get_buffer!(streams, 1).expect(case);
assert_eq!(buffer.as_slice(), &TEST_BYTES_REPEATING[..144], "{case}");
let buffer = get_buffer!(streams, 2).expect(case);
assert_eq!(buffer.as_slice(), &[0x05, 0x06, 0x07, 0x08], "{case}");
let buffer = get_buffer!(streams, 3).expect(case);
assert_eq!(buffer.as_slice(), &[0x09, 0x10, 0x11, 0x12], "{case}");
let buffer = get_buffer!(streams, 4);
assert!(buffer.is_none(), "{case}");
}
#[test]
#[should_panic(expected = "attempt to subtract with overflow")]
fn blockstreams_set_buffer_index_too_small() {
// After `remove()`, index 0 points before the first live stream; the
// internal index translation presumably subtracts the removed count and
// underflows — the expected panic message pins that down.
let mut streams = BlockStreams::try_from(blockstreams!(fp, fp, fp)).unwrap();
streams.remove();
unsafe {
let _ = streams.set_buffer(0, &[], &[]);
};
}
#[test]
#[should_panic(expected = "index too large")]
fn blockstreams_set_buffer_index_too_large() {
// A span buffer belongs between stream `i` and `i + 1`, so the last valid
// index is len - 2; index 2 of a 3-stream set must be rejected.
let streams = BlockStreams::try_from(blockstreams!(fp, fp, fp)).unwrap();
unsafe {
let _ = streams.set_buffer(2, &[], &[]);
};
}
#[test]
#[should_panic(expected = "trailing and spilled should be either empty or non-empty")]
fn blockstreams_set_buffer_no_spilled() {
// Trailing bytes without matching spilled bytes violate the span
// invariant and must panic.
let streams = BlockStreams::try_from(blockstreams!(fp, fp, fp)).unwrap();
unsafe {
let _ = streams.set_buffer(0, &[0x01], &[]);
};
}
#[test]
#[should_panic(expected = "trailing and spilled should be either empty or non-empty")]
fn blockstreams_set_buffer_no_trailing() {
// Mirror of `blockstreams_set_buffer_no_spilled`: spilled bytes without
// trailing bytes must panic with the same invariant message.
let streams = BlockStreams::try_from(blockstreams!(fp, fp, fp)).unwrap();
unsafe {
let _ = streams.set_buffer(0, &[], &[0x01]);
};
}
#[test]
fn make_span_buffer_fn() {
// `make_span_buffer(trailing, spilled)` concatenates the two parts into one
// owned buffer; both parts must be empty (no buffer) or both non-empty.
let case = "empty span makes no buffer";
assert_eq!(make_span_buffer(&[], &[]).expect(case), None, "{case}");
let case = "span makes a buffer";
let bytes = make_span_buffer(&[0x01, 0x02], &[0x03]).expect(case);
assert_eq!(bytes.as_deref().expect(case), &[0x01, 0x02, 0x03], "{case}");
// A half-present span is a broken stream boundary and must be an error.
let case = "partial span results in error";
make_span_buffer(&[0x01, 0x02], &[]).expect_err(case);
make_span_buffer(&[], &[0x01, 0x02]).expect_err(case);
}
// Repeating filler words used by the generators to pad streams; stored with
// `to_le()` so the raw-byte view below is identical on any host endianness.
static TEST_WORDS_REPEATING: [u64; 170] = [0xcccccccc_u64.to_le(); 170];
// The same words viewed as raw bytes. (The explicit `'static` lifetime on a
// static's reference type is redundant in Rust and has been removed —
// clippy::redundant_static_lifetimes.)
static TEST_BYTES_REPEATING: &[u8] = test_words_as_bytes(&TEST_WORDS_REPEATING);
/// Reinterprets a slice of `u64` words as its underlying bytes
/// (usable in `const` context).
#[inline(always)]
const fn test_words_as_bytes(words: &[u64]) -> &[u8] {
    let data = words.as_ptr().cast::<u8>();
    let byte_len = words.len() * 8;
    // SAFETY: `data` points at `words`' backing storage, which is valid and
    // initialized for exactly `byte_len` bytes for the slice's lifetime.
    unsafe { core::slice::from_raw_parts(data, byte_len) }
}
// Bit flags steering `test_generate_blockstream`'s output shape.
#[repr(u8)]
enum TestGenerateFlags {
// Plain stream: just the generated words.
Empty = 0b0000,
// Prepend 16 filler bytes appended with the "spilled" option set
// (presumably marking a continuation from a previous stream — confirm
// against `append_with_opts`).
Spilled = 0b0001,
// Append the full filler buffer after the generated words.
Trailing = 0b0010,
}
/// Builds an initialized in-memory block stream for tests.
///
/// Appends up to `take` words taken from `iter`, stopping early when the
/// stream reports `Full`; the word that did not fit is deliberately left in
/// the iterator (peek-before-consume) so the next generated stream picks it
/// up. `flags` (see `TestGenerateFlags`) optionally prepends 16 "spilled"
/// filler bytes and/or appends the full filler trailer. Panics on any
/// unexpected stream error.
fn test_generate_blockstream(
    iter: &mut core::iter::Peekable<impl Iterator<Item = u64>>,
    take: usize,
    flags: u8,
) -> block::Stream<TestMemoryBlocks> {
    let with_spilled = flags & TestGenerateFlags::Spilled as u8 != 0;
    let with_trailing = flags & TestGenerateFlags::Trailing as u8 != 0;
    let mut stream = block::Stream::new(TestMemoryBlocks::new());
    stream.initialize().unwrap();
    if with_spilled {
        stream
            .append_with_opts(&TEST_BYTES_REPEATING[..16], true)
            .unwrap();
    }
    let mut appended = 0;
    while appended < take {
        // Peek first: a word rejected with `Full` stays in the iterator.
        let word = *iter.peek().unwrap();
        match stream.append(&word.to_le_bytes()) {
            Ok(_) => {
                iter.next();
                appended += 1;
            }
            Err(block::StreamError::Full) => break,
            Err(_) => unreachable!(),
        }
    }
    if with_trailing {
        stream.append(&TEST_BYTES_REPEATING).unwrap();
    }
    stream.sync().unwrap();
    stream
}
/// Comparator for `offset_at` searches: scans `bytes` one little-endian
/// word at a time looking for `value`.
///
/// Returns `Match` with the word's byte offset on a hit, `SearchRight` when
/// the repeating filler marker is seen or the buffer runs out below `value`,
/// and `SearchLeft` once a larger word appears (the generated data is
/// increasing apart from the filler).
fn test_find_word_in_stream(bytes: &[u8], value: u64) -> SearchControl {
    assert!(!bytes.is_empty());
    for (index, chunk) in bytes.chunks_exact(8).enumerate() {
        let word = u64::from_le_bytes(chunk.try_into().unwrap());
        if word == value {
            return SearchControl::Match(index * 8);
        }
        if word == TEST_WORDS_REPEATING[0] {
            return SearchControl::SearchRight;
        }
        if word > value {
            return SearchControl::SearchLeft;
        }
    }
    SearchControl::SearchRight
}
// Test allocator backed by a pool of in-memory blocks; `None` marks slots
// whose blocks have been released.
struct TestMemoryBlocksAllocator(Mutex<Vec<Option<TestMemoryBlocks>>>);
// NOTE(review): this impl hands out aliasing `refclone` handles to pool
// entries; soundness relies on the pool (and thus the owning copy) outliving
// every handle — TODO confirm no handle escapes the allocator's lifetime.
unsafe impl BlocksAllocator for TestMemoryBlocksAllocator {
type Blocks = TestMemoryBlocks;
// Adds a fresh blocks instance to the pool and returns a non-owning handle.
fn alloc(&self) -> io::Result<Self::Blocks> {
let mut allocated = self.0.lock().unwrap();
allocated.push(Some(TestMemoryBlocks::new()));
// SAFETY: the owning entry just pushed stays in the pool, so the
// handle's shared buffer remains valid while the allocator exists.
Ok(unsafe { allocated.last_mut().unwrap().as_mut().unwrap().refclone() })
}
// Finds the pool entry that shares the handle's buffer (compared by
// pointer identity) and drops the owning copy.
fn release(&self, blocks: Self::Blocks) -> Result<(), (Self::Blocks, io::Error)> {
let bytes = blocks.0.get_ref();
for blocks in self.0.lock().unwrap().iter_mut() {
if blocks
.as_ref()
.is_some_and(|blocks| blocks.0.get_ref().as_ptr() == bytes.as_ptr())
{
blocks.take();
return Ok(());
}
}
// Unknown handle: hand it back alongside the error.
Err((blocks, io::Error::from(io::ErrorKind::Other)))
}
// Invokes `f` with a fresh non-owning handle for every live pool entry.
fn retrieve(&self, mut f: impl FnMut(Self::Blocks)) -> io::Result<()> {
for blocks in self.0.lock().unwrap().iter_mut() {
if blocks.is_some() {
// SAFETY: the owning entry remains in the pool for the
// allocator's lifetime; see the impl-level note.
f(unsafe { blocks.as_mut().unwrap().refclone() });
}
}
Ok(())
}
}
impl From<VecDeque<block::Stream<TestMemoryBlocks>>> for TestMemoryBlocksAllocator {
    /// Seeds the allocator's pool from existing streams, unwrapping each
    /// stream back into its raw blocks.
    fn from(value: VecDeque<block::Stream<TestMemoryBlocks>>) -> Self {
        let pool: Vec<_> = value
            .into_iter()
            .map(|stream| Some(stream.into_inner()))
            .collect();
        Self(pool.into())
    }
}
// In-memory `Blocks` backing store: a cursor over a fixed-size byte vector.
// A `Manual` wrapper means this instance aliases another instance's buffer
// and must not free it (see `TestMaybeDrop` and the `Drop` impl).
struct TestMemoryBlocks(TestMaybeDrop<io::Cursor<Vec<u8>>>);
impl TestMemoryBlocks {
// Geometry of the fake device: 16 blocks of 128 bytes (2 KiB total).
const BLOCK_COUNT: usize = 16;
const BLOCK_SHIFT: u32 = 7;
const BLOCK_SIZE: usize = 1 << Self::BLOCK_SHIFT;
const SIZE: usize = Self::BLOCK_COUNT * Self::BLOCK_SIZE;
// Owning instance over a zeroed buffer.
#[inline(always)]
fn new() -> Self {
Self(TestMaybeDrop::Normal(io::Cursor::new(vec![0; Self::SIZE])))
}
// Creates a second handle over the same underlying buffer by rebuilding a
// `Vec` from this instance's raw parts.
//
// # Safety
// The returned handle aliases `self`'s allocation and must not outlive
// it. The `Manual` wrapper plus the `Drop` impl ensure the shared buffer
// is freed exactly once, by the owning (`Normal`) instance. The aliasing
// `Vec`s must never reallocate (lengths are fixed at `Self::SIZE`).
unsafe fn refclone(&mut self) -> Self {
let bytes = self.0.get_mut();
Self(TestMaybeDrop::Manual(mem::ManuallyDrop::new(
io::Cursor::new(Vec::from_raw_parts(
bytes.as_mut_ptr(),
bytes.len(),
bytes.capacity(),
)),
)))
}
}
impl Blocks for TestMemoryBlocks {
    #[inline(always)]
    fn block_count(&self) -> u64 {
        Self::BLOCK_COUNT as u64
    }
    #[inline(always)]
    fn block_shift(&self) -> u32 {
        Self::BLOCK_SHIFT
    }
    /// Fills `bufs` sequentially, starting at `block`'s byte offset.
    #[inline(always)]
    fn load_from(&mut self, block: u64, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<()> {
        let start = block << Self::BLOCK_SHIFT;
        self.0.seek(io::SeekFrom::Start(start))?;
        bufs.iter_mut().try_for_each(|buf| self.0.read_exact(buf))
    }
    /// Writes `bufs` sequentially, starting at `block`'s byte offset.
    #[inline(always)]
    fn store_at(&mut self, block: u64, bufs: &mut [io::IoSlice<'_>]) -> io::Result<()> {
        let start = block << Self::BLOCK_SHIFT;
        self.0.seek(io::SeekFrom::Start(start))?;
        bufs.iter_mut().try_for_each(|buf| self.0.write_all(buf))
    }
}
impl Drop for TestMemoryBlocks {
#[inline(always)]
fn drop(&mut self) {
// A `Manual` handle aliases a buffer owned by another instance: take the
// cursor out of the `ManuallyDrop` and forget its `Vec` so the shared
// allocation is freed exactly once, by the owning `Normal` instance.
// `Normal` handles fall through and drop their buffer normally.
if let TestMaybeDrop::Manual(ref mut cursor) = self.0 {
let cursor = unsafe { mem::ManuallyDrop::take(cursor) };
mem::forget(cursor.into_inner());
}
}
}
// Distinguishes owning values (`Normal`, dropped as usual) from aliasing
// values (`Manual`, whose inner drop must be suppressed — see
// `TestMemoryBlocks::refclone` and its `Drop` impl).
enum TestMaybeDrop<T> {
Normal(T),
Manual(mem::ManuallyDrop<T>),
}
impl<T> core::ops::Deref for TestMaybeDrop<T> {
    type Target = T;
    /// Borrows the wrapped value regardless of drop mode.
    #[inline(always)]
    fn deref(&self) -> &T {
        match self {
            // `ManuallyDrop<T>` itself derefs to `T`.
            Self::Manual(wrapped) => wrapped,
            Self::Normal(inner) => inner,
        }
    }
}
impl<T> core::ops::DerefMut for TestMaybeDrop<T> {
    /// Mutably borrows the wrapped value regardless of drop mode.
    #[inline(always)]
    fn deref_mut(&mut self) -> &mut T {
        match self {
            // `ManuallyDrop<T>` derefs mutably to `T`.
            Self::Manual(wrapped) => wrapped,
            Self::Normal(inner) => inner,
        }
    }
}
}