// redb_extras/table_buckets/mod.rs

use crate::MergeableValue;
8use redb::{
9 Key, MultimapTableDefinition, ReadableTable, TableDefinition, TableHandle, Value,
10 WriteTransaction,
11};
12use std::borrow::Borrow;
13use std::collections::{HashMap, HashSet};
14use std::sync::{Arc, Mutex};
15
16pub mod iterator;
17
18pub use crate::key_buckets::BucketError;
19pub use iterator::{
20 TableBucketIterExt, TableBucketMultimapIterExt, TableBucketRangeIterator,
21 TableBucketRangeMultimapIterator,
22};
23
/// Factory for sequence-bucketed redb tables.
///
/// Maps a monotonically increasing sequence number to a bucket index
/// (`sequence / bucket_size`) and derives a per-bucket table name of the
/// form `"{table_prefix}_{bucket}"`.
#[derive(Debug, Clone)]
pub struct TableBucketBuilder {
    // Number of sequence values per bucket; guaranteed non-zero by `new`.
    bucket_size: u64,
    // Prefix shared by every bucket table name.
    table_prefix: String,
    // Cache of `'static` table names (leaked once per distinct bucket in
    // `bucket_table_name`), shared across clones via the `Arc`.
    table_names: Arc<Mutex<HashMap<u64, &'static str>>>,
}
31
#[cfg(test)]
mod tests {
    use super::TableBucketBuilder;
    use crate::MergeableValue;
    use redb::{Database, ReadableDatabase, TableDefinition, TableError};
    use tempfile::NamedTempFile;

    // Test merge policy: concatenate with '+' so the merge order is
    // observable in the assertions below.
    impl MergeableValue for String {
        fn merge(existing: Option<Self>, incoming: Self) -> Self {
            match existing {
                None => incoming,
                Some(prev) => format!("{}+{}", prev, incoming),
            }
        }
    }

    #[test]
    fn merge_bucket_tables_into_target() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = NamedTempFile::new()?;
        let db = Database::create(tmp.path())?;
        let builder = TableBucketBuilder::new(100, "merge_test")?;
        let target: TableDefinition<u64, String> = TableDefinition::new("merged");

        // Seed three bucket tables in a single committed transaction.
        let seed_txn = db.begin_write()?;
        {
            let mut bucket = seed_txn.open_table(builder.table_definition::<u64, String>(0))?;
            bucket.insert(1u64, "a".to_string())?;
            bucket.insert(2u64, "x".to_string())?;
        }
        {
            let mut bucket = seed_txn.open_table(builder.table_definition::<u64, String>(1))?;
            bucket.insert(1u64, "b".to_string())?;
            bucket.insert(3u64, "y".to_string())?;
        }
        {
            let mut bucket = seed_txn.open_table(builder.table_definition::<u64, String>(2))?;
            bucket.insert(1u64, "c".to_string())?;
        }
        seed_txn.commit()?;

        // Merge only buckets 0..=1; bucket 2 is outside the range.
        let mut merge_txn = db.begin_write()?;
        builder.merge(&mut merge_txn, target, 0, 1)?;
        merge_txn.commit()?;

        let read_txn = db.begin_read()?;
        let reopened: TableDefinition<u64, String> = TableDefinition::new("merged");
        let merged = read_txn.open_table(reopened)?;
        assert_eq!(merged.get(1u64)?.unwrap().value(), "a+b");
        assert_eq!(merged.get(2u64)?.unwrap().value(), "x");
        assert_eq!(merged.get(3u64)?.unwrap().value(), "y");

        // Merged buckets must be gone afterwards...
        assert!(
            matches!(
                read_txn.open_table(builder.table_definition::<u64, String>(0)),
                Err(TableError::TableDoesNotExist(_))
            ),
            "bucket 0 table should be deleted"
        );
        assert!(
            matches!(
                read_txn.open_table(builder.table_definition::<u64, String>(1)),
                Err(TableError::TableDoesNotExist(_))
            ),
            "bucket 1 table should be deleted"
        );

        // ...while the out-of-range bucket stays untouched.
        let untouched = read_txn.open_table(builder.table_definition::<u64, String>(2))?;
        assert_eq!(untouched.get(1u64)?.unwrap().value(), "c");

