query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
12use crate::db::Db;
13use crate::key::FullCacheKey;
14use crate::loading::AssetLoadingState;
15use crate::query::Query;
16use crate::storage::{
17    AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
18    QueryRegistry, VerifierStorage,
19};
20use crate::tracer::{
21    ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
22    TracerAssetState, TracerQueryKey,
23};
24use crate::QueryError;
25
26/// Function type for comparing user errors during early cutoff.
27///
28/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
29/// `QueryError::UserError` values are compared for caching purposes.
30pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
31
32/// Number of durability levels (matches whale's default).
33const DURABILITY_LEVELS: usize = 4;
34
35// Thread-local query execution stack for cycle detection.
36thread_local! {
37    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
38
39    /// Current consistency tracker for leaf asset checks.
40    /// Set during query execution, used by all nested locator calls.
41    /// Uses Rc since thread-local is single-threaded.
42    static CONSISTENCY_TRACKER: RefCell<Option<Rc<ConsistencyTracker>>> = const { RefCell::new(None) };
43}
44
45/// Check a leaf asset against the current consistency tracker (if any).
46/// Returns Ok if no tracker is set or if the check passes.
47fn check_leaf_asset_consistency(dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
48    CONSISTENCY_TRACKER.with(|tracker| {
49        if let Some(ref t) = *tracker.borrow() {
50            t.check_leaf_asset(dep_changed_at)
51        } else {
52            Ok(())
53        }
54    })
55}
56
57/// RAII guard that sets the consistency tracker for the current thread.
58struct ConsistencyTrackerGuard {
59    previous: Option<Rc<ConsistencyTracker>>,
60}
61
62impl ConsistencyTrackerGuard {
63    fn new(tracker: Rc<ConsistencyTracker>) -> Self {
64        let previous = CONSISTENCY_TRACKER.with(|t| t.borrow_mut().replace(tracker));
65        Self { previous }
66    }
67}
68
69impl Drop for ConsistencyTrackerGuard {
70    fn drop(&mut self) {
71        CONSISTENCY_TRACKER.with(|t| {
72            *t.borrow_mut() = self.previous.take();
73        });
74    }
75}
76
77/// Check for cycles in the query stack and return error if detected.
78fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
79    let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
80    if cycle_detected {
81        let path = QUERY_STACK.with(|stack| {
82            let stack = stack.borrow();
83            let mut path: Vec<String> = stack.iter().map(|k| k.debug_repr().to_string()).collect();
84            path.push(key.debug_repr().to_string());
85            path
86        });
87        return Err(QueryError::Cycle { path });
88    }
89    Ok(())
90}
91
92/// RAII guard for pushing/popping from query stack.
93struct StackGuard;
94
95impl StackGuard {
96    fn push(key: FullCacheKey) -> Self {
97        QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
98        StackGuard
99    }
100}
101
102impl Drop for StackGuard {
103    fn drop(&mut self) {
104        QUERY_STACK.with(|stack| {
105            stack.borrow_mut().pop();
106        });
107    }
108}
109
110/// Execution context passed through query execution.
111///
112/// Contains a span ID for tracing correlation.
113#[derive(Clone, Copy)]
114pub struct ExecutionContext {
115    span_id: SpanId,
116}
117
118impl ExecutionContext {
119    /// Create a new execution context with the given span ID.
120    #[inline]
121    pub fn new(span_id: SpanId) -> Self {
122        Self { span_id }
123    }
124
125    /// Get the span ID for this execution context.
126    #[inline]
127    pub fn span_id(&self) -> SpanId {
128        self.span_id
129    }
130}
131
132/// Result of polling a query, containing the value and its revision.
133///
134/// This is returned by [`QueryRuntime::poll`] and provides both the query result
135/// and its change revision, enabling efficient change detection for subscription
136/// patterns.
137///
138/// # Example
139///
140/// ```ignore
141/// let result = runtime.poll(MyQuery::new())?;
142///
143/// // Access the value via Deref
144/// println!("{:?}", *result);
145///
146/// // Check if changed since last poll
147/// if result.revision > last_known_revision {
148///     notify_subscribers(&result.value);
149///     last_known_revision = result.revision;
150/// }
151/// ```
152#[derive(Debug, Clone)]
153pub struct Polled<T> {
154    /// The query result value.
155    pub value: T,
156    /// The revision at which this value was last changed.
157    ///
158    /// Compare this with a previously stored revision to detect changes.
159    pub revision: RevisionCounter,
160}
161
162impl<T: Deref> Deref for Polled<T> {
163    type Target = T::Target;
164
165    fn deref(&self) -> &Self::Target {
166        &self.value
167    }
168}
169
170/// The query runtime manages query execution, caching, and dependency tracking.
171///
172/// This is cheap to clone - all data is behind `Arc`.
173///
174/// # Type Parameter
175///
176/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
177///   for zero-cost when tracing is not needed.
178///
179/// # Example
180///
181/// ```ignore
182/// // Without tracing (default)
183/// let runtime = QueryRuntime::new();
184///
185/// // With tracing
186/// let tracer = MyTracer::new();
187/// let runtime = QueryRuntime::with_tracer(tracer);
188///
189/// // Sync query execution
190/// let result = runtime.query(MyQuery { ... })?;
191/// ```
192pub struct QueryRuntime<T: Tracer = NoopTracer> {
193    /// Whale runtime for dependency tracking and cache storage.
194    /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
195    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
196    /// Registered asset locators
197    locators: Arc<LocatorStorage<T>>,
198    /// Pending asset requests
199    pending: Arc<PendingStorage>,
200    /// Registry for tracking query instances (for list_queries)
201    query_registry: Arc<QueryRegistry>,
202    /// Registry for tracking asset keys (for list_asset_keys)
203    asset_key_registry: Arc<AssetKeyRegistry>,
204    /// Verifiers for re-executing queries (for verify-then-decide pattern)
205    verifiers: Arc<VerifierStorage>,
206    /// Comparator for user errors during early cutoff
207    error_comparator: ErrorComparator,
208    /// Tracer for observability
209    tracer: Arc<T>,
210}
211
212impl Default for QueryRuntime<NoopTracer> {
213    fn default() -> Self {
214        Self::new()
215    }
216}
217
218impl<T: Tracer> Clone for QueryRuntime<T> {
219    fn clone(&self) -> Self {
220        Self {
221            whale: self.whale.clone(),
222            locators: self.locators.clone(),
223            pending: self.pending.clone(),
224            query_registry: self.query_registry.clone(),
225            asset_key_registry: self.asset_key_registry.clone(),
226            verifiers: self.verifiers.clone(),
227            error_comparator: self.error_comparator,
228            tracer: self.tracer.clone(),
229        }
230    }
231}
232
233/// Default error comparator that treats all errors as different.
234///
235/// This is conservative - it always triggers recomputation when an error occurs.
236fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
237    false
238}
239
240impl<T: Tracer> QueryRuntime<T> {
241    /// Get cached output along with its revision (single atomic access).
242    fn get_cached_with_revision<Q: Query>(
243        &self,
244        key: &FullCacheKey,
245    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
246        let node = self.whale.get(key)?;
247        let revision = node.changed_at;
248        let entry = node.data.as_ref()?;
249        let cached = entry.to_cached_value::<Q::Output>()?;
250        Some((cached, revision))
251    }
252
253    /// Get a reference to the tracer.
254    #[inline]
255    pub fn tracer(&self) -> &T {
256        &self.tracer
257    }
258}
259
260impl QueryRuntime<NoopTracer> {
261    /// Create a new query runtime with default settings.
262    pub fn new() -> Self {
263        Self::with_tracer(NoopTracer)
264    }
265
266    /// Create a builder for customizing the runtime.
267    ///
268    /// # Example
269    ///
270    /// ```ignore
271    /// let runtime = QueryRuntime::builder()
272    ///     .error_comparator(|a, b| {
273    ///         // Custom error comparison logic
274    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
275    ///             (Some(a), Some(b)) => a == b,
276    ///             _ => false,
277    ///         }
278    ///     })
279    ///     .build();
280    /// ```
281    pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
282        QueryRuntimeBuilder::new()
283    }
284}
285
286impl<T: Tracer> QueryRuntime<T> {
287    /// Create a new query runtime with the specified tracer.
288    pub fn with_tracer(tracer: T) -> Self {
289        QueryRuntimeBuilder::new().tracer(tracer).build()
290    }
291
292    /// Execute a query synchronously.
293    ///
294    /// Returns the cached result if valid, otherwise executes the query.
295    ///
296    /// # Errors
297    ///
298    /// - `QueryError::Suspend` - Query is waiting for async loading
299    /// - `QueryError::Cycle` - Dependency cycle detected
300    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
301        self.query_internal(query)
302            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
303    }
304
305    /// Internal implementation shared by query() and poll().
306    ///
307    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
308    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
309    #[allow(clippy::type_complexity)]
310    fn query_internal<Q: Query>(
311        &self,
312        query: Q,
313    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
314        let key = query.cache_key();
315        let full_key = FullCacheKey::new::<Q, _>(&key);
316
317        // Emit query key event for GC tracking (before other events)
318        self.tracer.on_query_key(&full_key);
319
320        // Create execution context and emit start event
321        let span_id = self.tracer.new_span_id();
322        let exec_ctx = ExecutionContext::new(span_id);
323        let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
324
325        self.tracer.on_query_start(span_id, query_key.clone());
326
327        // Check for cycles using thread-local stack
328        let cycle_detected = QUERY_STACK.with(|stack| {
329            let stack = stack.borrow();
330            stack.iter().any(|k| k == &full_key)
331        });
332
333        if cycle_detected {
334            let path = QUERY_STACK.with(|stack| {
335                let stack = stack.borrow();
336                let mut path: Vec<String> =
337                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
338                path.push(full_key.debug_repr().to_string());
339                path
340            });
341
342            self.tracer.on_cycle_detected(
343                path.iter()
344                    .map(|s| TracerQueryKey::new("", s.clone()))
345                    .collect(),
346            );
347            self.tracer
348                .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
349
350            return Err(QueryError::Cycle { path });
351        }
352
353        // Check if cached and valid (with verify-then-decide pattern)
354        let current_rev = self.whale.current_revision();
355
356        // Fast path: already verified at current revision
357        if self.whale.is_verified_at(&full_key, &current_rev) {
358            // Single atomic access to get both cached value and revision
359            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
360                self.tracer.on_cache_check(span_id, query_key.clone(), true);
361                self.tracer
362                    .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
363
364                return match cached {
365                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
366                    CachedValue::UserError(err) => Ok((Err(err), revision)),
367                };
368            }
369        }
370
371        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
372        if self.whale.is_valid(&full_key) {
373            // Single atomic access to get both cached value and revision
374            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
375                // Shallow valid but not verified - verify deps first
376                let mut deps_verified = true;
377                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
378                    for dep in deps {
379                        if let Some(verifier) = self.verifiers.get(&dep) {
380                            // Re-run query/asset to verify it (triggers recursive verification)
381                            // For assets, this re-accesses the asset which may re-run the locator
382                            if verifier.verify(self as &dyn std::any::Any).is_err() {
383                                deps_verified = false;
384                                break;
385                            }
386                        }
387                        // Note: deps without verifiers are assumed valid (they're verified
388                        // by the final is_valid check if their changed_at increased)
389                    }
390                }
391
392                // Re-check validity after deps are verified
393                if deps_verified && self.whale.is_valid(&full_key) {
394                    // Deps didn't change their changed_at, mark as verified and use cached
395                    self.whale.mark_verified(&full_key, &current_rev);
396
397                    self.tracer.on_cache_check(span_id, query_key.clone(), true);
398                    self.tracer
399                        .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
400
401                    return match cached {
402                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
403                        CachedValue::UserError(err) => Ok((Err(err), revision)),
404                    };
405                }
406                // A dep's changed_at increased, fall through to execute
407            }
408        }
409
410        self.tracer
411            .on_cache_check(span_id, query_key.clone(), false);
412
413        // Execute the query with cycle tracking
414        let _guard = StackGuard::push(full_key.clone());
415        let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
416        drop(_guard);
417
418        // Emit end event
419        let exec_result = match &result {
420            Ok((_, true, _)) => ExecutionResult::Changed,
421            Ok((_, false, _)) => ExecutionResult::Unchanged,
422            Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
423            Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
424            Err(e) => ExecutionResult::Error {
425                message: format!("{:?}", e),
426            },
427        };
428        self.tracer
429            .on_query_end(span_id, query_key.clone(), exec_result);
430
431        result.map(|(inner_result, _, revision)| (inner_result, revision))
432    }
433
434    /// Execute a query, caching the result if appropriate.
435    ///
436    /// Returns (result, output_changed, revision) tuple.
437    /// - `result`: Ok(output) for success, Err(user_error) for user errors
438    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
439    #[allow(clippy::type_complexity)]
440    fn execute_query<Q: Query>(
441        &self,
442        query: &Q,
443        full_key: &FullCacheKey,
444        exec_ctx: ExecutionContext,
445    ) -> Result<
446        (
447            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
448            bool,
449            RevisionCounter,
450        ),
451        QueryError,
452    > {
453        // Capture current global revision at query start for consistency checking
454        let start_revision = self.whale.current_revision().get(Durability::volatile());
455
456        // Create consistency tracker for this query execution
457        let tracker = Rc::new(ConsistencyTracker::new(start_revision));
458
459        // Set thread-local tracker for nested locator calls
460        let _tracker_guard = ConsistencyTrackerGuard::new(tracker);
461
462        // Create context for this query execution
463        let ctx = QueryContext {
464            runtime: self,
465            current_key: full_key.clone(),
466            parent_query_type: std::any::type_name::<Q>(),
467            exec_ctx,
468            deps: RefCell::new(Vec::new()),
469        };
470
471        // Execute the query (clone because query() takes ownership)
472        let result = query.clone().query(&ctx);
473
474        // Get collected dependencies
475        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
476
477        // Query durability defaults to stable - Whale will automatically reduce
478        // the effective durability to min(requested, min(dep_durabilities)).
479        // A pure query with no dependencies remains stable.
480        // A query depending on volatile assets becomes volatile.
481        let durability = Durability::stable();
482
483        match result {
484            Ok(output) => {
485                // Check if output changed (for early cutoff)
486                // existing_revision is Some only when output is unchanged (can reuse revision)
487                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
488                    self.get_cached_with_revision::<Q>(full_key)
489                {
490                    if Q::output_eq(&*old, &*output) {
491                        Some(rev) // Same output - reuse revision
492                    } else {
493                        None // Different output
494                    }
495                } else {
496                    None // No previous Ok value
497                };
498                let output_changed = existing_revision.is_none();
499
500                // Emit early cutoff check event
501                self.tracer.on_early_cutoff_check(
502                    exec_ctx.span_id(),
503                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
504                    output_changed,
505                );
506
507                // Update whale with cached entry (atomic update of value + dependency state)
508                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
509                let revision = if let Some(existing_rev) = existing_revision {
510                    // confirm_unchanged doesn't change changed_at, use existing
511                    let _ = self.whale.confirm_unchanged(full_key, deps);
512                    existing_rev
513                } else {
514                    // Use new_rev from register result
515                    match self
516                        .whale
517                        .register(full_key.clone(), Some(entry), durability, deps)
518                    {
519                        Ok(result) => result.new_rev,
520                        Err(missing) => {
521                            return Err(QueryError::DependenciesRemoved {
522                                missing_keys: missing,
523                            })
524                        }
525                    }
526                };
527
528                // Register query in registry for list_queries
529                let is_new_query = self.query_registry.register(query);
530                if is_new_query {
531                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
532                    let _ = self
533                        .whale
534                        .register(sentinel, None, Durability::stable(), vec![]);
535                }
536
537                // Store verifier for this query (for verify-then-decide pattern)
538                self.verifiers
539                    .insert::<Q, T>(full_key.clone(), query.clone());
540
541                Ok((Ok(output), output_changed, revision))
542            }
543            Err(QueryError::UserError(err)) => {
544                // Check if error changed (for early cutoff)
545                // existing_revision is Some only when error is unchanged (can reuse revision)
546                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
547                    self.get_cached_with_revision::<Q>(full_key)
548                {
549                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
550                        Some(rev) // Same error - reuse revision
551                    } else {
552                        None // Different error
553                    }
554                } else {
555                    None // No previous UserError
556                };
557                let output_changed = existing_revision.is_none();
558
559                // Emit early cutoff check event
560                self.tracer.on_early_cutoff_check(
561                    exec_ctx.span_id(),
562                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
563                    output_changed,
564                );
565
566                // Update whale with cached error (atomic update of value + dependency state)
567                let entry = CachedEntry::UserError(err.clone());
568                let revision = if let Some(existing_rev) = existing_revision {
569                    // confirm_unchanged doesn't change changed_at, use existing
570                    let _ = self.whale.confirm_unchanged(full_key, deps);
571                    existing_rev
572                } else {
573                    // Use new_rev from register result
574                    match self
575                        .whale
576                        .register(full_key.clone(), Some(entry), durability, deps)
577                    {
578                        Ok(result) => result.new_rev,
579                        Err(missing) => {
580                            return Err(QueryError::DependenciesRemoved {
581                                missing_keys: missing,
582                            })
583                        }
584                    }
585                };
586
587                // Register query in registry for list_queries
588                let is_new_query = self.query_registry.register(query);
589                if is_new_query {
590                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
591                    let _ = self
592                        .whale
593                        .register(sentinel, None, Durability::stable(), vec![]);
594                }
595
596                // Store verifier for this query (for verify-then-decide pattern)
597                self.verifiers
598                    .insert::<Q, T>(full_key.clone(), query.clone());
599
600                Ok((Err(err), output_changed, revision))
601            }
602            Err(e) => {
603                // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
604                Err(e)
605            }
606        }
607    }
608
609    /// Invalidate a query, forcing recomputation on next access.
610    ///
611    /// This also invalidates any queries that depend on this one.
612    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
613        let full_key = FullCacheKey::new::<Q, _>(key);
614
615        self.tracer.on_query_invalidated(
616            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
617            InvalidationReason::ManualInvalidation,
618        );
619
620        // Update whale to invalidate dependents (register with None to clear cached value)
621        // Use stable durability to increment all revision counters, ensuring queries
622        // at any durability level will see this as a change.
623        let _ = self
624            .whale
625            .register(full_key, None, Durability::stable(), vec![]);
626    }
627
628    /// Remove a query from the cache entirely, freeing memory.
629    ///
630    /// Use this for GC when a query is no longer needed.
631    /// Unlike `invalidate`, this removes all traces of the query from storage.
632    /// The query will be recomputed from scratch on next access.
633    ///
634    /// This also invalidates any queries that depend on this one.
635    pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
636        let full_key = FullCacheKey::new::<Q, _>(key);
637
638        self.tracer.on_query_invalidated(
639            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
640            InvalidationReason::ManualInvalidation,
641        );
642
643        // Remove verifier if exists
644        self.verifiers.remove(&full_key);
645
646        // Remove from whale storage (this also handles dependent invalidation)
647        self.whale.remove(&full_key);
648
649        // Remove from registry and update sentinel for list_queries
650        if self.query_registry.remove::<Q>(key) {
651            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
652            let _ = self
653                .whale
654                .register(sentinel, None, Durability::stable(), vec![]);
655        }
656    }
657
658    /// Clear all cached values by removing all nodes from whale.
659    ///
660    /// Note: This is a relatively expensive operation as it iterates through all keys.
661    pub fn clear_cache(&self) {
662        let keys = self.whale.keys();
663        for key in keys {
664            self.whale.remove(&key);
665        }
666    }
667
668    /// Poll a query, returning both the result and its change revision.
669    ///
670    /// This is useful for implementing subscription patterns where you need to
671    /// detect changes efficiently. Compare the returned `revision` with a
672    /// previously stored value to determine if the query result has changed.
673    ///
674    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
675    /// as its value, allowing you to track revision changes for both success and
676    /// user error cases.
677    ///
678    /// # Example
679    ///
680    /// ```ignore
681    /// struct Subscription<Q: Query> {
682    ///     query: Q,
683    ///     last_revision: RevisionCounter,
684    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
685    /// }
686    ///
687    /// // Polling loop
688    /// for sub in &mut subscriptions {
689    ///     let result = runtime.poll(sub.query.clone())?;
690    ///     if result.revision > sub.last_revision {
691    ///         sub.tx.send(result.value.clone())?;
692    ///         sub.last_revision = result.revision;
693    ///     }
694    /// }
695    /// ```
696    ///
697    /// # Errors
698    ///
699    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
700    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
701    #[allow(clippy::type_complexity)]
702    pub fn poll<Q: Query>(
703        &self,
704        query: Q,
705    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
706        let (value, revision) = self.query_internal(query)?;
707        Ok(Polled { value, revision })
708    }
709
710    /// Get the change revision of a query without executing it.
711    ///
712    /// Returns `None` if the query has never been executed.
713    ///
714    /// This is useful for checking if a query has changed since the last poll
715    /// without the cost of executing the query.
716    ///
717    /// # Example
718    ///
719    /// ```ignore
720    /// // Check if query has changed before deciding to poll
721    /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
722    ///     if rev > last_known_revision {
723    ///         let result = runtime.query(MyQuery::new(key))?;
724    ///         // Process result...
725    ///     }
726    /// }
727    /// ```
728    pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
729        let full_key = FullCacheKey::new::<Q, _>(key);
730        self.whale.get(&full_key).map(|node| node.changed_at)
731    }
732}
733
734// ============================================================================
735// GC (Garbage Collection) API
736// ============================================================================
737
738impl<T: Tracer> QueryRuntime<T> {
739    /// Get all query keys currently in the cache.
740    ///
741    /// This is useful for implementing custom garbage collection strategies.
742    /// Use this in combination with [`Tracer::on_query_key`] to track access
743    /// times and implement LRU, TTL, or other GC algorithms externally.
744    ///
745    /// # Example
746    ///
747    /// ```ignore
748    /// // Collect all keys that haven't been accessed recently
749    /// let stale_keys: Vec<_> = runtime.query_keys()
750    ///     .filter(|key| tracker.is_stale(key))
751    ///     .collect();
752    ///
753    /// // Remove stale queries that have no dependents
754    /// for key in stale_keys {
755    ///     runtime.remove_if_unused(&key);
756    /// }
757    /// ```
758    pub fn query_keys(&self) -> Vec<FullCacheKey> {
759        self.whale.keys()
760    }
761
762    /// Remove a query if it has no dependents.
763    ///
764    /// Returns `true` if the query was removed, `false` if it has dependents
765    /// or doesn't exist. This is the safe way to remove queries during GC,
766    /// as it won't break queries that depend on this one.
767    ///
768    /// # Example
769    ///
770    /// ```ignore
771    /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
772    /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
773    ///     println!("Query removed");
774    /// } else {
775    ///     println!("Query has dependents, not removed");
776    /// }
777    /// ```
778    pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
779        let full_key = FullCacheKey::new::<Q, _>(key);
780        self.remove_if_unused(&full_key)
781    }
782
783    /// Remove a query by its [`FullCacheKey`].
784    ///
785    /// This is the type-erased version of [`remove_query`](Self::remove_query).
786    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
787    /// or [`Tracer::on_query_key`].
788    ///
789    /// Returns `true` if the query was removed, `false` if it doesn't exist.
790    ///
791    /// # Warning
792    ///
793    /// This forcibly removes the query even if other queries depend on it.
794    /// Dependent queries will be recomputed on next access. For safe GC,
795    /// use [`remove_if_unused`](Self::remove_if_unused) instead.
796    pub fn remove(&self, key: &FullCacheKey) -> bool {
797        // Remove verifier if exists
798        self.verifiers.remove(key);
799
800        // Remove from whale storage
801        self.whale.remove(key).is_some()
802    }
803
804    /// Remove a query by its [`FullCacheKey`] if it has no dependents.
805    ///
806    /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
807    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
808    /// or [`Tracer::on_query_key`].
809    ///
810    /// Returns `true` if the query was removed, `false` if it has dependents
811    /// or doesn't exist.
812    ///
813    /// # Example
814    ///
815    /// ```ignore
816    /// // Implement LRU GC
817    /// for key in runtime.query_keys() {
818    ///     if tracker.is_expired(&key) {
819    ///         runtime.remove_if_unused(&key);
820    ///     }
821    /// }
822    /// ```
823    pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
824        if self.whale.remove_if_unused(key.clone()).is_some() {
825            // Successfully removed - clean up verifier
826            self.verifiers.remove(key);
827            true
828        } else {
829            false
830        }
831    }
832}
833
834// ============================================================================
835// Builder
836// ============================================================================
837
838/// Builder for [`QueryRuntime`] with customizable settings.
839///
840/// # Example
841///
842/// ```ignore
843/// let runtime = QueryRuntime::builder()
844///     .error_comparator(|a, b| {
845///         // Treat all errors of the same type as equal
846///         a.downcast_ref::<std::io::Error>().is_some()
847///             == b.downcast_ref::<std::io::Error>().is_some()
848///     })
849///     .build();
850/// ```
851pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
852    error_comparator: ErrorComparator,
853    tracer: T,
854}
855
856impl Default for QueryRuntimeBuilder<NoopTracer> {
857    fn default() -> Self {
858        Self::new()
859    }
860}
861
862impl QueryRuntimeBuilder<NoopTracer> {
863    /// Create a new builder with default settings.
864    pub fn new() -> Self {
865        Self {
866            error_comparator: default_error_comparator,
867            tracer: NoopTracer,
868        }
869    }
870}
871
872impl<T: Tracer> QueryRuntimeBuilder<T> {
873    /// Set the error comparator function for early cutoff optimization.
874    ///
875    /// When a query returns `QueryError::UserError`, this function is used
876    /// to compare it with the previously cached error. If they are equal,
877    /// downstream queries can skip recomputation (early cutoff).
878    ///
879    /// The default comparator returns `false` for all errors, meaning errors
880    /// are always considered different (conservative, always recomputes).
881    ///
882    /// # Example
883    ///
884    /// ```ignore
885    /// // Treat errors as equal if they have the same display message
886    /// let runtime = QueryRuntime::builder()
887    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
888    ///     .build();
889    /// ```
890    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
891        self.error_comparator = f;
892        self
893    }
894
895    /// Set the tracer for observability.
896    pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
897        QueryRuntimeBuilder {
898            error_comparator: self.error_comparator,
899            tracer,
900        }
901    }
902
903    /// Build the runtime with the configured settings.
904    pub fn build(self) -> QueryRuntime<T> {
905        QueryRuntime {
906            whale: WhaleRuntime::new(),
907            locators: Arc::new(LocatorStorage::new()),
908            pending: Arc::new(PendingStorage::new()),
909            query_registry: Arc::new(QueryRegistry::new()),
910            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
911            verifiers: Arc::new(VerifierStorage::new()),
912            error_comparator: self.error_comparator,
913            tracer: Arc::new(self.tracer),
914        }
915    }
916}
917
918// ============================================================================
919// Asset API
920// ============================================================================
921
922impl<T: Tracer> QueryRuntime<T> {
923    /// Register an asset locator for a specific asset key type.
924    ///
925    /// Only one locator can be registered per key type. Later registrations
926    /// replace earlier ones.
927    ///
928    /// # Example
929    ///
930    /// ```ignore
931    /// let runtime = QueryRuntime::new();
932    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
933    /// ```
934    pub fn register_asset_locator<K, L>(&self, locator: L)
935    where
936        K: AssetKey,
937        L: AssetLocator<K>,
938    {
939        self.locators.insert::<K, L>(locator);
940    }
941
942    /// Get an iterator over pending asset requests.
943    ///
944    /// Returns assets that have been requested but not yet resolved.
945    /// The user should fetch these externally and call `resolve_asset()`.
946    ///
947    /// # Example
948    ///
949    /// ```ignore
950    /// for pending in runtime.pending_assets() {
951    ///     if let Some(path) = pending.key::<FilePath>() {
952    ///         let content = fetch_file(path);
953    ///         runtime.resolve_asset(path.clone(), content);
954    ///     }
955    /// }
956    /// ```
957    pub fn pending_assets(&self) -> Vec<PendingAsset> {
958        self.pending.get_all()
959    }
960
961    /// Get pending assets filtered by key type.
962    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
963        self.pending.get_of_type::<K>()
964    }
965
966    /// Check if there are any pending assets.
967    pub fn has_pending_assets(&self) -> bool {
968        !self.pending.is_empty()
969    }
970
971    /// Resolve an asset with its loaded value.
972    ///
973    /// This marks the asset as ready and invalidates any queries that
974    /// depend on it (if the value changed), triggering recomputation on next access.
975    ///
976    /// This method is idempotent - resolving with the same value (via `asset_eq`)
977    /// will not trigger downstream recomputation.
978    ///
979    /// # Arguments
980    ///
981    /// * `key` - The asset key identifying this resource
982    /// * `value` - The loaded asset value
983    /// * `durability` - How frequently this asset is expected to change
984    ///
985    /// # Example
986    ///
987    /// ```ignore
988    /// let content = std::fs::read_to_string(&path)?;
989    /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
990    /// ```
991    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
992        self.resolve_asset_internal(key, value, durability);
993    }
994
995    /// Resolve an asset with an error.
996    ///
997    /// This marks the asset as errored and caches the error. Queries depending
998    /// on this asset will receive `Err(QueryError::UserError(...))`.
999    ///
1000    /// Use this when async loading fails (e.g., network error, file not found,
1001    /// access denied).
1002    ///
1003    /// # Arguments
1004    ///
1005    /// * `key` - The asset key identifying this resource
1006    /// * `error` - The error to cache (will be wrapped in `Arc`)
1007    /// * `durability` - How frequently this error state is expected to change
1008    ///
1009    /// # Example
1010    ///
1011    /// ```ignore
1012    /// match fetch_file(&path) {
1013    ///     Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
1014    ///     Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
1015    /// }
1016    /// ```
1017    pub fn resolve_asset_error<K: AssetKey>(
1018        &self,
1019        key: K,
1020        error: impl Into<anyhow::Error>,
1021        durability: DurabilityLevel,
1022    ) {
1023        let full_asset_key = FullAssetKey::new(&key);
1024        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1025
1026        // Remove from pending BEFORE registering the error
1027        self.pending.remove(&full_asset_key);
1028
1029        // Prepare the error entry
1030        let error_arc = Arc::new(error.into());
1031        let entry = CachedEntry::AssetError(error_arc.clone());
1032        let durability =
1033            Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
1034
1035        // Atomic compare-and-update (errors are always considered changed for now)
1036        let result = self
1037            .whale
1038            .update_with_compare(
1039                full_cache_key,
1040                Some(entry),
1041                |old_data, _new_data| {
1042                    // Compare old and new errors using error_comparator
1043                    match old_data.and_then(|d| d.as_ref()) {
1044                        Some(CachedEntry::AssetError(old_err)) => {
1045                            !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
1046                        }
1047                        _ => true, // Loading, Ready, or not present -> changed
1048                    }
1049                },
1050                durability,
1051                vec![],
1052            )
1053            .expect("update_with_compare with no dependencies cannot fail");
1054
1055        // Emit asset resolved event (with changed status)
1056        self.tracer.on_asset_resolved(
1057            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1058            result.changed,
1059        );
1060
1061        // Register asset key in registry for list_asset_keys
1062        let is_new_asset = self.asset_key_registry.register(&key);
1063        if is_new_asset {
1064            // Update sentinel to invalidate list_asset_keys dependents
1065            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1066            let _ = self
1067                .whale
1068                .register(sentinel, None, Durability::stable(), vec![]);
1069        }
1070    }
1071
1072    fn resolve_asset_internal<K: AssetKey>(
1073        &self,
1074        key: K,
1075        value: K::Asset,
1076        durability_level: DurabilityLevel,
1077    ) {
1078        let full_asset_key = FullAssetKey::new(&key);
1079        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1080
1081        // Remove from pending BEFORE registering the value
1082        self.pending.remove(&full_asset_key);
1083
1084        // Prepare the new entry
1085        let value_arc: Arc<K::Asset> = Arc::new(value);
1086        let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1087        let durability =
1088            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1089
1090        // Atomic compare-and-update
1091        let result = self
1092            .whale
1093            .update_with_compare(
1094                full_cache_key,
1095                Some(entry),
1096                |old_data, _new_data| {
1097                    // Compare old and new values
1098                    match old_data.and_then(|d| d.as_ref()) {
1099                        Some(CachedEntry::AssetReady(old_arc)) => {
1100                            match old_arc.clone().downcast::<K::Asset>() {
1101                                Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1102                                Err(_) => true, // Type mismatch, treat as changed
1103                            }
1104                        }
1105                        _ => true, // Loading, NotFound, or not present -> changed
1106                    }
1107                },
1108                durability,
1109                vec![],
1110            )
1111            .expect("update_with_compare with no dependencies cannot fail");
1112
1113        // Emit asset resolved event
1114        self.tracer.on_asset_resolved(
1115            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1116            result.changed,
1117        );
1118
1119        // Register asset key in registry for list_asset_keys
1120        let is_new_asset = self.asset_key_registry.register(&key);
1121        if is_new_asset {
1122            // Update sentinel to invalidate list_asset_keys dependents
1123            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1124            let _ = self
1125                .whale
1126                .register(sentinel, None, Durability::stable(), vec![]);
1127        }
1128    }
1129
1130    /// Invalidate an asset, forcing queries to re-request it.
1131    ///
1132    /// The asset will be marked as loading and added to pending assets.
1133    /// Dependent queries will suspend until the asset is resolved again.
1134    ///
1135    /// # Example
1136    ///
1137    /// ```ignore
1138    /// // File was modified externally
1139    /// runtime.invalidate_asset(&FilePath("config.json".into()));
1140    /// // Queries depending on this asset will now suspend
1141    /// // User should fetch the new value and call resolve_asset
1142    /// ```
1143    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1144        let full_asset_key = FullAssetKey::new(key);
1145        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1146
1147        // Emit asset invalidated event
1148        self.tracer.on_asset_invalidated(TracerAssetKey::new(
1149            std::any::type_name::<K>(),
1150            format!("{:?}", key),
1151        ));
1152
1153        // Add to pending FIRST (before clearing whale state)
1154        // This ensures: readers see either old value, or Loading+pending
1155        self.pending.insert::<K>(full_asset_key, key.clone());
1156
1157        // Atomic: clear cached value + invalidate dependents
1158        // Using None for data means "needs to be loaded"
1159        // Use stable durability to ensure queries at any durability level see the change.
1160        let _ = self
1161            .whale
1162            .register(full_cache_key, None, Durability::stable(), vec![]);
1163    }
1164
1165    /// Remove an asset from the cache entirely.
1166    ///
1167    /// Unlike `invalidate_asset`, this removes all traces of the asset.
1168    /// Dependent queries will go through the locator again on next access.
1169    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1170        let full_asset_key = FullAssetKey::new(key);
1171        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1172
1173        // Remove from pending first
1174        self.pending.remove(&full_asset_key);
1175
1176        // Remove from whale (this also cleans up dependency edges)
1177        // whale.remove() invalidates dependents before removing
1178        self.whale.remove(&full_cache_key);
1179
1180        // Remove from registry and update sentinel for list_asset_keys
1181        if self.asset_key_registry.remove::<K>(key) {
1182            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1183            let _ = self
1184                .whale
1185                .register(sentinel, None, Durability::stable(), vec![]);
1186        }
1187    }
1188
1189    /// Get an asset by key without tracking dependencies.
1190    ///
1191    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1192    /// as a dependent of the asset. Use this for direct asset access outside
1193    /// of query execution.
1194    ///
1195    /// # Returns
1196    ///
1197    /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1198    /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1199    /// - `Err(QueryError::MissingDependency)` - Asset was not found
1200    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1201        self.get_asset_internal(key)
1202    }
1203
1204    /// Internal: Get asset state and its changed_at revision atomically.
1205    ///
1206    /// This is the "direct" version called from QueryRuntime::asset (no dependency tracking).
1207    /// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
1208    ///
1209    /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
1210    /// whale node that provided the asset value.
1211    fn get_asset_with_revision<K: AssetKey>(
1212        &self,
1213        key: K,
1214    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1215        let full_asset_key = FullAssetKey::new(&key);
1216        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1217
1218        // Helper to emit AssetRequested event
1219        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1220            tracer.on_asset_requested(
1221                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1222                state,
1223            );
1224        };
1225
1226        // Check whale cache first (single atomic read)
1227        if let Some(node) = self.whale.get(&full_cache_key) {
1228            let changed_at = node.changed_at;
1229            // Check if valid at current revision (shallow check)
1230            if self.whale.is_valid(&full_cache_key) {
1231                // Verify dependencies recursively (like query path does)
1232                let mut deps_verified = true;
1233                if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1234                    for dep in deps {
1235                        if let Some(verifier) = self.verifiers.get(&dep) {
1236                            // Re-run query/asset to verify it (triggers recursive verification)
1237                            if verifier.verify(self as &dyn std::any::Any).is_err() {
1238                                deps_verified = false;
1239                                break;
1240                            }
1241                        }
1242                    }
1243                }
1244
1245                // Re-check validity after deps are verified
1246                if deps_verified && self.whale.is_valid(&full_cache_key) {
1247                    // For cached entries, check consistency for leaf assets (no locator deps).
1248                    // This detects if resolve_asset/resolve_asset_error was called during query execution.
1249                    let has_locator_deps = self
1250                        .whale
1251                        .get_dependency_ids(&full_cache_key)
1252                        .is_some_and(|deps| !deps.is_empty());
1253
1254                    match &node.data {
1255                        Some(CachedEntry::AssetReady(arc)) => {
1256                            // Check consistency for cached leaf assets
1257                            if !has_locator_deps {
1258                                check_leaf_asset_consistency(changed_at)?;
1259                            }
1260                            emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1261                            match arc.clone().downcast::<K::Asset>() {
1262                                Ok(value) => {
1263                                    return Ok((AssetLoadingState::ready(key, value), changed_at))
1264                                }
1265                                Err(_) => {
1266                                    return Err(QueryError::MissingDependency {
1267                                        description: format!("Asset type mismatch: {:?}", key),
1268                                    })
1269                                }
1270                            }
1271                        }
1272                        Some(CachedEntry::AssetError(err)) => {
1273                            // Check consistency for cached leaf errors
1274                            if !has_locator_deps {
1275                                check_leaf_asset_consistency(changed_at)?;
1276                            }
1277                            emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1278                            return Err(QueryError::UserError(err.clone()));
1279                        }
1280                        None => {
1281                            // Loading state - no value to be inconsistent with
1282                            emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1283                            return Ok((AssetLoadingState::loading(key), changed_at));
1284                        }
1285                        _ => {
1286                            // Query-related entries (Ok, UserError) shouldn't be here
1287                            // Fall through to locator
1288                        }
1289                    }
1290                }
1291            }
1292        }
1293
1294        // Not in cache or invalid - try locator
1295        // Use LocatorContext to track deps on the asset itself
1296        check_cycle(&full_cache_key)?;
1297        let _guard = StackGuard::push(full_cache_key.clone());
1298
1299        let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1300        let locator_result =
1301            self.locators
1302                .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1303
1304        if let Some(result) = locator_result {
1305            // Get collected dependencies from the locator context
1306            let locator_deps = locator_ctx.into_deps();
1307            match result {
1308                Ok(ErasedLocateResult::Ready {
1309                    value: arc,
1310                    durability: durability_level,
1311                }) => {
1312                    emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1313
1314                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1315                        Ok(v) => v,
1316                        Err(_) => {
1317                            return Err(QueryError::MissingDependency {
1318                                description: format!("Asset type mismatch: {:?}", key),
1319                            });
1320                        }
1321                    };
1322
1323                    // Store in whale atomically with early cutoff
1324                    // Include locator's dependencies so the asset is invalidated when they change
1325                    let entry = CachedEntry::AssetReady(typed_value.clone());
1326                    let durability = Durability::new(durability_level.as_u8() as usize)
1327                        .unwrap_or(Durability::volatile());
1328                    let new_value = typed_value.clone();
1329                    let result = self
1330                        .whale
1331                        .update_with_compare(
1332                            full_cache_key.clone(),
1333                            Some(entry),
1334                            |old_data, _new_data| {
1335                                let Some(CachedEntry::AssetReady(old_arc)) =
1336                                    old_data.and_then(|d| d.as_ref())
1337                                else {
1338                                    return true;
1339                                };
1340                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1341                                    return true;
1342                                };
1343                                !K::asset_eq(&old_value, &new_value)
1344                            },
1345                            durability,
1346                            locator_deps,
1347                        )
1348                        .expect("update_with_compare should succeed");
1349
1350                    // Register verifier for this asset (for verify-then-decide pattern)
1351                    self.verifiers
1352                        .insert_asset::<K, T>(full_cache_key, key.clone());
1353
1354                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1355                }
1356                Ok(ErasedLocateResult::Pending) => {
1357                    emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1358
1359                    // Add to pending list for Pending result
1360                    self.pending.insert::<K>(full_asset_key, key.clone());
1361                    match self
1362                        .whale
1363                        .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1364                        .expect("get_or_insert should succeed")
1365                    {
1366                        GetOrInsertResult::Inserted(node) => {
1367                            return Ok((AssetLoadingState::loading(key), node.changed_at));
1368                        }
1369                        GetOrInsertResult::Existing(node) => {
1370                            let changed_at = node.changed_at;
1371                            match &node.data {
1372                                Some(CachedEntry::AssetReady(arc)) => {
1373                                    match arc.clone().downcast::<K::Asset>() {
1374                                        Ok(value) => {
1375                                            return Ok((
1376                                                AssetLoadingState::ready(key, value),
1377                                                changed_at,
1378                                            ))
1379                                        }
1380                                        Err(_) => {
1381                                            return Ok((
1382                                                AssetLoadingState::loading(key),
1383                                                changed_at,
1384                                            ))
1385                                        }
1386                                    }
1387                                }
1388                                Some(CachedEntry::AssetError(err)) => {
1389                                    return Err(QueryError::UserError(err.clone()));
1390                                }
1391                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1392                            }
1393                        }
1394                    }
1395                }
1396                Err(QueryError::UserError(err)) => {
1397                    // Locator returned a user error - cache it as AssetError
1398                    let entry = CachedEntry::AssetError(err.clone());
1399                    let _ = self.whale.register(
1400                        full_cache_key,
1401                        Some(entry),
1402                        Durability::volatile(),
1403                        locator_deps,
1404                    );
1405                    return Err(QueryError::UserError(err));
1406                }
1407                Err(e) => {
1408                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1409                    return Err(e);
1410                }
1411            }
1412        }
1413
1414        // No locator registered or locator returned None - mark as pending
1415        // (no locator was called, so no deps to track)
1416        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1417        self.pending
1418            .insert::<K>(full_asset_key.clone(), key.clone());
1419
1420        match self
1421            .whale
1422            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1423            .expect("get_or_insert with no dependencies cannot fail")
1424        {
1425            GetOrInsertResult::Inserted(node) => {
1426                Ok((AssetLoadingState::loading(key), node.changed_at))
1427            }
1428            GetOrInsertResult::Existing(node) => {
1429                let changed_at = node.changed_at;
1430                match &node.data {
1431                    Some(CachedEntry::AssetReady(arc)) => {
1432                        match arc.clone().downcast::<K::Asset>() {
1433                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1434                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1435                        }
1436                    }
1437                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1438                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1439                }
1440            }
1441        }
1442    }
1443
1444    /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1445    ///
1446    /// This version is called from QueryContext::asset. Consistency checking for
1447    /// cached leaf assets is done inside this function before returning.
1448    fn get_asset_with_revision_ctx<K: AssetKey>(
1449        &self,
1450        key: K,
1451        _ctx: &QueryContext<'_, T>,
1452    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1453        let full_asset_key = FullAssetKey::new(&key);
1454        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1455
1456        // Helper to emit AssetRequested event
1457        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1458            tracer.on_asset_requested(
1459                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1460                state,
1461            );
1462        };
1463
1464        // Check whale cache first (single atomic read)
1465        if let Some(node) = self.whale.get(&full_cache_key) {
1466            let changed_at = node.changed_at;
1467            // Check if valid at current revision (shallow check)
1468            if self.whale.is_valid(&full_cache_key) {
1469                // Verify dependencies recursively (like query path does)
1470                let mut deps_verified = true;
1471                if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1472                    for dep in deps {
1473                        if let Some(verifier) = self.verifiers.get(&dep) {
1474                            // Re-run query/asset to verify it (triggers recursive verification)
1475                            if verifier.verify(self as &dyn std::any::Any).is_err() {
1476                                deps_verified = false;
1477                                break;
1478                            }
1479                        }
1480                    }
1481                }
1482
1483                // Re-check validity after deps are verified
1484                if deps_verified && self.whale.is_valid(&full_cache_key) {
1485                    // For cached entries, check consistency for leaf assets (no locator deps).
1486                    // This detects if resolve_asset/resolve_asset_error was called during query execution.
1487                    let has_locator_deps = self
1488                        .whale
1489                        .get_dependency_ids(&full_cache_key)
1490                        .is_some_and(|deps| !deps.is_empty());
1491
1492                    match &node.data {
1493                        Some(CachedEntry::AssetReady(arc)) => {
1494                            // Check consistency for cached leaf assets
1495                            if !has_locator_deps {
1496                                check_leaf_asset_consistency(changed_at)?;
1497                            }
1498                            emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1499                            match arc.clone().downcast::<K::Asset>() {
1500                                Ok(value) => {
1501                                    return Ok((AssetLoadingState::ready(key, value), changed_at))
1502                                }
1503                                Err(_) => {
1504                                    return Err(QueryError::MissingDependency {
1505                                        description: format!("Asset type mismatch: {:?}", key),
1506                                    })
1507                                }
1508                            }
1509                        }
1510                        Some(CachedEntry::AssetError(err)) => {
1511                            // Check consistency for cached leaf errors
1512                            if !has_locator_deps {
1513                                check_leaf_asset_consistency(changed_at)?;
1514                            }
1515                            emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1516                            return Err(QueryError::UserError(err.clone()));
1517                        }
1518                        None => {
1519                            // Loading state - no value to be inconsistent with
1520                            emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1521                            return Ok((AssetLoadingState::loading(key), changed_at));
1522                        }
1523                        _ => {
1524                            // Query-related entries (Ok, UserError) shouldn't be here
1525                            // Fall through to locator
1526                        }
1527                    }
1528                }
1529            }
1530        }
1531
1532        // Not in cache or invalid - try locator
1533        // Use LocatorContext to track deps on the asset itself (not the calling query)
1534        // Consistency tracking is handled via thread-local storage
1535        check_cycle(&full_cache_key)?;
1536        let _guard = StackGuard::push(full_cache_key.clone());
1537
1538        let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1539        let locator_result =
1540            self.locators
1541                .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1542
1543        if let Some(result) = locator_result {
1544            // Get collected dependencies from the locator context
1545            let locator_deps = locator_ctx.into_deps();
1546            match result {
1547                Ok(ErasedLocateResult::Ready {
1548                    value: arc,
1549                    durability: durability_level,
1550                }) => {
1551                    emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1552
1553                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1554                        Ok(v) => v,
1555                        Err(_) => {
1556                            return Err(QueryError::MissingDependency {
1557                                description: format!("Asset type mismatch: {:?}", key),
1558                            });
1559                        }
1560                    };
1561
1562                    // Store in whale atomically with early cutoff
1563                    // Include locator's dependencies so the asset is invalidated when they change
1564                    let entry = CachedEntry::AssetReady(typed_value.clone());
1565                    let durability = Durability::new(durability_level.as_u8() as usize)
1566                        .unwrap_or(Durability::volatile());
1567                    let new_value = typed_value.clone();
1568                    let result = self
1569                        .whale
1570                        .update_with_compare(
1571                            full_cache_key.clone(),
1572                            Some(entry),
1573                            |old_data, _new_data| {
1574                                let Some(CachedEntry::AssetReady(old_arc)) =
1575                                    old_data.and_then(|d| d.as_ref())
1576                                else {
1577                                    return true;
1578                                };
1579                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1580                                    return true;
1581                                };
1582                                !K::asset_eq(&old_value, &new_value)
1583                            },
1584                            durability,
1585                            locator_deps,
1586                        )
1587                        .expect("update_with_compare should succeed");
1588
1589                    // Register verifier for this asset (for verify-then-decide pattern)
1590                    self.verifiers
1591                        .insert_asset::<K, T>(full_cache_key, key.clone());
1592
1593                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1594                }
1595                Ok(ErasedLocateResult::Pending) => {
1596                    emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1597
1598                    // Add to pending list for Pending result
1599                    self.pending.insert::<K>(full_asset_key, key.clone());
1600                    match self
1601                        .whale
1602                        .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1603                        .expect("get_or_insert should succeed")
1604                    {
1605                        GetOrInsertResult::Inserted(node) => {
1606                            return Ok((AssetLoadingState::loading(key), node.changed_at));
1607                        }
1608                        GetOrInsertResult::Existing(node) => {
1609                            let changed_at = node.changed_at;
1610                            match &node.data {
1611                                Some(CachedEntry::AssetReady(arc)) => {
1612                                    match arc.clone().downcast::<K::Asset>() {
1613                                        Ok(value) => {
1614                                            return Ok((
1615                                                AssetLoadingState::ready(key, value),
1616                                                changed_at,
1617                                            ));
1618                                        }
1619                                        Err(_) => {
1620                                            return Ok((
1621                                                AssetLoadingState::loading(key),
1622                                                changed_at,
1623                                            ))
1624                                        }
1625                                    }
1626                                }
1627                                Some(CachedEntry::AssetError(err)) => {
1628                                    return Err(QueryError::UserError(err.clone()));
1629                                }
1630                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1631                            }
1632                        }
1633                    }
1634                }
1635                Err(QueryError::UserError(err)) => {
1636                    // Locator returned a user error - cache it as AssetError
1637                    let entry = CachedEntry::AssetError(err.clone());
1638                    let _ = self.whale.register(
1639                        full_cache_key,
1640                        Some(entry),
1641                        Durability::volatile(),
1642                        locator_deps,
1643                    );
1644                    return Err(QueryError::UserError(err));
1645                }
1646                Err(e) => {
1647                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1648                    return Err(e);
1649                }
1650            }
1651        }
1652
1653        // No locator registered or locator returned None - mark as pending
1654        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1655        self.pending
1656            .insert::<K>(full_asset_key.clone(), key.clone());
1657
1658        match self
1659            .whale
1660            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1661            .expect("get_or_insert with no dependencies cannot fail")
1662        {
1663            GetOrInsertResult::Inserted(node) => {
1664                Ok((AssetLoadingState::loading(key), node.changed_at))
1665            }
1666            GetOrInsertResult::Existing(node) => {
1667                let changed_at = node.changed_at;
1668                match &node.data {
1669                    Some(CachedEntry::AssetReady(arc)) => {
1670                        match arc.clone().downcast::<K::Asset>() {
1671                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1672                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1673                        }
1674                    }
1675                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1676                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1677                }
1678            }
1679        }
1680    }
1681
1682    /// Internal: Get asset state, checking cache and locator.
1683    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1684        self.get_asset_with_revision(key).map(|(state, _)| state)
1685    }
1686}
1687
1688impl<T: Tracer> Db for QueryRuntime<T> {
1689    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1690        QueryRuntime::query(self, query)
1691    }
1692
1693    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1694        self.get_asset_internal(key)
1695    }
1696
1697    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1698        self.query_registry.get_all::<Q>()
1699    }
1700
1701    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1702        self.asset_key_registry.get_all::<K>()
1703    }
1704}
1705
1706/// Tracks consistency of leaf asset accesses during query execution.
1707///
1708/// A "leaf" asset is one without dependencies (externally resolved via `resolve_asset`).
1709/// This tracker ensures that all leaf assets accessed during a query execution
1710/// (including those accessed by locators) are consistent - i.e., none were modified
1711/// via `resolve_asset` mid-execution.
1712///
1713/// The tracker is shared across `QueryContext` and `LocatorContext` to propagate
1714/// consistency checking through the entire execution tree.
1715#[derive(Debug)]
1716pub(crate) struct ConsistencyTracker {
1717    /// Global revision at query start. All leaf assets must have changed_at <= this.
1718    start_revision: RevisionCounter,
1719}
1720
1721impl ConsistencyTracker {
1722    /// Create a new tracker with the given start revision.
1723    pub fn new(start_revision: RevisionCounter) -> Self {
1724        Self { start_revision }
1725    }
1726
1727    /// Check consistency for a leaf asset access.
1728    ///
1729    /// A leaf asset is consistent if its changed_at <= start_revision.
1730    /// This detects if resolve_asset was called during query execution.
1731    ///
1732    /// Returns Ok(()) if consistent, Err if inconsistent.
1733    pub fn check_leaf_asset(&self, dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
1734        if dep_changed_at > self.start_revision {
1735            Err(QueryError::InconsistentAssetResolution)
1736        } else {
1737            Ok(())
1738        }
1739    }
1740}
1741
1742/// Context provided to queries during execution.
1743///
1744/// Use this to access dependencies via `query()`.
1745pub struct QueryContext<'a, T: Tracer = NoopTracer> {
1746    runtime: &'a QueryRuntime<T>,
1747    current_key: FullCacheKey,
1748    parent_query_type: &'static str,
1749    exec_ctx: ExecutionContext,
1750    deps: RefCell<Vec<FullCacheKey>>,
1751}
1752
1753impl<'a, T: Tracer> QueryContext<'a, T> {
1754    /// Query a dependency.
1755    ///
1756    /// The dependency is automatically tracked for invalidation.
1757    ///
1758    /// # Example
1759    ///
1760    /// ```ignore
1761    /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1762    ///     let dep_result = db.query(OtherQuery { id: self.id })?;
1763    ///     Ok(process(&dep_result))
1764    /// }
1765    /// ```
1766    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1767        let key = query.cache_key();
1768        let full_key = FullCacheKey::new::<Q, _>(&key);
1769
1770        // Emit dependency registered event
1771        self.runtime.tracer.on_dependency_registered(
1772            self.exec_ctx.span_id(),
1773            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1774            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1775        );
1776
1777        // Record this as a dependency
1778        self.deps.borrow_mut().push(full_key.clone());
1779
1780        // Execute the query
1781        self.runtime.query(query)
1782    }
1783
1784    /// Access an asset, tracking it as a dependency.
1785    ///
1786    /// Returns `AssetLoadingState<K>`:
1787    /// - `is_loading()` if the asset is still being loaded
1788    /// - `is_ready()` if the asset is available
1789    ///
1790    /// Use `.suspend()?` to convert to `Result<Arc<K::Asset>, QueryError>`,
1791    /// which returns `Err(QueryError::Suspend { asset })` if still loading.
1792    ///
1793    /// # Example
1794    ///
1795    /// ```ignore
1796    /// #[query]
1797    /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1798    ///     let content = db.asset(path)?.suspend()?;
1799    ///     // Process content...
1800    ///     Ok(output)
1801    /// }
1802    /// ```
1803    ///
1804    /// # Errors
1805    ///
1806    /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1807    pub fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1808        let full_asset_key = FullAssetKey::new(&key);
1809        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1810
1811        // 1. Emit asset dependency registered event
1812        self.runtime.tracer.on_asset_dependency_registered(
1813            self.exec_ctx.span_id(),
1814            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1815            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1816        );
1817
1818        // 2. Record dependency on this asset
1819        self.deps.borrow_mut().push(full_cache_key);
1820
1821        // 3. Get asset from cache/locator
1822        // Consistency checking for cached leaf assets is done inside get_asset_with_revision_ctx
1823        let (state, _changed_at) = self.runtime.get_asset_with_revision_ctx(key, self)?;
1824
1825        Ok(state)
1826    }
1827
1828    /// List all query instances of type Q that have been registered.
1829    ///
1830    /// This method establishes a dependency on the "set" of queries of type Q.
1831    /// The calling query will be invalidated when:
1832    /// - A new query of type Q is first executed (added to set)
1833    ///
1834    /// The calling query will NOT be invalidated when:
1835    /// - An individual query of type Q has its value change
1836    ///
1837    /// # Example
1838    ///
1839    /// ```ignore
1840    /// #[query]
1841    /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1842    ///     let queries = db.list_queries::<MyQuery>();
1843    ///     let mut results = Vec::new();
1844    ///     for q in queries {
1845    ///         results.push(*db.query(q)?);
1846    ///     }
1847    ///     Ok(results)
1848    /// }
1849    /// ```
1850    pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1851        // Record dependency on the sentinel (set-level dependency)
1852        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1853
1854        self.runtime.tracer.on_dependency_registered(
1855            self.exec_ctx.span_id(),
1856            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1857            TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1858        );
1859
1860        // Ensure sentinel exists in whale (for dependency tracking)
1861        if self.runtime.whale.get(&sentinel).is_none() {
1862            let _ =
1863                self.runtime
1864                    .whale
1865                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1866        }
1867
1868        self.deps.borrow_mut().push(sentinel);
1869
1870        // Return all registered queries
1871        self.runtime.query_registry.get_all::<Q>()
1872    }
1873
1874    /// List all asset keys of type K that have been registered.
1875    ///
1876    /// This method establishes a dependency on the "set" of asset keys of type K.
1877    /// The calling query will be invalidated when:
1878    /// - A new asset of type K is resolved for the first time (added to set)
1879    /// - An asset of type K is removed via remove_asset
1880    ///
1881    /// The calling query will NOT be invalidated when:
1882    /// - An individual asset's value changes (use `db.asset()` for that)
1883    ///
1884    /// # Example
1885    ///
1886    /// ```ignore
1887    /// #[query]
1888    /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1889    ///     let keys = db.list_asset_keys::<ConfigFile>();
1890    ///     let mut contents = Vec::new();
1891    ///     for key in keys {
1892    ///         let content = db.asset(&key)?.suspend()?;
1893    ///         contents.push((*content).clone());
1894    ///     }
1895    ///     Ok(contents)
1896    /// }
1897    /// ```
1898    pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1899        // Record dependency on the sentinel (set-level dependency)
1900        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1901
1902        self.runtime.tracer.on_asset_dependency_registered(
1903            self.exec_ctx.span_id(),
1904            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1905            TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1906        );
1907
1908        // Ensure sentinel exists in whale (for dependency tracking)
1909        if self.runtime.whale.get(&sentinel).is_none() {
1910            let _ =
1911                self.runtime
1912                    .whale
1913                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1914        }
1915
1916        self.deps.borrow_mut().push(sentinel);
1917
1918        // Return all registered asset keys
1919        self.runtime.asset_key_registry.get_all::<K>()
1920    }
1921}
1922
1923impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1924    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1925        QueryContext::query(self, query)
1926    }
1927
1928    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1929        QueryContext::asset(self, key)
1930    }
1931
1932    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1933        QueryContext::list_queries(self)
1934    }
1935
1936    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1937        QueryContext::list_asset_keys(self)
1938    }
1939}
1940
1941/// Context for collecting dependencies during asset locator execution.
1942///
1943/// Unlike `QueryContext`, this is specifically for locators and does not
1944/// register dependencies on any parent query. Dependencies collected here
1945/// are stored with the asset itself.
1946pub(crate) struct LocatorContext<'a, T: Tracer> {
1947    runtime: &'a QueryRuntime<T>,
1948    deps: RefCell<Vec<FullCacheKey>>,
1949}
1950
1951impl<'a, T: Tracer> LocatorContext<'a, T> {
1952    /// Create a new locator context for the given asset key.
1953    ///
1954    /// Consistency tracking is handled via thread-local storage, so leaf asset
1955    /// accesses will be checked against any active tracker from a parent query.
1956    pub(crate) fn new(runtime: &'a QueryRuntime<T>, _asset_key: FullCacheKey) -> Self {
1957        Self {
1958            runtime,
1959            deps: RefCell::new(Vec::new()),
1960        }
1961    }
1962
1963    /// Consume this context and return the collected dependencies.
1964    pub(crate) fn into_deps(self) -> Vec<FullCacheKey> {
1965        self.deps.into_inner()
1966    }
1967}
1968
1969impl<T: Tracer> Db for LocatorContext<'_, T> {
1970    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1971        let key = query.cache_key();
1972        let full_key = FullCacheKey::new::<Q, _>(&key);
1973
1974        // Record this as a dependency of the asset being located
1975        self.deps.borrow_mut().push(full_key);
1976
1977        // Execute the query via the runtime
1978        self.runtime.query(query)
1979    }
1980
1981    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1982        let full_asset_key = FullAssetKey::new(&key);
1983        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1984
1985        // Record this as a dependency of the asset being located
1986        self.deps.borrow_mut().push(full_cache_key);
1987
1988        // Access the asset - consistency checking is done inside get_asset_with_revision
1989        let (state, _changed_at) = self.runtime.get_asset_with_revision(key)?;
1990
1991        Ok(state)
1992    }
1993
1994    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1995        self.runtime.list_queries()
1996    }
1997
1998    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1999        self.runtime.list_asset_keys()
2000    }
2001}
2002
2003#[cfg(test)]
2004mod tests {
2005    use super::*;
2006
2007    #[test]
2008    fn test_simple_query() {
2009        #[derive(Clone)]
2010        struct Add {
2011            a: i32,
2012            b: i32,
2013        }
2014
2015        impl Query for Add {
2016            type CacheKey = (i32, i32);
2017            type Output = i32;
2018
2019            fn cache_key(&self) -> Self::CacheKey {
2020                (self.a, self.b)
2021            }
2022
2023            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2024                Ok(Arc::new(self.a + self.b))
2025            }
2026
2027            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2028                old == new
2029            }
2030        }
2031
2032        let runtime = QueryRuntime::new();
2033
2034        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
2035        assert_eq!(*result, 3);
2036
2037        // Second query should be cached
2038        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
2039        assert_eq!(*result2, 3);
2040    }
2041
2042    #[test]
2043    fn test_dependent_queries() {
2044        #[derive(Clone)]
2045        struct Base {
2046            value: i32,
2047        }
2048
2049        impl Query for Base {
2050            type CacheKey = i32;
2051            type Output = i32;
2052
2053            fn cache_key(&self) -> Self::CacheKey {
2054                self.value
2055            }
2056
2057            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2058                Ok(Arc::new(self.value * 2))
2059            }
2060
2061            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2062                old == new
2063            }
2064        }
2065
2066        #[derive(Clone)]
2067        struct Derived {
2068            base_value: i32,
2069        }
2070
2071        impl Query for Derived {
2072            type CacheKey = i32;
2073            type Output = i32;
2074
2075            fn cache_key(&self) -> Self::CacheKey {
2076                self.base_value
2077            }
2078
2079            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2080                let base = db.query(Base {
2081                    value: self.base_value,
2082                })?;
2083                Ok(Arc::new(*base + 10))
2084            }
2085
2086            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2087                old == new
2088            }
2089        }
2090
2091        let runtime = QueryRuntime::new();
2092
2093        let result = runtime.query(Derived { base_value: 5 }).unwrap();
2094        assert_eq!(*result, 20); // 5 * 2 + 10
2095    }
2096
2097    #[test]
2098    fn test_cycle_detection() {
2099        #[derive(Clone)]
2100        struct CycleA {
2101            id: i32,
2102        }
2103
2104        #[derive(Clone)]
2105        struct CycleB {
2106            id: i32,
2107        }
2108
2109        impl Query for CycleA {
2110            type CacheKey = i32;
2111            type Output = i32;
2112
2113            fn cache_key(&self) -> Self::CacheKey {
2114                self.id
2115            }
2116
2117            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2118                let b = db.query(CycleB { id: self.id })?;
2119                Ok(Arc::new(*b + 1))
2120            }
2121
2122            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2123                old == new
2124            }
2125        }
2126
2127        impl Query for CycleB {
2128            type CacheKey = i32;
2129            type Output = i32;
2130
2131            fn cache_key(&self) -> Self::CacheKey {
2132                self.id
2133            }
2134
2135            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2136                let a = db.query(CycleA { id: self.id })?;
2137                Ok(Arc::new(*a + 1))
2138            }
2139
2140            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2141                old == new
2142            }
2143        }
2144
2145        let runtime = QueryRuntime::new();
2146
2147        let result = runtime.query(CycleA { id: 1 });
2148        assert!(matches!(result, Err(QueryError::Cycle { .. })));
2149    }
2150
2151    #[test]
2152    fn test_fallible_query() {
2153        #[derive(Clone)]
2154        struct ParseInt {
2155            input: String,
2156        }
2157
2158        impl Query for ParseInt {
2159            type CacheKey = String;
2160            type Output = Result<i32, std::num::ParseIntError>;
2161
2162            fn cache_key(&self) -> Self::CacheKey {
2163                self.input.clone()
2164            }
2165
2166            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2167                Ok(Arc::new(self.input.parse()))
2168            }
2169
2170            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2171                old == new
2172            }
2173        }
2174
2175        let runtime = QueryRuntime::new();
2176
2177        // Valid parse
2178        let result = runtime
2179            .query(ParseInt {
2180                input: "42".to_string(),
2181            })
2182            .unwrap();
2183        assert_eq!(*result, Ok(42));
2184
2185        // Invalid parse - system succeeds, user error in output
2186        let result = runtime
2187            .query(ParseInt {
2188                input: "not_a_number".to_string(),
2189            })
2190            .unwrap();
2191        assert!(result.is_err());
2192    }
2193
2194    // Macro tests
2195    mod macro_tests {
2196        use super::*;
2197        use crate::query;
2198
2199        #[query]
2200        fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2201            let _ = db; // silence unused warning
2202            Ok(a + b)
2203        }
2204
2205        #[test]
2206        fn test_macro_basic() {
2207            let runtime = QueryRuntime::new();
2208            let result = runtime.query(Add::new(1, 2)).unwrap();
2209            assert_eq!(*result, 3);
2210        }
2211
2212        #[query]
2213        fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2214            let _ = db;
2215            Ok(x * 2)
2216        }
2217
2218        #[test]
2219        fn test_macro_simple() {
2220            let runtime = QueryRuntime::new();
2221            let result = runtime.query(SimpleDouble::new(5)).unwrap();
2222            assert_eq!(*result, 10);
2223        }
2224
2225        #[query(keys(id))]
2226        fn with_key_selection(
2227            db: &impl Db,
2228            id: u32,
2229            include_extra: bool,
2230        ) -> Result<String, QueryError> {
2231            let _ = db;
2232            Ok(format!("id={}, extra={}", id, include_extra))
2233        }
2234
2235        #[test]
2236        fn test_macro_key_selection() {
2237            let runtime = QueryRuntime::new();
2238
2239            // Same id, different include_extra - should return cached
2240            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2241            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2242
2243            // Both should have same value because only `id` is the key
2244            assert_eq!(*r1, "id=1, extra=true");
2245            assert_eq!(*r2, "id=1, extra=true"); // Cached!
2246        }
2247
2248        #[query]
2249        fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2250            let sum = db.query(Add::new(a, b))?;
2251            Ok(*sum * 2)
2252        }
2253
2254        #[test]
2255        fn test_macro_dependencies() {
2256            let runtime = QueryRuntime::new();
2257            let result = runtime.query(Dependent::new(3, 4)).unwrap();
2258            assert_eq!(*result, 14); // (3 + 4) * 2
2259        }
2260
2261        #[query(output_eq)]
2262        fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2263            let _ = db;
2264            Ok(x * 2)
2265        }
2266
2267        #[test]
2268        fn test_macro_output_eq() {
2269            let runtime = QueryRuntime::new();
2270            let result = runtime.query(WithOutputEq::new(5)).unwrap();
2271            assert_eq!(*result, 10);
2272        }
2273
2274        #[query(name = "CustomName")]
2275        fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2276            let _ = db;
2277            Ok(x)
2278        }
2279
2280        #[test]
2281        fn test_macro_custom_name() {
2282            let runtime = QueryRuntime::new();
2283            let result = runtime.query(CustomName::new(42)).unwrap();
2284            assert_eq!(*result, 42);
2285        }
2286
2287        // Test that attribute macros like #[tracing::instrument] are preserved
2288        // We use #[allow(unused_variables)] and #[inline] as test attributes since
2289        // they don't require external dependencies.
2290        #[allow(unused_variables)]
2291        #[inline]
2292        #[query]
2293        fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2294            // This would warn without #[allow(unused_variables)] on the generated method
2295            let unused_var = 42;
2296            Ok(x * 2)
2297        }
2298
2299        #[test]
2300        fn test_macro_preserves_attributes() {
2301            let runtime = QueryRuntime::new();
2302            // If attributes weren't preserved, this might warn about unused_var
2303            let result = runtime.query(WithAttributes::new(5)).unwrap();
2304            assert_eq!(*result, 10);
2305        }
2306    }
2307
2308    // Tests for poll() and changed_at()
2309    mod poll_tests {
2310        use super::*;
2311
2312        #[derive(Clone)]
2313        struct Counter {
2314            id: i32,
2315        }
2316
2317        impl Query for Counter {
2318            type CacheKey = i32;
2319            type Output = i32;
2320
2321            fn cache_key(&self) -> Self::CacheKey {
2322                self.id
2323            }
2324
2325            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2326                Ok(Arc::new(self.id * 10))
2327            }
2328
2329            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2330                old == new
2331            }
2332        }
2333
2334        #[test]
2335        fn test_poll_returns_value_and_revision() {
2336            let runtime = QueryRuntime::new();
2337
2338            let result = runtime.poll(Counter { id: 1 }).unwrap();
2339
2340            // Value should be correct - access through Result and Arc
2341            assert_eq!(**result.value.as_ref().unwrap(), 10);
2342
2343            // Revision should be non-zero after first execution
2344            assert!(result.revision > 0);
2345        }
2346
2347        #[test]
2348        fn test_poll_revision_stable_on_cache_hit() {
2349            let runtime = QueryRuntime::new();
2350
2351            // First poll
2352            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2353            let rev1 = result1.revision;
2354
2355            // Second poll (cache hit)
2356            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2357            let rev2 = result2.revision;
2358
2359            // Revision should be the same (no change)
2360            assert_eq!(rev1, rev2);
2361        }
2362
2363        #[test]
2364        fn test_poll_revision_changes_on_invalidate() {
2365            let runtime = QueryRuntime::new();
2366
2367            // First poll
2368            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2369            let rev1 = result1.revision;
2370
2371            // Invalidate and poll again
2372            runtime.invalidate::<Counter>(&1);
2373            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2374            let rev2 = result2.revision;
2375
2376            // Revision should increase (value was recomputed)
2377            // Note: Since output_eq returns true (same value), this might not change
2378            // depending on early cutoff behavior. Let's verify the value is still correct.
2379            assert_eq!(**result2.value.as_ref().unwrap(), 10);
2380
2381            // With early cutoff, revision might stay the same if value didn't change
2382            // This is expected behavior
2383            assert!(rev2 >= rev1);
2384        }
2385
2386        #[test]
2387        fn test_changed_at_returns_none_for_unexecuted_query() {
2388            let runtime = QueryRuntime::new();
2389
2390            // Query has never been executed
2391            let rev = runtime.changed_at::<Counter>(&1);
2392            assert!(rev.is_none());
2393        }
2394
2395        #[test]
2396        fn test_changed_at_returns_revision_after_execution() {
2397            let runtime = QueryRuntime::new();
2398
2399            // Execute the query
2400            let _ = runtime.query(Counter { id: 1 }).unwrap();
2401
2402            // Now changed_at should return Some
2403            let rev = runtime.changed_at::<Counter>(&1);
2404            assert!(rev.is_some());
2405            assert!(rev.unwrap() > 0);
2406        }
2407
2408        #[test]
2409        fn test_changed_at_matches_poll_revision() {
2410            let runtime = QueryRuntime::new();
2411
2412            // Poll the query
2413            let result = runtime.poll(Counter { id: 1 }).unwrap();
2414
2415            // changed_at should match the revision from poll
2416            let rev = runtime.changed_at::<Counter>(&1);
2417            assert_eq!(rev, Some(result.revision));
2418        }
2419
2420        #[test]
2421        fn test_poll_value_access() {
2422            let runtime = QueryRuntime::new();
2423
2424            let result = runtime.poll(Counter { id: 5 }).unwrap();
2425
2426            // Access through Result and Arc
2427            let value: &i32 = result.value.as_ref().unwrap();
2428            assert_eq!(*value, 50);
2429
2430            // Access Arc directly via field after unwrapping Result
2431            let arc: &Arc<i32> = result.value.as_ref().unwrap();
2432            assert_eq!(**arc, 50);
2433        }
2434
2435        #[test]
2436        fn test_subscription_pattern() {
2437            let runtime = QueryRuntime::new();
2438
2439            // Simulate subscription pattern
2440            let mut last_revision: RevisionCounter = 0;
2441            let mut notifications = 0;
2442
2443            // First poll - should notify (new value)
2444            let result = runtime.poll(Counter { id: 1 }).unwrap();
2445            if result.revision > last_revision {
2446                notifications += 1;
2447                last_revision = result.revision;
2448            }
2449
2450            // Second poll - should NOT notify (no change)
2451            let result = runtime.poll(Counter { id: 1 }).unwrap();
2452            if result.revision > last_revision {
2453                notifications += 1;
2454                last_revision = result.revision;
2455            }
2456
2457            // Third poll - should NOT notify (no change)
2458            let result = runtime.poll(Counter { id: 1 }).unwrap();
2459            if result.revision > last_revision {
2460                notifications += 1;
2461                #[allow(unused_assignments)]
2462                {
2463                    last_revision = result.revision;
2464                }
2465            }
2466
2467            // Only the first poll should have triggered a notification
2468            assert_eq!(notifications, 1);
2469        }
2470    }
2471
2472    // Tests for GC APIs
2473    mod gc_tests {
2474        use super::*;
2475        use std::collections::HashSet;
2476        use std::sync::atomic::{AtomicUsize, Ordering};
2477        use std::sync::Mutex;
2478
2479        #[derive(Clone)]
2480        struct Leaf {
2481            id: i32,
2482        }
2483
2484        impl Query for Leaf {
2485            type CacheKey = i32;
2486            type Output = i32;
2487
2488            fn cache_key(&self) -> Self::CacheKey {
2489                self.id
2490            }
2491
2492            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2493                Ok(Arc::new(self.id * 10))
2494            }
2495
2496            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2497                old == new
2498            }
2499        }
2500
2501        #[derive(Clone)]
2502        struct Parent {
2503            child_id: i32,
2504        }
2505
2506        impl Query for Parent {
2507            type CacheKey = i32;
2508            type Output = i32;
2509
2510            fn cache_key(&self) -> Self::CacheKey {
2511                self.child_id
2512            }
2513
2514            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2515                let child = db.query(Leaf { id: self.child_id })?;
2516                Ok(Arc::new(*child + 1))
2517            }
2518
2519            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2520                old == new
2521            }
2522        }
2523
2524        #[test]
2525        fn test_query_keys_returns_all_cached_queries() {
2526            let runtime = QueryRuntime::new();
2527
2528            // Execute some queries
2529            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2530            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2531            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2532
2533            // Get all keys
2534            let keys = runtime.query_keys();
2535
2536            // Should have at least 3 keys (might have more due to sentinels)
2537            assert!(keys.len() >= 3);
2538        }
2539
2540        #[test]
2541        fn test_remove_removes_query() {
2542            let runtime = QueryRuntime::new();
2543
2544            // Execute a query
2545            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2546
2547            // Get the key
2548            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2549
2550            // Query should exist
2551            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2552
2553            // Remove it
2554            assert!(runtime.remove(&full_key));
2555
2556            // Query should no longer exist
2557            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2558        }
2559
2560        #[test]
2561        fn test_remove_if_unused_removes_leaf_query() {
2562            let runtime = QueryRuntime::new();
2563
2564            // Execute a leaf query (no dependents)
2565            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2566
2567            // Should be removable since no other query depends on it
2568            assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2569
2570            // Query should no longer exist
2571            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2572        }
2573
2574        #[test]
2575        fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2576            let runtime = QueryRuntime::new();
2577
2578            // Execute parent query (which depends on Leaf)
2579            let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2580
2581            // Leaf query should not be removable since Parent depends on it
2582            assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2583
2584            // Leaf query should still exist
2585            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2586
2587            // But Parent should be removable (no dependents)
2588            assert!(runtime.remove_query_if_unused::<Parent>(&1));
2589        }
2590
2591        #[test]
2592        fn test_remove_if_unused_with_full_cache_key() {
2593            let runtime = QueryRuntime::new();
2594
2595            // Execute a leaf query
2596            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2597
2598            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2599
2600            // Should be removable via FullCacheKey
2601            assert!(runtime.remove_if_unused(&full_key));
2602
2603            // Query should no longer exist
2604            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2605        }
2606
2607        // Test tracer receives on_query_key calls
2608        struct GcTracker {
2609            accessed_keys: Mutex<HashSet<String>>,
2610            access_count: AtomicUsize,
2611        }
2612
2613        impl GcTracker {
2614            fn new() -> Self {
2615                Self {
2616                    accessed_keys: Mutex::new(HashSet::new()),
2617                    access_count: AtomicUsize::new(0),
2618                }
2619            }
2620        }
2621
2622        impl Tracer for GcTracker {
2623            fn new_span_id(&self) -> SpanId {
2624                SpanId(1)
2625            }
2626
2627            fn on_query_key(&self, full_key: &FullCacheKey) {
2628                self.accessed_keys
2629                    .lock()
2630                    .unwrap()
2631                    .insert(full_key.debug_repr().to_string());
2632                self.access_count.fetch_add(1, Ordering::Relaxed);
2633            }
2634        }
2635
2636        #[test]
2637        fn test_tracer_receives_on_query_key() {
2638            let tracker = GcTracker::new();
2639            let runtime = QueryRuntime::with_tracer(tracker);
2640
2641            // Execute some queries
2642            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2643            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2644
2645            // Tracer should have received on_query_key calls
2646            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2647            assert_eq!(count, 2);
2648
2649            // Check that the keys were recorded
2650            let keys = runtime.tracer().accessed_keys.lock().unwrap();
2651            assert!(keys.iter().any(|k| k.contains("Leaf")));
2652        }
2653
2654        #[test]
2655        fn test_tracer_receives_on_query_key_for_cache_hits() {
2656            let tracker = GcTracker::new();
2657            let runtime = QueryRuntime::with_tracer(tracker);
2658
2659            // Execute query twice (second is cache hit)
2660            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2661            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2662
2663            // Tracer should have received on_query_key for both calls
2664            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2665            assert_eq!(count, 2);
2666        }
2667
2668        #[test]
2669        fn test_gc_workflow() {
2670            let tracker = GcTracker::new();
2671            let runtime = QueryRuntime::with_tracer(tracker);
2672
2673            // Execute some queries
2674            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2675            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2676            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2677
2678            // Simulate GC: remove all queries that are not in use
2679            let mut removed = 0;
2680            for key in runtime.query_keys() {
2681                if runtime.remove_if_unused(&key) {
2682                    removed += 1;
2683                }
2684            }
2685
2686            // All leaf queries should be removable
2687            assert!(removed >= 3);
2688
2689            // Queries should no longer exist
2690            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2691            assert!(runtime.changed_at::<Leaf>(&2).is_none());
2692            assert!(runtime.changed_at::<Leaf>(&3).is_none());
2693        }
2694    }
2695}