query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::{Cell, RefCell};
5use std::ops::Deref;
6use std::sync::Arc;
7
8use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
9
10use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
11use crate::db::Db;
12use crate::key::FullCacheKey;
13use crate::loading::AssetLoadingState;
14use crate::query::Query;
15use crate::storage::{
16    AssetKeyRegistry, AssetState, CachedEntry, CachedValue, LocatorStorage, PendingStorage,
17    QueryRegistry, VerifierStorage,
18};
19use crate::tracer::{
20    ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
21    TracerAssetState, TracerQueryKey,
22};
23use crate::QueryError;
24
/// Function type for comparing user errors during early cutoff.
///
/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
/// `QueryError::UserError` values are compared for caching purposes.
///
/// The first argument is the previously cached error and the second is the
/// freshly produced one. Returning `true` means the two errors are treated as
/// equal, in which case the previously recorded revision is reused.
pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;

/// Number of durability levels (matches whale's default).
///
/// Used as the const parameter of the `WhaleRuntime` owned by `QueryRuntime`.
const DURABILITY_LEVELS: usize = 4;

// Thread-local query execution stack for cycle detection.
//
// Holds the keys of the queries currently executing on this thread.
// `query_internal` scans it before running a query and pushes/pops the
// query's key around the execution.
thread_local! {
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
}
38
/// Execution context passed through query execution.
///
/// Contains a span ID for tracing correlation. The type is `Copy`, so it is
/// handed around by value.
#[derive(Clone, Copy)]
pub struct ExecutionContext {
    /// Span identifier associated with this execution.
    span_id: SpanId,
}
46
47impl ExecutionContext {
48    /// Create a new execution context with the given span ID.
49    #[inline]
50    pub fn new(span_id: SpanId) -> Self {
51        Self { span_id }
52    }
53
54    /// Get the span ID for this execution context.
55    #[inline]
56    pub fn span_id(&self) -> SpanId {
57        self.span_id
58    }
59}
60
/// Result of polling a query, containing the value and its revision.
///
/// This is returned by [`QueryRuntime::poll`] and provides both the query result
/// and its change revision, enabling efficient change detection for subscription
/// patterns.
///
/// `Polled<T>` dereferences to `T::Target` only when `T` itself implements
/// `Deref`. The `Result` returned by [`QueryRuntime::poll`] does not, so read
/// it through the public `value` field.
///
/// # Example
///
/// ```ignore
/// let result = runtime.poll(MyQuery::new())?;
///
/// // Access the outcome through the `value` field
/// println!("{:?}", result.value);
///
/// // Check if changed since last poll
/// if result.revision > last_known_revision {
///     notify_subscribers(&result.value);
///     last_known_revision = result.revision;
/// }
/// ```
#[derive(Debug, Clone)]
pub struct Polled<T> {
    /// The query result value.
    pub value: T,
    /// The revision at which this value was last changed.
    ///
    /// Compare this with a previously stored revision to detect changes.
    pub revision: RevisionCounter,
}
90
91impl<T: Deref> Deref for Polled<T> {
92    type Target = T::Target;
93
94    fn deref(&self) -> &Self::Target {
95        &self.value
96    }
97}
98
/// The query runtime manages query execution, caching, and dependency tracking.
///
/// This is cheap to clone - all data is behind `Arc`.
///
/// # Type Parameter
///
/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
///   for zero-cost when tracing is not needed.
///
/// # Example
///
/// ```ignore
/// // Without tracing (default)
/// let runtime = QueryRuntime::new();
///
/// // With tracing
/// let tracer = MyTracer::new();
/// let runtime = QueryRuntime::with_tracer(tracer);
///
/// // Sync query execution
/// let result = runtime.query(MyQuery { ... })?;
/// ```
pub struct QueryRuntime<T: Tracer = NoopTracer> {
    /// Whale runtime for dependency tracking and cache storage.
    /// Query outputs and asset values are stored in `Node.data` as `Option<CachedEntry>`;
    /// `None` is what `invalidate` registers to clear a cached value.
    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
    /// Registered asset locators.
    locators: Arc<LocatorStorage>,
    /// Pending asset requests.
    pending: Arc<PendingStorage>,
    /// Registry for tracking query instances (for list_queries).
    query_registry: Arc<QueryRegistry>,
    /// Registry for tracking asset keys (for list_asset_keys).
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Verifiers for re-executing queries (for verify-then-decide pattern).
    /// One is stored per query key after each cached execution.
    verifiers: Arc<VerifierStorage>,
    /// Comparator for user errors during early cutoff.
    error_comparator: ErrorComparator,
    /// Tracer for observability.
    tracer: Arc<T>,
}
140
141impl Default for QueryRuntime<NoopTracer> {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147impl<T: Tracer> Clone for QueryRuntime<T> {
148    fn clone(&self) -> Self {
149        Self {
150            whale: self.whale.clone(),
151            locators: self.locators.clone(),
152            pending: self.pending.clone(),
153            query_registry: self.query_registry.clone(),
154            asset_key_registry: self.asset_key_registry.clone(),
155            verifiers: self.verifiers.clone(),
156            error_comparator: self.error_comparator,
157            tracer: self.tracer.clone(),
158        }
159    }
160}
161
162/// Default error comparator that treats all errors as different.
163///
164/// This is conservative - it always triggers recomputation when an error occurs.
165fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
166    false
167}
168
169impl<T: Tracer> QueryRuntime<T> {
170    /// Get cached output along with its revision (single atomic access).
171    fn get_cached_with_revision<Q: Query>(
172        &self,
173        key: &FullCacheKey,
174    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
175        let node = self.whale.get(key)?;
176        let revision = node.changed_at;
177        let entry = node.data.as_ref()?;
178        let cached = entry.to_cached_value::<Q::Output>()?;
179        Some((cached, revision))
180    }
181
182    /// Get a reference to the tracer.
183    #[inline]
184    pub fn tracer(&self) -> &T {
185        &self.tracer
186    }
187}
188
189impl QueryRuntime<NoopTracer> {
190    /// Create a new query runtime with default settings.
191    pub fn new() -> Self {
192        Self::with_tracer(NoopTracer)
193    }
194
195    /// Create a builder for customizing the runtime.
196    ///
197    /// # Example
198    ///
199    /// ```ignore
200    /// let runtime = QueryRuntime::builder()
201    ///     .error_comparator(|a, b| {
202    ///         // Custom error comparison logic
203    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
204    ///             (Some(a), Some(b)) => a == b,
205    ///             _ => false,
206    ///         }
207    ///     })
208    ///     .build();
209    /// ```
210    pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
211        QueryRuntimeBuilder::new()
212    }
213}
214
215impl<T: Tracer> QueryRuntime<T> {
216    /// Create a new query runtime with the specified tracer.
217    pub fn with_tracer(tracer: T) -> Self {
218        QueryRuntimeBuilder::new().tracer(tracer).build()
219    }
220
221    /// Execute a query synchronously.
222    ///
223    /// Returns the cached result if valid, otherwise executes the query.
224    ///
225    /// # Errors
226    ///
227    /// - `QueryError::Suspend` - Query is waiting for async loading
228    /// - `QueryError::Cycle` - Dependency cycle detected
229    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
230        self.query_internal(query)
231            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
232    }
233
234    /// Internal implementation shared by query() and poll().
235    ///
236    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
237    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
238    #[allow(clippy::type_complexity)]
239    fn query_internal<Q: Query>(
240        &self,
241        query: Q,
242    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
243        let key = query.cache_key();
244        let full_key = FullCacheKey::new::<Q, _>(&key);
245
246        // Emit query key event for GC tracking (before other events)
247        self.tracer.on_query_key(&full_key);
248
249        // Create execution context and emit start event
250        let span_id = self.tracer.new_span_id();
251        let exec_ctx = ExecutionContext::new(span_id);
252        let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
253
254        self.tracer.on_query_start(span_id, query_key.clone());
255
256        // Check for cycles using thread-local stack
257        let cycle_detected = QUERY_STACK.with(|stack| {
258            let stack = stack.borrow();
259            stack.iter().any(|k| k == &full_key)
260        });
261
262        if cycle_detected {
263            let path = QUERY_STACK.with(|stack| {
264                let stack = stack.borrow();
265                let mut path: Vec<String> =
266                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
267                path.push(full_key.debug_repr().to_string());
268                path
269            });
270
271            self.tracer.on_cycle_detected(
272                path.iter()
273                    .map(|s| TracerQueryKey::new("", s.clone()))
274                    .collect(),
275            );
276            self.tracer
277                .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
278
279            return Err(QueryError::Cycle { path });
280        }
281
282        // Check if cached and valid (with verify-then-decide pattern)
283        let current_rev = self.whale.current_revision();
284
285        // Fast path: already verified at current revision
286        if self.whale.is_verified_at(&full_key, &current_rev) {
287            // Single atomic access to get both cached value and revision
288            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
289                self.tracer.on_cache_check(span_id, query_key.clone(), true);
290                self.tracer
291                    .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
292
293                return match cached {
294                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
295                    CachedValue::UserError(err) => Ok((Err(err), revision)),
296                };
297            }
298        }
299
300        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
301        if self.whale.is_valid(&full_key) {
302            // Single atomic access to get both cached value and revision
303            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
304                // Shallow valid but not verified - verify deps first
305                let mut deps_verified = true;
306                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
307                    for dep in deps {
308                        if let Some(verifier) = self.verifiers.get(&dep) {
309                            // Re-query dep to verify it (triggers recursive verification)
310                            if verifier.verify(self as &dyn std::any::Any).is_err() {
311                                deps_verified = false;
312                                break;
313                            }
314                        }
315                    }
316                }
317
318                // Re-check validity after deps are verified
319                if deps_verified && self.whale.is_valid(&full_key) {
320                    // Deps didn't change their changed_at, mark as verified and use cached
321                    self.whale.mark_verified(&full_key, &current_rev);
322
323                    self.tracer.on_cache_check(span_id, query_key.clone(), true);
324                    self.tracer
325                        .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
326
327                    return match cached {
328                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
329                        CachedValue::UserError(err) => Ok((Err(err), revision)),
330                    };
331                }
332                // A dep's changed_at increased, fall through to execute
333            }
334        }
335
336        self.tracer
337            .on_cache_check(span_id, query_key.clone(), false);
338
339        // Execute the query with cycle tracking
340        QUERY_STACK.with(|stack| {
341            stack.borrow_mut().push(full_key.clone());
342        });
343
344        let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
345
346        QUERY_STACK.with(|stack| {
347            stack.borrow_mut().pop();
348        });
349
350        // Emit end event
351        let exec_result = match &result {
352            Ok((_, true, _)) => ExecutionResult::Changed,
353            Ok((_, false, _)) => ExecutionResult::Unchanged,
354            Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
355            Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
356            Err(e) => ExecutionResult::Error {
357                message: format!("{:?}", e),
358            },
359        };
360        self.tracer
361            .on_query_end(span_id, query_key.clone(), exec_result);
362
363        result.map(|(inner_result, _, revision)| (inner_result, revision))
364    }
365
366    /// Execute a query, caching the result if appropriate.
367    ///
368    /// Returns (result, output_changed, revision) tuple.
369    /// - `result`: Ok(output) for success, Err(user_error) for user errors
370    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
371    #[allow(clippy::type_complexity)]
372    fn execute_query<Q: Query>(
373        &self,
374        query: &Q,
375        full_key: &FullCacheKey,
376        exec_ctx: ExecutionContext,
377    ) -> Result<
378        (
379            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
380            bool,
381            RevisionCounter,
382        ),
383        QueryError,
384    > {
385        // Create context for this query execution
386        let ctx = QueryContext {
387            runtime: self,
388            current_key: full_key.clone(),
389            parent_query_type: std::any::type_name::<Q>(),
390            exec_ctx,
391            deps: RefCell::new(Vec::new()),
392            first_access_revision: Cell::new(None),
393        };
394
395        // Execute the query (clone because query() takes ownership)
396        let result = query.clone().query(&ctx);
397
398        // Get collected dependencies
399        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
400
401        // Query durability defaults to stable - Whale will automatically reduce
402        // the effective durability to min(requested, min(dep_durabilities)).
403        // A pure query with no dependencies remains stable.
404        // A query depending on volatile assets becomes volatile.
405        let durability = Durability::stable();
406
407        match result {
408            Ok(output) => {
409                // Check if output changed (for early cutoff)
410                // existing_revision is Some only when output is unchanged (can reuse revision)
411                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
412                    self.get_cached_with_revision::<Q>(full_key)
413                {
414                    if Q::output_eq(&*old, &*output) {
415                        Some(rev) // Same output - reuse revision
416                    } else {
417                        None // Different output
418                    }
419                } else {
420                    None // No previous Ok value
421                };
422                let output_changed = existing_revision.is_none();
423
424                // Emit early cutoff check event
425                self.tracer.on_early_cutoff_check(
426                    exec_ctx.span_id(),
427                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
428                    output_changed,
429                );
430
431                // Update whale with cached entry (atomic update of value + dependency state)
432                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
433                let revision = if let Some(existing_rev) = existing_revision {
434                    // confirm_unchanged doesn't change changed_at, use existing
435                    let _ = self.whale.confirm_unchanged(full_key, deps);
436                    existing_rev
437                } else {
438                    // Use new_rev from register result
439                    match self
440                        .whale
441                        .register(full_key.clone(), Some(entry), durability, deps)
442                    {
443                        Ok(result) => result.new_rev,
444                        Err(missing) => {
445                            return Err(QueryError::DependenciesRemoved {
446                                missing_keys: missing,
447                            })
448                        }
449                    }
450                };
451
452                // Register query in registry for list_queries
453                let is_new_query = self.query_registry.register(query);
454                if is_new_query {
455                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
456                    let _ = self
457                        .whale
458                        .register(sentinel, None, Durability::stable(), vec![]);
459                }
460
461                // Store verifier for this query (for verify-then-decide pattern)
462                self.verifiers
463                    .insert::<Q, T>(full_key.clone(), query.clone());
464
465                Ok((Ok(output), output_changed, revision))
466            }
467            Err(QueryError::UserError(err)) => {
468                // Check if error changed (for early cutoff)
469                // existing_revision is Some only when error is unchanged (can reuse revision)
470                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
471                    self.get_cached_with_revision::<Q>(full_key)
472                {
473                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
474                        Some(rev) // Same error - reuse revision
475                    } else {
476                        None // Different error
477                    }
478                } else {
479                    None // No previous UserError
480                };
481                let output_changed = existing_revision.is_none();
482
483                // Emit early cutoff check event
484                self.tracer.on_early_cutoff_check(
485                    exec_ctx.span_id(),
486                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
487                    output_changed,
488                );
489
490                // Update whale with cached error (atomic update of value + dependency state)
491                let entry = CachedEntry::UserError(err.clone());
492                let revision = if let Some(existing_rev) = existing_revision {
493                    // confirm_unchanged doesn't change changed_at, use existing
494                    let _ = self.whale.confirm_unchanged(full_key, deps);
495                    existing_rev
496                } else {
497                    // Use new_rev from register result
498                    match self
499                        .whale
500                        .register(full_key.clone(), Some(entry), durability, deps)
501                    {
502                        Ok(result) => result.new_rev,
503                        Err(missing) => {
504                            return Err(QueryError::DependenciesRemoved {
505                                missing_keys: missing,
506                            })
507                        }
508                    }
509                };
510
511                // Register query in registry for list_queries
512                let is_new_query = self.query_registry.register(query);
513                if is_new_query {
514                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
515                    let _ = self
516                        .whale
517                        .register(sentinel, None, Durability::stable(), vec![]);
518                }
519
520                // Store verifier for this query (for verify-then-decide pattern)
521                self.verifiers
522                    .insert::<Q, T>(full_key.clone(), query.clone());
523
524                Ok((Err(err), output_changed, revision))
525            }
526            Err(e) => {
527                // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
528                Err(e)
529            }
530        }
531    }
532
533    /// Invalidate a query, forcing recomputation on next access.
534    ///
535    /// This also invalidates any queries that depend on this one.
536    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
537        let full_key = FullCacheKey::new::<Q, _>(key);
538
539        self.tracer.on_query_invalidated(
540            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
541            InvalidationReason::ManualInvalidation,
542        );
543
544        // Update whale to invalidate dependents (register with None to clear cached value)
545        // Use stable durability to increment all revision counters, ensuring queries
546        // at any durability level will see this as a change.
547        let _ = self
548            .whale
549            .register(full_key, None, Durability::stable(), vec![]);
550    }
551
552    /// Remove a query from the cache entirely, freeing memory.
553    ///
554    /// Use this for GC when a query is no longer needed.
555    /// Unlike `invalidate`, this removes all traces of the query from storage.
556    /// The query will be recomputed from scratch on next access.
557    ///
558    /// This also invalidates any queries that depend on this one.
559    pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
560        let full_key = FullCacheKey::new::<Q, _>(key);
561
562        self.tracer.on_query_invalidated(
563            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
564            InvalidationReason::ManualInvalidation,
565        );
566
567        // Remove verifier if exists
568        self.verifiers.remove(&full_key);
569
570        // Remove from whale storage (this also handles dependent invalidation)
571        self.whale.remove(&full_key);
572
573        // Remove from registry and update sentinel for list_queries
574        if self.query_registry.remove::<Q>(key) {
575            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
576            let _ = self
577                .whale
578                .register(sentinel, None, Durability::stable(), vec![]);
579        }
580    }
581
582    /// Clear all cached values by removing all nodes from whale.
583    ///
584    /// Note: This is a relatively expensive operation as it iterates through all keys.
585    pub fn clear_cache(&self) {
586        let keys = self.whale.keys();
587        for key in keys {
588            self.whale.remove(&key);
589        }
590    }
591
592    /// Poll a query, returning both the result and its change revision.
593    ///
594    /// This is useful for implementing subscription patterns where you need to
595    /// detect changes efficiently. Compare the returned `revision` with a
596    /// previously stored value to determine if the query result has changed.
597    ///
598    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
599    /// as its value, allowing you to track revision changes for both success and
600    /// user error cases.
601    ///
602    /// # Example
603    ///
604    /// ```ignore
605    /// struct Subscription<Q: Query> {
606    ///     query: Q,
607    ///     last_revision: RevisionCounter,
608    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
609    /// }
610    ///
611    /// // Polling loop
612    /// for sub in &mut subscriptions {
613    ///     let result = runtime.poll(sub.query.clone())?;
614    ///     if result.revision > sub.last_revision {
615    ///         sub.tx.send(result.value.clone())?;
616    ///         sub.last_revision = result.revision;
617    ///     }
618    /// }
619    /// ```
620    ///
621    /// # Errors
622    ///
623    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
624    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
625    #[allow(clippy::type_complexity)]
626    pub fn poll<Q: Query>(
627        &self,
628        query: Q,
629    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
630        let (value, revision) = self.query_internal(query)?;
631        Ok(Polled { value, revision })
632    }
633
634    /// Get the change revision of a query without executing it.
635    ///
636    /// Returns `None` if the query has never been executed.
637    ///
638    /// This is useful for checking if a query has changed since the last poll
639    /// without the cost of executing the query.
640    ///
641    /// # Example
642    ///
643    /// ```ignore
644    /// // Check if query has changed before deciding to poll
645    /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
646    ///     if rev > last_known_revision {
647    ///         let result = runtime.query(MyQuery::new(key))?;
648    ///         // Process result...
649    ///     }
650    /// }
651    /// ```
652    pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
653        let full_key = FullCacheKey::new::<Q, _>(key);
654        self.whale.get(&full_key).map(|node| node.changed_at)
655    }
656}
657
658// ============================================================================
659// GC (Garbage Collection) API
660// ============================================================================
661
662impl<T: Tracer> QueryRuntime<T> {
    /// Get all query keys currently in the cache.
    ///
    /// Returns every key that whale currently holds, as an owned `Vec`.
    ///
    /// This is useful for implementing custom garbage collection strategies.
    /// Use this in combination with [`Tracer::on_query_key`] to track access
    /// times and implement LRU, TTL, or other GC algorithms externally.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Collect all keys that haven't been accessed recently
    /// let stale_keys: Vec<_> = runtime.query_keys()
    ///     .into_iter()
    ///     .filter(|key| tracker.is_stale(key))
    ///     .collect();
    ///
    /// // Remove stale queries that have no dependents
    /// for key in stale_keys {
    ///     runtime.remove_if_unused(&key);
    /// }
    /// ```
    pub fn query_keys(&self) -> Vec<FullCacheKey> {
        self.whale.keys()
    }
685
686    /// Remove a query if it has no dependents.
687    ///
688    /// Returns `true` if the query was removed, `false` if it has dependents
689    /// or doesn't exist. This is the safe way to remove queries during GC,
690    /// as it won't break queries that depend on this one.
691    ///
692    /// # Example
693    ///
694    /// ```ignore
695    /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
696    /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
697    ///     println!("Query removed");
698    /// } else {
699    ///     println!("Query has dependents, not removed");
700    /// }
701    /// ```
702    pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
703        let full_key = FullCacheKey::new::<Q, _>(key);
704        self.remove_if_unused(&full_key)
705    }
706
707    /// Remove a query by its [`FullCacheKey`].
708    ///
709    /// This is the type-erased version of [`remove_query`](Self::remove_query).
710    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
711    /// or [`Tracer::on_query_key`].
712    ///
713    /// Returns `true` if the query was removed, `false` if it doesn't exist.
714    ///
715    /// # Warning
716    ///
717    /// This forcibly removes the query even if other queries depend on it.
718    /// Dependent queries will be recomputed on next access. For safe GC,
719    /// use [`remove_if_unused`](Self::remove_if_unused) instead.
720    pub fn remove(&self, key: &FullCacheKey) -> bool {
721        // Remove verifier if exists
722        self.verifiers.remove(key);
723
724        // Remove from whale storage
725        self.whale.remove(key).is_some()
726    }
727
728    /// Remove a query by its [`FullCacheKey`] if it has no dependents.
729    ///
730    /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
731    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
732    /// or [`Tracer::on_query_key`].
733    ///
734    /// Returns `true` if the query was removed, `false` if it has dependents
735    /// or doesn't exist.
736    ///
737    /// # Example
738    ///
739    /// ```ignore
740    /// // Implement LRU GC
741    /// for key in runtime.query_keys() {
742    ///     if tracker.is_expired(&key) {
743    ///         runtime.remove_if_unused(&key);
744    ///     }
745    /// }
746    /// ```
747    pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
748        if self.whale.remove_if_unused(key.clone()).is_some() {
749            // Successfully removed - clean up verifier
750            self.verifiers.remove(key);
751            true
752        } else {
753            false
754        }
755    }
756}
757
758// ============================================================================
759// Builder
760// ============================================================================
761
/// Builder for [`QueryRuntime`] with customizable settings.
///
/// # Example
///
/// ```ignore
/// let runtime = QueryRuntime::builder()
///     .error_comparator(|a, b| {
///         // Treat two errors as equal when both are `std::io::Error`s,
///         // or when neither is
///         a.downcast_ref::<std::io::Error>().is_some()
///             == b.downcast_ref::<std::io::Error>().is_some()
///     })
///     .build();
/// ```
pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
    /// Comparator handed to the runtime for user-error early cutoff.
    error_comparator: ErrorComparator,
    /// Tracer the built runtime will report to.
    tracer: T,
}
779
780impl Default for QueryRuntimeBuilder<NoopTracer> {
781    fn default() -> Self {
782        Self::new()
783    }
784}
785
786impl QueryRuntimeBuilder<NoopTracer> {
787    /// Create a new builder with default settings.
788    pub fn new() -> Self {
789        Self {
790            error_comparator: default_error_comparator,
791            tracer: NoopTracer,
792        }
793    }
794}
795
796impl<T: Tracer> QueryRuntimeBuilder<T> {
797    /// Set the error comparator function for early cutoff optimization.
798    ///
799    /// When a query returns `QueryError::UserError`, this function is used
800    /// to compare it with the previously cached error. If they are equal,
801    /// downstream queries can skip recomputation (early cutoff).
802    ///
803    /// The default comparator returns `false` for all errors, meaning errors
804    /// are always considered different (conservative, always recomputes).
805    ///
806    /// # Example
807    ///
808    /// ```ignore
809    /// // Treat errors as equal if they have the same display message
810    /// let runtime = QueryRuntime::builder()
811    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
812    ///     .build();
813    /// ```
814    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
815        self.error_comparator = f;
816        self
817    }
818
819    /// Set the tracer for observability.
820    pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
821        QueryRuntimeBuilder {
822            error_comparator: self.error_comparator,
823            tracer,
824        }
825    }
826
827    /// Build the runtime with the configured settings.
828    pub fn build(self) -> QueryRuntime<T> {
829        QueryRuntime {
830            whale: WhaleRuntime::new(),
831            locators: Arc::new(LocatorStorage::new()),
832            pending: Arc::new(PendingStorage::new()),
833            query_registry: Arc::new(QueryRegistry::new()),
834            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
835            verifiers: Arc::new(VerifierStorage::new()),
836            error_comparator: self.error_comparator,
837            tracer: Arc::new(self.tracer),
838        }
839    }
840}
841
842// ============================================================================
843// Asset API
844// ============================================================================
845
846impl<T: Tracer> QueryRuntime<T> {
    /// Register an asset locator for a specific asset key type.
    ///
    /// Only one locator can be registered per key type. Later registrations
    /// replace earlier ones.
    ///
    /// # Type Parameters
    ///
    /// * `K` - The asset key type the locator is stored under
    /// * `L` - The locator implementation, which must implement `AssetLocator<K>`
    ///
    /// # Example
    ///
    /// ```ignore
    /// let runtime = QueryRuntime::new();
    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
    /// ```
    pub fn register_asset_locator<K, L>(&self, locator: L)
    where
        K: AssetKey,
        L: AssetLocator<K>,
    {
        // Hand the locator to the shared locator storage.
        self.locators.insert::<K, L>(locator);
    }
865
    /// Get the list of pending asset requests.
    ///
    /// Returns a `Vec` of assets that have been requested but not yet resolved.
    /// The user should fetch these externally and call `resolve_asset()`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// for pending in runtime.pending_assets() {
    ///     if let Some(path) = pending.key::<FilePath>() {
    ///         let content = fetch_file(path);
    ///         runtime.resolve_asset(path.clone(), content, DurabilityLevel::Volatile);
    ///     }
    /// }
    /// ```
    pub fn pending_assets(&self) -> Vec<PendingAsset> {
        self.pending.get_all()
    }
884
    /// Get pending assets filtered by key type.
    ///
    /// Returns the pending keys of type `K` as reported by the pending storage.
    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
        self.pending.get_of_type::<K>()
    }
