Skip to main content

journal_core/file/
offset_array.rs

1use super::mmap::MemoryMap;
2use crate::error::{JournalError, Result};
3use crate::file::JournalFile;
4use std::num::{NonZeroU64, NonZeroUsize};
5use std::sync::Arc;
6
/// Selects which side of a partition point a directed search resolves to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Direction {
    /// Resolve to the partition point itself, i.e. the first position for
    /// which the search predicate is false.
    Forward,
    /// Resolve to the position just before the partition point, i.e. the last
    /// position for which the search predicate is true.
    Backward,
}
12
/// Handle to one offset array (a single link of the chain) in the journal file.
#[derive(Copy, Clone)]
#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
pub struct Node {
    /// File offset at which this array is stored.
    offset: NonZeroU64,
    /// File offset of the array linked after this one, if any.
    next_offset: Option<NonZeroU64>,
    /// Number of item slots this array provides.
    capacity: NonZeroUsize,
    /// Count of items left in this array plus every array after it.
    remaining_items: NonZeroUsize,
}
23
24impl Node {
25    /// Create a new offset array reference
26    fn new<M: MemoryMap>(
27        journal_file: &JournalFile<M>,
28        offset: NonZeroU64,
29        remaining_items: NonZeroUsize,
30    ) -> Result<Self> {
31        let array = journal_file.offset_array_ref(offset)?;
32        let capacity =
33            NonZeroUsize::new(array.capacity()).ok_or(JournalError::EmptyOffsetArrayNode)?;
34
35        Ok(Self {
36            offset,
37            next_offset: array.header.next_offset_array,
38            capacity,
39            remaining_items,
40        })
41    }
42
43    /// Get the offset of this array in the file
44    pub fn offset(&self) -> NonZeroU64 {
45        self.offset
46    }
47
48    /// Get the maximum number of items this array can hold
49    pub fn capacity(&self) -> NonZeroUsize {
50        self.capacity
51    }
52
53    /// Get the number of items available in this array
54    pub fn len(&self) -> NonZeroUsize {
55        self.capacity.min(self.remaining_items)
56    }
57
58    /// Check if this array has a next array in the chain
59    pub fn has_next(&self) -> bool {
60        self.next_offset.is_some() && self.remaining_items > self.len()
61    }
62
63    /// Get the next array in the chain, if any
64    pub fn next<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Option<Self>> {
65        if !self.has_next() {
66            return Ok(None);
67        }
68
69        let next_offset = self.next_offset.unwrap();
70        let remaining_items = {
71            let n = self.remaining_items.get().saturating_sub(self.len().get());
72            NonZeroUsize::new(n).ok_or(JournalError::EmptyOffsetArrayNode)?
73        };
74        let node = Self::new(journal_file, next_offset, remaining_items);
75
76        Some(node).transpose()
77    }
78
79    /// Get an item at the specified index
80    pub fn get<M: MemoryMap>(
81        &self,
82        journal_file: &JournalFile<M>,
83        index: usize,
84    ) -> Result<Option<NonZeroU64>> {
85        if index >= self.len().get() {
86            return Err(JournalError::InvalidOffsetArrayIndex);
87        }
88
89        let array = journal_file.offset_array_ref(self.offset)?;
90        array.get(index, self.remaining_items.get())
91    }
92
93    /// Returns the first index where the predicate returns false, or array length if
94    /// the predicate is true for all elements
95    pub fn partition_point<M, F>(
96        &self,
97        journal_file: &JournalFile<M>,
98        left: usize,
99        right: usize,
100        predicate: F,
101    ) -> Result<usize>
102    where
103        M: MemoryMap,
104        F: Fn(NonZeroU64) -> Result<bool>,
105    {
106        let mut left = left;
107        let mut right = right;
108
109        debug_assert!(left <= right);
110        debug_assert!(right <= self.len().get());
111
112        while left != right {
113            let mid = left.midpoint(right);
114            let Some(offset) = self.get(journal_file, mid)? else {
115                return Err(JournalError::InvalidOffset);
116            };
117
118            if predicate(offset)? {
119                left = mid + 1;
120            } else {
121                right = mid;
122            }
123        }
124
125        Ok(left)
126    }
127
128    /// Find the forward or backward (depending on direction) position that matches the predicate.
129    pub fn directed_partition_point<M, F>(
130        &self,
131        journal_file: &JournalFile<M>,
132        left: usize,
133        right: usize,
134        predicate: F,
135        direction: Direction,
136    ) -> Result<Option<usize>>
137    where
138        M: MemoryMap,
139        F: Fn(NonZeroU64) -> Result<bool>,
140    {
141        let index = self.partition_point(journal_file, left, right, predicate)?;
142
143        Ok(match direction {
144            Direction::Forward => {
145                if index < self.len().get() {
146                    Some(index)
147                } else {
148                    None
149                }
150            }
151            Direction::Backward => {
152                if index > 0 {
153                    Some(index - 1)
154                } else {
155                    None
156                }
157            }
158        })
159    }
160}
161
162impl std::fmt::Debug for Node {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        let next_offset = self.next_offset.map(|x| x.get()).unwrap_or(0);
165
166        f.debug_struct("Node")
167            .field("offset", &format!("0x{:x}", self.offset))
168            .field("next_offset", &format!("0x{:x}", next_offset))
169            .field("capacity", &self.capacity)
170            .field("len", &self.len())
171            .field("remaining_items", &self.remaining_items)
172            .finish()
173    }
174}
175
/// Descriptor of a linked chain of offset arrays: where the chain starts and
/// how many items it holds in total.
#[derive(Clone, Copy)]
#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
pub struct List {
    /// File offset of the first array in the chain.
    head_offset: NonZeroU64,
    /// Number of items stored across the whole chain.
    total_items: NonZeroUsize,
}
183
184impl std::fmt::Debug for List {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        f.debug_struct("List")
187            .field("head_offset", &format!("0x{:x}", self.head_offset))
188            .field("total_items", &self.total_items)
189            .finish()
190    }
191}
192
193impl List {
194    /// Create a new list from head offset and total items
195    pub fn new(head_offset: NonZeroU64, total_items: NonZeroUsize) -> Self {
196        Self {
197            head_offset,
198            total_items,
199        }
200    }
201
202    /// Get the head array of this chain
203    pub fn head<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Node> {
204        Node::new(journal_file, self.head_offset, self.total_items)
205    }
206
207    /// Get the tail array of this list by traversing from head to tail
208    pub fn tail<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Node> {
209        let mut current = self.head(journal_file)?;
210
211        while let Some(next) = current.next(journal_file)? {
212            current = next;
213        }
214
215        Ok(current)
216    }
217
218    fn node_chain<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Arc<[Node]>> {
219        let mut nodes = Vec::new();
220        let mut current = self.head(journal_file)?;
221
222        loop {
223            nodes.push(current);
224            let Some(next) = current.next(journal_file)? else {
225                break;
226            };
227            current = next;
228        }
229
230        Ok(Arc::from(nodes))
231    }
232
233    /// Get a cursor at the first position in the chain
234    pub fn cursor_head(self) -> Cursor {
235        Cursor::at_head(self)
236    }
237
238    /// Get a cursor at the last position in the chain
239    pub fn cursor_tail<M: MemoryMap>(self, journal_file: &JournalFile<M>) -> Result<Cursor> {
240        Cursor::at_tail(journal_file, self)
241    }
242
243    /// Finds the first/last array item position where the predicate function becomes false
244    /// in a chain of offset arrays.
245    ///
246    /// # Parameters
247    /// * `predicate` - Function that takes an array item value and returns true if the search should continue.
248    /// * `direction` - Direction of the search (Forward or Backward)
249    fn cursor_at_node_index(self, node: &Node, index: usize) -> Cursor {
250        Cursor::at_cached_position(self, *node, index, None, None)
251    }
252
253    fn node_partition_cursor<M, F>(
254        self,
255        journal_file: &JournalFile<M>,
256        node: &Node,
257        predicate: &F,
258        direction: Direction,
259    ) -> Result<Option<(Cursor, usize)>>
260    where
261        M: MemoryMap,
262        F: Fn(NonZeroU64) -> Result<bool>,
263    {
264        let left = 0;
265        let right = node.len().get();
266        let Some(index) =
267            node.directed_partition_point(journal_file, left, right, predicate, direction)?
268        else {
269            return Ok(None);
270        };
271
272        let cursor = self.cursor_at_node_index(node, index);
273        Ok(Some((cursor, index)))
274    }
275
276    fn backward_search_must_continue(node: &Node, index: usize) -> bool {
277        index == node.len().get() - 1 && node.has_next()
278    }
279
280    pub fn directed_partition_point<M, F>(
281        self,
282        journal_file: &JournalFile<M>,
283        predicate: F,
284        direction: Direction,
285    ) -> Result<Option<Cursor>>
286    where
287        M: MemoryMap,
288        F: Fn(NonZeroU64) -> Result<bool>,
289    {
290        let mut last_cursor: Option<Cursor> = None;
291        let mut node = self.head(journal_file)?;
292
293        loop {
294            if let Some((cursor, index)) =
295                self.node_partition_cursor(journal_file, &node, &predicate, direction)?
296            {
297                match direction {
298                    Direction::Forward => {
299                        return Ok(Some(cursor));
300                    }
301                    Direction::Backward => {
302                        last_cursor = Some(cursor);
303                        if !Self::backward_search_must_continue(&node, index) {
304                            return Ok(last_cursor);
305                        }
306                    }
307                }
308            } else if direction == Direction::Backward {
309                return Ok(last_cursor);
310            }
311
312            if let Some(nd) = node.next(journal_file)? {
313                node = nd;
314            } else {
315                break;
316            }
317        }
318
319        if direction == Direction::Backward {
320            return Ok(last_cursor);
321        }
322
323        Ok(None)
324    }
325
326    /// Collect all offsets in the entire list into the given vector
327    pub fn collect_offsets<M: MemoryMap>(
328        &self,
329        journal_file: &JournalFile<M>,
330        offsets: &mut Vec<NonZeroU64>,
331    ) -> Result<()> {
332        offsets.reserve(self.total_items.get());
333
334        let mut node = self.head(journal_file)?;
335
336        loop {
337            {
338                let array = journal_file.offset_array_ref(node.offset())?;
339                let remaining_items = node.remaining_items.get();
340                array.collect_offsets(0, remaining_items, offsets)?;
341            }
342
343            match node.next(journal_file)? {
344                Some(next) => node = next,
345                None => break,
346            }
347        }
348
349        Ok(())
350    }
351}
352
353/// A cursor pointing to a specific position within an offset array chain
354#[derive(Clone)]
355#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
356pub struct Cursor {
357    list: List,
358    array_offset: NonZeroU64,
359    array_index: usize,
360    remaining_items: NonZeroUsize,
361    node: Option<Node>,
362    node_index: Option<usize>,
363    node_chain: Option<Arc<[Node]>>,
364    cached_value: Option<NonZeroU64>,
365}
366
367impl Cursor {
368    pub fn head(&self) -> Self {
369        if let Some(chain) = &self.node_chain {
370            if let Some(node) = chain.first().copied() {
371                return Self {
372                    list: self.list,
373                    array_offset: node.offset,
374                    array_index: 0,
375                    remaining_items: node.remaining_items,
376                    node: Some(node),
377                    node_index: Some(0),
378                    node_chain: Some(Arc::clone(chain)),
379                    cached_value: None,
380                };
381            }
382        }
383        Self::at_head(self.list)
384    }
385
386    /// Create a cursor at the head of the chain
387    pub fn at_head(list: List) -> Self {
388        Self {
389            list,
390            array_offset: list.head_offset,
391            array_index: 0,
392            remaining_items: list.total_items,
393            node: None,
394            node_index: None,
395            node_chain: None,
396            cached_value: None,
397        }
398    }
399
400    /// Create a cursor at the tail of the chain
401    pub fn at_tail<M: MemoryMap>(journal_file: &JournalFile<M>, list: List) -> Result<Self> {
402        let chain = list.node_chain(journal_file)?;
403        let node_index = chain
404            .len()
405            .checked_sub(1)
406            .ok_or(JournalError::EmptyOffsetArrayList)?;
407        let current_array = chain[node_index];
408
409        Ok(Self {
410            list,
411            array_offset: current_array.offset,
412            array_index: current_array.len().get() - 1,
413            remaining_items: current_array.len(),
414            node: Some(current_array),
415            node_index: Some(node_index),
416            node_chain: Some(chain),
417            cached_value: None,
418        })
419    }
420
421    /// Create a cursor at a specific position
422    pub fn at_position<M: MemoryMap>(
423        journal_file: &JournalFile<M>,
424        offset_array_list: List,
425        array_offset: NonZeroU64,
426        array_index: usize,
427        remaining_items: NonZeroUsize,
428    ) -> Result<Self> {
429        debug_assert!(offset_array_list.total_items >= remaining_items);
430
431        // Verify the array exists
432        let array = Node::new(journal_file, array_offset, remaining_items)?;
433
434        // Verify the index is valid
435        if array_index >= array.len().get() {
436            return Err(JournalError::InvalidOffsetArrayIndex);
437        }
438
439        Ok(Self::at_cached_position(
440            offset_array_list,
441            array,
442            array_index,
443            None,
444            None,
445        ))
446    }
447
448    fn at_cached_position(
449        list: List,
450        node: Node,
451        array_index: usize,
452        node_index: Option<usize>,
453        node_chain: Option<Arc<[Node]>>,
454    ) -> Self {
455        Self {
456            list,
457            array_offset: node.offset,
458            array_index,
459            remaining_items: node.remaining_items,
460            node: Some(node),
461            node_index,
462            node_chain,
463            cached_value: None,
464        }
465    }
466
467    /// Get the current array this cursor points to
468    pub fn node<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Node> {
469        if let Some(node) = self.node {
470            return Ok(node);
471        }
472        Node::new(journal_file, self.array_offset, self.remaining_items)
473    }
474
475    pub fn value<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Option<NonZeroU64>> {
476        if let Some(value) = self.cached_value {
477            return Ok(Some(value));
478        }
479        self.node(journal_file)?.get(journal_file, self.array_index)
480    }
481
482    pub(crate) fn materialize_value<M: MemoryMap>(
483        mut self,
484        journal_file: &JournalFile<M>,
485    ) -> Result<Option<(Self, NonZeroU64)>> {
486        if let Some(value) = self.cached_value {
487            return Ok(Some((self, value)));
488        }
489
490        let node = self.node(journal_file)?;
491        let Some(value) = node.get(journal_file, self.array_index)? else {
492            return Ok(None);
493        };
494        self.node = Some(node);
495        self.cached_value = Some(value);
496        Ok(Some((self, value)))
497    }
498
499    fn node_chain_position<M: MemoryMap>(
500        &self,
501        journal_file: &JournalFile<M>,
502    ) -> Result<(Arc<[Node]>, usize)> {
503        if let (Some(chain), Some(index)) = (&self.node_chain, self.node_index) {
504            return Ok((Arc::clone(chain), index));
505        }
506
507        let chain = self.list.node_chain(journal_file)?;
508        let Some(index) = chain
509            .iter()
510            .position(|node| node.offset == self.array_offset)
511        else {
512            return Err(JournalError::InvalidOffsetArrayOffset);
513        };
514        Ok((chain, index))
515    }
516
517    /// Move to the next position
518    pub fn next<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Option<Self>> {
519        let array_node = self.node(journal_file)?;
520
521        // Same-node movement keeps the cached node metadata and avoids
522        // rereading the offset-array object until the value itself is needed.
523        if self.array_index + 1 < array_node.len().get() {
524            // Next item is in the same array
525            return Ok(Some(Self {
526                list: self.list,
527                array_offset: self.array_offset,
528                array_index: self.array_index + 1,
529                remaining_items: self.remaining_items,
530                node: Some(array_node),
531                node_index: self.node_index,
532                node_chain: self.node_chain.as_ref().map(Arc::clone),
533                cached_value: None,
534            }));
535        }
536
537        if !array_node.has_next() {
538            return Ok(None);
539        }
540
541        let (next_array, node_index, node_chain) =
542            if let (Some(chain), Some(index)) = (&self.node_chain, self.node_index) {
543                let next_index = index + 1;
544                let Some(next_array) = chain.get(next_index).copied() else {
545                    return Err(JournalError::InvalidOffsetArrayOffset);
546                };
547                (next_array, Some(next_index), Some(Arc::clone(chain)))
548            } else {
549                let next_array = array_node
550                    .next(journal_file)?
551                    .ok_or(JournalError::InvalidOffsetArrayOffset)?;
552                (next_array, None, None)
553            };
554
555        Ok(Some(Self {
556            list: self.list,
557            array_offset: next_array.offset,
558            array_index: 0,
559            remaining_items: next_array.remaining_items,
560            node: Some(next_array),
561            node_index,
562            node_chain,
563            cached_value: None,
564        }))
565    }
566
567    /// Move to the previous position
568    pub fn previous<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Option<Self>> {
569        if self.array_index > 0 {
570            // Previous item is in the same array
571            let array_node = self.node(journal_file)?;
572            return Ok(Some(Self {
573                list: self.list,
574                array_offset: self.array_offset,
575                array_index: self.array_index - 1,
576                remaining_items: self.remaining_items,
577                node: Some(array_node),
578                node_index: self.node_index,
579                node_chain: self.node_chain.as_ref().map(Arc::clone),
580                cached_value: None,
581            }));
582        }
583
584        if self.array_offset == self.list.head_offset {
585            return Ok(None);
586        }
587
588        let (chain, index) = self.node_chain_position(journal_file)?;
589        let previous_index = index
590            .checked_sub(1)
591            .ok_or(JournalError::InvalidOffsetArrayOffset)?;
592        let previous_node = chain[previous_index];
593
594        Ok(Some(Self {
595            list: self.list,
596            array_offset: previous_node.offset,
597            array_index: previous_node.len().get() - 1,
598            remaining_items: previous_node.remaining_items,
599            node: Some(previous_node),
600            node_index: Some(previous_index),
601            node_chain: Some(chain),
602            cached_value: None,
603        }))
604    }
605
606    pub fn collect_offsets<M: MemoryMap>(
607        &self,
608        journal_file: &JournalFile<M>,
609        offsets: &mut Vec<NonZeroU64>,
610    ) -> Result<()> {
611        let mut node = self.node(journal_file)?;
612
613        // Copy from position in the current array
614        {
615            let array = journal_file.offset_array_ref(node.offset())?;
616            let remaining_items = node.remaining_items.get();
617            array.collect_offsets(self.array_index, remaining_items, offsets)?;
618        }
619
620        // Copy from subsequent arrays
621        while let Some(next_node) = node.next(journal_file)? {
622            let array = journal_file.offset_array_ref(next_node.offset())?;
623            let remaining_items = next_node.remaining_items.get();
624            array.collect_offsets(0, remaining_items, offsets)?;
625            node = next_node;
626        }
627
628        Ok(())
629    }
630}
631
632impl std::fmt::Debug for Cursor {
633    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
634        f.debug_struct("Cursor")
635            .field("array_offset", &format!("0x{:x}", self.array_offset))
636            .field("array_index", &self.array_index)
637            .field("remaining_items", &self.remaining_items)
638            .finish()
639    }
640}
641
642#[derive(Debug, Clone)]
643#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
644pub struct InlinedCursor {
645    inlined_offset: NonZeroU64,
646    cursor: Option<Cursor>,
647    at_inlined_offset: bool,
648}
649
650impl InlinedCursor {
651    pub fn new(inlined_offset: NonZeroU64, cursor: Option<Cursor>) -> Self {
652        Self {
653            inlined_offset,
654            cursor,
655            at_inlined_offset: true,
656        }
657    }
658
659    pub fn head(&self) -> Self {
660        Self {
661            inlined_offset: self.inlined_offset,
662            cursor: self.cursor.as_ref().map(Cursor::head),
663            at_inlined_offset: true,
664        }
665    }
666
667    pub fn tail<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Self> {
668        // Start with a copy of the current cursor
669        let mut result = self.clone();
670
671        // If we have an entry array list cursor, move it to the tail
672        if let Some(cursor) = self.cursor.as_ref() {
673            result.cursor = Some(cursor.list.cursor_tail(journal_file)?);
674            result.at_inlined_offset = false;
675        }
676
677        Ok(result)
678    }
679
680    fn next<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Option<Self>> {
681        // Case 1: We're at the inlined entry, move to the first array entry
682        if self.at_inlined_offset {
683            if self.cursor.is_some() {
684                return Ok(Some(Self {
685                    inlined_offset: self.inlined_offset,
686                    cursor: self.cursor.clone(),
687                    at_inlined_offset: false,
688                }));
689            } else {
690                return Ok(None);
691            }
692        }
693
694        // Case 2: We're already in the entry array
695        if let Some(current_cursor) = self.cursor.as_ref() {
696            let next_cursor = current_cursor.next(journal_file)?;
697
698            if next_cursor.is_some() {
699                return Ok(Some(Self {
700                    inlined_offset: self.inlined_offset,
701                    cursor: next_cursor,
702                    at_inlined_offset: false,
703                }));
704            } else {
705                return Ok(None);
706            }
707        }
708
709        // No more entries
710        Ok(None)
711    }
712
713    fn previous<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Option<Self>> {
714        if self.at_inlined_offset {
715            return Ok(None);
716        }
717
718        if let Some(current_cursor) = self.cursor.as_ref() {
719            // Try to move to the previous position in the array
720            if let Some(prev_cursor) = current_cursor.previous(journal_file)? {
721                // We can move back within the array
722                let mut ic = self.clone();
723                ic.cursor = Some(prev_cursor);
724                return Ok(Some(ic));
725            } else {
726                // We're at the first array position, move to the inlined entry
727                let mut ic = self.clone();
728                ic.at_inlined_offset = true;
729                return Ok(Some(ic));
730            }
731        }
732
733        unreachable!();
734    }
735
736    pub fn value<M: MemoryMap>(&self, journal_file: &JournalFile<M>) -> Result<Option<NonZeroU64>> {
737        // Case 1: We're at the inlined entry
738        if self.at_inlined_offset {
739            return Ok(Some(self.inlined_offset));
740        }
741
742        // Case 2: We're in the entry array
743        if let Some(cursor) = self.cursor.as_ref() {
744            return cursor.value(journal_file);
745        }
746
747        unreachable!();
748    }
749
750    pub fn next_until<M: MemoryMap>(
751        &mut self,
752        journal_file: &JournalFile<M>,
753        offset: NonZeroU64,
754    ) -> Result<Option<NonZeroU64>> {
755        let Some(current_offset) = self.value(journal_file)? else {
756            return Ok(None);
757        };
758
759        if current_offset >= offset {
760            return Ok(Some(current_offset));
761        }
762
763        while let Some(ic) = self.next(journal_file)? {
764            *self = ic;
765
766            let Some(current_offset) = self.value(journal_file)? else {
767                break;
768            };
769
770            if current_offset >= offset {
771                return Ok(Some(current_offset));
772            }
773        }
774
775        Ok(None)
776    }
777
778    pub fn previous_until<M: MemoryMap>(
779        &mut self,
780        journal_file: &JournalFile<M>,
781        offset: NonZeroU64,
782    ) -> Result<Option<NonZeroU64>> {
783        let Some(current_offset) = self.value(journal_file)? else {
784            return Ok(None);
785        };
786
787        if current_offset <= offset {
788            return Ok(Some(current_offset));
789        }
790
791        while let Some(ic) = self.previous(journal_file)? {
792            *self = ic;
793
794            let Some(current_offset) = self.value(journal_file)? else {
795                break;
796            };
797
798            if current_offset <= offset {
799                return Ok(Some(current_offset));
800            }
801        }
802
803        Ok(None)
804    }
805
806    pub fn directed_partition_point<M, F>(
807        &self,
808        journal_file: &JournalFile<M>,
809        predicate: F,
810        direction: Direction,
811    ) -> Result<Option<Self>>
812    where
813        M: MemoryMap,
814        F: Fn(NonZeroU64) -> Result<bool>,
815    {
816        let inlined_match = self.inlined_partition_candidate(&predicate, direction)?;
817        let array_match = self.array_partition_candidate(journal_file, predicate, direction)?;
818
819        match (inlined_match, array_match) {
820            (Some(best), Some(array)) => {
821                Self::best_directed_match(journal_file, best, array, direction).map(Some)
822            }
823            (Some(best), None) => Ok(Some(best)),
824            (None, Some(array)) => Ok(Some(array)),
825            (None, None) => Ok(None),
826        }
827    }
828
829    fn inlined_partition_candidate<F>(
830        &self,
831        predicate: &F,
832        direction: Direction,
833    ) -> Result<Option<Self>>
834    where
835        F: Fn(NonZeroU64) -> Result<bool>,
836    {
837        let predicate_matches = predicate(self.inlined_offset)?;
838        match direction {
839            Direction::Forward if !predicate_matches => Ok(Some(self.head())),
840            Direction::Backward if predicate_matches => Ok(Some(self.head())),
841            _ => Ok(None),
842        }
843    }
844
845    fn array_partition_candidate<M, F>(
846        &self,
847        journal_file: &JournalFile<M>,
848        predicate: F,
849        direction: Direction,
850    ) -> Result<Option<Self>>
851    where
852        M: MemoryMap,
853        F: Fn(NonZeroU64) -> Result<bool>,
854    {
855        let Some(cursor) = self.cursor.as_ref() else {
856            return Ok(None);
857        };
858        let Some(cursor) =
859            cursor
860                .list
861                .directed_partition_point(journal_file, predicate, direction)?
862        else {
863            return Ok(None);
864        };
865
866        Ok(Some(Self {
867            inlined_offset: self.inlined_offset,
868            cursor: Some(cursor),
869            at_inlined_offset: false,
870        }))
871    }
872
873    fn best_directed_match<M: MemoryMap>(
874        journal_file: &JournalFile<M>,
875        best: Self,
876        candidate: Self,
877        direction: Direction,
878    ) -> Result<Self> {
879        let best_offset = best.value(journal_file)?;
880        let candidate_offset = candidate.value(journal_file)?;
881        let candidate_is_better = match direction {
882            Direction::Forward => candidate_offset < best_offset,
883            Direction::Backward => candidate_offset > best_offset,
884        };
885        if candidate_is_better {
886            Ok(candidate)
887        } else {
888            Ok(best)
889        }
890    }
891
892    pub fn collect_offsets<M: MemoryMap>(
893        &self,
894        journal_file: &JournalFile<M>,
895        offsets: &mut Vec<NonZeroU64>,
896    ) -> Result<()> {
897        // Handle the inline offset first if we're at it
898        if self.at_inlined_offset {
899            offsets.push(self.inlined_offset);
900
901            // If we have a cursor, collect all offsets from the beginning
902            if let Some(cursor) = self.cursor.as_ref() {
903                cursor.list.collect_offsets(journal_file, offsets)?;
904            }
905        } else if let Some(cursor) = self.cursor.as_ref() {
906            // We're somewhere in the array chain, collect from current position
907            cursor.collect_offsets(journal_file, offsets)?;
908        }
909
910        Ok(())
911    }
912}