        Ok(())
    }

    #[test]
    fn merge_all_bucket_tables_into_target() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = NamedTempFile::new()?;
        let db = Database::create(tmp.path())?;
        let builder = TableBucketBuilder::new(100, "merge_all")?;
        let target: TableDefinition<u64, String> = TableDefinition::new("merged_all");

        // Seed two non-contiguous buckets (0 and 2).
        let seed_txn = db.begin_write()?;
        {
            let mut bucket = seed_txn.open_table(builder.table_definition::<u64, String>(0))?;
            bucket.insert(1u64, "a".to_string())?;
        }
        {
            let mut bucket = seed_txn.open_table(builder.table_definition::<u64, String>(2))?;
            bucket.insert(1u64, "c".to_string())?;
        }
        seed_txn.commit()?;

        // merge_all discovers the bucket range itself.
        let mut merge_txn = db.begin_write()?;
        builder.merge_all(&mut merge_txn, target)?;
        merge_txn.commit()?;

        let read_txn = db.begin_read()?;
        let reopened: TableDefinition<u64, String> = TableDefinition::new("merged_all");
        let merged = read_txn.open_table(reopened)?;
        assert_eq!(merged.get(1u64)?.unwrap().value(), "a+c");

        // Every discovered bucket table is deleted after the merge.
        assert!(
            matches!(
                read_txn.open_table(builder.table_definition::<u64, String>(0)),
                Err(TableError::TableDoesNotExist(_))
            ),
            "bucket 0 table should be deleted"
        );
        assert!(
            matches!(
                read_txn.open_table(builder.table_definition::<u64, String>(2)),
                Err(TableError::TableDoesNotExist(_))
            ),
            "bucket 2 table should be deleted"
        );

        Ok(())
    }
}
147
impl TableBucketBuilder {
    /// Creates a builder producing tables named `"{table_prefix}_{bucket}"`,
    /// where each bucket covers `bucket_size` consecutive sequence numbers.
    ///
    /// # Errors
    ///
    /// Returns [`BucketError::InvalidBucketSize`] when `bucket_size` is zero,
    /// since [`Self::bucket_for_sequence`] divides by it.
    pub fn new(bucket_size: u64, table_prefix: impl Into<String>) -> Result<Self, BucketError> {
        if bucket_size == 0 {
            return Err(BucketError::InvalidBucketSize(bucket_size));
        }

        Ok(Self {
            bucket_size,
            table_prefix: table_prefix.into(),
            table_names: Arc::new(Mutex::new(HashMap::new())),
        })
    }

    /// Number of sequence values mapped to each bucket (always non-zero).
    pub fn bucket_size(&self) -> u64 {
        self.bucket_size
    }

    /// Prefix shared by every bucket table name.
    pub fn table_prefix(&self) -> &str {
        &self.table_prefix
    }

    /// Maps a sequence number to its bucket index via integer division.
    pub fn bucket_for_sequence(&self, sequence: u64) -> u64 {
        sequence / self.bucket_size
    }

    /// Returns the table name for `bucket`, formatting and caching it on
    /// first use.
    ///
    /// redb table definitions require a `&'static str` name, so each newly
    /// formatted name is leaked via `Box::leak`. The leak is bounded: a given
    /// bucket's name is leaked at most once, and later calls return the
    /// cached reference.
    pub fn bucket_table_name(&self, bucket: u64) -> &'static str {
        // Recover from a poisoned lock with `into_inner`: the map only ever
        // gains entries, so it cannot be observed in a broken state.
        let mut table_names = self
            .table_names
            .lock()
            .unwrap_or_else(|err| err.into_inner());

        if let Some(name) = table_names.get(&bucket) {
            return name;
        }