889
    /// Check if there are any pending assets.
    ///
    /// Returns `true` when the pending storage is not empty.
    pub fn has_pending_assets(&self) -> bool {
        !self.pending.is_empty()
    }
894
    /// Resolve an asset with its loaded value.
    ///
    /// This marks the asset as ready and invalidates any queries that
    /// depend on it (if the value changed), triggering recomputation on next access.
    ///
    /// This method is idempotent - resolving with the same value (via `asset_eq`)
    /// will not trigger downstream recomputation.
    ///
    /// # Arguments
    ///
    /// * `key` - The asset key identifying this resource
    /// * `value` - The loaded asset value
    /// * `durability` - How frequently this asset is expected to change
    ///
    /// # Example
    ///
    /// ```ignore
    /// let content = std::fs::read_to_string(&path)?;
    /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
    /// ```
    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
        // Public entry point; all of the work happens in the internal helper.
        self.resolve_asset_internal(key, value, durability);
    }
918
919    fn resolve_asset_internal<K: AssetKey>(
920        &self,
921        key: K,
922        value: K::Asset,
923        durability_level: DurabilityLevel,
924    ) {
925        let full_asset_key = FullAssetKey::new(&key);
926        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
927
928        // Remove from pending BEFORE registering the value
929        self.pending.remove(&full_asset_key);
930
931        // Prepare the new entry
932        let value_arc: Arc<K::Asset> = Arc::new(value);
933        let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
934        let durability =
935            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
936
937        // Atomic compare-and-update
938        let result = self
939            .whale
940            .update_with_compare(
941                full_cache_key,
942                Some(entry),
943                |old_data, _new_data| {
944                    // Compare old and new values
945                    match old_data.and_then(|d| d.as_ref()) {
946                        Some(CachedEntry::AssetReady(old_arc)) => {
947                            match old_arc.clone().downcast::<K::Asset>() {
948                                Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
949                                Err(_) => true, // Type mismatch, treat as changed
950                            }
951                        }
952                        _ => true, // Loading, NotFound, or not present -> changed
953                    }
954                },
955                durability,
956                vec![],
957            )
958            .expect("update_with_compare with no dependencies cannot fail");
959
960        // Emit asset resolved event
961        self.tracer.on_asset_resolved(
962            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
963            result.changed,
964        );
965
966        // Register asset key in registry for list_asset_keys
967        let is_new_asset = self.asset_key_registry.register(&key);
968        if is_new_asset {
969            // Update sentinel to invalidate list_asset_keys dependents
970            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
971            let _ = self
972                .whale
973                .register(sentinel, None, Durability::stable(), vec![]);
974        }
975    }
976
977    /// Invalidate an asset, forcing queries to re-request it.
978    ///
979    /// The asset will be marked as loading and added to pending assets.
980    /// Dependent queries will suspend until the asset is resolved again.
981    ///
982    /// # Example
983    ///
984    /// ```ignore
985    /// // File was modified externally
986    /// runtime.invalidate_asset(&FilePath("config.json".into()));
987    /// // Queries depending on this asset will now suspend
988    /// // User should fetch the new value and call resolve_asset
989    /// ```
990    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
991        let full_asset_key = FullAssetKey::new(key);
992        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
993
994        // Emit asset invalidated event
995        self.tracer.on_asset_invalidated(TracerAssetKey::new(
996            std::any::type_name::<K>(),
997            format!("{:?}", key),
998        ));
999
1000        // Add to pending FIRST (before clearing whale state)
1001        // This ensures: readers see either old value, or Loading+pending
1002        self.pending.insert::<K>(full_asset_key, key.clone());
1003
1004        // Atomic: clear cached value + invalidate dependents
1005        // Using None for data means "needs to be loaded"
1006        // Use stable durability to ensure queries at any durability level see the change.
1007        let _ = self
1008            .whale
1009            .register(full_cache_key, None, Durability::stable(), vec![]);
1010    }
1011
1012    /// Remove an asset from the cache entirely.
1013    ///
1014    /// Unlike `invalidate_asset`, this removes all traces of the asset.
1015    /// Dependent queries will go through the locator again on next access.
1016    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1017        let full_asset_key = FullAssetKey::new(key);
1018        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1019
1020        // Remove from pending first
1021        self.pending.remove(&full_asset_key);
1022
1023        // Remove from whale (this also cleans up dependency edges)
1024        // whale.remove() invalidates dependents before removing
1025        self.whale.remove(&full_cache_key);
1026
1027        // Remove from registry and update sentinel for list_asset_keys
1028        if self.asset_key_registry.remove::<K>(key) {
1029            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1030            let _ = self
1031                .whale
1032                .register(sentinel, None, Durability::stable(), vec![]);
1033        }
1034    }
1035
1036    /// Get an asset by key without tracking dependencies.
1037    ///
1038    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1039    /// as a dependent of the asset. Use this for direct asset access outside
1040    /// of query execution.
1041    ///
1042    /// # Returns
1043    ///
1044    /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1045    /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1046    /// - `Err(QueryError::MissingDependency)` - Asset was not found
1047    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1048        self.get_asset_internal(key)
1049    }
1050
1051    /// Internal: Get asset state and its changed_at revision atomically.
1052    ///
1053    /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
1054    /// whale node that provided the asset value.
1055    fn get_asset_with_revision<K: AssetKey>(
1056        &self,
1057        key: K,
1058    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1059        let full_asset_key = FullAssetKey::new(&key);
1060        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1061
1062        // Helper to emit AssetRequested event
1063        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1064            tracer.on_asset_requested(
1065                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1066                state,
1067            );
1068        };
1069
1070        // Check whale cache first (single atomic read)
1071        if let Some(node) = self.whale.get(&full_cache_key) {
1072            let changed_at = node.changed_at;
1073            // Check if valid at current revision
1074            if self.whale.is_valid(&full_cache_key) {
1075                match &node.data {
1076                    Some(CachedEntry::AssetReady(arc)) => {
1077                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1078                        match arc.clone().downcast::<K::Asset>() {
1079                            Ok(value) => {
1080                                return Ok((AssetLoadingState::ready(key, value), changed_at))
1081                            }
1082                            Err(_) => {
1083                                return Err(QueryError::MissingDependency {
1084                                    description: format!("Asset type mismatch: {:?}", key),
1085                                })
1086                            }
1087                        }
1088                    }
1089                    Some(CachedEntry::AssetNotFound) => {
1090                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1091                        return Err(QueryError::MissingDependency {
1092                            description: format!("Asset not found: {:?}", key),
1093                        });
1094                    }
1095                    None => {
1096                        // Loading state
1097                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1098                        return Ok((AssetLoadingState::loading(key), changed_at));
1099                    }
1100                    _ => {
1101                        // Query-related entries (Ok, UserError) shouldn't be here
1102                        // Fall through to locator
1103                    }
1104                }
1105            }
1106        }
1107
1108        // Not in cache or invalid - try locator
1109        if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
1110            if let Some(state) = locator.locate_any(&key) {
1111                match state {
1112                    AssetState::Ready {
1113                        value: arc,
1114                        durability: durability_level,
1115                    } => {
1116                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1117
1118                        let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1119                            Ok(v) => v,
1120                            Err(_) => {
1121                                return Err(QueryError::MissingDependency {
1122                                    description: format!("Asset type mismatch: {:?}", key),
1123                                });
1124                            }
1125                        };
1126
1127                        // Store in whale atomically with early cutoff
1128                        // Use durability from LocateResult::Ready
1129                        let entry = CachedEntry::AssetReady(typed_value.clone());
1130                        let durability = Durability::new(durability_level.as_u8() as usize)
1131                            .unwrap_or(Durability::volatile());
1132                        let new_value = typed_value.clone();
1133                        let result = self
1134                            .whale
1135                            .update_with_compare(
1136                                full_cache_key,
1137                                Some(entry),
1138                                |old_data, _new_data| {
1139                                    let Some(CachedEntry::AssetReady(old_arc)) =
1140                                        old_data.and_then(|d| d.as_ref())
1141                                    else {
1142                                        return true;
1143                                    };
1144                                    let Ok(old_value) = old_arc.clone().downcast::<K::Asset>()
1145                                    else {
1146                                        return true;
1147                                    };
1148                                    !K::asset_eq(&old_value, &new_value)
1149                                },
1150                                durability,
1151                                vec![],
1152                            )
1153                            .expect("update_with_compare with no dependencies cannot fail");
1154
1155                        return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1156                    }
1157                    AssetState::Loading => {
1158                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1159
1160                        self.pending.insert::<K>(full_asset_key, key.clone());
1161                        match self
1162                            .whale
1163                            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1164                            .expect("get_or_insert with no dependencies cannot fail")
1165                        {
1166                            GetOrInsertResult::Inserted(node) => {
1167                                return Ok((AssetLoadingState::loading(key), node.changed_at));
1168                            }
1169                            GetOrInsertResult::Existing(node) => {
1170                                let changed_at = node.changed_at;
1171                                match &node.data {
1172                                    Some(CachedEntry::AssetReady(arc)) => {
1173                                        match arc.clone().downcast::<K::Asset>() {
1174                                            Ok(value) => {
1175                                                return Ok((
1176                                                    AssetLoadingState::ready(key, value),
1177                                                    changed_at,
1178                                                ))
1179                                            }
1180                                            Err(_) => {
1181                                                return Ok((
1182                                                    AssetLoadingState::loading(key),
1183                                                    changed_at,
1184                                                ))
1185                                            }
1186                                        }
1187                                    }
1188                                    Some(CachedEntry::AssetNotFound) => {
1189                                        return Err(QueryError::MissingDependency {
1190                                            description: format!("Asset not found: {:?}", key),
1191                                        });
1192                                    }
1193                                    _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1194                                }
1195                            }
1196                        }
1197                    }
1198                    AssetState::NotFound => {
1199                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1200
1201                        let entry = CachedEntry::AssetNotFound;
1202                        let durability = Durability::volatile();
1203                        let _ = self.whale.update_with_compare(
1204                            full_cache_key,
1205                            Some(entry),
1206                            |old_data, _new_data| {
1207                                !matches!(
1208                                    old_data.and_then(|d| d.as_ref()),
1209                                    Some(CachedEntry::AssetNotFound)
1210                                )
1211                            },
1212                            durability,
1213                            vec![],
1214                        );
1215
1216                        return Err(QueryError::MissingDependency {
1217                            description: format!("Asset not found: {:?}", key),
1218                        });
1219                    }
1220                }
1221            }
1222        }
1223
1224        // No locator registered or locator returned None - mark as pending
1225        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1226        self.pending
1227            .insert::<K>(full_asset_key.clone(), key.clone());
1228
1229        match self
1230            .whale
1231            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1232            .expect("get_or_insert with no dependencies cannot fail")
1233        {
1234            GetOrInsertResult::Inserted(node) => {
1235                Ok((AssetLoadingState::loading(key), node.changed_at))
1236            }
1237            GetOrInsertResult::Existing(node) => {
1238                let changed_at = node.changed_at;
1239                match &node.data {
1240                    Some(CachedEntry::AssetReady(arc)) => {
1241                        match arc.clone().downcast::<K::Asset>() {
1242                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1243                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1244                        }
1245                    }
1246                    Some(CachedEntry::AssetNotFound) => Err(QueryError::MissingDependency {
1247                        description: format!("Asset not found: {:?}", key),
1248                    }),
1249                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1250                }
1251            }
1252        }
1253    }
1254
1255    /// Internal: Get asset state, checking cache and locator.
1256    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1257        self.get_asset_with_revision(key).map(|(state, _)| state)
1258    }
1259}
1260
1261impl<T: Tracer> Db for QueryRuntime<T> {
1262    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1263        QueryRuntime::query(self, query)
1264    }
1265
1266    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1267        self.get_asset_internal(key)
1268    }
1269
1270    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1271        self.query_registry.get_all::<Q>()
1272    }
1273
1274    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1275        self.asset_key_registry.get_all::<K>()
1276    }
1277}
1278
/// Context provided to queries during execution.
///
/// Use this to access dependencies via `query()`.
pub struct QueryContext<'a, T: Tracer = NoopTracer> {
    /// Runtime that dependency lookups are forwarded to.
    runtime: &'a QueryRuntime<T>,
    /// Cache key of the query this context belongs to; reported to the tracer
    /// as the parent of every dependency.
    current_key: FullCacheKey,
    /// Type-name label used for the parent query in tracer events.
    parent_query_type: &'static str,
    /// Execution context supplying the span ID for tracer events.
    exec_ctx: ExecutionContext,
    /// Cache keys recorded as dependencies so far (queries, assets, sentinels).
    deps: RefCell<Vec<FullCacheKey>>,
    /// Revision at first asset access, used for consistency checking.
    /// Assets with changed_at > this value were modified during query execution.
    first_access_revision: Cell<Option<RevisionCounter>>,
}
1292
1293impl<'a, T: Tracer> QueryContext<'a, T> {
1294    /// Ensure consistency of asset access.
1295    ///
1296    /// On first asset access: records max(current_global, dep_changed_at) as baseline.
1297    /// On subsequent asset accesses: checks dep_changed_at <= baseline.
1298    ///
1299    /// IMPORTANT: current_global must be obtained BEFORE accessing the asset.
1300    #[inline]
1301    fn ensure_consistent(
1302        &self,
1303        current_global: RevisionCounter,
1304        dep_changed_at: RevisionCounter,
1305    ) -> Result<(), QueryError> {
1306        match self.first_access_revision.get() {
1307            None => {
1308                // First asset access: record max(current_global, dep_changed_at)
1309                // Using max ensures we don't get false positives when only one asset is accessed
1310                let first = current_global.max(dep_changed_at);
1311                self.first_access_revision.set(Some(first));
1312                Ok(())
1313            }
1314            Some(first) => {
1315                // Subsequent asset accesses: check consistency
1316                if dep_changed_at > first {
1317                    Err(QueryError::InconsistentAssetResolution)
1318                } else {
1319                    Ok(())
1320                }
1321            }
1322        }
1323    }
1324
1325    /// Query a dependency.
1326    ///
1327    /// The dependency is automatically tracked for invalidation.
1328    ///
1329    /// # Example
1330    ///
1331    /// ```ignore
1332    /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1333    ///     let dep_result = db.query(OtherQuery { id: self.id })?;
1334    ///     Ok(process(&dep_result))
1335    /// }
1336    /// ```
1337    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1338        let key = query.cache_key();
1339        let full_key = FullCacheKey::new::<Q, _>(&key);
1340
1341        // Emit dependency registered event
1342        self.runtime.tracer.on_dependency_registered(
1343            self.exec_ctx.span_id(),
1344            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1345            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1346        );
1347
1348        // Record this as a dependency
1349        self.deps.borrow_mut().push(full_key.clone());
1350
1351        // Execute the query
1352        self.runtime.query(query)
1353    }
1354
1355    /// Access an asset, tracking it as a dependency.
1356    ///
1357    /// Returns `AssetLoadingState<K>`:
1358    /// - `is_loading()` if the asset is still being loaded
1359    /// - `is_ready()` if the asset is available
1360    ///
1361    /// Use `.suspend()?` to convert to `Result<Arc<K::Asset>, QueryError>`,
1362    /// which returns `Err(QueryError::Suspend { asset })` if still loading.
1363    ///
1364    /// # Example
1365    ///
1366    /// ```ignore
1367    /// #[query]
1368    /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1369    ///     let content = db.asset(path)?.suspend()?;
1370    ///     // Process content...
1371    ///     Ok(output)
1372    /// }
1373    /// ```
1374    ///
1375    /// # Errors
1376    ///
1377    /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1378    pub fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1379        let full_asset_key = FullAssetKey::new(&key);
1380        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1381
1382        // 1. Get current_global FIRST (before accessing the asset)
1383        let current_global = self
1384            .runtime
1385            .whale
1386            .current_revision()
1387            .get(Durability::volatile());
1388
1389        // 2. Emit asset dependency registered event
1390        self.runtime.tracer.on_asset_dependency_registered(
1391            self.exec_ctx.span_id(),
1392            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1393            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1394        );
1395
1396        // 3. Record dependency on this asset
1397        self.deps.borrow_mut().push(full_cache_key);
1398
1399        // 4. Get asset AND changed_at from the same whale access (atomic)
1400        let (result, dep_changed_at) = match self.runtime.get_asset_with_revision(key) {
1401            Ok((state, rev)) => (Ok(state), rev),
1402            Err(e) => return Err(e),
1403        };
1404
1405        // 5. Check consistency - detects if resolve_asset was called during query execution
1406        self.ensure_consistent(current_global, dep_changed_at)?;
1407
1408        // Emit missing dependency event on error
1409        if let Err(QueryError::MissingDependency { ref description }) = result {
1410            self.runtime.tracer.on_missing_dependency(
1411                TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1412                description.clone(),
1413            );
1414        }
1415
1416        result
1417    }
1418
1419    /// List all query instances of type Q that have been registered.
1420    ///
1421    /// This method establishes a dependency on the "set" of queries of type Q.
1422    /// The calling query will be invalidated when:
1423    /// - A new query of type Q is first executed (added to set)
1424    ///
1425    /// The calling query will NOT be invalidated when:
1426    /// - An individual query of type Q has its value change
1427    ///
1428    /// # Example
1429    ///
1430    /// ```ignore
1431    /// #[query]
1432    /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1433    ///     let queries = db.list_queries::<MyQuery>();
1434    ///     let mut results = Vec::new();
1435    ///     for q in queries {
1436    ///         results.push(*db.query(q)?);
1437    ///     }
1438    ///     Ok(results)
1439    /// }
1440    /// ```
1441    pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1442        // Record dependency on the sentinel (set-level dependency)
1443        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1444
1445        self.runtime.tracer.on_dependency_registered(
1446            self.exec_ctx.span_id(),
1447            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1448            TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1449        );
1450
1451        // Ensure sentinel exists in whale (for dependency tracking)
1452        if self.runtime.whale.get(&sentinel).is_none() {
1453            let _ =
1454                self.runtime
1455                    .whale
1456                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1457        }
1458
1459        self.deps.borrow_mut().push(sentinel);
1460
1461        // Return all registered queries
1462        self.runtime.query_registry.get_all::<Q>()
1463    }
1464
1465    /// List all asset keys of type K that have been registered.
1466    ///
1467    /// This method establishes a dependency on the "set" of asset keys of type K.
1468    /// The calling query will be invalidated when:
1469    /// - A new asset of type K is resolved for the first time (added to set)
1470    /// - An asset of type K is removed via remove_asset
1471    ///
1472    /// The calling query will NOT be invalidated when:
1473    /// - An individual asset's value changes (use `db.asset()` for that)
1474    ///
1475    /// # Example
1476    ///
1477    /// ```ignore
1478    /// #[query]
1479    /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1480    ///     let keys = db.list_asset_keys::<ConfigFile>();
1481    ///     let mut contents = Vec::new();
1482    ///     for key in keys {
1483    ///         let content = db.asset(&key)?.suspend()?;
1484    ///         contents.push((*content).clone());
1485    ///     }
1486    ///     Ok(contents)
1487    /// }
1488    /// ```
1489    pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1490        // Record dependency on the sentinel (set-level dependency)
1491        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1492
1493        self.runtime.tracer.on_asset_dependency_registered(
1494            self.exec_ctx.span_id(),
1495            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1496            TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1497        );
1498
1499        // Ensure sentinel exists in whale (for dependency tracking)
1500        if self.runtime.whale.get(&sentinel).is_none() {
1501            let _ =
1502                self.runtime
1503                    .whale
1504                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1505        }
1506
1507        self.deps.borrow_mut().push(sentinel);
1508
1509        // Return all registered asset keys
1510        self.runtime.asset_key_registry.get_all::<K>()
1511    }
1512}
1513
1514impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1515    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1516        QueryContext::query(self, query)
1517    }
1518
1519    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1520        QueryContext::asset(self, key)
1521    }
1522
1523    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1524        QueryContext::list_queries(self)
1525    }
1526
1527    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1528        QueryContext::list_asset_keys(self)
1529    }
1530}
1531
1532#[cfg(test)]
1533mod tests {
1534    use super::*;
1535
#[test]
fn test_simple_query() {
    // Number of times `Add::query` has actually been executed. Without this counter
    // the test could not tell a cache hit from a recomputation, because both produce
    // the same value.
    static EXECUTIONS: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
    const ORDER: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst;

    #[derive(Clone)]
    struct Add {
        a: i32,
        b: i32,
    }

    impl Query for Add {
        type CacheKey = (i32, i32);
        type Output = i32;

        fn cache_key(&self) -> Self::CacheKey {
            (self.a, self.b)
        }

        fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            EXECUTIONS.fetch_add(1, ORDER);
            Ok(Arc::new(self.a + self.b))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            old == new
        }
    }

    let runtime = QueryRuntime::new();

    // First call: the query body has to run.
    let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
    assert_eq!(*result, 3);
    assert_eq!(EXECUTIONS.load(ORDER), 1);

    // Second call with the same cache key: same value, and the body must NOT run again.
    let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
    assert_eq!(*result2, 3);
    assert_eq!(
        EXECUTIONS.load(ORDER),
        1,
        "second query with an identical cache key should be served from the cache"
    );
}
1570
#[test]
fn test_dependent_queries() {
    // Leaf query: doubles its input.
    #[derive(Clone)]
    struct Base {
        value: i32,
    }

    impl Query for Base {
        type CacheKey = i32;
        type Output = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.value
        }

        fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let doubled = self.value * 2;
            Ok(Arc::new(doubled))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    // Query that asks the database for `Base` and adds 10 to its result.
    #[derive(Clone)]
    struct Derived {
        base_value: i32,
    }

    impl Query for Derived {
        type CacheKey = i32;
        type Output = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.base_value
        }

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let inner = Base {
                value: self.base_value,
            };
            // An error from the nested query is passed through unchanged.
            db.query(inner).map(|doubled| Arc::new(*doubled + 10))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();
    let derived = runtime.query(Derived { base_value: 5 }).unwrap();

    // 5 * 2 (Base) + 10 (Derived)
    assert_eq!(*derived, 20);
}
1625
#[test]
fn test_cycle_detection() {
    // `CycleA` and `CycleB` query each other with the same id, so evaluating either
    // one can never bottom out.
    #[derive(Clone)]
    struct CycleA {
        id: i32,
    }

    #[derive(Clone)]
    struct CycleB {
        id: i32,
    }

    impl Query for CycleA {
        type CacheKey = i32;
        type Output = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.id
        }

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            // A -> B
            db.query(CycleB { id: self.id })
                .map(|from_b| Arc::new(*from_b + 1))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    impl Query for CycleB {
        type CacheKey = i32;
        type Output = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.id
        }

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            // B -> A, closing the loop
            db.query(CycleA { id: self.id })
                .map(|from_a| Arc::new(*from_a + 1))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();
    let outcome = runtime.query(CycleA { id: 1 });

    // The runtime must report the cycle as an error instead of recursing forever.
    let is_cycle_error = matches!(outcome, Err(QueryError::Cycle { .. }));
    assert!(is_cycle_error);
}
1679
#[test]
fn test_fallible_query() {
    // The query itself never fails; a parse failure is carried inside the output
    // value as an ordinary `Result`.
    #[derive(Clone)]
    struct ParseInt {
        input: String,
    }

    impl Query for ParseInt {
        type CacheKey = String;
        type Output = Result<i32, std::num::ParseIntError>;

        fn cache_key(&self) -> Self::CacheKey {
            self.input.clone()
        }

        fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let parsed = self.input.parse::<i32>();
            Ok(Arc::new(parsed))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();

    // Runs `ParseInt` on `text`; the outer `unwrap` checks that the runtime succeeded.
    let parse = |text: &str| {
        runtime
            .query(ParseInt {
                input: text.to_string(),
            })
            .unwrap()
    };

    // Well-formed input yields `Ok` inside the output.
    assert_eq!(*parse("42"), Ok(42));

    // Malformed input: the runtime still succeeds, and the user-level error lives in
    // the output value.
    assert!(parse("not_a_number").is_err());
}
1722
1723    // Macro tests
1724    mod macro_tests {
1725        use super::*;
1726        use crate::query;
1727
1728        #[query]
1729        fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
1730            let _ = db; // silence unused warning
1731            Ok(a + b)
1732        }
1733
1734        #[test]
1735        fn test_macro_basic() {
1736            let runtime = QueryRuntime::new();
1737            let result = runtime.query(Add::new(1, 2)).unwrap();
1738            assert_eq!(*result, 3);
1739        }
1740
1741        #[query]
1742        fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1743            let _ = db;
1744            Ok(x * 2)
1745        }
1746
1747        #[test]
1748        fn test_macro_simple() {
1749            let runtime = QueryRuntime::new();
1750            let result = runtime.query(SimpleDouble::new(5)).unwrap();
1751            assert_eq!(*result, 10);
1752        }
1753
1754        #[query(keys(id))]
1755        fn with_key_selection(
1756            db: &impl Db,
1757            id: u32,
1758            include_extra: bool,
1759        ) -> Result<String, QueryError> {
1760            let _ = db;
1761            Ok(format!("id={}, extra={}", id, include_extra))
1762        }
1763
1764        #[test]
1765        fn test_macro_key_selection() {
1766            let runtime = QueryRuntime::new();
1767
1768            // Same id, different include_extra - should return cached
1769            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
1770            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
1771
1772            // Both should have same value because only `id` is the key
1773            assert_eq!(*r1, "id=1, extra=true");
1774            assert_eq!(*r2, "id=1, extra=true"); // Cached!
1775        }
1776
1777        #[query]
1778        fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
1779            let sum = db.query(Add::new(a, b))?;
1780            Ok(*sum * 2)
1781        }
1782
1783        #[test]
1784        fn test_macro_dependencies() {
1785            let runtime = QueryRuntime::new();
1786            let result = runtime.query(Dependent::new(3, 4)).unwrap();
1787            assert_eq!(*result, 14); // (3 + 4) * 2
1788        }
1789
1790        #[query(output_eq)]
1791        fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1792            let _ = db;
1793            Ok(x * 2)
1794        }
1795
1796        #[test]
1797        fn test_macro_output_eq() {
1798            let runtime = QueryRuntime::new();
1799            let result = runtime.query(WithOutputEq::new(5)).unwrap();
1800            assert_eq!(*result, 10);
1801        }
1802
1803        #[query(name = "CustomName")]
1804        fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1805            let _ = db;
1806            Ok(x)
1807        }
1808
1809        #[test]
1810        fn test_macro_custom_name() {
1811            let runtime = QueryRuntime::new();
1812            let result = runtime.query(CustomName::new(42)).unwrap();
1813            assert_eq!(*result, 42);
1814        }
1815
1816        // Test that attribute macros like #[tracing::instrument] are preserved
1817        // We use #[allow(unused_variables)] and #[inline] as test attributes since
1818        // they don't require external dependencies.
1819        #[allow(unused_variables)]
1820        #[inline]
1821        #[query]
1822        fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1823            // This would warn without #[allow(unused_variables)] on the generated method
1824            let unused_var = 42;
1825            Ok(x * 2)
1826        }
1827
1828        #[test]
1829        fn test_macro_preserves_attributes() {
1830            let runtime = QueryRuntime::new();
1831            // If attributes weren't preserved, this might warn about unused_var
1832            let result = runtime.query(WithAttributes::new(5)).unwrap();
1833            assert_eq!(*result, 10);
1834        }
1835    }
1836
1837    // Tests for poll() and changed_at()
1838    mod poll_tests {
1839        use super::*;
1840
1841        #[derive(Clone)]
1842        struct Counter {
1843            id: i32,
1844        }
1845
1846        impl Query for Counter {
1847            type CacheKey = i32;
1848            type Output = i32;
1849
1850            fn cache_key(&self) -> Self::CacheKey {
1851                self.id
1852            }
1853
1854            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1855                Ok(Arc::new(self.id * 10))
1856            }
1857
1858            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1859                old == new
1860            }
1861        }
1862
1863        #[test]
1864        fn test_poll_returns_value_and_revision() {
1865            let runtime = QueryRuntime::new();
1866
1867            let result = runtime.poll(Counter { id: 1 }).unwrap();
1868
1869            // Value should be correct - access through Result and Arc
1870            assert_eq!(**result.value.as_ref().unwrap(), 10);
1871
1872            // Revision should be non-zero after first execution
1873            assert!(result.revision > 0);
1874        }
1875
1876        #[test]
1877        fn test_poll_revision_stable_on_cache_hit() {
1878            let runtime = QueryRuntime::new();
1879
1880            // First poll
1881            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1882            let rev1 = result1.revision;
1883
1884            // Second poll (cache hit)
1885            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1886            let rev2 = result2.revision;
1887
1888            // Revision should be the same (no change)
1889            assert_eq!(rev1, rev2);
1890        }
1891
1892        #[test]
1893        fn test_poll_revision_changes_on_invalidate() {
1894            let runtime = QueryRuntime::new();
1895
1896            // First poll
1897            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1898            let rev1 = result1.revision;
1899
1900            // Invalidate and poll again
1901            runtime.invalidate::<Counter>(&1);
1902            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1903            let rev2 = result2.revision;
1904
1905            // Revision should increase (value was recomputed)
1906            // Note: Since output_eq returns true (same value), this might not change
1907            // depending on early cutoff behavior. Let's verify the value is still correct.
1908            assert_eq!(**result2.value.as_ref().unwrap(), 10);
1909
1910            // With early cutoff, revision might stay the same if value didn't change
1911            // This is expected behavior
1912            assert!(rev2 >= rev1);
1913        }
1914
1915        #[test]
1916        fn test_changed_at_returns_none_for_unexecuted_query() {
1917            let runtime = QueryRuntime::new();
1918
1919            // Query has never been executed
1920            let rev = runtime.changed_at::<Counter>(&1);
1921            assert!(rev.is_none());
1922        }
1923
1924        #[test]
1925        fn test_changed_at_returns_revision_after_execution() {
1926            let runtime = QueryRuntime::new();
1927
1928            // Execute the query
1929            let _ = runtime.query(Counter { id: 1 }).unwrap();
1930
1931            // Now changed_at should return Some
1932            let rev = runtime.changed_at::<Counter>(&1);
1933            assert!(rev.is_some());
1934            assert!(rev.unwrap() > 0);
1935        }
1936
1937        #[test]
1938        fn test_changed_at_matches_poll_revision() {
1939            let runtime = QueryRuntime::new();
1940
1941            // Poll the query
1942            let result = runtime.poll(Counter { id: 1 }).unwrap();
1943
1944            // changed_at should match the revision from poll
1945            let rev = runtime.changed_at::<Counter>(&1);
1946            assert_eq!(rev, Some(result.revision));
1947        }
1948
1949        #[test]
1950        fn test_poll_value_access() {
1951            let runtime = QueryRuntime::new();
1952
1953            let result = runtime.poll(Counter { id: 5 }).unwrap();
1954
1955            // Access through Result and Arc
1956            let value: &i32 = result.value.as_ref().unwrap();
1957            assert_eq!(*value, 50);
1958
1959            // Access Arc directly via field after unwrapping Result
1960            let arc: &Arc<i32> = result.value.as_ref().unwrap();
1961            assert_eq!(**arc, 50);
1962        }
1963
1964        #[test]
1965        fn test_subscription_pattern() {
1966            let runtime = QueryRuntime::new();
1967
1968            // Simulate subscription pattern
1969            let mut last_revision: RevisionCounter = 0;
1970            let mut notifications = 0;
1971
1972            // First poll - should notify (new value)
1973            let result = runtime.poll(Counter { id: 1 }).unwrap();
1974            if result.revision > last_revision {
1975                notifications += 1;
1976                last_revision = result.revision;
1977            }
1978
1979            // Second poll - should NOT notify (no change)
1980            let result = runtime.poll(Counter { id: 1 }).unwrap();
1981            if result.revision > last_revision {
1982                notifications += 1;
1983                last_revision = result.revision;
1984            }
1985
1986            // Third poll - should NOT notify (no change)
1987            let result = runtime.poll(Counter { id: 1 }).unwrap();
1988            if result.revision > last_revision {
1989                notifications += 1;
1990                #[allow(unused_assignments)]
1991                {
1992                    last_revision = result.revision;
1993                }
1994            }
1995
1996            // Only the first poll should have triggered a notification
1997            assert_eq!(notifications, 1);
1998        }
1999    }
2000
2001    // Tests for GC APIs
2002    mod gc_tests {
2003        use super::*;
2004        use std::collections::HashSet;
2005        use std::sync::atomic::{AtomicUsize, Ordering};
2006        use std::sync::Mutex;
2007
2008        #[derive(Clone)]
2009        struct Leaf {
2010            id: i32,
2011        }
2012
2013        impl Query for Leaf {
2014            type CacheKey = i32;
2015            type Output = i32;
2016
2017            fn cache_key(&self) -> Self::CacheKey {
2018                self.id
2019            }
2020
2021            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2022                Ok(Arc::new(self.id * 10))
2023            }
2024
2025            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2026                old == new
2027            }
2028        }
2029
2030        #[derive(Clone)]
2031        struct Parent {
2032            child_id: i32,
2033        }
2034
2035        impl Query for Parent {
2036            type CacheKey = i32;
2037            type Output = i32;
2038
2039            fn cache_key(&self) -> Self::CacheKey {
2040                self.child_id
2041            }
2042
2043            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2044                let child = db.query(Leaf { id: self.child_id })?;
2045                Ok(Arc::new(*child + 1))
2046            }
2047
2048            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2049                old == new
2050            }
2051        }
2052
2053        #[test]
2054        fn test_query_keys_returns_all_cached_queries() {
2055            let runtime = QueryRuntime::new();
2056
2057            // Execute some queries
2058            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2059            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2060            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2061
2062            // Get all keys
2063            let keys = runtime.query_keys();
2064
2065            // Should have at least 3 keys (might have more due to sentinels)
2066            assert!(keys.len() >= 3);
2067        }
2068
2069        #[test]
2070        fn test_remove_removes_query() {
2071            let runtime = QueryRuntime::new();
2072
2073            // Execute a query
2074            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2075
2076            // Get the key
2077            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2078
2079            // Query should exist
2080            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2081
2082            // Remove it
2083            assert!(runtime.remove(&full_key));
2084
2085            // Query should no longer exist
2086            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2087        }
2088
2089        #[test]
2090        fn test_remove_if_unused_removes_leaf_query() {
2091            let runtime = QueryRuntime::new();
2092
2093            // Execute a leaf query (no dependents)
2094            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2095
2096            // Should be removable since no other query depends on it
2097            assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2098
2099            // Query should no longer exist
2100            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2101        }
2102
2103        #[test]
2104        fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2105            let runtime = QueryRuntime::new();
2106
2107            // Execute parent query (which depends on Leaf)
2108            let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2109
2110            // Leaf query should not be removable since Parent depends on it
2111            assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2112
2113            // Leaf query should still exist
2114            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2115
2116            // But Parent should be removable (no dependents)
2117            assert!(runtime.remove_query_if_unused::<Parent>(&1));
2118        }
2119
2120        #[test]
2121        fn test_remove_if_unused_with_full_cache_key() {
2122            let runtime = QueryRuntime::new();
2123
2124            // Execute a leaf query
2125            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2126
2127            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2128
2129            // Should be removable via FullCacheKey
2130            assert!(runtime.remove_if_unused(&full_key));
2131
2132            // Query should no longer exist
2133            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2134        }
2135
2136        // Test tracer receives on_query_key calls
2137        struct GcTracker {
2138            accessed_keys: Mutex<HashSet<String>>,
2139            access_count: AtomicUsize,
2140        }
2141
2142        impl GcTracker {
2143            fn new() -> Self {
2144                Self {
2145                    accessed_keys: Mutex::new(HashSet::new()),
2146                    access_count: AtomicUsize::new(0),
2147                }
2148            }
2149        }
2150
2151        impl Tracer for GcTracker {
2152            fn new_span_id(&self) -> SpanId {
2153                SpanId(1)
2154            }
2155
2156            fn on_query_key(&self, full_key: &FullCacheKey) {
2157                self.accessed_keys
2158                    .lock()
2159                    .unwrap()
2160                    .insert(full_key.debug_repr().to_string());
2161                self.access_count.fetch_add(1, Ordering::Relaxed);
2162            }
2163        }
2164
2165        #[test]
2166        fn test_tracer_receives_on_query_key() {
2167            let tracker = GcTracker::new();
2168            let runtime = QueryRuntime::with_tracer(tracker);
2169
2170            // Execute some queries
2171            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2172            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2173
2174            // Tracer should have received on_query_key calls
2175            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2176            assert_eq!(count, 2);
2177
2178            // Check that the keys were recorded
2179            let keys = runtime.tracer().accessed_keys.lock().unwrap();
2180            assert!(keys.iter().any(|k| k.contains("Leaf")));
2181        }
2182
2183        #[test]
2184        fn test_tracer_receives_on_query_key_for_cache_hits() {
2185            let tracker = GcTracker::new();
2186            let runtime = QueryRuntime::with_tracer(tracker);
2187
2188            // Execute query twice (second is cache hit)
2189            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2190            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2191
2192            // Tracer should have received on_query_key for both calls
2193            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2194            assert_eq!(count, 2);
2195        }
2196
2197        #[test]
2198        fn test_gc_workflow() {
2199            let tracker = GcTracker::new();
2200            let runtime = QueryRuntime::with_tracer(tracker);
2201
2202            // Execute some queries
2203            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2204            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2205            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2206
2207            // Simulate GC: remove all queries that are not in use
2208            let mut removed = 0;
2209            for key in runtime.query_keys() {
2210                if runtime.remove_if_unused(&key) {
2211                    removed += 1;
2212                }
2213            }
2214
2215            // All leaf queries should be removable
2216            assert!(removed >= 3);
2217
2218            // Queries should no longer exist
2219            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2220            assert!(runtime.changed_at::<Leaf>(&2).is_none());
2221            assert!(runtime.changed_at::<Leaf>(&3).is_none());
2222        }
2223    }
2224}