1use super::mmap::MemoryMap;
2use crate::error::{JournalError, Result};
3use crate::file::{file::JournalFile, filter::FilterExpr, offset_array, offset_array::Direction};
4use std::num::NonZeroU64;
5
/// A position a [`JournalCursor`] can be asked to sit at or seek to.
///
/// Only `Head`, `Tail`, `Realtime` and `ResolvedEntry` are resolved by the
/// stepping code in this file; the remaining variants are rejected there with
/// `JournalError::InvalidQueryConfiguration`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Location {
    /// Before the first entry: a forward step resolves the first entry, a
    /// backward step yields nothing.
    Head,
    /// After the last entry: a backward step resolves the last entry, a
    /// forward step yields nothing.
    Tail,
    /// Seek target expressed as a realtime value, compared against each
    /// entry header's `realtime` field.
    Realtime(u64),
    /// Presumably a monotonic timestamp plus a 16-byte boot id — TODO confirm.
    Monotonic(u64, [u8; 16]),
    /// Presumably a sequence number plus an optional 16-byte seqnum id —
    /// TODO confirm.
    Seqnum(u64, Option<[u8; 16]>),
    /// Presumably an entry XOR hash — TODO confirm.
    XorHash(u64),
    /// The cursor points at a concrete entry located at this non-zero offset.
    ResolvedEntry(NonZeroU64),
}
16
17impl Default for Location {
18 fn default() -> Self {
19 Self::Head
20 }
21}
22
23#[derive(Debug)]
24pub struct JournalCursor {
25 pub location: Location,
26 pub filter_expr: Option<FilterExpr>,
27 pub array_cursor: Option<offset_array::Cursor>,
28}
29
30impl JournalCursor {
31 #[allow(clippy::new_without_default)]
32 pub fn new() -> Self {
33 Self {
34 location: Location::Head,
35 filter_expr: None,
36 array_cursor: None,
37 }
38 }
39
40 pub fn set_location(&mut self, location: Location) {
41 self.location = location;
42 self.array_cursor = None;
43 }
44
45 pub fn set_filter(&mut self, filter_expr: FilterExpr) {
46 self.filter_expr = Some(filter_expr);
47 }
49
50 pub fn clear_filter(&mut self) {
51 self.filter_expr = None;
52 self.array_cursor = None;
53 self.set_location(Location::Head);
54 }
55
56 pub fn step<M: MemoryMap>(
57 &mut self,
58 journal_file: &JournalFile<M>,
59 direction: Direction,
60 ) -> Result<bool> {
61 let new_location = if self.filter_expr.is_some() {
62 self.resolve_filter_location(journal_file, direction)?
63 } else {
64 self.resolve_array_cursor(journal_file, direction)?
65 };
66
67 if let Some(location) = new_location {
68 self.location = location;
69 Ok(true)
70 } else {
71 Ok(false)
72 }
73 }
74
75 pub fn position(&self) -> Result<NonZeroU64> {
76 match self.location {
77 Location::ResolvedEntry(entry_offset) => Ok(entry_offset),
78 _ => Err(JournalError::UnsetCursor),
79 }
80 }
81
82 fn entry_list<M: MemoryMap>(journal_file: &JournalFile<M>) -> Result<offset_array::List> {
83 journal_file
84 .entry_list()
85 .ok_or(JournalError::InvalidOffsetArrayOffset)
86 }
87
88 fn store_array_cursor_value<M: MemoryMap>(
89 &mut self,
90 journal_file: &JournalFile<M>,
91 cursor: offset_array::Cursor,
92 ) -> Result<Option<Location>> {
93 let Some((cursor, offset)) = cursor.materialize_value(journal_file)? else {
94 return Ok(None);
95 };
96 self.array_cursor = Some(cursor);
97 Ok(Some(Location::ResolvedEntry(offset)))
98 }
99
100 fn resolve_head_array<M: MemoryMap>(
101 &mut self,
102 journal_file: &JournalFile<M>,
103 ) -> Result<Option<Location>> {
104 let cursor = Self::entry_list(journal_file)?.cursor_head();
105 self.store_array_cursor_value(journal_file, cursor)
106 }
107
108 fn resolve_tail_array<M: MemoryMap>(
109 &mut self,
110 journal_file: &JournalFile<M>,
111 ) -> Result<Option<Location>> {
112 let entry_list = Self::entry_list(journal_file)?;
113 let cursor = entry_list.cursor_tail(journal_file)?;
114 self.store_array_cursor_value(journal_file, cursor)
115 }
116
117 fn realtime_array_cursor<M: MemoryMap>(
118 journal_file: &JournalFile<M>,
119 realtime: u64,
120 ) -> Result<offset_array::Cursor> {
121 let entry_list = Self::entry_list(journal_file)?;
122 let predicate = |entry_offset| {
123 let entry_object = journal_file.entry_ref(entry_offset)?;
124 Ok(entry_object.header.realtime < realtime)
125 };
126
127 entry_list
128 .directed_partition_point(journal_file, predicate, Direction::Forward)?
129 .map(Ok)
130 .unwrap_or_else(|| entry_list.cursor_tail(journal_file))
131 }
132
133 fn resolve_realtime_array<M: MemoryMap>(
134 &mut self,
135 journal_file: &JournalFile<M>,
136 realtime: u64,
137 ) -> Result<Option<Location>> {
138 let cursor = Self::realtime_array_cursor(journal_file, realtime)?;
139 self.store_array_cursor_value(journal_file, cursor)
140 }
141
142 fn step_array_cursor<M: MemoryMap>(
143 &mut self,
144 journal_file: &JournalFile<M>,
145 direction: Direction,
146 ) -> Result<Option<Location>> {
147 let cursor = self
148 .array_cursor
149 .as_ref()
150 .ok_or(JournalError::UnsetCursor)?;
151 let cursor = match direction {
152 Direction::Forward => cursor.next(journal_file)?,
153 Direction::Backward => cursor.previous(journal_file)?,
154 };
155 let Some(cursor) = cursor else {
156 return Ok(None);
157 };
158 self.store_array_cursor_value(journal_file, cursor)
159 }
160
161 fn resolve_array_cursor<M: MemoryMap>(
162 &mut self,
163 journal_file: &JournalFile<M>,
164 direction: Direction,
165 ) -> Result<Option<Location>> {
166 match (self.location, direction) {
167 (Location::Head, Direction::Forward) => self.resolve_head_array(journal_file),
168 (Location::Head, Direction::Backward) => Ok(None),
169 (Location::Tail, Direction::Forward) => Ok(None),
170 (Location::Tail, Direction::Backward) => self.resolve_tail_array(journal_file),
171 (Location::Realtime(realtime), _) => {
172 self.resolve_realtime_array(journal_file, realtime)
173 }
174 (Location::ResolvedEntry(_), direction) => {
175 self.step_array_cursor(journal_file, direction)
176 }
177 _ => Err(JournalError::InvalidQueryConfiguration),
178 }
179 }
180
181 fn filter_expr_mut(&mut self) -> Result<&mut FilterExpr> {
182 self.filter_expr
183 .as_mut()
184 .ok_or(JournalError::InvalidQueryConfiguration)
185 }
186
187 fn filter_head<M: MemoryMap>(
188 filter_expr: &mut FilterExpr,
189 journal_file: &JournalFile<M>,
190 ) -> Result<Option<Location>> {
191 Ok(filter_expr
192 .head()
193 .next(journal_file, NonZeroU64::MIN)?
194 .map(Location::ResolvedEntry))
195 }
196
197 fn filter_tail<M: MemoryMap>(
198 filter_expr: &mut FilterExpr,
199 journal_file: &JournalFile<M>,
200 ) -> Result<Option<Location>> {
201 Ok(filter_expr
202 .tail(journal_file)?
203 .previous(journal_file, NonZeroU64::MAX)?
204 .map(Location::ResolvedEntry))
205 }
206
207 fn resolve_filter_at_offset<M: MemoryMap>(
208 &mut self,
209 journal_file: &JournalFile<M>,
210 direction: Direction,
211 entry_offset: NonZeroU64,
212 ) -> Result<Option<Location>> {
213 let filter_expr = self.filter_expr_mut()?;
214 match direction {
215 Direction::Forward => Ok(filter_expr
216 .head()
217 .next(journal_file, entry_offset)?
218 .map(Location::ResolvedEntry)),
219 Direction::Backward => Ok(filter_expr
220 .tail(journal_file)?
221 .previous(journal_file, entry_offset)?
222 .map(Location::ResolvedEntry)),
223 }
224 }
225
226 fn resolve_filter_realtime<M: MemoryMap>(
227 &mut self,
228 journal_file: &JournalFile<M>,
229 realtime: u64,
230 direction: Direction,
231 ) -> Result<Option<Location>> {
232 let cursor = Self::realtime_array_cursor(journal_file, realtime)?;
233 let Some(entry_offset) = cursor.value(journal_file)? else {
234 return Ok(None);
235 };
236 self.resolve_filter_at_offset(journal_file, direction, entry_offset)
237 }
238
239 fn filter_after_resolved<M: MemoryMap>(
240 &mut self,
241 journal_file: &JournalFile<M>,
242 location_offset: NonZeroU64,
243 ) -> Result<Option<Location>> {
244 Ok(self
245 .filter_expr_mut()?
246 .next(journal_file, location_offset.saturating_add(1))?
247 .map(Location::ResolvedEntry))
248 }
249
250 fn filter_before_resolved<M: MemoryMap>(
251 &mut self,
252 journal_file: &JournalFile<M>,
253 location_offset: NonZeroU64,
254 ) -> Result<Option<Location>> {
255 let Some(needle_offset) = NonZeroU64::new(location_offset.get() - 1) else {
256 return Ok(None);
257 };
258 Ok(self
259 .filter_expr_mut()?
260 .previous(journal_file, needle_offset)?
261 .map(Location::ResolvedEntry))
262 }
263
264 fn resolve_filter_location<M: MemoryMap>(
265 &mut self,
266 journal_file: &JournalFile<M>,
267 direction: Direction,
268 ) -> Result<Option<Location>> {
269 match (self.location, direction) {
270 (Location::Head, Direction::Forward) => {
271 Self::filter_head(self.filter_expr_mut()?, journal_file)
272 }
273 (Location::Head, Direction::Backward) => Ok(None),
274 (Location::Tail, Direction::Forward) => Ok(None),
275 (Location::Tail, Direction::Backward) => {
276 Self::filter_tail(self.filter_expr_mut()?, journal_file)
277 }
278 (Location::Realtime(realtime), direction) => {
279 self.resolve_filter_realtime(journal_file, realtime, direction)
280 }
281 (Location::ResolvedEntry(offset), Direction::Forward) => {
282 self.filter_after_resolved(journal_file, offset)
283 }
284 (Location::ResolvedEntry(offset), Direction::Backward) => {
285 self.filter_before_resolved(journal_file, offset)
286 }
287 _ => Err(JournalError::InvalidQueryConfiguration),
288 }
289 }
290}