1use alloc::collections::BTreeMap;
18use alloc::string::String;
19use alloc::vec::Vec;
20use core::sync::atomic::{AtomicU64, Ordering};
21
22#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
27pub struct ObjectId {
28 pub topic: String,
30 pub key: Vec<u8>,
32}
33
34impl ObjectId {
35 #[must_use]
37 pub fn new(topic: String, key: Vec<u8>) -> Self {
38 Self { topic, key }
39 }
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
44pub enum ObjectState {
45 New,
47 Modified,
49 Deleted,
51 Committed,
53}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct ObjectRef {
58 pub id: ObjectId,
60 pub state: Vec<u8>,
62 pub lifecycle: ObjectState,
64 pub version: u64,
67}
68
69#[derive(Debug, Clone, PartialEq, Eq)]
71pub struct WeakObjectRef {
72 pub id: ObjectId,
74 pub expected_version: u64,
77}
78
79impl WeakObjectRef {
80 #[must_use]
82 pub fn id(&self) -> &ObjectId {
83 &self.id
84 }
85
86 #[must_use]
88 pub fn expected_version(&self) -> u64 {
89 self.expected_version
90 }
91}
92
93#[derive(Debug, Default)]
95pub struct ObjectCache {
96 objects: BTreeMap<ObjectId, ObjectRef>,
97 seq: AtomicU64,
98}
99
100impl ObjectCache {
101 #[must_use]
103 pub fn new() -> Self {
104 Self::default()
105 }
106
107 #[must_use]
109 pub fn len(&self) -> usize {
110 self.objects.len()
111 }
112
113 #[must_use]
115 pub fn is_empty(&self) -> bool {
116 self.objects.is_empty()
117 }
118
119 pub fn register(&mut self, id: ObjectId, state: Vec<u8>) -> ObjectRef {
123 if let Some(existing) = self.objects.get_mut(&id) {
124 existing.state = state;
125 existing.version += 1;
126 existing.lifecycle = ObjectState::Modified;
127 existing.clone()
128 } else {
129 let v = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
130 let entry = ObjectRef {
131 id: id.clone(),
132 state,
133 lifecycle: ObjectState::New,
134 version: v,
135 };
136 self.objects.insert(id, entry.clone());
137 entry
138 }
139 }
140
141 #[must_use]
143 pub fn get(&self, id: &ObjectId) -> Option<&ObjectRef> {
144 self.objects.get(id)
145 }
146
147 #[must_use]
150 pub fn weak_ref(&self, id: &ObjectId) -> Option<WeakObjectRef> {
151 self.objects.get(id).map(|o| WeakObjectRef {
152 id: o.id.clone(),
153 expected_version: o.version,
154 })
155 }
156
157 #[must_use]
161 pub fn resolve(&self, weak: &WeakObjectRef) -> Option<&ObjectRef> {
162 self.objects
163 .get(&weak.id)
164 .filter(|o| o.version == weak.expected_version)
165 }
166
167 pub fn mark_deleted(&mut self, id: &ObjectId) -> bool {
170 if let Some(o) = self.objects.get_mut(id) {
171 o.lifecycle = ObjectState::Deleted;
172 o.version += 1;
173 true
174 } else {
175 false
176 }
177 }
178
179 pub fn commit_all(&mut self) -> usize {
182 let mut deleted = 0;
183 let to_remove: Vec<ObjectId> = self
184 .objects
185 .iter()
186 .filter(|(_, o)| matches!(o.lifecycle, ObjectState::Deleted))
187 .map(|(id, _)| id.clone())
188 .collect();
189 for id in &to_remove {
190 self.objects.remove(id);
191 deleted += 1;
192 }
193 for o in self.objects.values_mut() {
194 o.lifecycle = ObjectState::Committed;
195 }
196 deleted
197 }
198
199 pub fn rollback_all(&mut self) -> usize {
203 let mut affected = 0;
204 let to_remove: Vec<ObjectId> = self
205 .objects
206 .iter()
207 .filter(|(_, o)| matches!(o.lifecycle, ObjectState::New))
208 .map(|(id, _)| id.clone())
209 .collect();
210 for id in &to_remove {
211 self.objects.remove(id);
212 affected += 1;
213 }
214 for o in self.objects.values_mut() {
215 if matches!(o.lifecycle, ObjectState::Modified | ObjectState::Deleted) {
216 o.lifecycle = ObjectState::Committed;
217 affected += 1;
218 }
219 }
220 affected
221 }
222
223 #[must_use]
225 pub fn ids(&self) -> Vec<ObjectId> {
226 self.objects.keys().cloned().collect()
227 }
228
229 pub fn iter(&self) -> impl Iterator<Item = &ObjectRef> {
231 self.objects.values()
232 }
233}
234
235#[cfg(test)]
236#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
237mod tests {
238 use super::*;
239
240 fn id(topic: &str, key: &[u8]) -> ObjectId {
241 ObjectId::new(topic.into(), key.to_vec())
242 }
243
244 #[test]
245 fn register_then_get_round_trip() {
246 let mut c = ObjectCache::new();
247 let r = c.register(id("T", b"k1"), alloc::vec![1, 2, 3]);
248 assert_eq!(r.lifecycle, ObjectState::New);
249 assert_eq!(c.len(), 1);
250 assert_eq!(c.get(&id("T", b"k1")).unwrap().state, alloc::vec![1, 2, 3]);
251 }
252
253 #[test]
254 fn re_register_increments_version_and_marks_modified() {
255 let mut c = ObjectCache::new();
256 c.register(id("T", b"k"), alloc::vec![1]);
257 let v0 = c.get(&id("T", b"k")).unwrap().version;
258 let r = c.register(id("T", b"k"), alloc::vec![2]);
259 assert_eq!(r.version, v0 + 1);
260 assert_eq!(r.lifecycle, ObjectState::Modified);
261 }
262
263 #[test]
264 fn weak_ref_resolves_at_same_version() {
265 let mut c = ObjectCache::new();
266 c.register(id("T", b"k"), alloc::vec![1]);
267 let w = c.weak_ref(&id("T", b"k")).unwrap();
268 assert!(c.resolve(&w).is_some());
269 }
270
271 #[test]
272 fn weak_ref_invalidated_on_modify() {
273 let mut c = ObjectCache::new();
274 c.register(id("T", b"k"), alloc::vec![1]);
275 let w = c.weak_ref(&id("T", b"k")).unwrap();
276 c.register(id("T", b"k"), alloc::vec![2]);
277 assert!(c.resolve(&w).is_none());
278 }
279
280 #[test]
281 fn mark_deleted_then_commit_removes() {
282 let mut c = ObjectCache::new();
283 c.register(id("T", b"k"), alloc::vec![1]);
284 c.commit_all();
285 assert!(c.mark_deleted(&id("T", b"k")));
286 let removed = c.commit_all();
287 assert_eq!(removed, 1);
288 assert!(c.is_empty());
289 }
290
291 #[test]
292 fn rollback_drops_new_objects() {
293 let mut c = ObjectCache::new();
294 c.register(id("T", b"a"), alloc::vec![]);
295 c.register(id("T", b"b"), alloc::vec![]);
296 let n = c.rollback_all();
297 assert_eq!(n, 2);
298 assert!(c.is_empty());
299 }
300
301 #[test]
302 fn rollback_after_commit_restores_modified_to_committed() {
303 let mut c = ObjectCache::new();
304 c.register(id("T", b"k"), alloc::vec![1]);
305 c.commit_all();
306 c.register(id("T", b"k"), alloc::vec![2]); assert_eq!(
308 c.get(&id("T", b"k")).unwrap().lifecycle,
309 ObjectState::Modified
310 );
311 c.rollback_all();
312 assert_eq!(
313 c.get(&id("T", b"k")).unwrap().lifecycle,
314 ObjectState::Committed
315 );
316 }
317
318 #[test]
319 fn mark_deleted_unknown_returns_false() {
320 let mut c = ObjectCache::new();
321 assert!(!c.mark_deleted(&id("T", b"x")));
322 }
323
324 #[test]
325 fn ids_returns_stable_order() {
326 let mut c = ObjectCache::new();
327 c.register(id("T", b"b"), alloc::vec![]);
328 c.register(id("T", b"a"), alloc::vec![]);
329 let ids = c.ids();
330 assert_eq!(ids[0].key, b"a");
331 assert_eq!(ids[1].key, b"b");
332 }
333}