        let name = format!("{}_{}", self.table_prefix, bucket);
        let leaked = Box::leak(name.into_boxed_str());
        table_names.insert(bucket, leaked);
        leaked
    }

    /// Builds a [`TableDefinition`] for the given bucket's table.
    pub fn table_definition<K: Key + 'static, V: Value + 'static>(
        &self,
        bucket: u64,
    ) -> TableDefinition<'static, K, V> {
        TableDefinition::new(self.bucket_table_name(bucket))
    }

    /// Builds a [`MultimapTableDefinition`] for the given bucket's table.
    pub fn multimap_table_definition<K: Key + 'static, V: Key + 'static>(
        &self,
        bucket: u64,
    ) -> MultimapTableDefinition<'static, K, V> {
        MultimapTableDefinition::new(self.bucket_table_name(bucket))
    }

    /// Merges every existing bucket table in `start_bucket..=end_bucket` into
    /// `target`, combining colliding keys with [`MergeableValue::merge`]
    /// (existing target value first, incoming bucket value second), then
    /// deletes each merged bucket table. Buckets without a table are skipped
    /// silently.
    ///
    /// All work happens inside the caller's `txn`; nothing is visible until
    /// the caller commits.
    ///
    /// # Errors
    ///
    /// * [`BucketError::InvalidRange`] when `start_bucket > end_bucket`.
    /// * [`BucketError::IterationError`] wrapping any underlying redb failure.
    pub fn merge<K, V>(
        &self,
        txn: &mut WriteTransaction,
        target: TableDefinition<'static, K, V>,
        start_bucket: u64,
        end_bucket: u64,
    ) -> Result<(), BucketError>
    where
        K: Key + 'static,
        V: Value + MergeableValue + 'static,
        // Bridge redb's borrowed `SelfType` values to owned `V`s (and back
        // for insertion) so access guards can be dropped before writing.
        for<'b> V: From<V::SelfType<'b>>,
        for<'b> V: Borrow<V::SelfType<'b>>,
    {
        if start_bucket > end_bucket {
            return Err(BucketError::InvalidRange {
                start: start_bucket,
                end: end_bucket,
            });
        }

        // Snapshot the names of existing tables so absent buckets can be
        // skipped instead of being opened (which would create them).
        let mut existing_tables = HashSet::new();
        let tables = txn.list_tables().map_err(|err| {
            BucketError::IterationError(format!("Failed to list tables: {}", err))
        })?;
        for table in tables {
            existing_tables.insert(table.name().to_string());
        }

        let mut target_table = txn.open_table(target).map_err(|err| {
            BucketError::IterationError(format!("Failed to open target table: {}", err))
        })?;

        for bucket in start_bucket..=end_bucket {
            let bucket_name = self.bucket_table_name(bucket);
            if !existing_tables.contains(bucket_name) {
                continue;
            }

            let definition = self.table_definition::<K, V>(bucket);
            let bucket_table = txn.open_table(definition).map_err(|err| {
                BucketError::IterationError(format!(
                    "Failed to open bucket table {}: {}",
                    bucket, err
                ))
            })?;

            let iter = bucket_table.iter().map_err(|err| {
                BucketError::IterationError(format!(
                    "Failed to iterate bucket table {}: {}",
                    bucket, err
                ))
            })?;

            for entry in iter {
                let (key_guard, value_guard) = entry.map_err(|err| {
                    BucketError::IterationError(format!(
                        "Failed to read bucket table {}: {}",
                        bucket, err
                    ))
                })?;

                // Merge the current target value (if any) with the incoming
                // bucket value, then write the result back to the target.
                let incoming = V::from(value_guard.value());
                let existing_value = match target_table.get(key_guard.value()) {
                    Ok(Some(existing_guard)) => Some(V::from(existing_guard.value())),
                    Ok(None) => None,
                    Err(err) => {
                        return Err(BucketError::IterationError(format!(
                            "Failed to read target table: {}",
                            err
                        )))
                    }
                };
                let merged = V::merge(existing_value, incoming);
                target_table
                    .insert(key_guard.value(), merged)
                    .map_err(|err| {
                        BucketError::IterationError(format!(
                            "Failed to write merged value: {}",
                            err
                        ))
                    })?;
            }

            // The open table handle must be released before the table can be
            // deleted from this transaction.
            drop(bucket_table);
            txn.delete_table(definition).map_err(|err| {
                BucketError::IterationError(format!(
                    "Failed to delete bucket table {}: {}",
                    bucket, err
                ))
            })?;
        }

        Ok(())
    }

    /// Merges every bucket table matching this builder's prefix into
    /// `target`, discovering the bucket range by scanning the transaction's
    /// table names. A no-op when no bucket tables exist.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`Self::merge`].
    pub fn merge_all<K, V>(
        &self,
        txn: &mut WriteTransaction,
        target: TableDefinition<'static, K, V>,
    ) -> Result<(), BucketError>
    where
        K: Key + 'static,
        V: Value + MergeableValue + 'static,
        for<'b> V: From<V::SelfType<'b>>,
        for<'b> V: Borrow<V::SelfType<'b>>,
    {
        let Some((min_bucket, max_bucket)) = self.bucket_range_from_tables(txn)? else {
            return Ok(());
        };

        self.merge(txn, target, min_bucket, max_bucket)
    }

    /// Scans the transaction's tables for names of the form
    /// `"{table_prefix}_{n}"` and returns the smallest and largest bucket
    /// index found, or `None` when no such table exists.
    fn bucket_range_from_tables(
        &self,
        txn: &WriteTransaction,
    ) -> Result<Option<(u64, u64)>, BucketError> {
        let mut min_bucket: Option<u64> = None;
        let mut max_bucket: Option<u64> = None;
        let prefix = format!("{}_", self.table_prefix);

        let tables = txn.list_tables().map_err(|err| {
            BucketError::IterationError(format!("Failed to list tables: {}", err))
        })?;

        for table in tables {
            let name = table.name();
            // Ignore tables with a different prefix or a non-numeric suffix
            // (e.g. the merge target or unrelated application tables).
            let Some(bucket_suffix) = name.strip_prefix(&prefix) else {
                continue;
            };
            let Ok(bucket) = bucket_suffix.parse::<u64>() else {
                continue;
            };

            min_bucket = Some(min_bucket.map_or(bucket, |current| current.min(bucket)));
            max_bucket = Some(max_bucket.map_or(bucket, |current| current.max(bucket)));
        }

        // `min_bucket` and `max_bucket` are always set together, so `zip`
        // yields `Some` exactly when at least one bucket table was seen.
        Ok(min_bucket.zip(max_bucket))
    }
}
356}