use std::{
collections::{
BTreeMap,
btree_map::{Iter, Range},
},
ops::RangeBounds,
};
use crate::encoded::{key::EncodedKey, row::EncodedRow};
#[derive(Debug, Clone)]
pub enum PendingWrite {
Set(EncodedRow),
Remove,
}
#[derive(Debug, Default, Clone)]
pub struct Pending {
writes: BTreeMap<EncodedKey, PendingWrite>,
}
impl Pending {
pub fn new() -> Self {
Self {
writes: BTreeMap::new(),
}
}
pub fn insert(&mut self, key: EncodedKey, value: EncodedRow) {
self.writes.insert(key, PendingWrite::Set(value));
}
pub fn remove(&mut self, key: EncodedKey) {
self.writes.insert(key, PendingWrite::Remove);
}
pub fn get(&self, key: &EncodedKey) -> Option<&EncodedRow> {
match self.writes.get(key) {
Some(PendingWrite::Set(value)) => Some(value),
_ => None,
}
}
pub fn is_removed(&self, key: &EncodedKey) -> bool {
matches!(self.writes.get(key), Some(PendingWrite::Remove))
}
pub fn contains_key(&self, key: &EncodedKey) -> bool {
self.writes.contains_key(key)
}
pub fn iter_sorted(&self) -> Iter<'_, EncodedKey, PendingWrite> {
self.writes.iter()
}
pub fn range<R>(&self, range: R) -> Range<'_, EncodedKey, PendingWrite>
where
R: RangeBounds<EncodedKey>,
{
self.writes.range(range)
}
}
#[cfg(test)]
pub mod tests {
use std::vec;
use reifydb_type::util::cowvec::CowVec;
use super::*;
fn make_key(s: &str) -> EncodedKey {
EncodedKey::new(s.as_bytes().to_vec())
}
fn make_value(s: &str) -> EncodedRow {
EncodedRow(CowVec::new(s.as_bytes().to_vec()))
}
#[test]
fn test_insert_single_write() {
let mut pending = Pending::new();
let key = make_key("key1");
let value = make_value("value1");
pending.insert(key.clone(), value.clone());
assert_eq!(pending.get(&key), Some(&value));
assert!(!pending.is_removed(&key));
assert!(pending.contains_key(&key));
}
#[test]
fn test_insert_multiple_writes() {
let mut pending = Pending::new();
pending.insert(make_key("key1"), make_value("value1"));
pending.insert(make_key("key2"), make_value("value2"));
pending.insert(make_key("key3"), make_value("value3"));
assert_eq!(pending.get(&make_key("key1")), Some(&make_value("value1")));
assert_eq!(pending.get(&make_key("key2")), Some(&make_value("value2")));
assert_eq!(pending.get(&make_key("key3")), Some(&make_value("value3")));
}
#[test]
fn test_insert_overwrites_existing_key() {
let mut pending = Pending::new();
let key = make_key("key1");
pending.insert(key.clone(), make_value("value1"));
pending.insert(key.clone(), make_value("value2"));
assert_eq!(pending.get(&key), Some(&make_value("value2")));
}
#[test]
fn test_remove_operation() {
let mut pending = Pending::new();
let key = make_key("key1");
pending.remove(key.clone());
assert!(pending.is_removed(&key));
assert!(pending.contains_key(&key));
assert_eq!(pending.get(&key), None);
}
#[test]
fn test_write_then_remove() {
let mut pending = Pending::new();
let key = make_key("key1");
pending.insert(key.clone(), make_value("value1"));
assert_eq!(pending.get(&key), Some(&make_value("value1")));
pending.remove(key.clone());
assert!(pending.is_removed(&key));
assert_eq!(pending.get(&key), None);
}
#[test]
fn test_remove_then_write() {
let mut pending = Pending::new();
let key = make_key("key1");
pending.remove(key.clone());
assert!(pending.is_removed(&key));
pending.insert(key.clone(), make_value("value1"));
assert!(!pending.is_removed(&key));
assert_eq!(pending.get(&key), Some(&make_value("value1")));
}
#[test]
fn test_iter_sorted_order() {
let mut pending = Pending::new();
pending.insert(make_key("zebra"), make_value("z"));
pending.insert(make_key("apple"), make_value("a"));
pending.insert(make_key("mango"), make_value("m"));
let keys: Vec<_> = pending.iter_sorted().map(|(k, _)| k.clone()).collect();
assert_eq!(keys, vec![make_key("apple"), make_key("mango"), make_key("zebra")]);
}
#[test]
fn test_range_query() {
let mut pending = Pending::new();
pending.insert(make_key("a"), make_value("1"));
pending.insert(make_key("b"), make_value("2"));
pending.insert(make_key("c"), make_value("3"));
pending.insert(make_key("d"), make_value("4"));
let range_keys: Vec<_> = pending.range(make_key("b")..make_key("d")).map(|(k, _)| k.clone()).collect();
assert_eq!(range_keys, vec![make_key("b"), make_key("c")]);
}
#[test]
fn test_range_query_inclusive() {
let mut pending = Pending::new();
pending.insert(make_key("a"), make_value("1"));
pending.insert(make_key("b"), make_value("2"));
pending.insert(make_key("c"), make_value("3"));
let range_keys: Vec<_> = pending.range(make_key("a")..=make_key("c")).map(|(k, _)| k.clone()).collect();
assert_eq!(range_keys, vec![make_key("a"), make_key("b"), make_key("c")]);
}
#[test]
fn test_range_query_empty() {
let mut pending = Pending::new();
pending.insert(make_key("a"), make_value("1"));
pending.insert(make_key("z"), make_value("2"));
let range_keys: Vec<_> = pending.range(make_key("m")..make_key("n")).map(|(k, _)| k.clone()).collect();
assert!(range_keys.is_empty());
}
#[test]
fn test_contains_key() {
let mut pending = Pending::new();
pending.insert(make_key("key1"), make_value("value1"));
pending.remove(make_key("key2"));
assert!(pending.contains_key(&make_key("key1")));
assert!(pending.contains_key(&make_key("key2"))); assert!(!pending.contains_key(&make_key("key3")));
}
#[test]
fn test_get_nonexistent_key() {
let pending = Pending::new();
assert_eq!(pending.get(&make_key("missing")), None);
}
#[test]
fn test_is_removed_nonexistent_key() {
let pending = Pending::new();
assert!(!pending.is_removed(&make_key("missing")));
}
#[test]
fn test_mixed_writes_and_removes() {
let mut pending = Pending::new();
pending.insert(make_key("write1"), make_value("v1"));
pending.remove(make_key("remove1"));
pending.insert(make_key("write2"), make_value("v2"));
pending.remove(make_key("remove2"));
assert_eq!(pending.get(&make_key("write1")), Some(&make_value("v1")));
assert_eq!(pending.get(&make_key("write2")), Some(&make_value("v2")));
assert!(pending.is_removed(&make_key("remove1")));
assert!(pending.is_removed(&make_key("remove2")));
assert_eq!(pending.get(&make_key("remove1")), None);
assert_eq!(pending.get(&make_key("remove2")), None);
}
#[test]
fn test_iter_sorted_includes_removes() {
let mut pending = Pending::new();
pending.insert(make_key("b"), make_value("2"));
pending.remove(make_key("a"));
pending.insert(make_key("c"), make_value("3"));
let items: Vec<_> = pending.iter_sorted().collect();
assert_eq!(items.len(), 3);
assert_eq!(items[0].0, &make_key("a"));
assert!(matches!(items[0].1, PendingWrite::Remove));
assert_eq!(items[1].0, &make_key("b"));
assert!(matches!(items[1].1, PendingWrite::Set(_)));
assert_eq!(items[2].0, &make_key("c"));
assert!(matches!(items[2].1, PendingWrite::Set(_)));
}
}