raft_engine/pipe_log.rs

// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.

//! A generic log storage.

use std::cmp::Ordering;
use std::fmt::{self, Display};

use fail::fail_point;
use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::ToPrimitive;
use serde_repr::{Deserialize_repr, Serialize_repr};
use strum::EnumIter;

use crate::Result;

/// The type of log queue.
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum LogQueue {
    Append = 0,
    Rewrite = 1,
}

/// Sequence number for log file. It is unique within a log queue.
pub type FileSeq = u64;

/// A unique identifier for a log file.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FileId {
    pub queue: LogQueue,
    pub seq: FileSeq,
}

impl FileId {
    /// Creates a [`FileId`] from a [`LogQueue`] and a [`FileSeq`].
    pub fn new(queue: LogQueue, seq: FileSeq) -> Self {
        Self { queue, seq }
    }

    /// Creates a new [`FileId`] representing a non-existing file.
    #[cfg(test)]
    pub fn dummy(queue: LogQueue) -> Self {
        Self { queue, seq: 0 }
    }
}

/// Order by freshness.
impl std::cmp::Ord for FileId {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self.queue, other.queue) {
            (LogQueue::Append, LogQueue::Rewrite) => Ordering::Greater,
            (LogQueue::Rewrite, LogQueue::Append) => Ordering::Less,
            _ => self.seq.cmp(&other.seq),
        }
    }
}

impl std::cmp::PartialOrd for FileId {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
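
// A small illustrative check of the freshness ordering defined above: any
// append-queue file is considered fresher than any rewrite-queue file, and
// files within the same queue compare by sequence number.
#[cfg(test)]
#[test]
fn file_id_order_example() {
    let append = FileId::new(LogQueue::Append, 1);
    let rewrite = FileId::new(LogQueue::Rewrite, 10);
    assert!(append > rewrite);
    assert!(FileId::new(LogQueue::Append, 2) > append);
}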

/// A logical pointer to a chunk of log file data.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FileBlockHandle {
    pub id: FileId,
    pub offset: u64,
    pub len: usize,
}

impl FileBlockHandle {
    /// Creates a new [`FileBlockHandle`] that points to nothing.
    #[cfg(test)]
    pub fn dummy(queue: LogQueue) -> Self {
        Self {
            id: FileId::dummy(queue),
            offset: 0,
            len: 0,
        }
    }
}

/// Version of log file format.
#[repr(u64)]
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    FromPrimitive,
    ToPrimitive,
    Serialize_repr,
    Deserialize_repr,
    EnumIter,
    Default,
)]
pub enum Version {
    #[default]
    V1 = 1,
    V2 = 2,
}

impl Version {
    pub fn has_log_signing(&self) -> bool {
        fail_point!("pipe_log::version::force_enable_log_signing", |_| { true });
        match self {
            Version::V1 => false,
            Version::V2 => true,
        }
    }
}

impl Display for Version {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.to_u64().unwrap())
    }
}
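
// Illustrative: `V1` predates log signing while `V2` enables it, and a
// version displays as its numeric representation.
#[cfg(test)]
#[test]
fn version_example() {
    assert!(!Version::V1.has_log_signing());
    assert!(Version::V2.has_log_signing());
    assert_eq!(Version::V2.to_string(), "2");
}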

pub struct LogFileContext {
    pub id: FileId,
    pub version: Version,
}

impl LogFileContext {
    pub fn new(id: FileId, version: Version) -> Self {
        Self { id, version }
    }

    /// Returns the `signature` in `u32` format.
    pub fn get_signature(&self) -> Option<u32> {
        if self.version.has_log_signing() {
            // The number of files is always limited to less than `u32::MAX`,
            // so we just use the low 32 bits of the file sequence number as
            // the `signature` by default.
            Some(self.id.seq as u32)
        } else {
            None
        }
    }
}
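
// Illustrative: a signature is only produced when the format version supports
// log signing, and it is simply the low 32 bits of the file sequence number.
#[cfg(test)]
#[test]
fn log_file_context_signature_example() {
    let file_id = FileId::new(LogQueue::Append, 7);
    assert_eq!(LogFileContext::new(file_id, Version::V1).get_signature(), None);
    assert_eq!(
        LogFileContext::new(file_id, Version::V2).get_signature(),
        Some(7)
    );
}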

/// Some bytes whose value might depend on the file they are written to.
pub trait ReactiveBytes {
    fn as_bytes(&mut self, ctx: &LogFileContext) -> &[u8];
}

impl<T> ReactiveBytes for &T
where
    T: AsRef<[u8]> + ?Sized,
{
    fn as_bytes(&mut self, _ctx: &LogFileContext) -> &[u8] {
        (*self).as_ref()
    }
}
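
// Illustrative: thanks to the blanket impl above, plain references to anything
// byte-like (`&[u8]`, `&str`, `&Vec<u8>`, ...) can be passed wherever
// `ReactiveBytes` is expected; their value ignores the file context.
#[cfg(test)]
#[test]
fn reactive_bytes_example() {
    let ctx = LogFileContext::new(FileId::new(LogQueue::Append, 1), Version::V2);
    let mut payload = &b"some payload"[..];
    assert_eq!(payload.as_bytes(&ctx), b"some payload");
}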

/// A `PipeLog` serves reads and writes over multiple queues of log files.
///
/// # Safety
///
/// The pipe will panic if it encounters an unrecoverable failure. Otherwise
/// the operations on it should be atomic, i.e. a failed operation will not
/// affect other ones, and the user can still use it afterwards without
/// breaking consistency.
pub trait PipeLog: Sized {
    /// Reads some bytes from the specified position.
    fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>>;

    /// Appends some bytes to the specified log queue. Returns file position of
    /// the written bytes.
    fn append<T: ReactiveBytes + ?Sized>(
        &self,
        queue: LogQueue,
        bytes: &mut T,
    ) -> Result<FileBlockHandle>;

    /// Synchronizes all buffered writes.
    ///
    /// This operation might incur a large latency overhead. It's advised to
    /// call it once per batch of writes.
    fn sync(&self, queue: LogQueue) -> Result<()>;

    /// Returns the smallest and largest file sequence number, still in use,
    /// of the specified log queue.
    fn file_span(&self, queue: LogQueue) -> (FileSeq, FileSeq);

    /// Returns the sequence number of the oldest file that is newer than a
    /// `position` fraction (clamped to `[0.0, 1.0]`) of all files in the
    /// queue.
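    ///
    /// For example (illustrative), with a file span of `(10, 19)` and
    /// `position = 0.5`, there are 10 files in the span and this returns
    /// `10 + 5 = 15`.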
    fn file_at(&self, queue: LogQueue, mut position: f64) -> FileSeq {
        position = position.clamp(0.0, 1.0);
        let (first, active) = self.file_span(queue);
        let count = active - first + 1;
        first + (count as f64 * position) as u64
    }

    /// Returns total size of the specified log queue.
    fn total_size(&self, queue: LogQueue) -> usize;

    /// Rotates a new log file for the specified log queue.
    ///
    /// Implementation should be atomic under error conditions but not
    /// necessarily panic-safe.
    fn rotate(&self, queue: LogQueue) -> Result<()>;

    /// Deletes all log files smaller than the specified file ID. The scope is
    /// limited to the log queue of `file_id`.
    ///
    /// Returns the number of deleted files.
    fn purge_to(&self, file_id: FileId) -> Result<usize>;
}