1use sochdb_core::{Result, SochDBError, SochRow, SochValue};
34use std::ops::Range;
35use std::path::Path;
36use std::sync::Arc;
37
/// Lightweight, copyable handle identifying one transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TxnHandle {
    /// Identifier of the transaction.
    pub txn_id: u64,
    /// Version the transaction reads at; row visibility is evaluated against it.
    pub snapshot_version: u64,
    /// Wall-clock creation time in microseconds since the Unix epoch.
    pub start_ts: u64,
}

impl TxnHandle {
    /// Builds a handle for `txn_id` reading at `snapshot_version`.
    ///
    /// `start_ts` is stamped with the current system time (microseconds since
    /// the Unix epoch).
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time earlier than the Unix epoch.
    pub fn new(txn_id: u64, snapshot_version: u64) -> Self {
        use std::time::{SystemTime, UNIX_EPOCH};

        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        let start_ts = since_epoch.as_micros() as u64;

        Self {
            txn_id,
            snapshot_version,
            start_ts,
        }
    }
}
63
/// Identifier of a column within a table.
pub type ColumnId = u32;

/// Identifier of a row within a table.
pub type RowId = u64;
69
70#[derive(Debug, Clone)]
72pub struct Row {
73 pub id: RowId,
75 pub values: Vec<Option<Vec<u8>>>,
77 pub txn_start: u64,
79 pub txn_end: u64,
81}
82
83impl Row {
84 pub fn new(id: RowId, values: Vec<Option<Vec<u8>>>) -> Self {
86 let now = std::time::SystemTime::now()
87 .duration_since(std::time::UNIX_EPOCH)
88 .unwrap()
89 .as_micros() as u64;
90 Self {
91 id,
92 values,
93 txn_start: now,
94 txn_end: 0,
95 }
96 }
97
98 pub fn is_visible(&self, snapshot_version: u64) -> bool {
100 self.txn_start <= snapshot_version && (self.txn_end == 0 || self.txn_end > snapshot_version)
101 }
102
103 pub fn to_soch_row(&self, _schema: &[String]) -> SochRow {
105 let values: Vec<SochValue> = self
106 .values
107 .iter()
108 .map(|v| match v {
109 Some(bytes) => {
110 if let Ok(s) = std::str::from_utf8(bytes) {
112 SochValue::Text(s.to_string())
113 } else {
114 SochValue::Binary(bytes.clone())
115 }
116 }
117 None => SochValue::Null,
118 })
119 .collect();
120 SochRow::new(values)
121 }
122}
123
124pub struct ColumnIterator {
126 position: usize,
128 rows: Vec<Row>,
130 column_ids: Vec<ColumnId>,
132}
133
134impl ColumnIterator {
135 pub fn new(rows: Vec<Row>, column_ids: Vec<ColumnId>) -> Self {
137 Self {
138 position: 0,
139 rows,
140 column_ids,
141 }
142 }
143
144 pub fn column_ids(&self) -> &[ColumnId] {
146 &self.column_ids
147 }
148}
149
150impl Iterator for ColumnIterator {
151 type Item = Row;
152
153 fn next(&mut self) -> Option<Self::Item> {
154 if self.position < self.rows.len() {
155 let row = self.rows[self.position].clone();
156 self.position += 1;
157 Some(row)
158 } else {
159 None
160 }
161 }
162}
163
/// Snapshot of storage-engine counters returned by `StorageEngine::stats`.
///
/// `Default` yields all-zero counters and an empty `files_per_level`.
#[derive(Clone, Debug, Default)]
pub struct StorageStats {
    /// Row count reported by the engine.
    pub total_rows: u64,
    /// Bytes reported as residing on disk.
    pub disk_bytes: u64,
    /// Bytes reported as residing in memory.
    pub memory_bytes: u64,
    /// Number of levels reported by the engine.
    pub num_levels: u32,
    /// Per-level file counts, one entry per level.
    pub files_per_level: Vec<u32>,
    /// Read amplification figure reported by the engine.
    pub read_amplification: f64,
    /// Write amplification figure reported by the engine.
    pub write_amplification: f64,
}
182
183pub trait StorageEngine: Send + Sync {
189 fn begin_txn(&self) -> Result<TxnHandle>;
191
192 fn get(&self, txn: &TxnHandle, key: &[u8]) -> Result<Option<Row>>;
194
195 fn put(&self, txn: &TxnHandle, key: &[u8], row: Row) -> Result<()>;
197
198 fn delete(&self, txn: &TxnHandle, key: &[u8]) -> Result<()>;
200
201 fn scan(&self, txn: &TxnHandle, range: Range<Vec<u8>>) -> Result<Vec<Row>>;
203
204 fn scan_columns(
212 &self,
213 txn: &TxnHandle,
214 range: Range<Vec<u8>>,
215 cols: &[ColumnId],
216 ) -> Result<ColumnIterator>;
217
218 fn commit(&self, txn: TxnHandle) -> Result<()>;
220
221 fn abort(&self, txn: TxnHandle) -> Result<()>;
223
224 fn stats(&self) -> StorageStats;
226
227 fn flush(&self) -> Result<()>;
229
230 fn compact(&self) -> Result<()>;
232
233 fn close(&self) -> Result<()>;
235}
236
237pub fn open_storage_engine<P: AsRef<Path>>(
239 path: P,
240 engine_type: StorageEngineType,
241) -> Result<Arc<dyn StorageEngine>> {
242 match engine_type {
243 StorageEngineType::Lscs => {
244 use crate::lscs::{ColumnDef, ColumnType, Lscs, LscsConfig, TableSchema};
245
246 let schema = TableSchema::new(
248 "default".to_string(),
249 vec![
250 ColumnDef {
251 name: "key".to_string(),
252 col_type: ColumnType::Binary,
253 nullable: false,
254 },
255 ColumnDef {
256 name: "value".to_string(),
257 col_type: ColumnType::Binary,
258 nullable: true,
259 },
260 ],
261 )
262 .with_mvcc();
263
264 let lscs = Lscs::new(path.as_ref().to_path_buf(), schema, LscsConfig::default())?;
265 Ok(Arc::new(LscsAdapter::new(lscs)))
266 }
267 StorageEngineType::Legacy => Err(SochDBError::InvalidArgument(
268 "Legacy LSMTree has been removed; use Lscs".to_string(),
269 )),
270 }
271}
272
/// Selects which backend `open_storage_engine` should construct.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum StorageEngineType {
    /// The LSCS backend; this is the default.
    #[default]
    Lscs,
    /// The removed legacy backend; `open_storage_engine` rejects it with an
    /// `InvalidArgument` error.
    Legacy,
}
282
283pub struct LscsAdapter {
285 inner: crate::lscs::Lscs,
286 next_txn_id: std::sync::atomic::AtomicU64,
287 version_counter: std::sync::atomic::AtomicU64,
288}
289
290impl LscsAdapter {
291 pub fn new(lscs: crate::lscs::Lscs) -> Self {
293 Self {
294 inner: lscs,
295 next_txn_id: std::sync::atomic::AtomicU64::new(1),
296 version_counter: std::sync::atomic::AtomicU64::new(1),
297 }
298 }
299}
300
301impl StorageEngine for LscsAdapter {
302 fn begin_txn(&self) -> Result<TxnHandle> {
303 let txn_id = self
304 .next_txn_id
305 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
306 let snapshot = self
307 .version_counter
308 .load(std::sync::atomic::Ordering::Acquire);
309 Ok(TxnHandle::new(txn_id, snapshot))
310 }
311
312 fn get(&self, txn: &TxnHandle, key: &[u8]) -> Result<Option<Row>> {
313 let row_id = u64::from_le_bytes(
315 key.try_into()
316 .map_err(|_| SochDBError::InvalidArgument("Key must be 8 bytes".to_string()))?,
317 );
318
319 if let Some(values) = self.inner.get(row_id)? {
321 let num_cols = values.len();
323 let (txn_start, txn_end) = if num_cols >= 2 {
324 let start = values[num_cols - 2]
325 .as_ref()
326 .and_then(|v| v.get(..8))
327 .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
328 .unwrap_or(0);
329 let end = values[num_cols - 1]
330 .as_ref()
331 .and_then(|v| v.get(..8))
332 .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
333 .unwrap_or(0);
334 (start, end)
335 } else {
336 (0, 0)
337 };
338
339 let row = Row {
340 id: row_id,
341 values,
342 txn_start,
343 txn_end,
344 };
345
346 if row.is_visible(txn.snapshot_version) {
348 return Ok(Some(row));
349 }
350 }
351
352 Ok(None)
353 }
354
355 fn put(&self, _txn: &TxnHandle, key: &[u8], row: Row) -> Result<()> {
356 let values: Vec<Option<&[u8]>> = row.values.iter().map(|v| v.as_deref()).collect();
357 let _ = key; self.inner.insert(&values)?;
359 Ok(())
360 }
361
362 fn delete(&self, txn: &TxnHandle, key: &[u8]) -> Result<()> {
363 let row_id = u64::from_le_bytes(
366 key.try_into()
367 .map_err(|_| SochDBError::InvalidArgument("Key must be 8 bytes".to_string()))?,
368 );
369
370 self.inner
372 .mark_deleted(row_id, txn.txn_id, txn.snapshot_version)?;
373 Ok(())
374 }
375
376 fn scan(&self, txn: &TxnHandle, range: Range<Vec<u8>>) -> Result<Vec<Row>> {
377 let start = if range.start.len() >= 8 {
379 u64::from_le_bytes(range.start[..8].try_into().unwrap())
380 } else {
381 0
382 };
383 let end = if range.end.len() >= 8 {
384 u64::from_le_bytes(range.end[..8].try_into().unwrap())
385 } else {
386 u64::MAX
387 };
388
389 let scan_results = self.inner.scan_range(start, end)?;
391 let rows: Vec<Row> = scan_results
392 .into_iter()
393 .filter_map(|(row_id, values)| {
394 let row = Row {
395 id: row_id,
396 values,
397 txn_start: 0,
398 txn_end: 0,
399 };
400 if row.is_visible(txn.snapshot_version) {
401 Some(row)
402 } else {
403 None
404 }
405 })
406 .collect();
407
408 Ok(rows)
409 }
410
411 fn scan_columns(
412 &self,
413 txn: &TxnHandle,
414 range: Range<Vec<u8>>,
415 cols: &[ColumnId],
416 ) -> Result<ColumnIterator> {
417 let start = if range.start.len() >= 8 {
419 u64::from_le_bytes(range.start[..8].try_into().unwrap())
420 } else {
421 0
422 };
423 let end = if range.end.len() >= 8 {
424 u64::from_le_bytes(range.end[..8].try_into().unwrap())
425 } else {
426 u64::MAX
427 };
428
429 let col_indices: Vec<usize> = cols.iter().map(|&c| c as usize).collect();
432
433 let scan_results = self.inner.scan_columns_range(start, end, &col_indices)?;
434
435 let rows: Vec<Row> = scan_results
436 .into_iter()
437 .filter_map(|(row_id, values)| {
438 let row = Row {
439 id: row_id,
440 values,
441 txn_start: 0,
442 txn_end: 0,
443 };
444 if row.is_visible(txn.snapshot_version) {
445 Some(row)
446 } else {
447 None
448 }
449 })
450 .collect();
451
452 Ok(ColumnIterator::new(rows, cols.to_vec()))
453 }
454
455 fn commit(&self, txn: TxnHandle) -> Result<()> {
456 self.inner.fsync()?;
458 self.version_counter
459 .fetch_add(1, std::sync::atomic::Ordering::Release);
460 let _ = txn;
461 Ok(())
462 }
463
464 fn abort(&self, _txn: TxnHandle) -> Result<()> {
465 Ok(())
466 }
467
468 fn stats(&self) -> StorageStats {
469 let lscs_stats = self.inner.stats();
470 StorageStats {
471 total_rows: lscs_stats.next_row_id,
472 disk_bytes: lscs_stats.disk_bytes,
473 memory_bytes: lscs_stats.active_memtable_bytes as u64,
474 num_levels: lscs_stats.level_row_counts.len() as u32,
475 files_per_level: vec![0; lscs_stats.level_row_counts.len()],
476 read_amplification: 1.0,
477 write_amplification: 1.0,
478 }
479 }
480
481 fn flush(&self) -> Result<()> {
482 self.inner.flush()
483 }
484
485 fn compact(&self) -> Result<()> {
486 self.inner.compact()
487 }
488
489 fn close(&self) -> Result<()> {
490 self.flush()
491 }
492}
493
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh handle carries the supplied ids and a non-zero timestamp.
    #[test]
    fn test_txn_handle() {
        let TxnHandle {
            txn_id,
            snapshot_version,
            start_ts,
        } = TxnHandle::new(1, 100);

        assert_eq!(txn_id, 1);
        assert_eq!(snapshot_version, 100);
        assert!(start_ts > 0);
    }

    /// Visibility honours both the start bound and the (optional) end bound.
    #[test]
    fn test_row_visibility() {
        let mut row = Row::new(1, vec![Some(b"test".to_vec())]);

        // Open-ended row: visible from its start version onwards.
        row.txn_start = 100;
        row.txn_end = 0;
        assert!(row.is_visible(100));
        assert!(row.is_visible(200));
        assert!(!row.is_visible(99));

        // Ended row: visible strictly before the end version only.
        row.txn_end = 150;
        assert!(row.is_visible(120));
        assert!(!row.is_visible(150));
        assert!(!row.is_visible(200));
    }

    /// The iterator reports its column ids and yields each row exactly once.
    #[test]
    fn test_column_iterator() {
        let first = Row::new(1, vec![Some(b"a".to_vec()), Some(b"b".to_vec())]);
        let second = Row::new(2, vec![Some(b"c".to_vec()), Some(b"d".to_vec())]);
        let mut iter = ColumnIterator::new(vec![first, second], vec![0, 1]);

        assert_eq!(iter.column_ids(), &[0, 1]);
        assert_eq!(iter.by_ref().count(), 2);
        assert!(iter.next().is_none());
    }
}