1use anyhow::anyhow;
2use crdb_core::{
3 BinPtr, DbPtr, DynSized, Event, EventId, Object, ObjectId, ResultExt, Updatedness,
4};
5use std::{
6 collections::{BTreeMap, HashSet},
7 ops::{Bound, RangeBounds},
8 sync::{Arc, RwLock},
9};
10
11#[cfg(test)]
12mod tests;
13
14fn fmt_option_arc(
15 v: &Option<Arc<dyn DynSized>>,
16 fmt: &mut std::fmt::Formatter<'_>,
17) -> std::fmt::Result {
18 match v {
19 Some(v) => write!(fmt, "Some({:p})", &**v),
20 None => write!(fmt, "None"),
21 }
22}
23
24#[derive(Clone, deepsize::DeepSizeOf, educe::Educe)]
25#[educe(Debug)]
26pub struct Change {
27 #[educe(Debug(method = std::fmt::Pointer::fmt))]
28 pub event: Arc<dyn DynSized>,
29 #[educe(Debug(method = fmt_option_arc))]
30 pub snapshot_after: Option<Arc<dyn DynSized>>,
31}
32
33impl Change {
34 pub fn new(event: Arc<dyn DynSized>) -> Change {
35 Change {
36 event,
37 snapshot_after: None,
38 }
39 }
40
41 pub fn set_snapshot(&mut self, snapshot: Arc<dyn DynSized>) {
42 self.snapshot_after = Some(snapshot);
43 }
44}
45
46#[derive(deepsize::DeepSizeOf, educe::Educe)]
47#[educe(Debug)]
48struct FullObjectImpl {
49 id: ObjectId,
50 last_updated: Option<Updatedness>,
51 created_at: EventId,
52 #[educe(Debug(method = std::fmt::Pointer::fmt))]
53 creation: Arc<dyn DynSized>,
54 changes: BTreeMap<EventId, Change>,
55}
56
57pub struct CreationInfo {
58 pub id: ObjectId,
59 pub created_at: EventId,
60 pub creation: Arc<dyn DynSized>,
61}
62
63#[derive(Clone, Debug)]
64pub struct FullObject {
65 data: Arc<RwLock<FullObjectImpl>>,
66}
67
68impl FullObject {
69 pub fn new(
70 id: ObjectId,
71 last_updated: Option<Updatedness>,
72 created_at: EventId,
73 creation: Arc<dyn DynSized>,
74 ) -> FullObject {
75 FullObject {
76 data: Arc::new(RwLock::new(FullObjectImpl {
77 id,
78 last_updated,
79 created_at,
80 creation,
81 changes: BTreeMap::new(),
82 })),
83 }
84 }
85
86 pub fn from_parts(
87 id: ObjectId,
88 last_updated: Option<Updatedness>,
89 created_at: EventId,
90 creation: Arc<dyn DynSized>,
91 changes: BTreeMap<EventId, Change>,
92 ) -> FullObject {
93 FullObject {
94 data: Arc::new(RwLock::new(FullObjectImpl {
95 id,
96 last_updated,
97 created_at,
98 creation,
99 changes,
100 })),
101 }
102 }
103
104 pub fn refcount(&self) -> usize {
105 let mut res = Arc::strong_count(&self.data);
106 let this = self.data.read().unwrap();
107 res += Arc::strong_count(&this.creation) - 1;
108 res += this
109 .changes
110 .values()
111 .map(|v| {
112 v.snapshot_after
113 .as_ref()
114 .map(|s| Arc::strong_count(s) - 1)
115 .unwrap_or(0)
116 })
117 .sum::<usize>();
118 res
119 }
120
121 pub fn id(&self) -> ObjectId {
122 self.data.read().unwrap().id
123 }
124
125 pub fn last_updated(&self) -> Option<Updatedness> {
126 self.data.read().unwrap().last_updated
127 }
128
129 pub fn creation_info(&self) -> CreationInfo {
130 let this = self.data.read().unwrap();
131 CreationInfo {
132 id: this.id,
133 created_at: this.created_at,
134 creation: this.creation.clone(),
135 }
136 }
137
138 pub fn created_at(&self) -> EventId {
139 self.data.read().unwrap().created_at
140 }
141
142 pub fn changes_clone(&self) -> BTreeMap<EventId, Change> {
143 self.data.read().unwrap().changes.clone()
144 }
145
146 pub fn extract_all_clone(&self) -> (CreationInfo, BTreeMap<EventId, Change>) {
147 let this = self.data.read().unwrap();
148 (
149 CreationInfo {
150 id: this.id,
151 created_at: this.created_at,
152 creation: this.creation.clone(),
153 },
154 this.changes.clone(),
155 )
156 }
157
158 pub fn deep_size_of(&self) -> usize {
159 self.data.read().unwrap().deep_size_of()
160 }
161
162 pub fn apply<T: Object>(
163 &self,
164 id: EventId,
165 event: Arc<T::Event>,
166 updatedness: Option<Updatedness>,
167 ) -> crate::Result<bool> {
168 self.data
169 .write()
170 .unwrap()
171 .apply::<T>(id, event, updatedness)
172 }
173
174 pub fn recreate_at<T: Object>(
175 &self,
176 event_id: EventId,
177 updatedness: Option<Updatedness>,
178 ) -> crate::Result<()> {
179 self.data
180 .write()
181 .unwrap()
182 .recreate_at::<T>(event_id, updatedness)
183 }
184
185 pub fn recreate_with<T: Object>(
186 &self,
187 event_id: EventId,
188 data: Arc<T>,
189 updatedness: Option<Updatedness>,
190 ) -> Option<Arc<T>> {
191 self.data
192 .write()
193 .unwrap()
194 .recreate_with::<T>(event_id, data, updatedness)
195 }
196
197 pub fn required_binaries<T: Object>(&self) -> HashSet<BinPtr> {
198 self.data.read().unwrap().required_binaries::<T>()
199 }
200
201 pub fn get_snapshot_at<T: Object>(
202 &self,
203 mut at: Bound<EventId>,
204 ) -> anyhow::Result<(EventId, Arc<T>)> {
205 let mut this = self.data.write().unwrap();
206 if !(Bound::Unbounded, at).contains(&this.created_at) {
208 at = Bound::Included(this.created_at);
209 }
210 this.get_snapshot_at::<T>(at)
211 }
212
213 pub fn last_snapshot<T: Object>(&self) -> anyhow::Result<Arc<T>> {
214 {
215 let this = self.data.read().unwrap();
216 if this.changes.is_empty() {
217 return this
218 .creation
219 .clone()
220 .arc_to_any()
221 .downcast::<T>()
222 .map_err(|_| anyhow!("Wrong type for snapshot downcast"));
223 }
224 let (_, last_change) = this.changes.last_key_value().unwrap();
225 if let Some(s) = &last_change.snapshot_after {
226 return s
227 .clone()
228 .arc_to_any()
229 .downcast::<T>()
230 .map_err(|_| anyhow!("Wrong type for snapshot downcast"));
231 }
232 }
233 Ok(self
234 .data
235 .write()
236 .unwrap()
237 .get_snapshot_at(Bound::Unbounded)?
238 .1)
239 }
240}
241
242impl FullObjectImpl {
243 pub fn apply<T: Object>(
248 &mut self,
249 event_id: EventId,
250 event: Arc<T::Event>,
251 updatedness: Option<Updatedness>,
252 ) -> crate::Result<bool> {
253 if event_id <= self.created_at {
254 return Err(crate::Error::EventTooEarly {
255 event_id,
256 object_id: self.id,
257 created_at: self.created_at,
258 });
259 }
260 if let Some(c) = self.changes.get(&event_id) {
261 if (*c.event)
262 .ref_to_any()
263 .downcast_ref::<T::Event>()
264 .map(|e| e != &*event)
265 .unwrap_or(true)
266 {
267 return Err(crate::Error::EventAlreadyExists(event_id));
268 }
269 return Ok(false);
270 }
271
272 let (_, mut last_snapshot) = self
274 .get_snapshot_at::<T>(Bound::Excluded(event_id))
275 .wrap_with_context(|| format!("applying event {event_id:?}"))?;
276
277 let last_snapshot_mut: &mut T = Arc::make_mut(&mut last_snapshot);
279 last_snapshot_mut.apply(DbPtr::from(self.id), &*event);
280 let new_change = Change {
281 event,
282 snapshot_after: Some(last_snapshot),
283 };
284 assert!(
285 self.changes.insert(event_id, new_change).is_none(),
286 "Object {:?} already had an event with id {event_id:?} despite earlier check",
287 self.id,
288 );
289
290 let to_invalidate = self
292 .changes
293 .range_mut((Bound::Excluded(event_id), Bound::Unbounded));
294 for c in to_invalidate {
295 c.1.snapshot_after = None;
296 }
297
298 self.bump_last_updated(updatedness);
299
300 Ok(true)
301 }
302
303 fn bump_last_updated(&mut self, updatedness: Option<Updatedness>) {
304 self.last_updated = match (self.last_updated, updatedness) {
305 (None, None) => None,
306 (None, Some(u)) => Some(u),
307 (Some(u), None) => Some(u),
308 (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
309 };
310 }
311
312 pub fn required_binaries<T: Object>(&self) -> HashSet<BinPtr> {
313 let mut res = self
314 .creation
315 .clone()
316 .arc_to_any()
317 .downcast::<T>()
318 .unwrap()
319 .required_binaries()
320 .into_iter()
321 .collect::<HashSet<_>>();
322 for c in &self.changes {
323 res.extend(
324 c.1.event
325 .clone()
326 .arc_to_any()
327 .downcast::<T::Event>()
328 .unwrap()
329 .required_binaries()
330 .into_iter(),
331 );
332 }
333 res
334 }
335
336 pub fn recreate_with<T: Object>(
337 &mut self,
338 new_created_at: EventId,
339 object: Arc<T>,
340 updatedness: Option<Updatedness>,
341 ) -> Option<Arc<T>> {
342 if self.created_at == new_created_at
343 && DynSized::ref_to_any(&*self.creation)
344 .downcast_ref::<T>()
345 .unwrap()
346 == &*object
347 {
348 return None;
349 }
350 self.created_at = new_created_at;
351 self.creation = object as _;
352 self.changes = self.changes.split_off(&new_created_at);
353 self.changes.remove(&new_created_at);
354 for c in self.changes.values_mut() {
355 c.snapshot_after = None;
356 }
357 self.bump_last_updated(updatedness);
358 Some(self.get_snapshot_at(Bound::Unbounded).unwrap().1)
359 }
360
361 pub fn recreate_at<T: Object>(
362 &mut self,
363 max_new_created_at: EventId,
364 updatedness: Option<Updatedness>,
365 ) -> crate::Result<()> {
366 if max_new_created_at <= self.created_at {
369 return Ok(());
370 }
371
372 let (new_created_at, snapshot) = self
373 .get_snapshot_at::<T>(Bound::Included(max_new_created_at))
374 .wrap_with_context(|| {
375 format!(
376 "getting last snapshot before {max_new_created_at:?} for object {:?}",
377 self.id
378 )
379 })?;
380 if new_created_at != self.created_at {
381 self.created_at = new_created_at;
382 self.creation = snapshot;
383 self.changes = self.changes.split_off(&new_created_at);
384 self.changes.pop_first();
385 self.bump_last_updated(updatedness);
386 }
387 Ok(())
388 }
389
390 fn last_snapshot_before(&self, at: Bound<EventId>) -> (bool, EventId, Arc<dyn DynSized>) {
392 let changes_before = self.changes.range((Bound::Unbounded, at));
393 let mut is_first = true;
394 for (id, c) in changes_before.rev() {
395 if let Some(s) = c.snapshot_after.as_ref() {
396 return (is_first, *id, s.clone());
397 }
398 is_first = false;
399 }
400 (is_first, self.created_at, self.creation.clone())
401 }
402
403 fn get_snapshot_at<T: Object>(
404 &mut self,
405 at: Bound<EventId>,
406 ) -> anyhow::Result<(EventId, Arc<T>)> {
407 debug_assert!(
408 (Bound::Unbounded, at).contains(&self.created_at),
409 "asked `get_snapshot_at` for a too-early bound"
410 );
411 let (_, last_snapshot_time, last_snapshot) = self.last_snapshot_before(at);
413 let mut last_snapshot = last_snapshot
414 .arc_to_any()
415 .downcast::<T>()
416 .map_err(|_| {
417 anyhow!(
418 "Failed downcasting {:?} to type {:?}",
419 self.id,
420 T::type_ulid()
421 )
422 })
423 .map_err(crate::Error::Other)?;
424 let last_snapshot_mut = Arc::make_mut(&mut last_snapshot);
425
426 let to_apply = self
428 .changes
429 .range((Bound::Excluded(last_snapshot_time), at));
430 let mut last_event_time = last_snapshot_time;
431 for (id, change) in to_apply {
432 last_event_time = *id;
433 last_snapshot_mut.apply(
434 DbPtr::from(self.id),
435 (*change.event)
436 .ref_to_any()
437 .downcast_ref()
438 .expect("Event with different type than object type"),
439 );
440 }
441
442 if last_event_time != last_snapshot_time {
444 assert!(
445 self.changes
446 .get_mut(&last_event_time)
447 .unwrap()
448 .snapshot_after
449 .replace(last_snapshot.clone())
450 .is_none(),
451 "Recomputed snapshot that was already computed"
452 );
453 }
454
455 Ok((last_event_time, last_snapshot))
456 }
457}