Skip to main content

reddb_server/runtime/
impl_primary_replica_file.rs

1use super::*;
2
3impl RedDBRuntime {
4    pub fn replica_relay_manifest_path(&self, replica_id: &str) -> Option<std::path::PathBuf> {
5        let plan = self.primary_replica_file_plan()?;
6        Some(plan.relay_manifest_path(replica_id))
7    }
8
9    pub fn primary_replica_file_plan(&self) -> Option<reddb_file::PrimaryReplicaFilePlan> {
10        self.primary_replica_file_plan_result().ok().flatten()
11    }
12
13    fn primary_replica_root(&self) -> Option<std::path::PathBuf> {
14        let data_path = self.inner.db.options().data_path.as_ref()?;
15        Some(crate::replication::primary::PrimaryReplication::primary_replica_root_for(data_path))
16    }
17
18    pub fn primary_replica_file_plan_result(
19        &self,
20    ) -> RedDBResult<Option<reddb_file::PrimaryReplicaFilePlan>> {
21        let Some(root) = self.primary_replica_root() else {
22            return Ok(None);
23        };
24        let timeline = self.primary_replica_current_timeline(&root)?;
25        Ok(Some(reddb_file::PrimaryReplicaFilePlan::new(
26            root, timeline,
27        )))
28    }
29
30    fn primary_replica_current_timeline(
31        &self,
32        root: &std::path::Path,
33    ) -> RedDBResult<reddb_file::TimelineId> {
34        let path = reddb_file::PrimaryReplicaFilePlan::new(root, reddb_file::TimelineId::initial())
35            .timeline_history_path();
36        match reddb_file::TimelineHistory::read_from_path(&path) {
37            Ok(history) => Ok(history
38                .current()
39                .unwrap_or_else(reddb_file::TimelineId::initial)),
40            Err(reddb_file::RdbFileError::Io(err))
41                if err.kind() == std::io::ErrorKind::NotFound =>
42            {
43                Ok(reddb_file::TimelineId::initial())
44            }
45            Err(err) => Err(RedDBError::Internal(err.to_string())),
46        }
47    }
48
49    pub fn create_primary_replica_basebackup(
50        &self,
51        chunk_bytes: usize,
52    ) -> RedDBResult<Option<reddb_file::PrimaryReplicaBaseBackupManifest>> {
53        let Some(plan) = self.primary_replica_file_plan_result()? else {
54            return Ok(None);
55        };
56        self.flush()?;
57        let checkpoint_lsn = self.primary_logical_head_lsn().max(self.cdc_current_lsn());
58        let snapshot = self.inner.db.store().to_binary_dump_bytes();
59        let backup = reddb_file::BaseBackupPlan::new(plan.timeline, 0, checkpoint_lsn);
60        let manifest = plan
61            .write_basebackup_snapshot_parts(backup, &snapshot, chunk_bytes)
62            .map_err(|err| RedDBError::Internal(err.to_string()))?;
63        manifest
64            .write_to_path(plan.basebackup_path(&backup))
65            .map_err(|err| RedDBError::Internal(err.to_string()))?;
66        Ok(Some(manifest))
67    }
68
69    pub fn materialize_primary_replica_basebackup_snapshot(
70        &self,
71        manifest: &reddb_file::PrimaryReplicaBaseBackupManifest,
72        parts_root: impl AsRef<std::path::Path>,
73        destination: impl AsRef<std::path::Path>,
74    ) -> RedDBResult<u64> {
75        let snapshot = manifest
76            .read_snapshot_parts(parts_root)
77            .map_err(|err| RedDBError::Internal(err.to_string()))?;
78        let loaded = crate::storage::unified::UnifiedStore::load_from_bytes_with_config(
79            &snapshot,
80            crate::storage::unified::UnifiedStoreConfig::default(),
81        )
82        .map_err(|err| RedDBError::Internal(format!("validate basebackup snapshot: {err}")))?;
83        loaded.set_config_tree(
84            "red.replication",
85            &crate::json!({
86                "last_applied_lsn": manifest.checkpoint_lsn,
87                "state": "healthy",
88                "last_error": "",
89            }),
90        );
91        let snapshot = loaded.to_binary_dump_bytes();
92        let destination = destination.as_ref();
93        if let Some(parent) = destination.parent() {
94            std::fs::create_dir_all(parent)?;
95        }
96        crate::storage::EmbeddedRdbArtifact::create_with_snapshot(destination, &snapshot)?;
97        Ok(manifest.checkpoint_lsn)
98    }
99
100    pub fn replica_rebootstrap_staging_root(&self) -> Option<std::path::PathBuf> {
101        let data_path = self.inner.db.options().data_path.as_ref()?;
102        Some(reddb_file::layout::rebootstrap_staging_root(data_path))
103    }
104
105    pub fn replica_rebootstrap_pending_path(&self) -> Option<std::path::PathBuf> {
106        let data_path = self.inner.db.options().data_path.as_ref()?;
107        Some(reddb_file::layout::rebootstrap_pending_path(data_path))
108    }
109
110    pub(crate) async fn stage_primary_replica_rebootstrap_from_snapshot(
111        &self,
112        client: &mut crate::grpc::proto::red_db_client::RedDbClient<tonic::transport::Channel>,
113        chunk_bytes: usize,
114    ) -> RedDBResult<Option<u64>> {
115        let Some(parts_root) = self.replica_rebootstrap_staging_root() else {
116            return Ok(None);
117        };
118        let Some(pending_path) = self.replica_rebootstrap_pending_path() else {
119            return Ok(None);
120        };
121        let data_path = self
122            .inner
123            .db
124            .options()
125            .data_path
126            .as_ref()
127            .ok_or_else(|| RedDBError::Internal("replica data path unavailable".into()))?;
128        let intent_log_path = reddb_file::layout::rebootstrap_intent_log_path(data_path);
129        let intent_log = crate::telemetry::admin_intent_log::AdminIntentLog::open(intent_log_path)
130            .map_err(|err| RedDBError::Internal(err.to_string()))?;
131        let replica_id = self.resolve_replica_id();
132        let bootstrapper = crate::replication::replica::ReplicaBootstrapper::new(replica_id);
133        let source_lsn = self.config_u64("red.replication.last_applied_lsn", 0);
134        let _ = std::fs::remove_file(&pending_path);
135
136        let chunk_bytes = chunk_bytes.max(1);
137        let (resume, mut bootstrap_handle) = match bootstrapper.resume(&intent_log) {
138            Some((resume, handle)) => (Some(resume), handle),
139            None => (
140                None,
141                bootstrapper
142                    .begin(&intent_log, source_lsn, 0)
143                    .map_err(|err| RedDBError::Internal(err.to_string()))?,
144            ),
145        };
146        let mut token: Option<String> = resume
147            .as_ref()
148            .and_then(|resume| resume.snapshot_token.clone());
149        let mut manifest: Option<reddb_file::PrimaryReplicaBaseBackupManifest> = None;
150        let mut written = std::collections::BTreeSet::new();
151        let mut offset = resume
152            .as_ref()
153            .map(|resume| resume.snapshot_offset)
154            .unwrap_or(0);
155
156        loop {
157            let mut request = tonic::Request::new(crate::grpc::proto::Empty {});
158            request.metadata_mut().insert(
159                "x-reddb-snapshot-max-bytes",
160                chunk_bytes
161                    .to_string()
162                    .parse()
163                    .map_err(|err| RedDBError::Internal(format!("snapshot max bytes: {err}")))?,
164            );
165            request.metadata_mut().insert(
166                "x-reddb-snapshot-offset",
167                offset
168                    .to_string()
169                    .parse()
170                    .map_err(|err| RedDBError::Internal(format!("snapshot offset: {err}")))?,
171            );
172            if let Some(token) = &token {
173                request.metadata_mut().insert(
174                    "x-reddb-snapshot-token",
175                    token.parse().map_err(|err| {
176                        RedDBError::Internal(format!("snapshot token metadata: {err}"))
177                    })?,
178                );
179            }
180
181            let response = client
182                .replication_snapshot(request)
183                .await
184                .map_err(|err| RedDBError::Internal(format!("replication snapshot: {err}")))?;
185            let payload = reddb_wire::replication::BaseBackupChunk::decode_json(
186                response.into_inner().payload.as_bytes(),
187            )
188            .map_err(|err| RedDBError::Internal(format!("parse replication snapshot: {err}")))?;
189            if token.is_none() {
190                token = payload.snapshot_token.clone();
191            }
192            let staged =
193                crate::replication::replica::stage_basebackup_snapshot_chunk(&payload, &parts_root)
194                    .map_err(|err| RedDBError::Internal(err.to_string()))?
195                    .ok_or_else(|| {
196                        RedDBError::Internal(
197                            "replication snapshot did not include basebackup payload".into(),
198                        )
199                    })?;
200            if let Some(existing) = &manifest {
201                if existing != &staged.manifest {
202                    return Err(RedDBError::Internal(
203                        "replication snapshot basebackup manifest changed while downloading".into(),
204                    ));
205                }
206            } else {
207                manifest = Some(staged.manifest.clone());
208                written.extend(
209                    crate::replication::replica::recover_staged_basebackup_chunks(
210                        &staged.manifest,
211                        &parts_root,
212                    )
213                    .map_err(|err| RedDBError::Internal(err.to_string()))?,
214                );
215            }
216            if let Some(ordinal) = staged.chunk_ordinal {
217                written.insert(ordinal);
218            }
219            let current = manifest
220                .as_ref()
221                .expect("manifest set after staging basebackup chunk");
222            if current
223                .chunks
224                .iter()
225                .all(|chunk| written.contains(&chunk.ordinal))
226            {
227                current
228                    .verify_snapshot_parts(&parts_root)
229                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
230                let checkpoint_lsn = self.materialize_primary_replica_basebackup_snapshot(
231                    current,
232                    &parts_root,
233                    &pending_path,
234                )?;
235                self.inner.db.store().set_config_tree(
236                    "red.replication",
237                    &crate::json!({
238                        "state": "rebootstrap_ready",
239                        "rebootstrap_pending_path": pending_path.display().to_string(),
240                        "rebootstrap_checkpoint_lsn": checkpoint_lsn,
241                        "rebootstrap_timeline": current.timeline.0,
242                    }),
243                );
244                let data_path =
245                    self.inner.db.options().data_path.as_ref().ok_or_else(|| {
246                        RedDBError::Internal("replica data path unavailable".into())
247                    })?;
248                reddb_file::write_rebootstrap_ready_marker(
249                    data_path,
250                    &reddb_file::ReplicaRebootstrapReadyMarker {
251                        pending_path: pending_path.clone(),
252                        checkpoint_lsn,
253                        timeline: current.timeline,
254                    },
255                )
256                .map_err(|err| RedDBError::Internal(err.to_string()))?;
257                bootstrap_handle
258                    .complete(current.chunks.len() as u64, 0)
259                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
260                return Ok(Some(checkpoint_lsn));
261            }
262            let next = current
263                .chunks
264                .iter()
265                .find(|chunk| !written.contains(&chunk.ordinal))
266                .ok_or_else(|| RedDBError::Internal("basebackup chunk tracking stalled".into()))?;
267            offset = next.snapshot_offset;
268            if let Some(snapshot_token) = token.as_deref() {
269                bootstrap_handle
270                    .checkpoint_snapshot_transfer(
271                        snapshot_token,
272                        offset,
273                        source_lsn,
274                        written.len() as u64,
275                    )
276                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
277            }
278        }
279    }
280
281    pub fn primary_replica_slot_catalog(
282        &self,
283    ) -> RedDBResult<Option<reddb_file::ReplicationSlotCatalog>> {
284        let Some(plan) = self.primary_replica_file_plan_result()? else {
285            return Ok(None);
286        };
287        match reddb_file::ReplicationSlotCatalog::read_from_path(plan.slots_path()) {
288            Ok(catalog) => Ok(Some(catalog)),
289            Err(reddb_file::RdbFileError::Io(err))
290                if err.kind() == std::io::ErrorKind::NotFound =>
291            {
292                Ok(None)
293            }
294            Err(err) => Err(RedDBError::Internal(err.to_string())),
295        }
296    }
297
298    pub fn primary_replica_wal_retention_plan(
299        &self,
300    ) -> RedDBResult<Option<reddb_file::WalRetentionPlan>> {
301        let Some(plan) = self.primary_replica_file_plan_result()? else {
302            return Ok(None);
303        };
304        let Some(catalog) = self.primary_replica_slot_catalog()? else {
305            return Ok(None);
306        };
307        let current_lsn = self.primary_logical_head_lsn().max(self.cdc_current_lsn());
308        Ok(Some(
309            plan.plan_wal_retention(&catalog, current_lsn)
310                .map_err(|err| RedDBError::Internal(err.to_string()))?,
311        ))
312    }
313
314    pub fn primary_replica_catchup_mode(
315        &self,
316        available_from_lsn: u64,
317        replica_lsn: u64,
318    ) -> RedDBResult<Option<reddb_file::ReplicaCatchupMode>> {
319        let Some(plan) = self.primary_replica_file_plan_result()? else {
320            return Ok(None);
321        };
322        let basebackups = plan
323            .list_basebackups()
324            .map_err(|err| RedDBError::Internal(err.to_string()))?;
325        Ok(Some(plan.catchup_mode_with_basebackups(
326            available_from_lsn,
327            replica_lsn,
328            &basebackups,
329        )))
330    }
331
332    pub fn primary_replica_timeline_history_path(&self) -> Option<std::path::PathBuf> {
333        let root = self.primary_replica_root()?;
334        Some(
335            reddb_file::PrimaryReplicaFilePlan::new(root, reddb_file::TimelineId::initial())
336                .timeline_history_path(),
337        )
338    }
339
340    pub fn primary_replica_rejoin_decision(
341        &self,
342        node_timeline: reddb_file::TimelineId,
343        node_flushed_lsn: u64,
344        available_from_lsn: u64,
345    ) -> RedDBResult<Option<reddb_file::RejoinDecision>> {
346        let Some(path) = self.primary_replica_timeline_history_path() else {
347            return Ok(None);
348        };
349        let history = match reddb_file::TimelineHistory::read_from_path(&path) {
350            Ok(history) => history,
351            Err(reddb_file::RdbFileError::Io(err))
352                if err.kind() == std::io::ErrorKind::NotFound =>
353            {
354                reddb_file::TimelineHistory::new(crate::utils::now_unix_millis())
355            }
356            Err(err) => return Err(RedDBError::Internal(err.to_string())),
357        };
358        Ok(Some(history.rejoin_decision(
359            node_timeline,
360            node_flushed_lsn,
361            available_from_lsn,
362        )))
363    }
364
365    pub fn persist_primary_replica_rejoin_plan(
366        &self,
367        node_timeline: reddb_file::TimelineId,
368        node_flushed_lsn: u64,
369        available_from_lsn: u64,
370    ) -> RedDBResult<Option<reddb_file::RejoinDecision>> {
371        let Some(decision) = self.primary_replica_rejoin_decision(
372            node_timeline,
373            node_flushed_lsn,
374            available_from_lsn,
375        )?
376        else {
377            return Ok(None);
378        };
379
380        let (state, target_timeline, rewind_to_lsn, start_lsn) = match decision {
381            reddb_file::RejoinDecision::AlreadyCurrent => {
382                ("timeline_current", node_timeline.0, 0, node_flushed_lsn)
383            }
384            reddb_file::RejoinDecision::FollowNewTimeline {
385                target_timeline,
386                start_lsn,
387            } => ("rejoin_follow_wal", target_timeline.0, 0, start_lsn),
388            reddb_file::RejoinDecision::Rewind {
389                target_timeline,
390                rewind_to_lsn,
391            } => (
392                "rejoin_rewind_required",
393                target_timeline.0,
394                rewind_to_lsn,
395                0,
396            ),
397            reddb_file::RejoinDecision::Reclone => ("reclone_required", 0, 0, 0),
398        };
399        self.inner.db.store().set_config_tree(
400            "red.replication",
401            &crate::json!({
402                "state": state,
403                "rejoin_node_timeline": node_timeline.0,
404                "rejoin_node_flushed_lsn": node_flushed_lsn,
405                "rejoin_available_from_lsn": available_from_lsn,
406                "rejoin_target_timeline": target_timeline,
407                "rejoin_rewind_to_lsn": rewind_to_lsn,
408                "rejoin_start_lsn": start_lsn,
409                "rejoin_rewind_confirmed_timeline": 0,
410                "rejoin_rewind_confirmed_lsn": 0,
411            }),
412        );
413
414        Ok(Some(decision))
415    }
416
417    pub fn prune_primary_replica_wal_segments(
418        &self,
419    ) -> RedDBResult<Option<reddb_file::WalPruneResult>> {
420        let current_lsn = self.primary_logical_head_lsn().max(self.cdc_current_lsn());
421        self.prune_primary_replica_wal_segments_at(current_lsn)
422    }
423
424    fn prune_primary_replica_wal_segments_at(
425        &self,
426        current_lsn: u64,
427    ) -> RedDBResult<Option<reddb_file::WalPruneResult>> {
428        let Some(plan) = self.primary_replica_file_plan_result()? else {
429            return Ok(None);
430        };
431        let Some(catalog) = self.primary_replica_slot_catalog()? else {
432            return Ok(None);
433        };
434        Ok(Some(
435            plan.prune_wal_segments(&catalog, current_lsn)
436                .map_err(|err| RedDBError::Internal(err.to_string()))?,
437        ))
438    }
439
440    pub fn ack_primary_replica_lsn_and_prune(
441        &self,
442        replica_id: &str,
443        applied_lsn: u64,
444        durable_lsn: u64,
445        apply_errors_total: u64,
446        divergence_total: u64,
447    ) -> RedDBResult<Option<reddb_file::WalPruneResult>> {
448        let Some(repl) = self.inner.db.replication.as_ref() else {
449            return Ok(None);
450        };
451        repl.ack_replica_lsn_with_observability(
452            replica_id,
453            applied_lsn,
454            durable_lsn,
455            apply_errors_total,
456            divergence_total,
457        );
458        self.refresh_replication_flow_control();
459        let current_lsn = self
460            .primary_logical_head_lsn()
461            .max(self.cdc_current_lsn())
462            .max(applied_lsn)
463            .max(durable_lsn);
464        self.prune_primary_replica_wal_segments_at(current_lsn)
465    }
466
467    pub fn record_failover_timeline_promotion(
468        &self,
469        replica_id: &str,
470        applied_lsn: u64,
471    ) -> RedDBResult<reddb_file::TimelineHistory> {
472        let Some(path) = self.primary_replica_timeline_history_path() else {
473            return Ok(reddb_file::TimelineHistory::new(
474                crate::utils::now_unix_millis(),
475            ));
476        };
477        let now_ms = crate::utils::now_unix_millis();
478        let history = match reddb_file::TimelineHistory::read_from_path(&path) {
479            Ok(history) => history,
480            Err(reddb_file::RdbFileError::Io(err))
481                if err.kind() == std::io::ErrorKind::NotFound =>
482            {
483                reddb_file::TimelineHistory::new(now_ms)
484            }
485            Err(err) => return Err(RedDBError::Internal(err.to_string())),
486        };
487        let parent = history
488            .current()
489            .unwrap_or_else(reddb_file::TimelineId::initial);
490        let candidate = reddb_file::PromotionCandidate {
491            replica_id: replica_id.to_string(),
492            timeline: parent,
493            received_lsn: applied_lsn,
494            flushed_lsn: applied_lsn,
495            applied_lsn,
496        };
497        let promoted = history
498            .promotion_history(&candidate, parent.next(), now_ms)
499            .map_err(|err| RedDBError::Internal(err.to_string()))?;
500        promoted
501            .write_to_path(path)
502            .map_err(|err| RedDBError::Internal(err.to_string()))?;
503        Ok(promoted)
504    }
505
506    pub fn record_replica_relay_batch(
507        &self,
508        replica_id: &str,
509        records: &[(u64, Vec<u8>)],
510        applied_lsn: u64,
511    ) -> RedDBResult<()> {
512        let Some(plan) = self.primary_replica_file_plan_result()? else {
513            return Ok(());
514        };
515        let path = plan.relay_manifest_path(replica_id);
516        let Some((first_lsn, _)) = records.first() else {
517            return Ok(());
518        };
519        let end_lsn = records
520            .iter()
521            .map(|(lsn, _)| *lsn)
522            .max()
523            .unwrap_or(*first_lsn);
524        let mut manifest = match reddb_file::ReplicaRelayLogManifest::read_from_path(&path) {
525            Ok(manifest) => {
526                let relay_dir = path.parent().ok_or_else(|| {
527                    RedDBError::Internal("relay manifest path has no parent".into())
528                })?;
529                manifest
530                    .validate_segments(relay_dir)
531                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
532                manifest
533            }
534            Err(reddb_file::RdbFileError::Io(err))
535                if err.kind() == std::io::ErrorKind::NotFound =>
536            {
537                reddb_file::ReplicaRelayLogManifest::new(
538                    replica_id,
539                    reddb_file::TimelineId::initial(),
540                )
541            }
542            Err(err) => return Err(RedDBError::Internal(err.to_string())),
543        };
544        if manifest.replica_id != replica_id {
545            return Err(RedDBError::Internal(format!(
546                "relay manifest replica_id {} does not match {}",
547                manifest.replica_id, replica_id
548            )));
549        }
550        if manifest.timeline != reddb_file::TimelineId::initial() {
551            return Err(RedDBError::Internal(format!(
552                "relay manifest timeline {} is not current timeline {}",
553                manifest.timeline.0,
554                reddb_file::TimelineId::initial().0
555            )));
556        }
557
558        if end_lsn > manifest.flushed_lsn {
559            let new_records = records
560                .iter()
561                .filter(|(lsn, _)| *lsn > manifest.flushed_lsn)
562                .map(|(lsn, payload)| reddb_file::ReplicaRelayLogRecord::new(*lsn, payload.clone()))
563                .collect::<Vec<_>>();
564            let segment = reddb_file::ReplicaRelayLogSegment::from_records(
565                reddb_file::TimelineId::initial(),
566                new_records,
567            )
568            .map_err(|err| RedDBError::Internal(err.to_string()))?;
569            let start_lsn = segment.start_lsn;
570            let end_lsn = segment.end_lsn;
571            let relative_path = reddb_file::layout::relay_segment_relative_path(start_lsn, end_lsn);
572            let segment_path = path
573                .parent()
574                .ok_or_else(|| RedDBError::Internal("relay manifest path has no parent".into()))?
575                .join(&relative_path);
576            segment
577                .write_to_path(&segment_path)
578                .map_err(|err| RedDBError::Internal(err.to_string()))?;
579            manifest
580                .push_segment(
581                    reddb_file::RelayLogSegmentRef::new(
582                        relative_path,
583                        start_lsn,
584                        end_lsn,
585                        segment
586                            .checksum()
587                            .map_err(|err| RedDBError::Internal(err.to_string()))?,
588                    )
589                    .map_err(|err| RedDBError::Internal(err.to_string()))?,
590                )
591                .map_err(|err| RedDBError::Internal(err.to_string()))?;
592        }
593        manifest
594            .mark_applied(applied_lsn.min(manifest.flushed_lsn))
595            .map_err(|err| RedDBError::Internal(err.to_string()))?;
596        manifest
597            .write_to_path(path)
598            .map_err(|err| RedDBError::Internal(err.to_string()))
599    }
600}