1use crate::partition::config::PartitionConfig;
7use crate::partition::scan::{enumerate_segments, find_head_segment, SegmentInfo};
8use crate::partition::shard::select_shard;
9use crate::partition::PartitionError;
10use crate::Result;
11use redb::{Database, ReadTransaction, ReadableTable, TableDefinition, WriteTransaction};
12use std::collections::HashMap;
13
14pub fn encode_segment_key(key: &[u8], shard: u16, segment: u16) -> Result<Vec<u8>> {
16 let mut encoded_key = Vec::with_capacity(4 + key.len() + 4);
17
18 encoded_key.extend_from_slice(&(key.len() as u32).to_be_bytes());
20
21 encoded_key.extend_from_slice(key);
23
24 encoded_key.extend_from_slice(&shard.to_be_bytes());
26
27 encoded_key.extend_from_slice(&segment.to_be_bytes());
29
30 Ok(encoded_key)
31}
32
33type SegmentDataMap = HashMap<u16, Vec<(SegmentInfo, Option<Vec<u8>>)>>;
35type SegmentSimpleMap = HashMap<u16, Vec<(u16, Vec<u8>)>>;
36type SegmentResult = Option<(SegmentInfo, Vec<u8>)>;
37
38pub const SEGMENT_TABLE: TableDefinition<&'static [u8], &'static [u8]> =
40 TableDefinition::new("redb_extras_segments");
41
42pub const META_TABLE: TableDefinition<&'static [u8], &'static [u8]> =
44 TableDefinition::new("redb_extras_meta");
45
46pub struct PartitionedTable<V> {
57 name: &'static str,
58 config: PartitionConfig,
59 _phantom: std::marker::PhantomData<V>,
60}
61
62impl<V> PartitionedTable<V> {
63 pub fn new(name: &'static str, config: PartitionConfig) -> Self {
72 Self {
73 name,
74 config,
75 _phantom: std::marker::PhantomData,
76 }
77 }
78
79 pub fn ensure_table_exists(&self, db: &Database) -> Result<()> {
90 let txn = db
91 .begin_write()
92 .map_err(|e| PartitionError::DatabaseError(format!("Failed to begin write: {}", e)))?;
93
94 {
95 let _segment_table = txn.open_table(SEGMENT_TABLE).map_err(|e| {
96 PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
97 })?;
98
99 if self.config.use_meta {
100 let _meta_table = txn.open_table(META_TABLE).map_err(|e| {
101 PartitionError::DatabaseError(format!("Failed to open meta table: {}", e))
102 })?;
103 }
104 }
105
106 txn.commit().map_err(|e| {
107 PartitionError::DatabaseError(format!("Failed to commit table creation: {}", e))
108 })?;
109
110 Ok(())
111 }
112
113 pub fn name(&self) -> &'static str {
115 self.name
116 }
117
118 pub fn config(&self) -> &PartitionConfig {
120 &self.config
121 }
122
123 pub fn select_shard(&self, key: &[u8], element_id: u64) -> Result<u16> {
125 Ok(select_shard(key, element_id, self.config.shard_count)?)
126 }
127}
128
129pub struct PartitionedRead<'a, V> {
133 table: &'a PartitionedTable<V>,
134 txn: &'a ReadTransaction,
135}
136
137impl<'a, V> PartitionedRead<'a, V> {
138 pub fn new(table: &'a PartitionedTable<V>, txn: &'a ReadTransaction) -> Self {
140 Self { table, txn }
141 }
142
143 pub fn table(&self) -> &PartitionedTable<V> {
145 self.table
146 }
147
148 pub fn collect_all_segments(&self, key: &[u8]) -> Result<SegmentDataMap> {
159 let mut result = HashMap::new();
160
161 let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
163 PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
164 })?;
165
166 for shard in 0..self.table.config.shard_count {
168 let mut shard_segments = Vec::new();
169
170 let mut segment_iter = enumerate_segments(&table, key, shard)?;
172
173 while let Some(segment_result) = segment_iter.next() {
174 let segment_info = segment_result?;
175 shard_segments.push((segment_info.clone(), segment_info.segment_data.clone()));
176 }
177
178 if !shard_segments.is_empty() {
179 result.insert(shard, shard_segments);
180 }
181 }
182
183 Ok(result)
184 }
185
186 pub fn enumerate_all_segments(&self, key: &[u8]) -> Result<SegmentSimpleMap> {
197 let mut result = HashMap::new();
198
199 let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
201 PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
202 })?;
203
204 for shard in 0..self.table.config.shard_count {
206 let mut shard_segments = Vec::new();
207
208 let mut segment_iter = enumerate_segments(&table, key, shard)?;
210
211 while let Some(segment_result) = segment_iter.next() {
212 let segment_info = segment_result?;
213 if let Some(data) = segment_info.segment_data {
214 shard_segments.push((segment_info.segment_id, data));
215 }
216 }
217
218 if !shard_segments.is_empty() {
219 result.insert(shard, shard_segments);
220 }
221 }
222
223 Ok(result)
224 }
225
226 pub fn read_segment_data(&self, segment_info: &SegmentInfo) -> Result<SegmentResult> {
237 if let Some(ref data) = segment_info.segment_data {
239 return Ok(Some((segment_info.clone(), data.clone())));
240 }
241
242 let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
244 PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
245 })?;
246
247 match table.get(&*segment_info.segment_key) {
248 Ok(Some(value_guard)) => {
249 let data = value_guard.value().to_vec();
250 let mut info_with_data = segment_info.clone();
251 info_with_data.segment_data = Some(data.clone());
252 Ok(Some((info_with_data, data)))
253 }
254 Ok(None) => Ok(None),
255 Err(e) => {
256 Err(PartitionError::DatabaseError(format!("Failed to read segment: {}", e)).into())
257 }
258 }
259 }
260}
261
262pub struct PartitionedWrite<'a, V> {
266 table: &'a PartitionedTable<V>,
267 txn: &'a mut WriteTransaction,
268}
269
270impl<'a, V> PartitionedWrite<'a, V> {
271 pub fn new(table: &'a PartitionedTable<V>, txn: &'a mut WriteTransaction) -> Self {
273 Self { table, txn }
274 }
275
276 pub fn read_segment_data(&self, segment_info: &SegmentInfo) -> Result<SegmentResult> {
287 if let Some(ref data) = segment_info.segment_data {
289 return Ok(Some((segment_info.clone(), data.clone())));
290 }
291
292 let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
294 PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
295 })?;
296
297 let result = match table.get(&*segment_info.segment_key) {
298 Ok(Some(value_guard)) => {
299 let data = value_guard.value().to_vec();
300 let mut info_with_data = segment_info.clone();
301 info_with_data.segment_data = Some(data.clone());
302 Ok(Some((info_with_data, data)))
303 }
304 Ok(None) => Ok(None),
305 Err(e) => Err(PartitionError::DatabaseError(format!(
306 "Failed to read segment: {}",
307 e
308 ))),
309 };
310
311 drop(table);
313 Ok(result?)
314 }
315
316 pub fn table(&self) -> &PartitionedTable<V> {
318 self.table
319 }
320
321 pub fn find_head_segment_scan(&self, key: &[u8], shard: u16) -> Result<Option<u16>> {
333 let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
334 PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
335 })?;
336
337 Ok(find_head_segment(&table, key, shard)?)
338 }
339
340 pub fn write_segment_data(&self, segment_key: &[u8], data: &[u8]) -> Result<()> {
351 let mut table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
352 PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
353 })?;
354
355 table.insert(segment_key, data).map_err(|e| {
356 PartitionError::DatabaseError(format!("Failed to write segment: {}", e))
357 })?;
358
359 Ok(())
360 }
361
362 pub fn create_new_segment(
375 &self,
376 key: &[u8],
377 shard: u16,
378 segment_id: u16,
379 data: &[u8],
380 ) -> Result<()> {
381 let segment_key = encode_segment_key(key, shard, segment_id)?;
382 self.write_segment_data(&segment_key, data)
383 }
384
385 pub fn update_head_segment(&self, key: &[u8], shard: u16, data: &[u8]) -> Result<(bool, u16)> {
400 let head_segment = self.find_head_segment_scan(key, shard)?;
402
403 match head_segment {
404 Some(segment_id) => {
405 if data.len() <= self.table.config.segment_max_bytes {
407 let segment_key = encode_segment_key(key, shard, segment_id)?;
409 self.write_segment_data(&segment_key, data)?;
410 Ok((false, segment_id))
411 } else {
412 let new_segment_id = segment_id + 1;
414 let new_segment_key = encode_segment_key(key, shard, new_segment_id)?;
415 self.write_segment_data(&new_segment_key, data)?;
416 Ok((true, new_segment_id))
417 }
418 }
419 None => {
420 let segment_key = encode_segment_key(key, shard, 0)?;
422 self.write_segment_data(&segment_key, data)?;
423 Ok((true, 0))
424 }
425 }
426 }
427}
428
429#[cfg(test)]
430mod tests {
431 use super::*;
432 use crate::partition::config::PartitionConfig;
433
434 #[test]
435 fn test_partitioned_table_creation() {
436 let config = PartitionConfig::default();
437 let table: PartitionedTable<()> = PartitionedTable::new("test_table", config);
438
439 assert_eq!(table.name(), "test_table");
440 assert_eq!(table.config().shard_count, 16);
441 assert!(table.config().use_meta);
442 }
443
444 #[test]
445 fn test_shard_selection() {
446 let config = PartitionConfig::new(8, 1024, true).unwrap();
447 let table: PartitionedTable<()> = PartitionedTable::new("test", config);
448
449 let key = b"test_key";
450 let element_id = 12345;
451
452 let shard = table.select_shard(key, element_id).unwrap();
453 assert!(shard < 8);
454
455 let shard2 = table.select_shard(key, element_id).unwrap();
457 assert_eq!(shard, shard2);
458 }
459}