1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
//! Time-travel extensions for StorageEngine
//!
//! This module provides extension methods for time-travel query support.
use super::{StorageEngine, SnapshotManager};
use crate::{Error, Result, Tuple};
use rocksdb::{IteratorMode, ReadOptions};
use tracing::debug;
impl StorageEngine {
/// Get a reference to the snapshot manager
pub fn snapshot_manager(&self) -> &SnapshotManager {
&self.snapshot_manager
}
/// Scan table at a specific snapshot (for time-travel queries)
///
/// Returns tuples as they existed at the given snapshot timestamp.
pub fn scan_table_at_snapshot(&self, table_name: &str, snapshot_ts: u64) -> Result<Vec<Tuple>> {
let prefix = format!("data:{}:", table_name);
let prefix_bytes = prefix.as_bytes();
let mut tuples = Vec::new();
let mut seen_rows = std::collections::HashSet::new();
// First, scan current data to get all row IDs
// Use total_order_seek to bypass prefix bloom filter for full table scans
let mut read_opts = ReadOptions::default();
read_opts.set_total_order_seek(true);
let iter = self.db.iterator_opt(IteratorMode::Start, read_opts);
for item in iter {
let (key, _) = item.map_err(|e| Error::storage(format!("Iterator error: {}", e)))?;
if key.starts_with(prefix_bytes) {
// Parse row ID from key: data:{table}:{row_id}
if let Ok(key_str) = std::str::from_utf8(&key) {
if let Some(row_id_str) = key_str.strip_prefix(&prefix) {
if let Ok(row_id) = row_id_str.parse::<u64>() {
seen_rows.insert(row_id);
}
}
}
} else if !key.is_empty() && key[0] > prefix_bytes[0] {
break;
}
}
// For each row, read the version at the snapshot
for row_id in seen_rows {
if let Some(value) = self.snapshot_manager.read_at_snapshot(table_name, row_id, snapshot_ts)? {
let tuple: Tuple = bincode::deserialize(&value)
.map_err(|e| Error::storage(format!("Failed to deserialize tuple: {}", e)))?;
tuples.push(tuple);
}
}
Ok(tuples)
}
/// Insert tuple with versioning (for time-travel support)
///
/// This creates a new version of the tuple at the current timestamp,
/// enabling AS OF TIMESTAMP/TRANSACTION/SCN queries.
///
/// This method is automatically called by insert_tuple() when
/// time_travel_enabled is true (default). It can also be called directly
/// to force versioning even when automatic versioning is disabled.
///
/// Implementation details:
/// - Writes current version to main data key (for fast non-time-travel queries)
/// - Writes versioned copy to snapshot storage (for time-travel queries)
/// - Registers snapshot metadata (timestamp, transaction ID, SCN)
/// - Triggers automatic garbage collection if needed
pub fn insert_tuple_versioned(&self, table_name: &str, tuple: Tuple) -> Result<u64> {
use super::Catalog;
let catalog = Catalog::new(self);
let row_id = catalog.next_row_id(table_name)?;
// Get table schema for compression
let schema = catalog.get_table_schema(table_name)?;
// Calculate original tuple size for metrics
let original_size = bincode::serialize(&tuple)
.map_err(|e| Error::storage(format!("Failed to calculate tuple size: {}", e)))?
.len();
// Check if compression is disabled for this table
let compression_enabled = catalog.get_compression_config(table_name)?
.map(|config| config.enabled)
.unwrap_or(true); // Default to enabled if no config
// Compress tuple using per-column compression (or skip if disabled)
let compressed = if compression_enabled {
super::compression::compress_tuple(
&tuple,
&schema,
table_name,
&self.compression_manager,
)?
} else {
// Compression disabled - store uncompressed with None codec
super::compression::CompressedTuple {
values: tuple.values.iter().map(|v| {
let mut result = vec![super::compression::CompressionCodec::None.to_u8()];
let value_bytes = bincode::serialize(v).unwrap_or_default();
result.extend_from_slice(&value_bytes);
result
}).collect(),
}
};
// Serialize compressed tuple
let value = bincode::serialize(&compressed)
.map_err(|e| Error::storage(format!("Failed to serialize compressed tuple: {}", e)))?;
// Track compression metrics per column
for (i, column) in schema.columns.iter().enumerate() {
if i < tuple.values.len() && i < compressed.values.len() {
let orig_col_size = bincode::serialize(&tuple.values[i])
.map(|v| v.len())
.unwrap_or(0);
let compressed_col_size = compressed.values[i].len();
// Extract codec from compressed value (first byte)
let codec = if !compressed.values[i].is_empty() {
super::compression::CompressionCodec::from_u8(compressed.values[i][0])
.unwrap_or(super::compression::CompressionCodec::None)
} else {
super::compression::CompressionCodec::None
};
self.compression_manager.update_stats(
table_name,
&column.name,
orig_col_size,
compressed_col_size,
codec,
);
}
}
// Get current timestamp (for MVCC)
let timestamp = self.next_timestamp();
debug!("insert_tuple_versioned: table={}, row_id={}, timestamp={}", table_name, row_id, timestamp);
// Write current version (for fast non-time-travel queries)
// This ensures zero-performance overhead for normal queries
let current_key = format!("data:{}:{}", table_name, row_id).into_bytes();
self.put(¤t_key, &value)?;
// Write versioned copy (for time-travel queries)
// This enables AS OF queries with O(1) snapshot lookup
self.snapshot_manager.write_version(table_name, row_id, timestamp, &value)?;
// Register snapshot with tri-modal resolution support
// (Timestamp, Transaction ID, SCN)
let _ = self.snapshot_manager.register_snapshot(timestamp);
Ok(row_id)
}
}