query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
12use crate::db::Db;
13use crate::key::FullCacheKey;
14use crate::loading::AssetLoadingState;
15use crate::query::Query;
16use crate::storage::{
17    AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
18    QueryRegistry, VerifierStorage,
19};
20use crate::tracer::{
21    ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
22    TracerAssetState, TracerQueryKey,
23};
24use crate::QueryError;
25
/// Function type for comparing user errors during early cutoff.
///
/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
/// `QueryError::UserError` values are compared for caching purposes.
///
/// The first argument is the previously cached error and the second is the
/// error produced by the latest execution. Returning `true` treats them as
/// equal, so the query keeps its previous change revision; returning `false`
/// records the new error as a change.
pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
31
/// Number of durability levels (matches whale's default).
///
/// Used as the const generic argument of the `WhaleRuntime` stored in
/// `QueryRuntime`.
const DURABILITY_LEVELS: usize = 4;
34
// Thread-local state shared by all query executions running on the current thread.
thread_local! {
    /// Query execution stack for cycle detection.
    /// Keys are pushed when a query starts executing and popped when it finishes,
    /// so the first element is the outermost query on this thread.
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };

    /// Current consistency tracker for leaf asset checks.
    /// Set during query execution, used by all nested locator calls.
    /// Uses Rc since thread-local is single-threaded.
    static CONSISTENCY_TRACKER: RefCell<Option<Rc<ConsistencyTracker>>> = const { RefCell::new(None) };
}
44
45/// Check a leaf asset against the current consistency tracker (if any).
46/// Returns Ok if no tracker is set or if the check passes.
47fn check_leaf_asset_consistency(dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
48    CONSISTENCY_TRACKER.with(|tracker| {
49        if let Some(ref t) = *tracker.borrow() {
50            t.check_leaf_asset(dep_changed_at)
51        } else {
52            Ok(())
53        }
54    })
55}
56
/// RAII guard that sets the consistency tracker for the current thread.
///
/// Creating the guard installs a tracker in `CONSISTENCY_TRACKER`; dropping it
/// puts back whatever was installed before, so guards can be nested.
struct ConsistencyTrackerGuard {
    /// Tracker that was installed before this guard was created (if any).
    previous: Option<Rc<ConsistencyTracker>>,
}
61
62impl ConsistencyTrackerGuard {
63    fn new(tracker: Rc<ConsistencyTracker>) -> Self {
64        let previous = CONSISTENCY_TRACKER.with(|t| t.borrow_mut().replace(tracker));
65        Self { previous }
66    }
67}
68
69impl Drop for ConsistencyTrackerGuard {
70    fn drop(&mut self) {
71        CONSISTENCY_TRACKER.with(|t| {
72            *t.borrow_mut() = self.previous.take();
73        });
74    }
75}
76
77/// Check for cycles in the query stack and return error if detected.
78fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
79    let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
80    if cycle_detected {
81        let path = QUERY_STACK.with(|stack| {
82            let stack = stack.borrow();
83            let mut path: Vec<String> = stack.iter().map(|k| k.debug_repr().to_string()).collect();
84            path.push(key.debug_repr().to_string());
85            path
86        });
87        return Err(QueryError::Cycle { path });
88    }
89    Ok(())
90}
91
/// RAII guard for pushing/popping from query stack.
///
/// `StackGuard::push` appends a key to `QUERY_STACK`; dropping the guard pops
/// the most recently pushed entry again.
struct StackGuard;
94
95impl StackGuard {
96    fn push(key: FullCacheKey) -> Self {
97        QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
98        StackGuard
99    }
100}
101
102impl Drop for StackGuard {
103    fn drop(&mut self) {
104        QUERY_STACK.with(|stack| {
105            stack.borrow_mut().pop();
106        });
107    }
108}
109
/// Execution context passed through query execution.
///
/// Contains a span ID for tracing correlation.
#[derive(Clone, Copy)]
pub struct ExecutionContext {
    /// Span identifying the query execution this context belongs to.
    span_id: SpanId,
}
117
118impl ExecutionContext {
119    /// Create a new execution context with the given span ID.
120    #[inline]
121    pub fn new(span_id: SpanId) -> Self {
122        Self { span_id }
123    }
124
125    /// Get the span ID for this execution context.
126    #[inline]
127    pub fn span_id(&self) -> SpanId {
128        self.span_id
129    }
130}
131
/// Result of polling a query, containing the value and its revision.
///
/// This is returned by [`QueryRuntime::poll`] and provides both the query result
/// and its change revision, enabling efficient change detection for subscription
/// patterns.
///
/// `Polled<T>` implements `Deref` only when `T` itself implements `Deref`.
/// [`QueryRuntime::poll`] wraps a `Result`, which does not, so read its
/// payload through the public `value` field.
///
/// # Example
///
/// ```ignore
/// let result = runtime.poll(MyQuery::new())?;
///
/// // Access the value through the public field
/// println!("{:?}", result.value);
///
/// // Check if changed since last poll
/// if result.revision > last_known_revision {
///     notify_subscribers(&result.value);
///     last_known_revision = result.revision;
/// }
/// ```
#[derive(Debug, Clone)]
pub struct Polled<T> {
    /// The query result value.
    pub value: T,
    /// The revision at which this value was last changed.
    ///
    /// Compare this with a previously stored revision to detect changes.
    pub revision: RevisionCounter,
}
161
162impl<T: Deref> Deref for Polled<T> {
163    type Target = T::Target;
164
165    fn deref(&self) -> &Self::Target {
166        &self.value
167    }
168}
169
/// The query runtime manages query execution, caching, and dependency tracking.
///
/// Cloning copies handles rather than stored data for most fields: the
/// storages, registries and tracer are shared through `Arc`, and the error
/// comparator is a plain function pointer. The whale runtime is cloned through
/// its own `Clone` implementation.
///
/// # Type Parameter
///
/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
///   for zero-cost when tracing is not needed.
///
/// # Example
///
/// ```ignore
/// // Without tracing (default)
/// let runtime = QueryRuntime::new();
///
/// // With tracing
/// let tracer = MyTracer::new();
/// let runtime = QueryRuntime::with_tracer(tracer);
///
/// // Sync query execution
/// let result = runtime.query(MyQuery { ... })?;
/// ```
pub struct QueryRuntime<T: Tracer = NoopTracer> {
    /// Whale runtime for dependency tracking and cache storage.
    /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
    /// Registered asset locators
    locators: Arc<LocatorStorage<T>>,
    /// Pending asset requests
    pending: Arc<PendingStorage>,
    /// Registry for tracking query instances (for list_queries)
    query_registry: Arc<QueryRegistry>,
    /// Registry for tracking asset keys (for list_asset_keys)
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Verifiers for re-executing queries (for verify-then-decide pattern)
    verifiers: Arc<VerifierStorage>,
    /// Comparator for user errors during early cutoff
    error_comparator: ErrorComparator,
    /// Tracer for observability
    tracer: Arc<T>,
}
211
/// Compile-time check that the default runtime can be shared across threads.
#[test]
fn test_runtime_send_sync() {
    fn require_send_sync<R>()
    where
        R: Send + Sync,
    {
    }
    require_send_sync::<QueryRuntime<NoopTracer>>();
}
217
218impl Default for QueryRuntime<NoopTracer> {
219    fn default() -> Self {
220        Self::new()
221    }
222}
223
224impl<T: Tracer> Clone for QueryRuntime<T> {
225    fn clone(&self) -> Self {
226        Self {
227            whale: self.whale.clone(),
228            locators: self.locators.clone(),
229            pending: self.pending.clone(),
230            query_registry: self.query_registry.clone(),
231            asset_key_registry: self.asset_key_registry.clone(),
232            verifiers: self.verifiers.clone(),
233            error_comparator: self.error_comparator,
234            tracer: self.tracer.clone(),
235        }
236    }
237}
238
/// Default error comparator that treats all errors as different.
///
/// This is conservative: because it never reports two errors as equal, a
/// query that fails again after re-execution is always recorded as changed
/// (no early cutoff for user errors) when this comparator is in use.
fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
    // Both errors are intentionally ignored.
    false
}
245
246impl<T: Tracer> QueryRuntime<T> {
247    /// Get cached output along with its revision (single atomic access).
248    fn get_cached_with_revision<Q: Query>(
249        &self,
250        key: &FullCacheKey,
251    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
252        let node = self.whale.get(key)?;
253        let revision = node.changed_at;
254        let entry = node.data.as_ref()?;
255        let cached = entry.to_cached_value::<Q::Output>()?;
256        Some((cached, revision))
257    }
258
259    /// Get a reference to the tracer.
260    #[inline]
261    pub fn tracer(&self) -> &T {
262        &self.tracer
263    }
264}
265
impl QueryRuntime<NoopTracer> {
    /// Create a new query runtime with default settings.
    ///
    /// Delegates to [`QueryRuntime::with_tracer`] with a [`NoopTracer`].
    pub fn new() -> Self {
        Self::with_tracer(NoopTracer)
    }

