query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::TypeId;
4use std::cell::RefCell;
5use std::sync::Arc;
6
7use std::ops::Deref;
8
9use whale::{Durability, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
12use crate::key::FullCacheKey;
13use crate::loading::LoadingState;
14use crate::query::Query;
15use crate::storage::{
16    AssetKeyRegistry, AssetState, AssetStorage, CachedEntry, CachedValue, LocatorStorage,
17    PendingStorage, QueryRegistry, VerifierStorage,
18};
19use crate::QueryError;
20
21/// Function type for comparing user errors during early cutoff.
22///
23/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
24/// `QueryError::UserError` values are compared for caching purposes.
25pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
26
27#[cfg(feature = "inspector")]
28use query_flow_inspector::{EventSink, FlowEvent, SpanId};
29
30/// Number of durability levels (matches whale's default).
31const DURABILITY_LEVELS: usize = 4;
32
33// Thread-local query execution stack for cycle detection.
34thread_local! {
35    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
36}
37
38/// Execution context passed through query execution.
39///
40/// When the `inspector` feature is disabled, this is a zero-sized type (ZST)
41/// with no runtime cost.
42#[cfg(feature = "inspector")]
43#[derive(Clone, Copy)]
44pub struct ExecutionContext {
45    span_id: SpanId,
46}
47
48#[cfg(not(feature = "inspector"))]
49#[derive(Clone, Copy)]
50pub struct ExecutionContext;
51
52impl ExecutionContext {
53    /// Create a new execution context.
54    #[cfg(feature = "inspector")]
55    pub fn new() -> Self {
56        Self {
57            span_id: query_flow_inspector::new_span_id(),
58        }
59    }
60
61    /// Create a new execution context.
62    #[cfg(not(feature = "inspector"))]
63    #[inline(always)]
64    pub fn new() -> Self {
65        Self
66    }
67
68    /// Get the span ID for this execution context.
69    #[cfg(feature = "inspector")]
70    pub fn span_id(&self) -> SpanId {
71        self.span_id
72    }
73}
74
75impl Default for ExecutionContext {
76    fn default() -> Self {
77        Self::new()
78    }
79}
80
81/// Result of polling a query, containing the value and its revision.
82///
83/// This is returned by [`QueryRuntime::poll`] and provides both the query result
84/// and its change revision, enabling efficient change detection for subscription
85/// patterns.
86///
87/// # Example
88///
89/// ```ignore
90/// let result = runtime.poll(MyQuery::new())?;
91///
92/// // Access the value via Deref
93/// println!("{:?}", *result);
94///
95/// // Check if changed since last poll
96/// if result.revision > last_known_revision {
97///     notify_subscribers(&result.value);
98///     last_known_revision = result.revision;
99/// }
100/// ```
101#[derive(Debug, Clone)]
102pub struct Polled<T> {
103    /// The query result value.
104    pub value: T,
105    /// The revision at which this value was last changed.
106    ///
107    /// Compare this with a previously stored revision to detect changes.
108    pub revision: RevisionCounter,
109}
110
111impl<T: Deref> Deref for Polled<T> {
112    type Target = T::Target;
113
114    fn deref(&self) -> &Self::Target {
115        &self.value
116    }
117}
118
119/// The query runtime manages query execution, caching, and dependency tracking.
120///
121/// This is cheap to clone - all data is behind `Arc`.
122///
123/// # Example
124///
125/// ```ignore
126/// let runtime = QueryRuntime::new();
127///
128/// // Sync query execution
129/// let result = runtime.query(MyQuery { ... })?;
130///
131/// // Async query execution (waits through Suspend)
132/// let result = runtime.query_async(MyQuery { ... }).await?;
133/// ```
134pub struct QueryRuntime {
135    /// Whale runtime for dependency tracking and cache storage.
136    /// Query outputs are stored in Node.data as Option<CachedEntry>.
137    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
138    /// Asset cache and state storage
139    assets: Arc<AssetStorage>,
140    /// Registered asset locators
141    locators: Arc<LocatorStorage>,
142    /// Pending asset requests
143    pending: Arc<PendingStorage>,
144    /// Registry for tracking query instances (for list_queries)
145    query_registry: Arc<QueryRegistry>,
146    /// Registry for tracking asset keys (for list_asset_keys)
147    asset_key_registry: Arc<AssetKeyRegistry>,
148    /// Verifiers for re-executing queries (for verify-then-decide pattern)
149    verifiers: Arc<VerifierStorage>,
150    /// Comparator for user errors during early cutoff
151    error_comparator: ErrorComparator,
152    /// Event sink for inspector/tracing
153    #[cfg(feature = "inspector")]
154    sink: Arc<parking_lot::RwLock<Option<Arc<dyn EventSink>>>>,
155}
156
157impl Default for QueryRuntime {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163impl Clone for QueryRuntime {
164    fn clone(&self) -> Self {
165        Self {
166            whale: self.whale.clone(),
167            assets: self.assets.clone(),
168            locators: self.locators.clone(),
169            pending: self.pending.clone(),
170            query_registry: self.query_registry.clone(),
171            asset_key_registry: self.asset_key_registry.clone(),
172            verifiers: self.verifiers.clone(),
173            error_comparator: self.error_comparator,
174            #[cfg(feature = "inspector")]
175            sink: self.sink.clone(),
176        }
177    }
178}
179
180/// Default error comparator that treats all errors as different.
181///
182/// This is conservative - it always triggers recomputation when an error occurs.
183fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
184    false
185}
186
187impl QueryRuntime {
188    /// Get cached output along with its revision (single atomic access).
189    fn get_cached_with_revision<Q: Query>(
190        &self,
191        key: &FullCacheKey,
192    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
193        let node = self.whale.get(key)?;
194        let revision = node.changed_at;
195        let entry = node.data.as_ref()?;
196        let cached = entry.to_cached_value::<Q::Output>()?;
197        Some((cached, revision))
198    }
199}
200
201impl QueryRuntime {
202    /// Create a new query runtime with default settings.
203    pub fn new() -> Self {
204        Self::builder().build()
205    }
206
207    /// Create a builder for customizing the runtime.
208    ///
209    /// # Example
210    ///
211    /// ```ignore
212    /// let runtime = QueryRuntime::builder()
213    ///     .error_comparator(|a, b| {
214    ///         // Custom error comparison logic
215    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
216    ///             (Some(a), Some(b)) => a == b,
217    ///             _ => false,
218    ///         }
219    ///     })
220    ///     .build();
221    /// ```
222    pub fn builder() -> QueryRuntimeBuilder {
223        QueryRuntimeBuilder::new()
224    }
225
226    /// Set the event sink for tracing/inspection.
227    ///
228    /// Pass `Some(sink)` to enable event collection, or `None` to disable.
229    ///
230    /// # Example
231    ///
232    /// ```ignore
233    /// use query_flow_inspector::EventCollector;
234    /// use std::sync::Arc;
235    ///
236    /// let collector = Arc::new(EventCollector::new());
237    /// runtime.set_sink(Some(collector.clone()));
238    /// runtime.query(MyQuery::new());
239    /// let trace = collector.trace();
240    /// ```
241    #[cfg(feature = "inspector")]
242    pub fn set_sink(&self, sink: Option<Arc<dyn EventSink>>) {
243        *self.sink.write() = sink;
244    }
245
246    /// Get the current event sink.
247    #[cfg(feature = "inspector")]
248    pub fn sink(&self) -> Option<Arc<dyn EventSink>> {
249        self.sink.read().clone()
250    }
251
252    /// Emit an event to the sink if one is set.
253    #[cfg(feature = "inspector")]
254    #[inline]
255    fn emit<F: FnOnce() -> FlowEvent>(&self, event: F) {
256        let guard = self.sink.read();
257        if let Some(ref sink) = *guard {
258            sink.emit(event());
259        }
260    }
261
262    /// Execute a query synchronously.
263    ///
264    /// Returns the cached result if valid, otherwise executes the query.
265    ///
266    /// # Errors
267    ///
268    /// - `QueryError::Suspend` - Query is waiting for async loading
269    /// - `QueryError::Cycle` - Dependency cycle detected
270    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
271        self.query_internal(query)
272            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
273    }
274
275    /// Internal implementation shared by query() and poll().
276    ///
277    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
278    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
279    #[allow(clippy::type_complexity)]
280    fn query_internal<Q: Query>(
281        &self,
282        query: Q,
283    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
284        let key = query.cache_key();
285        let full_key = FullCacheKey::new::<Q, _>(&key);
286
287        // Create execution context and emit start event
288        let exec_ctx = ExecutionContext::new();
289        #[cfg(feature = "inspector")]
290        let start_time = std::time::Instant::now();
291        #[cfg(feature = "inspector")]
292        let query_key =
293            query_flow_inspector::QueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
294
295        #[cfg(feature = "inspector")]
296        self.emit(|| FlowEvent::QueryStart {
297            span_id: exec_ctx.span_id(),
298            query: query_key.clone(),
299        });
300
301        // Check for cycles using thread-local stack
302        let cycle_detected = QUERY_STACK.with(|stack| {
303            let stack = stack.borrow();
304            stack.iter().any(|k| k == &full_key)
305        });
306
307        if cycle_detected {
308            let path = QUERY_STACK.with(|stack| {
309                let stack = stack.borrow();
310                let mut path: Vec<String> =
311                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
312                path.push(full_key.debug_repr().to_string());
313                path
314            });
315
316            #[cfg(feature = "inspector")]
317            self.emit(|| FlowEvent::CycleDetected {
318                path: path
319                    .iter()
320                    .map(|s| query_flow_inspector::QueryKey::new("", s.clone()))
321                    .collect(),
322            });
323
324            #[cfg(feature = "inspector")]
325            self.emit(|| FlowEvent::QueryEnd {
326                span_id: exec_ctx.span_id(),
327                query: query_key.clone(),
328                result: query_flow_inspector::ExecutionResult::CycleDetected,
329                duration: start_time.elapsed(),
330            });
331
332            return Err(QueryError::Cycle { path });
333        }
334
335        // Check if cached and valid (with verify-then-decide pattern)
336        let current_rev = self.whale.current_revision();
337
338        // Fast path: already verified at current revision
339        if self.whale.is_verified_at(&full_key, &current_rev) {
340            // Single atomic access to get both cached value and revision
341            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
342                #[cfg(feature = "inspector")]
343                self.emit(|| FlowEvent::CacheCheck {
344                    span_id: exec_ctx.span_id(),
345                    query: query_key.clone(),
346                    valid: true,
347                });
348
349                #[cfg(feature = "inspector")]
350                self.emit(|| FlowEvent::QueryEnd {
351                    span_id: exec_ctx.span_id(),
352                    query: query_key.clone(),
353                    result: query_flow_inspector::ExecutionResult::CacheHit,
354                    duration: start_time.elapsed(),
355                });
356
357                return match cached {
358                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
359                    CachedValue::UserError(err) => Ok((Err(err), revision)),
360                };
361            }
362        }
363
364        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
365        if self.whale.is_valid(&full_key) {
366            // Single atomic access to get both cached value and revision
367            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
368                // Shallow valid but not verified - verify deps first
369                let mut deps_verified = true;
370                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
371                    for dep in deps {
372                        if let Some(verifier) = self.verifiers.get(&dep) {
373                            // Re-query dep to verify it (triggers recursive verification)
374                            if verifier.verify(self).is_err() {
375                                deps_verified = false;
376                                break;
377                            }
378                        }
379                    }
380                }
381
382                // Re-check validity after deps are verified
383                if deps_verified && self.whale.is_valid(&full_key) {
384                    // Deps didn't change their changed_at, mark as verified and use cached
385                    self.whale.mark_verified(&full_key, &current_rev);
386
387                    #[cfg(feature = "inspector")]
388                    self.emit(|| FlowEvent::CacheCheck {
389                        span_id: exec_ctx.span_id(),
390                        query: query_key.clone(),
391                        valid: true,
392                    });
393
394                    #[cfg(feature = "inspector")]
395                    self.emit(|| FlowEvent::QueryEnd {
396                        span_id: exec_ctx.span_id(),
397                        query: query_key.clone(),
398                        result: query_flow_inspector::ExecutionResult::CacheHit,
399                        duration: start_time.elapsed(),
400                    });
401
402                    return match cached {
403                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
404                        CachedValue::UserError(err) => Ok((Err(err), revision)),
405                    };
406                }
407                // A dep's changed_at increased, fall through to execute
408            }
409        }
410
411        #[cfg(feature = "inspector")]
412        self.emit(|| FlowEvent::CacheCheck {
413            span_id: exec_ctx.span_id(),
414            query: query_key.clone(),
415            valid: false,
416        });
417
418        // Execute the query with cycle tracking
419        QUERY_STACK.with(|stack| {
420            stack.borrow_mut().push(full_key.clone());
421        });
422
423        let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
424
425        QUERY_STACK.with(|stack| {
426            stack.borrow_mut().pop();
427        });
428
429        // Emit end event
430        #[cfg(feature = "inspector")]
431        {
432            let exec_result = match &result {
433                Ok((_, true, _)) => query_flow_inspector::ExecutionResult::Changed,
434                Ok((_, false, _)) => query_flow_inspector::ExecutionResult::Unchanged,
435                Err(QueryError::Suspend) => query_flow_inspector::ExecutionResult::Suspended,
436                Err(QueryError::Cycle { .. }) => {
437                    query_flow_inspector::ExecutionResult::CycleDetected
438                }
439                Err(e) => query_flow_inspector::ExecutionResult::Error {
440                    message: format!("{:?}", e),
441                },
442            };
443            self.emit(|| FlowEvent::QueryEnd {
444                span_id: exec_ctx.span_id(),
445                query: query_key.clone(),
446                result: exec_result,
447                duration: start_time.elapsed(),
448            });
449        }
450
451        result.map(|(inner_result, _, revision)| (inner_result, revision))
452    }
453
454    /// Execute a query, caching the result if appropriate.
455    ///
456    /// Returns (result, output_changed, revision) tuple.
457    /// - `result`: Ok(output) for success, Err(user_error) for user errors
458    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
459    #[allow(clippy::type_complexity)]
460    fn execute_query<Q: Query>(
461        &self,
462        query: &Q,
463        full_key: &FullCacheKey,
464        exec_ctx: ExecutionContext,
465    ) -> Result<
466        (
467            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
468            bool,
469            RevisionCounter,
470        ),
471        QueryError,
472    > {
473        // Create context for this query execution
474        let mut ctx = QueryContext {
475            runtime: self,
476            current_key: full_key.clone(),
477            #[cfg(feature = "inspector")]
478            parent_query_type: std::any::type_name::<Q>(),
479            #[cfg(feature = "inspector")]
480            exec_ctx,
481            deps: RefCell::new(Vec::new()),
482        };
483        // Suppress unused variable warning when inspector is disabled
484        #[cfg(not(feature = "inspector"))]
485        let _ = exec_ctx;
486
487        // Execute the query
488        let result = query.query(&mut ctx);
489
490        // Get collected dependencies
491        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
492
493        // Get durability for whale registration
494        let durability =
495            Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
496
497        match result {
498            Ok(output) => {
499                let output = Arc::new(output);
500
501                // Check if output changed (for early cutoff)
502                // existing_revision is Some only when output is unchanged (can reuse revision)
503                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
504                    self.get_cached_with_revision::<Q>(full_key)
505                {
506                    if Q::output_eq(&old, &output) {
507                        Some(rev) // Same output - reuse revision
508                    } else {
509                        None // Different output
510                    }
511                } else {
512                    None // No previous Ok value
513                };
514                let output_changed = existing_revision.is_none();
515
516                // Emit early cutoff check event
517                #[cfg(feature = "inspector")]
518                self.emit(|| FlowEvent::EarlyCutoffCheck {
519                    span_id: exec_ctx.span_id(),
520                    query: query_flow_inspector::QueryKey::new(
521                        std::any::type_name::<Q>(),
522                        full_key.debug_repr(),
523                    ),
524                    output_changed,
525                });
526
527                // Update whale with cached entry (atomic update of value + dependency state)
528                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
529                let revision = if let Some(existing_rev) = existing_revision {
530                    // confirm_unchanged doesn't change changed_at, use existing
531                    let _ = self.whale.confirm_unchanged(full_key, deps);
532                    existing_rev
533                } else {
534                    // Use new_rev from register result
535                    match self
536                        .whale
537                        .register(full_key.clone(), Some(entry), durability, deps)
538                    {
539                        Ok(result) => result.new_rev,
540                        Err(missing) => {
541                            return Err(QueryError::DependenciesRemoved {
542                                missing_keys: missing,
543                            })
544                        }
545                    }
546                };
547
548                // Register query in registry for list_queries
549                let is_new_query = self.query_registry.register(query);
550                if is_new_query {
551                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
552                    let _ = self
553                        .whale
554                        .register(sentinel, None, Durability::volatile(), vec![]);
555                }
556
557                // Store verifier for this query (for verify-then-decide pattern)
558                self.verifiers.insert(full_key.clone(), query.clone());
559
560                Ok((Ok(output), output_changed, revision))
561            }
562            Err(QueryError::UserError(err)) => {
563                // Check if error changed (for early cutoff)
564                // existing_revision is Some only when error is unchanged (can reuse revision)
565                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
566                    self.get_cached_with_revision::<Q>(full_key)
567                {
568                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
569                        Some(rev) // Same error - reuse revision
570                    } else {
571                        None // Different error
572                    }
573                } else {
574                    None // No previous UserError
575                };
576                let output_changed = existing_revision.is_none();
577
578                // Emit early cutoff check event
579                #[cfg(feature = "inspector")]
580                self.emit(|| FlowEvent::EarlyCutoffCheck {
581                    span_id: exec_ctx.span_id(),
582                    query: query_flow_inspector::QueryKey::new(
583                        std::any::type_name::<Q>(),
584                        full_key.debug_repr(),
585                    ),
586                    output_changed,
587                });
588
589                // Update whale with cached error (atomic update of value + dependency state)
590                let entry = CachedEntry::UserError(err.clone());
591                let revision = if let Some(existing_rev) = existing_revision {
592                    // confirm_unchanged doesn't change changed_at, use existing
593                    let _ = self.whale.confirm_unchanged(full_key, deps);
594                    existing_rev
595                } else {
596                    // Use new_rev from register result
597                    match self
598                        .whale
599                        .register(full_key.clone(), Some(entry), durability, deps)
600                    {
601                        Ok(result) => result.new_rev,
602                        Err(missing) => {
603                            return Err(QueryError::DependenciesRemoved {
604                                missing_keys: missing,
605                            })
606                        }
607                    }
608                };
609
610                // Register query in registry for list_queries
611                let is_new_query = self.query_registry.register(query);
612                if is_new_query {
613                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
614                    let _ = self
615                        .whale
616                        .register(sentinel, None, Durability::volatile(), vec![]);
617                }
618
619                // Store verifier for this query (for verify-then-decide pattern)
620                self.verifiers.insert(full_key.clone(), query.clone());
621
622                Ok((Err(err), output_changed, revision))
623            }
624            Err(e) => {
625                // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
626                Err(e)
627            }
628        }
629    }
630
631    /// Invalidate a query, forcing recomputation on next access.
632    ///
633    /// This also invalidates any queries that depend on this one.
634    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
635        let full_key = FullCacheKey::new::<Q, _>(key);
636
637        #[cfg(feature = "inspector")]
638        self.emit(|| FlowEvent::QueryInvalidated {
639            query: query_flow_inspector::QueryKey::new(
640                std::any::type_name::<Q>(),
641                full_key.debug_repr(),
642            ),
643            reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
644        });
645
646        // Update whale to invalidate dependents (register with None to clear cached value)
647        let _ = self
648            .whale
649            .register(full_key, None, Durability::volatile(), vec![]);
650    }
651
652    /// Remove a query from the cache entirely, freeing memory.
653    ///
654    /// Use this for GC when a query is no longer needed.
655    /// Unlike `invalidate`, this removes all traces of the query from storage.
656    /// The query will be recomputed from scratch on next access.
657    ///
658    /// This also invalidates any queries that depend on this one.
659    pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
660        let full_key = FullCacheKey::new::<Q, _>(key);
661
662        #[cfg(feature = "inspector")]
663        self.emit(|| FlowEvent::QueryInvalidated {
664            query: query_flow_inspector::QueryKey::new(
665                std::any::type_name::<Q>(),
666                full_key.debug_repr(),
667            ),
668            reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
669        });
670
671        // Remove verifier if exists
672        self.verifiers.remove(&full_key);
673
674        // Remove from whale storage (this also handles dependent invalidation)
675        self.whale.remove(&full_key);
676
677        // Remove from registry and update sentinel for list_queries
678        if self.query_registry.remove::<Q>(key) {
679            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
680            let _ = self
681                .whale
682                .register(sentinel, None, Durability::volatile(), vec![]);
683        }
684    }
685
686    /// Clear all cached values by removing all nodes from whale.
687    ///
688    /// Note: This is a relatively expensive operation as it iterates through all keys.
689    pub fn clear_cache(&self) {
690        let keys = self.whale.keys();
691        for key in keys {
692            self.whale.remove(&key);
693        }
694    }
695
696    /// Poll a query, returning both the result and its change revision.
697    ///
698    /// This is useful for implementing subscription patterns where you need to
699    /// detect changes efficiently. Compare the returned `revision` with a
700    /// previously stored value to determine if the query result has changed.
701    ///
702    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
703    /// as its value, allowing you to track revision changes for both success and
704    /// user error cases.
705    ///
706    /// # Example
707    ///
708    /// ```ignore
709    /// struct Subscription<Q: Query> {
710    ///     query: Q,
711    ///     last_revision: RevisionCounter,
712    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
713    /// }
714    ///
715    /// // Polling loop
716    /// for sub in &mut subscriptions {
717    ///     let result = runtime.poll(sub.query.clone())?;
718    ///     if result.revision > sub.last_revision {
719    ///         sub.tx.send(result.value.clone())?;
720    ///         sub.last_revision = result.revision;
721    ///     }
722    /// }
723    /// ```
724    ///
725    /// # Errors
726    ///
727    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
728    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
729    #[allow(clippy::type_complexity)]
730    pub fn poll<Q: Query>(
731        &self,
732        query: Q,
733    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
734        let (value, revision) = self.query_internal(query)?;
735        Ok(Polled { value, revision })
736    }
737
738    /// Get the change revision of a query without executing it.
739    ///
740    /// Returns `None` if the query has never been executed.
741    ///
742    /// This is useful for checking if a query has changed since the last poll
743    /// without the cost of executing the query.
744    ///
745    /// # Example
746    ///
747    /// ```ignore
748    /// // Check if query has changed before deciding to poll
749    /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
750    ///     if rev > last_known_revision {
751    ///         let result = runtime.query(MyQuery::new(key))?;
752    ///         // Process result...
753    ///     }
754    /// }
755    /// ```
756    pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
757        let full_key = FullCacheKey::new::<Q, _>(key);
758        self.whale.get(&full_key).map(|node| node.changed_at)
759    }
760}
761
762// ============================================================================
763// Builder
764// ============================================================================
765
766/// Builder for [`QueryRuntime`] with customizable settings.
767///
768/// # Example
769///
770/// ```ignore
771/// let runtime = QueryRuntime::builder()
772///     .error_comparator(|a, b| {
773///         // Treat all errors of the same type as equal
774///         a.downcast_ref::<std::io::Error>().is_some()
775///             == b.downcast_ref::<std::io::Error>().is_some()
776///     })
777///     .build();
778/// ```
779pub struct QueryRuntimeBuilder {
780    error_comparator: ErrorComparator,
781}
782
783impl Default for QueryRuntimeBuilder {
784    fn default() -> Self {
785        Self::new()
786    }
787}
788
789impl QueryRuntimeBuilder {
790    /// Create a new builder with default settings.
791    pub fn new() -> Self {
792        Self {
793            error_comparator: default_error_comparator,
794        }
795    }
796
797    /// Set the error comparator function for early cutoff optimization.
798    ///
799    /// When a query returns `QueryError::UserError`, this function is used
800    /// to compare it with the previously cached error. If they are equal,
801    /// downstream queries can skip recomputation (early cutoff).
802    ///
803    /// The default comparator returns `false` for all errors, meaning errors
804    /// are always considered different (conservative, always recomputes).
805    ///
806    /// # Example
807    ///
808    /// ```ignore
809    /// // Treat errors as equal if they have the same display message
810    /// let runtime = QueryRuntime::builder()
811    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
812    ///     .build();
813    /// ```
814    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
815        self.error_comparator = f;
816        self
817    }
818
819    /// Build the runtime with the configured settings.
820    pub fn build(self) -> QueryRuntime {
821        QueryRuntime {
822            whale: WhaleRuntime::new(),
823            assets: Arc::new(AssetStorage::new()),
824            locators: Arc::new(LocatorStorage::new()),
825            pending: Arc::new(PendingStorage::new()),
826            query_registry: Arc::new(QueryRegistry::new()),
827            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
828            verifiers: Arc::new(VerifierStorage::new()),
829            error_comparator: self.error_comparator,
830            #[cfg(feature = "inspector")]
831            sink: Arc::new(parking_lot::RwLock::new(None)),
832        }
833    }
834}
835
836// ============================================================================
837// Asset API
838// ============================================================================
839
840impl QueryRuntime {
841    /// Register an asset locator for a specific asset key type.
842    ///
843    /// Only one locator can be registered per key type. Later registrations
844    /// replace earlier ones.
845    ///
846    /// # Example
847    ///
848    /// ```ignore
849    /// let runtime = QueryRuntime::new();
850    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
851    /// ```
852    pub fn register_asset_locator<K, L>(&self, locator: L)
853    where
854        K: AssetKey,
855        L: AssetLocator<K>,
856    {
857        self.locators.insert::<K, L>(locator);
858    }
859
860    /// Get an iterator over pending asset requests.
861    ///
862    /// Returns assets that have been requested but not yet resolved.
863    /// The user should fetch these externally and call `resolve_asset()`.
864    ///
865    /// # Example
866    ///
867    /// ```ignore
868    /// for pending in runtime.pending_assets() {
869    ///     if let Some(path) = pending.key::<FilePath>() {
870    ///         let content = fetch_file(path);
871    ///         runtime.resolve_asset(path.clone(), content);
872    ///     }
873    /// }
874    /// ```
875    pub fn pending_assets(&self) -> Vec<PendingAsset> {
876        self.pending.get_all()
877    }
878
879    /// Get pending assets filtered by key type.
880    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
881        self.pending.get_of_type::<K>()
882    }
883
884    /// Check if there are any pending assets.
885    pub fn has_pending_assets(&self) -> bool {
886        !self.pending.is_empty()
887    }
888
889    /// Resolve an asset with its loaded value.
890    ///
891    /// This marks the asset as ready and invalidates any queries that
892    /// depend on it (if the value changed), triggering recomputation on next access.
893    ///
894    /// This method is idempotent - resolving with the same value (via `asset_eq`)
895    /// will not trigger downstream recomputation.
896    ///
897    /// Uses the asset key's default durability.
898    ///
899    /// # Example
900    ///
901    /// ```ignore
902    /// let content = std::fs::read_to_string(&path)?;
903    /// runtime.resolve_asset(FilePath(path), content);
904    /// ```
905    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset) {
906        let durability = key.durability();
907        self.resolve_asset_internal(key, value, durability);
908    }
909
910    /// Resolve an asset with a specific durability level.
911    ///
912    /// Use this to override the asset key's default durability.
913    pub fn resolve_asset_with_durability<K: AssetKey>(
914        &self,
915        key: K,
916        value: K::Asset,
917        durability: DurabilityLevel,
918    ) {
919        self.resolve_asset_internal(key, value, durability);
920    }
921
922    fn resolve_asset_internal<K: AssetKey>(
923        &self,
924        key: K,
925        value: K::Asset,
926        durability_level: DurabilityLevel,
927    ) {
928        let full_asset_key = FullAssetKey::new(&key);
929        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
930
931        // Check for early cutoff
932        let changed = if let Some(old_value) = self.assets.get_ready::<K>(&full_asset_key) {
933            !K::asset_eq(&old_value, &value)
934        } else {
935            true // Loading or NotFound -> Ready is a change
936        };
937
938        // Emit asset resolved event
939        #[cfg(feature = "inspector")]
940        self.emit(|| FlowEvent::AssetResolved {
941            asset: query_flow_inspector::AssetKey::new(
942                std::any::type_name::<K>(),
943                format!("{:?}", key),
944            ),
945            changed,
946        });
947
948        // Store the new value
949        self.assets
950            .insert_ready::<K>(full_asset_key.clone(), Arc::new(value));
951
952        // Remove from pending
953        self.pending.remove(&full_asset_key);
954
955        // Update whale dependency tracking
956        let durability =
957            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
958
959        if changed {
960            // Register with new changed_at to invalidate dependents
961            let _ = self
962                .whale
963                .register(full_cache_key, None, durability, vec![]);
964        } else {
965            // Early cutoff - keep old changed_at
966            let _ = self.whale.confirm_unchanged(&full_cache_key, vec![]);
967        }
968
969        // Register asset key in registry for list_asset_keys
970        let is_new_asset = self.asset_key_registry.register(&key);
971        if is_new_asset {
972            // Update sentinel to invalidate list_asset_keys dependents
973            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
974            let _ = self
975                .whale
976                .register(sentinel, None, Durability::volatile(), vec![]);
977        }
978    }
979
980    /// Invalidate an asset, forcing queries to re-request it.
981    ///
982    /// The asset will be marked as loading and added to pending assets.
983    /// Dependent queries will suspend until the asset is resolved again.
984    ///
985    /// # Example
986    ///
987    /// ```ignore
988    /// // File was modified externally
989    /// runtime.invalidate_asset(&FilePath("config.json".into()));
990    /// // Queries depending on this asset will now suspend
991    /// // User should fetch the new value and call resolve_asset
992    /// ```
993    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
994        let full_asset_key = FullAssetKey::new(key);
995        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
996
997        // Emit asset invalidated event
998        #[cfg(feature = "inspector")]
999        self.emit(|| FlowEvent::AssetInvalidated {
1000            asset: query_flow_inspector::AssetKey::new(
1001                std::any::type_name::<K>(),
1002                format!("{:?}", key),
1003            ),
1004        });
1005
1006        // Mark as loading
1007        self.assets
1008            .insert(full_asset_key.clone(), AssetState::Loading);
1009
1010        // Add to pending
1011        self.pending.insert::<K>(full_asset_key, key.clone());
1012
1013        // Update whale to invalidate dependents (use volatile during loading)
1014        let _ = self
1015            .whale
1016            .register(full_cache_key, None, Durability::volatile(), vec![]);
1017    }
1018
1019    /// Remove an asset from the cache entirely.
1020    ///
1021    /// Unlike `invalidate_asset`, this removes all traces of the asset.
1022    /// Dependent queries will go through the locator again on next access.
1023    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1024        let full_asset_key = FullAssetKey::new(key);
1025        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1026
1027        // First, register a change to invalidate dependents
1028        // This ensures queries that depend on this asset will recompute
1029        let _ = self
1030            .whale
1031            .register(full_cache_key.clone(), None, Durability::volatile(), vec![]);
1032
1033        // Then remove the asset from storage
1034        self.assets.remove(&full_asset_key);
1035        self.pending.remove(&full_asset_key);
1036
1037        // Finally remove from whale tracking
1038        self.whale.remove(&full_cache_key);
1039
1040        // Remove from registry and update sentinel for list_asset_keys
1041        if self.asset_key_registry.remove::<K>(key) {
1042            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1043            let _ = self
1044                .whale
1045                .register(sentinel, None, Durability::volatile(), vec![]);
1046        }
1047    }
1048
1049    /// Get an asset by key without tracking dependencies.
1050    ///
1051    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1052    /// as a dependent of the asset. Use this for direct asset access outside
1053    /// of query execution.
1054    ///
1055    /// # Returns
1056    ///
1057    /// - `Ok(LoadingState::Ready(value))` - Asset is loaded and ready
1058    /// - `Ok(LoadingState::Loading)` - Asset is still loading (added to pending)
1059    /// - `Err(QueryError::MissingDependency)` - Asset was not found
1060    pub fn get_asset<K: AssetKey>(
1061        &self,
1062        key: &K,
1063    ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
1064        self.get_asset_internal(key)
1065    }
1066
1067    /// Internal: Get asset state, checking cache and locator.
1068    fn get_asset_internal<K: AssetKey>(
1069        &self,
1070        key: &K,
1071    ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
1072        let full_asset_key = FullAssetKey::new(key);
1073        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1074
1075        // Helper to emit AssetRequested event
1076        #[cfg(feature = "inspector")]
1077        let emit_requested = |runtime: &Self, state: query_flow_inspector::AssetState| {
1078            runtime.emit(|| FlowEvent::AssetRequested {
1079                asset: query_flow_inspector::AssetKey::new(
1080                    std::any::type_name::<K>(),
1081                    format!("{:?}", key),
1082                ),
1083                state,
1084            });
1085        };
1086
1087        // Check cache first
1088        if let Some(state) = self.assets.get(&full_asset_key) {
1089            // Check if whale thinks it's valid
1090            if self.whale.is_valid(&full_cache_key) {
1091                return match state {
1092                    AssetState::Loading => {
1093                        #[cfg(feature = "inspector")]
1094                        emit_requested(self, query_flow_inspector::AssetState::Loading);
1095                        Ok(LoadingState::Loading)
1096                    }
1097                    AssetState::Ready(arc) => {
1098                        #[cfg(feature = "inspector")]
1099                        emit_requested(self, query_flow_inspector::AssetState::Ready);
1100                        match arc.downcast::<K::Asset>() {
1101                            Ok(value) => Ok(LoadingState::Ready(value)),
1102                            Err(_) => Err(QueryError::MissingDependency {
1103                                description: format!("Asset type mismatch: {:?}", key),
1104                            }),
1105                        }
1106                    }
1107                    AssetState::NotFound => {
1108                        #[cfg(feature = "inspector")]
1109                        emit_requested(self, query_flow_inspector::AssetState::NotFound);
1110                        Err(QueryError::MissingDependency {
1111                            description: format!("Asset not found: {:?}", key),
1112                        })
1113                    }
1114                };
1115            }
1116        }
1117
1118        // Check if there's a registered locator
1119        if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
1120            if let Some(state) = locator.locate_any(key) {
1121                // Store the state
1122                self.assets.insert(full_asset_key.clone(), state.clone());
1123
1124                match state {
1125                    AssetState::Ready(arc) => {
1126                        #[cfg(feature = "inspector")]
1127                        emit_requested(self, query_flow_inspector::AssetState::Ready);
1128
1129                        // Register with whale
1130                        let durability = Durability::new(key.durability().as_u8() as usize)
1131                            .unwrap_or(Durability::volatile());
1132                        self.whale
1133                            .register(full_cache_key, None, durability, vec![])
1134                            .expect("register with no dependencies cannot fail");
1135
1136                        match arc.downcast::<K::Asset>() {
1137                            Ok(value) => return Ok(LoadingState::Ready(value)),
1138                            Err(_) => {
1139                                return Err(QueryError::MissingDependency {
1140                                    description: format!("Asset type mismatch: {:?}", key),
1141                                })
1142                            }
1143                        }
1144                    }
1145                    AssetState::Loading => {
1146                        #[cfg(feature = "inspector")]
1147                        emit_requested(self, query_flow_inspector::AssetState::Loading);
1148                        self.pending.insert::<K>(full_asset_key, key.clone());
1149
1150                        // Register in whale so queries can depend on this asset
1151                        self.whale
1152                            .register(full_cache_key, None, Durability::volatile(), vec![])
1153                            .expect("register with no dependencies cannot fail");
1154
1155                        return Ok(LoadingState::Loading);
1156                    }
1157                    AssetState::NotFound => {
1158                        #[cfg(feature = "inspector")]
1159                        emit_requested(self, query_flow_inspector::AssetState::NotFound);
1160                        return Err(QueryError::MissingDependency {
1161                            description: format!("Asset not found: {:?}", key),
1162                        });
1163                    }
1164                }
1165            }
1166        }
1167
1168        // No locator registered or locator returned None - mark as pending
1169        #[cfg(feature = "inspector")]
1170        emit_requested(self, query_flow_inspector::AssetState::Loading);
1171        self.assets
1172            .insert(full_asset_key.clone(), AssetState::Loading);
1173        self.pending
1174            .insert::<K>(full_asset_key.clone(), key.clone());
1175
1176        // Register in whale so queries can depend on this asset
1177        self.whale
1178            .register(full_cache_key, None, Durability::volatile(), vec![])
1179            .expect("register with no dependencies cannot fail");
1180
1181        Ok(LoadingState::Loading)
1182    }
1183}
1184
1185/// Context provided to queries during execution.
1186///
1187/// Use this to access dependencies via `query()`.
1188pub struct QueryContext<'a> {
1189    runtime: &'a QueryRuntime,
1190    #[cfg_attr(not(feature = "inspector"), allow(dead_code))]
1191    current_key: FullCacheKey,
1192    #[cfg(feature = "inspector")]
1193    parent_query_type: &'static str,
1194    #[cfg(feature = "inspector")]
1195    exec_ctx: ExecutionContext,
1196    deps: RefCell<Vec<FullCacheKey>>,
1197}
1198
1199impl<'a> QueryContext<'a> {
1200    /// Query a dependency.
1201    ///
1202    /// The dependency is automatically tracked for invalidation.
1203    ///
1204    /// # Example
1205    ///
1206    /// ```ignore
1207    /// fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1208    ///     let dep_result = ctx.query(OtherQuery { id: self.id })?;
1209    ///     Ok(process(&dep_result))
1210    /// }
1211    /// ```
1212    pub fn query<Q: Query>(&mut self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1213        let key = query.cache_key();
1214        let full_key = FullCacheKey::new::<Q, _>(&key);
1215
1216        // Emit dependency registered event
1217        #[cfg(feature = "inspector")]
1218        self.runtime.emit(|| FlowEvent::DependencyRegistered {
1219            span_id: self.exec_ctx.span_id(),
1220            parent: query_flow_inspector::QueryKey::new(
1221                self.parent_query_type,
1222                self.current_key.debug_repr(),
1223            ),
1224            dependency: query_flow_inspector::QueryKey::new(
1225                std::any::type_name::<Q>(),
1226                full_key.debug_repr(),
1227            ),
1228        });
1229
1230        // Record this as a dependency
1231        self.deps.borrow_mut().push(full_key.clone());
1232
1233        // Execute the query
1234        self.runtime.query(query)
1235    }
1236
1237    /// Access an asset, tracking it as a dependency.
1238    ///
1239    /// Returns `LoadingState<Arc<K::Asset>>`:
1240    /// - `LoadingState::Loading` if the asset is still being loaded
1241    /// - `LoadingState::Ready(value)` if the asset is available
1242    ///
1243    /// # Example
1244    ///
1245    /// ```ignore
1246    /// #[query]
1247    /// fn process_file(ctx: &mut QueryContext, path: FilePath) -> Result<Output, QueryError> {
1248    ///     let content = ctx.asset(&path)?.suspend()?;
1249    ///     // Process content...
1250    ///     Ok(output)
1251    /// }
1252    /// ```
1253    ///
1254    /// # Errors
1255    ///
1256    /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1257    pub fn asset<K: AssetKey>(
1258        &mut self,
1259        key: &K,
1260    ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
1261        let full_asset_key = FullAssetKey::new(key);
1262        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1263
1264        // Emit asset dependency registered event
1265        #[cfg(feature = "inspector")]
1266        self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1267            span_id: self.exec_ctx.span_id(),
1268            parent: query_flow_inspector::QueryKey::new(
1269                self.parent_query_type,
1270                self.current_key.debug_repr(),
1271            ),
1272            asset: query_flow_inspector::AssetKey::new(
1273                std::any::type_name::<K>(),
1274                format!("{:?}", key),
1275            ),
1276        });
1277
1278        // Record dependency on this asset
1279        self.deps.borrow_mut().push(full_cache_key);
1280
1281        // Get asset from runtime
1282        let result = self.runtime.get_asset_internal(key);
1283
1284        // Emit missing dependency event on error
1285        #[cfg(feature = "inspector")]
1286        if let Err(QueryError::MissingDependency { ref description }) = result {
1287            self.runtime.emit(|| FlowEvent::MissingDependency {
1288                query: query_flow_inspector::QueryKey::new(
1289                    self.parent_query_type,
1290                    self.current_key.debug_repr(),
1291                ),
1292                dependency_description: description.clone(),
1293            });
1294        }
1295
1296        result
1297    }
1298
1299    /// List all query instances of type Q that have been registered.
1300    ///
1301    /// This method establishes a dependency on the "set" of queries of type Q.
1302    /// The calling query will be invalidated when:
1303    /// - A new query of type Q is first executed (added to set)
1304    ///
1305    /// The calling query will NOT be invalidated when:
1306    /// - An individual query of type Q has its value change
1307    ///
1308    /// # Example
1309    ///
1310    /// ```ignore
1311    /// #[query]
1312    /// fn all_results(ctx: &mut QueryContext) -> Result<Vec<i32>, QueryError> {
1313    ///     let queries = ctx.list_queries::<MyQuery>();
1314    ///     let mut results = Vec::new();
1315    ///     for q in queries {
1316    ///         results.push(*ctx.query(q)?);
1317    ///     }
1318    ///     Ok(results)
1319    /// }
1320    /// ```
1321    pub fn list_queries<Q: Query>(&mut self) -> Vec<Q> {
1322        // Record dependency on the sentinel (set-level dependency)
1323        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1324
1325        #[cfg(feature = "inspector")]
1326        self.runtime.emit(|| FlowEvent::DependencyRegistered {
1327            span_id: self.exec_ctx.span_id(),
1328            parent: query_flow_inspector::QueryKey::new(
1329                self.parent_query_type,
1330                self.current_key.debug_repr(),
1331            ),
1332            dependency: query_flow_inspector::QueryKey::new("QuerySet", sentinel.debug_repr()),
1333        });
1334
1335        // Ensure sentinel exists in whale (for dependency tracking)
1336        if self.runtime.whale.get(&sentinel).is_none() {
1337            let _ =
1338                self.runtime
1339                    .whale
1340                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1341        }
1342
1343        self.deps.borrow_mut().push(sentinel);
1344
1345        // Return all registered queries
1346        self.runtime.query_registry.get_all::<Q>()
1347    }
1348
1349    /// List all asset keys of type K that have been registered.
1350    ///
1351    /// This method establishes a dependency on the "set" of asset keys of type K.
1352    /// The calling query will be invalidated when:
1353    /// - A new asset of type K is resolved for the first time (added to set)
1354    /// - An asset of type K is removed via remove_asset
1355    ///
1356    /// The calling query will NOT be invalidated when:
1357    /// - An individual asset's value changes (use `ctx.asset()` for that)
1358    ///
1359    /// # Example
1360    ///
1361    /// ```ignore
1362    /// #[query]
1363    /// fn all_configs(ctx: &mut QueryContext) -> Result<Vec<String>, QueryError> {
1364    ///     let keys = ctx.list_asset_keys::<ConfigFile>();
1365    ///     let mut contents = Vec::new();
1366    ///     for key in keys {
1367    ///         let content = ctx.asset(&key)?.suspend()?;
1368    ///         contents.push((*content).clone());
1369    ///     }
1370    ///     Ok(contents)
1371    /// }
1372    /// ```
1373    pub fn list_asset_keys<K: AssetKey>(&mut self) -> Vec<K> {
1374        // Record dependency on the sentinel (set-level dependency)
1375        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1376
1377        #[cfg(feature = "inspector")]
1378        self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1379            span_id: self.exec_ctx.span_id(),
1380            parent: query_flow_inspector::QueryKey::new(
1381                self.parent_query_type,
1382                self.current_key.debug_repr(),
1383            ),
1384            asset: query_flow_inspector::AssetKey::new("AssetKeySet", sentinel.debug_repr()),
1385        });
1386
1387        // Ensure sentinel exists in whale (for dependency tracking)
1388        if self.runtime.whale.get(&sentinel).is_none() {
1389            let _ =
1390                self.runtime
1391                    .whale
1392                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1393        }
1394
1395        self.deps.borrow_mut().push(sentinel);
1396
1397        // Return all registered asset keys
1398        self.runtime.asset_key_registry.get_all::<K>()
1399    }
1400}
1401
1402#[cfg(test)]
1403mod tests {
1404    use super::*;
1405
1406    #[test]
1407    fn test_simple_query() {
1408        #[derive(Clone)]
1409        struct Add {
1410            a: i32,
1411            b: i32,
1412        }
1413
1414        impl Query for Add {
1415            type CacheKey = (i32, i32);
1416            type Output = i32;
1417
1418            fn cache_key(&self) -> Self::CacheKey {
1419                (self.a, self.b)
1420            }
1421
1422            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1423                Ok(self.a + self.b)
1424            }
1425
1426            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1427                old == new
1428            }
1429        }
1430
1431        let runtime = QueryRuntime::new();
1432
1433        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
1434        assert_eq!(*result, 3);
1435
1436        // Second query should be cached
1437        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
1438        assert_eq!(*result2, 3);
1439    }
1440
1441    #[test]
1442    fn test_dependent_queries() {
1443        #[derive(Clone)]
1444        struct Base {
1445            value: i32,
1446        }
1447
1448        impl Query for Base {
1449            type CacheKey = i32;
1450            type Output = i32;
1451
1452            fn cache_key(&self) -> Self::CacheKey {
1453                self.value
1454            }
1455
1456            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1457                Ok(self.value * 2)
1458            }
1459
1460            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1461                old == new
1462            }
1463        }
1464
1465        #[derive(Clone)]
1466        struct Derived {
1467            base_value: i32,
1468        }
1469
1470        impl Query for Derived {
1471            type CacheKey = i32;
1472            type Output = i32;
1473
1474            fn cache_key(&self) -> Self::CacheKey {
1475                self.base_value
1476            }
1477
1478            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1479                let base = ctx.query(Base {
1480                    value: self.base_value,
1481                })?;
1482                Ok(*base + 10)
1483            }
1484
1485            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1486                old == new
1487            }
1488        }
1489
1490        let runtime = QueryRuntime::new();
1491
1492        let result = runtime.query(Derived { base_value: 5 }).unwrap();
1493        assert_eq!(*result, 20); // 5 * 2 + 10
1494    }
1495
1496    #[test]
1497    fn test_cycle_detection() {
1498        #[derive(Clone)]
1499        struct CycleA {
1500            id: i32,
1501        }
1502
1503        #[derive(Clone)]
1504        struct CycleB {
1505            id: i32,
1506        }
1507
1508        impl Query for CycleA {
1509            type CacheKey = i32;
1510            type Output = i32;
1511
1512            fn cache_key(&self) -> Self::CacheKey {
1513                self.id
1514            }
1515
1516            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1517                let b = ctx.query(CycleB { id: self.id })?;
1518                Ok(*b + 1)
1519            }
1520
1521            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1522                old == new
1523            }
1524        }
1525
1526        impl Query for CycleB {
1527            type CacheKey = i32;
1528            type Output = i32;
1529
1530            fn cache_key(&self) -> Self::CacheKey {
1531                self.id
1532            }
1533
1534            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1535                let a = ctx.query(CycleA { id: self.id })?;
1536                Ok(*a + 1)
1537            }
1538
1539            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1540                old == new
1541            }
1542        }
1543
1544        let runtime = QueryRuntime::new();
1545
1546        let result = runtime.query(CycleA { id: 1 });
1547        assert!(matches!(result, Err(QueryError::Cycle { .. })));
1548    }
1549
1550    #[test]
1551    fn test_fallible_query() {
1552        #[derive(Clone)]
1553        struct ParseInt {
1554            input: String,
1555        }
1556
1557        impl Query for ParseInt {
1558            type CacheKey = String;
1559            type Output = Result<i32, std::num::ParseIntError>;
1560
1561            fn cache_key(&self) -> Self::CacheKey {
1562                self.input.clone()
1563            }
1564
1565            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1566                Ok(self.input.parse())
1567            }
1568
1569            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1570                old == new
1571            }
1572        }
1573
1574        let runtime = QueryRuntime::new();
1575
1576        // Valid parse
1577        let result = runtime
1578            .query(ParseInt {
1579                input: "42".to_string(),
1580            })
1581            .unwrap();
1582        assert_eq!(*result, Ok(42));
1583
1584        // Invalid parse - system succeeds, user error in output
1585        let result = runtime
1586            .query(ParseInt {
1587                input: "not_a_number".to_string(),
1588            })
1589            .unwrap();
1590        assert!(result.is_err());
1591    }
1592
1593    // Macro tests
1594    mod macro_tests {
1595        use super::*;
1596        use crate::query;
1597
1598        #[query]
1599        fn add(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
1600            let _ = ctx; // silence unused warning
1601            Ok(a + b)
1602        }
1603
1604        #[test]
1605        fn test_macro_basic() {
1606            let runtime = QueryRuntime::new();
1607            let result = runtime.query(Add::new(1, 2)).unwrap();
1608            assert_eq!(*result, 3);
1609        }
1610
1611        #[query(durability = 2)]
1612        fn with_durability(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1613            let _ = ctx;
1614            Ok(x * 2)
1615        }
1616
1617        #[test]
1618        fn test_macro_durability() {
1619            let runtime = QueryRuntime::new();
1620            let result = runtime.query(WithDurability::new(5)).unwrap();
1621            assert_eq!(*result, 10);
1622        }
1623
1624        #[query(keys(id))]
1625        fn with_key_selection(
1626            ctx: &mut QueryContext,
1627            id: u32,
1628            include_extra: bool,
1629        ) -> Result<String, QueryError> {
1630            let _ = ctx;
1631            Ok(format!("id={}, extra={}", id, include_extra))
1632        }
1633
1634        #[test]
1635        fn test_macro_key_selection() {
1636            let runtime = QueryRuntime::new();
1637
1638            // Same id, different include_extra - should return cached
1639            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
1640            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
1641
1642            // Both should have same value because only `id` is the key
1643            assert_eq!(*r1, "id=1, extra=true");
1644            assert_eq!(*r2, "id=1, extra=true"); // Cached!
1645        }
1646
1647        #[query]
1648        fn dependent(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
1649            let sum = ctx.query(Add::new(*a, *b))?;
1650            Ok(*sum * 2)
1651        }
1652
1653        #[test]
1654        fn test_macro_dependencies() {
1655            let runtime = QueryRuntime::new();
1656            let result = runtime.query(Dependent::new(3, 4)).unwrap();
1657            assert_eq!(*result, 14); // (3 + 4) * 2
1658        }
1659
1660        #[query(output_eq)]
1661        fn with_output_eq(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1662            let _ = ctx;
1663            Ok(*x * 2)
1664        }
1665
1666        #[test]
1667        fn test_macro_output_eq() {
1668            let runtime = QueryRuntime::new();
1669            let result = runtime.query(WithOutputEq::new(5)).unwrap();
1670            assert_eq!(*result, 10);
1671        }
1672
1673        #[query(name = "CustomName")]
1674        fn original_name(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1675            let _ = ctx;
1676            Ok(*x)
1677        }
1678
1679        #[test]
1680        fn test_macro_custom_name() {
1681            let runtime = QueryRuntime::new();
1682            let result = runtime.query(CustomName::new(42)).unwrap();
1683            assert_eq!(*result, 42);
1684        }
1685
1686        // Test that attribute macros like #[tracing::instrument] are preserved
1687        // We use #[allow(unused_variables)] and #[inline] as test attributes since
1688        // they don't require external dependencies.
1689        #[allow(unused_variables)]
1690        #[inline]
1691        #[query]
1692        fn with_attributes(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1693            // This would warn without #[allow(unused_variables)] on the generated method
1694            let unused_var = 42;
1695            Ok(*x * 2)
1696        }
1697
1698        #[test]
1699        fn test_macro_preserves_attributes() {
1700            let runtime = QueryRuntime::new();
1701            // If attributes weren't preserved, this might warn about unused_var
1702            let result = runtime.query(WithAttributes::new(5)).unwrap();
1703            assert_eq!(*result, 10);
1704        }
1705    }
1706
1707    // Tests for poll() and changed_at()
1708    mod poll_tests {
1709        use super::*;
1710
1711        #[derive(Clone)]
1712        struct Counter {
1713            id: i32,
1714        }
1715
1716        impl Query for Counter {
1717            type CacheKey = i32;
1718            type Output = i32;
1719
1720            fn cache_key(&self) -> Self::CacheKey {
1721                self.id
1722            }
1723
1724            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1725                Ok(self.id * 10)
1726            }
1727
1728            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1729                old == new
1730            }
1731        }
1732
1733        #[test]
1734        fn test_poll_returns_value_and_revision() {
1735            let runtime = QueryRuntime::new();
1736
1737            let result = runtime.poll(Counter { id: 1 }).unwrap();
1738
1739            // Value should be correct - access through Result and Arc
1740            assert_eq!(**result.value.as_ref().unwrap(), 10);
1741
1742            // Revision should be non-zero after first execution
1743            assert!(result.revision > 0);
1744        }
1745
1746        #[test]
1747        fn test_poll_revision_stable_on_cache_hit() {
1748            let runtime = QueryRuntime::new();
1749
1750            // First poll
1751            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1752            let rev1 = result1.revision;
1753
1754            // Second poll (cache hit)
1755            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1756            let rev2 = result2.revision;
1757
1758            // Revision should be the same (no change)
1759            assert_eq!(rev1, rev2);
1760        }
1761
1762        #[test]
1763        fn test_poll_revision_changes_on_invalidate() {
1764            let runtime = QueryRuntime::new();
1765
1766            // First poll
1767            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1768            let rev1 = result1.revision;
1769
1770            // Invalidate and poll again
1771            runtime.invalidate::<Counter>(&1);
1772            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1773            let rev2 = result2.revision;
1774
1775            // Revision should increase (value was recomputed)
1776            // Note: Since output_eq returns true (same value), this might not change
1777            // depending on early cutoff behavior. Let's verify the value is still correct.
1778            assert_eq!(**result2.value.as_ref().unwrap(), 10);
1779
1780            // With early cutoff, revision might stay the same if value didn't change
1781            // This is expected behavior
1782            assert!(rev2 >= rev1);
1783        }
1784
1785        #[test]
1786        fn test_changed_at_returns_none_for_unexecuted_query() {
1787            let runtime = QueryRuntime::new();
1788
1789            // Query has never been executed
1790            let rev = runtime.changed_at::<Counter>(&1);
1791            assert!(rev.is_none());
1792        }
1793
1794        #[test]
1795        fn test_changed_at_returns_revision_after_execution() {
1796            let runtime = QueryRuntime::new();
1797
1798            // Execute the query
1799            let _ = runtime.query(Counter { id: 1 }).unwrap();
1800
1801            // Now changed_at should return Some
1802            let rev = runtime.changed_at::<Counter>(&1);
1803            assert!(rev.is_some());
1804            assert!(rev.unwrap() > 0);
1805        }
1806
1807        #[test]
1808        fn test_changed_at_matches_poll_revision() {
1809            let runtime = QueryRuntime::new();
1810
1811            // Poll the query
1812            let result = runtime.poll(Counter { id: 1 }).unwrap();
1813
1814            // changed_at should match the revision from poll
1815            let rev = runtime.changed_at::<Counter>(&1);
1816            assert_eq!(rev, Some(result.revision));
1817        }
1818
1819        #[test]
1820        fn test_poll_value_access() {
1821            let runtime = QueryRuntime::new();
1822
1823            let result = runtime.poll(Counter { id: 5 }).unwrap();
1824
1825            // Access through Result and Arc
1826            let value: &i32 = result.value.as_ref().unwrap();
1827            assert_eq!(*value, 50);
1828
1829            // Access Arc directly via field after unwrapping Result
1830            let arc: &Arc<i32> = result.value.as_ref().unwrap();
1831            assert_eq!(**arc, 50);
1832        }
1833
1834        #[test]
1835        fn test_subscription_pattern() {
1836            let runtime = QueryRuntime::new();
1837
1838            // Simulate subscription pattern
1839            let mut last_revision: RevisionCounter = 0;
1840            let mut notifications = 0;
1841
1842            // First poll - should notify (new value)
1843            let result = runtime.poll(Counter { id: 1 }).unwrap();
1844            if result.revision > last_revision {
1845                notifications += 1;
1846                last_revision = result.revision;
1847            }
1848
1849            // Second poll - should NOT notify (no change)
1850            let result = runtime.poll(Counter { id: 1 }).unwrap();
1851            if result.revision > last_revision {
1852                notifications += 1;
1853                last_revision = result.revision;
1854            }
1855
1856            // Third poll - should NOT notify (no change)
1857            let result = runtime.poll(Counter { id: 1 }).unwrap();
1858            if result.revision > last_revision {
1859                notifications += 1;
1860                #[allow(unused_assignments)]
1861                {
1862                    last_revision = result.revision;
1863                }
1864            }
1865
1866            // Only the first poll should have triggered a notification
1867            assert_eq!(notifications, 1);
1868        }
1869    }
1870}