redb_extras/partition/
table.rs

1//! Generic partitioned table implementation.
2//!
3//! Provides the core storage infrastructure for sharded and segmented data
4//! that can work with any value type.
5
6use crate::partition::config::PartitionConfig;
7use crate::partition::scan::{enumerate_segments, find_head_segment, SegmentInfo};
8use crate::partition::shard::select_shard;
9use crate::partition::PartitionError;
10use crate::Result;
11use redb::{Database, ReadTransaction, ReadableTable, TableDefinition, WriteTransaction};
12use std::collections::HashMap;
13
14/// Encodes a segment key with the format: \\[key_len\\]\\[key\\]\\[shard\\]\\[segment\\]
15pub fn encode_segment_key(key: &[u8], shard: u16, segment: u16) -> Result<Vec<u8>> {
16    let mut encoded_key = Vec::with_capacity(4 + key.len() + 4);
17
18    // Add key length (4 bytes big-endian)
19    encoded_key.extend_from_slice(&(key.len() as u32).to_be_bytes());
20
21    // Add base key
22    encoded_key.extend_from_slice(key);
23
24    // Add shard (2 bytes big-endian)
25    encoded_key.extend_from_slice(&shard.to_be_bytes());
26
27    // Add segment (2 bytes big-endian)
28    encoded_key.extend_from_slice(&segment.to_be_bytes());
29
30    Ok(encoded_key)
31}
32
33// Type aliases for complex return types
34type SegmentDataMap = HashMap<u16, Vec<(SegmentInfo, Option<Vec<u8>>)>>;
35type SegmentSimpleMap = HashMap<u16, Vec<(u16, Vec<u8>)>>;
36type SegmentResult = Option<(SegmentInfo, Vec<u8>)>;
37
38/// Table definition for segment data storage
39pub const SEGMENT_TABLE: TableDefinition<&'static [u8], &'static [u8]> =
40    TableDefinition::new("redb_extras_segments");
41
42/// Table definition for meta data storage (head segment tracking)
43pub const META_TABLE: TableDefinition<&'static [u8], &'static [u8]> =
44    TableDefinition::new("redb_extras_meta");
45
46/// Generic partitioned table that stores values in sharded segments.
47///
48/// This type provides the core storage infrastructure without knowing anything
49/// about specific value types. It handles the mechanics of:
50/// - Sharding writes across multiple partitions
51/// - Segmenting large values to control write amplification
52/// - Optional meta table for O(1) head segment discovery
53///
54/// The `V` parameter represents the value handler type that knows how to
55/// encode/decode and manipulate specific value types.
56pub struct PartitionedTable<V> {
57    name: &'static str,
58    config: PartitionConfig,
59    _phantom: std::marker::PhantomData<V>,
60}
61
62impl<V> PartitionedTable<V> {
63    /// Creates a new partitioned table with the given configuration.
64    ///
65    /// # Arguments
66    /// * `name` - Table name for database storage
67    /// * `config` - Partitioning configuration
68    ///
69    /// # Returns
70    /// New partitioned table instance
71    pub fn new(name: &'static str, config: PartitionConfig) -> Self {
72        Self {
73            name,
74            config,
75            _phantom: std::marker::PhantomData,
76        }
77    }
78
79    /// Ensures required tables exist in the database.
80    ///
81    /// This method creates the segment table and optionally the meta table
82    /// if they don't already exist.
83    ///
84    /// # Arguments
85    /// * `db` - The database instance
86    ///
87    /// # Returns
88    /// Ok on success, error on failure
89    pub fn ensure_table_exists(&self, db: &Database) -> Result<()> {
90        let txn = db
91            .begin_write()
92            .map_err(|e| PartitionError::DatabaseError(format!("Failed to begin write: {}", e)))?;
93
94        {
95            let _segment_table = txn.open_table(SEGMENT_TABLE).map_err(|e| {
96                PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
97            })?;
98
99            if self.config.use_meta {
100                let _meta_table = txn.open_table(META_TABLE).map_err(|e| {
101                    PartitionError::DatabaseError(format!("Failed to open meta table: {}", e))
102                })?;
103            }
104        }
105
106        txn.commit().map_err(|e| {
107            PartitionError::DatabaseError(format!("Failed to commit table creation: {}", e))
108        })?;
109
110        Ok(())
111    }
112
113    /// Returns the table name.
114    pub fn name(&self) -> &'static str {
115        self.name
116    }
117
118    /// Returns the configuration.
119    pub fn config(&self) -> &PartitionConfig {
120        &self.config
121    }
122
123    /// Selects the appropriate shard for a given base key and element.
124    pub fn select_shard(&self, key: &[u8], element_id: u64) -> Result<u16> {
125        Ok(select_shard(key, element_id, self.config.shard_count)?)
126    }
127}
128
129/// Read operations for partitioned tables.
130///
131/// Provides read-only access to partitioned data without the ability to modify.
132pub struct PartitionedRead<'a, V> {
133    table: &'a PartitionedTable<V>,
134    txn: &'a ReadTransaction,
135}
136
137impl<'a, V> PartitionedRead<'a, V> {
138    /// Creates a new read handle.
139    pub fn new(table: &'a PartitionedTable<V>, txn: &'a ReadTransaction) -> Self {
140        Self { table, txn }
141    }
142
143    /// Gets the table reference.
144    pub fn table(&self) -> &PartitionedTable<V> {
145        self.table
146    }
147
148    /// Collects all segments across all shards for a given base key.
149    ///
150    /// This method iterates through all shards and collects all segments
151    /// that belong to the specified base key.
152    ///
153    /// # Arguments
154    /// * `key` - The key to search for
155    ///
156    /// # Returns
157    /// HashMap where key is shard ID and value is vector of (segment_info, segment_data) tuples
158    pub fn collect_all_segments(&self, key: &[u8]) -> Result<SegmentDataMap> {
159        let mut result = HashMap::new();
160
161        // Open the segment table
162        let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
163            PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
164        })?;
165
166        // Iterate through all shards
167        for shard in 0..self.table.config.shard_count {
168            let mut shard_segments = Vec::new();
169
170            // Enumerate segments for this shard
171            let mut segment_iter = enumerate_segments(&table, key, shard)?;
172
173            while let Some(segment_result) = segment_iter.next() {
174                let segment_info = segment_result?;
175                shard_segments.push((segment_info.clone(), segment_info.segment_data.clone()));
176            }
177
178            if !shard_segments.is_empty() {
179                result.insert(shard, shard_segments);
180            }
181        }
182
183        Ok(result)
184    }
185
186    /// Enumerates all segments for a given base key across all shards.
187    ///
188    /// This method returns segment data in a simplified format
189    /// for easier consumption by callers.
190    ///
191    /// # Arguments
192    /// * `key` - The key to search for
193    ///
194    /// # Returns
195    /// HashMap where key is shard ID and value is vector of (segment_id, segment_data) tuples
196    pub fn enumerate_all_segments(&self, key: &[u8]) -> Result<SegmentSimpleMap> {
197        let mut result = HashMap::new();
198
199        // Open the segment table
200        let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
201            PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
202        })?;
203
204        // Iterate through all shards
205        for shard in 0..self.table.config.shard_count {
206            let mut shard_segments = Vec::new();
207
208            // Enumerate segments for this shard
209            let mut segment_iter = enumerate_segments(&table, key, shard)?;
210
211            while let Some(segment_result) = segment_iter.next() {
212                let segment_info = segment_result?;
213                if let Some(data) = segment_info.segment_data {
214                    shard_segments.push((segment_info.segment_id, data));
215                }
216            }
217
218            if !shard_segments.is_empty() {
219                result.insert(shard, shard_segments);
220            }
221        }
222
223        Ok(result)
224    }
225
226    /// Reads data for a specific segment.
227    ///
228    /// If segment_info already contains data, it's returned directly.
229    /// Otherwise, the data is read from the database.
230    ///
231    /// # Arguments
232    /// * `segment_info` - Information about the segment to read
233    ///
234    /// # Returns
235    /// Option containing (segment_info, segment_data) or None if segment doesn't exist
236    pub fn read_segment_data(&self, segment_info: &SegmentInfo) -> Result<SegmentResult> {
237        // If segment_info already has data, return it
238        if let Some(ref data) = segment_info.segment_data {
239            return Ok(Some((segment_info.clone(), data.clone())));
240        }
241
242        // Otherwise, read from the database
243        let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
244            PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
245        })?;
246
247        match table.get(&*segment_info.segment_key) {
248            Ok(Some(value_guard)) => {
249                let data = value_guard.value().to_vec();
250                let mut info_with_data = segment_info.clone();
251                info_with_data.segment_data = Some(data.clone());
252                Ok(Some((info_with_data, data)))
253            }
254            Ok(None) => Ok(None),
255            Err(e) => {
256                Err(PartitionError::DatabaseError(format!("Failed to read segment: {}", e)).into())
257            }
258        }
259    }
260}
261
262/// Write operations for partitioned tables.
263///
264/// Provides read-write access to partitioned data with the ability to modify values.
265pub struct PartitionedWrite<'a, V> {
266    table: &'a PartitionedTable<V>,
267    txn: &'a mut WriteTransaction,
268}
269
270impl<'a, V> PartitionedWrite<'a, V> {
271    /// Creates a new write handle.
272    pub fn new(table: &'a PartitionedTable<V>, txn: &'a mut WriteTransaction) -> Self {
273        Self { table, txn }
274    }
275
276    /// Reads segment data for the given segment info.
277    ///
278    /// If segment_info already contains data, it's returned directly.
279    /// Otherwise, the data is read from the database.
280    ///
281    /// # Arguments
282    /// * `segment_info` - Information about the segment to read
283    ///
284    /// # Returns
285    /// Option containing (segment_info, segment_data) or None if segment doesn't exist
286    pub fn read_segment_data(&self, segment_info: &SegmentInfo) -> Result<SegmentResult> {
287        // If segment_info already has data, return it
288        if let Some(ref data) = segment_info.segment_data {
289            return Ok(Some((segment_info.clone(), data.clone())));
290        }
291
292        // Otherwise, read from the database
293        let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
294            PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
295        })?;
296
297        let result = match table.get(&*segment_info.segment_key) {
298            Ok(Some(value_guard)) => {
299                let data = value_guard.value().to_vec();
300                let mut info_with_data = segment_info.clone();
301                info_with_data.segment_data = Some(data.clone());
302                Ok(Some((info_with_data, data)))
303            }
304            Ok(None) => Ok(None),
305            Err(e) => Err(PartitionError::DatabaseError(format!(
306                "Failed to read segment: {}",
307                e
308            ))),
309        };
310
311        // Drop table before returning result
312        drop(table);
313        Ok(result?)
314    }
315
316    /// Gets the table reference.
317    pub fn table(&self) -> &PartitionedTable<V> {
318        self.table
319    }
320
321    /// Finds the head segment using scan method (when meta table is disabled).
322    ///
323    /// This method scans all segments for the given (key, shard) pair
324    /// and returns the one with the highest segment ID.
325    ///
326    /// # Arguments
327    /// * `key` - The key to search for
328    /// * `shard` - The shard ID
329    ///
330    /// # Returns
331    /// The head segment ID, or None if no segments exist
332    pub fn find_head_segment_scan(&self, key: &[u8], shard: u16) -> Result<Option<u16>> {
333        let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
334            PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
335        })?;
336
337        Ok(find_head_segment(&table, key, shard)?)
338    }
339
340    /// Writes data to a specific segment.
341    ///
342    /// This method overwrites any existing data at the segment key.
343    ///
344    /// # Arguments
345    /// * `segment_key` - The encoded segment key
346    /// * `data` - The data to write
347    ///
348    /// # Returns
349    /// Ok on success, error on failure
350    pub fn write_segment_data(&self, segment_key: &[u8], data: &[u8]) -> Result<()> {
351        let mut table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
352            PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
353        })?;
354
355        table.insert(segment_key, data).map_err(|e| {
356            PartitionError::DatabaseError(format!("Failed to write segment: {}", e))
357        })?;
358
359        Ok(())
360    }
361
362    /// Creates a new segment with the given data.
363    ///
364    /// The segment_id should be the next available ID for this shard.
365    ///
366    /// # Arguments
367    /// * `key` - The base key
368    /// * `shard` - The shard ID
369    /// * `segment_id` - The segment ID
370    /// * `data` - The segment data
371    ///
372    /// # Returns
373    /// Ok on success, error on failure
374    pub fn create_new_segment(
375        &self,
376        key: &[u8],
377        shard: u16,
378        segment_id: u16,
379        data: &[u8],
380    ) -> Result<()> {
381        let segment_key = encode_segment_key(key, shard, segment_id)?;
382        self.write_segment_data(&segment_key, data)
383    }
384
385    /// Updates the head segment with new data, rolling if necessary.
386    ///
387    /// This method checks if the new data fits in the current head segment.
388    /// If it doesn't fit, a new segment is created.
389    ///
390    /// # Arguments
391    /// * `key` - The base key
392    /// * `shard` - The shard ID
393    /// * `data` - The new segment data
394    ///
395    /// # Returns
396    /// Tuple of (was_rolled, new_segment_id) where:
397    /// - was_rolled: true if a new segment was created
398    /// - new_segment_id: ID of the segment that now contains the data
399    pub fn update_head_segment(&self, key: &[u8], shard: u16, data: &[u8]) -> Result<(bool, u16)> {
400        // Find current head segment
401        let head_segment = self.find_head_segment_scan(key, shard)?;
402
403        match head_segment {
404            Some(segment_id) => {
405                // Check if data fits in current segment
406                if data.len() <= self.table.config.segment_max_bytes {
407                    // Update existing segment
408                    let segment_key = encode_segment_key(key, shard, segment_id)?;
409                    self.write_segment_data(&segment_key, data)?;
410                    Ok((false, segment_id))
411                } else {
412                    // Roll to new segment
413                    let new_segment_id = segment_id + 1;
414                    let new_segment_key = encode_segment_key(key, shard, new_segment_id)?;
415                    self.write_segment_data(&new_segment_key, data)?;
416                    Ok((true, new_segment_id))
417                }
418            }
419            None => {
420                // No segments exist, create first one
421                let segment_key = encode_segment_key(key, shard, 0)?;
422                self.write_segment_data(&segment_key, data)?;
423                Ok((true, 0))
424            }
425        }
426    }
427}
428
#[cfg(test)]
mod tests {
    use super::*;
    use crate::partition::config::PartitionConfig;