    /// Create a builder for customizing the runtime.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let runtime = QueryRuntime::builder()
    ///     .error_comparator(|a, b| {
    ///         // Custom error comparison logic
    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
    ///             (Some(a), Some(b)) => a == b,
    ///             _ => false,
    ///         }
    ///     })
    ///     .build();
    /// ```
    pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
        QueryRuntimeBuilder::new()
    }
}
291
292impl<T: Tracer> QueryRuntime<T> {
293    /// Create a new query runtime with the specified tracer.
294    pub fn with_tracer(tracer: T) -> Self {
295        QueryRuntimeBuilder::new().tracer(tracer).build()
296    }
297
298    /// Execute a query synchronously.
299    ///
300    /// Returns the cached result if valid, otherwise executes the query.
301    ///
302    /// # Errors
303    ///
304    /// - `QueryError::Suspend` - Query is waiting for async loading
305    /// - `QueryError::Cycle` - Dependency cycle detected
306    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
307        self.query_internal(query)
308            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
309    }
310
311    /// Internal implementation shared by query() and poll().
312    ///
313    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
314    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
315    #[allow(clippy::type_complexity)]
316    fn query_internal<Q: Query>(
317        &self,
318        query: Q,
319    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
320        let key = query.cache_key();
321        let full_key = FullCacheKey::new::<Q, _>(&key);
322
323        // Emit query key event for GC tracking (before other events)
324        self.tracer.on_query_key(&full_key);
325
326        // Create execution context and emit start event
327        let span_id = self.tracer.new_span_id();
328        let exec_ctx = ExecutionContext::new(span_id);
329        let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
330
331        self.tracer.on_query_start(span_id, query_key.clone());
332
333        // Check for cycles using thread-local stack
334        let cycle_detected = QUERY_STACK.with(|stack| {
335            let stack = stack.borrow();
336            stack.iter().any(|k| k == &full_key)
337        });
338
339        if cycle_detected {
340            let path = QUERY_STACK.with(|stack| {
341                let stack = stack.borrow();
342                let mut path: Vec<String> =
343                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
344                path.push(full_key.debug_repr().to_string());
345                path
346            });
347
348            self.tracer.on_cycle_detected(
349                path.iter()
350                    .map(|s| TracerQueryKey::new("", s.clone()))
351                    .collect(),
352            );
353            self.tracer
354                .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
355
356            return Err(QueryError::Cycle { path });
357        }
358
359        // Check if cached and valid (with verify-then-decide pattern)
360        let current_rev = self.whale.current_revision();
361
362        // Fast path: already verified at current revision
363        if self.whale.is_verified_at(&full_key, &current_rev) {
364            // Single atomic access to get both cached value and revision
365            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
366                self.tracer.on_cache_check(span_id, query_key.clone(), true);
367                self.tracer
368                    .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
369
370                return match cached {
371                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
372                    CachedValue::UserError(err) => Ok((Err(err), revision)),
373                };
374            }
375        }
376
377        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
378        if self.whale.is_valid(&full_key) {
379            // Single atomic access to get both cached value and revision
380            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
381                // Shallow valid but not verified - verify deps first
382                let mut deps_verified = true;
383                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
384                    for dep in deps {
385                        if let Some(verifier) = self.verifiers.get(&dep) {
386                            // Re-run query/asset to verify it (triggers recursive verification)
387                            // For assets, this re-accesses the asset which may re-run the locator
388                            if verifier.verify(self as &dyn std::any::Any).is_err() {
389                                deps_verified = false;
390                                break;
391                            }
392                        }
393                        // Note: deps without verifiers are assumed valid (they're verified
394                        // by the final is_valid check if their changed_at increased)
395                    }
396                }
397
398                // Re-check validity after deps are verified
399                if deps_verified && self.whale.is_valid(&full_key) {
400                    // Deps didn't change their changed_at, mark as verified and use cached
401                    self.whale.mark_verified(&full_key, &current_rev);
402
403                    self.tracer.on_cache_check(span_id, query_key.clone(), true);
404                    self.tracer
405                        .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
406
407                    return match cached {
408                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
409                        CachedValue::UserError(err) => Ok((Err(err), revision)),
410                    };
411                }
412                // A dep's changed_at increased, fall through to execute
413            }
414        }
415
416        self.tracer
417            .on_cache_check(span_id, query_key.clone(), false);
418
419        // Execute the query with cycle tracking
420        let _guard = StackGuard::push(full_key.clone());
421        let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
422        drop(_guard);
423
424        // Emit end event
425        let exec_result = match &result {
426            Ok((_, true, _)) => ExecutionResult::Changed,
427            Ok((_, false, _)) => ExecutionResult::Unchanged,
428            Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
429            Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
430            Err(e) => ExecutionResult::Error {
431                message: format!("{:?}", e),
432            },
433        };
434        self.tracer
435            .on_query_end(span_id, query_key.clone(), exec_result);
436
437        result.map(|(inner_result, _, revision)| (inner_result, revision))
438    }
439
440    /// Execute a query, caching the result if appropriate.
441    ///
442    /// Returns (result, output_changed, revision) tuple.
443    /// - `result`: Ok(output) for success, Err(user_error) for user errors
444    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
445    #[allow(clippy::type_complexity)]
446    fn execute_query<Q: Query>(
447        &self,
448        query: &Q,
449        full_key: &FullCacheKey,
450        exec_ctx: ExecutionContext,
451    ) -> Result<
452        (
453            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
454            bool,
455            RevisionCounter,
456        ),
457        QueryError,
458    > {
459        // Capture current global revision at query start for consistency checking
460        let start_revision = self.whale.current_revision().get(Durability::volatile());
461
462        // Create consistency tracker for this query execution
463        let tracker = Rc::new(ConsistencyTracker::new(start_revision));
464
465        // Set thread-local tracker for nested locator calls
466        let _tracker_guard = ConsistencyTrackerGuard::new(tracker);
467
468        // Create context for this query execution
469        let ctx = QueryContext {
470            runtime: self,
471            current_key: full_key.clone(),
472            parent_query_type: std::any::type_name::<Q>(),
473            exec_ctx,
474            deps: RefCell::new(Vec::new()),
475        };
476
477        // Execute the query (clone because query() takes ownership)
478        let result = query.clone().query(&ctx);
479
480        // Get collected dependencies
481        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
482
483        // Query durability defaults to stable - Whale will automatically reduce
484        // the effective durability to min(requested, min(dep_durabilities)).
485        // A pure query with no dependencies remains stable.
486        // A query depending on volatile assets becomes volatile.
487        let durability = Durability::stable();
488
489        match result {
490            Ok(output) => {
491                // Check if output changed (for early cutoff)
492                // existing_revision is Some only when output is unchanged (can reuse revision)
493                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
494                    self.get_cached_with_revision::<Q>(full_key)
495                {
496                    if Q::output_eq(&*old, &*output) {
497                        Some(rev) // Same output - reuse revision
498                    } else {
499                        None // Different output
500                    }
501                } else {
502                    None // No previous Ok value
503                };
504                let output_changed = existing_revision.is_none();
505
506                // Emit early cutoff check event
507                self.tracer.on_early_cutoff_check(
508                    exec_ctx.span_id(),
509                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
510                    output_changed,
511                );
512
513                // Update whale with cached entry (atomic update of value + dependency state)
514                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
515                let revision = if let Some(existing_rev) = existing_revision {
516                    // confirm_unchanged doesn't change changed_at, use existing
517                    let _ = self.whale.confirm_unchanged(full_key, deps);
518                    existing_rev
519                } else {
520                    // Use new_rev from register result
521                    match self
522                        .whale
523                        .register(full_key.clone(), Some(entry), durability, deps)
524                    {
525                        Ok(result) => result.new_rev,
526                        Err(missing) => {
527                            return Err(QueryError::DependenciesRemoved {
528                                missing_keys: missing,
529                            })
530                        }
531                    }
532                };
533
534                // Register query in registry for list_queries
535                let is_new_query = self.query_registry.register(query);
536                if is_new_query {
537                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
538                    let _ = self
539                        .whale
540                        .register(sentinel, None, Durability::stable(), vec![]);
541                }
542
543                // Store verifier for this query (for verify-then-decide pattern)
544                self.verifiers
545                    .insert::<Q, T>(full_key.clone(), query.clone());
546
547                Ok((Ok(output), output_changed, revision))
548            }
549            Err(QueryError::UserError(err)) => {
550                // Check if error changed (for early cutoff)
551                // existing_revision is Some only when error is unchanged (can reuse revision)
552                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
553                    self.get_cached_with_revision::<Q>(full_key)
554                {
555                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
556                        Some(rev) // Same error - reuse revision
557                    } else {
558                        None // Different error
559                    }
560                } else {
561                    None // No previous UserError
562                };
563                let output_changed = existing_revision.is_none();
564
565                // Emit early cutoff check event
566                self.tracer.on_early_cutoff_check(
567                    exec_ctx.span_id(),
568                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
569                    output_changed,
570                );
571
572                // Update whale with cached error (atomic update of value + dependency state)
573                let entry = CachedEntry::UserError(err.clone());
574                let revision = if let Some(existing_rev) = existing_revision {
575                    // confirm_unchanged doesn't change changed_at, use existing
576                    let _ = self.whale.confirm_unchanged(full_key, deps);
577                    existing_rev
578                } else {
579                    // Use new_rev from register result
580                    match self
581                        .whale
582                        .register(full_key.clone(), Some(entry), durability, deps)
583                    {
584                        Ok(result) => result.new_rev,
585                        Err(missing) => {
586                            return Err(QueryError::DependenciesRemoved {
587                                missing_keys: missing,
588                            })
589                        }
590                    }
591                };
592
593                // Register query in registry for list_queries
594                let is_new_query = self.query_registry.register(query);
595                if is_new_query {
596                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
597                    let _ = self
598                        .whale
599                        .register(sentinel, None, Durability::stable(), vec![]);
600                }
601
602                // Store verifier for this query (for verify-then-decide pattern)
603                self.verifiers
604                    .insert::<Q, T>(full_key.clone(), query.clone());
605
606                Ok((Err(err), output_changed, revision))
607            }
608            Err(e) => {
609                // System errors (Suspend, Cycle, Cancelled) are not cached
610                Err(e)
611            }
612        }
613    }
614
615    /// Invalidate a query, forcing recomputation on next access.
616    ///
617    /// This also invalidates any queries that depend on this one.
618    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
619        let full_key = FullCacheKey::new::<Q, _>(key);
620
621        self.tracer.on_query_invalidated(
622            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
623            InvalidationReason::ManualInvalidation,
624        );
625
626        // Update whale to invalidate dependents (register with None to clear cached value)
627        // Use stable durability to increment all revision counters, ensuring queries
628        // at any durability level will see this as a change.
629        let _ = self
630            .whale
631            .register(full_key, None, Durability::stable(), vec![]);
632    }
633
634    /// Remove a query from the cache entirely, freeing memory.
635    ///
636    /// Use this for GC when a query is no longer needed.
637    /// Unlike `invalidate`, this removes all traces of the query from storage.
638    /// The query will be recomputed from scratch on next access.
639    ///
640    /// This also invalidates any queries that depend on this one.
641    pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
642        let full_key = FullCacheKey::new::<Q, _>(key);
643
644        self.tracer.on_query_invalidated(
645            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
646            InvalidationReason::ManualInvalidation,
647        );
648
649        // Remove verifier if exists
650        self.verifiers.remove(&full_key);
651
652        // Remove from whale storage (this also handles dependent invalidation)
653        self.whale.remove(&full_key);
654
655        // Remove from registry and update sentinel for list_queries
656        if self.query_registry.remove::<Q>(key) {
657            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
658            let _ = self
659                .whale
660                .register(sentinel, None, Durability::stable(), vec![]);
661        }
662    }
663
664    /// Clear all cached values by removing all nodes from whale.
665    ///
666    /// Note: This is a relatively expensive operation as it iterates through all keys.
667    pub fn clear_cache(&self) {
668        let keys = self.whale.keys();
669        for key in keys {
670            self.whale.remove(&key);
671        }
672    }
673
674    /// Poll a query, returning both the result and its change revision.
675    ///
676    /// This is useful for implementing subscription patterns where you need to
677    /// detect changes efficiently. Compare the returned `revision` with a
678    /// previously stored value to determine if the query result has changed.
679    ///
680    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
681    /// as its value, allowing you to track revision changes for both success and
682    /// user error cases.
683    ///
684    /// # Example
685    ///
686    /// ```ignore
687    /// struct Subscription<Q: Query> {
688    ///     query: Q,
689    ///     last_revision: RevisionCounter,
690    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
691    /// }
692    ///
693    /// // Polling loop
694    /// for sub in &mut subscriptions {
695    ///     let result = runtime.poll(sub.query.clone())?;
696    ///     if result.revision > sub.last_revision {
697    ///         sub.tx.send(result.value.clone())?;
698    ///         sub.last_revision = result.revision;
699    ///     }
700    /// }
701    /// ```
702    ///
703    /// # Errors
704    ///
705    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
706    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
707    #[allow(clippy::type_complexity)]
708    pub fn poll<Q: Query>(
709        &self,
710        query: Q,
711    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
712        let (value, revision) = self.query_internal(query)?;
713        Ok(Polled { value, revision })
714    }
715
716    /// Get the change revision of a query without executing it.
717    ///
718    /// Returns `None` if the query has never been executed.
719    ///
720    /// This is useful for checking if a query has changed since the last poll
721    /// without the cost of executing the query.
722    ///
723    /// # Example
724    ///
725    /// ```ignore
726    /// // Check if query has changed before deciding to poll
727    /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
728    ///     if rev > last_known_revision {
729    ///         let result = runtime.query(MyQuery::new(key))?;
730    ///         // Process result...
731    ///     }
732    /// }
733    /// ```
734    pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
735        let full_key = FullCacheKey::new::<Q, _>(key);
736        self.whale.get(&full_key).map(|node| node.changed_at)
737    }
738}
739
740// ============================================================================
741// GC (Garbage Collection) API
742// ============================================================================
743
744impl<T: Tracer> QueryRuntime<T> {
    /// Get all query keys currently in the cache.
    ///
    /// Returns every key held by the underlying whale runtime.
    // NOTE(review): asset values and query-set sentinels are registered in the
    // same whale runtime, so their keys presumably appear here too — confirm.
    ///
    /// This is useful for implementing custom garbage collection strategies.
    /// Use this in combination with [`Tracer::on_query_key`] to track access
    /// times and implement LRU, TTL, or other GC algorithms externally.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Collect all keys that haven't been accessed recently
    /// let stale_keys: Vec<_> = runtime.query_keys()
    ///     .into_iter()
    ///     .filter(|key| tracker.is_stale(key))
    ///     .collect();
    ///
    /// // Remove stale queries that have no dependents
    /// for key in stale_keys {
    ///     runtime.remove_if_unused(&key);
    /// }
    /// ```
    pub fn query_keys(&self) -> Vec<FullCacheKey> {
        self.whale.keys()
    }
767
768    /// Remove a query if it has no dependents.
769    ///
770    /// Returns `true` if the query was removed, `false` if it has dependents
771    /// or doesn't exist. This is the safe way to remove queries during GC,
772    /// as it won't break queries that depend on this one.
773    ///
774    /// # Example
775    ///
776    /// ```ignore
777    /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
778    /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
779    ///     println!("Query removed");
780    /// } else {
781    ///     println!("Query has dependents, not removed");
782    /// }
783    /// ```
784    pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
785        let full_key = FullCacheKey::new::<Q, _>(key);
786        self.remove_if_unused(&full_key)
787    }
788
789    /// Remove a query by its [`FullCacheKey`].
790    ///
791    /// This is the type-erased version of [`remove_query`](Self::remove_query).
792    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
793    /// or [`Tracer::on_query_key`].
794    ///
795    /// Returns `true` if the query was removed, `false` if it doesn't exist.
796    ///
797    /// # Warning
798    ///
799    /// This forcibly removes the query even if other queries depend on it.
800    /// Dependent queries will be recomputed on next access. For safe GC,
801    /// use [`remove_if_unused`](Self::remove_if_unused) instead.
802    pub fn remove(&self, key: &FullCacheKey) -> bool {
803        // Remove verifier if exists
804        self.verifiers.remove(key);
805
806        // Remove from whale storage
807        self.whale.remove(key).is_some()
808    }
809
810    /// Remove a query by its [`FullCacheKey`] if it has no dependents.
811    ///
812    /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
813    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
814    /// or [`Tracer::on_query_key`].
815    ///
816    /// Returns `true` if the query was removed, `false` if it has dependents
817    /// or doesn't exist.
818    ///
819    /// # Example
820    ///
821    /// ```ignore
822    /// // Implement LRU GC
823    /// for key in runtime.query_keys() {
824    ///     if tracker.is_expired(&key) {
825    ///         runtime.remove_if_unused(&key);
826    ///     }
827    /// }
828    /// ```
829    pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
830        if self.whale.remove_if_unused(key.clone()).is_some() {
831            // Successfully removed - clean up verifier
832            self.verifiers.remove(key);
833            true
834        } else {
835            false
836        }
837    }
838}
839
840// ============================================================================
841// Builder
842// ============================================================================
843
/// Builder for [`QueryRuntime`] with customizable settings.
///
/// The builder carries an error comparator and a tracer; both are handed to
/// the runtime when `build` is called.
///
/// # Example
///
/// ```ignore
/// let runtime = QueryRuntime::builder()
///     .error_comparator(|a, b| {
///         // Treat two errors as equal when both are `std::io::Error`s,
///         // or when neither of them is
///         a.downcast_ref::<std::io::Error>().is_some()
///             == b.downcast_ref::<std::io::Error>().is_some()
///     })
///     .build();
/// ```
pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
    /// Function used to compare user errors for early cutoff.
    error_comparator: ErrorComparator,
    /// Tracer that will be installed in the built runtime.
    tracer: T,
}
861
862impl Default for QueryRuntimeBuilder<NoopTracer> {
863    fn default() -> Self {
864        Self::new()
865    }
866}
867
868impl QueryRuntimeBuilder<NoopTracer> {
869    /// Create a new builder with default settings.
870    pub fn new() -> Self {
871        Self {
872            error_comparator: default_error_comparator,
873            tracer: NoopTracer,
874        }
875    }
876}
877
878impl<T: Tracer> QueryRuntimeBuilder<T> {
879    /// Set the error comparator function for early cutoff optimization.
880    ///
881    /// When a query returns `QueryError::UserError`, this function is used
882    /// to compare it with the previously cached error. If they are equal,
883    /// downstream queries can skip recomputation (early cutoff).
884    ///
885    /// The default comparator returns `false` for all errors, meaning errors
886    /// are always considered different (conservative, always recomputes).
887    ///
888    /// # Example
889    ///
890    /// ```ignore
891    /// // Treat errors as equal if they have the same display message
892    /// let runtime = QueryRuntime::builder()
893    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
894    ///     .build();
895    /// ```
896    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
897        self.error_comparator = f;
898        self
899    }
900
901    /// Set the tracer for observability.
902    pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
903        QueryRuntimeBuilder {
904            error_comparator: self.error_comparator,
905            tracer,
906        }
907    }
908
909    /// Build the runtime with the configured settings.
910    pub fn build(self) -> QueryRuntime<T> {
911        QueryRuntime {
912            whale: WhaleRuntime::new(),
913            locators: Arc::new(LocatorStorage::new()),
914            pending: Arc::new(PendingStorage::new()),
915            query_registry: Arc::new(QueryRegistry::new()),
916            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
917            verifiers: Arc::new(VerifierStorage::new()),
918            error_comparator: self.error_comparator,
919            tracer: Arc::new(self.tracer),
920        }
921    }
922}
923
924// ============================================================================
925// Asset API
926// ============================================================================
927
928impl<T: Tracer> QueryRuntime<T> {
929    /// Register an asset locator for a specific asset key type.
930    ///
931    /// Only one locator can be registered per key type. Later registrations
932    /// replace earlier ones.
933    ///
934    /// # Example
935    ///
936    /// ```ignore
937    /// let runtime = QueryRuntime::new();
938    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
939    /// ```
940    pub fn register_asset_locator<K, L>(&self, locator: L)
941    where
942        K: AssetKey,
943        L: AssetLocator<K>,
944    {
945        self.locators.insert::<K, L>(locator);
946    }
947
    /// Get the list of pending asset requests.
    ///
    /// Returns assets that have been requested but not yet resolved, as an
    /// owned `Vec`. The user should fetch these externally and call
    /// `resolve_asset()` (or `resolve_asset_error()` on failure).
    ///
    /// # Example
    ///
    /// ```ignore
    /// for pending in runtime.pending_assets() {
    ///     if let Some(path) = pending.key::<FilePath>() {
    ///         let content = fetch_file(path);
    ///         runtime.resolve_asset(path.clone(), content, DurabilityLevel::Volatile);
    ///     }
    /// }
    /// ```
    pub fn pending_assets(&self) -> Vec<PendingAsset> {
        self.pending.get_all()
    }
966
    /// Get pending assets filtered by key type.
    ///
    /// Returns the pending keys of type `K` as owned values, so callers do not
    /// have to downcast each [`PendingAsset`] themselves.
    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
        self.pending.get_of_type::<K>()
    }
