redb_extras/table_buckets/
mod.rs

1//! Table bucket storage utility module.
2//!
3//! This module provides bucket-based table grouping for sequence data by
4//! mapping each bucket to its own redb table. It mirrors the bucketed key
5//! approach but uses table-per-bucket instead of key prefixes.
6
7use crate::MergeableValue;
8use redb::{
9    Key, MultimapTableDefinition, ReadableTable, TableDefinition, TableHandle, Value,
10    WriteTransaction,
11};
12use std::borrow::Borrow;
13use std::collections::{HashMap, HashSet};
14use std::sync::{Arc, Mutex};
15
16pub mod iterator;
17
18pub use crate::key_buckets::BucketError;
19pub use iterator::{
20    TableBucketIterExt, TableBucketMultimapIterExt, TableBucketRangeIterator,
21    TableBucketRangeMultimapIterator,
22};
23
/// Builder for table bucket configuration and name resolution.
#[derive(Debug, Clone)]
pub struct TableBucketBuilder {
    // Number of sequence values per bucket; divisor in bucket_for_sequence.
    // Guaranteed non-zero by `new`.
    bucket_size: u64,
    // Prefix for per-bucket table names; names are "{table_prefix}_{bucket}".
    table_prefix: String,
    // Cache of leaked `&'static str` table names keyed by bucket number.
    // Behind an Arc, so clones of the builder share one cache and each
    // distinct bucket name is leaked at most once.
    table_names: Arc<Mutex<HashMap<u64, &'static str>>>,
}
31
#[cfg(test)]
mod tests {
    use super::TableBucketBuilder;
    use crate::MergeableValue;
    use redb::{Database, ReadableDatabase, TableDefinition, TableError};
    use tempfile::NamedTempFile;

    // Test-only merge semantics: concatenate values with '+', oldest first.
    impl MergeableValue for String {
        fn merge(existing: Option<Self>, incoming: Self) -> Self {
            if let Some(prev) = existing {
                format!("{}+{}", prev, incoming)
            } else {
                incoming
            }
        }
    }

    #[test]
    fn merge_bucket_tables_into_target() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = NamedTempFile::new()?;
        let database = Database::create(tmp.path())?;
        let buckets = TableBucketBuilder::new(100, "merge_test")?;
        let target: TableDefinition<u64, String> = TableDefinition::new("merged");

        // Seed three bucket tables; buckets 0 and 1 both carry key 1 so the
        // merge has a collision to resolve, bucket 2 is outside the range.
        {
            let txn = database.begin_write()?;
            {
                let mut t = txn.open_table(buckets.table_definition::<u64, String>(0))?;
                t.insert(1u64, "a".to_string())?;
                t.insert(2u64, "x".to_string())?;
            }
            {
                let mut t = txn.open_table(buckets.table_definition::<u64, String>(1))?;
                t.insert(1u64, "b".to_string())?;
                t.insert(3u64, "y".to_string())?;
            }
            {
                let mut t = txn.open_table(buckets.table_definition::<u64, String>(2))?;
                t.insert(1u64, "c".to_string())?;
            }
            txn.commit()?;
        }

        // Merge only buckets 0..=1 into the target.
        {
            let mut txn = database.begin_write()?;
            buckets.merge(&mut txn, target, 0, 1)?;
            txn.commit()?;
        }

        let read_txn = database.begin_read()?;
        let merged_def: TableDefinition<u64, String> = TableDefinition::new("merged");
        let merged = read_txn.open_table(merged_def)?;
        // Colliding key merged in ascending bucket order; disjoint keys copied.
        assert_eq!(merged.get(1u64)?.unwrap().value(), "a+b");
        assert_eq!(merged.get(2u64)?.unwrap().value(), "x");
        assert_eq!(merged.get(3u64)?.unwrap().value(), "y");

        // Merged source tables are gone...
        let bucket0 = read_txn.open_table(buckets.table_definition::<u64, String>(0));
        assert!(
            matches!(bucket0, Err(TableError::TableDoesNotExist(_))),
            "bucket 0 table should be deleted"
        );

        let bucket1 = read_txn.open_table(buckets.table_definition::<u64, String>(1));
        assert!(
            matches!(bucket1, Err(TableError::TableDoesNotExist(_))),
            "bucket 1 table should be deleted"
        );

        // ...while the out-of-range bucket survives untouched.
        let untouched = read_txn.open_table(buckets.table_definition::<u64, String>(2))?;
        assert_eq!(untouched.get(1u64)?.unwrap().value(), "c");

