iroh_sync/store/fs/
ranges.rs1use redb::{Key, Range, ReadableTable, Table, Value};
4
5use crate::{store::SortDirection, SignedEntry};
6
7use super::{
8 bounds::{ByKeyBounds, RecordsBounds},
9 into_entry,
10 tables::{RecordsByKeyId, RecordsId, RecordsValue},
11};
12
13pub trait RangeExt<K: Key, V: Value> {
15 fn next_map<T>(
17 &mut self,
18 map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T,
19 ) -> Option<anyhow::Result<T>>;
20
21 fn next_filter_map<T>(
25 &mut self,
26 direction: &SortDirection,
27 filter_map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> Option<T>,
28 ) -> Option<anyhow::Result<T>>;
29
30 fn next_try_filter_map<T>(
33 &mut self,
34 direction: &SortDirection,
35 filter_map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> Option<anyhow::Result<T>>,
36 ) -> Option<anyhow::Result<T>> {
37 Some(self.next_filter_map(direction, filter_map)?.and_then(|r| r))
38 }
39}
40
41impl<'a, K: Key + 'static, V: Value + 'static> RangeExt<K, V> for Range<'a, K, V> {
42 fn next_map<T>(
43 &mut self,
44 map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T,
45 ) -> Option<anyhow::Result<T>> {
46 self.next()
47 .map(|r| r.map_err(Into::into).map(|r| map(r.0.value(), r.1.value())))
48 }
49
50 fn next_filter_map<T>(
51 &mut self,
52 direction: &SortDirection,
53 filter_map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> Option<T>,
54 ) -> Option<anyhow::Result<T>> {
55 loop {
56 let next = match direction {
57 SortDirection::Asc => self.next(),
58 SortDirection::Desc => self.next_back(),
59 };
60 match next {
61 None => break None,
62 Some(Err(err)) => break Some(Err(err.into())),
63 Some(Ok(res)) => match filter_map(res.0.value(), res.1.value()) {
64 None => continue,
65 Some(item) => break Some(Ok(item)),
66 },
67 }
68 }
69 }
70}
71
72#[derive(derive_more::Debug)]
74#[debug("RecordsRange")]
75pub struct RecordsRange<'a>(Range<'a, RecordsId<'static>, RecordsValue<'static>>);
76
77impl<'a> RecordsRange<'a> {
78 pub(super) fn all(
79 records: &'a impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
80 ) -> anyhow::Result<Self> {
81 let range = records.range::<RecordsId<'static>>(..)?;
82 Ok(Self(range))
83 }
84
85 pub(super) fn with_bounds(
86 records: &'a impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
87 bounds: RecordsBounds,
88 ) -> anyhow::Result<Self> {
89 let range = records.range(bounds.as_ref())?;
90 Ok(Self(range))
91 }
92
93 pub(super) fn next_filtered(
97 &mut self,
98 direction: &SortDirection,
99 filter: impl for<'x> Fn(RecordsId<'x>, RecordsValue<'x>) -> bool,
100 ) -> Option<anyhow::Result<SignedEntry>> {
101 self.0
102 .next_filter_map(direction, |k, v| filter(k, v).then(|| into_entry(k, v)))
103 }
104}
105
106impl<'a> Iterator for RecordsRange<'a> {
107 type Item = anyhow::Result<SignedEntry>;
108 fn next(&mut self) -> Option<Self::Item> {
109 self.0.next_map(into_entry)
110 }
111}
112
113#[derive(derive_more::Debug)]
114#[debug("RecordsByKeyRange")]
115pub struct RecordsByKeyRange<'a> {
116 records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>,
117 by_key_range: Range<'a, RecordsByKeyId<'static>, ()>,
118}
119
120impl<'a> RecordsByKeyRange<'a> {
121 pub fn with_bounds(
122 records_by_key_table: &'a impl ReadableTable<RecordsByKeyId<'static>, ()>,
123 records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>,
124 bounds: ByKeyBounds,
125 ) -> anyhow::Result<Self> {
126 let by_key_range = records_by_key_table.range(bounds.as_ref())?;
127 Ok(Self {
128 records_table,
129 by_key_range,
130 })
131 }
132
133 pub fn next_filtered(
137 &mut self,
138 direction: &SortDirection,
139 filter: impl for<'x> Fn(RecordsByKeyId<'x>) -> bool,
140 ) -> Option<anyhow::Result<SignedEntry>> {
141 let entry = self.by_key_range.next_try_filter_map(direction, |k, _v| {
142 if !filter(k) {
143 return None;
144 };
145 let (namespace, key, author) = k;
146 let records_id = (namespace, author, key);
147 let entry = self.records_table.get(&records_id).transpose()?;
148 let entry = entry
149 .map(|value| into_entry(records_id, value.value()))
150 .map_err(anyhow::Error::from);
151 Some(entry)
152 });
153 entry
154 }
155}