971
    /// Check if there are any pending assets.
    ///
    /// Returns `true` when the pending storage is non-empty.
    pub fn has_pending_assets(&self) -> bool {
        !self.pending.is_empty()
    }
976
    /// Resolve an asset with its loaded value.
    ///
    /// This marks the asset as ready and invalidates any queries that
    /// depend on it (if the value changed), triggering recomputation on next access.
    /// The key is also removed from the pending set.
    ///
    /// This method is idempotent - resolving with the same value (via `asset_eq`)
    /// will not trigger downstream recomputation.
    ///
    /// # Arguments
    ///
    /// * `key` - The asset key identifying this resource
    /// * `value` - The loaded asset value
    /// * `durability` - How frequently this asset is expected to change
    ///
    /// # Example
    ///
    /// ```ignore
    /// let content = std::fs::read_to_string(&path)?;
    /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
    /// ```
    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
        // Public entry point; all of the work happens in the private helper.
        self.resolve_asset_internal(key, value, durability);
    }
1000
1001    /// Resolve an asset with an error.
1002    ///
1003    /// This marks the asset as errored and caches the error. Queries depending
1004    /// on this asset will receive `Err(QueryError::UserError(...))`.
1005    ///
1006    /// Use this when async loading fails (e.g., network error, file not found,
1007    /// access denied).
1008    ///
1009    /// # Arguments
1010    ///
1011    /// * `key` - The asset key identifying this resource
1012    /// * `error` - The error to cache (will be wrapped in `Arc`)
1013    /// * `durability` - How frequently this error state is expected to change
1014    ///
1015    /// # Example
1016    ///
1017    /// ```ignore
1018    /// match fetch_file(&path) {
1019    ///     Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
1020    ///     Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
1021    /// }
1022    /// ```
1023    pub fn resolve_asset_error<K: AssetKey>(
1024        &self,
1025        key: K,
1026        error: impl Into<anyhow::Error>,
1027        durability: DurabilityLevel,
1028    ) {
1029        let full_asset_key = FullAssetKey::new(&key);
1030        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1031
1032        // Remove from pending BEFORE registering the error
1033        self.pending.remove(&full_asset_key);
1034
1035        // Prepare the error entry
1036        let error_arc = Arc::new(error.into());
1037        let entry = CachedEntry::AssetError(error_arc.clone());
1038        let durability =
1039            Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
1040
1041        // Atomic compare-and-update (errors are always considered changed for now)
1042        let result = self
1043            .whale
1044            .update_with_compare(
1045                full_cache_key,
1046                Some(entry),
1047                |old_data, _new_data| {
1048                    // Compare old and new errors using error_comparator
1049                    match old_data.and_then(|d| d.as_ref()) {
1050                        Some(CachedEntry::AssetError(old_err)) => {
1051                            !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
1052                        }
1053                        _ => true, // Loading, Ready, or not present -> changed
1054                    }
1055                },
1056                durability,
1057                vec![],
1058            )
1059            .expect("update_with_compare with no dependencies cannot fail");
1060
1061        // Emit asset resolved event (with changed status)
1062        self.tracer.on_asset_resolved(
1063            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1064            result.changed,
1065        );
1066
1067        // Register asset key in registry for list_asset_keys
1068        let is_new_asset = self.asset_key_registry.register(&key);
1069        if is_new_asset {
1070            // Update sentinel to invalidate list_asset_keys dependents
1071            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1072            let _ = self
1073                .whale
1074                .register(sentinel, None, Durability::stable(), vec![]);
1075        }
1076    }
1077
1078    fn resolve_asset_internal<K: AssetKey>(
1079        &self,
1080        key: K,
1081        value: K::Asset,
1082        durability_level: DurabilityLevel,
1083    ) {
1084        let full_asset_key = FullAssetKey::new(&key);
1085        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1086
1087        // Remove from pending BEFORE registering the value
1088        self.pending.remove(&full_asset_key);
1089
1090        // Prepare the new entry
1091        let value_arc: Arc<K::Asset> = Arc::new(value);
1092        let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1093        let durability =
1094            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1095
1096        // Atomic compare-and-update
1097        let result = self
1098            .whale
1099            .update_with_compare(
1100                full_cache_key,
1101                Some(entry),
1102                |old_data, _new_data| {
1103                    // Compare old and new values
1104                    match old_data.and_then(|d| d.as_ref()) {
1105                        Some(CachedEntry::AssetReady(old_arc)) => {
1106                            match old_arc.clone().downcast::<K::Asset>() {
1107                                Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1108                                Err(_) => true, // Type mismatch, treat as changed
1109                            }
1110                        }
1111                        _ => true, // Loading, NotFound, or not present -> changed
1112                    }
1113                },
1114                durability,
1115                vec![],
1116            )
1117            .expect("update_with_compare with no dependencies cannot fail");
1118
1119        // Emit asset resolved event
1120        self.tracer.on_asset_resolved(
1121            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1122            result.changed,
1123        );
1124
1125        // Register asset key in registry for list_asset_keys
1126        let is_new_asset = self.asset_key_registry.register(&key);
1127        if is_new_asset {
1128            // Update sentinel to invalidate list_asset_keys dependents
1129            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1130            let _ = self
1131                .whale
1132                .register(sentinel, None, Durability::stable(), vec![]);
1133        }
1134    }
1135
1136    /// Invalidate an asset, forcing queries to re-request it.
1137    ///
1138    /// The asset will be marked as loading and added to pending assets.
1139    /// Dependent queries will suspend until the asset is resolved again.
1140    ///
1141    /// # Example
1142    ///
1143    /// ```ignore
1144    /// // File was modified externally
1145    /// runtime.invalidate_asset(&FilePath("config.json".into()));
1146    /// // Queries depending on this asset will now suspend
1147    /// // User should fetch the new value and call resolve_asset
1148    /// ```
1149    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1150        let full_asset_key = FullAssetKey::new(key);
1151        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1152
1153        // Emit asset invalidated event
1154        self.tracer.on_asset_invalidated(TracerAssetKey::new(
1155            std::any::type_name::<K>(),
1156            format!("{:?}", key),
1157        ));
1158
1159        // Add to pending FIRST (before clearing whale state)
1160        // This ensures: readers see either old value, or Loading+pending
1161        self.pending.insert::<K>(full_asset_key, key.clone());
1162
1163        // Atomic: clear cached value + invalidate dependents
1164        // Using None for data means "needs to be loaded"
1165        // Use stable durability to ensure queries at any durability level see the change.
1166        let _ = self
1167            .whale
1168            .register(full_cache_key, None, Durability::stable(), vec![]);
1169    }
1170
1171    /// Remove an asset from the cache entirely.
1172    ///
1173    /// Unlike `invalidate_asset`, this removes all traces of the asset.
1174    /// Dependent queries will go through the locator again on next access.
1175    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1176        let full_asset_key = FullAssetKey::new(key);
1177        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1178
1179        // Remove from pending first
1180        self.pending.remove(&full_asset_key);
1181
1182        // Remove from whale (this also cleans up dependency edges)
1183        // whale.remove() invalidates dependents before removing
1184        self.whale.remove(&full_cache_key);
1185
1186        // Remove from registry and update sentinel for list_asset_keys
1187        if self.asset_key_registry.remove::<K>(key) {
1188            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1189            let _ = self
1190                .whale
1191                .register(sentinel, None, Durability::stable(), vec![]);
1192        }
1193    }
1194
    /// Get an asset by key without tracking dependencies.
    ///
    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
    /// as a dependent of the asset. Use this for direct asset access outside
    /// of query execution.
    ///
    /// # Returns
    ///
    /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
    /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
    /// - `Err(QueryError::UserError)` - Asset was not found or locator returned an error
    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
        // Thin public wrapper; the lookup itself lives in `get_asset_internal`.
        self.get_asset_internal(key)
    }
