Crate streamcatcher
Thread-safe, shared (asynchronous) stream buffer designed to lock only on accessing and storing new data.
Streamcatcher is designed to allow seeking on otherwise one-way streams (e.g., command output) whose output needs to be accessed by many threads without constant reallocations, contention over safe read-only data, or unnecessary stalling. Only threads that read in new data ever need to lock the data structure, and they do not block reads of data that has already been stored.
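To make the idea of seeking on a one-way stream concrete, here is a minimal single-threaded sketch that uses only the standard library. It is not the crate's implementation: `CachingReader` and `fill_to` are hypothetical names, there is no sharing between threads and no lock-free access, and the choice to clamp forward seeks to the end of the stream is an assumption of this sketch. It only shows why caching everything read so far is enough to support `Seek` on a source that implements nothing but `Read`.

```rust
use std::io::{self, Read, Seek, SeekFrom};

/// Hypothetical single-threaded stand-in: caches every byte pulled from a
/// one-way source so that earlier positions can be revisited.
pub struct CachingReader<R: Read> {
    source: R,
    cache: Vec<u8>, // everything read from `source` so far
    pos: usize,     // current position in the logical stream (<= cache.len())
    finished: bool, // set once `source` reports end-of-stream
}

impl<R: Read> CachingReader<R> {
    pub fn new(source: R) -> Self {
        Self { source, cache: Vec::new(), pos: 0, finished: false }
    }

    /// Pull from the source until the cache holds at least `target` bytes,
    /// or until the source is exhausted.
    fn fill_to(&mut self, target: usize) -> io::Result<()> {
        let mut chunk = [0u8; 4096];
        while !self.finished && self.cache.len() < target {
            let n = self.source.read(&mut chunk)?;
            if n == 0 {
                self.finished = true;
            } else {
                self.cache.extend_from_slice(&chunk[..n]);
            }
        }
        Ok(())
    }
}

impl<R: Read> Read for CachingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Make sure the requested range is cached, then serve it from the cache.
        self.fill_to(self.pos.saturating_add(buf.len()))?;
        let available = &self.cache[self.pos..];
        let n = available.len().min(buf.len());
        buf[..n].copy_from_slice(&available[..n]);
        self.pos += n;
        Ok(n)
    }
}

impl<R: Read> Seek for CachingReader<R> {
    fn seek(&mut self, target: SeekFrom) -> io::Result<u64> {
        let wanted: i128 = match target {
            SeekFrom::Start(n) => n as i128,
            SeekFrom::Current(delta) => self.pos as i128 + delta as i128,
            SeekFrom::End(delta) => {
                // The end is only known once the source has been drained.
                self.fill_to(usize::MAX)?;
                self.cache.len() as i128 + delta as i128
            }
        };
        if wanted < 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "seek before start of stream",
            ));
        }
        let wanted = wanted.min(usize::MAX as i128) as usize;
        // Seeking forwards reads (and caches) the skipped bytes.
        self.fill_to(wanted)?;
        // Assumption of this sketch: positions past the end are clamped.
        self.pos = wanted.min(self.cache.len());
        Ok(self.pos as u64)
    }
}
```

Backward seeks are served from the cache, while forward seeks read and store the skipped bytes first. In this sketch `SeekFrom::End` has to drain the source, so it would never return on an unbounded stream.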
Features
- Lockless access to pre-read data and finished streams.
- Transparent caching of newly read data.
- Allows seeking on read-only bytestreams.
- Piecewise allocation to reduce copying and support unknown input lengths.
- Optional acceleration of reads on stream completion by copying to a single backing store.
- (Stateful) bytestream transformations.
- Async support with the "async" feature, and runtimes via ["async-std-compat", "smol-compat", "tokio-compat"].
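The piecewise allocation and finalisation features above can be illustrated with a small standard-library sketch. Everything here is hypothetical (`ChunkedStore`, `push`, `read_at`, `finalise`) and is not the crate's internal rope: it uses a constant chunk size rather than a configurable growth pattern, and it is not thread-safe. It shows how appending fixed-capacity chunks avoids moving bytes that were already stored, and how a finished stream can be copied once into a single contiguous buffer.

```rust
/// Hypothetical piecewise byte store: fixed-size chunks are appended as
/// data arrives, so stored bytes are never moved while the store grows.
pub struct ChunkedStore {
    chunk_size: usize,
    chunks: Vec<Vec<u8>>,
    len: usize,
    contiguous: Option<Vec<u8>>, // filled in by `finalise`
}

impl ChunkedStore {
    pub fn new(chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk size must be non-zero");
        Self { chunk_size, chunks: Vec::new(), len: 0, contiguous: None }
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn chunk_count(&self) -> usize {
        self.chunks.len()
    }

    /// Append bytes, opening a fresh chunk whenever the last one is full.
    pub fn push(&mut self, mut data: &[u8]) {
        assert!(self.contiguous.is_none(), "store already finalised");
        while !data.is_empty() {
            let needs_chunk = self
                .chunks
                .last()
                .map_or(true, |c| c.len() == self.chunk_size);
            if needs_chunk {
                // Full capacity up front: this chunk never reallocates.
                self.chunks.push(Vec::with_capacity(self.chunk_size));
            }
            let last = self.chunks.last_mut().expect("a chunk was just ensured");
            let room = self.chunk_size - last.len();
            let (head, rest) = data.split_at(room.min(data.len()));
            last.extend_from_slice(head);
            self.len += head.len();
            data = rest;
        }
    }

    /// Copy up to `out.len()` bytes starting at `pos`; returns the count.
    pub fn read_at(&self, pos: usize, out: &mut [u8]) -> usize {
        if let Some(all) = &self.contiguous {
            // Fast path after finalisation: one slice, one copy.
            let avail = &all[pos.min(all.len())..];
            let n = avail.len().min(out.len());
            out[..n].copy_from_slice(&avail[..n]);
            return n;
        }
        let mut written = 0;
        let mut pos = pos;
        while written < out.len() && pos < self.len {
            // Every chunk except the last is full, so division finds the chunk.
            let chunk = &self.chunks[pos / self.chunk_size];
            let offset = pos % self.chunk_size;
            let n = (chunk.len() - offset).min(out.len() - written);
            out[written..written + n].copy_from_slice(&chunk[offset..offset + n]);
            written += n;
            pos += n;
        }
        written
    }

    /// Once the input is complete, collapse all chunks into one buffer.
    pub fn finalise(&mut self) {
        if self.contiguous.is_none() {
            let mut all = Vec::with_capacity(self.len);
            for chunk in self.chunks.drain(..) {
                all.extend_from_slice(&chunk);
            }
            self.contiguous = Some(all);
        }
    }
}
```

Growing the outer `Vec` may move the chunk headers but never the chunk contents, which is the property that lets earlier data stay valid while new data is appended. The single copy in `finalise` trades one pass over the data for simpler reads afterwards.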
The main algorithm is outlined in this blog post, with rope reference tracking moved to occur only in the core.
Examples
use streamcatcher::Catcher;
use std::io::{
    self,
    Read,
    Seek,
    SeekFrom,
};

const THREAD_COUNT: usize = 256;
const PROCESS_LEN: u64 = 10_000_000;

// A read-only process, which many threads need to (re-)use.
let mut process = io::repeat(0xAC)
    .take(PROCESS_LEN);

let mut catcher = Catcher::new(process);

// Many workers who need this data...
let mut handles = (0..THREAD_COUNT)
    .map(|v| {
        let mut handle = catcher.new_handle();
        std::thread::spawn(move || {
            let mut buf = [0u8; 4_096];
            let mut correct_bytes = 0;
            while let Ok(count) = handle.read(&mut buf[..]) {
                if count == 0 { break }
                for &byte in buf[..count].iter() {
                    if byte == 0xAC { correct_bytes += 1 }
                }
            }
            correct_bytes
        })
    })
    .collect::<Vec<_>>();

// And everything read out just fine!
let count_correct = handles.drain(..)
    .map(|h| h.join().unwrap())
    .filter(|&v| v == PROCESS_LEN)
    .count();

assert_eq!(count_correct, THREAD_COUNT);

// Moving forwards and backwards *just works*.
catcher.seek(SeekFrom::End(0));
assert_eq!(io::copy(&mut catcher, &mut io::sink()).unwrap(), 0);

catcher.seek(SeekFrom::Current(-256));
assert_eq!(io::copy(&mut catcher, &mut io::sink()).unwrap(), 256);
Modules
future | Support types for |
Structs
Config | Options controlling backing store allocation, finalisation, and so on. |
Identity | A no-op data transform. |
TxCatcher | A shared stream buffer, using an applied input data transform. |
Enums
CatcherError | Streamcatcher configuration errors. |
Finaliser | Method to allocate a new contiguous backing store, if required by |
GrowthStrategy | Growth pattern for allocating new chunks as the rope expands. |
TransformPosition |
Traits
NeedsBytes | Common trait required by transforms, specifying how many contiguous bytes are needed for any |
ReadSkipExt | Utility trait to scan forward by discarding bytes. |
StateAccess | External access to (Async) |
Stateful | Transforms that can be queried about their internal state. |
Transform | Allows an input bytestream to be modified before it is stored. |
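As a rough illustration of modifying a bytestream before it is stored, the sketch below defines a stand-in interface using only the standard library. The names `ByteTransform`, `PassThrough`, `RollingXor`, and `store_all` are all hypothetical, and the method signature is an assumption rather than the crate's actual `Transform` trait. It contrasts a no-op transform with a stateful one whose internal position must persist between reads and can be queried from outside.

```rust
use std::io::{self, Read};

/// Hypothetical transform interface: pulls raw bytes from `src` and writes
/// the bytes that should be stored into `out`, returning how many were produced.
pub trait ByteTransform {
    fn transform_read<R: Read>(&mut self, src: &mut R, out: &mut [u8]) -> io::Result<usize>;
}

/// No-op transform: stored bytes equal source bytes.
pub struct PassThrough;

impl ByteTransform for PassThrough {
    fn transform_read<R: Read>(&mut self, src: &mut R, out: &mut [u8]) -> io::Result<usize> {
        src.read(out)
    }
}

/// Stateful transform: XORs each byte with a repeating key. The key
/// position carries over between calls, so chunk boundaries do not matter.
pub struct RollingXor {
    key: Vec<u8>,
    offset: usize,
}

impl RollingXor {
    pub fn new(key: &[u8]) -> Self {
        assert!(!key.is_empty(), "key must not be empty");
        Self { key: key.to_vec(), offset: 0 }
    }

    /// Queryable internal state: how many bytes have been transformed so far.
    pub fn offset(&self) -> usize {
        self.offset
    }
}

impl ByteTransform for RollingXor {
    fn transform_read<R: Read>(&mut self, src: &mut R, out: &mut [u8]) -> io::Result<usize> {
        let n = src.read(out)?;
        for byte in &mut out[..n] {
            *byte ^= self.key[self.offset % self.key.len()];
            self.offset += 1;
        }
        Ok(n)
    }
}

/// Drain `src` through `transform`, collecting what would be stored.
pub fn store_all<R: Read, T: ByteTransform>(mut src: R, transform: &mut T) -> io::Result<Vec<u8>> {
    let mut stored = Vec::new();
    // Deliberately small buffer so that state must carry across calls.
    let mut buf = [0u8; 3];
    loop {
        let n = transform.transform_read(&mut src, &mut buf)?;
        if n == 0 {
            return Ok(stored);
        }
        stored.extend_from_slice(&buf[..n]);
    }
}
```

Because XOR with the same key is its own inverse, running the scrambled output through a fresh `RollingXor` with that key restores the input. This only works if the key offset survives each three-byte read, which is the point of keeping the transform stateful.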
Type Definitions
Catcher | A simple shared stream buffer, leaving data unchanged. |
Result | Shorthand for configuration error handling. |