//! Internal key encoding and snapshot tracking primitives.
//!
//! An internal key is `user_key ++ trailer`, where the 8-byte big-endian
//! trailer packs `(seq << 8) | kind` and is bit-inverted so that encoded
//! keys sort by user key ascending, then by sequence number descending.

use bytes::{BufMut, Bytes, BytesMut};
use crossbeam_skiplist_fd::equivalentor::{Comparable, Equivalent};
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex};

/// Tag byte carried in the low 8 bits of the trailer, identifying what kind
/// of entry the key versions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ValueType {
    Deletion = 0x00,
    Value = 0x01,
    Merge = 0x02,
    Log = 0x03,
}

impl ValueType {
    #[inline]
    #[must_use]
    pub const fn from_u8(v: u8) -> Option<Self> {
        match v {
            0x00 => Some(Self::Deletion),
            0x01 => Some(Self::Value),
            0x02 => Some(Self::Merge),
            0x03 => Some(Self::Log),
            _ => None,
        }
    }
}

/// Owned internal key: a user key plus the sequence number and kind that
/// identify one version of it.
#[derive(Debug, Clone, Eq)]
pub struct InternalKey {
    pub user_key: Bytes,
    pub seq: u64,
    pub kind: ValueType,
}

impl InternalKey {
    #[inline]
    pub const fn new(user_key: Bytes, seq: u64, kind: ValueType) -> Self {
        Self { user_key, seq, kind }
    }

    /// Builds a lookup key that sorts before every real version of
    /// `user_key`, since real entries always have `seq < u64::MAX`.
    #[inline]
    pub const fn for_lookup(user_key: Bytes) -> Self {
        Self {
            user_key,
            seq: u64::MAX,
            kind: ValueType::Value,
        }
    }

    /// Encodes as `user_key ++ !((seq << 8) | kind)` big-endian, so the raw
    /// byte order of encoded keys matches `Ord` (seq descending). Note that
    /// `seq` must fit in 56 bits; the top byte of the packed trailer is
    /// occupied by the kind tag.
    #[inline]
    pub fn encode(&self) -> Bytes {
        let mut buf = BytesMut::with_capacity(self.user_key.len() + 8);
        buf.extend_from_slice(&self.user_key);
        let packed = (self.seq << 8) | (self.kind as u64);
        // Invert so larger sequence numbers produce smaller trailers and
        // therefore sort first.
        buf.put_u64(!packed);
        buf.freeze()
    }

    /// Decodes an encoded internal key, returning `None` if the buffer is
    /// too short to hold the 8-byte trailer or the kind tag is unknown.
    #[inline]
    #[allow(clippy::needless_pass_by_value)]
    pub fn decode(bytes: Bytes) -> Option<Self> {
        if bytes.len() < 8 {
            return None;
        }
        let split_idx = bytes.len() - 8;
        let user_key = bytes.slice(..split_idx);
        let trailer = bytes.slice(split_idx..);
        let packed = !u64::from_be_bytes(trailer.as_ref().try_into().ok()?);
        let kind = ValueType::from_u8((packed & 0xFF) as u8)?;
        Some(Self {
            user_key,
            seq: packed >> 8,
            kind,
        })
    }

    /// Returns the user-key prefix of an encoded internal key. Anything
    /// shorter than the 8-byte trailer is returned unchanged (it cannot be a
    /// valid encoded key); a buffer of exactly 8 bytes has an empty user key.
    #[inline]
    pub fn extract_user_key(bytes: &Bytes) -> Bytes {
        if bytes.len() < 8 {
            return bytes.clone();
        }
        bytes.slice(..bytes.len() - 8)
    }
}

impl PartialEq for InternalKey {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.user_key == other.user_key && self.seq == other.seq && self.kind == other.kind
    }
}

impl PartialOrd for InternalKey {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for InternalKey {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        match self.user_key.cmp(&other.user_key) {
            // Same user key: newer versions (higher seq) sort first. Break
            // remaining ties on the kind tag, also descending, so the order
            // stays consistent with `Eq` and with the encoded byte order.
            Ordering::Equal => other
                .seq
                .cmp(&self.seq)
                .then_with(|| (other.kind as u8).cmp(&(self.kind as u8))),
            ord => ord,
        }
    }
}

/// Borrowed view of an internal key, for lookups that should not allocate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InternalKeyRef<'a> {
    pub user_key: &'a [u8],
    pub seq: u64,
    pub kind: ValueType,
}

impl<'a> InternalKeyRef<'a> {
    #[inline]
    pub const fn new(user_key: &'a [u8], seq: u64, kind: ValueType) -> Self {
        Self { user_key, seq, kind }
    }

    /// Builds a lookup key for a read at `snapshot_seq`: versions newer than
    /// the snapshot sort before it, so a lower-bound seek skips them.
    #[inline]
    pub const fn for_lookup(user_key: &'a [u8], snapshot_seq: u64) -> Self {
        Self {
            user_key,
            seq: snapshot_seq,
            kind: ValueType::Value,
        }
    }

    /// Encodes with the same wire format as [`InternalKey::encode`].
    #[inline]
    pub fn encode_to_vec(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(self.user_key.len() + 8);
        buf.extend_from_slice(self.user_key);
        let packed = (self.seq << 8) | (self.kind as u64);
        buf.extend_from_slice(&(!packed).to_be_bytes());
        buf
    }
}

impl PartialOrd for InternalKeyRef<'_> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for InternalKeyRef<'_> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        match self.user_key.cmp(other.user_key) {
            // Mirror `InternalKey`'s ordering: seq descending, then kind
            // descending, consistent with the derived `Eq`.
            Ordering::Equal => other
                .seq
                .cmp(&self.seq)
                .then_with(|| (other.kind as u8).cmp(&(self.kind as u8))),
            ord => ord,
        }
    }
}

impl Equivalent<InternalKeyRef<'_>> for InternalKey {
    #[inline]
    fn equivalent(&self, query: &InternalKeyRef<'_>) -> bool {
        self.user_key.as_ref() == query.user_key && self.seq == query.seq && self.kind == query.kind
    }
}

impl Comparable<InternalKeyRef<'_>> for InternalKey {
    #[inline]
    fn compare(&self, query: &InternalKeyRef<'_>) -> Ordering {
        match self.user_key.as_ref().cmp(query.user_key) {
            // Same tie-break as `Ord`, so `compare == Equal` agrees with
            // `equivalent`.
            Ordering::Equal => query
                .seq
                .cmp(&self.seq)
                .then_with(|| (query.kind as u8).cmp(&(self.kind as u8))),
            ord => ord,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_internal_key_encoding() {
        let key = InternalKey::new(Bytes::from("key"), 100, ValueType::Value);
        let encoded = key.encode();
        let decoded = InternalKey::decode(encoded).unwrap();
        assert_eq!(decoded.user_key, Bytes::from("key"));
        assert_eq!(decoded.seq, 100);
        assert_eq!(decoded.kind, ValueType::Value);
    }
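
    // A sketch beyond the original round-trip test: malformed input should be
    // rejected rather than misparsed. A buffer shorter than the 8-byte trailer
    // cannot be an internal key, and a trailer whose low byte is not a known
    // `ValueType` tag must decode to `None`.
    #[test]
    fn test_decode_rejects_malformed_input() {
        assert!(InternalKey::decode(Bytes::from_static(b"short")).is_none());
        assert_eq!(ValueType::from_u8(0x04), None);
        // Hand-craft a trailer carrying the unknown kind byte 0x07.
        let mut buf = BytesMut::new();
        buf.extend_from_slice(b"key");
        buf.put_u64(!((1u64 << 8) | 0x07));
        assert!(InternalKey::decode(buf.freeze()).is_none());
    }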

    #[test]
    fn test_internal_key_sorting() {
        let k2 = InternalKey::new(Bytes::from("abc"), 200, ValueType::Value);
        let k1 = InternalKey::new(Bytes::from("abc"), 100, ValueType::Value);
        let kb = InternalKey::new(Bytes::from("abd"), 100, ValueType::Value);
        assert_eq!(k2.cmp(&k1), Ordering::Less);
        assert_eq!(k1.cmp(&kb), Ordering::Less);
    }
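
    // A sketch of the `for_lookup` contract: the lookup key carries
    // `u64::MAX`, and higher sequence numbers sort first, so it orders before
    // every real version of the same user key and a lower-bound seek lands on
    // the newest one.
    #[test]
    fn test_lookup_key_sorts_before_real_versions() {
        let lookup = InternalKey::for_lookup(Bytes::from("abc"));
        let newest = InternalKey::new(Bytes::from("abc"), 500, ValueType::Value);
        let oldest = InternalKey::new(Bytes::from("abc"), 1, ValueType::Deletion);
        assert_eq!(lookup.cmp(&newest), Ordering::Less);
        assert_eq!(lookup.cmp(&oldest), Ordering::Less);
        assert_eq!(newest.cmp(&oldest), Ordering::Less);
    }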

    #[test]
    fn test_encoded_byte_sorting() {
        let k2 = InternalKey::new(Bytes::from("abc"), 200, ValueType::Value);
        let k1 = InternalKey::new(Bytes::from("abc"), 100, ValueType::Value);
        let e2 = k2.encode();
        let e1 = k1.encode();
        // The inverted trailer makes raw byte order match `Ord`: newer first.
        assert!(e2 < e1);
    }
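
    // A consistency sketch: the borrowed and owned key types share one wire
    // format, so `InternalKeyRef::encode_to_vec` must match
    // `InternalKey::encode` byte for byte, and `extract_user_key` must recover
    // the user key, including the empty-user-key case (exactly 8 bytes).
    #[test]
    fn test_ref_and_owned_encodings_agree() {
        let owned = InternalKey::new(Bytes::from("k1"), 7, ValueType::Merge);
        let borrowed = InternalKeyRef::new(b"k1", 7, ValueType::Merge);
        assert_eq!(owned.encode().as_ref(), borrowed.encode_to_vec().as_slice());
        let encoded = owned.encode();
        assert_eq!(InternalKey::extract_user_key(&encoded), Bytes::from("k1"));
        let empty = InternalKey::new(Bytes::new(), 1, ValueType::Value).encode();
        assert_eq!(empty.len(), 8);
        assert!(InternalKey::extract_user_key(&empty).is_empty());
    }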
}

/// Tracks the set of live snapshot sequence numbers and caches the oldest so
/// hot paths (e.g. compaction GC checks) can read it without taking the lock.
pub struct SnapshotTracker {
    active: Mutex<BTreeSet<u64>>,
    oldest_cached: AtomicU64,
}

impl SnapshotTracker {
    #[must_use]
    pub const fn new() -> Self {
        Self {
            active: Mutex::new(BTreeSet::new()),
            // `u64::MAX` encodes "no active snapshots".
            oldest_cached: AtomicU64::new(u64::MAX),
        }
    }

    pub fn register(&self, seq: u64) {
        let mut active = self.active.lock().expect("SnapshotTracker lock poisoned");
        active.insert(seq);
        // Non-empty after the insert, so the iterator always yields a value.
        if let Some(&oldest) = active.iter().next() {
            self.oldest_cached.store(oldest, AtomicOrdering::Release);
        }
    }

    pub fn unregister(&self, seq: u64) {
        let mut active = self.active.lock().expect("SnapshotTracker lock poisoned");
        active.remove(&seq);
        let oldest = active.iter().next().copied().unwrap_or(u64::MAX);
        // Publish while still holding the lock: storing after releasing it
        // would let two racing unregisters publish out of order and leave a
        // stale value cached.
        self.oldest_cached.store(oldest, AtomicOrdering::Release);
    }

    /// Sequence number of the oldest live snapshot, or `u64::MAX` when none.
    pub fn oldest_snapshot(&self) -> u64 {
        self.oldest_cached.load(AtomicOrdering::Acquire)
    }

    pub fn active_count(&self) -> usize {
        self.active
            .lock()
            .expect("SnapshotTracker lock poisoned")
            .len()
    }
}

impl Default for SnapshotTracker {
    fn default() -> Self {
        Self::new()
    }
}

/// RAII guard for a snapshot: registers `seq` on creation and unregisters it
/// on drop, so a snapshot can never be leaked by an early return.
pub struct SnapshotHandle {
    seq: u64,
    tracker: Arc<SnapshotTracker>,
}

impl SnapshotHandle {
    pub fn new(seq: u64, tracker: Arc<SnapshotTracker>) -> Self {
        tracker.register(seq);
        Self { seq, tracker }
    }

    #[must_use]
    pub const fn seq(&self) -> u64 {
        self.seq
    }
}

impl Drop for SnapshotHandle {
    fn drop(&mut self) {
        self.tracker.unregister(self.seq);
    }
}

#[cfg(test)]
mod snapshot_tracker_tests {
    use super::*;

    #[test]
    fn test_tracker_empty() {
        let tracker = SnapshotTracker::new();
        assert_eq!(tracker.oldest_snapshot(), u64::MAX);
        assert_eq!(tracker.active_count(), 0);
    }
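
    // A behavior note as a test (not in the original suite): `active` is a
    // set, so registering the same seq twice collapses to one entry and a
    // single unregister releases it. Callers that can hand out the same
    // sequence number to multiple snapshots would need a refcount instead.
    #[test]
    fn test_duplicate_seq_collapses() {
        let tracker = SnapshotTracker::new();
        tracker.register(100);
        tracker.register(100);
        assert_eq!(tracker.active_count(), 1);
        tracker.unregister(100);
        assert_eq!(tracker.oldest_snapshot(), u64::MAX);
        assert_eq!(tracker.active_count(), 0);
    }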

    #[test]
    fn test_tracker_single_snapshot() {
        let tracker = SnapshotTracker::new();
        tracker.register(100);
        assert_eq!(tracker.oldest_snapshot(), 100);
        assert_eq!(tracker.active_count(), 1);
        tracker.unregister(100);
        assert_eq!(tracker.oldest_snapshot(), u64::MAX);
        assert_eq!(tracker.active_count(), 0);
    }

    #[test]
    fn test_tracker_multiple_snapshots() {
        let tracker = SnapshotTracker::new();
        tracker.register(100);
        tracker.register(200);
        tracker.register(50);
        assert_eq!(tracker.oldest_snapshot(), 50);
        assert_eq!(tracker.active_count(), 3);
        tracker.unregister(50);
        assert_eq!(tracker.oldest_snapshot(), 100);
        tracker.unregister(200);
        assert_eq!(tracker.oldest_snapshot(), 100);
        tracker.unregister(100);
        assert_eq!(tracker.oldest_snapshot(), u64::MAX);
    }
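
    // A concurrency sketch: because `unregister` publishes the cached oldest
    // while still holding the lock, racing register/unregister pairs cannot
    // leave a stale value behind; after every thread finishes, the cache must
    // read "no snapshots".
    #[test]
    fn test_tracker_concurrent_churn() {
        let tracker = Arc::new(SnapshotTracker::new());
        let mut threads = Vec::new();
        for t in 0..4u64 {
            let tracker = Arc::clone(&tracker);
            threads.push(std::thread::spawn(move || {
                for i in 0..100 {
                    let seq = t * 1000 + i;
                    tracker.register(seq);
                    tracker.unregister(seq);
                }
            }));
        }
        for handle in threads {
            handle.join().unwrap();
        }
        assert_eq!(tracker.active_count(), 0);
        assert_eq!(tracker.oldest_snapshot(), u64::MAX);
    }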

    #[test]
    fn test_snapshot_handle_auto_unregister() {
        let tracker = Arc::new(SnapshotTracker::new());
        {
            let _handle = SnapshotHandle::new(100, Arc::clone(&tracker));
            assert_eq!(tracker.oldest_snapshot(), 100);
            assert_eq!(tracker.active_count(), 1);
        }
        assert_eq!(tracker.oldest_snapshot(), u64::MAX);
        assert_eq!(tracker.active_count(), 0);
    }
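
    // A sketch of out-of-order release: dropping the older handle first must
    // advance the cached oldest to the surviving snapshot, mirroring how
    // readers typically outlive each other in no particular order.
    #[test]
    fn test_snapshot_handles_drop_out_of_order() {
        let tracker = Arc::new(SnapshotTracker::new());
        let h1 = SnapshotHandle::new(10, Arc::clone(&tracker));
        let h2 = SnapshotHandle::new(20, Arc::clone(&tracker));
        assert_eq!(tracker.oldest_snapshot(), 10);
        drop(h1);
        assert_eq!(tracker.oldest_snapshot(), 20);
        assert_eq!(h2.seq(), 20);
        drop(h2);
        assert_eq!(tracker.oldest_snapshot(), u64::MAX);
    }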
}