query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::TypeId;
4use std::cell::RefCell;
5use std::sync::Arc;
6
7use std::ops::Deref;
8
9use whale::{Durability, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
12use crate::key::FullCacheKey;
13use crate::loading::AssetLoadingState;
14use crate::query::Query;
15use crate::storage::{
16    AssetKeyRegistry, AssetState, AssetStorage, CachedEntry, CachedValue, LocatorStorage,
17    PendingStorage, QueryRegistry, VerifierStorage,
18};
19use crate::QueryError;
20
/// Function type for comparing user errors during early cutoff.
///
/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
/// `QueryError::UserError` values are compared for caching purposes.
///
/// The runtime calls the comparator with the previously cached error as the
/// first argument and the newly produced error as the second. Returning `true`
/// means the errors are treated as equal, so the cached entry keeps its
/// existing revision; returning `false` makes the runtime record the new error
/// as a change.
pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
26
27#[cfg(feature = "inspector")]
28use query_flow_inspector::{EventSink, FlowEvent, SpanId};
29
/// Number of durability levels (matches whale's default).
///
/// Passed as the const generic argument of the `WhaleRuntime` stored in
/// [`QueryRuntime`].
const DURABILITY_LEVELS: usize = 4;
32
// Thread-local query execution stack for cycle detection.
//
// `QueryRuntime::query_internal` pushes a query's key right before executing
// it and pops it once execution returns. A key that is requested while it is
// already on the stack is reported as `QueryError::Cycle`.
thread_local! {
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
}
37
/// Per-execution context handed down through a query run.
///
/// With the `inspector` feature enabled it carries the span ID that ties
/// together the events emitted for one execution.
#[cfg(feature = "inspector")]
#[derive(Clone, Copy)]
pub struct ExecutionContext {
    span_id: SpanId,
}

/// Per-execution context handed down through a query run.
///
/// Without the `inspector` feature this is a zero-sized type (ZST) and
/// carries no data.
#[cfg(not(feature = "inspector"))]
#[derive(Clone, Copy)]
pub struct ExecutionContext;

impl ExecutionContext {
    /// Create a new execution context with a freshly allocated span ID.
    #[cfg(feature = "inspector")]
    pub fn new() -> Self {
        let span_id = query_flow_inspector::new_span_id();
        ExecutionContext { span_id }
    }

    /// Create a new (data-free) execution context.
    #[cfg(not(feature = "inspector"))]
    #[inline(always)]
    pub fn new() -> Self {
        ExecutionContext
    }

    /// Span ID identifying this execution in emitted events.
    #[cfg(feature = "inspector")]
    pub fn span_id(&self) -> SpanId {
        self.span_id
    }
}

impl Default for ExecutionContext {
    /// Equivalent to [`ExecutionContext::new`].
    fn default() -> Self {
        ExecutionContext::new()
    }
}
80
81/// Result of polling a query, containing the value and its revision.
82///
83/// This is returned by [`QueryRuntime::poll`] and provides both the query result
84/// and its change revision, enabling efficient change detection for subscription
85/// patterns.
86///
87/// # Example
88///
89/// ```ignore
90/// let result = runtime.poll(MyQuery::new())?;
91///
92/// // Access the value via Deref
93/// println!("{:?}", *result);
94///
95/// // Check if changed since last poll
96/// if result.revision > last_known_revision {
97///     notify_subscribers(&result.value);
98///     last_known_revision = result.revision;
99/// }
100/// ```
101#[derive(Debug, Clone)]
102pub struct Polled<T> {
103    /// The query result value.
104    pub value: T,
105    /// The revision at which this value was last changed.
106    ///
107    /// Compare this with a previously stored revision to detect changes.
108    pub revision: RevisionCounter,
109}
110
111impl<T: Deref> Deref for Polled<T> {
112    type Target = T::Target;
113
114    fn deref(&self) -> &Self::Target {
115        &self.value
116    }
117}
118
119/// The query runtime manages query execution, caching, and dependency tracking.
120///
121/// This is cheap to clone - all data is behind `Arc`.
122///
123/// # Example
124///
125/// ```ignore
126/// let runtime = QueryRuntime::new();
127///
128/// // Sync query execution
129/// let result = runtime.query(MyQuery { ... })?;
130///
131/// // Async query execution (waits through Suspend)
132/// let result = runtime.query_async(MyQuery { ... }).await?;
133/// ```
134pub struct QueryRuntime {
135    /// Whale runtime for dependency tracking and cache storage.
136    /// Query outputs are stored in Node.data as Option<CachedEntry>.
137    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
138    /// Asset cache and state storage
139    assets: Arc<AssetStorage>,
140    /// Registered asset locators
141    locators: Arc<LocatorStorage>,
142    /// Pending asset requests
143    pending: Arc<PendingStorage>,
144    /// Registry for tracking query instances (for list_queries)
145    query_registry: Arc<QueryRegistry>,
146    /// Registry for tracking asset keys (for list_asset_keys)
147    asset_key_registry: Arc<AssetKeyRegistry>,
148    /// Verifiers for re-executing queries (for verify-then-decide pattern)
149    verifiers: Arc<VerifierStorage>,
150    /// Comparator for user errors during early cutoff
151    error_comparator: ErrorComparator,
152    /// Event sink for inspector/tracing
153    #[cfg(feature = "inspector")]
154    sink: Arc<parking_lot::RwLock<Option<Arc<dyn EventSink>>>>,
155}
156
157impl Default for QueryRuntime {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163impl Clone for QueryRuntime {
164    fn clone(&self) -> Self {
165        Self {
166            whale: self.whale.clone(),
167            assets: self.assets.clone(),
168            locators: self.locators.clone(),
169            pending: self.pending.clone(),
170            query_registry: self.query_registry.clone(),
171            asset_key_registry: self.asset_key_registry.clone(),
172            verifiers: self.verifiers.clone(),
173            error_comparator: self.error_comparator,
174            #[cfg(feature = "inspector")]
175            sink: self.sink.clone(),
176        }
177    }
178}
179
180/// Default error comparator that treats all errors as different.
181///
182/// This is conservative - it always triggers recomputation when an error occurs.
183fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
184    false
185}
186
187impl QueryRuntime {
188    /// Get cached output along with its revision (single atomic access).
189    fn get_cached_with_revision<Q: Query>(
190        &self,
191        key: &FullCacheKey,
192    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
193        let node = self.whale.get(key)?;
194        let revision = node.changed_at;
195        let entry = node.data.as_ref()?;
196        let cached = entry.to_cached_value::<Q::Output>()?;
197        Some((cached, revision))
198    }
199}
200
201impl QueryRuntime {
202    /// Create a new query runtime with default settings.
203    pub fn new() -> Self {
204        Self::builder().build()
205    }
206
207    /// Create a builder for customizing the runtime.
208    ///
209    /// # Example
210    ///
211    /// ```ignore
212    /// let runtime = QueryRuntime::builder()
213    ///     .error_comparator(|a, b| {
214    ///         // Custom error comparison logic
215    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
216    ///             (Some(a), Some(b)) => a == b,
217    ///             _ => false,
218    ///         }
219    ///     })
220    ///     .build();
221    /// ```
222    pub fn builder() -> QueryRuntimeBuilder {
223        QueryRuntimeBuilder::new()
224    }
225
226    /// Set the event sink for tracing/inspection.
227    ///
228    /// Pass `Some(sink)` to enable event collection, or `None` to disable.
229    ///
230    /// # Example
231    ///
232    /// ```ignore
233    /// use query_flow_inspector::EventCollector;
234    /// use std::sync::Arc;
235    ///
236    /// let collector = Arc::new(EventCollector::new());
237    /// runtime.set_sink(Some(collector.clone()));
238    /// runtime.query(MyQuery::new());
239    /// let trace = collector.trace();
240    /// ```
241    #[cfg(feature = "inspector")]
242    pub fn set_sink(&self, sink: Option<Arc<dyn EventSink>>) {
243        *self.sink.write() = sink;
244    }
245
246    /// Get the current event sink.
247    #[cfg(feature = "inspector")]
248    pub fn sink(&self) -> Option<Arc<dyn EventSink>> {
249        self.sink.read().clone()
250    }
251
252    /// Emit an event to the sink if one is set.
253    #[cfg(feature = "inspector")]
254    #[inline]
255    fn emit<F: FnOnce() -> FlowEvent>(&self, event: F) {
256        let guard = self.sink.read();
257        if let Some(ref sink) = *guard {
258            sink.emit(event());
259        }
260    }
261
262    /// Execute a query synchronously.
263    ///
264    /// Returns the cached result if valid, otherwise executes the query.
265    ///
266    /// # Errors
267    ///
268    /// - `QueryError::Suspend` - Query is waiting for async loading
269    /// - `QueryError::Cycle` - Dependency cycle detected
270    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
271        self.query_internal(query)
272            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
273    }
274
275    /// Internal implementation shared by query() and poll().
276    ///
277    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
278    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
279    #[allow(clippy::type_complexity)]
280    fn query_internal<Q: Query>(
281        &self,
282        query: Q,
283    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
284        let key = query.cache_key();
285        let full_key = FullCacheKey::new::<Q, _>(&key);
286
287        // Create execution context and emit start event
288        let exec_ctx = ExecutionContext::new();
289        #[cfg(feature = "inspector")]
290        let start_time = std::time::Instant::now();
291        #[cfg(feature = "inspector")]
292        let query_key =
293            query_flow_inspector::QueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
294
295        #[cfg(feature = "inspector")]
296        self.emit(|| FlowEvent::QueryStart {
297            span_id: exec_ctx.span_id(),
298            query: query_key.clone(),
299        });
300
301        // Check for cycles using thread-local stack
302        let cycle_detected = QUERY_STACK.with(|stack| {
303            let stack = stack.borrow();
304            stack.iter().any(|k| k == &full_key)
305        });
306
307        if cycle_detected {
308            let path = QUERY_STACK.with(|stack| {
309                let stack = stack.borrow();
310                let mut path: Vec<String> =
311                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
312                path.push(full_key.debug_repr().to_string());
313                path
314            });
315
316            #[cfg(feature = "inspector")]
317            self.emit(|| FlowEvent::CycleDetected {
318                path: path
319                    .iter()
320                    .map(|s| query_flow_inspector::QueryKey::new("", s.clone()))
321                    .collect(),
322            });
323
324            #[cfg(feature = "inspector")]
325            self.emit(|| FlowEvent::QueryEnd {
326                span_id: exec_ctx.span_id(),
327                query: query_key.clone(),
328                result: query_flow_inspector::ExecutionResult::CycleDetected,
329                duration: start_time.elapsed(),
330            });
331
332            return Err(QueryError::Cycle { path });
333        }
334
335        // Check if cached and valid (with verify-then-decide pattern)
336        let current_rev = self.whale.current_revision();
337
338        // Fast path: already verified at current revision
339        if self.whale.is_verified_at(&full_key, &current_rev) {
340            // Single atomic access to get both cached value and revision
341            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
342                #[cfg(feature = "inspector")]
343                self.emit(|| FlowEvent::CacheCheck {
344                    span_id: exec_ctx.span_id(),
345                    query: query_key.clone(),
346                    valid: true,
347                });
348
349                #[cfg(feature = "inspector")]
350                self.emit(|| FlowEvent::QueryEnd {
351                    span_id: exec_ctx.span_id(),
352                    query: query_key.clone(),
353                    result: query_flow_inspector::ExecutionResult::CacheHit,
354                    duration: start_time.elapsed(),
355                });
356
357                return match cached {
358                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
359                    CachedValue::UserError(err) => Ok((Err(err), revision)),
360                };
361            }
362        }
363
364        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
365        if self.whale.is_valid(&full_key) {
366            // Single atomic access to get both cached value and revision
367            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
368                // Shallow valid but not verified - verify deps first
369                let mut deps_verified = true;
370                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
371                    for dep in deps {
372                        if let Some(verifier) = self.verifiers.get(&dep) {
373                            // Re-query dep to verify it (triggers recursive verification)
374                            if verifier.verify(self).is_err() {
375                                deps_verified = false;
376                                break;
377                            }
378                        }
379                    }
380                }
381
382                // Re-check validity after deps are verified
383                if deps_verified && self.whale.is_valid(&full_key) {
384                    // Deps didn't change their changed_at, mark as verified and use cached
385                    self.whale.mark_verified(&full_key, &current_rev);
386
387                    #[cfg(feature = "inspector")]
388                    self.emit(|| FlowEvent::CacheCheck {
389                        span_id: exec_ctx.span_id(),
390                        query: query_key.clone(),
391                        valid: true,
392                    });
393
394                    #[cfg(feature = "inspector")]
395                    self.emit(|| FlowEvent::QueryEnd {
396                        span_id: exec_ctx.span_id(),
397                        query: query_key.clone(),
398                        result: query_flow_inspector::ExecutionResult::CacheHit,
399                        duration: start_time.elapsed(),
400                    });
401
402                    return match cached {
403                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
404                        CachedValue::UserError(err) => Ok((Err(err), revision)),
405                    };
406                }
407                // A dep's changed_at increased, fall through to execute
408            }
409        }
410
411        #[cfg(feature = "inspector")]
412        self.emit(|| FlowEvent::CacheCheck {
413            span_id: exec_ctx.span_id(),
414            query: query_key.clone(),
415            valid: false,
416        });
417
418        // Execute the query with cycle tracking
419        QUERY_STACK.with(|stack| {
420            stack.borrow_mut().push(full_key.clone());
421        });
422
423        let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
424
425        QUERY_STACK.with(|stack| {
426            stack.borrow_mut().pop();
427        });
428
429        // Emit end event
430        #[cfg(feature = "inspector")]
431        {
432            let exec_result = match &result {
433                Ok((_, true, _)) => query_flow_inspector::ExecutionResult::Changed,
434                Ok((_, false, _)) => query_flow_inspector::ExecutionResult::Unchanged,
435                Err(QueryError::Suspend { .. }) => query_flow_inspector::ExecutionResult::Suspended,
436                Err(QueryError::Cycle { .. }) => {
437                    query_flow_inspector::ExecutionResult::CycleDetected
438                }
439                Err(e) => query_flow_inspector::ExecutionResult::Error {
440                    message: format!("{:?}", e),
441                },
442            };
443            self.emit(|| FlowEvent::QueryEnd {
444                span_id: exec_ctx.span_id(),
445                query: query_key.clone(),
446                result: exec_result,
447                duration: start_time.elapsed(),
448            });
449        }
450
451        result.map(|(inner_result, _, revision)| (inner_result, revision))
452    }
453
454    /// Execute a query, caching the result if appropriate.
455    ///
456    /// Returns (result, output_changed, revision) tuple.
457    /// - `result`: Ok(output) for success, Err(user_error) for user errors
458    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
459    #[allow(clippy::type_complexity)]
460    fn execute_query<Q: Query>(
461        &self,
462        query: &Q,
463        full_key: &FullCacheKey,
464        exec_ctx: ExecutionContext,
465    ) -> Result<
466        (
467            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
468            bool,
469            RevisionCounter,
470        ),
471        QueryError,
472    > {
473        // Create context for this query execution
474        let mut ctx = QueryContext {
475            runtime: self,
476            current_key: full_key.clone(),
477            #[cfg(feature = "inspector")]
478            parent_query_type: std::any::type_name::<Q>(),
479            #[cfg(feature = "inspector")]
480            exec_ctx,
481            deps: RefCell::new(Vec::new()),
482        };
483        // Suppress unused variable warning when inspector is disabled
484        #[cfg(not(feature = "inspector"))]
485        let _ = exec_ctx;
486
487        // Execute the query
488        let result = query.query(&mut ctx);
489
490        // Get collected dependencies
491        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
492
493        // Get durability for whale registration
494        let durability =
495            Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
496
497        match result {
498            Ok(output) => {
499                let output = Arc::new(output);
500
501                // Check if output changed (for early cutoff)
502                // existing_revision is Some only when output is unchanged (can reuse revision)
503                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
504                    self.get_cached_with_revision::<Q>(full_key)
505                {
506                    if Q::output_eq(&old, &output) {
507                        Some(rev) // Same output - reuse revision
508                    } else {
509                        None // Different output
510                    }
511                } else {
512                    None // No previous Ok value
513                };
514                let output_changed = existing_revision.is_none();
515
516                // Emit early cutoff check event
517                #[cfg(feature = "inspector")]
518                self.emit(|| FlowEvent::EarlyCutoffCheck {
519                    span_id: exec_ctx.span_id(),
520                    query: query_flow_inspector::QueryKey::new(
521                        std::any::type_name::<Q>(),
522                        full_key.debug_repr(),
523                    ),
524                    output_changed,
525                });
526
527                // Update whale with cached entry (atomic update of value + dependency state)
528                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
529                let revision = if let Some(existing_rev) = existing_revision {
530                    // confirm_unchanged doesn't change changed_at, use existing
531                    let _ = self.whale.confirm_unchanged(full_key, deps);
532                    existing_rev
533                } else {
534                    // Use new_rev from register result
535                    match self
536                        .whale
537                        .register(full_key.clone(), Some(entry), durability, deps)
538                    {
539                        Ok(result) => result.new_rev,
540                        Err(missing) => {
541                            return Err(QueryError::DependenciesRemoved {
542                                missing_keys: missing,
543                            })
544                        }
545                    }
546                };
547
548                // Register query in registry for list_queries
549                let is_new_query = self.query_registry.register(query);
550                if is_new_query {
551                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
552                    let _ = self
553                        .whale
554                        .register(sentinel, None, Durability::volatile(), vec![]);
555                }
556
557                // Store verifier for this query (for verify-then-decide pattern)
558                self.verifiers.insert(full_key.clone(), query.clone());
559
560                Ok((Ok(output), output_changed, revision))
561            }
562            Err(QueryError::UserError(err)) => {
563                // Check if error changed (for early cutoff)
564                // existing_revision is Some only when error is unchanged (can reuse revision)
565                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
566                    self.get_cached_with_revision::<Q>(full_key)
567                {
568                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
569                        Some(rev) // Same error - reuse revision
570                    } else {
571                        None // Different error
572                    }
573                } else {
574                    None // No previous UserError
575                };
576                let output_changed = existing_revision.is_none();
577
578                // Emit early cutoff check event
579                #[cfg(feature = "inspector")]
580                self.emit(|| FlowEvent::EarlyCutoffCheck {
581                    span_id: exec_ctx.span_id(),
582                    query: query_flow_inspector::QueryKey::new(
583                        std::any::type_name::<Q>(),
584                        full_key.debug_repr(),
585                    ),
586                    output_changed,
587                });
588
589                // Update whale with cached error (atomic update of value + dependency state)
590                let entry = CachedEntry::UserError(err.clone());
591                let revision = if let Some(existing_rev) = existing_revision {
592                    // confirm_unchanged doesn't change changed_at, use existing
593                    let _ = self.whale.confirm_unchanged(full_key, deps);
594                    existing_rev
595                } else {
596                    // Use new_rev from register result
597                    match self
598                        .whale
599                        .register(full_key.clone(), Some(entry), durability, deps)
600                    {
601                        Ok(result) => result.new_rev,
602                        Err(missing) => {
603                            return Err(QueryError::DependenciesRemoved {
604                                missing_keys: missing,
605                            })
606                        }
607                    }
608                };
609
610                // Register query in registry for list_queries
611                let is_new_query = self.query_registry.register(query);
612                if is_new_query {
613                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
614                    let _ = self
615                        .whale
616                        .register(sentinel, None, Durability::volatile(), vec![]);
617                }
618
619                // Store verifier for this query (for verify-then-decide pattern)
620                self.verifiers.insert(full_key.clone(), query.clone());
621
622                Ok((Err(err), output_changed, revision))
623            }
624            Err(e) => {
625                // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
626                Err(e)
627            }
628        }
629    }
630
631    /// Invalidate a query, forcing recomputation on next access.
632    ///
633    /// This also invalidates any queries that depend on this one.
634    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
635        let full_key = FullCacheKey::new::<Q, _>(key);
636
637        #[cfg(feature = "inspector")]
638        self.emit(|| FlowEvent::QueryInvalidated {
639            query: query_flow_inspector::QueryKey::new(
640                std::any::type_name::<Q>(),
641                full_key.debug_repr(),
642            ),
643            reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
644        });
645
646        // Update whale to invalidate dependents (register with None to clear cached value)
647        let _ = self
648            .whale
649            .register(full_key, None, Durability::volatile(), vec![]);
650    }
651
652    /// Remove a query from the cache entirely, freeing memory.
653    ///
654    /// Use this for GC when a query is no longer needed.
655    /// Unlike `invalidate`, this removes all traces of the query from storage.
656    /// The query will be recomputed from scratch on next access.
657    ///
658    /// This also invalidates any queries that depend on this one.
659    pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
660        let full_key = FullCacheKey::new::<Q, _>(key);
661
662        #[cfg(feature = "inspector")]
663        self.emit(|| FlowEvent::QueryInvalidated {
664            query: query_flow_inspector::QueryKey::new(
665                std::any::type_name::<Q>(),
666                full_key.debug_repr(),
667            ),
668            reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
669        });
670
671        // Remove verifier if exists
672        self.verifiers.remove(&full_key);
673
674        // Remove from whale storage (this also handles dependent invalidation)
675        self.whale.remove(&full_key);
676
677        // Remove from registry and update sentinel for list_queries
678        if self.query_registry.remove::<Q>(key) {
679            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
680            let _ = self
681                .whale
682                .register(sentinel, None, Durability::volatile(), vec![]);
683        }
684    }
685
686    /// Clear all cached values by removing all nodes from whale.
687    ///
688    /// Note: This is a relatively expensive operation as it iterates through all keys.
689    pub fn clear_cache(&self) {
690        let keys = self.whale.keys();
691        for key in keys {
692            self.whale.remove(&key);
693        }
694    }
695
696    /// Poll a query, returning both the result and its change revision.
697    ///
698    /// This is useful for implementing subscription patterns where you need to
699    /// detect changes efficiently. Compare the returned `revision` with a
700    /// previously stored value to determine if the query result has changed.
701    ///
702    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
703    /// as its value, allowing you to track revision changes for both success and
704    /// user error cases.
705    ///
706    /// # Example
707    ///
708    /// ```ignore
709    /// struct Subscription<Q: Query> {
710    ///     query: Q,
711    ///     last_revision: RevisionCounter,
712    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
713    /// }
714    ///
715    /// // Polling loop
716    /// for sub in &mut subscriptions {
717    ///     let result = runtime.poll(sub.query.clone())?;
718    ///     if result.revision > sub.last_revision {
719    ///         sub.tx.send(result.value.clone())?;
720    ///         sub.last_revision = result.revision;
721    ///     }
722    /// }
723    /// ```
724    ///
725    /// # Errors
726    ///
727    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
728    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
729    #[allow(clippy::type_complexity)]
730    pub fn poll<Q: Query>(
731        &self,
732        query: Q,
733    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
734        let (value, revision) = self.query_internal(query)?;
735        Ok(Polled { value, revision })
736    }
737
738    /// Get the change revision of a query without executing it.
739    ///
740    /// Returns `None` if the query has never been executed.
741    ///
742    /// This is useful for checking if a query has changed since the last poll
743    /// without the cost of executing the query.
744    ///
745    /// # Example
746    ///
747    /// ```ignore
748    /// // Check if query has changed before deciding to poll
749    /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
750    ///     if rev > last_known_revision {
751    ///         let result = runtime.query(MyQuery::new(key))?;
752    ///         // Process result...
753    ///     }
754    /// }
755    /// ```
756    pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
757        let full_key = FullCacheKey::new::<Q, _>(key);
758        self.whale.get(&full_key).map(|node| node.changed_at)
759    }
760}
761
762// ============================================================================
763// Builder
764// ============================================================================
765
766/// Builder for [`QueryRuntime`] with customizable settings.
767///
768/// # Example
769///
770/// ```ignore
771/// let runtime = QueryRuntime::builder()
772///     .error_comparator(|a, b| {
773///         // Treat all errors of the same type as equal
774///         a.downcast_ref::<std::io::Error>().is_some()
775///             == b.downcast_ref::<std::io::Error>().is_some()
776///     })
777///     .build();
778/// ```
779pub struct QueryRuntimeBuilder {
780    error_comparator: ErrorComparator,
781}
782
783impl Default for QueryRuntimeBuilder {
784    fn default() -> Self {
785        Self::new()
786    }
787}
788
789impl QueryRuntimeBuilder {
790    /// Create a new builder with default settings.
791    pub fn new() -> Self {
792        Self {
793            error_comparator: default_error_comparator,
794        }
795    }
796
797    /// Set the error comparator function for early cutoff optimization.
798    ///
799    /// When a query returns `QueryError::UserError`, this function is used
800    /// to compare it with the previously cached error. If they are equal,
801    /// downstream queries can skip recomputation (early cutoff).
802    ///
803    /// The default comparator returns `false` for all errors, meaning errors
804    /// are always considered different (conservative, always recomputes).
805    ///
806    /// # Example
807    ///
808    /// ```ignore
809    /// // Treat errors as equal if they have the same display message
810    /// let runtime = QueryRuntime::builder()
811    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
812    ///     .build();
813    /// ```
814    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
815        self.error_comparator = f;
816        self
817    }
818
819    /// Build the runtime with the configured settings.
820    pub fn build(self) -> QueryRuntime {
821        QueryRuntime {
822            whale: WhaleRuntime::new(),
823            assets: Arc::new(AssetStorage::new()),
824            locators: Arc::new(LocatorStorage::new()),
825            pending: Arc::new(PendingStorage::new()),
826            query_registry: Arc::new(QueryRegistry::new()),
827            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
828            verifiers: Arc::new(VerifierStorage::new()),
829            error_comparator: self.error_comparator,
830            #[cfg(feature = "inspector")]
831            sink: Arc::new(parking_lot::RwLock::new(None)),
832        }
833    }
834}
835
836// ============================================================================
837// Asset API
838// ============================================================================
839
840impl QueryRuntime {
    /// Register an asset locator for a specific asset key type.
    ///
    /// The locator is placed in the runtime's locator storage under the key
    /// type `K`; asset lookups for `K` consult it when the cache cannot
    /// answer.
    ///
    /// Only one locator can be registered per key type. Later registrations
    /// replace earlier ones.
    ///
    /// # Type parameters
    ///
    /// - `K` - the asset key type the locator serves.
    /// - `L` - the locator implementation for `K`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let runtime = QueryRuntime::new();
    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
    /// ```
    pub fn register_asset_locator<K, L>(&self, locator: L)
    where
        K: AssetKey,
        L: AssetLocator<K>,
    {
        self.locators.insert::<K, L>(locator);
    }
859
    /// Get the list of pending asset requests.
    ///
    /// Returns, as an owned `Vec`, the assets that have been requested but
    /// not yet resolved. The user should fetch these externally and call
    /// `resolve_asset()`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// for pending in runtime.pending_assets() {
    ///     if let Some(path) = pending.key::<FilePath>() {
    ///         let content = fetch_file(path);
    ///         runtime.resolve_asset(path.clone(), content);
    ///     }
    /// }
    /// ```
    pub fn pending_assets(&self) -> Vec<PendingAsset> {
        self.pending.get_all()
    }
878
    /// Get pending assets filtered by key type.
    ///
    /// Returns the pending keys of type `K` as owned values, so no downcast
    /// from [`PendingAsset`] is needed on the caller's side.
    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
        self.pending.get_of_type::<K>()
    }