1209
    /// Internal: Get asset state and its changed_at revision atomically.
    ///
    /// This is the "direct" version called from QueryRuntime::asset (no dependency tracking).
    /// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
    ///
    /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
    /// whale node that provided the asset value.
    ///
    /// # Resolution order
    ///
    /// 1. Cached whale node: used only if it is valid, every dependency that has
    ///    a registered verifier verifies successfully, and the node is still valid
    ///    afterwards.
    /// 2. Registered locator for `K`: its result is stored in whale together with
    ///    the dependencies collected by the `LocatorContext`.
    /// 3. Otherwise the asset is added to the pending set and reported as loading
    ///    (or whatever entry already exists in whale is returned).
    ///
    /// # Errors
    ///
    /// - `QueryError::UserError` when the cache holds an `AssetError` or the
    ///   locator returns a user error (the latter is cached).
    /// - Any error from `check_cycle` or `check_leaf_asset_consistency`.
    /// - Any other locator error, propagated without caching.
    ///
    /// # Panics
    ///
    /// Panics via `unreachable!` if a cached or located value does not have type
    /// `K::Asset`, and via `expect` if a whale update/insert fails.
    fn get_asset_with_revision<K: AssetKey>(
        &self,
        key: K,
    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
        let full_asset_key = FullAssetKey::new(&key);
        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);

        // Helper to emit AssetRequested event
        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
            tracer.on_asset_requested(
                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
                state,
            );
        };

        // Check whale cache first (single atomic read)
        if let Some(node) = self.whale.get(&full_cache_key) {
            // Captured once so the returned revision matches the node we read.
            let changed_at = node.changed_at;
            // Check if valid at current revision (shallow check)
            if self.whale.is_valid(&full_cache_key) {
                // Verify dependencies recursively (like query path does)
                let mut deps_verified = true;
                if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
                    for dep in deps {
                        // Dependencies without a registered verifier are skipped.
                        if let Some(verifier) = self.verifiers.get(&dep) {
                            // Re-run query/asset to verify it (triggers recursive verification)
                            if verifier.verify(self as &dyn std::any::Any).is_err() {
                                // A failed verification abandons the cached path;
                                // execution falls through to the locator below.
                                deps_verified = false;
                                break;
                            }
                        }
                    }
                }

                // Re-check validity after deps are verified
                if deps_verified && self.whale.is_valid(&full_cache_key) {
                    // For cached entries, check consistency for leaf assets (no locator deps).
                    // This detects if resolve_asset/resolve_asset_error was called during query execution.
                    let has_locator_deps = self
                        .whale
                        .get_dependency_ids(&full_cache_key)
                        .is_some_and(|deps| !deps.is_empty());

                    match &node.data {
                        Some(CachedEntry::AssetReady(arc)) => {
                            // Check consistency for cached leaf assets
                            if !has_locator_deps {
                                check_leaf_asset_consistency(changed_at)?;
                            }
                            emit_requested(&self.tracer, &key, TracerAssetState::Ready);
                            match arc.clone().downcast::<K::Asset>() {
                                Ok(value) => {
                                    return Ok((AssetLoadingState::ready(key, value), changed_at))
                                }
                                Err(_) => {
                                    // A value stored under this key must be a `K::Asset`.
                                    unreachable!("Asset type mismatch: {:?}", key)
                                }
                            }
                        }
                        Some(CachedEntry::AssetError(err)) => {
                            // Check consistency for cached leaf errors
                            if !has_locator_deps {
                                check_leaf_asset_consistency(changed_at)?;
                            }
                            // Cached errors are reported to the tracer as `NotFound`.
                            emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
                            return Err(QueryError::UserError(err.clone()));
                        }
                        None => {
                            // Loading state - no value to be inconsistent with
                            emit_requested(&self.tracer, &key, TracerAssetState::Loading);
                            return Ok((AssetLoadingState::loading(key), changed_at));
                        }
                        _ => {
                            // Query-related entries (Ok, UserError) shouldn't be here
                            // Fall through to locator
                        }
                    }
                }
            }
        }

        // Not in cache or invalid - try locator
        // Use LocatorContext to track deps on the asset itself
        check_cycle(&full_cache_key)?;
        // Keeps this key on the execution stack while the locator runs
        // (presumably popped when the guard is dropped - confirm in StackGuard).
        let _guard = StackGuard::push(full_cache_key.clone());

        let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
        let locator_result =
            self.locators
                .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);

        if let Some(result) = locator_result {
            // Get collected dependencies from the locator context
            let locator_deps = locator_ctx.into_deps();
            match result {
                Ok(ErasedLocateResult::Ready {
                    value: arc,
                    durability: durability_level,
                }) => {
                    emit_requested(&self.tracer, &key, TracerAssetState::Ready);

                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
                        Ok(v) => v,
                        Err(_) => {
                            unreachable!("Asset type mismatch: {:?}", key);
                        }
                    };

                    // Store in whale atomically with early cutoff
                    // Include locator's dependencies so the asset is invalidated when they change
                    let entry = CachedEntry::AssetReady(typed_value.clone());
                    // An unsupported durability level falls back to volatile.
                    let durability = Durability::new(durability_level.as_u8() as usize)
                        .unwrap_or(Durability::volatile());
                    let new_value = typed_value.clone();
                    let result = self
                        .whale
                        .update_with_compare(
                            full_cache_key.clone(),
                            Some(entry),
                            |old_data, _new_data| {
                                // "Changed" unless the old entry is a ready value of the
                                // same type that `K::asset_eq` considers equal.
                                let Some(CachedEntry::AssetReady(old_arc)) =
                                    old_data.and_then(|d| d.as_ref())
                                else {
                                    return true;
                                };
                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
                                    return true;
                                };
                                !K::asset_eq(&old_value, &new_value)
                            },
                            durability,
                            locator_deps,
                        )
                        .expect("update_with_compare should succeed");

                    // Register verifier for this asset (for verify-then-decide pattern)
                    self.verifiers
                        .insert_asset::<K, T>(full_cache_key, key.clone());

                    // NOTE(review): this path returns `result.revision`, whereas the
                    // cached paths return the node's `changed_at` - confirm these agree.
                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
                }
                Ok(ErasedLocateResult::Pending) => {
                    emit_requested(&self.tracer, &key, TracerAssetState::Loading);

                    // Add to pending list for Pending result
                    self.pending.insert::<K>(full_asset_key, key.clone());
                    // Insert a loading placeholder (`None`) unless an entry already exists;
                    // an existing entry wins and is returned as-is.
                    match self
                        .whale
                        .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
                        .expect("get_or_insert should succeed")
                    {
                        GetOrInsertResult::Inserted(node) => {
                            return Ok((AssetLoadingState::loading(key), node.changed_at));
                        }
                        GetOrInsertResult::Existing(node) => {
                            let changed_at = node.changed_at;
                            match &node.data {
                                Some(CachedEntry::AssetReady(arc)) => {
                                    match arc.clone().downcast::<K::Asset>() {
                                        Ok(value) => {
                                            return Ok((
                                                AssetLoadingState::ready(key, value),
                                                changed_at,
                                            ))
                                        }
                                        // Here a type mismatch is reported as loading
                                        // rather than panicking.
                                        Err(_) => {
                                            return Ok((
                                                AssetLoadingState::loading(key),
                                                changed_at,
                                            ))
                                        }
                                    }
                                }
                                Some(CachedEntry::AssetError(err)) => {
                                    return Err(QueryError::UserError(err.clone()));
                                }
                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
                            }
                        }
                    }
                }
                Err(QueryError::UserError(err)) => {
                    // Locator returned a user error - cache it as AssetError
                    // NOTE(review): unlike the other branches, no tracer event is
                    // emitted here and no verifier is registered - confirm intended.
                    let entry = CachedEntry::AssetError(err.clone());
                    let _ = self.whale.register(
                        full_cache_key,
                        Some(entry),
                        Durability::volatile(),
                        locator_deps,
                    );
                    return Err(QueryError::UserError(err));
                }
                Err(e) => {
                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
                    return Err(e);
                }
            }
        }

        // No locator registered or locator returned None - mark as pending
        // (no locator was called, so no deps to track)
        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
        self.pending
            .insert::<K>(full_asset_key.clone(), key.clone());

        // Same placeholder logic as the locator `Pending` branch, without dependencies.
        match self
            .whale
            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
            .expect("get_or_insert with no dependencies cannot fail")
        {
            GetOrInsertResult::Inserted(node) => {
                Ok((AssetLoadingState::loading(key), node.changed_at))
            }
            GetOrInsertResult::Existing(node) => {
                let changed_at = node.changed_at;
                match &node.data {
                    Some(CachedEntry::AssetReady(arc)) => {
                        match arc.clone().downcast::<K::Asset>() {
                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
                        }
                    }
                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
                }
            }
        }
    }
