1use std::cmp::Ordering;
6use std::fmt::{self, Display};
7
8use fail::fail_point;
9use num_derive::{FromPrimitive, ToPrimitive};
10use num_traits::ToPrimitive;
11use serde_repr::{Deserialize_repr, Serialize_repr};
12use strum::EnumIter;
13
14use crate::Result;
15
/// Selects one of the two log queues: `Append` or `Rewrite`.
///
/// The enum is `#[repr(u8)]`, so `Append` is represented as `0` and `Rewrite`
/// as `1`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum LogQueue {
    /// Discriminant `0`.
    Append = 0,
    /// Discriminant `1`.
    Rewrite = 1,
}
23
/// Sequence number of a log file, stored as an unsigned 64-bit integer.
pub type FileSeq = u64;
26
27#[derive(Debug, Copy, Clone, PartialEq, Eq)]
29pub struct FileId {
30 pub queue: LogQueue,
31 pub seq: FileSeq,
32}
33
34impl FileId {
35 pub fn new(queue: LogQueue, seq: FileSeq) -> Self {
37 Self { queue, seq }
38 }
39
40 #[cfg(test)]
42 pub fn dummy(queue: LogQueue) -> Self {
43 Self { queue, seq: 0 }
44 }
45}
46
47impl std::cmp::Ord for FileId {
49 fn cmp(&self, other: &Self) -> Ordering {
50 match (self.queue, other.queue) {
51 (LogQueue::Append, LogQueue::Rewrite) => Ordering::Greater,
52 (LogQueue::Rewrite, LogQueue::Append) => Ordering::Less,
53 _ => self.seq.cmp(&other.seq),
54 }
55 }
56}
57
58impl std::cmp::PartialOrd for FileId {
59 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
60 Some(self.cmp(other))
61 }
62}
63
64#[derive(Debug, Copy, Clone, PartialEq, Eq)]
66pub struct FileBlockHandle {
67 pub id: FileId,
68 pub offset: u64,
69 pub len: usize,
70}
71
72impl FileBlockHandle {
73 #[cfg(test)]
75 pub fn dummy(queue: LogQueue) -> Self {
76 Self {
77 id: FileId::dummy(queue),
78 offset: 0,
79 len: 0,
80 }
81 }
82}
83
84#[repr(u64)]
86#[derive(
87 Clone,
88 Copy,
89 Debug,
90 Eq,
91 PartialEq,
92 FromPrimitive,
93 ToPrimitive,
94 Serialize_repr,
95 Deserialize_repr,
96 EnumIter,
97 Default,
98)]
99pub enum Version {
100 #[default]
101 V1 = 1,
102 V2 = 2,
103}
104
105impl Version {
106 pub fn has_log_signing(&self) -> bool {
107 fail_point!("pipe_log::version::force_enable_log_signing", |_| { true });
108 match self {
109 Version::V1 => false,
110 Version::V2 => true,
111 }
112 }
113}
114
115impl Display for Version {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 write!(f, "{}", self.to_u64().unwrap())
118 }
119}
120
121pub struct LogFileContext {
122 pub id: FileId,
123 pub version: Version,
124}
125
126impl LogFileContext {
127 pub fn new(id: FileId, version: Version) -> Self {
128 Self { id, version }
129 }
130
131 pub fn get_signature(&self) -> Option<u32> {
133 if self.version.has_log_signing() {
134 Some(self.id.seq as u32)
138 } else {
139 None
140 }
141 }
142}
143
144pub trait ReactiveBytes {
146 fn as_bytes(&mut self, ctx: &LogFileContext) -> &[u8];
147}
148
149impl<T> ReactiveBytes for &T
150where
151 T: AsRef<[u8]> + ?Sized,
152{
153 fn as_bytes(&mut self, _ctx: &LogFileContext) -> &[u8] {
154 (*self).as_ref()
155 }
156}
157
158pub trait PipeLog: Sized {
167 fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>>;
169
170 fn append<T: ReactiveBytes + ?Sized>(
173 &self,
174 queue: LogQueue,
175 bytes: &mut T,
176 ) -> Result<FileBlockHandle>;
177
178 fn sync(&self, queue: LogQueue) -> Result<()>;
183
184 fn file_span(&self, queue: LogQueue) -> (FileSeq, FileSeq);
187
188 fn file_at(&self, queue: LogQueue, mut position: f64) -> FileSeq {
190 position = position.clamp(0.0, 1.0);
191 let (first, active) = self.file_span(queue);
192 let count = active - first + 1;
193 first + (count as f64 * position) as u64
194 }
195
196 fn total_size(&self, queue: LogQueue) -> usize;
198
199 fn rotate(&self, queue: LogQueue) -> Result<()>;
204
205 fn purge_to(&self, file_id: FileId) -> Result<usize>;
210}