883
    /// Check if there are any pending assets.
    ///
    /// Returns `true` when the pending storage is not empty.
    pub fn has_pending_assets(&self) -> bool {
        !self.pending.is_empty()
    }
888
    /// Resolve an asset with its loaded value.
    ///
    /// This marks the asset as ready and invalidates any queries that
    /// depend on it (if the value changed), triggering recomputation on next access.
    ///
    /// This method is idempotent - resolving with the same value (via `asset_eq`)
    /// will not trigger downstream recomputation.
    ///
    /// Uses the asset key's default durability (`key.durability()`); see
    /// `resolve_asset_with_durability` to override it.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let content = std::fs::read_to_string(&path)?;
    /// runtime.resolve_asset(FilePath(path), content);
    /// ```
    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset) {
        // Read the durability before `key` is moved into the internal call.
        let durability = key.durability();
        self.resolve_asset_internal(key, value, durability);
    }
909
    /// Resolve an asset with a specific durability level.
    ///
    /// Use this to override the asset key's default durability. Apart from
    /// the durability source, this behaves exactly like `resolve_asset`:
    /// both forward to the same internal routine.
    pub fn resolve_asset_with_durability<K: AssetKey>(
        &self,
        key: K,
        value: K::Asset,
        durability: DurabilityLevel,
    ) {
        self.resolve_asset_internal(key, value, durability);
    }
