query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::TypeId;
4use std::cell::RefCell;
5use std::sync::Arc;
6
7use whale::{Durability, Runtime as WhaleRuntime};
8
9use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
10use crate::key::FullCacheKey;
11use crate::loading::LoadingState;
12use crate::query::Query;
13use crate::storage::{
14    AssetKeyRegistry, AssetState, AssetStorage, CacheStorage, CachedValue, LocatorStorage,
15    PendingStorage, QueryRegistry,
16};
17use crate::QueryError;
18
/// Function type for comparing user errors during early cutoff.
///
/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
/// `QueryError::UserError` values are compared for caching purposes.
///
/// The runtime calls the comparator with the previously cached error first
/// and the freshly produced error second, and negates the result to decide
/// whether the query's output changed: returning `true` means "same error"
/// (output unchanged), returning `false` means "different error".
pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
24
25#[cfg(feature = "inspector")]
26use query_flow_inspector::{EventSink, FlowEvent, SpanId};
27
/// Number of durability levels (matches whale's default).
///
/// Passed as the const parameter of the `WhaleRuntime` stored in
/// `QueryRuntime`.
const DURABILITY_LEVELS: usize = 4;
30
// Thread-local query execution stack for cycle detection.
//
// `QueryRuntime::query` pushes a query's key before executing it and pops it
// afterwards; a key that is already on the stack when `query` is entered is
// reported as `QueryError::Cycle`. Being thread-local, the stack only tracks
// queries running on the current thread.
thread_local! {
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
}
35
/// The query runtime manages query execution, caching, and dependency tracking.
///
/// This is cheap to clone: the storages and registries are held behind `Arc`,
/// so a clone is another handle onto the same data.
/// NOTE(review): `whale` is cloned via `WhaleRuntime::clone`; presumably that
/// also shares state rather than copying it — confirm against whale.
///
/// # Example
///
/// ```ignore
/// let runtime = QueryRuntime::new();
///
/// // Sync query execution
/// let result = runtime.query(MyQuery { ... })?;
///
/// // Async query execution (waits through Suspend)
/// let result = runtime.query_async(MyQuery { ... }).await?;
/// ```
pub struct QueryRuntime {
    /// Whale runtime for dependency tracking
    whale: WhaleRuntime<FullCacheKey, (), DURABILITY_LEVELS>,
    /// Cache for query outputs (successful values and user errors)
    cache: Arc<CacheStorage>,
    /// Asset cache and state storage
    assets: Arc<AssetStorage>,
    /// Registered asset locators
    locators: Arc<LocatorStorage>,
    /// Pending asset requests
    pending: Arc<PendingStorage>,
    /// Registry for tracking query instances (for list_queries)
    query_registry: Arc<QueryRegistry>,
    /// Registry for tracking asset keys (for list_asset_keys)
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Comparator for user errors during early cutoff
    error_comparator: ErrorComparator,
    /// Event sink for inspector/tracing; `None` means events are not emitted
    #[cfg(feature = "inspector")]
    sink: Arc<parking_lot::RwLock<Option<Arc<dyn EventSink>>>>,
}
72
73impl Default for QueryRuntime {
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
79impl Clone for QueryRuntime {
80    fn clone(&self) -> Self {
81        Self {
82            whale: self.whale.clone(),
83            cache: self.cache.clone(),
84            assets: self.assets.clone(),
85            locators: self.locators.clone(),
86            pending: self.pending.clone(),
87            query_registry: self.query_registry.clone(),
88            asset_key_registry: self.asset_key_registry.clone(),
89            error_comparator: self.error_comparator,
90            #[cfg(feature = "inspector")]
91            sink: self.sink.clone(),
92        }
93    }
94}
95
/// Default error comparator that treats all errors as different.
///
/// This is conservative - it always triggers recomputation when an error occurs.
/// Both arguments are ignored; the function unconditionally returns `false`,
/// so a query that yields a user error is always recorded as "changed".
fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
    false
}
102
103impl QueryRuntime {
104    /// Create a new query runtime with default settings.
105    pub fn new() -> Self {
106        Self::builder().build()
107    }
108
109    /// Create a builder for customizing the runtime.
110    ///
111    /// # Example
112    ///
113    /// ```ignore
114    /// let runtime = QueryRuntime::builder()
115    ///     .error_comparator(|a, b| {
116    ///         // Custom error comparison logic
117    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
118    ///             (Some(a), Some(b)) => a == b,
119    ///             _ => false,
120    ///         }
121    ///     })
122    ///     .build();
123    /// ```
124    pub fn builder() -> QueryRuntimeBuilder {
125        QueryRuntimeBuilder::new()
126    }
127
128    /// Set the event sink for tracing/inspection.
129    ///
130    /// Pass `Some(sink)` to enable event collection, or `None` to disable.
131    ///
132    /// # Example
133    ///
134    /// ```ignore
135    /// use query_flow_inspector::EventCollector;
136    /// use std::sync::Arc;
137    ///
138    /// let collector = Arc::new(EventCollector::new());
139    /// runtime.set_sink(Some(collector.clone()));
140    /// runtime.query(MyQuery::new());
141    /// let trace = collector.trace();
142    /// ```
143    #[cfg(feature = "inspector")]
144    pub fn set_sink(&self, sink: Option<Arc<dyn EventSink>>) {
145        *self.sink.write() = sink;
146    }
147
148    /// Get the current event sink.
149    #[cfg(feature = "inspector")]
150    pub fn sink(&self) -> Option<Arc<dyn EventSink>> {
151        self.sink.read().clone()
152    }
153
154    /// Emit an event to the sink if one is set.
155    #[cfg(feature = "inspector")]
156    #[inline]
157    fn emit<F: FnOnce() -> FlowEvent>(&self, event: F) {
158        let guard = self.sink.read();
159        if let Some(ref sink) = *guard {
160            sink.emit(event());
161        }
162    }
163
164    /// Execute a query synchronously.
165    ///
166    /// Returns the cached result if valid, otherwise executes the query.
167    ///
168    /// # Errors
169    ///
170    /// - `QueryError::Suspend` - Query is waiting for async loading
171    /// - `QueryError::Cycle` - Dependency cycle detected
172    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
173        let key = query.cache_key();
174        let full_key = FullCacheKey::new::<Q, _>(&key);
175
176        // Generate span ID and emit start event
177        #[cfg(feature = "inspector")]
178        let span_id = query_flow_inspector::new_span_id();
179        #[cfg(feature = "inspector")]
180        let start_time = std::time::Instant::now();
181        #[cfg(feature = "inspector")]
182        let query_key =
183            query_flow_inspector::QueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
184
185        #[cfg(feature = "inspector")]
186        self.emit(|| FlowEvent::QueryStart {
187            span_id,
188            query: query_key.clone(),
189        });
190
191        // Check for cycles using thread-local stack
192        let cycle_detected = QUERY_STACK.with(|stack| {
193            let stack = stack.borrow();
194            stack.iter().any(|k| k == &full_key)
195        });
196
197        if cycle_detected {
198            let path = QUERY_STACK.with(|stack| {
199                let stack = stack.borrow();
200                let mut path: Vec<String> =
201                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
202                path.push(full_key.debug_repr().to_string());
203                path
204            });
205
206            #[cfg(feature = "inspector")]
207            self.emit(|| FlowEvent::CycleDetected {
208                path: path
209                    .iter()
210                    .map(|s| query_flow_inspector::QueryKey::new("", s.clone()))
211                    .collect(),
212            });
213
214            #[cfg(feature = "inspector")]
215            self.emit(|| FlowEvent::QueryEnd {
216                span_id,
217                query: query_key.clone(),
218                result: query_flow_inspector::ExecutionResult::CycleDetected,
219                duration: start_time.elapsed(),
220            });
221
222            return Err(QueryError::Cycle { path });
223        }
224
225        // Check if cached and valid
226        if let Some(cached) = self.get_cached_if_valid::<Q>(&full_key) {
227            #[cfg(feature = "inspector")]
228            self.emit(|| FlowEvent::CacheCheck {
229                span_id,
230                query: query_key.clone(),
231                valid: true,
232            });
233
234            #[cfg(feature = "inspector")]
235            self.emit(|| FlowEvent::QueryEnd {
236                span_id,
237                query: query_key.clone(),
238                result: query_flow_inspector::ExecutionResult::CacheHit,
239                duration: start_time.elapsed(),
240            });
241
242            return match cached {
243                CachedValue::Ok(output) => Ok(output),
244                CachedValue::UserError(err) => Err(QueryError::UserError(err)),
245            };
246        }
247
248        #[cfg(feature = "inspector")]
249        self.emit(|| FlowEvent::CacheCheck {
250            span_id,
251            query: query_key.clone(),
252            valid: false,
253        });
254
255        // Execute the query with cycle tracking
256        QUERY_STACK.with(|stack| {
257            stack.borrow_mut().push(full_key.clone());
258        });
259
260        #[cfg(feature = "inspector")]
261        let result = self.execute_query::<Q>(&query, &full_key, span_id);
262        #[cfg(not(feature = "inspector"))]
263        let result = self.execute_query::<Q>(&query, &full_key);
264
265        QUERY_STACK.with(|stack| {
266            stack.borrow_mut().pop();
267        });
268
269        // Emit end event
270        #[cfg(feature = "inspector")]
271        {
272            let exec_result = match &result {
273                Ok((_, true)) => query_flow_inspector::ExecutionResult::Changed,
274                Ok((_, false)) => query_flow_inspector::ExecutionResult::Unchanged,
275                Err(QueryError::Suspend) => query_flow_inspector::ExecutionResult::Suspended,
276                Err(QueryError::Cycle { .. }) => {
277                    query_flow_inspector::ExecutionResult::CycleDetected
278                }
279                Err(e) => query_flow_inspector::ExecutionResult::Error {
280                    message: format!("{:?}", e),
281                },
282            };
283            self.emit(|| FlowEvent::QueryEnd {
284                span_id,
285                query: query_key.clone(),
286                result: exec_result,
287                duration: start_time.elapsed(),
288            });
289        }
290
291        result.map(|(output, _)| output)
292    }
293
294    /// Execute a query, caching the result if appropriate.
295    ///
296    /// Returns (output, output_changed) tuple for instrumentation.
297    #[cfg(feature = "inspector")]
298    fn execute_query<Q: Query>(
299        &self,
300        query: &Q,
301        full_key: &FullCacheKey,
302        span_id: SpanId,
303    ) -> Result<(Arc<Q::Output>, bool), QueryError> {
304        // Create context for this query execution
305        let mut ctx = QueryContext {
306            runtime: self,
307            current_key: full_key.clone(),
308            parent_query_type: std::any::type_name::<Q>(),
309            span_id,
310            deps: RefCell::new(Vec::new()),
311        };
312
313        // Execute the query
314        let result = query.query(&mut ctx);
315
316        // Get collected dependencies
317        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
318
319        // Get durability for whale registration
320        let durability =
321            Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
322
323        match result {
324            Ok(output) => {
325                let output = Arc::new(output);
326
327                // Check if output changed (for early cutoff)
328                let output_changed =
329                    if let Some(CachedValue::Ok(old)) = self.cache.get_cached::<Q>(full_key) {
330                        !Q::output_eq(&old, &output)
331                    } else {
332                        true // No previous value or was error, so "changed"
333                    };
334
335                // Emit early cutoff check event
336                self.emit(|| FlowEvent::EarlyCutoffCheck {
337                    span_id,
338                    query: query_flow_inspector::QueryKey::new(
339                        std::any::type_name::<Q>(),
340                        full_key.debug_repr(),
341                    ),
342                    output_changed,
343                });
344
345                // Update cache
346                self.cache.insert_ok::<Q>(full_key.clone(), output.clone());
347
348                // Update whale dependency tracking
349                if output_changed {
350                    let _ = self.whale.register(full_key.clone(), (), durability, deps);
351                } else {
352                    let _ = self.whale.confirm_unchanged(full_key, deps);
353                }
354
355                // Register query in registry for list_queries
356                let is_new_query = self.query_registry.register(query);
357                if is_new_query {
358                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
359                    let _ = self
360                        .whale
361                        .register(sentinel, (), Durability::volatile(), vec![]);
362                }
363
364                Ok((output, output_changed))
365            }
366            Err(QueryError::UserError(err)) => {
367                // Check if error changed (for early cutoff)
368                let output_changed = if let Some(CachedValue::UserError(old_err)) =
369                    self.cache.get_cached::<Q>(full_key)
370                {
371                    !(self.error_comparator)(old_err.as_ref(), err.as_ref())
372                } else {
373                    true // No previous error or was Ok, so "changed"
374                };
375
376                // Emit early cutoff check event
377                self.emit(|| FlowEvent::EarlyCutoffCheck {
378                    span_id,
379                    query: query_flow_inspector::QueryKey::new(
380                        std::any::type_name::<Q>(),
381                        full_key.debug_repr(),
382                    ),
383                    output_changed,
384                });
385
386                // Update cache with error
387                self.cache.insert_error(full_key.clone(), err.clone());
388
389                // Update whale dependency tracking
390                if output_changed {
391                    let _ = self.whale.register(full_key.clone(), (), durability, deps);
392                } else {
393                    let _ = self.whale.confirm_unchanged(full_key, deps);
394                }
395
396                // Register query in registry for list_queries
397                let is_new_query = self.query_registry.register(query);
398                if is_new_query {
399                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
400                    let _ = self
401                        .whale
402                        .register(sentinel, (), Durability::volatile(), vec![]);
403                }
404
405                Err(QueryError::UserError(err))
406            }
407            Err(e) => {
408                // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
409                Err(e)
410            }
411        }
412    }
413
414    /// Execute a query, caching the result if appropriate.
415    ///
416    /// Returns (output, output_changed) tuple for instrumentation.
417    #[cfg(not(feature = "inspector"))]
418    fn execute_query<Q: Query>(
419        &self,
420        query: &Q,
421        full_key: &FullCacheKey,
422    ) -> Result<(Arc<Q::Output>, bool), QueryError> {
423        // Create context for this query execution
424        let mut ctx = QueryContext {
425            runtime: self,
426            current_key: full_key.clone(),
427            deps: RefCell::new(Vec::new()),
428        };
429
430        // Execute the query
431        let result = query.query(&mut ctx);
432
433        // Get collected dependencies
434        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
435
436        // Get durability for whale registration
437        let durability =
438            Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
439
440        match result {
441            Ok(output) => {
442                let output = Arc::new(output);
443
444                // Check if output changed (for early cutoff)
445                let output_changed =
446                    if let Some(CachedValue::Ok(old)) = self.cache.get_cached::<Q>(full_key) {
447                        !Q::output_eq(&old, &output)
448                    } else {
449                        true // No previous value or was error, so "changed"
450                    };
451
452                // Update cache
453                self.cache.insert_ok::<Q>(full_key.clone(), output.clone());
454
455                // Update whale dependency tracking
456                if output_changed {
457                    let _ = self.whale.register(full_key.clone(), (), durability, deps);
458                } else {
459                    let _ = self.whale.confirm_unchanged(full_key, deps);
460                }
461
462                // Register query in registry for list_queries
463                let is_new_query = self.query_registry.register(query);
464                if is_new_query {
465                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
466                    let _ = self
467                        .whale
468                        .register(sentinel, (), Durability::volatile(), vec![]);
469                }
470
471                Ok((output, output_changed))
472            }
473            Err(QueryError::UserError(err)) => {
474                // Check if error changed (for early cutoff)
475                let output_changed = if let Some(CachedValue::UserError(old_err)) =
476                    self.cache.get_cached::<Q>(full_key)
477                {
478                    !(self.error_comparator)(old_err.as_ref(), err.as_ref())
479                } else {
480                    true // No previous error or was Ok, so "changed"
481                };
482
483                // Update cache with error
484                self.cache.insert_error(full_key.clone(), err.clone());
485
486                // Update whale dependency tracking
487                if output_changed {
488                    let _ = self.whale.register(full_key.clone(), (), durability, deps);
489                } else {
490                    let _ = self.whale.confirm_unchanged(full_key, deps);
491                }
492
493                // Register query in registry for list_queries
494                let is_new_query = self.query_registry.register(query);
495                if is_new_query {
496                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
497                    let _ = self
498                        .whale
499                        .register(sentinel, (), Durability::volatile(), vec![]);
500                }
501
502                Err(QueryError::UserError(err))
503            }
504            Err(e) => {
505                // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
506                Err(e)
507            }
508        }
509    }
510
511    /// Get cached result (Ok or UserError) if it's still valid.
512    fn get_cached_if_valid<Q: Query>(
513        &self,
514        full_key: &FullCacheKey,
515    ) -> Option<CachedValue<Arc<Q::Output>>> {
516        // Check whale validity first
517        if !self.whale.is_valid(full_key) {
518            return None;
519        }
520
521        // Then check if we have the cached value
522        self.cache.get_cached::<Q>(full_key)
523    }
524
525    /// Invalidate a query, forcing recomputation on next access.
526    ///
527    /// This also invalidates any queries that depend on this one.
528    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
529        let full_key = FullCacheKey::new::<Q, _>(key);
530
531        #[cfg(feature = "inspector")]
532        self.emit(|| FlowEvent::QueryInvalidated {
533            query: query_flow_inspector::QueryKey::new(
534                std::any::type_name::<Q>(),
535                full_key.debug_repr(),
536            ),
537            reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
538        });
539
540        // Remove from cache
541        self.cache.remove(&full_key);
542
543        // Update whale to invalidate dependents (register with new changed_at)
544        let _ = self
545            .whale
546            .register(full_key, (), Durability::volatile(), vec![]);
547    }
548
    /// Clear all cached values.
    ///
    /// Only the query output cache is emptied; this method does not touch
    /// asset storage, pending requests, the registries, or whale's
    /// dependency-tracking state.
    pub fn clear_cache(&self) {
        self.cache.clear();
    }
