reifydb-store-single 0.5.0

Single-version storage for OLTP operations without version history
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::{collections::BTreeMap, ops::Bound, sync::Arc};

use reifydb_core::encoded::key::EncodedKey;
use reifydb_runtime::sync::rwlock::RwLock;
use reifydb_type::{Result, util::cowvec::CowVec};
use tracing::instrument;

use crate::tier::{RangeBatch, RangeCursor, RawEntry, TierBackend, TierStorage};

/// Shared, lock-protected ordered map from encoded key to an optional value.
///
/// A `None` value is kept in the map as an entry of its own: `get` and `contains` treat it as
/// absent, while `get_with_tombstone` reports it as `Some(None)`.
type MemoryStore = Arc<RwLock<BTreeMap<EncodedKey, Option<CowVec<u8>>>>>;

/// In-memory storage handle.
///
/// The state lives behind an `Arc`, so `Clone` copies only the handle: every clone reads and
/// writes the same underlying map.
#[derive(Clone)]
pub struct MemoryPrimitiveStorage {
	// Shared state; see `MemoryPrimitiveStorageInner`.
	inner: Arc<MemoryPrimitiveStorageInner>,
}

/// State shared by all clones of a `MemoryPrimitiveStorage`.
struct MemoryPrimitiveStorageInner {
	// The key/value map, guarded by a read-write lock.
	data: MemoryStore,
}

impl Default for MemoryPrimitiveStorage {
	fn default() -> Self {
		Self::new()
	}
}

impl MemoryPrimitiveStorage {
	#[instrument(name = "store::single::memory::new", level = "debug")]
	pub fn new() -> Self {
		Self {
			inner: Arc::new(MemoryPrimitiveStorageInner {
				data: Arc::new(RwLock::new(BTreeMap::new())),
			}),
		}
	}
}

/// Returns `true` when `(start, end)` is a pair of bounds that `BTreeMap::range` rejects with a
/// panic: the start lies after the end, or both bounds exclude the same key.
///
/// No key can fall inside such a range, so the scan methods answer it with an empty batch.
fn is_inverted_range(start: Bound<&[u8]>, end: Bound<&[u8]>) -> bool {
	match (start, end) {
		(Bound::Excluded(s), Bound::Excluded(e)) => s >= e,
		(Bound::Included(s) | Bound::Excluded(s), Bound::Included(e) | Bound::Excluded(e)) => s > e,
		// An unbounded side can never be inverted.
		_ => false,
	}
}

/// Records the outcome of one scan step in `cursor` and wraps `entries` in a `RangeBatch`.
///
/// The last returned key becomes the resume point. The cursor is marked exhausted when the scan
/// produced fewer than `batch_size` entries (including none at all).
fn finish_batch(cursor: &mut RangeCursor, entries: Vec<RawEntry>, batch_size: usize) -> RangeBatch {
	match entries.last() {
		Some(last_entry) => {
			cursor.last_key = Some(last_entry.key.clone());
			cursor.exhausted = entries.len() < batch_size;
		}
		None => cursor.exhausted = true,
	}

	RangeBatch {
		entries,
		has_more: !cursor.exhausted,
	}
}

impl TierStorage for MemoryPrimitiveStorage {
	/// Returns a clone of the value stored under `key`.
	///
	/// Yields `None` both when the key is missing and when the key is present with a `None` value.
	#[instrument(name = "store::single::memory::get", level = "trace", skip(self, key), fields(key_len = key.len()))]
	fn get(&self, key: &[u8]) -> Result<Option<CowVec<u8>>> {
		let map = self.inner.data.read();
		Ok(map.get(key).cloned().flatten())
	}

	/// Returns `true` only when `key` is present and holds a `Some` value.
	#[instrument(name = "store::single::memory::contains", level = "trace", skip(self, key), fields(key_len = key.len()), ret)]
	fn contains(&self, key: &[u8]) -> Result<bool> {
		let map = self.inner.data.read();
		Ok(matches!(map.get(key), Some(Some(_))))
	}

	/// Returns the raw map entry for `key`.
	///
	/// The outer `None` means the key is not in the map; `Some(None)` means the key is present
	/// with a `None` value; `Some(Some(v))` carries a clone of the stored value.
	#[instrument(name = "store::single::memory::get_with_tombstone", level = "trace", skip(self, key), fields(key_len = key.len()))]
	fn get_with_tombstone(&self, key: &[u8]) -> Result<Option<Option<CowVec<u8>>>> {
		let map = self.inner.data.read();
		Ok(map.get(key).cloned())
	}

