#![allow(unused_variables)]
use super::{Key, Transaction};
use crate::{Error, Result};
use rocksdb::DB;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use parking_lot::RwLock;
/// Identifier of a snapshot; a plain `u64`.
pub type SnapshotId = u64;
/// Identifier of a branch; a plain `u64`. The registry reserves `1` for `main`.
pub type BranchId = u64;
// Key prefix for per-branch metadata records; the branch name is appended.
const BRANCH_META_PREFIX: &[u8] = b"branch:meta:";
// Key under which the serialized `BranchRegistry` is stored.
const BRANCH_REGISTRY_KEY: &[u8] = b"branch:registry";
// Key prefix for a parent's child list; the parent id (big-endian) is appended.
const BRANCH_CHILDREN_PREFIX: &[u8] = b"branch:children:";
// Key prefix for branch data: followed by the branch id (big-endian), `:` and the user key.
const BRANCH_DATA_PREFIX: &[u8] = b"bdata:";
// The `GIT_*` keys/prefixes below are exported but not referenced in the visible
// part of this file; presumably consumed by a git-integration module — TODO confirm.
pub const GIT_CONFIG_KEY: &[u8] = b"git:config";
pub const GIT_LINK_PREFIX: &[u8] = b"git:link:";
pub const GIT_COMMIT_PREFIX: &[u8] = b"git:commit:";
pub const GIT_DDL_HISTORY_PREFIX: &[u8] = b"git:ddl:";
pub const GIT_SCHEMA_SNAPSHOT_PREFIX: &[u8] = b"git:schema_snapshot:";
pub const GIT_PR_PREFIX: &[u8] = b"git:pr:";
/// Lifecycle state recorded in a branch's metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BranchState {
    /// The branch is live; newly created branches start in this state.
    Active,
    /// The branch was used as the source of a completed merge.
    Merged {
        /// Id of the branch that received the merge.
        into_branch: BranchId,
        /// Timestamp allocated for the merge.
        at_timestamp: u64,
    },
    /// The branch was dropped; its metadata record is kept with this state.
    Dropped {
        /// Timestamp read when the drop was performed.
        at_timestamp: u64,
    },
}
/// How conflicting keys are resolved when one branch is merged into another.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MergeStrategy {
    /// Conflicting keys take the source branch's value (handled like `Theirs`).
    Auto,
    /// The merge is not applied if any conflict is detected; conflicts are returned instead.
    Manual,
    /// Conflicting keys take the source branch's value.
    Theirs,
    /// Conflicting keys keep the target branch's value.
    Ours,
}
impl Default for MergeStrategy {
fn default() -> Self {
Self::Auto
}
}
/// A key whose value differs between the source and target branch of a merge.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MergeConflict {
    /// The user key in conflict.
    pub key: String,
    /// Value looked up for the merge base, if any.
    pub base_value: Option<Vec<u8>>,
    /// Value found on the source branch, if any.
    pub source_value: Option<Vec<u8>>,
    /// Value found on the target branch, if any.
    pub target_value: Option<Vec<u8>>,
    /// Timestamp reported for the source branch's value.
    pub source_timestamp: u64,
    /// Timestamp reported for the target branch's value.
    pub target_timestamp: u64,
}
/// Outcome of a merge attempt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MergeResult {
    /// Timestamp associated with the merge attempt.
    pub merge_timestamp: u64,
    /// Number of keys copied into the target branch.
    pub merged_keys: usize,
    /// Conflicts detected between the two branches.
    pub conflicts: Vec<MergeConflict>,
    /// `false` when a `Manual` merge stopped because of conflicts; `true` otherwise.
    pub completed: bool,
}
/// Optional git-related information attached to a branch via `BranchOptions::git_link`.
///
/// NOTE(review): nothing in the visible part of this file reads these fields, so the
/// per-field meaning is taken from the names only — confirm against the git integration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GitLinkMetadata {
    pub git_branch: String,
    pub last_commit: Option<String>,
    /// Defaults to `true`.
    pub auto_sync: bool,
    pub provider: Option<String>,
    pub pr_number: Option<u64>,
    pub repo_path: Option<String>,
    /// Defaults to `0`.
    pub linked_at: u64,
}
impl Default for GitLinkMetadata {
fn default() -> Self {
Self {
git_branch: String::new(),
last_commit: None,
auto_sync: true,
provider: None,
pr_number: None,
repo_path: None,
linked_at: 0,
}
}
}
/// Caller-supplied options stored verbatim in a branch's metadata at creation time.
///
/// NOTE(review): the visible code only stores these values; it does not interpret them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BranchOptions {
    pub replication_factor: Option<usize>,
    pub region: Option<String>,
    /// Free-form string key/value pairs.
    pub metadata: HashMap<String, String>,
    /// Optional git link information.
    pub git_link: Option<GitLinkMetadata>,
}
/// Per-branch counters stored in the branch metadata.
///
/// NOTE(review): the visible code only ever initializes these to their defaults (zero);
/// whatever maintains them lives elsewhere — confirm.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BranchStats {
    pub modified_keys: u64,
    pub storage_bytes: u64,
    pub commit_count: u64,
    pub last_modified: u64,
}
/// Persistent description of a branch, stored under the branch-metadata key for its name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BranchMetadata {
    /// Branch name; also used to build the metadata storage key.
    pub name: String,
    /// Id allocated from the registry (`1` for `main`).
    pub branch_id: BranchId,
    /// Parent branch id; `None` only for the bootstrap `main` record.
    pub parent_id: Option<BranchId>,
    /// Timestamp allocated when the branch was created (`0` for `main`).
    pub created_at: u64,
    /// Snapshot id supplied by the caller at creation.
    pub created_from_snapshot: SnapshotId,
    /// Current lifecycle state.
    pub state: BranchState,
    /// Set on a merge target to the timestamp of the latest merge it received.
    pub merge_base: Option<SnapshotId>,
    /// Options supplied at creation.
    pub options: BranchOptions,
    /// Statistics counters.
    pub stats: BranchStats,
}
/// Index of known branches, persisted as a single record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BranchRegistry {
    /// Branch id to branch name.
    pub branches: HashMap<BranchId, String>,
    /// Id of the main branch.
    pub main_branch: BranchId,
    /// Next id that `next_id` will hand out.
    pub next_branch_id: u64,
}
impl BranchRegistry {
    /// Builds a registry that contains only `main` (id 1); the next free id is 2.
    pub fn new() -> Self {
        let branches: HashMap<BranchId, String> =
            std::iter::once((1, "main".to_string())).collect();
        Self {
            branches,
            main_branch: 1,
            next_branch_id: 2,
        }
    }

    /// Hands out the next unused branch id and advances the internal counter.
    pub fn next_id(&mut self) -> BranchId {
        let allocated = self.next_branch_id;
        self.next_branch_id = allocated + 1;
        allocated
    }

    /// Records `name` under `branch_id`, replacing any previous entry for that id.
    pub fn add_branch(&mut self, branch_id: BranchId, name: String) {
        let _previous = self.branches.insert(branch_id, name);
    }

    /// Forgets `branch_id`; does nothing when the id is unknown.
    pub fn remove_branch(&mut self, branch_id: BranchId) {
        let _removed = self.branches.remove(&branch_id);
    }

    /// Whether `branch_id` is currently registered.
    pub fn contains(&self, branch_id: BranchId) -> bool {
        self.branches.get(&branch_id).is_some()
    }