553}
554
555// ============================================================================
556// Builder
557// ============================================================================
558
/// Builder for [`QueryRuntime`] with customizable settings.
///
/// # Example
///
/// ```ignore
/// let runtime = QueryRuntime::builder()
///     .error_comparator(|a, b| {
///         // Treat any two I/O errors as equal; everything else differs
///         a.downcast_ref::<std::io::Error>().is_some()
///             && b.downcast_ref::<std::io::Error>().is_some()
///     })
///     .build();
/// ```
pub struct QueryRuntimeBuilder {
    /// Comparator handed to the built runtime for user-error early cutoff.
    error_comparator: ErrorComparator,
}
575
576impl Default for QueryRuntimeBuilder {
577    fn default() -> Self {
578        Self::new()
579    }
580}
581
582impl QueryRuntimeBuilder {
583    /// Create a new builder with default settings.
584    pub fn new() -> Self {
585        Self {
586            error_comparator: default_error_comparator,
587        }
588    }
589
590    /// Set the error comparator function for early cutoff optimization.
591    ///
592    /// When a query returns `QueryError::UserError`, this function is used
593    /// to compare it with the previously cached error. If they are equal,
594    /// downstream queries can skip recomputation (early cutoff).
595    ///
596    /// The default comparator returns `false` for all errors, meaning errors
597    /// are always considered different (conservative, always recomputes).
598    ///
599    /// # Example
600    ///
601    /// ```ignore
602    /// // Treat errors as equal if they have the same display message
603    /// let runtime = QueryRuntime::builder()
604    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
605    ///     .build();
606    /// ```
607    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
608        self.error_comparator = f;
609        self
610    }
611
612    /// Build the runtime with the configured settings.
613    pub fn build(self) -> QueryRuntime {
614        QueryRuntime {
615            whale: WhaleRuntime::new(),
616            cache: Arc::new(CacheStorage::new()),
617            assets: Arc::new(AssetStorage::new()),
618            locators: Arc::new(LocatorStorage::new()),
619            pending: Arc::new(PendingStorage::new()),
620            query_registry: Arc::new(QueryRegistry::new()),
621            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
622            error_comparator: self.error_comparator,
623            #[cfg(feature = "inspector")]
624            sink: Arc::new(parking_lot::RwLock::new(None)),
625        }
626    }
627}
628
629// ============================================================================
630// Asset API
631// ============================================================================
632
633impl QueryRuntime {
634    /// Register an asset locator for a specific asset key type.
635    ///
636    /// Only one locator can be registered per key type. Later registrations
637    /// replace earlier ones.
638    ///
639    /// # Example
640    ///
641    /// ```ignore
642    /// let runtime = QueryRuntime::new();
643    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
644    /// ```
645    pub fn register_asset_locator<K, L>(&self, locator: L)
646    where
647        K: AssetKey,
648        L: AssetLocator<K>,
649    {
650        self.locators.insert::<K, L>(locator);
651    }
652
    /// Get the list of pending asset requests.
    ///
    /// Returns assets that have been requested but not yet resolved, as an
    /// owned `Vec` obtained from the pending storage.
    /// The user should fetch these externally and call `resolve_asset()`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// for pending in runtime.pending_assets() {
    ///     if let Some(path) = pending.key::<FilePath>() {
    ///         let content = fetch_file(path);
    ///         runtime.resolve_asset(path.clone(), content);
    ///     }
    /// }
    /// ```
    pub fn pending_assets(&self) -> Vec<PendingAsset> {
        self.pending.get_all()
    }
671
    /// Get pending assets filtered by key type.
    ///
    /// Returns the pending keys of type `K` as owned values.
    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
        self.pending.get_of_type::<K>()
    }