921
922    fn resolve_asset_internal<K: AssetKey>(
923        &self,
924        key: K,
925        value: K::Asset,
926        durability_level: DurabilityLevel,
927    ) {
928        let full_asset_key = FullAssetKey::new(&key);
929        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
930
931        // Check for early cutoff
932        let changed = if let Some(old_value) = self.assets.get_ready::<K>(&full_asset_key) {
933            !K::asset_eq(&old_value, &value)
934        } else {
935            true // Loading or NotFound -> Ready is a change
936        };
937
938        // Emit asset resolved event
939        #[cfg(feature = "inspector")]
940        self.emit(|| FlowEvent::AssetResolved {
941            asset: query_flow_inspector::AssetKey::new(
942                std::any::type_name::<K>(),
943                format!("{:?}", key),
944            ),
945            changed,
946        });
947
948        // Store the new value
949        self.assets
950            .insert_ready::<K>(full_asset_key.clone(), Arc::new(value));
951
952        // Remove from pending
953        self.pending.remove(&full_asset_key);
954
955        // Update whale dependency tracking
956        let durability =
957            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
958
959        if changed {
960            // Register with new changed_at to invalidate dependents
961            let _ = self
962                .whale
963                .register(full_cache_key, None, durability, vec![]);
964        } else {
965            // Early cutoff - keep old changed_at
966            let _ = self.whale.confirm_unchanged(&full_cache_key, vec![]);
967        }
968
969        // Register asset key in registry for list_asset_keys
970        let is_new_asset = self.asset_key_registry.register(&key);
971        if is_new_asset {
972            // Update sentinel to invalidate list_asset_keys dependents
973            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
974            let _ = self
975                .whale
976                .register(sentinel, None, Durability::volatile(), vec![]);
977        }
978    }
979
980    /// Invalidate an asset, forcing queries to re-request it.
981    ///
982    /// The asset will be marked as loading and added to pending assets.
983    /// Dependent queries will suspend until the asset is resolved again.
984    ///
985    /// # Example
986    ///
987    /// ```ignore
988    /// // File was modified externally
989    /// runtime.invalidate_asset(&FilePath("config.json".into()));
990    /// // Queries depending on this asset will now suspend
991    /// // User should fetch the new value and call resolve_asset
992    /// ```
993    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
994        let full_asset_key = FullAssetKey::new(key);
995        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
996
997        // Emit asset invalidated event
998        #[cfg(feature = "inspector")]
999        self.emit(|| FlowEvent::AssetInvalidated {
1000            asset: query_flow_inspector::AssetKey::new(
1001                std::any::type_name::<K>(),
1002                format!("{:?}", key),
1003            ),
1004        });
1005
1006        // Mark as loading
1007        self.assets
1008            .insert(full_asset_key.clone(), AssetState::Loading);
1009
1010        // Add to pending
1011        self.pending.insert::<K>(full_asset_key, key.clone());
1012
1013        // Update whale to invalidate dependents (use volatile during loading)
1014        let _ = self
1015            .whale
1016            .register(full_cache_key, None, Durability::volatile(), vec![]);
1017    }
1018
1019    /// Remove an asset from the cache entirely.
1020    ///
1021    /// Unlike `invalidate_asset`, this removes all traces of the asset.
1022    /// Dependent queries will go through the locator again on next access.
1023    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1024        let full_asset_key = FullAssetKey::new(key);
1025        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1026
1027        // First, register a change to invalidate dependents
1028        // This ensures queries that depend on this asset will recompute
1029        let _ = self
1030            .whale
1031            .register(full_cache_key.clone(), None, Durability::volatile(), vec![]);
1032
1033        // Then remove the asset from storage
1034        self.assets.remove(&full_asset_key);
1035        self.pending.remove(&full_asset_key);
1036
1037        // Finally remove from whale tracking
1038        self.whale.remove(&full_cache_key);
1039
1040        // Remove from registry and update sentinel for list_asset_keys
1041        if self.asset_key_registry.remove::<K>(key) {
1042            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1043            let _ = self
1044                .whale
1045                .register(sentinel, None, Durability::volatile(), vec![]);
1046        }
1047    }
1048
    /// Get an asset by key without tracking dependencies.
    ///
    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
    /// as a dependent of the asset. Use this for direct asset access outside
    /// of query execution.
    ///
    /// The lookup itself goes through the same internal routine that
    /// `QueryContext::asset()` uses, so it may consult a registered locator
    /// and may queue the key as a pending request.
    ///
    /// # Returns
    ///
    /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
    /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
    /// - `Err(QueryError::MissingDependency)` - Asset was not found, or the
    ///   stored value could not be downcast to `K::Asset`
    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
        self.get_asset_internal(key)
    }
