Skip to main content

journal_core/file/
cursor.rs

1use super::mmap::MemoryMap;
2use crate::error::{JournalError, Result};
3use crate::file::{file::JournalFile, filter::FilterExpr, offset_array, offset_array::Direction};
4use std::num::NonZeroU64;
5
/// A position that a [`JournalCursor`] either has been asked to seek to or
/// has already resolved to a concrete entry.
///
/// The default location is [`Location::Head`].
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum Location {
    /// Start of the journal. Stepping forward resolves the first entry;
    /// stepping backward yields nothing.
    #[default]
    Head,
    /// End of the journal. Stepping backward resolves the last entry;
    /// stepping forward yields nothing.
    Tail,
    /// Seek target expressed as a realtime timestamp; it is compared against
    /// the `realtime` field of entry headers when the cursor is stepped.
    Realtime(u64),
    /// Monotonic timestamp plus a 16-byte identifier (presumably the boot ID
    /// -- TODO confirm). Stepping from this location currently fails with
    /// `JournalError::InvalidQueryConfiguration`.
    Monotonic(u64, [u8; 16]),
    /// Sequence number plus an optional 16-byte identifier (presumably the
    /// seqnum ID -- TODO confirm). Stepping from this location currently
    /// fails with `JournalError::InvalidQueryConfiguration`.
    Seqnum(u64, Option<[u8; 16]>),
    /// XOR hash of an entry. Stepping from this location currently fails
    /// with `JournalError::InvalidQueryConfiguration`.
    XorHash(u64),
    /// The cursor points at a concrete entry, identified by its non-zero
    /// offset. This is the only location `JournalCursor::position` accepts.
    ResolvedEntry(NonZeroU64),
}
22
23#[derive(Debug)]
24pub struct JournalCursor {
25    pub location: Location,
26    pub filter_expr: Option<FilterExpr>,
27    pub array_cursor: Option<offset_array::Cursor>,
28}
29
30impl JournalCursor {
31    #[allow(clippy::new_without_default)]
32    pub fn new() -> Self {
33        Self {
34            location: Location::Head,
35            filter_expr: None,
36            array_cursor: None,
37        }
38    }
39
40    pub fn set_location(&mut self, location: Location) {
41        self.location = location;
42        self.array_cursor = None;
43    }
44
45    pub fn set_filter(&mut self, filter_expr: FilterExpr) {
46        self.filter_expr = Some(filter_expr);
47        // FIXME: should we set cursor to None?
48    }
49
50    pub fn clear_filter(&mut self) {
51        self.filter_expr = None;
52        self.array_cursor = None;
53        self.set_location(Location::Head);
54    }
55
56    pub fn step<M: MemoryMap>(
57        &mut self,
58        journal_file: &JournalFile<M>,
59        direction: Direction,
60    ) -> Result<bool> {
61        let new_location = if self.filter_expr.is_some() {
62            self.resolve_filter_location(journal_file, direction)?
63        } else {
64            self.resolve_array_cursor(journal_file, direction)?
65        };
66
67        if let Some(location) = new_location {
68            self.location = location;
69            Ok(true)
70        } else {
71            Ok(false)
72        }
73    }
74
75    pub fn position(&self) -> Result<NonZeroU64> {
76        match self.location {
77            Location::ResolvedEntry(entry_offset) => Ok(entry_offset),
78            _ => Err(JournalError::UnsetCursor),
79        }
80    }
81
82    fn entry_list<M: MemoryMap>(journal_file: &JournalFile<M>) -> Result<offset_array::List> {
83        journal_file
84            .entry_list()
85            .ok_or(JournalError::InvalidOffsetArrayOffset)
86    }
87
88    fn store_array_cursor_value<M: MemoryMap>(
89        &mut self,
90        journal_file: &JournalFile<M>,
91        cursor: offset_array::Cursor,
92    ) -> Result<Option<Location>> {
93        let Some((cursor, offset)) = cursor.materialize_value(journal_file)? else {
94            return Ok(None);
95        };
96        self.array_cursor = Some(cursor);
97        Ok(Some(Location::ResolvedEntry(offset)))
98    }
99
100    fn resolve_head_array<M: MemoryMap>(
101        &mut self,
102        journal_file: &JournalFile<M>,
103    ) -> Result<Option<Location>> {
104        let cursor = Self::entry_list(journal_file)?.cursor_head();
105        self.store_array_cursor_value(journal_file, cursor)
106    }
107
108    fn resolve_tail_array<M: MemoryMap>(
109        &mut self,
110        journal_file: &JournalFile<M>,
111    ) -> Result<Option<Location>> {
112        let entry_list = Self::entry_list(journal_file)?;
113        let cursor = entry_list.cursor_tail(journal_file)?;
114        self.store_array_cursor_value(journal_file, cursor)
115    }
116
117    fn realtime_array_cursor<M: MemoryMap>(
118        journal_file: &JournalFile<M>,
119        realtime: u64,
120    ) -> Result<offset_array::Cursor> {
121        let entry_list = Self::entry_list(journal_file)?;
122        let predicate = |entry_offset| {
123            let entry_object = journal_file.entry_ref(entry_offset)?;
124            Ok(entry_object.header.realtime < realtime)
125        };
126
127        entry_list
128            .directed_partition_point(journal_file, predicate, Direction::Forward)?
129            .map(Ok)
130            .unwrap_or_else(|| entry_list.cursor_tail(journal_file))
131    }
132
133    fn resolve_realtime_array<M: MemoryMap>(
134        &mut self,
135        journal_file: &JournalFile<M>,
136        realtime: u64,
137    ) -> Result<Option<Location>> {
138        let cursor = Self::realtime_array_cursor(journal_file, realtime)?;
139        self.store_array_cursor_value(journal_file, cursor)
140    }
141
142    fn step_array_cursor<M: MemoryMap>(
143        &mut self,
144        journal_file: &JournalFile<M>,
145        direction: Direction,
146    ) -> Result<Option<Location>> {
147        let cursor = self
148            .array_cursor
149            .as_ref()
150            .ok_or(JournalError::UnsetCursor)?;
151        let cursor = match direction {
152            Direction::Forward => cursor.next(journal_file)?,
153            Direction::Backward => cursor.previous(journal_file)?,
154        };
155        let Some(cursor) = cursor else {
156            return Ok(None);
157        };
158        self.store_array_cursor_value(journal_file, cursor)
159    }
160
161    fn resolve_array_cursor<M: MemoryMap>(
162        &mut self,
163        journal_file: &JournalFile<M>,
164        direction: Direction,
165    ) -> Result<Option<Location>> {
166        match (self.location, direction) {
167            (Location::Head, Direction::Forward) => self.resolve_head_array(journal_file),
168            (Location::Head, Direction::Backward) => Ok(None),
169            (Location::Tail, Direction::Forward) => Ok(None),
170            (Location::Tail, Direction::Backward) => self.resolve_tail_array(journal_file),
171            (Location::Realtime(realtime), _) => {
172                self.resolve_realtime_array(journal_file, realtime)
173            }
174            (Location::ResolvedEntry(_), direction) => {
175                self.step_array_cursor(journal_file, direction)
176            }
177            _ => Err(JournalError::InvalidQueryConfiguration),
178        }
179    }
180
181    fn filter_expr_mut(&mut self) -> Result<&mut FilterExpr> {
182        self.filter_expr
183            .as_mut()
184            .ok_or(JournalError::InvalidQueryConfiguration)
185    }
186
187    fn filter_head<M: MemoryMap>(
188        filter_expr: &mut FilterExpr,
189        journal_file: &JournalFile<M>,
190    ) -> Result<Option<Location>> {
191        Ok(filter_expr
192            .head()
193            .next(journal_file, NonZeroU64::MIN)?
194            .map(Location::ResolvedEntry))
195    }
196
197    fn filter_tail<M: MemoryMap>(
198        filter_expr: &mut FilterExpr,
199        journal_file: &JournalFile<M>,
200    ) -> Result<Option<Location>> {
201        Ok(filter_expr
202            .tail(journal_file)?
203            .previous(journal_file, NonZeroU64::MAX)?
204            .map(Location::ResolvedEntry))
205    }
206
207    fn resolve_filter_at_offset<M: MemoryMap>(
208        &mut self,
209        journal_file: &JournalFile<M>,
210        direction: Direction,
211        entry_offset: NonZeroU64,
212    ) -> Result<Option<Location>> {
213        let filter_expr = self.filter_expr_mut()?;
214        match direction {
215            Direction::Forward => Ok(filter_expr
216                .head()
217                .next(journal_file, entry_offset)?
218                .map(Location::ResolvedEntry)),
219            Direction::Backward => Ok(filter_expr
220                .tail(journal_file)?
221                .previous(journal_file, entry_offset)?
222                .map(Location::ResolvedEntry)),
223        }
224    }
225
226    fn resolve_filter_realtime<M: MemoryMap>(
227        &mut self,
228        journal_file: &JournalFile<M>,
229        realtime: u64,
230        direction: Direction,
231    ) -> Result<Option<Location>> {
232        let cursor = Self::realtime_array_cursor(journal_file, realtime)?;
233        let Some(entry_offset) = cursor.value(journal_file)? else {
234            return Ok(None);
235        };
236        self.resolve_filter_at_offset(journal_file, direction, entry_offset)
237    }
238
239    fn filter_after_resolved<M: MemoryMap>(
240        &mut self,
241        journal_file: &JournalFile<M>,
242        location_offset: NonZeroU64,
243    ) -> Result<Option<Location>> {
244        Ok(self
245            .filter_expr_mut()?
246            .next(journal_file, location_offset.saturating_add(1))?
247            .map(Location::ResolvedEntry))
248    }
249
250    fn filter_before_resolved<M: MemoryMap>(
251        &mut self,
252        journal_file: &JournalFile<M>,
253        location_offset: NonZeroU64,
254    ) -> Result<Option<Location>> {
255        let Some(needle_offset) = NonZeroU64::new(location_offset.get() - 1) else {
256            return Ok(None);
257        };
258        Ok(self
259            .filter_expr_mut()?
260            .previous(journal_file, needle_offset)?
261            .map(Location::ResolvedEntry))
262    }
263
264    fn resolve_filter_location<M: MemoryMap>(
265        &mut self,
266        journal_file: &JournalFile<M>,
267        direction: Direction,
268    ) -> Result<Option<Location>> {
269        match (self.location, direction) {
270            (Location::Head, Direction::Forward) => {
271                Self::filter_head(self.filter_expr_mut()?, journal_file)
272            }
273            (Location::Head, Direction::Backward) => Ok(None),
274            (Location::Tail, Direction::Forward) => Ok(None),
275            (Location::Tail, Direction::Backward) => {
276                Self::filter_tail(self.filter_expr_mut()?, journal_file)
277            }
278            (Location::Realtime(realtime), direction) => {
279                self.resolve_filter_realtime(journal_file, realtime, direction)
280            }
281            (Location::ResolvedEntry(offset), Direction::Forward) => {
282                self.filter_after_resolved(journal_file, offset)
283            }
284            (Location::ResolvedEntry(offset), Direction::Backward) => {
285                self.filter_before_resolved(journal_file, offset)
286            }
287            _ => Err(JournalError::InvalidQueryConfiguration),
288        }
289    }
290}