Skip to main content

reifydb_core/actors/
pending.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::{
6		BTreeMap,
7		btree_map::{Iter, Range},
8	},
9	ops::RangeBounds,
10};
11
12use crate::encoded::{key::EncodedKey, row::EncodedRow};
13
14/// Represents a pending operation on a key
15#[derive(Debug, Clone)]
16pub enum PendingWrite {
17	Set(EncodedRow),
18	Remove,
19}
20
21/// Manages pending writes and removes with sorted key access
22#[derive(Debug, Default, Clone)]
23pub struct Pending {
24	/// Primary storage - BTreeMap for sorted key access and range queries
25	writes: BTreeMap<EncodedKey, PendingWrite>,
26}
27
28impl Pending {
29	/// Create a new empty pending writes manager
30	pub fn new() -> Self {
31		Self {
32			writes: BTreeMap::new(),
33		}
34	}
35
36	/// Insert a write operation
37	pub fn insert(&mut self, key: EncodedKey, value: EncodedRow) {
38		self.writes.insert(key, PendingWrite::Set(value));
39	}
40
41	/// Insert a remove operation
42	pub fn remove(&mut self, key: EncodedKey) {
43		self.writes.insert(key, PendingWrite::Remove);
44	}
45
46	/// Get a value if it exists and is a write (not a remove)
47	pub fn get(&self, key: &EncodedKey) -> Option<&EncodedRow> {
48		match self.writes.get(key) {
49			Some(PendingWrite::Set(value)) => Some(value),
50			_ => None,
51		}
52	}
53
54	/// Check if a key is marked for removal
55	pub fn is_removed(&self, key: &EncodedKey) -> bool {
56		matches!(self.writes.get(key), Some(PendingWrite::Remove))
57	}
58
59	/// Check if a key exists (either as write or remove)
60	pub fn contains_key(&self, key: &EncodedKey) -> bool {
61		self.writes.contains_key(key)
62	}
63
64	/// Iterate over all pending operations in sorted key order
65	pub fn iter_sorted(&self) -> Iter<'_, EncodedKey, PendingWrite> {
66		self.writes.iter()
67	}
68
69	/// Range query over pending operations in sorted key order
70	pub fn range<R>(&self, range: R) -> Range<'_, EncodedKey, PendingWrite>
71	where
72		R: RangeBounds<EncodedKey>,
73	{
74		self.writes.range(range)
75	}
76}
77
78#[cfg(test)]
79pub mod tests {
80	use std::vec;
81
82	use reifydb_type::util::cowvec::CowVec;
83
84	use super::*;
85
86	fn make_key(s: &str) -> EncodedKey {
87		EncodedKey::new(s.as_bytes().to_vec())
88	}
89
90	fn make_value(s: &str) -> EncodedRow {
91		EncodedRow(CowVec::new(s.as_bytes().to_vec()))
92	}
93
94	#[test]
95	fn test_insert_single_write() {
96		let mut pending = Pending::new();
97		let key = make_key("key1");
98		let value = make_value("value1");
99
100		pending.insert(key.clone(), value.clone());
101
102		assert_eq!(pending.get(&key), Some(&value));
103		assert!(!pending.is_removed(&key));
104		assert!(pending.contains_key(&key));
105	}
106
107	#[test]
108	fn test_insert_multiple_writes() {
109		let mut pending = Pending::new();
110
111		pending.insert(make_key("key1"), make_value("value1"));
112		pending.insert(make_key("key2"), make_value("value2"));
113		pending.insert(make_key("key3"), make_value("value3"));
114
115		assert_eq!(pending.get(&make_key("key1")), Some(&make_value("value1")));
116		assert_eq!(pending.get(&make_key("key2")), Some(&make_value("value2")));
117		assert_eq!(pending.get(&make_key("key3")), Some(&make_value("value3")));
118	}
119
120	#[test]
121	fn test_insert_overwrites_existing_key() {
122		let mut pending = Pending::new();
123		let key = make_key("key1");
124
125		pending.insert(key.clone(), make_value("value1"));
126		pending.insert(key.clone(), make_value("value2"));
127
128		assert_eq!(pending.get(&key), Some(&make_value("value2")));
129	}
130
131	#[test]
132	fn test_remove_operation() {
133		let mut pending = Pending::new();
134		let key = make_key("key1");
135
136		pending.remove(key.clone());
137
138		assert!(pending.is_removed(&key));
139		assert!(pending.contains_key(&key));
140		assert_eq!(pending.get(&key), None);
141	}
142
143	#[test]
144	fn test_write_then_remove() {
145		let mut pending = Pending::new();
146		let key = make_key("key1");
147
148		pending.insert(key.clone(), make_value("value1"));
149		assert_eq!(pending.get(&key), Some(&make_value("value1")));
150
151		pending.remove(key.clone());
152		assert!(pending.is_removed(&key));
153		assert_eq!(pending.get(&key), None);
154	}
155
156	#[test]
157	fn test_remove_then_write() {
158		let mut pending = Pending::new();
159		let key = make_key("key1");
160
161		pending.remove(key.clone());
162		assert!(pending.is_removed(&key));
163
164		pending.insert(key.clone(), make_value("value1"));
165		assert!(!pending.is_removed(&key));
166		assert_eq!(pending.get(&key), Some(&make_value("value1")));
167	}
168
169	#[test]
170	fn test_iter_sorted_order() {
171		let mut pending = Pending::new();
172
173		// Insert in non-sorted order
174		pending.insert(make_key("zebra"), make_value("z"));
175		pending.insert(make_key("apple"), make_value("a"));
176		pending.insert(make_key("mango"), make_value("m"));
177
178		let keys: Vec<_> = pending.iter_sorted().map(|(k, _)| k.clone()).collect();
179
180		// BTreeMap should return in sorted order
181		assert_eq!(keys, vec![make_key("apple"), make_key("mango"), make_key("zebra")]);
182	}
183
184	#[test]
185	fn test_range_query() {
186		let mut pending = Pending::new();
187
188		pending.insert(make_key("a"), make_value("1"));
189		pending.insert(make_key("b"), make_value("2"));
190		pending.insert(make_key("c"), make_value("3"));
191		pending.insert(make_key("d"), make_value("4"));
192
193		let range_keys: Vec<_> = pending.range(make_key("b")..make_key("d")).map(|(k, _)| k.clone()).collect();
194
195		assert_eq!(range_keys, vec![make_key("b"), make_key("c")]);
196	}
197
198	#[test]
199	fn test_range_query_inclusive() {
200		let mut pending = Pending::new();
201
202		pending.insert(make_key("a"), make_value("1"));
203		pending.insert(make_key("b"), make_value("2"));
204		pending.insert(make_key("c"), make_value("3"));
205
206		let range_keys: Vec<_> = pending.range(make_key("a")..=make_key("c")).map(|(k, _)| k.clone()).collect();
207
208		assert_eq!(range_keys, vec![make_key("a"), make_key("b"), make_key("c")]);
209	}
210
211	#[test]
212	fn test_range_query_empty() {
213		let mut pending = Pending::new();
214
215		pending.insert(make_key("a"), make_value("1"));
216		pending.insert(make_key("z"), make_value("2"));
217
218		let range_keys: Vec<_> = pending.range(make_key("m")..make_key("n")).map(|(k, _)| k.clone()).collect();
219
220		assert!(range_keys.is_empty());
221	}
222
223	#[test]
224	fn test_contains_key() {
225		let mut pending = Pending::new();
226
227		pending.insert(make_key("key1"), make_value("value1"));
228		pending.remove(make_key("key2"));
229
230		assert!(pending.contains_key(&make_key("key1")));
231		assert!(pending.contains_key(&make_key("key2"))); // Remove is also "contained"
232		assert!(!pending.contains_key(&make_key("key3")));
233	}
234
235	#[test]
236	fn test_get_nonexistent_key() {
237		let pending = Pending::new();
238		assert_eq!(pending.get(&make_key("missing")), None);
239	}
240
241	#[test]
242	fn test_is_removed_nonexistent_key() {
243		let pending = Pending::new();
244		assert!(!pending.is_removed(&make_key("missing")));
245	}
246
247	#[test]
248	fn test_mixed_writes_and_removes() {
249		let mut pending = Pending::new();
250
251		pending.insert(make_key("write1"), make_value("v1"));
252		pending.remove(make_key("remove1"));
253		pending.insert(make_key("write2"), make_value("v2"));
254		pending.remove(make_key("remove2"));
255
256		assert_eq!(pending.get(&make_key("write1")), Some(&make_value("v1")));
257		assert_eq!(pending.get(&make_key("write2")), Some(&make_value("v2")));
258		assert!(pending.is_removed(&make_key("remove1")));
259		assert!(pending.is_removed(&make_key("remove2")));
260		assert_eq!(pending.get(&make_key("remove1")), None);
261		assert_eq!(pending.get(&make_key("remove2")), None);
262	}
263
264	#[test]
265	fn test_iter_sorted_includes_removes() {
266		let mut pending = Pending::new();
267
268		pending.insert(make_key("b"), make_value("2"));
269		pending.remove(make_key("a"));
270		pending.insert(make_key("c"), make_value("3"));
271
272		let items: Vec<_> = pending.iter_sorted().collect();
273		assert_eq!(items.len(), 3);
274
275		// Check order
276		assert_eq!(items[0].0, &make_key("a"));
277		assert!(matches!(items[0].1, PendingWrite::Remove));
278
279		assert_eq!(items[1].0, &make_key("b"));
280		assert!(matches!(items[1].1, PendingWrite::Set(_)));
281
282		assert_eq!(items[2].0, &make_key("c"));
283		assert!(matches!(items[2].1, PendingWrite::Set(_)));
284	}
285}