pub struct CdcBuffer { /* private fields */ }Expand description
CDC event buffer — circular buffer of change events.
Splits the “next LSN” counter (write-contended on every emit)
from the event ring (short-hold push/pop) so that concurrent
emitters don’t serialise on a single RwLock. The previous
design used one RwLock<CdcState> that turned every insert
into a write-lock acquire, capping 16-way concurrent writes
at ~1000 ops/s (each writer paid ~1ms queueing for the same
mutex even though the work it guarded was a one-line VecDeque
push).
New layout:
- LSN is an
AtomicU64, assigned withfetch_add(1). Zero contention. - Events are guarded by a
parking_lot::Mutex<VecDeque>. The critical section ispop_front (if full) + push_back— microseconds at most, parking-free at low contention.
Readers (poll, current_lsn, stats) take the same mutex
briefly; they’re cold paths compared to the write hot path.
Implementations§
Source§impl CdcBuffer
impl CdcBuffer
Sourcepub fn emit(
&self,
operation: ChangeOperation,
collection: &str,
entity_id: u64,
entity_kind: &str,
) -> u64
pub fn emit( &self, operation: ChangeOperation, collection: &str, entity_id: u64, entity_kind: &str, ) -> u64
Emit a change event into the buffer. changed_columns
defaults to None for backwards compatibility; call sites
that have a damage vector available should use
Self::emit_with_columns instead.
Sourcepub fn emit_with_columns(
&self,
operation: ChangeOperation,
collection: &str,
entity_id: u64,
entity_kind: &str,
changed_columns: Option<Vec<String>>,
) -> u64
pub fn emit_with_columns( &self, operation: ChangeOperation, collection: &str, entity_id: u64, entity_kind: &str, changed_columns: Option<Vec<String>>, ) -> u64
Emit a change event with an optional list of column names
that were affected. Use from update paths that have already
computed a RowDamageVector
so CDC consumers can filter by touched column without re-diffing.
Sourcepub fn emit_batch_same_collection<I>(
&self,
operation: ChangeOperation,
collection: &str,
entity_kind: &str,
entity_ids: I,
) -> Vec<u64>
pub fn emit_batch_same_collection<I>( &self, operation: ChangeOperation, collection: &str, entity_kind: &str, entity_ids: I, ) -> Vec<u64>
Emit many same-collection events with one LSN reservation and one ring-buffer lock. This is used by bulk insert paths that do not need per-row logical-WAL records.
Sourcepub fn emit_kv(
&self,
operation: ChangeOperation,
collection: &str,
key: &str,
entity_id: u64,
before: Option<Value>,
after: Option<Value>,
) -> u64
pub fn emit_kv( &self, operation: ChangeOperation, collection: &str, key: &str, entity_id: u64, before: Option<Value>, after: Option<Value>, ) -> u64
Emit a committed logical KV event into the same CDC ring used by
result-cache invalidation and /changes consumers.
Sourcepub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent>
pub fn poll(&self, since_lsn: u64, max_count: usize) -> Vec<ChangeEvent>
Poll for events since a given LSN.
Sourcepub fn current_lsn(&self) -> u64
pub fn current_lsn(&self) -> u64
Get the current (latest) LSN.
Sourcepub fn set_current_lsn(&self, lsn: u64)
pub fn set_current_lsn(&self, lsn: u64)
Restore the LSN cursor after process restart. Only advances; never rewinds. Under concurrent emit this is guarded by a compare-exchange loop.
Sourcepub fn oldest_lsn(&self) -> Option<u64>
pub fn oldest_lsn(&self) -> Option<u64>
Get the oldest available LSN (or None if empty).
Auto Trait Implementations§
impl !Freeze for CdcBuffer
impl !RefUnwindSafe for CdcBuffer
impl Send for CdcBuffer
impl Sync for CdcBuffer
impl Unpin for CdcBuffer
impl UnsafeUnpin for CdcBuffer
impl UnwindSafe for CdcBuffer
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::Request