query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::TypeId;
4use std::cell::RefCell;
5use std::sync::Arc;
6
7use whale::{Durability, Runtime as WhaleRuntime};
8
9use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
10use crate::key::FullCacheKey;
11use crate::loading::LoadingState;
12use crate::query::Query;
13use crate::storage::{
14    AssetKeyRegistry, AssetState, AssetStorage, CacheStorage, LocatorStorage, PendingStorage,
15    QueryRegistry,
16};
17use crate::QueryError;
18
19#[cfg(feature = "inspector")]
20use query_flow_inspector::{EventSink, FlowEvent, SpanId};
21
/// Number of durability levels (matches whale's default).
///
/// Used as the const-generic level count of the `WhaleRuntime` stored in
/// `QueryRuntime::whale`.
const DURABILITY_LEVELS: usize = 4;
24
// Thread-local query execution stack for cycle detection.
//
// `QueryRuntime::query` pushes a query's key before executing it and pops it afterwards.
// Finding a key that is already on the stack means the query is being re-entered while
// it is still executing on this thread, i.e. a dependency cycle.
thread_local! {
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
}
29
/// The query runtime manages query execution, caching, and dependency tracking.
///
/// This is cheap to clone - all data is behind `Arc`.
/// Clones share the same caches, registries and pending-asset list, because `Clone`
/// only clones the `Arc` handles (the whale runtime is cloned through its own `Clone`
/// implementation — presumably also a shared handle; confirm in `whale`).
///
/// # Example
///
/// ```ignore
/// let runtime = QueryRuntime::new();
///
/// // Sync query execution
/// let result = runtime.query(MyQuery { ... })?;
///
/// // Async query execution (waits through Suspend)
/// let result = runtime.query_async(MyQuery { ... }).await?;
/// ```
pub struct QueryRuntime {
    /// Whale runtime for dependency tracking.
    ///
    /// Queries, assets and the "set" sentinels are all registered here under a
    /// `FullCacheKey`; validity checks go through `is_valid`.
    whale: WhaleRuntime<FullCacheKey, (), DURABILITY_LEVELS>,
    /// Cache for query outputs, keyed by `FullCacheKey`
    cache: Arc<CacheStorage>,
    /// Asset cache and state storage (`AssetState` per `FullAssetKey`)
    assets: Arc<AssetStorage>,
    /// Registered asset locators, looked up by the asset key's `TypeId`
    locators: Arc<LocatorStorage>,
    /// Pending asset requests (requested but not yet resolved)
    pending: Arc<PendingStorage>,
    /// Registry for tracking query instances (for list_queries)
    query_registry: Arc<QueryRegistry>,
    /// Registry for tracking asset keys (for list_asset_keys)
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Event sink for inspector/tracing; `None` means events are not emitted.
    /// Replaceable at runtime through `set_sink`.
    #[cfg(feature = "inspector")]
    sink: Arc<parking_lot::RwLock<Option<Arc<dyn EventSink>>>>,
}
64
65impl Default for QueryRuntime {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71impl Clone for QueryRuntime {
72    fn clone(&self) -> Self {
73        Self {
74            whale: self.whale.clone(),
75            cache: self.cache.clone(),
76            assets: self.assets.clone(),
77            locators: self.locators.clone(),
78            pending: self.pending.clone(),
79            query_registry: self.query_registry.clone(),
80            asset_key_registry: self.asset_key_registry.clone(),
81            #[cfg(feature = "inspector")]
82            sink: self.sink.clone(),
83        }
84    }
85}
86
87impl QueryRuntime {
88    /// Create a new query runtime.
89    pub fn new() -> Self {
90        Self {
91            whale: WhaleRuntime::new(),
92            cache: Arc::new(CacheStorage::new()),
93            assets: Arc::new(AssetStorage::new()),
94            locators: Arc::new(LocatorStorage::new()),
95            pending: Arc::new(PendingStorage::new()),
96            query_registry: Arc::new(QueryRegistry::new()),
97            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
98            #[cfg(feature = "inspector")]
99            sink: Arc::new(parking_lot::RwLock::new(None)),
100        }
101    }
102
103    /// Set the event sink for tracing/inspection.
104    ///
105    /// Pass `Some(sink)` to enable event collection, or `None` to disable.
106    ///
107    /// # Example
108    ///
109    /// ```ignore
110    /// use query_flow_inspector::EventCollector;
111    /// use std::sync::Arc;
112    ///
113    /// let collector = Arc::new(EventCollector::new());
114    /// runtime.set_sink(Some(collector.clone()));
115    /// runtime.query(MyQuery::new());
116    /// let trace = collector.trace();
117    /// ```
118    #[cfg(feature = "inspector")]
119    pub fn set_sink(&self, sink: Option<Arc<dyn EventSink>>) {
120        *self.sink.write() = sink;
121    }
122
123    /// Get the current event sink.
124    #[cfg(feature = "inspector")]
125    pub fn sink(&self) -> Option<Arc<dyn EventSink>> {
126        self.sink.read().clone()
127    }
128
129    /// Emit an event to the sink if one is set.
130    #[cfg(feature = "inspector")]
131    #[inline]
132    fn emit<F: FnOnce() -> FlowEvent>(&self, event: F) {
133        let guard = self.sink.read();
134        if let Some(ref sink) = *guard {
135            sink.emit(event());
136        }
137    }
138
139    /// Execute a query synchronously.
140    ///
141    /// Returns the cached result if valid, otherwise executes the query.
142    ///
143    /// # Errors
144    ///
145    /// - `QueryError::Suspend` - Query is waiting for async loading
146    /// - `QueryError::Cycle` - Dependency cycle detected
147    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
148        let key = query.cache_key();
149        let full_key = FullCacheKey::new::<Q, _>(&key);
150
151        // Generate span ID and emit start event
152        #[cfg(feature = "inspector")]
153        let span_id = query_flow_inspector::new_span_id();
154        #[cfg(feature = "inspector")]
155        let start_time = std::time::Instant::now();
156        #[cfg(feature = "inspector")]
157        let query_key =
158            query_flow_inspector::QueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
159
160        #[cfg(feature = "inspector")]
161        self.emit(|| FlowEvent::QueryStart {
162            span_id,
163            query: query_key.clone(),
164        });
165
166        // Check for cycles using thread-local stack
167        let cycle_detected = QUERY_STACK.with(|stack| {
168            let stack = stack.borrow();
169            stack.iter().any(|k| k == &full_key)
170        });
171
172        if cycle_detected {
173            let path = QUERY_STACK.with(|stack| {
174                let stack = stack.borrow();
175                let mut path: Vec<String> =
176                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
177                path.push(full_key.debug_repr().to_string());
178                path
179            });
180
181            #[cfg(feature = "inspector")]
182            self.emit(|| FlowEvent::CycleDetected {
183                path: path
184                    .iter()
185                    .map(|s| query_flow_inspector::QueryKey::new("", s.clone()))
186                    .collect(),
187            });
188
189            #[cfg(feature = "inspector")]
190            self.emit(|| FlowEvent::QueryEnd {
191                span_id,
192                query: query_key.clone(),
193                result: query_flow_inspector::ExecutionResult::CycleDetected,
194                duration: start_time.elapsed(),
195            });
196
197            return Err(QueryError::Cycle { path });
198        }
199
200        // Check if cached and valid
201        if let Some(cached) = self.get_cached_if_valid::<Q>(&full_key) {
202            #[cfg(feature = "inspector")]
203            self.emit(|| FlowEvent::CacheCheck {
204                span_id,
205                query: query_key.clone(),
206                valid: true,
207            });
208
209            #[cfg(feature = "inspector")]
210            self.emit(|| FlowEvent::QueryEnd {
211                span_id,
212                query: query_key.clone(),
213                result: query_flow_inspector::ExecutionResult::CacheHit,
214                duration: start_time.elapsed(),
215            });
216
217            return Ok(cached);
218        }
219
220        #[cfg(feature = "inspector")]
221        self.emit(|| FlowEvent::CacheCheck {
222            span_id,
223            query: query_key.clone(),
224            valid: false,
225        });
226
227        // Execute the query with cycle tracking
228        QUERY_STACK.with(|stack| {
229            stack.borrow_mut().push(full_key.clone());
230        });
231
232        #[cfg(feature = "inspector")]
233        let result = self.execute_query::<Q>(&query, &full_key, span_id);
234        #[cfg(not(feature = "inspector"))]
235        let result = self.execute_query::<Q>(&query, &full_key);
236
237        QUERY_STACK.with(|stack| {
238            stack.borrow_mut().pop();
239        });
240
241        // Emit end event
242        #[cfg(feature = "inspector")]
243        {
244            let exec_result = match &result {
245                Ok((_, true)) => query_flow_inspector::ExecutionResult::Changed,
246                Ok((_, false)) => query_flow_inspector::ExecutionResult::Unchanged,
247                Err(QueryError::Suspend) => query_flow_inspector::ExecutionResult::Suspended,
248                Err(QueryError::Cycle { .. }) => {
249                    query_flow_inspector::ExecutionResult::CycleDetected
250                }
251                Err(e) => query_flow_inspector::ExecutionResult::Error {
252                    message: format!("{:?}", e),
253                },
254            };
255            self.emit(|| FlowEvent::QueryEnd {
256                span_id,
257                query: query_key.clone(),
258                result: exec_result,
259                duration: start_time.elapsed(),
260            });
261        }
262
263        result.map(|(output, _)| output)
264    }
265
266    /// Execute a query, caching the result if appropriate.
267    ///
268    /// Returns (output, output_changed) tuple for instrumentation.
269    #[cfg(feature = "inspector")]
270    fn execute_query<Q: Query>(
271        &self,
272        query: &Q,
273        full_key: &FullCacheKey,
274        span_id: SpanId,
275    ) -> Result<(Arc<Q::Output>, bool), QueryError> {
276        // Create context for this query execution
277        let mut ctx = QueryContext {
278            runtime: self,
279            current_key: full_key.clone(),
280            parent_query_type: std::any::type_name::<Q>(),
281            span_id,
282            deps: RefCell::new(Vec::new()),
283        };
284
285        // Execute the query
286        let output = query.query(&mut ctx)?;
287        let output = Arc::new(output);
288
289        // Get collected dependencies
290        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
291
292        // Check if output changed (for early cutoff)
293        let output_changed = if let Some(old) = self.cache.get::<Q>(full_key) {
294            !Q::output_eq(&old, &output)
295        } else {
296            true // No previous value, so "changed"
297        };
298
299        // Emit early cutoff check event
300        self.emit(|| FlowEvent::EarlyCutoffCheck {
301            span_id,
302            query: query_flow_inspector::QueryKey::new(
303                std::any::type_name::<Q>(),
304                full_key.debug_repr(),
305            ),
306            output_changed,
307        });
308
309        // Update cache
310        self.cache.insert::<Q>(full_key.clone(), output.clone());
311
312        // Update whale dependency tracking
313        let durability =
314            Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
315
316        if output_changed {
317            // Register with new changed_at
318            let _ = self.whale.register(full_key.clone(), (), durability, deps);
319        } else {
320            // Early cutoff: keep old changed_at
321            let _ = self.whale.confirm_unchanged(full_key, deps);
322        }
323
324        // Register query in registry for list_queries
325        let is_new_query = self.query_registry.register(query);
326        if is_new_query {
327            // Update sentinel to invalidate list_queries dependents
328            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
329            let _ = self
330                .whale
331                .register(sentinel, (), Durability::volatile(), vec![]);
332        }
333
334        Ok((output, output_changed))
335    }
336
337    /// Execute a query, caching the result if appropriate.
338    ///
339    /// Returns (output, output_changed) tuple for instrumentation.
340    #[cfg(not(feature = "inspector"))]
341    fn execute_query<Q: Query>(
342        &self,
343        query: &Q,
344        full_key: &FullCacheKey,
345    ) -> Result<(Arc<Q::Output>, bool), QueryError> {
346        // Create context for this query execution
347        let mut ctx = QueryContext {
348            runtime: self,
349            current_key: full_key.clone(),
350            deps: RefCell::new(Vec::new()),
351        };
352
353        // Execute the query
354        let output = query.query(&mut ctx)?;
355        let output = Arc::new(output);
356
357        // Get collected dependencies
358        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
359
360        // Check if output changed (for early cutoff)
361        let output_changed = if let Some(old) = self.cache.get::<Q>(full_key) {
362            !Q::output_eq(&old, &output)
363        } else {
364            true // No previous value, so "changed"
365        };
366
367        // Update cache
368        self.cache.insert::<Q>(full_key.clone(), output.clone());
369
370        // Update whale dependency tracking
371        let durability =
372            Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
373
374        if output_changed {
375            // Register with new changed_at
376            let _ = self.whale.register(full_key.clone(), (), durability, deps);
377        } else {
378            // Early cutoff: keep old changed_at
379            let _ = self.whale.confirm_unchanged(full_key, deps);
380        }
381
382        // Register query in registry for list_queries
383        let is_new_query = self.query_registry.register(query);
384        if is_new_query {
385            // Update sentinel to invalidate list_queries dependents
386            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
387            let _ = self
388                .whale
389                .register(sentinel, (), Durability::volatile(), vec![]);
390        }
391
392        Ok((output, output_changed))
393    }
394
395    /// Get cached value if it's still valid.
396    fn get_cached_if_valid<Q: Query>(&self, full_key: &FullCacheKey) -> Option<Arc<Q::Output>> {
397        // Check whale validity first
398        if !self.whale.is_valid(full_key) {
399            return None;
400        }
401
402        // Then check if we have the cached value
403        self.cache.get::<Q>(full_key)
404    }
405
406    /// Invalidate a query, forcing recomputation on next access.
407    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
408        let full_key = FullCacheKey::new::<Q, _>(key);
409
410        #[cfg(feature = "inspector")]
411        self.emit(|| FlowEvent::QueryInvalidated {
412            query: query_flow_inspector::QueryKey::new(
413                std::any::type_name::<Q>(),
414                full_key.debug_repr(),
415            ),
416            reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
417        });
418
419        self.cache.remove(&full_key);
420        // Whale will detect invalidity via is_valid check
421    }
422
    /// Clear all cached values.
    ///
    /// Only the query output cache is cleared; asset storage, pending requests and the
    /// registries are not touched by this method.
    pub fn clear_cache(&self) {
        self.cache.clear();
    }
