RockSolid Store
RockSolid Store is a Rust library providing a robust, ergonomic, and opinionated persistence layer on top of the powerful RocksDB embedded key-value store. It aims to simplify common database interactions, focusing on ease of use, automatic serialization, transactions, batching, merge operations, and key expiration, while offering flexibility for advanced use cases.
Features ✨
- Typed Store: Provides RocksDbStore<DB> (non-transactional) and RocksDbTxnStore (a transactional alias for RocksDbStore<TransactionDB>) for type-safe interactions.
- Simplified API: Offers clear methods for common operations (get, set, merge, delete, multiget, iterators).
- Automatic Serialization: Seamlessly serialize/deserialize keys (using bytevec) and values (MessagePack via rmp_serde) with minimal boilerplate via serde.
- Value Expiry: Built-in support for associating expiry timestamps (Unix epoch seconds) with values using ValueWithExpiry<T>. (Note: application logic must check expiry; automatic TTL compaction is not yet implemented.)
- Transactional Support: Provides access to RocksDB's Pessimistic Transactions with multiple management styles:
  - Transaction Context: A convenient builder-like context (transaction_context()) for managing operations within a single transaction.
  - Execute Closure: Automatically commit/rollback based on closure success/failure (execute_transaction()).
  - Manual Control: Traditional begin_transaction(), commit(), rollback().
  - Support for WriteBatchTransaction for batching within transactions (useful for optimistic locking patterns).
- Atomic Batch Writes: Efficiently group multiple write operations (set, delete, merge) into atomic, non-transactional batches using the fluent BatchWriter API.
- Configurable Merge Operators: Supports standard merge operators and includes a flexible Merge Router (MergeRouterBuilder) that dispatches merge operations based on key patterns using matchit.
- Configuration: Easy configuration of core RocksDB options (path, compression, WAL mode, parallelism, merge operators, prefix extractors, etc.) via RocksDbConfig, with an escape hatch for advanced options via custom_options.
- Error Handling: Uses a clear StoreError enum and StoreResult<T> for robust error management.
- Utilities: Includes basic utilities for database backup (checkpointing) and migration (backup_db, migrate_db).
- Helper Macros: Optional macros (generate_dao_*) to reduce boilerplate for common data access patterns (see macros.rs).
Installation ⚙️
Add rocksolid to your Cargo.toml:
```toml
[dependencies]
rocksolid = "1.0"
serde = { version = "1.0", features = ["derive"] }
```
RockSolid relies on the rocksdb crate. Ensure you have the necessary system dependencies for rocksdb (like clang, libclang-dev, llvm-dev). See the rust-rocksdb documentation for details.
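The snippets in this README also use the tempfile and num_cpus crates. They are only needed to run the examples as written, not by rocksolid itself; add them to your manifest if you want to copy the snippets verbatim:

```toml
# Only required to compile the README snippets as-is.
tempfile = "3"
num_cpus = "1"
```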
Quick Start 🚀
```rust
use rocksolid::{RocksDbStore, RocksDbConfig, StoreResult, StoreError};
use serde::{Serialize, Deserialize};
use tempfile::tempdir;

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct User {
    id: u32,
    name: String,
}

fn main() -> StoreResult<()> {
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let db_path = temp_dir.path().join("quickstart_db");

    let config = RocksDbConfig {
        path: db_path.to_str().unwrap().to_string(),
        create_if_missing: true,
        ..Default::default()
    };
    let store = RocksDbStore::open(config)?;

    // Set and get a typed value.
    let user = User { id: 1, name: "Alice".to_string() };
    let user_key = format!("user:{}", user.id);
    store.set(&user_key, &user)?;

    let retrieved_user: Option<User> = store.get(&user_key)?;
    assert_eq!(retrieved_user.as_ref(), Some(&user));
    println!("Retrieved User: {:?}", retrieved_user);

    // Remove the value.
    store.remove(&user_key)?;
    let deleted_user: Option<User> = store.get(&user_key)?;
    assert!(deleted_user.is_none());
    println!("User after deletion: {:?}", deleted_user);

    // Group multiple writes into one atomic batch.
    let user2 = User { id: 2, name: "Bob".into() };
    let mut writer = store.batch_writer();
    writer
        .set_raw("config:enabled", b"true")?
        .set("user:2", &user2)?;
    writer.commit()?;

    assert!(store.get_raw("config:enabled")?.is_some());
    assert!(store.get::<_, User>("user:2")?.is_some());
    println!("Batch committed successfully.");

    Ok(())
}
```
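BatchWriter can also stage deletes and be thrown away without writing anything (see .delete() and .discard() in the API reference below). The following is an illustrative continuation of the snippet above; the method names come from that list, but their exact signatures are assumptions, so check the rustdoc:

```rust
// Nothing reaches the database unless commit() is called.
let mut writer = store.batch_writer();
writer.delete("user:2")?;   // staged only
writer.discard();           // drop the staged delete
assert!(store.get::<_, User>("user:2")?.is_some()); // "user:2" is still present
```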
Transactional Usage 🏦
This example uses RocksDbTxnStore (an alias for RocksDbStore<TransactionDB>) together with TransactionContext.
```rust
use rocksolid::{RocksDbStore, RocksDbConfig, RocksDbTxnStore, StoreResult, StoreError};
use serde::{Serialize, Deserialize};
use tempfile::tempdir;

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Account {
    id: String,
    balance: i64,
}

fn transfer_funds(store: &RocksDbTxnStore, from_acc_key: &str, to_acc_key: &str, amount: i64) -> StoreResult<()> {
    if amount <= 0 {
        return Err(StoreError::Other("Transfer amount must be positive".into()));
    }

    // All reads and writes below go through a single pessimistic transaction.
    let mut ctx = store.transaction_context();

    let mut from_account: Account = ctx
        .get(from_acc_key)?
        .ok_or_else(|| StoreError::Other(format!("Account not found: {}", from_acc_key)))?;
    let mut to_account: Account = ctx
        .get(to_acc_key)?
        .ok_or_else(|| StoreError::Other(format!("Account not found: {}", to_acc_key)))?;

    if from_account.balance < amount {
        ctx.rollback()?;
        return Err(StoreError::Other(format!(
            "Insufficient funds in account {}",
            from_account.id
        )));
    }

    from_account.balance -= amount;
    to_account.balance += amount;

    ctx.set(from_acc_key, &from_account)?
        .set(to_acc_key, &to_account)?;
    ctx.commit()?;

    println!("Transfer successful!");
    Ok(())
}

fn main() -> StoreResult<()> {
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let db_path = temp_dir.path().join("txn_example_db");
    let config = RocksDbConfig {
        path: db_path.to_str().unwrap().to_string(),
        create_if_missing: true,
        ..Default::default()
    };
    let txn_store = RocksDbStore::open_transactional(config)?;

    let acc_a_key = "acc:A";
    let acc_b_key = "acc:B";
    txn_store.set(acc_a_key, &Account { id: "A".into(), balance: 100 })?;
    txn_store.set(acc_b_key, &Account { id: "B".into(), balance: 50 })?;
    println!("Initialized accounts.");

    transfer_funds(&txn_store, acc_a_key, acc_b_key, 25)?;

    let acc_a: Option<Account> = txn_store.get(acc_a_key)?;
    let acc_b: Option<Account> = txn_store.get(acc_b_key)?;
    println!("Final Balances: A={:?}, B={:?}", acc_a, acc_b);
    assert_eq!(acc_a.unwrap().balance, 75);
    assert_eq!(acc_b.unwrap().balance, 75);

    // This transfer exceeds the available balance and should fail,
    // leaving both accounts unchanged.
    match transfer_funds(&txn_store, acc_a_key, acc_b_key, 100) {
        Ok(_) => println!("Transfer 2 finished (unexpectedly)."),
        Err(e) => eprintln!("Transfer 2 failed as expected: {}", e),
    }

    let acc_a_final: Option<Account> = txn_store.get(acc_a_key)?;
    let acc_b_final: Option<Account> = txn_store.get(acc_b_key)?;
    assert_eq!(acc_a_final.unwrap().balance, 75);
    assert_eq!(acc_b_final.unwrap().balance, 75);

    Ok(())
}
```
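Beyond get/set, TransactionContext also exposes exists(), delete(), merge(), and expiry-aware variants (listed in the API reference below). A rough sketch reusing txn_store from the example above; the method names come from that list, but the exact signatures are assumptions, so check the rustdoc:

```rust
// Sketch: additional TransactionContext operations (signatures assumed).
// Everything staged here is discarded by the final rollback().
let mut ctx = txn_store.transaction_context();
if ctx.exists("acc:A")? {
    // Stage a delete inside the same transaction.
    ctx.delete("acc:B")?;
}
// Throw the staged changes away instead of committing them.
ctx.rollback()?;
```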
Merge Operators & Routing 🔄
Configure merge operators, including the key-based router, in RocksDbConfig. This allows atomic read-modify-write operations handled by RocksDB.
```rust
use rocksolid::{
    RocksDbConfig, RocksDbStore, StoreResult, MergeValue, MergeValueOperator,
    config::MergeOperatorConfig,
    merge_routing::{MergeRouterBuilder, MergeRouteHandlerFn},
};
use rocksdb::MergeOperands;
use serde::{Serialize, Deserialize};
use std::collections::HashSet;
use matchit::Params;
use tempfile::tempdir;

#[derive(Serialize, Deserialize, Debug)]
struct SimpleSet(HashSet<String>);

// Full merge: combine the existing value with all queued operands.
fn set_union_full_handler(
    _key: &[u8], existing_val: Option<&[u8]>, operands: &MergeOperands, _params: &Params,
) -> Option<Vec<u8>> {
    let mut current_set: HashSet<String> = existing_val
        .and_then(|v| rocksolid::deserialize_value::<SimpleSet>(v).ok())
        .map(|s| s.0)
        .unwrap_or_default();
    for op in operands {
        if let Ok(operand_set) = rocksolid::deserialize_value::<SimpleSet>(op) {
            current_set.extend(operand_set.0);
        }
    }
    rocksolid::serialize_value(&SimpleSet(current_set)).ok()
}

// Partial merge: combine operands with each other (no existing value yet).
fn set_union_partial_handler(
    _key: &[u8], _existing: Option<&[u8]>, operands: &MergeOperands, _params: &Params,
) -> Option<Vec<u8>> {
    let mut combined_set: HashSet<String> = HashSet::new();
    for op in operands {
        if let Ok(operand_set) = rocksolid::deserialize_value::<SimpleSet>(op) {
            combined_set.extend(operand_set.0);
        }
    }
    rocksolid::serialize_value(&SimpleSet(combined_set)).ok()
}

fn main() -> StoreResult<()> {
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let db_path = temp_dir.path().join("merge_db");

    // Route merge calls to handlers based on the key pattern.
    let mut router_builder = MergeRouterBuilder::new();
    router_builder.operator_name("MySetRouter");
    router_builder.add_route(
        "/sets/:set_name",
        Some(set_union_full_handler),
        Some(set_union_partial_handler),
    )?;
    let router_config = router_builder.build()?;

    let config = RocksDbConfig {
        path: db_path.to_str().unwrap().to_string(),
        create_if_missing: true,
        merge_operators: vec![router_config],
        ..Default::default()
    };
    let store = RocksDbStore::open(config)?;

    let set_key = "/sets/online_users";
    let mut set1 = HashSet::new();
    set1.insert("alice".to_string());
    let mut set2 = HashSet::new();
    set2.insert("bob".to_string());
    set2.insert("alice".to_string());

    store.merge(set_key, &MergeValue(MergeValueOperator::SetUnion, &SimpleSet(set1)))?;
    store.merge(set_key, &MergeValue(MergeValueOperator::SetUnion, &SimpleSet(set2)))?;

    let final_set_wrapper: Option<SimpleSet> = store.get(set_key)?;
    let final_set = final_set_wrapper.unwrap().0;
    println!("Final Set: {:?}", final_set);
    assert_eq!(final_set.len(), 2);
    assert!(final_set.contains("alice"));
    assert!(final_set.contains("bob"));

    Ok(())
}
```
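The same builder can register several routes, each with its own handlers. As an illustrative sketch (the counter handler and the /counters/:name pattern are hypothetical, but they follow exactly the handler signature and add_route call used above):

```rust
// Hypothetical counter handler: sums i64 operands. Because summing is
// associative, the same function can serve as both the full and the
// partial merge handler.
fn counter_merge_handler(
    _key: &[u8], existing: Option<&[u8]>, operands: &rocksdb::MergeOperands, _params: &matchit::Params,
) -> Option<Vec<u8>> {
    let mut total: i64 = existing
        .and_then(|v| rocksolid::deserialize_value::<i64>(v).ok())
        .unwrap_or(0);
    for op in operands {
        total += rocksolid::deserialize_value::<i64>(op).unwrap_or(0);
    }
    rocksolid::serialize_value(&total).ok()
}

// Registered next to the set route, before build():
// router_builder.add_route("/counters/:name", Some(counter_merge_handler), Some(counter_merge_handler))?;
```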
Key/Value Expiry ⏱️
Store values together with an expiry timestamp (Unix epoch seconds). The timestamp is persisted alongside the value; checking it is left to the application.
```rust
use rocksolid::{RocksDbConfig, RocksDbStore, StoreResult, ValueWithExpiry};
use serde::{Deserialize, Serialize};
use tempfile::tempdir;
use std::{thread, time::{Duration, SystemTime, UNIX_EPOCH}};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct SessionData { user_id: u32, token: String }

fn main() -> StoreResult<()> {
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let db_path = temp_dir.path().join("expiry_db");
    let config = RocksDbConfig {
        path: db_path.to_str().unwrap().to_string(),
        create_if_missing: true,
        ..Default::default()
    };
    let store = RocksDbStore::open(config)?;

    let session = SessionData { user_id: 123, token: "abc".to_string() };
    let session_key = "session:123";

    // Expire two seconds from now (Unix epoch seconds).
    let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
    let expire_time = now_secs + 2;
    store.set_with_expiry(session_key, &session, expire_time)?;
    println!("Set session with expiry at epoch second {}", expire_time);

    let retrieved_expiry: Option<ValueWithExpiry<SessionData>> = store.get_with_expiry(session_key)?;
    assert!(retrieved_expiry.is_some());
    let val_exp = retrieved_expiry.unwrap();
    assert_eq!(val_exp.expire_time, expire_time);
    let inner_session: SessionData = val_exp.get()?;
    assert_eq!(inner_session, session);
    println!("Retrieved session data: {:?}", inner_session);

    println!("Waiting for expiry...");
    thread::sleep(Duration::from_secs(3));

    // The value is still stored; checking the timestamp is up to the application.
    let retrieved_after_expiry: Option<ValueWithExpiry<SessionData>> = store.get_with_expiry(session_key)?;
    if let Some(val_exp) = retrieved_after_expiry {
        let current_time_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
        if val_exp.expire_time < current_time_secs {
            println!("Key '{}' has expired (Expiry: {}, Current: {})", session_key, val_exp.expire_time, current_time_secs);
        } else {
            println!("Key '{}' still valid.", session_key);
        }
    } else {
        println!("Key '{}' not found after expiry check.", session_key);
    }

    Ok(())
}
```
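Since expired entries are not removed automatically, the read path is the natural place to enforce expiry. A minimal sketch that reuses the store and session_key bindings from the example above (the filtering itself is plain application logic, not a rocksolid API):

```rust
// Application-level expiry check: treat a stale timestamp as "not found".
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
let live_session: Option<SessionData> = match store.get_with_expiry(session_key)? {
    // Only hand the value back if its timestamp is still in the future.
    Some(v) if v.expire_time > now => Some(v.get()?),
    _ => None,
};
assert!(live_session.is_none(), "the session set above should be treated as expired by now");
```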
Configuration (RocksDbConfig) ⚙️
The RocksDbConfig struct allows tuning various RocksDB parameters:
```rust
use rocksolid::{RocksDbConfig, config::CompressionType};

let config = RocksDbConfig {
    path: "/path/to/your/db".to_string(),
    create_if_missing: true,
    parallelism: num_cpus::get().max(2),
    compression: CompressionType::Lz4,
    enable_statistics: false,
    ..Default::default()
};
```
Refer to the config.rs module and the RocksDB Tuning Guide for more details on options.
API Reference 📚
For a detailed list of functions and types, please refer to the generated rustdoc documentation (link assumes crate is published).
Key components:
- RocksDbStore<DB> / RocksDbTxnStore: The main store handles (see the examples above for their methods).
- BatchWriter: .set(), .set_raw(), .set_with_expiry(), .merge(), .merge_raw(), .delete(), .delete_range(), .raw_batch_mut(), .commit(), .discard().
- TransactionContext: .set(), .set_raw(), .set_with_expiry(), .merge(), .merge_raw(), .delete(), .get(), .get_raw(), .get_with_expiry(), .exists(), .tx() (immutable access to the raw transaction), .tx_mut() (mutable access to the raw transaction), .commit(), .rollback().
- Configuration: RocksDbConfig, config::CompressionType, config::MergeOperatorConfig, merge_routing::MergeRouterBuilder.
- Types: ValueWithExpiry<T>, MergeValue<PatchVal>, MergeValueOperator, StoreError, StoreResult<T>.
- Serialization: serialization::serialize_key, serialization::deserialize_key, serialization::serialize_value, serialization::deserialize_value (a round-trip sketch follows this list).
- Macros: generate_dao_* helpers in macros.rs.
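The serialization helpers can also be called directly, for example inside merge handlers (as in the router example above, via the crate-root re-exports). A minimal round-trip sketch, assuming the helpers return StoreResult like the rest of the API:

```rust
use rocksolid::serialization::{serialize_value, deserialize_value};
use rocksolid::StoreResult;
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Point { x: i32, y: i32 }

fn main() -> StoreResult<()> {
    let p = Point { x: 1, y: 2 };
    let bytes = serialize_value(&p)?;            // values are MessagePack via rmp_serde
    let back: Point = deserialize_value(&bytes)?;
    assert_eq!(p, back);
    Ok(())
}
```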
Running Examples
The examples from the examples/ directory can be run using Cargo:
```bash
cargo run --example basic_usage
cargo run --example batching
cargo run --example transactional
cargo run --example merge_router
```
Contributing 🤝
Contributions are welcome! Please feel free to submit issues and pull requests.
Potential areas for contribution:
- Adding more examples and documentation.
- Implementing support for Column Families.
- Adding a TTL compaction filter based on ValueWithExpiry.
- Expanding utility functions (e.g., backup compression, restore).
- Improving error handling and reporting.
- Adding support for Optimistic Transactions.
- Performance benchmarks and tuning guides.
Please follow standard Rust coding conventions and ensure tests pass (cargo test).
License 📜
This project is licensed under the Mozilla Public License Version 2.0. See the LICENSE file for details.