inc-complete 0.10.2

A library for writing incremental computations that re-execute the minimum number of steps when an input is changed
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
use std::collections::BTreeSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use crate::accumulate::{ACCUMULATED_COMPUTATION_ID, Accumulate, Accumulated};
use crate::cell::CellData;
use crate::storage::StorageFor;
use crate::{Cell, Computation, Storage};

pub mod debug_with_db;
mod handle;
mod serialize;
mod tests;

pub use handle::DbHandle;
use parking_lot::Mutex;
use rustc_hash::FxHashSet;

const START_VERSION: u32 = 1;

/// The central database object to manage and cache incremental computations.
///
/// To use this, a type implementing `Storage` is required to be provided as `S`.
/// See the documentation for `impl_storage!`.
///
/// The generic parameter is deliberately named `S` rather than `Storage` so that it does
/// not shadow the `Storage` trait imported into this module.
pub struct Db<S> {
    /// Per-cell bookkeeping (computation id, versions, dependencies, run lock), keyed by `Cell`.
    cells: dashmap::DashMap<Cell, CellData>,
    /// Current database version. Starts at `START_VERSION` and is incremented whenever
    /// `update_input` changes the value of an input.
    version: AtomicU32,
    /// Counter used to hand out fresh, unique `Cell` ids.
    next_cell: AtomicU32,
    /// User-provided storage mapping computations to cells and holding their outputs.
    storage: S,

    /// Lock used when acquiring new Cells to ensure the same data isn't assigned
    /// multiple ids concurrently. Maps computation_id to each lock.
    cell_locks: dashmap::DashMap<u32, Arc<Mutex<()>>>,
}

impl<S: Default> Db<S> {
    /// Construct a new `Db` object using `Default::default()` for the initial storage.
    ///
    /// The generic is named `S` (not `Storage`) so it does not shadow the `Storage` trait.
    pub fn new() -> Self {
        Self::with_storage(S::default())
    }
}

impl<S: Default> Default for Db<S> {
    fn default() -> Self {
        Self::new()
    }
}

/// Abstracts over the `get` function provided by `Db<S>` and `DbHandle<S>` to avoid
/// providing `get` and `get_db` variants for each function.
///
/// Helpers written against `impl DbGet<C>` can therefore be called both from top-level
/// code holding a `Db` and from inside a computation holding a `DbHandle`.
pub trait DbGet<C: Computation> {
    /// Run the incremental computation `key` and return its output.
    ///
    /// If a cached result for `key` is already up to date, no computation will be performed.
    fn get(&self, key: C) -> C::Output;
}

impl<Store, Comp> DbGet<Comp> for Db<Store>
where
    Comp: Computation,
    Store: Storage + StorageFor<Comp>,
{
    /// Forward to the inherent [`Db::get`].
    fn get(&self, key: Comp) -> Comp::Output {
        // Method resolution prefers inherent methods over trait methods, so this calls
        // `Db::get` rather than recursing into this trait impl.
        self.get(key)
    }
}

impl<S> Db<S> {
    /// Construct a new `Db` object with the given initial storage.
    pub fn with_storage(storage: S) -> Self {
        Self {
            cells: Default::default(),
            version: AtomicU32::new(START_VERSION),
            next_cell: AtomicU32::new(0),
            cell_locks: Default::default(),
            storage,
        }
    }

    /// Retrieve an immutable reference to this `Db`'s storage
    pub fn storage(&self) -> &S {
        &self.storage
    }

    /// Retrieve a mutable reference to this `Db`'s storage.
    ///
    /// Note that any mutations made to the storage using this are _not_ tracked by the `Db`!
    /// Using this incorrectly may break correctness!
    pub fn storage_mut(&mut self) -> &mut S {
        &mut self.storage
    }
}

impl<S: Storage> Db<S> {
    /// Return the corresponding Cell for a given computation, if it exists.
    ///
    /// This will not update any values.
    fn get_cell<C: Computation>(&self, computation: &C) -> Option<Cell>
    where
        S: StorageFor<C>,
    {
        self.storage.get_cell_for_computation(computation)
    }

    /// Return the `Cell` for `input`, allocating a fresh one (and registering `input`
    /// with the storage) if the storage does not know this computation yet.
    ///
    /// The lookup-or-insert runs while holding the per-computation-type lock from
    /// `cell_locks`, so concurrent callers cannot assign the same input two different cells.
    pub(crate) fn get_or_insert_cell<C>(&self, input: C) -> Cell
    where
        C: Computation,
        S: StorageFor<C>,
    {
        let computation_id = C::computation_id();
        // Clone the `Arc` out of the map: the DashMap entry guard is a temporary dropped at
        // the end of this statement, before we block on the mutex itself.
        let lock = self.cell_locks.entry(computation_id).or_default().clone();
        let _guard = lock.lock();

        if let Some(cell) = self.get_cell(&input) {
            return cell;
        }

        // We just need a unique ID here, we don't care about ordering between
        // threads, so we're using Ordering::Relaxed.
        let cell_id = self.next_cell.fetch_add(1, Ordering::Relaxed);
        let new_cell = Cell::new(cell_id);

        self.cells.insert(new_cell, CellData::new(computation_id));
        self.storage.insert_new_cell(new_cell, input);
        new_cell
    }

