stream-transfer-limit 0.1.0

Byte-count transfer limits for fallible futures streams
  • Repository: run4w4y/stream-transfer-limit
  • Owner: run4w4y

stream-transfer-limit

stream-transfer-limit applies cumulative byte limits to fallible futures::Streams.

It is meant for cases where HTTP middleware or a fixed body limit is the wrong abstraction: for example, when the allowed transfer size is only known after routing, authentication, configuration, or account lookup.

use futures::{stream, StreamExt};
use stream_transfer_limit::{TransferLimit, TransferLimitError};

fn main() {
    futures::executor::block_on(async {
        let chunks = stream::iter([
            Ok::<_, std::io::Error>(vec![1, 2]),
            Ok(vec![3, 4, 5]),
        ]);

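        let max_bytes = 4;
        let mut limited = TransferLimit::new(max_bytes).wrap(chunks);

        // First chunk: 2 bytes seen so far, within the limit.
        assert_eq!(limited.next().await.unwrap().unwrap(), vec![1, 2]);
        // Second chunk: 2 + 3 = 5 bytes, which exceeds the limit of 4.
        assert!(matches!(
            limited.next().await.unwrap(),
            Err(TransferLimitError::LimitExceeded { limit: 4, actual: 5 })
        ));
        // The wrapper terminates after the limit error.
        assert!(limited.next().await.is_none());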
    });
}

Behavior

  • The stream may produce exactly the configured number of bytes; reaching the limit is not an error.
  • When a chunk pushes the cumulative byte count above the limit, the wrapper yields TransferLimitError::LimitExceeded in place of that chunk.
  • After a limit error, the wrapper terminates and does not keep polling the inner stream.
  • The default byte counter is usize. If the cumulative count cannot fit in the selected counter type, the wrapper returns TransferLimitError::CounterOverflow and terminates.
  • Inner stream errors are preserved as TransferLimitError::Inner.
  • Progress callbacks receive cumulative bytes after every successful chunk read from the inner stream, including the chunk that crosses the limit.
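
The rules above can be modeled without any async machinery. The following is a minimal sketch over a plain iterator, not the crate's implementation: LimitError and limit_chunk_lens are hypothetical names, each chunk is represented only by its length, and the choice to keep going after an inner error is an assumption, since the list above does not say what happens next.

```rust
// Illustrative model only: these names are hypothetical, not the crate's API.
#[derive(Debug, PartialEq)]
enum LimitError<E> {
    LimitExceeded { limit: usize, actual: usize },
    CounterOverflow,
    Inner(E),
}

/// Applies a cumulative byte limit to a sequence of fallible chunk lengths
/// and collects every item the limited sequence would yield.
fn limit_chunk_lens<I, E, F>(
    chunk_lens: I,
    limit: usize,
    mut on_progress: F,
) -> Vec<Result<usize, LimitError<E>>>
where
    I: IntoIterator<Item = Result<usize, E>>,
    F: FnMut(usize),
{
    let mut seen: usize = 0;
    let mut out = Vec::new();
    for item in chunk_lens {
        match item {
            // Inner errors are passed through, wrapped in `Inner`.
            // Assumption: the model keeps reading after an inner error.
            Err(e) => out.push(Err(LimitError::Inner(e))),
            Ok(len) => {
                // The running total must fit in the counter type.
                seen = match seen.checked_add(len) {
                    Some(total) => total,
                    None => {
                        out.push(Err(LimitError::CounterOverflow));
                        break; // terminate after an overflow
                    }
                };
                // Progress is reported before the limit check, so the
                // chunk that crosses the limit is reported too.
                on_progress(seen);
                if seen > limit {
                    out.push(Err(LimitError::LimitExceeded { limit, actual: seen }));
                    break; // terminate; the source is not read again
                }
                out.push(Ok(len));
            }
        }
    }
    out
}
```

With chunk lengths 2, 3, 1 and a limit of 4, this model yields the first chunk, then a LimitExceeded error carrying limit 4 and actual 5, and never reads the third chunk. That matches the example at the top of this README.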

APIs

For new code, construct a TransferLimit and wrap the stream:

use futures::stream;
use stream_transfer_limit::TransferLimit;

fn main() {
    let chunks = stream::iter([Ok::<_, std::io::Error>(vec![0; 512])]);

    let _limited = TransferLimit::new(1024)
        .on_progress(|bytes_seen| {
            eprintln!("read {bytes_seen} bytes");
        })
        .wrap(chunks);
}

usize avoids unnecessary conversions for ordinary in-memory or platform-sized limits. For very large streams, choose a wider counter explicitly:

use futures::stream;
use stream_transfer_limit::TransferLimit;

fn main() {
    let chunks = stream::iter([Ok::<_, std::io::Error>(vec![0])]);
    let huge_limit = 16_u128 * 1024 * 1024 * 1024 * 1024; // 16 TiB

    let _limited = TransferLimit::<u128>::from_limit(huge_limit).wrap(chunks);
}

The crate implements counters for usize, u64, and u128.
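
To illustrate what a counter abstraction over these three types might look like, here is a hedged sketch. ByteCounter and checked_add_len are hypothetical names chosen for this example; the crate's actual trait and method names may differ.

```rust
// Hypothetical trait for illustration; the crate's real counter trait may differ.
trait ByteCounter: Copy + PartialOrd {
    /// Adds a chunk length, returning `None` if the sum does not fit in `Self`.
    fn checked_add_len(self, len: usize) -> Option<Self>;
}

macro_rules! impl_byte_counter {
    ($($t:ty),*) => {$(
        impl ByteCounter for $t {
            fn checked_add_len(self, len: usize) -> Option<Self> {
                // Convert the chunk length first; this can fail only if
                // `usize` is wider than the counter type on the target.
                let len = <$t>::try_from(len).ok()?;
                // Then add with overflow detection.
                self.checked_add(len)
            }
        }
    )*};
}

impl_byte_counter!(usize, u64, u128);
```

A None result here corresponds to the CounterOverflow case described under Behavior. A wider counter such as u128 only moves the point at which that case can occur.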

Development

The repository includes a Nix flake for a reproducible development shell:

nix develop

If you use direnv, allow the checked-in .envrc once:

direnv allow

The development shell provides the pinned Rust toolchain, rustfmt, clippy, rust-analyzer, cargo helper tools, Docker client, and act.

Run the local CI pipeline:

scripts/ci.sh

Run the GitHub Actions workflow locally with act:

scripts/act-ci.sh

Why not tower-http?

tower_http::limit::RequestBodyLimitLayer and http_body_util::Limited are good choices when the limit is known at the HTTP-body layer. This crate is for code that already has a futures::Stream and needs to choose the limit closer to business logic, such as per-tenant transfer limits.
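
As a rough sketch of choosing the limit close to business logic, the lookup below resolves a per-tenant byte limit at runtime. Plan, plan_limit, limit_for_tenant, and the byte values are all invented for this example. The resulting number would then be passed to TransferLimit::new as in the examples above.

```rust
use std::collections::HashMap;

// Hypothetical plan tiers; the names and limits are invented for this example.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Plan {
    Free,
    Pro,
}

/// Maps a plan to its maximum transfer size in bytes.
fn plan_limit(plan: Plan) -> usize {
    match plan {
        Plan::Free => 10 * 1024 * 1024, // 10 MiB
        Plan::Pro => 1024 * 1024 * 1024, // 1 GiB
    }
}

/// Looks up a tenant's limit, falling back to the free tier for unknown tenants.
fn limit_for_tenant(plans: &HashMap<String, Plan>, tenant: &str) -> usize {
    let plan = plans.get(tenant).copied().unwrap_or(Plan::Free);
    plan_limit(plan)
}
```

Because the limit is an ordinary value computed per request, it can depend on anything known by the time the stream is wrapped, such as the authenticated account.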