    #[test]
    fn test_partitioned_table_creation() {
        let config = PartitionConfig::default();
        let table: PartitionedTable<()> = PartitionedTable::new("test_table", config);

        assert_eq!(table.name(), "test_table");
        assert_eq!(table.config().shard_count, 16);
        assert!(table.config().use_meta);
    }

    #[test]
    fn test_shard_selection() {
        let config = PartitionConfig::new(8, 1024, true).unwrap();
        let table: PartitionedTable<()> = PartitionedTable::new("test", config);

        let key = b"test_key";
        let element_id = 12345;

        let shard = table.select_shard(key, element_id).unwrap();
        assert!(shard < 8);

        // Should be deterministic
        let shard2 = table.select_shard(key, element_id).unwrap();
        assert_eq!(shard, shard2);
    }

    /// Pins the on-disk key layout: [u32 BE len][key][u16 BE shard][u16 BE segment].
    #[test]
    fn test_encode_segment_key_layout() {
        let encoded = encode_segment_key(b"ab", 0x0102, 0x0304).unwrap();
        assert_eq!(encoded, vec![0, 0, 0, 2, b'a', b'b', 0x01, 0x02, 0x03, 0x04]);

        let empty = encode_segment_key(b"", 0, 0).unwrap();
        assert_eq!(empty, vec![0u8; 8]);
    }

    /// Big-endian segment IDs must sort bytewise in numeric order.
    #[test]
    fn test_encode_segment_key_orders_by_segment() {
        let lower = encode_segment_key(b"k", 3, 1).unwrap();
        let higher = encode_segment_key(b"k", 3, 256).unwrap();
        assert!(lower < higher);
    }
}
459}