query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::{Cell, RefCell};
5use std::ops::Deref;
6use std::sync::Arc;
7
8use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
9
10use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
11use crate::db::Db;
12use crate::key::FullCacheKey;
13use crate::loading::AssetLoadingState;
14use crate::query::Query;
15use crate::storage::{
16    AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
17    QueryRegistry, VerifierStorage,
18};
19use crate::tracer::{
20    ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
21    TracerAssetState, TracerQueryKey,
22};
23use crate::QueryError;
24
25/// Function type for comparing user errors during early cutoff.
26///
27/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
28/// `QueryError::UserError` values are compared for caching purposes.
29pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
30
31/// Number of durability levels (matches whale's default).
32const DURABILITY_LEVELS: usize = 4;
33
34// Thread-local query execution stack for cycle detection.
35thread_local! {
36    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
37}
38
39/// Check for cycles in the query stack and return error if detected.
40fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
41    let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
42    if cycle_detected {
43        let path = QUERY_STACK.with(|stack| {
44            let stack = stack.borrow();
45            let mut path: Vec<String> = stack.iter().map(|k| k.debug_repr().to_string()).collect();
46            path.push(key.debug_repr().to_string());
47            path
48        });
49        return Err(QueryError::Cycle { path });
50    }
51    Ok(())
52}
53
54/// RAII guard for pushing/popping from query stack.
55struct StackGuard;
56
57impl StackGuard {
58    fn push(key: FullCacheKey) -> Self {
59        QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
60        StackGuard
61    }
62}
63
64impl Drop for StackGuard {
65    fn drop(&mut self) {
66        QUERY_STACK.with(|stack| {
67            stack.borrow_mut().pop();
68        });
69    }
70}
71
72/// Execution context passed through query execution.
73///
74/// Contains a span ID for tracing correlation.
75#[derive(Clone, Copy)]
76pub struct ExecutionContext {
77    span_id: SpanId,
78}
79
80impl ExecutionContext {
81    /// Create a new execution context with the given span ID.
82    #[inline]
83    pub fn new(span_id: SpanId) -> Self {
84        Self { span_id }
85    }
86
87    /// Get the span ID for this execution context.
88    #[inline]
89    pub fn span_id(&self) -> SpanId {
90        self.span_id
91    }
92}
93
94/// Result of polling a query, containing the value and its revision.
95///
96/// This is returned by [`QueryRuntime::poll`] and provides both the query result
97/// and its change revision, enabling efficient change detection for subscription
98/// patterns.
99///
100/// # Example
101///
102/// ```ignore
103/// let result = runtime.poll(MyQuery::new())?;
104///
105/// // Access the value via Deref
106/// println!("{:?}", *result);
107///
108/// // Check if changed since last poll
109/// if result.revision > last_known_revision {
110///     notify_subscribers(&result.value);
111///     last_known_revision = result.revision;
112/// }
113/// ```
114#[derive(Debug, Clone)]
115pub struct Polled<T> {
116    /// The query result value.
117    pub value: T,
118    /// The revision at which this value was last changed.
119    ///
120    /// Compare this with a previously stored revision to detect changes.
121    pub revision: RevisionCounter,
122}
123
124impl<T: Deref> Deref for Polled<T> {
125    type Target = T::Target;
126
127    fn deref(&self) -> &Self::Target {
128        &self.value
129    }
130}
131
132/// The query runtime manages query execution, caching, and dependency tracking.
133///
134/// This is cheap to clone - all data is behind `Arc`.
135///
136/// # Type Parameter
137///
138/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
139///   for zero-cost when tracing is not needed.
140///
141/// # Example
142///
143/// ```ignore
144/// // Without tracing (default)
145/// let runtime = QueryRuntime::new();
146///
147/// // With tracing
148/// let tracer = MyTracer::new();
149/// let runtime = QueryRuntime::with_tracer(tracer);
150///
151/// // Sync query execution
152/// let result = runtime.query(MyQuery { ... })?;
153/// ```
154pub struct QueryRuntime<T: Tracer = NoopTracer> {
155    /// Whale runtime for dependency tracking and cache storage.
156    /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
157    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
158    /// Registered asset locators
159    locators: Arc<LocatorStorage<T>>,
160    /// Pending asset requests
161    pending: Arc<PendingStorage>,
162    /// Registry for tracking query instances (for list_queries)
163    query_registry: Arc<QueryRegistry>,
164    /// Registry for tracking asset keys (for list_asset_keys)
165    asset_key_registry: Arc<AssetKeyRegistry>,
166    /// Verifiers for re-executing queries (for verify-then-decide pattern)
167    verifiers: Arc<VerifierStorage>,
168    /// Comparator for user errors during early cutoff
169    error_comparator: ErrorComparator,
170    /// Tracer for observability
171    tracer: Arc<T>,
172}
173
174impl Default for QueryRuntime<NoopTracer> {
175    fn default() -> Self {
176        Self::new()
177    }
178}
179
180impl<T: Tracer> Clone for QueryRuntime<T> {
181    fn clone(&self) -> Self {
182        Self {
183            whale: self.whale.clone(),
184            locators: self.locators.clone(),
185            pending: self.pending.clone(),
186            query_registry: self.query_registry.clone(),
187            asset_key_registry: self.asset_key_registry.clone(),
188            verifiers: self.verifiers.clone(),
189            error_comparator: self.error_comparator,
190            tracer: self.tracer.clone(),
191        }
192    }
193}
194
195/// Default error comparator that treats all errors as different.
196///
197/// This is conservative - it always triggers recomputation when an error occurs.
198fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
199    false
200}
201
202impl<T: Tracer> QueryRuntime<T> {
203    /// Get cached output along with its revision (single atomic access).
204    fn get_cached_with_revision<Q: Query>(
205        &self,
206        key: &FullCacheKey,
207    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
208        let node = self.whale.get(key)?;
209        let revision = node.changed_at;
210        let entry = node.data.as_ref()?;
211        let cached = entry.to_cached_value::<Q::Output>()?;
212        Some((cached, revision))
213    }
214
215    /// Get a reference to the tracer.
216    #[inline]
217    pub fn tracer(&self) -> &T {
218        &self.tracer
219    }
220}
221
222impl QueryRuntime<NoopTracer> {
223    /// Create a new query runtime with default settings.
224    pub fn new() -> Self {
225        Self::with_tracer(NoopTracer)
226    }
227
228    /// Create a builder for customizing the runtime.
229    ///
230    /// # Example
231    ///
232    /// ```ignore
233    /// let runtime = QueryRuntime::builder()
234    ///     .error_comparator(|a, b| {
235    ///         // Custom error comparison logic
236    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
237    ///             (Some(a), Some(b)) => a == b,
238    ///             _ => false,
239    ///         }
240    ///     })
241    ///     .build();
242    /// ```
243    pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
244        QueryRuntimeBuilder::new()
245    }
246}
247
248impl<T: Tracer> QueryRuntime<T> {
249    /// Create a new query runtime with the specified tracer.
250    pub fn with_tracer(tracer: T) -> Self {
251        QueryRuntimeBuilder::new().tracer(tracer).build()
252    }
253
254    /// Execute a query synchronously.
255    ///
256    /// Returns the cached result if valid, otherwise executes the query.
257    ///
258    /// # Errors
259    ///
260    /// - `QueryError::Suspend` - Query is waiting for async loading
261    /// - `QueryError::Cycle` - Dependency cycle detected
262    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
263        self.query_internal(query)
264            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
265    }
266
267    /// Internal implementation shared by query() and poll().
268    ///
269    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
270    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
271    #[allow(clippy::type_complexity)]
272    fn query_internal<Q: Query>(
273        &self,
274        query: Q,
275    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
276        let key = query.cache_key();
277        let full_key = FullCacheKey::new::<Q, _>(&key);
278
279        // Emit query key event for GC tracking (before other events)
280        self.tracer.on_query_key(&full_key);
281
282        // Create execution context and emit start event
283        let span_id = self.tracer.new_span_id();
284        let exec_ctx = ExecutionContext::new(span_id);
285        let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
286
287        self.tracer.on_query_start(span_id, query_key.clone());
288
289        // Check for cycles using thread-local stack
290        let cycle_detected = QUERY_STACK.with(|stack| {
291            let stack = stack.borrow();
292            stack.iter().any(|k| k == &full_key)
293        });
294
295        if cycle_detected {
296            let path = QUERY_STACK.with(|stack| {
297                let stack = stack.borrow();
298                let mut path: Vec<String> =
299                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
300                path.push(full_key.debug_repr().to_string());
301                path
302            });
303
304            self.tracer.on_cycle_detected(
305                path.iter()
306                    .map(|s| TracerQueryKey::new("", s.clone()))
307                    .collect(),
308            );
309            self.tracer
310                .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
311
312            return Err(QueryError::Cycle { path });
313        }
314
315        // Check if cached and valid (with verify-then-decide pattern)
316        let current_rev = self.whale.current_revision();
317
318        // Fast path: already verified at current revision
319        if self.whale.is_verified_at(&full_key, &current_rev) {
320            // Single atomic access to get both cached value and revision
321            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
322                self.tracer.on_cache_check(span_id, query_key.clone(), true);
323                self.tracer
324                    .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
325
326                return match cached {
327                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
328                    CachedValue::UserError(err) => Ok((Err(err), revision)),
329                };
330            }
331        }
332
333        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
334        if self.whale.is_valid(&full_key) {
335            // Single atomic access to get both cached value and revision
336            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
337                // Shallow valid but not verified - verify deps first
338                let mut deps_verified = true;
339                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
340                    for dep in deps {
341                        if let Some(verifier) = self.verifiers.get(&dep) {
342                            // Re-query dep to verify it (triggers recursive verification)
343                            if verifier.verify(self as &dyn std::any::Any).is_err() {
344                                deps_verified = false;
345                                break;
346                            }
347                        }
348                    }
349                }
350
351                // Re-check validity after deps are verified
352                if deps_verified && self.whale.is_valid(&full_key) {
353                    // Deps didn't change their changed_at, mark as verified and use cached
354                    self.whale.mark_verified(&full_key, &current_rev);
355
356                    self.tracer.on_cache_check(span_id, query_key.clone(), true);
357                    self.tracer
358                        .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
359
360                    return match cached {
361                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
362                        CachedValue::UserError(err) => Ok((Err(err), revision)),
363                    };
364                }
365                // A dep's changed_at increased, fall through to execute
366            }
367        }
368
369        self.tracer
370            .on_cache_check(span_id, query_key.clone(), false);
371
372        // Execute the query with cycle tracking
373        let _guard = StackGuard::push(full_key.clone());
374        let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
375        drop(_guard);
376
377        // Emit end event
378        let exec_result = match &result {
379            Ok((_, true, _)) => ExecutionResult::Changed,
380            Ok((_, false, _)) => ExecutionResult::Unchanged,
381            Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
382            Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
383            Err(e) => ExecutionResult::Error {
384                message: format!("{:?}", e),
385            },
386        };
387        self.tracer
388            .on_query_end(span_id, query_key.clone(), exec_result);
389
390        result.map(|(inner_result, _, revision)| (inner_result, revision))
391    }
392
393    /// Execute a query, caching the result if appropriate.
394    ///
395    /// Returns (result, output_changed, revision) tuple.
396    /// - `result`: Ok(output) for success, Err(user_error) for user errors
397    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
398    #[allow(clippy::type_complexity)]
399    fn execute_query<Q: Query>(
400        &self,
401        query: &Q,
402        full_key: &FullCacheKey,
403        exec_ctx: ExecutionContext,
404    ) -> Result<
405        (
406            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
407            bool,
408            RevisionCounter,
409        ),
410        QueryError,
411    > {
412        // Create context for this query execution
413        let ctx = QueryContext {
414            runtime: self,
415            current_key: full_key.clone(),
416            parent_query_type: std::any::type_name::<Q>(),
417            exec_ctx,
418            deps: RefCell::new(Vec::new()),
419            first_access_revision: Cell::new(None),
420        };
421
422        // Execute the query (clone because query() takes ownership)
423        let result = query.clone().query(&ctx);
424
425        // Get collected dependencies
426        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
427
428        // Query durability defaults to stable - Whale will automatically reduce
429        // the effective durability to min(requested, min(dep_durabilities)).
430        // A pure query with no dependencies remains stable.
431        // A query depending on volatile assets becomes volatile.
432        let durability = Durability::stable();
433
434        match result {
435            Ok(output) => {
436                // Check if output changed (for early cutoff)
437                // existing_revision is Some only when output is unchanged (can reuse revision)
438                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
439                    self.get_cached_with_revision::<Q>(full_key)
440                {
441                    if Q::output_eq(&*old, &*output) {
442                        Some(rev) // Same output - reuse revision
443                    } else {
444                        None // Different output
445                    }
446                } else {
447                    None // No previous Ok value
448                };
449                let output_changed = existing_revision.is_none();
450
451                // Emit early cutoff check event
452                self.tracer.on_early_cutoff_check(
453                    exec_ctx.span_id(),
454                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
455                    output_changed,
456                );
457
458                // Update whale with cached entry (atomic update of value + dependency state)
459                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
460                let revision = if let Some(existing_rev) = existing_revision {
461                    // confirm_unchanged doesn't change changed_at, use existing
462                    let _ = self.whale.confirm_unchanged(full_key, deps);
463                    existing_rev
464                } else {
465                    // Use new_rev from register result
466                    match self
467                        .whale
468                        .register(full_key.clone(), Some(entry), durability, deps)
469                    {
470                        Ok(result) => result.new_rev,
471                        Err(missing) => {
472                            return Err(QueryError::DependenciesRemoved {
473                                missing_keys: missing,
474                            })
475                        }
476                    }
477                };
478
479                // Register query in registry for list_queries
480                let is_new_query = self.query_registry.register(query);
481                if is_new_query {
482                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
483                    let _ = self
484                        .whale
485                        .register(sentinel, None, Durability::stable(), vec![]);
486                }
487
488                // Store verifier for this query (for verify-then-decide pattern)
489                self.verifiers
490                    .insert::<Q, T>(full_key.clone(), query.clone());
491
492                Ok((Ok(output), output_changed, revision))
493            }
494            Err(QueryError::UserError(err)) => {
495                // Check if error changed (for early cutoff)
496                // existing_revision is Some only when error is unchanged (can reuse revision)
497                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
498                    self.get_cached_with_revision::<Q>(full_key)
499                {
500                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
501                        Some(rev) // Same error - reuse revision
502                    } else {
503                        None // Different error
504                    }
505                } else {
506                    None // No previous UserError
507                };
508                let output_changed = existing_revision.is_none();
509
510                // Emit early cutoff check event
511                self.tracer.on_early_cutoff_check(
512                    exec_ctx.span_id(),
513                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
514                    output_changed,
515                );
516
517                // Update whale with cached error (atomic update of value + dependency state)
518                let entry = CachedEntry::UserError(err.clone());
519                let revision = if let Some(existing_rev) = existing_revision {
520                    // confirm_unchanged doesn't change changed_at, use existing
521                    let _ = self.whale.confirm_unchanged(full_key, deps);
522                    existing_rev
523                } else {
524                    // Use new_rev from register result
525                    match self
526                        .whale
527                        .register(full_key.clone(), Some(entry), durability, deps)
528                    {
529                        Ok(result) => result.new_rev,
530                        Err(missing) => {
531                            return Err(QueryError::DependenciesRemoved {
532                                missing_keys: missing,
533                            })
534                        }
535                    }
536                };
537
538                // Register query in registry for list_queries
539                let is_new_query = self.query_registry.register(query);
540                if is_new_query {
541                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
542                    let _ = self
543                        .whale
544                        .register(sentinel, None, Durability::stable(), vec![]);
545                }
546
547                // Store verifier for this query (for verify-then-decide pattern)
548                self.verifiers
549                    .insert::<Q, T>(full_key.clone(), query.clone());
550
551                Ok((Err(err), output_changed, revision))
552            }
553            Err(e) => {
554                // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
555                Err(e)
556            }
557        }
558    }
559
560    /// Invalidate a query, forcing recomputation on next access.
561    ///
562    /// This also invalidates any queries that depend on this one.
563    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
564        let full_key = FullCacheKey::new::<Q, _>(key);
565
566        self.tracer.on_query_invalidated(
567            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
568            InvalidationReason::ManualInvalidation,
569        );
570
571        // Update whale to invalidate dependents (register with None to clear cached value)
572        // Use stable durability to increment all revision counters, ensuring queries
573        // at any durability level will see this as a change.
574        let _ = self
575            .whale
576            .register(full_key, None, Durability::stable(), vec![]);
577    }
578
579    /// Remove a query from the cache entirely, freeing memory.
580    ///
581    /// Use this for GC when a query is no longer needed.
582    /// Unlike `invalidate`, this removes all traces of the query from storage.
583    /// The query will be recomputed from scratch on next access.
584    ///
585    /// This also invalidates any queries that depend on this one.
586    pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
587        let full_key = FullCacheKey::new::<Q, _>(key);
588
589        self.tracer.on_query_invalidated(
590            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
591            InvalidationReason::ManualInvalidation,
592        );
593
594        // Remove verifier if exists
595        self.verifiers.remove(&full_key);
596
597        // Remove from whale storage (this also handles dependent invalidation)
598        self.whale.remove(&full_key);
599
600        // Remove from registry and update sentinel for list_queries
601        if self.query_registry.remove::<Q>(key) {
602            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
603            let _ = self
604                .whale
605                .register(sentinel, None, Durability::stable(), vec![]);
606        }
607    }
608
609    /// Clear all cached values by removing all nodes from whale.
610    ///
611    /// Note: This is a relatively expensive operation as it iterates through all keys.
612    pub fn clear_cache(&self) {
613        let keys = self.whale.keys();
614        for key in keys {
615            self.whale.remove(&key);
616        }
617    }
618
619    /// Poll a query, returning both the result and its change revision.
620    ///
621    /// This is useful for implementing subscription patterns where you need to
622    /// detect changes efficiently. Compare the returned `revision` with a
623    /// previously stored value to determine if the query result has changed.
624    ///
625    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
626    /// as its value, allowing you to track revision changes for both success and
627    /// user error cases.
628    ///
629    /// # Example
630    ///
631    /// ```ignore
632    /// struct Subscription<Q: Query> {
633    ///     query: Q,
634    ///     last_revision: RevisionCounter,
635    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
636    /// }
637    ///
638    /// // Polling loop
639    /// for sub in &mut subscriptions {
640    ///     let result = runtime.poll(sub.query.clone())?;
641    ///     if result.revision > sub.last_revision {
642    ///         sub.tx.send(result.value.clone())?;
643    ///         sub.last_revision = result.revision;
644    ///     }
645    /// }
646    /// ```
647    ///
648    /// # Errors
649    ///
650    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
651    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
652    #[allow(clippy::type_complexity)]
653    pub fn poll<Q: Query>(
654        &self,
655        query: Q,
656    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
657        let (value, revision) = self.query_internal(query)?;
658        Ok(Polled { value, revision })
659    }
660
661    /// Get the change revision of a query without executing it.
662    ///
663    /// Returns `None` if the query has never been executed.
664    ///
665    /// This is useful for checking if a query has changed since the last poll
666    /// without the cost of executing the query.
667    ///
668    /// # Example
669    ///
670    /// ```ignore
671    /// // Check if query has changed before deciding to poll
672    /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
673    ///     if rev > last_known_revision {
674    ///         let result = runtime.query(MyQuery::new(key))?;
675    ///         // Process result...
676    ///     }
677    /// }
678    /// ```
679    pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
680        let full_key = FullCacheKey::new::<Q, _>(key);
681        self.whale.get(&full_key).map(|node| node.changed_at)
682    }
683}
684
685// ============================================================================
686// GC (Garbage Collection) API
687// ============================================================================
688
689impl<T: Tracer> QueryRuntime<T> {
690    /// Get all query keys currently in the cache.
691    ///
692    /// This is useful for implementing custom garbage collection strategies.
693    /// Use this in combination with [`Tracer::on_query_key`] to track access
694    /// times and implement LRU, TTL, or other GC algorithms externally.
695    ///
696    /// # Example
697    ///
698    /// ```ignore
699    /// // Collect all keys that haven't been accessed recently
700    /// let stale_keys: Vec<_> = runtime.query_keys()
701    ///     .filter(|key| tracker.is_stale(key))
702    ///     .collect();
703    ///
704    /// // Remove stale queries that have no dependents
705    /// for key in stale_keys {
706    ///     runtime.remove_if_unused(&key);
707    /// }
708    /// ```
709    pub fn query_keys(&self) -> Vec<FullCacheKey> {
710        self.whale.keys()
711    }
712
713    /// Remove a query if it has no dependents.
714    ///
715    /// Returns `true` if the query was removed, `false` if it has dependents
716    /// or doesn't exist. This is the safe way to remove queries during GC,
717    /// as it won't break queries that depend on this one.
718    ///
719    /// # Example
720    ///
721    /// ```ignore
722    /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
723    /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
724    ///     println!("Query removed");
725    /// } else {
726    ///     println!("Query has dependents, not removed");
727    /// }
728    /// ```
729    pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
730        let full_key = FullCacheKey::new::<Q, _>(key);
731        self.remove_if_unused(&full_key)
732    }
733
734    /// Remove a query by its [`FullCacheKey`].
735    ///
736    /// This is the type-erased version of [`remove_query`](Self::remove_query).
737    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
738    /// or [`Tracer::on_query_key`].
739    ///
740    /// Returns `true` if the query was removed, `false` if it doesn't exist.
741    ///
742    /// # Warning
743    ///
744    /// This forcibly removes the query even if other queries depend on it.
745    /// Dependent queries will be recomputed on next access. For safe GC,
746    /// use [`remove_if_unused`](Self::remove_if_unused) instead.
747    pub fn remove(&self, key: &FullCacheKey) -> bool {
748        // Remove verifier if exists
749        self.verifiers.remove(key);
750
751        // Remove from whale storage
752        self.whale.remove(key).is_some()
753    }
754
755    /// Remove a query by its [`FullCacheKey`] if it has no dependents.
756    ///
757    /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
758    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
759    /// or [`Tracer::on_query_key`].
760    ///
761    /// Returns `true` if the query was removed, `false` if it has dependents
762    /// or doesn't exist.
763    ///
764    /// # Example
765    ///
766    /// ```ignore
767    /// // Implement LRU GC
768    /// for key in runtime.query_keys() {
769    ///     if tracker.is_expired(&key) {
770    ///         runtime.remove_if_unused(&key);
771    ///     }
772    /// }
773    /// ```
774    pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
775        if self.whale.remove_if_unused(key.clone()).is_some() {
776            // Successfully removed - clean up verifier
777            self.verifiers.remove(key);
778            true
779        } else {
780            false
781        }
782    }
783}
784
785// ============================================================================
786// Builder
787// ============================================================================
788
789/// Builder for [`QueryRuntime`] with customizable settings.
790///
791/// # Example
792///
793/// ```ignore
794/// let runtime = QueryRuntime::builder()
795///     .error_comparator(|a, b| {
796///         // Treat all errors of the same type as equal
797///         a.downcast_ref::<std::io::Error>().is_some()
798///             == b.downcast_ref::<std::io::Error>().is_some()
799///     })
800///     .build();
801/// ```
802pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
803    error_comparator: ErrorComparator,
804    tracer: T,
805}
806
807impl Default for QueryRuntimeBuilder<NoopTracer> {
808    fn default() -> Self {
809        Self::new()
810    }
811}
812
813impl QueryRuntimeBuilder<NoopTracer> {
814    /// Create a new builder with default settings.
815    pub fn new() -> Self {
816        Self {
817            error_comparator: default_error_comparator,
818            tracer: NoopTracer,
819        }
820    }
821}
822
823impl<T: Tracer> QueryRuntimeBuilder<T> {
824    /// Set the error comparator function for early cutoff optimization.
825    ///
826    /// When a query returns `QueryError::UserError`, this function is used
827    /// to compare it with the previously cached error. If they are equal,
828    /// downstream queries can skip recomputation (early cutoff).
829    ///
830    /// The default comparator returns `false` for all errors, meaning errors
831    /// are always considered different (conservative, always recomputes).
832    ///
833    /// # Example
834    ///
835    /// ```ignore
836    /// // Treat errors as equal if they have the same display message
837    /// let runtime = QueryRuntime::builder()
838    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
839    ///     .build();
840    /// ```
841    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
842        self.error_comparator = f;
843        self
844    }
845
846    /// Set the tracer for observability.
847    pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
848        QueryRuntimeBuilder {
849            error_comparator: self.error_comparator,
850            tracer,
851        }
852    }
853
854    /// Build the runtime with the configured settings.
855    pub fn build(self) -> QueryRuntime<T> {
856        QueryRuntime {
857            whale: WhaleRuntime::new(),
858            locators: Arc::new(LocatorStorage::new()),
859            pending: Arc::new(PendingStorage::new()),
860            query_registry: Arc::new(QueryRegistry::new()),
861            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
862            verifiers: Arc::new(VerifierStorage::new()),
863            error_comparator: self.error_comparator,
864            tracer: Arc::new(self.tracer),
865        }
866    }
867}
868
869// ============================================================================
870// Asset API
871// ============================================================================
872
873impl<T: Tracer> QueryRuntime<T> {
874    /// Register an asset locator for a specific asset key type.
875    ///
876    /// Only one locator can be registered per key type. Later registrations
877    /// replace earlier ones.
878    ///
879    /// # Example
880    ///
881    /// ```ignore
882    /// let runtime = QueryRuntime::new();
883    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
884    /// ```
885    pub fn register_asset_locator<K, L>(&self, locator: L)
886    where
887        K: AssetKey,
888        L: AssetLocator<K>,
889    {
890        self.locators.insert::<K, L>(locator);
891    }
892
893    /// Get an iterator over pending asset requests.
894    ///
895    /// Returns assets that have been requested but not yet resolved.
896    /// The user should fetch these externally and call `resolve_asset()`.
897    ///
898    /// # Example
899    ///
900    /// ```ignore
901    /// for pending in runtime.pending_assets() {
902    ///     if let Some(path) = pending.key::<FilePath>() {
903    ///         let content = fetch_file(path);
904    ///         runtime.resolve_asset(path.clone(), content);
905    ///     }
906    /// }
907    /// ```
908    pub fn pending_assets(&self) -> Vec<PendingAsset> {
909        self.pending.get_all()
910    }
911
912    /// Get pending assets filtered by key type.
913    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
914        self.pending.get_of_type::<K>()
915    }
916
917    /// Check if there are any pending assets.
918    pub fn has_pending_assets(&self) -> bool {
919        !self.pending.is_empty()
920    }
921
922    /// Resolve an asset with its loaded value.
923    ///
924    /// This marks the asset as ready and invalidates any queries that
925    /// depend on it (if the value changed), triggering recomputation on next access.
926    ///
927    /// This method is idempotent - resolving with the same value (via `asset_eq`)
928    /// will not trigger downstream recomputation.
929    ///
930    /// # Arguments
931    ///
932    /// * `key` - The asset key identifying this resource
933    /// * `value` - The loaded asset value
934    /// * `durability` - How frequently this asset is expected to change
935    ///
936    /// # Example
937    ///
938    /// ```ignore
939    /// let content = std::fs::read_to_string(&path)?;
940    /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
941    /// ```
942    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
943        self.resolve_asset_internal(key, value, durability);
944    }
945
946    /// Resolve an asset with an error.
947    ///
948    /// This marks the asset as errored and caches the error. Queries depending
949    /// on this asset will receive `Err(QueryError::UserError(...))`.
950    ///
951    /// Use this when async loading fails (e.g., network error, file not found,
952    /// access denied).
953    ///
954    /// # Arguments
955    ///
956    /// * `key` - The asset key identifying this resource
957    /// * `error` - The error to cache (will be wrapped in `Arc`)
958    /// * `durability` - How frequently this error state is expected to change
959    ///
960    /// # Example
961    ///
962    /// ```ignore
963    /// match fetch_file(&path) {
964    ///     Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
965    ///     Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
966    /// }
967    /// ```
968    pub fn resolve_asset_error<K: AssetKey>(
969        &self,
970        key: K,
971        error: impl Into<anyhow::Error>,
972        durability: DurabilityLevel,
973    ) {
974        let full_asset_key = FullAssetKey::new(&key);
975        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
976
977        // Remove from pending BEFORE registering the error
978        self.pending.remove(&full_asset_key);
979
980        // Prepare the error entry
981        let error_arc = Arc::new(error.into());
982        let entry = CachedEntry::AssetError(error_arc.clone());
983        let durability =
984            Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
985
986        // Atomic compare-and-update (errors are always considered changed for now)
987        let result = self
988            .whale
989            .update_with_compare(
990                full_cache_key,
991                Some(entry),
992                |old_data, _new_data| {
993                    // Compare old and new errors using error_comparator
994                    match old_data.and_then(|d| d.as_ref()) {
995                        Some(CachedEntry::AssetError(old_err)) => {
996                            !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
997                        }
998                        _ => true, // Loading, Ready, or not present -> changed
999                    }
1000                },
1001                durability,
1002                vec![],
1003            )
1004            .expect("update_with_compare with no dependencies cannot fail");
1005
1006        // Emit asset resolved event (with changed status)
1007        self.tracer.on_asset_resolved(
1008            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1009            result.changed,
1010        );
1011
1012        // Register asset key in registry for list_asset_keys
1013        let is_new_asset = self.asset_key_registry.register(&key);
1014        if is_new_asset {
1015            // Update sentinel to invalidate list_asset_keys dependents
1016            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1017            let _ = self
1018                .whale
1019                .register(sentinel, None, Durability::stable(), vec![]);
1020        }
1021    }
1022
1023    fn resolve_asset_internal<K: AssetKey>(
1024        &self,
1025        key: K,
1026        value: K::Asset,
1027        durability_level: DurabilityLevel,
1028    ) {
1029        let full_asset_key = FullAssetKey::new(&key);
1030        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1031
1032        // Remove from pending BEFORE registering the value
1033        self.pending.remove(&full_asset_key);
1034
1035        // Prepare the new entry
1036        let value_arc: Arc<K::Asset> = Arc::new(value);
1037        let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1038        let durability =
1039            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1040
1041        // Atomic compare-and-update
1042        let result = self
1043            .whale
1044            .update_with_compare(
1045                full_cache_key,
1046                Some(entry),
1047                |old_data, _new_data| {
1048                    // Compare old and new values
1049                    match old_data.and_then(|d| d.as_ref()) {
1050                        Some(CachedEntry::AssetReady(old_arc)) => {
1051                            match old_arc.clone().downcast::<K::Asset>() {
1052                                Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1053                                Err(_) => true, // Type mismatch, treat as changed
1054                            }
1055                        }
1056                        _ => true, // Loading, NotFound, or not present -> changed
1057                    }
1058                },
1059                durability,
1060                vec![],
1061            )
1062            .expect("update_with_compare with no dependencies cannot fail");
1063
1064        // Emit asset resolved event
1065        self.tracer.on_asset_resolved(
1066            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1067            result.changed,
1068        );
1069
1070        // Register asset key in registry for list_asset_keys
1071        let is_new_asset = self.asset_key_registry.register(&key);
1072        if is_new_asset {
1073            // Update sentinel to invalidate list_asset_keys dependents
1074            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1075            let _ = self
1076                .whale
1077                .register(sentinel, None, Durability::stable(), vec![]);
1078        }
1079    }
1080
1081    /// Invalidate an asset, forcing queries to re-request it.
1082    ///
1083    /// The asset will be marked as loading and added to pending assets.
1084    /// Dependent queries will suspend until the asset is resolved again.
1085    ///
1086    /// # Example
1087    ///
1088    /// ```ignore
1089    /// // File was modified externally
1090    /// runtime.invalidate_asset(&FilePath("config.json".into()));
1091    /// // Queries depending on this asset will now suspend
1092    /// // User should fetch the new value and call resolve_asset
1093    /// ```
1094    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1095        let full_asset_key = FullAssetKey::new(key);
1096        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1097
1098        // Emit asset invalidated event
1099        self.tracer.on_asset_invalidated(TracerAssetKey::new(
1100            std::any::type_name::<K>(),
1101            format!("{:?}", key),
1102        ));
1103
1104        // Add to pending FIRST (before clearing whale state)
1105        // This ensures: readers see either old value, or Loading+pending
1106        self.pending.insert::<K>(full_asset_key, key.clone());
1107
1108        // Atomic: clear cached value + invalidate dependents
1109        // Using None for data means "needs to be loaded"
1110        // Use stable durability to ensure queries at any durability level see the change.
1111        let _ = self
1112            .whale
1113            .register(full_cache_key, None, Durability::stable(), vec![]);
1114    }
1115
1116    /// Remove an asset from the cache entirely.
1117    ///
1118    /// Unlike `invalidate_asset`, this removes all traces of the asset.
1119    /// Dependent queries will go through the locator again on next access.
1120    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1121        let full_asset_key = FullAssetKey::new(key);
1122        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1123
1124        // Remove from pending first
1125        self.pending.remove(&full_asset_key);
1126
1127        // Remove from whale (this also cleans up dependency edges)
1128        // whale.remove() invalidates dependents before removing
1129        self.whale.remove(&full_cache_key);
1130
1131        // Remove from registry and update sentinel for list_asset_keys
1132        if self.asset_key_registry.remove::<K>(key) {
1133            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1134            let _ = self
1135                .whale
1136                .register(sentinel, None, Durability::stable(), vec![]);
1137        }
1138    }
1139
1140    /// Get an asset by key without tracking dependencies.
1141    ///
1142    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1143    /// as a dependent of the asset. Use this for direct asset access outside
1144    /// of query execution.
1145    ///
1146    /// # Returns
1147    ///
1148    /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1149    /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1150    /// - `Err(QueryError::MissingDependency)` - Asset was not found
1151    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1152        self.get_asset_internal(key)
1153    }
1154
1155    /// Internal: Get asset state and its changed_at revision atomically.
1156    ///
1157    /// This is the "direct" version called from QueryRuntime::asset (no dependency tracking).
1158    /// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
1159    ///
1160    /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
1161    /// whale node that provided the asset value.
1162    fn get_asset_with_revision<K: AssetKey>(
1163        &self,
1164        key: K,
1165    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1166        let full_asset_key = FullAssetKey::new(&key);
1167        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1168
1169        // Helper to emit AssetRequested event
1170        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1171            tracer.on_asset_requested(
1172                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1173                state,
1174            );
1175        };
1176
1177        // Check whale cache first (single atomic read)
1178        if let Some(node) = self.whale.get(&full_cache_key) {
1179            let changed_at = node.changed_at;
1180            // Check if valid at current revision
1181            if self.whale.is_valid(&full_cache_key) {
1182                match &node.data {
1183                    Some(CachedEntry::AssetReady(arc)) => {
1184                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1185                        match arc.clone().downcast::<K::Asset>() {
1186                            Ok(value) => {
1187                                return Ok((AssetLoadingState::ready(key, value), changed_at))
1188                            }
1189                            Err(_) => {
1190                                return Err(QueryError::MissingDependency {
1191                                    description: format!("Asset type mismatch: {:?}", key),
1192                                })
1193                            }
1194                        }
1195                    }
1196                    Some(CachedEntry::AssetError(err)) => {
1197                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1198                        return Err(QueryError::UserError(err.clone()));
1199                    }
1200                    None => {
1201                        // Loading state
1202                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1203                        return Ok((AssetLoadingState::loading(key), changed_at));
1204                    }
1205                    _ => {
1206                        // Query-related entries (Ok, UserError) shouldn't be here
1207                        // Fall through to locator
1208                    }
1209                }
1210            }
1211        }
1212
1213        // Not in cache or invalid - try locator
1214        check_cycle(&full_cache_key)?;
1215        let _guard = StackGuard::push(full_cache_key.clone());
1216
1217        let locator_result = self
1218            .locators
1219            .locate_with_runtime(TypeId::of::<K>(), self, &key);
1220
1221        if let Some(result) = locator_result {
1222            match result {
1223                Ok(ErasedLocateResult::Ready {
1224                    value: arc,
1225                    durability: durability_level,
1226                }) => {
1227                    emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1228
1229                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1230                        Ok(v) => v,
1231                        Err(_) => {
1232                            return Err(QueryError::MissingDependency {
1233                                description: format!("Asset type mismatch: {:?}", key),
1234                            });
1235                        }
1236                    };
1237
1238                    // Store in whale atomically with early cutoff
1239                    let entry = CachedEntry::AssetReady(typed_value.clone());
1240                    let durability = Durability::new(durability_level.as_u8() as usize)
1241                        .unwrap_or(Durability::volatile());
1242                    let new_value = typed_value.clone();
1243                    let result = self
1244                        .whale
1245                        .update_with_compare(
1246                            full_cache_key,
1247                            Some(entry),
1248                            |old_data, _new_data| {
1249                                let Some(CachedEntry::AssetReady(old_arc)) =
1250                                    old_data.and_then(|d| d.as_ref())
1251                                else {
1252                                    return true;
1253                                };
1254                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1255                                    return true;
1256                                };
1257                                !K::asset_eq(&old_value, &new_value)
1258                            },
1259                            durability,
1260                            vec![],
1261                        )
1262                        .expect("update_with_compare with no dependencies cannot fail");
1263
1264                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1265                }
1266                Ok(ErasedLocateResult::Pending) => {
1267                    emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1268
1269                    // Add to pending list for Pending result
1270                    self.pending.insert::<K>(full_asset_key, key.clone());
1271                    match self
1272                        .whale
1273                        .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1274                        .expect("get_or_insert with no dependencies cannot fail")
1275                    {
1276                        GetOrInsertResult::Inserted(node) => {
1277                            return Ok((AssetLoadingState::loading(key), node.changed_at));
1278                        }
1279                        GetOrInsertResult::Existing(node) => {
1280                            let changed_at = node.changed_at;
1281                            match &node.data {
1282                                Some(CachedEntry::AssetReady(arc)) => {
1283                                    match arc.clone().downcast::<K::Asset>() {
1284                                        Ok(value) => {
1285                                            return Ok((
1286                                                AssetLoadingState::ready(key, value),
1287                                                changed_at,
1288                                            ))
1289                                        }
1290                                        Err(_) => {
1291                                            return Ok((
1292                                                AssetLoadingState::loading(key),
1293                                                changed_at,
1294                                            ))
1295                                        }
1296                                    }
1297                                }
1298                                Some(CachedEntry::AssetError(err)) => {
1299                                    return Err(QueryError::UserError(err.clone()));
1300                                }
1301                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1302                            }
1303                        }
1304                    }
1305                }
1306                Err(QueryError::UserError(err)) => {
1307                    // Locator returned a user error - cache it as AssetError
1308                    let entry = CachedEntry::AssetError(err.clone());
1309                    let _ = self.whale.register(
1310                        full_cache_key,
1311                        Some(entry),
1312                        Durability::volatile(),
1313                        vec![],
1314                    );
1315                    return Err(QueryError::UserError(err));
1316                }
1317                Err(e) => {
1318                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1319                    return Err(e);
1320                }
1321            }
1322        }
1323
1324        // No locator registered or locator returned None - mark as pending
1325        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1326        self.pending
1327            .insert::<K>(full_asset_key.clone(), key.clone());
1328
1329        match self
1330            .whale
1331            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1332            .expect("get_or_insert with no dependencies cannot fail")
1333        {
1334            GetOrInsertResult::Inserted(node) => {
1335                Ok((AssetLoadingState::loading(key), node.changed_at))
1336            }
1337            GetOrInsertResult::Existing(node) => {
1338                let changed_at = node.changed_at;
1339                match &node.data {
1340                    Some(CachedEntry::AssetReady(arc)) => {
1341                        match arc.clone().downcast::<K::Asset>() {
1342                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1343                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1344                        }
1345                    }
1346                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1347                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1348                }
1349            }
1350        }
1351    }
1352
1353    /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1354    ///
1355    /// This version is called from QueryContext::asset and enables dependency tracking
1356    /// within the locator's db.query() and db.asset() calls.
1357    fn get_asset_with_revision_ctx<K: AssetKey>(
1358        &self,
1359        key: K,
1360        ctx: &QueryContext<'_, T>,
1361    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1362        let full_asset_key = FullAssetKey::new(&key);
1363        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1364
1365        // Helper to emit AssetRequested event
1366        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1367            tracer.on_asset_requested(
1368                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1369                state,
1370            );
1371        };
1372
1373        // Check whale cache first (single atomic read)
1374        if let Some(node) = self.whale.get(&full_cache_key) {
1375            let changed_at = node.changed_at;
1376            // Check if valid at current revision
1377            if self.whale.is_valid(&full_cache_key) {
1378                match &node.data {
1379                    Some(CachedEntry::AssetReady(arc)) => {
1380                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1381                        match arc.clone().downcast::<K::Asset>() {
1382                            Ok(value) => {
1383                                return Ok((AssetLoadingState::ready(key, value), changed_at))
1384                            }
1385                            Err(_) => {
1386                                return Err(QueryError::MissingDependency {
1387                                    description: format!("Asset type mismatch: {:?}", key),
1388                                })
1389                            }
1390                        }
1391                    }
1392                    Some(CachedEntry::AssetError(err)) => {
1393                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1394                        return Err(QueryError::UserError(err.clone()));
1395                    }
1396                    None => {
1397                        // Loading state
1398                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1399                        return Ok((AssetLoadingState::loading(key), changed_at));
1400                    }
1401                    _ => {
1402                        // Query-related entries (Ok, UserError) shouldn't be here
1403                        // Fall through to locator
1404                    }
1405                }
1406            }
1407        }
1408
1409        // Not in cache or invalid - try locator (with context for dependency tracking)
1410        check_cycle(&full_cache_key)?;
1411        let _guard = StackGuard::push(full_cache_key.clone());
1412
1413        let locator_result = self.locators.locate_with_ctx(TypeId::of::<K>(), ctx, &key);
1414
1415        if let Some(result) = locator_result {
1416            match result {
1417                Ok(ErasedLocateResult::Ready {
1418                    value: arc,
1419                    durability: durability_level,
1420                }) => {
1421                    emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1422
1423                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1424                        Ok(v) => v,
1425                        Err(_) => {
1426                            return Err(QueryError::MissingDependency {
1427                                description: format!("Asset type mismatch: {:?}", key),
1428                            });
1429                        }
1430                    };
1431
1432                    // Store in whale atomically with early cutoff
1433                    let entry = CachedEntry::AssetReady(typed_value.clone());
1434                    let durability = Durability::new(durability_level.as_u8() as usize)
1435                        .unwrap_or(Durability::volatile());
1436                    let new_value = typed_value.clone();
1437                    let result = self
1438                        .whale
1439                        .update_with_compare(
1440                            full_cache_key,
1441                            Some(entry),
1442                            |old_data, _new_data| {
1443                                let Some(CachedEntry::AssetReady(old_arc)) =
1444                                    old_data.and_then(|d| d.as_ref())
1445                                else {
1446                                    return true;
1447                                };
1448                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1449                                    return true;
1450                                };
1451                                !K::asset_eq(&old_value, &new_value)
1452                            },
1453                            durability,
1454                            vec![],
1455                        )
1456                        .expect("update_with_compare with no dependencies cannot fail");
1457
1458                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1459                }
1460                Ok(ErasedLocateResult::Pending) => {
1461                    emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1462
1463                    // Add to pending list for Pending result
1464                    self.pending.insert::<K>(full_asset_key, key.clone());
1465                    match self
1466                        .whale
1467                        .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1468                        .expect("get_or_insert with no dependencies cannot fail")
1469                    {
1470                        GetOrInsertResult::Inserted(node) => {
1471                            return Ok((AssetLoadingState::loading(key), node.changed_at));
1472                        }
1473                        GetOrInsertResult::Existing(node) => {
1474                            let changed_at = node.changed_at;
1475                            match &node.data {
1476                                Some(CachedEntry::AssetReady(arc)) => {
1477                                    match arc.clone().downcast::<K::Asset>() {
1478                                        Ok(value) => {
1479                                            return Ok((
1480                                                AssetLoadingState::ready(key, value),
1481                                                changed_at,
1482                                            ))
1483                                        }
1484                                        Err(_) => {
1485                                            return Ok((
1486                                                AssetLoadingState::loading(key),
1487                                                changed_at,
1488                                            ))
1489                                        }
1490                                    }
1491                                }
1492                                Some(CachedEntry::AssetError(err)) => {
1493                                    return Err(QueryError::UserError(err.clone()));
1494                                }
1495                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1496                            }
1497                        }
1498                    }
1499                }
1500                Err(QueryError::UserError(err)) => {
1501                    // Locator returned a user error - cache it as AssetError
1502                    let entry = CachedEntry::AssetError(err.clone());
1503                    let _ = self.whale.register(
1504                        full_cache_key,
1505                        Some(entry),
1506                        Durability::volatile(),
1507                        vec![],
1508                    );
1509                    return Err(QueryError::UserError(err));
1510                }
1511                Err(e) => {
1512                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1513                    return Err(e);
1514                }
1515            }
1516        }
1517
1518        // No locator registered or locator returned None - mark as pending
1519        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1520        self.pending
1521            .insert::<K>(full_asset_key.clone(), key.clone());
1522
1523        match self
1524            .whale
1525            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1526            .expect("get_or_insert with no dependencies cannot fail")
1527        {
1528            GetOrInsertResult::Inserted(node) => {
1529                Ok((AssetLoadingState::loading(key), node.changed_at))
1530            }
1531            GetOrInsertResult::Existing(node) => {
1532                let changed_at = node.changed_at;
1533                match &node.data {
1534                    Some(CachedEntry::AssetReady(arc)) => {
1535                        match arc.clone().downcast::<K::Asset>() {
1536                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1537                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1538                        }
1539                    }
1540                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1541                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
1542                }
1543            }
1544        }
1545    }
1546
1547    /// Internal: Get asset state, checking cache and locator.
1548    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1549        self.get_asset_with_revision(key).map(|(state, _)| state)
1550    }
1551}
1552
1553impl<T: Tracer> Db for QueryRuntime<T> {
1554    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1555        QueryRuntime::query(self, query)
1556    }
1557
1558    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1559        self.get_asset_internal(key)
1560    }
1561
1562    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1563        self.query_registry.get_all::<Q>()
1564    }
1565
1566    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1567        self.asset_key_registry.get_all::<K>()
1568    }
1569}
1570
1571/// Context provided to queries during execution.
1572///
1573/// Use this to access dependencies via `query()`.
1574pub struct QueryContext<'a, T: Tracer = NoopTracer> {
1575    runtime: &'a QueryRuntime<T>,
1576    current_key: FullCacheKey,
1577    parent_query_type: &'static str,
1578    exec_ctx: ExecutionContext,
1579    deps: RefCell<Vec<FullCacheKey>>,
1580    /// Revision at first asset access, used for consistency checking.
1581    /// Assets with changed_at > this value were modified during query execution.
1582    first_access_revision: Cell<Option<RevisionCounter>>,
1583}
1584
1585impl<'a, T: Tracer> QueryContext<'a, T> {
1586    /// Ensure consistency of asset access.
1587    ///
1588    /// On first asset access: records max(current_global, dep_changed_at) as baseline.
1589    /// On subsequent asset accesses: checks dep_changed_at <= baseline.
1590    ///
1591    /// IMPORTANT: current_global must be obtained BEFORE accessing the asset.
1592    #[inline]
1593    fn ensure_consistent(
1594        &self,
1595        current_global: RevisionCounter,
1596        dep_changed_at: RevisionCounter,
1597    ) -> Result<(), QueryError> {
1598        match self.first_access_revision.get() {
1599            None => {
1600                // First asset access: record max(current_global, dep_changed_at)
1601                // Using max ensures we don't get false positives when only one asset is accessed
1602                let first = current_global.max(dep_changed_at);
1603                self.first_access_revision.set(Some(first));
1604                Ok(())
1605            }
1606            Some(first) => {
1607                // Subsequent asset accesses: check consistency
1608                if dep_changed_at > first {
1609                    Err(QueryError::InconsistentAssetResolution)
1610                } else {
1611                    Ok(())
1612                }
1613            }
1614        }
1615    }
1616
1617    /// Query a dependency.
1618    ///
1619    /// The dependency is automatically tracked for invalidation.
1620    ///
1621    /// # Example
1622    ///
1623    /// ```ignore
1624    /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1625    ///     let dep_result = db.query(OtherQuery { id: self.id })?;
1626    ///     Ok(process(&dep_result))
1627    /// }
1628    /// ```
1629    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1630        let key = query.cache_key();
1631        let full_key = FullCacheKey::new::<Q, _>(&key);
1632
1633        // Emit dependency registered event
1634        self.runtime.tracer.on_dependency_registered(
1635            self.exec_ctx.span_id(),
1636            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1637            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1638        );
1639
1640        // Record this as a dependency
1641        self.deps.borrow_mut().push(full_key.clone());
1642
1643        // Execute the query
1644        self.runtime.query(query)
1645    }
1646
1647    /// Access an asset, tracking it as a dependency.
1648    ///
1649    /// Returns `AssetLoadingState<K>`:
1650    /// - `is_loading()` if the asset is still being loaded
1651    /// - `is_ready()` if the asset is available
1652    ///
1653    /// Use `.suspend()?` to convert to `Result<Arc<K::Asset>, QueryError>`,
1654    /// which returns `Err(QueryError::Suspend { asset })` if still loading.
1655    ///
1656    /// # Example
1657    ///
1658    /// ```ignore
1659    /// #[query]
1660    /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1661    ///     let content = db.asset(path)?.suspend()?;
1662    ///     // Process content...
1663    ///     Ok(output)
1664    /// }
1665    /// ```
1666    ///
1667    /// # Errors
1668    ///
1669    /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1670    pub fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1671        let full_asset_key = FullAssetKey::new(&key);
1672        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1673
1674        // 1. Get current_global FIRST (before accessing the asset)
1675        let current_global = self
1676            .runtime
1677            .whale
1678            .current_revision()
1679            .get(Durability::volatile());
1680
1681        // 2. Emit asset dependency registered event
1682        self.runtime.tracer.on_asset_dependency_registered(
1683            self.exec_ctx.span_id(),
1684            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1685            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1686        );
1687
1688        // 3. Record dependency on this asset
1689        self.deps.borrow_mut().push(full_cache_key);
1690
1691        // 4. Get asset AND changed_at from the same whale access (atomic)
1692        // Use the context-aware version to enable dependency tracking in locators
1693        let (result, dep_changed_at) = match self.runtime.get_asset_with_revision_ctx(key, self) {
1694            Ok((state, rev)) => (Ok(state), rev),
1695            Err(e) => return Err(e),
1696        };
1697
1698        // 5. Check consistency - detects if resolve_asset was called during query execution
1699        self.ensure_consistent(current_global, dep_changed_at)?;
1700
1701        // Emit missing dependency event on error
1702        if let Err(QueryError::MissingDependency { ref description }) = result {
1703            self.runtime.tracer.on_missing_dependency(
1704                TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1705                description.clone(),
1706            );
1707        }
1708
1709        result
1710    }
1711
1712    /// List all query instances of type Q that have been registered.
1713    ///
1714    /// This method establishes a dependency on the "set" of queries of type Q.
1715    /// The calling query will be invalidated when:
1716    /// - A new query of type Q is first executed (added to set)
1717    ///
1718    /// The calling query will NOT be invalidated when:
1719    /// - An individual query of type Q has its value change
1720    ///
1721    /// # Example
1722    ///
1723    /// ```ignore
1724    /// #[query]
1725    /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1726    ///     let queries = db.list_queries::<MyQuery>();
1727    ///     let mut results = Vec::new();
1728    ///     for q in queries {
1729    ///         results.push(*db.query(q)?);
1730    ///     }
1731    ///     Ok(results)
1732    /// }
1733    /// ```
1734    pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1735        // Record dependency on the sentinel (set-level dependency)
1736        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1737
1738        self.runtime.tracer.on_dependency_registered(
1739            self.exec_ctx.span_id(),
1740            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1741            TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1742        );
1743
1744        // Ensure sentinel exists in whale (for dependency tracking)
1745        if self.runtime.whale.get(&sentinel).is_none() {
1746            let _ =
1747                self.runtime
1748                    .whale
1749                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1750        }
1751
1752        self.deps.borrow_mut().push(sentinel);
1753
1754        // Return all registered queries
1755        self.runtime.query_registry.get_all::<Q>()
1756    }
1757
1758    /// List all asset keys of type K that have been registered.
1759    ///
1760    /// This method establishes a dependency on the "set" of asset keys of type K.
1761    /// The calling query will be invalidated when:
1762    /// - A new asset of type K is resolved for the first time (added to set)
1763    /// - An asset of type K is removed via remove_asset
1764    ///
1765    /// The calling query will NOT be invalidated when:
1766    /// - An individual asset's value changes (use `db.asset()` for that)
1767    ///
1768    /// # Example
1769    ///
1770    /// ```ignore
1771    /// #[query]
1772    /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1773    ///     let keys = db.list_asset_keys::<ConfigFile>();
1774    ///     let mut contents = Vec::new();
1775    ///     for key in keys {
1776    ///         let content = db.asset(&key)?.suspend()?;
1777    ///         contents.push((*content).clone());
1778    ///     }
1779    ///     Ok(contents)
1780    /// }
1781    /// ```
1782    pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1783        // Record dependency on the sentinel (set-level dependency)
1784        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1785
1786        self.runtime.tracer.on_asset_dependency_registered(
1787            self.exec_ctx.span_id(),
1788            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1789            TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1790        );
1791
1792        // Ensure sentinel exists in whale (for dependency tracking)
1793        if self.runtime.whale.get(&sentinel).is_none() {
1794            let _ =
1795                self.runtime
1796                    .whale
1797                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1798        }
1799
1800        self.deps.borrow_mut().push(sentinel);
1801
1802        // Return all registered asset keys
1803        self.runtime.asset_key_registry.get_all::<K>()
1804    }
1805}
1806
1807impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1808    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1809        QueryContext::query(self, query)
1810    }
1811
1812    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1813        QueryContext::asset(self, key)
1814    }
1815
1816    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1817        QueryContext::list_queries(self)
1818    }
1819
1820    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1821        QueryContext::list_asset_keys(self)
1822    }
1823}
1824
1825#[cfg(test)]
1826mod tests {
1827    use super::*;
1828
1829    #[test]
1830    fn test_simple_query() {
1831        #[derive(Clone)]
1832        struct Add {
1833            a: i32,
1834            b: i32,
1835        }
1836
1837        impl Query for Add {
1838            type CacheKey = (i32, i32);
1839            type Output = i32;
1840
1841            fn cache_key(&self) -> Self::CacheKey {
1842                (self.a, self.b)
1843            }
1844
1845            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1846                Ok(Arc::new(self.a + self.b))
1847            }
1848
1849            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1850                old == new
1851            }
1852        }
1853
1854        let runtime = QueryRuntime::new();
1855
1856        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
1857        assert_eq!(*result, 3);
1858
1859        // Second query should be cached
1860        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
1861        assert_eq!(*result2, 3);
1862    }
1863
1864    #[test]
1865    fn test_dependent_queries() {
1866        #[derive(Clone)]
1867        struct Base {
1868            value: i32,
1869        }
1870
1871        impl Query for Base {
1872            type CacheKey = i32;
1873            type Output = i32;
1874
1875            fn cache_key(&self) -> Self::CacheKey {
1876                self.value
1877            }
1878
1879            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1880                Ok(Arc::new(self.value * 2))
1881            }
1882
1883            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1884                old == new
1885            }
1886        }
1887
1888        #[derive(Clone)]
1889        struct Derived {
1890            base_value: i32,
1891        }
1892
1893        impl Query for Derived {
1894            type CacheKey = i32;
1895            type Output = i32;
1896
1897            fn cache_key(&self) -> Self::CacheKey {
1898                self.base_value
1899            }
1900
1901            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1902                let base = db.query(Base {
1903                    value: self.base_value,
1904                })?;
1905                Ok(Arc::new(*base + 10))
1906            }
1907
1908            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1909                old == new
1910            }
1911        }
1912
1913        let runtime = QueryRuntime::new();
1914
1915        let result = runtime.query(Derived { base_value: 5 }).unwrap();
1916        assert_eq!(*result, 20); // 5 * 2 + 10
1917    }
1918
1919    #[test]
1920    fn test_cycle_detection() {
1921        #[derive(Clone)]
1922        struct CycleA {
1923            id: i32,
1924        }
1925
1926        #[derive(Clone)]
1927        struct CycleB {
1928            id: i32,
1929        }
1930
1931        impl Query for CycleA {
1932            type CacheKey = i32;
1933            type Output = i32;
1934
1935            fn cache_key(&self) -> Self::CacheKey {
1936                self.id
1937            }
1938
1939            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1940                let b = db.query(CycleB { id: self.id })?;
1941                Ok(Arc::new(*b + 1))
1942            }
1943
1944            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1945                old == new
1946            }
1947        }
1948
1949        impl Query for CycleB {
1950            type CacheKey = i32;
1951            type Output = i32;
1952
1953            fn cache_key(&self) -> Self::CacheKey {
1954                self.id
1955            }
1956
1957            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1958                let a = db.query(CycleA { id: self.id })?;
1959                Ok(Arc::new(*a + 1))
1960            }
1961
1962            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1963                old == new
1964            }
1965        }
1966
1967        let runtime = QueryRuntime::new();
1968
1969        let result = runtime.query(CycleA { id: 1 });
1970        assert!(matches!(result, Err(QueryError::Cycle { .. })));
1971    }
1972
1973    #[test]
1974    fn test_fallible_query() {
1975        #[derive(Clone)]
1976        struct ParseInt {
1977            input: String,
1978        }
1979
1980        impl Query for ParseInt {
1981            type CacheKey = String;
1982            type Output = Result<i32, std::num::ParseIntError>;
1983
1984            fn cache_key(&self) -> Self::CacheKey {
1985                self.input.clone()
1986            }
1987
1988            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1989                Ok(Arc::new(self.input.parse()))
1990            }
1991
1992            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1993                old == new
1994            }
1995        }
1996
1997        let runtime = QueryRuntime::new();
1998
1999        // Valid parse
2000        let result = runtime
2001            .query(ParseInt {
2002                input: "42".to_string(),
2003            })
2004            .unwrap();
2005        assert_eq!(*result, Ok(42));
2006
2007        // Invalid parse - system succeeds, user error in output
2008        let result = runtime
2009            .query(ParseInt {
2010                input: "not_a_number".to_string(),
2011            })
2012            .unwrap();
2013        assert!(result.is_err());
2014    }
2015
2016    // Macro tests
2017    mod macro_tests {
2018        use super::*;
2019        use crate::query;
2020
2021        #[query]
2022        fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2023            let _ = db; // silence unused warning
2024            Ok(a + b)
2025        }
2026
2027        #[test]
2028        fn test_macro_basic() {
2029            let runtime = QueryRuntime::new();
2030            let result = runtime.query(Add::new(1, 2)).unwrap();
2031            assert_eq!(*result, 3);
2032        }
2033
2034        #[query]
2035        fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2036            let _ = db;
2037            Ok(x * 2)
2038        }
2039
2040        #[test]
2041        fn test_macro_simple() {
2042            let runtime = QueryRuntime::new();
2043            let result = runtime.query(SimpleDouble::new(5)).unwrap();
2044            assert_eq!(*result, 10);
2045        }
2046
2047        #[query(keys(id))]
2048        fn with_key_selection(
2049            db: &impl Db,
2050            id: u32,
2051            include_extra: bool,
2052        ) -> Result<String, QueryError> {
2053            let _ = db;
2054            Ok(format!("id={}, extra={}", id, include_extra))
2055        }
2056
2057        #[test]
2058        fn test_macro_key_selection() {
2059            let runtime = QueryRuntime::new();
2060
2061            // Same id, different include_extra - should return cached
2062            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2063            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2064
2065            // Both should have same value because only `id` is the key
2066            assert_eq!(*r1, "id=1, extra=true");
2067            assert_eq!(*r2, "id=1, extra=true"); // Cached!
2068        }
2069
2070        #[query]
2071        fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2072            let sum = db.query(Add::new(a, b))?;
2073            Ok(*sum * 2)
2074        }
2075
2076        #[test]
2077        fn test_macro_dependencies() {
2078            let runtime = QueryRuntime::new();
2079            let result = runtime.query(Dependent::new(3, 4)).unwrap();
2080            assert_eq!(*result, 14); // (3 + 4) * 2
2081        }
2082
2083        #[query(output_eq)]
2084        fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2085            let _ = db;
2086            Ok(x * 2)
2087        }
2088
2089        #[test]
2090        fn test_macro_output_eq() {
2091            let runtime = QueryRuntime::new();
2092            let result = runtime.query(WithOutputEq::new(5)).unwrap();
2093            assert_eq!(*result, 10);
2094        }
2095
2096        #[query(name = "CustomName")]
2097        fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2098            let _ = db;
2099            Ok(x)
2100        }
2101
2102        #[test]
2103        fn test_macro_custom_name() {
2104            let runtime = QueryRuntime::new();
2105            let result = runtime.query(CustomName::new(42)).unwrap();
2106            assert_eq!(*result, 42);
2107        }
2108
2109        // Test that attribute macros like #[tracing::instrument] are preserved
2110        // We use #[allow(unused_variables)] and #[inline] as test attributes since
2111        // they don't require external dependencies.
2112        #[allow(unused_variables)]
2113        #[inline]
2114        #[query]
2115        fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2116            // This would warn without #[allow(unused_variables)] on the generated method
2117            let unused_var = 42;
2118            Ok(x * 2)
2119        }
2120
2121        #[test]
2122        fn test_macro_preserves_attributes() {
2123            let runtime = QueryRuntime::new();
2124            // If attributes weren't preserved, this might warn about unused_var
2125            let result = runtime.query(WithAttributes::new(5)).unwrap();
2126            assert_eq!(*result, 10);
2127        }
2128    }
2129
2130    // Tests for poll() and changed_at()
2131    mod poll_tests {
2132        use super::*;
2133
2134        #[derive(Clone)]
2135        struct Counter {
2136            id: i32,
2137        }
2138
2139        impl Query for Counter {
2140            type CacheKey = i32;
2141            type Output = i32;
2142
2143            fn cache_key(&self) -> Self::CacheKey {
2144                self.id
2145            }
2146
2147            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2148                Ok(Arc::new(self.id * 10))
2149            }
2150
2151            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2152                old == new
2153            }
2154        }
2155
2156        #[test]
2157        fn test_poll_returns_value_and_revision() {
2158            let runtime = QueryRuntime::new();
2159
2160            let result = runtime.poll(Counter { id: 1 }).unwrap();
2161
2162            // Value should be correct - access through Result and Arc
2163            assert_eq!(**result.value.as_ref().unwrap(), 10);
2164
2165            // Revision should be non-zero after first execution
2166            assert!(result.revision > 0);
2167        }
2168
2169        #[test]
2170        fn test_poll_revision_stable_on_cache_hit() {
2171            let runtime = QueryRuntime::new();
2172
2173            // First poll
2174            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2175            let rev1 = result1.revision;
2176
2177            // Second poll (cache hit)
2178            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2179            let rev2 = result2.revision;
2180
2181            // Revision should be the same (no change)
2182            assert_eq!(rev1, rev2);
2183        }
2184
2185        #[test]
2186        fn test_poll_revision_changes_on_invalidate() {
2187            let runtime = QueryRuntime::new();
2188
2189            // First poll
2190            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2191            let rev1 = result1.revision;
2192
2193            // Invalidate and poll again
2194            runtime.invalidate::<Counter>(&1);
2195            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2196            let rev2 = result2.revision;
2197
2198            // Revision should increase (value was recomputed)
2199            // Note: Since output_eq returns true (same value), this might not change
2200            // depending on early cutoff behavior. Let's verify the value is still correct.
2201            assert_eq!(**result2.value.as_ref().unwrap(), 10);
2202
2203            // With early cutoff, revision might stay the same if value didn't change
2204            // This is expected behavior
2205            assert!(rev2 >= rev1);
2206        }
2207
2208        #[test]
2209        fn test_changed_at_returns_none_for_unexecuted_query() {
2210            let runtime = QueryRuntime::new();
2211
2212            // Query has never been executed
2213            let rev = runtime.changed_at::<Counter>(&1);
2214            assert!(rev.is_none());
2215        }
2216
2217        #[test]
2218        fn test_changed_at_returns_revision_after_execution() {
2219            let runtime = QueryRuntime::new();
2220
2221            // Execute the query
2222            let _ = runtime.query(Counter { id: 1 }).unwrap();
2223
2224            // Now changed_at should return Some
2225            let rev = runtime.changed_at::<Counter>(&1);
2226            assert!(rev.is_some());
2227            assert!(rev.unwrap() > 0);
2228        }
2229
2230        #[test]
2231        fn test_changed_at_matches_poll_revision() {
2232            let runtime = QueryRuntime::new();
2233
2234            // Poll the query
2235            let result = runtime.poll(Counter { id: 1 }).unwrap();
2236
2237            // changed_at should match the revision from poll
2238            let rev = runtime.changed_at::<Counter>(&1);
2239            assert_eq!(rev, Some(result.revision));
2240        }
2241
2242        #[test]
2243        fn test_poll_value_access() {
2244            let runtime = QueryRuntime::new();
2245
2246            let result = runtime.poll(Counter { id: 5 }).unwrap();
2247
2248            // Access through Result and Arc
2249            let value: &i32 = result.value.as_ref().unwrap();
2250            assert_eq!(*value, 50);
2251
2252            // Access Arc directly via field after unwrapping Result
2253            let arc: &Arc<i32> = result.value.as_ref().unwrap();
2254            assert_eq!(**arc, 50);
2255        }
2256
2257        #[test]
2258        fn test_subscription_pattern() {
2259            let runtime = QueryRuntime::new();
2260
2261            // Simulate subscription pattern
2262            let mut last_revision: RevisionCounter = 0;
2263            let mut notifications = 0;
2264
2265            // First poll - should notify (new value)
2266            let result = runtime.poll(Counter { id: 1 }).unwrap();
2267            if result.revision > last_revision {
2268                notifications += 1;
2269                last_revision = result.revision;
2270            }
2271
2272            // Second poll - should NOT notify (no change)
2273            let result = runtime.poll(Counter { id: 1 }).unwrap();
2274            if result.revision > last_revision {
2275                notifications += 1;
2276                last_revision = result.revision;
2277            }
2278
2279            // Third poll - should NOT notify (no change)
2280            let result = runtime.poll(Counter { id: 1 }).unwrap();
2281            if result.revision > last_revision {
2282                notifications += 1;
2283                #[allow(unused_assignments)]
2284                {
2285                    last_revision = result.revision;
2286                }
2287            }
2288
2289            // Only the first poll should have triggered a notification
2290            assert_eq!(notifications, 1);
2291        }
2292    }
2293
2294    // Tests for GC APIs
2295    mod gc_tests {
2296        use super::*;
2297        use std::collections::HashSet;
2298        use std::sync::atomic::{AtomicUsize, Ordering};
2299        use std::sync::Mutex;
2300
2301        #[derive(Clone)]
2302        struct Leaf {
2303            id: i32,
2304        }
2305
2306        impl Query for Leaf {
2307            type CacheKey = i32;
2308            type Output = i32;
2309
2310            fn cache_key(&self) -> Self::CacheKey {
2311                self.id
2312            }
2313
2314            fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2315                Ok(Arc::new(self.id * 10))
2316            }
2317
2318            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2319                old == new
2320            }
2321        }
2322
2323        #[derive(Clone)]
2324        struct Parent {
2325            child_id: i32,
2326        }
2327
2328        impl Query for Parent {
2329            type CacheKey = i32;
2330            type Output = i32;
2331
2332            fn cache_key(&self) -> Self::CacheKey {
2333                self.child_id
2334            }
2335
2336            fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2337                let child = db.query(Leaf { id: self.child_id })?;
2338                Ok(Arc::new(*child + 1))
2339            }
2340
2341            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2342                old == new
2343            }
2344        }
2345
2346        #[test]
2347        fn test_query_keys_returns_all_cached_queries() {
2348            let runtime = QueryRuntime::new();
2349
2350            // Execute some queries
2351            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2352            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2353            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2354
2355            // Get all keys
2356            let keys = runtime.query_keys();
2357
2358            // Should have at least 3 keys (might have more due to sentinels)
2359            assert!(keys.len() >= 3);
2360        }
2361
2362        #[test]
2363        fn test_remove_removes_query() {
2364            let runtime = QueryRuntime::new();
2365
2366            // Execute a query
2367            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2368
2369            // Get the key
2370            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2371
2372            // Query should exist
2373            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2374
2375            // Remove it
2376            assert!(runtime.remove(&full_key));
2377
2378            // Query should no longer exist
2379            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2380        }
2381
2382        #[test]
2383        fn test_remove_if_unused_removes_leaf_query() {
2384            let runtime = QueryRuntime::new();
2385
2386            // Execute a leaf query (no dependents)
2387            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2388
2389            // Should be removable since no other query depends on it
2390            assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2391
2392            // Query should no longer exist
2393            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2394        }
2395
2396        #[test]
2397        fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2398            let runtime = QueryRuntime::new();
2399
2400            // Execute parent query (which depends on Leaf)
2401            let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2402
2403            // Leaf query should not be removable since Parent depends on it
2404            assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2405
2406            // Leaf query should still exist
2407            assert!(runtime.changed_at::<Leaf>(&1).is_some());
2408
2409            // But Parent should be removable (no dependents)
2410            assert!(runtime.remove_query_if_unused::<Parent>(&1));
2411        }
2412
2413        #[test]
2414        fn test_remove_if_unused_with_full_cache_key() {
2415            let runtime = QueryRuntime::new();
2416
2417            // Execute a leaf query
2418            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2419
2420            let full_key = FullCacheKey::new::<Leaf, _>(&1);
2421
2422            // Should be removable via FullCacheKey
2423            assert!(runtime.remove_if_unused(&full_key));
2424
2425            // Query should no longer exist
2426            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2427        }
2428
2429        // Test tracer receives on_query_key calls
2430        struct GcTracker {
2431            accessed_keys: Mutex<HashSet<String>>,
2432            access_count: AtomicUsize,
2433        }
2434
2435        impl GcTracker {
2436            fn new() -> Self {
2437                Self {
2438                    accessed_keys: Mutex::new(HashSet::new()),
2439                    access_count: AtomicUsize::new(0),
2440                }
2441            }
2442        }
2443
2444        impl Tracer for GcTracker {
2445            fn new_span_id(&self) -> SpanId {
2446                SpanId(1)
2447            }
2448
2449            fn on_query_key(&self, full_key: &FullCacheKey) {
2450                self.accessed_keys
2451                    .lock()
2452                    .unwrap()
2453                    .insert(full_key.debug_repr().to_string());
2454                self.access_count.fetch_add(1, Ordering::Relaxed);
2455            }
2456        }
2457
2458        #[test]
2459        fn test_tracer_receives_on_query_key() {
2460            let tracker = GcTracker::new();
2461            let runtime = QueryRuntime::with_tracer(tracker);
2462
2463            // Execute some queries
2464            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2465            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2466
2467            // Tracer should have received on_query_key calls
2468            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2469            assert_eq!(count, 2);
2470
2471            // Check that the keys were recorded
2472            let keys = runtime.tracer().accessed_keys.lock().unwrap();
2473            assert!(keys.iter().any(|k| k.contains("Leaf")));
2474        }
2475
2476        #[test]
2477        fn test_tracer_receives_on_query_key_for_cache_hits() {
2478            let tracker = GcTracker::new();
2479            let runtime = QueryRuntime::with_tracer(tracker);
2480
2481            // Execute query twice (second is cache hit)
2482            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2483            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2484
2485            // Tracer should have received on_query_key for both calls
2486            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2487            assert_eq!(count, 2);
2488        }
2489
2490        #[test]
2491        fn test_gc_workflow() {
2492            let tracker = GcTracker::new();
2493            let runtime = QueryRuntime::with_tracer(tracker);
2494
2495            // Execute some queries
2496            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2497            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2498            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2499
2500            // Simulate GC: remove all queries that are not in use
2501            let mut removed = 0;
2502            for key in runtime.query_keys() {
2503                if runtime.remove_if_unused(&key) {
2504                    removed += 1;
2505                }
2506            }
2507
2508            // All leaf queries should be removable
2509            assert!(removed >= 3);
2510
2511            // Queries should no longer exist
2512            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2513            assert!(runtime.changed_at::<Leaf>(&2).is_none());
2514            assert!(runtime.changed_at::<Leaf>(&3).is_none());
2515        }
2516    }
2517}