query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, PendingAsset};
12use crate::db::Db;
13use crate::key::{
14    AssetCacheKey, AssetKeySetSentinelKey, FullCacheKey, QueryCacheKey, QuerySetSentinelKey,
15};
16use crate::loading::AssetLoadingState;
17use crate::query::Query;
18use crate::storage::{
19    AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
20    QueryRegistry, VerifierStorage,
21};
22use crate::tracer::{
23    ExecutionResult, InvalidationReason, NoopTracer, SpanContext, SpanId, TraceId, Tracer,
24    TracerAssetState,
25};
26use crate::QueryError;
27
/// Function type for comparing user errors during early cutoff.
///
/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
/// `QueryError::UserError` values are compared for caching purposes.
///
/// The runtime calls the comparator with the previously cached error as the
/// first argument and the freshly produced error as the second. Returning
/// `true` means "these are the same error": the cached revision is reused.
/// Returning `false` means the error is treated as a new result and gets a
/// new revision.
pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
33
/// Number of durability levels (matches whale's default).
///
/// Passed as the const generic parameter of the whale runtime stored in
/// `QueryRuntime`.
const DURABILITY_LEVELS: usize = 4;
36
// Per-thread execution state shared by every runtime used on this thread.
//
// Thread-local query execution stack for cycle detection.
// Keys are pushed by `StackGuard::push` and popped when that guard drops.
thread_local! {
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };

    /// Current consistency tracker for leaf asset checks.
    /// Set during query execution, used by all nested locator calls.
    /// Uses Rc since thread-local is single-threaded.
    static CONSISTENCY_TRACKER: RefCell<Option<Rc<ConsistencyTracker>>> = const { RefCell::new(None) };

    /// Stack for tracking parent-child query relationships.
    static SPAN_STACK: RefCell<SpanStack> = const { RefCell::new(SpanStack::Empty) };
}
49
/// Thread-local span stack state.
/// Empty when no query is executing; Active with trace_id and span stack during execution.
enum SpanStack {
    /// No span is open on this thread.
    Empty,
    /// At least one span is open: the trace they belong to and the open span
    /// ids, innermost last.
    Active(TraceId, Vec<SpanId>),
}
56
57/// Check a leaf asset against the current consistency tracker (if any).
58/// Returns Ok if no tracker is set or if the check passes.
59fn check_leaf_asset_consistency(dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
60    CONSISTENCY_TRACKER.with(|tracker| {
61        if let Some(ref t) = *tracker.borrow() {
62            t.check_leaf_asset(dep_changed_at)
63        } else {
64            Ok(())
65        }
66    })
67}
68
69/// RAII guard that sets the consistency tracker for the current thread.
70struct ConsistencyTrackerGuard {
71    previous: Option<Rc<ConsistencyTracker>>,
72}
73
74impl ConsistencyTrackerGuard {
75    fn new(tracker: Rc<ConsistencyTracker>) -> Self {
76        let previous = CONSISTENCY_TRACKER.with(|t| t.borrow_mut().replace(tracker));
77        Self { previous }
78    }
79}
80
81impl Drop for ConsistencyTrackerGuard {
82    fn drop(&mut self) {
83        CONSISTENCY_TRACKER.with(|t| {
84            *t.borrow_mut() = self.previous.take();
85        });
86    }
87}
88
89/// Check for cycles in the query stack and return error if detected.
90fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
91    let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
92    if cycle_detected {
93        let path = QUERY_STACK.with(|stack| {
94            let stack = stack.borrow();
95            let mut path: Vec<FullCacheKey> = stack.iter().cloned().collect();
96            path.push(key.clone());
97            path
98        });
99        return Err(QueryError::Cycle { path });
100    }
101    Ok(())
102}
103
104/// RAII guard for pushing/popping from query stack.
105struct StackGuard;
106
107impl StackGuard {
108    fn push(key: FullCacheKey) -> Self {
109        QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
110        StackGuard
111    }
112}
113
114impl Drop for StackGuard {
115    fn drop(&mut self) {
116        QUERY_STACK.with(|stack| {
117            stack.borrow_mut().pop();
118        });
119    }
120}
121
122/// RAII guard for pushing/popping from span stack.
123struct SpanStackGuard;
124
125impl SpanStackGuard {
126    /// Push a span onto the stack. Sets trace_id if this is the root span.
127    fn push(trace_id: TraceId, span_id: SpanId) -> Self {
128        SPAN_STACK.with(|stack| {
129            let mut s = stack.borrow_mut();
130            match &mut *s {
131                SpanStack::Empty => *s = SpanStack::Active(trace_id, vec![span_id]),
132                SpanStack::Active(_, spans) => spans.push(span_id),
133            }
134        });
135        SpanStackGuard
136    }
137}
138
139impl Drop for SpanStackGuard {
140    fn drop(&mut self) {
141        SPAN_STACK.with(|stack| {
142            let mut s = stack.borrow_mut();
143            if let SpanStack::Active(_, spans) = &mut *s {
144                spans.pop();
145                if spans.is_empty() {
146                    *s = SpanStack::Empty;
147                }
148            }
149        });
150    }
151}
152
153/// Execution context passed through query execution.
154///
155/// Contains a SpanContext for tracing correlation with parent-child relationships.
156#[derive(Clone, Copy)]
157pub struct ExecutionContext {
158    span_ctx: SpanContext,
159}
160
161impl ExecutionContext {
162    /// Create a new execution context with the given span context.
163    #[inline]
164    pub fn new(span_ctx: SpanContext) -> Self {
165        Self { span_ctx }
166    }
167
168    /// Get the span context for this execution context.
169    #[inline]
170    pub fn span_ctx(&self) -> &SpanContext {
171        &self.span_ctx
172    }
173}
174
/// Result of polling a query, containing the value and its revision.
///
/// This is returned by [`QueryRuntime::poll`] and provides both the query result
/// and its change revision, enabling efficient change detection for subscription
/// patterns.
///
/// # Example
///
/// ```ignore
/// let result = runtime.poll(MyQuery::new())?;
///
/// // `poll` yields `Polled<Result<..>>`; `Result` is not `Deref`, so read
/// // the payload through the public `value` field.
/// println!("{:?}", result.value);
///
/// // Check if changed since last poll
/// if result.revision > last_known_revision {
///     notify_subscribers(&result.value);
///     last_known_revision = result.revision;
/// }
/// ```
#[derive(Debug, Clone)]
pub struct Polled<T> {
    /// The query result value.
    pub value: T,
    /// The revision at which this value was last changed.
    ///
    /// Compare this with a previously stored revision to detect changes.
    pub revision: RevisionCounter,
}
204
205impl<T: Deref> Deref for Polled<T> {
206    type Target = T::Target;
207
208    fn deref(&self) -> &Self::Target {
209        &self.value
210    }
211}
212
/// The query runtime manages query execution, caching, and dependency tracking.
///
/// This is cheap to clone - all data is behind `Arc`.
///
/// # Type Parameter
///
/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
///   for zero-cost when tracing is not needed.
///
/// # Example
///
/// ```ignore
/// // Without tracing (default)
/// let runtime = QueryRuntime::new();
///
/// // With tracing
/// let tracer = MyTracer::new();
/// let runtime = QueryRuntime::with_tracer(tracer);
///
/// // Sync query execution
/// let result = runtime.query(MyQuery { ... })?;
/// ```
pub struct QueryRuntime<T: Tracer = NoopTracer> {
    /// Whale runtime for dependency tracking and cache storage.
    /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
    /// A `None` payload is what `invalidate` and the sentinel keys register.
    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
    /// Registered asset locators
    locators: Arc<LocatorStorage<T>>,
    /// Pending asset requests
    pending: Arc<PendingStorage>,
    /// Registry for tracking query instances (for list_queries)
    query_registry: Arc<QueryRegistry>,
    /// Registry for tracking asset keys (for list_asset_keys)
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Verifiers for re-executing queries (for verify-then-decide pattern)
    verifiers: Arc<VerifierStorage>,
    /// Comparator for user errors during early cutoff
    /// (`true` = same error, the cached revision is reused)
    error_comparator: ErrorComparator,
    /// Tracer for observability
    tracer: Arc<T>,
}
254
/// Compile-time guarantee that the default runtime can be shared and moved
/// across threads.
#[test]
fn test_runtime_send_sync() {
    fn assert_send<S: Send>() {}
    fn assert_sync<S: Sync>() {}

    assert_send::<QueryRuntime<NoopTracer>>();
    assert_sync::<QueryRuntime<NoopTracer>>();
}
260
261impl Default for QueryRuntime<NoopTracer> {
262    fn default() -> Self {
263        Self::new()
264    }
265}
266
267impl<T: Tracer> Clone for QueryRuntime<T> {
268    fn clone(&self) -> Self {
269        Self {
270            whale: self.whale.clone(),
271            locators: self.locators.clone(),
272            pending: self.pending.clone(),
273            query_registry: self.query_registry.clone(),
274            asset_key_registry: self.asset_key_registry.clone(),
275            verifiers: self.verifiers.clone(),
276            error_comparator: self.error_comparator,
277            tracer: self.tracer.clone(),
278        }
279    }
280}
281
/// Default error comparator that treats all errors as different.
///
/// This is conservative: because it never reports two errors as equal, a
/// query that fails again is always recorded as a changed result with a new
/// revision, so early cutoff never applies to user errors.
fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
    false
}
288
289impl<T: Tracer> QueryRuntime<T> {
290    /// Get cached output along with its revision (single atomic access).
291    fn get_cached_with_revision<Q: Query>(
292        &self,
293        key: &FullCacheKey,
294    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
295        let node = self.whale.get(key)?;
296        let revision = node.changed_at;
297        let entry = node.data.as_ref()?;
298        let cached = entry.to_cached_value::<Q::Output>()?;
299        Some((cached, revision))
300    }
301
302    /// Get a reference to the tracer.
303    #[inline]
304    pub fn tracer(&self) -> &T {
305        &self.tracer
306    }
307}
308
309impl QueryRuntime<NoopTracer> {
310    /// Create a new query runtime with default settings.
311    pub fn new() -> Self {
312        Self::with_tracer(NoopTracer)
313    }
314
315    /// Create a builder for customizing the runtime.
316    ///
317    /// # Example
318    ///
319    /// ```ignore
320    /// let runtime = QueryRuntime::builder()
321    ///     .error_comparator(|a, b| {
322    ///         // Custom error comparison logic
323    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
324    ///             (Some(a), Some(b)) => a == b,
325    ///             _ => false,
326    ///         }
327    ///     })
328    ///     .build();
329    /// ```
330    pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
331        QueryRuntimeBuilder::new()
332    }
333}
334
335impl<T: Tracer> QueryRuntime<T> {
336    /// Create a new query runtime with the specified tracer.
337    pub fn with_tracer(tracer: T) -> Self {
338        QueryRuntimeBuilder::new().tracer(tracer).build()
339    }
340
341    /// Execute a query synchronously.
342    ///
343    /// Returns the cached result if valid, otherwise executes the query.
344    ///
345    /// # Errors
346    ///
347    /// - `QueryError::Suspend` - Query is waiting for async loading
348    /// - `QueryError::Cycle` - Dependency cycle detected
349    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
350        self.query_internal(query)
351            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
352    }
353
354    /// Internal implementation shared by query() and poll().
355    ///
356    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
357    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
358    #[allow(clippy::type_complexity)]
359    fn query_internal<Q: Query>(
360        &self,
361        query: Q,
362    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
363        let query_cache_key = QueryCacheKey::new(query.clone());
364        let full_key: FullCacheKey = query_cache_key.clone().into();
365
366        // Create SpanContext with parent relationship from SPAN_STACK
367        let span_id = self.tracer.new_span_id();
368        let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
369            SpanStack::Empty => (self.tracer.new_trace_id(), None),
370            SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
371        });
372        let span_ctx = SpanContext {
373            span_id,
374            trace_id,
375            parent_span_id,
376        };
377
378        // Push to span stack and create execution context
379        let _span_guard = SpanStackGuard::push(trace_id, span_id);
380        let exec_ctx = ExecutionContext::new(span_ctx);
381
382        self.tracer.on_query_start(&span_ctx, &query_cache_key);
383
384        // Check for cycles using thread-local stack
385        let cycle_detected = QUERY_STACK.with(|stack| {
386            let stack = stack.borrow();
387            stack.iter().any(|k| k == &full_key)
388        });
389
390        if cycle_detected {
391            let path = QUERY_STACK.with(|stack| {
392                let stack = stack.borrow();
393                let mut path: Vec<FullCacheKey> = stack.iter().cloned().collect();
394                path.push(full_key.clone());
395                path
396            });
397
398            self.tracer.on_cycle_detected(&path);
399            self.tracer
400                .on_query_end(&span_ctx, &query_cache_key, ExecutionResult::CycleDetected);
401
402            return Err(QueryError::Cycle { path });
403        }
404
405        // Check if cached and valid (with verify-then-decide pattern)
406        let current_rev = self.whale.current_revision();
407
408        // Fast path: already verified at current revision
409        if self.whale.is_verified_at(&full_key, &current_rev) {
410            // Single atomic access to get both cached value and revision
411            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
412                self.tracer
413                    .on_cache_check(&span_ctx, &query_cache_key, true);
414                self.tracer
415                    .on_query_end(&span_ctx, &query_cache_key, ExecutionResult::CacheHit);
416
417                return match cached {
418                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
419                    CachedValue::UserError(err) => Ok((Err(err), revision)),
420                };
421            }
422        }
423
424        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
425        if self.whale.is_valid(&full_key) {
426            // Single atomic access to get both cached value and revision
427            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
428                // Shallow valid but not verified - verify deps first
429                let mut deps_verified = true;
430                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
431                    for dep in deps {
432                        if let Some(verifier) = self.verifiers.get(&dep) {
433                            // Re-run query/asset to verify it (triggers recursive verification)
434                            // For assets, this re-accesses the asset which may re-run the locator
435                            if verifier.verify(self as &dyn std::any::Any).is_err() {
436                                deps_verified = false;
437                                break;
438                            }
439                        }
440                        // Note: deps without verifiers are assumed valid (they're verified
441                        // by the final is_valid check if their changed_at increased)
442                    }
443                }
444
445                // Re-check validity after deps are verified
446                if deps_verified && self.whale.is_valid(&full_key) {
447                    // Deps didn't change their changed_at, mark as verified and use cached
448                    self.whale.mark_verified(&full_key, &current_rev);
449
450                    self.tracer
451                        .on_cache_check(&span_ctx, &query_cache_key, true);
452                    self.tracer.on_query_end(
453                        &span_ctx,
454                        &query_cache_key,
455                        ExecutionResult::CacheHit,
456                    );
457
458                    return match cached {
459                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
460                        CachedValue::UserError(err) => Ok((Err(err), revision)),
461                    };
462                }
463                // A dep's changed_at increased, fall through to execute
464            }
465        }
466
467        self.tracer
468            .on_cache_check(&span_ctx, &query_cache_key, false);
469
470        // Execute the query with cycle tracking
471        let _guard = StackGuard::push(full_key.clone());
472        let result = self.execute_query::<Q>(&query, &query_cache_key, &full_key, exec_ctx);
473        drop(_guard);
474
475        // Emit end event
476        let exec_result = match &result {
477            Ok((_, true, _)) => ExecutionResult::Changed,
478            Ok((_, false, _)) => ExecutionResult::Unchanged,
479            Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
480            Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
481            Err(e) => ExecutionResult::Error {
482                message: format!("{:?}", e),
483            },
484        };
485        self.tracer
486            .on_query_end(&span_ctx, &query_cache_key, exec_result);
487
488        result.map(|(inner_result, _, revision)| (inner_result, revision))
489    }
490
491    /// Execute a query, caching the result if appropriate.
492    ///
493    /// Returns (result, output_changed, revision) tuple.
494    /// - `result`: Ok(output) for success, Err(user_error) for user errors
495    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
496    #[allow(clippy::type_complexity)]
497    fn execute_query<Q: Query>(
498        &self,
499        query: &Q,
500        query_cache_key: &QueryCacheKey,
501        full_key: &FullCacheKey,
502        exec_ctx: ExecutionContext,
503    ) -> Result<
504        (
505            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
506            bool,
507            RevisionCounter,
508        ),
509        QueryError,
510    > {
511        // Capture current global revision at query start for consistency checking
512        let start_revision = self.whale.current_revision().get(Durability::volatile());
513
514        // Create consistency tracker for this query execution
515        let tracker = Rc::new(ConsistencyTracker::new(start_revision));
516
517        // Set thread-local tracker for nested locator calls
518        let _tracker_guard = ConsistencyTrackerGuard::new(tracker);
519
520        // Create context for this query execution
521        let ctx = QueryContext {
522            runtime: self,
523            current_key: full_key.clone(),
524            exec_ctx,
525            deps: RefCell::new(Vec::new()),
526        };
527
528        // Execute the query (clone because query() takes ownership)
529        let db = DbDispatch::QueryContext(&ctx);
530        let result = query.clone().query(&db);
531
532        // Get collected dependencies
533        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
534
535        // Query durability defaults to stable - Whale will automatically reduce
536        // the effective durability to min(requested, min(dep_durabilities)).
537        // A pure query with no dependencies remains stable.
538        // A query depending on volatile assets becomes volatile.
539        let durability = Durability::stable();
540
541        match result {
542            Ok(output) => {
543                // Check if output changed (for early cutoff)
544                // existing_revision is Some only when output is unchanged (can reuse revision)
545                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
546                    self.get_cached_with_revision::<Q>(full_key)
547                {
548                    if Q::output_eq(&*old, &*output) {
549                        Some(rev) // Same output - reuse revision
550                    } else {
551                        None // Different output
552                    }
553                } else {
554                    None // No previous Ok value
555                };
556                let output_changed = existing_revision.is_none();
557
558                // Emit early cutoff check event
559                self.tracer.on_early_cutoff_check(
560                    exec_ctx.span_ctx(),
561                    query_cache_key,
562                    output_changed,
563                );
564
565                // Update whale with cached entry (atomic update of value + dependency state)
566                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
567                let revision = if let Some(existing_rev) = existing_revision {
568                    // confirm_unchanged doesn't change changed_at, use existing
569                    let _ = self.whale.confirm_unchanged(full_key, deps);
570                    existing_rev
571                } else {
572                    // Use new_rev from register result
573                    match self
574                        .whale
575                        .register(full_key.clone(), Some(entry), durability, deps)
576                    {
577                        Ok(result) => result.new_rev,
578                        Err(missing) => {
579                            return Err(QueryError::DependenciesRemoved {
580                                missing_keys: missing,
581                            })
582                        }
583                    }
584                };
585
586                // Register query in registry for list_queries
587                let is_new_query = self.query_registry.register(query);
588                if is_new_query {
589                    let sentinel = QuerySetSentinelKey::new::<Q>().into();
590                    let _ = self
591                        .whale
592                        .register(sentinel, None, Durability::stable(), vec![]);
593                }
594
595                // Store verifier for this query (for verify-then-decide pattern)
596                self.verifiers
597                    .insert::<Q, T>(full_key.clone(), query.clone());
598
599                Ok((Ok(output), output_changed, revision))
600            }
601            Err(QueryError::UserError(err)) => {
602                // Check if error changed (for early cutoff)
603                // existing_revision is Some only when error is unchanged (can reuse revision)
604                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
605                    self.get_cached_with_revision::<Q>(full_key)
606                {
607                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
608                        Some(rev) // Same error - reuse revision
609                    } else {
610                        None // Different error
611                    }
612                } else {
613                    None // No previous UserError
614                };
615                let output_changed = existing_revision.is_none();
616
617                // Emit early cutoff check event
618                self.tracer.on_early_cutoff_check(
619                    exec_ctx.span_ctx(),
620                    query_cache_key,
621                    output_changed,
622                );
623
624                // Update whale with cached error (atomic update of value + dependency state)
625                let entry = CachedEntry::UserError(err.clone());
626                let revision = if let Some(existing_rev) = existing_revision {
627                    // confirm_unchanged doesn't change changed_at, use existing
628                    let _ = self.whale.confirm_unchanged(full_key, deps);
629                    existing_rev
630                } else {
631                    // Use new_rev from register result
632                    match self
633                        .whale
634                        .register(full_key.clone(), Some(entry), durability, deps)
635                    {
636                        Ok(result) => result.new_rev,
637                        Err(missing) => {
638                            return Err(QueryError::DependenciesRemoved {
639                                missing_keys: missing,
640                            })
641                        }
642                    }
643                };
644
645                // Register query in registry for list_queries
646                let is_new_query = self.query_registry.register(query);
647                if is_new_query {
648                    let sentinel = QuerySetSentinelKey::new::<Q>().into();
649                    let _ = self
650                        .whale
651                        .register(sentinel, None, Durability::stable(), vec![]);
652                }
653
654                // Store verifier for this query (for verify-then-decide pattern)
655                self.verifiers
656                    .insert::<Q, T>(full_key.clone(), query.clone());
657
658                Ok((Err(err), output_changed, revision))
659            }
660            Err(e) => {
661                // System errors (Suspend, Cycle, Cancelled) are not cached
662                Err(e)
663            }
664        }
665    }
666
667    /// Invalidate a query, forcing recomputation on next access.
668    ///
669    /// This also invalidates any queries that depend on this one.
670    pub fn invalidate<Q: Query>(&self, query: &Q) {
671        let query_cache_key = QueryCacheKey::new(query.clone());
672        let full_key: FullCacheKey = query_cache_key.clone().into();
673
674        self.tracer
675            .on_query_invalidated(&query_cache_key, InvalidationReason::ManualInvalidation);
676
677        // Update whale to invalidate dependents (register with None to clear cached value)
678        // Use stable durability to increment all revision counters, ensuring queries
679        // at any durability level will see this as a change.
680        let _ = self
681            .whale
682            .register(full_key, None, Durability::stable(), vec![]);
683    }
684
685    /// Remove a query from the cache entirely, freeing memory.
686    ///
687    /// Use this for GC when a query is no longer needed.
688    /// Unlike `invalidate`, this removes all traces of the query from storage.
689    /// The query will be recomputed from scratch on next access.
690    ///
691    /// This also invalidates any queries that depend on this one.
692    pub fn remove_query<Q: Query>(&self, query: &Q) {
693        let query_cache_key = QueryCacheKey::new(query.clone());
694        let full_key: FullCacheKey = query_cache_key.clone().into();
695
696        self.tracer
697            .on_query_invalidated(&query_cache_key, InvalidationReason::ManualInvalidation);
698
699        // Remove verifier if exists
700        self.verifiers.remove(&full_key);
701
702        // Remove from whale storage (this also handles dependent invalidation)
703        self.whale.remove(&full_key);
704
705        // Remove from registry and update sentinel for list_queries
706        if self.query_registry.remove::<Q>(query) {
707            let sentinel = QuerySetSentinelKey::new::<Q>().into();
708            let _ = self
709                .whale
710                .register(sentinel, None, Durability::stable(), vec![]);
711        }
712    }
713
714    /// Clear all cached values by removing all nodes from whale.
715    ///
716    /// Note: This is a relatively expensive operation as it iterates through all keys.
717    pub fn clear_cache(&self) {
718        let keys = self.whale.keys();
719        for key in keys {
720            self.whale.remove(&key);
721        }
722    }
723
724    /// Poll a query, returning both the result and its change revision.
725    ///
726    /// This is useful for implementing subscription patterns where you need to
727    /// detect changes efficiently. Compare the returned `revision` with a
728    /// previously stored value to determine if the query result has changed.
729    ///
730    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
731    /// as its value, allowing you to track revision changes for both success and
732    /// user error cases.
733    ///
734    /// # Example
735    ///
736    /// ```ignore
737    /// struct Subscription<Q: Query> {
738    ///     query: Q,
739    ///     last_revision: RevisionCounter,
740    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
741    /// }
742    ///
743    /// // Polling loop
744    /// for sub in &mut subscriptions {
745    ///     let result = runtime.poll(sub.query.clone())?;
746    ///     if result.revision > sub.last_revision {
747    ///         sub.tx.send(result.value.clone())?;
748    ///         sub.last_revision = result.revision;
749    ///     }
750    /// }
751    /// ```
752    ///
753    /// # Errors
754    ///
755    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
756    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
757    #[allow(clippy::type_complexity)]
758    pub fn poll<Q: Query>(
759        &self,
760        query: Q,
761    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
762        let (value, revision) = self.query_internal(query)?;
763        Ok(Polled { value, revision })
764    }
765
766    /// Get the change revision of a query without executing it.
767    ///
768    /// Returns `None` if the query has never been executed.
769    ///
770    /// This is useful for checking if a query has changed since the last poll
771    /// without the cost of executing the query.
772    ///
773    /// # Example
774    ///
775    /// ```ignore
776    /// // Check if query has changed before deciding to poll
777    /// if let Some(rev) = runtime.changed_at(&MyQuery::new(key)) {
778    ///     if rev > last_known_revision {
779    ///         let result = runtime.query(MyQuery::new(key))?;
780    ///         // Process result...
781    ///     }
782    /// }
783    /// ```
784    pub fn changed_at<Q: Query>(&self, query: &Q) -> Option<RevisionCounter> {
785        let full_key = QueryCacheKey::new(query.clone()).into();
786        self.whale.get(&full_key).map(|node| node.changed_at)
787    }
788}
789
790// ============================================================================
791// GC (Garbage Collection) API
792// ============================================================================
793
794impl<T: Tracer> QueryRuntime<T> {
    /// Get all query keys currently in the cache.
    ///
    /// This is useful for implementing custom garbage collection strategies.
    /// Use this in combination with [`Tracer::on_query_key`] to track access
    /// times and implement LRU, TTL, or other GC algorithms externally.
    ///
    /// NOTE(review): this returns every key held by whale; since sentinel keys
    /// (and presumably asset keys) are registered there too, the result is
    /// likely not limited to query keys - confirm.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Collect all keys that haven't been accessed recently
    /// let stale_keys: Vec<_> = runtime.query_keys()
    ///     .into_iter()
    ///     .filter(|key| tracker.is_stale(key))
    ///     .collect();
    ///
    /// // Remove stale queries that have no dependents
    /// for key in stale_keys {
    ///     runtime.remove_if_unused(&key);
    /// }
    /// ```
    pub fn query_keys(&self) -> Vec<FullCacheKey> {
        self.whale.keys()
    }
817
818    /// Remove a query if it has no dependents.
819    ///
820    /// Returns `true` if the query was removed, `false` if it has dependents
821    /// or doesn't exist. This is the safe way to remove queries during GC,
822    /// as it won't break queries that depend on this one.
823    ///
824    /// # Example
825    ///
826    /// ```ignore
827    /// let query = MyQuery::new(cache_key);
828    /// if runtime.remove_query_if_unused(&query) {
829    ///     println!("Query removed");
830    /// } else {
831    ///     println!("Query has dependents, not removed");
832    /// }
833    /// ```
834    pub fn remove_query_if_unused<Q: Query>(&self, query: &Q) -> bool {
835        let full_key = QueryCacheKey::new(query.clone()).into();
836        self.remove_if_unused(&full_key)
837    }
838
839    /// Remove a query by its [`FullCacheKey`].
840    ///
841    /// This is the type-erased version of [`remove_query`](Self::remove_query).
842    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
843    /// or [`Tracer::on_query_key`].
844    ///
845    /// Returns `true` if the query was removed, `false` if it doesn't exist.
846    ///
847    /// # Warning
848    ///
849    /// This forcibly removes the query even if other queries depend on it.
850    /// Dependent queries will be recomputed on next access. For safe GC,
851    /// use [`remove_if_unused`](Self::remove_if_unused) instead.
852    pub fn remove(&self, key: &FullCacheKey) -> bool {
853        // Remove verifier if exists
854        self.verifiers.remove(key);
855
856        // Remove from whale storage
857        self.whale.remove(key).is_some()
858    }
859
860    /// Remove a query by its [`FullCacheKey`] if it has no dependents.
861    ///
862    /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
863    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
864    /// or [`Tracer::on_query_key`].
865    ///
866    /// Returns `true` if the query was removed, `false` if it has dependents
867    /// or doesn't exist.
868    ///
869    /// # Example
870    ///
871    /// ```ignore
872    /// // Implement LRU GC
873    /// for key in runtime.query_keys() {
874    ///     if tracker.is_expired(&key) {
875    ///         runtime.remove_if_unused(&key);
876    ///     }
877    /// }
878    /// ```
879    pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
880        if self.whale.remove_if_unused(key.clone()).is_some() {
881            // Successfully removed - clean up verifier
882            self.verifiers.remove(key);
883            true
884        } else {
885            false
886        }
887    }
888}
889
890// ============================================================================
891// Builder
892// ============================================================================
893
/// Builder for [`QueryRuntime`] with customizable settings.
///
/// Two settings can be configured before calling `build`: the comparator
/// used for user errors (`error_comparator`) and the tracer (`tracer`).
///
/// # Example
///
/// ```ignore
/// let runtime = QueryRuntime::builder()
///     .error_comparator(|a, b| {
///         // Treat two errors as equal when both are `std::io::Error`
///         // or when neither of them is.
///         a.downcast_ref::<std::io::Error>().is_some()
///             == b.downcast_ref::<std::io::Error>().is_some()
///     })
///     .build();
/// ```
pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
    /// Comparator handed to the built runtime for comparing user errors.
    error_comparator: ErrorComparator,
    /// Tracer instance; `build` wraps it in an `Arc` for the runtime.
    tracer: T,
}
911
912impl Default for QueryRuntimeBuilder<NoopTracer> {
913    fn default() -> Self {
914        Self::new()
915    }
916}
917
918impl QueryRuntimeBuilder<NoopTracer> {
919    /// Create a new builder with default settings.
920    pub fn new() -> Self {
921        Self {
922            error_comparator: default_error_comparator,
923            tracer: NoopTracer,
924        }
925    }
926}
927
928impl<T: Tracer> QueryRuntimeBuilder<T> {
929    /// Set the error comparator function for early cutoff optimization.
930    ///
931    /// When a query returns `QueryError::UserError`, this function is used
932    /// to compare it with the previously cached error. If they are equal,
933    /// downstream queries can skip recomputation (early cutoff).
934    ///
935    /// The default comparator returns `false` for all errors, meaning errors
936    /// are always considered different (conservative, always recomputes).
937    ///
938    /// # Example
939    ///
940    /// ```ignore
941    /// // Treat errors as equal if they have the same display message
942    /// let runtime = QueryRuntime::builder()
943    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
944    ///     .build();
945    /// ```
946    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
947        self.error_comparator = f;
948        self
949    }
950
951    /// Set the tracer for observability.
952    pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
953        QueryRuntimeBuilder {
954            error_comparator: self.error_comparator,
955            tracer,
956        }
957    }
958
959    /// Build the runtime with the configured settings.
960    pub fn build(self) -> QueryRuntime<T> {
961        QueryRuntime {
962            whale: WhaleRuntime::new(),
963            locators: Arc::new(LocatorStorage::new()),
964            pending: Arc::new(PendingStorage::new()),
965            query_registry: Arc::new(QueryRegistry::new()),
966            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
967            verifiers: Arc::new(VerifierStorage::new()),
968            error_comparator: self.error_comparator,
969            tracer: Arc::new(self.tracer),
970        }
971    }
972}
973
974// ============================================================================
975// Asset API
976// ============================================================================
977
978impl<T: Tracer> QueryRuntime<T> {
979    /// Register an asset locator for a specific asset key type.
980    ///
981    /// Only one locator can be registered per key type. Later registrations
982    /// replace earlier ones.
983    ///
984    /// # Example
985    ///
986    /// ```ignore
987    /// let runtime = QueryRuntime::new();
988    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
989    /// ```
990    pub fn register_asset_locator<K, L>(&self, locator: L)
991    where
992        K: AssetKey,
993        L: AssetLocator<K>,
994    {
995        self.locators.insert::<K, L>(locator);
996    }
997
    /// Get the list of pending asset requests.
    ///
    /// Returns assets that have been requested but not yet resolved.
    /// The user should fetch these externally and call `resolve_asset()`
    /// (or `resolve_asset_error()` when loading fails).
    ///
    /// # Example
    ///
    /// ```ignore
    /// for pending in runtime.pending_assets() {
    ///     if let Some(path) = pending.key::<FilePath>() {
    ///         let content = fetch_file(path);
    ///         runtime.resolve_asset(path.clone(), content, DurabilityLevel::Volatile);
    ///     }
    /// }
    /// ```
    pub fn pending_assets(&self) -> Vec<PendingAsset> {
        self.pending.get_all()
    }
1016
    /// Get pending assets filtered by key type.
    ///
    /// Only pending requests whose key is of type `K` are returned, already
    /// typed as `K`. Delegates to the pending storage's `get_of_type`.
    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
        self.pending.get_of_type::<K>()
    }