427}
428
429// ============================================================================
430// Asset API
431// ============================================================================
432
433impl QueryRuntime {
    /// Register an asset locator for a specific asset key type.
    ///
    /// Only one locator can be registered per key type. Later registrations
    /// replace earlier ones.
    ///
    /// The locator is consulted when a query requests an asset of type `K` for which
    /// no valid stored state exists.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let runtime = QueryRuntime::new();
    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
    /// ```
    pub fn register_asset_locator<K, L>(&self, locator: L)
    where
        K: AssetKey,
        L: AssetLocator<K>,
    {
        self.locators.insert::<K, L>(locator);
    }
452
    /// Get the list of pending asset requests.
    ///
    /// Returns assets that have been requested but not yet resolved, collected into a
    /// `Vec`. The user should fetch these externally and call `resolve_asset()`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// for pending in runtime.pending_assets() {
    ///     if let Some(path) = pending.key::<FilePath>() {
    ///         let content = fetch_file(path);
    ///         runtime.resolve_asset(path.clone(), content);
    ///     }
    /// }
    /// ```
    pub fn pending_assets(&self) -> Vec<PendingAsset> {
        self.pending.get_all()
    }
471
    /// Get pending assets filtered by key type.
    ///
    /// Returns the keys of type `K` that are currently pending, as owned values.
    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
        self.pending.get_of_type::<K>()
    }