    /// Create a [`DbHandle`] associated with `cell`.
    fn handle(&self, cell: Cell) -> DbHandle<'_, S> {
        DbHandle::new(self, cell)
    }

    /// Test helper: run `f` on the bookkeeping data of the cell for `input`.
    ///
    /// Panics if no cell exists for `input`.
    #[cfg(test)]
    // Test-only helper; allowed to be unused so test builds without a caller stay warning-free.
    #[allow(unused)]
    pub(crate) fn with_cell_data<C: Computation>(&self, input: &C, f: impl FnOnce(&CellData))
    where
        S: StorageFor<C>,
    {
        let cell = self
            .get_cell(input)
            .expect("with_cell_data: Expected cell to exist");

        self.with_cell(cell, f)
    }

    /// Return the current database version.
    ///
    /// The version starts at `START_VERSION` and is incremented each time `update_input`
    /// changes the value of an input.
    pub fn version(&self) -> u32 {
        self.version.load(Ordering::SeqCst)
    }

    /// Garbage-collect storage, keeping only cells verified at or after `version`.
    ///
    /// Every cell whose `last_verified_version >= version` is considered in use, and the
    /// set of those cells is handed to the storage's `gc`. What the storage does with cells
    /// outside the set is up to its `gc` implementation (presumably it frees their data).
    /// The `Db`'s own per-cell bookkeeping is not modified here.
    pub fn gc(&mut self, version: u32) {
        let used_cells: std::collections::HashSet<Cell> = self
            .cells
            .iter()
            .filter(|entry| entry.value().last_verified_version >= version)
            // `Cell` is `Copy`, so deref instead of cloning.
            .map(|entry| *entry.key())
            .collect();

        self.storage.gc(&used_cells);
    }
}

impl<S: Storage> Db<S> {
    /// Updates an input with a new value
    ///
    /// This requires an exclusive reference to self to ensure that there are no currently
    /// running queries. Updating an input while an incremental computation is occurring
    /// can break soundness for dependency tracking.
    ///
    /// If the storage reports that the stored value changed, the database version is
    /// incremented and the input is stamped as updated in that new version. Otherwise the
    /// input is only marked as verified in the current version.
    ///
    /// # Panics
    ///
    /// Panics if the given computation is not an input - ie. panics if it has at least 1 dependency.
    pub fn update_input<C>(&mut self, input: C, new_value: C::Output)
    where
        C: Computation,
        S: StorageFor<C>,
    {
        let cell_id = self.get_or_insert_cell(input);
        assert!(
            self.is_input(cell_id),
            "`update_input` given a non-input value. Inputs must have 0 dependencies",
        );

        let changed = self.storage.update_output(cell_id, new_value);
        let mut cell = self.cells.get_mut(&cell_id).unwrap();

        if changed {
            let version = self.version.fetch_add(1, Ordering::SeqCst) + 1;
            cell.last_updated_version = version;
            cell.last_verified_version = version;
        } else {
            cell.last_verified_version = self.version.load(Ordering::SeqCst);
        }
    }

    /// True if `cell` has no recorded dependencies of either kind.
    fn is_input(&self, cell: Cell) -> bool {
        self.with_cell(cell, |data| {
            data.dependencies.is_empty() && data.input_dependencies.is_empty()
        })
    }

    /// True if a given computation is stale and needs to be re-computed.
    /// Computations which have never been computed are also considered stale.
    ///
    /// Note that this may re-compute dependencies of the given computation.
    pub fn is_stale<C: Computation>(&self, input: &C) -> bool
    where
        S: StorageFor<C>,
    {
        // If the cell doesn't exist, it is definitely stale
        let Some(cell) = self.get_cell(input) else {
            return true;
        };
        self.is_stale_cell(cell)
    }