1445
1446    /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1447    ///
1448    /// This version is called from QueryContext::asset. Consistency checking for
1449    /// cached leaf assets is done inside this function before returning.
1450    fn get_asset_with_revision_ctx<K: AssetKey>(
1451        &self,
1452        key: K,
1453        _ctx: &QueryContext<'_, T>,
1454    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1455        let full_asset_key = FullAssetKey::new(&key);
1456        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1457
1458        // Helper to emit AssetRequested event
1459        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1460            tracer.on_asset_requested(
1461                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1462                state,
1463            );
1464        };
1465
1466        // Check whale cache first (single atomic read)
1467        if let Some(node) = self.whale.get(&full_cache_key) {
1468            let changed_at = node.changed_at;
1469            // Check if valid at current revision (shallow check)
1470            if self.whale.is_valid(&full_cache_key) {
1471                // Verify dependencies recursively (like query path does)
1472                let mut deps_verified = true;
1473                if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1474                    for dep in deps {
1475                        if let Some(verifier) = self.verifiers.get(&dep) {
1476                            // Re-run query/asset to verify it (triggers recursive verification)
1477                            if verifier.verify(self as &dyn std::any::Any).is_err() {
1478                                deps_verified = false;
1479                                break;
1480                            }
1481                        }
1482                    }
1483                }
1484
1485                // Re-check validity after deps are verified
1486                if deps_verified && self.whale.is_valid(&full_cache_key) {
1487                    // For cached entries, check consistency for leaf assets (no locator deps).
1488                    // This detects if resolve_asset/resolve_asset_error was called during query execution.
1489                    let has_locator_deps = self
1490                        .whale
1491                        .get_dependency_ids(&full_cache_key)
1492                        .is_some_and(|deps| !deps.is_empty());
1493
1494                    match &node.data {
1495                        Some(CachedEntry::AssetReady(arc)) => {
1496                            // Check consistency for cached leaf assets
1497                            if !has_locator_deps {
1498                                check_leaf_asset_consistency(changed_at)?;
1499                            }
1500                            emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1501                            match arc.clone().downcast::<K::Asset>() {
1502                                Ok(value) => {
1503                                    return Ok((AssetLoadingState::ready(key, value), changed_at))
1504                                }
1505                                Err(_) => {
1506                                    unreachable!("Asset type mismatch: {:?}", key)
1507                                }
1508                            }
1509                        }
1510                        Some(CachedEntry::AssetError(err)) => {
1511                            // Check consistency for cached leaf errors
1512                            if !has_locator_deps {
1513                                check_leaf_asset_consistency(changed_at)?;
1514                            }
1515                            emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1516                            return Err(QueryError::UserError(err.clone()));
1517                        }
1518                        None => {
1519                            // Loading state - no value to be inconsistent with
1520                            emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1521                            return Ok((AssetLoadingState::loading(key), changed_at));
1522                        }
1523                        _ => {
1524                            // Query-related entries (Ok, UserError) shouldn't be here
1525                            // Fall through to locator
1526                        }
1527                    }
1528                }
1529            }
1530        }
1531
1532        // Not in cache or invalid - try locator
1533        // Use LocatorContext to track deps on the asset itself (not the calling query)
1534        // Consistency tracking is handled via thread-local storage
1535        check_cycle(&full_cache_key)?;
1536        let _guard = StackGuard::push(full_cache_key.clone());
1537
1538        let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1539        let locator_result =
1540            self.locators
1541                .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1542
1543        if let Some(result) = locator_result {
1544            // Get collected dependencies from the locator context
1545            let locator_deps = locator_ctx.into_deps();
1546            match result {
1547                Ok(ErasedLocateResult::Ready {
1548                    value: arc,
1549                    durability: durability_level,
1550                }) => {
1551                    emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1552
1553                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1554                        Ok(v) => v,
1555                        Err(_) => {
1556                            unreachable!("Asset type mismatch: {:?}", key);
1557                        }
1558                    };
1559
1560                    // Store in whale atomically with early cutoff
1561                    // Include locator's dependencies so the asset is invalidated when they change
1562                    let entry = CachedEntry::AssetReady(typed_value.clone());
1563                    let durability = Durability::new(durability_level.as_u8() as usize)
1564                        .unwrap_or(Durability::volatile());
1565                    let new_value = typed_value.clone();
1566                    let result = self
1567                        .whale
1568                        .update_with_compare(
1569                            full_cache_key.clone(),
1570                            Some(entry),
1571                            |old_data, _new_data| {
1572                                let Some(CachedEntry::AssetReady(old_arc)) =
1573                                    old_data.and_then(|d| d.as_ref())
1574                                else {
1575                                    return true;
1576                                };
1577                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1578                                    return true;
1579                                };
1580                                !K::asset_eq(&old_value, &new_value)
1581                            },
1582                            durability,
1583                            locator_deps,
1584                        )
1585                        .expect("update_with_compare should succeed");
1586
1587                    // Register verifier for this asset (for verify-then-decide pattern)
1588                    self.verifiers
1589                        .insert_asset::<K, T>(full_cache_key, key.clone());
1590
1591                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1592                }
1593                Ok(ErasedLocateResult::Pending) => {
1594                    emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1595
1596                    // Add to pending list for Pending result
1597                    self.pending.insert::<K>(full_asset_key, key.clone());
1598                    match self
1599                        .whale
1600                        .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1601                        .expect("get_or_insert should succeed")
1602                    {
1603                        GetOrInsertResult::Inserted(node) => {
1604                            return Ok((AssetLoadingState::loading(key), node.changed_at));
1605                        }
1606                        GetOrInsertResult::Existing(node) => {
1607                            let changed_at = node.changed_at;
1608                            match &node.data {
1609                                Some(CachedEntry::AssetReady(arc)) => {
1610                                    match arc.clone().downcast::<K::Asset>() {
1611                                        Ok(value) => {
1612                                            return Ok((
1613                                                AssetLoadingState::ready(key, value),
1614                                                changed_at,
1615                                            ));
1616                                        }
1617                                        Err(_) => {
1618                                            return Ok((
1619                                                AssetLoadingState::loading(key),
1620                                                changed_at,
1621                                            ))
1622                                        }
1623                                    }
1624                                }
1625                                Some(CachedEntry::AssetError(err)) => {
1626                                    return Err(QueryError::UserError(err.clone()));
1627                                }
1628                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1629                            }
1630                        }
1631                    }
1632                }
1633                Err(QueryError::UserError(err)) => {
1634                    // Locator returned a user error - cache it as AssetError
1635                    let entry = CachedEntry::AssetError(err.clone());
1636                    let _ = self.whale.register(
1637                        full_cache_key,
1638                        Some(entry),
1639                        Durability::volatile(),
1640                        locator_deps,
1641                    );
1642                    return Err(QueryError::UserError(err));
1643                }
1644                Err(e) => {
1645                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1646                    return Err(e);
1647                }
1648            }
1649        }
1650
1651        // No locator registered or locator returned None - mark as pending
1652        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1653        self.pending
1654            .insert::<K>(full_asset_key.clone(), key.clone());
1655
1656        match self
1657            .whale
1658            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1659            .expect("get_or_insert with no dependencies cannot fail")
1660        {
1661            GetOrInsertResult::Inserted(node) => {
1662                Ok((AssetLoadingState::loading(key), node.changed_at))
1663            }
1664            GetOrInsertResult::Existing(node) => {
1665                let changed_at = node.changed_at;
1666                match &node.data {
1667                    Some(CachedEntry::AssetReady(arc)) => {
1668                        match arc.clone().downcast::<K::Asset>() {
1669                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1670                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1671                        }
1672                    }
1673                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1674                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1675                }
1676            }
1677        }
1678    }
1679
1680    /// Internal: Get asset state, checking cache and locator.
1681    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1682        self.get_asset_with_revision(key).map(|(state, _)| state)
1683    }
1684}
1685
1686impl<T: Tracer> Db for QueryRuntime<T> {
1687    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1688        QueryRuntime::query(self, query)
1689    }
1690
1691    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1692        self.get_asset_internal(key)?.suspend()
1693    }
1694
1695    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1696        self.get_asset_internal(key)
1697    }
1698
1699    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1700        self.query_registry.get_all::<Q>()
1701    }
1702
1703    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1704        self.asset_key_registry.get_all::<K>()
1705    }
1706}
1707
1708/// Tracks consistency of leaf asset accesses during query execution.
1709///
1710/// A "leaf" asset is one without dependencies (externally resolved via `resolve_asset`).
1711/// This tracker ensures that all leaf assets accessed during a query execution
1712/// (including those accessed by locators) are consistent - i.e., none were modified
1713/// via `resolve_asset` mid-execution.
1714///
1715/// The tracker is shared across `QueryContext` and `LocatorContext` to propagate
1716/// consistency checking through the entire execution tree.
1717#[derive(Debug)]
1718pub(crate) struct ConsistencyTracker {
1719    /// Global revision at query start. All leaf assets must have changed_at <= this.
1720    start_revision: RevisionCounter,
1721}
1722
1723impl ConsistencyTracker {
1724    /// Create a new tracker with the given start revision.
1725    pub fn new(start_revision: RevisionCounter) -> Self {
1726        Self { start_revision }
1727    }
1728
1729    /// Check consistency for a leaf asset access.
1730    ///
1731    /// A leaf asset is consistent if its changed_at <= start_revision.
1732    /// This detects if resolve_asset was called during query execution.
1733    ///
1734    /// Returns Ok(()) if consistent, Err if inconsistent.
1735    pub fn check_leaf_asset(&self, dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
1736        if dep_changed_at > self.start_revision {
1737            Err(QueryError::InconsistentAssetResolution)
1738        } else {
1739            Ok(())
1740        }
1741    }
1742}
1743
/// Context provided to queries during execution.
///
/// Use this to access dependencies via `query()`, `asset()` / `asset_state()`,
/// `list_queries()` and `list_asset_keys()`. Each of those accesses appends the
/// corresponding cache key (or set sentinel) to `deps`.
pub struct QueryContext<'a, T: Tracer = NoopTracer> {
    /// Runtime used to execute dependencies; also gives access to the tracer,
    /// the whale store and the query / asset-key registries.
    runtime: &'a QueryRuntime<T>,
    /// Cache key of the query this context belongs to; its debug representation
    /// is reported as the parent side of every dependency tracer event.
    current_key: FullCacheKey,
    /// Static name paired with `current_key` when building the parent
    /// `TracerQueryKey` (presumably the query's type name — confirm at the
    /// construction site).
    parent_query_type: &'static str,
    /// Execution context; supplies the span id attached to tracer events.
    exec_ctx: ExecutionContext,
    /// Cache keys of the dependencies accessed so far, in access order.
    /// Interior mutability lets `&self` methods record dependencies.
    deps: RefCell<Vec<FullCacheKey>>,
}
1754
1755impl<'a, T: Tracer> QueryContext<'a, T> {
1756    /// Query a dependency.
1757    ///
1758    /// The dependency is automatically tracked for invalidation.
1759    ///
1760    /// # Example
1761    ///
1762    /// ```ignore
1763    /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1764    ///     let dep_result = db.query(OtherQuery { id: self.id })?;
1765    ///     Ok(process(&dep_result))
1766    /// }
1767    /// ```
1768    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1769        let key = query.cache_key();
1770        let full_key = FullCacheKey::new::<Q, _>(&key);
1771
1772        // Emit dependency registered event
1773        self.runtime.tracer.on_dependency_registered(
1774            self.exec_ctx.span_id(),
1775            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1776            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1777        );
1778
1779        // Record this as a dependency
1780        self.deps.borrow_mut().push(full_key.clone());
1781
1782        // Execute the query
1783        self.runtime.query(query)
1784    }
1785
1786    /// Access an asset, tracking it as a dependency.
1787    ///
1788    /// Returns the asset value if ready, or `Err(QueryError::Suspend)` if still loading.
1789    /// Use this with the `?` operator for automatic suspension on loading.
1790    ///
1791    /// # Example
1792    ///
1793    /// ```ignore
1794    /// #[query]
1795    /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1796    ///     let content = db.asset(path)?;
1797    ///     // Process content...
1798    ///     Ok(output)
1799    /// }
1800    /// ```
1801    ///
1802    /// # Errors
1803    ///
1804    /// - Returns `Err(QueryError::Suspend)` if the asset is still loading.
1805    /// - Returns `Err(QueryError::UserError)` if the asset was not found.
1806    pub fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1807        self.asset_state(key)?.suspend()
1808    }
1809
1810    /// Access an asset's loading state, tracking it as a dependency.
1811    ///
1812    /// Unlike [`asset()`](Self::asset), this method returns the full loading state,
1813    /// allowing you to check if an asset is loading without triggering suspension.
1814    ///
1815    /// # Example
1816    ///
1817    /// ```ignore
1818    /// let state = db.asset_state(key)?;
1819    /// if state.is_loading() {
1820    ///     // Handle loading case explicitly
1821    /// } else {
1822    ///     let value = state.get().unwrap();
1823    /// }
1824    /// ```
1825    ///
1826    /// # Errors
1827    ///
1828    /// Returns `Err(QueryError::UserError)` if the asset was not found.
1829    pub fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1830        let full_asset_key = FullAssetKey::new(&key);
1831        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1832
1833        // 1. Emit asset dependency registered event
1834        self.runtime.tracer.on_asset_dependency_registered(
1835            self.exec_ctx.span_id(),
1836            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1837            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1838        );
1839
1840        // 2. Record dependency on this asset
1841        self.deps.borrow_mut().push(full_cache_key);
1842
1843        // 3. Get asset from cache/locator
1844        // Consistency checking for cached leaf assets is done inside get_asset_with_revision_ctx
1845        let (state, _changed_at) = self.runtime.get_asset_with_revision_ctx(key, self)?;
1846
1847        Ok(state)
1848    }
1849
1850    /// List all query instances of type Q that have been registered.
1851    ///
1852    /// This method establishes a dependency on the "set" of queries of type Q.
1853    /// The calling query will be invalidated when:
1854    /// - A new query of type Q is first executed (added to set)
1855    ///
1856    /// The calling query will NOT be invalidated when:
1857    /// - An individual query of type Q has its value change
1858    ///
1859    /// # Example
1860    ///
1861    /// ```ignore
1862    /// #[query]
1863    /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1864    ///     let queries = db.list_queries::<MyQuery>();
1865    ///     let mut results = Vec::new();
1866    ///     for q in queries {
1867    ///         results.push(*db.query(q)?);
1868    ///     }
1869    ///     Ok(results)
1870    /// }
1871    /// ```
1872    pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1873        // Record dependency on the sentinel (set-level dependency)
1874        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1875
1876        self.runtime.tracer.on_dependency_registered(
1877            self.exec_ctx.span_id(),
1878            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1879            TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1880        );
1881
1882        // Ensure sentinel exists in whale (for dependency tracking)
1883        if self.runtime.whale.get(&sentinel).is_none() {
1884            let _ =
1885                self.runtime
1886                    .whale
1887                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1888        }
1889
1890        self.deps.borrow_mut().push(sentinel);
1891
1892        // Return all registered queries
1893        self.runtime.query_registry.get_all::<Q>()
1894    }
1895
1896    /// List all asset keys of type K that have been registered.
1897    ///
1898    /// This method establishes a dependency on the "set" of asset keys of type K.
1899    /// The calling query will be invalidated when:
1900    /// - A new asset of type K is resolved for the first time (added to set)
1901    /// - An asset of type K is removed via remove_asset
1902    ///
1903    /// The calling query will NOT be invalidated when:
1904    /// - An individual asset's value changes (use `db.asset()` for that)
1905    ///
1906    /// # Example
1907    ///
1908    /// ```ignore
1909    /// #[query]
1910    /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1911    ///     let keys = db.list_asset_keys::<ConfigFile>();
1912    ///     let mut contents = Vec::new();
1913    ///     for key in keys {
1914    ///         let content = db.asset(key)?;
1915    ///         contents.push((*content).clone());
1916    ///     }
1917    ///     Ok(contents)
1918    /// }
1919    /// ```
1920    pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1921        // Record dependency on the sentinel (set-level dependency)
1922        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1923
1924        self.runtime.tracer.on_asset_dependency_registered(
1925            self.exec_ctx.span_id(),
1926            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1927            TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1928        );
1929
1930        // Ensure sentinel exists in whale (for dependency tracking)
1931        if self.runtime.whale.get(&sentinel).is_none() {
1932            let _ =
1933                self.runtime
1934                    .whale
1935                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1936        }
1937
1938        self.deps.borrow_mut().push(sentinel);
1939
1940        // Return all registered asset keys
1941        self.runtime.asset_key_registry.get_all::<K>()
1942    }
1943}
1944
1945impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1946    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1947        QueryContext::query(self, query)
1948    }
1949
1950    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1951        QueryContext::asset(self, key)
1952    }
1953
1954    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1955        QueryContext::asset_state(self, key)
1956    }
1957
1958    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1959        QueryContext::list_queries(self)
1960    }
1961
1962    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1963        QueryContext::list_asset_keys(self)
1964    }
1965}
1966
1967/// Context for collecting dependencies during asset locator execution.
1968///
1969/// Unlike `QueryContext`, this is specifically for locators and does not
1970/// register dependencies on any parent query. Dependencies collected here
1971/// are stored with the asset itself.
1972pub(crate) struct LocatorContext<'a, T: Tracer> {
1973    runtime: &'a QueryRuntime<T>,
1974    deps: RefCell<Vec<FullCacheKey>>,
1975}
1976
1977impl<'a, T: Tracer> LocatorContext<'a, T> {
1978    /// Create a new locator context for the given asset key.
1979    ///
1980    /// Consistency tracking is handled via thread-local storage, so leaf asset
1981    /// accesses will be checked against any active tracker from a parent query.
1982    pub(crate) fn new(runtime: &'a QueryRuntime<T>, _asset_key: FullCacheKey) -> Self {
1983        Self {
1984            runtime,
1985            deps: RefCell::new(Vec::new()),
1986        }
1987    }
1988
1989    /// Consume this context and return the collected dependencies.
1990    pub(crate) fn into_deps(self) -> Vec<FullCacheKey> {
1991        self.deps.into_inner()
1992    }
1993}
1994
1995impl<T: Tracer> Db for LocatorContext<'_, T> {
1996    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1997        let key = query.cache_key();
1998        let full_key = FullCacheKey::new::<Q, _>(&key);
1999
2000        // Record this as a dependency of the asset being located
2001        self.deps.borrow_mut().push(full_key);
2002
2003        // Execute the query via the runtime
2004        self.runtime.query(query)
2005    }
2006
2007    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2008        self.asset_state(key)?.suspend()
2009    }
2010
2011    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2012        let full_asset_key = FullAssetKey::new(&key);
2013        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
2014
2015        // Record this as a dependency of the asset being located
2016        self.deps.borrow_mut().push(full_cache_key);
2017
2018        // Access the asset - consistency checking is done inside get_asset_with_revision
2019        let (state, _changed_at) = self.runtime.get_asset_with_revision(key)?;
2020
2021        Ok(state)
2022    }
2023
2024    fn list_queries<Q: Query>(&self) -> Vec<Q> {
2025        self.runtime.list_queries()
2026    }
2027
2028    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2029        self.runtime.list_asset_keys()
2030    }
2031}
2032
2033#[cfg(test)]
2034mod tests {
2035    use super::*;
2036
2037    #[test]
2038    fn test_simple_query() {
2039        #[derive(Clone)]
2040        struct Add {
2041            a: i32,
2042            b: i32,
2043        }
2044
2045        impl Query for Add {
2046            type CacheKey = (i32, i32);
2047            type Output = i32;
2048
2049            fn cache_key(&self) -> Self::CacheKey {
2050                (self.a, self.b)
2051            }
2052
2053            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2054                Ok(Arc::new(self.a + self.b))
2055            }
2056
2057            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2058                old == new
2059            }
2060        }
2061
2062        let runtime = QueryRuntime::new();
2063
2064        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
2065        assert_eq!(*result, 3);
2066
2067        // Second query should be cached
2068        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
2069        assert_eq!(*result2, 3);
2070    }
2071
2072    #[test]
2073    fn test_dependent_queries() {
2074        #[derive(Clone)]
2075        struct Base {
2076            value: i32,
2077        }
2078
2079        impl Query for Base {
2080            type CacheKey = i32;
2081            type Output = i32;
2082
2083            fn cache_key(&self) -> Self::CacheKey {
2084                self.value
2085            }
2086
2087            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2088                Ok(Arc::new(self.value * 2))
2089            }
2090
2091            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2092                old == new
2093            }
2094        }
2095
2096        #[derive(Clone)]
2097        struct Derived {
2098            base_value: i32,
2099        }
2100
2101        impl Query for Derived {
2102            type CacheKey = i32;
2103            type Output = i32;
2104
2105            fn cache_key(&self) -> Self::CacheKey {
2106                self.base_value
2107            }
2108
2109            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2110                let base = db.query(Base {
2111                    value: self.base_value,
2112                })?;
2113                Ok(Arc::new(*base + 10))
2114            }
2115
2116            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2117                old == new
2118            }
2119        }
2120
2121        let runtime = QueryRuntime::new();
2122
2123        let result = runtime.query(Derived { base_value: 5 }).unwrap();
2124        assert_eq!(*result, 20); // 5 * 2 + 10
2125    }
2126
2127    #[test]
2128    fn test_cycle_detection() {
2129        #[derive(Clone)]
2130        struct CycleA {
2131            id: i32,
2132        }
2133
2134        #[derive(Clone)]
2135        struct CycleB {
2136            id: i32,
2137        }
2138
2139        impl Query for CycleA {
2140            type CacheKey = i32;
2141            type Output = i32;
2142
2143            fn cache_key(&self) -> Self::CacheKey {
2144                self.id
2145            }
2146
2147            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2148                let b = db.query(CycleB { id: self.id })?;
2149                Ok(Arc::new(*b + 1))
2150            }
2151
2152            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2153                old == new
2154            }
2155        }
2156
2157        impl Query for CycleB {
2158            type CacheKey = i32;
2159            type Output = i32;
2160
2161            fn cache_key(&self) -> Self::CacheKey {
2162                self.id
2163            }
2164
2165            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2166                let a = db.query(CycleA { id: self.id })?;
2167                Ok(Arc::new(*a + 1))
2168            }
2169
2170            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2171                old == new
2172            }
2173        }
2174
2175        let runtime = QueryRuntime::new();
2176
2177        let result = runtime.query(CycleA { id: 1 });
2178        assert!(matches!(result, Err(QueryError::Cycle { .. })));
2179    }
2180
2181    #[test]
2182    fn test_fallible_query() {
2183        #[derive(Clone)]
2184        struct ParseInt {
2185            input: String,
2186        }
2187
2188        impl Query for ParseInt {
2189            type CacheKey = String;
2190            type Output = Result<i32, std::num::ParseIntError>;
2191
2192            fn cache_key(&self) -> Self::CacheKey {
2193                self.input.clone()
2194            }
2195
2196            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2197                Ok(Arc::new(self.input.parse()))
2198            }
2199
2200            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2201                old == new
2202            }
2203        }
2204
2205        let runtime = QueryRuntime::new();
2206
2207        // Valid parse
2208        let result = runtime
2209            .query(ParseInt {
2210                input: "42".to_string(),
2211            })
2212            .unwrap();
2213        assert_eq!(*result, Ok(42));
2214
2215        // Invalid parse - system succeeds, user error in output
2216        let result = runtime
2217            .query(ParseInt {
2218                input: "not_a_number".to_string(),
2219            })
2220            .unwrap();
2221        assert!(result.is_err());
2222    }
2223
    // Macro tests
    //
    // Exercises the `#[query]` attribute macro. As the tests below show, each
    // annotated function is used through a generated struct named after the
    // function in UpperCamelCase (or the explicit `name = "..."`), built with
    // `new(...)` from the function's non-`db` arguments.
    mod macro_tests {
        use super::*;
        use crate::query;

        // Plain `#[query]`: used below as `Add::new(a, b)`.
        #[query]
        fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
            let _ = db; // silence unused warning
            Ok(a + b)
        }

        // The generated `Add` query runs and returns the function's result.
        #[test]
        fn test_macro_basic() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(Add::new(1, 2)).unwrap();
            assert_eq!(*result, 3);
        }

        // Single-argument query: used below as `SimpleDouble::new(x)`.
        #[query]
        fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
            let _ = db;
            Ok(x * 2)
        }

        // A snake_case function name maps to an UpperCamelCase struct name.
        #[test]
        fn test_macro_simple() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(SimpleDouble::new(5)).unwrap();
            assert_eq!(*result, 10);
        }

        // `keys(id)`: only `id` takes part in the cache key; `include_extra`
        // is still passed to the function but does not distinguish cache entries.
        #[query(keys(id))]
        fn with_key_selection(
            db: &impl Db,
            id: u32,
            include_extra: bool,
        ) -> Result<String, QueryError> {
            let _ = db;
            Ok(format!("id={}, extra={}", id, include_extra))
        }

        // Two calls that differ only in a non-key argument share one cache entry.
        #[test]
        fn test_macro_key_selection() {
            let runtime = QueryRuntime::new();

            // Same id, different include_extra - should return cached
            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();

            // Both should have same value because only `id` is the key
            assert_eq!(*r1, "id=1, extra=true");
            assert_eq!(*r2, "id=1, extra=true"); // Cached!
        }

        // A macro-generated query may call another macro-generated query via `db`.
        #[query]
        fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
            let sum = db.query(Add::new(a, b))?;
            Ok(*sum * 2)
        }

        // The dependency's result flows into the dependent query.
        #[test]
        fn test_macro_dependencies() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(Dependent::new(3, 4)).unwrap();
            assert_eq!(*result, 14); // (3 + 4) * 2
        }

        // `output_eq` option: NOTE(review) presumably selects how outputs are
        // compared for early cutoff — confirm against the macro documentation.
        #[query(output_eq)]
        fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
            let _ = db;
            Ok(x * 2)
        }

        // The `output_eq` option must not change the computed value.
        #[test]
        fn test_macro_output_eq() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(WithOutputEq::new(5)).unwrap();
            assert_eq!(*result, 10);
        }

        // `name = "CustomName"`: the generated struct is `CustomName`, not
        // a name derived from `original_name`.
        #[query(name = "CustomName")]
        fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
            let _ = db;
            Ok(x)
        }

        // The query is reachable under its custom struct name.
        #[test]
        fn test_macro_custom_name() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(CustomName::new(42)).unwrap();
            assert_eq!(*result, 42);
        }

        // Test that attribute macros like #[tracing::instrument] are preserved
        // We use #[allow(unused_variables)] and #[inline] as test attributes since
        // they don't require external dependencies.
        #[allow(unused_variables)]
        #[inline]
        #[query]
        fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
            // This would warn without #[allow(unused_variables)] on the generated method
            let unused_var = 42;
            Ok(x * 2)
        }

        // Runtime behaviour is unaffected by the extra attributes; the real
        // check is the absence of an unused-variable warning at compile time.
        #[test]
        fn test_macro_preserves_attributes() {
            let runtime = QueryRuntime::new();
            // If attributes weren't preserved, this might warn about unused_var
            let result = runtime.query(WithAttributes::new(5)).unwrap();
            assert_eq!(*result, 10);
        }
    }