1021
    /// Check if there are any pending assets.
    ///
    /// Returns `true` when the pending storage is non-empty, i.e. at least one
    /// requested asset has not been resolved yet.
    pub fn has_pending_assets(&self) -> bool {
        !self.pending.is_empty()
    }
1026
    /// Resolve an asset with its loaded value.
    ///
    /// This marks the asset as ready and invalidates any queries that
    /// depend on it (if the value changed), triggering recomputation on next access.
    ///
    /// This method is idempotent - resolving with the same value (via `asset_eq`)
    /// will not trigger downstream recomputation.
    ///
    /// To record a failed load instead, use `resolve_asset_error()`.
    ///
    /// # Arguments
    ///
    /// * `key` - The asset key identifying this resource
    /// * `value` - The loaded asset value
    /// * `durability` - How frequently this asset is expected to change
    ///
    /// # Example
    ///
    /// ```ignore
    /// let content = std::fs::read_to_string(&path)?;
    /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
    /// ```
    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
        // Public entry point; all of the work happens in the private helper.
        self.resolve_asset_internal(key, value, durability);
    }
1050
1051    /// Resolve an asset with an error.
1052    ///
1053    /// This marks the asset as errored and caches the error. Queries depending
1054    /// on this asset will receive `Err(QueryError::UserError(...))`.
1055    ///
1056    /// Use this when async loading fails (e.g., network error, file not found,
1057    /// access denied).
1058    ///
1059    /// # Arguments
1060    ///
1061    /// * `key` - The asset key identifying this resource
1062    /// * `error` - The error to cache (will be wrapped in `Arc`)
1063    /// * `durability` - How frequently this error state is expected to change
1064    ///
1065    /// # Example
1066    ///
1067    /// ```ignore
1068    /// match fetch_file(&path) {
1069    ///     Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
1070    ///     Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
1071    /// }
1072    /// ```
1073    pub fn resolve_asset_error<K: AssetKey>(
1074        &self,
1075        key: K,
1076        error: impl Into<anyhow::Error>,
1077        durability: DurabilityLevel,
1078    ) {
1079        let asset_cache_key = AssetCacheKey::new(key.clone());
1080
1081        // Remove from pending BEFORE registering the error
1082        self.pending.remove(&asset_cache_key);
1083
1084        // Prepare the error entry
1085        let error_arc = Arc::new(error.into());
1086        let entry = CachedEntry::AssetError(error_arc.clone());
1087        let durability =
1088            Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
1089
1090        // Atomic compare-and-update (errors are always considered changed for now)
1091        let result = self
1092            .whale
1093            .update_with_compare(
1094                asset_cache_key.into(),
1095                Some(entry),
1096                |old_data, _new_data| {
1097                    // Compare old and new errors using error_comparator
1098                    match old_data.and_then(|d| d.as_ref()) {
1099                        Some(CachedEntry::AssetError(old_err)) => {
1100                            !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
1101                        }
1102                        _ => true, // Loading, Ready, or not present -> changed
1103                    }
1104                },
1105                durability,
1106                vec![],
1107            )
1108            .expect("update_with_compare with no dependencies cannot fail");
1109
1110        // Emit asset resolved event (with changed status)
1111        let asset_cache_key = AssetCacheKey::new(key.clone());
1112        self.tracer
1113            .on_asset_resolved(&asset_cache_key, result.changed);
1114
1115        // Register asset key in registry for list_asset_keys
1116        let is_new_asset = self.asset_key_registry.register(&key);
1117        if is_new_asset {
1118            // Update sentinel to invalidate list_asset_keys dependents
1119            let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1120            let _ = self
1121                .whale
1122                .register(sentinel, None, Durability::stable(), vec![]);
1123        }
1124    }
1125
1126    fn resolve_asset_internal<K: AssetKey>(
1127        &self,
1128        key: K,
1129        value: K::Asset,
1130        durability_level: DurabilityLevel,
1131    ) {
1132        let asset_cache_key = AssetCacheKey::new(key.clone());
1133
1134        // Remove from pending BEFORE registering the value
1135        self.pending.remove(&asset_cache_key);
1136
1137        // Prepare the new entry
1138        let value_arc: Arc<K::Asset> = Arc::new(value);
1139        let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1140        let durability =
1141            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1142
1143        // Atomic compare-and-update
1144        let result = self
1145            .whale
1146            .update_with_compare(
1147                asset_cache_key.into(),
1148                Some(entry),
1149                |old_data, _new_data| {
1150                    // Compare old and new values
1151                    match old_data.and_then(|d| d.as_ref()) {
1152                        Some(CachedEntry::AssetReady(old_arc)) => {
1153                            match old_arc.clone().downcast::<K::Asset>() {
1154                                Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1155                                Err(_) => true, // Type mismatch, treat as changed
1156                            }
1157                        }
1158                        _ => true, // Loading, NotFound, or not present -> changed
1159                    }
1160                },
1161                durability,
1162                vec![],
1163            )
1164            .expect("update_with_compare with no dependencies cannot fail");
1165
1166        // Emit asset resolved event
1167        let asset_cache_key = AssetCacheKey::new(key.clone());
1168        self.tracer
1169            .on_asset_resolved(&asset_cache_key, result.changed);
1170
1171        // Register asset key in registry for list_asset_keys
1172        let is_new_asset = self.asset_key_registry.register(&key);
1173        if is_new_asset {
1174            // Update sentinel to invalidate list_asset_keys dependents
1175            let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1176            let _ = self
1177                .whale
1178                .register(sentinel, None, Durability::stable(), vec![]);
1179        }
1180    }
1181
1182    /// Invalidate an asset, forcing queries to re-request it.
1183    ///
1184    /// The asset will be marked as loading and added to pending assets.
1185    /// Dependent queries will suspend until the asset is resolved again.
1186    ///
1187    /// # Example
1188    ///
1189    /// ```ignore
1190    /// // File was modified externally
1191    /// runtime.invalidate_asset(&FilePath("config.json".into()));
1192    /// // Queries depending on this asset will now suspend
1193    /// // User should fetch the new value and call resolve_asset
1194    /// ```
1195    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1196        let asset_cache_key = AssetCacheKey::new(key.clone());
1197        let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1198
1199        // Emit asset invalidated event
1200        self.tracer.on_asset_invalidated(&asset_cache_key);
1201
1202        // Add to pending FIRST (before clearing whale state)
1203        // This ensures: readers see either old value, or Loading+pending
1204        self.pending
1205            .insert::<K>(asset_cache_key.clone(), key.clone());
1206
1207        // Atomic: clear cached value + invalidate dependents
1208        // Using None for data means "needs to be loaded"
1209        // Use stable durability to ensure queries at any durability level see the change.
1210        let _ = self
1211            .whale
1212            .register(full_cache_key, None, Durability::stable(), vec![]);
1213    }
1214
1215    /// Remove an asset from the cache entirely.
1216    ///
1217    /// Unlike `invalidate_asset`, this removes all traces of the asset.
1218    /// Dependent queries will go through the locator again on next access.
1219    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1220        let asset_cache_key = AssetCacheKey::new(key.clone());
1221        let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1222
1223        // Remove from pending first
1224        self.pending.remove(&asset_cache_key);
1225
1226        // Remove from whale (this also cleans up dependency edges)
1227        // whale.remove() invalidates dependents before removing
1228        self.whale.remove(&full_cache_key);
1229
1230        // Remove from registry and update sentinel for list_asset_keys
1231        if self.asset_key_registry.remove::<K>(key) {
1232            let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1233            let _ = self
1234                .whale
1235                .register(sentinel, None, Durability::stable(), vec![]);
1236        }
1237    }
1238
    /// Get an asset by key without tracking dependencies.
    ///
    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
    /// as a dependent of the asset. Use this for direct asset access outside
    /// of query execution.
    ///
    /// This is a thin public wrapper: the lookup itself is performed by
    /// `get_asset_internal`.
    ///
    /// # Returns
    ///
    /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
    /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
    /// - `Err(QueryError::UserError)` - Asset was not found or locator returned an error
    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
        self.get_asset_internal(key)
    }
