1use super::*;
2
3impl RedDBRuntime {
4 pub fn replica_relay_manifest_path(&self, replica_id: &str) -> Option<std::path::PathBuf> {
5 let plan = self.primary_replica_file_plan()?;
6 Some(plan.relay_manifest_path(replica_id))
7 }
8
9 pub fn primary_replica_file_plan(&self) -> Option<reddb_file::PrimaryReplicaFilePlan> {
10 self.primary_replica_file_plan_result().ok().flatten()
11 }
12
13 fn primary_replica_root(&self) -> Option<std::path::PathBuf> {
14 let data_path = self.inner.db.options().data_path.as_ref()?;
15 Some(crate::replication::primary::PrimaryReplication::primary_replica_root_for(data_path))
16 }
17
18 pub fn primary_replica_file_plan_result(
19 &self,
20 ) -> RedDBResult<Option<reddb_file::PrimaryReplicaFilePlan>> {
21 let Some(root) = self.primary_replica_root() else {
22 return Ok(None);
23 };
24 let timeline = self.primary_replica_current_timeline(&root)?;
25 Ok(Some(reddb_file::PrimaryReplicaFilePlan::new(
26 root, timeline,
27 )))
28 }
29
30 fn primary_replica_current_timeline(
31 &self,
32 root: &std::path::Path,
33 ) -> RedDBResult<reddb_file::TimelineId> {
34 let path = reddb_file::PrimaryReplicaFilePlan::new(root, reddb_file::TimelineId::initial())
35 .timeline_history_path();
36 match reddb_file::TimelineHistory::read_from_path(&path) {
37 Ok(history) => Ok(history
38 .current()
39 .unwrap_or_else(reddb_file::TimelineId::initial)),
40 Err(reddb_file::RdbFileError::Io(err))
41 if err.kind() == std::io::ErrorKind::NotFound =>
42 {
43 Ok(reddb_file::TimelineId::initial())
44 }
45 Err(err) => Err(RedDBError::Internal(err.to_string())),
46 }
47 }
48
49 pub fn create_primary_replica_basebackup(
50 &self,
51 chunk_bytes: usize,
52 ) -> RedDBResult<Option<reddb_file::PrimaryReplicaBaseBackupManifest>> {
53 let Some(plan) = self.primary_replica_file_plan_result()? else {
54 return Ok(None);
55 };
56 self.flush()?;
57 let checkpoint_lsn = self.primary_logical_head_lsn().max(self.cdc_current_lsn());
58 let snapshot = self.inner.db.store().to_binary_dump_bytes();
59 let backup = reddb_file::BaseBackupPlan::new(plan.timeline, 0, checkpoint_lsn);
60 let manifest = plan
61 .write_basebackup_snapshot_parts(backup, &snapshot, chunk_bytes)
62 .map_err(|err| RedDBError::Internal(err.to_string()))?;
63 manifest
64 .write_to_path(plan.basebackup_path(&backup))
65 .map_err(|err| RedDBError::Internal(err.to_string()))?;
66 Ok(Some(manifest))
67 }
68
69 pub fn materialize_primary_replica_basebackup_snapshot(
70 &self,
71 manifest: &reddb_file::PrimaryReplicaBaseBackupManifest,
72 parts_root: impl AsRef<std::path::Path>,
73 destination: impl AsRef<std::path::Path>,
74 ) -> RedDBResult<u64> {
75 let snapshot = manifest
76 .read_snapshot_parts(parts_root)
77 .map_err(|err| RedDBError::Internal(err.to_string()))?;
78 let loaded = crate::storage::unified::UnifiedStore::load_from_bytes_with_config(
79 &snapshot,
80 crate::storage::unified::UnifiedStoreConfig::default(),
81 )
82 .map_err(|err| RedDBError::Internal(format!("validate basebackup snapshot: {err}")))?;
83 loaded.set_config_tree(
84 "red.replication",
85 &crate::json!({
86 "last_applied_lsn": manifest.checkpoint_lsn,
87 "state": "healthy",
88 "last_error": "",
89 }),
90 );
91 let snapshot = loaded.to_binary_dump_bytes();
92 let destination = destination.as_ref();
93 if let Some(parent) = destination.parent() {
94 std::fs::create_dir_all(parent)?;
95 }
96 crate::storage::EmbeddedRdbArtifact::create_with_snapshot(destination, &snapshot)?;
97 Ok(manifest.checkpoint_lsn)
98 }
99
100 pub fn replica_rebootstrap_staging_root(&self) -> Option<std::path::PathBuf> {
101 let data_path = self.inner.db.options().data_path.as_ref()?;
102 Some(reddb_file::layout::rebootstrap_staging_root(data_path))
103 }
104
105 pub fn replica_rebootstrap_pending_path(&self) -> Option<std::path::PathBuf> {
106 let data_path = self.inner.db.options().data_path.as_ref()?;
107 Some(reddb_file::layout::rebootstrap_pending_path(data_path))
108 }
109
110 pub(crate) async fn stage_primary_replica_rebootstrap_from_snapshot(
111 &self,
112 client: &mut crate::grpc::proto::red_db_client::RedDbClient<tonic::transport::Channel>,
113 chunk_bytes: usize,
114 ) -> RedDBResult<Option<u64>> {
115 let Some(parts_root) = self.replica_rebootstrap_staging_root() else {
116 return Ok(None);
117 };
118 let Some(pending_path) = self.replica_rebootstrap_pending_path() else {
119 return Ok(None);
120 };
121 let data_path = self
122 .inner
123 .db
124 .options()
125 .data_path
126 .as_ref()
127 .ok_or_else(|| RedDBError::Internal("replica data path unavailable".into()))?;
128 let intent_log_path = reddb_file::layout::rebootstrap_intent_log_path(data_path);
129 let intent_log = crate::telemetry::admin_intent_log::AdminIntentLog::open(intent_log_path)
130 .map_err(|err| RedDBError::Internal(err.to_string()))?;
131 let replica_id = self.resolve_replica_id();
132 let bootstrapper = crate::replication::replica::ReplicaBootstrapper::new(replica_id);
133 let source_lsn = self.config_u64("red.replication.last_applied_lsn", 0);
134 let _ = std::fs::remove_file(&pending_path);
135
136 let chunk_bytes = chunk_bytes.max(1);
137 let (resume, mut bootstrap_handle) = match bootstrapper.resume(&intent_log) {
138 Some((resume, handle)) => (Some(resume), handle),
139 None => (
140 None,
141 bootstrapper
142 .begin(&intent_log, source_lsn, 0)
143 .map_err(|err| RedDBError::Internal(err.to_string()))?,
144 ),
145 };
146 let mut token: Option<String> = resume
147 .as_ref()
148 .and_then(|resume| resume.snapshot_token.clone());
149 let mut manifest: Option<reddb_file::PrimaryReplicaBaseBackupManifest> = None;
150 let mut written = std::collections::BTreeSet::new();
151 let mut offset = resume
152 .as_ref()
153 .map(|resume| resume.snapshot_offset)
154 .unwrap_or(0);
155
156 loop {
157 let mut request = tonic::Request::new(crate::grpc::proto::Empty {});
158 request.metadata_mut().insert(
159 "x-reddb-snapshot-max-bytes",
160 chunk_bytes
161 .to_string()
162 .parse()
163 .map_err(|err| RedDBError::Internal(format!("snapshot max bytes: {err}")))?,
164 );
165 request.metadata_mut().insert(
166 "x-reddb-snapshot-offset",
167 offset
168 .to_string()
169 .parse()
170 .map_err(|err| RedDBError::Internal(format!("snapshot offset: {err}")))?,
171 );
172 if let Some(token) = &token {
173 request.metadata_mut().insert(
174 "x-reddb-snapshot-token",
175 token.parse().map_err(|err| {
176 RedDBError::Internal(format!("snapshot token metadata: {err}"))
177 })?,
178 );
179 }
180
181 let response = client
182 .replication_snapshot(request)
183 .await
184 .map_err(|err| RedDBError::Internal(format!("replication snapshot: {err}")))?;
185 let payload = reddb_wire::replication::BaseBackupChunk::decode_json(
186 response.into_inner().payload.as_bytes(),
187 )
188 .map_err(|err| RedDBError::Internal(format!("parse replication snapshot: {err}")))?;
189 if token.is_none() {
190 token = payload.snapshot_token.clone();
191 }
192 let staged =
193 crate::replication::replica::stage_basebackup_snapshot_chunk(&payload, &parts_root)
194 .map_err(|err| RedDBError::Internal(err.to_string()))?
195 .ok_or_else(|| {
196 RedDBError::Internal(
197 "replication snapshot did not include basebackup payload".into(),
198 )
199 })?;
200 if let Some(existing) = &manifest {
201 if existing != &staged.manifest {
202 return Err(RedDBError::Internal(
203 "replication snapshot basebackup manifest changed while downloading".into(),
204 ));
205 }
206 } else {
207 manifest = Some(staged.manifest.clone());
208 written.extend(
209 crate::replication::replica::recover_staged_basebackup_chunks(
210 &staged.manifest,
211 &parts_root,
212 )
213 .map_err(|err| RedDBError::Internal(err.to_string()))?,
214 );
215 }
216 if let Some(ordinal) = staged.chunk_ordinal {
217 written.insert(ordinal);
218 }
219 let current = manifest
220 .as_ref()
221 .expect("manifest set after staging basebackup chunk");
222 if current
223 .chunks
224 .iter()
225 .all(|chunk| written.contains(&chunk.ordinal))
226 {
227 current
228 .verify_snapshot_parts(&parts_root)
229 .map_err(|err| RedDBError::Internal(err.to_string()))?;
230 let checkpoint_lsn = self.materialize_primary_replica_basebackup_snapshot(
231 current,
232 &parts_root,
233 &pending_path,
234 )?;
235 self.inner.db.store().set_config_tree(
236 "red.replication",
237 &crate::json!({
238 "state": "rebootstrap_ready",
239 "rebootstrap_pending_path": pending_path.display().to_string(),
240 "rebootstrap_checkpoint_lsn": checkpoint_lsn,
241 "rebootstrap_timeline": current.timeline.0,
242 }),
243 );
244 let data_path =
245 self.inner.db.options().data_path.as_ref().ok_or_else(|| {
246 RedDBError::Internal("replica data path unavailable".into())
247 })?;
248 reddb_file::write_rebootstrap_ready_marker(
249 data_path,
250 &reddb_file::ReplicaRebootstrapReadyMarker {
251 pending_path: pending_path.clone(),
252 checkpoint_lsn,
253 timeline: current.timeline,
254 },
255 )
256 .map_err(|err| RedDBError::Internal(err.to_string()))?;
257 bootstrap_handle
258 .complete(current.chunks.len() as u64, 0)
259 .map_err(|err| RedDBError::Internal(err.to_string()))?;
260 return Ok(Some(checkpoint_lsn));
261 }
262 let next = current
263 .chunks
264 .iter()
265 .find(|chunk| !written.contains(&chunk.ordinal))
266 .ok_or_else(|| RedDBError::Internal("basebackup chunk tracking stalled".into()))?;
267 offset = next.snapshot_offset;
268 if let Some(snapshot_token) = token.as_deref() {
269 bootstrap_handle
270 .checkpoint_snapshot_transfer(
271 snapshot_token,
272 offset,
273 source_lsn,
274 written.len() as u64,
275 )
276 .map_err(|err| RedDBError::Internal(err.to_string()))?;
277 }
278 }
279 }
280
281 pub fn primary_replica_slot_catalog(
282 &self,
283 ) -> RedDBResult<Option<reddb_file::ReplicationSlotCatalog>> {
284 let Some(plan) = self.primary_replica_file_plan_result()? else {
285 return Ok(None);
286 };
287 match reddb_file::ReplicationSlotCatalog::read_from_path(plan.slots_path()) {
288 Ok(catalog) => Ok(Some(catalog)),
289 Err(reddb_file::RdbFileError::Io(err))
290 if err.kind() == std::io::ErrorKind::NotFound =>
291 {
292 Ok(None)
293 }
294 Err(err) => Err(RedDBError::Internal(err.to_string())),
295 }
296 }
297
298 pub fn primary_replica_wal_retention_plan(
299 &self,
300 ) -> RedDBResult<Option<reddb_file::WalRetentionPlan>> {
301 let Some(plan) = self.primary_replica_file_plan_result()? else {
302 return Ok(None);
303 };
304 let Some(catalog) = self.primary_replica_slot_catalog()? else {
305 return Ok(None);
306 };
307 let current_lsn = self.primary_logical_head_lsn().max(self.cdc_current_lsn());
308 Ok(Some(
309 plan.plan_wal_retention(&catalog, current_lsn)
310 .map_err(|err| RedDBError::Internal(err.to_string()))?,
311 ))
312 }
313
314 pub fn primary_replica_catchup_mode(
315 &self,
316 available_from_lsn: u64,
317 replica_lsn: u64,
318 ) -> RedDBResult<Option<reddb_file::ReplicaCatchupMode>> {
319 let Some(plan) = self.primary_replica_file_plan_result()? else {
320 return Ok(None);
321 };
322 let basebackups = plan
323 .list_basebackups()
324 .map_err(|err| RedDBError::Internal(err.to_string()))?;
325 Ok(Some(plan.catchup_mode_with_basebackups(
326 available_from_lsn,
327 replica_lsn,
328 &basebackups,
329 )))
330 }
331
332 pub fn primary_replica_timeline_history_path(&self) -> Option<std::path::PathBuf> {
333 let root = self.primary_replica_root()?;
334 Some(
335 reddb_file::PrimaryReplicaFilePlan::new(root, reddb_file::TimelineId::initial())
336 .timeline_history_path(),
337 )
338 }
339
340 pub fn primary_replica_rejoin_decision(
341 &self,
342 node_timeline: reddb_file::TimelineId,
343 node_flushed_lsn: u64,
344 available_from_lsn: u64,
345 ) -> RedDBResult<Option<reddb_file::RejoinDecision>> {
346 let Some(path) = self.primary_replica_timeline_history_path() else {
347 return Ok(None);
348 };
349 let history = match reddb_file::TimelineHistory::read_from_path(&path) {
350 Ok(history) => history,
351 Err(reddb_file::RdbFileError::Io(err))
352 if err.kind() == std::io::ErrorKind::NotFound =>
353 {
354 reddb_file::TimelineHistory::new(crate::utils::now_unix_millis())
355 }
356 Err(err) => return Err(RedDBError::Internal(err.to_string())),
357 };
358 Ok(Some(history.rejoin_decision(
359 node_timeline,
360 node_flushed_lsn,
361 available_from_lsn,
362 )))
363 }
364
365 pub fn persist_primary_replica_rejoin_plan(
366 &self,
367 node_timeline: reddb_file::TimelineId,
368 node_flushed_lsn: u64,
369 available_from_lsn: u64,
370 ) -> RedDBResult<Option<reddb_file::RejoinDecision>> {
371 let Some(decision) = self.primary_replica_rejoin_decision(
372 node_timeline,
373 node_flushed_lsn,
374 available_from_lsn,
375 )?
376 else {
377 return Ok(None);
378 };
379
380 let (state, target_timeline, rewind_to_lsn, start_lsn) = match decision {
381 reddb_file::RejoinDecision::AlreadyCurrent => {
382 ("timeline_current", node_timeline.0, 0, node_flushed_lsn)
383 }
384 reddb_file::RejoinDecision::FollowNewTimeline {
385 target_timeline,
386 start_lsn,
387 } => ("rejoin_follow_wal", target_timeline.0, 0, start_lsn),
388 reddb_file::RejoinDecision::Rewind {
389 target_timeline,
390 rewind_to_lsn,
391 } => (
392 "rejoin_rewind_required",
393 target_timeline.0,
394 rewind_to_lsn,
395 0,
396 ),
397 reddb_file::RejoinDecision::Reclone => ("reclone_required", 0, 0, 0),
398 };
399 self.inner.db.store().set_config_tree(
400 "red.replication",
401 &crate::json!({
402 "state": state,
403 "rejoin_node_timeline": node_timeline.0,
404 "rejoin_node_flushed_lsn": node_flushed_lsn,
405 "rejoin_available_from_lsn": available_from_lsn,
406 "rejoin_target_timeline": target_timeline,
407 "rejoin_rewind_to_lsn": rewind_to_lsn,
408 "rejoin_start_lsn": start_lsn,
409 "rejoin_rewind_confirmed_timeline": 0,
410 "rejoin_rewind_confirmed_lsn": 0,
411 }),
412 );
413
414 Ok(Some(decision))
415 }
416
417 pub fn prune_primary_replica_wal_segments(
418 &self,
419 ) -> RedDBResult<Option<reddb_file::WalPruneResult>> {
420 let current_lsn = self.primary_logical_head_lsn().max(self.cdc_current_lsn());
421 self.prune_primary_replica_wal_segments_at(current_lsn)
422 }
423
424 fn prune_primary_replica_wal_segments_at(
425 &self,
426 current_lsn: u64,
427 ) -> RedDBResult<Option<reddb_file::WalPruneResult>> {
428 let Some(plan) = self.primary_replica_file_plan_result()? else {
429 return Ok(None);
430 };
431 let Some(catalog) = self.primary_replica_slot_catalog()? else {
432 return Ok(None);
433 };
434 Ok(Some(
435 plan.prune_wal_segments(&catalog, current_lsn)
436 .map_err(|err| RedDBError::Internal(err.to_string()))?,
437 ))
438 }
439
440 pub fn ack_primary_replica_lsn_and_prune(
441 &self,
442 replica_id: &str,
443 applied_lsn: u64,
444 durable_lsn: u64,
445 apply_errors_total: u64,
446 divergence_total: u64,
447 ) -> RedDBResult<Option<reddb_file::WalPruneResult>> {
448 let Some(repl) = self.inner.db.replication.as_ref() else {
449 return Ok(None);
450 };
451 repl.ack_replica_lsn_with_observability(
452 replica_id,
453 applied_lsn,
454 durable_lsn,
455 apply_errors_total,
456 divergence_total,
457 );
458 self.refresh_replication_flow_control();
459 let current_lsn = self
460 .primary_logical_head_lsn()
461 .max(self.cdc_current_lsn())
462 .max(applied_lsn)
463 .max(durable_lsn);
464 self.prune_primary_replica_wal_segments_at(current_lsn)
465 }
466
467 pub fn record_failover_timeline_promotion(
468 &self,
469 replica_id: &str,
470 applied_lsn: u64,
471 ) -> RedDBResult<reddb_file::TimelineHistory> {
472 let Some(path) = self.primary_replica_timeline_history_path() else {
473 return Ok(reddb_file::TimelineHistory::new(
474 crate::utils::now_unix_millis(),
475 ));
476 };
477 let now_ms = crate::utils::now_unix_millis();
478 let history = match reddb_file::TimelineHistory::read_from_path(&path) {
479 Ok(history) => history,
480 Err(reddb_file::RdbFileError::Io(err))
481 if err.kind() == std::io::ErrorKind::NotFound =>
482 {
483 reddb_file::TimelineHistory::new(now_ms)
484 }
485 Err(err) => return Err(RedDBError::Internal(err.to_string())),
486 };
487 let parent = history
488 .current()
489 .unwrap_or_else(reddb_file::TimelineId::initial);
490 let candidate = reddb_file::PromotionCandidate {
491 replica_id: replica_id.to_string(),
492 timeline: parent,
493 received_lsn: applied_lsn,
494 flushed_lsn: applied_lsn,
495 applied_lsn,
496 };
497 let promoted = history
498 .promotion_history(&candidate, parent.next(), now_ms)
499 .map_err(|err| RedDBError::Internal(err.to_string()))?;
500 promoted
501 .write_to_path(path)
502 .map_err(|err| RedDBError::Internal(err.to_string()))?;
503 Ok(promoted)
504 }
505
506 pub fn record_replica_relay_batch(
507 &self,
508 replica_id: &str,
509 records: &[(u64, Vec<u8>)],
510 applied_lsn: u64,
511 ) -> RedDBResult<()> {
512 let Some(plan) = self.primary_replica_file_plan_result()? else {
513 return Ok(());
514 };
515 let path = plan.relay_manifest_path(replica_id);
516 let Some((first_lsn, _)) = records.first() else {
517 return Ok(());
518 };
519 let end_lsn = records
520 .iter()
521 .map(|(lsn, _)| *lsn)
522 .max()
523 .unwrap_or(*first_lsn);
524 let mut manifest = match reddb_file::ReplicaRelayLogManifest::read_from_path(&path) {
525 Ok(manifest) => {
526 let relay_dir = path.parent().ok_or_else(|| {
527 RedDBError::Internal("relay manifest path has no parent".into())
528 })?;
529 manifest
530 .validate_segments(relay_dir)
531 .map_err(|err| RedDBError::Internal(err.to_string()))?;
532 manifest
533 }
534 Err(reddb_file::RdbFileError::Io(err))
535 if err.kind() == std::io::ErrorKind::NotFound =>
536 {
537 reddb_file::ReplicaRelayLogManifest::new(
538 replica_id,
539 reddb_file::TimelineId::initial(),
540 )
541 }
542 Err(err) => return Err(RedDBError::Internal(err.to_string())),
543 };
544 if manifest.replica_id != replica_id {
545 return Err(RedDBError::Internal(format!(
546 "relay manifest replica_id {} does not match {}",
547 manifest.replica_id, replica_id
548 )));
549 }
550 if manifest.timeline != reddb_file::TimelineId::initial() {
551 return Err(RedDBError::Internal(format!(
552 "relay manifest timeline {} is not current timeline {}",
553 manifest.timeline.0,
554 reddb_file::TimelineId::initial().0
555 )));
556 }
557
558 if end_lsn > manifest.flushed_lsn {
559 let new_records = records
560 .iter()
561 .filter(|(lsn, _)| *lsn > manifest.flushed_lsn)
562 .map(|(lsn, payload)| reddb_file::ReplicaRelayLogRecord::new(*lsn, payload.clone()))
563 .collect::<Vec<_>>();
564 let segment = reddb_file::ReplicaRelayLogSegment::from_records(
565 reddb_file::TimelineId::initial(),
566 new_records,
567 )
568 .map_err(|err| RedDBError::Internal(err.to_string()))?;
569 let start_lsn = segment.start_lsn;
570 let end_lsn = segment.end_lsn;
571 let relative_path = reddb_file::layout::relay_segment_relative_path(start_lsn, end_lsn);
572 let segment_path = path
573 .parent()
574 .ok_or_else(|| RedDBError::Internal("relay manifest path has no parent".into()))?
575 .join(&relative_path);
576 segment
577 .write_to_path(&segment_path)
578 .map_err(|err| RedDBError::Internal(err.to_string()))?;
579 manifest
580 .push_segment(
581 reddb_file::RelayLogSegmentRef::new(
582 relative_path,
583 start_lsn,
584 end_lsn,
585 segment
586 .checksum()
587 .map_err(|err| RedDBError::Internal(err.to_string()))?,
588 )
589 .map_err(|err| RedDBError::Internal(err.to_string()))?,
590 )
591 .map_err(|err| RedDBError::Internal(err.to_string()))?;
592 }
593 manifest
594 .mark_applied(applied_lsn.min(manifest.flushed_lsn))
595 .map_err(|err| RedDBError::Internal(err.to_string()))?;
596 manifest
597 .write_to_path(path)
598 .map_err(|err| RedDBError::Internal(err.to_string()))
599 }
600}