query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, PendingAsset};
12use crate::db::Db;
13use crate::key::{
14    AssetCacheKey, AssetKeySetSentinelKey, FullCacheKey, QueryCacheKey, QuerySetSentinelKey,
15};
16use crate::loading::AssetLoadingState;
17use crate::query::Query;
18use crate::storage::{
19    AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
20    QueryRegistry, VerifierStorage,
21};
22use crate::tracer::{
23    ExecutionResult, InvalidationReason, NoopTracer, SpanContext, SpanId, TraceId, Tracer,
24    TracerAssetState,
25};
26use crate::QueryError;
27
28/// Function type for comparing user errors during early cutoff.
29///
30/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
31/// `QueryError::UserError` values are compared for caching purposes.
32pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
33
34/// Number of durability levels (matches whale's default).
35const DURABILITY_LEVELS: usize = 4;
36
37// Thread-local query execution stack for cycle detection.
38thread_local! {
39    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
40
41    /// Current consistency tracker for leaf asset checks.
42    /// Set during query execution, used by all nested locator calls.
43    /// Uses Rc since thread-local is single-threaded.
44    static CONSISTENCY_TRACKER: RefCell<Option<Rc<ConsistencyTracker>>> = const { RefCell::new(None) };
45
46    /// Stack for tracking parent-child query relationships.
47    static SPAN_STACK: RefCell<SpanStack> = const { RefCell::new(SpanStack::Empty) };
48}
49
50/// Thread-local span stack state.
51/// Empty when no query is executing; Active with trace_id and span stack during execution.
52enum SpanStack {
53    Empty,
54    Active(TraceId, Vec<SpanId>),
55}
56
57/// Check a leaf asset against the current consistency tracker (if any).
58/// Returns Ok if no tracker is set or if the check passes.
59fn check_leaf_asset_consistency(dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
60    CONSISTENCY_TRACKER.with(|tracker| {
61        if let Some(ref t) = *tracker.borrow() {
62            t.check_leaf_asset(dep_changed_at)
63        } else {
64            Ok(())
65        }
66    })
67}
68
69/// RAII guard that sets the consistency tracker for the current thread.
70struct ConsistencyTrackerGuard {
71    previous: Option<Rc<ConsistencyTracker>>,
72}
73
74impl ConsistencyTrackerGuard {
75    fn new(tracker: Rc<ConsistencyTracker>) -> Self {
76        let previous = CONSISTENCY_TRACKER.with(|t| t.borrow_mut().replace(tracker));
77        Self { previous }
78    }
79}
80
81impl Drop for ConsistencyTrackerGuard {
82    fn drop(&mut self) {
83        CONSISTENCY_TRACKER.with(|t| {
84            *t.borrow_mut() = self.previous.take();
85        });
86    }
87}
88
89/// Check for cycles in the query stack and return error if detected.
90fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
91    let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
92    if cycle_detected {
93        let path = QUERY_STACK.with(|stack| {
94            let stack = stack.borrow();
95            let mut path: Vec<FullCacheKey> = stack.iter().cloned().collect();
96            path.push(key.clone());
97            path
98        });
99        return Err(QueryError::Cycle { path });
100    }
101    Ok(())
102}
103
104/// RAII guard for pushing/popping from query stack.
105struct StackGuard;
106
107impl StackGuard {
108    fn push(key: FullCacheKey) -> Self {
109        QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
110        StackGuard
111    }
112}
113
114impl Drop for StackGuard {
115    fn drop(&mut self) {
116        QUERY_STACK.with(|stack| {
117            stack.borrow_mut().pop();
118        });
119    }
120}
121
122/// RAII guard for pushing/popping from span stack.
123struct SpanStackGuard;
124
125impl SpanStackGuard {
126    /// Push a span onto the stack. Sets trace_id if this is the root span.
127    fn push(trace_id: TraceId, span_id: SpanId) -> Self {
128        SPAN_STACK.with(|stack| {
129            let mut s = stack.borrow_mut();
130            match &mut *s {
131                SpanStack::Empty => *s = SpanStack::Active(trace_id, vec![span_id]),
132                SpanStack::Active(_, spans) => spans.push(span_id),
133            }
134        });
135        SpanStackGuard
136    }
137}
138
139impl Drop for SpanStackGuard {
140    fn drop(&mut self) {
141        SPAN_STACK.with(|stack| {
142            let mut s = stack.borrow_mut();
143            if let SpanStack::Active(_, spans) = &mut *s {
144                spans.pop();
145                if spans.is_empty() {
146                    *s = SpanStack::Empty;
147                }
148            }
149        });
150    }
151}
152
153/// Execution context passed through query execution.
154///
155/// Contains a SpanContext for tracing correlation with parent-child relationships.
156#[derive(Clone, Copy)]
157pub struct ExecutionContext {
158    span_ctx: SpanContext,
159}
160
161impl ExecutionContext {
162    /// Create a new execution context with the given span context.
163    #[inline]
164    pub fn new(span_ctx: SpanContext) -> Self {
165        Self { span_ctx }
166    }
167
168    /// Get the span context for this execution context.
169    #[inline]
170    pub fn span_ctx(&self) -> &SpanContext {
171        &self.span_ctx
172    }
173}
174
175/// Result of polling a query, containing the value and its revision.
176///
177/// This is returned by [`QueryRuntime::poll`] and provides both the query result
178/// and its change revision, enabling efficient change detection for subscription
179/// patterns.
180///
181/// # Example
182///
183/// ```ignore
184/// let result = runtime.poll(MyQuery::new())?;
185///
186/// // Access the value via Deref
187/// println!("{:?}", *result);
188///
189/// // Check if changed since last poll
190/// if result.revision > last_known_revision {
191///     notify_subscribers(&result.value);
192///     last_known_revision = result.revision;
193/// }
194/// ```
195#[derive(Debug, Clone)]
196pub struct Polled<T> {
197    /// The query result value.
198    pub value: T,
199    /// The revision at which this value was last changed.
200    ///
201    /// Compare this with a previously stored revision to detect changes.
202    pub revision: RevisionCounter,
203}
204
205impl<T: Deref> Deref for Polled<T> {
206    type Target = T::Target;
207
208    fn deref(&self) -> &Self::Target {
209        &self.value
210    }
211}
212
213/// The query runtime manages query execution, caching, and dependency tracking.
214///
215/// This is cheap to clone - all data is behind `Arc`.
216///
217/// # Type Parameter
218///
219/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
220///   for zero-cost when tracing is not needed.
221///
222/// # Example
223///
224/// ```ignore
225/// // Without tracing (default)
226/// let runtime = QueryRuntime::new();
227///
228/// // With tracing
229/// let tracer = MyTracer::new();
230/// let runtime = QueryRuntime::with_tracer(tracer);
231///
232/// // Sync query execution
233/// let result = runtime.query(MyQuery { ... })?;
234/// ```
235pub struct QueryRuntime<T: Tracer = NoopTracer> {
236    /// Whale runtime for dependency tracking and cache storage.
237    /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
238    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
239    /// Registered asset locators
240    locators: Arc<LocatorStorage<T>>,
241    /// Pending asset requests
242    pending: Arc<PendingStorage>,
243    /// Registry for tracking query instances (for list_queries)
244    query_registry: Arc<QueryRegistry>,
245    /// Registry for tracking asset keys (for list_asset_keys)
246    asset_key_registry: Arc<AssetKeyRegistry>,
247    /// Verifiers for re-executing queries (for verify-then-decide pattern)
248    verifiers: Arc<VerifierStorage>,
249    /// Comparator for user errors during early cutoff
250    error_comparator: ErrorComparator,
251    /// Tracer for observability
252    tracer: Arc<T>,
253}
254
255#[test]
256fn test_runtime_send_sync() {
257    fn assert_send_sync<T: Send + Sync>() {}
258    assert_send_sync::<QueryRuntime<NoopTracer>>();
259}
260
261impl Default for QueryRuntime<NoopTracer> {
262    fn default() -> Self {
263        Self::new()
264    }
265}
266
267impl<T: Tracer> Clone for QueryRuntime<T> {
268    fn clone(&self) -> Self {
269        Self {
270            whale: self.whale.clone(),
271            locators: self.locators.clone(),
272            pending: self.pending.clone(),
273            query_registry: self.query_registry.clone(),
274            asset_key_registry: self.asset_key_registry.clone(),
275            verifiers: self.verifiers.clone(),
276            error_comparator: self.error_comparator,
277            tracer: self.tracer.clone(),
278        }
279    }
280}
281
282/// Default error comparator that treats all errors as different.
283///
284/// This is conservative - it always triggers recomputation when an error occurs.
285fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
286    false
287}
288
289impl<T: Tracer> QueryRuntime<T> {
290    /// Get cached output along with its revision (single atomic access).
291    fn get_cached_with_revision<Q: Query>(
292        &self,
293        key: &FullCacheKey,
294    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
295        let node = self.whale.get(key)?;
296        let revision = node.changed_at;
297        let entry = node.data.as_ref()?;
298        let cached = entry.to_cached_value::<Q::Output>()?;
299        Some((cached, revision))
300    }
301
302    /// Get a reference to the tracer.
303    #[inline]
304    pub fn tracer(&self) -> &T {
305        &self.tracer
306    }
307}
308
309impl QueryRuntime<NoopTracer> {
310    /// Create a new query runtime with default settings.
311    pub fn new() -> Self {
312        Self::with_tracer(NoopTracer)
313    }
314
315    /// Create a builder for customizing the runtime.
316    ///
317    /// # Example
318    ///
319    /// ```ignore
320    /// let runtime = QueryRuntime::builder()
321    ///     .error_comparator(|a, b| {
322    ///         // Custom error comparison logic
323    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
324    ///             (Some(a), Some(b)) => a == b,
325    ///             _ => false,
326    ///         }
327    ///     })
328    ///     .build();
329    /// ```
330    pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
331        QueryRuntimeBuilder::new()
332    }
333}
334
335impl<T: Tracer> QueryRuntime<T> {
336    /// Create a new query runtime with the specified tracer.
337    pub fn with_tracer(tracer: T) -> Self {
338        QueryRuntimeBuilder::new().tracer(tracer).build()
339    }
340
341    /// Execute a query synchronously.
342    ///
343    /// Returns the cached result if valid, otherwise executes the query.
344    ///
345    /// # Errors
346    ///
347    /// - `QueryError::Suspend` - Query is waiting for async loading
348    /// - `QueryError::Cycle` - Dependency cycle detected
349    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
350        self.query_internal(query)
351            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
352    }
353
354    /// Internal implementation shared by query() and poll().
355    ///
356    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
357    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
358    #[allow(clippy::type_complexity)]
359    fn query_internal<Q: Query>(
360        &self,
361        query: Q,
362    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
363        let query_cache_key = QueryCacheKey::new(query.clone());
364        let full_key: FullCacheKey = query_cache_key.clone().into();
365
366        // Create SpanContext with parent relationship from SPAN_STACK
367        let span_id = self.tracer.new_span_id();
368        let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
369            SpanStack::Empty => (self.tracer.new_trace_id(), None),
370            SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
371        });
372        let span_ctx = SpanContext {
373            span_id,
374            trace_id,
375            parent_span_id,
376        };
377
378        // Push to span stack and create execution context
379        let _span_guard = SpanStackGuard::push(trace_id, span_id);
380        let exec_ctx = ExecutionContext::new(span_ctx);
381
382        self.tracer.on_query_start(&span_ctx, &query_cache_key);
383
384        // Check for cycles using thread-local stack
385        let cycle_detected = QUERY_STACK.with(|stack| {
386            let stack = stack.borrow();
387            stack.iter().any(|k| k == &full_key)
388        });
389
390        if cycle_detected {
391            let path = QUERY_STACK.with(|stack| {
392                let stack = stack.borrow();
393                let mut path: Vec<FullCacheKey> = stack.iter().cloned().collect();
394                path.push(full_key.clone());
395                path
396            });
397
398            self.tracer.on_cycle_detected(&path);
399            self.tracer
400                .on_query_end(&span_ctx, &query_cache_key, ExecutionResult::CycleDetected);
401
402            return Err(QueryError::Cycle { path });
403        }
404
405        // Check if cached and valid (with verify-then-decide pattern)
406        let current_rev = self.whale.current_revision();
407
408        // Fast path: already verified at current revision
409        if self.whale.is_verified_at(&full_key, &current_rev) {
410            // Single atomic access to get both cached value and revision
411            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
412                self.tracer
413                    .on_cache_check(&span_ctx, &query_cache_key, true);
414                self.tracer
415                    .on_query_end(&span_ctx, &query_cache_key, ExecutionResult::CacheHit);
416
417                return match cached {
418                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
419                    CachedValue::UserError(err) => Ok((Err(err), revision)),
420                };
421            }
422        }
423
424        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
425        if self.whale.is_valid(&full_key) {
426            // Single atomic access to get both cached value and revision
427            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
428                // Shallow valid but not verified - verify deps first
429                let mut deps_verified = true;
430                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
431                    for dep in deps {
432                        if let Some(verifier) = self.verifiers.get(&dep) {
433                            // Re-run query/asset to verify it (triggers recursive verification)
434                            // For assets, this re-accesses the asset which may re-run the locator
435                            if verifier.verify(self as &dyn std::any::Any).is_err() {
436                                deps_verified = false;
437                                break;
438                            }
439                        }
440                        // Note: deps without verifiers are assumed valid (they're verified
441                        // by the final is_valid check if their changed_at increased)
442                    }
443                }
444
445                // Re-check validity after deps are verified
446                if deps_verified && self.whale.is_valid(&full_key) {
447                    // Deps didn't change their changed_at, mark as verified and use cached
448                    self.whale.mark_verified(&full_key, &current_rev);
449
450                    self.tracer
451                        .on_cache_check(&span_ctx, &query_cache_key, true);
452                    self.tracer.on_query_end(
453                        &span_ctx,
454                        &query_cache_key,
455                        ExecutionResult::CacheHit,
456                    );
457
458                    return match cached {
459                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
460                        CachedValue::UserError(err) => Ok((Err(err), revision)),
461                    };
462                }
463                // A dep's changed_at increased, fall through to execute
464            }
465        }
466
467        self.tracer
468            .on_cache_check(&span_ctx, &query_cache_key, false);
469
470        // Execute the query with cycle tracking
471        let _guard = StackGuard::push(full_key.clone());
472        let result = self.execute_query::<Q>(&query, &query_cache_key, &full_key, exec_ctx);
473        drop(_guard);
474
475        // Emit end event
476        let exec_result = match &result {
477            Ok((_, true, _)) => ExecutionResult::Changed,
478            Ok((_, false, _)) => ExecutionResult::Unchanged,
479            Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
480            Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
481            Err(e) => ExecutionResult::Error {
482                message: format!("{:?}", e),
483            },
484        };
485        self.tracer
486            .on_query_end(&span_ctx, &query_cache_key, exec_result);
487
488        result.map(|(inner_result, _, revision)| (inner_result, revision))
489    }
490
491    /// Execute a query, caching the result if appropriate.
492    ///
493    /// Returns (result, output_changed, revision) tuple.
494    /// - `result`: Ok(output) for success, Err(user_error) for user errors
495    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
496    #[allow(clippy::type_complexity)]
497    fn execute_query<Q: Query>(
498        &self,
499        query: &Q,
500        query_cache_key: &QueryCacheKey,
501        full_key: &FullCacheKey,
502        exec_ctx: ExecutionContext,
503    ) -> Result<
504        (
505            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
506            bool,
507            RevisionCounter,
508        ),
509        QueryError,
510    > {
511        // Capture current global revision at query start for consistency checking
512        let start_revision = self.whale.current_revision().get(Durability::volatile());
513
514        // Create consistency tracker for this query execution
515        let tracker = Rc::new(ConsistencyTracker::new(start_revision));
516
517        // Set thread-local tracker for nested locator calls
518        let _tracker_guard = ConsistencyTrackerGuard::new(tracker);
519
520        // Create context for this query execution
521        let ctx = QueryContext {
522            runtime: self,
523            current_key: full_key.clone(),
524            exec_ctx,
525            deps: RefCell::new(Vec::new()),
526        };
527
528        // Execute the query (clone because query() takes ownership)
529        let result = query.clone().query(&ctx);
530
531        // Get collected dependencies
532        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
533
534        // Query durability defaults to stable - Whale will automatically reduce
535        // the effective durability to min(requested, min(dep_durabilities)).
536        // A pure query with no dependencies remains stable.
537        // A query depending on volatile assets becomes volatile.
538        let durability = Durability::stable();
539
540        match result {
541            Ok(output) => {
542                // Check if output changed (for early cutoff)
543                // existing_revision is Some only when output is unchanged (can reuse revision)
544                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
545                    self.get_cached_with_revision::<Q>(full_key)
546                {
547                    if Q::output_eq(&*old, &*output) {
548                        Some(rev) // Same output - reuse revision
549                    } else {
550                        None // Different output
551                    }
552                } else {
553                    None // No previous Ok value
554                };
555                let output_changed = existing_revision.is_none();
556
557                // Emit early cutoff check event
558                self.tracer.on_early_cutoff_check(
559                    exec_ctx.span_ctx(),
560                    query_cache_key,
561                    output_changed,
562                );
563
564                // Update whale with cached entry (atomic update of value + dependency state)
565                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
566                let revision = if let Some(existing_rev) = existing_revision {
567                    // confirm_unchanged doesn't change changed_at, use existing
568                    let _ = self.whale.confirm_unchanged(full_key, deps);
569                    existing_rev
570                } else {
571                    // Use new_rev from register result
572                    match self
573                        .whale
574                        .register(full_key.clone(), Some(entry), durability, deps)
575                    {
576                        Ok(result) => result.new_rev,
577                        Err(missing) => {
578                            return Err(QueryError::DependenciesRemoved {
579                                missing_keys: missing,
580                            })
581                        }
582                    }
583                };
584
585                // Register query in registry for list_queries
586                let is_new_query = self.query_registry.register(query);
587                if is_new_query {
588                    let sentinel = QuerySetSentinelKey::new::<Q>().into();
589                    let _ = self
590                        .whale
591                        .register(sentinel, None, Durability::stable(), vec![]);
592                }
593
594                // Store verifier for this query (for verify-then-decide pattern)
595                self.verifiers
596                    .insert::<Q, T>(full_key.clone(), query.clone());
597
598                Ok((Ok(output), output_changed, revision))
599            }
600            Err(QueryError::UserError(err)) => {
601                // Check if error changed (for early cutoff)
602                // existing_revision is Some only when error is unchanged (can reuse revision)
603                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
604                    self.get_cached_with_revision::<Q>(full_key)
605                {
606                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
607                        Some(rev) // Same error - reuse revision
608                    } else {
609                        None // Different error
610                    }
611                } else {
612                    None // No previous UserError
613                };
614                let output_changed = existing_revision.is_none();
615
616                // Emit early cutoff check event
617                self.tracer.on_early_cutoff_check(
618                    exec_ctx.span_ctx(),
619                    query_cache_key,
620                    output_changed,
621                );
622
623                // Update whale with cached error (atomic update of value + dependency state)
624                let entry = CachedEntry::UserError(err.clone());
625                let revision = if let Some(existing_rev) = existing_revision {
626                    // confirm_unchanged doesn't change changed_at, use existing
627                    let _ = self.whale.confirm_unchanged(full_key, deps);
628                    existing_rev
629                } else {
630                    // Use new_rev from register result
631                    match self
632                        .whale
633                        .register(full_key.clone(), Some(entry), durability, deps)
634                    {
635                        Ok(result) => result.new_rev,
636                        Err(missing) => {
637                            return Err(QueryError::DependenciesRemoved {
638                                missing_keys: missing,
639                            })
640                        }
641                    }
642                };
643
644                // Register query in registry for list_queries
645                let is_new_query = self.query_registry.register(query);
646                if is_new_query {
647                    let sentinel = QuerySetSentinelKey::new::<Q>().into();
648                    let _ = self
649                        .whale
650                        .register(sentinel, None, Durability::stable(), vec![]);
651                }
652
653                // Store verifier for this query (for verify-then-decide pattern)
654                self.verifiers
655                    .insert::<Q, T>(full_key.clone(), query.clone());
656
657                Ok((Err(err), output_changed, revision))
658            }
659            Err(e) => {
660                // System errors (Suspend, Cycle, Cancelled) are not cached
661                Err(e)
662            }
663        }
664    }
665
666    /// Invalidate a query, forcing recomputation on next access.
667    ///
668    /// This also invalidates any queries that depend on this one.
669    pub fn invalidate<Q: Query>(&self, query: &Q) {
670        let query_cache_key = QueryCacheKey::new(query.clone());
671        let full_key: FullCacheKey = query_cache_key.clone().into();
672
673        self.tracer
674            .on_query_invalidated(&query_cache_key, InvalidationReason::ManualInvalidation);
675
676        // Update whale to invalidate dependents (register with None to clear cached value)
677        // Use stable durability to increment all revision counters, ensuring queries
678        // at any durability level will see this as a change.
679        let _ = self
680            .whale
681            .register(full_key, None, Durability::stable(), vec![]);
682    }
683
684    /// Remove a query from the cache entirely, freeing memory.
685    ///
686    /// Use this for GC when a query is no longer needed.
687    /// Unlike `invalidate`, this removes all traces of the query from storage.
688    /// The query will be recomputed from scratch on next access.
689    ///
690    /// This also invalidates any queries that depend on this one.
691    pub fn remove_query<Q: Query>(&self, query: &Q) {
692        let query_cache_key = QueryCacheKey::new(query.clone());
693        let full_key: FullCacheKey = query_cache_key.clone().into();
694
695        self.tracer
696            .on_query_invalidated(&query_cache_key, InvalidationReason::ManualInvalidation);
697
698        // Remove verifier if exists
699        self.verifiers.remove(&full_key);
700
701        // Remove from whale storage (this also handles dependent invalidation)
702        self.whale.remove(&full_key);
703
704        // Remove from registry and update sentinel for list_queries
705        if self.query_registry.remove::<Q>(query) {
706            let sentinel = QuerySetSentinelKey::new::<Q>().into();
707            let _ = self
708                .whale
709                .register(sentinel, None, Durability::stable(), vec![]);
710        }
711    }
712
713    /// Clear all cached values by removing all nodes from whale.
714    ///
715    /// Note: This is a relatively expensive operation as it iterates through all keys.
716    pub fn clear_cache(&self) {
717        let keys = self.whale.keys();
718        for key in keys {
719            self.whale.remove(&key);
720        }
721    }
722
723    /// Poll a query, returning both the result and its change revision.
724    ///
725    /// This is useful for implementing subscription patterns where you need to
726    /// detect changes efficiently. Compare the returned `revision` with a
727    /// previously stored value to determine if the query result has changed.
728    ///
729    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
730    /// as its value, allowing you to track revision changes for both success and
731    /// user error cases.
732    ///
733    /// # Example
734    ///
735    /// ```ignore
736    /// struct Subscription<Q: Query> {
737    ///     query: Q,
738    ///     last_revision: RevisionCounter,
739    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
740    /// }
741    ///
742    /// // Polling loop
743    /// for sub in &mut subscriptions {
744    ///     let result = runtime.poll(sub.query.clone())?;
745    ///     if result.revision > sub.last_revision {
746    ///         sub.tx.send(result.value.clone())?;
747    ///         sub.last_revision = result.revision;
748    ///     }
749    /// }
750    /// ```
751    ///
752    /// # Errors
753    ///
754    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
755    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
756    #[allow(clippy::type_complexity)]
757    pub fn poll<Q: Query>(
758        &self,
759        query: Q,
760    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
761        let (value, revision) = self.query_internal(query)?;
762        Ok(Polled { value, revision })
763    }
764
765    /// Get the change revision of a query without executing it.
766    ///
767    /// Returns `None` if the query has never been executed.
768    ///
769    /// This is useful for checking if a query has changed since the last poll
770    /// without the cost of executing the query.
771    ///
772    /// # Example
773    ///
774    /// ```ignore
775    /// // Check if query has changed before deciding to poll
776    /// if let Some(rev) = runtime.changed_at(&MyQuery::new(key)) {
777    ///     if rev > last_known_revision {
778    ///         let result = runtime.query(MyQuery::new(key))?;
779    ///         // Process result...
780    ///     }
781    /// }
782    /// ```
783    pub fn changed_at<Q: Query>(&self, query: &Q) -> Option<RevisionCounter> {
784        let full_key = QueryCacheKey::new(query.clone()).into();
785        self.whale.get(&full_key).map(|node| node.changed_at)
786    }
787}
788
789// ============================================================================
790// GC (Garbage Collection) API
791// ============================================================================
792
793impl<T: Tracer> QueryRuntime<T> {
794    /// Get all query keys currently in the cache.
795    ///
796    /// This is useful for implementing custom garbage collection strategies.
797    /// Use this in combination with [`Tracer::on_query_key`] to track access
798    /// times and implement LRU, TTL, or other GC algorithms externally.
799    ///
800    /// # Example
801    ///
802    /// ```ignore
803    /// // Collect all keys that haven't been accessed recently
804    /// let stale_keys: Vec<_> = runtime.query_keys()
805    ///     .filter(|key| tracker.is_stale(key))
806    ///     .collect();
807    ///
808    /// // Remove stale queries that have no dependents
809    /// for key in stale_keys {
810    ///     runtime.remove_if_unused(&key);
811    /// }
812    /// ```
813    pub fn query_keys(&self) -> Vec<FullCacheKey> {
814        self.whale.keys()
815    }
816
817    /// Remove a query if it has no dependents.
818    ///
819    /// Returns `true` if the query was removed, `false` if it has dependents
820    /// or doesn't exist. This is the safe way to remove queries during GC,
821    /// as it won't break queries that depend on this one.
822    ///
823    /// # Example
824    ///
825    /// ```ignore
826    /// let query = MyQuery::new(cache_key);
827    /// if runtime.remove_query_if_unused(&query) {
828    ///     println!("Query removed");
829    /// } else {
830    ///     println!("Query has dependents, not removed");
831    /// }
832    /// ```
833    pub fn remove_query_if_unused<Q: Query>(&self, query: &Q) -> bool {
834        let full_key = QueryCacheKey::new(query.clone()).into();
835        self.remove_if_unused(&full_key)
836    }
837
838    /// Remove a query by its [`FullCacheKey`].
839    ///
840    /// This is the type-erased version of [`remove_query`](Self::remove_query).
841    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
842    /// or [`Tracer::on_query_key`].
843    ///
844    /// Returns `true` if the query was removed, `false` if it doesn't exist.
845    ///
846    /// # Warning
847    ///
848    /// This forcibly removes the query even if other queries depend on it.
849    /// Dependent queries will be recomputed on next access. For safe GC,
850    /// use [`remove_if_unused`](Self::remove_if_unused) instead.
851    pub fn remove(&self, key: &FullCacheKey) -> bool {
852        // Remove verifier if exists
853        self.verifiers.remove(key);
854
855        // Remove from whale storage
856        self.whale.remove(key).is_some()
857    }
858
859    /// Remove a query by its [`FullCacheKey`] if it has no dependents.
860    ///
861    /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
862    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
863    /// or [`Tracer::on_query_key`].
864    ///
865    /// Returns `true` if the query was removed, `false` if it has dependents
866    /// or doesn't exist.
867    ///
868    /// # Example
869    ///
870    /// ```ignore
871    /// // Implement LRU GC
872    /// for key in runtime.query_keys() {
873    ///     if tracker.is_expired(&key) {
874    ///         runtime.remove_if_unused(&key);
875    ///     }
876    /// }
877    /// ```
878    pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
879        if self.whale.remove_if_unused(key.clone()).is_some() {
880            // Successfully removed - clean up verifier
881            self.verifiers.remove(key);
882            true
883        } else {
884            false
885        }
886    }
887}
888
889// ============================================================================
890// Builder
891// ============================================================================
892
893/// Builder for [`QueryRuntime`] with customizable settings.
894///
895/// # Example
896///
897/// ```ignore
898/// let runtime = QueryRuntime::builder()
899///     .error_comparator(|a, b| {
900///         // Treat all errors of the same type as equal
901///         a.downcast_ref::<std::io::Error>().is_some()
902///             == b.downcast_ref::<std::io::Error>().is_some()
903///     })
904///     .build();
905/// ```
906pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
907    error_comparator: ErrorComparator,
908    tracer: T,
909}
910
911impl Default for QueryRuntimeBuilder<NoopTracer> {
912    fn default() -> Self {
913        Self::new()
914    }
915}
916
917impl QueryRuntimeBuilder<NoopTracer> {
918    /// Create a new builder with default settings.
919    pub fn new() -> Self {
920        Self {
921            error_comparator: default_error_comparator,
922            tracer: NoopTracer,
923        }
924    }
925}
926
927impl<T: Tracer> QueryRuntimeBuilder<T> {
928    /// Set the error comparator function for early cutoff optimization.
929    ///
930    /// When a query returns `QueryError::UserError`, this function is used
931    /// to compare it with the previously cached error. If they are equal,
932    /// downstream queries can skip recomputation (early cutoff).
933    ///
934    /// The default comparator returns `false` for all errors, meaning errors
935    /// are always considered different (conservative, always recomputes).
936    ///
937    /// # Example
938    ///
939    /// ```ignore
940    /// // Treat errors as equal if they have the same display message
941    /// let runtime = QueryRuntime::builder()
942    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
943    ///     .build();
944    /// ```
945    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
946        self.error_comparator = f;
947        self
948    }
949
950    /// Set the tracer for observability.
951    pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
952        QueryRuntimeBuilder {
953            error_comparator: self.error_comparator,
954            tracer,
955        }
956    }
957
958    /// Build the runtime with the configured settings.
959    pub fn build(self) -> QueryRuntime<T> {
960        QueryRuntime {
961            whale: WhaleRuntime::new(),
962            locators: Arc::new(LocatorStorage::new()),
963            pending: Arc::new(PendingStorage::new()),
964            query_registry: Arc::new(QueryRegistry::new()),
965            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
966            verifiers: Arc::new(VerifierStorage::new()),
967            error_comparator: self.error_comparator,
968            tracer: Arc::new(self.tracer),
969        }
970    }
971}
972
973// ============================================================================
974// Asset API
975// ============================================================================
976
977impl<T: Tracer> QueryRuntime<T> {
978    /// Register an asset locator for a specific asset key type.
979    ///
980    /// Only one locator can be registered per key type. Later registrations
981    /// replace earlier ones.
982    ///
983    /// # Example
984    ///
985    /// ```ignore
986    /// let runtime = QueryRuntime::new();
987    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
988    /// ```
989    pub fn register_asset_locator<K, L>(&self, locator: L)
990    where
991        K: AssetKey,
992        L: AssetLocator<K>,
993    {
994        self.locators.insert::<K, L>(locator);
995    }
996
997    /// Get an iterator over pending asset requests.
998    ///
999    /// Returns assets that have been requested but not yet resolved.
1000    /// The user should fetch these externally and call `resolve_asset()`.
1001    ///
1002    /// # Example
1003    ///
1004    /// ```ignore
1005    /// for pending in runtime.pending_assets() {
1006    ///     if let Some(path) = pending.key::<FilePath>() {
1007    ///         let content = fetch_file(path);
1008    ///         runtime.resolve_asset(path.clone(), content);
1009    ///     }
1010    /// }
1011    /// ```
1012    pub fn pending_assets(&self) -> Vec<PendingAsset> {
1013        self.pending.get_all()
1014    }
1015
1016    /// Get pending assets filtered by key type.
1017    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
1018        self.pending.get_of_type::<K>()
1019    }
1020
1021    /// Check if there are any pending assets.
1022    pub fn has_pending_assets(&self) -> bool {
1023        !self.pending.is_empty()
1024    }
1025
1026    /// Resolve an asset with its loaded value.
1027    ///
1028    /// This marks the asset as ready and invalidates any queries that
1029    /// depend on it (if the value changed), triggering recomputation on next access.
1030    ///
1031    /// This method is idempotent - resolving with the same value (via `asset_eq`)
1032    /// will not trigger downstream recomputation.
1033    ///
1034    /// # Arguments
1035    ///
1036    /// * `key` - The asset key identifying this resource
1037    /// * `value` - The loaded asset value
1038    /// * `durability` - How frequently this asset is expected to change
1039    ///
1040    /// # Example
1041    ///
1042    /// ```ignore
1043    /// let content = std::fs::read_to_string(&path)?;
1044    /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
1045    /// ```
1046    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
1047        self.resolve_asset_internal(key, value, durability);
1048    }
1049
1050    /// Resolve an asset with an error.
1051    ///
1052    /// This marks the asset as errored and caches the error. Queries depending
1053    /// on this asset will receive `Err(QueryError::UserError(...))`.
1054    ///
1055    /// Use this when async loading fails (e.g., network error, file not found,
1056    /// access denied).
1057    ///
1058    /// # Arguments
1059    ///
1060    /// * `key` - The asset key identifying this resource
1061    /// * `error` - The error to cache (will be wrapped in `Arc`)
1062    /// * `durability` - How frequently this error state is expected to change
1063    ///
1064    /// # Example
1065    ///
1066    /// ```ignore
1067    /// match fetch_file(&path) {
1068    ///     Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
1069    ///     Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
1070    /// }
1071    /// ```
1072    pub fn resolve_asset_error<K: AssetKey>(
1073        &self,
1074        key: K,
1075        error: impl Into<anyhow::Error>,
1076        durability: DurabilityLevel,
1077    ) {
1078        let asset_cache_key = AssetCacheKey::new(key.clone());
1079
1080        // Remove from pending BEFORE registering the error
1081        self.pending.remove(&asset_cache_key);
1082
1083        // Prepare the error entry
1084        let error_arc = Arc::new(error.into());
1085        let entry = CachedEntry::AssetError(error_arc.clone());
1086        let durability =
1087            Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
1088
1089        // Atomic compare-and-update (errors are always considered changed for now)
1090        let result = self
1091            .whale
1092            .update_with_compare(
1093                asset_cache_key.into(),
1094                Some(entry),
1095                |old_data, _new_data| {
1096                    // Compare old and new errors using error_comparator
1097                    match old_data.and_then(|d| d.as_ref()) {
1098                        Some(CachedEntry::AssetError(old_err)) => {
1099                            !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
1100                        }
1101                        _ => true, // Loading, Ready, or not present -> changed
1102                    }
1103                },
1104                durability,
1105                vec![],
1106            )
1107            .expect("update_with_compare with no dependencies cannot fail");
1108
1109        // Emit asset resolved event (with changed status)
1110        let asset_cache_key = AssetCacheKey::new(key.clone());
1111        self.tracer
1112            .on_asset_resolved(&asset_cache_key, result.changed);
1113
1114        // Register asset key in registry for list_asset_keys
1115        let is_new_asset = self.asset_key_registry.register(&key);
1116        if is_new_asset {
1117            // Update sentinel to invalidate list_asset_keys dependents
1118            let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1119            let _ = self
1120                .whale
1121                .register(sentinel, None, Durability::stable(), vec![]);
1122        }
1123    }
1124
1125    fn resolve_asset_internal<K: AssetKey>(
1126        &self,
1127        key: K,
1128        value: K::Asset,
1129        durability_level: DurabilityLevel,
1130    ) {
1131        let asset_cache_key = AssetCacheKey::new(key.clone());
1132
1133        // Remove from pending BEFORE registering the value
1134        self.pending.remove(&asset_cache_key);
1135
1136        // Prepare the new entry
1137        let value_arc: Arc<K::Asset> = Arc::new(value);
1138        let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1139        let durability =
1140            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1141
1142        // Atomic compare-and-update
1143        let result = self
1144            .whale
1145            .update_with_compare(
1146                asset_cache_key.into(),
1147                Some(entry),
1148                |old_data, _new_data| {
1149                    // Compare old and new values
1150                    match old_data.and_then(|d| d.as_ref()) {
1151                        Some(CachedEntry::AssetReady(old_arc)) => {
1152                            match old_arc.clone().downcast::<K::Asset>() {
1153                                Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1154                                Err(_) => true, // Type mismatch, treat as changed
1155                            }
1156                        }
1157                        _ => true, // Loading, NotFound, or not present -> changed
1158                    }
1159                },
1160                durability,
1161                vec![],
1162            )
1163            .expect("update_with_compare with no dependencies cannot fail");
1164
1165        // Emit asset resolved event
1166        let asset_cache_key = AssetCacheKey::new(key.clone());
1167        self.tracer
1168            .on_asset_resolved(&asset_cache_key, result.changed);
1169
1170        // Register asset key in registry for list_asset_keys
1171        let is_new_asset = self.asset_key_registry.register(&key);
1172        if is_new_asset {
1173            // Update sentinel to invalidate list_asset_keys dependents
1174            let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1175            let _ = self
1176                .whale
1177                .register(sentinel, None, Durability::stable(), vec![]);
1178        }
1179    }
1180
1181    /// Invalidate an asset, forcing queries to re-request it.
1182    ///
1183    /// The asset will be marked as loading and added to pending assets.
1184    /// Dependent queries will suspend until the asset is resolved again.
1185    ///
1186    /// # Example
1187    ///
1188    /// ```ignore
1189    /// // File was modified externally
1190    /// runtime.invalidate_asset(&FilePath("config.json".into()));
1191    /// // Queries depending on this asset will now suspend
1192    /// // User should fetch the new value and call resolve_asset
1193    /// ```
1194    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1195        let asset_cache_key = AssetCacheKey::new(key.clone());
1196        let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1197
1198        // Emit asset invalidated event
1199        self.tracer.on_asset_invalidated(&asset_cache_key);
1200
1201        // Add to pending FIRST (before clearing whale state)
1202        // This ensures: readers see either old value, or Loading+pending
1203        self.pending
1204            .insert::<K>(asset_cache_key.clone(), key.clone());
1205
1206        // Atomic: clear cached value + invalidate dependents
1207        // Using None for data means "needs to be loaded"
1208        // Use stable durability to ensure queries at any durability level see the change.
1209        let _ = self
1210            .whale
1211            .register(full_cache_key, None, Durability::stable(), vec![]);
1212    }
1213
1214    /// Remove an asset from the cache entirely.
1215    ///
1216    /// Unlike `invalidate_asset`, this removes all traces of the asset.
1217    /// Dependent queries will go through the locator again on next access.
1218    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1219        let asset_cache_key = AssetCacheKey::new(key.clone());
1220        let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1221
1222        // Remove from pending first
1223        self.pending.remove(&asset_cache_key);
1224
1225        // Remove from whale (this also cleans up dependency edges)
1226        // whale.remove() invalidates dependents before removing
1227        self.whale.remove(&full_cache_key);
1228
1229        // Remove from registry and update sentinel for list_asset_keys
1230        if self.asset_key_registry.remove::<K>(key) {
1231            let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1232            let _ = self
1233                .whale
1234                .register(sentinel, None, Durability::stable(), vec![]);
1235        }
1236    }
1237
1238    /// Get an asset by key without tracking dependencies.
1239    ///
1240    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1241    /// as a dependent of the asset. Use this for direct asset access outside
1242    /// of query execution.
1243    ///
1244    /// # Returns
1245    ///
1246    /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1247    /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1248    /// - `Err(QueryError::UserError)` - Asset was not found or locator returned an error
1249    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1250        self.get_asset_internal(key)
1251    }
1252
1253    /// Internal: Get asset state and its changed_at revision atomically.
1254    ///
1255    /// This is the "direct" version called from QueryRuntime::asset (no dependency tracking).
1256    /// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
1257    ///
1258    /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
1259    /// whale node that provided the asset value.
1260    fn get_asset_with_revision<K: AssetKey>(
1261        &self,
1262        key: K,
1263    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1264        let asset_cache_key = AssetCacheKey::new(key.clone());
1265        let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1266
1267        // Create a span for this asset request (like queries do)
1268        // This ensures child queries called from locators show as children of this asset
1269        let asset_span_id = self.tracer.new_span_id();
1270        let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
1271            SpanStack::Empty => (self.tracer.new_trace_id(), None),
1272            SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
1273        });
1274        let span_ctx = SpanContext {
1275            span_id: asset_span_id,
1276            trace_id,
1277            parent_span_id,
1278        };
1279
1280        // Push asset span to stack so child queries see this asset as their parent
1281        let _span_guard = SpanStackGuard::push(trace_id, asset_span_id);
1282
1283        // Check whale cache first (single atomic read)
1284        if let Some(node) = self.whale.get(&full_cache_key) {
1285            let changed_at = node.changed_at;
1286            // Check if valid at current revision (shallow check)
1287            if self.whale.is_valid(&full_cache_key) {
1288                // Verify dependencies recursively (like query path does)
1289                let mut deps_verified = true;
1290                if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1291                    for dep in deps {
1292                        if let Some(verifier) = self.verifiers.get(&dep) {
1293                            // Re-run query/asset to verify it (triggers recursive verification)
1294                            if verifier.verify(self as &dyn std::any::Any).is_err() {
1295                                deps_verified = false;
1296                                break;
1297                            }
1298                        }
1299                    }
1300                }
1301
1302                // Re-check validity after deps are verified
1303                if deps_verified && self.whale.is_valid(&full_cache_key) {
1304                    // For cached entries, check consistency for leaf assets (no locator deps).
1305                    // This detects if resolve_asset/resolve_asset_error was called during query execution.
1306                    let has_locator_deps = self
1307                        .whale
1308                        .get_dependency_ids(&full_cache_key)
1309                        .is_some_and(|deps| !deps.is_empty());
1310
1311                    match &node.data {
1312                        Some(CachedEntry::AssetReady(arc)) => {
1313                            // Check consistency for cached leaf assets
1314                            if !has_locator_deps {
1315                                check_leaf_asset_consistency(changed_at)?;
1316                            }
1317                            // Cache hit: start + end immediately (no locator runs)
1318                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1319                            self.tracer.on_asset_located(
1320                                &span_ctx,
1321                                &asset_cache_key,
1322                                TracerAssetState::Ready,
1323                            );
1324                            match arc.clone().downcast::<K::Asset>() {
1325                                Ok(value) => {
1326                                    return Ok((AssetLoadingState::ready(key, value), changed_at))
1327                                }
1328                                Err(_) => {
1329                                    unreachable!("Asset type mismatch: {:?}", key)
1330                                }
1331                            }
1332                        }
1333                        Some(CachedEntry::AssetError(err)) => {
1334                            // Check consistency for cached leaf errors
1335                            if !has_locator_deps {
1336                                check_leaf_asset_consistency(changed_at)?;
1337                            }
1338                            // Cache hit: start + end immediately (no locator runs)
1339                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1340                            self.tracer.on_asset_located(
1341                                &span_ctx,
1342                                &asset_cache_key,
1343                                TracerAssetState::NotFound,
1344                            );
1345                            return Err(QueryError::UserError(err.clone()));
1346                        }
1347                        None => {
1348                            // Loading state - no value to be inconsistent with
1349                            // Cache hit: start + end immediately (no locator runs)
1350                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1351                            self.tracer.on_asset_located(
1352                                &span_ctx,
1353                                &asset_cache_key,
1354                                TracerAssetState::Loading,
1355                            );
1356                            return Ok((AssetLoadingState::loading(key), changed_at));
1357                        }
1358                        _ => {
1359                            // Query-related entries (Ok, UserError) shouldn't be here
1360                            // Fall through to locator
1361                        }
1362                    }
1363                }
1364            }
1365        }
1366
1367        // Not in cache or invalid - try locator
1368        // Use LocatorContext to track deps on the asset itself
1369        check_cycle(&full_cache_key)?;
1370        let _guard = StackGuard::push(full_cache_key.clone());
1371
1372        // Notify tracer BEFORE locator runs (START event) so child queries appear as children
1373        self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1374
1375        let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1376        let locator_result =
1377            self.locators
1378                .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1379
1380        if let Some(result) = locator_result {
1381            // Get collected dependencies from the locator context
1382            let locator_deps = locator_ctx.into_deps();
1383            match result {
1384                Ok(ErasedLocateResult::Ready {
1385                    value: arc,
1386                    durability: durability_level,
1387                }) => {
1388                    // END event after locator completes
1389                    self.tracer.on_asset_located(
1390                        &span_ctx,
1391                        &asset_cache_key,
1392                        TracerAssetState::Ready,
1393                    );
1394
1395                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1396                        Ok(v) => v,
1397                        Err(_) => {
1398                            unreachable!("Asset type mismatch: {:?}", key);
1399                        }
1400                    };
1401
1402                    // Store in whale atomically with early cutoff
1403                    // Include locator's dependencies so the asset is invalidated when they change
1404                    let entry = CachedEntry::AssetReady(typed_value.clone());
1405                    let durability = Durability::new(durability_level.as_u8() as usize)
1406                        .unwrap_or(Durability::volatile());
1407                    let new_value = typed_value.clone();
1408                    let result = self
1409                        .whale
1410                        .update_with_compare(
1411                            full_cache_key.clone(),
1412                            Some(entry),
1413                            |old_data, _new_data| {
1414                                let Some(CachedEntry::AssetReady(old_arc)) =
1415                                    old_data.and_then(|d| d.as_ref())
1416                                else {
1417                                    return true;
1418                                };
1419                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1420                                    return true;
1421                                };
1422                                !K::asset_eq(&old_value, &new_value)
1423                            },
1424                            durability,
1425                            locator_deps,
1426                        )
1427                        .expect("update_with_compare should succeed");
1428
1429                    // Register verifier for this asset (for verify-then-decide pattern)
1430                    self.verifiers
1431                        .insert_asset::<K, T>(full_cache_key, key.clone());
1432
1433                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1434                }
1435                Ok(ErasedLocateResult::Pending) => {
1436                    // END event after locator completes with pending
1437                    self.tracer.on_asset_located(
1438                        &span_ctx,
1439                        &asset_cache_key,
1440                        TracerAssetState::Loading,
1441                    );
1442
1443                    // Add to pending list for Pending result
1444                    self.pending
1445                        .insert::<K>(asset_cache_key.clone(), key.clone());
1446                    match self
1447                        .whale
1448                        .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1449                        .expect("get_or_insert should succeed")
1450                    {
1451                        GetOrInsertResult::Inserted(node) => {
1452                            return Ok((AssetLoadingState::loading(key), node.changed_at));
1453                        }
1454                        GetOrInsertResult::Existing(node) => {
1455                            let changed_at = node.changed_at;
1456                            match &node.data {
1457                                Some(CachedEntry::AssetReady(arc)) => {
1458                                    match arc.clone().downcast::<K::Asset>() {
1459                                        Ok(value) => {
1460                                            return Ok((
1461                                                AssetLoadingState::ready(key, value),
1462                                                changed_at,
1463                                            ))
1464                                        }
1465                                        Err(_) => {
1466                                            return Ok((
1467                                                AssetLoadingState::loading(key),
1468                                                changed_at,
1469                                            ))
1470                                        }
1471                                    }
1472                                }
1473                                Some(CachedEntry::AssetError(err)) => {
1474                                    return Err(QueryError::UserError(err.clone()));
1475                                }
1476                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1477                            }
1478                        }
1479                    }
1480                }
1481                Err(QueryError::UserError(err)) => {
1482                    // END event after locator completes with error
1483                    self.tracer.on_asset_located(
1484                        &span_ctx,
1485                        &asset_cache_key,
1486                        TracerAssetState::NotFound,
1487                    );
1488                    // Locator returned a user error - cache it as AssetError
1489                    let entry = CachedEntry::AssetError(err.clone());
1490                    let _ = self.whale.register(
1491                        full_cache_key,
1492                        Some(entry),
1493                        Durability::volatile(),
1494                        locator_deps,
1495                    );
1496                    return Err(QueryError::UserError(err));
1497                }
1498                Err(e) => {
1499                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1500                    return Err(e);
1501                }
1502            }
1503        }
1504
1505        // No locator registered or locator returned None - mark as pending
1506        // (no locator was called, so no deps to track)
1507        // END event - no locator ran
1508        self.tracer
1509            .on_asset_located(&span_ctx, &asset_cache_key, TracerAssetState::Loading);
1510        self.pending
1511            .insert::<K>(asset_cache_key.clone(), key.clone());
1512
1513        match self
1514            .whale
1515            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1516            .expect("get_or_insert with no dependencies cannot fail")
1517        {
1518            GetOrInsertResult::Inserted(node) => {
1519                Ok((AssetLoadingState::loading(key), node.changed_at))
1520            }
1521            GetOrInsertResult::Existing(node) => {
1522                let changed_at = node.changed_at;
1523                match &node.data {
1524                    Some(CachedEntry::AssetReady(arc)) => {
1525                        match arc.clone().downcast::<K::Asset>() {
1526                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1527                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1528                        }
1529                    }
1530                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1531                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1532                }
1533            }
1534        }
1535    }
1536
1537    /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1538    ///
1539    /// This version is called from QueryContext::asset. Consistency checking for
1540    /// cached leaf assets is done inside this function before returning.
1541    fn get_asset_with_revision_ctx<K: AssetKey>(
1542        &self,
1543        key: K,
1544        _ctx: &QueryContext<'_, T>,
1545    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1546        let asset_cache_key = AssetCacheKey::new(key.clone());
1547        let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1548
1549        // Create a span for this asset request (like queries do)
1550        // This ensures child queries called from locators show as children of this asset
1551        let asset_span_id = self.tracer.new_span_id();
1552        let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
1553            SpanStack::Empty => (self.tracer.new_trace_id(), None),
1554            SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
1555        });
1556        let span_ctx = SpanContext {
1557            span_id: asset_span_id,
1558            trace_id,
1559            parent_span_id,
1560        };
1561
1562        // Push asset span to stack so child queries see this asset as their parent
1563        let _span_guard = SpanStackGuard::push(trace_id, asset_span_id);
1564
1565        // Check whale cache first (single atomic read)
1566        if let Some(node) = self.whale.get(&full_cache_key) {
1567            let changed_at = node.changed_at;
1568            // Check if valid at current revision (shallow check)
1569            if self.whale.is_valid(&full_cache_key) {
1570                // Verify dependencies recursively (like query path does)
1571                let mut deps_verified = true;
1572                if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1573                    for dep in deps {
1574                        if let Some(verifier) = self.verifiers.get(&dep) {
1575                            // Re-run query/asset to verify it (triggers recursive verification)
1576                            if verifier.verify(self as &dyn std::any::Any).is_err() {
1577                                deps_verified = false;
1578                                break;
1579                            }
1580                        }
1581                    }
1582                }
1583
1584                // Re-check validity after deps are verified
1585                if deps_verified && self.whale.is_valid(&full_cache_key) {
1586                    // For cached entries, check consistency for leaf assets (no locator deps).
1587                    // This detects if resolve_asset/resolve_asset_error was called during query execution.
1588                    let has_locator_deps = self
1589                        .whale
1590                        .get_dependency_ids(&full_cache_key)
1591                        .is_some_and(|deps| !deps.is_empty());
1592
1593                    match &node.data {
1594                        Some(CachedEntry::AssetReady(arc)) => {
1595                            // Check consistency for cached leaf assets
1596                            if !has_locator_deps {
1597                                check_leaf_asset_consistency(changed_at)?;
1598                            }
1599                            // Cache hit: start + end immediately (no locator runs)
1600                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1601                            self.tracer.on_asset_located(
1602                                &span_ctx,
1603                                &asset_cache_key,
1604                                TracerAssetState::Ready,
1605                            );
1606                            match arc.clone().downcast::<K::Asset>() {
1607                                Ok(value) => {
1608                                    return Ok((AssetLoadingState::ready(key, value), changed_at))
1609                                }
1610                                Err(_) => {
1611                                    unreachable!("Asset type mismatch: {:?}", key)
1612                                }
1613                            }
1614                        }
1615                        Some(CachedEntry::AssetError(err)) => {
1616                            // Check consistency for cached leaf errors
1617                            if !has_locator_deps {
1618                                check_leaf_asset_consistency(changed_at)?;
1619                            }
1620                            // Cache hit: start + end immediately (no locator runs)
1621                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1622                            self.tracer.on_asset_located(
1623                                &span_ctx,
1624                                &asset_cache_key,
1625                                TracerAssetState::NotFound,
1626                            );
1627                            return Err(QueryError::UserError(err.clone()));
1628                        }
1629                        None => {
1630                            // Loading state - no value to be inconsistent with
1631                            // Cache hit: start + end immediately (no locator runs)
1632                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1633                            self.tracer.on_asset_located(
1634                                &span_ctx,
1635                                &asset_cache_key,
1636                                TracerAssetState::Loading,
1637                            );
1638                            return Ok((AssetLoadingState::loading(key), changed_at));
1639                        }
1640                        _ => {
1641                            // Query-related entries (Ok, UserError) shouldn't be here
1642                            // Fall through to locator
1643                        }
1644                    }
1645                }
1646            }
1647        }
1648
1649        // Not in cache or invalid - try locator
1650        // Use LocatorContext to track deps on the asset itself (not the calling query)
1651        // Consistency tracking is handled via thread-local storage
1652        check_cycle(&full_cache_key)?;
1653        let _guard = StackGuard::push(full_cache_key.clone());
1654
1655        // START event before locator runs
1656        self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1657
1658        let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1659        let locator_result =
1660            self.locators
1661                .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1662
1663        if let Some(result) = locator_result {
1664            // Get collected dependencies from the locator context
1665            let locator_deps = locator_ctx.into_deps();
1666            match result {
1667                Ok(ErasedLocateResult::Ready {
1668                    value: arc,
1669                    durability: durability_level,
1670                }) => {
1671                    // END event after locator completes
1672                    self.tracer.on_asset_located(
1673                        &span_ctx,
1674                        &asset_cache_key,
1675                        TracerAssetState::Ready,
1676                    );
1677
1678                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1679                        Ok(v) => v,
1680                        Err(_) => {
1681                            unreachable!("Asset type mismatch: {:?}", key);
1682                        }
1683                    };
1684
1685                    // Store in whale atomically with early cutoff
1686                    // Include locator's dependencies so the asset is invalidated when they change
1687                    let entry = CachedEntry::AssetReady(typed_value.clone());
1688                    let durability = Durability::new(durability_level.as_u8() as usize)
1689                        .unwrap_or(Durability::volatile());
1690                    let new_value = typed_value.clone();
1691                    let result = self
1692                        .whale
1693                        .update_with_compare(
1694                            full_cache_key.clone(),
1695                            Some(entry),
1696                            |old_data, _new_data| {
1697                                let Some(CachedEntry::AssetReady(old_arc)) =
1698                                    old_data.and_then(|d| d.as_ref())
1699                                else {
1700                                    return true;
1701                                };
1702                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1703                                    return true;
1704                                };
1705                                !K::asset_eq(&old_value, &new_value)
1706                            },
1707                            durability,
1708                            locator_deps,
1709                        )
1710                        .expect("update_with_compare should succeed");
1711
1712                    // Register verifier for this asset (for verify-then-decide pattern)
1713                    self.verifiers
1714                        .insert_asset::<K, T>(full_cache_key, key.clone());
1715
1716                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1717                }
1718                Ok(ErasedLocateResult::Pending) => {
1719                    // END event after locator completes with pending
1720                    self.tracer.on_asset_located(
1721                        &span_ctx,
1722                        &asset_cache_key,
1723                        TracerAssetState::Loading,
1724                    );
1725
1726                    // Add to pending list for Pending result
1727                    self.pending
1728                        .insert::<K>(asset_cache_key.clone(), key.clone());
1729                    match self
1730                        .whale
1731                        .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1732                        .expect("get_or_insert should succeed")
1733                    {
1734                        GetOrInsertResult::Inserted(node) => {
1735                            return Ok((AssetLoadingState::loading(key), node.changed_at));
1736                        }
1737                        GetOrInsertResult::Existing(node) => {
1738                            let changed_at = node.changed_at;
1739                            match &node.data {
1740                                Some(CachedEntry::AssetReady(arc)) => {
1741                                    match arc.clone().downcast::<K::Asset>() {
1742                                        Ok(value) => {
1743                                            return Ok((
1744                                                AssetLoadingState::ready(key, value),
1745                                                changed_at,
1746                                            ));
1747                                        }
1748                                        Err(_) => {
1749                                            return Ok((
1750                                                AssetLoadingState::loading(key),
1751                                                changed_at,
1752                                            ))
1753                                        }
1754                                    }
1755                                }
1756                                Some(CachedEntry::AssetError(err)) => {
1757                                    return Err(QueryError::UserError(err.clone()));
1758                                }
1759                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1760                            }
1761                        }
1762                    }
1763                }
1764                Err(QueryError::UserError(err)) => {
1765                    // END event after locator completes with error
1766                    self.tracer.on_asset_located(
1767                        &span_ctx,
1768                        &asset_cache_key,
1769                        TracerAssetState::NotFound,
1770                    );
1771                    // Locator returned a user error - cache it as AssetError
1772                    let entry = CachedEntry::AssetError(err.clone());
1773                    let _ = self.whale.register(
1774                        full_cache_key,
1775                        Some(entry),
1776                        Durability::volatile(),
1777                        locator_deps,
1778                    );
1779                    return Err(QueryError::UserError(err));
1780                }
1781                Err(e) => {
1782                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1783                    return Err(e);
1784                }
1785            }
1786        }
1787
1788        // No locator registered or locator returned None - mark as pending
1789        // END event - no locator ran
1790        self.tracer
1791            .on_asset_located(&span_ctx, &asset_cache_key, TracerAssetState::Loading);
1792        self.pending
1793            .insert::<K>(asset_cache_key.clone(), key.clone());
1794
1795        match self
1796            .whale
1797            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1798            .expect("get_or_insert with no dependencies cannot fail")
1799        {
1800            GetOrInsertResult::Inserted(node) => {
1801                Ok((AssetLoadingState::loading(key), node.changed_at))
1802            }
1803            GetOrInsertResult::Existing(node) => {
1804                let changed_at = node.changed_at;
1805                match &node.data {
1806                    Some(CachedEntry::AssetReady(arc)) => {
1807                        match arc.clone().downcast::<K::Asset>() {
1808                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1809                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1810                        }
1811                    }
1812                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1813                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1814                }
1815            }
1816        }
1817    }
1818
1819    /// Internal: Get asset state, checking cache and locator.
1820    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1821        self.get_asset_with_revision(key).map(|(state, _)| state)
1822    }
1823}
1824
1825impl<T: Tracer> Db for QueryRuntime<T> {
1826    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1827        QueryRuntime::query(self, query)
1828    }
1829
1830    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1831        self.get_asset_internal(key)?.suspend()
1832    }
1833
1834    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1835        self.get_asset_internal(key)
1836    }
1837
1838    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1839        self.query_registry.get_all::<Q>()
1840    }
1841
1842    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1843        self.asset_key_registry.get_all::<K>()
1844    }
1845}
1846
1847/// Tracks consistency of leaf asset accesses during query execution.
1848///
1849/// A "leaf" asset is one without dependencies (externally resolved via `resolve_asset`).
1850/// This tracker ensures that all leaf assets accessed during a query execution
1851/// (including those accessed by locators) are consistent - i.e., none were modified
1852/// via `resolve_asset` mid-execution.
1853///
1854/// The tracker is shared across `QueryContext` and `LocatorContext` to propagate
1855/// consistency checking through the entire execution tree.
1856#[derive(Debug)]
1857pub(crate) struct ConsistencyTracker {
1858    /// Global revision at query start. All leaf assets must have changed_at <= this.
1859    start_revision: RevisionCounter,
1860}
1861
1862impl ConsistencyTracker {
1863    /// Create a new tracker with the given start revision.
1864    pub fn new(start_revision: RevisionCounter) -> Self {
1865        Self { start_revision }
1866    }
1867
1868    /// Check consistency for a leaf asset access.
1869    ///
1870    /// A leaf asset is consistent if its changed_at <= start_revision.
1871    /// This detects if resolve_asset was called during query execution.
1872    ///
1873    /// Returns Ok(()) if consistent, Err if inconsistent.
1874    pub fn check_leaf_asset(&self, dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
1875        if dep_changed_at > self.start_revision {
1876            Err(QueryError::InconsistentAssetResolution)
1877        } else {
1878            Ok(())
1879        }
1880    }
1881}
1882
1883/// Context provided to queries during execution.
1884///
1885/// Use this to access dependencies via `query()`.
1886pub struct QueryContext<'a, T: Tracer = NoopTracer> {
1887    runtime: &'a QueryRuntime<T>,
1888    current_key: FullCacheKey,
1889    exec_ctx: ExecutionContext,
1890    deps: RefCell<Vec<FullCacheKey>>,
1891}
1892
1893impl<'a, T: Tracer> QueryContext<'a, T> {
1894    /// Query a dependency.
1895    ///
1896    /// The dependency is automatically tracked for invalidation.
1897    ///
1898    /// # Example
1899    ///
1900    /// ```ignore
1901    /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1902    ///     let dep_result = db.query(OtherQuery { id: self.id })?;
1903    ///     Ok(process(&dep_result))
1904    /// }
1905    /// ```
1906    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1907        let full_key: FullCacheKey = QueryCacheKey::new(query.clone()).into();
1908
1909        // Emit dependency registered event
1910        self.runtime.tracer.on_dependency_registered(
1911            self.exec_ctx.span_ctx(),
1912            &self.current_key,
1913            &full_key,
1914        );
1915
1916        // Record this as a dependency
1917        self.deps.borrow_mut().push(full_key);
1918
1919        // Execute the query
1920        self.runtime.query(query)
1921    }
1922
1923    /// Access an asset, tracking it as a dependency.
1924    ///
1925    /// Returns the asset value if ready, or `Err(QueryError::Suspend)` if still loading.
1926    /// Use this with the `?` operator for automatic suspension on loading.
1927    ///
1928    /// # Example
1929    ///
1930    /// ```ignore
1931    /// #[query]
1932    /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1933    ///     let content = db.asset(path)?;
1934    ///     // Process content...
1935    ///     Ok(output)
1936    /// }
1937    /// ```
1938    ///
1939    /// # Errors
1940    ///
1941    /// - Returns `Err(QueryError::Suspend)` if the asset is still loading.
1942    /// - Returns `Err(QueryError::UserError)` if the asset was not found.
1943    pub fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1944        self.asset_state(key)?.suspend()
1945    }
1946
1947    /// Access an asset's loading state, tracking it as a dependency.
1948    ///
1949    /// Unlike [`asset()`](Self::asset), this method returns the full loading state,
1950    /// allowing you to check if an asset is loading without triggering suspension.
1951    ///
1952    /// # Example
1953    ///
1954    /// ```ignore
1955    /// let state = db.asset_state(key)?;
1956    /// if state.is_loading() {
1957    ///     // Handle loading case explicitly
1958    /// } else {
1959    ///     let value = state.get().unwrap();
1960    /// }
1961    /// ```
1962    ///
1963    /// # Errors
1964    ///
1965    /// Returns `Err(QueryError::UserError)` if the asset was not found.
1966    pub fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1967        let full_cache_key: FullCacheKey = AssetCacheKey::new(key.clone()).into();
1968
1969        // 1. Emit asset dependency registered event
1970        self.runtime.tracer.on_asset_dependency_registered(
1971            self.exec_ctx.span_ctx(),
1972            &self.current_key,
1973            &full_cache_key,
1974        );
1975
1976        // 2. Record dependency on this asset
1977        self.deps.borrow_mut().push(full_cache_key);
1978
1979        // 3. Get asset from cache/locator
1980        // Consistency checking for cached leaf assets is done inside get_asset_with_revision_ctx
1981        let (state, _changed_at) = self.runtime.get_asset_with_revision_ctx(key, self)?;
1982
1983        Ok(state)
1984    }
1985
1986    /// List all query instances of type Q that have been registered.
1987    ///
1988    /// This method establishes a dependency on the "set" of queries of type Q.
1989    /// The calling query will be invalidated when:
1990    /// - A new query of type Q is first executed (added to set)
1991    ///
1992    /// The calling query will NOT be invalidated when:
1993    /// - An individual query of type Q has its value change
1994    ///
1995    /// # Example
1996    ///
1997    /// ```ignore
1998    /// #[query]
1999    /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
2000    ///     let queries = db.list_queries::<MyQuery>();
2001    ///     let mut results = Vec::new();
2002    ///     for q in queries {
2003    ///         results.push(*db.query(q)?);
2004    ///     }
2005    ///     Ok(results)
2006    /// }
2007    /// ```
2008    pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
2009        // Record dependency on the sentinel (set-level dependency)
2010        let sentinel: FullCacheKey = QuerySetSentinelKey::new::<Q>().into();
2011
2012        self.runtime.tracer.on_dependency_registered(
2013            self.exec_ctx.span_ctx(),
2014            &self.current_key,
2015            &sentinel,
2016        );
2017
2018        // Ensure sentinel exists in whale (for dependency tracking)
2019        if self.runtime.whale.get(&sentinel).is_none() {
2020            let _ =
2021                self.runtime
2022                    .whale
2023                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
2024        }
2025
2026        self.deps.borrow_mut().push(sentinel);
2027
2028        // Return all registered queries
2029        self.runtime.query_registry.get_all::<Q>()
2030    }
2031
2032    /// List all asset keys of type K that have been registered.
2033    ///
2034    /// This method establishes a dependency on the "set" of asset keys of type K.
2035    /// The calling query will be invalidated when:
2036    /// - A new asset of type K is resolved for the first time (added to set)
2037    /// - An asset of type K is removed via remove_asset
2038    ///
2039    /// The calling query will NOT be invalidated when:
2040    /// - An individual asset's value changes (use `db.asset()` for that)
2041    ///
2042    /// # Example
2043    ///
2044    /// ```ignore
2045    /// #[query]
2046    /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
2047    ///     let keys = db.list_asset_keys::<ConfigFile>();
2048    ///     let mut contents = Vec::new();
2049    ///     for key in keys {
2050    ///         let content = db.asset(key)?;
2051    ///         contents.push((*content).clone());
2052    ///     }
2053    ///     Ok(contents)
2054    /// }
2055    /// ```
2056    pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2057        // Record dependency on the sentinel (set-level dependency)
2058        let sentinel: FullCacheKey = AssetKeySetSentinelKey::new::<K>().into();
2059
2060        self.runtime.tracer.on_asset_dependency_registered(
2061            self.exec_ctx.span_ctx(),
2062            &self.current_key,
2063            &sentinel,
2064        );
2065
2066        // Ensure sentinel exists in whale (for dependency tracking)
2067        if self.runtime.whale.get(&sentinel).is_none() {
2068            let _ =
2069                self.runtime
2070                    .whale
2071                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
2072        }
2073
2074        self.deps.borrow_mut().push(sentinel);
2075
2076        // Return all registered asset keys
2077        self.runtime.asset_key_registry.get_all::<K>()
2078    }
2079}
2080
2081impl<'a, T: Tracer> Db for QueryContext<'a, T> {
2082    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2083        QueryContext::query(self, query)
2084    }
2085
2086    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2087        QueryContext::asset(self, key)
2088    }
2089
2090    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2091        QueryContext::asset_state(self, key)
2092    }
2093
2094    fn list_queries<Q: Query>(&self) -> Vec<Q> {
2095        QueryContext::list_queries(self)
2096    }
2097
2098    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2099        QueryContext::list_asset_keys(self)
2100    }
2101}
2102
2103/// Context for collecting dependencies during asset locator execution.
2104///
2105/// Unlike `QueryContext`, this is specifically for locators and does not
2106/// register dependencies on any parent query. Dependencies collected here
2107/// are stored with the asset itself.
2108pub(crate) struct LocatorContext<'a, T: Tracer> {
2109    runtime: &'a QueryRuntime<T>,
2110    deps: RefCell<Vec<FullCacheKey>>,
2111}
2112
2113impl<'a, T: Tracer> LocatorContext<'a, T> {
2114    /// Create a new locator context for the given asset key.
2115    ///
2116    /// Consistency tracking is handled via thread-local storage, so leaf asset
2117    /// accesses will be checked against any active tracker from a parent query.
2118    pub(crate) fn new(runtime: &'a QueryRuntime<T>, _asset_key: FullCacheKey) -> Self {
2119        Self {
2120            runtime,
2121            deps: RefCell::new(Vec::new()),
2122        }
2123    }
2124
2125    /// Consume this context and return the collected dependencies.
2126    pub(crate) fn into_deps(self) -> Vec<FullCacheKey> {
2127        self.deps.into_inner()
2128    }
2129}
2130
2131impl<T: Tracer> Db for LocatorContext<'_, T> {
2132    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2133        let full_key = QueryCacheKey::new(query.clone()).into();
2134
2135        // Record this as a dependency of the asset being located
2136        self.deps.borrow_mut().push(full_key);
2137
2138        // Execute the query via the runtime
2139        self.runtime.query(query)
2140    }
2141
2142    fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2143        self.asset_state(key)?.suspend()
2144    }
2145
2146    fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2147        let full_cache_key = AssetCacheKey::new(key.clone()).into();
2148
2149        // Record this as a dependency of the asset being located
2150        self.deps.borrow_mut().push(full_cache_key);
2151
2152        // Access the asset - consistency checking is done inside get_asset_with_revision
2153        let (state, _changed_at) = self.runtime.get_asset_with_revision(key)?;
2154
2155        Ok(state)
2156    }
2157
2158    fn list_queries<Q: Query>(&self) -> Vec<Q> {
2159        self.runtime.list_queries()
2160    }
2161
2162    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2163        self.runtime.list_asset_keys()
2164    }
2165}
2166
2167#[cfg(test)]
2168mod tests {
2169    use super::*;
2170
2171    #[test]
2172    fn test_simple_query() {
2173        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2174        struct Add {
2175            a: i32,
2176            b: i32,
2177        }
2178
2179        impl Query for Add {
2180            type Output = i32;
2181
2182            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2183                Ok(Arc::new(self.a + self.b))
2184            }
2185
2186            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2187                old == new
2188            }
2189        }
2190
2191        let runtime = QueryRuntime::new();
2192
2193        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
2194        assert_eq!(*result, 3);
2195
2196        // Second query should be cached
2197        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
2198        assert_eq!(*result2, 3);
2199    }
2200
2201    #[test]
2202    fn test_dependent_queries() {
2203        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2204        struct Base {
2205            value: i32,
2206        }
2207
2208        impl Query for Base {
2209            type Output = i32;
2210
2211            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2212                Ok(Arc::new(self.value * 2))
2213            }
2214
2215            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2216                old == new
2217            }
2218        }
2219
2220        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2221        struct Derived {
2222            base_value: i32,
2223        }
2224
2225        impl Query for Derived {
2226            type Output = i32;
2227
2228            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2229                let base = db.query(Base {
2230                    value: self.base_value,
2231                })?;
2232                Ok(Arc::new(*base + 10))
2233            }
2234
2235            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2236                old == new
2237            }
2238        }
2239
2240        let runtime = QueryRuntime::new();
2241
2242        let result = runtime.query(Derived { base_value: 5 }).unwrap();
2243        assert_eq!(*result, 20); // 5 * 2 + 10
2244    }
2245
2246    #[test]
2247    fn test_cycle_detection() {
2248        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2249        struct CycleA {
2250            id: i32,
2251        }
2252
2253        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2254        struct CycleB {
2255            id: i32,
2256        }
2257
2258        impl Query for CycleA {
2259            type Output = i32;
2260
2261            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2262                let b = db.query(CycleB { id: self.id })?;
2263                Ok(Arc::new(*b + 1))
2264            }
2265
2266            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2267                old == new
2268            }
2269        }
2270
2271        impl Query for CycleB {
2272            type Output = i32;
2273
2274            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2275                let a = db.query(CycleA { id: self.id })?;
2276                Ok(Arc::new(*a + 1))
2277            }
2278
2279            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2280                old == new
2281            }
2282        }
2283
2284        let runtime = QueryRuntime::new();
2285
2286        let result = runtime.query(CycleA { id: 1 });
2287        assert!(matches!(result, Err(QueryError::Cycle { .. })));
2288    }
2289
2290    #[test]
2291    fn test_fallible_query() {
2292        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2293        struct ParseInt {
2294            input: String,
2295        }
2296
2297        impl Query for ParseInt {
2298            type Output = Result<i32, std::num::ParseIntError>;
2299
2300            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2301                Ok(Arc::new(self.input.parse()))
2302            }
2303
2304            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2305                old == new
2306            }
2307        }
2308
2309        let runtime = QueryRuntime::new();
2310
2311        // Valid parse
2312        let result = runtime
2313            .query(ParseInt {
2314                input: "42".to_string(),
2315            })
2316            .unwrap();
2317        assert_eq!(*result, Ok(42));
2318
2319        // Invalid parse - system succeeds, user error in output
2320        let result = runtime
2321            .query(ParseInt {
2322                input: "not_a_number".to_string(),
2323            })
2324            .unwrap();
2325        assert!(result.is_err());
2326    }
2327
2328    // Macro tests
2329    mod macro_tests {
2330        use super::*;
2331        use crate::query;
2332
2333        #[query]
2334        fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2335            let _ = db; // silence unused warning
2336            Ok(a + b)
2337        }
2338
2339        #[test]
2340        fn test_macro_basic() {
2341            let runtime = QueryRuntime::new();
2342            let result = runtime.query(Add::new(1, 2)).unwrap();
2343            assert_eq!(*result, 3);
2344        }
2345
2346        #[query]
2347        fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2348            let _ = db;
2349            Ok(x * 2)
2350        }
2351
2352        #[test]
2353        fn test_macro_simple() {
2354            let runtime = QueryRuntime::new();
2355            let result = runtime.query(SimpleDouble::new(5)).unwrap();
2356            assert_eq!(*result, 10);
2357        }
2358
2359        #[query(keys(id))]
2360        fn with_key_selection(
2361            db: &impl Db,
2362            id: u32,
2363            include_extra: bool,
2364        ) -> Result<String, QueryError> {
2365            let _ = db;
2366            Ok(format!("id={}, extra={}", id, include_extra))
2367        }
2368
2369        #[test]
2370        fn test_macro_key_selection() {
2371            let runtime = QueryRuntime::new();
2372
2373            // Same id, different include_extra - should return cached
2374            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2375            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2376
2377            // Both should have same value because only `id` is the key
2378            assert_eq!(*r1, "id=1, extra=true");
2379            assert_eq!(*r2, "id=1, extra=true"); // Cached!
2380        }
2381
2382        #[query]
2383        fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2384            let sum = db.query(Add::new(a, b))?;
2385            Ok(*sum * 2)
2386        }
2387
2388        #[test]
2389        fn test_macro_dependencies() {
2390            let runtime = QueryRuntime::new();
2391            let result = runtime.query(Dependent::new(3, 4)).unwrap();
2392            assert_eq!(*result, 14); // (3 + 4) * 2
2393        }
2394
2395        #[query(output_eq)]
2396        fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2397            let _ = db;
2398            Ok(x * 2)
2399        }
2400
2401        #[test]
2402        fn test_macro_output_eq() {
2403            let runtime = QueryRuntime::new();
2404            let result = runtime.query(WithOutputEq::new(5)).unwrap();
2405            assert_eq!(*result, 10);
2406        }
2407
2408        #[query(name = "CustomName")]
2409        fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2410            let _ = db;
2411            Ok(x)
2412        }
2413
2414        #[test]
2415        fn test_macro_custom_name() {
2416            let runtime = QueryRuntime::new();
2417            let result = runtime.query(CustomName::new(42)).unwrap();
2418            assert_eq!(*result, 42);
2419        }
2420
2421        // Test that attribute macros like #[tracing::instrument] are preserved
2422        // We use #[allow(unused_variables)] and #[inline] as test attributes since
2423        // they don't require external dependencies.
2424        #[allow(unused_variables)]
2425        #[inline]
2426        #[query]
2427        fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2428            // This would warn without #[allow(unused_variables)] on the generated method
2429            let unused_var = 42;
2430            Ok(x * 2)
2431        }
2432
2433        #[test]
2434        fn test_macro_preserves_attributes() {
2435            let runtime = QueryRuntime::new();
2436            // If attributes weren't preserved, this might warn about unused_var
2437            let result = runtime.query(WithAttributes::new(5)).unwrap();
2438            assert_eq!(*result, 10);
2439        }
2440    }
2441
2442    // Tests for poll() and changed_at()
2443    mod poll_tests {
2444        use super::*;
2445
2446        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2447        struct Counter {
2448            id: i32,
2449        }
2450
2451        impl Query for Counter {
2452            type Output = i32;
2453
2454            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2455                Ok(Arc::new(self.id * 10))
2456            }
2457
2458            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2459                old == new
2460            }
2461        }
2462
2463        #[test]
2464        fn test_poll_returns_value_and_revision() {
2465            let runtime = QueryRuntime::new();
2466
2467            let result = runtime.poll(Counter { id: 1 }).unwrap();
2468
2469            // Value should be correct - access through Result and Arc
2470            assert_eq!(**result.value.as_ref().unwrap(), 10);
2471
2472            // Revision should be non-zero after first execution
2473            assert!(result.revision > 0);
2474        }
2475
2476        #[test]
2477        fn test_poll_revision_stable_on_cache_hit() {
2478            let runtime = QueryRuntime::new();
2479
2480            // First poll
2481            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2482            let rev1 = result1.revision;
2483
2484            // Second poll (cache hit)
2485            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2486            let rev2 = result2.revision;
2487
2488            // Revision should be the same (no change)
2489            assert_eq!(rev1, rev2);
2490        }
2491
2492        #[test]
2493        fn test_poll_revision_changes_on_invalidate() {
2494            let runtime = QueryRuntime::new();
2495
2496            // First poll
2497            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2498            let rev1 = result1.revision;
2499
2500            // Invalidate and poll again
2501            runtime.invalidate(&Counter { id: 1 });
2502            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2503            let rev2 = result2.revision;
2504
2505            // Revision should increase (value was recomputed)
2506            // Note: Since output_eq returns true (same value), this might not change
2507            // depending on early cutoff behavior. Let's verify the value is still correct.
2508            assert_eq!(**result2.value.as_ref().unwrap(), 10);
2509
2510            // With early cutoff, revision might stay the same if value didn't change
2511            // This is expected behavior
2512            assert!(rev2 >= rev1);
2513        }
2514
2515        #[test]
2516        fn test_changed_at_returns_none_for_unexecuted_query() {
2517            let runtime = QueryRuntime::new();
2518
2519            // Query has never been executed
2520            let rev = runtime.changed_at(&Counter { id: 1 });
2521            assert!(rev.is_none());
2522        }
2523
2524        #[test]
2525        fn test_changed_at_returns_revision_after_execution() {
2526            let runtime = QueryRuntime::new();
2527
2528            // Execute the query
2529            let _ = runtime.query(Counter { id: 1 }).unwrap();
2530
2531            // Now changed_at should return Some
2532            let rev = runtime.changed_at(&Counter { id: 1 });
2533            assert!(rev.is_some());
2534            assert!(rev.unwrap() > 0);
2535        }
2536
2537        #[test]
2538        fn test_changed_at_matches_poll_revision() {
2539            let runtime = QueryRuntime::new();
2540
2541            // Poll the query
2542            let result = runtime.poll(Counter { id: 1 }).unwrap();
2543
2544            // changed_at should match the revision from poll
2545            let rev = runtime.changed_at(&Counter { id: 1 });
2546            assert_eq!(rev, Some(result.revision));
2547        }
2548
2549        #[test]
2550        fn test_poll_value_access() {
2551            let runtime = QueryRuntime::new();
2552
2553            let result = runtime.poll(Counter { id: 5 }).unwrap();
2554
2555            // Access through Result and Arc
2556            let value: &i32 = result.value.as_ref().unwrap();
2557            assert_eq!(*value, 50);
2558
2559            // Access Arc directly via field after unwrapping Result
2560            let arc: &Arc<i32> = result.value.as_ref().unwrap();
2561            assert_eq!(**arc, 50);
2562        }
2563
2564        #[test]
2565        fn test_subscription_pattern() {
2566            let runtime = QueryRuntime::new();
2567
2568            // Simulate subscription pattern
2569            let mut last_revision: RevisionCounter = 0;
2570            let mut notifications = 0;
2571
2572            // First poll - should notify (new value)
2573            let result = runtime.poll(Counter { id: 1 }).unwrap();
2574            if result.revision > last_revision {
2575                notifications += 1;
2576                last_revision = result.revision;
2577            }
2578
2579            // Second poll - should NOT notify (no change)
2580            let result = runtime.poll(Counter { id: 1 }).unwrap();
2581            if result.revision > last_revision {
2582                notifications += 1;
2583                last_revision = result.revision;
2584            }
2585
2586            // Third poll - should NOT notify (no change)
2587            let result = runtime.poll(Counter { id: 1 }).unwrap();
2588            if result.revision > last_revision {
2589                notifications += 1;
2590                #[allow(unused_assignments)]
2591                {
2592                    last_revision = result.revision;
2593                }
2594            }
2595
2596            // Only the first poll should have triggered a notification
2597            assert_eq!(notifications, 1);
2598        }
2599    }
2600
2601    // Tests for GC APIs
2602    mod gc_tests {
2603        use super::*;
2604        use crate::tracer::{SpanContext, SpanId, TraceId};
2605        use std::collections::HashSet;
2606        use std::sync::atomic::{AtomicUsize, Ordering};
2607        use std::sync::Mutex;
2608
2609        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2610        struct Leaf {
2611            id: i32,
2612        }
2613
2614        impl Query for Leaf {
2615            type Output = i32;
2616
2617            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2618                Ok(Arc::new(self.id * 10))
2619            }
2620
2621            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2622                old == new
2623            }
2624        }
2625
2626        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2627        struct Parent {
2628            child_id: i32,
2629        }
2630
2631        impl Query for Parent {
2632            type Output = i32;
2633
2634            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2635                let child = db.query(Leaf { id: self.child_id })?;
2636                Ok(Arc::new(*child + 1))
2637            }
2638
2639            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2640                old == new
2641            }
2642        }
2643
2644        #[test]
2645        fn test_query_keys_returns_all_cached_queries() {
2646            let runtime = QueryRuntime::new();
2647
2648            // Execute some queries
2649            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2650            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2651            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2652
2653            // Get all keys
2654            let keys = runtime.query_keys();
2655
2656            // Should have at least 3 keys (might have more due to sentinels)
2657            assert!(keys.len() >= 3);
2658        }
2659
2660        #[test]
2661        fn test_remove_removes_query() {
2662            let runtime = QueryRuntime::new();
2663
2664            // Execute a query
2665            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2666
2667            // Get the key
2668            let full_key = QueryCacheKey::new(Leaf { id: 1 }).into();
2669
2670            // Query should exist
2671            assert!(runtime.changed_at(&Leaf { id: 1 }).is_some());
2672
2673            // Remove it
2674            assert!(runtime.remove(&full_key));
2675
2676            // Query should no longer exist
2677            assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2678        }
2679
2680        #[test]
2681        fn test_remove_if_unused_removes_leaf_query() {
2682            let runtime = QueryRuntime::new();
2683
2684            // Execute a leaf query (no dependents)
2685            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2686
2687            // Should be removable since no other query depends on it
2688            assert!(runtime.remove_query_if_unused(&Leaf { id: 1 }));
2689
2690            // Query should no longer exist
2691            assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2692        }
2693
2694        #[test]
2695        fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2696            let runtime = QueryRuntime::new();
2697
2698            // Execute parent query (which depends on Leaf)
2699            let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2700
2701            // Leaf query should not be removable since Parent depends on it
2702            assert!(!runtime.remove_query_if_unused(&Leaf { id: 1 }));
2703
2704            // Leaf query should still exist
2705            assert!(runtime.changed_at(&Leaf { id: 1 }).is_some());
2706
2707            // But Parent should be removable (no dependents)
2708            assert!(runtime.remove_query_if_unused(&Parent { child_id: 1 }));
2709        }
2710
2711        #[test]
2712        fn test_remove_if_unused_with_full_cache_key() {
2713            let runtime = QueryRuntime::new();
2714
2715            // Execute a leaf query
2716            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2717
2718            let full_key = QueryCacheKey::new(Leaf { id: 1 }).into();
2719
2720            // Should be removable via FullCacheKey
2721            assert!(runtime.remove_if_unused(&full_key));
2722
2723            // Query should no longer exist
2724            assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2725        }
2726
2727        // Test tracer receives on_query_start calls (for GC tracking)
2728        struct GcTracker {
2729            accessed_keys: Mutex<HashSet<String>>,
2730            access_count: AtomicUsize,
2731        }
2732
2733        impl GcTracker {
2734            fn new() -> Self {
2735                Self {
2736                    accessed_keys: Mutex::new(HashSet::new()),
2737                    access_count: AtomicUsize::new(0),
2738                }
2739            }
2740        }
2741
2742        impl Tracer for GcTracker {
2743            fn new_span_id(&self) -> SpanId {
2744                SpanId(1)
2745            }
2746
2747            fn new_trace_id(&self) -> TraceId {
2748                TraceId(1)
2749            }
2750
2751            fn on_query_start(&self, _ctx: &SpanContext, query_key: &QueryCacheKey) {
2752                self.accessed_keys
2753                    .lock()
2754                    .unwrap()
2755                    .insert(query_key.debug_repr().to_string());
2756                self.access_count.fetch_add(1, Ordering::Relaxed);
2757            }
2758        }
2759
2760        #[test]
2761        fn test_tracer_receives_on_query_start() {
2762            let tracker = GcTracker::new();
2763            let runtime = QueryRuntime::with_tracer(tracker);
2764
2765            // Execute some queries
2766            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2767            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2768
2769            // Tracer should have received on_query_start calls
2770            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2771            assert_eq!(count, 2);
2772
2773            // Check that the keys were recorded
2774            let keys = runtime.tracer().accessed_keys.lock().unwrap();
2775            assert!(keys.iter().any(|k| k.contains("Leaf")));
2776        }
2777
2778        #[test]
2779        fn test_tracer_receives_on_query_start_for_cache_hits() {
2780            let tracker = GcTracker::new();
2781            let runtime = QueryRuntime::with_tracer(tracker);
2782
2783            // Execute query twice (second is cache hit)
2784            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2785            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2786
2787            // Tracer should have received on_query_start for both calls
2788            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2789            assert_eq!(count, 2);
2790        }
2791
2792        #[test]
2793        fn test_gc_workflow() {
2794            let tracker = GcTracker::new();
2795            let runtime = QueryRuntime::with_tracer(tracker);
2796
2797            // Execute some queries
2798            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2799            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2800            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2801
2802            // Simulate GC: remove all queries that are not in use
2803            let mut removed = 0;
2804            for key in runtime.query_keys() {
2805                if runtime.remove_if_unused(&key) {
2806                    removed += 1;
2807                }
2808            }
2809
2810            // All leaf queries should be removable
2811            assert!(removed >= 3);
2812
2813            // Queries should no longer exist
2814            assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2815            assert!(runtime.changed_at(&Leaf { id: 2 }).is_none());
2816            assert!(runtime.changed_at(&Leaf { id: 3 }).is_none());
2817        }
2818    }
2819}