Skip to main content

reddb_server/runtime/
impl_cdc.rs

1use super::*;
2use crate::application::entity::metadata_to_json;
3use crate::replication::cdc::{server_json_to_wire_json, ChangeRecord};
4use crate::runtime::impl_core::current_connection_id;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7impl RedDBRuntime {
8    fn latest_metadata_for(
9        &self,
10        collection: &str,
11        entity_id: u64,
12    ) -> Option<reddb_wire::replication::ChangeRecordJsonValue> {
13        self.inner
14            .db
15            .store()
16            .get_metadata(collection, EntityId::new(entity_id))
17            .map(|metadata| server_json_to_wire_json(metadata_to_json(&metadata)))
18    }
19
20    /// Emit a CDC record without invalidating the result cache.
21    ///
22    /// Used by `MutationEngine::append_batch` which calls
23    /// `invalidate_result_cache` once for the whole batch before this
24    /// loop, avoiding N write-lock acquisitions.
25    pub(crate) fn cdc_emit_no_cache_invalidate(
26        &self,
27        operation: crate::replication::cdc::ChangeOperation,
28        collection: &str,
29        entity_id: u64,
30        entity_kind: &str,
31    ) -> u64 {
32        let lsn = self
33            .inner
34            .cdc
35            .emit(operation, collection, entity_id, entity_kind);
36
37        // Append to logical WAL replication buffer (if primary mode)
38        if let Some(ref primary) = self.inner.db.replication {
39            let store = self.inner.db.store();
40            let entity = if operation == crate::replication::cdc::ChangeOperation::Delete {
41                None
42            } else {
43                store.get(collection, EntityId::new(entity_id))
44            };
45            let record = ChangeRecord {
46                term: self.current_replication_term(),
47                lsn,
48                timestamp: SystemTime::now()
49                    .duration_since(UNIX_EPOCH)
50                    .unwrap_or_default()
51                    .as_millis() as u64,
52                operation,
53                collection: collection.to_string(),
54                entity_id,
55                entity_kind: entity_kind.to_string(),
56                entity_bytes: entity
57                    .as_ref()
58                    .map(|e| UnifiedStore::serialize_entity(e, store.format_version())),
59                metadata: self.latest_metadata_for(collection, entity_id),
60                refresh_records: None,
61            };
62            let encoded = record.encode();
63            primary.append_logical_record(record.lsn, encoded);
64        }
65        lsn
66    }
67
68    pub(crate) fn cdc_emit_insert_batch_no_cache_invalidate(
69        &self,
70        collection: &str,
71        ids: &[EntityId],
72        entity_kind: &str,
73    ) -> Vec<u64> {
74        if ids.is_empty() {
75            return Vec::new();
76        }
77
78        // Without logical replication, CDC only needs the in-memory event
79        // ring. Reserve all LSNs and push the batch under one mutex instead
80        // of taking the ring lock once per inserted row.
81        if self.inner.db.replication.is_none() {
82            return self.inner.cdc.emit_batch_same_collection(
83                crate::replication::cdc::ChangeOperation::Insert,
84                collection,
85                entity_kind,
86                ids.iter().map(|id| id.raw()),
87            );
88        }
89
90        // Replication needs one logical-WAL record per entity with the
91        // serialized entity bytes, so keep the existing per-row path.
92        ids.iter()
93            .map(|id| {
94                self.cdc_emit_no_cache_invalidate(
95                    crate::replication::cdc::ChangeOperation::Insert,
96                    collection,
97                    id.raw(),
98                    entity_kind,
99                )
100            })
101            .collect()
102    }
103
104    pub fn cdc_emit(
105        &self,
106        operation: crate::replication::cdc::ChangeOperation,
107        collection: &str,
108        entity_id: u64,
109        entity_kind: &str,
110    ) -> u64 {
111        let lsn = self
112            .inner
113            .cdc
114            .emit(operation, collection, entity_id, entity_kind);
115        // Perf: prior to this we called `invalidate_result_cache()`
116        // which wipes EVERY cached query, across every table, under
117        // a write lock — turning each INSERT into a serialisation
118        // point for all readers. Swap to the per-table variant so
119        // unrelated query caches survive.
120        self.invalidate_result_cache_for_table(collection);
121
122        // Append to logical WAL replication buffer (if primary mode)
123        if let Some(ref primary) = self.inner.db.replication {
124            let store = self.inner.db.store();
125            let entity = if operation == crate::replication::cdc::ChangeOperation::Delete {
126                None
127            } else {
128                store.get(collection, EntityId::new(entity_id))
129            };
130            let record = ChangeRecord {
131                term: self.current_replication_term(),
132                lsn,
133                timestamp: SystemTime::now()
134                    .duration_since(UNIX_EPOCH)
135                    .unwrap_or_default()
136                    .as_millis() as u64,
137                operation,
138                collection: collection.to_string(),
139                entity_id,
140                entity_kind: entity_kind.to_string(),
141                entity_bytes: entity
142                    .as_ref()
143                    .map(|entity| UnifiedStore::serialize_entity(entity, store.format_version())),
144                metadata: self.latest_metadata_for(collection, entity_id),
145                refresh_records: None,
146            };
147            let encoded = record.encode();
148            primary.append_logical_record(record.lsn, encoded);
149        }
150        lsn
151    }
152
153    pub(crate) fn cdc_emit_kv(
154        &self,
155        operation: crate::replication::cdc::ChangeOperation,
156        collection: &str,
157        key: &str,
158        entity_id: u64,
159        before: Option<crate::json::Value>,
160        after: Option<crate::json::Value>,
161    ) -> u64 {
162        let lsn = self
163            .inner
164            .cdc
165            .emit_kv(operation, collection, key, entity_id, before, after);
166        self.inner.kv_stats.incr_watch_events_emitted();
167        self.invalidate_result_cache_for_table(collection);
168        lsn
169    }
170
171    pub(crate) fn record_kv_watch_event(
172        &self,
173        operation: crate::replication::cdc::ChangeOperation,
174        collection: &str,
175        key: &str,
176        entity_id: u64,
177        before: Option<crate::json::Value>,
178        after: Option<crate::json::Value>,
179    ) {
180        if self.current_xid().is_some() {
181            let conn_id = current_connection_id();
182            let event = crate::replication::cdc::KvWatchEvent {
183                collection: collection.to_string(),
184                key: key.to_string(),
185                op: operation,
186                before,
187                after,
188                lsn: 0,
189                committed_at: 0,
190                dropped_event_count: 0,
191            };
192            self.inner
193                .pending_kv_watch_events
194                .write()
195                .entry(conn_id)
196                .or_default()
197                .push(event);
198            return;
199        }
200
201        self.cdc_emit_kv(operation, collection, key, entity_id, before, after);
202    }
203
204    pub(crate) fn cdc_emit_prebuilt(
205        &self,
206        operation: crate::replication::cdc::ChangeOperation,
207        collection: &str,
208        entity: &UnifiedEntity,
209        entity_kind: &str,
210        metadata: Option<&crate::storage::Metadata>,
211        invalidate_cache: bool,
212    ) -> u64 {
213        self.cdc_emit_prebuilt_with_columns(
214            operation,
215            collection,
216            entity,
217            entity_kind,
218            metadata,
219            invalidate_cache,
220            None,
221        )
222    }
223
224    /// `cdc_emit_prebuilt` plus the list of column names whose values
225    /// changed on this update. Callers that have already computed a
226    /// `RowDamageVector` pass it here so downstream CDC consumers can
227    /// filter events by touched column without re-diffing.
228    /// `changed_columns` is only meaningful for `Update` operations —
229    /// insert and delete events ignore it.
230    pub(crate) fn cdc_emit_prebuilt_with_columns(
231        &self,
232        operation: crate::replication::cdc::ChangeOperation,
233        collection: &str,
234        entity: &UnifiedEntity,
235        entity_kind: &str,
236        metadata: Option<&crate::storage::Metadata>,
237        invalidate_cache: bool,
238        changed_columns: Option<Vec<String>>,
239    ) -> u64 {
240        if invalidate_cache {
241            self.invalidate_result_cache();
242        }
243
244        let public_id = entity.logical_id().raw();
245        let lsn = self.inner.cdc.emit_with_columns(
246            operation,
247            collection,
248            public_id,
249            entity_kind,
250            changed_columns,
251        );
252
253        if let Some(ref primary) = self.inner.db.replication {
254            let store = self.inner.db.store();
255            let record = ChangeRecord {
256                term: self.current_replication_term(),
257                lsn,
258                timestamp: SystemTime::now()
259                    .duration_since(UNIX_EPOCH)
260                    .unwrap_or_default()
261                    .as_millis() as u64,
262                operation,
263                collection: collection.to_string(),
264                entity_id: entity.id.raw(),
265                entity_kind: entity_kind.to_string(),
266                entity_bytes: Some(UnifiedStore::serialize_entity(
267                    entity,
268                    store.format_version(),
269                )),
270                metadata: metadata
271                    .map(metadata_to_json)
272                    .map(server_json_to_wire_json)
273                    .or_else(|| self.latest_metadata_for(collection, entity.id.raw())),
274                refresh_records: None,
275            };
276            let encoded = record.encode();
277            primary.append_logical_record(record.lsn, encoded);
278        }
279
280        lsn
281    }
282
283    pub(crate) fn current_replication_term(&self) -> u64 {
284        self.inner.db.options().replication.term
285    }
286
287    pub(crate) fn cdc_emit_prebuilt_batch<'a, I>(
288        &self,
289        operation: crate::replication::cdc::ChangeOperation,
290        entity_kind: &str,
291        items: I,
292        invalidate_cache: bool,
293    ) where
294        I: IntoIterator<
295            Item = (
296                &'a str,
297                &'a UnifiedEntity,
298                Option<&'a crate::storage::Metadata>,
299            ),
300        >,
301    {
302        let items: Vec<(&str, &UnifiedEntity, Option<&crate::storage::Metadata>)> =
303            items.into_iter().collect();
304        if items.is_empty() {
305            return;
306        }
307
308        if invalidate_cache {
309            self.invalidate_result_cache();
310        }
311
312        for (collection, entity, metadata) in items {
313            self.cdc_emit_prebuilt(operation, collection, entity, entity_kind, metadata, false);
314        }
315    }
316
317    /// Poll CDC events since a given LSN.
318    pub fn cdc_poll(
319        &self,
320        since_lsn: u64,
321        max_count: usize,
322    ) -> Vec<crate::replication::cdc::ChangeEvent> {
323        self.inner.cdc.poll(since_lsn, max_count)
324    }
325
326    /// PLAN.md Phase 11.4 — current CDC LSN. Public mutation
327    /// surfaces (HTTP query, gRPC entity ops) call this immediately
328    /// after a successful write to feed `enforce_commit_policy`.
329    pub fn cdc_current_lsn(&self) -> u64 {
330        self.inner.cdc.current_lsn()
331    }
332
333    pub fn kv_watch_events_since(
334        &self,
335        collection: &str,
336        key: &str,
337        since_lsn: u64,
338        max_count: usize,
339    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
340        self.inner
341            .cdc
342            .poll(since_lsn, max_count)
343            .into_iter()
344            .filter_map(|event| event.kv)
345            .filter(|event| event.collection == collection && event.key == key)
346            .collect()
347    }
348
349    pub fn kv_watch_events_since_prefix(
350        &self,
351        collection: &str,
352        prefix: &str,
353        since_lsn: u64,
354        max_count: usize,
355    ) -> Vec<crate::replication::cdc::KvWatchEvent> {
356        self.inner
357            .cdc
358            .poll(since_lsn, max_count)
359            .into_iter()
360            .filter_map(|event| event.kv)
361            .filter(|event| event.collection == collection && event.key.starts_with(prefix))
362            .collect()
363    }
364
365    pub(crate) fn kv_watch_subscribe<'a>(
366        &'a self,
367        collection: impl Into<String>,
368        key: impl Into<String>,
369        from_lsn: Option<u64>,
370    ) -> crate::runtime::kv_watch::KvWatchStream<'a> {
371        crate::runtime::kv_watch::KvWatchStream::subscribe(
372            &self.inner.cdc,
373            &self.inner.kv_stats,
374            collection,
375            key,
376            from_lsn,
377            self.kv_watch_idle_timeout_ms(),
378        )
379    }
380
381    pub(crate) fn kv_watch_subscribe_prefix<'a>(
382        &'a self,
383        collection: impl Into<String>,
384        prefix: impl Into<String>,
385        from_lsn: Option<u64>,
386    ) -> crate::runtime::kv_watch::KvWatchStream<'a> {
387        crate::runtime::kv_watch::KvWatchStream::subscribe_prefix(
388            &self.inner.cdc,
389            &self.inner.kv_stats,
390            collection,
391            prefix,
392            from_lsn,
393            self.kv_watch_idle_timeout_ms(),
394        )
395    }
396
397    pub(crate) fn kv_watch_idle_timeout_ms(&self) -> u64 {
398        self.config_u64("red.config.kv.watch.idle_timeout_ms", 60_000)
399    }
400}