1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
//! Content-addressed storage module for deduplicating large values
//!
//! Provides hash-based deduplication for large text and binary values.
//! Values at or above a threshold size (`CAS_MIN_SIZE`) are stored separately,
//! keyed by their Blake3 hash, so identical content is stored only once.
//!
//! # Key Format
//!
//! ```text
//! cas:{blake3_hash} -> original value bytes
//! ```
//!
//! `{blake3_hash}` is the raw 32-byte digest, not a hex-encoded string.
//!
//! # Example
//!
//! ```sql
//! CREATE TABLE documents (
//!     id INT PRIMARY KEY,
//!     content TEXT STORAGE CONTENT_ADDRESSED -- Large text, deduplicated
//! );
//! ```
use blake3;
use rocksdb::DB;
use crate::{DataType, Error, Result, Value};
/// Minimum size (in bytes) for content-addressed storage
///
/// `ContentAddressedStore::maybe_store` moves a `String`/`Bytes` value into
/// CAS only when its byte length is greater than or equal to this threshold;
/// shorter values are returned unchanged so they can be stored inline
/// (no deduplication benefit).
pub const CAS_MIN_SIZE: usize = 1024;
/// Content-addressed storage manager
///
/// Provides methods to store and retrieve large values using their
/// content hash as the key.
///
/// This is a stateless unit struct used purely as a namespace: its
/// operations are associated functions, and those that touch storage take
/// the RocksDB handle (`&DB`) as an explicit argument.
pub struct ContentAddressedStore;
impl ContentAddressedStore {
/// Build the RocksDB key for a content hash
///
/// Format: `cas:{32-byte-hash}`
fn cas_key(hash: &[u8; 32]) -> Vec<u8> {
let mut key = Vec::with_capacity(36);
key.extend_from_slice(b"cas:");
key.extend_from_slice(hash);
key
}
/// Store a value if it's large enough, returning a CasRef or the original value
///
/// # Arguments
/// * `db` - RocksDB instance for persistence
/// * `value` - Value to potentially store in CAS
///
/// # Returns
/// * `Value::CasRef` if the value was stored in CAS
/// * Original value if it's too small for CAS (< 1KB)
pub fn maybe_store(db: &DB, value: &Value) -> Result<Value> {
let bytes = match value {
Value::String(s) if s.len() >= CAS_MIN_SIZE => s.as_bytes(),
Value::Bytes(b) if b.len() >= CAS_MIN_SIZE => b.as_slice(),
_ => return Ok(value.clone()),
};
// Compute Blake3 hash
let hash: [u8; 32] = blake3::hash(bytes).into();
let key = Self::cas_key(&hash);
// Store if not already present (idempotent)
let exists = db.get(&key)
.map_err(|e| Error::storage(format!("CAS lookup failed: {}", e)))?
.is_some();
if !exists {
db.put(&key, bytes)
.map_err(|e| Error::storage(format!("CAS store failed: {}", e)))?;
}
Ok(Value::CasRef { hash })
}
/// Store a value unconditionally (for migration), returning the CasRef
///
/// Unlike `maybe_store`, this always stores the value regardless of size.
pub fn store(db: &DB, value: &Value) -> Result<Value> {
let bytes = match value {
Value::String(s) => s.as_bytes(),
Value::Bytes(b) => b.as_slice(),
Value::Null => return Ok(value.clone()),
_ => return Err(Error::storage("CAS only supports String and Bytes types")),
};
// Compute Blake3 hash
let hash: [u8; 32] = blake3::hash(bytes).into();
let key = Self::cas_key(&hash);
// Store if not already present (idempotent)
let exists = db.get(&key)
.map_err(|e| Error::storage(format!("CAS lookup failed: {}", e)))?
.is_some();
if !exists {
db.put(&key, bytes)
.map_err(|e| Error::storage(format!("CAS store failed: {}", e)))?;
}
Ok(Value::CasRef { hash })
}
/// Resolve a CasRef back to its original value
///
/// # Arguments
/// * `db` - RocksDB instance for retrieval
/// * `hash` - Blake3 hash of the original value
/// * `target_type` - Expected data type (Text or Bytea)
///
/// # Returns
/// The original value (String or Bytes based on target_type)
pub fn resolve(db: &DB, hash: &[u8; 32], target_type: &DataType) -> Result<Value> {
let key = Self::cas_key(hash);
let bytes = db.get(&key)
.map_err(|e| Error::storage(format!("CAS resolve failed: {}", e)))?
.ok_or_else(|| Error::storage(format!(
"CAS reference not found: {}",
hex::encode(&hash[..8])
)))?;
match target_type {
DataType::Text | DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb => {
let s = String::from_utf8(bytes.to_vec())
.map_err(|e| Error::storage(format!("CAS value not valid UTF-8: {}", e)))?;
Ok(Value::String(s))
}
DataType::Bytea => Ok(Value::Bytes(bytes.to_vec())),
_ => Err(Error::storage(format!(
"Invalid CAS target type: {}. Expected Text or Bytea.",
target_type
))),
}
}
/// Check if a CAS reference exists in storage
pub fn exists(db: &DB, hash: &[u8; 32]) -> Result<bool> {
let key = Self::cas_key(hash);
db.get(&key)
.map(|opt| opt.is_some())
.map_err(|e| Error::storage(format!("CAS exists check failed: {}", e)))
}
/// Get the size of a stored CAS value
pub fn get_size(db: &DB, hash: &[u8; 32]) -> Result<Option<usize>> {
let key = Self::cas_key(hash);
db.get(&key)
.map(|opt| opt.map(|v| v.len()))
.map_err(|e| Error::storage(format!("CAS size check failed: {}", e)))
}
/// Delete a CAS entry (use with caution - may break references)
///
/// This should only be used during garbage collection after verifying
/// no references exist.
pub fn delete(db: &DB, hash: &[u8; 32]) -> Result<()> {
let key = Self::cas_key(hash);
db.delete(&key)
.map_err(|e| Error::storage(format!("CAS delete failed: {}", e)))
}
/// Compute the hash for a value without storing it
pub fn compute_hash(value: &Value) -> Option<[u8; 32]> {
let bytes = match value {
Value::String(s) => s.as_bytes(),
Value::Bytes(b) => b.as_slice(),
_ => return None,
};
Some(blake3::hash(bytes).into())
}
}
/// Minimal lowercase hex encoder, used to render hashes in messages
mod hex {
    /// Lowercase hexadecimal digits, indexed by nibble value (0..=15).
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    /// Render `bytes` as a lowercase hex string, two characters per byte
    /// (high nibble first). An empty slice yields an empty string.
    pub fn encode(bytes: &[u8]) -> String {
        let mut out = String::with_capacity(bytes.len() * 2);
        for &byte in bytes {
            out.push(char::from(DIGITS[usize::from(byte >> 4)]));
            out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
        }
        out
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Open a throwaway RocksDB instance in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the handle so callers keep it
    /// alive for as long as the database is in use.
    fn test_db() -> (TempDir, DB) {
        let dir = TempDir::new().unwrap();
        let db = DB::open_default(dir.path()).unwrap();
        (dir, db)
    }

    /// Extract the hash from a `CasRef`, panicking on any other variant.
    fn expect_cas_ref(value: Value) -> [u8; 32] {
        match value {
            Value::CasRef { hash } => hash,
            _ => panic!("Expected CasRef"),
        }
    }

    #[test]
    fn test_cas_small_value_not_stored() {
        let (_dir, db) = test_db();
        // Small value should be returned as-is
        let small = Value::String("hello".to_string());
        let result = ContentAddressedStore::maybe_store(&db, &small).unwrap();
        match result {
            Value::String(s) => assert_eq!(s, "hello"),
            _ => panic!("Expected original String value"),
        }
    }

    #[test]
    fn test_cas_threshold_boundary() {
        let (_dir, db) = test_db();
        // One byte below the threshold stays inline
        let below = Value::String("a".repeat(CAS_MIN_SIZE - 1));
        let result = ContentAddressedStore::maybe_store(&db, &below).unwrap();
        assert!(matches!(result, Value::String(_)));
        // Exactly at the threshold goes to CAS
        let at = Value::String("a".repeat(CAS_MIN_SIZE));
        let result = ContentAddressedStore::maybe_store(&db, &at).unwrap();
        assert!(matches!(result, Value::CasRef { .. }));
    }

    #[test]
    fn test_cas_non_text_value_passes_through() {
        let (_dir, db) = test_db();
        // Variants other than String/Bytes are never moved into CAS
        let result = ContentAddressedStore::maybe_store(&db, &Value::Null).unwrap();
        assert!(matches!(result, Value::Null));
    }

    #[test]
    fn test_cas_large_value_stored() {
        let (_dir, db) = test_db();
        // Large value should be stored and return CasRef
        let large = Value::String("x".repeat(2000));
        let stored = ContentAddressedStore::maybe_store(&db, &large).unwrap();
        let hash = expect_cas_ref(stored);
        // Resolve should return the original value
        let resolved = ContentAddressedStore::resolve(&db, &hash, &DataType::Text).unwrap();
        match resolved {
            Value::String(s) => assert_eq!(s, "x".repeat(2000)),
            _ => panic!("Expected String value"),
        }
    }

    #[test]
    fn test_cas_deduplication() {
        let (_dir, db) = test_db();
        // Store same content twice
        let content = Value::String("y".repeat(2000));
        let h1 = expect_cas_ref(ContentAddressedStore::maybe_store(&db, &content).unwrap());
        let h2 = expect_cas_ref(ContentAddressedStore::maybe_store(&db, &content).unwrap());
        // Both should return the same hash
        assert_eq!(h1, h2);
        // The reference hash is the plain content hash
        assert_eq!(ContentAddressedStore::compute_hash(&content), Some(h1));
    }

    #[test]
    fn test_cas_bytes() {
        let (_dir, db) = test_db();
        // Large bytes should be stored
        let large = Value::Bytes(vec![0u8; 2000]);
        let stored = ContentAddressedStore::maybe_store(&db, &large).unwrap();
        let hash = expect_cas_ref(stored);
        // Resolve as Bytea and compare the full content, not just the length
        let resolved = ContentAddressedStore::resolve(&db, &hash, &DataType::Bytea).unwrap();
        match resolved {
            Value::Bytes(b) => assert_eq!(b, vec![0u8; 2000]),
            _ => panic!("Expected Bytes value"),
        }
    }

    #[test]
    fn test_cas_store_ignores_threshold() {
        let (_dir, db) = test_db();
        // `store` writes even values far below CAS_MIN_SIZE
        let small = Value::String("tiny".to_string());
        let hash = expect_cas_ref(ContentAddressedStore::store(&db, &small).unwrap());
        let resolved = ContentAddressedStore::resolve(&db, &hash, &DataType::Text).unwrap();
        match resolved {
            Value::String(s) => assert_eq!(s, "tiny"),
            _ => panic!("Expected String value"),
        }
        // Null passes through untouched
        let result = ContentAddressedStore::store(&db, &Value::Null).unwrap();
        assert!(matches!(result, Value::Null));
    }

    #[test]
    fn test_cas_exists_size_delete() {
        let (_dir, db) = test_db();
        let content = Value::Bytes(vec![7u8; 1500]);
        let hash = ContentAddressedStore::compute_hash(&content).unwrap();

        // Nothing stored yet
        assert!(!ContentAddressedStore::exists(&db, &hash).unwrap());
        assert_eq!(ContentAddressedStore::get_size(&db, &hash).unwrap(), None);

        // After storing, the entry is visible with its exact length
        let stored = expect_cas_ref(ContentAddressedStore::maybe_store(&db, &content).unwrap());
        assert_eq!(stored, hash);
        assert!(ContentAddressedStore::exists(&db, &hash).unwrap());
        assert_eq!(ContentAddressedStore::get_size(&db, &hash).unwrap(), Some(1500));

        // After deleting, it is gone again
        ContentAddressedStore::delete(&db, &hash).unwrap();
        assert!(!ContentAddressedStore::exists(&db, &hash).unwrap());
        assert_eq!(ContentAddressedStore::get_size(&db, &hash).unwrap(), None);
    }

    #[test]
    fn test_cas_resolve_missing_reference() {
        let (_dir, db) = test_db();
        // A hash that was never stored must produce an error, not a value
        let missing = [0u8; 32];
        assert!(ContentAddressedStore::resolve(&db, &missing, &DataType::Text).is_err());
        assert!(ContentAddressedStore::resolve(&db, &missing, &DataType::Bytea).is_err());
    }

    #[test]
    fn test_cas_resolve_invalid_utf8_as_text() {
        let (_dir, db) = test_db();
        // 0xff is never valid in UTF-8
        let binary = Value::Bytes(vec![0xffu8; 2000]);
        let hash = expect_cas_ref(ContentAddressedStore::maybe_store(&db, &binary).unwrap());
        // Text resolution must fail, Bytea resolution must still work
        assert!(ContentAddressedStore::resolve(&db, &hash, &DataType::Text).is_err());
        assert!(ContentAddressedStore::resolve(&db, &hash, &DataType::Bytea).is_ok());
    }

    #[test]
    fn test_cas_compute_hash() {
        let value = Value::String("test content".to_string());
        let hash1 = ContentAddressedStore::compute_hash(&value);
        let hash2 = ContentAddressedStore::compute_hash(&value);
        assert!(hash1.is_some());
        assert_eq!(hash1, hash2); // Same content = same hash

        // Different content = different hash
        let other = Value::String("other content".to_string());
        assert_ne!(hash1, ContentAddressedStore::compute_hash(&other));

        // String and Bytes with identical bytes hash identically
        let as_bytes = Value::Bytes(b"test content".to_vec());
        assert_eq!(hash1, ContentAddressedStore::compute_hash(&as_bytes));

        // Unsupported variants have no hash
        assert!(ContentAddressedStore::compute_hash(&Value::Null).is_none());
    }
}