query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::{Cell, RefCell};
5use std::ops::Deref;
6use std::sync::Arc;
7
8use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
9
10use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
11use crate::db::Db;
12use crate::key::FullCacheKey;
13use crate::loading::AssetLoadingState;
14use crate::query::Query;
15use crate::storage::{
16    AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
17    QueryRegistry, VerifierStorage,
18};
19use crate::tracer::{
20    ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
21    TracerAssetState, TracerQueryKey,
22};
23use crate::QueryError;
24
25/// Function type for comparing user errors during early cutoff.
26///
27/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
28/// `QueryError::UserError` values are compared for caching purposes.
29pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
30
31/// Number of durability levels (matches whale's default).
32const DURABILITY_LEVELS: usize = 4;
33
34// Thread-local query execution stack for cycle detection.
35thread_local! {
36    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
37}
38
39/// Execution context passed through query execution.
40///
41/// Contains a span ID for tracing correlation.
42#[derive(Clone, Copy)]
43pub struct ExecutionContext {
44    span_id: SpanId,
45}
46
47impl ExecutionContext {
48    /// Create a new execution context with the given span ID.
49    #[inline]
50    pub fn new(span_id: SpanId) -> Self {
51        Self { span_id }
52    }
53
54    /// Get the span ID for this execution context.
55    #[inline]
56    pub fn span_id(&self) -> SpanId {
57        self.span_id
58    }
59}
60
61/// Result of polling a query, containing the value and its revision.
62///
63/// This is returned by [`QueryRuntime::poll`] and provides both the query result
64/// and its change revision, enabling efficient change detection for subscription
65/// patterns.
66///
67/// # Example
68///
69/// ```ignore
70/// let result = runtime.poll(MyQuery::new())?;
71///
72/// // Access the value via Deref
73/// println!("{:?}", *result);
74///
75/// // Check if changed since last poll
76/// if result.revision > last_known_revision {
77///     notify_subscribers(&result.value);
78///     last_known_revision = result.revision;
79/// }
80/// ```
81#[derive(Debug, Clone)]
82pub struct Polled<T> {
83    /// The query result value.
84    pub value: T,
85    /// The revision at which this value was last changed.
86    ///
87    /// Compare this with a previously stored revision to detect changes.
88    pub revision: RevisionCounter,
89}
90
91impl<T: Deref> Deref for Polled<T> {
92    type Target = T::Target;
93
94    fn deref(&self) -> &Self::Target {
95        &self.value
96    }
97}
98
99/// The query runtime manages query execution, caching, and dependency tracking.
100///
101/// This is cheap to clone - all data is behind `Arc`.
102///
103/// # Type Parameter
104///
105/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
106///   for zero-cost when tracing is not needed.
107///
108/// # Example
109///
110/// ```ignore
111/// // Without tracing (default)
112/// let runtime = QueryRuntime::new();
113///
114/// // With tracing
115/// let tracer = MyTracer::new();
116/// let runtime = QueryRuntime::with_tracer(tracer);
117///
118/// // Sync query execution
119/// let result = runtime.query(MyQuery { ... })?;
120/// ```
121pub struct QueryRuntime<T: Tracer = NoopTracer> {
122    /// Whale runtime for dependency tracking and cache storage.
123    /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
124    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
125    /// Registered asset locators
126    locators: Arc<LocatorStorage<T>>,
127    /// Pending asset requests
128    pending: Arc<PendingStorage>,
129    /// Registry for tracking query instances (for list_queries)
130    query_registry: Arc<QueryRegistry>,
131    /// Registry for tracking asset keys (for list_asset_keys)
132    asset_key_registry: Arc<AssetKeyRegistry>,
133    /// Verifiers for re-executing queries (for verify-then-decide pattern)
134    verifiers: Arc<VerifierStorage>,
135    /// Comparator for user errors during early cutoff
136    error_comparator: ErrorComparator,
137    /// Tracer for observability
138    tracer: Arc<T>,
139}
140
141impl Default for QueryRuntime<NoopTracer> {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147impl<T: Tracer> Clone for QueryRuntime<T> {
148    fn clone(&self) -> Self {
149        Self {
150            whale: self.whale.clone(),
151            locators: self.locators.clone(),
152            pending: self.pending.clone(),
153            query_registry: self.query_registry.clone(),
154            asset_key_registry: self.asset_key_registry.clone(),
155            verifiers: self.verifiers.clone(),
156            error_comparator: self.error_comparator,
157            tracer: self.tracer.clone(),
158        }
159    }
160}
161
162/// Default error comparator that treats all errors as different.
163///
164/// This is conservative - it always triggers recomputation when an error occurs.
165fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
166    false
167}
168
169impl<T: Tracer> QueryRuntime<T> {
170    /// Get cached output along with its revision (single atomic access).
171    fn get_cached_with_revision<Q: Query>(
172        &self,
173        key: &FullCacheKey,
174    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
175        let node = self.whale.get(key)?;
176        let revision = node.changed_at;
177        let entry = node.data.as_ref()?;
178        let cached = entry.to_cached_value::<Q::Output>()?;
179        Some((cached, revision))
180    }
181
182    /// Get a reference to the tracer.
183    #[inline]
184    pub fn tracer(&self) -> &T {
185        &self.tracer
186    }
187}
188
189impl QueryRuntime<NoopTracer> {
190    /// Create a new query runtime with default settings.
191    pub fn new() -> Self {
192        Self::with_tracer(NoopTracer)
193    }
194
195    /// Create a builder for customizing the runtime.
196    ///
197    /// # Example
198    ///
199    /// ```ignore
200    /// let runtime = QueryRuntime::builder()
201    ///     .error_comparator(|a, b| {
202    ///         // Custom error comparison logic
203    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
204    ///             (Some(a), Some(b)) => a == b,
205    ///             _ => false,
206    ///         }
207    ///     })
208    ///     .build();
209    /// ```
210    pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
211        QueryRuntimeBuilder::new()
212    }
213}
214
215impl<T: Tracer> QueryRuntime<T> {
216    /// Create a new query runtime with the specified tracer.
217    pub fn with_tracer(tracer: T) -> Self {
218        QueryRuntimeBuilder::new().tracer(tracer).build()
219    }
220
221    /// Execute a query synchronously.
222    ///
223    /// Returns the cached result if valid, otherwise executes the query.
224    ///
225    /// # Errors
226    ///
227    /// - `QueryError::Suspend` - Query is waiting for async loading
228    /// - `QueryError::Cycle` - Dependency cycle detected
229    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
230        self.query_internal(query)
231            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
232    }
233
234    /// Internal implementation shared by query() and poll().
235    ///
236    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
237    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
238    #[allow(clippy::type_complexity)]
239    fn query_internal<Q: Query>(
240        &self,
241        query: Q,
242    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
243        let key = query.cache_key();
244        let full_key = FullCacheKey::new::<Q, _>(&key);
245
246        // Emit query key event for GC tracking (before other events)
247        self.tracer.on_query_key(&full_key);
248
249        // Create execution context and emit start event
250        let span_id = self.tracer.new_span_id();
251        let exec_ctx = ExecutionContext::new(span_id);
252        let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
253
254        self.tracer.on_query_start(span_id, query_key.clone());
255
256        // Check for cycles using thread-local stack
257        let cycle_detected = QUERY_STACK.with(|stack| {
258            let stack = stack.borrow();
259            stack.iter().any(|k| k == &full_key)
260        });
261
262        if cycle_detected {
263            let path = QUERY_STACK.with(|stack| {
264                let stack = stack.borrow();
265                let mut path: Vec<String> =
266                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
267                path.push(full_key.debug_repr().to_string());
268                path
269            });
270
271            self.tracer.on_cycle_detected(
272                path.iter()
273                    .map(|s| TracerQueryKey::new("", s.clone()))
274                    .collect(),
275            );
276            self.tracer
277                .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
278
279            return Err(QueryError::Cycle { path });
280        }
281
282        // Check if cached and valid (with verify-then-decide pattern)
283        let current_rev = self.whale.current_revision();
284
285        // Fast path: already verified at current revision
286        if self.whale.is_verified_at(&full_key, &current_rev) {
287            // Single atomic access to get both cached value and revision
288            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
289                self.tracer.on_cache_check(span_id, query_key.clone(), true);
290                self.tracer
291                    .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
292
293                return match cached {
294                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
295                    CachedValue::UserError(err) => Ok((Err(err), revision)),
296                };
297            }
298        }
299
300        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
301        if self.whale.is_valid(&full_key) {
302            // Single atomic access to get both cached value and revision
303            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
304                // Shallow valid but not verified - verify deps first
305                let mut deps_verified = true;
306                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
307                    for dep in deps {
308                        if let Some(verifier) = self.verifiers.get(&dep) {
309                            // Re-query dep to verify it (triggers recursive verification)
310                            if verifier.verify(self as &dyn std::any::Any).is_err() {
311                                deps_verified = false;
312                                break;
313                            }
314                        }
315                    }
316                }
317
318                // Re-check validity after deps are verified
319                if deps_verified && self.whale.is_valid(&full_key) {
320                    // Deps didn't change their changed_at, mark as verified and use cached
321                    self.whale.mark_verified(&full_key, &current_rev);
322
323                    self.tracer.on_cache_check(span_id, query_key.clone(), true);
324                    self.tracer
325                        .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
326
327                    return match cached {
328                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
329                        CachedValue::UserError(err) => Ok((Err(err), revision)),
330                    };
331                }
332                // A dep's changed_at increased, fall through to execute
333            }
334        }
335
336        self.tracer
337            .on_cache_check(span_id, query_key.clone(), false);
338
339        // Execute the query with cycle tracking
340        QUERY_STACK.with(|stack| {
341            stack.borrow_mut().push(full_key.clone());
342        });
343
344        let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
345
346        QUERY_STACK.with(|stack| {
347            stack.borrow_mut().pop();
348        });
349
350        // Emit end event
351        let exec_result = match &result {
352            Ok((_, true, _)) => ExecutionResult::Changed,
353            Ok((_, false, _)) => ExecutionResult::Unchanged,
354            Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
355            Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
356            Err(e) => ExecutionResult::Error {
357                message: format!("{:?}", e),
358            },
359        };
360        self.tracer
361            .on_query_end(span_id, query_key.clone(), exec_result);
362
363        result.map(|(inner_result, _, revision)| (inner_result, revision))
364    }
365
366    /// Execute a query, caching the result if appropriate.
367    ///
368    /// Returns (result, output_changed, revision) tuple.
369    /// - `result`: Ok(output) for success, Err(user_error) for user errors
370    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
371    #[allow(clippy::type_complexity)]
372    fn execute_query<Q: Query>(
373        &self,
374        query: &Q,
375        full_key: &FullCacheKey,
376        exec_ctx: ExecutionContext,
377    ) -> Result<
378        (
379            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
380            bool,
381            RevisionCounter,
382        ),
383        QueryError,
384    > {
385        // Create context for this query execution
386        let ctx = QueryContext {
387            runtime: self,
388            current_key: full_key.clone(),
389            parent_query_type: std::any::type_name::<Q>(),
390            exec_ctx,
391            deps: RefCell::new(Vec::new()),
392            first_access_revision: Cell::new(None),
393        };
394
395        // Execute the query (clone because query() takes ownership)
396        let result = query.clone().query(&ctx);
397
398        // Get collected dependencies
399        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
400
401        // Query durability defaults to stable - Whale will automatically reduce
402        // the effective durability to min(requested, min(dep_durabilities)).
403        // A pure query with no dependencies remains stable.
404        // A query depending on volatile assets becomes volatile.
405        let durability = Durability::stable();
406
407        match result {
408            Ok(output) => {
409                // Check if output changed (for early cutoff)
410                // existing_revision is Some only when output is unchanged (can reuse revision)
411                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
412                    self.get_cached_with_revision::<Q>(full_key)
413                {
414                    if Q::output_eq(&*old, &*output) {
415                        Some(rev) // Same output - reuse revision
416                    } else {
417                        None // Different output
418                    }
419                } else {
420                    None // No previous Ok value
421                };
422                let output_changed = existing_revision.is_none();
423
424                // Emit early cutoff check event
425                self.tracer.on_early_cutoff_check(
426                    exec_ctx.span_id(),
427                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
428                    output_changed,
429                );
430
431                // Update whale with cached entry (atomic update of value + dependency state)
432                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
433                let revision = if let Some(existing_rev) = existing_revision {
434                    // confirm_unchanged doesn't change changed_at, use existing
435                    let _ = self.whale.confirm_unchanged(full_key, deps);
436                    existing_rev
437                } else {
438                    // Use new_rev from register result
439                    match self
440                        .whale
441                        .register(full_key.clone(), Some(entry), durability, deps)
442                    {
443                        Ok(result) => result.new_rev,
444                        Err(missing) => {
445                            return Err(QueryError::DependenciesRemoved {
446                                missing_keys: missing,
447                            })
448                        }
449                    }
450                };
451
452                // Register query in registry for list_queries
453                let is_new_query = self.query_registry.register(query);
454                if is_new_query {
455                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
456                    let _ = self
457                        .whale
458                        .register(sentinel, None, Durability::stable(), vec![]);
459                }
460
461                // Store verifier for this query (for verify-then-decide pattern)
462                self.verifiers
463                    .insert::<Q, T>(full_key.clone(), query.clone());
464
465                Ok((Ok(output), output_changed, revision))
466            }
467            Err(QueryError::UserError(err)) => {
468                // Check if error changed (for early cutoff)
469                // existing_revision is Some only when error is unchanged (can reuse revision)
470                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
471                    self.get_cached_with_revision::<Q>(full_key)
472                {
473                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
474                        Some(rev) // Same error - reuse revision
475                    } else {
476                        None // Different error
477                    }
478                } else {
479                    None // No previous UserError
480                };
481                let output_changed = existing_revision.is_none();
482
483                // Emit early cutoff check event
484                self.tracer.on_early_cutoff_check(
485                    exec_ctx.span_id(),
486                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
487                    output_changed,
488                );
489
490                // Update whale with cached error (atomic update of value + dependency state)
491                let entry = CachedEntry::UserError(err.clone());
492                let revision = if let Some(existing_rev) = existing_revision {
493                    // confirm_unchanged doesn't change changed_at, use existing
494                    let _ = self.whale.confirm_unchanged(full_key, deps);
495                    existing_rev
496                } else {
497                    // Use new_rev from register result
498                    match self
499                        .whale
500                        .register(full_key.clone(), Some(entry), durability, deps)
501                    {
502                        Ok(result) => result.new_rev,
503                        Err(missing) => {
504                            return Err(QueryError::DependenciesRemoved {
505                                missing_keys: missing,
506                            })
507                        }
508                    }
509                };
510
511                // Register query in registry for list_queries
512                let is_new_query = self.query_registry.register(query);
513                if is_new_query {
514                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
515                    let _ = self
516                        .whale
517                        .register(sentinel, None, Durability::stable(), vec![]);
518                }
519
520                // Store verifier for this query (for verify-then-decide pattern)
521                self.verifiers
522                    .insert::<Q, T>(full_key.clone(), query.clone());
523
524                Ok((Err(err), output_changed, revision))
525            }
526            Err(e) => {
527                // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
528                Err(e)
529            }
530        }
531    }
532
533    /// Invalidate a query, forcing recomputation on next access.
534    ///
535    /// This also invalidates any queries that depend on this one.
536    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
537        let full_key = FullCacheKey::new::<Q, _>(key);
538
539        self.tracer.on_query_invalidated(
540            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
541            InvalidationReason::ManualInvalidation,
542        );
543
544        // Update whale to invalidate dependents (register with None to clear cached value)
545        // Use stable durability to increment all revision counters, ensuring queries
546        // at any durability level will see this as a change.
547        let _ = self
548            .whale
549            .register(full_key, None, Durability::stable(), vec![]);
550    }
551
552    /// Remove a query from the cache entirely, freeing memory.
553    ///
554    /// Use this for GC when a query is no longer needed.
555    /// Unlike `invalidate`, this removes all traces of the query from storage.
556    /// The query will be recomputed from scratch on next access.
557    ///
558    /// This also invalidates any queries that depend on this one.
559    pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
560        let full_key = FullCacheKey::new::<Q, _>(key);
561
562        self.tracer.on_query_invalidated(
563            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
564            InvalidationReason::ManualInvalidation,
565        );
566
567        // Remove verifier if exists
568        self.verifiers.remove(&full_key);
569
570        // Remove from whale storage (this also handles dependent invalidation)
571        self.whale.remove(&full_key);
572
573        // Remove from registry and update sentinel for list_queries
574        if self.query_registry.remove::<Q>(key) {
575            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
576            let _ = self
577                .whale
578                .register(sentinel, None, Durability::stable(), vec![]);
579        }
580    }
581
582    /// Clear all cached values by removing all nodes from whale.
583    ///
584    /// Note: This is a relatively expensive operation as it iterates through all keys.
585    pub fn clear_cache(&self) {
586        let keys = self.whale.keys();
587        for key in keys {
588            self.whale.remove(&key);
589        }
590    }
591
592    /// Poll a query, returning both the result and its change revision.
593    ///
594    /// This is useful for implementing subscription patterns where you need to
595    /// detect changes efficiently. Compare the returned `revision` with a
596    /// previously stored value to determine if the query result has changed.
597    ///
598    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
599    /// as its value, allowing you to track revision changes for both success and
600    /// user error cases.
601    ///
602    /// # Example
603    ///
604    /// ```ignore
605    /// struct Subscription<Q: Query> {
606    ///     query: Q,
607    ///     last_revision: RevisionCounter,
608    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
609    /// }
610    ///
611    /// // Polling loop
612    /// for sub in &mut subscriptions {
613    ///     let result = runtime.poll(sub.query.clone())?;
614    ///     if result.revision > sub.last_revision {
615    ///         sub.tx.send(result.value.clone())?;
616    ///         sub.last_revision = result.revision;
617    ///     }
618    /// }
619    /// ```
620    ///
621    /// # Errors
622    ///
623    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
624    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
625    #[allow(clippy::type_complexity)]
626    pub fn poll<Q: Query>(
627        &self,
628        query: Q,
629    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
630        let (value, revision) = self.query_internal(query)?;
631        Ok(Polled { value, revision })
632    }
633
634    /// Get the change revision of a query without executing it.
635    ///
636    /// Returns `None` if the query has never been executed.
637    ///
638    /// This is useful for checking if a query has changed since the last poll
639    /// without the cost of executing the query.
640    ///
641    /// # Example
642    ///
643    /// ```ignore
644    /// // Check if query has changed before deciding to poll
645    /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
646    ///     if rev > last_known_revision {
647    ///         let result = runtime.query(MyQuery::new(key))?;
648    ///         // Process result...
649    ///     }
650    /// }
651    /// ```
652    pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
653        let full_key = FullCacheKey::new::<Q, _>(key);
654        self.whale.get(&full_key).map(|node| node.changed_at)
655    }
656}
657
658// ============================================================================
659// GC (Garbage Collection) API
660// ============================================================================
661
662impl<T: Tracer> QueryRuntime<T> {
663    /// Get all query keys currently in the cache.
664    ///
665    /// This is useful for implementing custom garbage collection strategies.
666    /// Use this in combination with [`Tracer::on_query_key`] to track access
667    /// times and implement LRU, TTL, or other GC algorithms externally.
668    ///
669    /// # Example
670    ///
671    /// ```ignore
672    /// // Collect all keys that haven't been accessed recently
673    /// let stale_keys: Vec<_> = runtime.query_keys()
674    ///     .filter(|key| tracker.is_stale(key))
675    ///     .collect();
676    ///
677    /// // Remove stale queries that have no dependents
678    /// for key in stale_keys {
679    ///     runtime.remove_if_unused(&key);
680    /// }
681    /// ```
682    pub fn query_keys(&self) -> Vec<FullCacheKey> {
683        self.whale.keys()
684    }
685
686    /// Remove a query if it has no dependents.
687    ///
688    /// Returns `true` if the query was removed, `false` if it has dependents
689    /// or doesn't exist. This is the safe way to remove queries during GC,
690    /// as it won't break queries that depend on this one.
691    ///
692    /// # Example
693    ///
694    /// ```ignore
695    /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
696    /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
697    ///     println!("Query removed");
698    /// } else {
699    ///     println!("Query has dependents, not removed");
700    /// }
701    /// ```
702    pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
703        let full_key = FullCacheKey::new::<Q, _>(key);
704        self.remove_if_unused(&full_key)
705    }
706
707    /// Remove a query by its [`FullCacheKey`].
708    ///
709    /// This is the type-erased version of [`remove_query`](Self::remove_query).
710    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
711    /// or [`Tracer::on_query_key`].
712    ///
713    /// Returns `true` if the query was removed, `false` if it doesn't exist.
714    ///
715    /// # Warning
716    ///
717    /// This forcibly removes the query even if other queries depend on it.
718    /// Dependent queries will be recomputed on next access. For safe GC,
719    /// use [`remove_if_unused`](Self::remove_if_unused) instead.
720    pub fn remove(&self, key: &FullCacheKey) -> bool {
721        // Remove verifier if exists
722        self.verifiers.remove(key);
723
724        // Remove from whale storage
725        self.whale.remove(key).is_some()
726    }
727
728    /// Remove a query by its [`FullCacheKey`] if it has no dependents.
729    ///
730    /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
731    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
732    /// or [`Tracer::on_query_key`].
733    ///
734    /// Returns `true` if the query was removed, `false` if it has dependents
735    /// or doesn't exist.
736    ///
737    /// # Example
738    ///
739    /// ```ignore
740    /// // Implement LRU GC
741    /// for key in runtime.query_keys() {
742    ///     if tracker.is_expired(&key) {
743    ///         runtime.remove_if_unused(&key);
744    ///     }
745    /// }
746    /// ```
747    pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
748        if self.whale.remove_if_unused(key.clone()).is_some() {
749            // Successfully removed - clean up verifier
750            self.verifiers.remove(key);
751            true
752        } else {
753            false
754        }
755    }
756}
757
758// ============================================================================
759// Builder
760// ============================================================================
761
762/// Builder for [`QueryRuntime`] with customizable settings.
763///
764/// # Example
765///
766/// ```ignore
767/// let runtime = QueryRuntime::builder()
768///     .error_comparator(|a, b| {
769///         // Treat all errors of the same type as equal
770///         a.downcast_ref::<std::io::Error>().is_some()
771///             == b.downcast_ref::<std::io::Error>().is_some()
772///     })
773///     .build();
774/// ```
775pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
776    error_comparator: ErrorComparator,
777    tracer: T,
778}
779
780impl Default for QueryRuntimeBuilder<NoopTracer> {
781    fn default() -> Self {
782        Self::new()
783    }
784}
785
786impl QueryRuntimeBuilder<NoopTracer> {
787    /// Create a new builder with default settings.
788    pub fn new() -> Self {
789        Self {
790            error_comparator: default_error_comparator,
791            tracer: NoopTracer,
792        }
793    }
794}
795
796impl<T: Tracer> QueryRuntimeBuilder<T> {
797    /// Set the error comparator function for early cutoff optimization.
798    ///
799    /// When a query returns `QueryError::UserError`, this function is used
800    /// to compare it with the previously cached error. If they are equal,
801    /// downstream queries can skip recomputation (early cutoff).
802    ///
803    /// The default comparator returns `false` for all errors, meaning errors
804    /// are always considered different (conservative, always recomputes).
805    ///
806    /// # Example
807    ///
808    /// ```ignore
809    /// // Treat errors as equal if they have the same display message
810    /// let runtime = QueryRuntime::builder()
811    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
812    ///     .build();
813    /// ```
814    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
815        self.error_comparator = f;
816        self
817    }
818
819    /// Set the tracer for observability.
820    pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
821        QueryRuntimeBuilder {
822            error_comparator: self.error_comparator,
823            tracer,
824        }
825    }
826
827    /// Build the runtime with the configured settings.
828    pub fn build(self) -> QueryRuntime<T> {
829        QueryRuntime {
830            whale: WhaleRuntime::new(),
831            locators: Arc::new(LocatorStorage::new()),
832            pending: Arc::new(PendingStorage::new()),
833            query_registry: Arc::new(QueryRegistry::new()),
834            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
835            verifiers: Arc::new(VerifierStorage::new()),
836            error_comparator: self.error_comparator,
837            tracer: Arc::new(self.tracer),
838        }
839    }
840}
841
842// ============================================================================
843// Asset API
844// ============================================================================
845
846impl<T: Tracer> QueryRuntime<T> {
847    /// Register an asset locator for a specific asset key type.
848    ///
849    /// Only one locator can be registered per key type. Later registrations
850    /// replace earlier ones.
851    ///
852    /// # Example
853    ///
854    /// ```ignore
855    /// let runtime = QueryRuntime::new();
856    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
857    /// ```
858    pub fn register_asset_locator<K, L>(&self, locator: L)
859    where
860        K: AssetKey,
861        L: AssetLocator<K>,
862    {
863        self.locators.insert::<K, L>(locator);
864    }
865
866    /// Get an iterator over pending asset requests.
867    ///
868    /// Returns assets that have been requested but not yet resolved.
869    /// The user should fetch these externally and call `resolve_asset()`.
870    ///
871    /// # Example
872    ///
873    /// ```ignore
874    /// for pending in runtime.pending_assets() {
875    ///     if let Some(path) = pending.key::<FilePath>() {
876    ///         let content = fetch_file(path);
877    ///         runtime.resolve_asset(path.clone(), content);
878    ///     }
879    /// }
880    /// ```
881    pub fn pending_assets(&self) -> Vec<PendingAsset> {
882        self.pending.get_all()
883    }
884
885    /// Get pending assets filtered by key type.
886    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
887        self.pending.get_of_type::<K>()
888    }
889
890    /// Check if there are any pending assets.
891    pub fn has_pending_assets(&self) -> bool {
892        !self.pending.is_empty()
893    }
894
895    /// Resolve an asset with its loaded value.
896    ///
897    /// This marks the asset as ready and invalidates any queries that
898    /// depend on it (if the value changed), triggering recomputation on next access.
899    ///
900    /// This method is idempotent - resolving with the same value (via `asset_eq`)
901    /// will not trigger downstream recomputation.
902    ///
903    /// # Arguments
904    ///
905    /// * `key` - The asset key identifying this resource
906    /// * `value` - The loaded asset value
907    /// * `durability` - How frequently this asset is expected to change
908    ///
909    /// # Example
910    ///
911    /// ```ignore
912    /// let content = std::fs::read_to_string(&path)?;
913    /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
914    /// ```
915    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
916        self.resolve_asset_internal(key, value, durability);
917    }
918
919    /// Resolve an asset with an error.
920    ///
921    /// This marks the asset as errored and caches the error. Queries depending
922    /// on this asset will receive `Err(QueryError::UserError(...))`.
923    ///
924    /// Use this when async loading fails (e.g., network error, file not found,
925    /// access denied).
926    ///
927    /// # Arguments
928    ///
929    /// * `key` - The asset key identifying this resource
930    /// * `error` - The error to cache (will be wrapped in `Arc`)
931    /// * `durability` - How frequently this error state is expected to change
932    ///
933    /// # Example
934    ///
935    /// ```ignore
936    /// match fetch_file(&path) {
937    ///     Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
938    ///     Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
939    /// }
940    /// ```
941    pub fn resolve_asset_error<K: AssetKey>(
942        &self,
943        key: K,
944        error: impl Into<anyhow::Error>,
945        durability: DurabilityLevel,
946    ) {
947        let full_asset_key = FullAssetKey::new(&key);
948        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
949
950        // Remove from pending BEFORE registering the error
951        self.pending.remove(&full_asset_key);
952
953        // Prepare the error entry
954        let error_arc = Arc::new(error.into());
955        let entry = CachedEntry::AssetError(error_arc.clone());
956        let durability =
957            Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
958
959        // Atomic compare-and-update (errors are always considered changed for now)
960        let result = self
961            .whale
962            .update_with_compare(
963                full_cache_key,
964                Some(entry),
965                |old_data, _new_data| {
966                    // Compare old and new errors using error_comparator
967                    match old_data.and_then(|d| d.as_ref()) {
968                        Some(CachedEntry::AssetError(old_err)) => {
969                            !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
970                        }
971                        _ => true, // Loading, Ready, or not present -> changed
972                    }
973                },
974                durability,
975                vec![],
976            )
977            .expect("update_with_compare with no dependencies cannot fail");
978
979        // Emit asset resolved event (with changed status)
980        self.tracer.on_asset_resolved(
981            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
982            result.changed,
983        );
984
985        // Register asset key in registry for list_asset_keys
986        let is_new_asset = self.asset_key_registry.register(&key);
987        if is_new_asset {
988            // Update sentinel to invalidate list_asset_keys dependents
989            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
990            let _ = self
991                .whale
992                .register(sentinel, None, Durability::stable(), vec![]);
993        }
994    }
995
996    fn resolve_asset_internal<K: AssetKey>(
997        &self,
998        key: K,
999        value: K::Asset,
1000        durability_level: DurabilityLevel,
1001    ) {
1002        let full_asset_key = FullAssetKey::new(&key);
1003        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1004
1005        // Remove from pending BEFORE registering the value
1006        self.pending.remove(&full_asset_key);
1007
1008        // Prepare the new entry
1009        let value_arc: Arc<K::Asset> = Arc::new(value);
1010        let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1011        let durability =
1012            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1013
1014        // Atomic compare-and-update
1015        let result = self
1016            .whale
1017            .update_with_compare(
1018                full_cache_key,
1019                Some(entry),
1020                |old_data, _new_data| {
1021                    // Compare old and new values
1022                    match old_data.and_then(|d| d.as_ref()) {
1023                        Some(CachedEntry::AssetReady(old_arc)) => {
1024                            match old_arc.clone().downcast::<K::Asset>() {
1025                                Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1026                                Err(_) => true, // Type mismatch, treat as changed
1027                            }
1028                        }
1029                        _ => true, // Loading, NotFound, or not present -> changed
1030                    }
1031                },
1032                durability,
1033                vec![],
1034            )
1035            .expect("update_with_compare with no dependencies cannot fail");
1036
1037        // Emit asset resolved event
1038        self.tracer.on_asset_resolved(
1039            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1040            result.changed,
1041        );
1042
1043        // Register asset key in registry for list_asset_keys
1044        let is_new_asset = self.asset_key_registry.register(&key);
1045        if is_new_asset {
1046            // Update sentinel to invalidate list_asset_keys dependents
1047            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1048            let _ = self
1049                .whale
1050                .register(sentinel, None, Durability::stable(), vec![]);
1051        }
1052    }
1053
1054    /// Invalidate an asset, forcing queries to re-request it.
1055    ///
1056    /// The asset will be marked as loading and added to pending assets.
1057    /// Dependent queries will suspend until the asset is resolved again.
1058    ///
1059    /// # Example
1060    ///
1061    /// ```ignore
1062    /// // File was modified externally
1063    /// runtime.invalidate_asset(&FilePath("config.json".into()));
1064    /// // Queries depending on this asset will now suspend
1065    /// // User should fetch the new value and call resolve_asset
1066    /// ```
1067    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1068        let full_asset_key = FullAssetKey::new(key);
1069        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1070
1071        // Emit asset invalidated event
1072        self.tracer.on_asset_invalidated(TracerAssetKey::new(
1073            std::any::type_name::<K>(),
1074            format!("{:?}", key),
1075        ));
1076
1077        // Add to pending FIRST (before clearing whale state)
1078        // This ensures: readers see either old value, or Loading+pending
1079        self.pending.insert::<K>(full_asset_key, key.clone());
1080
1081        // Atomic: clear cached value + invalidate dependents
1082        // Using None for data means "needs to be loaded"
1083        // Use stable durability to ensure queries at any durability level see the change.
1084        let _ = self
1085            .whale
1086            .register(full_cache_key, None, Durability::stable(), vec![]);
1087    }
1088
1089    /// Remove an asset from the cache entirely.
1090    ///
1091    /// Unlike `invalidate_asset`, this removes all traces of the asset.
1092    /// Dependent queries will go through the locator again on next access.
1093    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1094        let full_asset_key = FullAssetKey::new(key);
1095        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1096
1097        // Remove from pending first
1098        self.pending.remove(&full_asset_key);
1099
1100        // Remove from whale (this also cleans up dependency edges)
1101        // whale.remove() invalidates dependents before removing
1102        self.whale.remove(&full_cache_key);
1103
1104        // Remove from registry and update sentinel for list_asset_keys
1105        if self.asset_key_registry.remove::<K>(key) {
1106            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1107            let _ = self
1108                .whale
1109                .register(sentinel, None, Durability::stable(), vec![]);
1110        }
1111    }
1112
1113    /// Get an asset by key without tracking dependencies.
1114    ///
1115    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1116    /// as a dependent of the asset. Use this for direct asset access outside
1117    /// of query execution.
1118    ///
1119    /// # Returns
1120    ///
1121    /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1122    /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1123    /// - `Err(QueryError::MissingDependency)` - Asset was not found
1124    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1125        self.get_asset_internal(key)
1126    }
1127
1128    /// Internal: Get asset state and its changed_at revision atomically.
1129    ///
1130    /// This is the "direct" version called from QueryRuntime::asset (no dependency tracking).
1131    /// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
1132    ///
1133    /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
1134    /// whale node that provided the asset value.
1135    fn get_asset_with_revision<K: AssetKey>(
1136        &self,
1137        key: K,
1138    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1139        let full_asset_key = FullAssetKey::new(&key);
1140        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1141
1142        // Helper to emit AssetRequested event
1143        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1144            tracer.on_asset_requested(
1145                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1146                state,
1147            );
1148        };
1149
1150        // Check whale cache first (single atomic read)
1151        if let Some(node) = self.whale.get(&full_cache_key) {
1152            let changed_at = node.changed_at;
1153            // Check if valid at current revision
1154            if self.whale.is_valid(&full_cache_key) {
1155                match &node.data {
1156                    Some(CachedEntry::AssetReady(arc)) => {
1157                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1158                        match arc.clone().downcast::<K::Asset>() {
1159                            Ok(value) => {
1160                                return Ok((AssetLoadingState::ready(key, value), changed_at))
1161                            }
1162                            Err(_) => {
1163                                return Err(QueryError::MissingDependency {
1164                                    description: format!("Asset type mismatch: {:?}", key),
1165                                })
1166                            }
1167                        }
1168                    }
1169                    Some(CachedEntry::AssetError(err)) => {
1170                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1171                        return Err(QueryError::UserError(err.clone()));
1172                    }
1173                    None => {
1174                        // Loading state
1175                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1176                        return Ok((AssetLoadingState::loading(key), changed_at));
1177                    }
1178                    _ => {
1179                        // Query-related entries (Ok, UserError) shouldn't be here
1180                        // Fall through to locator
1181                    }
1182                }
1183            }
1184        }
1185
1186        // Not in cache or invalid - try locator
1187        if let Some(result) = self
1188            .locators
1189            .locate_with_runtime(TypeId::of::<K>(), self, &key)
1190        {
1191            match result {
1192                Ok(ErasedLocateResult::Ready {
1193                    value: arc,
1194                    durability: durability_level,
1195                }) => {
1196                    emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1197
1198                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1199                        Ok(v) => v,
1200                        Err(_) => {
1201                            return Err(QueryError::MissingDependency {
1202                                description: format!("Asset type mismatch: {:?}", key),
1203                            });
1204                        }
1205                    };
1206
1207                    // Store in whale atomically with early cutoff
1208                    let entry = CachedEntry::AssetReady(typed_value.clone());
1209                    let durability = Durability::new(durability_level.as_u8() as usize)
1210                        .unwrap_or(Durability::volatile());
1211                    let new_value = typed_value.clone();
1212                    let result = self
1213                        .whale
1214                        .update_with_compare(
1215                            full_cache_key,
1216                            Some(entry),
1217                            |old_data, _new_data| {
1218                                let Some(CachedEntry::AssetReady(old_arc)) =
1219                                    old_data.and_then(|d| d.as_ref())
1220                                else {
1221                                    return true;
1222                                };
1223                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1224                                    return true;
1225                                };
1226                                !K::asset_eq(&old_value, &new_value)
1227                            },
1228                            durability,
1229                            vec![],
1230                        )
1231                        .expect("update_with_compare with no dependencies cannot fail");
1232
1233                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1234                }
1235                Ok(ErasedLocateResult::Pending) => {
1236                    emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1237
1238                    // Add to pending list for Pending result
1239                    self.pending.insert::<K>(full_asset_key, key.clone());
1240                    match self
1241                        .whale
1242                        .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1243                        .expect("get_or_insert with no dependencies cannot fail")
1244                    {
1245                        GetOrInsertResult::Inserted(node) => {
1246                            return Ok((AssetLoadingState::loading(key), node.changed_at));
1247                        }
1248                        GetOrInsertResult::Existing(node) => {
1249                            let changed_at = node.changed_at;
1250                            match &node.data {
1251                                Some(CachedEntry::AssetReady(arc)) => {
1252                                    match arc.clone().downcast::<K::Asset>() {
1253                                        Ok(value) => {
1254                                            return Ok((
1255                                                AssetLoadingState::ready(key, value),
1256                                                changed_at,
1257                                            ))
1258                                        }
1259                                        Err(_) => {
1260                                            return Ok((
1261                                                AssetLoadingState::loading(key),
1262                                                changed_at,
1263                                            ))
1264                                        }
1265                                    }
1266                                }
1267                                Some(CachedEntry::AssetError(err)) => {
1268                                    return Err(QueryError::UserError(err.clone()));
1269                                }
1270                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1271                            }
1272                        }
1273                    }
1274                }
1275                Err(QueryError::UserError(err)) => {
1276                    // Locator returned a user error - cache it as AssetError
1277                    let entry = CachedEntry::AssetError(err.clone());
1278                    let _ = self.whale.register(
1279                        full_cache_key,
1280                        Some(entry),
1281                        Durability::volatile(),
1282                        vec![],
1283                    );
1284                    return Err(QueryError::UserError(err));
1285                }
1286                Err(e) => {
1287                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1288                    return Err(e);
1289                }
1290            }
1291        }
1292
1293        // No locator registered or locator returned None - mark as pending
1294        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1295        self.pending
1296            .insert::<K>(full_asset_key.clone(), key.clone());
1297
1298        match self
1299            .whale
1300            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1301            .expect("get_or_insert with no dependencies cannot fail")
1302        {
1303            GetOrInsertResult::Inserted(node) => {
1304                Ok((AssetLoadingState::loading(key), node.changed_at))
1305            }
1306            GetOrInsertResult::Existing(node) => {
1307                let changed_at = node.changed_at;
1308                match &node.data {
1309                    Some(CachedEntry::AssetReady(arc)) => {
1310                        match arc.clone().downcast::<K::Asset>() {
1311                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1312                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1313                        }
1314                    }
1315                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1316                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1317                }
1318            }
1319        }
1320    }
1321
1322    /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1323    ///
1324    /// This version is called from QueryContext::asset and enables dependency tracking
1325    /// within the locator's db.query() and db.asset() calls.
1326    fn get_asset_with_revision_ctx<K: AssetKey>(
1327        &self,
1328        key: K,
1329        ctx: &QueryContext<'_, T>,
1330    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1331        let full_asset_key = FullAssetKey::new(&key);
1332        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1333
1334        // Helper to emit AssetRequested event
1335        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1336            tracer.on_asset_requested(
1337                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1338                state,
1339            );
1340        };
1341
1342        // Check whale cache first (single atomic read)
1343        if let Some(node) = self.whale.get(&full_cache_key) {
1344            let changed_at = node.changed_at;
1345            // Check if valid at current revision
1346            if self.whale.is_valid(&full_cache_key) {
1347                match &node.data {
1348                    Some(CachedEntry::AssetReady(arc)) => {
1349                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1350                        match arc.clone().downcast::<K::Asset>() {
1351                            Ok(value) => {
1352                                return Ok((AssetLoadingState::ready(key, value), changed_at))
1353                            }
1354                            Err(_) => {
1355                                return Err(QueryError::MissingDependency {
1356                                    description: format!("Asset type mismatch: {:?}", key),
1357                                })
1358                            }
1359                        }
1360                    }
1361                    Some(CachedEntry::AssetError(err)) => {
1362                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1363                        return Err(QueryError::UserError(err.clone()));
1364                    }
1365                    None => {
1366                        // Loading state
1367                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1368                        return Ok((AssetLoadingState::loading(key), changed_at));
1369                    }
1370                    _ => {
1371                        // Query-related entries (Ok, UserError) shouldn't be here
1372                        // Fall through to locator
1373                    }
1374                }
1375            }
1376        }
1377
1378        // Not in cache or invalid - try locator (with context for dependency tracking)
1379        // First, check for cycles using the query stack (assets share the same stack as queries)
1380        let cycle_detected = QUERY_STACK.with(|stack| {
1381            let stack = stack.borrow();
1382            stack.iter().any(|k| k == &full_cache_key)
1383        });
1384
1385        if cycle_detected {
1386            let path = QUERY_STACK.with(|stack| {
1387                let stack = stack.borrow();
1388                let mut path: Vec<String> =
1389                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
1390                path.push(full_cache_key.debug_repr().to_string());
1391                path
1392            });
1393            return Err(QueryError::Cycle { path });
1394        }
1395
1396        // Push asset to stack before calling locator
1397        QUERY_STACK.with(|stack| {
1398            stack.borrow_mut().push(full_cache_key.clone());
1399        });
1400
1401        let locator_result = self.locators.locate_with_ctx(TypeId::of::<K>(), ctx, &key);
1402
1403        // Pop asset from stack after locator returns
1404        QUERY_STACK.with(|stack| {
1405            stack.borrow_mut().pop();
1406        });
1407
1408        if let Some(result) = locator_result {
1409            match result {
1410                Ok(ErasedLocateResult::Ready {
1411                    value: arc,
1412                    durability: durability_level,
1413                }) => {
1414                    emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1415
1416                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1417                        Ok(v) => v,
1418                        Err(_) => {
1419                            return Err(QueryError::MissingDependency {
1420                                description: format!("Asset type mismatch: {:?}", key),
1421                            });
1422                        }
1423                    };
1424
1425                    // Store in whale atomically with early cutoff
1426                    let entry = CachedEntry::AssetReady(typed_value.clone());
1427                    let durability = Durability::new(durability_level.as_u8() as usize)
1428                        .unwrap_or(Durability::volatile());
1429                    let new_value = typed_value.clone();
1430                    let result = self
1431                        .whale
1432                        .update_with_compare(
1433                            full_cache_key,
1434                            Some(entry),
1435                            |old_data, _new_data| {
1436                                let Some(CachedEntry::AssetReady(old_arc)) =
1437                                    old_data.and_then(|d| d.as_ref())
1438                                else {
1439                                    return true;
1440                                };
1441                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1442                                    return true;
1443                                };
1444                                !K::asset_eq(&old_value, &new_value)
1445                            },
1446                            durability,
1447                            vec![],
1448                        )
1449                        .expect("update_with_compare with no dependencies cannot fail");
1450
1451                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1452                }
1453                Ok(ErasedLocateResult::Pending) => {
1454                    emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1455
1456                    // Add to pending list for Pending result
1457                    self.pending.insert::<K>(full_asset_key, key.clone());
1458                    match self
1459                        .whale
1460                        .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1461                        .expect("get_or_insert with no dependencies cannot fail")
1462                    {
1463                        GetOrInsertResult::Inserted(node) => {
1464                            return Ok((AssetLoadingState::loading(key), node.changed_at));
1465                        }
1466                        GetOrInsertResult::Existing(node) => {
1467                            let changed_at = node.changed_at;
1468                            match &node.data {
1469                                Some(CachedEntry::AssetReady(arc)) => {
1470                                    match arc.clone().downcast::<K::Asset>() {
1471                                        Ok(value) => {
1472                                            return Ok((
1473                                                AssetLoadingState::ready(key, value),
1474                                                changed_at,
1475                                            ))
1476                                        }
1477                                        Err(_) => {
1478                                            return Ok((
1479                                                AssetLoadingState::loading(key),
1480                                                changed_at,
1481                                            ))
1482                                        }
1483                                    }
1484                                }
1485                                Some(CachedEntry::AssetError(err)) => {
1486                                    return Err(QueryError::UserError(err.clone()));
1487                                }
1488                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1489                            }
1490                        }
1491                    }
1492                }
1493                Err(QueryError::UserError(err)) => {
1494                    // Locator returned a user error - cache it as AssetError
1495                    let entry = CachedEntry::AssetError(err.clone());
1496                    let _ = self.whale.register(
1497                        full_cache_key,
1498                        Some(entry),
1499                        Durability::volatile(),
1500                        vec![],
1501                    );
1502                    return Err(QueryError::UserError(err));
1503                }
1504                Err(e) => {
1505                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1506                    return Err(e);
1507                }
1508            }
1509        }
1510
1511        // No locator registered or locator returned None - mark as pending
1512        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1513        self.pending
1514            .insert::<K>(full_asset_key.clone(), key.clone());
1515
1516        match self
1517            .whale
1518            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1519            .expect("get_or_insert with no dependencies cannot fail")
1520        {
1521            GetOrInsertResult::Inserted(node) => {
1522                Ok((AssetLoadingState::loading(key), node.changed_at))
1523            }
1524            GetOrInsertResult::Existing(node) => {
1525                let changed_at = node.changed_at;
1526                match &node.data {
1527                    Some(CachedEntry::AssetReady(arc)) => {
1528                        match arc.clone().downcast::<K::Asset>() {
1529                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1530                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1531                        }
1532                    }
1533                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1534                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1535                }
1536            }
1537        }
1538    }
1539
1540    /// Internal: Get asset state, checking cache and locator.
1541    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1542        self.get_asset_with_revision(key).map(|(state, _)| state)
1543    }
1544}
1545
1546impl<T: Tracer> Db for QueryRuntime<T> {
1547    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1548        QueryRuntime::query(self, query)
1549    }
1550
1551    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1552        self.get_asset_internal(key)
1553    }
1554
1555    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1556        self.query_registry.get_all::<Q>()
1557    }
1558
1559    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1560        self.asset_key_registry.get_all::<K>()
1561    }
1562}
1563
1564/// Context provided to queries during execution.
1565///
1566/// Use this to access dependencies via `query()`.
1567pub struct QueryContext<'a, T: Tracer = NoopTracer> {
1568    runtime: &'a QueryRuntime<T>,
1569    current_key: FullCacheKey,
1570    parent_query_type: &'static str,
1571    exec_ctx: ExecutionContext,
1572    deps: RefCell<Vec<FullCacheKey>>,
1573    /// Revision at first asset access, used for consistency checking.
1574    /// Assets with changed_at > this value were modified during query execution.
1575    first_access_revision: Cell<Option<RevisionCounter>>,
1576}
1577
1578impl<'a, T: Tracer> QueryContext<'a, T> {
1579    /// Ensure consistency of asset access.
1580    ///
1581    /// On first asset access: records max(current_global, dep_changed_at) as baseline.
1582    /// On subsequent asset accesses: checks dep_changed_at <= baseline.
1583    ///
1584    /// IMPORTANT: current_global must be obtained BEFORE accessing the asset.
1585    #[inline]
1586    fn ensure_consistent(
1587        &self,
1588        current_global: RevisionCounter,
1589        dep_changed_at: RevisionCounter,
1590    ) -> Result<(), QueryError> {
1591        match self.first_access_revision.get() {
1592            None => {
1593                // First asset access: record max(current_global, dep_changed_at)
1594                // Using max ensures we don't get false positives when only one asset is accessed
1595                let first = current_global.max(dep_changed_at);
1596                self.first_access_revision.set(Some(first));
1597                Ok(())
1598            }
1599            Some(first) => {
1600                // Subsequent asset accesses: check consistency
1601                if dep_changed_at > first {
1602                    Err(QueryError::InconsistentAssetResolution)
1603                } else {
1604                    Ok(())
1605                }
1606            }
1607        }
1608    }
1609
1610    /// Query a dependency.
1611    ///
1612    /// The dependency is automatically tracked for invalidation.
1613    ///
1614    /// # Example
1615    ///
1616    /// ```ignore
1617    /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1618    ///     let dep_result = db.query(OtherQuery { id: self.id })?;
1619    ///     Ok(process(&dep_result))
1620    /// }
1621    /// ```
1622    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1623        let key = query.cache_key();
1624        let full_key = FullCacheKey::new::<Q, _>(&key);
1625
1626        // Emit dependency registered event
1627        self.runtime.tracer.on_dependency_registered(
1628            self.exec_ctx.span_id(),
1629            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1630            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1631        );
1632
1633        // Record this as a dependency
1634        self.deps.borrow_mut().push(full_key.clone());
1635
1636        // Execute the query
1637        self.runtime.query(query)
1638    }
1639
1640    /// Access an asset, tracking it as a dependency.
1641    ///
1642    /// Returns `AssetLoadingState<K>`:
1643    /// - `is_loading()` if the asset is still being loaded
1644    /// - `is_ready()` if the asset is available
1645    ///
1646    /// Use `.suspend()?` to convert to `Result<Arc<K::Asset>, QueryError>`,
1647    /// which returns `Err(QueryError::Suspend { asset })` if still loading.
1648    ///
1649    /// # Example
1650    ///
1651    /// ```ignore
1652    /// #[query]
1653    /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1654    ///     let content = db.asset(path)?.suspend()?;
1655    ///     // Process content...
1656    ///     Ok(output)
1657    /// }
1658    /// ```
1659    ///
1660    /// # Errors
1661    ///
1662    /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1663    pub fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1664        let full_asset_key = FullAssetKey::new(&key);
1665        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1666
1667        // 1. Get current_global FIRST (before accessing the asset)
1668        let current_global = self
1669            .runtime
1670            .whale
1671            .current_revision()
1672            .get(Durability::volatile());
1673
1674        // 2. Emit asset dependency registered event
1675        self.runtime.tracer.on_asset_dependency_registered(
1676            self.exec_ctx.span_id(),
1677            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1678            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1679        );
1680
1681        // 3. Record dependency on this asset
1682        self.deps.borrow_mut().push(full_cache_key);
1683
1684        // 4. Get asset AND changed_at from the same whale access (atomic)
1685        // Use the context-aware version to enable dependency tracking in locators
1686        let (result, dep_changed_at) = match self.runtime.get_asset_with_revision_ctx(key, self) {
1687            Ok((state, rev)) => (Ok(state), rev),
1688            Err(e) => return Err(e),
1689        };
1690
1691        // 5. Check consistency - detects if resolve_asset was called during query execution
1692        self.ensure_consistent(current_global, dep_changed_at)?;
1693
1694        // Emit missing dependency event on error
1695        if let Err(QueryError::MissingDependency { ref description }) = result {
1696            self.runtime.tracer.on_missing_dependency(
1697                TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1698                description.clone(),
1699            );
1700        }
1701
1702        result
1703    }
1704
1705    /// List all query instances of type Q that have been registered.
1706    ///
1707    /// This method establishes a dependency on the "set" of queries of type Q.
1708    /// The calling query will be invalidated when:
1709    /// - A new query of type Q is first executed (added to set)
1710    ///
1711    /// The calling query will NOT be invalidated when:
1712    /// - An individual query of type Q has its value change
1713    ///
1714    /// # Example
1715    ///
1716    /// ```ignore
1717    /// #[query]
1718    /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1719    ///     let queries = db.list_queries::<MyQuery>();
1720    ///     let mut results = Vec::new();
1721    ///     for q in queries {
1722    ///         results.push(*db.query(q)?);
1723    ///     }
1724    ///     Ok(results)
1725    /// }
1726    /// ```
1727    pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1728        // Record dependency on the sentinel (set-level dependency)
1729        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1730
1731        self.runtime.tracer.on_dependency_registered(
1732            self.exec_ctx.span_id(),
1733            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1734            TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1735        );
1736
1737        // Ensure sentinel exists in whale (for dependency tracking)
1738        if self.runtime.whale.get(&sentinel).is_none() {
1739            let _ =
1740                self.runtime
1741                    .whale
1742                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1743        }
1744
1745        self.deps.borrow_mut().push(sentinel);
1746
1747        // Return all registered queries
1748        self.runtime.query_registry.get_all::<Q>()
1749    }
1750
1751    /// List all asset keys of type K that have been registered.
1752    ///
1753    /// This method establishes a dependency on the "set" of asset keys of type K.
1754    /// The calling query will be invalidated when:
1755    /// - A new asset of type K is resolved for the first time (added to set)
1756    /// - An asset of type K is removed via remove_asset
1757    ///
1758    /// The calling query will NOT be invalidated when:
1759    /// - An individual asset's value changes (use `db.asset()` for that)
1760    ///
1761    /// # Example
1762    ///
1763    /// ```ignore
1764    /// #[query]
1765    /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1766    ///     let keys = db.list_asset_keys::<ConfigFile>();
1767    ///     let mut contents = Vec::new();
1768    ///     for key in keys {
1769    ///         let content = db.asset(&key)?.suspend()?;
1770    ///         contents.push((*content).clone());
1771    ///     }
1772    ///     Ok(contents)
1773    /// }
1774    /// ```
1775    pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1776        // Record dependency on the sentinel (set-level dependency)
1777        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1778
1779        self.runtime.tracer.on_asset_dependency_registered(
1780            self.exec_ctx.span_id(),
1781            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1782            TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1783        );
1784
1785        // Ensure sentinel exists in whale (for dependency tracking)
1786        if self.runtime.whale.get(&sentinel).is_none() {
1787            let _ =
1788                self.runtime
1789                    .whale
1790                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1791        }
1792
1793        self.deps.borrow_mut().push(sentinel);
1794
1795        // Return all registered asset keys
1796        self.runtime.asset_key_registry.get_all::<K>()
1797    }
1798}
1799
1800impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1801    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1802        QueryContext::query(self, query)
1803    }
1804
1805    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1806        QueryContext::asset(self, key)
1807    }
1808
1809    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1810        QueryContext::list_queries(self)
1811    }
1812
1813    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1814        QueryContext::list_asset_keys(self)
1815    }
1816}
1817
1818#[cfg(test)]
1819mod tests {
1820    use super::*;
1821
1822    #[test]
1823    fn test_simple_query() {
1824        #[derive(Clone)]
1825        struct Add {
1826            a: i32,
1827            b: i32,
1828        }
1829
1830        impl Query for Add {
1831            type CacheKey = (i32, i32);
1832            type Output = i32;
1833
1834            fn cache_key(&self) -> Self::CacheKey {
1835                (self.a, self.b)
1836            }
1837
1838            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1839                Ok(Arc::new(self.a + self.b))
1840            }
1841
1842            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1843                old == new
1844            }
1845        }
1846
1847        let runtime = QueryRuntime::new();
1848
1849        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
1850        assert_eq!(*result, 3);
1851
1852        // Second query should be cached
1853        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
1854        assert_eq!(*result2, 3);
1855    }
1856
1857    #[test]
1858    fn test_dependent_queries() {
1859        #[derive(Clone)]
1860        struct Base {
1861            value: i32,
1862        }
1863
1864        impl Query for Base {
1865            type CacheKey = i32;
1866            type Output = i32;
1867
1868            fn cache_key(&self) -> Self::CacheKey {
1869                self.value
1870            }
1871
1872            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1873                Ok(Arc::new(self.value * 2))
1874            }
1875
1876            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1877                old == new
1878            }
1879        }
1880
1881        #[derive(Clone)]
1882        struct Derived {
1883            base_value: i32,
1884        }
1885
1886        impl Query for Derived {
1887            type CacheKey = i32;
1888            type Output = i32;
1889
1890            fn cache_key(&self) -> Self::CacheKey {
1891                self.base_value
1892            }
1893
1894            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1895                let base = db.query(Base {
1896                    value: self.base_value,
1897                })?;
1898                Ok(Arc::new(*base + 10))
1899            }
1900
1901            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1902                old == new
1903            }
1904        }
1905
1906        let runtime = QueryRuntime::new();
1907
1908        let result = runtime.query(Derived { base_value: 5 }).unwrap();
1909        assert_eq!(*result, 20); // 5 * 2 + 10
1910    }
1911
1912    #[test]
1913    fn test_cycle_detection() {
1914        #[derive(Clone)]
1915        struct CycleA {
1916            id: i32,
1917        }
1918
1919        #[derive(Clone)]
1920        struct CycleB {
1921            id: i32,
1922        }
1923
1924        impl Query for CycleA {
1925            type CacheKey = i32;
1926            type Output = i32;
1927
1928            fn cache_key(&self) -> Self::CacheKey {
1929                self.id
1930            }
1931
1932            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1933                let b = db.query(CycleB { id: self.id })?;
1934                Ok(Arc::new(*b + 1))
1935            }
1936
1937            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1938                old == new
1939            }
1940        }
1941
1942        impl Query for CycleB {
1943            type CacheKey = i32;
1944            type Output = i32;
1945
1946            fn cache_key(&self) -> Self::CacheKey {
1947                self.id
1948            }
1949
1950            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1951                let a = db.query(CycleA { id: self.id })?;
1952                Ok(Arc::new(*a + 1))
1953            }
1954
1955            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1956                old == new
1957            }
1958        }
1959
1960        let runtime = QueryRuntime::new();
1961
1962        let result = runtime.query(CycleA { id: 1 });
1963        assert!(matches!(result, Err(QueryError::Cycle { .. })));
1964    }
1965
1966    #[test]
1967    fn test_fallible_query() {
1968        #[derive(Clone)]
1969        struct ParseInt {
1970            input: String,
1971        }
1972
1973        impl Query for ParseInt {
1974            type CacheKey = String;
1975            type Output = Result<i32, std::num::ParseIntError>;
1976
1977            fn cache_key(&self) -> Self::CacheKey {
1978                self.input.clone()
1979            }
1980
1981            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1982                Ok(Arc::new(self.input.parse()))
1983            }
1984
1985            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1986                old == new
1987            }
1988        }
1989
1990        let runtime = QueryRuntime::new();
1991
1992        // Valid parse
1993        let result = runtime
1994            .query(ParseInt {
1995                input: "42".to_string(),
1996            })
1997            .unwrap();
1998        assert_eq!(*result, Ok(42));
1999
2000        // Invalid parse - system succeeds, user error in output
2001        let result = runtime
2002            .query(ParseInt {
2003                input: "not_a_number".to_string(),
2004            })
2005            .unwrap();
2006        assert!(result.is_err());
2007    }
2008
2009    // Macro tests
2010    mod macro_tests {
2011        use super::*;
2012        use crate::query;
2013
2014        #[query]
2015        fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2016            let _ = db; // silence unused warning
2017            Ok(a + b)
2018        }
2019
2020        #[test]
2021        fn test_macro_basic() {
2022            let runtime = QueryRuntime::new();
2023            let result = runtime.query(Add::new(1, 2)).unwrap();
2024            assert_eq!(*result, 3);
2025        }
2026
2027        #[query]
2028        fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2029            let _ = db;
2030            Ok(x * 2)
2031        }
2032
2033        #[test]
2034        fn test_macro_simple() {
2035            let runtime = QueryRuntime::new();
2036            let result = runtime.query(SimpleDouble::new(5)).unwrap();
2037            assert_eq!(*result, 10);
2038        }
2039
2040        #[query(keys(id))]
2041        fn with_key_selection(
2042            db: &impl Db,
2043            id: u32,
2044            include_extra: bool,
2045        ) -> Result<String, QueryError> {
2046            let _ = db;
2047            Ok(format!("id={}, extra={}", id, include_extra))
2048        }
2049
2050        #[test]
2051        fn test_macro_key_selection() {
2052            let runtime = QueryRuntime::new();
2053
2054            // Same id, different include_extra - should return cached
2055            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2056            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2057
2058            // Both should have same value because only `id` is the key
2059            assert_eq!(*r1, "id=1, extra=true");
2060            assert_eq!(*r2, "id=1, extra=true"); // Cached!
2061        }
2062
2063        #[query]
2064        fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2065            let sum = db.query(Add::new(a, b))?;
2066            Ok(*sum * 2)
2067        }
2068
2069        #[test]
2070        fn test_macro_dependencies() {
2071            let runtime = QueryRuntime::new();
2072            let result = runtime.query(Dependent::new(3, 4)).unwrap();
2073            assert_eq!(*result, 14); // (3 + 4) * 2
2074        }
2075
2076        #[query(output_eq)]
2077        fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2078            let _ = db;
2079            Ok(x * 2)
2080        }
2081
2082        #[test]
2083        fn test_macro_output_eq() {
2084            let runtime = QueryRuntime::new();
2085            let result = runtime.query(WithOutputEq::new(5)).unwrap();
2086            assert_eq!(*result, 10);
2087        }
2088
2089        #[query(name = "CustomName")]
2090        fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2091            let _ = db;
2092            Ok(x)
2093        }
2094
2095        #[test]
2096        fn test_macro_custom_name() {
2097            let runtime = QueryRuntime::new();
2098            let result = runtime.query(CustomName::new(42)).unwrap();
2099            assert_eq!(*result, 42);
2100        }
2101
2102        // Test that attribute macros like #[tracing::instrument] are preserved
2103        // We use #[allow(unused_variables)] and #[inline] as test attributes since
2104        // they don't require external dependencies.
2105        #[allow(unused_variables)]
2106        #[inline]
2107        #[query]
2108        fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2109            // This would warn without #[allow(unused_variables)] on the generated method
2110            let unused_var = 42;
2111            Ok(x * 2)
2112        }
2113
2114        #[test]
2115        fn test_macro_preserves_attributes() {
2116            let runtime = QueryRuntime::new();
2117            // If attributes weren't preserved, this might warn about unused_var
2118            let result = runtime.query(WithAttributes::new(5)).unwrap();
2119            assert_eq!(*result, 10);
2120        }
2121    }
2122
2123    // Tests for poll() and changed_at()
2124    mod poll_tests {
2125        use super::*;
2126
2127        #[derive(Clone)]
2128        struct Counter {
2129            id: i32,
2130        }
2131
2132        impl Query for Counter {
2133            type CacheKey = i32;
2134            type Output = i32;
2135
2136            fn cache_key(&self) -> Self::CacheKey {
2137                self.id
2138            }
2139
2140            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2141                Ok(Arc::new(self.id * 10))
2142            }
2143
2144            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2145                old == new
2146            }
2147        }
2148
2149        #[test]
2150        fn test_poll_returns_value_and_revision() {
2151            let runtime = QueryRuntime::new();
2152
2153            let result = runtime.poll(Counter { id: 1 }).unwrap();
2154
2155            // Value should be correct - access through Result and Arc
2156            assert_eq!(**result.value.as_ref().unwrap(), 10);
2157
2158            // Revision should be non-zero after first execution
2159            assert!(result.revision > 0);
2160        }
2161
2162        #[test]
2163        fn test_poll_revision_stable_on_cache_hit() {
2164            let runtime = QueryRuntime::new();
2165
2166            // First poll
2167            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2168            let rev1 = result1.revision;
2169
2170            // Second poll (cache hit)
2171            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2172            let rev2 = result2.revision;
2173
2174            // Revision should be the same (no change)
2175            assert_eq!(rev1, rev2);
2176        }
2177
2178        #[test]
2179        fn test_poll_revision_changes_on_invalidate() {
2180            let runtime = QueryRuntime::new();
2181
2182            // First poll
2183            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2184            let rev1 = result1.revision;
2185
2186            // Invalidate and poll again
2187            runtime.invalidate::<Counter>(&1);
2188            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2189            let rev2 = result2.revision;
2190
2191            // Revision should increase (value was recomputed)
2192            // Note: Since output_eq returns true (same value), this might not change
2193            // depending on early cutoff behavior. Let's verify the value is still correct.
2194            assert_eq!(**result2.value.as_ref().unwrap(), 10);
2195
2196            // With early cutoff, revision might stay the same if value didn't change
2197            // This is expected behavior
2198            assert!(rev2 >= rev1);
2199        }
2200
2201        #[test]
2202        fn test_changed_at_returns_none_for_unexecuted_query() {
2203            let runtime = QueryRuntime::new();
2204
2205            // Query has never been executed
2206            let rev = runtime.changed_at::<Counter>(&1);
2207            assert!(rev.is_none());
2208        }
2209
2210        #[test]
2211        fn test_changed_at_returns_revision_after_execution() {
2212            let runtime = QueryRuntime::new();
2213
2214            // Execute the query
2215            let _ = runtime.query(Counter { id: 1 }).unwrap();
2216
2217            // Now changed_at should return Some
2218            let rev = runtime.changed_at::<Counter>(&1);
2219            assert!(rev.is_some());
2220            assert!(rev.unwrap() > 0);
2221        }
2222
2223        #[test]
2224        fn test_changed_at_matches_poll_revision() {
2225            let runtime = QueryRuntime::new();
2226
2227            // Poll the query
2228            let result = runtime.poll(Counter { id: 1 }).unwrap();
2229
2230            // changed_at should match the revision from poll
2231            let rev = runtime.changed_at::<Counter>(&1);
2232            assert_eq!(rev, Some(result.revision));
2233        }
2234
2235        #[test]
2236        fn test_poll_value_access() {
2237            let runtime = QueryRuntime::new();
2238
2239            let result = runtime.poll(Counter { id: 5 }).unwrap();
2240
2241            // Access through Result and Arc
2242            let value: &i32 = result.value.as_ref().unwrap();
2243            assert_eq!(*value, 50);
2244
2245            // Access Arc directly via field after unwrapping Result
2246            let arc: &Arc<i32> = result.value.as_ref().unwrap();
2247            assert_eq!(**arc, 50);
2248        }
2249
2250        #[test]
2251        fn test_subscription_pattern() {
2252            let runtime = QueryRuntime::new();
2253
2254            // Simulate subscription pattern
2255            let mut last_revision: RevisionCounter = 0;
2256            let mut notifications = 0;
2257
2258            // First poll - should notify (new value)
2259            let result = runtime.poll(Counter { id: 1 }).unwrap();
2260            if result.revision > last_revision {
2261                notifications += 1;
2262                last_revision = result.revision;
2263            }
2264
2265            // Second poll - should NOT notify (no change)
2266            let result = runtime.poll(Counter { id: 1 }).unwrap();
2267            if result.revision > last_revision {
2268                notifications += 1;
2269                last_revision = result.revision;
2270            }
2271
2272            // Third poll - should NOT notify (no change)
2273            let result = runtime.poll(Counter { id: 1 }).unwrap();
2274            if result.revision > last_revision {
2275                notifications += 1;
2276                #[allow(unused_assignments)]
2277                {
2278                    last_revision = result.revision;
2279                }
2280            }
2281
2282            // Only the first poll should have triggered a notification
2283            assert_eq!(notifications, 1);
2284        }
2285    }
2286
2287    // Tests for GC APIs
2288    mod gc_tests {
2289        use super::*;
2290        use std::collections::HashSet;
2291        use std::sync::atomic::{AtomicUsize, Ordering};
2292        use std::sync::Mutex;
2293
2294        #[derive(Clone)]
2295        struct Leaf {
2296            id: i32,
2297        }
2298
2299        impl Query for Leaf {
2300            type CacheKey = i32;
2301            type Output = i32;
2302
2303            fn cache_key(&self) -> Self::CacheKey {
2304                self.id
2305            }
2306
2307            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2308                Ok(Arc::new(self.id * 10))
2309            }
2310
2311            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2312                old == new
2313            }
2314        }
2315
2316        #[derive(Clone)]
2317        struct Parent {
2318            child_id: i32,
2319        }
2320
2321        impl Query for Parent {
2322            type CacheKey = i32;
2323            type Output = i32;
2324
2325            fn cache_key(&self) -> Self::CacheKey {
2326                self.child_id
2327            }
2328
2329            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2330                let child = db.query(Leaf { id: self.child_id })?;
2331                Ok(Arc::new(*child + 1))
2332            }
2333
2334            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2335                old == new
2336            }
2337        }
2338
2339        #[test]
2340        fn test_query_keys_returns_all_cached_queries() {
2341            let runtime = QueryRuntime::new();
2342
2343            // Execute some queries
2344            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2345            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2346            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2347
2348            // Get all keys
2349            let keys = runtime.query_keys();
2350
2351            // Should have at least 3 keys (might have more due to sentinels)
2352            assert!(keys.len() >= 3);
2353        }
2354
2355        #[test]
2356        fn test_remove_removes_query() {
2357            let runtime = QueryRuntime::new();
2358
2359            // Execute a query
2360            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2361
2362            // Get the key
2363            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2364
2365            // Query should exist
2366            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2367
2368            // Remove it
2369            assert!(runtime.remove(&full_key));
2370
2371            // Query should no longer exist
2372            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2373        }
2374
2375        #[test]
2376        fn test_remove_if_unused_removes_leaf_query() {
2377            let runtime = QueryRuntime::new();
2378
2379            // Execute a leaf query (no dependents)
2380            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2381
2382            // Should be removable since no other query depends on it
2383            assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2384
2385            // Query should no longer exist
2386            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2387        }
2388
2389        #[test]
2390        fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2391            let runtime = QueryRuntime::new();
2392
2393            // Execute parent query (which depends on Leaf)
2394            let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2395
2396            // Leaf query should not be removable since Parent depends on it
2397            assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2398
2399            // Leaf query should still exist
2400            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2401
2402            // But Parent should be removable (no dependents)
2403            assert!(runtime.remove_query_if_unused::<Parent>(&1));
2404        }
2405
2406        #[test]
2407        fn test_remove_if_unused_with_full_cache_key() {
2408            let runtime = QueryRuntime::new();
2409
2410            // Execute a leaf query
2411            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2412
2413            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2414
2415            // Should be removable via FullCacheKey
2416            assert!(runtime.remove_if_unused(&full_key));
2417
2418            // Query should no longer exist
2419            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2420        }
2421
2422        // Test tracer receives on_query_key calls
2423        struct GcTracker {
2424            accessed_keys: Mutex<HashSet<String>>,
2425            access_count: AtomicUsize,
2426        }
2427
2428        impl GcTracker {
2429            fn new() -> Self {
2430                Self {
2431                    accessed_keys: Mutex::new(HashSet::new()),
2432                    access_count: AtomicUsize::new(0),
2433                }
2434            }
2435        }
2436
2437        impl Tracer for GcTracker {
2438            fn new_span_id(&self) -> SpanId {
2439                SpanId(1)
2440            }
2441
2442            fn on_query_key(&self, full_key: &FullCacheKey) {
2443                self.accessed_keys
2444                    .lock()
2445                    .unwrap()
2446                    .insert(full_key.debug_repr().to_string());
2447                self.access_count.fetch_add(1, Ordering::Relaxed);
2448            }
2449        }
2450
2451        #[test]
2452        fn test_tracer_receives_on_query_key() {
2453            let tracker = GcTracker::new();
2454            let runtime = QueryRuntime::with_tracer(tracker);
2455
2456            // Execute some queries
2457            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2458            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2459
2460            // Tracer should have received on_query_key calls
2461            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2462            assert_eq!(count, 2);
2463
2464            // Check that the keys were recorded
2465            let keys = runtime.tracer().accessed_keys.lock().unwrap();
2466            assert!(keys.iter().any(|k| k.contains("Leaf")));
2467        }
2468
2469        #[test]
2470        fn test_tracer_receives_on_query_key_for_cache_hits() {
2471            let tracker = GcTracker::new();
2472            let runtime = QueryRuntime::with_tracer(tracker);
2473
2474            // Execute query twice (second is cache hit)
2475            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2476            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2477
2478            // Tracer should have received on_query_key for both calls
2479            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2480            assert_eq!(count, 2);
2481        }
2482
2483        #[test]
2484        fn test_gc_workflow() {
2485            let tracker = GcTracker::new();
2486            let runtime = QueryRuntime::with_tracer(tracker);
2487
2488            // Execute some queries
2489            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2490            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2491            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2492
2493            // Simulate GC: remove all queries that are not in use
2494            let mut removed = 0;
2495            for key in runtime.query_keys() {
2496                if runtime.remove_if_unused(&key) {
2497                    removed += 1;
2498                }
2499            }
2500
2501            // All leaf queries should be removable
2502            assert!(removed >= 3);
2503
2504            // Queries should no longer exist
2505            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2506            assert!(runtime.changed_at::<Leaf>(&2).is_none());
2507            assert!(runtime.changed_at::<Leaf>(&3).is_none());
2508        }
2509    }
2510}