1use std::{
5 collections::{
6 BTreeMap,
7 btree_map::{Iter, Range},
8 },
9 ops::RangeBounds,
10};
11
12use crate::encoded::{key::EncodedKey, row::EncodedRow};
13
14#[derive(Debug, Clone)]
16pub enum PendingWrite {
17 Set(EncodedRow),
18 Remove,
19}
20
21#[derive(Debug, Default, Clone)]
23pub struct Pending {
24 writes: BTreeMap<EncodedKey, PendingWrite>,
26}
27
28impl Pending {
29 pub fn new() -> Self {
31 Self {
32 writes: BTreeMap::new(),
33 }
34 }
35
36 pub fn insert(&mut self, key: EncodedKey, value: EncodedRow) {
38 self.writes.insert(key, PendingWrite::Set(value));
39 }
40
41 pub fn remove(&mut self, key: EncodedKey) {
43 self.writes.insert(key, PendingWrite::Remove);
44 }
45
46 pub fn get(&self, key: &EncodedKey) -> Option<&EncodedRow> {
48 match self.writes.get(key) {
49 Some(PendingWrite::Set(value)) => Some(value),
50 _ => None,
51 }
52 }
53
54 pub fn is_removed(&self, key: &EncodedKey) -> bool {
56 matches!(self.writes.get(key), Some(PendingWrite::Remove))
57 }
58
59 pub fn contains_key(&self, key: &EncodedKey) -> bool {
61 self.writes.contains_key(key)
62 }
63
64 pub fn iter_sorted(&self) -> Iter<'_, EncodedKey, PendingWrite> {
66 self.writes.iter()
67 }
68
69 pub fn range<R>(&self, range: R) -> Range<'_, EncodedKey, PendingWrite>
71 where
72 R: RangeBounds<EncodedKey>,
73 {
74 self.writes.range(range)
75 }
76}
77
#[cfg(test)]
pub mod tests {
    use std::vec;

    use reifydb_type::util::cowvec::CowVec;

    use super::*;

    /// Builds an `EncodedKey` from the UTF-8 bytes of `s`.
    fn make_key(s: &str) -> EncodedKey {
        let bytes = s.as_bytes().to_vec();
        EncodedKey::new(bytes)
    }

    /// Builds an `EncodedRow` wrapping the UTF-8 bytes of `s`.
    fn make_value(s: &str) -> EncodedRow {
        let bytes = s.as_bytes().to_vec();
        EncodedRow(CowVec::new(bytes))
    }

    #[test]
    fn test_insert_single_write() {
        let (key, value) = (make_key("key1"), make_value("value1"));
        let mut pending = Pending::new();

        pending.insert(key.clone(), value.clone());

        assert!(pending.contains_key(&key));
        assert!(!pending.is_removed(&key));
        assert_eq!(pending.get(&key), Some(&value));
    }

    #[test]
    fn test_insert_multiple_writes() {
        let entries = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")];
        let mut pending = Pending::new();

        for &(k, v) in &entries {
            pending.insert(make_key(k), make_value(v));
        }

        for &(k, v) in &entries {
            let expected = make_value(v);
            assert_eq!(pending.get(&make_key(k)), Some(&expected));
        }
    }

    #[test]
    fn test_insert_overwrites_existing_key() {
        let key = make_key("key1");
        let mut pending = Pending::new();

        pending.insert(key.clone(), make_value("value1"));
        pending.insert(key.clone(), make_value("value2"));

        // Only the most recent write for the key survives.
        let latest = make_value("value2");
        assert_eq!(pending.get(&key), Some(&latest));
    }

    #[test]
    fn test_remove_operation() {
        let key = make_key("key1");
        let mut pending = Pending::new();

        pending.remove(key.clone());

        // A removal is still an entry, but it exposes no row.
        assert!(pending.get(&key).is_none());
        assert!(pending.contains_key(&key));
        assert!(pending.is_removed(&key));
    }

    #[test]
    fn test_write_then_remove() {
        let key = make_key("key1");
        let row = make_value("value1");
        let mut pending = Pending::new();

        pending.insert(key.clone(), row.clone());
        assert_eq!(pending.get(&key), Some(&row));

        pending.remove(key.clone());
        assert!(pending.is_removed(&key));
        assert!(pending.get(&key).is_none());
    }

    #[test]
    fn test_remove_then_write() {
        let key = make_key("key1");
        let row = make_value("value1");
        let mut pending = Pending::new();

        pending.remove(key.clone());
        assert!(pending.is_removed(&key));

        pending.insert(key.clone(), row.clone());
        assert!(!pending.is_removed(&key));
        assert_eq!(pending.get(&key), Some(&row));
    }

