use crate::partition::config::PartitionConfig;
use crate::partition::scan::{enumerate_segments, find_head_segment, SegmentInfo};
use crate::partition::shard::select_shard;
use crate::partition::PartitionError;
use crate::Result;
use redb::{Database, ReadTransaction, ReadableTable, TableDefinition, WriteTransaction};
use std::collections::HashMap;
pub fn encode_segment_key(key: &[u8], shard: u16, segment: u16) -> Result<Vec<u8>> {
let mut encoded_key = Vec::with_capacity(4 + key.len() + 4);
encoded_key.extend_from_slice(&(key.len() as u32).to_be_bytes());
encoded_key.extend_from_slice(key);
encoded_key.extend_from_slice(&shard.to_be_bytes());
encoded_key.extend_from_slice(&segment.to_be_bytes());
Ok(encoded_key)
}
type SegmentDataMap = HashMap<u16, Vec<(SegmentInfo, Option<Vec<u8>>)>>;
type SegmentSimpleMap = HashMap<u16, Vec<(u16, Vec<u8>)>>;
type SegmentResult = Option<(SegmentInfo, Vec<u8>)>;
pub const SEGMENT_TABLE: TableDefinition<&'static [u8], &'static [u8]> =
TableDefinition::new("redb_extras_segments");
pub const META_TABLE: TableDefinition<&'static [u8], &'static [u8]> =
TableDefinition::new("redb_extras_meta");
pub struct PartitionedTable<V> {
name: &'static str,
config: PartitionConfig,
_phantom: std::marker::PhantomData<V>,
}
impl<V> PartitionedTable<V> {
pub fn new(name: &'static str, config: PartitionConfig) -> Self {
Self {
name,
config,
_phantom: std::marker::PhantomData,
}
}
pub fn ensure_table_exists(&self, db: &Database) -> Result<()> {
let txn = db
.begin_write()
.map_err(|e| PartitionError::DatabaseError(format!("Failed to begin write: {}", e)))?;
{
let _segment_table = txn.open_table(SEGMENT_TABLE).map_err(|e| {
PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
})?;
if self.config.use_meta {
let _meta_table = txn.open_table(META_TABLE).map_err(|e| {
PartitionError::DatabaseError(format!("Failed to open meta table: {}", e))
})?;
}
}
txn.commit().map_err(|e| {
PartitionError::DatabaseError(format!("Failed to commit table creation: {}", e))
})?;
Ok(())
}
pub fn name(&self) -> &'static str {
self.name
}
pub fn config(&self) -> &PartitionConfig {
&self.config
}
pub fn select_shard(&self, key: &[u8], element_id: u64) -> Result<u16> {
Ok(select_shard(key, element_id, self.config.shard_count)?)
}
}
pub struct PartitionedRead<'a, V> {
table: &'a PartitionedTable<V>,
txn: &'a ReadTransaction,
}
impl<'a, V> PartitionedRead<'a, V> {
pub fn new(table: &'a PartitionedTable<V>, txn: &'a ReadTransaction) -> Self {
Self { table, txn }
}
pub fn table(&self) -> &PartitionedTable<V> {
self.table
}
pub fn collect_all_segments(&self, key: &[u8]) -> Result<SegmentDataMap> {
let mut result = HashMap::new();
let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
})?;
for shard in 0..self.table.config.shard_count {
let mut shard_segments = Vec::new();
let mut segment_iter = enumerate_segments(&table, key, shard)?;
while let Some(segment_result) = segment_iter.next() {
let segment_info = segment_result?;
shard_segments.push((segment_info.clone(), segment_info.segment_data.clone()));
}
if !shard_segments.is_empty() {
result.insert(shard, shard_segments);
}
}
Ok(result)
}
pub fn enumerate_all_segments(&self, key: &[u8]) -> Result<SegmentSimpleMap> {
let mut result = HashMap::new();
let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
})?;
for shard in 0..self.table.config.shard_count {
let mut shard_segments = Vec::new();
let mut segment_iter = enumerate_segments(&table, key, shard)?;
while let Some(segment_result) = segment_iter.next() {
let segment_info = segment_result?;
if let Some(data) = segment_info.segment_data {
shard_segments.push((segment_info.segment_id, data));
}
}
if !shard_segments.is_empty() {
result.insert(shard, shard_segments);
}
}
Ok(result)
}
pub fn read_segment_data(&self, segment_info: &SegmentInfo) -> Result<SegmentResult> {
if let Some(ref data) = segment_info.segment_data {
return Ok(Some((segment_info.clone(), data.clone())));
}
let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
})?;
match table.get(&*segment_info.segment_key) {
Ok(Some(value_guard)) => {
let data = value_guard.value().to_vec();
let mut info_with_data = segment_info.clone();
info_with_data.segment_data = Some(data.clone());
Ok(Some((info_with_data, data)))
}
Ok(None) => Ok(None),
Err(e) => {
Err(PartitionError::DatabaseError(format!("Failed to read segment: {}", e)).into())
}
}
}
}
pub struct PartitionedWrite<'a, V> {
table: &'a PartitionedTable<V>,
txn: &'a mut WriteTransaction,
}
impl<'a, V> PartitionedWrite<'a, V> {
pub fn new(table: &'a PartitionedTable<V>, txn: &'a mut WriteTransaction) -> Self {
Self { table, txn }
}
pub fn read_segment_data(&self, segment_info: &SegmentInfo) -> Result<SegmentResult> {
if let Some(ref data) = segment_info.segment_data {
return Ok(Some((segment_info.clone(), data.clone())));
}
let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
})?;
let result = match table.get(&*segment_info.segment_key) {
Ok(Some(value_guard)) => {
let data = value_guard.value().to_vec();
let mut info_with_data = segment_info.clone();
info_with_data.segment_data = Some(data.clone());
Ok(Some((info_with_data, data)))
}
Ok(None) => Ok(None),
Err(e) => Err(PartitionError::DatabaseError(format!(
"Failed to read segment: {}",
e
))),
};
drop(table);
Ok(result?)
}
pub fn table(&self) -> &PartitionedTable<V> {
self.table
}
pub fn find_head_segment_scan(&self, key: &[u8], shard: u16) -> Result<Option<u16>> {
let table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
})?;
Ok(find_head_segment(&table, key, shard)?)
}
pub fn write_segment_data(&self, segment_key: &[u8], data: &[u8]) -> Result<()> {
let mut table = self.txn.open_table(SEGMENT_TABLE).map_err(|e| {
PartitionError::DatabaseError(format!("Failed to open segment table: {}", e))
})?;
table.insert(segment_key, data).map_err(|e| {
PartitionError::DatabaseError(format!("Failed to write segment: {}", e))
})?;
Ok(())
}
pub fn create_new_segment(
&self,
key: &[u8],
shard: u16,
segment_id: u16,
data: &[u8],
) -> Result<()> {
let segment_key = encode_segment_key(key, shard, segment_id)?;
self.write_segment_data(&segment_key, data)
}
pub fn update_head_segment(&self, key: &[u8], shard: u16, data: &[u8]) -> Result<(bool, u16)> {
let head_segment = self.find_head_segment_scan(key, shard)?;
match head_segment {
Some(segment_id) => {
if data.len() <= self.table.config.segment_max_bytes {
let segment_key = encode_segment_key(key, shard, segment_id)?;
self.write_segment_data(&segment_key, data)?;
Ok((false, segment_id))
} else {
let new_segment_id = segment_id + 1;
let new_segment_key = encode_segment_key(key, shard, new_segment_id)?;
self.write_segment_data(&new_segment_key, data)?;
Ok((true, new_segment_id))
}
}
None => {
let segment_key = encode_segment_key(key, shard, 0)?;
self.write_segment_data(&segment_key, data)?;
Ok((true, 0))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::partition::config::PartitionConfig;
#[test]
fn test_partitioned_table_creation() {
let config = PartitionConfig::default();
let table: PartitionedTable<()> = PartitionedTable::new("test_table", config);
assert_eq!(table.name(), "test_table");
assert_eq!(table.config().shard_count, 16);
assert!(table.config().use_meta);
}
#[test]
fn test_shard_selection() {
let config = PartitionConfig::new(8, 1024, true).unwrap();
let table: PartitionedTable<()> = PartitionedTable::new("test", config);
let key = b"test_key";
let element_id = 12345;
let shard = table.select_shard(key, element_id).unwrap();
assert!(shard < 8);
let shard2 = table.select_shard(key, element_id).unwrap();
assert_eq!(shard, shard2);
}
}