1use crate::structured_db::{
2 StructuredColumnValue, StructuredDbIterator, StructuredSchema, combined_resolver, decode_row,
3 load_structured_schema_from_cobble_schema, project_decoded_row_for_read,
4 project_structured_schema_for_scan,
5};
6use cobble::{
7 GlobalSnapshotManifest, GlobalSnapshotSummary, ReadOnlyDb, ReadOptions, Reader, ReaderConfig,
8 Result, ScanOptions, VolumeDescriptor,
9};
10use std::ops::Range;
11use std::sync::Arc;
12
13pub struct StructuredReader {
14 reader: Reader,
15 structured_schema: Arc<StructuredSchema>,
16}
17
18impl StructuredReader {
19 pub fn open(read_config: ReaderConfig, global_snapshot_id: u64) -> Result<Self> {
20 let volumes = read_config.volumes.clone();
21 let resolver = combined_resolver(None);
22 let reader = Reader::open_with_resolver(read_config, global_snapshot_id, Some(resolver))?;
23 let structured_schema = load_schema_from_reader(&reader, &volumes)?;
24 Ok(Self {
25 reader,
26 structured_schema: Arc::new(structured_schema),
27 })
28 }
29
30 pub fn open_current(read_config: ReaderConfig) -> Result<Self> {
31 let volumes = read_config.volumes.clone();
32 let resolver = combined_resolver(None);
33 let reader = Reader::open_current_with_resolver(read_config, Some(resolver))?;
34 let structured_schema = load_schema_from_reader(&reader, &volumes)?;
35 Ok(Self {
36 reader,
37 structured_schema: Arc::new(structured_schema),
38 })
39 }
40
41 pub fn current_schema(&self) -> StructuredSchema {
42 self.structured_schema.as_ref().clone()
43 }
44
45 pub fn get(
48 &mut self,
49 bucket_id: u16,
50 key: &[u8],
51 ) -> Result<Option<Vec<Option<StructuredColumnValue>>>> {
52 let raw = self.reader.get(bucket_id, key)?;
53 raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
54 .transpose()
55 }
56
57 pub fn get_with_options(
58 &mut self,
59 bucket_id: u16,
60 key: &[u8],
61 options: &ReadOptions,
62 ) -> Result<Option<Vec<Option<StructuredColumnValue>>>> {
63 let raw = if options.column_indices.is_some() {
64 self.reader.get(bucket_id, key)?
65 } else {
66 self.reader.get_with_options(bucket_id, key, options)?
67 };
68 raw.map(|columns| decode_row(&self.structured_schema, 0, columns))
69 .transpose()
70 .map(|row| row.map(|decoded| project_decoded_row_for_read(decoded, options)))
71 }
72
73 pub fn scan(
74 &mut self,
75 bucket_id: u16,
76 range: Range<&[u8]>,
77 ) -> Result<StructuredDbIterator<'static>> {
78 self.scan_with_options(bucket_id, range, &ScanOptions::default())
79 }
80
81 pub fn scan_with_options(
82 &mut self,
83 bucket_id: u16,
84 range: Range<&[u8]>,
85 options: &ScanOptions,
86 ) -> Result<StructuredDbIterator<'static>> {
87 let inner = self.reader.scan_with_options(bucket_id, range, options)?;
88 let projected_schema = project_structured_schema_for_scan(&self.structured_schema, options);
89 Ok(StructuredDbIterator::new(inner, projected_schema, 0))
90 }
91
92 pub fn refresh(&mut self) -> Result<()> {
95 self.reader.refresh()
96 }
97
98 pub fn read_mode(&self) -> &'static str {
99 self.reader.read_mode()
100 }
101
102 pub fn configured_snapshot_id(&self) -> Option<u64> {
103 self.reader.configured_snapshot_id()
104 }
105
106 pub fn current_global_snapshot(&self) -> &GlobalSnapshotManifest {
107 self.reader.current_global_snapshot()
108 }
109
110 pub fn list_global_snapshots(&self) -> Result<Vec<GlobalSnapshotSummary>> {
111 self.reader.list_global_snapshots()
112 }
113
114 pub fn list_global_snapshot_manifests(&self) -> Result<Vec<GlobalSnapshotManifest>> {
115 self.reader.list_global_snapshot_manifests()
116 }
117}
118
119fn load_schema_from_reader(
121 reader: &Reader,
122 volumes: &[VolumeDescriptor],
123) -> Result<StructuredSchema> {
124 let manifest = reader.current_global_snapshot();
125 let shard = manifest.shard_snapshots.first().ok_or_else(|| {
126 cobble::Error::ConfigError("global snapshot has no shard snapshots".to_string())
127 })?;
128 let config = cobble::Config {
129 volumes: volumes.to_vec(),
130 total_buckets: manifest.total_buckets,
131 ..cobble::Config::default()
132 };
133 let read_only = ReadOnlyDb::open_with_db_id_and_resolver(
134 config,
135 shard.snapshot_id,
136 shard.db_id.clone(),
137 combined_resolver(None),
138 )?;
139 load_structured_schema_from_cobble_schema(&read_only.current_schema())
140}
141
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StructuredColumnType;
    use crate::list::{ListConfig, ListRetainMode};
    use crate::structured_single_db::StructuredSingleDb;
    use bytes::Bytes;
    use cobble::{ReadOptions, VolumeDescriptor};
    use std::collections::BTreeMap;
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;

    /// Unique storage root under `/tmp` that is removed when dropped.
    ///
    /// Cleanup runs even when an assertion panics, so failing tests do not leak
    /// directories. Bind it first in each test so it drops last, after the db and
    /// reader that use it.
    struct TestRoot(String);

    impl TestRoot {
        fn new(prefix: &str) -> Self {
            Self(format!("/tmp/{}_{}", prefix, Uuid::new_v4()))
        }

        fn path(&self) -> &str {
            &self.0
        }
    }

    impl Drop for TestRoot {
        fn drop(&mut self) {
            // Best-effort cleanup; the directory may never have been created.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// List configuration used for column 1 in every test.
    fn list_config() -> ListConfig {
        ListConfig {
            max_elements: Some(3),
            retain_mode: ListRetainMode::Last,
            preserve_element_ttl: false,
        }
    }

    /// Schema expected after `open_db_with_list_column`.
    fn test_schema() -> StructuredSchema {
        StructuredSchema {
            columns: BTreeMap::from([(1, StructuredColumnType::List(list_config()))]),
        }
    }

    fn test_config(root: &str) -> cobble::Config {
        cobble::Config {
            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
            num_columns: 2,
            total_buckets: 2,
            snapshot_on_flush: true,
            ..cobble::Config::default()
        }
    }

    fn reader_config(root: &str) -> ReaderConfig {
        ReaderConfig {
            volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
            total_buckets: 2,
            ..ReaderConfig::default()
        }
    }

    /// Opens a db at `root` and registers column 1 as a list column.
    fn open_db_with_list_column(root: &str) -> StructuredSingleDb {
        let mut db = StructuredSingleDb::open(test_config(root)).unwrap();
        db.update_schema()
            .add_list_column(1, list_config())
            .commit()
            .unwrap();
        db
    }

    /// Writes `k1` with bytes `v0` in column 0 and list `[a, b]` in column 1.
    fn put_v0_and_merge_list(db: &mut StructuredSingleDb) {
        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"a")])
            .unwrap();
        db.merge(0, b"k1", 1, vec![Bytes::from_static(b"b")])
            .unwrap();
    }

    /// Snapshots `db`, waits, closes it, and returns the snapshot id.
    fn snapshot_and_close(mut db: StructuredSingleDb) -> u64 {
        let snap_id = db.snapshot().unwrap();
        // Presumably lets the snapshot finish persisting before close; the tests
        // have always relied on this delay. TODO confirm whether it is still needed.
        thread::sleep(Duration::from_millis(200));
        db.close().unwrap();
        snap_id
    }

    #[test]
    fn test_structured_reader_get_scan() {
        let root = TestRoot::new("ds_reader_get_scan");

        let mut db = open_db_with_list_column(root.path());
        put_v0_and_merge_list(&mut db);
        let snap_id = snapshot_and_close(db);

        let mut reader = StructuredReader::open(reader_config(root.path()), snap_id).unwrap();

        assert_eq!(reader.current_schema(), test_schema());

        let row = reader.get(0, b"k1").unwrap().expect("row exists");
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
        );
        assert_eq!(
            row[1],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );

        let mut iter = reader.scan(0, b"k0".as_ref()..b"k9".as_ref()).unwrap();
        let first = iter.next().expect("one row").unwrap();
        assert_eq!(first.0.as_ref(), b"k1");
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_structured_reader_open_current() {
        let root = TestRoot::new("ds_reader_current");

        let mut db = open_db_with_list_column(root.path());
        db.put(0, b"k1", 0, Bytes::from_static(b"v0")).unwrap();
        let _ = snapshot_and_close(db);

        let mut reader = StructuredReader::open_current(reader_config(root.path())).unwrap();

        let row = reader.get(0, b"k1").unwrap().expect("row exists");
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::Bytes(Bytes::from_static(b"v0")))
        );
    }

    #[test]
    fn test_structured_reader_get_with_projection_reindexes_schema() {
        let root = TestRoot::new("ds_reader_get_projection");

        let mut db = open_db_with_list_column(root.path());
        put_v0_and_merge_list(&mut db);
        let snap_id = snapshot_and_close(db);

        let mut reader = StructuredReader::open(reader_config(root.path()), snap_id).unwrap();
        let row = reader
            .get_with_options(0, b"k1", &ReadOptions::for_column(1))
            .unwrap()
            .expect("row exists");
        assert_eq!(row.len(), 1);
        assert_eq!(
            row[0],
            Some(StructuredColumnValue::List(vec![
                Bytes::from_static(b"a"),
                Bytes::from_static(b"b"),
            ]))
        );
    }
}