    #[test]
    fn test_iter_sorted_order() {
        let mut pending = Pending::new();

        // Inserted deliberately out of order.
        for &(k, v) in &[("zebra", "z"), ("apple", "a"), ("mango", "m")] {
            pending.insert(make_key(k), make_value(v));
        }

        let expected = vec![make_key("apple"), make_key("mango"), make_key("zebra")];
        let actual: Vec<EncodedKey> = pending.iter_sorted().map(|(k, _)| k).cloned().collect();

        assert_eq!(actual, expected);
    }

    #[test]
    fn test_range_query() {
        let mut pending = Pending::new();

        for &(k, v) in &[("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")] {
            pending.insert(make_key(k), make_value(v));
        }

        // Half-open range: the upper bound "d" is excluded.
        let (lo, hi) = (make_key("b"), make_key("d"));
        let hits: Vec<EncodedKey> = pending.range(lo..hi).map(|(k, _)| k).cloned().collect();

        assert_eq!(hits, vec![make_key("b"), make_key("c")]);
    }

    #[test]
    fn test_range_query_inclusive() {
        let mut pending = Pending::new();

        for &(k, v) in &[("a", "1"), ("b", "2"), ("c", "3")] {
            pending.insert(make_key(k), make_value(v));
        }

        // Closed range: both endpoints are included.
        let (lo, hi) = (make_key("a"), make_key("c"));
        let hits: Vec<EncodedKey> = pending.range(lo..=hi).map(|(k, _)| k).cloned().collect();

        assert_eq!(hits, vec![make_key("a"), make_key("b"), make_key("c")]);
    }

    #[test]
    fn test_range_query_empty() {
        let mut pending = Pending::new();

        pending.insert(make_key("a"), make_value("1"));
        pending.insert(make_key("z"), make_value("2"));

        // No recorded key lies in ["m", "n").
        let (lo, hi) = (make_key("m"), make_key("n"));
        let hits: Vec<EncodedKey> = pending.range(lo..hi).map(|(k, _)| k).cloned().collect();

        assert_eq!(hits.len(), 0);
    }

    #[test]
    fn test_contains_key() {
        let mut pending = Pending::new();

        pending.insert(make_key("key1"), make_value("value1"));
        pending.remove(make_key("key2"));

        let set_key = make_key("key1");
        let removed_key = make_key("key2");
        let absent_key = make_key("key3");

        assert!(pending.contains_key(&set_key));
        assert!(pending.contains_key(&removed_key));
        assert!(!pending.contains_key(&absent_key));
    }

    #[test]
    fn test_get_nonexistent_key() {
        let pending = Pending::new();
        let missing = make_key("missing");

        assert!(pending.get(&missing).is_none());
    }

    #[test]
    fn test_is_removed_nonexistent_key() {
        let pending = Pending::new();
        let missing = make_key("missing");

        assert!(!pending.is_removed(&missing));
    }

    #[test]
    fn test_mixed_writes_and_removes() {
        let mut pending = Pending::new();

        pending.insert(make_key("write1"), make_value("v1"));
        pending.remove(make_key("remove1"));
        pending.insert(make_key("write2"), make_value("v2"));
        pending.remove(make_key("remove2"));

        for &(k, v) in &[("write1", "v1"), ("write2", "v2")] {
            let expected = make_value(v);
            assert_eq!(pending.get(&make_key(k)), Some(&expected));
        }

        for &k in &["remove1", "remove2"] {
            assert!(pending.is_removed(&make_key(k)));
        }

        for &k in &["remove1", "remove2"] {
            assert!(pending.get(&make_key(k)).is_none());
        }
    }

    #[test]
    fn test_iter_sorted_includes_removes() {
        let mut pending = Pending::new();

        pending.insert(make_key("b"), make_value("2"));
        pending.remove(make_key("a"));
        pending.insert(make_key("c"), make_value("3"));

        let mut entries = pending.iter_sorted();

        let (key, write) = entries.next().expect("expected a first entry");
        assert_eq!(key, &make_key("a"));
        assert!(matches!(write, PendingWrite::Remove));

        let (key, write) = entries.next().expect("expected a second entry");
        assert_eq!(key, &make_key("b"));
        assert!(matches!(write, PendingWrite::Set(_)));

        let (key, write) = entries.next().expect("expected a third entry");
        assert_eq!(key, &make_key("c"));
        assert!(matches!(write, PendingWrite::Set(_)));

        // Exactly three entries were recorded.
        assert!(entries.next().is_none());
    }
}