    /// True if a given cell is stale and needs to be re-computed.
    ///
    /// A cell with no stored output is always stale. Otherwise it can only be stale if one
    /// of its input dependencies was updated after the cell was last verified. In that case
    /// its direct dependencies are brought up to date (possibly re-running them) and checked.
    fn is_stale_cell(&self, cell: Cell) -> bool {
        let computation_id = self.with_cell(cell, |data| data.computation_id);

        if self.storage.output_is_unset(cell, computation_id) {
            return true;
        }

        // Clone what we need so no DashMap guard is held while we look up other cells below.
        let (last_verified, inputs, dependencies) = self.with_cell(cell, |data| {
            (
                data.last_verified_version,
                data.input_dependencies.clone(),
                data.dependencies.clone(),
            )
        });

        // Optimization: only recursively check all dependencies if any
        // of the inputs this cell depends on have changed. An input change is necessary
        // for staleness but not sufficient, since dependencies may produce equal outputs.
        let inputs_changed = inputs.into_iter().any(|input_id| {
            // This cell may be stale if the input has been updated since
            // we last verified this cell
            self.with_cell(input_id, |input| input.last_updated_version > last_verified)
        });

        // Dependencies need to be iterated in the order they were computed.
        // Otherwise we may re-run a computation which does not need to be re-run.
        // In the worst case this could even lead to panics - see the div0 test.
        inputs_changed
            && dependencies.into_iter().any(|dependency_id| {
                self.update_cell(dependency_id);
                self.with_cell(dependency_id, |dependency| {
                    // Accumulated-value cells must also re-run when a dependency merely
                    // re-ran (its accumulated values may differ even if its output did not).
                    if computation_id == ACCUMULATED_COMPUTATION_ID {
                        dependency.last_run_version > last_verified
                    } else {
                        dependency.last_updated_version > last_verified
                    }
                })
            })
    }

    /// Run the compute function for `cell_id` and record the outcome in its bookkeeping.
    ///
    /// Similar to `update_input` but runs the compute function instead of accepting a given
    /// value. This does not bump `self.version`: the cell is marked as run and verified in
    /// the current version, and as updated too if the storage reports a changed output.
    fn run_compute_function(&self, cell_id: Cell) {
        let computation_id = self.with_cell(cell_id, |data| data.computation_id);
        self.storage.clear_accumulated_for_cell(cell_id);
        let handle = self.handle(cell_id);
        let changed = S::run_computation(&handle, cell_id, computation_id);

        let version = self.version.load(Ordering::SeqCst);
        let mut cell = self.cells.get_mut(&cell_id).unwrap();
        cell.last_verified_version = version;
        cell.last_run_version = version;

        if changed {
            cell.last_updated_version = version;
        }
    }

    /// Trigger an update of the given cell, recursively checking and re-running any out of date
    /// dependencies.
    fn update_cell(&self, cell_id: Cell) {
        let last_verified_version = self.with_cell(cell_id, |data| data.last_verified_version);
        let version = self.version.load(Ordering::SeqCst);

        if last_verified_version == version {
            // Already verified during the current version.
            return;
        }

        if !self.is_stale_cell(cell_id) {
            // Nothing it depends on changed: mark it verified without re-running it.
            let mut cell = self.cells.get_mut(&cell_id).unwrap();
            cell.last_verified_version = version;
            return;
        }

        let lock = self.with_cell(cell_id, |cell| cell.lock.clone());

        match lock.try_lock() {
            Some(guard) => {
                // Another thread may have finished re-running this cell between our staleness
                // check above and acquiring the lock. If so it is already verified for this
                // version. Re-running would repeat the work and clear the cell's accumulated
                // values (see `run_compute_function`) after they were already produced.
                let already_done =
                    self.with_cell(cell_id, |data| data.last_verified_version == version);
                if !already_done {
                    self.run_compute_function(cell_id);
                }
                drop(guard);
            }
            None => {
                // This computation is already being run in another thread.
                // Before blocking and waiting, since we have time, check for a cycle and
                // issue and panic if found.
                self.check_for_cycle(cell_id);

                // Block until it finishes and return the result
                drop(lock.lock());
            }
        }
    }

    /// Perform a DFS over recorded `dependencies` to check for a cycle, panicking if found.
    fn check_for_cycle(&self, starting_cell: Cell) {
        // Most actions push a dependency cell to track as the next node in the graph. `Pop`
        // actions remove the top node from the current path once all of its dependencies
        // have been explored. Reaching a node already on the current path means a cycle.
        enum Action {
            Traverse(Cell),
            Pop(Cell),
        }

        let mut visited = FxHashSet::default();
        let mut path = Vec::new();
        let mut stack = vec![Action::Traverse(starting_cell)];

        while let Some(action) = stack.pop() {
            match action {
                // This assert_eq is never expected to fail
                Action::Pop(expected) => assert_eq!(path.pop(), Some(expected)),
                Action::Traverse(cell) => {
                    if path.contains(&cell) {
                        // Include the same cell twice so the cycle is more clear to users
                        path.push(cell);
                        self.cycle_error(&path);
                    }

                    if visited.insert(cell) {
                        path.push(cell);
                        // Pushed before the dependencies, so it is popped after all of them.
                        stack.push(Action::Pop(cell));
                        self.with_cell(cell, |data| {
                            stack.extend(data.dependencies.iter().copied().map(Action::Traverse));
                        });
                    }
                }
            }
        }
    }