1253
    /// Internal: Get asset state and its changed_at revision atomically.
    ///
    /// This is the "direct" version called from QueryRuntime::asset (no dependency tracking).
    /// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
    /// NOTE(review): the public method visible in this file is `get_asset`, which
    /// delegates to `get_asset_internal` - confirm which caller reaches this function.
    ///
    /// Resolution proceeds in three stages:
    ///
    /// 1. Cache: if whale holds a node for the asset that is valid and whose
    ///    dependencies all verify, the cached state (ready / error / loading)
    ///    is served without running a locator.
    /// 2. Locator: otherwise the locator registered for `K` is run with a
    ///    `LocatorContext`, and its result is stored in whale together with the
    ///    dependencies the locator collected.
    /// 3. Pending: if no locator produced a result, the asset is added to the
    ///    pending storage and reported as loading.
    ///
    /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
    /// whale node that provided the asset value. In the locator `Ready` path the
    /// returned revision is the `revision` field of the `update_with_compare`
    /// result instead.
    ///
    /// # Errors
    ///
    /// - `QueryError::UserError` when a cached asset error is found or the
    ///   locator returns a user error (the latter is cached as `AssetError`).
    /// - Whatever `check_leaf_asset_consistency` returns for a cached leaf entry.
    /// - Whatever `check_cycle` returns before the locator is run.
    /// - Any other error returned by the locator, propagated without caching.
    ///
    /// # Panics
    ///
    /// Panics if a stored or located value cannot be downcast to `K::Asset`
    /// (treated as unreachable), or if a whale update unexpectedly fails.
    fn get_asset_with_revision<K: AssetKey>(
        &self,
        key: K,
    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
        let asset_cache_key = AssetCacheKey::new(key.clone());
        let full_cache_key: FullCacheKey = asset_cache_key.clone().into();

        // Create a span for this asset request (like queries do)
        // This ensures child queries called from locators show as children of this asset
        let asset_span_id = self.tracer.new_span_id();
        // Reuse the active trace and take the innermost span as parent; start a
        // fresh trace when nothing is executing on this thread.
        let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
            SpanStack::Empty => (self.tracer.new_trace_id(), None),
            SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
        });
        let span_ctx = SpanContext {
            span_id: asset_span_id,
            trace_id,
            parent_span_id,
        };

        // Push asset span to stack so child queries see this asset as their parent
        let _span_guard = SpanStackGuard::push(trace_id, asset_span_id);

        // Check whale cache first (single atomic read)
        if let Some(node) = self.whale.get(&full_cache_key) {
            let changed_at = node.changed_at;
            // Check if valid at current revision (shallow check)
            if self.whale.is_valid(&full_cache_key) {
                // Verify dependencies recursively (like query path does)
                let mut deps_verified = true;
                if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
                    for dep in deps {
                        // Dependencies without a registered verifier are skipped.
                        if let Some(verifier) = self.verifiers.get(&dep) {
                            // Re-run query/asset to verify it (triggers recursive verification)
                            if verifier.verify(self as &dyn std::any::Any).is_err() {
                                deps_verified = false;
                                break;
                            }
                        }
                    }
                }

                // Re-check validity after deps are verified
                if deps_verified && self.whale.is_valid(&full_cache_key) {
                    // For cached entries, check consistency for leaf assets (no locator deps).
                    // This detects if resolve_asset/resolve_asset_error was called during query execution.
                    let has_locator_deps = self
                        .whale
                        .get_dependency_ids(&full_cache_key)
                        .is_some_and(|deps| !deps.is_empty());

                    match &node.data {
                        Some(CachedEntry::AssetReady(arc)) => {
                            // Check consistency for cached leaf assets
                            if !has_locator_deps {
                                check_leaf_asset_consistency(changed_at)?;
                            }
                            // Cache hit: start + end immediately (no locator runs)
                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
                            self.tracer.on_asset_located(
                                &span_ctx,
                                &asset_cache_key,
                                TracerAssetState::Ready,
                            );
                            match arc.clone().downcast::<K::Asset>() {
                                Ok(value) => {
                                    return Ok((AssetLoadingState::ready(key, value), changed_at))
                                }
                                Err(_) => {
                                    unreachable!("Asset type mismatch: {:?}", key)
                                }
                            }
                        }
                        Some(CachedEntry::AssetError(err)) => {
                            // Check consistency for cached leaf errors
                            if !has_locator_deps {
                                check_leaf_asset_consistency(changed_at)?;
                            }
                            // Cache hit: start + end immediately (no locator runs)
                            // A cached error is reported to the tracer as `NotFound`.
                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
                            self.tracer.on_asset_located(
                                &span_ctx,
                                &asset_cache_key,
                                TracerAssetState::NotFound,
                            );
                            return Err(QueryError::UserError(err.clone()));
                        }
                        None => {
                            // Loading state - no value to be inconsistent with
                            // Cache hit: start + end immediately (no locator runs)
                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
                            self.tracer.on_asset_located(
                                &span_ctx,
                                &asset_cache_key,
                                TracerAssetState::Loading,
                            );
                            return Ok((AssetLoadingState::loading(key), changed_at));
                        }
                        _ => {
                            // Query-related entries (Ok, UserError) shouldn't be here
                            // Fall through to locator
                        }
                    }
                }
            }
        }

        // Not in cache or invalid - try locator
        // Use LocatorContext to track deps on the asset itself
        // The cycle check runs first; the stack guard then keeps this key on the
        // query stack for the rest of the function.
        check_cycle(&full_cache_key)?;
        let _guard = StackGuard::push(full_cache_key.clone());

        // Notify tracer BEFORE locator runs (START event) so child queries appear as children
        self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);

        let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
        let locator_result =
            self.locators
                .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);

        if let Some(result) = locator_result {
            // Get collected dependencies from the locator context
            let locator_deps = locator_ctx.into_deps();
            match result {
                Ok(ErasedLocateResult::Ready {
                    value: arc,
                    durability: durability_level,
                }) => {
                    // END event after locator completes
                    self.tracer.on_asset_located(
                        &span_ctx,
                        &asset_cache_key,
                        TracerAssetState::Ready,
                    );

                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
                        Ok(v) => v,
                        Err(_) => {
                            unreachable!("Asset type mismatch: {:?}", key);
                        }
                    };

                    // Store in whale atomically with early cutoff
                    // Include locator's dependencies so the asset is invalidated when they change
                    let entry = CachedEntry::AssetReady(typed_value.clone());
                    let durability = Durability::new(durability_level.as_u8() as usize)
                        .unwrap_or(Durability::volatile());
                    let new_value = typed_value.clone();
                    let result = self
                        .whale
                        .update_with_compare(
                            full_cache_key.clone(),
                            Some(entry),
                            // Returns `true` ("changed") unless the previous entry was a
                            // ready value of the same type that `asset_eq` considers equal.
                            |old_data, _new_data| {
                                let Some(CachedEntry::AssetReady(old_arc)) =
                                    old_data.and_then(|d| d.as_ref())
                                else {
                                    return true;
                                };
                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
                                    return true;
                                };
                                !K::asset_eq(&old_value, &new_value)
                            },
                            durability,
                            locator_deps,
                        )
                        .expect("update_with_compare should succeed");

                    // Register verifier for this asset (for verify-then-decide pattern)
                    self.verifiers
                        .insert_asset::<K, T>(full_cache_key, key.clone());

                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
                }
                Ok(ErasedLocateResult::Pending) => {
                    // END event after locator completes with pending
                    self.tracer.on_asset_located(
                        &span_ctx,
                        &asset_cache_key,
                        TracerAssetState::Loading,
                    );

                    // Add to pending list for Pending result
                    self.pending
                        .insert::<K>(asset_cache_key.clone(), key.clone());
                    // Insert a data-less (loading) node unless one already exists;
                    // an existing node's content takes precedence.
                    match self
                        .whale
                        .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
                        .expect("get_or_insert should succeed")
                    {
                        GetOrInsertResult::Inserted(node) => {
                            return Ok((AssetLoadingState::loading(key), node.changed_at));
                        }
                        GetOrInsertResult::Existing(node) => {
                            let changed_at = node.changed_at;
                            match &node.data {
                                Some(CachedEntry::AssetReady(arc)) => {
                                    match arc.clone().downcast::<K::Asset>() {
                                        Ok(value) => {
                                            return Ok((
                                                AssetLoadingState::ready(key, value),
                                                changed_at,
                                            ))
                                        }
                                        // Unlike the cache-hit path, a type mismatch here
                                        // degrades to loading instead of panicking.
                                        Err(_) => {
                                            return Ok((
                                                AssetLoadingState::loading(key),
                                                changed_at,
                                            ))
                                        }
                                    }
                                }
                                Some(CachedEntry::AssetError(err)) => {
                                    return Err(QueryError::UserError(err.clone()));
                                }
                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
                            }
                        }
                    }
                }
                Err(QueryError::UserError(err)) => {
                    // END event after locator completes with error
                    self.tracer.on_asset_located(
                        &span_ctx,
                        &asset_cache_key,
                        TracerAssetState::NotFound,
                    );
                    // Locator returned a user error - cache it as AssetError
                    let entry = CachedEntry::AssetError(err.clone());
                    // The result of `register` is deliberately ignored here.
                    let _ = self.whale.register(
                        full_cache_key,
                        Some(entry),
                        Durability::volatile(),
                        locator_deps,
                    );
                    return Err(QueryError::UserError(err));
                }
                Err(e) => {
                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
                    return Err(e);
                }
            }
        }

        // No locator registered or locator returned None - mark as pending
        // (no locator was called, so no deps to track)
        // END event - no locator ran
        self.tracer
            .on_asset_located(&span_ctx, &asset_cache_key, TracerAssetState::Loading);
        self.pending
            .insert::<K>(asset_cache_key.clone(), key.clone());

        // Same get-or-insert handling as the locator `Pending` branch, but with
        // an empty dependency list.
        match self
            .whale
            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
            .expect("get_or_insert with no dependencies cannot fail")
        {
            GetOrInsertResult::Inserted(node) => {
                Ok((AssetLoadingState::loading(key), node.changed_at))
            }
            GetOrInsertResult::Existing(node) => {
                let changed_at = node.changed_at;
                match &node.data {
                    Some(CachedEntry::AssetReady(arc)) => {
                        match arc.clone().downcast::<K::Asset>() {
                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
                            // Type mismatch degrades to loading rather than panicking.
                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
                        }
                    }
                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
                }
            }
        }
    }
