streamweave-offset
Offset management for StreamWeave
Track processing positions for exactly-once processing and resumable pipelines.
The streamweave-offset package provides offset tracking and management for StreamWeave pipelines. It enables exactly-once processing guarantees by tracking the position of processed items in source streams, allowing pipelines to resume from where they left off after restarts.
Key Features
- Offset Types: Sequence, Timestamp, Custom, Earliest, Latest
- Offset Storage: In-memory and file-based storage backends
- Offset Tracker: High-level API for offset management
- Commit Strategies: Auto, Periodic, and Manual commit modes
- Reset Policies: Configurable behavior when no offset is found
- Persistence: File-based persistence for recovery after restarts
Installation
Add this to your Cargo.toml:
[dependencies]
streamweave-offset = "0.3.0"
Quick Start
Basic Offset Tracking
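use streamweave_offset::{InMemoryOffsetStore, Offset, OffsetTracker};
// Create an in-memory offset store
let store = Box::new(InMemoryOffsetStore::new());
// Create an offset tracker with auto-commit (the default strategy)
let tracker = OffsetTracker::new(store);
// Record processed offsets ("my-source" and the values are illustrative)
tracker.record("my-source", Offset::Sequence(1))?;
tracker.record("my-source", Offset::Sequence(2))?;
// Get the current offset
let current = tracker.get_offset("my-source")?;
assert_eq!(current, Offset::Sequence(2));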
File-Based Persistence
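use streamweave_offset::{FileOffsetStore, Offset, OffsetTracker};
// Create a file-based offset store (the path is illustrative)
let store = Box::new(FileOffsetStore::new("offsets.json")?);
// Create tracker
let tracker = OffsetTracker::new(store);
// Record offsets (automatically persisted)
tracker.record("my-source", Offset::Sequence(1000))?;
// After restart, offsets are automatically loaded from the same file
let tracker2 = OffsetTracker::new(Box::new(FileOffsetStore::new("offsets.json")?));
let offset = tracker2.get_offset("my-source")?;
// offset is Sequence(1000)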
API Overview
Offset Enum
The Offset enum represents different types of offsets:
Offset Types:
- Sequence: Numeric sequence numbers (e.g., Kafka partition offsets)
- Timestamp: Time-based offsets (e.g., event timestamps)
- Custom: String-based offsets (e.g., file positions, custom IDs)
- Earliest: Start from the beginning of the stream
- Latest: Start from the latest available position
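To make the sentinel variants concrete, here is a minimal, self-contained sketch of an Offset-like enum. It is not the crate's definition. The payload types (u64 sequence numbers, Unix-epoch milliseconds for timestamps, String for custom positions) and the resolve_sequence helper are assumptions for illustration. The sketch shows that Earliest and Latest become concrete positions only once you know the range a source currently retains.

```rust
// Hypothetical stand-in for the crate's Offset enum; payload types are assumptions.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Offset {
    Sequence(u64),  // e.g. a Kafka partition offset
    Timestamp(u64), // assumed: Unix-epoch milliseconds
    Custom(String), // e.g. "line:17" or an application-specific ID
    Earliest,       // sentinel: beginning of the stream
    Latest,         // sentinel: newest available position
}

/// Map an offset onto a sequence-numbered source whose retained range is
/// `earliest..=latest`. Timestamp and Custom offsets would need a
/// source-specific lookup, so this sketch returns None for them.
fn resolve_sequence(offset: &Offset, earliest: u64, latest: u64) -> Option<u64> {
    match offset {
        Offset::Sequence(n) => Some(*n),
        Offset::Earliest => Some(earliest),
        Offset::Latest => Some(latest),
        Offset::Timestamp(_) | Offset::Custom(_) => None,
    }
}

fn main() {
    let offsets = [
        Offset::Sequence(250),
        Offset::Timestamp(1_700_000_000_000),
        Offset::Custom("line:17".to_string()),
        Offset::Earliest,
        Offset::Latest,
    ];
    for offset in &offsets {
        // The source is assumed to retain sequence numbers 100..=500.
        println!("{:?} -> {:?}", offset, resolve_sequence(offset, 100, 500));
    }
}
```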
OffsetStore Trait
The OffsetStore trait defines the interface for offset storage:
Implementations:
- InMemoryOffsetStore: In-memory storage (for testing)
- FileOffsetStore: File-based persistence (JSON)
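The general shape of a storage backend can be sketched with plain std types. The trait below is a simplified, hypothetical stand-in. Its method names (get, commit, get_all, clear) and its bare u64 offsets are assumptions, not the real OffsetStore signatures. The in-memory backend is only a map from source name to last committed offset, which is why its contents are lost on restart.

```rust
use std::collections::HashMap;

// Hypothetical, simplified storage interface (not the crate's real trait).
trait OffsetStore {
    fn get(&self, source: &str) -> Option<u64>;
    fn commit(&mut self, source: &str, offset: u64);
    fn get_all(&self) -> HashMap<String, u64>;
    fn clear(&mut self, source: &str);
}

// In-memory backend: source name -> last committed offset.
// Nothing survives a process restart.
#[derive(Default)]
struct InMemoryOffsetStore {
    offsets: HashMap<String, u64>,
}

impl OffsetStore for InMemoryOffsetStore {
    fn get(&self, source: &str) -> Option<u64> {
        self.offsets.get(source).copied()
    }
    fn commit(&mut self, source: &str, offset: u64) {
        // A later commit for the same source overwrites the earlier one.
        self.offsets.insert(source.to_string(), offset);
    }
    fn get_all(&self) -> HashMap<String, u64> {
        self.offsets.clone()
    }
    fn clear(&mut self, source: &str) {
        self.offsets.remove(source);
    }
}

fn main() {
    // Boxed as a trait object, so backends can be swapped freely.
    let mut store: Box<dyn OffsetStore> = Box::new(InMemoryOffsetStore::default());
    store.commit("orders", 10);
    store.commit("orders", 11);
    store.commit("payments", 3);
    println!("{:?}", store.get("orders")); // Some(11)
    store.clear("payments");
    println!("{} source(s) tracked", store.get_all().len()); // 1 source(s) tracked
}
```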
OffsetTracker
The OffsetTracker provides a high-level API for offset management:
Key Methods:
- get_offset(source): Get the current committed offset
- record(source, offset): Record a processed offset
- commit(source): Manually commit the pending offset for a source
- commit_all(): Commit all pending offsets
- reset(source, offset): Reset the offset to a specific value
- clear(source): Clear the offset for a source
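Methods like these rest on a split between pending offsets, which are recorded but not yet durable, and committed offsets, which are what a restart resumes from. The hypothetical Tracker below sketches that split with two HashMaps. It is not the crate's OffsetTracker, and it stores bare u64 values instead of Offset values.

```rust
use std::collections::HashMap;

// Hypothetical sketch of the pending/committed split behind a tracker.
#[derive(Default)]
struct Tracker {
    pending: HashMap<String, u64>,   // recorded but not yet committed
    committed: HashMap<String, u64>, // what a restart would resume from
}

impl Tracker {
    fn record(&mut self, source: &str, offset: u64) {
        // Only the newest processed offset per source needs to be kept.
        self.pending.insert(source.to_string(), offset);
    }
    fn commit(&mut self, source: &str) {
        if let Some(offset) = self.pending.remove(source) {
            self.committed.insert(source.to_string(), offset);
        }
    }
    fn commit_all(&mut self) {
        for (source, offset) in self.pending.drain() {
            self.committed.insert(source, offset);
        }
    }
    fn get_offset(&self, source: &str) -> Option<u64> {
        self.committed.get(source).copied()
    }
    fn reset(&mut self, source: &str, offset: u64) {
        // Discard anything pending and overwrite the committed position.
        self.pending.remove(source);
        self.committed.insert(source.to_string(), offset);
    }
    fn clear(&mut self, source: &str) {
        self.pending.remove(source);
        self.committed.remove(source);
    }
}

fn main() {
    let mut tracker = Tracker::default();
    tracker.record("orders", 1);
    tracker.record("orders", 2);
    println!("{:?}", tracker.get_offset("orders")); // None: nothing committed yet
    tracker.commit("orders");
    println!("{:?}", tracker.get_offset("orders")); // Some(2)
}
```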
CommitStrategy
The CommitStrategy enum (Auto, Periodic, Manual) defines when offsets are committed.
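The three modes can be expressed as a small decision function. This sketch is hypothetical and assumes that Periodic(n) means "commit every n records". The commits_for helper counts how many store writes a run of records would cause under each strategy.

```rust
// Hypothetical mirror of the three commit modes; Periodic's payload is an assumption.
#[derive(Debug, Clone, Copy)]
enum CommitStrategy {
    Auto,
    Periodic(usize),
    Manual,
}

/// Should we commit after this record? `records_since_commit` includes the
/// record just processed.
fn should_commit(strategy: CommitStrategy, records_since_commit: usize) -> bool {
    match strategy {
        CommitStrategy::Auto => true,
        CommitStrategy::Periodic(n) => n > 0 && records_since_commit >= n,
        CommitStrategy::Manual => false, // only an explicit commit() persists
    }
}

/// Count the store writes that `items` records would trigger.
fn commits_for(strategy: CommitStrategy, items: usize) -> usize {
    let mut since_commit = 0;
    let mut commits = 0;
    for _ in 0..items {
        since_commit += 1;
        if should_commit(strategy, since_commit) {
            commits += 1;
            since_commit = 0;
        }
    }
    commits
}

fn main() {
    for strategy in [CommitStrategy::Auto, CommitStrategy::Periodic(10), CommitStrategy::Manual] {
        println!("{:?}: {} commits for 95 records", strategy, commits_for(strategy, 95));
    }
}
```

For 95 records this gives 95 writes for Auto, 9 for Periodic(10) (leaving the last 5 offsets pending), and 0 for Manual until the caller commits. Any offsets still pending when the process crashes are reprocessed after a restart.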
OffsetResetPolicy
The OffsetResetPolicy enum (Earliest, Latest, None) defines the behavior when no committed offset is found for a source.
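The decision a reset policy makes can be sketched as a pure function. The types below are hypothetical, trimmed-down copies of the enums named in this README, and the String error is an assumption. The key point is that a committed offset always takes precedence; the policy applies only when none exists.

```rust
// Hypothetical, trimmed-down copies of the README's enums.
#[derive(Debug, Clone, PartialEq)]
enum Offset {
    Sequence(u64),
    Earliest,
    Latest,
}

#[derive(Debug, Clone, Copy)]
enum OffsetResetPolicy {
    Earliest,
    Latest,
    None,
}

fn starting_offset(
    committed: Option<Offset>,
    policy: OffsetResetPolicy,
    source: &str,
) -> Result<Offset, String> {
    // A committed offset always wins; the policy is only a fallback.
    if let Some(offset) = committed {
        return Ok(offset);
    }
    match policy {
        OffsetResetPolicy::Earliest => Ok(Offset::Earliest),
        OffsetResetPolicy::Latest => Ok(Offset::Latest),
        OffsetResetPolicy::None => Err(format!("no committed offset for source `{source}`")),
    }
}

fn main() {
    println!("{:?}", starting_offset(Some(Offset::Sequence(7)), OffsetResetPolicy::None, "orders"));
    println!("{:?}", starting_offset(None, OffsetResetPolicy::Latest, "orders"));
    println!("{:?}", starting_offset(None, OffsetResetPolicy::None, "orders"));
}
```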
Usage Examples
Auto-Commit Strategy
Commit offsets immediately after each item:
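use streamweave_offset::{InMemoryOffsetStore, Offset, OffsetTracker};
let store = Box::new(InMemoryOffsetStore::new());
let tracker = OffsetTracker::new(store); // Auto is the default strategy
// Each record is immediately committed (source name and values are illustrative)
tracker.record("my-source", Offset::Sequence(1))?;
tracker.record("my-source", Offset::Sequence(2))?;
// Offset is immediately available
let offset = tracker.get_offset("my-source")?;
assert_eq!(offset, Offset::Sequence(2));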
Periodic Commit Strategy
Commit offsets every N items:
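use streamweave_offset::{CommitStrategy, InMemoryOffsetStore, Offset, OffsetTracker};
let store = Box::new(InMemoryOffsetStore::new());
let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Periodic(10));
// Record 9 items (not yet committed)
for i in 1..=9 {
    tracker.record("my-source", Offset::Sequence(i))?;
}
assert!(tracker.get_all_committed()?.is_empty());
// 10th item triggers commit
tracker.record("my-source", Offset::Sequence(10))?;
let offset = tracker.get_offset("my-source")?;
assert_eq!(offset, Offset::Sequence(10));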
Manual Commit Strategy
Commit offsets only when explicitly requested:
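use streamweave_offset::{CommitStrategy, InMemoryOffsetStore, Offset, OffsetTracker};
let store = Box::new(InMemoryOffsetStore::new());
let tracker = OffsetTracker::with_strategy(store, CommitStrategy::Manual);
// Record offsets (not committed)
tracker.record("my-source", Offset::Sequence(1))?;
tracker.record("my-source", Offset::Sequence(2))?;
// Check pending offsets
let pending = tracker.get_all_pending()?;
assert_eq!(pending.len(), 1);
// Manually commit
tracker.commit("my-source")?;
// Now committed
let offset = tracker.get_offset("my-source")?;
assert_eq!(offset, Offset::Sequence(2));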
Offset Reset Policies
Handle missing offsets with reset policies:
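use streamweave_offset::{InMemoryOffsetStore, Offset, OffsetResetPolicy, OffsetTracker};
// Earliest policy (default) - start from beginning
let store = Box::new(InMemoryOffsetStore::new());
let tracker = OffsetTracker::new(store)
    .with_reset_policy(OffsetResetPolicy::Earliest);
let offset = tracker.get_offset("new-source")?;
assert_eq!(offset, Offset::Earliest);
// Latest policy - start from latest
let store = Box::new(InMemoryOffsetStore::new());
let tracker = OffsetTracker::new(store)
    .with_reset_policy(OffsetResetPolicy::Latest);
let offset = tracker.get_offset("new-source")?;
assert_eq!(offset, Offset::Latest);
// None policy - fail if no offset found
let store = Box::new(InMemoryOffsetStore::new());
let tracker = OffsetTracker::new(store)
    .with_reset_policy(OffsetResetPolicy::None);
let result = tracker.get_offset("new-source");
assert!(result.is_err());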
File-Based Persistence
Persist offsets to disk for recovery:
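use streamweave_offset::{FileOffsetStore, Offset, OffsetTracker};
// Create file-based store (the path is illustrative)
let store = Box::new(FileOffsetStore::new("offsets.json")?);
let tracker = OffsetTracker::new(store);
// Record offsets (automatically persisted to disk)
tracker.record("source-a", Offset::Sequence(100))?;
tracker.record("source-b", Offset::Sequence(200))?;
// After restart, create new tracker with same file
let store2 = Box::new(FileOffsetStore::new("offsets.json")?);
let tracker2 = OffsetTracker::new(store2);
// Offsets are automatically loaded
let offset1 = tracker2.get_offset("source-a")?;
let offset2 = tracker2.get_offset("source-b")?;
assert_eq!(offset1, Offset::Sequence(100));
assert_eq!(offset2, Offset::Sequence(200));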
Offset Management in Consumers
Use offsets in consumer implementations:
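use streamweave_offset::{FileOffsetStore, OffsetTracker};
use streamweave::Consumer;
// Load or create offset tracker (the path is illustrative)
let store = Box::new(FileOffsetStore::new("offsets.json")?);
let tracker = OffsetTracker::new(store);
// Get starting offset
let start_offset = tracker.get_offset("my-topic")?;
// Create consumer starting from offset (MyConsumer is a placeholder for your Consumer type)
let consumer = MyConsumer::new()
    .with_start_offset(start_offset.clone());
// Process messages (`messages` stands for the consumer's input)
let mut current_offset = start_offset;
for message in messages {
    // ... process `message` and advance `current_offset` ...
    tracker.record("my-topic", current_offset.clone())?;
}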
Offset Recovery Scenarios
Handle recovery after failures:
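use streamweave_offset::{FileOffsetStore, OffsetResetPolicy, OffsetTracker};
// After restart, load offsets (the path is illustrative)
let store = Box::new(FileOffsetStore::new("offsets.json")?);
let tracker = OffsetTracker::new(store)
    .with_reset_policy(OffsetResetPolicy::Earliest);
// Get offset for each source
let sources = vec!["source-a", "source-b"];
for source in sources {
    let offset = tracker.get_offset(source)?;
    println!("{source}: resuming from {offset:?}");
}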
Multiple Sources
Track offsets for multiple sources:
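use streamweave_offset::{InMemoryOffsetStore, Offset, OffsetTracker};
let store = Box::new(InMemoryOffsetStore::new());
let tracker = OffsetTracker::new(store);
// Track offsets for multiple sources (names and values are illustrative)
tracker.record("source-a", Offset::Sequence(10))?;
tracker.record("source-b", Offset::Sequence(20))?;
tracker.record("source-c", Offset::Sequence(30))?;
// Get all committed offsets
let all_offsets = tracker.get_all_committed()?;
assert_eq!(all_offsets.len(), 3);
// Commit all pending offsets
tracker.commit_all()?;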
Architecture
Offset tracking integrates with consumers for exactly-once processing:
┌─────────────┐
│  Consumer   │───processes item───> Offset
└─────────────┘                        │
                                       │
                                       ▼
┌─────────────┐                ┌──────────────┐
│OffsetTracker│───records────> │ OffsetStore  │
└─────────────┘                └──────────────┘
       │                               │
       │                               ▼
       └───commits (based on strategy)───> Persistence
Offset Flow:
- Consumer processes item at offset N
- OffsetTracker records offset N
- Based on CommitStrategy, offset is committed
- OffsetStore persists offset
- On restart, offsets are loaded and processing resumes
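The record, commit, persist, and reload cycle can be sketched end to end with only the standard library. This is a hypothetical stand-in for a file-backed store. It writes one tab-separated source/offset line per source rather than the JSON the real FileOffsetStore uses. It also assumes the stored value is the last processed offset, so processing resumes at the next one. Writing to a temporary file and then renaming it means that a process dying mid-write leaves the previous offsets file intact instead of a truncated one.

```rust
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::Path;

// Hypothetical file-backed store: one "source<TAB>offset" line per source.
fn save(path: &Path, offsets: &HashMap<String, u64>) -> io::Result<()> {
    let mut body = String::new();
    for (source, offset) in offsets {
        body.push_str(&format!("{source}\t{offset}\n"));
    }
    // Write a temporary file, then rename it over the real one.
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, body)?;
    fs::rename(&tmp, path)
}

fn load(path: &Path) -> io::Result<HashMap<String, u64>> {
    let mut offsets: HashMap<String, u64> = HashMap::new();
    let body = match fs::read_to_string(path) {
        Ok(body) => body,
        // No file yet: first run, nothing committed.
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(offsets),
        Err(e) => return Err(e),
    };
    for line in body.lines() {
        if let Some((source, offset)) = line.split_once('\t') {
            if let Ok(n) = offset.parse::<u64>() {
                offsets.insert(source.to_string(), n);
            }
        }
    }
    Ok(offsets)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("streamweave_offsets_sketch.txt");
    let _ = fs::remove_file(&path); // start from a clean slate

    // First run: "process" items 0..5 on one source, committing after each.
    let mut committed = load(&path)?;
    for offset in 0..5u64 {
        committed.insert("orders".to_string(), offset);
        save(&path, &committed)?;
    }

    // Simulated restart: reload from disk and resume after the last commit.
    let reloaded = load(&path)?;
    let resume_from = reloaded.get("orders").map_or(0, |last| last + 1);
    println!("resuming orders at offset {resume_from}"); // prints 5
    Ok(())
}
```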
Configuration
Commit Strategies
Auto (Default):
- Commits after each item
- Maximum safety, higher I/O
- Best for critical data
Periodic:
- Commits every N items
- Balance between safety and performance
- Best for high-throughput scenarios
Manual:
- Commits only when requested
- Maximum control
- Best for transactional scenarios
Reset Policies
Earliest (Default):
- Starts from beginning if no offset found
- Safe default for new sources
- May reprocess data
Latest:
- Starts from latest position
- Skips old data
- Best for real-time processing
None:
- Fails if no offset found
- Explicit error handling
- Best for strict requirements
Error Handling
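Offset operations return OffsetResult<T>. The examples above propagate its errors with the ? operator; for instance, get_offset fails when no offset exists and the reset policy is None.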
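A common shape for a result type like this is an alias over std's Result paired with a crate-specific error enum. The sketch below is hypothetical: the OffsetError variants and the get_offset/next_position helpers are assumptions, not the crate's documented errors. It shows ? carrying a missing-offset error up to a caller that decides how to react.

```rust
use std::fmt;

// Hypothetical error type; the real crate's variants may differ.
#[derive(Debug, PartialEq)]
enum OffsetError {
    NoOffset(String), // no committed offset and the reset policy is None
    Storage(String),  // backend failure, e.g. an unreadable offsets file
}

impl fmt::Display for OffsetError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            OffsetError::NoOffset(source) => write!(f, "no committed offset for source `{source}`"),
            OffsetError::Storage(msg) => write!(f, "offset storage error: {msg}"),
        }
    }
}

impl std::error::Error for OffsetError {}

type OffsetResult<T> = Result<T, OffsetError>;

fn get_offset(committed: Option<u64>, source: &str) -> OffsetResult<u64> {
    committed.ok_or_else(|| OffsetError::NoOffset(source.to_string()))
}

// `?` returns early with the error; the caller chooses the recovery strategy.
fn next_position(committed: Option<u64>, source: &str) -> OffsetResult<u64> {
    let last = get_offset(committed, source)?;
    Ok(last + 1)
}

fn main() {
    match next_position(None, "orders") {
        Ok(pos) => println!("resume at {pos}"),
        Err(OffsetError::NoOffset(source)) => eprintln!("no offset for {source}; refusing to guess"),
        Err(e) => eprintln!("{e}"),
    }
}
```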
Performance Considerations
- In-Memory Store: Fast but not persistent
- File Store: Persistent with JSON serialization overhead
- Auto Commit: Higher I/O, maximum safety
- Periodic Commit: Reduced I/O, batch efficiency
- Manual Commit: Minimal I/O, maximum control
Dependencies
streamweave-offset depends on:
- chrono: Timestamp support
- serde: Serialization support
- serde_json: JSON serialization
- streamweave: Core traits
Use Cases
Offset management is used for:
- Exactly-Once Processing: Track processed items to avoid duplicates
- Resumable Pipelines: Resume from last processed position after restart
- Kafka Integration: Track partition offsets for consumer groups
- File Processing: Track line numbers or byte positions
- Recovery: Recover from failures without data loss
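For file processing, a byte position works well as an offset because std::io::Seek can jump straight back to it. The sketch below assumes the tracker stores the byte offset just past the last fully read line, for example as a Sequence or Custom offset. The read_from function is a hypothetical helper and is not part of the crate.

```rust
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufRead, BufReader, Seek, SeekFrom, Write};
use std::path::Path;

/// Read lines starting at byte offset `start`; return them together with the
/// byte offset just past the last line read (the value to record).
fn read_from(path: &Path, start: u64) -> io::Result<(Vec<String>, u64)> {
    let mut file = File::open(path)?;
    file.seek(SeekFrom::Start(start))?;
    let mut reader = BufReader::new(file);
    let mut lines = Vec::new();
    let mut position = start;
    let mut buf = String::new();
    loop {
        buf.clear();
        let n = reader.read_line(&mut buf)?;
        if n == 0 {
            break; // end of file
        }
        position += n as u64; // read_line reports bytes consumed, newline included
        lines.push(buf.trim_end_matches('\n').to_string());
    }
    Ok((lines, position))
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("streamweave_byte_offset_sketch.log");
    fs::write(&path, "alpha\nbeta\n")?;

    // First run: read everything, then remember the byte position.
    let (first, saved) = read_from(&path, 0)?; // ["alpha", "beta"], saved == 11

    // New data arrives; after a restart, seek straight to the saved position.
    let mut file = OpenOptions::new().append(true).open(&path)?;
    file.write_all(b"gamma\n")?;
    drop(file);
    let (second, _) = read_from(&path, saved)?; // only ["gamma"]

    println!("{first:?} then {second:?}");
    fs::remove_file(&path)
}
```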
See Also
- streamweave - Core traits
- streamweave-message - Message envelopes
- streamweave-transaction - Transaction support
Contributing
Contributions are welcome! Please see the Contributing Guide for details.
License
This project is licensed under the CC BY-SA 4.0 license.