pub struct CompositeCursor {
pub positions: HashMap<u16, Arc<str>>,
}Expand description
Composite cursor tracking position across multiple shards.
Fields§
§positions: HashMap<u16, Arc<str>>Per-shard positions (shard_id -> stream_id).
Stored as Arc<str> so internal copies (e.g. cursor.clone()
inside the poll merger) bump a refcount instead of duplicating
each id’s bytes.
Implementations§
Source§impl CompositeCursor
impl CompositeCursor
Sourcepub fn encode(&self) -> Result<String, ConsumerError>
pub fn encode(&self) -> Result<String, ConsumerError>
Encode the cursor as a base64 string.
Pre-fix used unwrap_or_default(), which silently
produced an empty string on serialization failure. The
empty cursor then base64-encoded to an empty string and
the consumer’s next poll restarted from the beginning of
the stream — silent rewind. For the current positions
schema (HashMap<u16, Arc<str>>), serialization is
infallible, so the failure path is unreachable.
We surface that as a ConsumerError::InvalidCursor rather
than a panic. poll() is an async fn, and a panic that
propagates from there can abort the surrounding tokio
runtime worker. Returning Err lets poll() stay
non-panicking even if a future schema change breaks the
invariant; the caller will see a structured error and can
retry / log instead of taking down a worker.
Sourcepub fn decode(s: &str) -> Result<Self, ConsumerError>
pub fn decode(s: &str) -> Result<Self, ConsumerError>
Decode a cursor from a base64 string.
Sourcepub fn set(&mut self, shard_id: u16, position: impl Into<Arc<str>>)
pub fn set(&mut self, shard_id: u16, position: impl Into<Arc<str>>)
Set the position for a specific shard.
Accepts anything that converts into an Arc<str> — notably
String, &str, and Arc<str> itself. This lets adapters
hand us a freshly-allocated String (becomes a single boxed
allocation) without forcing a second copy for the cursor.
Unconditional: skips the backend-format guard and the
monotonicity guard. Use Self::set_checked from production
poll paths — the documented “refuse to advance across a
JetStream → Redis cursor format change” protection lives in
the checked variant, not here. The unchecked variant is kept
public for tests that need to seed a cursor directly.
Sourcepub fn set_checked(&mut self, shard_id: u16, new_id: &str) -> bool
pub fn set_checked(&mut self, shard_id: u16, new_id: &str) -> bool
Set the position for a specific shard, applying the same
backend-format mismatch guard as Self::update_from_events.
Returns true if the write happened, false if it was
refused (format mismatch — error-logged so operators see the
migration). No-existing-position is treated as accept (first
write).
Production poll paths must use this instead of Self::set
so a mid-stream backend migration (JetStream → Redis or vice
versa) is surfaced and refused at the cursor write site
rather than silently overwriting the format and stalling the
caller.
Sourcepub fn update_from_events(&mut self, events: &[StoredEvent])
pub fn update_from_events(&mut self, events: &[StoredEvent])
Update positions from consumed events.
Per-shard CAS routed through compare_stream_ids, which
understands the Redis (<ms>-<seq>) and JetStream (<u64>)
formats numerically and falls back to lex for opaque ids
(ULID, UUIDv7, hex digests). The cursor cannot regress and
decade-rollovers cannot freeze it. Unconditional inserts
would let whichever event for a given shard_id appeared
last in the slice win regardless of stream order; a plain
str::cmp CAS would wedge on the unpadded numeric ids both
built-in adapters emit ("9" > "10" lexicographically).
Trait Implementations§
Source§impl Clone for CompositeCursor
impl Clone for CompositeCursor
Source§fn clone(&self) -> CompositeCursor
fn clone(&self) -> CompositeCursor
1.0.0 (const: unstable) · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read more