476
    /// Check if there are any pending assets.
    ///
    /// Returns `true` when at least one asset request is waiting to be resolved.
    pub fn has_pending_assets(&self) -> bool {
        !self.pending.is_empty()
    }
481
482    /// Resolve an asset with its loaded value.
483    ///
484    /// This marks the asset as ready and invalidates any queries that
485    /// depend on it (if the value changed), triggering recomputation on next access.
486    ///
487    /// This method is idempotent - resolving with the same value (via `asset_eq`)
488    /// will not trigger downstream recomputation.
489    ///
490    /// Uses the asset key's default durability.
491    ///
492    /// # Example
493    ///
494    /// ```ignore
495    /// let content = std::fs::read_to_string(&path)?;
496    /// runtime.resolve_asset(FilePath(path), content);
497    /// ```
498    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset) {
499        let durability = key.durability();
500        self.resolve_asset_internal(key, value, durability);
501    }
502
    /// Resolve an asset with a specific durability level.
    ///
    /// Use this to override the asset key's default durability.
    /// Apart from the durability, this behaves exactly like `resolve_asset`.
    pub fn resolve_asset_with_durability<K: AssetKey>(
        &self,
        key: K,
        value: K::Asset,
        durability: DurabilityLevel,
    ) {
        self.resolve_asset_internal(key, value, durability);
    }
