Skip to main content

reddb_file/primary_replica/
timeline.rs

1use super::*;
2
/// Identifier of a timeline.
///
/// Wraps a `u64`; the derived ordering compares the wrapped numbers, so a
/// larger id sorts after a smaller one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TimelineId(pub u64);

impl TimelineId {
    /// Returns the id used for the very first timeline (`1`).
    pub const fn initial() -> Self {
        Self(1)
    }

    /// Returns the id that follows `self`.
    ///
    /// The increment saturates at `u64::MAX` instead of overflowing, so the
    /// result never panics and never wraps around to an id smaller than
    /// [`TimelineId::initial`], regardless of build profile. At the maximum
    /// the returned id equals `self`; callers that require a strictly
    /// greater id must check for that themselves.
    pub const fn next(self) -> Self {
        Self(self.0.saturating_add(1))
    }
}
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct TimelineHistoryEntry {
18    pub timeline: TimelineId,
19    pub parent_timeline: Option<TimelineId>,
20    pub fork_lsn: u64,
21    pub created_at_unix_ms: u64,
22    pub reason: String,
23}
24
25impl TimelineHistoryEntry {
26    pub fn initial(created_at_unix_ms: u64) -> Self {
27        Self {
28            timeline: TimelineId::initial(),
29            parent_timeline: None,
30            fork_lsn: 0,
31            created_at_unix_ms,
32            reason: "initial".into(),
33        }
34    }
35
36    pub fn fork(
37        timeline: TimelineId,
38        parent_timeline: TimelineId,
39        fork_lsn: u64,
40        created_at_unix_ms: u64,
41        reason: impl Into<String>,
42    ) -> Self {
43        Self {
44            timeline,
45            parent_timeline: Some(parent_timeline),
46            fork_lsn,
47            created_at_unix_ms,
48            reason: reason.into(),
49        }
50    }
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct TimelineHistory {
55    pub entries: Vec<TimelineHistoryEntry>,
56}
57
58impl TimelineHistory {
59    pub fn new(initial_created_at_unix_ms: u64) -> Self {
60        Self {
61            entries: vec![TimelineHistoryEntry::initial(initial_created_at_unix_ms)],
62        }
63    }
64
65    pub fn current(&self) -> Option<TimelineId> {
66        self.entries.last().map(|entry| entry.timeline)
67    }
68
69    pub fn fork(
70        &mut self,
71        new_timeline: TimelineId,
72        parent_timeline: TimelineId,
73        fork_lsn: u64,
74        created_at_unix_ms: u64,
75        reason: impl Into<String>,
76    ) -> RdbFileResult<()> {
77        if self.current() != Some(parent_timeline) {
78            return Err(RdbFileError::InvalidOperation(format!(
79                "cannot fork timeline {} from non-current parent {}",
80                new_timeline.0, parent_timeline.0
81            )));
82        }
83        if new_timeline <= parent_timeline {
84            return Err(RdbFileError::InvalidOperation(format!(
85                "new timeline {} must be greater than parent {}",
86                new_timeline.0, parent_timeline.0
87            )));
88        }
89        self.entries.push(TimelineHistoryEntry::fork(
90            new_timeline,
91            parent_timeline,
92            fork_lsn,
93            created_at_unix_ms,
94            reason,
95        ));
96        Ok(())
97    }
98
99    pub fn ancestor_lsn(&self, timeline: TimelineId) -> Option<u64> {
100        self.entries
101            .iter()
102            .find(|entry| entry.timeline == timeline)
103            .map(|entry| entry.fork_lsn)
104    }
105
106    pub fn descendant_chain_from(&self, timeline: TimelineId) -> Option<Vec<TimelineHistoryEntry>> {
107        let index = self
108            .entries
109            .iter()
110            .position(|entry| entry.timeline == timeline)?;
111        Some(self.entries[index.saturating_add(1)..].to_vec())
112    }
113
114    pub fn rejoin_decision(
115        &self,
116        node_timeline: TimelineId,
117        node_flushed_lsn: u64,
118        available_from_lsn: u64,
119    ) -> RejoinDecision {
120        if self.current() == Some(node_timeline) {
121            return RejoinDecision::AlreadyCurrent;
122        }
123        let Some(current) = self.current() else {
124            return RejoinDecision::Reclone;
125        };
126        let Some(chain) = self.descendant_chain_from(node_timeline) else {
127            return RejoinDecision::Reclone;
128        };
129        let Some(first_child) = chain.first() else {
130            return RejoinDecision::Reclone;
131        };
132        if node_flushed_lsn >= first_child.fork_lsn {
133            RejoinDecision::Rewind {
134                target_timeline: current,
135                rewind_to_lsn: first_child.fork_lsn,
136            }
137        } else if node_flushed_lsn >= available_from_lsn {
138            RejoinDecision::FollowNewTimeline {
139                target_timeline: current,
140                start_lsn: node_flushed_lsn,
141            }
142        } else {
143            RejoinDecision::Reclone
144        }
145    }
146
147    pub fn promotion_history(
148        &self,
149        candidate: &PromotionCandidate,
150        new_timeline: TimelineId,
151        created_at_unix_ms: u64,
152    ) -> RdbFileResult<Self> {
153        let mut next = self.clone();
154        next.fork(
155            new_timeline,
156            candidate.timeline,
157            candidate.applied_lsn,
158            created_at_unix_ms,
159            format!("promote {}", candidate.replica_id),
160        )?;
161        Ok(next)
162    }
163
164    pub fn write_to_path(&self, path: impl AsRef<Path>) -> RdbFileResult<()> {
165        write_bytes_atomically(path.as_ref(), &self.encode())
166    }
167
168    pub fn read_from_path(path: impl AsRef<Path>) -> RdbFileResult<Self> {
169        Self::decode(&fs::read(path)?)
170    }
171
172    pub fn encode(&self) -> Vec<u8> {
173        let mut out = Vec::new();
174        out.extend_from_slice(TIMELINE_HISTORY_MAGIC);
175        put_u16(&mut out, PRIMARY_REPLICA_ARTIFACT_VERSION);
176        put_u32(&mut out, self.entries.len() as u32);
177        for entry in &self.entries {
178            put_u64(&mut out, entry.timeline.0);
179            match entry.parent_timeline {
180                Some(parent) => {
181                    out.push(1);
182                    put_u64(&mut out, parent.0);
183                }
184                None => {
185                    out.push(0);
186                    put_u64(&mut out, 0);
187                }
188            }
189            put_u64(&mut out, entry.fork_lsn);
190            put_u64(&mut out, entry.created_at_unix_ms);
191            put_string(&mut out, &entry.reason);
192        }
193        let checksum = crc32(&out);
194        put_u32(&mut out, checksum);
195        out
196    }
197
198    pub fn decode(bytes: &[u8]) -> RdbFileResult<Self> {
199        verify_checksum(bytes, "timeline history")?;
200        let payload_end = bytes.len() - CHECKSUM_LEN;
201        let mut cursor = 0usize;
202        expect_magic(
203            bytes,
204            &mut cursor,
205            payload_end,
206            TIMELINE_HISTORY_MAGIC,
207            "timeline history",
208        )?;
209        let version = take_u16(bytes, &mut cursor, payload_end)?;
210        if version != PRIMARY_REPLICA_ARTIFACT_VERSION {
211            return Err(RdbFileError::InvalidOperation(format!(
212                "unsupported timeline history version {version}"
213            )));
214        }
215        let count = take_u32(bytes, &mut cursor, payload_end)? as usize;
216        if count == 0 {
217            return Err(RdbFileError::InvalidOperation(
218                "timeline history must contain an initial entry".into(),
219            ));
220        }
221        let mut entries = Vec::with_capacity(count);
222        let mut previous = None;
223        for index in 0..count {
224            let timeline = TimelineId(take_u64(bytes, &mut cursor, payload_end)?);
225            let has_parent = take_u8(bytes, &mut cursor, payload_end)? != 0;
226            let parent_raw = TimelineId(take_u64(bytes, &mut cursor, payload_end)?);
227            let parent_timeline = has_parent.then_some(parent_raw);
228            let fork_lsn = take_u64(bytes, &mut cursor, payload_end)?;
229            let created_at_unix_ms = take_u64(bytes, &mut cursor, payload_end)?;
230            let reason = take_string(bytes, &mut cursor, payload_end)?;
231            if index == 0 && parent_timeline.is_some() {
232                return Err(RdbFileError::InvalidOperation(
233                    "initial timeline history entry cannot have a parent".into(),
234                ));
235            }
236            if index == 0 && timeline != TimelineId::initial() {
237                return Err(RdbFileError::InvalidOperation(
238                    "timeline history must start at initial timeline".into(),
239                ));
240            }
241            if index > 0 && parent_timeline != previous {
242                return Err(RdbFileError::InvalidOperation(
243                    "timeline history parent does not match previous timeline".into(),
244                ));
245            }
246            if let Some(parent) = parent_timeline {
247                if timeline <= parent {
248                    return Err(RdbFileError::InvalidOperation(format!(
249                        "timeline {} must be greater than parent {}",
250                        timeline.0, parent.0
251                    )));
252                }
253            }
254            previous = Some(timeline);
255            entries.push(TimelineHistoryEntry {
256                timeline,
257                parent_timeline,
258                fork_lsn,
259                created_at_unix_ms,
260                reason,
261            });
262        }
263        reject_trailing_bytes(bytes, cursor, payload_end, "timeline history")?;
264        Ok(Self { entries })
265    }
266}
267
268#[derive(Debug, Clone, Copy, PartialEq, Eq)]
269pub enum RejoinDecision {
270    AlreadyCurrent,
271    FollowNewTimeline {
272        target_timeline: TimelineId,
273        start_lsn: u64,
274    },
275    Rewind {
276        target_timeline: TimelineId,
277        rewind_to_lsn: u64,
278    },
279    Reclone,
280}
281
282#[derive(Debug, Clone, PartialEq, Eq)]
283pub struct PromotionCandidate {
284    pub replica_id: String,
285    pub timeline: TimelineId,
286    pub received_lsn: u64,
287    pub flushed_lsn: u64,
288    pub applied_lsn: u64,
289}
290
291impl PromotionCandidate {
292    pub fn select(timeline: TimelineId, acks: &[ReplicaAck]) -> Option<Self> {
293        acks.iter()
294            .filter(|ack| ack.timeline == timeline)
295            .max_by(|left, right| {
296                (
297                    left.applied_lsn,
298                    left.flushed_lsn,
299                    left.received_lsn,
300                    std::cmp::Reverse(left.replica_id.as_str()),
301                )
302                    .cmp(&(
303                        right.applied_lsn,
304                        right.flushed_lsn,
305                        right.received_lsn,
306                        std::cmp::Reverse(right.replica_id.as_str()),
307                    ))
308            })
309            .map(|ack| Self {
310                replica_id: ack.replica_id.clone(),
311                timeline: ack.timeline,
312                received_lsn: ack.received_lsn,
313                flushed_lsn: ack.flushed_lsn,
314                applied_lsn: ack.applied_lsn,
315            })
316    }
317}