query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
12use crate::db::Db;
13use crate::key::FullCacheKey;
14use crate::loading::AssetLoadingState;
15use crate::query::Query;
16use crate::storage::{
17    AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
18    QueryRegistry, VerifierStorage,
19};
20use crate::tracer::{
21    ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
22    TracerAssetState, TracerQueryKey,
23};
24use crate::QueryError;
25
26/// Function type for comparing user errors during early cutoff.
27///
28/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
29/// `QueryError::UserError` values are compared for caching purposes.
30pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
31
32/// Number of durability levels (matches whale's default).
33const DURABILITY_LEVELS: usize = 4;
34
35// Thread-local query execution stack for cycle detection.
36thread_local! {
37    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
38
39    /// Current consistency tracker for leaf asset checks.
40    /// Set during query execution, used by all nested locator calls.
41    /// Uses Rc since thread-local is single-threaded.
42    static CONSISTENCY_TRACKER: RefCell<Option<Rc<ConsistencyTracker>>> = const { RefCell::new(None) };
43}
44
45/// Check a leaf asset against the current consistency tracker (if any).
46/// Returns Ok if no tracker is set or if the check passes.
47fn check_leaf_asset_consistency(dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
48    CONSISTENCY_TRACKER.with(|tracker| {
49        if let Some(ref t) = *tracker.borrow() {
50            t.check_leaf_asset(dep_changed_at)
51        } else {
52            Ok(())
53        }
54    })
55}
56
57/// RAII guard that sets the consistency tracker for the current thread.
58struct ConsistencyTrackerGuard {
59    previous: Option<Rc<ConsistencyTracker>>,
60}
61
62impl ConsistencyTrackerGuard {
63    fn new(tracker: Rc<ConsistencyTracker>) -> Self {
64        let previous = CONSISTENCY_TRACKER.with(|t| t.borrow_mut().replace(tracker));
65        Self { previous }
66    }
67}
68
69impl Drop for ConsistencyTrackerGuard {
70    fn drop(&mut self) {
71        CONSISTENCY_TRACKER.with(|t| {
72            *t.borrow_mut() = self.previous.take();
73        });
74    }
75}
76
77/// Check for cycles in the query stack and return error if detected.
78fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
79    let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
80    if cycle_detected {
81        let path = QUERY_STACK.with(|stack| {
82            let stack = stack.borrow();
83            let mut path: Vec<String> = stack.iter().map(|k| k.debug_repr().to_string()).collect();
84            path.push(key.debug_repr().to_string());
85            path
86        });
87        return Err(QueryError::Cycle { path });
88    }
89    Ok(())
90}
91
92/// RAII guard for pushing/popping from query stack.
93struct StackGuard;
94
95impl StackGuard {
96    fn push(key: FullCacheKey) -> Self {
97        QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
98        StackGuard
99    }
100}
101
102impl Drop for StackGuard {
103    fn drop(&mut self) {
104        QUERY_STACK.with(|stack| {
105            stack.borrow_mut().pop();
106        });
107    }
108}
109
110/// Execution context passed through query execution.
111///
112/// Contains a span ID for tracing correlation.
113#[derive(Clone, Copy)]
114pub struct ExecutionContext {
115    span_id: SpanId,
116}
117
118impl ExecutionContext {
119    /// Create a new execution context with the given span ID.
120    #[inline]
121    pub fn new(span_id: SpanId) -> Self {
122        Self { span_id }
123    }
124
125    /// Get the span ID for this execution context.
126    #[inline]
127    pub fn span_id(&self) -> SpanId {
128        self.span_id
129    }
130}
131
132/// Result of polling a query, containing the value and its revision.
133///
134/// This is returned by [`QueryRuntime::poll`] and provides both the query result
135/// and its change revision, enabling efficient change detection for subscription
136/// patterns.
137///
138/// # Example
139///
140/// ```ignore
141/// let result = runtime.poll(MyQuery::new())?;
142///
143/// // Access the value via Deref
144/// println!("{:?}", *result);
145///
146/// // Check if changed since last poll
147/// if result.revision > last_known_revision {
148///     notify_subscribers(&result.value);
149///     last_known_revision = result.revision;
150/// }
151/// ```
152#[derive(Debug, Clone)]
153pub struct Polled<T> {
154    /// The query result value.
155    pub value: T,
156    /// The revision at which this value was last changed.
157    ///
158    /// Compare this with a previously stored revision to detect changes.
159    pub revision: RevisionCounter,
160}
161
162impl<T: Deref> Deref for Polled<T> {
163    type Target = T::Target;
164
165    fn deref(&self) -> &Self::Target {
166        &self.value
167    }
168}
169
170/// The query runtime manages query execution, caching, and dependency tracking.
171///
172/// This is cheap to clone - all data is behind `Arc`.
173///
174/// # Type Parameter
175///
176/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
177///   for zero-cost when tracing is not needed.
178///
179/// # Example
180///
181/// ```ignore
182/// // Without tracing (default)
183/// let runtime = QueryRuntime::new();
184///
185/// // With tracing
186/// let tracer = MyTracer::new();
187/// let runtime = QueryRuntime::with_tracer(tracer);
188///
189/// // Sync query execution
190/// let result = runtime.query(MyQuery { ... })?;
191/// ```
192pub struct QueryRuntime<T: Tracer = NoopTracer> {
193    /// Whale runtime for dependency tracking and cache storage.
194    /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
195    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
196    /// Registered asset locators
197    locators: Arc<LocatorStorage<T>>,
198    /// Pending asset requests
199    pending: Arc<PendingStorage>,
200    /// Registry for tracking query instances (for list_queries)
201    query_registry: Arc<QueryRegistry>,
202    /// Registry for tracking asset keys (for list_asset_keys)
203    asset_key_registry: Arc<AssetKeyRegistry>,
204    /// Verifiers for re-executing queries (for verify-then-decide pattern)
205    verifiers: Arc<VerifierStorage>,
206    /// Comparator for user errors during early cutoff
207    error_comparator: ErrorComparator,
208    /// Tracer for observability
209    tracer: Arc<T>,
210}
211
212#[test]
213fn test_runtime_send_sync() {
214    fn assert_send_sync<T: Send + Sync>() {}
215    assert_send_sync::<QueryRuntime<NoopTracer>>();
216}
217
218impl Default for QueryRuntime<NoopTracer> {
219    fn default() -> Self {
220        Self::new()
221    }
222}
223
224impl<T: Tracer> Clone for QueryRuntime<T> {
225    fn clone(&self) -> Self {
226        Self {
227            whale: self.whale.clone(),
228            locators: self.locators.clone(),
229            pending: self.pending.clone(),
230            query_registry: self.query_registry.clone(),
231            asset_key_registry: self.asset_key_registry.clone(),
232            verifiers: self.verifiers.clone(),
233            error_comparator: self.error_comparator,
234            tracer: self.tracer.clone(),
235        }
236    }
237}
238
239/// Default error comparator that treats all errors as different.
240///
241/// This is conservative - it always triggers recomputation when an error occurs.
242fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
243    false
244}
245
246impl<T: Tracer> QueryRuntime<T> {
247    /// Get cached output along with its revision (single atomic access).
248    fn get_cached_with_revision<Q: Query>(
249        &self,
250        key: &FullCacheKey,
251    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
252        let node = self.whale.get(key)?;
253        let revision = node.changed_at;
254        let entry = node.data.as_ref()?;
255        let cached = entry.to_cached_value::<Q::Output>()?;
256        Some((cached, revision))
257    }
258
259    /// Get a reference to the tracer.
260    #[inline]
261    pub fn tracer(&self) -> &T {
262        &self.tracer
263    }
264}
265
266impl QueryRuntime<NoopTracer> {
267    /// Create a new query runtime with default settings.
268    pub fn new() -> Self {
269        Self::with_tracer(NoopTracer)
270    }
271
272    /// Create a builder for customizing the runtime.
273    ///
274    /// # Example
275    ///
276    /// ```ignore
277    /// let runtime = QueryRuntime::builder()
278    ///     .error_comparator(|a, b| {
279    ///         // Custom error comparison logic
280    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
281    ///             (Some(a), Some(b)) => a == b,
282    ///             _ => false,
283    ///         }
284    ///     })
285    ///     .build();
286    /// ```
287    pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
288        QueryRuntimeBuilder::new()
289    }
290}
291
292impl<T: Tracer> QueryRuntime<T> {
293    /// Create a new query runtime with the specified tracer.
294    pub fn with_tracer(tracer: T) -> Self {
295        QueryRuntimeBuilder::new().tracer(tracer).build()
296    }
297
298    /// Execute a query synchronously.
299    ///
300    /// Returns the cached result if valid, otherwise executes the query.
301    ///
302    /// # Errors
303    ///
304    /// - `QueryError::Suspend` - Query is waiting for async loading
305    /// - `QueryError::Cycle` - Dependency cycle detected
306    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
307        self.query_internal(query)
308            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
309    }
310
311    /// Internal implementation shared by query() and poll().
312    ///
313    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
314    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
315    #[allow(clippy::type_complexity)]
316    fn query_internal<Q: Query>(
317        &self,
318        query: Q,
319    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
320        let key = query.cache_key();
321        let full_key = FullCacheKey::new::<Q, _>(&key);
322
323        // Emit query key event for GC tracking (before other events)
324        self.tracer.on_query_key(&full_key);
325
326        // Create execution context and emit start event
327        let span_id = self.tracer.new_span_id();
328        let exec_ctx = ExecutionContext::new(span_id);
329        let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
330
331        self.tracer.on_query_start(span_id, query_key.clone());
332
333        // Check for cycles using thread-local stack
334        let cycle_detected = QUERY_STACK.with(|stack| {
335            let stack = stack.borrow();
336            stack.iter().any(|k| k == &full_key)
337        });
338
339        if cycle_detected {
340            let path = QUERY_STACK.with(|stack| {
341                let stack = stack.borrow();
342                let mut path: Vec<String> =
343                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
344                path.push(full_key.debug_repr().to_string());
345                path
346            });
347
348            self.tracer.on_cycle_detected(
349                path.iter()
350                    .map(|s| TracerQueryKey::new("", s.clone()))
351                    .collect(),
352            );
353            self.tracer
354                .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
355
356            return Err(QueryError::Cycle { path });
357        }
358
359        // Check if cached and valid (with verify-then-decide pattern)
360        let current_rev = self.whale.current_revision();
361
362        // Fast path: already verified at current revision
363        if self.whale.is_verified_at(&full_key, &current_rev) {
364            // Single atomic access to get both cached value and revision
365            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
366                self.tracer.on_cache_check(span_id, query_key.clone(), true);
367                self.tracer
368                    .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
369
370                return match cached {
371                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
372                    CachedValue::UserError(err) => Ok((Err(err), revision)),
373                };
374            }
375        }
376
377        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
378        if self.whale.is_valid(&full_key) {
379            // Single atomic access to get both cached value and revision
380            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
381                // Shallow valid but not verified - verify deps first
382                let mut deps_verified = true;
383                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
384                    for dep in deps {
385                        if let Some(verifier) = self.verifiers.get(&dep) {
386                            // Re-run query/asset to verify it (triggers recursive verification)
387                            // For assets, this re-accesses the asset which may re-run the locator
388                            if verifier.verify(self as &dyn std::any::Any).is_err() {
389                                deps_verified = false;
390                                break;
391                            }
392                        }
393                        // Note: deps without verifiers are assumed valid (they're verified
394                        // by the final is_valid check if their changed_at increased)
395                    }
396                }
397
398                // Re-check validity after deps are verified
399                if deps_verified && self.whale.is_valid(&full_key) {
400                    // Deps didn't change their changed_at, mark as verified and use cached
401                    self.whale.mark_verified(&full_key, &current_rev);
402
403                    self.tracer.on_cache_check(span_id, query_key.clone(), true);
404                    self.tracer
405                        .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
406
407                    return match cached {
408                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
409                        CachedValue::UserError(err) => Ok((Err(err), revision)),
410                    };
411                }
412                // A dep's changed_at increased, fall through to execute
413            }
414        }
415
416        self.tracer
417            .on_cache_check(span_id, query_key.clone(), false);
418
419        // Execute the query with cycle tracking
420        let _guard = StackGuard::push(full_key.clone());
421        let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
422        drop(_guard);
423
424        // Emit end event
425        let exec_result = match &result {
426            Ok((_, true, _)) => ExecutionResult::Changed,
427            Ok((_, false, _)) => ExecutionResult::Unchanged,
428            Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
429            Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
430            Err(e) => ExecutionResult::Error {
431                message: format!("{:?}", e),
432            },
433        };
434        self.tracer
435            .on_query_end(span_id, query_key.clone(), exec_result);
436
437        result.map(|(inner_result, _, revision)| (inner_result, revision))
438    }
439
440    /// Execute a query, caching the result if appropriate.
441    ///
442    /// Returns (result, output_changed, revision) tuple.
443    /// - `result`: Ok(output) for success, Err(user_error) for user errors
444    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
445    #[allow(clippy::type_complexity)]
446    fn execute_query<Q: Query>(
447        &self,
448        query: &Q,
449        full_key: &FullCacheKey,
450        exec_ctx: ExecutionContext,
451    ) -> Result<
452        (
453            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
454            bool,
455            RevisionCounter,
456        ),
457        QueryError,
458    > {
459        // Capture current global revision at query start for consistency checking
460        let start_revision = self.whale.current_revision().get(Durability::volatile());
461
462        // Create consistency tracker for this query execution
463        let tracker = Rc::new(ConsistencyTracker::new(start_revision));
464
465        // Set thread-local tracker for nested locator calls
466        let _tracker_guard = ConsistencyTrackerGuard::new(tracker);
467
468        // Create context for this query execution
469        let ctx = QueryContext {
470            runtime: self,
471            current_key: full_key.clone(),
472            parent_query_type: std::any::type_name::<Q>(),
473            exec_ctx,
474            deps: RefCell::new(Vec::new()),
475        };
476
477        // Execute the query (clone because query() takes ownership)
478        let result = query.clone().query(&ctx);
479
480        // Get collected dependencies
481        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
482
483        // Query durability defaults to stable - Whale will automatically reduce
484        // the effective durability to min(requested, min(dep_durabilities)).
485        // A pure query with no dependencies remains stable.
486        // A query depending on volatile assets becomes volatile.
487        let durability = Durability::stable();
488
489        match result {
490            Ok(output) => {
491                // Check if output changed (for early cutoff)
492                // existing_revision is Some only when output is unchanged (can reuse revision)
493                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
494                    self.get_cached_with_revision::<Q>(full_key)
495                {
496                    if Q::output_eq(&*old, &*output) {
497                        Some(rev) // Same output - reuse revision
498                    } else {
499                        None // Different output
500                    }
501                } else {
502                    None // No previous Ok value
503                };
504                let output_changed = existing_revision.is_none();
505
506                // Emit early cutoff check event
507                self.tracer.on_early_cutoff_check(
508                    exec_ctx.span_id(),
509                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
510                    output_changed,
511                );
512
513                // Update whale with cached entry (atomic update of value + dependency state)
514                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
515                let revision = if let Some(existing_rev) = existing_revision {
516                    // confirm_unchanged doesn't change changed_at, use existing
517                    let _ = self.whale.confirm_unchanged(full_key, deps);
518                    existing_rev
519                } else {
520                    // Use new_rev from register result
521                    match self
522                        .whale
523                        .register(full_key.clone(), Some(entry), durability, deps)
524                    {
525                        Ok(result) => result.new_rev,
526                        Err(missing) => {
527                            return Err(QueryError::DependenciesRemoved {
528                                missing_keys: missing,
529                            })
530                        }
531                    }
532                };
533
534                // Register query in registry for list_queries
535                let is_new_query = self.query_registry.register(query);
536                if is_new_query {
537                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
538                    let _ = self
539                        .whale
540                        .register(sentinel, None, Durability::stable(), vec![]);
541                }
542
543                // Store verifier for this query (for verify-then-decide pattern)
544                self.verifiers
545                    .insert::<Q, T>(full_key.clone(), query.clone());
546
547                Ok((Ok(output), output_changed, revision))
548            }
549            Err(QueryError::UserError(err)) => {
550                // Check if error changed (for early cutoff)
551                // existing_revision is Some only when error is unchanged (can reuse revision)
552                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
553                    self.get_cached_with_revision::<Q>(full_key)
554                {
555                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
556                        Some(rev) // Same error - reuse revision
557                    } else {
558                        None // Different error
559                    }
560                } else {
561                    None // No previous UserError
562                };
563                let output_changed = existing_revision.is_none();
564
565                // Emit early cutoff check event
566                self.tracer.on_early_cutoff_check(
567                    exec_ctx.span_id(),
568                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
569                    output_changed,
570                );
571
572                // Update whale with cached error (atomic update of value + dependency state)
573                let entry = CachedEntry::UserError(err.clone());
574                let revision = if let Some(existing_rev) = existing_revision {
575                    // confirm_unchanged doesn't change changed_at, use existing
576                    let _ = self.whale.confirm_unchanged(full_key, deps);
577                    existing_rev
578                } else {
579                    // Use new_rev from register result
580                    match self
581                        .whale
582                        .register(full_key.clone(), Some(entry), durability, deps)
583                    {
584                        Ok(result) => result.new_rev,
585                        Err(missing) => {
586                            return Err(QueryError::DependenciesRemoved {
587                                missing_keys: missing,
588                            })
589                        }
590                    }
591                };
592
593                // Register query in registry for list_queries
594                let is_new_query = self.query_registry.register(query);
595                if is_new_query {
596                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
597                    let _ = self
598                        .whale
599                        .register(sentinel, None, Durability::stable(), vec![]);
600                }
601
602                // Store verifier for this query (for verify-then-decide pattern)
603                self.verifiers
604                    .insert::<Q, T>(full_key.clone(), query.clone());
605
606                Ok((Err(err), output_changed, revision))
607            }
608            Err(e) => {
609                // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
610                Err(e)
611            }
612        }
613    }
614
615    /// Invalidate a query, forcing recomputation on next access.
616    ///
617    /// This also invalidates any queries that depend on this one.
618    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
619        let full_key = FullCacheKey::new::<Q, _>(key);
620
621        self.tracer.on_query_invalidated(
622            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
623            InvalidationReason::ManualInvalidation,
624        );
625
626        // Update whale to invalidate dependents (register with None to clear cached value)
627        // Use stable durability to increment all revision counters, ensuring queries
628        // at any durability level will see this as a change.
629        let _ = self
630            .whale
631            .register(full_key, None, Durability::stable(), vec![]);
632    }
633
634    /// Remove a query from the cache entirely, freeing memory.
635    ///
636    /// Use this for GC when a query is no longer needed.
637    /// Unlike `invalidate`, this removes all traces of the query from storage.
638    /// The query will be recomputed from scratch on next access.
639    ///
640    /// This also invalidates any queries that depend on this one.
641    pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
642        let full_key = FullCacheKey::new::<Q, _>(key);
643
644        self.tracer.on_query_invalidated(
645            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
646            InvalidationReason::ManualInvalidation,
647        );
648
649        // Remove verifier if exists
650        self.verifiers.remove(&full_key);
651
652        // Remove from whale storage (this also handles dependent invalidation)
653        self.whale.remove(&full_key);
654
655        // Remove from registry and update sentinel for list_queries
656        if self.query_registry.remove::<Q>(key) {
657            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
658            let _ = self
659                .whale
660                .register(sentinel, None, Durability::stable(), vec![]);
661        }
662    }
663
664    /// Clear all cached values by removing all nodes from whale.
665    ///
666    /// Note: This is a relatively expensive operation as it iterates through all keys.
667    pub fn clear_cache(&self) {
668        let keys = self.whale.keys();
669        for key in keys {
670            self.whale.remove(&key);
671        }
672    }
673
674    /// Poll a query, returning both the result and its change revision.
675    ///
676    /// This is useful for implementing subscription patterns where you need to
677    /// detect changes efficiently. Compare the returned `revision` with a
678    /// previously stored value to determine if the query result has changed.
679    ///
680    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
681    /// as its value, allowing you to track revision changes for both success and
682    /// user error cases.
683    ///
684    /// # Example
685    ///
686    /// ```ignore
687    /// struct Subscription<Q: Query> {
688    ///     query: Q,
689    ///     last_revision: RevisionCounter,
690    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
691    /// }
692    ///
693    /// // Polling loop
694    /// for sub in &mut subscriptions {
695    ///     let result = runtime.poll(sub.query.clone())?;
696    ///     if result.revision > sub.last_revision {
697    ///         sub.tx.send(result.value.clone())?;
698    ///         sub.last_revision = result.revision;
699    ///     }
700    /// }
701    /// ```
702    ///
703    /// # Errors
704    ///
705    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
706    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
707    #[allow(clippy::type_complexity)]
708    pub fn poll<Q: Query>(
709        &self,
710        query: Q,
711    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
712        let (value, revision) = self.query_internal(query)?;
713        Ok(Polled { value, revision })
714    }
715
716    /// Get the change revision of a query without executing it.
717    ///
718    /// Returns `None` if the query has never been executed.
719    ///
720    /// This is useful for checking if a query has changed since the last poll
721    /// without the cost of executing the query.
722    ///
723    /// # Example
724    ///
725    /// ```ignore
726    /// // Check if query has changed before deciding to poll
727    /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
728    ///     if rev > last_known_revision {
729    ///         let result = runtime.query(MyQuery::new(key))?;
730    ///         // Process result...
731    ///     }
732    /// }
733    /// ```
734    pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
735        let full_key = FullCacheKey::new::<Q, _>(key);
736        self.whale.get(&full_key).map(|node| node.changed_at)
737    }
738}
739
740// ============================================================================
741// GC (Garbage Collection) API
742// ============================================================================
743
744impl<T: Tracer> QueryRuntime<T> {
745    /// Get all query keys currently in the cache.
746    ///
747    /// This is useful for implementing custom garbage collection strategies.
748    /// Use this in combination with [`Tracer::on_query_key`] to track access
749    /// times and implement LRU, TTL, or other GC algorithms externally.
750    ///
751    /// # Example
752    ///
753    /// ```ignore
754    /// // Collect all keys that haven't been accessed recently
755    /// let stale_keys: Vec<_> = runtime.query_keys()
756    ///     .filter(|key| tracker.is_stale(key))
757    ///     .collect();
758    ///
759    /// // Remove stale queries that have no dependents
760    /// for key in stale_keys {
761    ///     runtime.remove_if_unused(&key);
762    /// }
763    /// ```
764    pub fn query_keys(&self) -> Vec<FullCacheKey> {
765        self.whale.keys()
766    }
767
768    /// Remove a query if it has no dependents.
769    ///
770    /// Returns `true` if the query was removed, `false` if it has dependents
771    /// or doesn't exist. This is the safe way to remove queries during GC,
772    /// as it won't break queries that depend on this one.
773    ///
774    /// # Example
775    ///
776    /// ```ignore
777    /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
778    /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
779    ///     println!("Query removed");
780    /// } else {
781    ///     println!("Query has dependents, not removed");
782    /// }
783    /// ```
784    pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
785        let full_key = FullCacheKey::new::<Q, _>(key);
786        self.remove_if_unused(&full_key)
787    }
788
789    /// Remove a query by its [`FullCacheKey`].
790    ///
791    /// This is the type-erased version of [`remove_query`](Self::remove_query).
792    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
793    /// or [`Tracer::on_query_key`].
794    ///
795    /// Returns `true` if the query was removed, `false` if it doesn't exist.
796    ///
797    /// # Warning
798    ///
799    /// This forcibly removes the query even if other queries depend on it.
800    /// Dependent queries will be recomputed on next access. For safe GC,
801    /// use [`remove_if_unused`](Self::remove_if_unused) instead.
802    pub fn remove(&self, key: &FullCacheKey) -> bool {
803        // Remove verifier if exists
804        self.verifiers.remove(key);
805
806        // Remove from whale storage
807        self.whale.remove(key).is_some()
808    }
809
810    /// Remove a query by its [`FullCacheKey`] if it has no dependents.
811    ///
812    /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
813    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
814    /// or [`Tracer::on_query_key`].
815    ///
816    /// Returns `true` if the query was removed, `false` if it has dependents
817    /// or doesn't exist.
818    ///
819    /// # Example
820    ///
821    /// ```ignore
822    /// // Implement LRU GC
823    /// for key in runtime.query_keys() {
824    ///     if tracker.is_expired(&key) {
825    ///         runtime.remove_if_unused(&key);
826    ///     }
827    /// }
828    /// ```
829    pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
830        if self.whale.remove_if_unused(key.clone()).is_some() {
831            // Successfully removed - clean up verifier
832            self.verifiers.remove(key);
833            true
834        } else {
835            false
836        }
837    }
838}
839
840// ============================================================================
841// Builder
842// ============================================================================
843
844/// Builder for [`QueryRuntime`] with customizable settings.
845///
846/// # Example
847///
848/// ```ignore
849/// let runtime = QueryRuntime::builder()
850///     .error_comparator(|a, b| {
851///         // Treat all errors of the same type as equal
852///         a.downcast_ref::<std::io::Error>().is_some()
853///             == b.downcast_ref::<std::io::Error>().is_some()
854///     })
855///     .build();
856/// ```
857pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
858    error_comparator: ErrorComparator,
859    tracer: T,
860}
861
862impl Default for QueryRuntimeBuilder<NoopTracer> {
863    fn default() -> Self {
864        Self::new()
865    }
866}
867
868impl QueryRuntimeBuilder<NoopTracer> {
869    /// Create a new builder with default settings.
870    pub fn new() -> Self {
871        Self {
872            error_comparator: default_error_comparator,
873            tracer: NoopTracer,
874        }
875    }
876}
877
878impl<T: Tracer> QueryRuntimeBuilder<T> {
879    /// Set the error comparator function for early cutoff optimization.
880    ///
881    /// When a query returns `QueryError::UserError`, this function is used
882    /// to compare it with the previously cached error. If they are equal,
883    /// downstream queries can skip recomputation (early cutoff).
884    ///
885    /// The default comparator returns `false` for all errors, meaning errors
886    /// are always considered different (conservative, always recomputes).
887    ///
888    /// # Example
889    ///
890    /// ```ignore
891    /// // Treat errors as equal if they have the same display message
892    /// let runtime = QueryRuntime::builder()
893    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
894    ///     .build();
895    /// ```
896    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
897        self.error_comparator = f;
898        self
899    }
900
901    /// Set the tracer for observability.
902    pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
903        QueryRuntimeBuilder {
904            error_comparator: self.error_comparator,
905            tracer,
906        }
907    }
908
909    /// Build the runtime with the configured settings.
910    pub fn build(self) -> QueryRuntime<T> {
911        QueryRuntime {
912            whale: WhaleRuntime::new(),
913            locators: Arc::new(LocatorStorage::new()),
914            pending: Arc::new(PendingStorage::new()),
915            query_registry: Arc::new(QueryRegistry::new()),
916            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
917            verifiers: Arc::new(VerifierStorage::new()),
918            error_comparator: self.error_comparator,
919            tracer: Arc::new(self.tracer),
920        }
921    }
922}
923
924// ============================================================================
925// Asset API
926// ============================================================================
927
928impl<T: Tracer> QueryRuntime<T> {
929    /// Register an asset locator for a specific asset key type.
930    ///
931    /// Only one locator can be registered per key type. Later registrations
932    /// replace earlier ones.
933    ///
934    /// # Example
935    ///
936    /// ```ignore
937    /// let runtime = QueryRuntime::new();
938    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
939    /// ```
940    pub fn register_asset_locator<K, L>(&self, locator: L)
941    where
942        K: AssetKey,
943        L: AssetLocator<K>,
944    {
945        self.locators.insert::<K, L>(locator);
946    }
947
948    /// Get an iterator over pending asset requests.
949    ///
950    /// Returns assets that have been requested but not yet resolved.
951    /// The user should fetch these externally and call `resolve_asset()`.
952    ///
953    /// # Example
954    ///
955    /// ```ignore
956    /// for pending in runtime.pending_assets() {
957    ///     if let Some(path) = pending.key::<FilePath>() {
958    ///         let content = fetch_file(path);
959    ///         runtime.resolve_asset(path.clone(), content);
960    ///     }
961    /// }
962    /// ```
963    pub fn pending_assets(&self) -> Vec<PendingAsset> {
964        self.pending.get_all()
965    }
966
967    /// Get pending assets filtered by key type.
968    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
969        self.pending.get_of_type::<K>()
970    }
971
972    /// Check if there are any pending assets.
973    pub fn has_pending_assets(&self) -> bool {
974        !self.pending.is_empty()
975    }
976
977    /// Resolve an asset with its loaded value.
978    ///
979    /// This marks the asset as ready and invalidates any queries that
980    /// depend on it (if the value changed), triggering recomputation on next access.
981    ///
982    /// This method is idempotent - resolving with the same value (via `asset_eq`)
983    /// will not trigger downstream recomputation.
984    ///
985    /// # Arguments
986    ///
987    /// * `key` - The asset key identifying this resource
988    /// * `value` - The loaded asset value
989    /// * `durability` - How frequently this asset is expected to change
990    ///
991    /// # Example
992    ///
993    /// ```ignore
994    /// let content = std::fs::read_to_string(&path)?;
995    /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
996    /// ```
997    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
998        self.resolve_asset_internal(key, value, durability);
999    }
1000
1001    /// Resolve an asset with an error.
1002    ///
1003    /// This marks the asset as errored and caches the error. Queries depending
1004    /// on this asset will receive `Err(QueryError::UserError(...))`.
1005    ///
1006    /// Use this when async loading fails (e.g., network error, file not found,
1007    /// access denied).
1008    ///
1009    /// # Arguments
1010    ///
1011    /// * `key` - The asset key identifying this resource
1012    /// * `error` - The error to cache (will be wrapped in `Arc`)
1013    /// * `durability` - How frequently this error state is expected to change
1014    ///
1015    /// # Example
1016    ///
1017    /// ```ignore
1018    /// match fetch_file(&path) {
1019    ///     Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
1020    ///     Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
1021    /// }
1022    /// ```
1023    pub fn resolve_asset_error<K: AssetKey>(
1024        &self,
1025        key: K,
1026        error: impl Into<anyhow::Error>,
1027        durability: DurabilityLevel,
1028    ) {
1029        let full_asset_key = FullAssetKey::new(&key);
1030        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1031
1032        // Remove from pending BEFORE registering the error
1033        self.pending.remove(&full_asset_key);
1034
1035        // Prepare the error entry
1036        let error_arc = Arc::new(error.into());
1037        let entry = CachedEntry::AssetError(error_arc.clone());
1038        let durability =
1039            Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
1040
1041        // Atomic compare-and-update (errors are always considered changed for now)
1042        let result = self
1043            .whale
1044            .update_with_compare(
1045                full_cache_key,
1046                Some(entry),
1047                |old_data, _new_data| {
1048                    // Compare old and new errors using error_comparator
1049                    match old_data.and_then(|d| d.as_ref()) {
1050                        Some(CachedEntry::AssetError(old_err)) => {
1051                            !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
1052                        }
1053                        _ => true, // Loading, Ready, or not present -> changed
1054                    }
1055                },
1056                durability,
1057                vec![],
1058            )
1059            .expect("update_with_compare with no dependencies cannot fail");
1060
1061        // Emit asset resolved event (with changed status)
1062        self.tracer.on_asset_resolved(
1063            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1064            result.changed,
1065        );
1066
1067        // Register asset key in registry for list_asset_keys
1068        let is_new_asset = self.asset_key_registry.register(&key);
1069        if is_new_asset {
1070            // Update sentinel to invalidate list_asset_keys dependents
1071            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1072            let _ = self
1073                .whale
1074                .register(sentinel, None, Durability::stable(), vec![]);
1075        }
1076    }
1077
1078    fn resolve_asset_internal<K: AssetKey>(
1079        &self,
1080        key: K,
1081        value: K::Asset,
1082        durability_level: DurabilityLevel,
1083    ) {
1084        let full_asset_key = FullAssetKey::new(&key);
1085        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1086
1087        // Remove from pending BEFORE registering the value
1088        self.pending.remove(&full_asset_key);
1089
1090        // Prepare the new entry
1091        let value_arc: Arc<K::Asset> = Arc::new(value);
1092        let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1093        let durability =
1094            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1095
1096        // Atomic compare-and-update
1097        let result = self
1098            .whale
1099            .update_with_compare(
1100                full_cache_key,
1101                Some(entry),
1102                |old_data, _new_data| {
1103                    // Compare old and new values
1104                    match old_data.and_then(|d| d.as_ref()) {
1105                        Some(CachedEntry::AssetReady(old_arc)) => {
1106                            match old_arc.clone().downcast::<K::Asset>() {
1107                                Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1108                                Err(_) => true, // Type mismatch, treat as changed
1109                            }
1110                        }
1111                        _ => true, // Loading, NotFound, or not present -> changed
1112                    }
1113                },
1114                durability,
1115                vec![],
1116            )
1117            .expect("update_with_compare with no dependencies cannot fail");
1118
1119        // Emit asset resolved event
1120        self.tracer.on_asset_resolved(
1121            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1122            result.changed,
1123        );
1124
1125        // Register asset key in registry for list_asset_keys
1126        let is_new_asset = self.asset_key_registry.register(&key);
1127        if is_new_asset {
1128            // Update sentinel to invalidate list_asset_keys dependents
1129            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1130            let _ = self
1131                .whale
1132                .register(sentinel, None, Durability::stable(), vec![]);
1133        }
1134    }
1135
1136    /// Invalidate an asset, forcing queries to re-request it.
1137    ///
1138    /// The asset will be marked as loading and added to pending assets.
1139    /// Dependent queries will suspend until the asset is resolved again.
1140    ///
1141    /// # Example
1142    ///
1143    /// ```ignore
1144    /// // File was modified externally
1145    /// runtime.invalidate_asset(&FilePath("config.json".into()));
1146    /// // Queries depending on this asset will now suspend
1147    /// // User should fetch the new value and call resolve_asset
1148    /// ```
1149    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1150        let full_asset_key = FullAssetKey::new(key);
1151        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1152
1153        // Emit asset invalidated event
1154        self.tracer.on_asset_invalidated(TracerAssetKey::new(
1155            std::any::type_name::<K>(),
1156            format!("{:?}", key),
1157        ));
1158
1159        // Add to pending FIRST (before clearing whale state)
1160        // This ensures: readers see either old value, or Loading+pending
1161        self.pending.insert::<K>(full_asset_key, key.clone());
1162
1163        // Atomic: clear cached value + invalidate dependents
1164        // Using None for data means "needs to be loaded"
1165        // Use stable durability to ensure queries at any durability level see the change.
1166        let _ = self
1167            .whale
1168            .register(full_cache_key, None, Durability::stable(), vec![]);
1169    }
1170
1171    /// Remove an asset from the cache entirely.
1172    ///
1173    /// Unlike `invalidate_asset`, this removes all traces of the asset.
1174    /// Dependent queries will go through the locator again on next access.
1175    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1176        let full_asset_key = FullAssetKey::new(key);
1177        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1178
1179        // Remove from pending first
1180        self.pending.remove(&full_asset_key);
1181
1182        // Remove from whale (this also cleans up dependency edges)
1183        // whale.remove() invalidates dependents before removing
1184        self.whale.remove(&full_cache_key);
1185
1186        // Remove from registry and update sentinel for list_asset_keys
1187        if self.asset_key_registry.remove::<K>(key) {
1188            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1189            let _ = self
1190                .whale
1191                .register(sentinel, None, Durability::stable(), vec![]);
1192        }
1193    }
1194
1195    /// Get an asset by key without tracking dependencies.
1196    ///
1197    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1198    /// as a dependent of the asset. Use this for direct asset access outside
1199    /// of query execution.
1200    ///
1201    /// # Returns
1202    ///
1203    /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1204    /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1205    /// - `Err(QueryError::MissingDependency)` - Asset was not found
1206    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1207        self.get_asset_internal(key)
1208    }
1209
1210    /// Internal: Get asset state and its changed_at revision atomically.
1211    ///
1212    /// This is the "direct" version called from QueryRuntime::asset (no dependency tracking).
1213    /// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
1214    ///
1215    /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
1216    /// whale node that provided the asset value.
1217    fn get_asset_with_revision<K: AssetKey>(
1218        &self,
1219        key: K,
1220    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1221        let full_asset_key = FullAssetKey::new(&key);
1222        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1223
1224        // Helper to emit AssetRequested event
1225        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1226            tracer.on_asset_requested(
1227                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1228                state,
1229            );
1230        };
1231
1232        // Check whale cache first (single atomic read)
1233        if let Some(node) = self.whale.get(&full_cache_key) {
1234            let changed_at = node.changed_at;
1235            // Check if valid at current revision (shallow check)
1236            if self.whale.is_valid(&full_cache_key) {
1237                // Verify dependencies recursively (like query path does)
1238                let mut deps_verified = true;
1239                if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1240                    for dep in deps {
1241                        if let Some(verifier) = self.verifiers.get(&dep) {
1242                            // Re-run query/asset to verify it (triggers recursive verification)
1243                            if verifier.verify(self as &dyn std::any::Any).is_err() {
1244                                deps_verified = false;
1245                                break;
1246                            }
1247                        }
1248                    }
1249                }
1250
1251                // Re-check validity after deps are verified
1252                if deps_verified && self.whale.is_valid(&full_cache_key) {
1253                    // For cached entries, check consistency for leaf assets (no locator deps).
1254                    // This detects if resolve_asset/resolve_asset_error was called during query execution.
1255                    let has_locator_deps = self
1256                        .whale
1257                        .get_dependency_ids(&full_cache_key)
1258                        .is_some_and(|deps| !deps.is_empty());
1259
1260                    match &node.data {
1261                        Some(CachedEntry::AssetReady(arc)) => {
1262                            // Check consistency for cached leaf assets
1263                            if !has_locator_deps {
1264                                check_leaf_asset_consistency(changed_at)?;
1265                            }
1266                            emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1267                            match arc.clone().downcast::<K::Asset>() {
1268                                Ok(value) => {
1269                                    return Ok((AssetLoadingState::ready(key, value), changed_at))
1270                                }
1271                                Err(_) => {
1272                                    return Err(QueryError::MissingDependency {
1273                                        description: format!("Asset type mismatch: {:?}", key),
1274                                    })
1275                                }
1276                            }
1277                        }
1278                        Some(CachedEntry::AssetError(err)) => {
1279                            // Check consistency for cached leaf errors
1280                            if !has_locator_deps {
1281                                check_leaf_asset_consistency(changed_at)?;
1282                            }
1283                            emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1284                            return Err(QueryError::UserError(err.clone()));
1285                        }
1286                        None => {
1287                            // Loading state - no value to be inconsistent with
1288                            emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1289                            return Ok((AssetLoadingState::loading(key), changed_at));
1290                        }
1291                        _ => {
1292                            // Query-related entries (Ok, UserError) shouldn't be here
1293                            // Fall through to locator
1294                        }
1295                    }
1296                }
1297            }
1298        }
1299
1300        // Not in cache or invalid - try locator
1301        // Use LocatorContext to track deps on the asset itself
1302        check_cycle(&full_cache_key)?;
1303        let _guard = StackGuard::push(full_cache_key.clone());
1304
1305        let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1306        let locator_result =
1307            self.locators
1308                .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1309
1310        if let Some(result) = locator_result {
1311            // Get collected dependencies from the locator context
1312            let locator_deps = locator_ctx.into_deps();
1313            match result {
1314                Ok(ErasedLocateResult::Ready {
1315                    value: arc,
1316                    durability: durability_level,
1317                }) => {
1318                    emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1319
1320                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1321                        Ok(v) => v,
1322                        Err(_) => {
1323                            return Err(QueryError::MissingDependency {
1324                                description: format!("Asset type mismatch: {:?}", key),
1325                            });
1326                        }
1327                    };
1328
1329                    // Store in whale atomically with early cutoff
1330                    // Include locator's dependencies so the asset is invalidated when they change
1331                    let entry = CachedEntry::AssetReady(typed_value.clone());
1332                    let durability = Durability::new(durability_level.as_u8() as usize)
1333                        .unwrap_or(Durability::volatile());
1334                    let new_value = typed_value.clone();
1335                    let result = self
1336                        .whale
1337                        .update_with_compare(
1338                            full_cache_key.clone(),
1339                            Some(entry),
1340                            |old_data, _new_data| {
1341                                let Some(CachedEntry::AssetReady(old_arc)) =
1342                                    old_data.and_then(|d| d.as_ref())
1343                                else {
1344                                    return true;
1345                                };
1346                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1347                                    return true;
1348                                };
1349                                !K::asset_eq(&old_value, &new_value)
1350                            },
1351                            durability,
1352                            locator_deps,
1353                        )
1354                        .expect("update_with_compare should succeed");
1355
1356                    // Register verifier for this asset (for verify-then-decide pattern)
1357                    self.verifiers
1358                        .insert_asset::<K, T>(full_cache_key, key.clone());
1359
1360                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1361                }
1362                Ok(ErasedLocateResult::Pending) => {
1363                    emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1364
1365                    // Add to pending list for Pending result
1366                    self.pending.insert::<K>(full_asset_key, key.clone());
1367                    match self
1368                        .whale
1369                        .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1370                        .expect("get_or_insert should succeed")
1371                    {
1372                        GetOrInsertResult::Inserted(node) => {
1373                            return Ok((AssetLoadingState::loading(key), node.changed_at));
1374                        }
1375                        GetOrInsertResult::Existing(node) => {
1376                            let changed_at = node.changed_at;
1377                            match &node.data {
1378                                Some(CachedEntry::AssetReady(arc)) => {
1379                                    match arc.clone().downcast::<K::Asset>() {
1380                                        Ok(value) => {
1381                                            return Ok((
1382                                                AssetLoadingState::ready(key, value),
1383                                                changed_at,
1384                                            ))
1385                                        }
1386                                        Err(_) => {
1387                                            return Ok((
1388                                                AssetLoadingState::loading(key),
1389                                                changed_at,
1390                                            ))
1391                                        }
1392                                    }
1393                                }
1394                                Some(CachedEntry::AssetError(err)) => {
1395                                    return Err(QueryError::UserError(err.clone()));
1396                                }
1397                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1398                            }
1399                        }
1400                    }
1401                }
1402                Err(QueryError::UserError(err)) => {
1403                    // Locator returned a user error - cache it as AssetError
1404                    let entry = CachedEntry::AssetError(err.clone());
1405                    let _ = self.whale.register(
1406                        full_cache_key,
1407                        Some(entry),
1408                        Durability::volatile(),
1409                        locator_deps,
1410                    );
1411                    return Err(QueryError::UserError(err));
1412                }
1413                Err(e) => {
1414                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1415                    return Err(e);
1416                }
1417            }
1418        }
1419
1420        // No locator registered or locator returned None - mark as pending
1421        // (no locator was called, so no deps to track)
1422        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1423        self.pending
1424            .insert::<K>(full_asset_key.clone(), key.clone());
1425
1426        match self
1427            .whale
1428            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1429            .expect("get_or_insert with no dependencies cannot fail")
1430        {
1431            GetOrInsertResult::Inserted(node) => {
1432                Ok((AssetLoadingState::loading(key), node.changed_at))
1433            }
1434            GetOrInsertResult::Existing(node) => {
1435                let changed_at = node.changed_at;
1436                match &node.data {
1437                    Some(CachedEntry::AssetReady(arc)) => {
1438                        match arc.clone().downcast::<K::Asset>() {
1439                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1440                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1441                        }
1442                    }
1443                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1444                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1445                }
1446            }
1447        }
1448    }
1449
1450    /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1451    ///
1452    /// This version is called from QueryContext::asset. Consistency checking for
1453    /// cached leaf assets is done inside this function before returning.
1454    fn get_asset_with_revision_ctx<K: AssetKey>(
1455        &self,
1456        key: K,
1457        _ctx: &QueryContext<'_, T>,
1458    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1459        let full_asset_key = FullAssetKey::new(&key);
1460        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1461
1462        // Helper to emit AssetRequested event
1463        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1464            tracer.on_asset_requested(
1465                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1466                state,
1467            );
1468        };
1469
1470        // Check whale cache first (single atomic read)
1471        if let Some(node) = self.whale.get(&full_cache_key) {
1472            let changed_at = node.changed_at;
1473            // Check if valid at current revision (shallow check)
1474            if self.whale.is_valid(&full_cache_key) {
1475                // Verify dependencies recursively (like query path does)
1476                let mut deps_verified = true;
1477                if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1478                    for dep in deps {
1479                        if let Some(verifier) = self.verifiers.get(&dep) {
1480                            // Re-run query/asset to verify it (triggers recursive verification)
1481                            if verifier.verify(self as &dyn std::any::Any).is_err() {
1482                                deps_verified = false;
1483                                break;
1484                            }
1485                        }
1486                    }
1487                }
1488
1489                // Re-check validity after deps are verified
1490                if deps_verified && self.whale.is_valid(&full_cache_key) {
1491                    // For cached entries, check consistency for leaf assets (no locator deps).
1492                    // This detects if resolve_asset/resolve_asset_error was called during query execution.
1493                    let has_locator_deps = self
1494                        .whale
1495                        .get_dependency_ids(&full_cache_key)
1496                        .is_some_and(|deps| !deps.is_empty());
1497
1498                    match &node.data {
1499                        Some(CachedEntry::AssetReady(arc)) => {
1500                            // Check consistency for cached leaf assets
1501                            if !has_locator_deps {
1502                                check_leaf_asset_consistency(changed_at)?;
1503                            }
1504                            emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1505                            match arc.clone().downcast::<K::Asset>() {
1506                                Ok(value) => {
1507                                    return Ok((AssetLoadingState::ready(key, value), changed_at))
1508                                }
1509                                Err(_) => {
1510                                    return Err(QueryError::MissingDependency {
1511                                        description: format!("Asset type mismatch: {:?}", key),
1512                                    })
1513                                }
1514                            }
1515                        }
1516                        Some(CachedEntry::AssetError(err)) => {
1517                            // Check consistency for cached leaf errors
1518                            if !has_locator_deps {
1519                                check_leaf_asset_consistency(changed_at)?;
1520                            }
1521                            emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1522                            return Err(QueryError::UserError(err.clone()));
1523                        }
1524                        None => {
1525                            // Loading state - no value to be inconsistent with
1526                            emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1527                            return Ok((AssetLoadingState::loading(key), changed_at));
1528                        }
1529                        _ => {
1530                            // Query-related entries (Ok, UserError) shouldn't be here
1531                            // Fall through to locator
1532                        }
1533                    }
1534                }
1535            }
1536        }
1537
1538        // Not in cache or invalid - try locator
1539        // Use LocatorContext to track deps on the asset itself (not the calling query)
1540        // Consistency tracking is handled via thread-local storage
1541        check_cycle(&full_cache_key)?;
1542        let _guard = StackGuard::push(full_cache_key.clone());
1543
1544        let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1545        let locator_result =
1546            self.locators
1547                .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1548
1549        if let Some(result) = locator_result {
1550            // Get collected dependencies from the locator context
1551            let locator_deps = locator_ctx.into_deps();
1552            match result {
1553                Ok(ErasedLocateResult::Ready {
1554                    value: arc,
1555                    durability: durability_level,
1556                }) => {
1557                    emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1558
1559                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1560                        Ok(v) => v,
1561                        Err(_) => {
1562                            return Err(QueryError::MissingDependency {
1563                                description: format!("Asset type mismatch: {:?}", key),
1564                            });
1565                        }
1566                    };
1567
1568                    // Store in whale atomically with early cutoff
1569                    // Include locator's dependencies so the asset is invalidated when they change
1570                    let entry = CachedEntry::AssetReady(typed_value.clone());
1571                    let durability = Durability::new(durability_level.as_u8() as usize)
1572                        .unwrap_or(Durability::volatile());
1573                    let new_value = typed_value.clone();
1574                    let result = self
1575                        .whale
1576                        .update_with_compare(
1577                            full_cache_key.clone(),
1578                            Some(entry),
1579                            |old_data, _new_data| {
1580                                let Some(CachedEntry::AssetReady(old_arc)) =
1581                                    old_data.and_then(|d| d.as_ref())
1582                                else {
1583                                    return true;
1584                                };
1585                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1586                                    return true;
1587                                };
1588                                !K::asset_eq(&old_value, &new_value)
1589                            },
1590                            durability,
1591                            locator_deps,
1592                        )
1593                        .expect("update_with_compare should succeed");
1594
1595                    // Register verifier for this asset (for verify-then-decide pattern)
1596                    self.verifiers
1597                        .insert_asset::<K, T>(full_cache_key, key.clone());
1598
1599                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1600                }
1601                Ok(ErasedLocateResult::Pending) => {
1602                    emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1603
1604                    // Add to pending list for Pending result
1605                    self.pending.insert::<K>(full_asset_key, key.clone());
1606                    match self
1607                        .whale
1608                        .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1609                        .expect("get_or_insert should succeed")
1610                    {
1611                        GetOrInsertResult::Inserted(node) => {
1612                            return Ok((AssetLoadingState::loading(key), node.changed_at));
1613                        }
1614                        GetOrInsertResult::Existing(node) => {
1615                            let changed_at = node.changed_at;
1616                            match &node.data {
1617                                Some(CachedEntry::AssetReady(arc)) => {
1618                                    match arc.clone().downcast::<K::Asset>() {
1619                                        Ok(value) => {
1620                                            return Ok((
1621                                                AssetLoadingState::ready(key, value),
1622                                                changed_at,
1623                                            ));
1624                                        }
1625                                        Err(_) => {
1626                                            return Ok((
1627                                                AssetLoadingState::loading(key),
1628                                                changed_at,
1629                                            ))
1630                                        }
1631                                    }
1632                                }
1633                                Some(CachedEntry::AssetError(err)) => {
1634                                    return Err(QueryError::UserError(err.clone()));
1635                                }
1636                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1637                            }
1638                        }
1639                    }
1640                }
1641                Err(QueryError::UserError(err)) => {
1642                    // Locator returned a user error - cache it as AssetError
1643                    let entry = CachedEntry::AssetError(err.clone());
1644                    let _ = self.whale.register(
1645                        full_cache_key,
1646                        Some(entry),
1647                        Durability::volatile(),
1648                        locator_deps,
1649                    );
1650                    return Err(QueryError::UserError(err));
1651                }
1652                Err(e) => {
1653                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1654                    return Err(e);
1655                }
1656            }
1657        }
1658
1659        // No locator registered or locator returned None - mark as pending
1660        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1661        self.pending
1662            .insert::<K>(full_asset_key.clone(), key.clone());
1663
1664        match self
1665            .whale
1666            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1667            .expect("get_or_insert with no dependencies cannot fail")
1668        {
1669            GetOrInsertResult::Inserted(node) => {
1670                Ok((AssetLoadingState::loading(key), node.changed_at))
1671            }
1672            GetOrInsertResult::Existing(node) => {
1673                let changed_at = node.changed_at;
1674                match &node.data {
1675                    Some(CachedEntry::AssetReady(arc)) => {
1676                        match arc.clone().downcast::<K::Asset>() {
1677                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1678                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1679                        }
1680                    }
1681                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1682                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1683                }
1684            }
1685        }
1686    }
1687
1688    /// Internal: Get asset state, checking cache and locator.
1689    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1690        self.get_asset_with_revision(key).map(|(state, _)| state)
1691    }
1692}
1693
1694impl<T: Tracer> Db for QueryRuntime<T> {
1695    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1696        QueryRuntime::query(self, query)
1697    }
1698
1699    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1700        self.get_asset_internal(key)?.suspend()
1701    }
1702
1703    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1704        self.get_asset_internal(key)
1705    }
1706
1707    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1708        self.query_registry.get_all::<Q>()
1709    }
1710
1711    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1712        self.asset_key_registry.get_all::<K>()
1713    }
1714}
1715
1716/// Tracks consistency of leaf asset accesses during query execution.
1717///
1718/// A "leaf" asset is one without dependencies (externally resolved via `resolve_asset`).
1719/// This tracker ensures that all leaf assets accessed during a query execution
1720/// (including those accessed by locators) are consistent - i.e., none were modified
1721/// via `resolve_asset` mid-execution.
1722///
1723/// The tracker is shared across `QueryContext` and `LocatorContext` to propagate
1724/// consistency checking through the entire execution tree.
1725#[derive(Debug)]
1726pub(crate) struct ConsistencyTracker {
1727    /// Global revision at query start. All leaf assets must have changed_at <= this.
1728    start_revision: RevisionCounter,
1729}
1730
1731impl ConsistencyTracker {
1732    /// Create a new tracker with the given start revision.
1733    pub fn new(start_revision: RevisionCounter) -> Self {
1734        Self { start_revision }
1735    }
1736
1737    /// Check consistency for a leaf asset access.
1738    ///
1739    /// A leaf asset is consistent if its changed_at <= start_revision.
1740    /// This detects if resolve_asset was called during query execution.
1741    ///
1742    /// Returns Ok(()) if consistent, Err if inconsistent.
1743    pub fn check_leaf_asset(&self, dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
1744        if dep_changed_at > self.start_revision {
1745            Err(QueryError::InconsistentAssetResolution)
1746        } else {
1747            Ok(())
1748        }
1749    }
1750}
1751
1752/// Context provided to queries during execution.
1753///
1754/// Use this to access dependencies via `query()`.
1755pub struct QueryContext<'a, T: Tracer = NoopTracer> {
1756    runtime: &'a QueryRuntime<T>,
1757    current_key: FullCacheKey,
1758    parent_query_type: &'static str,
1759    exec_ctx: ExecutionContext,
1760    deps: RefCell<Vec<FullCacheKey>>,
1761}
1762
1763impl<'a, T: Tracer> QueryContext<'a, T> {
1764    /// Query a dependency.
1765    ///
1766    /// The dependency is automatically tracked for invalidation.
1767    ///
1768    /// # Example
1769    ///
1770    /// ```ignore
1771    /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1772    ///     let dep_result = db.query(OtherQuery { id: self.id })?;
1773    ///     Ok(process(&dep_result))
1774    /// }
1775    /// ```
1776    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1777        let key = query.cache_key();
1778        let full_key = FullCacheKey::new::<Q, _>(&key);
1779
1780        // Emit dependency registered event
1781        self.runtime.tracer.on_dependency_registered(
1782            self.exec_ctx.span_id(),
1783            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1784            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1785        );
1786
1787        // Record this as a dependency
1788        self.deps.borrow_mut().push(full_key.clone());
1789
1790        // Execute the query
1791        self.runtime.query(query)
1792    }
1793
1794    /// Access an asset, tracking it as a dependency.
1795    ///
1796    /// Returns the asset value if ready, or `Err(QueryError::Suspend)` if still loading.
1797    /// Use this with the `?` operator for automatic suspension on loading.
1798    ///
1799    /// # Example
1800    ///
1801    /// ```ignore
1802    /// #[query]
1803    /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1804    ///     let content = db.asset(path)?;
1805    ///     // Process content...
1806    ///     Ok(output)
1807    /// }
1808    /// ```
1809    ///
1810    /// # Errors
1811    ///
1812    /// - Returns `Err(QueryError::Suspend)` if the asset is still loading.
1813    /// - Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1814    pub fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1815        self.asset_state(key)?.suspend()
1816    }
1817
1818    /// Access an asset's loading state, tracking it as a dependency.
1819    ///
1820    /// Unlike [`asset()`](Self::asset), this method returns the full loading state,
1821    /// allowing you to check if an asset is loading without triggering suspension.
1822    ///
1823    /// # Example
1824    ///
1825    /// ```ignore
1826    /// let state = db.asset_state(key)?;
1827    /// if state.is_loading() {
1828    ///     // Handle loading case explicitly
1829    /// } else {
1830    ///     let value = state.get().unwrap();
1831    /// }
1832    /// ```
1833    ///
1834    /// # Errors
1835    ///
1836    /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1837    pub fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1838        let full_asset_key = FullAssetKey::new(&key);
1839        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1840
1841        // 1. Emit asset dependency registered event
1842        self.runtime.tracer.on_asset_dependency_registered(
1843            self.exec_ctx.span_id(),
1844            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1845            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1846        );
1847
1848        // 2. Record dependency on this asset
1849        self.deps.borrow_mut().push(full_cache_key);
1850
1851        // 3. Get asset from cache/locator
1852        // Consistency checking for cached leaf assets is done inside get_asset_with_revision_ctx
1853        let (state, _changed_at) = self.runtime.get_asset_with_revision_ctx(key, self)?;
1854
1855        Ok(state)
1856    }
1857
1858    /// List all query instances of type Q that have been registered.
1859    ///
1860    /// This method establishes a dependency on the "set" of queries of type Q.
1861    /// The calling query will be invalidated when:
1862    /// - A new query of type Q is first executed (added to set)
1863    ///
1864    /// The calling query will NOT be invalidated when:
1865    /// - An individual query of type Q has its value change
1866    ///
1867    /// # Example
1868    ///
1869    /// ```ignore
1870    /// #[query]
1871    /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1872    ///     let queries = db.list_queries::<MyQuery>();
1873    ///     let mut results = Vec::new();
1874    ///     for q in queries {
1875    ///         results.push(*db.query(q)?);
1876    ///     }
1877    ///     Ok(results)
1878    /// }
1879    /// ```
1880    pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1881        // Record dependency on the sentinel (set-level dependency)
1882        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1883
1884        self.runtime.tracer.on_dependency_registered(
1885            self.exec_ctx.span_id(),
1886            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1887            TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1888        );
1889
1890        // Ensure sentinel exists in whale (for dependency tracking)
1891        if self.runtime.whale.get(&sentinel).is_none() {
1892            let _ =
1893                self.runtime
1894                    .whale
1895                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1896        }
1897
1898        self.deps.borrow_mut().push(sentinel);
1899
1900        // Return all registered queries
1901        self.runtime.query_registry.get_all::<Q>()
1902    }
1903
1904    /// List all asset keys of type K that have been registered.
1905    ///
1906    /// This method establishes a dependency on the "set" of asset keys of type K.
1907    /// The calling query will be invalidated when:
1908    /// - A new asset of type K is resolved for the first time (added to set)
1909    /// - An asset of type K is removed via remove_asset
1910    ///
1911    /// The calling query will NOT be invalidated when:
1912    /// - An individual asset's value changes (use `db.asset()` for that)
1913    ///
1914    /// # Example
1915    ///
1916    /// ```ignore
1917    /// #[query]
1918    /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1919    ///     let keys = db.list_asset_keys::<ConfigFile>();
1920    ///     let mut contents = Vec::new();
1921    ///     for key in keys {
1922    ///         let content = db.asset(key)?;
1923    ///         contents.push((*content).clone());
1924    ///     }
1925    ///     Ok(contents)
1926    /// }
1927    /// ```
1928    pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1929        // Record dependency on the sentinel (set-level dependency)
1930        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1931
1932        self.runtime.tracer.on_asset_dependency_registered(
1933            self.exec_ctx.span_id(),
1934            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1935            TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1936        );
1937
1938        // Ensure sentinel exists in whale (for dependency tracking)
1939        if self.runtime.whale.get(&sentinel).is_none() {
1940            let _ =
1941                self.runtime
1942                    .whale
1943                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1944        }
1945
1946        self.deps.borrow_mut().push(sentinel);
1947
1948        // Return all registered asset keys
1949        self.runtime.asset_key_registry.get_all::<K>()
1950    }
1951}
1952
1953impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1954    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1955        QueryContext::query(self, query)
1956    }
1957
1958    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1959        QueryContext::asset(self, key)
1960    }
1961
1962    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1963        QueryContext::asset_state(self, key)
1964    }
1965
1966    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1967        QueryContext::list_queries(self)
1968    }
1969
1970    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1971        QueryContext::list_asset_keys(self)
1972    }
1973}
1974
1975/// Context for collecting dependencies during asset locator execution.
1976///
1977/// Unlike `QueryContext`, this is specifically for locators and does not
1978/// register dependencies on any parent query. Dependencies collected here
1979/// are stored with the asset itself.
1980pub(crate) struct LocatorContext<'a, T: Tracer> {
1981    runtime: &'a QueryRuntime<T>,
1982    deps: RefCell<Vec<FullCacheKey>>,
1983}
1984
1985impl<'a, T: Tracer> LocatorContext<'a, T> {
1986    /// Create a new locator context for the given asset key.
1987    ///
1988    /// Consistency tracking is handled via thread-local storage, so leaf asset
1989    /// accesses will be checked against any active tracker from a parent query.
1990    pub(crate) fn new(runtime: &'a QueryRuntime<T>, _asset_key: FullCacheKey) -> Self {
1991        Self {
1992            runtime,
1993            deps: RefCell::new(Vec::new()),
1994        }
1995    }
1996
1997    /// Consume this context and return the collected dependencies.
1998    pub(crate) fn into_deps(self) -> Vec<FullCacheKey> {
1999        self.deps.into_inner()
2000    }
2001}
2002
2003impl<T: Tracer> Db for LocatorContext<'_, T> {
2004    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2005        let key = query.cache_key();
2006        let full_key = FullCacheKey::new::<Q, _>(&key);
2007
2008        // Record this as a dependency of the asset being located
2009        self.deps.borrow_mut().push(full_key);
2010
2011        // Execute the query via the runtime
2012        self.runtime.query(query)
2013    }
2014
2015    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2016        self.asset_state(key)?.suspend()
2017    }
2018
2019    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2020        let full_asset_key = FullAssetKey::new(&key);
2021        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
2022
2023        // Record this as a dependency of the asset being located
2024        self.deps.borrow_mut().push(full_cache_key);
2025
2026        // Access the asset - consistency checking is done inside get_asset_with_revision
2027        let (state, _changed_at) = self.runtime.get_asset_with_revision(key)?;
2028
2029        Ok(state)
2030    }
2031
2032    fn list_queries<Q: Query>(&self) -> Vec<Q> {
2033        self.runtime.list_queries()
2034    }
2035
2036    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2037        self.runtime.list_asset_keys()
2038    }
2039}
2040
2041#[cfg(test)]
2042mod tests {
2043    use super::*;
2044
2045    #[test]
2046    fn test_simple_query() {
2047        #[derive(Clone)]
2048        struct Add {
2049            a: i32,
2050            b: i32,
2051        }
2052
2053        impl Query for Add {
2054            type CacheKey = (i32, i32);
2055            type Output = i32;
2056
2057            fn cache_key(&self) -> Self::CacheKey {
2058                (self.a, self.b)
2059            }
2060
2061            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2062                Ok(Arc::new(self.a + self.b))
2063            }
2064
2065            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2066                old == new
2067            }
2068        }
2069
2070        let runtime = QueryRuntime::new();
2071
2072        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
2073        assert_eq!(*result, 3);
2074
2075        // Second query should be cached
2076        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
2077        assert_eq!(*result2, 3);
2078    }
2079
2080    #[test]
2081    fn test_dependent_queries() {
2082        #[derive(Clone)]
2083        struct Base {
2084            value: i32,
2085        }
2086
2087        impl Query for Base {
2088            type CacheKey = i32;
2089            type Output = i32;
2090
2091            fn cache_key(&self) -> Self::CacheKey {
2092                self.value
2093            }
2094
2095            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2096                Ok(Arc::new(self.value * 2))
2097            }
2098
2099            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2100                old == new
2101            }
2102        }
2103
2104        #[derive(Clone)]
2105        struct Derived {
2106            base_value: i32,
2107        }
2108
2109        impl Query for Derived {
2110            type CacheKey = i32;
2111            type Output = i32;
2112
2113            fn cache_key(&self) -> Self::CacheKey {
2114                self.base_value
2115            }
2116
2117            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2118                let base = db.query(Base {
2119                    value: self.base_value,
2120                })?;
2121                Ok(Arc::new(*base + 10))
2122            }
2123
2124            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2125                old == new
2126            }
2127        }
2128
2129        let runtime = QueryRuntime::new();
2130
2131        let result = runtime.query(Derived { base_value: 5 }).unwrap();
2132        assert_eq!(*result, 20); // 5 * 2 + 10
2133    }
2134
2135    #[test]
2136    fn test_cycle_detection() {
2137        #[derive(Clone)]
2138        struct CycleA {
2139            id: i32,
2140        }
2141
2142        #[derive(Clone)]
2143        struct CycleB {
2144            id: i32,
2145        }
2146
2147        impl Query for CycleA {
2148            type CacheKey = i32;
2149            type Output = i32;
2150
2151            fn cache_key(&self) -> Self::CacheKey {
2152                self.id
2153            }
2154
2155            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2156                let b = db.query(CycleB { id: self.id })?;
2157                Ok(Arc::new(*b + 1))
2158            }
2159
2160            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2161                old == new
2162            }
2163        }
2164
2165        impl Query for CycleB {
2166            type CacheKey = i32;
2167            type Output = i32;
2168
2169            fn cache_key(&self) -> Self::CacheKey {
2170                self.id
2171            }
2172
2173            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2174                let a = db.query(CycleA { id: self.id })?;
2175                Ok(Arc::new(*a + 1))
2176            }
2177
2178            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2179                old == new
2180            }
2181        }
2182
2183        let runtime = QueryRuntime::new();
2184
2185        let result = runtime.query(CycleA { id: 1 });
2186        assert!(matches!(result, Err(QueryError::Cycle { .. })));
2187    }
2188
2189    #[test]
2190    fn test_fallible_query() {
2191        #[derive(Clone)]
2192        struct ParseInt {
2193            input: String,
2194        }
2195
2196        impl Query for ParseInt {
2197            type CacheKey = String;
2198            type Output = Result<i32, std::num::ParseIntError>;
2199
2200            fn cache_key(&self) -> Self::CacheKey {
2201                self.input.clone()
2202            }
2203
2204            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2205                Ok(Arc::new(self.input.parse()))
2206            }
2207
2208            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2209                old == new
2210            }
2211        }
2212
2213        let runtime = QueryRuntime::new();
2214
2215        // Valid parse
2216        let result = runtime
2217            .query(ParseInt {
2218                input: "42".to_string(),
2219            })
2220            .unwrap();
2221        assert_eq!(*result, Ok(42));
2222
2223        // Invalid parse - system succeeds, user error in output
2224        let result = runtime
2225            .query(ParseInt {
2226                input: "not_a_number".to_string(),
2227            })
2228            .unwrap();
2229        assert!(result.is_err());
2230    }
2231
2232    // Macro tests
2233    mod macro_tests {
2234        use super::*;
2235        use crate::query;
2236
2237        #[query]
2238        fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2239            let _ = db; // silence unused warning
2240            Ok(a + b)
2241        }
2242
2243        #[test]
2244        fn test_macro_basic() {
2245            let runtime = QueryRuntime::new();
2246            let result = runtime.query(Add::new(1, 2)).unwrap();
2247            assert_eq!(*result, 3);
2248        }
2249
2250        #[query]
2251        fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2252            let _ = db;
2253            Ok(x * 2)
2254        }
2255
2256        #[test]
2257        fn test_macro_simple() {
2258            let runtime = QueryRuntime::new();
2259            let result = runtime.query(SimpleDouble::new(5)).unwrap();
2260            assert_eq!(*result, 10);
2261        }
2262
2263        #[query(keys(id))]
2264        fn with_key_selection(
2265            db: &impl Db,
2266            id: u32,
2267            include_extra: bool,
2268        ) -> Result<String, QueryError> {
2269            let _ = db;
2270            Ok(format!("id={}, extra={}", id, include_extra))
2271        }
2272
2273        #[test]
2274        fn test_macro_key_selection() {
2275            let runtime = QueryRuntime::new();
2276
2277            // Same id, different include_extra - should return cached
2278            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2279            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2280
2281            // Both should have same value because only `id` is the key
2282            assert_eq!(*r1, "id=1, extra=true");
2283            assert_eq!(*r2, "id=1, extra=true"); // Cached!
2284        }
2285
2286        #[query]
2287        fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2288            let sum = db.query(Add::new(a, b))?;
2289            Ok(*sum * 2)
2290        }
2291
2292        #[test]
2293        fn test_macro_dependencies() {
2294            let runtime = QueryRuntime::new();
2295            let result = runtime.query(Dependent::new(3, 4)).unwrap();
2296            assert_eq!(*result, 14); // (3 + 4) * 2
2297        }
2298
2299        #[query(output_eq)]
2300        fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2301            let _ = db;
2302            Ok(x * 2)
2303        }
2304
2305        #[test]
2306        fn test_macro_output_eq() {
2307            let runtime = QueryRuntime::new();
2308            let result = runtime.query(WithOutputEq::new(5)).unwrap();
2309            assert_eq!(*result, 10);
2310        }
2311
2312        #[query(name = "CustomName")]
2313        fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2314            let _ = db;
2315            Ok(x)
2316        }
2317
2318        #[test]
2319        fn test_macro_custom_name() {
2320            let runtime = QueryRuntime::new();
2321            let result = runtime.query(CustomName::new(42)).unwrap();
2322            assert_eq!(*result, 42);
2323        }
2324
2325        // Test that attribute macros like #[tracing::instrument] are preserved
2326        // We use #[allow(unused_variables)] and #[inline] as test attributes since
2327        // they don't require external dependencies.
2328        #[allow(unused_variables)]
2329        #[inline]
2330        #[query]
2331        fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2332            // This would warn without #[allow(unused_variables)] on the generated method
2333            let unused_var = 42;
2334            Ok(x * 2)
2335        }
2336
2337        #[test]
2338        fn test_macro_preserves_attributes() {
2339            let runtime = QueryRuntime::new();
2340            // If attributes weren't preserved, this might warn about unused_var
2341            let result = runtime.query(WithAttributes::new(5)).unwrap();
2342            assert_eq!(*result, 10);
2343        }
2344    }
2345
2346    // Tests for poll() and changed_at()
2347    mod poll_tests {
2348        use super::*;
2349
2350        #[derive(Clone)]
2351        struct Counter {
2352            id: i32,
2353        }
2354
2355        impl Query for Counter {
2356            type CacheKey = i32;
2357            type Output = i32;
2358
2359            fn cache_key(&self) -> Self::CacheKey {
2360                self.id
2361            }
2362
2363            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2364                Ok(Arc::new(self.id * 10))
2365            }
2366
2367            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2368                old == new
2369            }
2370        }
2371
2372        #[test]
2373        fn test_poll_returns_value_and_revision() {
2374            let runtime = QueryRuntime::new();
2375
2376            let result = runtime.poll(Counter { id: 1 }).unwrap();
2377
2378            // Value should be correct - access through Result and Arc
2379            assert_eq!(**result.value.as_ref().unwrap(), 10);
2380
2381            // Revision should be non-zero after first execution
2382            assert!(result.revision > 0);
2383        }
2384
2385        #[test]
2386        fn test_poll_revision_stable_on_cache_hit() {
2387            let runtime = QueryRuntime::new();
2388
2389            // First poll
2390            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2391            let rev1 = result1.revision;
2392
2393            // Second poll (cache hit)
2394            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2395            let rev2 = result2.revision;
2396
2397            // Revision should be the same (no change)
2398            assert_eq!(rev1, rev2);
2399        }
2400
2401        #[test]
2402        fn test_poll_revision_changes_on_invalidate() {
2403            let runtime = QueryRuntime::new();
2404
2405            // First poll
2406            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2407            let rev1 = result1.revision;
2408
2409            // Invalidate and poll again
2410            runtime.invalidate::<Counter>(&1);
2411            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2412            let rev2 = result2.revision;
2413
2414            // Revision should increase (value was recomputed)
2415            // Note: Since output_eq returns true (same value), this might not change
2416            // depending on early cutoff behavior. Let's verify the value is still correct.
2417            assert_eq!(**result2.value.as_ref().unwrap(), 10);
2418
2419            // With early cutoff, revision might stay the same if value didn't change
2420            // This is expected behavior
2421            assert!(rev2 >= rev1);
2422        }
2423
2424        #[test]
2425        fn test_changed_at_returns_none_for_unexecuted_query() {
2426            let runtime = QueryRuntime::new();
2427
2428            // Query has never been executed
2429            let rev = runtime.changed_at::<Counter>(&1);
2430            assert!(rev.is_none());
2431        }
2432
2433        #[test]
2434        fn test_changed_at_returns_revision_after_execution() {
2435            let runtime = QueryRuntime::new();
2436
2437            // Execute the query
2438            let _ = runtime.query(Counter { id: 1 }).unwrap();
2439
2440            // Now changed_at should return Some
2441            let rev = runtime.changed_at::<Counter>(&1);
2442            assert!(rev.is_some());
2443            assert!(rev.unwrap() > 0);
2444        }
2445
2446        #[test]
2447        fn test_changed_at_matches_poll_revision() {
2448            let runtime = QueryRuntime::new();
2449
2450            // Poll the query
2451            let result = runtime.poll(Counter { id: 1 }).unwrap();
2452
2453            // changed_at should match the revision from poll
2454            let rev = runtime.changed_at::<Counter>(&1);
2455            assert_eq!(rev, Some(result.revision));
2456        }
2457
2458        #[test]
2459        fn test_poll_value_access() {
2460            let runtime = QueryRuntime::new();
2461
2462            let result = runtime.poll(Counter { id: 5 }).unwrap();
2463
2464            // Access through Result and Arc
2465            let value: &i32 = result.value.as_ref().unwrap();
2466            assert_eq!(*value, 50);
2467
2468            // Access Arc directly via field after unwrapping Result
2469            let arc: &Arc<i32> = result.value.as_ref().unwrap();
2470            assert_eq!(**arc, 50);
2471        }
2472
2473        #[test]
2474        fn test_subscription_pattern() {
2475            let runtime = QueryRuntime::new();
2476
2477            // Simulate subscription pattern
2478            let mut last_revision: RevisionCounter = 0;
2479            let mut notifications = 0;
2480
2481            // First poll - should notify (new value)
2482            let result = runtime.poll(Counter { id: 1 }).unwrap();
2483            if result.revision > last_revision {
2484                notifications += 1;
2485                last_revision = result.revision;
2486            }
2487
2488            // Second poll - should NOT notify (no change)
2489            let result = runtime.poll(Counter { id: 1 }).unwrap();
2490            if result.revision > last_revision {
2491                notifications += 1;
2492                last_revision = result.revision;
2493            }
2494
2495            // Third poll - should NOT notify (no change)
2496            let result = runtime.poll(Counter { id: 1 }).unwrap();
2497            if result.revision > last_revision {
2498                notifications += 1;
2499                #[allow(unused_assignments)]
2500                {
2501                    last_revision = result.revision;
2502                }
2503            }
2504
2505            // Only the first poll should have triggered a notification
2506            assert_eq!(notifications, 1);
2507        }
2508    }
2509
2510    // Tests for GC APIs
2511    mod gc_tests {
2512        use super::*;
2513        use std::collections::HashSet;
2514        use std::sync::atomic::{AtomicUsize, Ordering};
2515        use std::sync::Mutex;
2516
2517        #[derive(Clone)]
2518        struct Leaf {
2519            id: i32,
2520        }
2521
2522        impl Query for Leaf {
2523            type CacheKey = i32;
2524            type Output = i32;
2525
2526            fn cache_key(&self) -> Self::CacheKey {
2527                self.id
2528            }
2529
2530            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2531                Ok(Arc::new(self.id * 10))
2532            }
2533
2534            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2535                old == new
2536            }
2537        }
2538
2539        #[derive(Clone)]
2540        struct Parent {
2541            child_id: i32,
2542        }
2543
2544        impl Query for Parent {
2545            type CacheKey = i32;
2546            type Output = i32;
2547
2548            fn cache_key(&self) -> Self::CacheKey {
2549                self.child_id
2550            }
2551
2552            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2553                let child = db.query(Leaf { id: self.child_id })?;
2554                Ok(Arc::new(*child + 1))
2555            }
2556
2557            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2558                old == new
2559            }
2560        }
2561
2562        #[test]
2563        fn test_query_keys_returns_all_cached_queries() {
2564            let runtime = QueryRuntime::new();
2565
2566            // Execute some queries
2567            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2568            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2569            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2570
2571            // Get all keys
2572            let keys = runtime.query_keys();
2573
2574            // Should have at least 3 keys (might have more due to sentinels)
2575            assert!(keys.len() >= 3);
2576        }
2577
2578        #[test]
2579        fn test_remove_removes_query() {
2580            let runtime = QueryRuntime::new();
2581
2582            // Execute a query
2583            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2584
2585            // Get the key
2586            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2587
2588            // Query should exist
2589            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2590
2591            // Remove it
2592            assert!(runtime.remove(&full_key));
2593
2594            // Query should no longer exist
2595            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2596        }
2597
2598        #[test]
2599        fn test_remove_if_unused_removes_leaf_query() {
2600            let runtime = QueryRuntime::new();
2601
2602            // Execute a leaf query (no dependents)
2603            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2604
2605            // Should be removable since no other query depends on it
2606            assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2607
2608            // Query should no longer exist
2609            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2610        }
2611
2612        #[test]
2613        fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2614            let runtime = QueryRuntime::new();
2615
2616            // Execute parent query (which depends on Leaf)
2617            let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2618
2619            // Leaf query should not be removable since Parent depends on it
2620            assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2621
2622            // Leaf query should still exist
2623            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2624
2625            // But Parent should be removable (no dependents)
2626            assert!(runtime.remove_query_if_unused::<Parent>(&1));
2627        }
2628
2629        #[test]
2630        fn test_remove_if_unused_with_full_cache_key() {
2631            let runtime = QueryRuntime::new();
2632
2633            // Execute a leaf query
2634            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2635
2636            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2637
2638            // Should be removable via FullCacheKey
2639            assert!(runtime.remove_if_unused(&full_key));
2640
2641            // Query should no longer exist
2642            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2643        }
2644
2645        // Test tracer receives on_query_key calls
2646        struct GcTracker {
2647            accessed_keys: Mutex<HashSet<String>>,
2648            access_count: AtomicUsize,
2649        }
2650
2651        impl GcTracker {
2652            fn new() -> Self {
2653                Self {
2654                    accessed_keys: Mutex::new(HashSet::new()),
2655                    access_count: AtomicUsize::new(0),
2656                }
2657            }
2658        }
2659
2660        impl Tracer for GcTracker {
2661            fn new_span_id(&self) -> SpanId {
2662                SpanId(1)
2663            }
2664
2665            fn on_query_key(&self, full_key: &FullCacheKey) {
2666                self.accessed_keys
2667                    .lock()
2668                    .unwrap()
2669                    .insert(full_key.debug_repr().to_string());
2670                self.access_count.fetch_add(1, Ordering::Relaxed);
2671            }
2672        }
2673
2674        #[test]
2675        fn test_tracer_receives_on_query_key() {
2676            let tracker = GcTracker::new();
2677            let runtime = QueryRuntime::with_tracer(tracker);
2678
2679            // Execute some queries
2680            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2681            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2682
2683            // Tracer should have received on_query_key calls
2684            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2685            assert_eq!(count, 2);
2686
2687            // Check that the keys were recorded
2688            let keys = runtime.tracer().accessed_keys.lock().unwrap();
2689            assert!(keys.iter().any(|k| k.contains("Leaf")));
2690        }
2691
2692        #[test]
2693        fn test_tracer_receives_on_query_key_for_cache_hits() {
2694            let tracker = GcTracker::new();
2695            let runtime = QueryRuntime::with_tracer(tracker);
2696
2697            // Execute query twice (second is cache hit)
2698            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2699            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2700
2701            // Tracer should have received on_query_key for both calls
2702            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2703            assert_eq!(count, 2);
2704        }
2705
2706        #[test]
2707        fn test_gc_workflow() {
2708            let tracker = GcTracker::new();
2709            let runtime = QueryRuntime::with_tracer(tracker);
2710
2711            // Execute some queries
2712            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2713            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2714            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2715
2716            // Simulate GC: remove all queries that are not in use
2717            let mut removed = 0;
2718            for key in runtime.query_keys() {
2719                if runtime.remove_if_unused(&key) {
2720                    removed += 1;
2721                }
2722            }
2723
2724            // All leaf queries should be removable
2725            assert!(removed >= 3);
2726
2727            // Queries should no longer exist
2728            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2729            assert!(runtime.changed_at::<Leaf>(&2).is_none());
2730            assert!(runtime.changed_at::<Leaf>(&3).is_none());
2731        }
2732    }
2733}