1use super::*;
2use crate::application::entity::metadata_to_json;
3use crate::replication::cdc::{server_json_to_wire_json, ChangeRecord};
4use crate::runtime::impl_core::current_connection_id;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7impl RedDBRuntime {
8 fn latest_metadata_for(
9 &self,
10 collection: &str,
11 entity_id: u64,
12 ) -> Option<reddb_wire::replication::ChangeRecordJsonValue> {
13 self.inner
14 .db
15 .store()
16 .get_metadata(collection, EntityId::new(entity_id))
17 .map(|metadata| server_json_to_wire_json(metadata_to_json(&metadata)))
18 }
19
20 pub(crate) fn cdc_emit_no_cache_invalidate(
26 &self,
27 operation: crate::replication::cdc::ChangeOperation,
28 collection: &str,
29 entity_id: u64,
30 entity_kind: &str,
31 ) -> u64 {
32 let lsn = self
33 .inner
34 .cdc
35 .emit(operation, collection, entity_id, entity_kind);
36
37 if let Some(ref primary) = self.inner.db.replication {
39 let store = self.inner.db.store();
40 let entity = if operation == crate::replication::cdc::ChangeOperation::Delete {
41 None
42 } else {
43 store.get(collection, EntityId::new(entity_id))
44 };
45 let record = ChangeRecord {
46 term: self.current_replication_term(),
47 lsn,
48 timestamp: SystemTime::now()
49 .duration_since(UNIX_EPOCH)
50 .unwrap_or_default()
51 .as_millis() as u64,
52 operation,
53 collection: collection.to_string(),
54 entity_id,
55 entity_kind: entity_kind.to_string(),
56 entity_bytes: entity
57 .as_ref()
58 .map(|e| UnifiedStore::serialize_entity(e, store.format_version())),
59 metadata: self.latest_metadata_for(collection, entity_id),
60 refresh_records: None,
61 };
62 let encoded = record.encode();
63 primary.append_logical_record(record.lsn, encoded);
64 }
65 lsn
66 }
67
68 pub(crate) fn cdc_emit_insert_batch_no_cache_invalidate(
69 &self,
70 collection: &str,
71 ids: &[EntityId],
72 entity_kind: &str,
73 ) -> Vec<u64> {
74 if ids.is_empty() {
75 return Vec::new();
76 }
77
78 if self.inner.db.replication.is_none() {
82 return self.inner.cdc.emit_batch_same_collection(
83 crate::replication::cdc::ChangeOperation::Insert,
84 collection,
85 entity_kind,
86 ids.iter().map(|id| id.raw()),
87 );
88 }
89
90 ids.iter()
93 .map(|id| {
94 self.cdc_emit_no_cache_invalidate(
95 crate::replication::cdc::ChangeOperation::Insert,
96 collection,
97 id.raw(),
98 entity_kind,
99 )
100 })
101 .collect()
102 }
103
104 pub fn cdc_emit(
105 &self,
106 operation: crate::replication::cdc::ChangeOperation,
107 collection: &str,
108 entity_id: u64,
109 entity_kind: &str,
110 ) -> u64 {
111 let lsn = self
112 .inner
113 .cdc
114 .emit(operation, collection, entity_id, entity_kind);
115 self.invalidate_result_cache_for_table(collection);
121
122 if let Some(ref primary) = self.inner.db.replication {
124 let store = self.inner.db.store();
125 let entity = if operation == crate::replication::cdc::ChangeOperation::Delete {
126 None
127 } else {
128 store.get(collection, EntityId::new(entity_id))
129 };
130 let record = ChangeRecord {
131 term: self.current_replication_term(),
132 lsn,
133 timestamp: SystemTime::now()
134 .duration_since(UNIX_EPOCH)
135 .unwrap_or_default()
136 .as_millis() as u64,
137 operation,
138 collection: collection.to_string(),
139 entity_id,
140 entity_kind: entity_kind.to_string(),
141 entity_bytes: entity
142 .as_ref()
143 .map(|entity| UnifiedStore::serialize_entity(entity, store.format_version())),
144 metadata: self.latest_metadata_for(collection, entity_id),
145 refresh_records: None,
146 };
147 let encoded = record.encode();
148 primary.append_logical_record(record.lsn, encoded);
149 }
150 lsn
151 }
152
153 pub(crate) fn cdc_emit_kv(
154 &self,
155 operation: crate::replication::cdc::ChangeOperation,
156 collection: &str,
157 key: &str,
158 entity_id: u64,
159 before: Option<crate::json::Value>,
160 after: Option<crate::json::Value>,
161 ) -> u64 {
162 let lsn = self
163 .inner
164 .cdc
165 .emit_kv(operation, collection, key, entity_id, before, after);
166 self.inner.kv_stats.incr_watch_events_emitted();
167 self.invalidate_result_cache_for_table(collection);
168 lsn
169 }
170
171 pub(crate) fn record_kv_watch_event(
172 &self,
173 operation: crate::replication::cdc::ChangeOperation,
174 collection: &str,
175 key: &str,
176 entity_id: u64,
177 before: Option<crate::json::Value>,
178 after: Option<crate::json::Value>,
179 ) {
180 if self.current_xid().is_some() {
181 let conn_id = current_connection_id();
182 let event = crate::replication::cdc::KvWatchEvent {
183 collection: collection.to_string(),
184 key: key.to_string(),
185 op: operation,
186 before,
187 after,
188 lsn: 0,
189 committed_at: 0,
190 dropped_event_count: 0,
191 };
192 self.inner
193 .pending_kv_watch_events
194 .write()
195 .entry(conn_id)
196 .or_default()
197 .push(event);
198 return;
199 }
200
201 self.cdc_emit_kv(operation, collection, key, entity_id, before, after);
202 }
203
204 pub(crate) fn cdc_emit_prebuilt(
205 &self,
206 operation: crate::replication::cdc::ChangeOperation,
207 collection: &str,
208 entity: &UnifiedEntity,
209 entity_kind: &str,
210 metadata: Option<&crate::storage::Metadata>,
211 invalidate_cache: bool,
212 ) -> u64 {
213 self.cdc_emit_prebuilt_with_columns(
214 operation,
215 collection,
216 entity,
217 entity_kind,
218 metadata,
219 invalidate_cache,
220 None,
221 )
222 }
223
224 pub(crate) fn cdc_emit_prebuilt_with_columns(
231 &self,
232 operation: crate::replication::cdc::ChangeOperation,
233 collection: &str,
234 entity: &UnifiedEntity,
235 entity_kind: &str,
236 metadata: Option<&crate::storage::Metadata>,
237 invalidate_cache: bool,
238 changed_columns: Option<Vec<String>>,
239 ) -> u64 {
240 if invalidate_cache {
241 self.invalidate_result_cache();
242 }
243
244 let public_id = entity.logical_id().raw();
245 let lsn = self.inner.cdc.emit_with_columns(
246 operation,
247 collection,
248 public_id,
249 entity_kind,
250 changed_columns,
251 );
252
253 if let Some(ref primary) = self.inner.db.replication {
254 let store = self.inner.db.store();
255 let record = ChangeRecord {
256 term: self.current_replication_term(),
257 lsn,
258 timestamp: SystemTime::now()
259 .duration_since(UNIX_EPOCH)
260 .unwrap_or_default()
261 .as_millis() as u64,
262 operation,
263 collection: collection.to_string(),
264 entity_id: entity.id.raw(),
265 entity_kind: entity_kind.to_string(),
266 entity_bytes: Some(UnifiedStore::serialize_entity(
267 entity,
268 store.format_version(),
269 )),
270 metadata: metadata
271 .map(metadata_to_json)
272 .map(server_json_to_wire_json)
273 .or_else(|| self.latest_metadata_for(collection, entity.id.raw())),
274 refresh_records: None,
275 };
276 let encoded = record.encode();
277 primary.append_logical_record(record.lsn, encoded);
278 }
279
280 lsn
281 }
282
283 pub(crate) fn current_replication_term(&self) -> u64 {
284 self.inner.db.options().replication.term
285 }
286
287 pub(crate) fn cdc_emit_prebuilt_batch<'a, I>(
288 &self,
289 operation: crate::replication::cdc::ChangeOperation,
290 entity_kind: &str,
291 items: I,
292 invalidate_cache: bool,
293 ) where
294 I: IntoIterator<
295 Item = (
296 &'a str,
297 &'a UnifiedEntity,
298 Option<&'a crate::storage::Metadata>,
299 ),
300 >,
301 {
302 let items: Vec<(&str, &UnifiedEntity, Option<&crate::storage::Metadata>)> =
303 items.into_iter().collect();
304 if items.is_empty() {
305 return;
306 }
307
308 if invalidate_cache {
309 self.invalidate_result_cache();
310 }
311
312 for (collection, entity, metadata) in items {
313 self.cdc_emit_prebuilt(operation, collection, entity, entity_kind, metadata, false);
314 }
315 }
316
317 pub fn cdc_poll(
319 &self,
320 since_lsn: u64,
321 max_count: usize,
322 ) -> Vec<crate::replication::cdc::ChangeEvent> {
323 self.inner.cdc.poll(since_lsn, max_count)
324 }
325
326 pub fn cdc_current_lsn(&self) -> u64 {
330 self.inner.cdc.current_lsn()
331 }
332
333 pub fn kv_watch_events_since(
334 &self,
335 collection: &str,
336 key: &str,
337 since_lsn: u64,
338 max_count: usize,
339 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
340 self.inner
341 .cdc
342 .poll(since_lsn, max_count)
343 .into_iter()
344 .filter_map(|event| event.kv)
345 .filter(|event| event.collection == collection && event.key == key)
346 .collect()
347 }
348
349 pub fn kv_watch_events_since_prefix(
350 &self,
351 collection: &str,
352 prefix: &str,
353 since_lsn: u64,
354 max_count: usize,
355 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
356 self.inner
357 .cdc
358 .poll(since_lsn, max_count)
359 .into_iter()
360 .filter_map(|event| event.kv)
361 .filter(|event| event.collection == collection && event.key.starts_with(prefix))
362 .collect()
363 }
364
365 pub(crate) fn kv_watch_subscribe<'a>(
366 &'a self,
367 collection: impl Into<String>,
368 key: impl Into<String>,
369 from_lsn: Option<u64>,
370 ) -> crate::runtime::kv_watch::KvWatchStream<'a> {
371 crate::runtime::kv_watch::KvWatchStream::subscribe(
372 &self.inner.cdc,
373 &self.inner.kv_stats,
374 collection,
375 key,
376 from_lsn,
377 self.kv_watch_idle_timeout_ms(),
378 )
379 }
380
381 pub(crate) fn kv_watch_subscribe_prefix<'a>(
382 &'a self,
383 collection: impl Into<String>,
384 prefix: impl Into<String>,
385 from_lsn: Option<u64>,
386 ) -> crate::runtime::kv_watch::KvWatchStream<'a> {
387 crate::runtime::kv_watch::KvWatchStream::subscribe_prefix(
388 &self.inner.cdc,
389 &self.inner.kv_stats,
390 collection,
391 prefix,
392 from_lsn,
393 self.kv_watch_idle_timeout_ms(),
394 )
395 }
396
397 pub(crate) fn kv_watch_idle_timeout_ms(&self) -> u64 {
398 self.config_u64("red.config.kv.watch.idle_timeout_ms", 60_000)
399 }
400}