1use super::*;
2use crate::application::entity::metadata_to_json;
3use crate::replication::cdc::{server_json_to_wire_json, ChangeRecord};
4use crate::runtime::impl_core::current_connection_id;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7impl RedDBRuntime {
8 fn latest_metadata_for(
9 &self,
10 collection: &str,
11 entity_id: u64,
12 ) -> Option<reddb_wire::replication::ChangeRecordJsonValue> {
13 self.inner
14 .db
15 .store()
16 .get_metadata(collection, EntityId::new(entity_id))
17 .map(|metadata| server_json_to_wire_json(metadata_to_json(&metadata)))
18 }
19
20 pub(crate) fn cdc_emit_no_cache_invalidate(
26 &self,
27 operation: crate::replication::cdc::ChangeOperation,
28 collection: &str,
29 entity_id: u64,
30 entity_kind: &str,
31 ) -> u64 {
32 let lsn = self
33 .inner
34 .cdc
35 .emit(operation, collection, entity_id, entity_kind);
36
37 if let Some(ref primary) = self.inner.db.replication {
39 let store = self.inner.db.store();
40 let entity = if operation == crate::replication::cdc::ChangeOperation::Delete {
41 None
42 } else {
43 store.get(collection, EntityId::new(entity_id))
44 };
45 let record = ChangeRecord {
46 term: self.current_replication_term(),
47 lsn,
48 timestamp: SystemTime::now()
49 .duration_since(UNIX_EPOCH)
50 .unwrap_or_default()
51 .as_millis() as u64,
52 operation,
53 collection: collection.to_string(),
54 entity_id,
55 entity_kind: entity_kind.to_string(),
56 entity_bytes: entity
57 .as_ref()
58 .map(|e| UnifiedStore::serialize_entity(e, store.format_version())),
59 metadata: self.latest_metadata_for(collection, entity_id),
60 refresh_records: None,
61 range_id: None,
62 ownership_epoch: None,
63 };
64 let encoded = record.encode();
65 primary.append_logical_record(record.lsn, encoded);
66 }
67 lsn
68 }
69
70 pub(crate) fn cdc_emit_insert_batch_no_cache_invalidate(
71 &self,
72 collection: &str,
73 ids: &[EntityId],
74 entity_kind: &str,
75 ) -> Vec<u64> {
76 if ids.is_empty() {
77 return Vec::new();
78 }
79
80 if self.inner.db.replication.is_none() {
84 return self.inner.cdc.emit_batch_same_collection(
85 crate::replication::cdc::ChangeOperation::Insert,
86 collection,
87 entity_kind,
88 ids.iter().map(|id| id.raw()),
89 );
90 }
91
92 ids.iter()
95 .map(|id| {
96 self.cdc_emit_no_cache_invalidate(
97 crate::replication::cdc::ChangeOperation::Insert,
98 collection,
99 id.raw(),
100 entity_kind,
101 )
102 })
103 .collect()
104 }
105
106 pub fn cdc_emit(
107 &self,
108 operation: crate::replication::cdc::ChangeOperation,
109 collection: &str,
110 entity_id: u64,
111 entity_kind: &str,
112 ) -> u64 {
113 let lsn = self
114 .inner
115 .cdc
116 .emit(operation, collection, entity_id, entity_kind);
117 self.invalidate_result_cache_for_table(collection);
123
124 if let Some(ref primary) = self.inner.db.replication {
126 let store = self.inner.db.store();
127 let entity = if operation == crate::replication::cdc::ChangeOperation::Delete {
128 None
129 } else {
130 store.get(collection, EntityId::new(entity_id))
131 };
132 let record = ChangeRecord {
133 term: self.current_replication_term(),
134 lsn,
135 timestamp: SystemTime::now()
136 .duration_since(UNIX_EPOCH)
137 .unwrap_or_default()
138 .as_millis() as u64,
139 operation,
140 collection: collection.to_string(),
141 entity_id,
142 entity_kind: entity_kind.to_string(),
143 entity_bytes: entity
144 .as_ref()
145 .map(|entity| UnifiedStore::serialize_entity(entity, store.format_version())),
146 metadata: self.latest_metadata_for(collection, entity_id),
147 refresh_records: None,
148 range_id: None,
149 ownership_epoch: None,
150 };
151 let encoded = record.encode();
152 primary.append_logical_record(record.lsn, encoded);
153 }
154 lsn
155 }
156
157 pub(crate) fn cdc_emit_kv(
158 &self,
159 operation: crate::replication::cdc::ChangeOperation,
160 collection: &str,
161 key: &str,
162 entity_id: u64,
163 before: Option<crate::json::Value>,
164 after: Option<crate::json::Value>,
165 ) -> u64 {
166 let lsn = self
167 .inner
168 .cdc
169 .emit_kv(operation, collection, key, entity_id, before, after);
170 self.inner.kv_stats.incr_watch_events_emitted();
171 self.invalidate_result_cache_for_table(collection);
172 lsn
173 }
174
175 pub(crate) fn record_kv_watch_event(
176 &self,
177 operation: crate::replication::cdc::ChangeOperation,
178 collection: &str,
179 key: &str,
180 entity_id: u64,
181 before: Option<crate::json::Value>,
182 after: Option<crate::json::Value>,
183 ) {
184 if self.current_xid().is_some() {
185 let conn_id = current_connection_id();
186 let event = crate::replication::cdc::KvWatchEvent {
187 collection: collection.to_string(),
188 key: key.to_string(),
189 op: operation,
190 before,
191 after,
192 lsn: 0,
193 committed_at: 0,
194 dropped_event_count: 0,
195 };
196 self.inner
197 .pending_kv_watch_events
198 .write()
199 .entry(conn_id)
200 .or_default()
201 .push(event);
202 return;
203 }
204
205 self.cdc_emit_kv(operation, collection, key, entity_id, before, after);
206 }
207
208 pub(crate) fn cdc_emit_prebuilt(
209 &self,
210 operation: crate::replication::cdc::ChangeOperation,
211 collection: &str,
212 entity: &UnifiedEntity,
213 entity_kind: &str,
214 metadata: Option<&crate::storage::Metadata>,
215 invalidate_cache: bool,
216 ) -> u64 {
217 self.cdc_emit_prebuilt_with_columns(
218 operation,
219 collection,
220 entity,
221 entity_kind,
222 metadata,
223 invalidate_cache,
224 None,
225 )
226 }
227
228 pub(crate) fn cdc_emit_prebuilt_with_columns(
235 &self,
236 operation: crate::replication::cdc::ChangeOperation,
237 collection: &str,
238 entity: &UnifiedEntity,
239 entity_kind: &str,
240 metadata: Option<&crate::storage::Metadata>,
241 invalidate_cache: bool,
242 changed_columns: Option<Vec<String>>,
243 ) -> u64 {
244 if invalidate_cache {
245 self.invalidate_result_cache();
246 }
247
248 let public_id = entity.logical_id().raw();
249 let lsn = self.inner.cdc.emit_with_columns(
250 operation,
251 collection,
252 public_id,
253 entity_kind,
254 changed_columns,
255 );
256
257 if let Some(ref primary) = self.inner.db.replication {
258 let store = self.inner.db.store();
259 let record = ChangeRecord {
260 term: self.current_replication_term(),
261 lsn,
262 timestamp: SystemTime::now()
263 .duration_since(UNIX_EPOCH)
264 .unwrap_or_default()
265 .as_millis() as u64,
266 operation,
267 collection: collection.to_string(),
268 entity_id: entity.id.raw(),
269 entity_kind: entity_kind.to_string(),
270 entity_bytes: Some(UnifiedStore::serialize_entity(
271 entity,
272 store.format_version(),
273 )),
274 metadata: metadata
275 .map(metadata_to_json)
276 .map(server_json_to_wire_json)
277 .or_else(|| self.latest_metadata_for(collection, entity.id.raw())),
278 refresh_records: None,
279 range_id: None,
280 ownership_epoch: None,
281 };
282 let encoded = record.encode();
283 primary.append_logical_record(record.lsn, encoded);
284 }
285
286 lsn
287 }
288
289 pub(crate) fn current_replication_term(&self) -> u64 {
290 self.inner.db.options().replication.term
291 }
292
293 pub(crate) fn cdc_emit_prebuilt_batch<'a, I>(
294 &self,
295 operation: crate::replication::cdc::ChangeOperation,
296 entity_kind: &str,
297 items: I,
298 invalidate_cache: bool,
299 ) where
300 I: IntoIterator<
301 Item = (
302 &'a str,
303 &'a UnifiedEntity,
304 Option<&'a crate::storage::Metadata>,
305 ),
306 >,
307 {
308 let items: Vec<(&str, &UnifiedEntity, Option<&crate::storage::Metadata>)> =
309 items.into_iter().collect();
310 if items.is_empty() {
311 return;
312 }
313
314 if invalidate_cache {
315 self.invalidate_result_cache();
316 }
317
318 for (collection, entity, metadata) in items {
319 self.cdc_emit_prebuilt(operation, collection, entity, entity_kind, metadata, false);
320 }
321 }
322
323 pub fn cdc_poll(
325 &self,
326 since_lsn: u64,
327 max_count: usize,
328 ) -> Vec<crate::replication::cdc::ChangeEvent> {
329 self.inner.cdc.poll(since_lsn, max_count)
330 }
331
332 pub fn cdc_current_lsn(&self) -> u64 {
336 self.inner.cdc.current_lsn()
337 }
338
339 pub fn kv_watch_events_since(
340 &self,
341 collection: &str,
342 key: &str,
343 since_lsn: u64,
344 max_count: usize,
345 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
346 self.inner
347 .cdc
348 .poll(since_lsn, max_count)
349 .into_iter()
350 .filter_map(|event| event.kv)
351 .filter(|event| event.collection == collection && event.key == key)
352 .collect()
353 }
354
355 pub fn kv_watch_events_since_prefix(
356 &self,
357 collection: &str,
358 prefix: &str,
359 since_lsn: u64,
360 max_count: usize,
361 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
362 self.inner
363 .cdc
364 .poll(since_lsn, max_count)
365 .into_iter()
366 .filter_map(|event| event.kv)
367 .filter(|event| event.collection == collection && event.key.starts_with(prefix))
368 .collect()
369 }
370
371 pub(crate) fn kv_watch_subscribe<'a>(
372 &'a self,
373 collection: impl Into<String>,
374 key: impl Into<String>,
375 from_lsn: Option<u64>,
376 ) -> crate::runtime::kv_watch::KvWatchStream<'a> {
377 crate::runtime::kv_watch::KvWatchStream::subscribe(
378 &self.inner.cdc,
379 &self.inner.kv_stats,
380 collection,
381 key,
382 from_lsn,
383 self.kv_watch_idle_timeout_ms(),
384 )
385 }
386
387 pub(crate) fn kv_watch_subscribe_prefix<'a>(
388 &'a self,
389 collection: impl Into<String>,
390 prefix: impl Into<String>,
391 from_lsn: Option<u64>,
392 ) -> crate::runtime::kv_watch::KvWatchStream<'a> {
393 crate::runtime::kv_watch::KvWatchStream::subscribe_prefix(
394 &self.inner.cdc,
395 &self.inner.kv_stats,
396 collection,
397 prefix,
398 from_lsn,
399 self.kv_watch_idle_timeout_ms(),
400 )
401 }
402
403 pub(crate) fn kv_watch_idle_timeout_ms(&self) -> u64 {
404 self.config_u64("red.config.kv.watch.idle_timeout_ms", 60_000)
405 }
406}