    /// Name registered for `branch_id`, if any.
    pub fn get_name(&self, branch_id: BranchId) -> Option<&str> {
        match self.branches.get(&branch_id) {
            Some(name) => Some(name.as_str()),
            None => None,
        }
    }
}
impl Default for BranchRegistry {
fn default() -> Self {
Self::new()
}
}
/// Settings controlling cleanup of data belonging to dropped branches.
#[derive(Debug, Clone)]
pub struct BranchGcConfig {
    /// Minimum age a dropped branch must reach before deferred GC removes its data.
    /// NOTE(review): the age is computed as a difference of the manager's timestamp
    /// values; whether those are really seconds depends on the caller — confirm.
    pub min_retention_seconds: u64,
    /// When `false`, dropping a branch schedules no cleanup at all.
    pub auto_gc_enabled: bool,
    /// Whether cleanup runs inside the drop or is queued for later.
    pub gc_mode: BranchGcMode,
}
impl Default for BranchGcConfig {
fn default() -> Self {
Self {
min_retention_seconds: 300, auto_gc_enabled: true,
gc_mode: BranchGcMode::Deferred,
}
}
}
/// When the data of a dropped branch gets deleted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BranchGcMode {
    /// Delete the branch data as part of the drop call.
    Immediate,
    /// Queue the branch and delete its data in a later GC run.
    Deferred,
}
/// Creates, drops, merges and garbage-collects branches stored in a RocksDB instance.
pub struct BranchManager {
    /// Underlying database holding metadata, registry, child lists and branch data.
    db: Arc<DB>,
    /// In-memory copy of the persisted branch registry.
    registry: Arc<RwLock<BranchRegistry>>,
    /// Cache of branch metadata keyed by branch id.
    metadata_cache: Arc<RwLock<HashMap<BranchId, BranchMetadata>>>,
    /// Shared timestamp counter supplied by the caller.
    timestamp: Arc<RwLock<u64>>,
    /// Garbage-collection settings.
    gc_config: BranchGcConfig,
    /// Dropped branches awaiting deferred GC: branch id to the timestamp at scheduling.
    pending_gc: Arc<RwLock<HashMap<BranchId, u64>>>,
}
impl BranchManager {
/// Opens a branch manager over `db` using the default GC configuration.
///
/// # Errors
/// Propagates any failure from [`Self::with_gc_config`].
pub fn new(db: Arc<DB>, timestamp: Arc<RwLock<u64>>) -> Result<Self> {
    let gc_config = BranchGcConfig::default();
    Self::with_gc_config(db, timestamp, gc_config)
}
/// Opens a branch manager over `db` with an explicit GC configuration.
///
/// Loads the branch registry (creating it on first use) and the pending-GC queue
/// from the database; the metadata cache starts empty.
///
/// # Errors
/// Returns a storage error when either persisted structure cannot be loaded.
pub fn with_gc_config(
    db: Arc<DB>,
    timestamp: Arc<RwLock<u64>>,
    gc_config: BranchGcConfig,
) -> Result<Self> {
    let registry = Arc::new(RwLock::new(Self::load_or_create_registry(&db)?));
    let pending_gc = Arc::new(RwLock::new(Self::load_pending_gc(&db)?));
    let metadata_cache = Arc::new(RwLock::new(HashMap::new()));
    Ok(Self {
        db,
        registry,
        metadata_cache,
        timestamp,
        gc_config,
        pending_gc,
    })
}
fn load_or_create_registry(db: &DB) -> Result<BranchRegistry> {
match db.get(BRANCH_REGISTRY_KEY) {
Ok(Some(data)) => {
bincode::deserialize(&data)
.map_err(|e| Error::storage(format!("Failed to deserialize registry: {}", e)))
}
Ok(None) => {
let registry = BranchRegistry::new();
let main_metadata = BranchMetadata {
name: "main".to_string(),
branch_id: 1,
parent_id: None,
created_at: 0,
created_from_snapshot: 0,
state: BranchState::Active,
merge_base: None,
options: BranchOptions::default(),
stats: BranchStats::default(),
};
let meta_key = encode_branch_meta_key("main");
let meta_value = bincode::serialize(&main_metadata)
.map_err(|e| Error::storage(format!("Failed to serialize metadata: {}", e)))?;
db.put(&meta_key, &meta_value)
.map_err(|e| Error::storage(format!("Failed to save main branch metadata: {}", e)))?;
let registry_value = bincode::serialize(®istry)
.map_err(|e| Error::storage(format!("Failed to serialize registry: {}", e)))?;
db.put(BRANCH_REGISTRY_KEY, ®istry_value)
.map_err(|e| Error::storage(format!("Failed to save registry: {}", e)))?;
Ok(registry)
}
Err(e) => Err(Error::storage(format!("Failed to load registry: {}", e))),
}
}
fn save_registry(&self) -> Result<()> {
let registry = self.registry.read();
let value = bincode::serialize(&*registry)
.map_err(|e| Error::storage(format!("Failed to serialize registry: {}", e)))?;
self.db.put(BRANCH_REGISTRY_KEY, &value)
.map_err(|e| Error::storage(format!("Failed to save registry: {}", e)))
}
fn load_pending_gc(db: &DB) -> Result<HashMap<BranchId, u64>> {
const PENDING_GC_KEY: &[u8] = b"branch:pending_gc";
match db.get(PENDING_GC_KEY) {
Ok(Some(data)) => {
bincode::deserialize(&data)
.map_err(|e| Error::storage(format!("Failed to deserialize pending GC queue: {}", e)))
}
Ok(None) => Ok(HashMap::new()),
Err(e) => Err(Error::storage(format!("Failed to load pending GC queue: {}", e))),
}
}
fn save_pending_gc(&self) -> Result<()> {
const PENDING_GC_KEY: &[u8] = b"branch:pending_gc";
let pending = self.pending_gc.read();
let value = bincode::serialize(&*pending)
.map_err(|e| Error::storage(format!("Failed to serialize pending GC queue: {}", e)))?;
self.db.put(PENDING_GC_KEY, &value)
.map_err(|e| Error::storage(format!("Failed to save pending GC queue: {}", e)))
}
/// Creates a new active branch called `name`.
///
/// `parent_name` selects the parent branch; when it is `None` the main branch is
/// used. `snapshot_id` is stored as the snapshot the branch was created from and
/// `options` is stored verbatim. The branch metadata, the registry and the
/// parent's child list are persisted, and the metadata is cached.
///
/// # Errors
/// Returns a storage error when a metadata record for `name` already exists
/// (this includes records of dropped branches), when the parent cannot be found,
/// or when any database write fails.
pub fn create_branch(
    &self,
    name: &str,
    parent_name: Option<&str>,
    snapshot_id: SnapshotId,
    options: BranchOptions,
) -> Result<BranchId> {
    let already_present = self.get_branch_by_name(name).is_ok();
    if already_present {
        return Err(Error::storage(format!("Branch '{}' already exists", name)));
    }

    let parent_id = match parent_name {
        Some(parent) => Some(self.get_branch_by_name(parent)?.branch_id),
        None => Some(self.registry.read().main_branch),
    };

    let branch_id = self.registry.write().next_id();
    let created_at = self.next_timestamp();
    let metadata = BranchMetadata {
        name: name.to_string(),
        branch_id,
        parent_id,
        created_at,
        created_from_snapshot: snapshot_id,
        state: BranchState::Active,
        merge_base: None,
        options,
        stats: BranchStats::default(),
    };

    // Persist the metadata record first, then publish the branch in the registry.
    let storage_key = encode_branch_meta_key(name);
    let encoded = bincode::serialize(&metadata)
        .map_err(|e| Error::storage(format!("Failed to serialize metadata: {}", e)))?;
    self.db
        .put(&storage_key, &encoded)
        .map_err(|e| Error::storage(format!("Failed to save branch metadata: {}", e)))?;

    self.registry.write().add_branch(branch_id, name.to_string());
    self.save_registry()?;

    if let Some(parent) = parent_id {
        self.add_child_branch(parent, branch_id)?;
    }

    self.metadata_cache.write().insert(branch_id, metadata);
    Ok(branch_id)
}
/// Marks the branch `name` as dropped and schedules cleanup of its data.
///
/// The metadata record is kept (with state `Dropped`), the branch is removed
/// from the registry and the metadata cache, and GC is scheduled according to
/// the configured mode.
///
/// When `if_exists` is `true`, any failure to load the branch is treated as
/// "nothing to do" and `Ok(())` is returned.
///
/// # Errors
/// Returns a storage error when the branch cannot be loaded (and `if_exists` is
/// `false`), when `name` is the main branch, when the branch still has
/// registered child branches, or when a database write fails.
pub fn drop_branch(&self, name: &str, if_exists: bool) -> Result<()> {
    let metadata = match self.get_branch_by_name(name) {
        Ok(meta) => meta,
        Err(_) if if_exists => return Ok(()),
        Err(e) => return Err(e),
    };
    let branch_id = metadata.branch_id;

    if branch_id == self.registry.read().main_branch {
        return Err(Error::storage("Cannot drop main branch"));
    }

    // The stored child list is append-only: dropping a child never removes it
    // from its parent's list. Only children still present in the registry may
    // block the drop, otherwise a parent whose children were all dropped could
    // never be dropped itself.
    let children = self.get_child_branches(branch_id)?;
    let live_children = {
        let registry = self.registry.read();
        children
            .iter()
            .filter(|child_id| registry.contains(**child_id))
            .count()
    };
    if live_children > 0 {
        return Err(Error::storage(format!(
            "Cannot drop branch '{}': has {} child branch(es)",
            name, live_children
        )));
    }

    let mut updated_meta = metadata.clone();
    updated_meta.state = BranchState::Dropped {
        at_timestamp: self.current_timestamp(),
    };
    let meta_key = encode_branch_meta_key(name);
    let meta_value = bincode::serialize(&updated_meta)
        .map_err(|e| Error::storage(format!("Failed to serialize metadata: {}", e)))?;
    self.db
        .put(&meta_key, &meta_value)
        .map_err(|e| Error::storage(format!("Failed to save updated metadata: {}", e)))?;

    // Scope the write guard so it is released before `save_registry` takes a read lock.
    {
        let mut registry = self.registry.write();
        registry.remove_branch(branch_id);
    }
    self.save_registry()?;
    self.metadata_cache.write().remove(&branch_id);
    self.schedule_branch_gc(branch_id, metadata.name.clone())?;
    Ok(())
}
/// Triggers or queues cleanup of a dropped branch's data.
///
/// Does nothing when automatic GC is disabled. In `Immediate` mode the branch
/// data is deleted right away; in `Deferred` mode the branch id is recorded in
/// the pending queue together with the current timestamp, and the queue is
/// persisted.
///
/// # Errors
/// Propagates storage errors from the deletion or from saving the queue.
fn schedule_branch_gc(&self, branch_id: BranchId, branch_name: String) -> Result<()> {
    if !self.gc_config.auto_gc_enabled {
        tracing::debug!("Branch GC disabled, skipping cleanup for branch {}", branch_name);
        return Ok(());
    }

    match self.gc_config.gc_mode {
        BranchGcMode::Deferred => {
            let scheduled_at = self.current_timestamp();
            tracing::debug!("Scheduling deferred GC for branch '{}' (ID: {})", branch_name, branch_id);
            self.pending_gc.write().insert(branch_id, scheduled_at);
            self.save_pending_gc()
        }
        BranchGcMode::Immediate => {
            tracing::info!("Starting immediate GC for branch '{}' (ID: {})", branch_name, branch_id);
            self.gc_branch_data(branch_id)?;
            tracing::info!("Completed immediate GC for branch '{}'", branch_name);
            Ok(())
        }
    }
}
fn gc_branch_data(&self, branch_id: BranchId) -> Result<()> {
let mut prefix = Vec::new();
prefix.extend_from_slice(BRANCH_DATA_PREFIX);
prefix.extend_from_slice(&branch_id.to_be_bytes());
prefix.push(b':');
let mut delete_count = 0;
let mut keys_to_delete = Vec::new();
let iter = self.db.prefix_iterator(&prefix);
for item in iter {
let (key, _value) = item
.map_err(|e| Error::storage(format!("GC iterator error: {}", e)))?;
keys_to_delete.push(key.to_vec());
}
for key in keys_to_delete {
self.db.delete(&key)
.map_err(|e| Error::storage(format!("Failed to delete branch data: {}", e)))?;
delete_count += 1;
}
tracing::info!(
"Branch GC deleted {} data keys for branch ID {}",
delete_count,
branch_id
);
Ok(())
}
pub fn gc_eligible_branches(&self) -> Result<usize> {
let current_ts = self.current_timestamp();
let min_retention = self.gc_config.min_retention_seconds;
let mut gc_count = 0;
let mut pending = self.pending_gc.write();
let mut to_remove = Vec::new();
for (branch_id, drop_ts) in pending.iter() {
let age_seconds = current_ts.saturating_sub(*drop_ts);
if age_seconds >= min_retention {
tracing::debug!(
"Branch ID {} eligible for GC (age: {}s >= {}s)",
branch_id,
age_seconds,
min_retention
);
to_remove.push(*branch_id);
}
}
for branch_id in to_remove {
match self.gc_branch_data(branch_id) {
Ok(()) => {
pending.remove(&branch_id);
gc_count += 1;
tracing::info!("Successfully GC'd branch ID {}", branch_id);
}
Err(e) => {
tracing::warn!(
"Failed to GC branch ID {}: {}. Will retry later.",
branch_id,
e
);
}
}
}
drop(pending);
self.save_pending_gc()?;
if gc_count > 0 {
tracing::info!("Branch GC completed: {} branches cleaned up", gc_count);
}
Ok(gc_count)
}
/// Manually triggers a GC pass over the pending queue.
///
/// Logs the start of the run and returns the number of branches cleaned up.
///
/// # Errors
/// Propagates errors from [`Self::gc_eligible_branches`].
pub fn run_gc(&self) -> Result<usize> {
    tracing::info!("Starting manual branch GC run");
    let cleaned = self.gc_eligible_branches()?;
    Ok(cleaned)
}
/// Number of dropped branches currently queued for deferred GC.
pub fn pending_gc_count(&self) -> usize {
    let queue = self.pending_gc.read();
    queue.len()
}
pub fn gc_config(&self) -> &BranchGcConfig {
&self.gc_config
}
pub fn get_branch_by_name(&self, name: &str) -> Result<BranchMetadata> {
let meta_key = encode_branch_meta_key(name);
let data = self.db.get(&meta_key)
.map_err(|e| Error::storage(format!("Failed to read branch metadata: {}", e)))?
.ok_or_else(|| Error::storage(format!("Branch '{}' not found", name)))?;
bincode::deserialize(&data)
.map_err(|e| Error::storage(format!("Failed to deserialize metadata: {}", e)))
}
pub fn get_branch_by_id(&self, branch_id: BranchId) -> Result<BranchMetadata> {
if let Some(metadata) = self.metadata_cache.read().get(&branch_id) {
return Ok(metadata.clone());
}
let name = self.registry.read()
.get_name(branch_id)
.ok_or_else(|| Error::storage(format!("Branch ID {} not found", branch_id)))?
.to_string();
let metadata = self.get_branch_by_name(&name)?;
self.metadata_cache.write().insert(branch_id, metadata.clone());
Ok(metadata)
}
/// Lists the metadata of every registered branch that is currently `Active`.
///
/// Branches whose metadata cannot be loaded, or whose state is not `Active`,
/// are skipped without raising an error. The order of the result follows the
/// registry's hash-map iteration order and is therefore unspecified.
///
/// # Errors
/// Currently never fails; the `Result` return type is kept for callers.
pub fn list_branches(&self) -> Result<Vec<BranchMetadata>> {
    let registry = self.registry.read();
    let branches = registry
        .branches
        .values()
        .filter_map(|name| self.get_branch_by_name(name).ok())
        .filter(|metadata| metadata.state == BranchState::Active)
        .collect();
    Ok(branches)
}
/// Returns an owned copy of the name registered for `branch_id`, if any.
pub fn get_branch_name(&self, branch_id: BranchId) -> Option<String> {
    self.registry
        .read()
        .get_name(branch_id)
        .map(str::to_owned)
}
/// Appends `child_id` to the persisted child list of `parent_id`.
///
/// The list is read, extended and written back; no de-duplication is done.
///
/// # Errors
/// Returns a storage error when the list cannot be read, (de)serialized or written.
fn add_child_branch(&self, parent_id: BranchId, child_id: BranchId) -> Result<()> {
    let mut children = self.get_child_branches(parent_id)?;
    children.push(child_id);

    let encoded = bincode::serialize(&children)
        .map_err(|e| Error::storage(format!("Failed to serialize children: {}", e)))?;
    let storage_key = encode_branch_children_key(parent_id);
    self.db
        .put(&storage_key, &encoded)
        .map_err(|e| Error::storage(format!("Failed to save children: {}", e)))
}
fn get_child_branches(&self, parent_id: BranchId) -> Result<Vec<BranchId>> {
let key = encode_branch_children_key(parent_id);
match self.db.get(&key) {
Ok(Some(data)) => bincode::deserialize(&data)
.map_err(|e| Error::storage(format!("Failed to deserialize children: {}", e))),
Ok(None) => Ok(Vec::new()),
Err(e) => Err(Error::storage(format!("Failed to read children: {}", e))),
}
}
/// Reads the shared timestamp counter without advancing it.
pub fn current_timestamp(&self) -> u64 {
    let clock = self.timestamp.read();
    *clock
}
/// Advances the shared timestamp counter by one and returns the new value.
fn next_timestamp(&self) -> u64 {
    let mut clock = self.timestamp.write();
    let advanced = *clock + 1;
    *clock = advanced;
    advanced
}
/// Merges branch `source_name` into branch `target_name`.
///
/// Steps: load both branches, require both to be `Active`, determine a merge
/// base, collect the keys present on each branch, detect conflicts, and apply
/// the merge according to `strategy`. On success the source branch is marked
/// `Merged` and the target's `merge_base` is set to the merge timestamp; both
/// updated records are persisted and cached.
///
/// With [`MergeStrategy::Manual`] and at least one conflict nothing is applied:
/// the returned result has `completed == false`, `merged_keys == 0` and carries
/// the conflicts.
///
/// # Errors
/// Returns an error when either branch cannot be loaded, when either branch is
/// not active, when source and target are the same branch, or when any
/// database operation fails. A failure while persisting metadata can leave
/// already-copied keys in the target branch.
pub fn merge_branch(
    &self,
    source_name: &str,
    target_name: &str,
    strategy: MergeStrategy,
) -> Result<MergeResult> {
    tracing::info!(
        "Starting merge: {} -> {} (strategy: {:?})",
        source_name,
        target_name,
        strategy
    );
    let source = self.get_branch_by_name(source_name)?;
    let target = self.get_branch_by_name(target_name)?;

    // A self-merge would copy keys onto themselves and then overwrite the
    // `Merged` state with the target record, reporting a "completed" merge that
    // did nothing meaningful. Reject it up front.
    if source.branch_id == target.branch_id {
        return Err(Error::branch_merge(format!(
            "Cannot merge branch '{}' into itself",
            source_name
        )));
    }
    if source.state != BranchState::Active {
        return Err(Error::branch_merge(format!(
            "Source branch '{}' is not active",
            source_name
        )));
    }
    if target.state != BranchState::Active {
        return Err(Error::branch_merge(format!(
            "Target branch '{}' is not active",
            target_name
        )));
    }

    let merge_base = self.find_merge_base(source.branch_id, target.branch_id)?;
    tracing::debug!(
        "Merge base found: snapshot_id = {}",
        merge_base
    );

    let source_keys = self.collect_modified_keys(source.branch_id, merge_base)?;
    let target_keys = self.collect_modified_keys(target.branch_id, merge_base)?;
    let conflicts = self.detect_conflicts(
        &source_keys,
        &target_keys,
        source.branch_id,
        target.branch_id,
        merge_base,
    )?;
    tracing::info!(
        "Merge analysis: {} source keys, {} target keys, {} conflicts",
        source_keys.len(),
        target_keys.len(),
        conflicts.len()
    );

    // Manual strategy: hand the conflicts back without touching any data or
    // advancing the timestamp.
    if strategy == MergeStrategy::Manual && !conflicts.is_empty() {
        return Ok(MergeResult {
            merge_timestamp: self.current_timestamp(),
            merged_keys: 0,
            conflicts,
            completed: false,
        });
    }

    let merge_timestamp = self.next_timestamp();
    let merged_keys = self.apply_merge(
        source.branch_id,
        target.branch_id,
        merge_base,
        &source_keys,
        &target_keys,
        &conflicts,
        strategy,
        merge_timestamp,
    )?;

    // Record the merge on the source branch.
    let mut updated_source = source.clone();
    updated_source.state = BranchState::Merged {
        into_branch: target.branch_id,
        at_timestamp: merge_timestamp,
    };
    let meta_key = encode_branch_meta_key(source_name);
    let meta_value = bincode::serialize(&updated_source)
        .map_err(|e| Error::storage(format!("Failed to serialize metadata: {}", e)))?;
    self.db
        .put(&meta_key, &meta_value)
        .map_err(|e| Error::storage(format!("Failed to save merged branch metadata: {}", e)))?;
    self.metadata_cache.write().insert(source.branch_id, updated_source);

    // Record the merge timestamp as the target's new merge base.
    let mut updated_target = target.clone();
    updated_target.merge_base = Some(merge_timestamp);
    let target_meta_key = encode_branch_meta_key(target_name);
    let target_meta_value = bincode::serialize(&updated_target)
        .map_err(|e| Error::storage(format!("Failed to serialize metadata: {}", e)))?;
    self.db
        .put(&target_meta_key, &target_meta_value)
        .map_err(|e| Error::storage(format!("Failed to save target branch metadata: {}", e)))?;
    self.metadata_cache.write().insert(target.branch_id, updated_target);

    tracing::info!(
        "Merge completed: {} keys merged, {} conflicts",
        merged_keys,
        conflicts.len()
    );
    Ok(MergeResult {
        merge_timestamp,
        merged_keys,
        conflicts,
        completed: true,
    })
}
/// Picks the snapshot used as the common base for merging two branches.
///
/// The first snapshot in the target's parent chain that also appears in the
/// source's parent chain wins. When the chains share no snapshot, the two
/// branches' own creation snapshots are compared: if equal that snapshot is
/// returned, otherwise the smaller one is used and a warning is logged.
///
/// # Errors
/// Propagates failures from building the parent chains or loading metadata.
fn find_merge_base(&self, source_id: BranchId, target_id: BranchId) -> Result<SnapshotId> {
    let source_chain = self.build_parent_chain(source_id)?;
    let target_chain = self.build_parent_chain(target_id)?;

    let seen_on_source: HashSet<SnapshotId> =
        source_chain.iter().map(|entry| entry.1).collect();
    let shared = target_chain
        .iter()
        .map(|entry| entry.1)
        .find(|snapshot| seen_on_source.contains(snapshot));
    if let Some(snapshot) = shared {
        return Ok(snapshot);
    }

    let source_origin = self.get_branch_by_id(source_id)?.created_from_snapshot;
    let target_origin = self.get_branch_by_id(target_id)?.created_from_snapshot;
    if source_origin == target_origin {
        return Ok(source_origin);
    }

    let merge_base = std::cmp::min(source_origin, target_origin);
    tracing::warn!(
        "No common ancestor found, using earlier snapshot: {}",
        merge_base
    );
    Ok(merge_base)
}
fn collect_modified_keys(
&self,
branch_id: BranchId,
since_snapshot: SnapshotId,
) -> Result<HashSet<String>> {
let mut keys = HashSet::new();
let prefix = {
let mut p = Vec::new();
p.extend_from_slice(BRANCH_DATA_PREFIX);
p.extend_from_slice(&branch_id.to_be_bytes());
p.push(b':');
p
};
let iter = self.db.prefix_iterator(&prefix);
for item in iter {
let (key, _value) = item.map_err(|e| Error::storage(format!("Iterator error: {}", e)))?;
if let Some((_branch_id, user_key)) = decode_branch_data_key(&key) {
keys.insert(user_key);
}
}
Ok(keys)
}
/// Finds keys present on both branches whose values diverged from the base and
/// from each other.
///
/// For each key in the intersection of `source_keys` and `target_keys`, the
/// base value is looked up on the target branch at `merge_base`, and the
/// latest value is looked up on each branch. A conflict is recorded when both
/// sides differ from the base and from each other.
///
/// # Errors
/// Propagates storage errors from the value lookups.
fn detect_conflicts(
    &self,
    source_keys: &HashSet<String>,
    target_keys: &HashSet<String>,
    source_id: BranchId,
    target_id: BranchId,
    merge_base: SnapshotId,
) -> Result<Vec<MergeConflict>> {
    let mut conflicts = Vec::new();
    // The intersection is materialized into its own set; the order of the
    // resulting conflicts therefore follows that set's iteration order.
    let common_keys: HashSet<_> = source_keys.intersection(target_keys).collect();
    for key in common_keys {
        let base_value = self.get_key_at_snapshot(target_id, key, merge_base)?;
        let source_value = self.get_latest_key_value(source_id, key)?;
        let target_value = self.get_latest_key_value(target_id, key)?;
        // NOTE(review): these compare `(bytes, timestamp)` tuples, so the
        // timestamp component takes part in the "modified" decision, not just
        // the bytes — confirm this is intended.
        let source_modified = source_value != base_value;
        let target_modified = target_value != base_value;
        if source_modified && target_modified && source_value != target_value {
            let source_ts = self.get_key_timestamp(source_id, key)?;
            let target_ts = self.get_key_timestamp(target_id, key)?;
            conflicts.push(MergeConflict {
                key: key.to_string(),
                // Only the value bytes are reported; the timestamp half is dropped.
                base_value: base_value.map(|v| v.0),
                source_value: source_value.map(|v| v.0),
                target_value: target_value.map(|v| v.0),
                source_timestamp: source_ts,
                target_timestamp: target_ts,
            });
        }
    }
    Ok(conflicts)
}
/// Copies source-branch keys into the target branch according to `strategy`.
///
/// For every key in `source_keys`:
/// * not present on the target: always copied;
/// * present on both and listed in `conflicts`: copied for `Auto` and
///   `Theirs`, left untouched for `Ours` and `Manual`;
/// * present on both without conflict: copied only when the source's latest
///   value differs from the base value looked up on the target at `merge_base`.
///
/// Returns the number of keys copied.
///
/// # Errors
/// Propagates storage errors from lookups and copies; keys copied before the
/// failure are not rolled back.
fn apply_merge(
    &self,
    source_id: BranchId,
    target_id: BranchId,
    merge_base: SnapshotId,
    source_keys: &HashSet<String>,
    target_keys: &HashSet<String>,
    conflicts: &[MergeConflict],
    strategy: MergeStrategy,
    merge_timestamp: u64,
) -> Result<usize> {
    let conflicting: HashSet<&str> = conflicts.iter().map(|c| c.key.as_str()).collect();
    let source_wins_conflicts = matches!(strategy, MergeStrategy::Auto | MergeStrategy::Theirs);

    let mut copied = 0;
    for key in source_keys {
        let should_copy = if !target_keys.contains(key) {
            true
        } else if conflicting.contains(key.as_str()) {
            source_wins_conflicts
        } else {
            let base_value = self.get_key_at_snapshot(target_id, key, merge_base)?;
            let source_value = self.get_latest_key_value(source_id, key)?;
            source_value != base_value
        };

        if should_copy {
            self.copy_key_to_branch(source_id, target_id, key, merge_timestamp)?;
            copied += 1;
        }
    }
    Ok(copied)
}
fn copy_key_to_branch(
&self,
source_id: BranchId,
target_id: BranchId,
key: &str,
timestamp: u64,
) -> Result<()> {
let value = self.get_latest_key_value(source_id, key)?;
let target_key = encode_branch_data_key(target_id, key, timestamp);
if let Some((data, _ts)) = value {
self.db.put(&target_key, &data)
.map_err(|e| Error::storage(format!("Failed to copy key: {}", e)))?;
} else {
self.db.delete(&target_key)
.map_err(|e| Error::storage(format!("Failed to delete key: {}", e)))?;
}
Ok(())
}
fn get_key_at_snapshot(
&self,
branch_id: BranchId,
key: &str,
snapshot: SnapshotId,
) -> Result<Option<(Vec<u8>, u64)>> {
let branch_key = encode_branch_data_key(branch_id, key, snapshot);
match self.db.get(&branch_key) {
Ok(Some(data)) => Ok(Some((data, snapshot))),
Ok(None) => Ok(None),
Err(e) => Err(Error::storage(format!("Failed to read key: {}", e))),
}
}
fn get_latest_key_value(&self, branch_id: BranchId, key: &str) -> Result<Option<(Vec<u8>, u64)>> {
let mut prefix = Vec::new();
prefix.extend_from_slice(BRANCH_DATA_PREFIX);
prefix.extend_from_slice(&branch_id.to_be_bytes());
prefix.push(b':');
prefix.extend_from_slice(key.as_bytes());
prefix.push(b':');
let mut iter = self.db.prefix_iterator(&prefix);
if let Some(item) = iter.next() {
let (_k, v) = item.map_err(|e| Error::storage(format!("Iterator error: {}", e)))?;
return Ok(Some((v.to_vec(), 0))); }
Ok(None)
}
/// Timestamp reported by `get_latest_key_value` for `key`, or `0` when the key has no value.
///
/// # Errors
/// Propagates storage errors from the lookup.
fn get_key_timestamp(&self, branch_id: BranchId, key: &str) -> Result<u64> {
    let latest = self.get_latest_key_value(branch_id, key)?;
    Ok(latest.map_or(0, |(_data, ts)| ts))
}
/// Walks from `branch_id` up through its ancestors.
///
/// Each entry pairs an ancestor's id with the snapshot its child was created
/// from; the nearest parent comes first. The walk ends at a branch without a
/// parent.
///
/// # Errors
/// Returns a storage error when any branch along the way cannot be loaded.
pub fn build_parent_chain(&self, branch_id: BranchId) -> Result<Vec<(BranchId, SnapshotId)>> {
    let mut lineage = Vec::new();
    let mut cursor = self.get_branch_by_id(branch_id)?;
    while let Some(parent_id) = cursor.parent_id {
        lineage.push((parent_id, cursor.created_from_snapshot));
        cursor = self.get_branch_by_id(parent_id)?;
    }
    Ok(lineage)
}
}
/// A transaction whose reads and writes are scoped to one branch.
pub struct BranchTransaction {
    /// Underlying transaction that receives the branch-prefixed keys.
    tx: Transaction,
    /// Branch this transaction operates on.
    branch_id: BranchId,
    /// Metadata of the branch, as supplied by the caller.
    branch_meta: BranchMetadata,
    /// Ancestors consulted, in order, when a key is not found on the branch itself.
    parent_chain: Vec<(BranchId, SnapshotId)>,
    /// Database handle used for direct reads of parent data.
    db: Arc<DB>,
}
impl BranchTransaction {
pub fn new(
db: Arc<DB>,
branch_id: BranchId,
branch_meta: BranchMetadata,
parent_chain: Vec<(BranchId, SnapshotId)>,
snapshot_id: SnapshotId,
snapshot_manager: Arc<super::time_travel::SnapshotManager>,
) -> Result<Self> {
let tx = Transaction::new(Arc::clone(&db), snapshot_id, snapshot_manager)?;
Ok(Self {
tx,
branch_id,
branch_meta,
parent_chain,
db,
})
}
pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> {
let user_key = String::from_utf8_lossy(key);
let branch_key = encode_branch_data_key(
self.branch_id,
&user_key,
self.tx.snapshot_id(),
);
if let Some(value) = self.tx.get(&branch_key)? {
return Ok(Some(value));
}
for (parent_id, _parent_snapshot) in &self.parent_chain {
let parent_key = if *parent_id == 1 {
key.to_vec()
} else {
encode_branch_data_key(*parent_id, &user_key, 0) };
if let Some(value) = self.db.get(&parent_key)
.map_err(|e| Error::storage(format!("Parent read failed: {}", e)))? {
return Ok(Some(value));
}
}
Ok(None)
}
pub fn put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
let user_key = String::from_utf8_lossy(&key);
let branch_key = encode_branch_data_key(
self.branch_id,
&user_key,
self.tx.snapshot_id(),
);
self.tx.put(branch_key, value)
}
pub fn delete(&mut self, key: Key) -> Result<()> {
let user_key = String::from_utf8_lossy(&key);
let branch_key = encode_branch_data_key(
self.branch_id,
&user_key,
self.tx.snapshot_id(),
);
self.tx.delete(branch_key)
}
pub fn commit(self) -> Result<()> {
self.tx.commit()
}
pub fn rollback(self) -> Result<()> {
self.tx.rollback()
}
pub fn snapshot_id(&self) -> SnapshotId {
self.tx.snapshot_id()
}
}
/// Storage key of a branch's metadata record: the metadata prefix followed by the name bytes.
fn encode_branch_meta_key(branch_name: &str) -> Vec<u8> {
    [BRANCH_META_PREFIX, branch_name.as_bytes()].concat()
}
/// Storage key of a branch's child list: the children prefix followed by the
/// parent id as eight big-endian bytes.
fn encode_branch_children_key(parent_id: BranchId) -> Vec<u8> {
    let id_bytes = parent_id.to_be_bytes();
    [BRANCH_CHILDREN_PREFIX, &id_bytes[..]].concat()
}
/// Builds the storage key for `user_key` on branch `branch_id`.
///
/// Layout: data prefix, branch id as eight big-endian bytes, `:`, user key bytes.
/// The `_timestamp` argument is accepted but is not part of the key.
pub fn encode_branch_data_key(
    branch_id: BranchId,
    user_key: &str,
    _timestamp: u64,
) -> Vec<u8> {
    let id_bytes = branch_id.to_be_bytes();
    [BRANCH_DATA_PREFIX, &id_bytes[..], &b":"[..], user_key.as_bytes()].concat()
}
/// Splits a branch data key back into its branch id and user key.
///
/// Returns `None` when the key lacks the data prefix, is too short to hold the
/// eight id bytes and the `:` separator, has a different byte where the
/// separator should be, or when the user-key part is not valid UTF-8.
pub fn decode_branch_data_key(key: &[u8]) -> Option<(BranchId, String)> {
    let rest = key.strip_prefix(BRANCH_DATA_PREFIX)?;
    let id_bytes = rest.get(..8)?;
    let (separator, user_bytes) = rest.get(8..)?.split_first()?;
    if *separator != b':' {
        return None;
    }
    let branch_id = u64::from_be_bytes(id_bytes.try_into().ok()?);
    let user_key = String::from_utf8(user_bytes.to_vec()).ok()?;
    Some((branch_id, user_key))
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::Config;
use crate::storage::StorageEngine;
/// A data key round-trips through encode/decode, including a `:` inside the user key.
#[test]
fn test_branch_key_encoding() {
    let encoded = encode_branch_data_key(1, "users:123", 100);
    let decoded = decode_branch_data_key(&encoded);
    assert_eq!(decoded, Some((1, "users:123".to_string())));
}
/// A manager can be opened on a fresh in-memory engine.
#[test]
fn test_create_branch_manager() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let opened = BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp));
    assert!(opened.is_ok());
}
/// Creating a branch off `main` yields a fresh id and an active metadata record.
#[test]
fn test_create_branch() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let branches =
        BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp)).unwrap();

    let dev_id = branches
        .create_branch("dev", Some("main"), 100, BranchOptions::default())
        .unwrap();
    assert!(dev_id > 1);

    let dev = branches.get_branch_by_name("dev").unwrap();
    assert_eq!(dev.name, "dev");
    assert_eq!(dev.branch_id, dev_id);
    assert_eq!(dev.parent_id, Some(1));
    assert_eq!(dev.state, BranchState::Active);
}
/// Dropping a branch keeps its metadata record but flips the state to `Dropped`.
#[test]
fn test_drop_branch() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let branches =
        BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp)).unwrap();

    branches
        .create_branch("temp", Some("main"), 100, BranchOptions::default())
        .unwrap();
    branches.drop_branch("temp", false).unwrap();

    let lookup = branches.get_branch_by_name("temp");
    assert!(lookup.is_ok());
    let dropped = lookup.unwrap();
    assert!(matches!(dropped.state, BranchState::Dropped { .. }));
}
/// The main branch can never be dropped.
#[test]
fn test_cannot_drop_main() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let branches =
        BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp)).unwrap();

    let outcome = branches.drop_branch("main", false);
    assert!(outcome.is_err());
}
/// A branch that still has a child branch cannot be dropped.
#[test]
fn test_cannot_drop_with_children() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let branches =
        BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp)).unwrap();

    branches
        .create_branch("parent", Some("main"), 100, BranchOptions::default())
        .unwrap();
    branches
        .create_branch("child", Some("parent"), 200, BranchOptions::default())
        .unwrap();

    let outcome = branches.drop_branch("parent", false);
    assert!(outcome.is_err());
}
/// Listing returns `main` plus every branch created afterwards.
#[test]
fn test_list_branches() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let branches =
        BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp)).unwrap();

    branches.create_branch("dev", Some("main"), 100, BranchOptions::default()).unwrap();
    branches.create_branch("staging", Some("main"), 200, BranchOptions::default()).unwrap();

    let listed = branches.list_branches().unwrap();
    assert_eq!(listed.len(), 3);

    let names: Vec<&str> = listed.iter().map(|meta| meta.name.as_str()).collect();
    for expected in ["main", "dev", "staging"] {
        assert!(names.contains(&expected));
    }
}
/// The parent chain of a grandchild lists its parent first and `main` last.
#[test]
fn test_parent_chain() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let branches =
        BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp)).unwrap();

    let dev_id = branches
        .create_branch("dev", Some("main"), 100, BranchOptions::default())
        .unwrap();
    let feature_id = branches
        .create_branch("feature", Some("dev"), 200, BranchOptions::default())
        .unwrap();

    let lineage = branches.build_parent_chain(feature_id).unwrap();
    assert_eq!(lineage.len(), 2);
    assert_eq!(lineage[0].0, dev_id);
    assert_eq!(lineage[1].0, 1);
}
/// Two siblings created from the same snapshot share that snapshot as merge base.
#[test]
fn test_find_merge_base() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let branches =
        BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp)).unwrap();

    let dev_id = branches
        .create_branch("dev", Some("main"), 100, BranchOptions::default())
        .unwrap();
    let staging_id = branches
        .create_branch("staging", Some("main"), 100, BranchOptions::default())
        .unwrap();

    let base = branches.find_merge_base(dev_id, staging_id).unwrap();
    assert_eq!(base, 100);
}
/// An `Auto` merge of an empty branch completes without conflicts.
#[test]
fn test_merge_branch_auto_strategy() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let branches =
        BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp)).unwrap();

    branches
        .create_branch("dev", Some("main"), 100, BranchOptions::default())
        .unwrap();

    let outcome = branches.merge_branch("dev", "main", MergeStrategy::Auto).unwrap();
    assert!(outcome.completed);
    assert_eq!(outcome.conflicts.len(), 0);
}
/// A `Manual` merge still completes when there is nothing in conflict.
#[test]
fn test_merge_branch_manual_strategy_no_conflicts() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let branches =
        BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp)).unwrap();

    branches
        .create_branch("staging", Some("main"), 100, BranchOptions::default())
        .unwrap();

    let outcome = branches.merge_branch("staging", "main", MergeStrategy::Manual).unwrap();
    assert!(outcome.completed);
    assert_eq!(outcome.conflicts.len(), 0);
}
/// After a merge the source branch is recorded as merged into the target.
#[test]
fn test_merge_updates_branch_state() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let branches =
        BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp)).unwrap();

    branches
        .create_branch("feature", Some("main"), 100, BranchOptions::default())
        .unwrap();
    branches.merge_branch("feature", "main", MergeStrategy::Auto).unwrap();

    let feature = branches.get_branch_by_name("feature").unwrap();
    if let BranchState::Merged { into_branch, .. } = feature.state {
        assert_eq!(into_branch, 1);
    } else {
        panic!("Expected branch to be in Merged state");
    }
}
/// A dropped branch cannot be used as a merge source.
#[test]
fn test_cannot_merge_inactive_branch() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let branches =
        BranchManager::new(Arc::clone(&storage.db), Arc::clone(&storage.timestamp)).unwrap();

    branches
        .create_branch("temp", Some("main"), 100, BranchOptions::default())
        .unwrap();
    branches.drop_branch("temp", false).unwrap();

    let outcome = branches.merge_branch("temp", "main", MergeStrategy::Auto);
    assert!(outcome.is_err());
}
/// Merging `hotfix` into `main` with `MergeStrategy::Theirs` must complete.
#[test]
fn test_merge_theirs_strategy() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let db = Arc::clone(&storage.db);
    let clock = Arc::clone(&storage.timestamp);
    let branches = BranchManager::new(db, clock).unwrap();

    branches
        .create_branch("hotfix", Some("main"), 100, BranchOptions::default())
        .unwrap();

    let outcome = branches
        .merge_branch("hotfix", "main", MergeStrategy::Theirs)
        .unwrap();
    assert!(outcome.completed);
}
/// Merging `experimental` into `main` with `MergeStrategy::Ours` must complete.
#[test]
fn test_merge_ours_strategy() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let db = Arc::clone(&storage.db);
    let clock = Arc::clone(&storage.timestamp);
    let branches = BranchManager::new(db, clock).unwrap();

    branches
        .create_branch("experimental", Some("main"), 100, BranchOptions::default())
        .unwrap();

    let outcome = branches
        .merge_branch("experimental", "main", MergeStrategy::Ours)
        .unwrap();
    assert!(outcome.completed);
}
/// Pins the values produced by `BranchGcConfig::default()`: a retention of
/// 300, automatic GC enabled, and deferred GC mode.
#[test]
fn test_gc_config_default() {
    let defaults = BranchGcConfig::default();

    assert_eq!(defaults.min_retention_seconds, 300);
    assert!(defaults.auto_gc_enabled);
    assert_eq!(defaults.gc_mode, BranchGcMode::Deferred);
}
/// In `BranchGcMode::Immediate` with auto GC enabled, dropping a branch must
/// remove its data key right away and leave nothing pending for GC.
#[test]
fn test_branch_gc_immediate_mode() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let immediate = BranchGcConfig {
        min_retention_seconds: 0,
        auto_gc_enabled: true,
        gc_mode: BranchGcMode::Immediate,
    };
    let branches = BranchManager::with_gc_config(
        Arc::clone(&storage.db),
        Arc::clone(&storage.timestamp),
        immediate,
    )
    .unwrap();

    let id = branches
        .create_branch("test_gc", Some("main"), 100, BranchOptions::default())
        .unwrap();

    // Seed one branch-scoped entry directly in the underlying store.
    let data_key = encode_branch_data_key(id, "test_key", 100);
    branches.db.put(&data_key, b"test_value").unwrap();
    assert!(branches.db.get(&data_key).unwrap().is_some());

    branches.drop_branch("test_gc", false).unwrap();

    // The entry is gone as soon as the drop returns, with nothing queued.
    assert!(branches.db.get(&data_key).unwrap().is_none());
    assert_eq!(branches.pending_gc_count(), 0);
}
/// In `BranchGcMode::Deferred` with a retention of 2, dropping a branch must
/// keep its data and queue one pending GC entry; once the shared timestamp is
/// advanced by 3, `run_gc` must collect that entry and delete the data.
#[test]
fn test_branch_gc_deferred_mode() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let deferred = BranchGcConfig {
        min_retention_seconds: 2,
        auto_gc_enabled: true,
        gc_mode: BranchGcMode::Deferred,
    };
    let branches = BranchManager::with_gc_config(
        Arc::clone(&storage.db),
        Arc::clone(&storage.timestamp),
        deferred,
    )
    .unwrap();

    let id = branches
        .create_branch("test_deferred", Some("main"), 100, BranchOptions::default())
        .unwrap();
    let data_key = encode_branch_data_key(id, "test_key", 100);
    branches.db.put(&data_key, b"test_value").unwrap();

    branches.drop_branch("test_deferred", false).unwrap();

    // Right after the drop: data still present, one branch awaiting GC.
    assert!(branches.db.get(&data_key).unwrap().is_some());
    assert_eq!(branches.pending_gc_count(), 1);

    // Advance the shared timestamp by 3; the write guard is a temporary that
    // is released at the end of this statement.
    *branches.timestamp.write() += 3;

    let collected = branches.run_gc().unwrap();
    assert_eq!(collected, 1);
    assert!(branches.db.get(&data_key).unwrap().is_none());
    assert_eq!(branches.pending_gc_count(), 0);
}
/// With `auto_gc_enabled: false`, dropping a branch must neither delete its
/// data nor queue it for GC, even though the mode is `Immediate`.
#[test]
fn test_gc_disabled() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let gc_off = BranchGcConfig {
        min_retention_seconds: 0,
        auto_gc_enabled: false,
        gc_mode: BranchGcMode::Immediate,
    };
    let branches = BranchManager::with_gc_config(
        Arc::clone(&storage.db),
        Arc::clone(&storage.timestamp),
        gc_off,
    )
    .unwrap();

    let id = branches
        .create_branch("test_no_gc", Some("main"), 100, BranchOptions::default())
        .unwrap();
    let data_key = encode_branch_data_key(id, "test_key", 100);
    branches.db.put(&data_key, b"test_value").unwrap();

    branches.drop_branch("test_no_gc", false).unwrap();

    // Data survives the drop and no GC work is recorded.
    assert!(branches.db.get(&data_key).unwrap().is_some());
    assert_eq!(branches.pending_gc_count(), 0);
}
/// Five branches dropped in deferred mode (retention 1) must all be queued,
/// and a single `run_gc` after advancing the timestamp by 2 must collect all
/// five and empty the queue.
#[test]
fn test_gc_multiple_branches() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let deferred = BranchGcConfig {
        min_retention_seconds: 1,
        auto_gc_enabled: true,
        gc_mode: BranchGcMode::Deferred,
    };
    let branches = BranchManager::with_gc_config(
        Arc::clone(&storage.db),
        Arc::clone(&storage.timestamp),
        deferred,
    )
    .unwrap();

    // Create, populate and drop branch_0 .. branch_4, one after another.
    for n in 0..5u64 {
        let name = format!("branch_{}", n);
        let stamp = 100 + n;
        let id = branches
            .create_branch(&name, Some("main"), stamp, BranchOptions::default())
            .unwrap();
        let data_key = encode_branch_data_key(id, "key", stamp);
        branches.db.put(&data_key, b"value").unwrap();
        branches.drop_branch(&name, false).unwrap();
    }
    assert_eq!(branches.pending_gc_count(), 5);

    // Advance the shared timestamp by 2 before running GC.
    *branches.timestamp.write() += 2;

    let collected = branches.run_gc().unwrap();
    assert_eq!(collected, 5);
    assert_eq!(branches.pending_gc_count(), 0);
}
/// With a retention of 10, `run_gc` must collect nothing after the timestamp
/// has advanced by only 5, and must collect the dropped branch once it has
/// advanced by a further 6 (11 in total).
#[test]
fn test_gc_retention_period() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let deferred = BranchGcConfig {
        min_retention_seconds: 10,
        auto_gc_enabled: true,
        gc_mode: BranchGcMode::Deferred,
    };
    let branches = BranchManager::with_gc_config(
        Arc::clone(&storage.db),
        Arc::clone(&storage.timestamp),
        deferred,
    )
    .unwrap();

    let id = branches
        .create_branch("test_retention", Some("main"), 100, BranchOptions::default())
        .unwrap();
    let data_key = encode_branch_data_key(id, "key", 100);
    branches.db.put(&data_key, b"value").unwrap();
    branches.drop_branch("test_retention", false).unwrap();

    // First pass: +5 on the timestamp, nothing is collected and the data stays.
    *branches.timestamp.write() += 5;
    let first_pass = branches.run_gc().unwrap();
    assert_eq!(first_pass, 0);
    assert!(branches.db.get(&data_key).unwrap().is_some());

    // Second pass: a further +6, the branch is collected and the data is gone.
    *branches.timestamp.write() += 6;
    let second_pass = branches.run_gc().unwrap();
    assert_eq!(second_pass, 1);
    assert!(branches.db.get(&data_key).unwrap().is_none());
}
/// A pending GC entry recorded by one `BranchManager` must be visible to a
/// second manager built afterwards over the same `db` and `timestamp` handles.
#[test]
fn test_gc_persistence() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let deferred = BranchGcConfig {
        min_retention_seconds: 100,
        auto_gc_enabled: true,
        gc_mode: BranchGcMode::Deferred,
    };

    // The first manager lives only inside this scope and is dropped at its end.
    {
        let first = BranchManager::with_gc_config(
            Arc::clone(&storage.db),
            Arc::clone(&storage.timestamp),
            deferred.clone(),
        )
        .unwrap();
        first
            .create_branch("test_persist", Some("main"), 100, BranchOptions::default())
            .unwrap();
        first.drop_branch("test_persist", false).unwrap();
        assert_eq!(first.pending_gc_count(), 1);
    }

    // A fresh manager over the same handles must report the same pending entry.
    let reopened = BranchManager::with_gc_config(
        Arc::clone(&storage.db),
        Arc::clone(&storage.timestamp),
        deferred,
    )
    .unwrap();
    assert_eq!(reopened.pending_gc_count(), 1);
}
/// Immediate-mode GC must remove every data key of a dropped branch: 100 keys
/// are written, verified present, and verified absent after `drop_branch`.
#[test]
fn test_gc_with_multiple_keys() {
    let cfg = Config::in_memory();
    let storage = StorageEngine::open_in_memory(&cfg).unwrap();
    let immediate = BranchGcConfig {
        min_retention_seconds: 0,
        auto_gc_enabled: true,
        gc_mode: BranchGcMode::Immediate,
    };
    let branches = BranchManager::with_gc_config(
        Arc::clone(&storage.db),
        Arc::clone(&storage.timestamp),
        immediate,
    )
    .unwrap();

    let id = branches
        .create_branch("multi_key", Some("main"), 100, BranchOptions::default())
        .unwrap();

    // Builds the encoded store key for the n-th entry ("key_0" .. "key_99").
    let key_for = |n: i32| encode_branch_data_key(id, &format!("key_{}", n), 100);

    for n in 0..100 {
        let data_key = key_for(n);
        branches.db.put(&data_key, b"value").unwrap();
    }
    for n in 0..100 {
        let data_key = key_for(n);
        assert!(branches.db.get(&data_key).unwrap().is_some());
    }

    branches.drop_branch("multi_key", false).unwrap();

    for n in 0..100 {
        let data_key = key_for(n);
        assert!(branches.db.get(&data_key).unwrap().is_none());
    }
}
}