reddb_file/primary_replica/
timeline.rs1use super::*;
2
/// Identifier of a replication timeline.
///
/// A thin newtype over `u64`. The derived ordering compares the wrapped
/// integers, so a larger id sorts after a smaller one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TimelineId(pub u64);

impl TimelineId {
    /// Returns the first timeline id, `TimelineId(1)`.
    pub const fn initial() -> Self {
        TimelineId(1)
    }

    /// Returns the id that directly follows `self` (the wrapped value plus one).
    ///
    /// The addition is a plain `+`, so behaviour at `u64::MAX` follows the
    /// build's integer-overflow setting.
    pub const fn next(self) -> Self {
        let TimelineId(raw) = self;
        TimelineId(raw + 1)
    }
}
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct TimelineHistoryEntry {
18 pub timeline: TimelineId,
19 pub parent_timeline: Option<TimelineId>,
20 pub fork_lsn: u64,
21 pub created_at_unix_ms: u64,
22 pub reason: String,
23}
24
25impl TimelineHistoryEntry {
26 pub fn initial(created_at_unix_ms: u64) -> Self {
27 Self {
28 timeline: TimelineId::initial(),
29 parent_timeline: None,
30 fork_lsn: 0,
31 created_at_unix_ms,
32 reason: "initial".into(),
33 }
34 }
35
36 pub fn fork(
37 timeline: TimelineId,
38 parent_timeline: TimelineId,
39 fork_lsn: u64,
40 created_at_unix_ms: u64,
41 reason: impl Into<String>,
42 ) -> Self {
43 Self {
44 timeline,
45 parent_timeline: Some(parent_timeline),
46 fork_lsn,
47 created_at_unix_ms,
48 reason: reason.into(),
49 }
50 }
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct TimelineHistory {
55 pub entries: Vec<TimelineHistoryEntry>,
56}
57
58impl TimelineHistory {
59 pub fn new(initial_created_at_unix_ms: u64) -> Self {
60 Self {
61 entries: vec![TimelineHistoryEntry::initial(initial_created_at_unix_ms)],
62 }
63 }
64
65 pub fn current(&self) -> Option<TimelineId> {
66 self.entries.last().map(|entry| entry.timeline)
67 }
68
69 pub fn fork(
70 &mut self,
71 new_timeline: TimelineId,
72 parent_timeline: TimelineId,
73 fork_lsn: u64,
74 created_at_unix_ms: u64,
75 reason: impl Into<String>,
76 ) -> RdbFileResult<()> {
77 if self.current() != Some(parent_timeline) {
78 return Err(RdbFileError::InvalidOperation(format!(
79 "cannot fork timeline {} from non-current parent {}",
80 new_timeline.0, parent_timeline.0
81 )));
82 }
83 if new_timeline <= parent_timeline {
84 return Err(RdbFileError::InvalidOperation(format!(
85 "new timeline {} must be greater than parent {}",
86 new_timeline.0, parent_timeline.0
87 )));
88 }
89 self.entries.push(TimelineHistoryEntry::fork(
90 new_timeline,
91 parent_timeline,
92 fork_lsn,
93 created_at_unix_ms,
94 reason,
95 ));
96 Ok(())
97 }
98
99 pub fn ancestor_lsn(&self, timeline: TimelineId) -> Option<u64> {
100 self.entries
101 .iter()
102 .find(|entry| entry.timeline == timeline)
103 .map(|entry| entry.fork_lsn)
104 }
105
106 pub fn descendant_chain_from(&self, timeline: TimelineId) -> Option<Vec<TimelineHistoryEntry>> {
107 let index = self
108 .entries
109 .iter()
110 .position(|entry| entry.timeline == timeline)?;
111 Some(self.entries[index.saturating_add(1)..].to_vec())
112 }
113
114 pub fn rejoin_decision(
115 &self,
116 node_timeline: TimelineId,
117 node_flushed_lsn: u64,
118 available_from_lsn: u64,
119 ) -> RejoinDecision {
120 if self.current() == Some(node_timeline) {
121 return RejoinDecision::AlreadyCurrent;
122 }
123 let Some(current) = self.current() else {
124 return RejoinDecision::Reclone;
125 };
126 let Some(chain) = self.descendant_chain_from(node_timeline) else {
127 return RejoinDecision::Reclone;
128 };
129 let Some(first_child) = chain.first() else {
130 return RejoinDecision::Reclone;
131 };
132 if node_flushed_lsn >= first_child.fork_lsn {
133 RejoinDecision::Rewind {
134 target_timeline: current,
135 rewind_to_lsn: first_child.fork_lsn,
136 }
137 } else if node_flushed_lsn >= available_from_lsn {
138 RejoinDecision::FollowNewTimeline {
139 target_timeline: current,
140 start_lsn: node_flushed_lsn,
141 }
142 } else {
143 RejoinDecision::Reclone
144 }
145 }
146
147 pub fn promotion_history(
148 &self,
149 candidate: &PromotionCandidate,
150 new_timeline: TimelineId,
151 created_at_unix_ms: u64,
152 ) -> RdbFileResult<Self> {
153 let mut next = self.clone();
154 next.fork(
155 new_timeline,
156 candidate.timeline,
157 candidate.applied_lsn,
158 created_at_unix_ms,
159 format!("promote {}", candidate.replica_id),
160 )?;
161 Ok(next)
162 }
163
164 pub fn write_to_path(&self, path: impl AsRef<Path>) -> RdbFileResult<()> {
165 write_bytes_atomically(path.as_ref(), &self.encode())
166 }
167
168 pub fn read_from_path(path: impl AsRef<Path>) -> RdbFileResult<Self> {
169 Self::decode(&fs::read(path)?)
170 }
171
172 pub fn encode(&self) -> Vec<u8> {
173 let mut out = Vec::new();
174 out.extend_from_slice(TIMELINE_HISTORY_MAGIC);
175 put_u16(&mut out, PRIMARY_REPLICA_ARTIFACT_VERSION);
176 put_u32(&mut out, self.entries.len() as u32);
177 for entry in &self.entries {
178 put_u64(&mut out, entry.timeline.0);
179 match entry.parent_timeline {
180 Some(parent) => {
181 out.push(1);
182 put_u64(&mut out, parent.0);
183 }
184 None => {
185 out.push(0);
186 put_u64(&mut out, 0);
187 }
188 }
189 put_u64(&mut out, entry.fork_lsn);
190 put_u64(&mut out, entry.created_at_unix_ms);
191 put_string(&mut out, &entry.reason);
192 }
193 let checksum = crc32(&out);
194 put_u32(&mut out, checksum);
195 out
196 }
197
198 pub fn decode(bytes: &[u8]) -> RdbFileResult<Self> {
199 verify_checksum(bytes, "timeline history")?;
200 let payload_end = bytes.len() - CHECKSUM_LEN;
201 let mut cursor = 0usize;
202 expect_magic(
203 bytes,
204 &mut cursor,
205 payload_end,
206 TIMELINE_HISTORY_MAGIC,
207 "timeline history",
208 )?;
209 let version = take_u16(bytes, &mut cursor, payload_end)?;
210 if version != PRIMARY_REPLICA_ARTIFACT_VERSION {
211 return Err(RdbFileError::InvalidOperation(format!(
212 "unsupported timeline history version {version}"
213 )));
214 }
215 let count = take_u32(bytes, &mut cursor, payload_end)? as usize;
216 if count == 0 {
217 return Err(RdbFileError::InvalidOperation(
218 "timeline history must contain an initial entry".into(),
219 ));
220 }
221 let mut entries = Vec::with_capacity(count);
222 let mut previous = None;
223 for index in 0..count {
224 let timeline = TimelineId(take_u64(bytes, &mut cursor, payload_end)?);
225 let has_parent = take_u8(bytes, &mut cursor, payload_end)? != 0;
226 let parent_raw = TimelineId(take_u64(bytes, &mut cursor, payload_end)?);
227 let parent_timeline = has_parent.then_some(parent_raw);
228 let fork_lsn = take_u64(bytes, &mut cursor, payload_end)?;
229 let created_at_unix_ms = take_u64(bytes, &mut cursor, payload_end)?;
230 let reason = take_string(bytes, &mut cursor, payload_end)?;
231 if index == 0 && parent_timeline.is_some() {
232 return Err(RdbFileError::InvalidOperation(
233 "initial timeline history entry cannot have a parent".into(),
234 ));
235 }
236 if index == 0 && timeline != TimelineId::initial() {
237 return Err(RdbFileError::InvalidOperation(
238 "timeline history must start at initial timeline".into(),
239 ));
240 }
241 if index > 0 && parent_timeline != previous {
242 return Err(RdbFileError::InvalidOperation(
243 "timeline history parent does not match previous timeline".into(),
244 ));
245 }
246 if let Some(parent) = parent_timeline {
247 if timeline <= parent {
248 return Err(RdbFileError::InvalidOperation(format!(
249 "timeline {} must be greater than parent {}",
250 timeline.0, parent.0
251 )));
252 }
253 }
254 previous = Some(timeline);
255 entries.push(TimelineHistoryEntry {
256 timeline,
257 parent_timeline,
258 fork_lsn,
259 created_at_unix_ms,
260 reason,
261 });
262 }
263 reject_trailing_bytes(bytes, cursor, payload_end, "timeline history")?;
264 Ok(Self { entries })
265 }
266}
267
268#[derive(Debug, Clone, Copy, PartialEq, Eq)]
269pub enum RejoinDecision {
270 AlreadyCurrent,
271 FollowNewTimeline {
272 target_timeline: TimelineId,
273 start_lsn: u64,
274 },
275 Rewind {
276 target_timeline: TimelineId,
277 rewind_to_lsn: u64,
278 },
279 Reclone,
280}
281
282#[derive(Debug, Clone, PartialEq, Eq)]
283pub struct PromotionCandidate {
284 pub replica_id: String,
285 pub timeline: TimelineId,
286 pub received_lsn: u64,
287 pub flushed_lsn: u64,
288 pub applied_lsn: u64,
289}
290
291impl PromotionCandidate {
292 pub fn select(timeline: TimelineId, acks: &[ReplicaAck]) -> Option<Self> {
293 acks.iter()
294 .filter(|ack| ack.timeline == timeline)
295 .max_by(|left, right| {
296 (
297 left.applied_lsn,
298 left.flushed_lsn,
299 left.received_lsn,
300 std::cmp::Reverse(left.replica_id.as_str()),
301 )
302 .cmp(&(
303 right.applied_lsn,
304 right.flushed_lsn,
305 right.received_lsn,
306 std::cmp::Reverse(right.replica_id.as_str()),
307 ))
308 })
309 .map(|ack| Self {
310 replica_id: ack.replica_id.clone(),
311 timeline: ack.timeline,
312 received_lsn: ack.received_lsn,
313 flushed_lsn: ack.flushed_lsn,
314 applied_lsn: ack.applied_lsn,
315 })
316 }
317}