1537
    /// Internal: resolve an asset and return its loading state together with the
    /// revision reported for its cache node (`QueryContext` variant).
    ///
    /// This version is called from `QueryContext::asset_state`. Consistency checking
    /// for cached leaf assets is done inside this function before returning.
    /// The `_ctx` parameter is currently unused.
    ///
    /// Resolution order:
    /// 1. **Cache** - if whale holds a valid node whose dependencies all verify, the
    ///    cached ready value / cached error / loading marker is returned directly.
    /// 2. **Locator** - otherwise the locator registered for `K` (if any) is run with a
    ///    fresh `LocatorContext`; its result is stored in whale together with the
    ///    dependencies the locator collected.
    /// 3. **Fallback** - if no locator produced a result, the asset is added to the
    ///    pending list and reported as loading (unless whale already holds an entry).
    ///
    /// The returned revision is `node.changed_at` for cache hits and `get_or_insert`
    /// results, and `result.revision` of `update_with_compare` for a freshly located value.
    ///
    /// # Errors
    ///
    /// - `QueryError::UserError` when an error is cached for the asset or the locator
    ///   returns a user error (which is then cached as `CachedEntry::AssetError`).
    /// - Whatever `check_leaf_asset_consistency` or `check_cycle` return.
    /// - Any other locator error, propagated without being cached.
    ///
    /// # Panics
    ///
    /// Panics if a stored or located value cannot be downcast to `K::Asset` on the
    /// cache-hit / locator-ready paths, or if `update_with_compare` / `get_or_insert`
    /// report a failure.
    fn get_asset_with_revision_ctx<K: AssetKey>(
        &self,
        key: K,
        _ctx: &QueryContext<'_, T>,
    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
        let asset_cache_key = AssetCacheKey::new(key.clone());
        let full_cache_key: FullCacheKey = asset_cache_key.clone().into();

        // Create a span for this asset request (like queries do)
        // This ensures child queries called from locators show as children of this asset
        let asset_span_id = self.tracer.new_span_id();
        // With no active span stack a new trace is started and the span has no parent;
        // otherwise the current trace is reused and the top of the stack becomes the parent.
        let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
            SpanStack::Empty => (self.tracer.new_trace_id(), None),
            SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
        });
        let span_ctx = SpanContext {
            span_id: asset_span_id,
            trace_id,
            parent_span_id,
        };

        // Push asset span to stack so child queries see this asset as their parent
        // (the guard is kept alive until this function returns).
        let _span_guard = SpanStackGuard::push(trace_id, asset_span_id);

        // Check whale cache first (single atomic read)
        if let Some(node) = self.whale.get(&full_cache_key) {
            let changed_at = node.changed_at;
            // Check if valid at current revision (shallow check)
            if self.whale.is_valid(&full_cache_key) {
                // Verify dependencies recursively (like query path does)
                let mut deps_verified = true;
                if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
                    for dep in deps {
                        // Dependencies without a registered verifier are skipped.
                        if let Some(verifier) = self.verifiers.get(&dep) {
                            // Re-run query/asset to verify it (triggers recursive verification)
                            if verifier.verify(self as &dyn std::any::Any).is_err() {
                                deps_verified = false;
                                break;
                            }
                        }
                    }
                }

                // Re-check validity after deps are verified
                if deps_verified && self.whale.is_valid(&full_cache_key) {
                    // For cached entries, check consistency for leaf assets (no locator deps).
                    // This detects if resolve_asset/resolve_asset_error was called during query execution.
                    let has_locator_deps = self
                        .whale
                        .get_dependency_ids(&full_cache_key)
                        .is_some_and(|deps| !deps.is_empty());

                    match &node.data {
                        Some(CachedEntry::AssetReady(arc)) => {
                            // Check consistency for cached leaf assets.
                            // This runs before any tracer call, so a failure here emits
                            // no requested/located events for this access.
                            if !has_locator_deps {
                                check_leaf_asset_consistency(changed_at)?;
                            }
                            // Cache hit: start + end immediately (no locator runs)
                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
                            self.tracer.on_asset_located(
                                &span_ctx,
                                &asset_cache_key,
                                TracerAssetState::Ready,
                            );
                            match arc.clone().downcast::<K::Asset>() {
                                Ok(value) => {
                                    return Ok((AssetLoadingState::ready(key, value), changed_at))
                                }
                                Err(_) => {
                                    unreachable!("Asset type mismatch: {:?}", key)
                                }
                            }
                        }
                        Some(CachedEntry::AssetError(err)) => {
                            // Check consistency for cached leaf errors
                            if !has_locator_deps {
                                check_leaf_asset_consistency(changed_at)?;
                            }
                            // Cache hit: start + end immediately (no locator runs)
                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
                            self.tracer.on_asset_located(
                                &span_ctx,
                                &asset_cache_key,
                                TracerAssetState::NotFound,
                            );
                            return Err(QueryError::UserError(err.clone()));
                        }
                        None => {
                            // Loading state - no value to be inconsistent with
                            // Cache hit: start + end immediately (no locator runs)
                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
                            self.tracer.on_asset_located(
                                &span_ctx,
                                &asset_cache_key,
                                TracerAssetState::Loading,
                            );
                            return Ok((AssetLoadingState::loading(key), changed_at));
                        }
                        _ => {
                            // Query-related entries (Ok, UserError) shouldn't be here
                            // Fall through to locator
                        }
                    }
                }
            }
        }

        // Not in cache or invalid - try locator
        // Use LocatorContext to track deps on the asset itself (not the calling query)
        // Consistency tracking is handled via thread-local storage
        check_cycle(&full_cache_key)?;
        // Keeps this asset's key on the execution stack for the rest of the function
        // (presumably popped when the guard drops - confirm against StackGuard).
        let _guard = StackGuard::push(full_cache_key.clone());

        // START event before locator runs
        self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);

        let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
        let locator_result =
            self.locators
                .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);

        if let Some(result) = locator_result {
            // Get collected dependencies from the locator context
            let locator_deps = locator_ctx.into_deps();
            match result {
                Ok(ErasedLocateResult::Ready {
                    value: arc,
                    durability: durability_level,
                }) => {
                    // END event after locator completes
                    self.tracer.on_asset_located(
                        &span_ctx,
                        &asset_cache_key,
                        TracerAssetState::Ready,
                    );

                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
                        Ok(v) => v,
                        Err(_) => {
                            unreachable!("Asset type mismatch: {:?}", key);
                        }
                    };

                    // Store in whale atomically with early cutoff
                    // Include locator's dependencies so the asset is invalidated when they change
                    let entry = CachedEntry::AssetReady(typed_value.clone());
                    // Falls back to volatile when `Durability::new` does not accept the level.
                    let durability = Durability::new(durability_level.as_u8() as usize)
                        .unwrap_or(Durability::volatile());
                    let new_value = typed_value.clone();
                    let result = self
                        .whale
                        .update_with_compare(
                            full_cache_key.clone(),
                            Some(entry),
                            // Comparison callback: yields `true` when there is no previous
                            // ready value of type `K::Asset`, otherwise the negation of
                            // `K::asset_eq` (presumably `true` means "value changed" - confirm
                            // against whale's `update_with_compare` contract).
                            |old_data, _new_data| {
                                let Some(CachedEntry::AssetReady(old_arc)) =
                                    old_data.and_then(|d| d.as_ref())
                                else {
                                    return true;
                                };
                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
                                    return true;
                                };
                                !K::asset_eq(&old_value, &new_value)
                            },
                            durability,
                            locator_deps,
                        )
                        .expect("update_with_compare should succeed");

                    // Register verifier for this asset (for verify-then-decide pattern)
                    self.verifiers
                        .insert_asset::<K, T>(full_cache_key, key.clone());

                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
                }
                Ok(ErasedLocateResult::Pending) => {
                    // END event after locator completes with pending
                    self.tracer.on_asset_located(
                        &span_ctx,
                        &asset_cache_key,
                        TracerAssetState::Loading,
                    );

                    // Add to pending list for Pending result
                    self.pending
                        .insert::<K>(asset_cache_key.clone(), key.clone());
                    // Insert a loading marker (`None`) unless whale already holds an entry,
                    // in which case the existing entry decides the result.
                    match self
                        .whale
                        .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
                        .expect("get_or_insert should succeed")
                    {
                        GetOrInsertResult::Inserted(node) => {
                            return Ok((AssetLoadingState::loading(key), node.changed_at));
                        }
                        GetOrInsertResult::Existing(node) => {
                            let changed_at = node.changed_at;
                            match &node.data {
                                Some(CachedEntry::AssetReady(arc)) => {
                                    // Unlike the cache-hit path above, a failed downcast here
                                    // is reported as loading instead of panicking.
                                    match arc.clone().downcast::<K::Asset>() {
                                        Ok(value) => {
                                            return Ok((
                                                AssetLoadingState::ready(key, value),
                                                changed_at,
                                            ));
                                        }
                                        Err(_) => {
                                            return Ok((
                                                AssetLoadingState::loading(key),
                                                changed_at,
                                            ))
                                        }
                                    }
                                }
                                Some(CachedEntry::AssetError(err)) => {
                                    return Err(QueryError::UserError(err.clone()));
                                }
                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
                            }
                        }
                    }
                }
                Err(QueryError::UserError(err)) => {
                    // END event after locator completes with error
                    self.tracer.on_asset_located(
                        &span_ctx,
                        &asset_cache_key,
                        TracerAssetState::NotFound,
                    );
                    // Locator returned a user error - cache it as AssetError
                    // (the outcome of `register` is deliberately ignored).
                    let entry = CachedEntry::AssetError(err.clone());
                    let _ = self.whale.register(
                        full_cache_key,
                        Some(entry),
                        Durability::volatile(),
                        locator_deps,
                    );
                    return Err(QueryError::UserError(err));
                }
                Err(e) => {
                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
                    return Err(e);
                }
            }
        }

        // No locator registered or locator returned None - mark as pending
        // END event - no locator ran
        self.tracer
            .on_asset_located(&span_ctx, &asset_cache_key, TracerAssetState::Loading);
        self.pending
            .insert::<K>(asset_cache_key.clone(), key.clone());

        // Same insert-or-reuse handling as the locator `Pending` case, but with no dependencies.
        match self
            .whale
            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
            .expect("get_or_insert with no dependencies cannot fail")
        {
            GetOrInsertResult::Inserted(node) => {
                Ok((AssetLoadingState::loading(key), node.changed_at))
            }
            GetOrInsertResult::Existing(node) => {
                let changed_at = node.changed_at;
                match &node.data {
                    Some(CachedEntry::AssetReady(arc)) => {
                        match arc.clone().downcast::<K::Asset>() {
                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
                        }
                    }
                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
                }
            }
        }
    }