2337
2338    // Tests for poll() and changed_at()
2339    mod poll_tests {
2340        use super::*;
2341
2342        #[derive(Clone)]
2343        struct Counter {
2344            id: i32,
2345        }
2346
2347        impl Query for Counter {
2348            type CacheKey = i32;
2349            type Output = i32;
2350
2351            fn cache_key(&self) -> Self::CacheKey {
2352                self.id
2353            }
2354
2355            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2356                Ok(Arc::new(self.id * 10))
2357            }
2358
2359            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2360                old == new
2361            }
2362        }
2363
2364        #[test]
2365        fn test_poll_returns_value_and_revision() {
2366            let runtime = QueryRuntime::new();
2367
2368            let result = runtime.poll(Counter { id: 1 }).unwrap();
2369
2370            // Value should be correct - access through Result and Arc
2371            assert_eq!(**result.value.as_ref().unwrap(), 10);
2372
2373            // Revision should be non-zero after first execution
2374            assert!(result.revision > 0);
2375        }
2376
2377        #[test]
2378        fn test_poll_revision_stable_on_cache_hit() {
2379            let runtime = QueryRuntime::new();
2380
2381            // First poll
2382            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2383            let rev1 = result1.revision;
2384
2385            // Second poll (cache hit)
2386            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2387            let rev2 = result2.revision;
2388
2389            // Revision should be the same (no change)
2390            assert_eq!(rev1, rev2);
2391        }
2392
2393        #[test]
2394        fn test_poll_revision_changes_on_invalidate() {
2395            let runtime = QueryRuntime::new();
2396
2397            // First poll
2398            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2399            let rev1 = result1.revision;
2400
2401            // Invalidate and poll again
2402            runtime.invalidate::<Counter>(&1);
2403            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2404            let rev2 = result2.revision;
2405
2406            // Revision should increase (value was recomputed)
2407            // Note: Since output_eq returns true (same value), this might not change
2408            // depending on early cutoff behavior. Let's verify the value is still correct.
2409            assert_eq!(**result2.value.as_ref().unwrap(), 10);
2410
2411            // With early cutoff, revision might stay the same if value didn't change
2412            // This is expected behavior
2413            assert!(rev2 >= rev1);
2414        }
2415
2416        #[test]
2417        fn test_changed_at_returns_none_for_unexecuted_query() {
2418            let runtime = QueryRuntime::new();
2419
2420            // Query has never been executed
2421            let rev = runtime.changed_at::<Counter>(&1);
2422            assert!(rev.is_none());
2423        }
2424
2425        #[test]
2426        fn test_changed_at_returns_revision_after_execution() {
2427            let runtime = QueryRuntime::new();
2428
2429            // Execute the query
2430            let _ = runtime.query(Counter { id: 1 }).unwrap();
2431
2432            // Now changed_at should return Some
2433            let rev = runtime.changed_at::<Counter>(&1);
2434            assert!(rev.is_some());
2435            assert!(rev.unwrap() > 0);
2436        }
2437
2438        #[test]
2439        fn test_changed_at_matches_poll_revision() {
2440            let runtime = QueryRuntime::new();
2441
2442            // Poll the query
2443            let result = runtime.poll(Counter { id: 1 }).unwrap();
2444
2445            // changed_at should match the revision from poll
2446            let rev = runtime.changed_at::<Counter>(&1);
2447            assert_eq!(rev, Some(result.revision));
2448        }
2449
2450        #[test]
2451        fn test_poll_value_access() {
2452            let runtime = QueryRuntime::new();
2453
2454            let result = runtime.poll(Counter { id: 5 }).unwrap();
2455
2456            // Access through Result and Arc
2457            let value: &i32 = result.value.as_ref().unwrap();
2458            assert_eq!(*value, 50);
2459
2460            // Access Arc directly via field after unwrapping Result
2461            let arc: &Arc<i32> = result.value.as_ref().unwrap();
2462            assert_eq!(**arc, 50);
2463        }
2464
2465        #[test]
2466        fn test_subscription_pattern() {
2467            let runtime = QueryRuntime::new();
2468
2469            // Simulate subscription pattern
2470            let mut last_revision: RevisionCounter = 0;
2471            let mut notifications = 0;
2472
2473            // First poll - should notify (new value)
2474            let result = runtime.poll(Counter { id: 1 }).unwrap();
2475            if result.revision > last_revision {
2476                notifications += 1;
2477                last_revision = result.revision;
2478            }
2479
2480            // Second poll - should NOT notify (no change)
2481            let result = runtime.poll(Counter { id: 1 }).unwrap();
2482            if result.revision > last_revision {
2483                notifications += 1;
2484                last_revision = result.revision;
2485            }
2486
2487            // Third poll - should NOT notify (no change)
2488            let result = runtime.poll(Counter { id: 1 }).unwrap();
2489            if result.revision > last_revision {
2490                notifications += 1;
2491                #[allow(unused_assignments)]
2492                {
2493                    last_revision = result.revision;
2494                }
2495            }
2496
2497            // Only the first poll should have triggered a notification
2498            assert_eq!(notifications, 1);
2499        }
2500    }
2501
2502    // Tests for GC APIs
2503    mod gc_tests {
2504        use super::*;
2505        use std::collections::HashSet;
2506        use std::sync::atomic::{AtomicUsize, Ordering};
2507        use std::sync::Mutex;
2508
2509        #[derive(Clone)]
2510        struct Leaf {
2511            id: i32,
2512        }
2513
2514        impl Query for Leaf {
2515            type CacheKey = i32;
2516            type Output = i32;
2517
2518            fn cache_key(&self) -> Self::CacheKey {
2519                self.id
2520            }
2521
2522            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2523                Ok(Arc::new(self.id * 10))
2524            }
2525
2526            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2527                old == new
2528            }
2529        }
2530
2531        #[derive(Clone)]
2532        struct Parent {
2533            child_id: i32,
2534        }
2535
2536        impl Query for Parent {
2537            type CacheKey = i32;
2538            type Output = i32;
2539
2540            fn cache_key(&self) -> Self::CacheKey {
2541                self.child_id
2542            }
2543
2544            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2545                let child = db.query(Leaf { id: self.child_id })?;
2546                Ok(Arc::new(*child + 1))
2547            }
2548
2549            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2550                old == new
2551            }
2552        }
2553
2554        #[test]
2555        fn test_query_keys_returns_all_cached_queries() {
2556            let runtime = QueryRuntime::new();
2557
2558            // Execute some queries
2559            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2560            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2561            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2562
2563            // Get all keys
2564            let keys = runtime.query_keys();
2565
2566            // Should have at least 3 keys (might have more due to sentinels)
2567            assert!(keys.len() >= 3);
2568        }
2569
2570        #[test]
2571        fn test_remove_removes_query() {
2572            let runtime = QueryRuntime::new();
2573
2574            // Execute a query
2575            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2576
2577            // Get the key
2578            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2579
2580            // Query should exist
2581            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2582
2583            // Remove it
2584            assert!(runtime.remove(&full_key));
2585
2586            // Query should no longer exist
2587            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2588        }
2589
2590        #[test]
2591        fn test_remove_if_unused_removes_leaf_query() {
2592            let runtime = QueryRuntime::new();
2593
2594            // Execute a leaf query (no dependents)
2595            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2596
2597            // Should be removable since no other query depends on it
2598            assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2599
2600            // Query should no longer exist
2601            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2602        }
2603
2604        #[test]
2605        fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2606            let runtime = QueryRuntime::new();
2607
2608            // Execute parent query (which depends on Leaf)
2609            let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2610
2611            // Leaf query should not be removable since Parent depends on it
2612            assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2613
2614            // Leaf query should still exist
2615            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2616
2617            // But Parent should be removable (no dependents)
2618            assert!(runtime.remove_query_if_unused::<Parent>(&1));
2619        }
2620
2621        #[test]
2622        fn test_remove_if_unused_with_full_cache_key() {
2623            let runtime = QueryRuntime::new();
2624
2625            // Execute a leaf query
2626            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2627
2628            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2629
2630            // Should be removable via FullCacheKey
2631            assert!(runtime.remove_if_unused(&full_key));
2632
2633            // Query should no longer exist
2634            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2635        }
2636
2637        // Test tracer receives on_query_key calls
2638        struct GcTracker {
2639            accessed_keys: Mutex<HashSet<String>>,
2640            access_count: AtomicUsize,
2641        }
2642
2643        impl GcTracker {
2644            fn new() -> Self {
2645                Self {
2646                    accessed_keys: Mutex::new(HashSet::new()),
2647                    access_count: AtomicUsize::new(0),
2648                }
2649            }
2650        }
2651
2652        impl Tracer for GcTracker {
2653            fn new_span_id(&self) -> SpanId {
2654                SpanId(1)
2655            }
2656
2657            fn on_query_key(&self, full_key: &FullCacheKey) {
2658                self.accessed_keys
2659                    .lock()
2660                    .unwrap()
2661                    .insert(full_key.debug_repr().to_string());
2662                self.access_count.fetch_add(1, Ordering::Relaxed);
2663            }
2664        }
2665
2666        #[test]
2667        fn test_tracer_receives_on_query_key() {
2668            let tracker = GcTracker::new();
2669            let runtime = QueryRuntime::with_tracer(tracker);
2670
2671            // Execute some queries
2672            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2673            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2674
2675            // Tracer should have received on_query_key calls
2676            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2677            assert_eq!(count, 2);
2678
2679            // Check that the keys were recorded
2680            let keys = runtime.tracer().accessed_keys.lock().unwrap();
2681            assert!(keys.iter().any(|k| k.contains("Leaf")));
2682        }
2683
2684        #[test]
2685        fn test_tracer_receives_on_query_key_for_cache_hits() {
2686            let tracker = GcTracker::new();
2687            let runtime = QueryRuntime::with_tracer(tracker);
2688
2689            // Execute query twice (second is cache hit)
2690            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2691            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2692
2693            // Tracer should have received on_query_key for both calls
2694            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2695            assert_eq!(count, 2);
2696        }
2697
2698        #[test]
2699        fn test_gc_workflow() {
2700            let tracker = GcTracker::new();
2701            let runtime = QueryRuntime::with_tracer(tracker);
2702
2703            // Execute some queries
2704            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2705            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2706            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2707
2708            // Simulate GC: remove all queries that are not in use
2709            let mut removed = 0;
2710            for key in runtime.query_keys() {
2711                if runtime.remove_if_unused(&key) {
2712                    removed += 1;
2713                }
2714            }
2715
2716            // All leaf queries should be removable
2717            assert!(removed >= 3);
2718
2719            // Queries should no longer exist
2720            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2721            assert!(runtime.changed_at::<Leaf>(&2).is_none());
2722            assert!(runtime.changed_at::<Leaf>(&3).is_none());
2723        }
2724    }
2725}