514
515    fn resolve_asset_internal<K: AssetKey>(
516        &self,
517        key: K,
518        value: K::Asset,
519        durability_level: DurabilityLevel,
520    ) {
521        let full_asset_key = FullAssetKey::new(&key);
522        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
523
524        // Check for early cutoff
525        let changed = if let Some(old_value) = self.assets.get_ready::<K>(&full_asset_key) {
526            !K::asset_eq(&old_value, &value)
527        } else {
528            true // Loading or NotFound -> Ready is a change
529        };
530
531        // Emit asset resolved event
532        #[cfg(feature = "inspector")]
533        self.emit(|| FlowEvent::AssetResolved {
534            asset: query_flow_inspector::AssetKey::new(
535                std::any::type_name::<K>(),
536                format!("{:?}", key),
537            ),
538            changed,
539        });
540
541        // Store the new value
542        self.assets
543            .insert_ready::<K>(full_asset_key.clone(), Arc::new(value));
544
545        // Remove from pending
546        self.pending.remove(&full_asset_key);
547
548        // Update whale dependency tracking
549        let durability =
550            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
551
552        if changed {
553            // Register with new changed_at to invalidate dependents
554            let _ = self.whale.register(full_cache_key, (), durability, vec![]);
555        } else {
556            // Early cutoff - keep old changed_at
557            let _ = self.whale.confirm_unchanged(&full_cache_key, vec![]);
558        }
559
560        // Register asset key in registry for list_asset_keys
561        let is_new_asset = self.asset_key_registry.register(&key);
562        if is_new_asset {
563            // Update sentinel to invalidate list_asset_keys dependents
564            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
565            let _ = self
566                .whale
567                .register(sentinel, (), Durability::volatile(), vec![]);
568        }
569    }
570
571    /// Invalidate an asset, forcing queries to re-request it.
572    ///
573    /// The asset will be marked as loading and added to pending assets.
574    /// Dependent queries will suspend until the asset is resolved again.
575    ///
576    /// # Example
577    ///
578    /// ```ignore
579    /// // File was modified externally
580    /// runtime.invalidate_asset(&FilePath("config.json".into()));
581    /// // Queries depending on this asset will now suspend
582    /// // User should fetch the new value and call resolve_asset
583    /// ```
584    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
585        let full_asset_key = FullAssetKey::new(key);
586        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
587
588        // Emit asset invalidated event
589        #[cfg(feature = "inspector")]
590        self.emit(|| FlowEvent::AssetInvalidated {
591            asset: query_flow_inspector::AssetKey::new(
592                std::any::type_name::<K>(),
593                format!("{:?}", key),
594            ),
595        });
596
597        // Mark as loading
598        self.assets
599            .insert(full_asset_key.clone(), AssetState::Loading);
600
601        // Add to pending
602        self.pending.insert::<K>(full_asset_key, key.clone());
603
604        // Update whale to invalidate dependents (use volatile during loading)
605        let _ = self
606            .whale
607            .register(full_cache_key, (), Durability::volatile(), vec![]);
608    }
609
610    /// Remove an asset from the cache entirely.
611    ///
612    /// Unlike `invalidate_asset`, this removes all traces of the asset.
613    /// Dependent queries will go through the locator again on next access.
614    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
615        let full_asset_key = FullAssetKey::new(key);
616        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
617
618        // First, register a change to invalidate dependents
619        // This ensures queries that depend on this asset will recompute
620        let _ = self
621            .whale
622            .register(full_cache_key.clone(), (), Durability::volatile(), vec![]);
623
624        // Then remove the asset from storage
625        self.assets.remove(&full_asset_key);
626        self.pending.remove(&full_asset_key);
627
628        // Finally remove from whale tracking
629        self.whale.remove(&full_cache_key);
630
631        // Remove from registry and update sentinel for list_asset_keys
632        if self.asset_key_registry.remove::<K>(key) {
633            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
634            let _ = self
635                .whale
636                .register(sentinel, (), Durability::volatile(), vec![]);
637        }
638    }
639
    /// Internal: Get asset state, checking cache and locator.
    ///
    /// Resolution order:
    /// 1. If a state is stored for the key and whale reports the key as valid, that
    ///    state is returned as-is.
    /// 2. Otherwise, if a locator is registered for `K` and it produces a state, that
    ///    state is stored and returned (a `Ready` state is also registered with whale,
    ///    a `Loading` state is also queued as pending).
    /// 3. Otherwise the asset is stored as `Loading`, queued as pending, and
    ///    `LoadingState::Loading` is returned.
    ///
    /// # Errors
    ///
    /// Returns `QueryError::MissingDependency` when the state is `NotFound`, or when a
    /// ready value cannot be downcast to `K::Asset`.
    fn get_asset_internal<K: AssetKey>(
        &self,
        key: &K,
    ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
        let full_asset_key = FullAssetKey::new(key);
        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);

        // Helper to emit AssetRequested event
        #[cfg(feature = "inspector")]
        let emit_requested = |runtime: &Self, state: query_flow_inspector::AssetState| {
            runtime.emit(|| FlowEvent::AssetRequested {
                asset: query_flow_inspector::AssetKey::new(
                    std::any::type_name::<K>(),
                    format!("{:?}", key),
                ),
                state,
            });
        };

        // Check cache first
        if let Some(state) = self.assets.get(&full_asset_key) {
            // Check if whale thinks it's valid; a stored state that is not valid falls
            // through to the locator / pending path below.
            if self.whale.is_valid(&full_cache_key) {
                return match state {
                    AssetState::Loading => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Loading);
                        Ok(LoadingState::Loading)
                    }
                    AssetState::Ready(arc) => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Ready);
                        // The stored value is type-erased; recover the concrete type.
                        match arc.downcast::<K::Asset>() {
                            Ok(value) => Ok(LoadingState::Ready(value)),
                            Err(_) => Err(QueryError::MissingDependency {
                                description: format!("Asset type mismatch: {:?}", key),
                            }),
                        }
                    }
                    AssetState::NotFound => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::NotFound);
                        Err(QueryError::MissingDependency {
                            description: format!("Asset not found: {:?}", key),
                        })
                    }
                };
            }
        }

        // Check if there's a registered locator
        if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
            if let Some(state) = locator.locate_any(key) {
                // Store the state
                self.assets.insert(full_asset_key.clone(), state.clone());

                match state {
                    AssetState::Ready(arc) => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Ready);

                        // Register with whale, using the key's default durability
                        // (volatile if the level is out of range)
                        let durability = Durability::new(key.durability().as_u8() as usize)
                            .unwrap_or(Durability::volatile());
                        let _ = self.whale.register(full_cache_key, (), durability, vec![]);

                        match arc.downcast::<K::Asset>() {
                            Ok(value) => return Ok(LoadingState::Ready(value)),
                            Err(_) => {
                                return Err(QueryError::MissingDependency {
                                    description: format!("Asset type mismatch: {:?}", key),
                                })
                            }
                        }
                    }
                    AssetState::Loading => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Loading);
                        // The locator knows the asset but has no value yet: queue it.
                        self.pending.insert::<K>(full_asset_key, key.clone());
                        return Ok(LoadingState::Loading);
                    }
                    AssetState::NotFound => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::NotFound);
                        return Err(QueryError::MissingDependency {
                            description: format!("Asset not found: {:?}", key),
                        });
                    }
                }
            }
        }

        // No locator registered or locator returned None - mark as pending
        #[cfg(feature = "inspector")]
        emit_requested(self, query_flow_inspector::AssetState::Loading);
        self.assets
            .insert(full_asset_key.clone(), AssetState::Loading);
        self.pending.insert::<K>(full_asset_key, key.clone());
        Ok(LoadingState::Loading)
    }
