query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::TypeId;
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::sync::Arc;
7
8use whale::{Durability, RevisionCounter, Runtime as WhaleRuntime};
9
10use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
11use crate::db::Db;
12use crate::key::FullCacheKey;
13use crate::loading::AssetLoadingState;
14use crate::query::Query;
15use crate::storage::{
16    AssetKeyRegistry, AssetState, AssetStorage, CachedEntry, CachedValue, LocatorStorage,
17    PendingStorage, QueryRegistry, VerifierStorage,
18};
19use crate::tracer::{
20    ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
21    TracerAssetState, TracerQueryKey,
22};
23use crate::QueryError;
24
/// Function type for comparing user errors during early cutoff.
///
/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
/// `QueryError::UserError` values are compared for caching purposes.
///
/// The runtime calls it with the previously cached error first and the newly
/// produced error second. Returning `true` means "same error": the cached
/// revision is reused instead of registering a new one.
pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
30
/// Number of durability levels (matches whale's default).
///
/// Used as the const parameter of the `WhaleRuntime` held by `QueryRuntime`.
const DURABILITY_LEVELS: usize = 4;
33
// Thread-local query execution stack for cycle detection.
//
// A query's key is pushed right before the query executes and popped once
// execution returns. Finding a key that is already on the stack means the
// query is being re-entered while it is still running, i.e. a dependency cycle.
thread_local! {
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
}
38
39/// Execution context passed through query execution.
40///
41/// Contains a span ID for tracing correlation.
42#[derive(Clone, Copy)]
43pub struct ExecutionContext {
44    span_id: SpanId,
45}
46
47impl ExecutionContext {
48    /// Create a new execution context with the given span ID.
49    #[inline]
50    pub fn new(span_id: SpanId) -> Self {
51        Self { span_id }
52    }
53
54    /// Get the span ID for this execution context.
55    #[inline]
56    pub fn span_id(&self) -> SpanId {
57        self.span_id
58    }
59}
60
61/// Result of polling a query, containing the value and its revision.
62///
63/// This is returned by [`QueryRuntime::poll`] and provides both the query result
64/// and its change revision, enabling efficient change detection for subscription
65/// patterns.
66///
67/// # Example
68///
69/// ```ignore
70/// let result = runtime.poll(MyQuery::new())?;
71///
72/// // Access the value via Deref
73/// println!("{:?}", *result);
74///
75/// // Check if changed since last poll
76/// if result.revision > last_known_revision {
77///     notify_subscribers(&result.value);
78///     last_known_revision = result.revision;
79/// }
80/// ```
81#[derive(Debug, Clone)]
82pub struct Polled<T> {
83    /// The query result value.
84    pub value: T,
85    /// The revision at which this value was last changed.
86    ///
87    /// Compare this with a previously stored revision to detect changes.
88    pub revision: RevisionCounter,
89}
90
91impl<T: Deref> Deref for Polled<T> {
92    type Target = T::Target;
93
94    fn deref(&self) -> &Self::Target {
95        &self.value
96    }
97}
98
/// The query runtime manages query execution, caching, and dependency tracking.
///
/// This is cheap to clone - the storages, registries and tracer are shared
/// through `Arc`, and the whale runtime is cloned alongside them.
///
/// # Type Parameter
///
/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
///   for zero-cost when tracing is not needed.
///
/// # Example
///
/// ```ignore
/// // Without tracing (default)
/// let runtime = QueryRuntime::new();
///
/// // With tracing
/// let tracer = MyTracer::new();
/// let runtime = QueryRuntime::with_tracer(tracer);
///
/// // Sync query execution
/// let result = runtime.query(MyQuery { ... })?;
/// ```
pub struct QueryRuntime<T: Tracer = NoopTracer> {
    /// Whale runtime for dependency tracking and cache storage.
    /// Query outputs are stored in Node.data as Option<CachedEntry>;
    /// `None` is what `invalidate` registers to clear a cached value.
    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
    /// Asset cache and state storage
    assets: Arc<AssetStorage>,
    /// Registered asset locators
    locators: Arc<LocatorStorage>,
    /// Pending asset requests
    pending: Arc<PendingStorage>,
    /// Registry for tracking query instances (for list_queries)
    query_registry: Arc<QueryRegistry>,
    /// Registry for tracking asset keys (for list_asset_keys)
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Verifiers for re-executing queries (for verify-then-decide pattern)
    verifiers: Arc<VerifierStorage>,
    /// Comparator for user errors during early cutoff
    /// (returns `true` when two errors count as the same)
    error_comparator: ErrorComparator,
    /// Tracer for observability
    tracer: Arc<T>,
}
142
143impl Default for QueryRuntime<NoopTracer> {
144    fn default() -> Self {
145        Self::new()
146    }
147}
148
149impl<T: Tracer> Clone for QueryRuntime<T> {
150    fn clone(&self) -> Self {
151        Self {
152            whale: self.whale.clone(),
153            assets: self.assets.clone(),
154            locators: self.locators.clone(),
155            pending: self.pending.clone(),
156            query_registry: self.query_registry.clone(),
157            asset_key_registry: self.asset_key_registry.clone(),
158            verifiers: self.verifiers.clone(),
159            error_comparator: self.error_comparator,
160            tracer: self.tracer.clone(),
161        }
162    }
163}
164
165/// Default error comparator that treats all errors as different.
166///
167/// This is conservative - it always triggers recomputation when an error occurs.
168fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
169    false
170}
171
172impl<T: Tracer> QueryRuntime<T> {
173    /// Get cached output along with its revision (single atomic access).
174    fn get_cached_with_revision<Q: Query>(
175        &self,
176        key: &FullCacheKey,
177    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
178        let node = self.whale.get(key)?;
179        let revision = node.changed_at;
180        let entry = node.data.as_ref()?;
181        let cached = entry.to_cached_value::<Q::Output>()?;
182        Some((cached, revision))
183    }
184
185    /// Get a reference to the tracer.
186    #[inline]
187    pub fn tracer(&self) -> &T {
188        &self.tracer
189    }
190}
191
192impl QueryRuntime<NoopTracer> {
193    /// Create a new query runtime with default settings.
194    pub fn new() -> Self {
195        Self::with_tracer(NoopTracer)
196    }
197
198    /// Create a builder for customizing the runtime.
199    ///
200    /// # Example
201    ///
202    /// ```ignore
203    /// let runtime = QueryRuntime::builder()
204    ///     .error_comparator(|a, b| {
205    ///         // Custom error comparison logic
206    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
207    ///             (Some(a), Some(b)) => a == b,
208    ///             _ => false,
209    ///         }
210    ///     })
211    ///     .build();
212    /// ```
213    pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
214        QueryRuntimeBuilder::new()
215    }
216}
217
218impl<T: Tracer> QueryRuntime<T> {
219    /// Create a new query runtime with the specified tracer.
220    pub fn with_tracer(tracer: T) -> Self {
221        QueryRuntimeBuilder::new().tracer(tracer).build()
222    }
223
224    /// Execute a query synchronously.
225    ///
226    /// Returns the cached result if valid, otherwise executes the query.
227    ///
228    /// # Errors
229    ///
230    /// - `QueryError::Suspend` - Query is waiting for async loading
231    /// - `QueryError::Cycle` - Dependency cycle detected
232    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
233        self.query_internal(query)
234            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
235    }
236
237    /// Internal implementation shared by query() and poll().
238    ///
239    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
240    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
241    #[allow(clippy::type_complexity)]
242    fn query_internal<Q: Query>(
243        &self,
244        query: Q,
245    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
246        let key = query.cache_key();
247        let full_key = FullCacheKey::new::<Q, _>(&key);
248
249        // Emit query key event for GC tracking (before other events)
250        self.tracer.on_query_key(&full_key);
251
252        // Create execution context and emit start event
253        let span_id = self.tracer.new_span_id();
254        let exec_ctx = ExecutionContext::new(span_id);
255        let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
256
257        self.tracer.on_query_start(span_id, query_key.clone());
258
259        // Check for cycles using thread-local stack
260        let cycle_detected = QUERY_STACK.with(|stack| {
261            let stack = stack.borrow();
262            stack.iter().any(|k| k == &full_key)
263        });
264
265        if cycle_detected {
266            let path = QUERY_STACK.with(|stack| {
267                let stack = stack.borrow();
268                let mut path: Vec<String> =
269                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
270                path.push(full_key.debug_repr().to_string());
271                path
272            });
273
274            self.tracer.on_cycle_detected(
275                path.iter()
276                    .map(|s| TracerQueryKey::new("", s.clone()))
277                    .collect(),
278            );
279            self.tracer
280                .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
281
282            return Err(QueryError::Cycle { path });
283        }
284
285        // Check if cached and valid (with verify-then-decide pattern)
286        let current_rev = self.whale.current_revision();
287
288        // Fast path: already verified at current revision
289        if self.whale.is_verified_at(&full_key, &current_rev) {
290            // Single atomic access to get both cached value and revision
291            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
292                self.tracer.on_cache_check(span_id, query_key.clone(), true);
293                self.tracer
294                    .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
295
296                return match cached {
297                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
298                    CachedValue::UserError(err) => Ok((Err(err), revision)),
299                };
300            }
301        }
302
303        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
304        if self.whale.is_valid(&full_key) {
305            // Single atomic access to get both cached value and revision
306            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
307                // Shallow valid but not verified - verify deps first
308                let mut deps_verified = true;
309                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
310                    for dep in deps {
311                        if let Some(verifier) = self.verifiers.get(&dep) {
312                            // Re-query dep to verify it (triggers recursive verification)
313                            if verifier.verify(self as &dyn std::any::Any).is_err() {
314                                deps_verified = false;
315                                break;
316                            }
317                        }
318                    }
319                }
320
321                // Re-check validity after deps are verified
322                if deps_verified && self.whale.is_valid(&full_key) {
323                    // Deps didn't change their changed_at, mark as verified and use cached
324                    self.whale.mark_verified(&full_key, &current_rev);
325
326                    self.tracer.on_cache_check(span_id, query_key.clone(), true);
327                    self.tracer
328                        .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
329
330                    return match cached {
331                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
332                        CachedValue::UserError(err) => Ok((Err(err), revision)),
333                    };
334                }
335                // A dep's changed_at increased, fall through to execute
336            }
337        }
338
339        self.tracer
340            .on_cache_check(span_id, query_key.clone(), false);
341
342        // Execute the query with cycle tracking
343        QUERY_STACK.with(|stack| {
344            stack.borrow_mut().push(full_key.clone());
345        });
346
347        let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
348
349        QUERY_STACK.with(|stack| {
350            stack.borrow_mut().pop();
351        });
352
353        // Emit end event
354        let exec_result = match &result {
355            Ok((_, true, _)) => ExecutionResult::Changed,
356            Ok((_, false, _)) => ExecutionResult::Unchanged,
357            Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
358            Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
359            Err(e) => ExecutionResult::Error {
360                message: format!("{:?}", e),
361            },
362        };
363        self.tracer
364            .on_query_end(span_id, query_key.clone(), exec_result);
365
366        result.map(|(inner_result, _, revision)| (inner_result, revision))
367    }
368
369    /// Execute a query, caching the result if appropriate.
370    ///
371    /// Returns (result, output_changed, revision) tuple.
372    /// - `result`: Ok(output) for success, Err(user_error) for user errors
373    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
374    #[allow(clippy::type_complexity)]
375    fn execute_query<Q: Query>(
376        &self,
377        query: &Q,
378        full_key: &FullCacheKey,
379        exec_ctx: ExecutionContext,
380    ) -> Result<
381        (
382            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
383            bool,
384            RevisionCounter,
385        ),
386        QueryError,
387    > {
388        // Create context for this query execution
389        let ctx = QueryContext {
390            runtime: self,
391            current_key: full_key.clone(),
392            parent_query_type: std::any::type_name::<Q>(),
393            exec_ctx,
394            deps: RefCell::new(Vec::new()),
395        };
396
397        // Execute the query (clone because query() takes ownership)
398        let result = query.clone().query(&ctx);
399
400        // Get collected dependencies
401        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
402
403        // Get durability for whale registration
404        let durability =
405            Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
406
407        match result {
408            Ok(output) => {
409                let output = Arc::new(output);
410
411                // Check if output changed (for early cutoff)
412                // existing_revision is Some only when output is unchanged (can reuse revision)
413                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
414                    self.get_cached_with_revision::<Q>(full_key)
415                {
416                    if Q::output_eq(&old, &output) {
417                        Some(rev) // Same output - reuse revision
418                    } else {
419                        None // Different output
420                    }
421                } else {
422                    None // No previous Ok value
423                };
424                let output_changed = existing_revision.is_none();
425
426                // Emit early cutoff check event
427                self.tracer.on_early_cutoff_check(
428                    exec_ctx.span_id(),
429                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
430                    output_changed,
431                );
432
433                // Update whale with cached entry (atomic update of value + dependency state)
434                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
435                let revision = if let Some(existing_rev) = existing_revision {
436                    // confirm_unchanged doesn't change changed_at, use existing
437                    let _ = self.whale.confirm_unchanged(full_key, deps);
438                    existing_rev
439                } else {
440                    // Use new_rev from register result
441                    match self
442                        .whale
443                        .register(full_key.clone(), Some(entry), durability, deps)
444                    {
445                        Ok(result) => result.new_rev,
446                        Err(missing) => {
447                            return Err(QueryError::DependenciesRemoved {
448                                missing_keys: missing,
449                            })
450                        }
451                    }
452                };
453
454                // Register query in registry for list_queries
455                let is_new_query = self.query_registry.register(query);
456                if is_new_query {
457                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
458                    let _ = self
459                        .whale
460                        .register(sentinel, None, Durability::volatile(), vec![]);
461                }
462
463                // Store verifier for this query (for verify-then-decide pattern)
464                self.verifiers
465                    .insert::<Q, T>(full_key.clone(), query.clone());
466
467                Ok((Ok(output), output_changed, revision))
468            }
469            Err(QueryError::UserError(err)) => {
470                // Check if error changed (for early cutoff)
471                // existing_revision is Some only when error is unchanged (can reuse revision)
472                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
473                    self.get_cached_with_revision::<Q>(full_key)
474                {
475                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
476                        Some(rev) // Same error - reuse revision
477                    } else {
478                        None // Different error
479                    }
480                } else {
481                    None // No previous UserError
482                };
483                let output_changed = existing_revision.is_none();
484
485                // Emit early cutoff check event
486                self.tracer.on_early_cutoff_check(
487                    exec_ctx.span_id(),
488                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
489                    output_changed,
490                );
491
492                // Update whale with cached error (atomic update of value + dependency state)
493                let entry = CachedEntry::UserError(err.clone());
494                let revision = if let Some(existing_rev) = existing_revision {
495                    // confirm_unchanged doesn't change changed_at, use existing
496                    let _ = self.whale.confirm_unchanged(full_key, deps);
497                    existing_rev
498                } else {
499                    // Use new_rev from register result
500                    match self
501                        .whale
502                        .register(full_key.clone(), Some(entry), durability, deps)
503                    {
504                        Ok(result) => result.new_rev,
505                        Err(missing) => {
506                            return Err(QueryError::DependenciesRemoved {
507                                missing_keys: missing,
508                            })
509                        }
510                    }
511                };
512
513                // Register query in registry for list_queries
514                let is_new_query = self.query_registry.register(query);
515                if is_new_query {
516                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
517                    let _ = self
518                        .whale
519                        .register(sentinel, None, Durability::volatile(), vec![]);
520                }
521
522                // Store verifier for this query (for verify-then-decide pattern)
523                self.verifiers
524                    .insert::<Q, T>(full_key.clone(), query.clone());
525
526                Ok((Err(err), output_changed, revision))
527            }
528            Err(e) => {
529                // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
530                Err(e)
531            }
532        }
533    }
534
535    /// Invalidate a query, forcing recomputation on next access.
536    ///
537    /// This also invalidates any queries that depend on this one.
538    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
539        let full_key = FullCacheKey::new::<Q, _>(key);
540
541        self.tracer.on_query_invalidated(
542            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
543            InvalidationReason::ManualInvalidation,
544        );
545
546        // Update whale to invalidate dependents (register with None to clear cached value)
547        let _ = self
548            .whale
549            .register(full_key, None, Durability::volatile(), vec![]);
550    }
551
552    /// Remove a query from the cache entirely, freeing memory.
553    ///
554    /// Use this for GC when a query is no longer needed.
555    /// Unlike `invalidate`, this removes all traces of the query from storage.
556    /// The query will be recomputed from scratch on next access.
557    ///
558    /// This also invalidates any queries that depend on this one.
559    pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
560        let full_key = FullCacheKey::new::<Q, _>(key);
561
562        self.tracer.on_query_invalidated(
563            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
564            InvalidationReason::ManualInvalidation,
565        );
566
567        // Remove verifier if exists
568        self.verifiers.remove(&full_key);
569
570        // Remove from whale storage (this also handles dependent invalidation)
571        self.whale.remove(&full_key);
572
573        // Remove from registry and update sentinel for list_queries
574        if self.query_registry.remove::<Q>(key) {
575            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
576            let _ = self
577                .whale
578                .register(sentinel, None, Durability::volatile(), vec![]);
579        }
580    }
581
582    /// Clear all cached values by removing all nodes from whale.
583    ///
584    /// Note: This is a relatively expensive operation as it iterates through all keys.
585    pub fn clear_cache(&self) {
586        let keys = self.whale.keys();
587        for key in keys {
588            self.whale.remove(&key);
589        }
590    }
591
592    /// Poll a query, returning both the result and its change revision.
593    ///
594    /// This is useful for implementing subscription patterns where you need to
595    /// detect changes efficiently. Compare the returned `revision` with a
596    /// previously stored value to determine if the query result has changed.
597    ///
598    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
599    /// as its value, allowing you to track revision changes for both success and
600    /// user error cases.
601    ///
602    /// # Example
603    ///
604    /// ```ignore
605    /// struct Subscription<Q: Query> {
606    ///     query: Q,
607    ///     last_revision: RevisionCounter,
608    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
609    /// }
610    ///
611    /// // Polling loop
612    /// for sub in &mut subscriptions {
613    ///     let result = runtime.poll(sub.query.clone())?;
614    ///     if result.revision > sub.last_revision {
615    ///         sub.tx.send(result.value.clone())?;
616    ///         sub.last_revision = result.revision;
617    ///     }
618    /// }
619    /// ```
620    ///
621    /// # Errors
622    ///
623    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
624    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
625    #[allow(clippy::type_complexity)]
626    pub fn poll<Q: Query>(
627        &self,
628        query: Q,
629    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
630        let (value, revision) = self.query_internal(query)?;
631        Ok(Polled { value, revision })
632    }
633
634    /// Get the change revision of a query without executing it.
635    ///
636    /// Returns `None` if the query has never been executed.
637    ///
638    /// This is useful for checking if a query has changed since the last poll
639    /// without the cost of executing the query.
640    ///
641    /// # Example
642    ///
643    /// ```ignore
644    /// // Check if query has changed before deciding to poll
645    /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
646    ///     if rev > last_known_revision {
647    ///         let result = runtime.query(MyQuery::new(key))?;
648    ///         // Process result...
649    ///     }
650    /// }
651    /// ```
652    pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
653        let full_key = FullCacheKey::new::<Q, _>(key);
654        self.whale.get(&full_key).map(|node| node.changed_at)
655    }
656}
657
658// ============================================================================
659// GC (Garbage Collection) API
660// ============================================================================
661
662impl<T: Tracer> QueryRuntime<T> {
663    /// Get all query keys currently in the cache.
664    ///
665    /// This is useful for implementing custom garbage collection strategies.
666    /// Use this in combination with [`Tracer::on_query_key`] to track access
667    /// times and implement LRU, TTL, or other GC algorithms externally.
668    ///
669    /// # Example
670    ///
671    /// ```ignore
672    /// // Collect all keys that haven't been accessed recently
673    /// let stale_keys: Vec<_> = runtime.query_keys()
674    ///     .filter(|key| tracker.is_stale(key))
675    ///     .collect();
676    ///
677    /// // Remove stale queries that have no dependents
678    /// for key in stale_keys {
679    ///     runtime.remove_if_unused(&key);
680    /// }
681    /// ```
682    pub fn query_keys(&self) -> Vec<FullCacheKey> {
683        self.whale.keys()
684    }
685
686    /// Remove a query if it has no dependents.
687    ///
688    /// Returns `true` if the query was removed, `false` if it has dependents
689    /// or doesn't exist. This is the safe way to remove queries during GC,
690    /// as it won't break queries that depend on this one.
691    ///
692    /// # Example
693    ///
694    /// ```ignore
695    /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
696    /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
697    ///     println!("Query removed");
698    /// } else {
699    ///     println!("Query has dependents, not removed");
700    /// }
701    /// ```
702    pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
703        let full_key = FullCacheKey::new::<Q, _>(key);
704        self.remove_if_unused(&full_key)
705    }
706
707    /// Remove a query by its [`FullCacheKey`].
708    ///
709    /// This is the type-erased version of [`remove_query`](Self::remove_query).
710    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
711    /// or [`Tracer::on_query_key`].
712    ///
713    /// Returns `true` if the query was removed, `false` if it doesn't exist.
714    ///
715    /// # Warning
716    ///
717    /// This forcibly removes the query even if other queries depend on it.
718    /// Dependent queries will be recomputed on next access. For safe GC,
719    /// use [`remove_if_unused`](Self::remove_if_unused) instead.
720    pub fn remove(&self, key: &FullCacheKey) -> bool {
721        // Remove verifier if exists
722        self.verifiers.remove(key);
723
724        // Remove from whale storage
725        self.whale.remove(key).is_some()
726    }
727
728    /// Remove a query by its [`FullCacheKey`] if it has no dependents.
729    ///
730    /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
731    /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
732    /// or [`Tracer::on_query_key`].
733    ///
734    /// Returns `true` if the query was removed, `false` if it has dependents
735    /// or doesn't exist.
736    ///
737    /// # Example
738    ///
739    /// ```ignore
740    /// // Implement LRU GC
741    /// for key in runtime.query_keys() {
742    ///     if tracker.is_expired(&key) {
743    ///         runtime.remove_if_unused(&key);
744    ///     }
745    /// }
746    /// ```
747    pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
748        if self.whale.remove_if_unused(key.clone()).is_some() {
749            // Successfully removed - clean up verifier
750            self.verifiers.remove(key);
751            true
752        } else {
753            false
754        }
755    }
756}
757
758// ============================================================================
759// Builder
760// ============================================================================
761
/// Builder for [`QueryRuntime`] with customizable settings.
///
/// Two settings can be configured: the comparator used for user errors
/// during early cutoff, and the tracer.
///
/// # Example
///
/// ```ignore
/// let runtime = QueryRuntime::builder()
///     .error_comparator(|a, b| {
///         // Treat all errors of the same type as equal
///         a.downcast_ref::<std::io::Error>().is_some()
///             == b.downcast_ref::<std::io::Error>().is_some()
///     })
///     .build();
/// ```
pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
    // Comparator handed to the runtime for user-error early cutoff.
    error_comparator: ErrorComparator,
    // Tracer the built runtime will report to.
    tracer: T,
}
779
780impl Default for QueryRuntimeBuilder<NoopTracer> {
781    fn default() -> Self {
782        Self::new()
783    }
784}
785
786impl QueryRuntimeBuilder<NoopTracer> {
787    /// Create a new builder with default settings.
788    pub fn new() -> Self {
789        Self {
790            error_comparator: default_error_comparator,
791            tracer: NoopTracer,
792        }
793    }
794}
795
796impl<T: Tracer> QueryRuntimeBuilder<T> {
797    /// Set the error comparator function for early cutoff optimization.
798    ///
799    /// When a query returns `QueryError::UserError`, this function is used
800    /// to compare it with the previously cached error. If they are equal,
801    /// downstream queries can skip recomputation (early cutoff).
802    ///
803    /// The default comparator returns `false` for all errors, meaning errors
804    /// are always considered different (conservative, always recomputes).
805    ///
806    /// # Example
807    ///
808    /// ```ignore
809    /// // Treat errors as equal if they have the same display message
810    /// let runtime = QueryRuntime::builder()
811    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
812    ///     .build();
813    /// ```
814    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
815        self.error_comparator = f;
816        self
817    }
818
819    /// Set the tracer for observability.
820    pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
821        QueryRuntimeBuilder {
822            error_comparator: self.error_comparator,
823            tracer,
824        }
825    }
826
827    /// Build the runtime with the configured settings.
828    pub fn build(self) -> QueryRuntime<T> {
829        QueryRuntime {
830            whale: WhaleRuntime::new(),
831            assets: Arc::new(AssetStorage::new()),
832            locators: Arc::new(LocatorStorage::new()),
833            pending: Arc::new(PendingStorage::new()),
834            query_registry: Arc::new(QueryRegistry::new()),
835            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
836            verifiers: Arc::new(VerifierStorage::new()),
837            error_comparator: self.error_comparator,
838            tracer: Arc::new(self.tracer),
839        }
840    }
841}
842
843// ============================================================================
844// Asset API
845// ============================================================================
846
847impl<T: Tracer> QueryRuntime<T> {
848    /// Register an asset locator for a specific asset key type.
849    ///
850    /// Only one locator can be registered per key type. Later registrations
851    /// replace earlier ones.
852    ///
853    /// # Example
854    ///
855    /// ```ignore
856    /// let runtime = QueryRuntime::new();
857    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
858    /// ```
859    pub fn register_asset_locator<K, L>(&self, locator: L)
860    where
861        K: AssetKey,
862        L: AssetLocator<K>,
863    {
864        self.locators.insert::<K, L>(locator);
865    }
866
867    /// Get an iterator over pending asset requests.
868    ///
869    /// Returns assets that have been requested but not yet resolved.
870    /// The user should fetch these externally and call `resolve_asset()`.
871    ///
872    /// # Example
873    ///
874    /// ```ignore
875    /// for pending in runtime.pending_assets() {
876    ///     if let Some(path) = pending.key::<FilePath>() {
877    ///         let content = fetch_file(path);
878    ///         runtime.resolve_asset(path.clone(), content);
879    ///     }
880    /// }
881    /// ```
882    pub fn pending_assets(&self) -> Vec<PendingAsset> {
883        self.pending.get_all()
884    }
885
886    /// Get pending assets filtered by key type.
887    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
888        self.pending.get_of_type::<K>()
889    }
890
891    /// Check if there are any pending assets.
892    pub fn has_pending_assets(&self) -> bool {
893        !self.pending.is_empty()
894    }
895
896    /// Resolve an asset with its loaded value.
897    ///
898    /// This marks the asset as ready and invalidates any queries that
899    /// depend on it (if the value changed), triggering recomputation on next access.
900    ///
901    /// This method is idempotent - resolving with the same value (via `asset_eq`)
902    /// will not trigger downstream recomputation.
903    ///
904    /// Uses the asset key's default durability.
905    ///
906    /// # Example
907    ///
908    /// ```ignore
909    /// let content = std::fs::read_to_string(&path)?;
910    /// runtime.resolve_asset(FilePath(path), content);
911    /// ```
912    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset) {
913        let durability = key.durability();
914        self.resolve_asset_internal(key, value, durability);
915    }
916
917    /// Resolve an asset with a specific durability level.
918    ///
919    /// Use this to override the asset key's default durability.
920    pub fn resolve_asset_with_durability<K: AssetKey>(
921        &self,
922        key: K,
923        value: K::Asset,
924        durability: DurabilityLevel,
925    ) {
926        self.resolve_asset_internal(key, value, durability);
927    }
928
929    fn resolve_asset_internal<K: AssetKey>(
930        &self,
931        key: K,
932        value: K::Asset,
933        durability_level: DurabilityLevel,
934    ) {
935        let full_asset_key = FullAssetKey::new(&key);
936        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
937
938        // Check for early cutoff
939        let changed = if let Some(old_value) = self.assets.get_ready::<K>(&full_asset_key) {
940            !K::asset_eq(&old_value, &value)
941        } else {
942            true // Loading or NotFound -> Ready is a change
943        };
944
945        // Emit asset resolved event
946        self.tracer.on_asset_resolved(
947            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
948            changed,
949        );
950
951        // Store the new value
952        self.assets
953            .insert_ready::<K>(full_asset_key.clone(), Arc::new(value));
954
955        // Remove from pending
956        self.pending.remove(&full_asset_key);
957
958        // Update whale dependency tracking
959        let durability =
960            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
961
962        if changed {
963            // Register with new changed_at to invalidate dependents
964            let _ = self
965                .whale
966                .register(full_cache_key, None, durability, vec![]);
967        } else {
968            // Early cutoff - keep old changed_at
969            let _ = self.whale.confirm_unchanged(&full_cache_key, vec![]);
970        }
971
972        // Register asset key in registry for list_asset_keys
973        let is_new_asset = self.asset_key_registry.register(&key);
974        if is_new_asset {
975            // Update sentinel to invalidate list_asset_keys dependents
976            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
977            let _ = self
978                .whale
979                .register(sentinel, None, Durability::volatile(), vec![]);
980        }
981    }
982
983    /// Invalidate an asset, forcing queries to re-request it.
984    ///
985    /// The asset will be marked as loading and added to pending assets.
986    /// Dependent queries will suspend until the asset is resolved again.
987    ///
988    /// # Example
989    ///
990    /// ```ignore
991    /// // File was modified externally
992    /// runtime.invalidate_asset(&FilePath("config.json".into()));
993    /// // Queries depending on this asset will now suspend
994    /// // User should fetch the new value and call resolve_asset
995    /// ```
996    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
997        let full_asset_key = FullAssetKey::new(key);
998        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
999
1000        // Emit asset invalidated event
1001        self.tracer.on_asset_invalidated(TracerAssetKey::new(
1002            std::any::type_name::<K>(),
1003            format!("{:?}", key),
1004        ));
1005
1006        // Mark as loading
1007        self.assets
1008            .insert(full_asset_key.clone(), AssetState::Loading);
1009
1010        // Add to pending
1011        self.pending.insert::<K>(full_asset_key, key.clone());
1012
1013        // Update whale to invalidate dependents (use volatile during loading)
1014        let _ = self
1015            .whale
1016            .register(full_cache_key, None, Durability::volatile(), vec![]);
1017    }
1018
1019    /// Remove an asset from the cache entirely.
1020    ///
1021    /// Unlike `invalidate_asset`, this removes all traces of the asset.
1022    /// Dependent queries will go through the locator again on next access.
1023    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1024        let full_asset_key = FullAssetKey::new(key);
1025        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1026
1027        // First, register a change to invalidate dependents
1028        // This ensures queries that depend on this asset will recompute
1029        let _ = self
1030            .whale
1031            .register(full_cache_key.clone(), None, Durability::volatile(), vec![]);
1032
1033        // Then remove the asset from storage
1034        self.assets.remove(&full_asset_key);
1035        self.pending.remove(&full_asset_key);
1036
1037        // Finally remove from whale tracking
1038        self.whale.remove(&full_cache_key);
1039
1040        // Remove from registry and update sentinel for list_asset_keys
1041        if self.asset_key_registry.remove::<K>(key) {
1042            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1043            let _ = self
1044                .whale
1045                .register(sentinel, None, Durability::volatile(), vec![]);
1046        }
1047    }
1048
1049    /// Get an asset by key without tracking dependencies.
1050    ///
1051    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1052    /// as a dependent of the asset. Use this for direct asset access outside
1053    /// of query execution.
1054    ///
1055    /// # Returns
1056    ///
1057    /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1058    /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1059    /// - `Err(QueryError::MissingDependency)` - Asset was not found
1060    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1061        self.get_asset_internal(key)
1062    }
1063
1064    /// Internal: Get asset state, checking cache and locator.
1065    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1066        let full_asset_key = FullAssetKey::new(&key);
1067        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1068
1069        // Helper to emit AssetRequested event
1070        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1071            tracer.on_asset_requested(
1072                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1073                state,
1074            );
1075        };
1076
1077        // Check cache first
1078        if let Some(state) = self.assets.get(&full_asset_key) {
1079            // Check if whale thinks it's valid
1080            if self.whale.is_valid(&full_cache_key) {
1081                return match state {
1082                    AssetState::Loading => {
1083                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1084                        Ok(AssetLoadingState::loading(key))
1085                    }
1086                    AssetState::Ready(arc) => {
1087                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1088                        match arc.downcast::<K::Asset>() {
1089                            Ok(value) => Ok(AssetLoadingState::ready(key, value)),
1090                            Err(_) => Err(QueryError::MissingDependency {
1091                                description: format!("Asset type mismatch: {:?}", key),
1092                            }),
1093                        }
1094                    }
1095                    AssetState::NotFound => {
1096                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1097                        Err(QueryError::MissingDependency {
1098                            description: format!("Asset not found: {:?}", key),
1099                        })
1100                    }
1101                };
1102            }
1103        }
1104
1105        // Check if there's a registered locator
1106        if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
1107            if let Some(state) = locator.locate_any(&key) {
1108                // Store the state
1109                self.assets.insert(full_asset_key.clone(), state.clone());
1110
1111                match state {
1112                    AssetState::Ready(arc) => {
1113                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1114
1115                        // Register with whale
1116                        let durability = Durability::new(key.durability().as_u8() as usize)
1117                            .unwrap_or(Durability::volatile());
1118                        self.whale
1119                            .register(full_cache_key, None, durability, vec![])
1120                            .expect("register with no dependencies cannot fail");
1121
1122                        match arc.downcast::<K::Asset>() {
1123                            Ok(value) => return Ok(AssetLoadingState::ready(key, value)),
1124                            Err(_) => {
1125                                return Err(QueryError::MissingDependency {
1126                                    description: format!("Asset type mismatch: {:?}", key),
1127                                })
1128                            }
1129                        }
1130                    }
1131                    AssetState::Loading => {
1132                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1133                        self.pending.insert::<K>(full_asset_key, key.clone());
1134
1135                        // Register in whale so queries can depend on this asset
1136                        self.whale
1137                            .register(full_cache_key, None, Durability::volatile(), vec![])
1138                            .expect("register with no dependencies cannot fail");
1139
1140                        return Ok(AssetLoadingState::loading(key));
1141                    }
1142                    AssetState::NotFound => {
1143                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1144                        return Err(QueryError::MissingDependency {
1145                            description: format!("Asset not found: {:?}", key),
1146                        });
1147                    }
1148                }
1149            }
1150        }
1151
1152        // No locator registered or locator returned None - mark as pending
1153        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1154        self.assets
1155            .insert(full_asset_key.clone(), AssetState::Loading);
1156        self.pending
1157            .insert::<K>(full_asset_key.clone(), key.clone());
1158
1159        // Register in whale so queries can depend on this asset
1160        self.whale
1161            .register(full_cache_key, None, Durability::volatile(), vec![])
1162            .expect("register with no dependencies cannot fail");
1163
1164        Ok(AssetLoadingState::loading(key))
1165    }
1166}
1167
1168impl<T: Tracer> Db for QueryRuntime<T> {
1169    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1170        QueryRuntime::query(self, query)
1171    }
1172
1173    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1174        self.get_asset_internal(key)
1175    }
1176
1177    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1178        self.query_registry.get_all::<Q>()
1179    }
1180
1181    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1182        self.asset_key_registry.get_all::<K>()
1183    }
1184}
1185
1186/// Context provided to queries during execution.
1187///
1188/// Use this to access dependencies via `query()`.
1189pub struct QueryContext<'a, T: Tracer = NoopTracer> {
1190    runtime: &'a QueryRuntime<T>,
1191    current_key: FullCacheKey,
1192    parent_query_type: &'static str,
1193    exec_ctx: ExecutionContext,
1194    deps: RefCell<Vec<FullCacheKey>>,
1195}
1196
1197impl<'a, T: Tracer> QueryContext<'a, T> {
1198    /// Query a dependency.
1199    ///
1200    /// The dependency is automatically tracked for invalidation.
1201    ///
1202    /// # Example
1203    ///
1204    /// ```ignore
1205    /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1206    ///     let dep_result = db.query(OtherQuery { id: self.id })?;
1207    ///     Ok(process(&dep_result))
1208    /// }
1209    /// ```
1210    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1211        let key = query.cache_key();
1212        let full_key = FullCacheKey::new::<Q, _>(&key);
1213
1214        // Emit dependency registered event
1215        self.runtime.tracer.on_dependency_registered(
1216            self.exec_ctx.span_id(),
1217            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1218            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1219        );
1220
1221        // Record this as a dependency
1222        self.deps.borrow_mut().push(full_key.clone());
1223
1224        // Execute the query
1225        self.runtime.query(query)
1226    }
1227
1228    /// Access an asset, tracking it as a dependency.
1229    ///
1230    /// Returns `AssetLoadingState<K>`:
1231    /// - `is_loading()` if the asset is still being loaded
1232    /// - `is_ready()` if the asset is available
1233    ///
1234    /// Use `.suspend()?` to convert to `Result<Arc<K::Asset>, QueryError>`,
1235    /// which returns `Err(QueryError::Suspend { asset })` if still loading.
1236    ///
1237    /// # Example
1238    ///
1239    /// ```ignore
1240    /// #[query]
1241    /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1242    ///     let content = db.asset(path)?.suspend()?;
1243    ///     // Process content...
1244    ///     Ok(output)
1245    /// }
1246    /// ```
1247    ///
1248    /// # Errors
1249    ///
1250    /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1251    pub fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1252        let full_asset_key = FullAssetKey::new(&key);
1253        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1254
1255        // Emit asset dependency registered event
1256        self.runtime.tracer.on_asset_dependency_registered(
1257            self.exec_ctx.span_id(),
1258            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1259            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1260        );
1261
1262        // Record dependency on this asset
1263        self.deps.borrow_mut().push(full_cache_key);
1264
1265        // Get asset from runtime
1266        let result = self.runtime.get_asset_internal(key);
1267
1268        // Emit missing dependency event on error
1269        if let Err(QueryError::MissingDependency { ref description }) = result {
1270            self.runtime.tracer.on_missing_dependency(
1271                TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1272                description.clone(),
1273            );
1274        }
1275
1276        result
1277    }
1278
1279    /// List all query instances of type Q that have been registered.
1280    ///
1281    /// This method establishes a dependency on the "set" of queries of type Q.
1282    /// The calling query will be invalidated when:
1283    /// - A new query of type Q is first executed (added to set)
1284    ///
1285    /// The calling query will NOT be invalidated when:
1286    /// - An individual query of type Q has its value change
1287    ///
1288    /// # Example
1289    ///
1290    /// ```ignore
1291    /// #[query]
1292    /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1293    ///     let queries = db.list_queries::<MyQuery>();
1294    ///     let mut results = Vec::new();
1295    ///     for q in queries {
1296    ///         results.push(*db.query(q)?);
1297    ///     }
1298    ///     Ok(results)
1299    /// }
1300    /// ```
1301    pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1302        // Record dependency on the sentinel (set-level dependency)
1303        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1304
1305        self.runtime.tracer.on_dependency_registered(
1306            self.exec_ctx.span_id(),
1307            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1308            TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1309        );
1310
1311        // Ensure sentinel exists in whale (for dependency tracking)
1312        if self.runtime.whale.get(&sentinel).is_none() {
1313            let _ =
1314                self.runtime
1315                    .whale
1316                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1317        }
1318
1319        self.deps.borrow_mut().push(sentinel);
1320
1321        // Return all registered queries
1322        self.runtime.query_registry.get_all::<Q>()
1323    }
1324
1325    /// List all asset keys of type K that have been registered.
1326    ///
1327    /// This method establishes a dependency on the "set" of asset keys of type K.
1328    /// The calling query will be invalidated when:
1329    /// - A new asset of type K is resolved for the first time (added to set)
1330    /// - An asset of type K is removed via remove_asset
1331    ///
1332    /// The calling query will NOT be invalidated when:
1333    /// - An individual asset's value changes (use `db.asset()` for that)
1334    ///
1335    /// # Example
1336    ///
1337    /// ```ignore
1338    /// #[query]
1339    /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1340    ///     let keys = db.list_asset_keys::<ConfigFile>();
1341    ///     let mut contents = Vec::new();
1342    ///     for key in keys {
1343    ///         let content = db.asset(&key)?.suspend()?;
1344    ///         contents.push((*content).clone());
1345    ///     }
1346    ///     Ok(contents)
1347    /// }
1348    /// ```
1349    pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1350        // Record dependency on the sentinel (set-level dependency)
1351        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1352
1353        self.runtime.tracer.on_asset_dependency_registered(
1354            self.exec_ctx.span_id(),
1355            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1356            TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1357        );
1358
1359        // Ensure sentinel exists in whale (for dependency tracking)
1360        if self.runtime.whale.get(&sentinel).is_none() {
1361            let _ =
1362                self.runtime
1363                    .whale
1364                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1365        }
1366
1367        self.deps.borrow_mut().push(sentinel);
1368
1369        // Return all registered asset keys
1370        self.runtime.asset_key_registry.get_all::<K>()
1371    }
1372}
1373
1374impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1375    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1376        QueryContext::query(self, query)
1377    }
1378
1379    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1380        QueryContext::asset(self, key)
1381    }
1382
1383    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1384        QueryContext::list_queries(self)
1385    }
1386
1387    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1388        QueryContext::list_asset_keys(self)
1389    }
1390}
1391
1392#[cfg(test)]
1393mod tests {
1394    use super::*;
1395
1396    #[test]
1397    fn test_simple_query() {
1398        #[derive(Clone)]
1399        struct Add {
1400            a: i32,
1401            b: i32,
1402        }
1403
1404        impl Query for Add {
1405            type CacheKey = (i32, i32);
1406            type Output = i32;
1407
1408            fn cache_key(&self) -> Self::CacheKey {
1409                (self.a, self.b)
1410            }
1411
1412            fn query(self, _db: &impl Db) -> Result<Self::Output, QueryError> {
1413                Ok(self.a + self.b)
1414            }
1415
1416            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1417                old == new
1418            }
1419        }
1420
1421        let runtime = QueryRuntime::new();
1422
1423        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
1424        assert_eq!(*result, 3);
1425
1426        // Second query should be cached
1427        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
1428        assert_eq!(*result2, 3);
1429    }
1430
1431    #[test]
1432    fn test_dependent_queries() {
1433        #[derive(Clone)]
1434        struct Base {
1435            value: i32,
1436        }
1437
1438        impl Query for Base {
1439            type CacheKey = i32;
1440            type Output = i32;
1441
1442            fn cache_key(&self) -> Self::CacheKey {
1443                self.value
1444            }
1445
1446            fn query(self, _db: &impl Db) -> Result<Self::Output, QueryError> {
1447                Ok(self.value * 2)
1448            }
1449
1450            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1451                old == new
1452            }
1453        }
1454
1455        #[derive(Clone)]
1456        struct Derived {
1457            base_value: i32,
1458        }
1459
1460        impl Query for Derived {
1461            type CacheKey = i32;
1462            type Output = i32;
1463
1464            fn cache_key(&self) -> Self::CacheKey {
1465                self.base_value
1466            }
1467
1468            fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1469                let base = db.query(Base {
1470                    value: self.base_value,
1471                })?;
1472                Ok(*base + 10)
1473            }
1474
1475            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1476                old == new
1477            }
1478        }
1479
1480        let runtime = QueryRuntime::new();
1481
1482        let result = runtime.query(Derived { base_value: 5 }).unwrap();
1483        assert_eq!(*result, 20); // 5 * 2 + 10
1484    }
1485
1486    #[test]
1487    fn test_cycle_detection() {
1488        #[derive(Clone)]
1489        struct CycleA {
1490            id: i32,
1491        }
1492
1493        #[derive(Clone)]
1494        struct CycleB {
1495            id: i32,
1496        }
1497
1498        impl Query for CycleA {
1499            type CacheKey = i32;
1500            type Output = i32;
1501
1502            fn cache_key(&self) -> Self::CacheKey {
1503                self.id
1504            }
1505
1506            fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1507                let b = db.query(CycleB { id: self.id })?;
1508                Ok(*b + 1)
1509            }
1510
1511            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1512                old == new
1513            }
1514        }
1515
1516        impl Query for CycleB {
1517            type CacheKey = i32;
1518            type Output = i32;
1519
1520            fn cache_key(&self) -> Self::CacheKey {
1521                self.id
1522            }
1523
1524            fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1525                let a = db.query(CycleA { id: self.id })?;
1526                Ok(*a + 1)
1527            }
1528
1529            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1530                old == new
1531            }
1532        }
1533
1534        let runtime = QueryRuntime::new();
1535
1536        let result = runtime.query(CycleA { id: 1 });
1537        assert!(matches!(result, Err(QueryError::Cycle { .. })));
1538    }
1539
1540    #[test]
1541    fn test_fallible_query() {
1542        #[derive(Clone)]
1543        struct ParseInt {
1544            input: String,
1545        }
1546
1547        impl Query for ParseInt {
1548            type CacheKey = String;
1549            type Output = Result<i32, std::num::ParseIntError>;
1550
1551            fn cache_key(&self) -> Self::CacheKey {
1552                self.input.clone()
1553            }
1554
1555            fn query(self, _db: &impl Db) -> Result<Self::Output, QueryError> {
1556                Ok(self.input.parse())
1557            }
1558
1559            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1560                old == new
1561            }
1562        }
1563
1564        let runtime = QueryRuntime::new();
1565
1566        // Valid parse
1567        let result = runtime
1568            .query(ParseInt {
1569                input: "42".to_string(),
1570            })
1571            .unwrap();
1572        assert_eq!(*result, Ok(42));
1573
1574        // Invalid parse - system succeeds, user error in output
1575        let result = runtime
1576            .query(ParseInt {
1577                input: "not_a_number".to_string(),
1578            })
1579            .unwrap();
1580        assert!(result.is_err());
1581    }
1582
1583    // Macro tests
1584    mod macro_tests {
1585        use super::*;
1586        use crate::query;
1587
1588        #[query]
1589        fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
1590            let _ = db; // silence unused warning
1591            Ok(a + b)
1592        }
1593
1594        #[test]
1595        fn test_macro_basic() {
1596            let runtime = QueryRuntime::new();
1597            let result = runtime.query(Add::new(1, 2)).unwrap();
1598            assert_eq!(*result, 3);
1599        }
1600
1601        #[query(durability = 2)]
1602        fn with_durability(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1603            let _ = db;
1604            Ok(x * 2)
1605        }
1606
1607        #[test]
1608        fn test_macro_durability() {
1609            let runtime = QueryRuntime::new();
1610            let result = runtime.query(WithDurability::new(5)).unwrap();
1611            assert_eq!(*result, 10);
1612        }
1613
1614        #[query(keys(id))]
1615        fn with_key_selection(
1616            db: &impl Db,
1617            id: u32,
1618            include_extra: bool,
1619        ) -> Result<String, QueryError> {
1620            let _ = db;
1621            Ok(format!("id={}, extra={}", id, include_extra))
1622        }
1623
1624        #[test]
1625        fn test_macro_key_selection() {
1626            let runtime = QueryRuntime::new();
1627
1628            // Same id, different include_extra - should return cached
1629            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
1630            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
1631
1632            // Both should have same value because only `id` is the key
1633            assert_eq!(*r1, "id=1, extra=true");
1634            assert_eq!(*r2, "id=1, extra=true"); // Cached!
1635        }
1636
1637        #[query]
1638        fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
1639            let sum = db.query(Add::new(a, b))?;
1640            Ok(*sum * 2)
1641        }
1642
1643        #[test]
1644        fn test_macro_dependencies() {
1645            let runtime = QueryRuntime::new();
1646            let result = runtime.query(Dependent::new(3, 4)).unwrap();
1647            assert_eq!(*result, 14); // (3 + 4) * 2
1648        }
1649
1650        #[query(output_eq)]
1651        fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1652            let _ = db;
1653            Ok(x * 2)
1654        }
1655
1656        #[test]
1657        fn test_macro_output_eq() {
1658            let runtime = QueryRuntime::new();
1659            let result = runtime.query(WithOutputEq::new(5)).unwrap();
1660            assert_eq!(*result, 10);
1661        }
1662
1663        #[query(name = "CustomName")]
1664        fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1665            let _ = db;
1666            Ok(x)
1667        }
1668
1669        #[test]
1670        fn test_macro_custom_name() {
1671            let runtime = QueryRuntime::new();
1672            let result = runtime.query(CustomName::new(42)).unwrap();
1673            assert_eq!(*result, 42);
1674        }
1675
1676        // Test that attribute macros like #[tracing::instrument] are preserved
1677        // We use #[allow(unused_variables)] and #[inline] as test attributes since
1678        // they don't require external dependencies.
1679        #[allow(unused_variables)]
1680        #[inline]
1681        #[query]
1682        fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1683            // This would warn without #[allow(unused_variables)] on the generated method
1684            let unused_var = 42;
1685            Ok(x * 2)
1686        }
1687
1688        #[test]
1689        fn test_macro_preserves_attributes() {
1690            let runtime = QueryRuntime::new();
1691            // If attributes weren't preserved, this might warn about unused_var
1692            let result = runtime.query(WithAttributes::new(5)).unwrap();
1693            assert_eq!(*result, 10);
1694        }
1695    }
1696
1697    // Tests for poll() and changed_at()
1698    mod poll_tests {
1699        use super::*;
1700
1701        #[derive(Clone)]
1702        struct Counter {
1703            id: i32,
1704        }
1705
1706        impl Query for Counter {
1707            type CacheKey = i32;
1708            type Output = i32;
1709
1710            fn cache_key(&self) -> Self::CacheKey {
1711                self.id
1712            }
1713
1714            fn query(self, _db: &impl Db) -> Result<Self::Output, QueryError> {
1715                Ok(self.id * 10)
1716            }
1717
1718            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1719                old == new
1720            }
1721        }
1722
1723        #[test]
1724        fn test_poll_returns_value_and_revision() {
1725            let runtime = QueryRuntime::new();
1726
1727            let result = runtime.poll(Counter { id: 1 }).unwrap();
1728
1729            // Value should be correct - access through Result and Arc
1730            assert_eq!(**result.value.as_ref().unwrap(), 10);
1731
1732            // Revision should be non-zero after first execution
1733            assert!(result.revision > 0);
1734        }
1735
1736        #[test]
1737        fn test_poll_revision_stable_on_cache_hit() {
1738            let runtime = QueryRuntime::new();
1739
1740            // First poll
1741            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1742            let rev1 = result1.revision;
1743
1744            // Second poll (cache hit)
1745            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1746            let rev2 = result2.revision;
1747
1748            // Revision should be the same (no change)
1749            assert_eq!(rev1, rev2);
1750        }
1751
1752        #[test]
1753        fn test_poll_revision_changes_on_invalidate() {
1754            let runtime = QueryRuntime::new();
1755
1756            // First poll
1757            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1758            let rev1 = result1.revision;
1759
1760            // Invalidate and poll again
1761            runtime.invalidate::<Counter>(&1);
1762            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1763            let rev2 = result2.revision;
1764
1765            // Revision should increase (value was recomputed)
1766            // Note: Since output_eq returns true (same value), this might not change
1767            // depending on early cutoff behavior. Let's verify the value is still correct.
1768            assert_eq!(**result2.value.as_ref().unwrap(), 10);
1769
1770            // With early cutoff, revision might stay the same if value didn't change
1771            // This is expected behavior
1772            assert!(rev2 >= rev1);
1773        }
1774
1775        #[test]
1776        fn test_changed_at_returns_none_for_unexecuted_query() {
1777            let runtime = QueryRuntime::new();
1778
1779            // Query has never been executed
1780            let rev = runtime.changed_at::<Counter>(&1);
1781            assert!(rev.is_none());
1782        }
1783
1784        #[test]
1785        fn test_changed_at_returns_revision_after_execution() {
1786            let runtime = QueryRuntime::new();
1787
1788            // Execute the query
1789            let _ = runtime.query(Counter { id: 1 }).unwrap();
1790
1791            // Now changed_at should return Some
1792            let rev = runtime.changed_at::<Counter>(&1);
1793            assert!(rev.is_some());
1794            assert!(rev.unwrap() > 0);
1795        }
1796
1797        #[test]
1798        fn test_changed_at_matches_poll_revision() {
1799            let runtime = QueryRuntime::new();
1800
1801            // Poll the query
1802            let result = runtime.poll(Counter { id: 1 }).unwrap();
1803
1804            // changed_at should match the revision from poll
1805            let rev = runtime.changed_at::<Counter>(&1);
1806            assert_eq!(rev, Some(result.revision));
1807        }
1808
1809        #[test]
1810        fn test_poll_value_access() {
1811            let runtime = QueryRuntime::new();
1812
1813            let result = runtime.poll(Counter { id: 5 }).unwrap();
1814
1815            // Access through Result and Arc
1816            let value: &i32 = result.value.as_ref().unwrap();
1817            assert_eq!(*value, 50);
1818
1819            // Access Arc directly via field after unwrapping Result
1820            let arc: &Arc<i32> = result.value.as_ref().unwrap();
1821            assert_eq!(**arc, 50);
1822        }
1823
1824        #[test]
1825        fn test_subscription_pattern() {
1826            let runtime = QueryRuntime::new();
1827
1828            // Simulate subscription pattern
1829            let mut last_revision: RevisionCounter = 0;
1830            let mut notifications = 0;
1831
1832            // First poll - should notify (new value)
1833            let result = runtime.poll(Counter { id: 1 }).unwrap();
1834            if result.revision > last_revision {
1835                notifications += 1;
1836                last_revision = result.revision;
1837            }
1838
1839            // Second poll - should NOT notify (no change)
1840            let result = runtime.poll(Counter { id: 1 }).unwrap();
1841            if result.revision > last_revision {
1842                notifications += 1;
1843                last_revision = result.revision;
1844            }
1845
1846            // Third poll - should NOT notify (no change)
1847            let result = runtime.poll(Counter { id: 1 }).unwrap();
1848            if result.revision > last_revision {
1849                notifications += 1;
1850                #[allow(unused_assignments)]
1851                {
1852                    last_revision = result.revision;
1853                }
1854            }
1855
1856            // Only the first poll should have triggered a notification
1857            assert_eq!(notifications, 1);
1858        }
1859    }
1860
1861    // Tests for GC APIs
1862    mod gc_tests {
1863        use super::*;
1864        use std::collections::HashSet;
1865        use std::sync::atomic::{AtomicUsize, Ordering};
1866        use std::sync::Mutex;
1867
1868        #[derive(Clone)]
1869        struct Leaf {
1870            id: i32,
1871        }
1872
1873        impl Query for Leaf {
1874            type CacheKey = i32;
1875            type Output = i32;
1876
1877            fn cache_key(&self) -> Self::CacheKey {
1878                self.id
1879            }
1880
1881            fn query(self, _db: &impl Db) -> Result<Self::Output, QueryError> {
1882                Ok(self.id * 10)
1883            }
1884
1885            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1886                old == new
1887            }
1888        }
1889
1890        #[derive(Clone)]
1891        struct Parent {
1892            child_id: i32,
1893        }
1894
1895        impl Query for Parent {
1896            type CacheKey = i32;
1897            type Output = i32;
1898
1899            fn cache_key(&self) -> Self::CacheKey {
1900                self.child_id
1901            }
1902
1903            fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1904                let child = db.query(Leaf { id: self.child_id })?;
1905                Ok(*child + 1)
1906            }
1907
1908            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1909                old == new
1910            }
1911        }
1912
1913        #[test]
1914        fn test_query_keys_returns_all_cached_queries() {
1915            let runtime = QueryRuntime::new();
1916
1917            // Execute some queries
1918            let _ = runtime.query(Leaf { id: 1 }).unwrap();
1919            let _ = runtime.query(Leaf { id: 2 }).unwrap();
1920            let _ = runtime.query(Leaf { id: 3 }).unwrap();
1921
1922            // Get all keys
1923            let keys = runtime.query_keys();
1924
1925            // Should have at least 3 keys (might have more due to sentinels)
1926            assert!(keys.len() >= 3);
1927        }
1928
1929        #[test]
1930        fn test_remove_removes_query() {
1931            let runtime = QueryRuntime::new();
1932
1933            // Execute a query
1934            let _ = runtime.query(Leaf { id: 1 }).unwrap();
1935
1936            // Get the key
1937            let full_key = FullCacheKey::new::<Leaf, _>(&1);
1938
1939            // Query should exist
1940            assert!(runtime.changed_at::<Leaf>(&1).is_some());
1941
1942            // Remove it
1943            assert!(runtime.remove(&full_key));
1944
1945            // Query should no longer exist
1946            assert!(runtime.changed_at::<Leaf>(&1).is_none());
1947        }
1948
1949        #[test]
1950        fn test_remove_if_unused_removes_leaf_query() {
1951            let runtime = QueryRuntime::new();
1952
1953            // Execute a leaf query (no dependents)
1954            let _ = runtime.query(Leaf { id: 1 }).unwrap();
1955
1956            // Should be removable since no other query depends on it
1957            assert!(runtime.remove_query_if_unused::<Leaf>(&1));
1958
1959            // Query should no longer exist
1960            assert!(runtime.changed_at::<Leaf>(&1).is_none());
1961        }
1962
1963        #[test]
1964        fn test_remove_if_unused_does_not_remove_query_with_dependents() {
1965            let runtime = QueryRuntime::new();
1966
1967            // Execute parent query (which depends on Leaf)
1968            let _ = runtime.query(Parent { child_id: 1 }).unwrap();
1969
1970            // Leaf query should not be removable since Parent depends on it
1971            assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
1972
1973            // Leaf query should still exist
1974            assert!(runtime.changed_at::<Leaf>(&1).is_some());
1975
1976            // But Parent should be removable (no dependents)
1977            assert!(runtime.remove_query_if_unused::<Parent>(&1));
1978        }
1979
1980        #[test]
1981        fn test_remove_if_unused_with_full_cache_key() {
1982            let runtime = QueryRuntime::new();
1983
1984            // Execute a leaf query
1985            let _ = runtime.query(Leaf { id: 1 }).unwrap();
1986
1987            let full_key = FullCacheKey::new::<Leaf, _>(&1);
1988
1989            // Should be removable via FullCacheKey
1990            assert!(runtime.remove_if_unused(&full_key));
1991
1992            // Query should no longer exist
1993            assert!(runtime.changed_at::<Leaf>(&1).is_none());
1994        }
1995
1996        // Test tracer receives on_query_key calls
1997        struct GcTracker {
1998            accessed_keys: Mutex<HashSet<String>>,
1999            access_count: AtomicUsize,
2000        }
2001
2002        impl GcTracker {
2003            fn new() -> Self {
2004                Self {
2005                    accessed_keys: Mutex::new(HashSet::new()),
2006                    access_count: AtomicUsize::new(0),
2007                }
2008            }
2009        }
2010
2011        impl Tracer for GcTracker {
2012            fn new_span_id(&self) -> SpanId {
2013                SpanId(1)
2014            }
2015
2016            fn on_query_key(&self, full_key: &FullCacheKey) {
2017                self.accessed_keys
2018                    .lock()
2019                    .unwrap()
2020                    .insert(full_key.debug_repr().to_string());
2021                self.access_count.fetch_add(1, Ordering::Relaxed);
2022            }
2023        }
2024
2025        #[test]
2026        fn test_tracer_receives_on_query_key() {
2027            let tracker = GcTracker::new();
2028            let runtime = QueryRuntime::with_tracer(tracker);
2029
2030            // Execute some queries
2031            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2032            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2033
2034            // Tracer should have received on_query_key calls
2035            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2036            assert_eq!(count, 2);
2037
2038            // Check that the keys were recorded
2039            let keys = runtime.tracer().accessed_keys.lock().unwrap();
2040            assert!(keys.iter().any(|k| k.contains("Leaf")));
2041        }
2042
2043        #[test]
2044        fn test_tracer_receives_on_query_key_for_cache_hits() {
2045            let tracker = GcTracker::new();
2046            let runtime = QueryRuntime::with_tracer(tracker);
2047
2048            // Execute query twice (second is cache hit)
2049            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2050            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2051
2052            // Tracer should have received on_query_key for both calls
2053            let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2054            assert_eq!(count, 2);
2055        }
2056
2057        #[test]
2058        fn test_gc_workflow() {
2059            let tracker = GcTracker::new();
2060            let runtime = QueryRuntime::with_tracer(tracker);
2061
2062            // Execute some queries
2063            let _ = runtime.query(Leaf { id: 1 }).unwrap();
2064            let _ = runtime.query(Leaf { id: 2 }).unwrap();
2065            let _ = runtime.query(Leaf { id: 3 }).unwrap();
2066
2067            // Simulate GC: remove all queries that are not in use
2068            let mut removed = 0;
2069            for key in runtime.query_keys() {
2070                if runtime.remove_if_unused(&key) {
2071                    removed += 1;
2072                }
2073            }
2074
2075            // All leaf queries should be removable
2076            assert!(removed >= 3);
2077
2078            // Queries should no longer exist
2079            assert!(runtime.changed_at::<Leaf>(&1).is_none());
2080            assert!(runtime.changed_at::<Leaf>(&2).is_none());
2081            assert!(runtime.changed_at::<Leaf>(&3).is_none());
2082        }
2083    }
2084}