// reddb_server/runtime/impl_cdc.rs

use super::*;
use crate::application::entity::metadata_to_json;
use crate::replication::cdc::{server_json_to_wire_json, ChangeRecord};
use crate::runtime::impl_core::current_connection_id;
use std::time::{SystemTime, UNIX_EPOCH};

7impl RedDBRuntime {
8    fn latest_metadata_for(
9        &self,
10        collection: &str,
11        entity_id: u64,
12    ) -> Option<reddb_wire::replication::ChangeRecordJsonValue> {
13        self.inner
14            .db
15            .store()
16            .get_metadata(collection, EntityId::new(entity_id))
17            .map(|metadata| server_json_to_wire_json(metadata_to_json(&metadata)))
18    }
19
20    /// Emit a CDC record without invalidating the result cache.
21    ///
22    /// Used by `MutationEngine::append_batch` which calls
23    /// `invalidate_result_cache` once for the whole batch before this
24    /// loop, avoiding N write-lock acquisitions.
25    pub(crate) fn cdc_emit_no_cache_invalidate(
26        &self,
27        operation: crate::replication::cdc::ChangeOperation,
28        collection: &str,
29        entity_id: u64,
30        entity_kind: &str,
31    ) -> u64 {
32        let lsn = self
33            .inner
34            .cdc
35            .emit(operation, collection, entity_id, entity_kind);
36
37        // Append to logical WAL replication buffer (if primary mode)
38        if let Some(ref primary) = self.inner.db.replication {
39            let store = self.inner.db.store();
40            let entity = if operation == crate::replication::cdc::ChangeOperation::Delete {
41                None
42            } else {
43                store.get(collection, EntityId::new(entity_id))
44            };
45            let record = ChangeRecord {
46                term: self.current_replication_term(),
47                lsn,
48                timestamp: SystemTime::now()
49                    .duration_since(UNIX_EPOCH)
50                    .unwrap_or_default()
51                    .as_millis() as u64,
52                operation,
53                collection: collection.to_string(),
54                entity_id,
55                entity_kind: entity_kind.to_string(),
56                entity_bytes: entity
57                    .as_ref()
58                    .map(|e| UnifiedStore::serialize_entity(e, store.format_version())),
59                metadata: self.latest_metadata_for(collection, entity_id),
60                refresh_records: None,
61                range_id: None,
62                ownership_epoch: None,
63            };
64            let encoded = record.encode();
65            primary.append_logical_record(record.lsn, encoded);
66        }
67        lsn
68    }
69
70    pub(crate) fn cdc_emit_insert_batch_no_cache_invalidate(
71        &self,
72        collection: &str,
73        ids: &[EntityId],
74        entity_kind: &str,
75    ) -> Vec<u64> {
76        if ids.is_empty() {
77            return Vec::new();
78        }
79
80        // Without logical replication, CDC only needs the in-memory event
81        // ring. Reserve all LSNs and push the batch under one mutex instead
82        // of taking the ring lock once per inserted row.
83        if self.inner.db.replication.is_none() {
84            return self.inner.cdc.emit_batch_same_collection(
85                crate::replication::cdc::ChangeOperation::Insert,
86                collection,
87                entity_kind,
88                ids.iter().map(|id| id.raw()),
89            );
90        }
91
92        // Replication needs one logical-WAL record per entity with the
93        // serialized entity bytes, so keep the existing per-row path.
94        ids.iter()
95            .map(|id| {
96                self.cdc_emit_no_cache_invalidate(
97                    crate::replication::cdc::ChangeOperation::Insert,
98                    collection,
99                    id.raw(),
100                    entity_kind,
101                )
102            })
103            .collect()
104    }
105
106    pub fn cdc_emit(
107        &self,
108        operation: crate::replication::cdc::ChangeOperation,
109        collection: &str,
110        entity_id: u64,
111        entity_kind: &str,
112    ) -> u64 {
113        let lsn = self
114            .inner
115            .cdc
116            .emit(operation, collection, entity_id, entity_kind);
117        // Perf: prior to this we called `invalidate_result_cache()`
118        // which wipes EVERY cached query, across every table, under
119        // a write lock — turning each INSERT into a serialisation
120        // point for all readers. Swap to the per-table variant so
121        // unrelated query caches survive.
122        self.invalidate_result_cache_for_table(collection);
123
124        // Append to logical WAL replication buffer (if primary mode)
125        if let Some(ref primary) = self.inner.db.replication {
126            let store = self.inner.db.store();
127            let entity = if operation == crate::replication::cdc::ChangeOperation::Delete {
128                None
129            } else {
130                store.get(collection, EntityId::new(entity_id))
131            };
132            let record = ChangeRecord {
133                term: self.current_replication_term(),
134                lsn,
135                timestamp: SystemTime::now()
136                    .duration_since(UNIX_EPOCH)
137                    .unwrap_or_default()
138                    .as_millis() as u64,
139                operation,
140                collection: collection.to_string(),
141                entity_id,
142                entity_kind: entity_kind.to_string(),
143                entity_bytes: entity
144                    .as_ref()
145                    .map(|entity| UnifiedStore::serialize_entity(entity, store.format_version())),
146                metadata: self.latest_metadata_for(collection, entity_id),
147                refresh_records: None,
148                range_id: None,
149                ownership_epoch: None,
150            };
151            let encoded = record.encode();
152            primary.append_logical_record(record.lsn, encoded);
153        }
154        lsn
155    }
156
157    pub(crate) fn cdc_emit_kv(
158        &self,
159        operation: crate::replication::cdc::ChangeOperation,
160        collection: &str,
161        key: &str,
162        entity_id: u64,
163        before: Option<crate::json::Value>,
164        after: Option<crate::json::Value>,
165    ) -> u64 {
166        let lsn = self
167            .inner
168            .cdc
169            .emit_kv(operation, collection, key, entity_id, before, after);
170        self.inner.kv_stats.incr_watch_events_emitted();
171        self.invalidate_result_cache_for_table(collection);
172        lsn
173    }
174
175    pub(crate) fn record_kv_watch_event(
176        &self,
177        operation: crate::replication::cdc::ChangeOperation,
178        collection: &str,
179        key: &str,
180        entity_id: u64,
181        before: Option<crate::json::Value>,
182        after: Option<crate::json::Value>,
183    ) {
184        if self.current_xid().is_some() {
185            let conn_id = current_connection_id();
186            let event = crate::replication::cdc::KvWatchEvent {
187                collection: collection.to_string(),
188                key: key.to_string(),
189                op: operation,
190                before,
191                after,
192                lsn: 0,
193                committed_at: 0,
194                dropped_event_count: 0,
195            };
196            self.inner
197                .pending_kv_watch_events
198                .write()
199                .entry(conn_id)
200                .or_default()
201                .push(event);
202            return;
203        }
204
205        self.cdc_emit_kv(operation, collection, key, entity_id, before, after);
206    }
207
208    pub(crate) fn cdc_emit_prebuilt(
209        &self,
210        operation: crate::replication::cdc::ChangeOperation,
211        collection: &str,
212        entity: &UnifiedEntity,
213        entity_kind: &str,
214        metadata: Option<&crate::storage::Metadata>,
215        invalidate_cache: bool,
216    ) -> u64 {
217        self.cdc_emit_prebuilt_with_columns(
218            operation,
219            collection,
220            entity,
221            entity_kind,
222            metadata,
223            invalidate_cache,
224            None,
225        )
226    }
227
228    /// `cdc_emit_prebuilt` plus the list of column names whose values
229    /// changed on this update. Callers that have already computed a
230    /// `RowDamageVector` pass it here so downstream CDC consumers can
231    /// filter events by touched column without re-diffing.
232    /// `changed_columns` is only meaningful for `Update` operations —
233    /// insert and delete events ignore it.
234    pub(crate) fn cdc_emit_prebuilt_with_columns(
235        &self,
236        operation: crate::replication::cdc::ChangeOperation,
237        collection: &str,
238        entity: &UnifiedEntity,
239        entity_kind: &str,
240        metadata: Option<&crate::storage::Metadata>,
241        invalidate_cache: bool,
242        changed_columns: Option<Vec<String>>,
243    ) -> u64 {
244        if invalidate_cache {
245            self.invalidate_result_cache();
246        }
247
248        let public_id = entity.logical_id().raw();
249        let lsn = self.inner.cdc.emit_with_columns(
250            operation,
251            collection,
252            public_id,
253            entity_kind,
254            changed_columns,
255        );
256
257        if let Some(ref primary) = self.inner.db.replication {
258            let store = self.inner.db.store();
259            let record = ChangeRecord {
260                term: self.current_replication_term(),
261                lsn,
262                timestamp: SystemTime::now()
263                    .duration_since(UNIX_EPOCH)
264                    .unwrap_or_default()
265                    .as_millis() as u64,
266                operation,
267                collection: collection.to_string(),
268                entity_id: entity.id.raw(),
269                entity_kind: entity_kind.to_string(),
270                entity_bytes: Some(UnifiedStore::serialize_entity(
271                    entity,
272                    store.format_version(),
273                )),
274                metadata: metadata
275                    .map(metadata_to_json)
276                    .map(server_json_to_wire_json)
277                    .or_else(|| self.latest_metadata_for(collection, entity.id.raw())),
278                refresh_records: None,
279                range_id: None,
280                ownership_epoch: None,
281            };
282            let encoded = record.encode();
283            primary.append_logical_record(record.lsn, encoded);
284        }
285
286        lsn
287    }
288
289    pub(crate) fn current_replication_term(&self) -> u64 {
290        self.inner.db.options().replication.term
291    }
292
293    pub(crate) fn cdc_emit_prebuilt_batch<'a, I>(
294        &self,
295        operation: crate::replication::cdc::ChangeOperation,
296        entity_kind: &str,
297        items: I,
298        invalidate_cache: bool,
299    ) where
300        I: IntoIterator<
301            Item = (
302                &'a str,
303                &'a UnifiedEntity,
304                Option<&'a crate::storage::Metadata>,
305            ),
306        >,
307    {
308        let items: Vec<(&str, &UnifiedEntity, Option<&crate::storage::Metadata>)> =
309            items.into_iter().collect();
310        if items.is_empty() {
311            return;
312        }
313
314        if invalidate_cache {
315            self.invalidate_result_cache();
316        }
317
318        for (collection, entity, metadata) in items {
319            self.cdc_emit_prebuilt(operation, collection, entity, entity_kind, metadata, false);
320        }
321    }
322
323    /// Poll CDC events since a given LSN.
324    pub fn cdc_poll(
325        &self,
326        since_lsn: u64,
327        max_count: usize,
328    ) -> Vec<crate::replication::cdc::ChangeEvent> {
329        self.inner.cdc.poll(since_lsn, max_count)
330    }
331
332    /// PLAN.md Phase 11.4 — current CDC LSN. Public mutation
333    /// surfaces (HTTP query, gRPC entity ops) call this immediately
334    /// after a successful write to feed `enforce_commit_policy`.
335    pub fn cdc_current_lsn(&self) -> u64 {
336        self.inner.cdc.current_lsn()
337    }
338
339    pub fn kv_watch_events_since(
340        &self,
341        collection: &str,
342        key: &str,
343        since_lsn: u64,
344        max_count: usize,
345    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
346        self.inner
347            .cdc
348            .poll(since_lsn, max_count)
349            .into_iter()
350            .filter_map(|event| event.kv)
351            .filter(|event| event.collection == collection && event.key == key)
352            .collect()
353    }
354
355    pub fn kv_watch_events_since_prefix(
356        &self,
357        collection: &str,
358        prefix: &str,
359        since_lsn: u64,
360        max_count: usize,
361    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
362        self.inner
363            .cdc
364            .poll(since_lsn, max_count)
365            .into_iter()
366            .filter_map(|event| event.kv)
367            .filter(|event| event.collection == collection && event.key.starts_with(prefix))
368            .collect()
369    }
370
371    pub(crate) fn kv_watch_subscribe<'a>(
372        &'a self,
373        collection: impl Into<String>,
374        key: impl Into<String>,
375        from_lsn: Option<u64>,
376    ) -> crate::runtime::kv_watch::KvWatchStream<'a> {
377        crate::runtime::kv_watch::KvWatchStream::subscribe(
378            &self.inner.cdc,
379            &self.inner.kv_stats,
380            collection,
381            key,
382            from_lsn,
383            self.kv_watch_idle_timeout_ms(),
384        )
385    }
386
387    pub(crate) fn kv_watch_subscribe_prefix<'a>(
388        &'a self,
389        collection: impl Into<String>,
390        prefix: impl Into<String>,
391        from_lsn: Option<u64>,
392    ) -> crate::runtime::kv_watch::KvWatchStream<'a> {
393        crate::runtime::kv_watch::KvWatchStream::subscribe_prefix(
394            &self.inner.cdc,
395            &self.inner.kv_stats,
396            collection,
397            prefix,
398            from_lsn,
399            self.kv_watch_idle_timeout_ms(),
400        )
401    }
402
403    pub(crate) fn kv_watch_idle_timeout_ms(&self) -> u64 {
404        self.config_u64("red.config.kv.watch.idle_timeout_ms", 60_000)
405    }
406}