    /// Panic with a numbered, human-readable listing of the given cycle. Never returns.
    fn cycle_error(&self, cycle: &[Cell]) -> ! {
        let error: String = cycle
            .iter()
            .enumerate()
            .map(|(i, cell)| {
                format!(
                    "\n  {}. {}",
                    i + 1,
                    self.storage.input_debug_string(self, *cell)
                )
            })
            .collect();
        panic!("inc-complete: Cycle Detected!\n\nCycle:{error}")
    }

    /// Retrieves the up to date value for the given computation, re-running any dependencies as
    /// necessary.
    ///
    /// # Panics
    ///
    /// Panics if the storage holds no output for the computation after it has been updated.
    /// Also panics if a dependency cycle is detected while waiting on a cell that another
    /// thread is currently computing.
    ///
    /// Locking behavior: This function locks the cell corresponding to the given computation. This
    /// can cause a deadlock if the computation recursively depends on itself.
    pub fn get<C: Computation>(&self, compute: C) -> C::Output
    where
        S: StorageFor<C>,
    {
        let cell_id = self.get_or_insert_cell(compute);
        self.get_with_cell::<C>(cell_id)
    }

    /// Bring `cell_id` up to date and return its output, read as `Concrete::Output`.
    ///
    /// `Concrete` is expected to be the computation type `cell_id` was created for.
    pub(crate) fn get_with_cell<Concrete: Computation>(&self, cell_id: Cell) -> Concrete::Output
    where
        S: StorageFor<Concrete>,
    {
        self.update_cell(cell_id);

        self.storage
            .get_output(cell_id)
            .expect("cell result should have been computed already")
    }

    /// Run `f` with shared access to the bookkeeping data for `cell`.
    ///
    /// NOTE: the DashMap read guard is held while `f` runs. Avoid mutating `self.cells` from
    /// inside `f`; DashMap documents that this can deadlock.
    fn with_cell<R>(&self, cell: Cell, f: impl FnOnce(&CellData) -> R) -> R {
        let data = self
            .cells
            .get(&cell)
            .expect("inc-complete: cell is missing from the Db");
        f(&data)
    }

    /// Retrieve each accumulated value of the given type after the given computation is run.
    ///
    /// This is most often used for operations like retrieving diagnostics or logs.
    ///
    /// Compared to [Db::get_accumulated_uncached], this version reuses the normal flow for
    /// queries and thus saves accumulated values for each intermediate query. This involves
    /// more synchronization and data duplication but can be beneficial if intermediate results
    /// ever need to be reused, e.g. if you call [Db::get_accumulated] in a loop where each
    /// call may share dependencies. If you already have a single query which emits all the
    /// accumulated values you need, [Db::get_accumulated_uncached] is likely faster, but
    /// requires a `&mut Db`.
    pub fn get_accumulated<Item, C>(&self, compute: C) -> BTreeSet<Item>
    where
        S: StorageFor<C> + StorageFor<Accumulated<Item>>,
        C: Computation,
        Item: 'static,
    {
        let cell_id = self.get_or_insert_cell(compute);
        self.update_cell(cell_id);
        self.get(Accumulated::<Item>::new(cell_id))
    }

    /// Retrieve each accumulated value of the given type after the given computation is run.
    ///
    /// This is most often used for operations like retrieving diagnostics or logs.
    ///
    /// This is a faster version of [Db::get_accumulated] for some use-cases. This version tends to be
    /// more efficient when you already have a single query which emits all the accumulated values
    /// you need, while the original [Db::get_accumulated] is more efficient when you have many
    /// smaller calls since it avoids duplicated work and is safe to call with only a [DbHandle].
    pub fn get_accumulated_uncached<Item, C>(&mut self, compute: C) -> BTreeSet<Item>
    where
        S: StorageFor<C> + StorageFor<Accumulated<Item>> + Accumulate<Item>,
        C: Computation,
        Item: 'static + Ord,
    {
        let cell_id = self.get_or_insert_cell(compute);
        self.update_cell(cell_id);

        // Walk every cell reachable through recorded dependencies exactly once, collecting
        // the accumulated items each one emitted.
        let mut items = BTreeSet::new();
        let mut visited = BTreeSet::new();
        let mut queue = vec![cell_id];

        while let Some(cell) = queue.pop() {
            if visited.insert(cell) {
                self.with_cell(cell, |data| queue.extend_from_slice(&data.dependencies));
                items.extend(self.storage().get_accumulated::<Vec<Item>>(cell));
            }
        }

        items
    }
}