676
    /// Check if there are any pending assets.
    ///
    /// Returns `true` when the pending storage is non-empty.
    pub fn has_pending_assets(&self) -> bool {
        !self.pending.is_empty()
    }
681
682    /// Resolve an asset with its loaded value.
683    ///
684    /// This marks the asset as ready and invalidates any queries that
685    /// depend on it (if the value changed), triggering recomputation on next access.
686    ///
687    /// This method is idempotent - resolving with the same value (via `asset_eq`)
688    /// will not trigger downstream recomputation.
689    ///
690    /// Uses the asset key's default durability.
691    ///
692    /// # Example
693    ///
694    /// ```ignore
695    /// let content = std::fs::read_to_string(&path)?;
696    /// runtime.resolve_asset(FilePath(path), content);
697    /// ```
698    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset) {
699        let durability = key.durability();
700        self.resolve_asset_internal(key, value, durability);
701    }
702
    /// Resolve an asset with a specific durability level.
    ///
    /// Use this to override the asset key's default durability. Apart from
    /// the durability, this behaves like `resolve_asset`: both forward to the
    /// same internal routine.
    pub fn resolve_asset_with_durability<K: AssetKey>(
        &self,
        key: K,
        value: K::Asset,
        durability: DurabilityLevel,
    ) {
        self.resolve_asset_internal(key, value, durability);
    }
