streamcatcher 1.0.1

A thread-safe, shared (asynchronous), almost-lockless stream buffer.
Documentation

Thread-safe, shared (asynchronous) stream buffer designed to lock only on accessing and storing new data.

Streamcatcher is designed to allow seeking on otherwise one-way streams (e.g., command output) whose output needs to be accessed by many threads without constant reallocations, contention over safe read-only data, or unnecessary stalling. Only threads who read in new data ever need to lock the data structure, and do not prevent earlier reads from occurring.

Features

  • Lockless access to pre-read data and finished streams.
  • Transparent caching of newly read data.
  • Allows seeking on read-only bytestreams.
  • Piecewise allocation to reduce copying and support unknown input lengths.
  • Optional acceleration of reads on stream completion by copying to a single backing store.
  • (Stateful) bytestream transformations.
  • Async support with the "async" feature, and runtimes via ["async-std-compat", "smol-compat", "tokio-compat"].

The main algorithm is outlined in this blog post, with rope reference tracking moved to occur only in the core.

Examples

use streamcatcher::Catcher;
use std::io::{
self,
Read,
Seek,
SeekFrom,
};

const THREAD_COUNT: usize = 256;
const PROCESS_LEN: u64 = 10_000_000;

// A read-only process, which many threads need to (re-)use.
let mut process = io::repeat(0xAC)
.take(PROCESS_LEN);

let mut catcher = Catcher::new(process);

// Many workers who need this data...
let mut handles = (0..THREAD_COUNT)
.map(|v| {
let mut handle = catcher.new_handle();
std::thread::spawn(move || {
let mut buf = [0u8; 4_096];
let mut correct_bytes = 0;
while let Ok(count) = handle.read(&mut buf[..]) {
if count == 0 { break }
for &byte in buf[..count].iter() {
if byte == 0xAC { correct_bytes += 1 }
}
}
correct_bytes
})
})
.collect::<Vec<_>>();

// And everything read out just fine!
let count_correct = handles.drain(..)
.map(|h| h.join().unwrap())
.filter(|&v| v == PROCESS_LEN)
.count();

assert_eq!(count_correct, THREAD_COUNT);

// Moving forwards and backwards *just works*.
catcher.seek(SeekFrom::End(0));
assert_eq!(io::copy(&mut catcher, &mut io::sink()).unwrap(), 0);

catcher.seek(SeekFrom::Current(-256));
assert_eq!(io::copy(&mut catcher, &mut io::sink()).unwrap(), 256);