        Ok(())
    }

    #[test]
    fn merge_all_bucket_tables_into_target() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = NamedTempFile::new()?;
        let database = Database::create(tmp.path())?;
        let buckets = TableBucketBuilder::new(100, "merge_all")?;
        let target: TableDefinition<u64, String> = TableDefinition::new("merged_all");

        // Sparse layout (buckets 0 and 2, nothing at 1): merge_all must
        // discover both from the table names alone.
        {
            let txn = database.begin_write()?;
            {
                let mut t = txn.open_table(buckets.table_definition::<u64, String>(0))?;
                t.insert(1u64, "a".to_string())?;
            }
            {
                let mut t = txn.open_table(buckets.table_definition::<u64, String>(2))?;
                t.insert(1u64, "c".to_string())?;
            }
            txn.commit()?;
        }

        {
            let mut txn = database.begin_write()?;
            buckets.merge_all(&mut txn, target)?;
            txn.commit()?;
        }

        let read_txn = database.begin_read()?;
        let merged_def: TableDefinition<u64, String> = TableDefinition::new("merged_all");
        let merged = read_txn.open_table(merged_def)?;
        assert_eq!(merged.get(1u64)?.unwrap().value(), "a+c");

        let bucket0 = read_txn.open_table(buckets.table_definition::<u64, String>(0));
        assert!(
            matches!(bucket0, Err(TableError::TableDoesNotExist(_))),
            "bucket 0 table should be deleted"
        );

        let bucket2 = read_txn.open_table(buckets.table_definition::<u64, String>(2));
        assert!(
            matches!(bucket2, Err(TableError::TableDoesNotExist(_))),
            "bucket 2 table should be deleted"
        );

        Ok(())
    }
}
147
148impl TableBucketBuilder {
149    /// Create a new builder with the specified bucket size and table prefix.
150    ///
151    /// # Arguments
152    /// * `bucket_size` - Size of each bucket for integer division (must be > 0)
153    /// * `table_prefix` - Prefix for bucket table names
154    pub fn new(bucket_size: u64, table_prefix: impl Into<String>) -> Result<Self, BucketError> {
155        if bucket_size == 0 {
156            return Err(BucketError::InvalidBucketSize(bucket_size));
157        }
158
159        Ok(Self {
160            bucket_size,
161            table_prefix: table_prefix.into(),
162            table_names: Arc::new(Mutex::new(HashMap::new())),
163        })
164    }
165
    /// Get the configured bucket size (number of sequence values per bucket).
    ///
    /// Always non-zero; enforced by [`TableBucketBuilder::new`].
    pub fn bucket_size(&self) -> u64 {
        self.bucket_size
    }
170
    /// Get the configured table prefix.
    ///
    /// Bucket table names are formed as `"{prefix}_{bucket}"`.
    pub fn table_prefix(&self) -> &str {
        &self.table_prefix
    }
175
    /// Compute the bucket for the given sequence.
    ///
    /// Floor integer division, so sequences `[n*bucket_size, (n+1)*bucket_size)`
    /// all map to bucket `n`. Cannot divide by zero: `new` rejects a zero
    /// bucket size.
    pub fn bucket_for_sequence(&self, sequence: u64) -> u64 {
        sequence / self.bucket_size
    }
180
181    /// Resolve the bucket table name, caching and leaking the name string.
182    pub fn bucket_table_name(&self, bucket: u64) -> &'static str {
183        let mut table_names = self
184            .table_names
185            .lock()
186            .unwrap_or_else(|err| err.into_inner());
187
188        if let Some(name) = table_names.get(&bucket) {
189            return name;
190        }
191
192        let name = format!("{}_{}", self.table_prefix, bucket);
193        let leaked = Box::leak(name.into_boxed_str());
194        table_names.insert(bucket, leaked);
195        leaked
196    }
197
    /// Create a table definition for the given bucket.
    ///
    /// Resolves (and, on first use, leaks) the bucket's table name via
    /// [`TableBucketBuilder::bucket_table_name`].
    pub fn table_definition<K: Key + 'static, V: Value + 'static>(
        &self,
        bucket: u64,
    ) -> TableDefinition<'static, K, V> {
        TableDefinition::new(self.bucket_table_name(bucket))
    }
205
    /// Create a multimap table definition for the given bucket.
    ///
    /// Shares the same cached/leaked name as the plain table definition for
    /// this bucket, so a bucket should be used as either a table or a
    /// multimap table, not both.
    pub fn multimap_table_definition<K: Key + 'static, V: Key + 'static>(
        &self,
        bucket: u64,
    ) -> MultimapTableDefinition<'static, K, V> {
        MultimapTableDefinition::new(self.bucket_table_name(bucket))
    }
