Crate streamcatcher
Thread-safe, shared (asynchronous) stream buffer designed to lock only on accessing and storing new data.
Streamcatcher is designed to allow seeking on otherwise one-way streams (e.g., command output) whose output needs to be accessed by many threads without constant reallocations, contention over safe read-only data, or unnecessary stalling. Only threads that read in new data ever need to lock the data structure, and they do not block reads of data that has already been stored.
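To make the idea of seeking on a one-way stream concrete, here is a minimal, std-only sketch of a shared cache with independent cursors. It is not streamcatcher's implementation: the names `Shared` and `CachedHandle` are hypothetical, and it uses an `RwLock` over a single `Vec<u8>` where the crate describes lockless access to stored data. It only illustrates that the source mutex is taken when a cursor needs bytes nobody has read yet.

```rust
use std::io::{self, Read, Seek, SeekFrom};
use std::sync::{Arc, Mutex, RwLock};

/// State shared by every cursor: the one-way source and all bytes read so far.
struct Shared<R> {
    source: Mutex<Option<R>>, // becomes `None` once the source reports EOF
    cache: RwLock<Vec<u8>>,
}

/// Hypothetical cursor type; each one tracks its own position in the cache.
pub struct CachedHandle<R> {
    shared: Arc<Shared<R>>,
    pos: usize,
}

impl<R: Read> CachedHandle<R> {
    pub fn new(source: R) -> Self {
        let shared = Shared {
            source: Mutex::new(Some(source)),
            cache: RwLock::new(Vec::new()),
        };
        Self { shared: Arc::new(shared), pos: 0 }
    }

    /// A new cursor over the same cache, starting at byte 0.
    pub fn new_handle(&self) -> Self {
        Self { shared: Arc::clone(&self.shared), pos: 0 }
    }

    /// Pull from the source until `target` bytes are cached or EOF is hit.
    fn fill_to(&self, target: usize) -> io::Result<()> {
        let mut source = self.shared.source.lock().unwrap();
        let mut chunk = [0u8; 4096];
        loop {
            let cached = self.shared.cache.read().unwrap().len();
            if cached >= target {
                return Ok(());
            }
            let n = match source.as_mut() {
                None => return Ok(()),
                Some(src) => src.read(&mut chunk)?,
            };
            if n == 0 {
                *source = None;
                return Ok(());
            }
            self.shared.cache.write().unwrap().extend_from_slice(&chunk[..n]);
        }
    }
}

impl<R: Read> Read for CachedHandle<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Touch the source only if the cache cannot serve a single byte.
        let cached = self.shared.cache.read().unwrap().len();
        if self.pos >= cached {
            self.fill_to(self.pos + 1)?;
        }
        let cache = self.shared.cache.read().unwrap();
        let n = cache.len().saturating_sub(self.pos).min(buf.len());
        if n == 0 {
            return Ok(0);
        }
        buf[..n].copy_from_slice(&cache[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

impl<R: Read> Seek for CachedHandle<R> {
    fn seek(&mut self, from: SeekFrom) -> io::Result<u64> {
        let target = match from {
            SeekFrom::Start(n) => n as i128,
            SeekFrom::Current(d) => self.pos as i128 + d as i128,
            SeekFrom::End(d) => {
                // The end is only known once the source has been drained.
                self.fill_to(usize::MAX)?;
                let len = self.shared.cache.read().unwrap().len();
                len as i128 + d as i128
            }
        };
        if target < 0 {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "seek before start"));
        }
        self.pos = target as usize;
        Ok(self.pos as u64)
    }
}
```

In this sketch, `SeekFrom::End` drains the source first, so it never returns on an endless input.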
Features
- Lockless access to pre-read data and finished streams.
- Transparent caching of newly read data.
- Allows seeking on read-only bytestreams.
- Piecewise allocation to reduce copying and support unknown input lengths.
- Optional acceleration of reads on stream completion by copying to a single backing store.
- (Stateful) bytestream transformations.
- Async support with the "async" feature, and runtimes via ["async-std-compat", "smol-compat", "tokio-compat"].
The main algorithm is outlined in this blog post, with rope reference tracking moved to occur only in the core.
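The piecewise allocation and the optional copy to a single backing store listed under Features can be pictured with a toy rope of fixed-capacity chunks. This is a rough sketch under stated assumptions, not the crate's data structure: `Growth`, `Chunk`, and `Rope` are hypothetical names, the growth variants are invented for illustration, and nothing here is thread-safe.

```rust
/// Hypothetical policy for sizing each newly allocated chunk.
#[derive(Clone, Copy)]
pub enum Growth {
    Constant(usize),
    Geometric { start: usize, max: usize },
}

impl Growth {
    /// Size of the next chunk, given the size of the previous one (if any).
    fn next_size(self, last: Option<usize>) -> usize {
        match (self, last) {
            (Growth::Constant(n), _) => n,
            (Growth::Geometric { start, .. }, None) => start,
            (Growth::Geometric { max, .. }, Some(prev)) => prev.saturating_mul(2).min(max),
        }
    }
}

struct Chunk {
    data: Vec<u8>,
    limit: usize, // intended size; the chunk never grows past this
}

/// A list of chunks: appending never moves bytes that are already stored.
pub struct Rope {
    chunks: Vec<Chunk>,
    growth: Growth,
    len: usize,
}

impl Rope {
    pub fn new(growth: Growth) -> Self {
        Self { chunks: Vec::new(), growth, len: 0 }
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn chunk_count(&self) -> usize {
        self.chunks.len()
    }

    /// Append bytes, filling the tail chunk before allocating a new one.
    pub fn push(&mut self, mut data: &[u8]) {
        while !data.is_empty() {
            let last_limit = self.chunks.last().map(|c| c.limit);
            let full = self.chunks.last().map_or(true, |c| c.data.len() == c.limit);
            if full {
                // Guard against a zero-sized chunk, which would never fill.
                let limit = self.growth.next_size(last_limit).max(1);
                self.chunks.push(Chunk { data: Vec::with_capacity(limit), limit });
            }
            let tail = self.chunks.last_mut().expect("a chunk was just ensured");
            let n = (tail.limit - tail.data.len()).min(data.len());
            tail.data.extend_from_slice(&data[..n]);
            data = &data[n..];
            self.len += n;
        }
    }

    /// Copy up to `buf.len()` bytes starting at `pos`; returns the count copied.
    pub fn read_at(&self, mut pos: usize, buf: &mut [u8]) -> usize {
        let mut written = 0;
        for chunk in &self.chunks {
            if pos >= chunk.data.len() {
                pos -= chunk.data.len();
                continue;
            }
            let n = (chunk.data.len() - pos).min(buf.len() - written);
            buf[written..written + n].copy_from_slice(&chunk.data[pos..pos + n]);
            written += n;
            pos = 0;
            if written == buf.len() {
                break;
            }
        }
        written
    }

    /// Once the stream is complete, copy everything into one contiguous store.
    pub fn into_contiguous(self) -> Vec<u8> {
        let mut out = Vec::with_capacity(self.len);
        for chunk in self.chunks {
            out.extend_from_slice(&chunk.data);
        }
        out
    }
}
```

Reads that cross chunk boundaries walk the chunk list, which is the cost a single contiguous store removes once the input length is known.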
Examples
use streamcatcher::Catcher;
use std::io::{
    self,
    Read,
    Seek,
    SeekFrom,
};

const THREAD_COUNT: usize = 256;
const PROCESS_LEN: u64 = 10_000_000;

// A read-only process, which many threads need to (re-)use.
let mut process = io::repeat(0xAC)
    .take(PROCESS_LEN);

let mut catcher = Catcher::new(process);

// Many workers who need this data...
let mut handles = (0..THREAD_COUNT)
    .map(|v| {
        let mut handle = catcher.new_handle();
        std::thread::spawn(move || {
            let mut buf = [0u8; 4_096];
            let mut correct_bytes = 0;
            while let Ok(count) = handle.read(&mut buf[..]) {
                if count == 0 { break }
                for &byte in buf[..count].iter() {
                    if byte == 0xAC { correct_bytes += 1 }
                }
            }
            correct_bytes
        })
    })
    .collect::<Vec<_>>();

// And everything read out just fine!
let count_correct = handles.drain(..)
    .map(|h| h.join().unwrap())
    .filter(|&v| v == PROCESS_LEN)
    .count();

assert_eq!(count_correct, THREAD_COUNT);

// Moving forwards and backwards *just works*.
catcher.seek(SeekFrom::End(0));
assert_eq!(io::copy(&mut catcher, &mut io::sink()).unwrap(), 0);

catcher.seek(SeekFrom::Current(-256));
assert_eq!(io::copy(&mut catcher, &mut io::sink()).unwrap(), 256);
Modules
Support types for AsyncRead/AsyncSeek compatible stream buffers. Requires the "async" feature.
Structs
Options controlling backing store allocation, finalisation, and so on.
A no-op data transform.
A shared stream buffer, using an applied input data transform.
Enums
Streamcatcher configuration errors.
Method to allocate a new contiguous backing store, if required by Config::use_backing.
Growth pattern for allocating new chunks as the rope expands.
Traits
Common trait required by transforms, specifying how many contiguous bytes are needed for any read(...) to succeed.
Utility trait to scan forward by discarding bytes.
Transforms that can be queried about their internal state.
Allows an input bytestream to be modified before it is stored.
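As a rough illustration of what a (stateful) bytestream transform does before data is stored, the sketch below uses a hypothetical `ByteTransform` trait. Its method signature is an assumption made for this example and should not be read as the signature of the crate's own traits. `PassThrough` plays the role of a no-op transform, and `DeltaDecode` shows why a transform may need state that survives across reads.

```rust
use std::io::{self, Read};

/// Hypothetical hook that rewrites bytes between the source and the store.
pub trait ByteTransform {
    /// Read from `src` into `buf`, modifying the bytes in place; returns the count.
    fn transform_read<R: Read>(&mut self, src: &mut R, buf: &mut [u8]) -> io::Result<usize>;
}

/// No-op transform: bytes are stored exactly as read.
pub struct PassThrough;

impl ByteTransform for PassThrough {
    fn transform_read<R: Read>(&mut self, src: &mut R, buf: &mut [u8]) -> io::Result<usize> {
        src.read(buf)
    }
}

/// Stateful transform: each output byte is the wrapping running sum of the input.
#[derive(Default)]
pub struct DeltaDecode {
    last: u8,
}

impl ByteTransform for DeltaDecode {
    fn transform_read<R: Read>(&mut self, src: &mut R, buf: &mut [u8]) -> io::Result<usize> {
        let n = src.read(buf)?;
        for byte in &mut buf[..n] {
            self.last = self.last.wrapping_add(*byte);
            *byte = self.last;
        }
        Ok(n)
    }
}

/// Drain `src` through `tx` and collect the transformed bytes.
pub fn store_all<T: ByteTransform, R: Read>(mut tx: T, mut src: R) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    // Deliberately tiny buffer, so the transform's state must carry across calls.
    let mut buf = [0u8; 3];
    loop {
        let n = tx.transform_read(&mut src, &mut buf)?;
        if n == 0 {
            return Ok(out);
        }
        out.extend_from_slice(&buf[..n]);
    }
}
```

Because the transform runs once on the way in, every later reader of the stored bytes sees the transformed data without repeating the work.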