1use std::ops::Range;
31use std::path::Path;
32use std::sync::Arc;
33use sochdb_core::{Result, SochDBError, SochRow, SochValue};
34
/// Lightweight, copyable handle describing a transaction started by a
/// [`StorageEngine`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct TxnHandle {
    /// Identifier assigned to the transaction when it was started.
    pub txn_id: u64,
    /// Snapshot version this transaction reads at; `Row::is_visible` compares
    /// row version stamps against this value.
    pub snapshot_version: u64,
    /// Wall-clock creation time, in microseconds since the Unix epoch.
    pub start_ts: u64,
}

impl TxnHandle {
    /// Creates a handle for `txn_id` reading at `snapshot_version`, stamping
    /// `start_ts` with the current wall-clock time in microseconds.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the Unix epoch.
    pub fn new(txn_id: u64, snapshot_version: u64) -> Self {
        use std::time::{SystemTime, UNIX_EPOCH};

        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        Self {
            txn_id,
            snapshot_version,
            // Truncating u128 -> u64 is safe for any realistic wall-clock time.
            start_ts: since_epoch.as_micros() as u64,
        }
    }
}
60
/// Zero-based column index used when projecting columns in a scan.
pub type ColumnId = u32;

/// Numeric row identifier; keys passed to the engine encode it as 8 LE bytes.
pub type RowId = u64;
66
67#[derive(Debug, Clone)]
69pub struct Row {
70 pub id: RowId,
72 pub values: Vec<Option<Vec<u8>>>,
74 pub txn_start: u64,
76 pub txn_end: u64,
78}
79
80impl Row {
81 pub fn new(id: RowId, values: Vec<Option<Vec<u8>>>) -> Self {
83 let now = std::time::SystemTime::now()
84 .duration_since(std::time::UNIX_EPOCH)
85 .unwrap()
86 .as_micros() as u64;
87 Self {
88 id,
89 values,
90 txn_start: now,
91 txn_end: 0,
92 }
93 }
94
95 pub fn is_visible(&self, snapshot_version: u64) -> bool {
97 self.txn_start <= snapshot_version && (self.txn_end == 0 || self.txn_end > snapshot_version)
98 }
99
100 pub fn to_soch_row(&self, _schema: &[String]) -> SochRow {
102 let values: Vec<SochValue> = self
103 .values
104 .iter()
105 .map(|v| match v {
106 Some(bytes) => {
107 if let Ok(s) = std::str::from_utf8(bytes) {
109 SochValue::Text(s.to_string())
110 } else {
111 SochValue::Binary(bytes.clone())
112 }
113 }
114 None => SochValue::Null,
115 })
116 .collect();
117 SochRow::new(values)
118 }
119}
120
121pub struct ColumnIterator {
123 position: usize,
125 rows: Vec<Row>,
127 column_ids: Vec<ColumnId>,
129}
130
131impl ColumnIterator {
132 pub fn new(rows: Vec<Row>, column_ids: Vec<ColumnId>) -> Self {
134 Self {
135 position: 0,
136 rows,
137 column_ids,
138 }
139 }
140
141 pub fn column_ids(&self) -> &[ColumnId] {
143 &self.column_ids
144 }
145}
146
147impl Iterator for ColumnIterator {
148 type Item = Row;
149
150 fn next(&mut self) -> Option<Self::Item> {
151 if self.position < self.rows.len() {
152 let row = self.rows[self.position].clone();
153 self.position += 1;
154 Some(row)
155 } else {
156 None
157 }
158 }
159}
160
/// Point-in-time statistics reported by a [`StorageEngine`].
///
/// All fields default to zero/empty.
#[derive(Clone, Debug, Default)]
pub struct StorageStats {
    /// Number of rows, as reported by the engine.
    pub total_rows: u64,
    /// Bytes stored on disk.
    pub disk_bytes: u64,
    /// Bytes held in memory.
    pub memory_bytes: u64,
    /// Number of storage levels.
    pub num_levels: u32,
    /// File count for each level, indexed by level.
    pub files_per_level: Vec<u32>,
    /// Read amplification factor.
    pub read_amplification: f64,
    /// Write amplification factor.
    pub write_amplification: f64,
}
179
180pub trait StorageEngine: Send + Sync {
186 fn begin_txn(&self) -> Result<TxnHandle>;
188
189 fn get(&self, txn: &TxnHandle, key: &[u8]) -> Result<Option<Row>>;
191
192 fn put(&self, txn: &TxnHandle, key: &[u8], row: Row) -> Result<()>;
194
195 fn delete(&self, txn: &TxnHandle, key: &[u8]) -> Result<()>;
197
198 fn scan(&self, txn: &TxnHandle, range: Range<Vec<u8>>) -> Result<Vec<Row>>;
200
201 fn scan_columns(
209 &self,
210 txn: &TxnHandle,
211 range: Range<Vec<u8>>,
212 cols: &[ColumnId],
213 ) -> Result<ColumnIterator>;
214
215 fn commit(&self, txn: TxnHandle) -> Result<()>;
217
218 fn abort(&self, txn: TxnHandle) -> Result<()>;
220
221 fn stats(&self) -> StorageStats;
223
224 fn flush(&self) -> Result<()>;
226
227 fn compact(&self) -> Result<()>;
229
230 fn close(&self) -> Result<()>;
232}
233
234pub fn open_storage_engine<P: AsRef<Path>>(
236 path: P,
237 engine_type: StorageEngineType,
238) -> Result<Arc<dyn StorageEngine>> {
239 match engine_type {
240 StorageEngineType::Lscs => {
241 use crate::lscs::{ColumnDef, ColumnType, Lscs, LscsConfig, TableSchema};
242
243 let schema = TableSchema::new(
245 "default".to_string(),
246 vec![
247 ColumnDef {
248 name: "key".to_string(),
249 col_type: ColumnType::Binary,
250 nullable: false,
251 },
252 ColumnDef {
253 name: "value".to_string(),
254 col_type: ColumnType::Binary,
255 nullable: true,
256 },
257 ],
258 )
259 .with_mvcc();
260
261 let lscs = Lscs::new(path.as_ref().to_path_buf(), schema, LscsConfig::default())?;
262 Ok(Arc::new(LscsAdapter::new(lscs)))
263 }
264 StorageEngineType::Legacy => Err(SochDBError::InvalidArgument(
265 "Legacy LSMTree has been removed; use Lscs".to_string(),
266 )),
267 }
268}
269
/// Selects which backend `open_storage_engine` should create.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum StorageEngineType {
    /// The LSCS columnar store; this is the default.
    #[default]
    Lscs,
    /// The removed legacy LSM tree; opening it always fails.
    Legacy,
}
279
280pub struct LscsAdapter {
282 inner: crate::lscs::Lscs,
283 next_txn_id: std::sync::atomic::AtomicU64,
284 version_counter: std::sync::atomic::AtomicU64,
285}
286
287impl LscsAdapter {
288 pub fn new(lscs: crate::lscs::Lscs) -> Self {
290 Self {
291 inner: lscs,
292 next_txn_id: std::sync::atomic::AtomicU64::new(1),
293 version_counter: std::sync::atomic::AtomicU64::new(1),
294 }
295 }
296}
297
298impl StorageEngine for LscsAdapter {
299 fn begin_txn(&self) -> Result<TxnHandle> {
300 let txn_id = self
301 .next_txn_id
302 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
303 let snapshot = self
304 .version_counter
305 .load(std::sync::atomic::Ordering::Acquire);
306 Ok(TxnHandle::new(txn_id, snapshot))
307 }
308
309 fn get(&self, txn: &TxnHandle, key: &[u8]) -> Result<Option<Row>> {
310 let row_id = u64::from_le_bytes(
312 key.try_into()
313 .map_err(|_| SochDBError::InvalidArgument("Key must be 8 bytes".to_string()))?,
314 );
315
316 if let Some(values) = self.inner.get(row_id)? {
318 let num_cols = values.len();
320 let (txn_start, txn_end) = if num_cols >= 2 {
321 let start = values[num_cols - 2]
322 .as_ref()
323 .and_then(|v| v.get(..8))
324 .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
325 .unwrap_or(0);
326 let end = values[num_cols - 1]
327 .as_ref()
328 .and_then(|v| v.get(..8))
329 .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
330 .unwrap_or(0);
331 (start, end)
332 } else {
333 (0, 0)
334 };
335
336 let row = Row {
337 id: row_id,
338 values,
339 txn_start,
340 txn_end,
341 };
342
343 if row.is_visible(txn.snapshot_version) {
345 return Ok(Some(row));
346 }
347 }
348
349 Ok(None)
350 }
351
352 fn put(&self, _txn: &TxnHandle, key: &[u8], row: Row) -> Result<()> {
353 let values: Vec<Option<&[u8]>> = row.values.iter().map(|v| v.as_deref()).collect();
354 let _ = key; self.inner.insert(&values)?;
356 Ok(())
357 }
358
359 fn delete(&self, txn: &TxnHandle, key: &[u8]) -> Result<()> {
360 let row_id = u64::from_le_bytes(
363 key.try_into()
364 .map_err(|_| SochDBError::InvalidArgument("Key must be 8 bytes".to_string()))?,
365 );
366
367 self.inner
369 .mark_deleted(row_id, txn.txn_id, txn.snapshot_version)?;
370 Ok(())
371 }
372
373 fn scan(&self, txn: &TxnHandle, range: Range<Vec<u8>>) -> Result<Vec<Row>> {
374 let start = if range.start.len() >= 8 {
376 u64::from_le_bytes(range.start[..8].try_into().unwrap())
377 } else {
378 0
379 };
380 let end = if range.end.len() >= 8 {
381 u64::from_le_bytes(range.end[..8].try_into().unwrap())
382 } else {
383 u64::MAX
384 };
385
386 let scan_results = self.inner.scan_range(start, end)?;
388 let rows: Vec<Row> = scan_results
389 .into_iter()
390 .filter_map(|(row_id, values)| {
391 let row = Row {
392 id: row_id,
393 values,
394 txn_start: 0,
395 txn_end: 0,
396 };
397 if row.is_visible(txn.snapshot_version) {
398 Some(row)
399 } else {
400 None
401 }
402 })
403 .collect();
404
405 Ok(rows)
406 }
407
408 fn scan_columns(
409 &self,
410 txn: &TxnHandle,
411 range: Range<Vec<u8>>,
412 cols: &[ColumnId],
413 ) -> Result<ColumnIterator> {
414 let start = if range.start.len() >= 8 {
416 u64::from_le_bytes(range.start[..8].try_into().unwrap())
417 } else {
418 0
419 };
420 let end = if range.end.len() >= 8 {
421 u64::from_le_bytes(range.end[..8].try_into().unwrap())
422 } else {
423 u64::MAX
424 };
425
426 let col_indices: Vec<usize> = cols.iter().map(|&c| c as usize).collect();
429
430 let scan_results = self.inner.scan_columns_range(start, end, &col_indices)?;
431
432 let rows: Vec<Row> = scan_results
433 .into_iter()
434 .filter_map(|(row_id, values)| {
435 let row = Row {
436 id: row_id,
437 values,
438 txn_start: 0,
439 txn_end: 0,
440 };
441 if row.is_visible(txn.snapshot_version) {
442 Some(row)
443 } else {
444 None
445 }
446 })
447 .collect();
448
449 Ok(ColumnIterator::new(rows, cols.to_vec()))
450 }
451
452 fn commit(&self, txn: TxnHandle) -> Result<()> {
453 self.inner.fsync()?;
455 self.version_counter
456 .fetch_add(1, std::sync::atomic::Ordering::Release);
457 let _ = txn;
458 Ok(())
459 }
460
461 fn abort(&self, _txn: TxnHandle) -> Result<()> {
462 Ok(())
463 }
464
465 fn stats(&self) -> StorageStats {
466 let lscs_stats = self.inner.stats();
467 StorageStats {
468 total_rows: lscs_stats.next_row_id,
469 disk_bytes: lscs_stats.disk_bytes,
470 memory_bytes: lscs_stats.active_memtable_bytes as u64,
471 num_levels: lscs_stats.level_row_counts.len() as u32,
472 files_per_level: vec![0; lscs_stats.level_row_counts.len()],
473 read_amplification: 1.0,
474 write_amplification: 1.0,
475 }
476 }
477
478 fn flush(&self) -> Result<()> {
479 self.inner.flush()
480 }
481
482 fn compact(&self) -> Result<()> {
483 self.inner.compact()
484 }
485
486 fn close(&self) -> Result<()> {
487 self.flush()
488 }
489}
490
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_txn_handle() {
        let handle = TxnHandle::new(1, 100);
        assert_eq!(handle.txn_id, 1);
        assert_eq!(handle.snapshot_version, 100);
        assert!(handle.start_ts > 0);

        // `TxnHandle` is `Copy`: the original stays usable after a move.
        let copied = handle;
        assert_eq!(copied, handle);
    }

    #[test]
    fn test_row_visibility() {
        let mut row = Row::new(1, vec![Some(b"test".to_vec())]);
        row.txn_start = 100;
        row.txn_end = 0;

        // Live row: visible from `txn_start` (inclusive) onward.
        assert!(!row.is_visible(99));
        assert!(row.is_visible(100));
        assert!(row.is_visible(200));

        // Ended row: visible only in [txn_start, txn_end).
        row.txn_end = 150;
        assert!(!row.is_visible(99));
        assert!(row.is_visible(120));
        assert!(row.is_visible(149));
        assert!(!row.is_visible(150));
        assert!(!row.is_visible(200));
    }

    #[test]
    fn test_column_iterator() {
        let rows = vec![
            Row::new(1, vec![Some(b"a".to_vec()), Some(b"b".to_vec())]),
            Row::new(2, vec![Some(b"c".to_vec()), Some(b"d".to_vec())]),
        ];
        let mut iter = ColumnIterator::new(rows, vec![0, 1]);

        assert_eq!(iter.column_ids(), &[0, 1]);

        let first = iter.next().expect("first row");
        assert_eq!(first.id, 1);
        assert_eq!(first.values, vec![Some(b"a".to_vec()), Some(b"b".to_vec())]);

        let second = iter.next().expect("second row");
        assert_eq!(second.id, 2);
        assert_eq!(second.values, vec![Some(b"c".to_vec()), Some(b"d".to_vec())]);

        // Exhausted iterators keep returning `None`.
        assert!(iter.next().is_none());
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_default_engine_type_is_lscs() {
        assert_eq!(StorageEngineType::default(), StorageEngineType::Lscs);
    }

    #[test]
    fn test_legacy_engine_is_rejected() {
        // The Legacy arm returns before touching the path.
        let result = open_storage_engine("unused", StorageEngineType::Legacy);
        assert!(matches!(result, Err(SochDBError::InvalidArgument(_))));
    }
}