1819
1820    /// Internal: Get asset state, checking cache and locator.
1821    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1822        self.get_asset_with_revision(key).map(|(state, _)| state)
1823    }
1824}
1825
1826impl<T: Tracer> Db for QueryRuntime<T> {
1827    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1828        QueryRuntime::query(self, query)
1829    }
1830
1831    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1832        self.get_asset_internal(key)?.suspend()
1833    }
1834
1835    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1836        self.get_asset_internal(key)
1837    }
1838
1839    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1840        self.query_registry.get_all::<Q>()
1841    }
1842
1843    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1844        self.asset_key_registry.get_all::<K>()
1845    }
1846}
1847
1848/// Tracks consistency of leaf asset accesses during query execution.
1849///
1850/// A "leaf" asset is one without dependencies (externally resolved via `resolve_asset`).
1851/// This tracker ensures that all leaf assets accessed during a query execution
1852/// (including those accessed by locators) are consistent - i.e., none were modified
1853/// via `resolve_asset` mid-execution.
1854///
1855/// The tracker is shared across `QueryContext` and `LocatorContext` to propagate
1856/// consistency checking through the entire execution tree.
1857#[derive(Debug)]
1858pub(crate) struct ConsistencyTracker {
1859    /// Global revision at query start. All leaf assets must have changed_at <= this.
1860    start_revision: RevisionCounter,
1861}
1862
1863impl ConsistencyTracker {
1864    /// Create a new tracker with the given start revision.
1865    pub fn new(start_revision: RevisionCounter) -> Self {
1866        Self { start_revision }
1867    }
1868
1869    /// Check consistency for a leaf asset access.
1870    ///
1871    /// A leaf asset is consistent if its changed_at <= start_revision.
1872    /// This detects if resolve_asset was called during query execution.
1873    ///
1874    /// Returns Ok(()) if consistent, Err if inconsistent.
1875    pub fn check_leaf_asset(&self, dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
1876        if dep_changed_at > self.start_revision {
1877            Err(QueryError::InconsistentAssetResolution)
1878        } else {
1879            Ok(())
1880        }
1881    }
1882}
1883
/// Context provided to queries during execution.
///
/// Queries reach their dependencies through this context (`query()`, `asset()`,
/// `asset_state()`, `list_queries()`, `list_asset_keys()`); each such access
/// appends the accessed key to `deps`.
pub(crate) struct QueryContext<'a, T: Tracer = NoopTracer> {
    /// Runtime used to execute nested queries, resolve assets and reach the tracer.
    runtime: &'a QueryRuntime<T>,
    /// Key reported to the tracer as the owner of every registered dependency
    /// (presumably the key of the query being executed - confirm at the construction site).
    current_key: FullCacheKey,
    /// Supplies the span context passed along with tracer events.
    exec_ctx: ExecutionContext,
    /// Keys accessed through this context, in access order (duplicates are not removed).
    deps: RefCell<Vec<FullCacheKey>>,
}
1893
1894impl<'a, T: Tracer> QueryContext<'a, T> {
1895    /// Query a dependency.
1896    ///
1897    /// The dependency is automatically tracked for invalidation.
1898    ///
1899    /// # Example
1900    ///
1901    /// ```ignore
1902    /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1903    ///     let dep_result = db.query(OtherQuery { id: self.id })?;
1904    ///     Ok(process(&dep_result))
1905    /// }
1906    /// ```
1907    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1908        let full_key: FullCacheKey = QueryCacheKey::new(query.clone()).into();
1909
1910        // Emit dependency registered event
1911        self.runtime.tracer.on_dependency_registered(
1912            self.exec_ctx.span_ctx(),
1913            &self.current_key,
1914            &full_key,
1915        );
1916
1917        // Record this as a dependency
1918        self.deps.borrow_mut().push(full_key);
1919
1920        // Execute the query
1921        self.runtime.query(query)
1922    }
1923
1924    /// Access an asset, tracking it as a dependency.
1925    ///
1926    /// Returns the asset value if ready, or `Err(QueryError::Suspend)` if still loading.
1927    /// Use this with the `?` operator for automatic suspension on loading.
1928    ///
1929    /// # Example
1930    ///
1931    /// ```ignore
1932    /// #[query]
1933    /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1934    ///     let content = db.asset(path)?;
1935    ///     // Process content...
1936    ///     Ok(output)
1937    /// }
1938    /// ```
1939    ///
1940    /// # Errors
1941    ///
1942    /// - Returns `Err(QueryError::Suspend)` if the asset is still loading.
1943    /// - Returns `Err(QueryError::UserError)` if the asset was not found.
1944    pub fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1945        self.asset_state(key)?.suspend()
1946    }
1947
1948    /// Access an asset's loading state, tracking it as a dependency.
1949    ///
1950    /// Unlike [`asset()`](Self::asset), this method returns the full loading state,
1951    /// allowing you to check if an asset is loading without triggering suspension.
1952    ///
1953    /// # Example
1954    ///
1955    /// ```ignore
1956    /// let state = db.asset_state(key)?;
1957    /// if state.is_loading() {
1958    ///     // Handle loading case explicitly
1959    /// } else {
1960    ///     let value = state.get().unwrap();
1961    /// }
1962    /// ```
1963    ///
1964    /// # Errors
1965    ///
1966    /// Returns `Err(QueryError::UserError)` if the asset was not found.
1967    pub fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1968        let full_cache_key: FullCacheKey = AssetCacheKey::new(key.clone()).into();
1969
1970        // 1. Emit asset dependency registered event
1971        self.runtime.tracer.on_asset_dependency_registered(
1972            self.exec_ctx.span_ctx(),
1973            &self.current_key,
1974            &full_cache_key,
1975        );
1976
1977        // 2. Record dependency on this asset
1978        self.deps.borrow_mut().push(full_cache_key);
1979
1980        // 3. Get asset from cache/locator
1981        // Consistency checking for cached leaf assets is done inside get_asset_with_revision_ctx
1982        let (state, _changed_at) = self.runtime.get_asset_with_revision_ctx(key, self)?;
1983
1984        Ok(state)
1985    }
1986
1987    /// List all query instances of type Q that have been registered.
1988    ///
1989    /// This method establishes a dependency on the "set" of queries of type Q.
1990    /// The calling query will be invalidated when:
1991    /// - A new query of type Q is first executed (added to set)
1992    ///
1993    /// The calling query will NOT be invalidated when:
1994    /// - An individual query of type Q has its value change
1995    ///
1996    /// # Example
1997    ///
1998    /// ```ignore
1999    /// #[query]
2000    /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
2001    ///     let queries = db.list_queries::<MyQuery>();
2002    ///     let mut results = Vec::new();
2003    ///     for q in queries {
2004    ///         results.push(*db.query(q)?);
2005    ///     }
2006    ///     Ok(results)
2007    /// }
2008    /// ```
2009    pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
2010        // Record dependency on the sentinel (set-level dependency)
2011        let sentinel: FullCacheKey = QuerySetSentinelKey::new::<Q>().into();
2012
2013        self.runtime.tracer.on_dependency_registered(
2014            self.exec_ctx.span_ctx(),
2015            &self.current_key,
2016            &sentinel,
2017        );
2018
2019        // Ensure sentinel exists in whale (for dependency tracking)
2020        if self.runtime.whale.get(&sentinel).is_none() {
2021            let _ =
2022                self.runtime
2023                    .whale
2024                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
2025        }
2026
2027        self.deps.borrow_mut().push(sentinel);
2028
2029        // Return all registered queries
2030        self.runtime.query_registry.get_all::<Q>()
2031    }
2032
2033    /// List all asset keys of type K that have been registered.
2034    ///
2035    /// This method establishes a dependency on the "set" of asset keys of type K.
2036    /// The calling query will be invalidated when:
2037    /// - A new asset of type K is resolved for the first time (added to set)
2038    /// - An asset of type K is removed via remove_asset
2039    ///
2040    /// The calling query will NOT be invalidated when:
2041    /// - An individual asset's value changes (use `db.asset()` for that)
2042    ///
2043    /// # Example
2044    ///
2045    /// ```ignore
2046    /// #[query]
2047    /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
2048    ///     let keys = db.list_asset_keys::<ConfigFile>();
2049    ///     let mut contents = Vec::new();
2050    ///     for key in keys {
2051    ///         let content = db.asset(key)?;
2052    ///         contents.push((*content).clone());
2053    ///     }
2054    ///     Ok(contents)
2055    /// }
2056    /// ```
2057    pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2058        // Record dependency on the sentinel (set-level dependency)
2059        let sentinel: FullCacheKey = AssetKeySetSentinelKey::new::<K>().into();
2060
2061        self.runtime.tracer.on_asset_dependency_registered(
2062            self.exec_ctx.span_ctx(),
2063            &self.current_key,
2064            &sentinel,
2065        );
2066
2067        // Ensure sentinel exists in whale (for dependency tracking)
2068        if self.runtime.whale.get(&sentinel).is_none() {
2069            let _ =
2070                self.runtime
2071                    .whale
2072                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
2073        }
2074
2075        self.deps.borrow_mut().push(sentinel);
2076
2077        // Return all registered asset keys
2078        self.runtime.asset_key_registry.get_all::<K>()
2079    }
2080}
2081
2082impl<'a, T: Tracer> Db for QueryContext<'a, T> {
2083    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2084        QueryContext::query(self, query)
2085    }
2086
2087    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2088        QueryContext::asset(self, key)
2089    }
2090
2091    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2092        QueryContext::asset_state(self, key)
2093    }
2094
2095    fn list_queries<Q: Query>(&self) -> Vec<Q> {
2096        QueryContext::list_queries(self)
2097    }
2098
2099    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2100        QueryContext::list_asset_keys(self)
2101    }
2102}
2103
2104/// Context for collecting dependencies during asset locator execution.
2105///
2106/// Unlike `QueryContext`, this is specifically for locators and does not
2107/// register dependencies on any parent query. Dependencies collected here
2108/// are stored with the asset itself.
2109pub(crate) struct LocatorContext<'a, T: Tracer> {
2110    runtime: &'a QueryRuntime<T>,
2111    deps: RefCell<Vec<FullCacheKey>>,
2112}
2113
2114impl<'a, T: Tracer> LocatorContext<'a, T> {
2115    /// Create a new locator context for the given asset key.
2116    ///
2117    /// Consistency tracking is handled via thread-local storage, so leaf asset
2118    /// accesses will be checked against any active tracker from a parent query.
2119    pub(crate) fn new(runtime: &'a QueryRuntime<T>, _asset_key: FullCacheKey) -> Self {
2120        Self {
2121            runtime,
2122            deps: RefCell::new(Vec::new()),
2123        }
2124    }
2125
2126    /// Consume this context and return the collected dependencies.
2127    pub(crate) fn into_deps(self) -> Vec<FullCacheKey> {
2128        self.deps.into_inner()
2129    }
2130}
2131
2132impl<T: Tracer> Db for LocatorContext<'_, T> {
2133    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2134        let full_key = QueryCacheKey::new(query.clone()).into();
2135
2136        // Record this as a dependency of the asset being located
2137        self.deps.borrow_mut().push(full_key);
2138
2139        // Execute the query via the runtime
2140        self.runtime.query(query)
2141    }
2142
2143    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2144        self.asset_state(key)?.suspend()
2145    }
2146
2147    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2148        let full_cache_key = AssetCacheKey::new(key.clone()).into();
2149
2150        // Record this as a dependency of the asset being located
2151        self.deps.borrow_mut().push(full_cache_key);
2152
2153        // Access the asset - consistency checking is done inside get_asset_with_revision
2154        let (state, _changed_at) = self.runtime.get_asset_with_revision(key)?;
2155
2156        Ok(state)
2157    }
2158
2159    fn list_queries<Q: Query>(&self) -> Vec<Q> {
2160        self.runtime.list_queries()
2161    }
2162
2163    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2164        self.runtime.list_asset_keys()
2165    }
2166}
2167
/// Enum dispatch wrapper for Db implementations.
///
/// Reduces monomorphization by providing a single concrete type
/// for `&impl Db` parameters in user code.
///
/// Each variant borrows one of the two execution contexts; the `Db`
/// implementation for this enum forwards every call to the wrapped context.
pub(crate) enum DbDispatch<'a, T: Tracer = NoopTracer> {
    /// Query execution context (tracks query dependencies)
    QueryContext(&'a QueryContext<'a, T>),
    /// Locator execution context (tracks asset dependencies)
    LocatorContext(&'a LocatorContext<'a, T>),
}
2178
2179impl<T: Tracer> Db for DbDispatch<'_, T> {
2180    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2181        match self {
2182            DbDispatch::QueryContext(ctx) => ctx.query(query),
2183            DbDispatch::LocatorContext(ctx) => ctx.query(query),
2184        }
2185    }
2186
2187    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2188        match self {
2189            DbDispatch::QueryContext(ctx) => ctx.asset(key),
2190            DbDispatch::LocatorContext(ctx) => ctx.asset(key),
2191        }
2192    }
2193
2194    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2195        match self {
2196            DbDispatch::QueryContext(ctx) => ctx.asset_state(key),
2197            DbDispatch::LocatorContext(ctx) => ctx.asset_state(key),
2198        }
2199    }
2200
2201    fn list_queries<Q: Query>(&self) -> Vec<Q> {
2202        match self {
2203            DbDispatch::QueryContext(ctx) => ctx.list_queries(),
2204            DbDispatch::LocatorContext(ctx) => ctx.list_queries(),
2205        }
2206    }
2207
2208    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2209        match self {
2210            DbDispatch::QueryContext(ctx) => ctx.list_asset_keys(),
2211            DbDispatch::LocatorContext(ctx) => ctx.list_asset_keys(),
2212        }
2213    }
2214}
2215
2216#[cfg(test)]
2217mod tests {
2218    use super::*;
2219
2220    #[test]
2221    fn test_simple_query() {
2222        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2223        struct Add {
2224            a: i32,
2225            b: i32,
2226        }
2227
2228        impl Query for Add {
2229            type Output = i32;
2230
2231            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2232                Ok(Arc::new(self.a + self.b))
2233            }
2234
2235            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2236                old == new
2237            }
2238        }
2239
2240        let runtime = QueryRuntime::new();
2241
2242        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
2243        assert_eq!(*result, 3);
2244
2245        // Second query should be cached
2246        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
2247        assert_eq!(*result2, 3);
2248    }
2249
2250    #[test]
2251    fn test_dependent_queries() {
2252        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2253        struct Base {
2254            value: i32,
2255        }
2256
2257        impl Query for Base {
2258            type Output = i32;
2259
2260            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2261                Ok(Arc::new(self.value * 2))
2262            }
2263
2264            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2265                old == new
2266            }
2267        }
2268
2269        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2270        struct Derived {
2271            base_value: i32,
2272        }
2273
2274        impl Query for Derived {
2275            type Output = i32;
2276
2277            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2278                let base = db.query(Base {
2279                    value: self.base_value,
2280                })?;
2281                Ok(Arc::new(*base + 10))
2282            }
2283
2284            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2285                old == new
2286            }
2287        }
2288
2289        let runtime = QueryRuntime::new();
2290
2291        let result = runtime.query(Derived { base_value: 5 }).unwrap();
2292        assert_eq!(*result, 20); // 5 * 2 + 10
2293    }
2294
2295    #[test]
2296    fn test_cycle_detection() {
2297        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2298        struct CycleA {
2299            id: i32,
2300        }
2301
2302        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2303        struct CycleB {
2304            id: i32,
2305        }
2306
2307        impl Query for CycleA {
2308            type Output = i32;
2309
2310            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2311                let b = db.query(CycleB { id: self.id })?;
2312                Ok(Arc::new(*b + 1))
2313            }
2314
2315            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2316                old == new
2317            }
2318        }
2319
2320        impl Query for CycleB {
2321            type Output = i32;
2322
2323            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2324                let a = db.query(CycleA { id: self.id })?;
2325                Ok(Arc::new(*a + 1))
2326            }
2327
2328            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2329                old == new
2330            }
2331        }
2332
2333        let runtime = QueryRuntime::new();
2334
2335        let result = runtime.query(CycleA { id: 1 });
2336        assert!(matches!(result, Err(QueryError::Cycle { .. })));
2337    }
2338
2339    #[test]
2340    fn test_fallible_query() {
2341        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2342        struct ParseInt {
2343            input: String,
2344        }
2345
2346        impl Query for ParseInt {
2347            type Output = Result<i32, std::num::ParseIntError>;
2348
2349            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2350                Ok(Arc::new(self.input.parse()))
2351            }
2352
2353            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2354                old == new
2355            }
2356        }
2357
2358        let runtime = QueryRuntime::new();
2359
2360        // Valid parse
2361        let result = runtime
2362            .query(ParseInt {
2363                input: "42".to_string(),
2364            })
2365            .unwrap();
2366        assert_eq!(*result, Ok(42));
2367
2368        // Invalid parse - system succeeds, user error in output
2369        let result = runtime
2370            .query(ParseInt {
2371                input: "not_a_number".to_string(),
2372            })
2373            .unwrap();
2374        assert!(result.is_err());
2375    }
2376
2377    // Macro tests
2378    mod macro_tests {
2379        use super::*;
2380        use crate::query;
2381
2382        #[query]
2383        fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2384            let _ = db; // silence unused warning
2385            Ok(a + b)
2386        }
2387
2388        #[test]
2389        fn test_macro_basic() {
2390            let runtime = QueryRuntime::new();
2391            let result = runtime.query(Add::new(1, 2)).unwrap();
2392            assert_eq!(*result, 3);
2393        }
2394
2395        #[query]
2396        fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2397            let _ = db;
2398            Ok(x * 2)
2399        }
2400
2401        #[test]
2402        fn test_macro_simple() {
2403            let runtime = QueryRuntime::new();
2404            let result = runtime.query(SimpleDouble::new(5)).unwrap();
2405            assert_eq!(*result, 10);
2406        }
2407
2408        #[query(keys(id))]
2409        fn with_key_selection(
2410            db: &impl Db,
2411            id: u32,
2412            include_extra: bool,
2413        ) -> Result<String, QueryError> {
2414            let _ = db;
2415            Ok(format!("id={}, extra={}", id, include_extra))
2416        }
2417
2418        #[test]
2419        fn test_macro_key_selection() {
2420            let runtime = QueryRuntime::new();
2421
2422            // Same id, different include_extra - should return cached
2423            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2424            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2425
2426            // Both should have same value because only `id` is the key
2427            assert_eq!(*r1, "id=1, extra=true");
2428            assert_eq!(*r2, "id=1, extra=true"); // Cached!
2429        }
2430
2431        #[query]
2432        fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2433            let sum = db.query(Add::new(a, b))?;
2434            Ok(*sum * 2)
2435        }
2436
2437        #[test]
2438        fn test_macro_dependencies() {
2439            let runtime = QueryRuntime::new();
2440            let result = runtime.query(Dependent::new(3, 4)).unwrap();
2441            assert_eq!(*result, 14); // (3 + 4) * 2
2442        }
2443
2444        #[query(output_eq)]
2445        fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2446            let _ = db;
2447            Ok(x * 2)
2448        }
2449
2450        #[test]
2451        fn test_macro_output_eq() {
2452            let runtime = QueryRuntime::new();
2453            let result = runtime.query(WithOutputEq::new(5)).unwrap();
2454            assert_eq!(*result, 10);
2455        }
2456
2457        #[query(name = "CustomName")]
2458        fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2459            let _ = db;
2460            Ok(x)
2461        }
2462
2463        #[test]
2464        fn test_macro_custom_name() {
2465            let runtime = QueryRuntime::new();
2466            let result = runtime.query(CustomName::new(42)).unwrap();
2467            assert_eq!(*result, 42);
2468        }
2469
2470        // Test that attribute macros like #[tracing::instrument] are preserved
2471        // We use #[allow(unused_variables)] and #[inline] as test attributes since
2472        // they don't require external dependencies.
2473        #[allow(unused_variables)]
2474        #[inline]
2475        #[query]
2476        fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2477            // This would warn without #[allow(unused_variables)] on the generated method
2478            let unused_var = 42;
2479            Ok(x * 2)
2480        }
2481
2482        #[test]
2483        fn test_macro_preserves_attributes() {
2484            let runtime = QueryRuntime::new();
2485            // If attributes weren't preserved, this might warn about unused_var
2486            let result = runtime.query(WithAttributes::new(5)).unwrap();
2487            assert_eq!(*result, 10);
2488        }
2489    }
2490
2491    // Tests for poll() and changed_at()
2492    mod poll_tests {
2493        use super::*;
2494
2495        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2496        struct Counter {
2497            id: i32,
2498        }
2499
2500        impl Query for Counter {
2501            type Output = i32;
2502
2503            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2504                Ok(Arc::new(self.id * 10))
2505            }
2506
2507            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2508                old == new
2509            }
2510        }
2511
2512        #[test]
2513        fn test_poll_returns_value_and_revision() {
2514            let runtime = QueryRuntime::new();
2515
2516            let result = runtime.poll(Counter { id: 1 }).unwrap();
2517
2518            // Value should be correct - access through Result and Arc
2519            assert_eq!(**result.value.as_ref().unwrap(), 10);
2520
2521            // Revision should be non-zero after first execution
2522            assert!(result.revision > 0);
2523        }
2524
2525        #[test]
2526        fn test_poll_revision_stable_on_cache_hit() {
2527            let runtime = QueryRuntime::new();
2528
2529            // First poll
2530            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2531            let rev1 = result1.revision;
2532
2533            // Second poll (cache hit)
2534            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2535            let rev2 = result2.revision;
2536
2537            // Revision should be the same (no change)
2538            assert_eq!(rev1, rev2);
2539        }
2540
2541        #[test]
2542        fn test_poll_revision_changes_on_invalidate() {
2543            let runtime = QueryRuntime::new();
2544
2545            // First poll
2546            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2547            let rev1 = result1.revision;
2548
2549            // Invalidate and poll again
2550            runtime.invalidate(&Counter { id: 1 });
2551            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2552            let rev2 = result2.revision;
2553
2554            // Revision should increase (value was recomputed)
2555            // Note: Since output_eq returns true (same value), this might not change
2556            // depending on early cutoff behavior. Let's verify the value is still correct.
2557            assert_eq!(**result2.value.as_ref().unwrap(), 10);
2558
2559            // With early cutoff, revision might stay the same if value didn't change
2560            // This is expected behavior
2561            assert!(rev2 >= rev1);
2562        }
2563
2564        #[test]
2565        fn test_changed_at_returns_none_for_unexecuted_query() {
2566            let runtime = QueryRuntime::new();
2567
2568            // Query has never been executed
2569            let rev = runtime.changed_at(&Counter { id: 1 });
2570            assert!(rev.is_none());
2571        }
2572
2573        #[test]
2574        fn test_changed_at_returns_revision_after_execution() {
2575            let runtime = QueryRuntime::new();
2576
2577            // Execute the query
2578            let _ = runtime.query(Counter { id: 1 }).unwrap();
2579
2580            // Now changed_at should return Some
2581            let rev = runtime.changed_at(&Counter { id: 1 });
2582            assert!(rev.is_some());
2583            assert!(rev.unwrap() > 0);
2584        }
2585
2586        #[test]
2587        fn test_changed_at_matches_poll_revision() {
2588            let runtime = QueryRuntime::new();
2589
2590            // Poll the query
2591            let result = runtime.poll(Counter { id: 1 }).unwrap();
2592
2593            // changed_at should match the revision from poll
2594            let rev = runtime.changed_at(&Counter { id: 1 });
2595            assert_eq!(rev, Some(result.revision));
2596        }
2597
2598        #[test]
2599        fn test_poll_value_access() {
2600            let runtime = QueryRuntime::new();
2601
2602            let result = runtime.poll(Counter { id: 5 }).unwrap();
2603
2604            // Access through Result and Arc
2605            let value: &i32 = result.value.as_ref().unwrap();
2606            assert_eq!(*value, 50);
2607
2608            // Access Arc directly via field after unwrapping Result
2609            let arc: &Arc<i32> = result.value.as_ref().unwrap();
2610            assert_eq!(**arc, 50);
2611        }
2612
2613        #[test]
2614        fn test_subscription_pattern() {
2615            let runtime = QueryRuntime::new();
2616
2617            // Simulate subscription pattern
2618            let mut last_revision: RevisionCounter = 0;
2619            let mut notifications = 0;
2620
2621            // First poll - should notify (new value)
2622            let result = runtime.poll(Counter { id: 1 }).unwrap();
2623            if result.revision > last_revision {
2624                notifications += 1;
2625                last_revision = result.revision;
2626            }
2627
2628            // Second poll - should NOT notify (no change)
2629            let result = runtime.poll(Counter { id: 1 }).unwrap();
2630            if result.revision > last_revision {
2631                notifications += 1;
2632                last_revision = result.revision;
2633            }
2634
2635            // Third poll - should NOT notify (no change)
2636            let result = runtime.poll(Counter { id: 1 }).unwrap();
2637            if result.revision > last_revision {
2638                notifications += 1;
2639                #[allow(unused_assignments)]
2640                {
2641                    last_revision = result.revision;
2642                }
2643            }
2644
2645            // Only the first poll should have triggered a notification
2646            assert_eq!(notifications, 1);
2647        }
2648    }
2649
2650    // Tests for GC APIs
2651    mod gc_tests {
2652        use super::*;
2653        use crate::tracer::{SpanContext, SpanId, TraceId};
2654        use std::collections::HashSet;
2655        use std::sync::atomic::{AtomicUsize, Ordering};
2656        use std::sync::Mutex;
2657
2658        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2659        struct Leaf {
2660            id: i32,
2661        }
2662
2663        impl Query for Leaf {
2664            type Output = i32;
2665
2666            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2667                Ok(Arc::new(self.id * 10))
2668            }
2669
2670            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2671                old == new
2672            }
2673        }
2674
2675        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2676        struct Parent {
2677            child_id: i32,
2678        }
2679
2680        impl Query for Parent {
2681            type Output = i32;
2682
2683            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2684                let child = db.query(Leaf { id: self.child_id })?;
2685                Ok(Arc::new(*child + 1))
2686            }
2687
2688            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2689                old == new
2690            }
2691        }
2692
2693        #[test]
2694        fn test_query_keys_returns_all_cached_queries() {
2695            let runtime = QueryRuntime::new();
2696
2697            // Execute some queries
2698            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2699            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2700            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2701
2702            // Get all keys
2703            let keys = runtime.query_keys();
2704
2705            // Should have at least 3 keys (might have more due to sentinels)
2706            assert!(keys.len() >= 3);
2707        }
2708
2709        #[test]
2710        fn test_remove_removes_query() {
2711            let runtime = QueryRuntime::new();
2712
2713            // Execute a query
2714            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2715
2716            // Get the key
2717            let full_key = QueryCacheKey::new(Leaf { id: 1 }).into();
2718
2719            // Query should exist
2720            assert!(runtime.changed_at(&Leaf { id: 1 }).is_some());
2721
2722            // Remove it
2723            assert!(runtime.remove(&full_key));
2724
2725            // Query should no longer exist
2726            assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2727        }
2728
2729        #[test]
2730        fn test_remove_if_unused_removes_leaf_query() {
2731            let runtime = QueryRuntime::new();
2732
2733            // Execute a leaf query (no dependents)
2734            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2735
2736            // Should be removable since no other query depends on it
2737            assert!(runtime.remove_query_if_unused(&Leaf { id: 1 }));
2738
2739            // Query should no longer exist
2740            assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2741        }
2742
2743        #[test]
2744        fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2745            let runtime = QueryRuntime::new();
2746
2747            // Execute parent query (which depends on Leaf)
2748            let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2749
2750            // Leaf query should not be removable since Parent depends on it
2751            assert!(!runtime.remove_query_if_unused(&Leaf { id: 1 }));
2752
2753            // Leaf query should still exist
2754            assert!(runtime.changed_at(&Leaf { id: 1 }).is_some());
2755
2756            // But Parent should be removable (no dependents)
2757            assert!(runtime.remove_query_if_unused(&Parent { child_id: 1 }));
2758        }
2759
2760        #[test]
2761        fn test_remove_if_unused_with_full_cache_key() {
2762            let runtime = QueryRuntime::new();
2763
2764            // Execute a leaf query
2765            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2766
2767            let full_key = QueryCacheKey::new(Leaf { id: 1 }).into();
2768
2769            // Should be removable via FullCacheKey
2770            assert!(runtime.remove_if_unused(&full_key));
2771
2772            // Query should no longer exist
2773            assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2774        }
2775
2776        // Test tracer receives on_query_start calls (for GC tracking)
2777        struct GcTracker {
2778            accessed_keys: Mutex<HashSet<String>>,
2779            access_count: AtomicUsize,
2780        }
2781
2782        impl GcTracker {
2783            fn new() -> Self {
2784                Self {
2785                    accessed_keys: Mutex::new(HashSet::new()),
2786                    access_count: AtomicUsize::new(0),
2787                }
2788            }
2789        }
2790
2791        impl Tracer for GcTracker {
2792            fn new_span_id(&self) -> SpanId {
2793                SpanId(1)
2794            }
2795
2796            fn new_trace_id(&self) -> TraceId {
2797                TraceId(1)
2798            }
2799
2800            fn on_query_start(&self, _ctx: &SpanContext, query_key: &QueryCacheKey) {
2801                self.accessed_keys
2802                    .lock()
2803                    .unwrap()
2804                    .insert(query_key.debug_repr().to_string());
2805                self.access_count.fetch_add(1, Ordering::Relaxed);
2806            }
2807        }
2808
2809        #[test]
2810        fn test_tracer_receives_on_query_start() {
2811            let tracker = GcTracker::new();
2812            let runtime = QueryRuntime::with_tracer(tracker);
2813
2814            // Execute some queries
2815            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2816            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2817
2818            // Tracer should have received on_query_start calls
2819            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2820            assert_eq!(count, 2);
2821
2822            // Check that the keys were recorded
2823            let keys = runtime.tracer().accessed_keys.lock().unwrap();
2824            assert!(keys.iter().any(|k| k.contains("Leaf")));
2825        }
2826
2827        #[test]
2828        fn test_tracer_receives_on_query_start_for_cache_hits() {
2829            let tracker = GcTracker::new();
2830            let runtime = QueryRuntime::with_tracer(tracker);
2831
2832            // Execute query twice (second is cache hit)
2833            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2834            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2835
2836            // Tracer should have received on_query_start for both calls
2837            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2838            assert_eq!(count, 2);
2839        }
2840
2841        #[test]
2842        fn test_gc_workflow() {
2843            let tracker = GcTracker::new();
2844            let runtime = QueryRuntime::with_tracer(tracker);
2845
2846            // Execute some queries
2847            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2848            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2849            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2850
2851            // Simulate GC: remove all queries that are not in use
2852            let mut removed = 0;
2853            for key in runtime.query_keys() {
2854                if runtime.remove_if_unused(&key) {
2855                    removed += 1;
2856                }
2857            }
2858
2859            // All leaf queries should be removable
2860            assert!(removed >= 3);
2861
2862            // Queries should no longer exist
2863            assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2864            assert!(runtime.changed_at(&Leaf { id: 2 }).is_none());
2865            assert!(runtime.changed_at(&Leaf { id: 3 }).is_none());
2866        }
2867    }
2868}