crdb_test_utils/
full_object.rs

1use anyhow::anyhow;
2use crdb_core::{
3    BinPtr, DbPtr, DynSized, Event, EventId, Object, ObjectId, ResultExt, Updatedness,
4};
5use std::{
6    collections::{BTreeMap, HashSet},
7    ops::{Bound, RangeBounds},
8    sync::{Arc, RwLock},
9};
10
11#[cfg(test)]
12mod tests;
13
14fn fmt_option_arc(
15    v: &Option<Arc<dyn DynSized>>,
16    fmt: &mut std::fmt::Formatter<'_>,
17) -> std::fmt::Result {
18    match v {
19        Some(v) => write!(fmt, "Some({:p})", &**v),
20        None => write!(fmt, "None"),
21    }
22}
23
24#[derive(Clone, deepsize::DeepSizeOf, educe::Educe)]
25#[educe(Debug)]
26pub struct Change {
27    #[educe(Debug(method = std::fmt::Pointer::fmt))]
28    pub event: Arc<dyn DynSized>,
29    #[educe(Debug(method = fmt_option_arc))]
30    pub snapshot_after: Option<Arc<dyn DynSized>>,
31}
32
33impl Change {
34    pub fn new(event: Arc<dyn DynSized>) -> Change {
35        Change {
36            event,
37            snapshot_after: None,
38        }
39    }
40
41    pub fn set_snapshot(&mut self, snapshot: Arc<dyn DynSized>) {
42        self.snapshot_after = Some(snapshot);
43    }
44}
45
46#[derive(deepsize::DeepSizeOf, educe::Educe)]
47#[educe(Debug)]
48struct FullObjectImpl {
49    id: ObjectId,
50    last_updated: Option<Updatedness>,
51    created_at: EventId,
52    #[educe(Debug(method = std::fmt::Pointer::fmt))]
53    creation: Arc<dyn DynSized>,
54    changes: BTreeMap<EventId, Change>,
55}
56
57pub struct CreationInfo {
58    pub id: ObjectId,
59    pub created_at: EventId,
60    pub creation: Arc<dyn DynSized>,
61}
62
63#[derive(Clone, Debug)]
64pub struct FullObject {
65    data: Arc<RwLock<FullObjectImpl>>,
66}
67
68impl FullObject {
69    pub fn new(
70        id: ObjectId,
71        last_updated: Option<Updatedness>,
72        created_at: EventId,
73        creation: Arc<dyn DynSized>,
74    ) -> FullObject {
75        FullObject {
76            data: Arc::new(RwLock::new(FullObjectImpl {
77                id,
78                last_updated,
79                created_at,
80                creation,
81                changes: BTreeMap::new(),
82            })),
83        }
84    }
85
86    pub fn from_parts(
87        id: ObjectId,
88        last_updated: Option<Updatedness>,
89        created_at: EventId,
90        creation: Arc<dyn DynSized>,
91        changes: BTreeMap<EventId, Change>,
92    ) -> FullObject {
93        FullObject {
94            data: Arc::new(RwLock::new(FullObjectImpl {
95                id,
96                last_updated,
97                created_at,
98                creation,
99                changes,
100            })),
101        }
102    }
103
104    pub fn refcount(&self) -> usize {
105        let mut res = Arc::strong_count(&self.data);
106        let this = self.data.read().unwrap();
107        res += Arc::strong_count(&this.creation) - 1;
108        res += this
109            .changes
110            .values()
111            .map(|v| {
112                v.snapshot_after
113                    .as_ref()
114                    .map(|s| Arc::strong_count(s) - 1)
115                    .unwrap_or(0)
116            })
117            .sum::<usize>();
118        res
119    }
120
121    pub fn id(&self) -> ObjectId {
122        self.data.read().unwrap().id
123    }
124
125    pub fn last_updated(&self) -> Option<Updatedness> {
126        self.data.read().unwrap().last_updated
127    }
128
129    pub fn creation_info(&self) -> CreationInfo {
130        let this = self.data.read().unwrap();
131        CreationInfo {
132            id: this.id,
133            created_at: this.created_at,
134            creation: this.creation.clone(),
135        }
136    }
137
138    pub fn created_at(&self) -> EventId {
139        self.data.read().unwrap().created_at
140    }
141
142    pub fn changes_clone(&self) -> BTreeMap<EventId, Change> {
143        self.data.read().unwrap().changes.clone()
144    }
145
146    pub fn extract_all_clone(&self) -> (CreationInfo, BTreeMap<EventId, Change>) {
147        let this = self.data.read().unwrap();
148        (
149            CreationInfo {
150                id: this.id,
151                created_at: this.created_at,
152                creation: this.creation.clone(),
153            },
154            this.changes.clone(),
155        )
156    }
157
158    pub fn deep_size_of(&self) -> usize {
159        self.data.read().unwrap().deep_size_of()
160    }
161
162    pub fn apply<T: Object>(
163        &self,
164        id: EventId,
165        event: Arc<T::Event>,
166        updatedness: Option<Updatedness>,
167    ) -> crate::Result<bool> {
168        self.data
169            .write()
170            .unwrap()
171            .apply::<T>(id, event, updatedness)
172    }
173
174    pub fn recreate_at<T: Object>(
175        &self,
176        event_id: EventId,
177        updatedness: Option<Updatedness>,
178    ) -> crate::Result<()> {
179        self.data
180            .write()
181            .unwrap()
182            .recreate_at::<T>(event_id, updatedness)
183    }
184
185    pub fn recreate_with<T: Object>(
186        &self,
187        event_id: EventId,
188        data: Arc<T>,
189        updatedness: Option<Updatedness>,
190    ) -> Option<Arc<T>> {
191        self.data
192            .write()
193            .unwrap()
194            .recreate_with::<T>(event_id, data, updatedness)
195    }
196
197    pub fn required_binaries<T: Object>(&self) -> HashSet<BinPtr> {
198        self.data.read().unwrap().required_binaries::<T>()
199    }
200
201    pub fn get_snapshot_at<T: Object>(
202        &self,
203        mut at: Bound<EventId>,
204    ) -> anyhow::Result<(EventId, Arc<T>)> {
205        let mut this = self.data.write().unwrap();
206        // Avoid the panic in get_snapshot_at
207        if !(Bound::Unbounded, at).contains(&this.created_at) {
208            at = Bound::Included(this.created_at);
209        }
210        this.get_snapshot_at::<T>(at)
211    }
212
213    pub fn last_snapshot<T: Object>(&self) -> anyhow::Result<Arc<T>> {
214        {
215            let this = self.data.read().unwrap();
216            if this.changes.is_empty() {
217                return this
218                    .creation
219                    .clone()
220                    .arc_to_any()
221                    .downcast::<T>()
222                    .map_err(|_| anyhow!("Wrong type for snapshot downcast"));
223            }
224            let (_, last_change) = this.changes.last_key_value().unwrap();
225            if let Some(s) = &last_change.snapshot_after {
226                return s
227                    .clone()
228                    .arc_to_any()
229                    .downcast::<T>()
230                    .map_err(|_| anyhow!("Wrong type for snapshot downcast"));
231            }
232        }
233        Ok(self
234            .data
235            .write()
236            .unwrap()
237            .get_snapshot_at(Bound::Unbounded)?
238            .1)
239    }
240}
241
242impl FullObjectImpl {
243    /// Returns `true` if the event was newly applied. Returns `false` if the same event had
244    /// already been applied. Returns an error if another event with the same id had already
245    /// been applied, if the event is earlier than the object's last recreation time, or if
246    /// the provided `T` is wrong.
247    pub fn apply<T: Object>(
248        &mut self,
249        event_id: EventId,
250        event: Arc<T::Event>,
251        updatedness: Option<Updatedness>,
252    ) -> crate::Result<bool> {
253        if event_id <= self.created_at {
254            return Err(crate::Error::EventTooEarly {
255                event_id,
256                object_id: self.id,
257                created_at: self.created_at,
258            });
259        }
260        if let Some(c) = self.changes.get(&event_id) {
261            if (*c.event)
262                .ref_to_any()
263                .downcast_ref::<T::Event>()
264                .map(|e| e != &*event)
265                .unwrap_or(true)
266            {
267                return Err(crate::Error::EventAlreadyExists(event_id));
268            }
269            return Ok(false);
270        }
271
272        // Get the snapshot to just before the new event
273        let (_, mut last_snapshot) = self
274            .get_snapshot_at::<T>(Bound::Excluded(event_id))
275            .wrap_with_context(|| format!("applying event {event_id:?}"))?;
276
277        // Apply the new event
278        let last_snapshot_mut: &mut T = Arc::make_mut(&mut last_snapshot);
279        last_snapshot_mut.apply(DbPtr::from(self.id), &*event);
280        let new_change = Change {
281            event,
282            snapshot_after: Some(last_snapshot),
283        };
284        assert!(
285            self.changes.insert(event_id, new_change).is_none(),
286            "Object {:?} already had an event with id {event_id:?} despite earlier check",
287            self.id,
288        );
289
290        // Finally, invalidate all snapshots since the event
291        let to_invalidate = self
292            .changes
293            .range_mut((Bound::Excluded(event_id), Bound::Unbounded));
294        for c in to_invalidate {
295            c.1.snapshot_after = None;
296        }
297
298        self.bump_last_updated(updatedness);
299
300        Ok(true)
301    }
302
303    fn bump_last_updated(&mut self, updatedness: Option<Updatedness>) {
304        self.last_updated = match (self.last_updated, updatedness) {
305            (None, None) => None,
306            (None, Some(u)) => Some(u),
307            (Some(u), None) => Some(u),
308            (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
309        };
310    }
311
312    pub fn required_binaries<T: Object>(&self) -> HashSet<BinPtr> {
313        let mut res = self
314            .creation
315            .clone()
316            .arc_to_any()
317            .downcast::<T>()
318            .unwrap()
319            .required_binaries()
320            .into_iter()
321            .collect::<HashSet<_>>();
322        for c in &self.changes {
323            res.extend(
324                c.1.event
325                    .clone()
326                    .arc_to_any()
327                    .downcast::<T::Event>()
328                    .unwrap()
329                    .required_binaries()
330                    .into_iter(),
331            );
332        }
333        res
334    }
335
336    pub fn recreate_with<T: Object>(
337        &mut self,
338        new_created_at: EventId,
339        object: Arc<T>,
340        updatedness: Option<Updatedness>,
341    ) -> Option<Arc<T>> {
342        if self.created_at == new_created_at
343            && DynSized::ref_to_any(&*self.creation)
344                .downcast_ref::<T>()
345                .unwrap()
346                == &*object
347        {
348            return None;
349        }
350        self.created_at = new_created_at;
351        self.creation = object as _;
352        self.changes = self.changes.split_off(&new_created_at);
353        self.changes.remove(&new_created_at);
354        for c in self.changes.values_mut() {
355            c.snapshot_after = None;
356        }
357        self.bump_last_updated(updatedness);
358        Some(self.get_snapshot_at(Bound::Unbounded).unwrap().1)
359    }
360
361    pub fn recreate_at<T: Object>(
362        &mut self,
363        max_new_created_at: EventId,
364        updatedness: Option<Updatedness>,
365    ) -> crate::Result<()> {
366        // First, check that we're not trying to roll the creation back in time, as this would result
367        // in passing invalid input to `get_snapshot_at`.
368        if max_new_created_at <= self.created_at {
369            return Ok(());
370        }
371
372        let (new_created_at, snapshot) = self
373            .get_snapshot_at::<T>(Bound::Included(max_new_created_at))
374            .wrap_with_context(|| {
375                format!(
376                    "getting last snapshot before {max_new_created_at:?} for object {:?}",
377                    self.id
378                )
379            })?;
380        if new_created_at != self.created_at {
381            self.created_at = new_created_at;
382            self.creation = snapshot;
383            self.changes = self.changes.split_off(&new_created_at);
384            self.changes.pop_first();
385            self.bump_last_updated(updatedness);
386        }
387        Ok(())
388    }
389
390    /// Returns `(was_actually_last_in_bound, id, event)`
391    fn last_snapshot_before(&self, at: Bound<EventId>) -> (bool, EventId, Arc<dyn DynSized>) {
392        let changes_before = self.changes.range((Bound::Unbounded, at));
393        let mut is_first = true;
394        for (id, c) in changes_before.rev() {
395            if let Some(s) = c.snapshot_after.as_ref() {
396                return (is_first, *id, s.clone());
397            }
398            is_first = false;
399        }
400        (is_first, self.created_at, self.creation.clone())
401    }
402
403    fn get_snapshot_at<T: Object>(
404        &mut self,
405        at: Bound<EventId>,
406    ) -> anyhow::Result<(EventId, Arc<T>)> {
407        debug_assert!(
408            (Bound::Unbounded, at).contains(&self.created_at),
409            "asked `get_snapshot_at` for a too-early bound"
410        );
411        // Find the last snapshot before `at`
412        let (_, last_snapshot_time, last_snapshot) = self.last_snapshot_before(at);
413        let mut last_snapshot = last_snapshot
414            .arc_to_any()
415            .downcast::<T>()
416            .map_err(|_| {
417                anyhow!(
418                    "Failed downcasting {:?} to type {:?}",
419                    self.id,
420                    T::type_ulid()
421                )
422            })
423            .map_err(crate::Error::Other)?;
424        let last_snapshot_mut = Arc::make_mut(&mut last_snapshot);
425
426        // Iterate through the changes since the last snapshot to just before the event
427        let to_apply = self
428            .changes
429            .range((Bound::Excluded(last_snapshot_time), at));
430        let mut last_event_time = last_snapshot_time;
431        for (id, change) in to_apply {
432            last_event_time = *id;
433            last_snapshot_mut.apply(
434                DbPtr::from(self.id),
435                (*change.event)
436                    .ref_to_any()
437                    .downcast_ref()
438                    .expect("Event with different type than object type"),
439            );
440        }
441
442        // Save the computed snapshot
443        if last_event_time != last_snapshot_time {
444            assert!(
445                self.changes
446                    .get_mut(&last_event_time)
447                    .unwrap()
448                    .snapshot_after
449                    .replace(last_snapshot.clone())
450                    .is_none(),
451                "Recomputed snapshot that was already computed"
452            );
453        }
454
455        Ok((last_event_time, last_snapshot))
456    }
457}