Crate streamcatcher

Thread-safe, shared (asynchronous) stream buffer designed to lock only on accessing and storing new data.

Streamcatcher is designed to allow seeking on otherwise one-way streams (e.g., command output) whose output needs to be accessed by many threads without constant reallocations, contention over safe read-only data, or unnecessary stalling. Only threads that read in new data ever need to lock the data structure, and they do not prevent reads of already-stored data from occurring.
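
To make the idea of seeking on a one-way stream concrete, the following is a minimal single-threaded sketch using only the standard library. It is not Streamcatcher's implementation: the `SeekableCache` type and its `fill_to` helper are hypothetical names, and the sketch uses one growable `Vec<u8>` with no sharing or locking. It only shows that caching every byte read is enough to support `Seek`.

```rust
use std::io::{self, Read, Seek, SeekFrom};

/// Hypothetical wrapper (not part of streamcatcher): caches every byte pulled
/// from a one-way source so that earlier positions can be revisited.
struct SeekableCache<R: Read> {
    source: R,
    cache: Vec<u8>, // everything read from `source` so far
    pos: usize,     // current position within the logical stream
    finished: bool, // set once `source` reports end of stream
}

impl<R: Read> SeekableCache<R> {
    fn new(source: R) -> Self {
        Self { source, cache: Vec::new(), pos: 0, finished: false }
    }

    /// Pull from the source until `target` bytes are cached or the source ends.
    fn fill_to(&mut self, target: usize) -> io::Result<()> {
        let mut buf = [0u8; 4096];
        while !self.finished && self.cache.len() < target {
            let n = self.source.read(&mut buf)?;
            if n == 0 {
                self.finished = true;
            } else {
                self.cache.extend_from_slice(&buf[..n]);
            }
        }
        Ok(())
    }
}

impl<R: Read> Read for SeekableCache<R> {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        // Only touch the source when the cache cannot satisfy the request.
        self.fill_to(self.pos.saturating_add(out.len()))?;
        let available = &self.cache[self.pos.min(self.cache.len())..];
        let n = available.len().min(out.len());
        out[..n].copy_from_slice(&available[..n]);
        self.pos += n;
        Ok(n)
    }
}

impl<R: Read> Seek for SeekableCache<R> {
    fn seek(&mut self, from: SeekFrom) -> io::Result<u64> {
        let target: i128 = match from {
            SeekFrom::Start(n) => n as i128,
            SeekFrom::Current(d) => self.pos as i128 + d as i128,
            SeekFrom::End(d) => {
                // The length is only known once the source is exhausted.
                self.fill_to(usize::MAX)?;
                self.cache.len() as i128 + d as i128
            }
        };
        if target < 0 {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "seek before start"));
        }
        let target = target as usize;
        self.fill_to(target)?;
        // Clamp to the end of the stream rather than seeking past it.
        self.pos = target.min(self.cache.len());
        Ok(self.pos as u64)
    }
}
```

Seeking backwards never touches the source, while seeking forwards or from the end reads in only as much new data as is needed. The crate's contribution beyond this sketch is sharing such a cache between many handles while locking only for new reads.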

Features

  • Lockless access to pre-read data and finished streams.
  • Transparent caching of newly read data.
  • Allows seeking on read-only bytestreams.
  • Piecewise allocation to reduce copying and support unknown input lengths.
  • Optional acceleration of reads on stream completion by copying to a single backing store.
  • (Stateful) bytestream transformations.
  • Async support with the "async" feature, and runtime compatibility via the "async-std-compat", "smol-compat", and "tokio-compat" features.

The main algorithm is outlined in this blog post, with rope reference tracking moved to occur only in the core.
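
The piecewise allocation listed among the features can be pictured as a list of separately allocated chunks. The sketch below is an illustration under stated assumptions rather than the crate's algorithm: `ChunkRope`, `Chunk`, and `Growth` are hypothetical names, the doubling rule is just one possible growth pattern, and there is no locking or reference tracking. It shows why appending to a rope of chunks never copies bytes that are already stored, even when the total input length is unknown.

```rust
/// Hypothetical growth rule for chunk sizes (not streamcatcher's own type).
#[derive(Clone, Copy)]
enum Growth {
    /// Every chunk holds the same number of bytes.
    Constant(usize),
    /// Each new chunk holds twice as much as the previous one,
    /// starting from this size.
    Doubling(usize),
}

/// One separately allocated piece of the stream.
struct Chunk {
    data: Vec<u8>,
    limit: usize, // maximum number of bytes this chunk may hold
}

/// A list of chunks. Appending never moves stored bytes, unlike growing
/// a single `Vec<u8>`, which may reallocate and copy its contents.
struct ChunkRope {
    chunks: Vec<Chunk>,
    growth: Growth,
    len: usize,
}

impl ChunkRope {
    fn new(growth: Growth) -> Self {
        Self { chunks: Vec::new(), growth, len: 0 }
    }

    /// Size limit for the next chunk to be allocated.
    fn next_limit(&self) -> usize {
        match (self.growth, self.chunks.last()) {
            (Growth::Doubling(_), Some(last)) => last.limit.saturating_mul(2),
            (Growth::Constant(n), _) | (Growth::Doubling(n), None) => n.max(1),
        }
    }

    /// Store `bytes`, allocating new chunks whenever the last one is full.
    fn append(&mut self, mut bytes: &[u8]) {
        while !bytes.is_empty() {
            let needs_chunk = self
                .chunks
                .last()
                .map_or(true, |c| c.data.len() == c.limit);
            if needs_chunk {
                let limit = self.next_limit();
                self.chunks.push(Chunk { data: Vec::with_capacity(limit), limit });
            }
            let chunk = self.chunks.last_mut().expect("a chunk was just ensured");
            let take = (chunk.limit - chunk.data.len()).min(bytes.len());
            chunk.data.extend_from_slice(&bytes[..take]);
            bytes = &bytes[take..];
            self.len += take;
        }
    }

    /// Copy bytes starting at logical offset `pos` into `out`,
    /// returning how many bytes were copied.
    fn read_at(&self, mut pos: usize, out: &mut [u8]) -> usize {
        let mut written = 0;
        for chunk in &self.chunks {
            if pos >= chunk.data.len() {
                // The requested offset lies beyond this chunk.
                pos -= chunk.data.len();
                continue;
            }
            let src = &chunk.data[pos..];
            let n = src.len().min(out.len() - written);
            out[written..written + n].copy_from_slice(&src[..n]);
            written += n;
            pos = 0;
            if written == out.len() {
                break;
            }
        }
        written
    }
}
```

The trade-off is that a read may span several chunks, which is the cost that the optional copy to a single backing store on stream completion is meant to remove.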

Examples

use streamcatcher::Catcher;
use std::io::{
    self,
    Read,
    Seek,
    SeekFrom,
};

const THREAD_COUNT: usize = 256;
const PROCESS_LEN: u64 = 10_000_000;

// A read-only process, which many threads need to (re-)use.
let process = io::repeat(0xAC)
    .take(PROCESS_LEN);

let mut catcher = Catcher::new(process);

// Many workers who need this data...
let mut handles = (0..THREAD_COUNT)
    .map(|_| {
        let mut handle = catcher.new_handle();
        std::thread::spawn(move || {
            let mut buf = [0u8; 4_096];
            let mut correct_bytes = 0;
            while let Ok(count) = handle.read(&mut buf[..]) {
                if count == 0 { break }
                for &byte in buf[..count].iter() {
                    if byte == 0xAC { correct_bytes += 1 }
                }
            }
            correct_bytes
        })
    })
    .collect::<Vec<_>>();

// And everything read out just fine!
let count_correct = handles.drain(..)
    .map(|h| h.join().unwrap())
    .filter(|&v| v == PROCESS_LEN)
    .count();

assert_eq!(count_correct, THREAD_COUNT);

// Moving forwards and backwards *just works*.
catcher.seek(SeekFrom::End(0)).unwrap();
assert_eq!(io::copy(&mut catcher, &mut io::sink()).unwrap(), 0);

catcher.seek(SeekFrom::Current(-256)).unwrap();
assert_eq!(io::copy(&mut catcher, &mut io::sink()).unwrap(), 256);

Modules

Support types for AsyncRead/AsyncSeek compatible stream buffers. Requires the "async" feature.

Structs

Options controlling backing store allocation, finalisation, and so on.

A no-op data transform.

A shared stream buffer, using an applied input data transform.

Enums

Streamcatcher configuration errors.

Method to allocate a new contiguous backing store, if required by Config::use_backing.

Growth pattern for allocating new chunks as the rope expands.

The number of bytes output by a Transform into a TxCatcher.

Traits

Common trait required by transforms, specifying how many contiguous bytes are needed for any read(...) to succeed.

Utility trait to scan forward by discarding bytes.

External access to (Async)Transform state via a TxCatcher (resp. async variants).

Transforms that can be queried about their internal state.

Allows an input bytestream to be modified before it is stored.
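
Scanning forward by discarding bytes, as the utility trait above is described as doing, can be sketched with the standard library alone. The `skip_bytes` function below is a hypothetical free function, not the crate's trait or its method signature. It reads up to `amount` bytes and throws them away, which works on any `Read` source, including ones that cannot seek.

```rust
use std::io::{self, Read};

/// Hypothetical helper (not streamcatcher's API): consume and discard up to
/// `amount` bytes from `reader`, returning how many were actually skipped.
/// Fewer bytes are skipped if the stream ends early.
fn skip_bytes<R: Read>(reader: &mut R, amount: u64) -> io::Result<u64> {
    // `take` caps the read at `amount`; `sink` drops everything written to it.
    io::copy(&mut reader.by_ref().take(amount), &mut io::sink())
}
```

Because the return value reports the number of bytes actually discarded, a caller can tell a complete skip from one cut short by the end of the stream.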

Type Definitions

A simple shared stream buffer, leaving data unchanged.

Shorthand for configuration error handling.