1063
1064    /// Internal: Get asset state, checking cache and locator.
1065    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1066        let full_asset_key = FullAssetKey::new(&key);
1067        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1068
1069        // Helper to emit AssetRequested event
1070        #[cfg(feature = "inspector")]
1071        let emit_requested = |runtime: &Self, key: &K, state: query_flow_inspector::AssetState| {
1072            runtime.emit(|| FlowEvent::AssetRequested {
1073                asset: query_flow_inspector::AssetKey::new(
1074                    std::any::type_name::<K>(),
1075                    format!("{:?}", key),
1076                ),
1077                state,
1078            });
1079        };
1080
1081        // Check cache first
1082        if let Some(state) = self.assets.get(&full_asset_key) {
1083            // Check if whale thinks it's valid
1084            if self.whale.is_valid(&full_cache_key) {
1085                return match state {
1086                    AssetState::Loading => {
1087                        #[cfg(feature = "inspector")]
1088                        emit_requested(self, &key, query_flow_inspector::AssetState::Loading);
1089                        Ok(AssetLoadingState::loading(key))
1090                    }
1091                    AssetState::Ready(arc) => {
1092                        #[cfg(feature = "inspector")]
1093                        emit_requested(self, &key, query_flow_inspector::AssetState::Ready);
1094                        match arc.downcast::<K::Asset>() {
1095                            Ok(value) => Ok(AssetLoadingState::ready(key, value)),
1096                            Err(_) => Err(QueryError::MissingDependency {
1097                                description: format!("Asset type mismatch: {:?}", key),
1098                            }),
1099                        }
1100                    }
1101                    AssetState::NotFound => {
1102                        #[cfg(feature = "inspector")]
1103                        emit_requested(self, &key, query_flow_inspector::AssetState::NotFound);
1104                        Err(QueryError::MissingDependency {
1105                            description: format!("Asset not found: {:?}", key),
1106                        })
1107                    }
1108                };
1109            }
1110        }
1111
1112        // Check if there's a registered locator
1113        if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
1114            if let Some(state) = locator.locate_any(&key) {
1115                // Store the state
1116                self.assets.insert(full_asset_key.clone(), state.clone());
1117
1118                match state {
1119                    AssetState::Ready(arc) => {
1120                        #[cfg(feature = "inspector")]
1121                        emit_requested(self, &key, query_flow_inspector::AssetState::Ready);
1122
1123                        // Register with whale
1124                        let durability = Durability::new(key.durability().as_u8() as usize)
1125                            .unwrap_or(Durability::volatile());
1126                        self.whale
1127                            .register(full_cache_key, None, durability, vec![])
1128                            .expect("register with no dependencies cannot fail");
1129
1130                        match arc.downcast::<K::Asset>() {
1131                            Ok(value) => return Ok(AssetLoadingState::ready(key, value)),
1132                            Err(_) => {
1133                                return Err(QueryError::MissingDependency {
1134                                    description: format!("Asset type mismatch: {:?}", key),
1135                                })
1136                            }
1137                        }
1138                    }
1139                    AssetState::Loading => {
1140                        #[cfg(feature = "inspector")]
1141                        emit_requested(self, &key, query_flow_inspector::AssetState::Loading);
1142                        self.pending.insert::<K>(full_asset_key, key.clone());
1143
1144                        // Register in whale so queries can depend on this asset
1145                        self.whale
1146                            .register(full_cache_key, None, Durability::volatile(), vec![])
1147                            .expect("register with no dependencies cannot fail");
1148
1149                        return Ok(AssetLoadingState::loading(key));
1150                    }
1151                    AssetState::NotFound => {
1152                        #[cfg(feature = "inspector")]
1153                        emit_requested(self, &key, query_flow_inspector::AssetState::NotFound);
1154                        return Err(QueryError::MissingDependency {
1155                            description: format!("Asset not found: {:?}", key),
1156                        });
1157                    }
1158                }
1159            }
1160        }
1161
1162        // No locator registered or locator returned None - mark as pending
1163        #[cfg(feature = "inspector")]
1164        emit_requested(self, &key, query_flow_inspector::AssetState::Loading);
1165        self.assets
1166            .insert(full_asset_key.clone(), AssetState::Loading);
1167        self.pending
1168            .insert::<K>(full_asset_key.clone(), key.clone());
1169
1170        // Register in whale so queries can depend on this asset
1171        self.whale
1172            .register(full_cache_key, None, Durability::volatile(), vec![])
1173            .expect("register with no dependencies cannot fail");
1174
1175        Ok(AssetLoadingState::loading(key))
1176    }
1177}
1178
/// Context provided to queries during execution.
///
/// Use this to access dependencies via `query()`, assets via `asset()`, and
/// set-level listings via `list_queries()` / `list_asset_keys()`. Each of
/// these accesses pushes the corresponding cache key onto `deps`.
pub struct QueryContext<'a> {
    /// Runtime that executes nested queries and serves assets.
    runtime: &'a QueryRuntime,
    /// Key reported as the parent query in inspector events (presumably the
    /// cache key of the query being executed). It is only read when the
    /// `inspector` feature is on, hence the `dead_code` allowance otherwise.
    #[cfg_attr(not(feature = "inspector"), allow(dead_code))]
    current_key: FullCacheKey,
    /// Type name reported together with `current_key` in inspector events.
    #[cfg(feature = "inspector")]
    parent_query_type: &'static str,
    /// Execution context supplying the span id attached to inspector events.
    #[cfg(feature = "inspector")]
    exec_ctx: ExecutionContext,
    /// Cache keys of everything accessed through this context so far.
    deps: RefCell<Vec<FullCacheKey>>,
}
1192
1193impl<'a> QueryContext<'a> {
1194    /// Query a dependency.
1195    ///
1196    /// The dependency is automatically tracked for invalidation.
1197    ///
1198    /// # Example
1199    ///
1200    /// ```ignore
1201    /// fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1202    ///     let dep_result = ctx.query(OtherQuery { id: self.id })?;
1203    ///     Ok(process(&dep_result))
1204    /// }
1205    /// ```
1206    pub fn query<Q: Query>(&mut self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1207        let key = query.cache_key();
1208        let full_key = FullCacheKey::new::<Q, _>(&key);
1209
1210        // Emit dependency registered event
1211        #[cfg(feature = "inspector")]
1212        self.runtime.emit(|| FlowEvent::DependencyRegistered {
1213            span_id: self.exec_ctx.span_id(),
1214            parent: query_flow_inspector::QueryKey::new(
1215                self.parent_query_type,
1216                self.current_key.debug_repr(),
1217            ),
1218            dependency: query_flow_inspector::QueryKey::new(
1219                std::any::type_name::<Q>(),
1220                full_key.debug_repr(),
1221            ),
1222        });
1223
1224        // Record this as a dependency
1225        self.deps.borrow_mut().push(full_key.clone());
1226
1227        // Execute the query
1228        self.runtime.query(query)
1229    }
1230
1231    /// Access an asset, tracking it as a dependency.
1232    ///
1233    /// Returns `AssetLoadingState<K>`:
1234    /// - `is_loading()` if the asset is still being loaded
1235    /// - `is_ready()` if the asset is available
1236    ///
1237    /// Use `.suspend()?` to convert to `Result<Arc<K::Asset>, QueryError>`,
1238    /// which returns `Err(QueryError::Suspend { asset })` if still loading.
1239    ///
1240    /// # Example
1241    ///
1242    /// ```ignore
1243    /// #[query]
1244    /// fn process_file(ctx: &mut QueryContext, path: FilePath) -> Result<Output, QueryError> {
1245    ///     let content = ctx.asset(path)?.suspend()?;
1246    ///     // Process content...
1247    ///     Ok(output)
1248    /// }
1249    /// ```
1250    ///
1251    /// # Errors
1252    ///
1253    /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1254    pub fn asset<K: AssetKey>(&mut self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1255        let full_asset_key = FullAssetKey::new(&key);
1256        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1257
1258        // Emit asset dependency registered event
1259        #[cfg(feature = "inspector")]
1260        self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1261            span_id: self.exec_ctx.span_id(),
1262            parent: query_flow_inspector::QueryKey::new(
1263                self.parent_query_type,
1264                self.current_key.debug_repr(),
1265            ),
1266            asset: query_flow_inspector::AssetKey::new(
1267                std::any::type_name::<K>(),
1268                format!("{:?}", key),
1269            ),
1270        });
1271
1272        // Record dependency on this asset
1273        self.deps.borrow_mut().push(full_cache_key);
1274
1275        // Get asset from runtime
1276        let result = self.runtime.get_asset_internal(key);
1277
1278        // Emit missing dependency event on error
1279        #[cfg(feature = "inspector")]
1280        if let Err(QueryError::MissingDependency { ref description }) = result {
1281            self.runtime.emit(|| FlowEvent::MissingDependency {
1282                query: query_flow_inspector::QueryKey::new(
1283                    self.parent_query_type,
1284                    self.current_key.debug_repr(),
1285                ),
1286                dependency_description: description.clone(),
1287            });
1288        }
1289
1290        result
1291    }
1292
1293    /// List all query instances of type Q that have been registered.
1294    ///
1295    /// This method establishes a dependency on the "set" of queries of type Q.
1296    /// The calling query will be invalidated when:
1297    /// - A new query of type Q is first executed (added to set)
1298    ///
1299    /// The calling query will NOT be invalidated when:
1300    /// - An individual query of type Q has its value change
1301    ///
1302    /// # Example
1303    ///
1304    /// ```ignore
1305    /// #[query]
1306    /// fn all_results(ctx: &mut QueryContext) -> Result<Vec<i32>, QueryError> {
1307    ///     let queries = ctx.list_queries::<MyQuery>();
1308    ///     let mut results = Vec::new();
1309    ///     for q in queries {
1310    ///         results.push(*ctx.query(q)?);
1311    ///     }
1312    ///     Ok(results)
1313    /// }
1314    /// ```
1315    pub fn list_queries<Q: Query>(&mut self) -> Vec<Q> {
1316        // Record dependency on the sentinel (set-level dependency)
1317        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1318
1319        #[cfg(feature = "inspector")]
1320        self.runtime.emit(|| FlowEvent::DependencyRegistered {
1321            span_id: self.exec_ctx.span_id(),
1322            parent: query_flow_inspector::QueryKey::new(
1323                self.parent_query_type,
1324                self.current_key.debug_repr(),
1325            ),
1326            dependency: query_flow_inspector::QueryKey::new("QuerySet", sentinel.debug_repr()),
1327        });
1328
1329        // Ensure sentinel exists in whale (for dependency tracking)
1330        if self.runtime.whale.get(&sentinel).is_none() {
1331            let _ =
1332                self.runtime
1333                    .whale
1334                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1335        }
1336
1337        self.deps.borrow_mut().push(sentinel);
1338
1339        // Return all registered queries
1340        self.runtime.query_registry.get_all::<Q>()
1341    }
1342
1343    /// List all asset keys of type K that have been registered.
1344    ///
1345    /// This method establishes a dependency on the "set" of asset keys of type K.
1346    /// The calling query will be invalidated when:
1347    /// - A new asset of type K is resolved for the first time (added to set)
1348    /// - An asset of type K is removed via remove_asset
1349    ///
1350    /// The calling query will NOT be invalidated when:
1351    /// - An individual asset's value changes (use `ctx.asset()` for that)
1352    ///
1353    /// # Example
1354    ///
1355    /// ```ignore
1356    /// #[query]
1357    /// fn all_configs(ctx: &mut QueryContext) -> Result<Vec<String>, QueryError> {
1358    ///     let keys = ctx.list_asset_keys::<ConfigFile>();
1359    ///     let mut contents = Vec::new();
1360    ///     for key in keys {
1361    ///         let content = ctx.asset(&key)?.suspend()?;
1362    ///         contents.push((*content).clone());
1363    ///     }
1364    ///     Ok(contents)
1365    /// }
1366    /// ```
1367    pub fn list_asset_keys<K: AssetKey>(&mut self) -> Vec<K> {
1368        // Record dependency on the sentinel (set-level dependency)
1369        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1370
1371        #[cfg(feature = "inspector")]
1372        self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1373            span_id: self.exec_ctx.span_id(),
1374            parent: query_flow_inspector::QueryKey::new(
1375                self.parent_query_type,
1376                self.current_key.debug_repr(),
1377            ),
1378            asset: query_flow_inspector::AssetKey::new("AssetKeySet", sentinel.debug_repr()),
1379        });
1380
1381        // Ensure sentinel exists in whale (for dependency tracking)
1382        if self.runtime.whale.get(&sentinel).is_none() {
1383            let _ =
1384                self.runtime
1385                    .whale
1386                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1387        }
1388
1389        self.deps.borrow_mut().push(sentinel);
1390
1391        // Return all registered asset keys
1392        self.runtime.asset_key_registry.get_all::<K>()
1393    }
1394}
1395
1396#[cfg(test)]
1397mod tests {
1398    use super::*;
1399
1400    #[test]
1401    fn test_simple_query() {
1402        #[derive(Clone)]
1403        struct Add {
1404            a: i32,
1405            b: i32,
1406        }
1407
1408        impl Query for Add {
1409            type CacheKey = (i32, i32);
1410            type Output = i32;
1411
1412            fn cache_key(&self) -> Self::CacheKey {
1413                (self.a, self.b)
1414            }
1415
1416            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1417                Ok(self.a + self.b)
1418            }
1419
1420            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1421                old == new
1422            }
1423        }
1424
1425        let runtime = QueryRuntime::new();
1426
1427        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
1428        assert_eq!(*result, 3);
1429
1430        // Second query should be cached
1431        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
1432        assert_eq!(*result2, 3);
1433    }
1434
1435    #[test]
1436    fn test_dependent_queries() {
1437        #[derive(Clone)]
1438        struct Base {
1439            value: i32,
1440        }
1441
1442        impl Query for Base {
1443            type CacheKey = i32;
1444            type Output = i32;
1445
1446            fn cache_key(&self) -> Self::CacheKey {
1447                self.value
1448            }
1449
1450            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1451                Ok(self.value * 2)
1452            }
1453
1454            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1455                old == new
1456            }
1457        }
1458
1459        #[derive(Clone)]
1460        struct Derived {
1461            base_value: i32,
1462        }
1463
1464        impl Query for Derived {
1465            type CacheKey = i32;
1466            type Output = i32;
1467
1468            fn cache_key(&self) -> Self::CacheKey {
1469                self.base_value
1470            }
1471
1472            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1473                let base = ctx.query(Base {
1474                    value: self.base_value,
1475                })?;
1476                Ok(*base + 10)
1477            }
1478
1479            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1480                old == new
1481            }
1482        }
1483
1484        let runtime = QueryRuntime::new();
1485
1486        let result = runtime.query(Derived { base_value: 5 }).unwrap();
1487        assert_eq!(*result, 20); // 5 * 2 + 10
1488    }
1489
1490    #[test]
1491    fn test_cycle_detection() {
1492        #[derive(Clone)]
1493        struct CycleA {
1494            id: i32,
1495        }
1496
1497        #[derive(Clone)]
1498        struct CycleB {
1499            id: i32,
1500        }
1501
1502        impl Query for CycleA {
1503            type CacheKey = i32;
1504            type Output = i32;
1505
1506            fn cache_key(&self) -> Self::CacheKey {
1507                self.id
1508            }
1509
1510            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1511                let b = ctx.query(CycleB { id: self.id })?;
1512                Ok(*b + 1)
1513            }
1514
1515            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1516                old == new
1517            }
1518        }
1519
1520        impl Query for CycleB {
1521            type CacheKey = i32;
1522            type Output = i32;
1523
1524            fn cache_key(&self) -> Self::CacheKey {
1525                self.id
1526            }
1527
1528            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1529                let a = ctx.query(CycleA { id: self.id })?;
1530                Ok(*a + 1)
1531            }
1532
1533            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1534                old == new
1535            }
1536        }
1537
1538        let runtime = QueryRuntime::new();
1539
1540        let result = runtime.query(CycleA { id: 1 });
1541        assert!(matches!(result, Err(QueryError::Cycle { .. })));
1542    }
1543
1544    #[test]
1545    fn test_fallible_query() {
1546        #[derive(Clone)]
1547        struct ParseInt {
1548            input: String,
1549        }
1550
1551        impl Query for ParseInt {
1552            type CacheKey = String;
1553            type Output = Result<i32, std::num::ParseIntError>;
1554
1555            fn cache_key(&self) -> Self::CacheKey {
1556                self.input.clone()
1557            }
1558
1559            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1560                Ok(self.input.parse())
1561            }
1562
1563            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1564                old == new
1565            }
1566        }
1567
1568        let runtime = QueryRuntime::new();
1569
1570        // Valid parse
1571        let result = runtime
1572            .query(ParseInt {
1573                input: "42".to_string(),
1574            })
1575            .unwrap();
1576        assert_eq!(*result, Ok(42));
1577
1578        // Invalid parse - system succeeds, user error in output
1579        let result = runtime
1580            .query(ParseInt {
1581                input: "not_a_number".to_string(),
1582            })
1583            .unwrap();
1584        assert!(result.is_err());
1585    }
1586
    // Macro tests
    //
    // Each `#[query]` function below is used through a generated type named
    // after the function in UpperCamelCase (`add` -> `Add`, unless overridden
    // with `name = "..."`) whose `new(...)` takes the non-context parameters.
    // NOTE(review): inside the bodies the parameters are dereferenced
    // (`*a`, `*x`), so the macro presumably passes them by reference -
    // confirm against the macro implementation.
    mod macro_tests {
        use super::*;
        use crate::query;

        // Plain `#[query]` with default options
        #[query]
        fn add(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
            let _ = ctx; // silence unused warning
            Ok(a + b)
        }

        // The generated `Add` query runs through the runtime like a
        // hand-written one
        #[test]
        fn test_macro_basic() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(Add::new(1, 2)).unwrap();
            assert_eq!(*result, 3);
        }

        // `durability = N` option; this only checks that the option is
        // accepted and the query still computes its value
        #[query(durability = 2)]
        fn with_durability(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(x * 2)
        }

        #[test]
        fn test_macro_durability() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(WithDurability::new(5)).unwrap();
            assert_eq!(*result, 10);
        }

        // `keys(id)`: only `id` takes part in the cache key, `include_extra`
        // does not
        #[query(keys(id))]
        fn with_key_selection(
            ctx: &mut QueryContext,
            id: u32,
            include_extra: bool,
        ) -> Result<String, QueryError> {
            let _ = ctx;
            Ok(format!("id={}, extra={}", id, include_extra))
        }

        #[test]
        fn test_macro_key_selection() {
            let runtime = QueryRuntime::new();

            // Same id, different include_extra - should return cached
            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();

            // Both should have same value because only `id` is the key
            assert_eq!(*r1, "id=1, extra=true");
            assert_eq!(*r2, "id=1, extra=true"); // Cached!
        }

        // A macro-generated query depending on another macro-generated query
        #[query]
        fn dependent(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
            let sum = ctx.query(Add::new(*a, *b))?;
            Ok(*sum * 2)
        }

        #[test]
        fn test_macro_dependencies() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(Dependent::new(3, 4)).unwrap();
            assert_eq!(*result, 14); // (3 + 4) * 2
        }

        // `output_eq` option; this only checks that the option is accepted
        // and the query still computes its value
        #[query(output_eq)]
        fn with_output_eq(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(*x * 2)
        }

        #[test]
        fn test_macro_output_eq() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(WithOutputEq::new(5)).unwrap();
            assert_eq!(*result, 10);
        }

        // `name = "..."` overrides the generated type name: the query is
        // reachable as `CustomName`, not `OriginalName`
        #[query(name = "CustomName")]
        fn original_name(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(*x)
        }

        #[test]
        fn test_macro_custom_name() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(CustomName::new(42)).unwrap();
            assert_eq!(*result, 42);
        }

        // Test that attribute macros like #[tracing::instrument] are preserved.
        // We use #[allow(unused_variables)] and #[inline] as test attributes since
        // they don't require external dependencies.
        #[allow(unused_variables)]
        #[inline]
        #[query]
        fn with_attributes(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            // This would warn without #[allow(unused_variables)] on the generated method
            let unused_var = 42;
            Ok(*x * 2)
        }

        #[test]
        fn test_macro_preserves_attributes() {
            let runtime = QueryRuntime::new();
            // If attributes weren't preserved, this might warn about unused_var
            let result = runtime.query(WithAttributes::new(5)).unwrap();
            assert_eq!(*result, 10);
        }
    }
