// ra_salsa/runtime.rs

1use crate::durability::Durability;
2use crate::hash::FxIndexSet;
3use crate::plumbing::CycleRecoveryStrategy;
4use crate::revision::{AtomicRevision, Revision};
5use crate::{Cancelled, Cycle, Database, DatabaseKeyIndex, Event, EventKind};
6use itertools::Itertools;
7use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive};
8use parking_lot::{Mutex, RwLock};
9use std::hash::Hash;
10use std::panic::panic_any;
11use std::sync::atomic::{AtomicU32, Ordering};
12use tracing::trace;
13use triomphe::{Arc, ThinArc};
14
15mod dependency_graph;
16use dependency_graph::DependencyGraph;
17
18pub(crate) mod local_state;
19use local_state::LocalState;
20
21use self::local_state::{ActiveQueryGuard, QueryRevisions};
22
23/// The salsa runtime stores the storage for all queries as well as
24/// tracking the query stack and dependencies between cycles.
25///
26/// Each new runtime you create (e.g., via `Runtime::new` or
27/// `Runtime::default`) will have an independent set of query storage
28/// associated with it. Normally, therefore, you only do this once, at
29/// the start of your application.
30pub struct Runtime {
31    /// Our unique runtime id.
32    id: RuntimeId,
33
34    /// If this is a "forked" runtime, then the `revision_guard` will
35    /// be `Some`; this guard holds a read-lock on the global query
36    /// lock.
37    revision_guard: Option<RevisionGuard>,
38
39    /// Local state that is specific to this runtime (thread).
40    local_state: LocalState,
41
42    /// Shared state that is accessible via all runtimes.
43    shared_state: Arc<SharedState>,
44}
45
46#[derive(Clone, Debug)]
47pub(crate) enum WaitResult {
48    Completed,
49    Panicked,
50    Cycle(Cycle),
51}
52
53impl Default for Runtime {
54    fn default() -> Self {
55        Runtime {
56            id: RuntimeId { counter: 0 },
57            revision_guard: None,
58            shared_state: Default::default(),
59            local_state: Default::default(),
60        }
61    }
62}
63
64impl std::fmt::Debug for Runtime {
65    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        fmt.debug_struct("Runtime")
67            .field("id", &self.id())
68            .field("forked", &self.revision_guard.is_some())
69            .field("shared_state", &self.shared_state)
70            .finish()
71    }
72}
73
74impl Runtime {
75    /// Create a new runtime; equivalent to `Self::default`. This is
76    /// used when creating a new database.
77    pub fn new() -> Self {
78        Self::default()
79    }
80
81    /// See [`crate::storage::Storage::snapshot`].
82    pub(crate) fn snapshot(&self) -> Self {
83        if self.local_state.query_in_progress() {
84            panic!("it is not legal to `snapshot` during a query (see salsa-rs/salsa#80)");
85        }
86
87        let revision_guard = RevisionGuard::new(&self.shared_state);
88
89        let id = RuntimeId { counter: self.shared_state.next_id.fetch_add(1, Ordering::SeqCst) };
90
91        Runtime {
92            id,
93            revision_guard: Some(revision_guard),
94            shared_state: self.shared_state.clone(),
95            local_state: Default::default(),
96        }
97    }
98
99    /// A "synthetic write" causes the system to act *as though* some
100    /// input of durability `durability` has changed. This is mostly
101    /// useful for profiling scenarios.
102    ///
103    /// **WARNING:** Just like an ordinary write, this method triggers
104    /// cancellation. If you invoke it while a snapshot exists, it
105    /// will block until that snapshot is dropped -- if that snapshot
106    /// is owned by the current thread, this could trigger deadlock.
107    pub fn synthetic_write(&mut self, durability: Durability) {
108        self.with_incremented_revision(|_next_revision| Some(durability));
109    }
110
111    /// The unique identifier attached to this `SalsaRuntime`. Each
112    /// snapshotted runtime has a distinct identifier.
113    #[inline]
114    pub fn id(&self) -> RuntimeId {
115        self.id
116    }
117
118    /// Returns the database-key for the query that this thread is
119    /// actively executing (if any).
120    pub fn active_query(&self) -> Option<DatabaseKeyIndex> {
121        self.local_state.active_query()
122    }
123
124    /// Read current value of the revision counter.
125    #[inline]
126    pub(crate) fn current_revision(&self) -> Revision {
127        self.shared_state.revisions[0].load()
128    }
129
130    /// The revision in which values with durability `d` may have last
131    /// changed.  For D0, this is just the current revision. But for
132    /// higher levels of durability, this value may lag behind the
133    /// current revision. If we encounter a value of durability Di,
134    /// then, we can check this function to get a "bound" on when the
135    /// value may have changed, which allows us to skip walking its
136    /// dependencies.
137    #[inline]
138    pub(crate) fn last_changed_revision(&self, d: Durability) -> Revision {
139        self.shared_state.revisions[d.index()].load()
140    }
141
142    /// Read current value of the revision counter.
143    #[inline]
144    pub(crate) fn pending_revision(&self) -> Revision {
145        self.shared_state.pending_revision.load()
146    }
147
148    #[cold]
149    pub(crate) fn unwind_cancelled(&self) {
150        self.report_untracked_read();
151        Cancelled::PendingWrite.throw();
152    }
153
154    /// Acquires the **global query write lock** (ensuring that no queries are
155    /// executing) and then increments the current revision counter; invokes
156    /// `op` with the global query write lock still held.
157    ///
158    /// While we wait to acquire the global query write lock, this method will
159    /// also increment `pending_revision_increments`, thus signalling to queries
160    /// that their results are "cancelled" and they should abort as expeditiously
161    /// as possible.
162    ///
163    /// The `op` closure should actually perform the writes needed. It is given
164    /// the new revision as an argument, and its return value indicates whether
165    /// any pre-existing value was modified:
166    ///
167    /// - returning `None` means that no pre-existing value was modified (this
168    ///   could occur e.g. when setting some key on an input that was never set
169    ///   before)
170    /// - returning `Some(d)` indicates that a pre-existing value was modified
171    ///   and it had the durability `d`. This will update the records for when
172    ///   values with each durability were modified.
173    ///
174    /// Note that, given our writer model, we can assume that only one thread is
175    /// attempting to increment the global revision at a time.
176    pub(crate) fn with_incremented_revision<F>(&mut self, op: F)
177    where
178        F: FnOnce(Revision) -> Option<Durability>,
179    {
180        tracing::trace!("increment_revision()");
181
182        if !self.permits_increment() {
183            panic!("increment_revision invoked during a query computation");
184        }
185
186        // Set the `pending_revision` field so that people
187        // know current revision is cancelled.
188        let current_revision = self.shared_state.pending_revision.fetch_then_increment();
189
190        // To modify the revision, we need the lock.
191        let shared_state = self.shared_state.clone();
192        let _lock = shared_state.query_lock.write();
193
194        let old_revision = self.shared_state.revisions[0].fetch_then_increment();
195        assert_eq!(current_revision, old_revision);
196
197        let new_revision = current_revision.next();
198
199        trace!("increment_revision: incremented to {:?}", new_revision);
200
201        if let Some(d) = op(new_revision) {
202            for rev in &self.shared_state.revisions[1..=d.index()] {
203                rev.store(new_revision);
204            }
205        }
206    }
207
208    pub(crate) fn permits_increment(&self) -> bool {
209        self.revision_guard.is_none() && !self.local_state.query_in_progress()
210    }
211
212    #[inline]
213    pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> {
214        self.local_state.push_query(database_key_index)
215    }
216
217    /// Reports that the currently active query read the result from
218    /// another query.
219    ///
220    /// Also checks whether the "cycle participant" flag is set on
221    /// the current stack frame -- if so, panics with `CycleParticipant`
222    /// value, which should be caught by the code executing the query.
223    ///
224    /// # Parameters
225    ///
226    /// - `database_key`: the query whose result was read
227    /// - `changed_revision`: the last revision in which the result of that
228    ///   query had changed
229    pub(crate) fn report_query_read_and_unwind_if_cycle_resulted(
230        &self,
231        input: DatabaseKeyIndex,
232        durability: Durability,
233        changed_at: Revision,
234    ) {
235        self.local_state
236            .report_query_read_and_unwind_if_cycle_resulted(input, durability, changed_at);
237    }
238
239    /// Reports that the query depends on some state unknown to salsa.
240    ///
241    /// Queries which report untracked reads will be re-executed in the next
242    /// revision.
243    pub fn report_untracked_read(&self) {
244        self.local_state.report_untracked_read(self.current_revision());
245    }
246
247    /// Acts as though the current query had read an input with the given durability; this will force the current query's durability to be at most `durability`.
248    ///
249    /// This is mostly useful to control the durability level for [on-demand inputs](https://salsa-rs.github.io/salsa/common_patterns/on_demand_inputs.html).
250    pub fn report_synthetic_read(&self, durability: Durability) {
251        let changed_at = self.last_changed_revision(durability);
252        self.local_state.report_synthetic_read(durability, changed_at);
253    }
254
255    /// Handles a cycle in the dependency graph that was detected when the
256    /// current thread tried to block on `database_key_index` which is being
257    /// executed by `to_id`. If this function returns, then `to_id` no longer
258    /// depends on the current thread, and so we should continue executing
259    /// as normal. Otherwise, the function will throw a `Cycle` which is expected
260    /// to be caught by some frame on our stack. This occurs either if there is
261    /// a frame on our stack with cycle recovery (possibly the top one!) or if there
262    /// is no cycle recovery at all.
263    fn unblock_cycle_and_maybe_throw(
264        &self,
265        db: &dyn Database,
266        dg: &mut DependencyGraph,
267        database_key_index: DatabaseKeyIndex,
268        to_id: RuntimeId,
269    ) {
270        trace!("unblock_cycle_and_maybe_throw(database_key={:?})", database_key_index);
271
272        let mut from_stack = self.local_state.take_query_stack();
273        let from_id = self.id();
274
275        // Make a "dummy stack frame". As we iterate through the cycle, we will collect the
276        // inputs from each participant. Then, if we are participating in cycle recovery, we
277        // will propagate those results to all participants.
278        let mut cycle_query = ActiveQuery::new(database_key_index);
279
280        // Identify the cycle participants:
281        let cycle = {
282            let mut v = vec![];
283            dg.for_each_cycle_participant(
284                from_id,
285                &mut from_stack,
286                database_key_index,
287                to_id,
288                |aqs| {
289                    aqs.iter_mut().for_each(|aq| {
290                        cycle_query.add_from(aq);
291                        v.push(aq.database_key_index);
292                    });
293                },
294            );
295
296            // We want to give the participants in a deterministic order
297            // (at least for this execution, not necessarily across executions),
298            // no matter where it started on the stack. Find the minimum
299            // key and rotate it to the front.
300            let index = v.iter().position_min().unwrap_or_default();
301            v.rotate_left(index);
302
303            // No need to store extra memory.
304            v.shrink_to_fit();
305
306            Cycle::new(Arc::new(v))
307        };
308        trace!("cycle {:?}, cycle_query {:#?}", cycle.debug(db), cycle_query,);
309
310        // We can remove the cycle participants from the list of dependencies;
311        // they are a strongly connected component (SCC) and we only care about
312        // dependencies to things outside the SCC that control whether it will
313        // form again.
314        cycle_query.remove_cycle_participants(&cycle);
315
316        // Mark each cycle participant that has recovery set, along with
317        // any frames that come after them on the same thread. Those frames
318        // are going to be unwound so that fallback can occur.
319        dg.for_each_cycle_participant(from_id, &mut from_stack, database_key_index, to_id, |aqs| {
320            aqs.iter_mut()
321                .skip_while(|aq| match db.cycle_recovery_strategy(aq.database_key_index) {
322                    CycleRecoveryStrategy::Panic => true,
323                    CycleRecoveryStrategy::Fallback => false,
324                })
325                .for_each(|aq| {
326                    trace!("marking {:?} for fallback", aq.database_key_index.debug(db));
327                    aq.take_inputs_from(&cycle_query);
328                    assert!(aq.cycle.is_none());
329                    aq.cycle = Some(cycle.clone());
330                });
331        });
332
333        // Unblock every thread that has cycle recovery with a `WaitResult::Cycle`.
334        // They will throw the cycle, which will be caught by the frame that has
335        // cycle recovery so that it can execute that recovery.
336        let (me_recovered, others_recovered) =
337            dg.maybe_unblock_runtimes_in_cycle(from_id, &from_stack, database_key_index, to_id);
338
339        self.local_state.restore_query_stack(from_stack);
340
341        if me_recovered {
342            // If the current thread has recovery, we want to throw
343            // so that it can begin.
344            cycle.throw()
345        } else if others_recovered {
346            // If other threads have recovery but we didn't: return and we will block on them.
347        } else {
348            // if nobody has recover, then we panic
349            panic_any(cycle);
350        }
351    }
352
353    /// Block until `other_id` completes executing `database_key`;
354    /// panic or unwind in the case of a cycle.
355    ///
356    /// `query_mutex_guard` is the guard for the current query's state;
357    /// it will be dropped after we have successfully registered the
358    /// dependency.
359    ///
360    /// # Propagating panics
361    ///
362    /// If the thread `other_id` panics, then our thread is considered
363    /// cancelled, so this function will panic with a `Cancelled` value.
364    ///
365    /// # Cycle handling
366    ///
367    /// If the thread `other_id` already depends on the current thread,
368    /// and hence there is a cycle in the query graph, then this function
369    /// will unwind instead of returning normally. The method of unwinding
370    /// depends on the [`Self::mutual_cycle_recovery_strategy`]
371    /// of the cycle participants:
372    ///
373    /// * [`CycleRecoveryStrategy::Panic`]: panic with the [`Cycle`] as the value.
374    /// * [`CycleRecoveryStrategy::Fallback`]: initiate unwinding with [`CycleParticipant::unwind`].
375    pub(crate) fn block_on_or_unwind<QueryMutexGuard>(
376        &self,
377        db: &dyn Database,
378        database_key: DatabaseKeyIndex,
379        other_id: RuntimeId,
380        query_mutex_guard: QueryMutexGuard,
381    ) {
382        let mut dg = self.shared_state.dependency_graph.lock();
383
384        if dg.depends_on(other_id, self.id()) {
385            self.unblock_cycle_and_maybe_throw(db, &mut dg, database_key, other_id);
386
387            // If the above fn returns, then (via cycle recovery) it has unblocked the
388            // cycle, so we can continue.
389            assert!(!dg.depends_on(other_id, self.id()));
390        }
391
392        db.salsa_event(Event {
393            runtime_id: self.id(),
394            kind: EventKind::WillBlockOn { other_runtime_id: other_id, database_key },
395        });
396
397        let stack = self.local_state.take_query_stack();
398
399        let (stack, result) = DependencyGraph::block_on(
400            dg,
401            self.id(),
402            database_key,
403            other_id,
404            stack,
405            query_mutex_guard,
406        );
407
408        self.local_state.restore_query_stack(stack);
409
410        match result {
411            WaitResult::Completed => (),
412
413            // If the other thread panicked, then we consider this thread
414            // cancelled. The assumption is that the panic will be detected
415            // by the other thread and responded to appropriately.
416            WaitResult::Panicked => Cancelled::PropagatedPanic.throw(),
417
418            WaitResult::Cycle(c) => c.throw(),
419        }
420    }
421
422    /// Invoked when this runtime completed computing `database_key` with
423    /// the given result `wait_result` (`wait_result` should be `None` if
424    /// computing `database_key` panicked and could not complete).
425    /// This function unblocks any dependent queries and allows them
426    /// to continue executing.
427    pub(crate) fn unblock_queries_blocked_on(
428        &self,
429        database_key: DatabaseKeyIndex,
430        wait_result: WaitResult,
431    ) {
432        self.shared_state
433            .dependency_graph
434            .lock()
435            .unblock_runtimes_blocked_on(database_key, wait_result);
436    }
437}
438
439/// State that will be common to all threads (when we support multiple threads)
440struct SharedState {
441    /// Stores the next id to use for a snapshotted runtime (starts at 1).
442    next_id: AtomicU32,
443
444    /// Whenever derived queries are executing, they acquire this lock
445    /// in read mode. Mutating inputs (and thus creating a new
446    /// revision) requires a write lock (thus guaranteeing that no
447    /// derived queries are in progress). Note that this is not needed
448    /// to prevent **race conditions** -- the revision counter itself
449    /// is stored in an `AtomicUsize` so it can be cheaply read
450    /// without acquiring the lock.  Rather, the `query_lock` is used
451    /// to ensure a higher-level consistency property.
452    query_lock: RwLock<()>,
453
454    /// This is typically equal to `revision` -- set to `revision+1`
455    /// when a new revision is pending (which implies that the current
456    /// revision is cancelled).
457    pending_revision: AtomicRevision,
458
459    /// Stores the "last change" revision for values of each Durability.
460    /// This vector is always of length at least 1 (for Durability 0)
461    /// but its total length depends on the number of Durabilities. The
462    /// element at index 0 is special as it represents the "current
463    /// revision".  In general, we have the invariant that revisions
464    /// in here are *declining* -- that is, `revisions[i] >=
465    /// revisions[i + 1]`, for all `i`. This is because when you
466    /// modify a value with durability D, that implies that values
467    /// with durability less than D may have changed too.
468    revisions: [AtomicRevision; Durability::LEN],
469
470    /// The dependency graph tracks which runtimes are blocked on one
471    /// another, waiting for queries to terminate.
472    dependency_graph: Mutex<DependencyGraph>,
473}
474
475impl std::panic::RefUnwindSafe for SharedState {}
476
477impl Default for SharedState {
478    fn default() -> Self {
479        #[allow(clippy::declare_interior_mutable_const)]
480        const START: AtomicRevision = AtomicRevision::start();
481        SharedState {
482            next_id: AtomicU32::new(1),
483            query_lock: Default::default(),
484            revisions: [START; Durability::LEN],
485            pending_revision: START,
486            dependency_graph: Default::default(),
487        }
488    }
489}
490
491impl std::fmt::Debug for SharedState {
492    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
493        let query_lock = if self.query_lock.is_locked_exclusive() {
494            "<wlocked>"
495        } else if self.query_lock.is_locked() {
496            "<rlocked>"
497        } else {
498            "<unlocked>"
499        };
500        fmt.debug_struct("SharedState")
501            .field("query_lock", &query_lock)
502            .field("revisions", &self.revisions)
503            .field("pending_revision", &self.pending_revision)
504            .finish()
505    }
506}
507
508#[derive(Debug)]
509struct ActiveQuery {
510    /// What query is executing
511    database_key_index: DatabaseKeyIndex,
512
513    /// Minimum durability of inputs observed so far.
514    durability: Durability,
515
516    /// Maximum revision of all inputs observed. If we observe an
517    /// untracked read, this will be set to the most recent revision.
518    changed_at: Revision,
519
520    /// Set of subqueries that were accessed thus far, or `None` if
521    /// there was an untracked the read.
522    dependencies: Option<FxIndexSet<DatabaseKeyIndex>>,
523
524    /// Stores the entire cycle, if one is found and this query is part of it.
525    cycle: Option<Cycle>,
526}
527
528impl ActiveQuery {
529    fn new(database_key_index: DatabaseKeyIndex) -> Self {
530        ActiveQuery {
531            database_key_index,
532            durability: Durability::MAX,
533            changed_at: Revision::start(),
534            dependencies: Some(FxIndexSet::default()),
535            cycle: None,
536        }
537    }
538
539    fn add_read(&mut self, input: DatabaseKeyIndex, durability: Durability, revision: Revision) {
540        if let Some(set) = &mut self.dependencies {
541            set.insert(input);
542        }
543
544        self.durability = self.durability.min(durability);
545        self.changed_at = self.changed_at.max(revision);
546    }
547
548    fn add_untracked_read(&mut self, changed_at: Revision) {
549        self.dependencies = None;
550        self.durability = Durability::LOW;
551        self.changed_at = changed_at;
552    }
553
554    fn add_synthetic_read(&mut self, durability: Durability, revision: Revision) {
555        self.dependencies = None;
556        self.durability = self.durability.min(durability);
557        self.changed_at = self.changed_at.max(revision);
558    }
559
560    pub(crate) fn revisions(&self) -> QueryRevisions {
561        let (inputs, untracked) = match &self.dependencies {
562            None => (None, true),
563
564            Some(dependencies) => (
565                if dependencies.is_empty() {
566                    None
567                } else {
568                    Some(ThinArc::from_header_and_iter((), dependencies.iter().copied()))
569                },
570                false,
571            ),
572        };
573
574        QueryRevisions {
575            changed_at: self.changed_at,
576            inputs,
577            untracked,
578            durability: self.durability,
579        }
580    }
581
582    /// Adds any dependencies from `other` into `self`.
583    /// Used during cycle recovery, see [`Runtime::create_cycle_error`].
584    fn add_from(&mut self, other: &ActiveQuery) {
585        self.changed_at = self.changed_at.max(other.changed_at);
586        self.durability = self.durability.min(other.durability);
587        if let Some(other_dependencies) = &other.dependencies {
588            if let Some(my_dependencies) = &mut self.dependencies {
589                my_dependencies.extend(other_dependencies.iter().copied());
590            }
591        } else {
592            self.dependencies = None;
593        }
594    }
595
596    /// Removes the participants in `cycle` from my dependencies.
597    /// Used during cycle recovery, see [`Runtime::create_cycle_error`].
598    fn remove_cycle_participants(&mut self, cycle: &Cycle) {
599        if let Some(my_dependencies) = &mut self.dependencies {
600            for p in cycle.participant_keys() {
601                my_dependencies.swap_remove(&p);
602            }
603        }
604    }
605
606    /// Copy the changed-at, durability, and dependencies from `cycle_query`.
607    /// Used during cycle recovery, see [`Runtime::create_cycle_error`].
608    pub(crate) fn take_inputs_from(&mut self, cycle_query: &ActiveQuery) {
609        self.changed_at = cycle_query.changed_at;
610        self.durability = cycle_query.durability;
611        self.dependencies.clone_from(&cycle_query.dependencies);
612    }
613}
614
/// A unique identifier for a particular runtime. The root runtime always has
/// id 0; each time you create a snapshot, a fresh `RuntimeId` is taken from a
/// shared counter that only ever increases, so ids are not re-used in
/// practice (short of that `u32` counter wrapping around).
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RuntimeId {
    counter: u32,
}
622
623#[derive(Clone, Debug)]
624pub(crate) struct StampedValue<V> {
625    pub(crate) value: V,
626    pub(crate) durability: Durability,
627    pub(crate) changed_at: Revision,
628}
629
630struct RevisionGuard {
631    shared_state: Arc<SharedState>,
632}
633
634impl RevisionGuard {
635    fn new(shared_state: &Arc<SharedState>) -> Self {
636        // Subtle: we use a "recursive" lock here so that it is not an
637        // error to acquire a read-lock when one is already held (this
638        // happens when a query uses `snapshot` to spawn off parallel
639        // workers, for example).
640        //
641        // This has the side-effect that we are responsible to ensure
642        // that people contending for the write lock do not starve,
643        // but this is what we achieve via the cancellation mechanism.
644        //
645        // (In particular, since we only ever have one "mutating
646        // handle" to the database, the only contention for the global
647        // query lock occurs when there are "futures" evaluating
648        // queries in parallel, and those futures hold a read-lock
649        // already, so the starvation problem is more about them bring
650        // themselves to a close, versus preventing other people from
651        // *starting* work).
652        unsafe {
653            shared_state.query_lock.raw().lock_shared_recursive();
654        }
655
656        Self { shared_state: shared_state.clone() }
657    }
658}
659
660impl Drop for RevisionGuard {
661    fn drop(&mut self) {
662        // Release our read-lock without using RAII. As documented in
663        // `Snapshot::new` above, this requires the unsafe keyword.
664        unsafe {
665            self.shared_state.query_lock.raw().unlock_shared();
666        }
667    }
668}