213
214    /// Merge bucket tables into a single non-bucketed target table and delete the originals.
215    pub fn merge<K, V>(
216        &self,
217        txn: &mut WriteTransaction,
218        target: TableDefinition<'static, K, V>,
219        start_bucket: u64,
220        end_bucket: u64,
221    ) -> Result<(), BucketError>
222    where
223        K: Key + 'static,
224        V: Value + MergeableValue + 'static,
225        for<'b> V: From<V::SelfType<'b>>,
226        for<'b> V: Borrow<V::SelfType<'b>>,
227    {
228        if start_bucket > end_bucket {
229            return Err(BucketError::InvalidRange {
230                start: start_bucket,
231                end: end_bucket,
232            });
233        }
234
235        let mut existing_tables = HashSet::new();
236        let tables = txn.list_tables().map_err(|err| {
237            BucketError::IterationError(format!("Failed to list tables: {}", err))
238        })?;
239        for table in tables {
240            existing_tables.insert(table.name().to_string());
241        }
242
243        let mut target_table = txn.open_table(target).map_err(|err| {
244            BucketError::IterationError(format!("Failed to open target table: {}", err))
245        })?;
246
247        for bucket in start_bucket..=end_bucket {
248            let bucket_name = self.bucket_table_name(bucket);
249            if !existing_tables.contains(bucket_name) {
250                continue;
251            }
252
253            let definition = self.table_definition::<K, V>(bucket);
254            let bucket_table = txn.open_table(definition).map_err(|err| {
255                BucketError::IterationError(format!(
256                    "Failed to open bucket table {}: {}",
257                    bucket, err
258                ))
259            })?;
260
261            let iter = bucket_table.iter().map_err(|err| {
262                BucketError::IterationError(format!(
263                    "Failed to iterate bucket table {}: {}",
264                    bucket, err
265                ))
266            })?;
267
268            for entry in iter {
269                let (key_guard, value_guard) = entry.map_err(|err| {
270                    BucketError::IterationError(format!(
271                        "Failed to read bucket table {}: {}",
272                        bucket, err
273                    ))
274                })?;
275
276                let incoming = V::from(value_guard.value());
277                let existing_value = match target_table.get(key_guard.value()) {
278                    Ok(Some(existing_guard)) => Some(V::from(existing_guard.value())),
279                    Ok(None) => None,
280                    Err(err) => {
281                        return Err(BucketError::IterationError(format!(
282                            "Failed to read target table: {}",
283                            err
284                        )))
285                    }
286                };
287                let merged = V::merge(existing_value, incoming);
288                target_table
289                    .insert(key_guard.value(), merged)
290                    .map_err(|err| {
291                        BucketError::IterationError(format!(
292                            "Failed to write merged value: {}",
293                            err
294                        ))
295                    })?;
296            }
297
298            drop(bucket_table);
299            txn.delete_table(definition).map_err(|err| {
300                BucketError::IterationError(format!(
301                    "Failed to delete bucket table {}: {}",
302                    bucket, err
303                ))
304            })?;
305        }
306
307        Ok(())
308    }
309
310    /// Merge all bucket tables discovered in the database into the target table.
311    pub fn merge_all<K, V>(
312        &self,
313        txn: &mut WriteTransaction,
314        target: TableDefinition<'static, K, V>,
315    ) -> Result<(), BucketError>
316    where
317        K: Key + 'static,
318        V: Value + MergeableValue + 'static,
319        for<'b> V: From<V::SelfType<'b>>,
320        for<'b> V: Borrow<V::SelfType<'b>>,
321    {
322        let Some((min_bucket, max_bucket)) = self.bucket_range_from_tables(txn)? else {
323            return Ok(());
324        };
325
326        self.merge(txn, target, min_bucket, max_bucket)
327    }
328
329    fn bucket_range_from_tables(
330        &self,
331        txn: &WriteTransaction,
332    ) -> Result<Option<(u64, u64)>, BucketError> {
333        let mut min_bucket: Option<u64> = None;
334        let mut max_bucket: Option<u64> = None;
335        let prefix = format!("{}_", self.table_prefix);
336
337        let tables = txn.list_tables().map_err(|err| {
338            BucketError::IterationError(format!("Failed to list tables: {}", err))
339        })?;
340
341        for table in tables {
342            let name = table.name();
343            let Some(bucket_suffix) = name.strip_prefix(&prefix) else {
344                continue;
345            };
346            let Ok(bucket) = bucket_suffix.parse::<u64>() else {
347                continue;
348            };
349
350            min_bucket = Some(min_bucket.map_or(bucket, |current| current.min(bucket)));
351            max_bucket = Some(max_bucket.map_or(bucket, |current| current.max(bucket)));
352        }
353
354        Ok(min_bucket.zip(max_bucket))
355    }
356}