dbx_core/storage/
versioned_batch.rs1use arrow::record_batch::RecordBatch;
6use std::sync::Arc;
7
8#[derive(Debug, Clone)]
14pub struct VersionedBatch {
15 pub data: Arc<RecordBatch>,
17
18 pub begin_ts: u64,
20
21 pub end_ts: Option<u64>,
23
24 pub sequence: u64,
26}
27
28impl VersionedBatch {
29 pub fn new(data: Arc<RecordBatch>, begin_ts: u64, sequence: u64) -> Self {
31 Self {
32 data,
33 begin_ts,
34 end_ts: None,
35 sequence,
36 }
37 }
38
39 pub fn mark_obsolete(&mut self, end_ts: u64) {
41 self.end_ts = Some(end_ts);
42 }
43
44 pub fn is_visible(&self, read_ts: u64) -> bool {
50 self.begin_ts <= read_ts && self.end_ts.is_none_or(|end| end > read_ts)
51 }
52
53 pub fn is_obsolete(&self) -> bool {
55 self.end_ts.is_some()
56 }
57
58 pub fn num_rows(&self) -> usize {
60 self.data.num_rows()
61 }
62
63 pub fn num_columns(&self) -> usize {
65 self.data.num_columns()
66 }
67}
68
69#[derive(Debug, Clone)]
71pub struct VersionInfo {
72 pub key: Vec<u8>,
74
75 pub batch_sequences: Vec<u64>,
78}
79
80impl VersionInfo {
81 pub fn new(key: Vec<u8>) -> Self {
83 Self {
84 key,
85 batch_sequences: Vec::new(),
86 }
87 }
88
89 pub fn add_version(&mut self, sequence: u64) {
93 match self
95 .batch_sequences
96 .binary_search_by(|s| s.cmp(&sequence).reverse())
97 {
98 Ok(_) => {} Err(pos) => self.batch_sequences.insert(pos, sequence),
100 }
101 }
102
103 pub fn get_visible_version(&self, batches: &[VersionedBatch], read_ts: u64) -> Option<u64> {
105 for &seq in &self.batch_sequences {
106 if let Some(batch) = batches.iter().find(|b| b.sequence == seq)
107 && batch.is_visible(read_ts)
108 {
109 return Some(seq);
110 }
111 }
112 None
113 }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};

    /// Builds a three-row batch with columns `id: Int32` and `name: Utf8`.
    fn create_test_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let id_array = Int32Array::from(vec![1, 2, 3]);
        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);

        RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)]).unwrap()
    }

    #[test]
    fn test_versioned_batch_creation() {
        let batch = create_test_batch();
        let versioned = VersionedBatch::new(Arc::new(batch), 10, 1);

        assert_eq!(versioned.begin_ts, 10);
        assert_eq!(versioned.end_ts, None);
        assert_eq!(versioned.sequence, 1);
        assert_eq!(versioned.num_rows(), 3);
        assert_eq!(versioned.num_columns(), 2);
        assert!(!versioned.is_obsolete());
    }

    #[test]
    fn test_visibility() {
        let batch = create_test_batch();
        let mut versioned = VersionedBatch::new(Arc::new(batch), 10, 1);

        // Live version: visible from begin_ts (inclusive) onwards.
        assert!(!versioned.is_visible(5));
        assert!(versioned.is_visible(10));
        assert!(versioned.is_visible(15));

        versioned.mark_obsolete(20);

        // Obsolete version: visible only in [begin_ts, end_ts).
        assert!(!versioned.is_visible(5));
        assert!(versioned.is_visible(10));
        assert!(versioned.is_visible(15));
        assert!(!versioned.is_visible(20));
        assert!(!versioned.is_visible(25));
        assert!(versioned.is_obsolete());
    }

    #[test]
    fn test_version_info() {
        let mut info = VersionInfo::new(b"key1".to_vec());

        info.add_version(1);
        info.add_version(3);
        info.add_version(2);

        // Kept in descending order regardless of insertion order.
        assert_eq!(info.batch_sequences, vec![3, 2, 1]);
    }

    #[test]
    fn test_add_version_ignores_duplicates() {
        let mut info = VersionInfo::new(b"key1".to_vec());

        info.add_version(2);
        info.add_version(5);
        info.add_version(2);
        info.add_version(5);

        assert_eq!(info.key, b"key1".to_vec());
        assert_eq!(info.batch_sequences, vec![5, 2]);
    }

    #[test]
    fn test_get_visible_version() {
        let batch1 = create_test_batch();
        let batch2 = create_test_batch();
        let batch3 = create_test_batch();

        let mut v1 = VersionedBatch::new(Arc::new(batch1), 10, 1);
        let mut v2 = VersionedBatch::new(Arc::new(batch2), 20, 2);
        let v3 = VersionedBatch::new(Arc::new(batch3), 30, 3);

        // Each version is superseded exactly when the next one begins.
        v1.mark_obsolete(20);
        v2.mark_obsolete(30);
        let batches = vec![v1, v2, v3];

        let mut info = VersionInfo::new(b"key1".to_vec());
        info.add_version(1);
        info.add_version(2);
        info.add_version(3);

        // Only v1 covers ts 15.
        assert_eq!(info.get_visible_version(&batches, 15), Some(1));
        // Only v2 covers ts 25.
        assert_eq!(info.get_visible_version(&batches, 25), Some(2));
        // v3 is still live at ts 35.
        assert_eq!(info.get_visible_version(&batches, 35), Some(3));
        // Nothing existed yet at ts 5.
        assert_eq!(info.get_visible_version(&batches, 5), None);
    }

    #[test]
    fn test_get_visible_version_skips_missing_batches() {
        let v1 = VersionedBatch::new(Arc::new(create_test_batch()), 10, 1);
        let batches = vec![v1];

        let mut info = VersionInfo::new(b"key1".to_vec());
        info.add_version(1);
        // Listed sequence with no corresponding batch must be skipped, not
        // treated as "nothing visible".
        info.add_version(7);

        assert_eq!(info.get_visible_version(&batches, 15), Some(1));
        assert_eq!(info.get_visible_version(&[], 15), None);
    }

    #[test]
    fn test_arc_sharing() {
        let batch = Arc::new(create_test_batch());

        let v1 = VersionedBatch::new(Arc::clone(&batch), 10, 1);
        let v2 = VersionedBatch::new(Arc::clone(&batch), 20, 2);

        // One handle held here plus one per VersionedBatch; data is shared.
        assert_eq!(Arc::strong_count(&batch), 3);
        assert_eq!(v1.num_rows(), v2.num_rows());
    }
}