tower_batch/lib.rs
1#![allow(clippy::type_complexity)]
2
3//! A Tower middleware that buffers requests and flushes them in batches.
4//!
5//! Writing data in bulk is a common technique for improving the efficiency of
6//! certain tasks – databases, message brokers, object stores, etc. `tower-batch`
7//! collects individual requests and flushes them as a group when the buffer
8//! reaches a maximum size **or** a maximum duration elapses.
9//!
10//! # Inner service contract
11//!
12//! Your inner service must implement `Service<BatchControl<R>>` where `R` is
13//! your request type. The middleware sends two kinds of calls:
14//!
15//! - [`BatchControl::Item(request)`](BatchControl::Item) – buffer this request.
16//! - [`BatchControl::Flush`] – process the buffered items and return the result.
17//!
18//! # How the worker operates
19//!
20//! [`Batch::new`] (or [`BatchLayer`]) spawns a background worker that owns the
21//! inner service. The worker cycles through three states:
22//!
23//! 1. **Collecting** – the worker pulls requests from the channel and forwards
24//! each one to the inner service as a [`BatchControl::Item`]. A timer starts
25//! when the first item of a new batch arrives.
26//! 2. **Flushing** – triggered when the batch reaches `max_size` items or the
27//! `max_time` duration elapses. The worker calls the inner service with
28//! [`BatchControl::Flush`]. Once the flush completes, all callers in the
29//! batch receive the outcome and the worker returns to collecting.
30//! 3. **Finished** – the worker shuts down, either because all [`Batch`] handles
31//! were dropped (no more requests possible) or because the inner service
32//! returned an error.
33//!
34//! # Backpressure
35//!
36//! [`Batch`] handles are cheap to clone – each clone shares the same worker.
37//! Backpressure is enforced via a semaphore with `max_size` permits: once
38//! `max_size` callers have received `Ready` from [`poll_ready`](tower::Service::poll_ready)
39//! without yet calling [`call`](tower::Service::call), subsequent `poll_ready`
40//! calls will return `Pending` until capacity is freed.
41//!
42//! # Errors
43//!
44//! Callers receive one of two error types through the [`BoxError`] returned by
45//! [`ResponseFuture`](future::ResponseFuture):
46//!
47//! - [`error::ServiceError`] – the inner service returned an error, either
48//! during an item call or during a flush. The worker terminates and all
49//! pending callers in the current batch receive this error. The original
50//! error is accessible via [`source()`](std::error::Error::source).
51//!
52//! - [`error::Closed`] – the worker shut down before the caller's request
53//! could be completed. This happens when all [`Batch`] handles are dropped
54//! while items are still collecting (the batch was never flushed), or when
55//! the worker is dropped for any other reason. Callers should treat this as
56//! meaning their request was **NOT** processed.
57
58/// Export tower's alias for a type-erased error type.
59pub use tower::BoxError;
60
61pub use self::layer::BatchLayer;
62pub use self::service::Batch;
63
64pub mod error;
65pub mod future;
66mod layer;
67mod message;
68mod service;
69mod worker;
70
71/// Signaling mechanism for services that allow processing in batches.
72#[derive(Debug, Eq, PartialEq)]
73pub enum BatchControl<R> {
74 /// Collect a new batch item.
75 Item(R),
76
77 /// The current batch should be processed.
78 Flush,
79}
80
81impl<R> From<R> for BatchControl<R> {
82 fn from(req: R) -> BatchControl<R> {
83 BatchControl::Item(req)
84 }
85}