Qubit Batch
One-shot batch execution and processing utilities for the Qubit Rust libraries.
What it does
Use qubit-batch when you already have a finite batch and want to run it once
with consistent accounting:
- attempt every record in an import, validation, or maintenance job;
- keep stable zero-based failure indexes for diagnostics and retries;
- collect completed, succeeded, failed, and panicked task counts;
- detect producer bugs when an iterator yields fewer or more items than declared;
- avoid binding shared library code to Tokio, Rayon, or another runtime.
This crate is not a queue, scheduler, worker pool, or retry framework. It consumes the supplied iterator once and returns a structured result.
Core model
BatchExecutorruns fallible tasks. Usefor_eachfor item-oriented jobs,executefor explicitRunnabletasks, andcallforCallabletasks that return values.BatchOutcomeis the executor result. It reports task counters, elapsed time, and indexedBatchTaskFailureentries.BatchExecutionErroris a batch contract error. It means the iterator count did not match the declared count, and it carries the partialBatchOutcome.SequentialBatchExecutorruns tasks in iterator order on the caller thread.ParallelBatchExecutorruns tasks on fixed-width scoped standard threads.BatchProcessorprocesses data items directly instead of wrapping them as tasks.SequentialBatchProcessorandParallelBatchProcessorinvoke aqubit-functionConsumerper item and support progress reporting.ChunkedBatchProcessorsplits one logical batch into fixed-size chunks and delegates each chunk to anotherBatchProcessor. A delegate that returnsOkfor a chunk must reportitem_count == chunk_lenandcompleted_count == chunk_len;processed_countmay be lower when the underlying operation reports fewer successful or affected rows.
Rayon-backed execution lives in the companion qubit-rayon-batch crate.
Installation
[]
= "0.5"
Add qubit-function when you implement Runnable, Callable, or Consumer
types directly, and add qubit-progress when you implement custom progress
reporters.
Examples
Validate every item
use ;
let executor = new;
let records = ;
let result = executor
.for_each
.expect;
assert_eq!;
assert_eq!;
assert_eq!;
let failure = &result.failures;
assert_eq!;
match failure.error
Run in parallel
use ;
let executor = builder
.thread_count
.sequential_threshold
.build
.expect;
let result = executor
.for_each
.expect;
assert!;
ParallelBatchExecutor::default() keeps batches with 100 or fewer declared
tasks on the sequential executor to avoid scoped-thread setup overhead. Set
sequential_threshold(0) when every non-empty batch should use parallel
workers.
Collect callable values
use ;
let result = new
.call
.expect;
assert!;
assert_eq!;
Process items directly
use ;
let mut processor = new;
let result = processor
.process
.expect;
assert_eq!;
assert_eq!;
Delegate fixed-size chunks
use ;
use ;
;
let mut processor = new;
let result = processor
.process
.expect;
assert_eq!;
assert_eq!;
assert_eq!;
When ChunkedBatchProcessor delegates a chunk, the delegate result is treated
as the result for that exact submitted chunk. Returning Ok means the delegate
has reached a terminal outcome for every item in the chunk, so item_count and
completed_count must both match the submitted chunk length. processed_count
can be lower than the chunk length for domains where the target reports a
smaller success count, such as an idempotent database insert that accepts three
rows but affects only two. If the delegate cannot reach a terminal outcome for
the whole chunk, it should return Err; inconsistent Ok results are reported
as ChunkedBatchProcessError::InvalidChunkResult.
Progress Reporting
qubit-batch accepts qubit-progress reporters but does not re-export
qubit-progress types. Implement reporters from qubit-progress directly.
SequentialBatchExecutor, ParallelBatchExecutor, SequentialBatchProcessor,
ParallelBatchProcessor, and ChunkedBatchProcessor can all attach custom
reporters.
use Duration;
use ;
use ;
;
let executor = new
.with_reporter
.with_report_interval;
let result = executor
.for_each
.expect;
assert!;
Panics from task bodies are captured as BatchTaskError::Panicked. Panics from
processor consumers and progress reporters propagate to the caller because they
are outside the task failure model. Sequential execution and processing report
progress only between tasks or items; parallel variants use
Progress::spawn_running_reporter to emit running progress periodically from a
scoped reporter thread.
The configured report_interval is a throttle checked only at
implementation-defined running progress points. It does not guarantee that a
running event is emitted immediately when the interval elapses. Sequential
variants check between tasks or items, and chunked processing checks after a
chunk completes. Parallel variants use a scoped reporter thread; with a positive
interval they can also emit periodic running events while workers are active.
Duration::ZERO disables time throttling, so running progress is reported as
soon as each implementation-defined progress point is reached; it does not
create a tight refresh loop.
Count Contract
Execution and processing APIs require a declared count. This lets the API report stable totals before consuming lazy iterators and return partial results when a producer yields the wrong number of items.
use ;
let executor = new;
let error = executor
.for_each
.expect_err;
match error
Important result semantics:
Ok(BatchOutcome)does not mean every task succeeded. It means the supplied iterator matched the declared count.result.is_success()means all declared tasks completed without task errors or panics.Err(BatchExecutionError)means the iterator produced fewer or more items than declared and carries a partialBatchOutcome.
API Cheat Sheet
SequentialBatchExecutor::new()runs tasks deterministically on the caller thread in iterator order.ParallelBatchExecutor::default()uses available CPU parallelism, scoped standard threads, and a sequential fallback for batches with 100 or fewer declared tasks. UseParallelBatchExecutor::builder().sequential_threshold(0)to force parallel workers for every non-empty batch.BatchOutcome::failures()returns failure records sorted by zero-based task index.BatchCallResult::values()storesSome(value)only for successful callables; failed and panicked callables haveNone.BatchProcessResult::processed_count()is the delegate-reported success count. It can differ fromcompleted_count()for processors that report affected rows or similar target-side counts.ChunkedBatchProcessError<E>carries the partial aggregate result for count mismatches and delegate failures.
Project Layout
src/execute: batch execution traits, outcomes, count mismatch errors, task failures, and execution adapters.src/execute/impls: standard-library batch executor implementations.src/process: data-item batch processor traits, results, and processing errors.src/process/impls: consumer-backed processors and the chunked processor.src/utils: crate-internal utilities shared by execution and processing.tests/execute: behavior tests for batch execution, progress callbacks, failures, panics, outcomes, and count mismatches.tests/process: behavior tests for direct processing, chunking, delegate errors, and progress callbacks.tests/utils: behavior tests for shared internal utility behavior.tests/docs: README consistency checks.
Documentation
- API documentation: docs.rs/qubit-batch
- Crate package: crates.io/crates/qubit-batch
- Source repository: github.com/qubit-ltd/rs-batch
Testing and CI
Run the fast local checks from the crate root:
To match the repository CI environment, run:
./align-ci.sh aligns the local toolchain and CI-related configuration before
./ci-check.sh runs the same checks used by the pipeline. Use ./coverage.sh
when changing behavior that should be reflected in coverage reports.
Contributing
Issues and pull requests are welcome. Please keep changes focused, add or update tests when behavior changes, and update this README or rustdoc when public API or user-visible behavior changes.
By contributing, you agree that your contribution is licensed under the same Apache License, Version 2.0 as this project.
License and Copyright
Copyright (c) 2026. Haixing Hu.
This software is licensed under the Apache License, Version 2.0.
Author and Maintenance
Haixing Hu — Qubit Co. Ltd.
| Repository | github.com/qubit-ltd/rs-batch |
| API documentation | docs.rs/qubit-batch |
| Crate | crates.io/crates/qubit-batch |