714
715    fn resolve_asset_internal<K: AssetKey>(
716        &self,
717        key: K,
718        value: K::Asset,
719        durability_level: DurabilityLevel,
720    ) {
721        let full_asset_key = FullAssetKey::new(&key);
722        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
723
724        // Check for early cutoff
725        let changed = if let Some(old_value) = self.assets.get_ready::<K>(&full_asset_key) {
726            !K::asset_eq(&old_value, &value)
727        } else {
728            true // Loading or NotFound -> Ready is a change
729        };
730
731        // Emit asset resolved event
732        #[cfg(feature = "inspector")]
733        self.emit(|| FlowEvent::AssetResolved {
734            asset: query_flow_inspector::AssetKey::new(
735                std::any::type_name::<K>(),
736                format!("{:?}", key),
737            ),
738            changed,
739        });
740
741        // Store the new value
742        self.assets
743            .insert_ready::<K>(full_asset_key.clone(), Arc::new(value));
744
745        // Remove from pending
746        self.pending.remove(&full_asset_key);
747
748        // Update whale dependency tracking
749        let durability =
750            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
751
752        if changed {
753            // Register with new changed_at to invalidate dependents
754            let _ = self.whale.register(full_cache_key, (), durability, vec![]);
755        } else {
756            // Early cutoff - keep old changed_at
757            let _ = self.whale.confirm_unchanged(&full_cache_key, vec![]);
758        }
759
760        // Register asset key in registry for list_asset_keys
761        let is_new_asset = self.asset_key_registry.register(&key);
762        if is_new_asset {
763            // Update sentinel to invalidate list_asset_keys dependents
764            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
765            let _ = self
766                .whale
767                .register(sentinel, (), Durability::volatile(), vec![]);
768        }
769    }
770
771    /// Invalidate an asset, forcing queries to re-request it.
772    ///
773    /// The asset will be marked as loading and added to pending assets.
774    /// Dependent queries will suspend until the asset is resolved again.
775    ///
776    /// # Example
777    ///
778    /// ```ignore
779    /// // File was modified externally
780    /// runtime.invalidate_asset(&FilePath("config.json".into()));
781    /// // Queries depending on this asset will now suspend
782    /// // User should fetch the new value and call resolve_asset
783    /// ```
784    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
785        let full_asset_key = FullAssetKey::new(key);
786        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
787
788        // Emit asset invalidated event
789        #[cfg(feature = "inspector")]
790        self.emit(|| FlowEvent::AssetInvalidated {
791            asset: query_flow_inspector::AssetKey::new(
792                std::any::type_name::<K>(),
793                format!("{:?}", key),
794            ),
795        });
796
797        // Mark as loading
798        self.assets
799            .insert(full_asset_key.clone(), AssetState::Loading);
800
801        // Add to pending
802        self.pending.insert::<K>(full_asset_key, key.clone());
803
804        // Update whale to invalidate dependents (use volatile during loading)
805        let _ = self
806            .whale
807            .register(full_cache_key, (), Durability::volatile(), vec![]);
808    }
809
810    /// Remove an asset from the cache entirely.
811    ///
812    /// Unlike `invalidate_asset`, this removes all traces of the asset.
813    /// Dependent queries will go through the locator again on next access.
814    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
815        let full_asset_key = FullAssetKey::new(key);
816        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
817
818        // First, register a change to invalidate dependents
819        // This ensures queries that depend on this asset will recompute
820        let _ = self
821            .whale
822            .register(full_cache_key.clone(), (), Durability::volatile(), vec![]);
823
824        // Then remove the asset from storage
825        self.assets.remove(&full_asset_key);
826        self.pending.remove(&full_asset_key);
827
828        // Finally remove from whale tracking
829        self.whale.remove(&full_cache_key);
830
831        // Remove from registry and update sentinel for list_asset_keys
832        if self.asset_key_registry.remove::<K>(key) {
833            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
834            let _ = self
835                .whale
836                .register(sentinel, (), Durability::volatile(), vec![]);
837        }
838    }
839
840    /// Internal: Get asset state, checking cache and locator.
841    fn get_asset_internal<K: AssetKey>(
842        &self,
843        key: &K,
844    ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
845        let full_asset_key = FullAssetKey::new(key);
846        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
847
848        // Helper to emit AssetRequested event
849        #[cfg(feature = "inspector")]
850        let emit_requested = |runtime: &Self, state: query_flow_inspector::AssetState| {
851            runtime.emit(|| FlowEvent::AssetRequested {
852                asset: query_flow_inspector::AssetKey::new(
853                    std::any::type_name::<K>(),
854                    format!("{:?}", key),
855                ),
856                state,
857            });
858        };
859
860        // Check cache first
861        if let Some(state) = self.assets.get(&full_asset_key) {
862            // Check if whale thinks it's valid
863            if self.whale.is_valid(&full_cache_key) {
864                return match state {
865                    AssetState::Loading => {
866                        #[cfg(feature = "inspector")]
867                        emit_requested(self, query_flow_inspector::AssetState::Loading);
868                        Ok(LoadingState::Loading)
869                    }
870                    AssetState::Ready(arc) => {
871                        #[cfg(feature = "inspector")]
872                        emit_requested(self, query_flow_inspector::AssetState::Ready);
873                        match arc.downcast::<K::Asset>() {
874                            Ok(value) => Ok(LoadingState::Ready(value)),
875                            Err(_) => Err(QueryError::MissingDependency {
876                                description: format!("Asset type mismatch: {:?}", key),
877                            }),
878                        }
879                    }
880                    AssetState::NotFound => {
881                        #[cfg(feature = "inspector")]
882                        emit_requested(self, query_flow_inspector::AssetState::NotFound);
883                        Err(QueryError::MissingDependency {
884                            description: format!("Asset not found: {:?}", key),
885                        })
886                    }
887                };
888            }
889        }
890
891        // Check if there's a registered locator
892        if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
893            if let Some(state) = locator.locate_any(key) {
894                // Store the state
895                self.assets.insert(full_asset_key.clone(), state.clone());
896
897                match state {
898                    AssetState::Ready(arc) => {
899                        #[cfg(feature = "inspector")]
900                        emit_requested(self, query_flow_inspector::AssetState::Ready);
901
902                        // Register with whale
903                        let durability = Durability::new(key.durability().as_u8() as usize)
904                            .unwrap_or(Durability::volatile());
905                        let _ = self.whale.register(full_cache_key, (), durability, vec![]);
906
907                        match arc.downcast::<K::Asset>() {
908                            Ok(value) => return Ok(LoadingState::Ready(value)),
909                            Err(_) => {
910                                return Err(QueryError::MissingDependency {
911                                    description: format!("Asset type mismatch: {:?}", key),
912                                })
913                            }
914                        }
915                    }
916                    AssetState::Loading => {
917                        #[cfg(feature = "inspector")]
918                        emit_requested(self, query_flow_inspector::AssetState::Loading);
919                        self.pending.insert::<K>(full_asset_key, key.clone());
920                        return Ok(LoadingState::Loading);
921                    }
922                    AssetState::NotFound => {
923                        #[cfg(feature = "inspector")]
924                        emit_requested(self, query_flow_inspector::AssetState::NotFound);
925                        return Err(QueryError::MissingDependency {
926                            description: format!("Asset not found: {:?}", key),
927                        });
928                    }
929                }
930            }
931        }
932
933        // No locator registered or locator returned None - mark as pending
934        #[cfg(feature = "inspector")]
935        emit_requested(self, query_flow_inspector::AssetState::Loading);
936        self.assets
937            .insert(full_asset_key.clone(), AssetState::Loading);
938        self.pending.insert::<K>(full_asset_key, key.clone());
939        Ok(LoadingState::Loading)
940    }
941}
942
943/// Context provided to queries during execution.
944///
945/// Use this to access dependencies via `query()`.
946#[cfg(feature = "inspector")]
947pub struct QueryContext<'a> {
948    runtime: &'a QueryRuntime,
949    current_key: FullCacheKey,
950    parent_query_type: &'static str,
951    span_id: SpanId,
952    deps: RefCell<Vec<FullCacheKey>>,
953}
954
955/// Context provided to queries during execution.
956///
957/// Use this to access dependencies via `query()`.
958#[cfg(not(feature = "inspector"))]
959pub struct QueryContext<'a> {
960    runtime: &'a QueryRuntime,
961    #[allow(dead_code)]
962    current_key: FullCacheKey,
963    deps: RefCell<Vec<FullCacheKey>>,
964}
965
966impl<'a> QueryContext<'a> {
967    /// Query a dependency.
968    ///
969    /// The dependency is automatically tracked for invalidation.
970    ///
971    /// # Example
972    ///
973    /// ```ignore
974    /// fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
975    ///     let dep_result = ctx.query(OtherQuery { id: self.id })?;
976    ///     Ok(process(&dep_result))
977    /// }
978    /// ```
979    pub fn query<Q: Query>(&mut self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
980        let key = query.cache_key();
981        let full_key = FullCacheKey::new::<Q, _>(&key);
982
983        // Emit dependency registered event
984        #[cfg(feature = "inspector")]
985        self.runtime.emit(|| FlowEvent::DependencyRegistered {
986            span_id: self.span_id,
987            parent: query_flow_inspector::QueryKey::new(
988                self.parent_query_type,
989                self.current_key.debug_repr(),
990            ),
991            dependency: query_flow_inspector::QueryKey::new(
992                std::any::type_name::<Q>(),
993                full_key.debug_repr(),
994            ),
995        });
996
997        // Record this as a dependency
998        self.deps.borrow_mut().push(full_key.clone());
999
1000        // Execute the query
1001        self.runtime.query(query)
1002    }
1003
1004    /// Access an asset, tracking it as a dependency.
1005    ///
1006    /// Returns `LoadingState<Arc<K::Asset>>`:
1007    /// - `LoadingState::Loading` if the asset is still being loaded
1008    /// - `LoadingState::Ready(value)` if the asset is available
1009    ///
1010    /// # Example
1011    ///
1012    /// ```ignore
1013    /// #[query]
1014    /// fn process_file(ctx: &mut QueryContext, path: FilePath) -> Result<Output, QueryError> {
1015    ///     let content = ctx.asset(&path)?.suspend()?;
1016    ///     // Process content...
1017    ///     Ok(output)
1018    /// }
1019    /// ```
1020    ///
1021    /// # Errors
1022    ///
1023    /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1024    pub fn asset<K: AssetKey>(
1025        &mut self,
1026        key: &K,
1027    ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
1028        let full_asset_key = FullAssetKey::new(key);
1029        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1030
1031        // Emit asset dependency registered event
1032        #[cfg(feature = "inspector")]
1033        self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1034            span_id: self.span_id,
1035            parent: query_flow_inspector::QueryKey::new(
1036                self.parent_query_type,
1037                self.current_key.debug_repr(),
1038            ),
1039            asset: query_flow_inspector::AssetKey::new(
1040                std::any::type_name::<K>(),
1041                format!("{:?}", key),
1042            ),
1043        });
1044
1045        // Record dependency on this asset
1046        self.deps.borrow_mut().push(full_cache_key);
1047
1048        // Get asset from runtime
1049        let result = self.runtime.get_asset_internal(key);
1050
1051        // Emit missing dependency event on error
1052        #[cfg(feature = "inspector")]
1053        if let Err(QueryError::MissingDependency { ref description }) = result {
1054            self.runtime.emit(|| FlowEvent::MissingDependency {
1055                query: query_flow_inspector::QueryKey::new(
1056                    self.parent_query_type,
1057                    self.current_key.debug_repr(),
1058                ),
1059                dependency_description: description.clone(),
1060            });
1061        }
1062
1063        result
1064    }
1065
1066    /// List all query instances of type Q that have been registered.
1067    ///
1068    /// This method establishes a dependency on the "set" of queries of type Q.
1069    /// The calling query will be invalidated when:
1070    /// - A new query of type Q is first executed (added to set)
1071    ///
1072    /// The calling query will NOT be invalidated when:
1073    /// - An individual query of type Q has its value change
1074    ///
1075    /// # Example
1076    ///
1077    /// ```ignore
1078    /// #[query]
1079    /// fn all_results(ctx: &mut QueryContext) -> Result<Vec<i32>, QueryError> {
1080    ///     let queries = ctx.list_queries::<MyQuery>();
1081    ///     let mut results = Vec::new();
1082    ///     for q in queries {
1083    ///         results.push(*ctx.query(q)?);
1084    ///     }
1085    ///     Ok(results)
1086    /// }
1087    /// ```
1088    pub fn list_queries<Q: Query>(&mut self) -> Vec<Q> {
1089        // Record dependency on the sentinel (set-level dependency)
1090        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1091
1092        #[cfg(feature = "inspector")]
1093        self.runtime.emit(|| FlowEvent::DependencyRegistered {
1094            span_id: self.span_id,
1095            parent: query_flow_inspector::QueryKey::new(
1096                self.parent_query_type,
1097                self.current_key.debug_repr(),
1098            ),
1099            dependency: query_flow_inspector::QueryKey::new("QuerySet", sentinel.debug_repr()),
1100        });
1101
1102        self.deps.borrow_mut().push(sentinel);
1103
1104        // Return all registered queries
1105        self.runtime.query_registry.get_all::<Q>()
1106    }
1107
1108    /// List all asset keys of type K that have been registered.
1109    ///
1110    /// This method establishes a dependency on the "set" of asset keys of type K.
1111    /// The calling query will be invalidated when:
1112    /// - A new asset of type K is resolved for the first time (added to set)
1113    /// - An asset of type K is removed via remove_asset
1114    ///
1115    /// The calling query will NOT be invalidated when:
1116    /// - An individual asset's value changes (use `ctx.asset()` for that)
1117    ///
1118    /// # Example
1119    ///
1120    /// ```ignore
1121    /// #[query]
1122    /// fn all_configs(ctx: &mut QueryContext) -> Result<Vec<String>, QueryError> {
1123    ///     let keys = ctx.list_asset_keys::<ConfigFile>();
1124    ///     let mut contents = Vec::new();
1125    ///     for key in keys {
1126    ///         let content = ctx.asset(&key)?.suspend()?;
1127    ///         contents.push((*content).clone());
1128    ///     }
1129    ///     Ok(contents)
1130    /// }
1131    /// ```
1132    pub fn list_asset_keys<K: AssetKey>(&mut self) -> Vec<K> {
1133        // Record dependency on the sentinel (set-level dependency)
1134        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1135
1136        #[cfg(feature = "inspector")]
1137        self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1138            span_id: self.span_id,
1139            parent: query_flow_inspector::QueryKey::new(
1140                self.parent_query_type,
1141                self.current_key.debug_repr(),
1142            ),
1143            asset: query_flow_inspector::AssetKey::new("AssetKeySet", sentinel.debug_repr()),
1144        });
1145
1146        self.deps.borrow_mut().push(sentinel);
1147
1148        // Return all registered asset keys
1149        self.runtime.asset_key_registry.get_all::<K>()
1150    }
1151}
1152
1153#[cfg(test)]
1154mod tests {
1155    use super::*;
1156
1157    #[test]
1158    fn test_simple_query() {
1159        #[derive(Clone)]
1160        struct Add {
1161            a: i32,
1162            b: i32,
1163        }
1164
1165        impl Query for Add {
1166            type CacheKey = (i32, i32);
1167            type Output = i32;
1168
1169            fn cache_key(&self) -> Self::CacheKey {
1170                (self.a, self.b)
1171            }
1172
1173            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1174                Ok(self.a + self.b)
1175            }
1176
1177            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1178                old == new
1179            }
1180        }
1181
1182        let runtime = QueryRuntime::new();
1183
1184        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
1185        assert_eq!(*result, 3);
1186
1187        // Second query should be cached
1188        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
1189        assert_eq!(*result2, 3);
1190    }
1191
1192    #[test]
1193    fn test_dependent_queries() {
1194        #[derive(Clone)]
1195        struct Base {
1196            value: i32,
1197        }
1198
1199        impl Query for Base {
1200            type CacheKey = i32;
1201            type Output = i32;
1202
1203            fn cache_key(&self) -> Self::CacheKey {
1204                self.value
1205            }
1206
1207            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1208                Ok(self.value * 2)
1209            }
1210
1211            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1212                old == new
1213            }
1214        }
1215
1216        #[derive(Clone)]
1217        struct Derived {
1218            base_value: i32,
1219        }
1220
1221        impl Query for Derived {
1222            type CacheKey = i32;
1223            type Output = i32;
1224
1225            fn cache_key(&self) -> Self::CacheKey {
1226                self.base_value
1227            }
1228
1229            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1230                let base = ctx.query(Base {
1231                    value: self.base_value,
1232                })?;
1233                Ok(*base + 10)
1234            }
1235
1236            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1237                old == new
1238            }
1239        }
1240
1241        let runtime = QueryRuntime::new();
1242
1243        let result = runtime.query(Derived { base_value: 5 }).unwrap();
1244        assert_eq!(*result, 20); // 5 * 2 + 10
1245    }
1246
1247    #[test]
1248    fn test_cycle_detection() {
1249        #[derive(Clone)]
1250        struct CycleA {
1251            id: i32,
1252        }
1253
1254        #[derive(Clone)]
1255        struct CycleB {
1256            id: i32,
1257        }
1258
1259        impl Query for CycleA {
1260            type CacheKey = i32;
1261            type Output = i32;
1262
1263            fn cache_key(&self) -> Self::CacheKey {
1264                self.id
1265            }
1266
1267            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1268                let b = ctx.query(CycleB { id: self.id })?;
1269                Ok(*b + 1)
1270            }
1271
1272            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1273                old == new
1274            }
1275        }
1276
1277        impl Query for CycleB {
1278            type CacheKey = i32;
1279            type Output = i32;
1280
1281            fn cache_key(&self) -> Self::CacheKey {
1282                self.id
1283            }
1284
1285            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1286                let a = ctx.query(CycleA { id: self.id })?;
1287                Ok(*a + 1)
1288            }
1289
1290            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1291                old == new
1292            }
1293        }
1294
1295        let runtime = QueryRuntime::new();
1296
1297        let result = runtime.query(CycleA { id: 1 });
1298        assert!(matches!(result, Err(QueryError::Cycle { .. })));
1299    }
1300
1301    #[test]
1302    fn test_fallible_query() {
1303        #[derive(Clone)]
1304        struct ParseInt {
1305            input: String,
1306        }
1307
1308        impl Query for ParseInt {
1309            type CacheKey = String;
1310            type Output = Result<i32, std::num::ParseIntError>;
1311
1312            fn cache_key(&self) -> Self::CacheKey {
1313                self.input.clone()
1314            }
1315
1316            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1317                Ok(self.input.parse())
1318            }
1319
1320            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1321                old == new
1322            }
1323        }
1324
1325        let runtime = QueryRuntime::new();
1326
1327        // Valid parse
1328        let result = runtime
1329            .query(ParseInt {
1330                input: "42".to_string(),
1331            })
1332            .unwrap();
1333        assert_eq!(*result, Ok(42));
1334
1335        // Invalid parse - system succeeds, user error in output
1336        let result = runtime
1337            .query(ParseInt {
1338                input: "not_a_number".to_string(),
1339            })
1340            .unwrap();
1341        assert!(result.is_err());
1342    }
1343
1344    // Macro tests
1345    mod macro_tests {
1346        use super::*;
1347        use crate::query;
1348
1349        #[query]
1350        fn add(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
1351            let _ = ctx; // silence unused warning
1352            Ok(a + b)
1353        }
1354
1355        #[test]
1356        fn test_macro_basic() {
1357            let runtime = QueryRuntime::new();
1358            let result = runtime.query(Add::new(1, 2)).unwrap();
1359            assert_eq!(*result, 3);
1360        }
1361
1362        #[query(durability = 2)]
1363        fn with_durability(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1364            let _ = ctx;
1365            Ok(x * 2)
1366        }
1367
1368        #[test]
1369        fn test_macro_durability() {
1370            let runtime = QueryRuntime::new();
1371            let result = runtime.query(WithDurability::new(5)).unwrap();
1372            assert_eq!(*result, 10);
1373        }
1374
1375        #[query(keys(id))]
1376        fn with_key_selection(
1377            ctx: &mut QueryContext,
1378            id: u32,
1379            include_extra: bool,
1380        ) -> Result<String, QueryError> {
1381            let _ = ctx;
1382            Ok(format!("id={}, extra={}", id, include_extra))
1383        }
1384
1385        #[test]
1386        fn test_macro_key_selection() {
1387            let runtime = QueryRuntime::new();
1388
1389            // Same id, different include_extra - should return cached
1390            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
1391            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
1392
1393            // Both should have same value because only `id` is the key
1394            assert_eq!(*r1, "id=1, extra=true");
1395            assert_eq!(*r2, "id=1, extra=true"); // Cached!
1396        }
1397
1398        #[query]
1399        fn dependent(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
1400            let sum = ctx.query(Add::new(*a, *b))?;
1401            Ok(*sum * 2)
1402        }
1403
1404        #[test]
1405        fn test_macro_dependencies() {
1406            let runtime = QueryRuntime::new();
1407            let result = runtime.query(Dependent::new(3, 4)).unwrap();
1408            assert_eq!(*result, 14); // (3 + 4) * 2
1409        }
1410
1411        #[query(output_eq)]
1412        fn with_output_eq(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1413            let _ = ctx;
1414            Ok(*x * 2)
1415        }
1416
1417        #[test]
1418        fn test_macro_output_eq() {
1419            let runtime = QueryRuntime::new();
1420            let result = runtime.query(WithOutputEq::new(5)).unwrap();
1421            assert_eq!(*result, 10);
1422        }
1423
1424        #[query(name = "CustomName")]
1425        fn original_name(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1426            let _ = ctx;
1427            Ok(*x)
1428        }
1429
1430        #[test]
1431        fn test_macro_custom_name() {
1432            let runtime = QueryRuntime::new();
1433            let result = runtime.query(CustomName::new(42)).unwrap();
1434            assert_eq!(*result, 42);
1435        }
1436
1437        // Test that attribute macros like #[tracing::instrument] are preserved
1438        // We use #[allow(unused_variables)] and #[inline] as test attributes since
1439        // they don't require external dependencies.
1440        #[allow(unused_variables)]
1441        #[inline]
1442        #[query]
1443        fn with_attributes(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1444            // This would warn without #[allow(unused_variables)] on the generated method
1445            let unused_var = 42;
1446            Ok(*x * 2)
1447        }
1448
1449        #[test]
1450        fn test_macro_preserves_attributes() {
1451            let runtime = QueryRuntime::new();
1452            // If attributes weren't preserved, this might warn about unused_var
1453            let result = runtime.query(WithAttributes::new(5)).unwrap();
1454            assert_eq!(*result, 10);
1455        }
1456    }
1457}