741}
742
/// Context provided to queries during execution.
///
/// Use this to access dependencies via `query()`.
///
/// This is the `inspector` variant: besides the dependency list it carries the
/// identifying data attached to emitted events.
#[cfg(feature = "inspector")]
pub struct QueryContext<'a> {
    /// Runtime that executes dependency queries and asset lookups.
    runtime: &'a QueryRuntime,
    /// Key of the query currently being executed (reported as the parent in events).
    current_key: FullCacheKey,
    /// Type name of the query currently being executed, for event reporting.
    parent_query_type: &'static str,
    /// Span of the current query execution, attached to emitted events.
    span_id: SpanId,
    /// Dependencies (queries and assets) recorded while the query runs.
    deps: RefCell<Vec<FullCacheKey>>,
}
754
/// Context provided to queries during execution.
///
/// Use this to access dependencies via `query()`.
#[cfg(not(feature = "inspector"))]
pub struct QueryContext<'a> {
    /// Runtime that executes dependency queries and asset lookups.
    runtime: &'a QueryRuntime,
    /// Key of the query currently being executed.
    // In the code visible here this is only read when building inspector events,
    // hence the `dead_code` allowance when the feature is disabled.
    #[allow(dead_code)]
    current_key: FullCacheKey,
    /// Dependencies (queries and assets) recorded while the query runs.
    deps: RefCell<Vec<FullCacheKey>>,
}
765
766impl<'a> QueryContext<'a> {
767    /// Query a dependency.
768    ///
769    /// The dependency is automatically tracked for invalidation.
770    ///
771    /// # Example
772    ///
773    /// ```ignore
774    /// fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
775    ///     let dep_result = ctx.query(OtherQuery { id: self.id })?;
776    ///     Ok(process(&dep_result))
777    /// }
778    /// ```
779    pub fn query<Q: Query>(&mut self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
780        let key = query.cache_key();
781        let full_key = FullCacheKey::new::<Q, _>(&key);
782
783        // Emit dependency registered event
784        #[cfg(feature = "inspector")]
785        self.runtime.emit(|| FlowEvent::DependencyRegistered {
786            span_id: self.span_id,
787            parent: query_flow_inspector::QueryKey::new(
788                self.parent_query_type,
789                self.current_key.debug_repr(),
790            ),
791            dependency: query_flow_inspector::QueryKey::new(
792                std::any::type_name::<Q>(),
793                full_key.debug_repr(),
794            ),
795        });
796
797        // Record this as a dependency
798        self.deps.borrow_mut().push(full_key.clone());
799
800        // Execute the query
801        self.runtime.query(query)
802    }
803
804    /// Access an asset, tracking it as a dependency.
805    ///
806    /// Returns `LoadingState<Arc<K::Asset>>`:
807    /// - `LoadingState::Loading` if the asset is still being loaded
808    /// - `LoadingState::Ready(value)` if the asset is available
809    ///
810    /// # Example
811    ///
812    /// ```ignore
813    /// #[query]
814    /// fn process_file(ctx: &mut QueryContext, path: FilePath) -> Result<Output, QueryError> {
815    ///     let content = ctx.asset(&path)?.suspend()?;
816    ///     // Process content...
817    ///     Ok(output)
818    /// }
819    /// ```
820    ///
821    /// # Errors
822    ///
823    /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
824    pub fn asset<K: AssetKey>(
825        &mut self,
826        key: &K,
827    ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
828        let full_asset_key = FullAssetKey::new(key);
829        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
830
831        // Emit asset dependency registered event
832        #[cfg(feature = "inspector")]
833        self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
834            span_id: self.span_id,
835            parent: query_flow_inspector::QueryKey::new(
836                self.parent_query_type,
837                self.current_key.debug_repr(),
838            ),
839            asset: query_flow_inspector::AssetKey::new(
840                std::any::type_name::<K>(),
841                format!("{:?}", key),
842            ),
843        });
844
845        // Record dependency on this asset
846        self.deps.borrow_mut().push(full_cache_key);
847
848        // Get asset from runtime
849        let result = self.runtime.get_asset_internal(key);
850
851        // Emit missing dependency event on error
852        #[cfg(feature = "inspector")]
853        if let Err(QueryError::MissingDependency { ref description }) = result {
854            self.runtime.emit(|| FlowEvent::MissingDependency {
855                query: query_flow_inspector::QueryKey::new(
856                    self.parent_query_type,
857                    self.current_key.debug_repr(),
858                ),
859                dependency_description: description.clone(),
860            });
861        }
862
863        result
864    }
865
866    /// List all query instances of type Q that have been registered.
867    ///
868    /// This method establishes a dependency on the "set" of queries of type Q.
869    /// The calling query will be invalidated when:
870    /// - A new query of type Q is first executed (added to set)
871    ///
872    /// The calling query will NOT be invalidated when:
873    /// - An individual query of type Q has its value change
874    ///
875    /// # Example
876    ///
877    /// ```ignore
878    /// #[query]
879    /// fn all_results(ctx: &mut QueryContext) -> Result<Vec<i32>, QueryError> {
880    ///     let queries = ctx.list_queries::<MyQuery>();
881    ///     let mut results = Vec::new();
882    ///     for q in queries {
883    ///         results.push(*ctx.query(q)?);
884    ///     }
885    ///     Ok(results)
886    /// }
887    /// ```
888    pub fn list_queries<Q: Query>(&mut self) -> Vec<Q> {
889        // Record dependency on the sentinel (set-level dependency)
890        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
891
892        #[cfg(feature = "inspector")]
893        self.runtime.emit(|| FlowEvent::DependencyRegistered {
894            span_id: self.span_id,
895            parent: query_flow_inspector::QueryKey::new(
896                self.parent_query_type,
897                self.current_key.debug_repr(),
898            ),
899            dependency: query_flow_inspector::QueryKey::new("QuerySet", sentinel.debug_repr()),
900        });
901
902        self.deps.borrow_mut().push(sentinel);
903
904        // Return all registered queries
905        self.runtime.query_registry.get_all::<Q>()
906    }
907
908    /// List all asset keys of type K that have been registered.
909    ///
910    /// This method establishes a dependency on the "set" of asset keys of type K.
911    /// The calling query will be invalidated when:
912    /// - A new asset of type K is resolved for the first time (added to set)
913    /// - An asset of type K is removed via remove_asset
914    ///
915    /// The calling query will NOT be invalidated when:
916    /// - An individual asset's value changes (use `ctx.asset()` for that)
917    ///
918    /// # Example
919    ///
920    /// ```ignore
921    /// #[query]
922    /// fn all_configs(ctx: &mut QueryContext) -> Result<Vec<String>, QueryError> {
923    ///     let keys = ctx.list_asset_keys::<ConfigFile>();
924    ///     let mut contents = Vec::new();
925    ///     for key in keys {
926    ///         let content = ctx.asset(&key)?.suspend()?;
927    ///         contents.push((*content).clone());
928    ///     }
929    ///     Ok(contents)
930    /// }
931    /// ```
932    pub fn list_asset_keys<K: AssetKey>(&mut self) -> Vec<K> {
933        // Record dependency on the sentinel (set-level dependency)
934        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
935
936        #[cfg(feature = "inspector")]
937        self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
938            span_id: self.span_id,
939            parent: query_flow_inspector::QueryKey::new(
940                self.parent_query_type,
941                self.current_key.debug_repr(),
942            ),
943            asset: query_flow_inspector::AssetKey::new("AssetKeySet", sentinel.debug_repr()),
944        });
945
946        self.deps.borrow_mut().push(sentinel);
947
948        // Return all registered asset keys
949        self.runtime.asset_key_registry.get_all::<K>()
950    }
951}
952
#[cfg(test)]
mod tests {
    use super::*;