1700
1701    // Tests for poll() and changed_at()
1702    mod poll_tests {
1703        use super::*;
1704
1705        #[derive(Clone)]
1706        struct Counter {
1707            id: i32,
1708        }
1709
1710        impl Query for Counter {
1711            type CacheKey = i32;
1712            type Output = i32;
1713
1714            fn cache_key(&self) -> Self::CacheKey {
1715                self.id
1716            }
1717
1718            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1719                Ok(self.id * 10)
1720            }
1721
1722            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1723                old == new
1724            }
1725        }
1726
1727        #[test]
1728        fn test_poll_returns_value_and_revision() {
1729            let runtime = QueryRuntime::new();
1730
1731            let result = runtime.poll(Counter { id: 1 }).unwrap();
1732
1733            // Value should be correct - access through Result and Arc
1734            assert_eq!(**result.value.as_ref().unwrap(), 10);
1735
1736            // Revision should be non-zero after first execution
1737            assert!(result.revision > 0);
1738        }
1739
1740        #[test]
1741        fn test_poll_revision_stable_on_cache_hit() {
1742            let runtime = QueryRuntime::new();
1743
1744            // First poll
1745            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1746            let rev1 = result1.revision;
1747
1748            // Second poll (cache hit)
1749            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1750            let rev2 = result2.revision;
1751
1752            // Revision should be the same (no change)
1753            assert_eq!(rev1, rev2);
1754        }
1755
1756        #[test]
1757        fn test_poll_revision_changes_on_invalidate() {
1758            let runtime = QueryRuntime::new();
1759
1760            // First poll
1761            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1762            let rev1 = result1.revision;
1763
1764            // Invalidate and poll again
1765            runtime.invalidate::<Counter>(&1);
1766            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1767            let rev2 = result2.revision;
1768
1769            // Revision should increase (value was recomputed)
1770            // Note: Since output_eq returns true (same value), this might not change
1771            // depending on early cutoff behavior. Let's verify the value is still correct.
1772            assert_eq!(**result2.value.as_ref().unwrap(), 10);
1773
1774            // With early cutoff, revision might stay the same if value didn't change
1775            // This is expected behavior
1776            assert!(rev2 >= rev1);
1777        }
1778
1779        #[test]
1780        fn test_changed_at_returns_none_for_unexecuted_query() {
1781            let runtime = QueryRuntime::new();
1782
1783            // Query has never been executed
1784            let rev = runtime.changed_at::<Counter>(&1);
1785            assert!(rev.is_none());
1786        }
1787
1788        #[test]
1789        fn test_changed_at_returns_revision_after_execution() {
1790            let runtime = QueryRuntime::new();
1791
1792            // Execute the query
1793            let _ = runtime.query(Counter { id: 1 }).unwrap();
1794
1795            // Now changed_at should return Some
1796            let rev = runtime.changed_at::<Counter>(&1);
1797            assert!(rev.is_some());
1798            assert!(rev.unwrap() > 0);
1799        }
1800
1801        #[test]
1802        fn test_changed_at_matches_poll_revision() {
1803            let runtime = QueryRuntime::new();
1804
1805            // Poll the query
1806            let result = runtime.poll(Counter { id: 1 }).unwrap();
1807
1808            // changed_at should match the revision from poll
1809            let rev = runtime.changed_at::<Counter>(&1);
1810            assert_eq!(rev, Some(result.revision));
1811        }
1812
1813        #[test]
1814        fn test_poll_value_access() {
1815            let runtime = QueryRuntime::new();
1816
1817            let result = runtime.poll(Counter { id: 5 }).unwrap();
1818
1819            // Access through Result and Arc
1820            let value: &i32 = result.value.as_ref().unwrap();
1821            assert_eq!(*value, 50);
1822
1823            // Access Arc directly via field after unwrapping Result
1824            let arc: &Arc<i32> = result.value.as_ref().unwrap();
1825            assert_eq!(**arc, 50);
1826        }
1827
1828        #[test]
1829        fn test_subscription_pattern() {
1830            let runtime = QueryRuntime::new();
1831
1832            // Simulate subscription pattern
1833            let mut last_revision: RevisionCounter = 0;
1834            let mut notifications = 0;
1835
1836            // First poll - should notify (new value)
1837            let result = runtime.poll(Counter { id: 1 }).unwrap();
1838            if result.revision > last_revision {
1839                notifications += 1;
1840                last_revision = result.revision;
1841            }
1842
1843            // Second poll - should NOT notify (no change)
1844            let result = runtime.poll(Counter { id: 1 }).unwrap();
1845            if result.revision > last_revision {
1846                notifications += 1;
1847                last_revision = result.revision;
1848            }
1849
1850            // Third poll - should NOT notify (no change)
1851            let result = runtime.poll(Counter { id: 1 }).unwrap();
1852            if result.revision > last_revision {
1853                notifications += 1;
1854                #[allow(unused_assignments)]
1855                {
1856                    last_revision = result.revision;
1857                }
1858            }
1859
1860            // Only the first poll should have triggered a notification
1861            assert_eq!(notifications, 1);
1862        }
1863    }
1864}