Skip to main content

iroh_sync/store/fs/
ranges.rs

1//! Ranges and helpers for working with [`redb`] tables
2
3use redb::{Key, Range, ReadableTable, Table, Value};
4
5use crate::{store::SortDirection, SignedEntry};
6
7use super::{
8    bounds::{ByKeyBounds, RecordsBounds},
9    into_entry,
10    tables::{RecordsByKeyId, RecordsId, RecordsValue},
11};
12
13/// An extension trait for [`Range`] that provides methods for mapped retrieval.
14pub trait RangeExt<K: Key, V: Value> {
15    /// Get the next entry and map the item with a callback function.
16    fn next_map<T>(
17        &mut self,
18        map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T,
19    ) -> Option<anyhow::Result<T>>;
20
21    /// Get the next entry, but only if the callback function returns Some, otherwise continue.
22    ///
23    /// With `direction` the range can be either process in forward or backward direction.
24    fn next_filter_map<T>(
25        &mut self,
26        direction: &SortDirection,
27        filter_map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> Option<T>,
28    ) -> Option<anyhow::Result<T>>;
29
30    /// Like [`Self::next_filter_map`], but the callback returns a `Result`, and the result is
31    /// flattened with the result from the range operation.
32    fn next_try_filter_map<T>(
33        &mut self,
34        direction: &SortDirection,
35        filter_map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> Option<anyhow::Result<T>>,
36    ) -> Option<anyhow::Result<T>> {
37        Some(self.next_filter_map(direction, filter_map)?.and_then(|r| r))
38    }
39}
40
41impl<'a, K: Key + 'static, V: Value + 'static> RangeExt<K, V> for Range<'a, K, V> {
42    fn next_map<T>(
43        &mut self,
44        map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T,
45    ) -> Option<anyhow::Result<T>> {
46        self.next()
47            .map(|r| r.map_err(Into::into).map(|r| map(r.0.value(), r.1.value())))
48    }
49
50    fn next_filter_map<T>(
51        &mut self,
52        direction: &SortDirection,
53        filter_map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> Option<T>,
54    ) -> Option<anyhow::Result<T>> {
55        loop {
56            let next = match direction {
57                SortDirection::Asc => self.next(),
58                SortDirection::Desc => self.next_back(),
59            };
60            match next {
61                None => break None,
62                Some(Err(err)) => break Some(Err(err.into())),
63                Some(Ok(res)) => match filter_map(res.0.value(), res.1.value()) {
64                    None => continue,
65                    Some(item) => break Some(Ok(item)),
66                },
67            }
68        }
69    }
70}
71
72/// An iterator over a range of entries from the records table.
73#[derive(derive_more::Debug)]
74#[debug("RecordsRange")]
75pub struct RecordsRange<'a>(Range<'a, RecordsId<'static>, RecordsValue<'static>>);
76
77impl<'a> RecordsRange<'a> {
78    pub(super) fn all(
79        records: &'a impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
80    ) -> anyhow::Result<Self> {
81        let range = records.range::<RecordsId<'static>>(..)?;
82        Ok(Self(range))
83    }
84
85    pub(super) fn with_bounds(
86        records: &'a impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
87        bounds: RecordsBounds,
88    ) -> anyhow::Result<Self> {
89        let range = records.range(bounds.as_ref())?;
90        Ok(Self(range))
91    }
92
93    /// Get the next item in the range.
94    ///
95    /// Omit items for which the `matcher` function returns false.
96    pub(super) fn next_filtered(
97        &mut self,
98        direction: &SortDirection,
99        filter: impl for<'x> Fn(RecordsId<'x>, RecordsValue<'x>) -> bool,
100    ) -> Option<anyhow::Result<SignedEntry>> {
101        self.0
102            .next_filter_map(direction, |k, v| filter(k, v).then(|| into_entry(k, v)))
103    }
104}
105
106impl<'a> Iterator for RecordsRange<'a> {
107    type Item = anyhow::Result<SignedEntry>;
108    fn next(&mut self) -> Option<Self::Item> {
109        self.0.next_map(into_entry)
110    }
111}
112
113#[derive(derive_more::Debug)]
114#[debug("RecordsByKeyRange")]
115pub struct RecordsByKeyRange<'a> {
116    records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>,
117    by_key_range: Range<'a, RecordsByKeyId<'static>, ()>,
118}
119
120impl<'a> RecordsByKeyRange<'a> {
121    pub fn with_bounds(
122        records_by_key_table: &'a impl ReadableTable<RecordsByKeyId<'static>, ()>,
123        records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>,
124        bounds: ByKeyBounds,
125    ) -> anyhow::Result<Self> {
126        let by_key_range = records_by_key_table.range(bounds.as_ref())?;
127        Ok(Self {
128            records_table,
129            by_key_range,
130        })
131    }
132
133    /// Get the next item in the range.
134    ///
135    /// Omit items for which the `filter` function returns false.
136    pub fn next_filtered(
137        &mut self,
138        direction: &SortDirection,
139        filter: impl for<'x> Fn(RecordsByKeyId<'x>) -> bool,
140    ) -> Option<anyhow::Result<SignedEntry>> {
141        let entry = self.by_key_range.next_try_filter_map(direction, |k, _v| {
142            if !filter(k) {
143                return None;
144            };
145            let (namespace, key, author) = k;
146            let records_id = (namespace, author, key);
147            let entry = self.records_table.get(&records_id).transpose()?;
148            let entry = entry
149                .map(|value| into_entry(records_id, value.value()))
150                .map_err(anyhow::Error::from);
151            Some(entry)
152        });
153        entry
154    }
155}