    /// A dependency-free query returns its computed value, and repeating the
    /// same query yields the same value.
    #[test]
    fn test_simple_query() {
        #[derive(Clone)]
        struct Add {
            a: i32,
            b: i32,
        }

        impl Query for Add {
            type CacheKey = (i32, i32);
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                (self.a, self.b)
            }

            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let sum = self.a + self.b;
                Ok(sum)
            }

            fn output_eq(previous: &Self::Output, current: &Self::Output) -> bool {
                previous == current
            }
        }

        let rt = QueryRuntime::new();

        // First execution.
        let first = rt.query(Add { a: 1, b: 2 }).unwrap();
        assert_eq!(*first, 3);

        // Same key again: should be cached.
        let second = rt.query(Add { a: 1, b: 2 }).unwrap();
        assert_eq!(*second, 3);
    }

    /// A query can call another query through its context and build on the
    /// dependency's result.
    #[test]
    fn test_dependent_queries() {
        #[derive(Clone)]
        struct Base {
            value: i32,
        }

        impl Query for Base {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.value
            }

            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let doubled = self.value * 2;
                Ok(doubled)
            }

            fn output_eq(previous: &Self::Output, current: &Self::Output) -> bool {
                previous == current
            }
        }

        #[derive(Clone)]
        struct Derived {
            base_value: i32,
        }

        impl Query for Derived {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.base_value
            }

            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let dependency = Base {
                    value: self.base_value,
                };
                let doubled = ctx.query(dependency)?;
                Ok(*doubled + 10)
            }

            fn output_eq(previous: &Self::Output, current: &Self::Output) -> bool {
                previous == current
            }
        }

        let rt = QueryRuntime::new();

        // 5 * 2 + 10
        let derived = rt.query(Derived { base_value: 5 }).unwrap();
        assert_eq!(*derived, 20);
    }

    /// Two queries that call each other are reported as `QueryError::Cycle`.
    #[test]
    fn test_cycle_detection() {
        #[derive(Clone)]
        struct CycleA {
            id: i32,
        }

        #[derive(Clone)]
        struct CycleB {
            id: i32,
        }

        // CycleA asks for CycleB ...
        impl Query for CycleA {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.id
            }

            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let from_b = ctx.query(CycleB { id: self.id })?;
                Ok(*from_b + 1)
            }

            fn output_eq(previous: &Self::Output, current: &Self::Output) -> bool {
                previous == current
            }
        }

        // ... and CycleB asks for CycleA with the same id.
        impl Query for CycleB {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.id
            }

            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let from_a = ctx.query(CycleA { id: self.id })?;
                Ok(*from_a + 1)
            }

            fn output_eq(previous: &Self::Output, current: &Self::Output) -> bool {
                previous == current
            }
        }

        let rt = QueryRuntime::new();

        let outcome = rt.query(CycleA { id: 1 });
        assert!(matches!(outcome, Err(QueryError::Cycle { .. })));
    }

    /// A user-level failure travels inside the query output; the query call
    /// itself still succeeds.
    #[test]
    fn test_fallible_query() {
        #[derive(Clone)]
        struct ParseInt {
            input: String,
        }

        impl Query for ParseInt {
            type CacheKey = String;
            type Output = Result<i32, std::num::ParseIntError>;

            fn cache_key(&self) -> Self::CacheKey {
                self.input.clone()
            }

            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let parsed = self.input.parse();
                Ok(parsed)
            }

            fn output_eq(previous: &Self::Output, current: &Self::Output) -> bool {
                previous == current
            }
        }

        let rt = QueryRuntime::new();

        // Input that parses cleanly.
        let valid = ParseInt {
            input: "42".to_string(),
        };
        let parsed = rt.query(valid).unwrap();
        assert_eq!(*parsed, Ok(42));

        // Input that does not parse: the system succeeds, the user error is in the output.
        let invalid = ParseInt {
            input: "not_a_number".to_string(),
        };
        let parsed = rt.query(invalid).unwrap();
        assert!(parsed.is_err());
    }

    // Tests for queries generated by the `#[query]` attribute macro.
    mod macro_tests {
        use super::*;
        use crate::query;

        #[query]
        fn add(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
            // The context is not needed here; bind it to silence the unused warning.
            let _ = ctx;
            Ok(a + b)
        }

        /// The macro produces a query type usable through `QueryRuntime::query`.
        #[test]
        fn test_macro_basic() {
            let rt = QueryRuntime::new();
            let sum = rt.query(Add::new(1, 2)).unwrap();
            assert_eq!(*sum, 3);
        }

        #[query(durability = 2)]
        fn with_durability(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(x * 2)
        }

        /// A query declared with the `durability` option still runs normally.
        #[test]
        fn test_macro_durability() {
            let rt = QueryRuntime::new();
            let doubled = rt.query(WithDurability::new(5)).unwrap();
            assert_eq!(*doubled, 10);
        }

        #[query(keys(id))]
        fn with_key_selection(
            ctx: &mut QueryContext,
            id: u32,
            include_extra: bool,
        ) -> Result<String, QueryError> {
            let _ = ctx;
            Ok(format!("id={}, extra={}", id, include_extra))
        }

        /// With `keys(id)`, two queries sharing an `id` share a cache entry even
        /// when their non-key arguments differ.
        #[test]
        fn test_macro_key_selection() {
            let rt = QueryRuntime::new();

            // Identical `id`, different `include_extra`.
            let with_extra = rt.query(WithKeySelection::new(1, true)).unwrap();
            let without_extra = rt.query(WithKeySelection::new(1, false)).unwrap();

            // Only `id` is part of the key, so the second call sees the first result.
            assert_eq!(*with_extra, "id=1, extra=true");
            assert_eq!(*without_extra, "id=1, extra=true"); // Cached!
        }

        #[query]
        fn dependent(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
            let inner = Add::new(*a, *b);
            let sum = ctx.query(inner)?;
            Ok(*sum * 2)
        }

        /// A macro-generated query can depend on another macro-generated query.
        #[test]
        fn test_macro_dependencies() {
            let rt = QueryRuntime::new();
            // (3 + 4) * 2
            let doubled_sum = rt.query(Dependent::new(3, 4)).unwrap();
            assert_eq!(*doubled_sum, 14);
        }

        #[query(output_eq)]
        fn with_output_eq(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(*x * 2)
        }

        /// A query declared with the `output_eq` option still runs normally.
        #[test]
        fn test_macro_output_eq() {
            let rt = QueryRuntime::new();
            let doubled = rt.query(WithOutputEq::new(5)).unwrap();
            assert_eq!(*doubled, 10);
        }

        #[query(name = "CustomName")]
        fn original_name(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(*x)
        }

        /// The `name` option controls the name of the generated query type.
        #[test]
        fn test_macro_custom_name() {
            let rt = QueryRuntime::new();
            let echoed = rt.query(CustomName::new(42)).unwrap();
            assert_eq!(*echoed, 42);
        }

        // Attribute macros such as #[tracing::instrument] must survive expansion.
        // #[allow(unused_variables)] and #[inline] stand in for them here because
        // they need no external dependencies.
        #[allow(unused_variables)]
        #[inline]
        #[query]
        fn with_attributes(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            // Deliberately unused: this would warn if #[allow(unused_variables)]
            // were not carried over to the generated method.
            let unused_var = 42;
            Ok(*x * 2)
        }

        /// Attributes placed on a `#[query]` function do not break the query.
        #[test]
        fn test_macro_preserves_attributes() {
            let rt = QueryRuntime::new();
            // Dropped attributes might show up as an `unused_var` warning here.
            let doubled = rt.query(WithAttributes::new(5)).unwrap();
            assert_eq!(*doubled, 10);
        }
    }
}