salsa/runtime.rs

use crate::durability::Durability;
use crate::hash::FxIndexSet;
use crate::plumbing::CycleRecoveryStrategy;
use crate::revision::{AtomicRevision, Revision};
use crate::{Cancelled, Cycle, Database, DatabaseKeyIndex, Event, EventKind};
use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive};
use parking_lot::{Mutex, RwLock};
use std::hash::Hash;
use std::panic::panic_any;
use std::sync::atomic::{AtomicUsize, Ordering};
use tracing::debug;
use triomphe::Arc;

mod dependency_graph;
use dependency_graph::DependencyGraph;

pub(crate) mod local_state;
use local_state::LocalState;

use self::local_state::{ActiveQueryGuard, QueryInputs, QueryRevisions};
/// The salsa runtime stores the storage for all queries as well as
/// tracking the query stack and the dependencies between queries.
///
/// Each new runtime you create (e.g., via `Runtime::new` or
/// `Runtime::default`) will have an independent set of query storage
/// associated with it. Normally, therefore, you only do this once, at
/// the start of your application.
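///
/// # Example
///
/// A minimal sketch; in practice the runtime is created for you as part
/// of your database's storage, so you rarely construct one directly:
///
/// ```rust,ignore
/// use salsa::Runtime;
///
/// let runtime = Runtime::new(); // equivalent to `Runtime::default()`
/// assert!(runtime.active_query().is_none()); // no query is executing yet
/// ```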
pub struct Runtime {
    /// Our unique runtime id.
    id: RuntimeId,

    /// If this is a "forked" runtime, then the `revision_guard` will
    /// be `Some`; this guard holds a read-lock on the global query
    /// lock.
    revision_guard: Option<RevisionGuard>,

    /// Local state that is specific to this runtime (thread).
    local_state: LocalState,

    /// Shared state that is accessible via all runtimes.
    shared_state: Arc<SharedState>,
}

#[derive(Clone, Debug)]
pub(crate) enum WaitResult {
    Completed,
    Panicked,
    Cycle(Cycle),
}

impl Default for Runtime {
    fn default() -> Self {
        Runtime {
            id: RuntimeId { counter: 0 },
            revision_guard: None,
            shared_state: Default::default(),
            local_state: Default::default(),
        }
    }
}

impl std::fmt::Debug for Runtime {
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        fmt.debug_struct("Runtime")
            .field("id", &self.id())
            .field("forked", &self.revision_guard.is_some())
            .field("shared_state", &self.shared_state)
            .finish()
    }
}

impl Runtime {
    /// Create a new runtime; equivalent to `Self::default`. This is
    /// used when creating a new database.
    pub fn new() -> Self {
        Self::default()
    }

    /// See [`crate::storage::Storage::snapshot`].
    pub(crate) fn snapshot(&self) -> Self {
        if self.local_state.query_in_progress() {
            panic!("it is not legal to `snapshot` during a query (see salsa-rs/salsa#80)");
        }

        let revision_guard = RevisionGuard::new(&self.shared_state);

        let id = RuntimeId {
            counter: self.shared_state.next_id.fetch_add(1, Ordering::SeqCst),
        };

        Runtime {
            id,
            revision_guard: Some(revision_guard),
            shared_state: self.shared_state.clone(),
            local_state: Default::default(),
        }
    }

    /// A "synthetic write" causes the system to act *as though* some
    /// input of durability `durability` has changed. This is mostly
    /// useful for profiling scenarios.
    ///
    /// **WARNING:** Just like an ordinary write, this method triggers
    /// cancellation. If you invoke it while a snapshot exists, it
    /// will block until that snapshot is dropped -- if that snapshot
    /// is owned by the current thread, this could trigger deadlock.
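    ///
    /// # Example
    ///
    /// A sketch, assuming `db` is a database value with a (hypothetical
    /// here) mutable accessor for its runtime:
    ///
    /// ```rust,ignore
    /// use salsa::Durability;
    ///
    /// // Act as though some low-durability input changed: queries that
    /// // (transitively) read such inputs will re-validate in the next
    /// // revision.
    /// db.salsa_runtime_mut().synthetic_write(Durability::LOW);
    /// ```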
    pub fn synthetic_write(&mut self, durability: Durability) {
        self.with_incremented_revision(|_next_revision| Some(durability));
    }

    /// The unique identifier attached to this `Runtime`. Each
    /// snapshotted runtime has a distinct identifier.
    #[inline]
    pub fn id(&self) -> RuntimeId {
        self.id
    }

    /// Returns the database-key for the query that this thread is
    /// actively executing (if any).
    pub fn active_query(&self) -> Option<DatabaseKeyIndex> {
        self.local_state.active_query()
    }

    /// Read the current value of the revision counter.
    #[inline]
    pub(crate) fn current_revision(&self) -> Revision {
        self.shared_state.revisions[0].load()
    }

    /// The revision in which values with durability `d` may have last
    /// changed. For D0, this is just the current revision. But for
    /// higher levels of durability, this value may lag behind the
    /// current revision. If we encounter a value of durability Di,
    /// we can then check this function to get a "bound" on when the
    /// value may have changed, which allows us to skip walking its
    /// dependencies.
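    ///
    /// # Example (sketch)
    ///
    /// ```rust,ignore
    /// // If a memo was verified at revision `verified_at` and reads only
    /// // inputs of durability `d`, it is still valid when nothing of
    /// // that durability has changed since then:
    /// if runtime.last_changed_revision(d) <= verified_at {
    ///     // skip walking the memo's dependencies
    /// }
    /// ```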
    #[inline]
    pub(crate) fn last_changed_revision(&self, d: Durability) -> Revision {
        self.shared_state.revisions[d.index()].load()
    }

    /// Read the current value of the pending revision counter.
    #[inline]
    pub(crate) fn pending_revision(&self) -> Revision {
        self.shared_state.pending_revision.load()
    }

    #[cold]
    pub(crate) fn unwind_cancelled(&self) {
        self.report_untracked_read();
        Cancelled::PendingWrite.throw();
    }

    /// Acquires the **global query write lock** (ensuring that no queries are
    /// executing) and then increments the current revision counter; invokes
    /// `op` with the global query write lock still held.
    ///
    /// While we wait to acquire the global query write lock, this method will
    /// also increment `pending_revision`, thus signalling to queries
    /// that their results are "cancelled" and they should abort as expeditiously
    /// as possible.
    ///
    /// The `op` closure should actually perform the writes needed. It is given
    /// the new revision as an argument, and its return value indicates whether
    /// any pre-existing value was modified:
    ///
    /// - returning `None` means that no pre-existing value was modified (this
    ///   could occur e.g. when setting some key on an input that was never set
    ///   before)
    /// - returning `Some(d)` indicates that a pre-existing value was modified
    ///   and it had the durability `d`. This will update the records for when
    ///   values with each durability were modified.
    ///
    /// Note that, given our writer model, we can assume that only one thread is
    /// attempting to increment the global revision at a time.
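    ///
    /// # Example (sketch)
    ///
    /// A hypothetical input setter (`table`, `key`, and `new_value` are
    /// illustrative, not part of this API):
    ///
    /// ```rust,ignore
    /// runtime.with_incremented_revision(|new_revision| {
    ///     let old = table.insert(key, new_value, new_revision);
    ///     // Report the durability of any value we overwrote, so that
    ///     // the per-durability "last changed" records are updated.
    ///     old.map(|stamped| stamped.durability)
    /// });
    /// ```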
    pub(crate) fn with_incremented_revision<F>(&mut self, op: F)
    where
        F: FnOnce(Revision) -> Option<Durability>,
    {
        tracing::debug!("increment_revision()");

        if !self.permits_increment() {
            panic!("increment_revision invoked during a query computation");
        }

        // Set the `pending_revision` field so that others know
        // the current revision is cancelled.
        let current_revision = self.shared_state.pending_revision.fetch_then_increment();

        // To modify the revision, we need the lock.
        let shared_state = self.shared_state.clone();
        let _lock = shared_state.query_lock.write();

        let old_revision = self.shared_state.revisions[0].fetch_then_increment();
        assert_eq!(current_revision, old_revision);

        let new_revision = current_revision.next();

        debug!("increment_revision: incremented to {:?}", new_revision);

        if let Some(d) = op(new_revision) {
            for rev in &self.shared_state.revisions[1..=d.index()] {
                rev.store(new_revision);
            }
        }
    }

    pub(crate) fn permits_increment(&self) -> bool {
        self.revision_guard.is_none() && !self.local_state.query_in_progress()
    }

    #[inline]
    pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> {
        self.local_state.push_query(database_key_index)
    }

    /// Reports that the currently active query read the result from
    /// another query.
    ///
    /// Also checks whether the "cycle participant" flag is set on
    /// the current stack frame -- if so, panics with a `Cycle`
    /// value, which should be caught by the code executing the query.
    ///
    /// # Parameters
    ///
    /// - `input`: the query whose result was read
    /// - `durability`: the durability of that result
    /// - `changed_at`: the last revision in which the result of that
    ///   query had changed
    pub(crate) fn report_query_read_and_unwind_if_cycle_resulted(
        &self,
        input: DatabaseKeyIndex,
        durability: Durability,
        changed_at: Revision,
    ) {
        self.local_state
            .report_query_read_and_unwind_if_cycle_resulted(input, durability, changed_at);
    }

    /// Reports that the query depends on some state unknown to salsa.
    ///
    /// Queries which report untracked reads will be re-executed in the next
    /// revision.
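    ///
    /// # Example
    ///
    /// A sketch of a query that reads state salsa cannot see (`db`,
    /// `MyDatabase`, and `file_text` are illustrative):
    ///
    /// ```rust,ignore
    /// fn file_text(db: &dyn MyDatabase, path: PathBuf) -> String {
    ///     // Salsa cannot observe the file system, so mark this query
    ///     // as untracked; it will re-execute in every new revision.
    ///     db.salsa_runtime().report_untracked_read();
    ///     std::fs::read_to_string(&path).unwrap_or_default()
    /// }
    /// ```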
    pub fn report_untracked_read(&self) {
        self.local_state
            .report_untracked_read(self.current_revision());
    }

    /// Acts as though the current query had read an input with the given
    /// durability; this will force the current query's durability to be at
    /// most `durability`.
    ///
    /// This is mostly useful to control the durability level for
    /// [on-demand inputs](https://salsa-rs.github.io/salsa/common_patterns/on_demand_inputs.html).
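    ///
    /// # Example
    ///
    /// A sketch of the on-demand-input pattern (`db` is illustrative):
    /// the query lazily loads external data, but caps its durability so
    /// that a low-durability write can still invalidate it:
    ///
    /// ```rust,ignore
    /// db.salsa_runtime().report_synthetic_read(Durability::LOW);
    /// ```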
    pub fn report_synthetic_read(&self, durability: Durability) {
        let changed_at = self.last_changed_revision(durability);
        self.local_state
            .report_synthetic_read(durability, changed_at);
    }

    /// Handles a cycle in the dependency graph that was detected when the
    /// current thread tried to block on `database_key_index`, which is being
    /// executed by `to_id`. If this function returns, then `to_id` no longer
    /// depends on the current thread, and so we should continue executing
    /// as normal. Otherwise, the function will throw a `Cycle` which is expected
    /// to be caught by some frame on our stack. This occurs either if there is
    /// a frame on our stack with cycle recovery (possibly the top one!) or if there
    /// is no cycle recovery at all.
    fn unblock_cycle_and_maybe_throw(
        &self,
        db: &dyn Database,
        dg: &mut DependencyGraph,
        database_key_index: DatabaseKeyIndex,
        to_id: RuntimeId,
    ) {
        debug!(
            "unblock_cycle_and_maybe_throw(database_key={:?})",
            database_key_index
        );

        let mut from_stack = self.local_state.take_query_stack();
        let from_id = self.id();

        // Make a "dummy stack frame". As we iterate through the cycle, we will collect the
        // inputs from each participant. Then, if we are participating in cycle recovery, we
        // will propagate those results to all participants.
        let mut cycle_query = ActiveQuery::new(database_key_index);

        // Identify the cycle participants:
        let cycle = {
            let mut v = vec![];
            dg.for_each_cycle_participant(
                from_id,
                &mut from_stack,
                database_key_index,
                to_id,
                |aqs| {
                    aqs.iter_mut().for_each(|aq| {
                        cycle_query.add_from(aq);
                        v.push(aq.database_key_index);
                    });
                },
            );

            // We want to report the participants in a deterministic order
            // (at least for this execution, not necessarily across executions),
            // no matter where the cycle started on the stack. Find the minimum
            // key and rotate it to the front.
            let min = v.iter().min().unwrap();
            let index = v.iter().position(|p| p == min).unwrap();
            v.rotate_left(index);

            // No need to store extra memory.
            v.shrink_to_fit();

            Cycle::new(Arc::new(v))
        };
        debug!(
            "cycle {:?}, cycle_query {:#?}",
            cycle.debug(db),
            cycle_query,
        );

        // We can remove the cycle participants from the list of dependencies;
        // they are a strongly connected component (SCC) and we only care about
        // dependencies to things outside the SCC that control whether it will
        // form again.
        cycle_query.remove_cycle_participants(&cycle);

        // Mark each cycle participant that has recovery set, along with
        // any frames that come after them on the same thread. Those frames
        // are going to be unwound so that fallback can occur.
        dg.for_each_cycle_participant(from_id, &mut from_stack, database_key_index, to_id, |aqs| {
            aqs.iter_mut()
                .skip_while(
                    |aq| match db.cycle_recovery_strategy(aq.database_key_index) {
                        CycleRecoveryStrategy::Panic => true,
                        CycleRecoveryStrategy::Fallback => false,
                    },
                )
                .for_each(|aq| {
                    debug!("marking {:?} for fallback", aq.database_key_index.debug(db));
                    aq.take_inputs_from(&cycle_query);
                    assert!(aq.cycle.is_none());
                    aq.cycle = Some(cycle.clone());
                });
        });

        // Unblock every thread that has cycle recovery with a `WaitResult::Cycle`.
        // They will throw the cycle, which will be caught by the frame that has
        // cycle recovery so that it can execute that recovery.
        let (me_recovered, others_recovered) =
            dg.maybe_unblock_runtimes_in_cycle(from_id, &from_stack, database_key_index, to_id);

        self.local_state.restore_query_stack(from_stack);

        if me_recovered {
            // If the current thread has recovery, we want to throw
            // so that it can begin.
            cycle.throw()
        } else if others_recovered {
            // If other threads have recovery but we don't: return, and we
            // will block on them.
        } else {
            // If nobody has recovery, then we panic.
            panic_any(cycle);
        }
    }

    /// Block until `other_id` completes executing `database_key`;
    /// panic or unwind in the case of a cycle.
    ///
    /// `query_mutex_guard` is the guard for the current query's state;
    /// it will be dropped after we have successfully registered the
    /// dependency.
    ///
    /// # Propagating panics
    ///
    /// If the thread `other_id` panics, then our thread is considered
    /// cancelled, so this function will panic with a `Cancelled` value.
    ///
    /// # Cycle handling
    ///
    /// If the thread `other_id` already depends on the current thread,
    /// and hence there is a cycle in the query graph, then this function
    /// will unwind instead of returning normally. The method of unwinding
    /// depends on the [`CycleRecoveryStrategy`] of the cycle participants:
    ///
    /// * [`CycleRecoveryStrategy::Panic`]: panic with the [`Cycle`] as the value.
    /// * [`CycleRecoveryStrategy::Fallback`]: unwind with the [`Cycle`] so that
    ///   the frame with recovery can catch it and execute its fallback.
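    ///
    /// # Example (sketch)
    ///
    /// How a hypothetical query-storage implementation might use this,
    /// upon finding that another runtime is already computing the value
    /// it needs (`slot`, `QueryState`, and `key` are illustrative):
    ///
    /// ```rust,ignore
    /// let state = slot.state.lock(); // per-query mutex guard
    /// if let QueryState::InProgress { id } = *state {
    ///     // Blocks until `id` finishes; the guard is dropped once the
    ///     // dependency is registered. Unwinds on panic or cycle.
    ///     runtime.block_on_or_unwind(db, key, id, state);
    ///     // If we return normally, the other runtime completed `key`.
    /// }
    /// ```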
    pub(crate) fn block_on_or_unwind<QueryMutexGuard>(
        &self,
        db: &dyn Database,
        database_key: DatabaseKeyIndex,
        other_id: RuntimeId,
        query_mutex_guard: QueryMutexGuard,
    ) {
        let mut dg = self.shared_state.dependency_graph.lock();

        if dg.depends_on(other_id, self.id()) {
            self.unblock_cycle_and_maybe_throw(db, &mut dg, database_key, other_id);

            // If the above fn returns, then (via cycle recovery) it has unblocked the
            // cycle, so we can continue.
            assert!(!dg.depends_on(other_id, self.id()));
        }

        db.salsa_event(Event {
            runtime_id: self.id(),
            kind: EventKind::WillBlockOn {
                other_runtime_id: other_id,
                database_key,
            },
        });

        let stack = self.local_state.take_query_stack();

        let (stack, result) = DependencyGraph::block_on(
            dg,
            self.id(),
            database_key,
            other_id,
            stack,
            query_mutex_guard,
        );

        self.local_state.restore_query_stack(stack);

        match result {
            WaitResult::Completed => (),

            // If the other thread panicked, then we consider this thread
            // cancelled. The assumption is that the panic will be detected
            // by the other thread and responded to appropriately.
            WaitResult::Panicked => Cancelled::PropagatedPanic.throw(),

            WaitResult::Cycle(c) => c.throw(),
        }
    }

    /// Invoked when this runtime completed computing `database_key` with
    /// the given result `wait_result` (`wait_result` should be
    /// `WaitResult::Panicked` if computing `database_key` panicked and
    /// could not complete).
    /// This function unblocks any dependent queries and allows them
    /// to continue executing.
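    ///
    /// # Example (sketch)
    ///
    /// ```rust,ignore
    /// // After successfully storing the computed value for `database_key`:
    /// runtime.unblock_queries_blocked_on(database_key, WaitResult::Completed);
    ///
    /// // Or, from a panic guard, if the computation unwound:
    /// runtime.unblock_queries_blocked_on(database_key, WaitResult::Panicked);
    /// ```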
    pub(crate) fn unblock_queries_blocked_on(
        &self,
        database_key: DatabaseKeyIndex,
        wait_result: WaitResult,
    ) {
        self.shared_state
            .dependency_graph
            .lock()
            .unblock_runtimes_blocked_on(database_key, wait_result);
    }
}

/// State that is shared between all runtimes (i.e., all threads).
struct SharedState {
    /// Stores the next id to use for a snapshotted runtime (starts at 1).
    next_id: AtomicUsize,

    /// Whenever derived queries are executing, they acquire this lock
    /// in read mode. Mutating inputs (and thus creating a new
    /// revision) requires a write lock (thus guaranteeing that no
    /// derived queries are in progress). Note that this is not needed
    /// to prevent **race conditions** -- the revision counter itself
    /// is stored in an `AtomicUsize` so it can be cheaply read
    /// without acquiring the lock.  Rather, the `query_lock` is used
    /// to ensure a higher-level consistency property.
    query_lock: RwLock<()>,

    /// This is typically equal to `revision` -- set to `revision+1`
    /// when a new revision is pending (which implies that the current
    /// revision is cancelled).
    pending_revision: AtomicRevision,

    /// Stores the "last changed" revision for values of each durability.
    /// This vector is always of length at least 1 (for durability 0)
    /// but its total length depends on the number of durability levels.
    /// The element at index 0 is special as it represents the "current
    /// revision".  In general, we have the invariant that revisions
    /// in here are *declining* -- that is, `revisions[i] >=
    /// revisions[i + 1]`, for all `i`. This is because when you
    /// modify a value with durability D, that implies that values
    /// with durability less than D may have changed too.
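    ///
    /// For example (a sketch, assuming durabilities `LOW`/`MEDIUM`/`HIGH`
    /// at indices 0/1/2): modifying a `MEDIUM` input in revision R5 stores
    /// R5 into `revisions[0]` (the new current revision) and `revisions[1]`,
    /// while `revisions[2]` keeps its older value -- preserving the
    /// declining invariant.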
    revisions: Vec<AtomicRevision>,

    /// The dependency graph tracks which runtimes are blocked on one
    /// another, waiting for queries to terminate.
    dependency_graph: Mutex<DependencyGraph>,
}

impl SharedState {
    fn with_durabilities(durabilities: usize) -> Self {
        SharedState {
            next_id: AtomicUsize::new(1),
            query_lock: Default::default(),
            revisions: (0..durabilities).map(|_| AtomicRevision::start()).collect(),
            pending_revision: AtomicRevision::start(),
            dependency_graph: Default::default(),
        }
    }
}

impl std::panic::RefUnwindSafe for SharedState {}

impl Default for SharedState {
    fn default() -> Self {
        Self::with_durabilities(Durability::LEN)
    }
}

impl std::fmt::Debug for SharedState {
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let query_lock = if self.query_lock.try_write().is_some() {
            "<unlocked>"
        } else if self.query_lock.try_read().is_some() {
            "<rlocked>"
        } else {
            "<wlocked>"
        };
        fmt.debug_struct("SharedState")
            .field("query_lock", &query_lock)
            .field("revisions", &self.revisions)
            .field("pending_revision", &self.pending_revision)
            .finish()
    }
}

#[derive(Debug)]
struct ActiveQuery {
    /// What query is executing
    database_key_index: DatabaseKeyIndex,

    /// Minimum durability of inputs observed so far.
    durability: Durability,

    /// Maximum revision of all inputs observed. If we observe an
    /// untracked read, this will be set to the most recent revision.
    changed_at: Revision,

    /// Set of subqueries that were accessed thus far, or `None` if
    /// there was an untracked read.
    dependencies: Option<FxIndexSet<DatabaseKeyIndex>>,

    /// Stores the entire cycle, if one is found and this query is part of it.
    cycle: Option<Cycle>,
}

impl ActiveQuery {
    fn new(database_key_index: DatabaseKeyIndex) -> Self {
        ActiveQuery {
            database_key_index,
            durability: Durability::MAX,
            changed_at: Revision::start(),
            dependencies: Some(FxIndexSet::default()),
            cycle: None,
        }
    }

    fn add_read(&mut self, input: DatabaseKeyIndex, durability: Durability, revision: Revision) {
        if let Some(set) = &mut self.dependencies {
            set.insert(input);
        }

        self.durability = self.durability.min(durability);
        self.changed_at = self.changed_at.max(revision);
    }

    fn add_untracked_read(&mut self, changed_at: Revision) {
        self.dependencies = None;
        self.durability = Durability::LOW;
        self.changed_at = changed_at;
    }

    fn add_synthetic_read(&mut self, durability: Durability, revision: Revision) {
        self.dependencies = None;
        self.durability = self.durability.min(durability);
        self.changed_at = self.changed_at.max(revision);
    }

    pub(crate) fn revisions(&self) -> QueryRevisions {
        let inputs = match &self.dependencies {
            None => QueryInputs::Untracked,

            Some(dependencies) => {
                if dependencies.is_empty() {
                    QueryInputs::NoInputs
                } else {
                    QueryInputs::Tracked {
                        inputs: dependencies.iter().copied().collect(),
                    }
                }
            }
        };

        QueryRevisions {
            changed_at: self.changed_at,
            inputs,
            durability: self.durability,
        }
    }

    /// Adds any dependencies from `other` into `self`.
    /// Used during cycle recovery, see [`Runtime::unblock_cycle_and_maybe_throw`].
    fn add_from(&mut self, other: &ActiveQuery) {
        self.changed_at = self.changed_at.max(other.changed_at);
        self.durability = self.durability.min(other.durability);
        if let Some(other_dependencies) = &other.dependencies {
            if let Some(my_dependencies) = &mut self.dependencies {
                my_dependencies.extend(other_dependencies.iter().copied());
            }
        } else {
            self.dependencies = None;
        }
    }

    /// Removes the participants in `cycle` from my dependencies.
    /// Used during cycle recovery, see [`Runtime::unblock_cycle_and_maybe_throw`].
    fn remove_cycle_participants(&mut self, cycle: &Cycle) {
        if let Some(my_dependencies) = &mut self.dependencies {
            for p in cycle.participant_keys() {
                my_dependencies.remove(&p);
            }
        }
    }

    /// Copy the changed-at, durability, and dependencies from `cycle_query`.
    /// Used during cycle recovery, see [`Runtime::unblock_cycle_and_maybe_throw`].
    pub(crate) fn take_inputs_from(&mut self, cycle_query: &ActiveQuery) {
        self.changed_at = cycle_query.changed_at;
        self.durability = cycle_query.durability;
        self.dependencies = cycle_query.dependencies.clone();
    }
}

/// A unique identifier for a particular runtime. Each time you create
/// a snapshot, a fresh `RuntimeId` is generated. Once a snapshot is
/// complete, its `RuntimeId` may potentially be re-used.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RuntimeId {
    counter: usize,
}

#[derive(Clone, Debug)]
pub(crate) struct StampedValue<V> {
    pub(crate) value: V,
    pub(crate) durability: Durability,
    pub(crate) changed_at: Revision,
}

struct RevisionGuard {
    shared_state: Arc<SharedState>,
}

impl RevisionGuard {
    fn new(shared_state: &Arc<SharedState>) -> Self {
        // Subtle: we use a "recursive" lock here so that it is not an
        // error to acquire a read-lock when one is already held (this
        // happens when a query uses `snapshot` to spawn off parallel
        // workers, for example).
        //
        // This has the side-effect that we are responsible to ensure
        // that people contending for the write lock do not starve,
        // but this is what we achieve via the cancellation mechanism.
        //
        // (In particular, since we only ever have one "mutating
        // handle" to the database, the only contention for the global
        // query lock occurs when there are "futures" evaluating
        // queries in parallel, and those futures hold a read-lock
        // already, so the starvation problem is more about them bringing
        // themselves to a close, versus preventing other people from
        // *starting* work.)
        unsafe {
            shared_state.query_lock.raw().lock_shared_recursive();
        }

        Self {
            shared_state: shared_state.clone(),
        }
    }
}

impl Drop for RevisionGuard {
    fn drop(&mut self) {
        // Release our read-lock without using RAII. As documented in
        // `RevisionGuard::new` above, this requires the unsafe keyword.
        unsafe {
            self.shared_state.query_lock.raw().unlock_shared();
        }
    }
}