Skip to main content

dbx_core/storage/
versioned_batch.rs

//! Versioned RecordBatch for MVCC Columnar Storage.
//!
//! Combines Arrow RecordBatch with MVCC version metadata for Snapshot Isolation.
4
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
7
8/// A versioned RecordBatch with MVCC metadata.
9///
10/// Stores an immutable Arrow RecordBatch along with version information
11/// for Snapshot Isolation. Multiple versions can share the same RecordBatch
12/// via Arc, reducing memory overhead.
13#[derive(Debug, Clone)]
14pub struct VersionedBatch {
15    /// The actual columnar data (immutable, shared via Arc)
16    pub data: Arc<RecordBatch>,
17
18    /// Transaction timestamp when this batch became visible
19    pub begin_ts: u64,
20
21    /// Transaction timestamp when this batch became obsolete (None = still active)
22    pub end_ts: Option<u64>,
23
24    /// Batch sequence number for ordering within the same timestamp
25    pub sequence: u64,
26}
27
28impl VersionedBatch {
29    /// Create a new versioned batch.
30    pub fn new(data: Arc<RecordBatch>, begin_ts: u64, sequence: u64) -> Self {
31        Self {
32            data,
33            begin_ts,
34            end_ts: None,
35            sequence,
36        }
37    }
38
39    /// Mark this batch as obsolete at the given timestamp.
40    pub fn mark_obsolete(&mut self, end_ts: u64) {
41        self.end_ts = Some(end_ts);
42    }
43
44    /// Check if this batch is visible to a snapshot at the given read timestamp.
45    ///
46    /// A batch is visible if:
47    /// - begin_ts <= read_ts
48    /// - end_ts is None OR end_ts > read_ts
49    pub fn is_visible(&self, read_ts: u64) -> bool {
50        self.begin_ts <= read_ts && self.end_ts.is_none_or(|end| end > read_ts)
51    }
52
53    /// Check if this batch is obsolete (has an end_ts).
54    pub fn is_obsolete(&self) -> bool {
55        self.end_ts.is_some()
56    }
57
58    /// Get the number of rows in this batch.
59    pub fn num_rows(&self) -> usize {
60        self.data.num_rows()
61    }
62
63    /// Get the number of columns in this batch.
64    pub fn num_columns(&self) -> usize {
65        self.data.num_columns()
66    }
67}
68
69/// Metadata for tracking versions of a specific key or row.
70#[derive(Debug, Clone)]
71pub struct VersionInfo {
72    /// The key or row identifier
73    pub key: Vec<u8>,
74
75    /// List of batch sequences that contain versions of this key
76    /// Sorted by (begin_ts, sequence) in descending order (newest first)
77    pub batch_sequences: Vec<u64>,
78}
79
80impl VersionInfo {
81    /// Create new version info for a key.
82    pub fn new(key: Vec<u8>) -> Self {
83        Self {
84            key,
85            batch_sequences: Vec::new(),
86        }
87    }
88
89    /// Add a new version (batch sequence) for this key.
90    ///
91    /// Maintains descending order by timestamp.
92    pub fn add_version(&mut self, sequence: u64) {
93        // Insert in descending order
94        match self
95            .batch_sequences
96            .binary_search_by(|s| s.cmp(&sequence).reverse())
97        {
98            Ok(_) => {} // Already exists
99            Err(pos) => self.batch_sequences.insert(pos, sequence),
100        }
101    }
102
103    /// Get the latest version visible to the given read timestamp.
104    pub fn get_visible_version(&self, batches: &[VersionedBatch], read_ts: u64) -> Option<u64> {
105        for &seq in &self.batch_sequences {
106            if let Some(batch) = batches.iter().find(|b| b.sequence == seq)
107                && batch.is_visible(read_ts)
108            {
109                return Some(seq);
110            }
111        }
112        None
113    }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};

    /// Builds the three-row, two-column (`id: Int32`, `name: Utf8`) batch
    /// shared by the tests below.
    fn create_test_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let id_array = Int32Array::from(vec![1, 2, 3]);
        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);

        RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)]).unwrap()
    }

    #[test]
    fn test_versioned_batch_creation() {
        let batch = create_test_batch();
        let versioned = VersionedBatch::new(Arc::new(batch), 10, 1);

        assert_eq!(versioned.begin_ts, 10);
        assert_eq!(versioned.end_ts, None);
        assert_eq!(versioned.sequence, 1);
        assert_eq!(versioned.num_rows(), 3);
        assert_eq!(versioned.num_columns(), 2);
        assert!(!versioned.is_obsolete());
    }

    #[test]
    fn test_visibility() {
        let batch = create_test_batch();
        let mut versioned = VersionedBatch::new(Arc::new(batch), 10, 1);

        // Visible to snapshots at ts >= 10
        assert!(!versioned.is_visible(5));
        assert!(versioned.is_visible(10));
        assert!(versioned.is_visible(15));

        // Mark obsolete at ts=20
        versioned.mark_obsolete(20);

        // Now only visible to snapshots in [10, 20)
        assert!(!versioned.is_visible(5));
        assert!(versioned.is_visible(10));
        assert!(versioned.is_visible(15));
        assert!(!versioned.is_visible(20));
        assert!(!versioned.is_visible(25));
        assert!(versioned.is_obsolete());
    }

    #[test]
    fn test_version_info() {
        let mut info = VersionInfo::new(b"key1".to_vec());

        info.add_version(1);
        info.add_version(3);
        info.add_version(2);

        // Should be sorted in descending order
        assert_eq!(info.batch_sequences, vec![3, 2, 1]);

        // Re-adding an existing sequence must not create a duplicate
        info.add_version(2);
        assert_eq!(info.batch_sequences, vec![3, 2, 1]);
    }

    #[test]
    fn test_get_visible_version() {
        let batch1 = create_test_batch();
        let batch2 = create_test_batch();
        let batch3 = create_test_batch();

        let mut v1 = VersionedBatch::new(Arc::new(batch1), 10, 1);
        let mut v2 = VersionedBatch::new(Arc::new(batch2), 20, 2);
        let v3 = VersionedBatch::new(Arc::new(batch3), 30, 3);

        v1.mark_obsolete(20); // Obsolete at ts=20
        v2.mark_obsolete(30); // Obsolete at ts=30

        let batches = vec![v1, v2, v3];

        let mut info = VersionInfo::new(b"key1".to_vec());
        info.add_version(1);
        info.add_version(2);
        info.add_version(3);

        // At ts=15, should see version 1
        assert_eq!(info.get_visible_version(&batches, 15), Some(1));

        // At ts=25, should see version 2
        assert_eq!(info.get_visible_version(&batches, 25), Some(2));

        // At ts=35, should see version 3
        assert_eq!(info.get_visible_version(&batches, 35), Some(3));

        // At ts=5, nothing visible
        assert_eq!(info.get_visible_version(&batches, 5), None);
    }

    #[test]
    fn test_arc_sharing() {
        let batch = Arc::new(create_test_batch());

        let v1 = VersionedBatch::new(Arc::clone(&batch), 10, 1);
        let v2 = VersionedBatch::new(Arc::clone(&batch), 20, 2);

        // Both versions share the same RecordBatch
        assert_eq!(Arc::strong_count(&batch), 3); // original + v1 + v2
        assert_eq!(v1.num_rows(), v2.num_rows());
    }
}