Skip to main content

zerodds_dlrl/
object_cache.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Object cache with identity tracking — DDS 1.4 §B.2 + §B.6.
5//!
6//! Spec §B.2 — `Cache`: central repository for the DLRL objects of a
7//! `DomainParticipant` instance. Unique identity via `ObjectId`
8//! (topic DCPS key + type discriminator).
9//!
10//! Spec §B.6 — `ObjectRoot`: lifecycle state (`NEW`/`MODIFIED`/
11//! `DELETED`/`COMMITTED`) + version counter for optimistic
12//! concurrency.
13//!
14//! `WeakObjectRef` (Spec §B.6.4) — weak reference to an object;
15//! becomes `None` when the object is removed from the cache.
16
17use alloc::collections::BTreeMap;
18use alloc::string::String;
19use alloc::vec::Vec;
20use core::sync::atomic::{AtomicU64, Ordering};
21
22/// Unique object identity — topic + DCPS key.
23///
24/// Spec §B.6.1: every DLRL object instance has a topic name plus
25/// a key (CDR-encoded BLOB).
26#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
27pub struct ObjectId {
28    /// Topic name (`dds_topic`).
29    pub topic: String,
30    /// CDR-encoded DCPS key.
31    pub key: Vec<u8>,
32}
33
34impl ObjectId {
35    /// Constructor.
36    #[must_use]
37    pub fn new(topic: String, key: Vec<u8>) -> Self {
38        Self { topic, key }
39    }
40}
41
42/// Lifecycle state of a DLRL object. Spec §B.6.2.
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
44pub enum ObjectState {
45    /// `NEW` — object created in the cache but not yet committed.
46    New,
47    /// `MODIFIED` — committed, then modified locally.
48    Modified,
49    /// `DELETED` — marked for deletion (cascade pending).
50    Deleted,
51    /// `COMMITTED` — last commit succeeded.
52    Committed,
53}
54
55/// Object entry in the cache. Spec §B.6.
56#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct ObjectRef {
58    /// Identity.
59    pub id: ObjectId,
60    /// CDR-encoded state bytes (caller serializes via XCDR2).
61    pub state: Vec<u8>,
62    /// Lifecycle state.
63    pub lifecycle: ObjectState,
64    /// Version counter — incremented on every modify.
65    /// Optimistic-concurrency check (Spec §B.7.4).
66    pub version: u64,
67}
68
69/// Weak reference to an object in the cache. Spec §B.6.4.
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub struct WeakObjectRef {
72    /// Object identity.
73    pub id: ObjectId,
74    /// Expected version (caller can check whether the object has
75    /// been changed in the meantime).
76    pub expected_version: u64,
77}
78
79impl WeakObjectRef {
80    /// Returns the ID.
81    #[must_use]
82    pub fn id(&self) -> &ObjectId {
83        &self.id
84    }
85
86    /// Returns the expected version.
87    #[must_use]
88    pub fn expected_version(&self) -> u64 {
89        self.expected_version
90    }
91}
92
93/// Object cache — central identity-tracking container. Spec §B.2.
94#[derive(Debug, Default)]
95pub struct ObjectCache {
96    objects: BTreeMap<ObjectId, ObjectRef>,
97    seq: AtomicU64,
98}
99
100impl ObjectCache {
101    /// Constructor.
102    #[must_use]
103    pub fn new() -> Self {
104        Self::default()
105    }
106
107    /// Number of held objects.
108    #[must_use]
109    pub fn len(&self) -> usize {
110        self.objects.len()
111    }
112
113    /// `true` if the cache is empty.
114    #[must_use]
115    pub fn is_empty(&self) -> bool {
116        self.objects.is_empty()
117    }
118
119    /// Spec §B.6.3 `register_object` — inserts a new object into the
120    /// cache. If the ID already exists, the state is overwritten and
121    /// the version is incremented (`MODIFIED`).
122    pub fn register(&mut self, id: ObjectId, state: Vec<u8>) -> ObjectRef {
123        if let Some(existing) = self.objects.get_mut(&id) {
124            existing.state = state;
125            existing.version += 1;
126            existing.lifecycle = ObjectState::Modified;
127            existing.clone()
128        } else {
129            let v = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
130            let entry = ObjectRef {
131                id: id.clone(),
132                state,
133                lifecycle: ObjectState::New,
134                version: v,
135            };
136            self.objects.insert(id, entry.clone());
137            entry
138        }
139    }
140
141    /// Lookup by ID.
142    #[must_use]
143    pub fn get(&self, id: &ObjectId) -> Option<&ObjectRef> {
144        self.objects.get(id)
145    }
146
147    /// Returns a weak reference if the object exists.
148    /// Spec §B.6.4.
149    #[must_use]
150    pub fn weak_ref(&self, id: &ObjectId) -> Option<WeakObjectRef> {
151        self.objects.get(id).map(|o| WeakObjectRef {
152            id: o.id.clone(),
153            expected_version: o.version,
154        })
155    }
156
157    /// Resolves a weak reference. Returns `None` if the object was
158    /// removed or the version no longer matches
159    /// (Spec §B.7.4 optimistic-concurrency check).
160    #[must_use]
161    pub fn resolve(&self, weak: &WeakObjectRef) -> Option<&ObjectRef> {
162        self.objects
163            .get(&weak.id)
164            .filter(|o| o.version == weak.expected_version)
165    }
166
167    /// Spec §B.6.5 `mark_deleted` — marks an object for deletion.
168    /// Actual removal happens on the next `commit_all`.
169    pub fn mark_deleted(&mut self, id: &ObjectId) -> bool {
170        if let Some(o) = self.objects.get_mut(id) {
171            o.lifecycle = ObjectState::Deleted;
172            o.version += 1;
173            true
174        } else {
175            false
176        }
177    }
178
179    /// Spec §B.7.4 `commit_all` — marks all `New`/`Modified` objects
180    /// as `Committed` and removes `Deleted` objects.
181    pub fn commit_all(&mut self) -> usize {
182        let mut deleted = 0;
183        let to_remove: Vec<ObjectId> = self
184            .objects
185            .iter()
186            .filter(|(_, o)| matches!(o.lifecycle, ObjectState::Deleted))
187            .map(|(id, _)| id.clone())
188            .collect();
189        for id in &to_remove {
190            self.objects.remove(id);
191            deleted += 1;
192        }
193        for o in self.objects.values_mut() {
194            o.lifecycle = ObjectState::Committed;
195        }
196        deleted
197    }
198
199    /// Spec §B.7.4 `rollback_all` — removes `New` objects, restores
200    /// `Modified`/`Deleted` back to `Committed` (but not the state
201    /// bytes — the caller must hold a snapshot before `register`).
202    pub fn rollback_all(&mut self) -> usize {
203        let mut affected = 0;
204        let to_remove: Vec<ObjectId> = self
205            .objects
206            .iter()
207            .filter(|(_, o)| matches!(o.lifecycle, ObjectState::New))
208            .map(|(id, _)| id.clone())
209            .collect();
210        for id in &to_remove {
211            self.objects.remove(id);
212            affected += 1;
213        }
214        for o in self.objects.values_mut() {
215            if matches!(o.lifecycle, ObjectState::Modified | ObjectState::Deleted) {
216                o.lifecycle = ObjectState::Committed;
217                affected += 1;
218            }
219        }
220        affected
221    }
222
223    /// List of all object IDs in stable order.
224    #[must_use]
225    pub fn ids(&self) -> Vec<ObjectId> {
226        self.objects.keys().cloned().collect()
227    }
228
229    /// Iterates over all objects.
230    pub fn iter(&self) -> impl Iterator<Item = &ObjectRef> {
231        self.objects.values()
232    }
233}
234
235#[cfg(test)]
236#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
237mod tests {
238    use super::*;
239
240    fn id(topic: &str, key: &[u8]) -> ObjectId {
241        ObjectId::new(topic.into(), key.to_vec())
242    }
243
244    #[test]
245    fn register_then_get_round_trip() {
246        let mut c = ObjectCache::new();
247        let r = c.register(id("T", b"k1"), alloc::vec![1, 2, 3]);
248        assert_eq!(r.lifecycle, ObjectState::New);
249        assert_eq!(c.len(), 1);
250        assert_eq!(c.get(&id("T", b"k1")).unwrap().state, alloc::vec![1, 2, 3]);
251    }
252
253    #[test]
254    fn re_register_increments_version_and_marks_modified() {
255        let mut c = ObjectCache::new();
256        c.register(id("T", b"k"), alloc::vec![1]);
257        let v0 = c.get(&id("T", b"k")).unwrap().version;
258        let r = c.register(id("T", b"k"), alloc::vec![2]);
259        assert_eq!(r.version, v0 + 1);
260        assert_eq!(r.lifecycle, ObjectState::Modified);
261    }
262
263    #[test]
264    fn weak_ref_resolves_at_same_version() {
265        let mut c = ObjectCache::new();
266        c.register(id("T", b"k"), alloc::vec![1]);
267        let w = c.weak_ref(&id("T", b"k")).unwrap();
268        assert!(c.resolve(&w).is_some());
269    }
270
271    #[test]
272    fn weak_ref_invalidated_on_modify() {
273        let mut c = ObjectCache::new();
274        c.register(id("T", b"k"), alloc::vec![1]);
275        let w = c.weak_ref(&id("T", b"k")).unwrap();
276        c.register(id("T", b"k"), alloc::vec![2]);
277        assert!(c.resolve(&w).is_none());
278    }
279
280    #[test]
281    fn mark_deleted_then_commit_removes() {
282        let mut c = ObjectCache::new();
283        c.register(id("T", b"k"), alloc::vec![1]);
284        c.commit_all();
285        assert!(c.mark_deleted(&id("T", b"k")));
286        let removed = c.commit_all();
287        assert_eq!(removed, 1);
288        assert!(c.is_empty());
289    }
290
291    #[test]
292    fn rollback_drops_new_objects() {
293        let mut c = ObjectCache::new();
294        c.register(id("T", b"a"), alloc::vec![]);
295        c.register(id("T", b"b"), alloc::vec![]);
296        let n = c.rollback_all();
297        assert_eq!(n, 2);
298        assert!(c.is_empty());
299    }
300
301    #[test]
302    fn rollback_after_commit_restores_modified_to_committed() {
303        let mut c = ObjectCache::new();
304        c.register(id("T", b"k"), alloc::vec![1]);
305        c.commit_all();
306        c.register(id("T", b"k"), alloc::vec![2]); // Modified
307        assert_eq!(
308            c.get(&id("T", b"k")).unwrap().lifecycle,
309            ObjectState::Modified
310        );
311        c.rollback_all();
312        assert_eq!(
313            c.get(&id("T", b"k")).unwrap().lifecycle,
314            ObjectState::Committed
315        );
316    }
317
318    #[test]
319    fn mark_deleted_unknown_returns_false() {
320        let mut c = ObjectCache::new();
321        assert!(!c.mark_deleted(&id("T", b"x")));
322    }
323
324    #[test]
325    fn ids_returns_stable_order() {
326        let mut c = ObjectCache::new();
327        c.register(id("T", b"b"), alloc::vec![]);
328        c.register(id("T", b"a"), alloc::vec![]);
329        let ids = c.ids();
330        assert_eq!(ids[0].key, b"a");
331        assert_eq!(ids[1].key, b"b");
332    }
333}