	/// Inserts or overwrites every entry while holding the write lock once.
	///
	/// A `None` value is stored under its key rather than removing the key. When the same key
	/// appears more than once in `entries`, the last occurrence wins.
	#[instrument(name = "store::single::memory::set", level = "debug", skip(self, entries), fields(entry_count = entries.len()))]
	fn set(&self, entries: Vec<(EncodedKey, Option<CowVec<u8>>)>) -> Result<()> {
		let mut map = self.inner.data.write();
		map.extend(entries);
		Ok(())
	}

	/// Returns up to `batch_size` entries of `(start, end)` in ascending key order.
	///
	/// The first call starts at `start`; later calls resume strictly after `cursor.last_key`.
	/// The cursor is marked exhausted once a call returns fewer than `batch_size` entries, and
	/// every call after that returns an empty batch. A `batch_size` of 0 therefore exhausts the
	/// cursor immediately. Bounds that describe an empty, inverted range produce an empty batch
	/// and an exhausted cursor instead of a panic.
	#[instrument(name = "store::single::memory::range_next", level = "trace", skip(self, cursor))]
	fn range_next(
		&self,
		cursor: &mut RangeCursor,
		start: Bound<&[u8]>,
		end: Bound<&[u8]>,
		batch_size: usize,
	) -> Result<RangeBatch> {
		if cursor.exhausted {
			return Ok(RangeBatch::empty());
		}

		// Resume strictly after the last key handed out; otherwise honour the caller's bound.
		let actual_start = match cursor.last_key {
			Some(ref last_key) => Bound::Excluded(last_key.as_slice()),
			None => start,
		};

		// `BTreeMap::range` panics on inverted bounds; such a range is simply empty.
		if is_inverted_range(actual_start, end) {
			cursor.exhausted = true;
			return Ok(RangeBatch::empty());
		}

		let map = self.inner.data.read();

		let entries: Vec<RawEntry> = map
			.range::<[u8], _>((actual_start, end))
			.take(batch_size)
			.map(|(k, v)| RawEntry {
				key: k.clone(),
				value: v.clone(),
			})
			.collect();

		Ok(finish_batch(cursor, entries, batch_size))
	}

	/// Returns up to `batch_size` entries of `(start, end)` in descending key order.
	///
	/// The first call starts at `end`; later calls resume strictly before `cursor.last_key`.
	/// Exhaustion and the handling of a zero `batch_size` or inverted bounds mirror `range_next`.
	#[instrument(name = "store::single::memory::range_rev_next", level = "trace", skip(self, cursor))]
	fn range_rev_next(
		&self,
		cursor: &mut RangeCursor,
		start: Bound<&[u8]>,
		end: Bound<&[u8]>,
		batch_size: usize,
	) -> Result<RangeBatch> {
		if cursor.exhausted {
			return Ok(RangeBatch::empty());
		}

		// Resume strictly before the last key handed out; otherwise honour the caller's bound.
		let actual_end = match cursor.last_key {
			Some(ref last_key) => Bound::Excluded(last_key.as_slice()),
			None => end,
		};

		// `BTreeMap::range` panics on inverted bounds; such a range is simply empty.
		if is_inverted_range(start, actual_end) {
			cursor.exhausted = true;
			return Ok(RangeBatch::empty());
		}

		let map = self.inner.data.read();

		let entries: Vec<RawEntry> = map
			.range::<[u8], _>((start, actual_end))
			.rev()
			.take(batch_size)
			.map(|(k, v)| RawEntry {
				key: k.clone(),
				value: v.clone(),
			})
			.collect();

		Ok(finish_batch(cursor, entries, batch_size))
	}

	/// No-op: this implementation has nothing to set up and always returns `Ok(())`.
	#[instrument(name = "store::single::memory::ensure_table", level = "trace", skip(self))]
	fn ensure_table(&self) -> Result<()> {
		Ok(())
	}

	/// Removes every entry from the map.
	#[instrument(name = "store::single::memory::clear_table", level = "debug", skip(self))]
	fn clear_table(&self) -> Result<()> {
		let mut map = self.inner.data.write();
		map.clear();
		Ok(())
	}
}

// Opts the in-memory store into `TierBackend`; the empty body defines no trait items here.
impl TierBackend for MemoryPrimitiveStorage {}