query_flow/
runtime.rs

1//! Query runtime and context.
2
3use std::any::TypeId;
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::sync::Arc;
7
8use whale::{Durability, RevisionCounter, Runtime as WhaleRuntime};
9
10use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
11use crate::db::Db;
12use crate::key::FullCacheKey;
13use crate::loading::AssetLoadingState;
14use crate::query::Query;
15use crate::storage::{
16    AssetKeyRegistry, AssetState, AssetStorage, CachedEntry, CachedValue, LocatorStorage,
17    PendingStorage, QueryRegistry, VerifierStorage,
18};
19use crate::tracer::{
20    ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
21    TracerAssetState, TracerQueryKey,
22};
23use crate::QueryError;
24
25/// Function type for comparing user errors during early cutoff.
26///
27/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
28/// `QueryError::UserError` values are compared for caching purposes.
29pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
30
31/// Number of durability levels (matches whale's default).
32const DURABILITY_LEVELS: usize = 4;
33
34// Thread-local query execution stack for cycle detection.
35thread_local! {
36    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
37}
38
39/// Execution context passed through query execution.
40///
41/// Contains a span ID for tracing correlation.
42#[derive(Clone, Copy)]
43pub struct ExecutionContext {
44    span_id: SpanId,
45}
46
47impl ExecutionContext {
48    /// Create a new execution context with the given span ID.
49    #[inline]
50    pub fn new(span_id: SpanId) -> Self {
51        Self { span_id }
52    }
53
54    /// Get the span ID for this execution context.
55    #[inline]
56    pub fn span_id(&self) -> SpanId {
57        self.span_id
58    }
59}
60
61/// Result of polling a query, containing the value and its revision.
62///
63/// This is returned by [`QueryRuntime::poll`] and provides both the query result
64/// and its change revision, enabling efficient change detection for subscription
65/// patterns.
66///
67/// # Example
68///
69/// ```ignore
70/// let result = runtime.poll(MyQuery::new())?;
71///
72/// // Access the value via Deref
73/// println!("{:?}", *result);
74///
75/// // Check if changed since last poll
76/// if result.revision > last_known_revision {
77///     notify_subscribers(&result.value);
78///     last_known_revision = result.revision;
79/// }
80/// ```
81#[derive(Debug, Clone)]
82pub struct Polled<T> {
83    /// The query result value.
84    pub value: T,
85    /// The revision at which this value was last changed.
86    ///
87    /// Compare this with a previously stored revision to detect changes.
88    pub revision: RevisionCounter,
89}
90
91impl<T: Deref> Deref for Polled<T> {
92    type Target = T::Target;
93
94    fn deref(&self) -> &Self::Target {
95        &self.value
96    }
97}
98
99/// The query runtime manages query execution, caching, and dependency tracking.
100///
101/// This is cheap to clone - all data is behind `Arc`.
102///
103/// # Type Parameter
104///
105/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
106///   for zero-cost when tracing is not needed.
107///
108/// # Example
109///
110/// ```ignore
111/// // Without tracing (default)
112/// let runtime = QueryRuntime::new();
113///
114/// // With tracing
115/// let tracer = MyTracer::new();
116/// let runtime = QueryRuntime::with_tracer(tracer);
117///
118/// // Sync query execution
119/// let result = runtime.query(MyQuery { ... })?;
120/// ```
121pub struct QueryRuntime<T: Tracer = NoopTracer> {
122    /// Whale runtime for dependency tracking and cache storage.
123    /// Query outputs are stored in Node.data as Option<CachedEntry>.
124    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
125    /// Asset cache and state storage
126    assets: Arc<AssetStorage>,
127    /// Registered asset locators
128    locators: Arc<LocatorStorage>,
129    /// Pending asset requests
130    pending: Arc<PendingStorage>,
131    /// Registry for tracking query instances (for list_queries)
132    query_registry: Arc<QueryRegistry>,
133    /// Registry for tracking asset keys (for list_asset_keys)
134    asset_key_registry: Arc<AssetKeyRegistry>,
135    /// Verifiers for re-executing queries (for verify-then-decide pattern)
136    verifiers: Arc<VerifierStorage>,
137    /// Comparator for user errors during early cutoff
138    error_comparator: ErrorComparator,
139    /// Tracer for observability
140    tracer: Arc<T>,
141}
142
143impl Default for QueryRuntime<NoopTracer> {
144    fn default() -> Self {
145        Self::new()
146    }
147}
148
149impl<T: Tracer> Clone for QueryRuntime<T> {
150    fn clone(&self) -> Self {
151        Self {
152            whale: self.whale.clone(),
153            assets: self.assets.clone(),
154            locators: self.locators.clone(),
155            pending: self.pending.clone(),
156            query_registry: self.query_registry.clone(),
157            asset_key_registry: self.asset_key_registry.clone(),
158            verifiers: self.verifiers.clone(),
159            error_comparator: self.error_comparator,
160            tracer: self.tracer.clone(),
161        }
162    }
163}
164
165/// Default error comparator that treats all errors as different.
166///
167/// This is conservative - it always triggers recomputation when an error occurs.
168fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
169    false
170}
171
172impl<T: Tracer> QueryRuntime<T> {
173    /// Get cached output along with its revision (single atomic access).
174    fn get_cached_with_revision<Q: Query>(
175        &self,
176        key: &FullCacheKey,
177    ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
178        let node = self.whale.get(key)?;
179        let revision = node.changed_at;
180        let entry = node.data.as_ref()?;
181        let cached = entry.to_cached_value::<Q::Output>()?;
182        Some((cached, revision))
183    }
184
185    /// Get a reference to the tracer.
186    #[inline]
187    pub fn tracer(&self) -> &T {
188        &self.tracer
189    }
190}
191
192impl QueryRuntime<NoopTracer> {
193    /// Create a new query runtime with default settings.
194    pub fn new() -> Self {
195        Self::with_tracer(NoopTracer)
196    }
197
198    /// Create a builder for customizing the runtime.
199    ///
200    /// # Example
201    ///
202    /// ```ignore
203    /// let runtime = QueryRuntime::builder()
204    ///     .error_comparator(|a, b| {
205    ///         // Custom error comparison logic
206    ///         match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
207    ///             (Some(a), Some(b)) => a == b,
208    ///             _ => false,
209    ///         }
210    ///     })
211    ///     .build();
212    /// ```
213    pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
214        QueryRuntimeBuilder::new()
215    }
216}
217
218impl<T: Tracer> QueryRuntime<T> {
219    /// Create a new query runtime with the specified tracer.
220    pub fn with_tracer(tracer: T) -> Self {
221        QueryRuntimeBuilder::new().tracer(tracer).build()
222    }
223
224    /// Execute a query synchronously.
225    ///
226    /// Returns the cached result if valid, otherwise executes the query.
227    ///
228    /// # Errors
229    ///
230    /// - `QueryError::Suspend` - Query is waiting for async loading
231    /// - `QueryError::Cycle` - Dependency cycle detected
232    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
233        self.query_internal(query)
234            .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
235    }
236
237    /// Internal implementation shared by query() and poll().
238    ///
239    /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
240    /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
241    #[allow(clippy::type_complexity)]
242    fn query_internal<Q: Query>(
243        &self,
244        query: Q,
245    ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
246        let key = query.cache_key();
247        let full_key = FullCacheKey::new::<Q, _>(&key);
248
249        // Create execution context and emit start event
250        let span_id = self.tracer.new_span_id();
251        let exec_ctx = ExecutionContext::new(span_id);
252        let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
253
254        self.tracer.on_query_start(span_id, query_key.clone());
255
256        // Check for cycles using thread-local stack
257        let cycle_detected = QUERY_STACK.with(|stack| {
258            let stack = stack.borrow();
259            stack.iter().any(|k| k == &full_key)
260        });
261
262        if cycle_detected {
263            let path = QUERY_STACK.with(|stack| {
264                let stack = stack.borrow();
265                let mut path: Vec<String> =
266                    stack.iter().map(|k| k.debug_repr().to_string()).collect();
267                path.push(full_key.debug_repr().to_string());
268                path
269            });
270
271            self.tracer.on_cycle_detected(
272                path.iter()
273                    .map(|s| TracerQueryKey::new("", s.clone()))
274                    .collect(),
275            );
276            self.tracer
277                .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
278
279            return Err(QueryError::Cycle { path });
280        }
281
282        // Check if cached and valid (with verify-then-decide pattern)
283        let current_rev = self.whale.current_revision();
284
285        // Fast path: already verified at current revision
286        if self.whale.is_verified_at(&full_key, &current_rev) {
287            // Single atomic access to get both cached value and revision
288            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
289                self.tracer.on_cache_check(span_id, query_key.clone(), true);
290                self.tracer
291                    .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
292
293                return match cached {
294                    CachedValue::Ok(output) => Ok((Ok(output), revision)),
295                    CachedValue::UserError(err) => Ok((Err(err), revision)),
296                };
297            }
298        }
299
300        // Check shallow validity (deps' changed_at ok) and try verify-then-decide
301        if self.whale.is_valid(&full_key) {
302            // Single atomic access to get both cached value and revision
303            if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
304                // Shallow valid but not verified - verify deps first
305                let mut deps_verified = true;
306                if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
307                    for dep in deps {
308                        if let Some(verifier) = self.verifiers.get(&dep) {
309                            // Re-query dep to verify it (triggers recursive verification)
310                            if verifier.verify(self as &dyn std::any::Any).is_err() {
311                                deps_verified = false;
312                                break;
313                            }
314                        }
315                    }
316                }
317
318                // Re-check validity after deps are verified
319                if deps_verified && self.whale.is_valid(&full_key) {
320                    // Deps didn't change their changed_at, mark as verified and use cached
321                    self.whale.mark_verified(&full_key, &current_rev);
322
323                    self.tracer.on_cache_check(span_id, query_key.clone(), true);
324                    self.tracer
325                        .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
326
327                    return match cached {
328                        CachedValue::Ok(output) => Ok((Ok(output), revision)),
329                        CachedValue::UserError(err) => Ok((Err(err), revision)),
330                    };
331                }
332                // A dep's changed_at increased, fall through to execute
333            }
334        }
335
336        self.tracer
337            .on_cache_check(span_id, query_key.clone(), false);
338
339        // Execute the query with cycle tracking
340        QUERY_STACK.with(|stack| {
341            stack.borrow_mut().push(full_key.clone());
342        });
343
344        let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
345
346        QUERY_STACK.with(|stack| {
347            stack.borrow_mut().pop();
348        });
349
350        // Emit end event
351        let exec_result = match &result {
352            Ok((_, true, _)) => ExecutionResult::Changed,
353            Ok((_, false, _)) => ExecutionResult::Unchanged,
354            Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
355            Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
356            Err(e) => ExecutionResult::Error {
357                message: format!("{:?}", e),
358            },
359        };
360        self.tracer
361            .on_query_end(span_id, query_key.clone(), exec_result);
362
363        result.map(|(inner_result, _, revision)| (inner_result, revision))
364    }
365
366    /// Execute a query, caching the result if appropriate.
367    ///
368    /// Returns (result, output_changed, revision) tuple.
369    /// - `result`: Ok(output) for success, Err(user_error) for user errors
370    /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
371    #[allow(clippy::type_complexity)]
372    fn execute_query<Q: Query>(
373        &self,
374        query: &Q,
375        full_key: &FullCacheKey,
376        exec_ctx: ExecutionContext,
377    ) -> Result<
378        (
379            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
380            bool,
381            RevisionCounter,
382        ),
383        QueryError,
384    > {
385        // Create context for this query execution
386        let ctx = QueryContext {
387            runtime: self,
388            current_key: full_key.clone(),
389            parent_query_type: std::any::type_name::<Q>(),
390            exec_ctx,
391            deps: RefCell::new(Vec::new()),
392        };
393
394        // Execute the query (clone because query() takes ownership)
395        let result = query.clone().query(&ctx);
396
397        // Get collected dependencies
398        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
399
400        // Get durability for whale registration
401        let durability =
402            Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
403
404        match result {
405            Ok(output) => {
406                let output = Arc::new(output);
407
408                // Check if output changed (for early cutoff)
409                // existing_revision is Some only when output is unchanged (can reuse revision)
410                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
411                    self.get_cached_with_revision::<Q>(full_key)
412                {
413                    if Q::output_eq(&old, &output) {
414                        Some(rev) // Same output - reuse revision
415                    } else {
416                        None // Different output
417                    }
418                } else {
419                    None // No previous Ok value
420                };
421                let output_changed = existing_revision.is_none();
422
423                // Emit early cutoff check event
424                self.tracer.on_early_cutoff_check(
425                    exec_ctx.span_id(),
426                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
427                    output_changed,
428                );
429
430                // Update whale with cached entry (atomic update of value + dependency state)
431                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
432                let revision = if let Some(existing_rev) = existing_revision {
433                    // confirm_unchanged doesn't change changed_at, use existing
434                    let _ = self.whale.confirm_unchanged(full_key, deps);
435                    existing_rev
436                } else {
437                    // Use new_rev from register result
438                    match self
439                        .whale
440                        .register(full_key.clone(), Some(entry), durability, deps)
441                    {
442                        Ok(result) => result.new_rev,
443                        Err(missing) => {
444                            return Err(QueryError::DependenciesRemoved {
445                                missing_keys: missing,
446                            })
447                        }
448                    }
449                };
450
451                // Register query in registry for list_queries
452                let is_new_query = self.query_registry.register(query);
453                if is_new_query {
454                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
455                    let _ = self
456                        .whale
457                        .register(sentinel, None, Durability::volatile(), vec![]);
458                }
459
460                // Store verifier for this query (for verify-then-decide pattern)
461                self.verifiers
462                    .insert::<Q, T>(full_key.clone(), query.clone());
463
464                Ok((Ok(output), output_changed, revision))
465            }
466            Err(QueryError::UserError(err)) => {
467                // Check if error changed (for early cutoff)
468                // existing_revision is Some only when error is unchanged (can reuse revision)
469                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
470                    self.get_cached_with_revision::<Q>(full_key)
471                {
472                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
473                        Some(rev) // Same error - reuse revision
474                    } else {
475                        None // Different error
476                    }
477                } else {
478                    None // No previous UserError
479                };
480                let output_changed = existing_revision.is_none();
481
482                // Emit early cutoff check event
483                self.tracer.on_early_cutoff_check(
484                    exec_ctx.span_id(),
485                    TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
486                    output_changed,
487                );
488
489                // Update whale with cached error (atomic update of value + dependency state)
490                let entry = CachedEntry::UserError(err.clone());
491                let revision = if let Some(existing_rev) = existing_revision {
492                    // confirm_unchanged doesn't change changed_at, use existing
493                    let _ = self.whale.confirm_unchanged(full_key, deps);
494                    existing_rev
495                } else {
496                    // Use new_rev from register result
497                    match self
498                        .whale
499                        .register(full_key.clone(), Some(entry), durability, deps)
500                    {
501                        Ok(result) => result.new_rev,
502                        Err(missing) => {
503                            return Err(QueryError::DependenciesRemoved {
504                                missing_keys: missing,
505                            })
506                        }
507                    }
508                };
509
510                // Register query in registry for list_queries
511                let is_new_query = self.query_registry.register(query);
512                if is_new_query {
513                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
514                    let _ = self
515                        .whale
516                        .register(sentinel, None, Durability::volatile(), vec![]);
517                }
518
519                // Store verifier for this query (for verify-then-decide pattern)
520                self.verifiers
521                    .insert::<Q, T>(full_key.clone(), query.clone());
522
523                Ok((Err(err), output_changed, revision))
524            }
525            Err(e) => {
526                // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
527                Err(e)
528            }
529        }
530    }
531
532    /// Invalidate a query, forcing recomputation on next access.
533    ///
534    /// This also invalidates any queries that depend on this one.
535    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
536        let full_key = FullCacheKey::new::<Q, _>(key);
537
538        self.tracer.on_query_invalidated(
539            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
540            InvalidationReason::ManualInvalidation,
541        );
542
543        // Update whale to invalidate dependents (register with None to clear cached value)
544        let _ = self
545            .whale
546            .register(full_key, None, Durability::volatile(), vec![]);
547    }
548
549    /// Remove a query from the cache entirely, freeing memory.
550    ///
551    /// Use this for GC when a query is no longer needed.
552    /// Unlike `invalidate`, this removes all traces of the query from storage.
553    /// The query will be recomputed from scratch on next access.
554    ///
555    /// This also invalidates any queries that depend on this one.
556    pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
557        let full_key = FullCacheKey::new::<Q, _>(key);
558
559        self.tracer.on_query_invalidated(
560            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
561            InvalidationReason::ManualInvalidation,
562        );
563
564        // Remove verifier if exists
565        self.verifiers.remove(&full_key);
566
567        // Remove from whale storage (this also handles dependent invalidation)
568        self.whale.remove(&full_key);
569
570        // Remove from registry and update sentinel for list_queries
571        if self.query_registry.remove::<Q>(key) {
572            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
573            let _ = self
574                .whale
575                .register(sentinel, None, Durability::volatile(), vec![]);
576        }
577    }
578
579    /// Clear all cached values by removing all nodes from whale.
580    ///
581    /// Note: This is a relatively expensive operation as it iterates through all keys.
582    pub fn clear_cache(&self) {
583        let keys = self.whale.keys();
584        for key in keys {
585            self.whale.remove(&key);
586        }
587    }
588
589    /// Poll a query, returning both the result and its change revision.
590    ///
591    /// This is useful for implementing subscription patterns where you need to
592    /// detect changes efficiently. Compare the returned `revision` with a
593    /// previously stored value to determine if the query result has changed.
594    ///
595    /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
596    /// as its value, allowing you to track revision changes for both success and
597    /// user error cases.
598    ///
599    /// # Example
600    ///
601    /// ```ignore
602    /// struct Subscription<Q: Query> {
603    ///     query: Q,
604    ///     last_revision: RevisionCounter,
605    ///     tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
606    /// }
607    ///
608    /// // Polling loop
609    /// for sub in &mut subscriptions {
610    ///     let result = runtime.poll(sub.query.clone())?;
611    ///     if result.revision > sub.last_revision {
612    ///         sub.tx.send(result.value.clone())?;
613    ///         sub.last_revision = result.revision;
614    ///     }
615    /// }
616    /// ```
617    ///
618    /// # Errors
619    ///
620    /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
621    /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
622    #[allow(clippy::type_complexity)]
623    pub fn poll<Q: Query>(
624        &self,
625        query: Q,
626    ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
627        let (value, revision) = self.query_internal(query)?;
628        Ok(Polled { value, revision })
629    }
630
631    /// Get the change revision of a query without executing it.
632    ///
633    /// Returns `None` if the query has never been executed.
634    ///
635    /// This is useful for checking if a query has changed since the last poll
636    /// without the cost of executing the query.
637    ///
638    /// # Example
639    ///
640    /// ```ignore
641    /// // Check if query has changed before deciding to poll
642    /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
643    ///     if rev > last_known_revision {
644    ///         let result = runtime.query(MyQuery::new(key))?;
645    ///         // Process result...
646    ///     }
647    /// }
648    /// ```
649    pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
650        let full_key = FullCacheKey::new::<Q, _>(key);
651        self.whale.get(&full_key).map(|node| node.changed_at)
652    }
653}
654
655// ============================================================================
656// Builder
657// ============================================================================
658
659/// Builder for [`QueryRuntime`] with customizable settings.
660///
661/// # Example
662///
663/// ```ignore
664/// let runtime = QueryRuntime::builder()
665///     .error_comparator(|a, b| {
666///         // Treat all errors of the same type as equal
667///         a.downcast_ref::<std::io::Error>().is_some()
668///             == b.downcast_ref::<std::io::Error>().is_some()
669///     })
670///     .build();
671/// ```
672pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
673    error_comparator: ErrorComparator,
674    tracer: T,
675}
676
677impl Default for QueryRuntimeBuilder<NoopTracer> {
678    fn default() -> Self {
679        Self::new()
680    }
681}
682
683impl QueryRuntimeBuilder<NoopTracer> {
684    /// Create a new builder with default settings.
685    pub fn new() -> Self {
686        Self {
687            error_comparator: default_error_comparator,
688            tracer: NoopTracer,
689        }
690    }
691}
692
693impl<T: Tracer> QueryRuntimeBuilder<T> {
694    /// Set the error comparator function for early cutoff optimization.
695    ///
696    /// When a query returns `QueryError::UserError`, this function is used
697    /// to compare it with the previously cached error. If they are equal,
698    /// downstream queries can skip recomputation (early cutoff).
699    ///
700    /// The default comparator returns `false` for all errors, meaning errors
701    /// are always considered different (conservative, always recomputes).
702    ///
703    /// # Example
704    ///
705    /// ```ignore
706    /// // Treat errors as equal if they have the same display message
707    /// let runtime = QueryRuntime::builder()
708    ///     .error_comparator(|a, b| a.to_string() == b.to_string())
709    ///     .build();
710    /// ```
711    pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
712        self.error_comparator = f;
713        self
714    }
715
716    /// Set the tracer for observability.
717    pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
718        QueryRuntimeBuilder {
719            error_comparator: self.error_comparator,
720            tracer,
721        }
722    }
723
724    /// Build the runtime with the configured settings.
725    pub fn build(self) -> QueryRuntime<T> {
726        QueryRuntime {
727            whale: WhaleRuntime::new(),
728            assets: Arc::new(AssetStorage::new()),
729            locators: Arc::new(LocatorStorage::new()),
730            pending: Arc::new(PendingStorage::new()),
731            query_registry: Arc::new(QueryRegistry::new()),
732            asset_key_registry: Arc::new(AssetKeyRegistry::new()),
733            verifiers: Arc::new(VerifierStorage::new()),
734            error_comparator: self.error_comparator,
735            tracer: Arc::new(self.tracer),
736        }
737    }
738}
739
740// ============================================================================
741// Asset API
742// ============================================================================
743
744impl<T: Tracer> QueryRuntime<T> {
745    /// Register an asset locator for a specific asset key type.
746    ///
747    /// Only one locator can be registered per key type. Later registrations
748    /// replace earlier ones.
749    ///
750    /// # Example
751    ///
752    /// ```ignore
753    /// let runtime = QueryRuntime::new();
754    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
755    /// ```
756    pub fn register_asset_locator<K, L>(&self, locator: L)
757    where
758        K: AssetKey,
759        L: AssetLocator<K>,
760    {
761        self.locators.insert::<K, L>(locator);
762    }
763
764    /// Get an iterator over pending asset requests.
765    ///
766    /// Returns assets that have been requested but not yet resolved.
767    /// The user should fetch these externally and call `resolve_asset()`.
768    ///
769    /// # Example
770    ///
771    /// ```ignore
772    /// for pending in runtime.pending_assets() {
773    ///     if let Some(path) = pending.key::<FilePath>() {
774    ///         let content = fetch_file(path);
775    ///         runtime.resolve_asset(path.clone(), content);
776    ///     }
777    /// }
778    /// ```
779    pub fn pending_assets(&self) -> Vec<PendingAsset> {
780        self.pending.get_all()
781    }
782
783    /// Get pending assets filtered by key type.
784    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
785        self.pending.get_of_type::<K>()
786    }
787
788    /// Check if there are any pending assets.
789    pub fn has_pending_assets(&self) -> bool {
790        !self.pending.is_empty()
791    }
792
793    /// Resolve an asset with its loaded value.
794    ///
795    /// This marks the asset as ready and invalidates any queries that
796    /// depend on it (if the value changed), triggering recomputation on next access.
797    ///
798    /// This method is idempotent - resolving with the same value (via `asset_eq`)
799    /// will not trigger downstream recomputation.
800    ///
801    /// Uses the asset key's default durability.
802    ///
803    /// # Example
804    ///
805    /// ```ignore
806    /// let content = std::fs::read_to_string(&path)?;
807    /// runtime.resolve_asset(FilePath(path), content);
808    /// ```
809    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset) {
810        let durability = key.durability();
811        self.resolve_asset_internal(key, value, durability);
812    }
813
814    /// Resolve an asset with a specific durability level.
815    ///
816    /// Use this to override the asset key's default durability.
817    pub fn resolve_asset_with_durability<K: AssetKey>(
818        &self,
819        key: K,
820        value: K::Asset,
821        durability: DurabilityLevel,
822    ) {
823        self.resolve_asset_internal(key, value, durability);
824    }
825
826    fn resolve_asset_internal<K: AssetKey>(
827        &self,
828        key: K,
829        value: K::Asset,
830        durability_level: DurabilityLevel,
831    ) {
832        let full_asset_key = FullAssetKey::new(&key);
833        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
834
835        // Check for early cutoff
836        let changed = if let Some(old_value) = self.assets.get_ready::<K>(&full_asset_key) {
837            !K::asset_eq(&old_value, &value)
838        } else {
839            true // Loading or NotFound -> Ready is a change
840        };
841
842        // Emit asset resolved event
843        self.tracer.on_asset_resolved(
844            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
845            changed,
846        );
847
848        // Store the new value
849        self.assets
850            .insert_ready::<K>(full_asset_key.clone(), Arc::new(value));
851
852        // Remove from pending
853        self.pending.remove(&full_asset_key);
854
855        // Update whale dependency tracking
856        let durability =
857            Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
858
859        if changed {
860            // Register with new changed_at to invalidate dependents
861            let _ = self
862                .whale
863                .register(full_cache_key, None, durability, vec![]);
864        } else {
865            // Early cutoff - keep old changed_at
866            let _ = self.whale.confirm_unchanged(&full_cache_key, vec![]);
867        }
868
869        // Register asset key in registry for list_asset_keys
870        let is_new_asset = self.asset_key_registry.register(&key);
871        if is_new_asset {
872            // Update sentinel to invalidate list_asset_keys dependents
873            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
874            let _ = self
875                .whale
876                .register(sentinel, None, Durability::volatile(), vec![]);
877        }
878    }
879
880    /// Invalidate an asset, forcing queries to re-request it.
881    ///
882    /// The asset will be marked as loading and added to pending assets.
883    /// Dependent queries will suspend until the asset is resolved again.
884    ///
885    /// # Example
886    ///
887    /// ```ignore
888    /// // File was modified externally
889    /// runtime.invalidate_asset(&FilePath("config.json".into()));
890    /// // Queries depending on this asset will now suspend
891    /// // User should fetch the new value and call resolve_asset
892    /// ```
893    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
894        let full_asset_key = FullAssetKey::new(key);
895        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
896
897        // Emit asset invalidated event
898        self.tracer.on_asset_invalidated(TracerAssetKey::new(
899            std::any::type_name::<K>(),
900            format!("{:?}", key),
901        ));
902
903        // Mark as loading
904        self.assets
905            .insert(full_asset_key.clone(), AssetState::Loading);
906
907        // Add to pending
908        self.pending.insert::<K>(full_asset_key, key.clone());
909
910        // Update whale to invalidate dependents (use volatile during loading)
911        let _ = self
912            .whale
913            .register(full_cache_key, None, Durability::volatile(), vec![]);
914    }
915
916    /// Remove an asset from the cache entirely.
917    ///
918    /// Unlike `invalidate_asset`, this removes all traces of the asset.
919    /// Dependent queries will go through the locator again on next access.
920    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
921        let full_asset_key = FullAssetKey::new(key);
922        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
923
924        // First, register a change to invalidate dependents
925        // This ensures queries that depend on this asset will recompute
926        let _ = self
927            .whale
928            .register(full_cache_key.clone(), None, Durability::volatile(), vec![]);
929
930        // Then remove the asset from storage
931        self.assets.remove(&full_asset_key);
932        self.pending.remove(&full_asset_key);
933
934        // Finally remove from whale tracking
935        self.whale.remove(&full_cache_key);
936
937        // Remove from registry and update sentinel for list_asset_keys
938        if self.asset_key_registry.remove::<K>(key) {
939            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
940            let _ = self
941                .whale
942                .register(sentinel, None, Durability::volatile(), vec![]);
943        }
944    }
945
946    /// Get an asset by key without tracking dependencies.
947    ///
948    /// Unlike `QueryContext::asset()`, this method does NOT register the caller
949    /// as a dependent of the asset. Use this for direct asset access outside
950    /// of query execution.
951    ///
952    /// # Returns
953    ///
954    /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
955    /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
956    /// - `Err(QueryError::MissingDependency)` - Asset was not found
957    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
958        self.get_asset_internal(key)
959    }
960
961    /// Internal: Get asset state, checking cache and locator.
962    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
963        let full_asset_key = FullAssetKey::new(&key);
964        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
965
966        // Helper to emit AssetRequested event
967        let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
968            tracer.on_asset_requested(
969                TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
970                state,
971            );
972        };
973
974        // Check cache first
975        if let Some(state) = self.assets.get(&full_asset_key) {
976            // Check if whale thinks it's valid
977            if self.whale.is_valid(&full_cache_key) {
978                return match state {
979                    AssetState::Loading => {
980                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
981                        Ok(AssetLoadingState::loading(key))
982                    }
983                    AssetState::Ready(arc) => {
984                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
985                        match arc.downcast::<K::Asset>() {
986                            Ok(value) => Ok(AssetLoadingState::ready(key, value)),
987                            Err(_) => Err(QueryError::MissingDependency {
988                                description: format!("Asset type mismatch: {:?}", key),
989                            }),
990                        }
991                    }
992                    AssetState::NotFound => {
993                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
994                        Err(QueryError::MissingDependency {
995                            description: format!("Asset not found: {:?}", key),
996                        })
997                    }
998                };
999            }
1000        }
1001
1002        // Check if there's a registered locator
1003        if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
1004            if let Some(state) = locator.locate_any(&key) {
1005                // Store the state
1006                self.assets.insert(full_asset_key.clone(), state.clone());
1007
1008                match state {
1009                    AssetState::Ready(arc) => {
1010                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1011
1012                        // Register with whale
1013                        let durability = Durability::new(key.durability().as_u8() as usize)
1014                            .unwrap_or(Durability::volatile());
1015                        self.whale
1016                            .register(full_cache_key, None, durability, vec![])
1017                            .expect("register with no dependencies cannot fail");
1018
1019                        match arc.downcast::<K::Asset>() {
1020                            Ok(value) => return Ok(AssetLoadingState::ready(key, value)),
1021                            Err(_) => {
1022                                return Err(QueryError::MissingDependency {
1023                                    description: format!("Asset type mismatch: {:?}", key),
1024                                })
1025                            }
1026                        }
1027                    }
1028                    AssetState::Loading => {
1029                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1030                        self.pending.insert::<K>(full_asset_key, key.clone());
1031
1032                        // Register in whale so queries can depend on this asset
1033                        self.whale
1034                            .register(full_cache_key, None, Durability::volatile(), vec![])
1035                            .expect("register with no dependencies cannot fail");
1036
1037                        return Ok(AssetLoadingState::loading(key));
1038                    }
1039                    AssetState::NotFound => {
1040                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1041                        return Err(QueryError::MissingDependency {
1042                            description: format!("Asset not found: {:?}", key),
1043                        });
1044                    }
1045                }
1046            }
1047        }
1048
1049        // No locator registered or locator returned None - mark as pending
1050        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1051        self.assets
1052            .insert(full_asset_key.clone(), AssetState::Loading);
1053        self.pending
1054            .insert::<K>(full_asset_key.clone(), key.clone());
1055
1056        // Register in whale so queries can depend on this asset
1057        self.whale
1058            .register(full_cache_key, None, Durability::volatile(), vec![])
1059            .expect("register with no dependencies cannot fail");
1060
1061        Ok(AssetLoadingState::loading(key))
1062    }
1063}
1064
1065impl<T: Tracer> Db for QueryRuntime<T> {
1066    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1067        QueryRuntime::query(self, query)
1068    }
1069
1070    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1071        self.get_asset_internal(key)
1072    }
1073
1074    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1075        self.query_registry.get_all::<Q>()
1076    }
1077
1078    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1079        self.asset_key_registry.get_all::<K>()
1080    }
1081}
1082
1083/// Context provided to queries during execution.
1084///
1085/// Use this to access dependencies via `query()`.
1086pub struct QueryContext<'a, T: Tracer = NoopTracer> {
1087    runtime: &'a QueryRuntime<T>,
1088    current_key: FullCacheKey,
1089    parent_query_type: &'static str,
1090    exec_ctx: ExecutionContext,
1091    deps: RefCell<Vec<FullCacheKey>>,
1092}
1093
1094impl<'a, T: Tracer> QueryContext<'a, T> {
1095    /// Query a dependency.
1096    ///
1097    /// The dependency is automatically tracked for invalidation.
1098    ///
1099    /// # Example
1100    ///
1101    /// ```ignore
1102    /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1103    ///     let dep_result = db.query(OtherQuery { id: self.id })?;
1104    ///     Ok(process(&dep_result))
1105    /// }
1106    /// ```
1107    pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1108        let key = query.cache_key();
1109        let full_key = FullCacheKey::new::<Q, _>(&key);
1110
1111        // Emit dependency registered event
1112        self.runtime.tracer.on_dependency_registered(
1113            self.exec_ctx.span_id(),
1114            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1115            TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1116        );
1117
1118        // Record this as a dependency
1119        self.deps.borrow_mut().push(full_key.clone());
1120
1121        // Execute the query
1122        self.runtime.query(query)
1123    }
1124
1125    /// Access an asset, tracking it as a dependency.
1126    ///
1127    /// Returns `AssetLoadingState<K>`:
1128    /// - `is_loading()` if the asset is still being loaded
1129    /// - `is_ready()` if the asset is available
1130    ///
1131    /// Use `.suspend()?` to convert to `Result<Arc<K::Asset>, QueryError>`,
1132    /// which returns `Err(QueryError::Suspend { asset })` if still loading.
1133    ///
1134    /// # Example
1135    ///
1136    /// ```ignore
1137    /// #[query]
1138    /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1139    ///     let content = db.asset(path)?.suspend()?;
1140    ///     // Process content...
1141    ///     Ok(output)
1142    /// }
1143    /// ```
1144    ///
1145    /// # Errors
1146    ///
1147    /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1148    pub fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1149        let full_asset_key = FullAssetKey::new(&key);
1150        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1151
1152        // Emit asset dependency registered event
1153        self.runtime.tracer.on_asset_dependency_registered(
1154            self.exec_ctx.span_id(),
1155            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1156            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1157        );
1158
1159        // Record dependency on this asset
1160        self.deps.borrow_mut().push(full_cache_key);
1161
1162        // Get asset from runtime
1163        let result = self.runtime.get_asset_internal(key);
1164
1165        // Emit missing dependency event on error
1166        if let Err(QueryError::MissingDependency { ref description }) = result {
1167            self.runtime.tracer.on_missing_dependency(
1168                TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1169                description.clone(),
1170            );
1171        }
1172
1173        result
1174    }
1175
1176    /// List all query instances of type Q that have been registered.
1177    ///
1178    /// This method establishes a dependency on the "set" of queries of type Q.
1179    /// The calling query will be invalidated when:
1180    /// - A new query of type Q is first executed (added to set)
1181    ///
1182    /// The calling query will NOT be invalidated when:
1183    /// - An individual query of type Q has its value change
1184    ///
1185    /// # Example
1186    ///
1187    /// ```ignore
1188    /// #[query]
1189    /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1190    ///     let queries = db.list_queries::<MyQuery>();
1191    ///     let mut results = Vec::new();
1192    ///     for q in queries {
1193    ///         results.push(*db.query(q)?);
1194    ///     }
1195    ///     Ok(results)
1196    /// }
1197    /// ```
1198    pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1199        // Record dependency on the sentinel (set-level dependency)
1200        let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1201
1202        self.runtime.tracer.on_dependency_registered(
1203            self.exec_ctx.span_id(),
1204            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1205            TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1206        );
1207
1208        // Ensure sentinel exists in whale (for dependency tracking)
1209        if self.runtime.whale.get(&sentinel).is_none() {
1210            let _ =
1211                self.runtime
1212                    .whale
1213                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1214        }
1215
1216        self.deps.borrow_mut().push(sentinel);
1217
1218        // Return all registered queries
1219        self.runtime.query_registry.get_all::<Q>()
1220    }
1221
1222    /// List all asset keys of type K that have been registered.
1223    ///
1224    /// This method establishes a dependency on the "set" of asset keys of type K.
1225    /// The calling query will be invalidated when:
1226    /// - A new asset of type K is resolved for the first time (added to set)
1227    /// - An asset of type K is removed via remove_asset
1228    ///
1229    /// The calling query will NOT be invalidated when:
1230    /// - An individual asset's value changes (use `db.asset()` for that)
1231    ///
1232    /// # Example
1233    ///
1234    /// ```ignore
1235    /// #[query]
1236    /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1237    ///     let keys = db.list_asset_keys::<ConfigFile>();
1238    ///     let mut contents = Vec::new();
1239    ///     for key in keys {
1240    ///         let content = db.asset(&key)?.suspend()?;
1241    ///         contents.push((*content).clone());
1242    ///     }
1243    ///     Ok(contents)
1244    /// }
1245    /// ```
1246    pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1247        // Record dependency on the sentinel (set-level dependency)
1248        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1249
1250        self.runtime.tracer.on_asset_dependency_registered(
1251            self.exec_ctx.span_id(),
1252            TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1253            TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1254        );
1255
1256        // Ensure sentinel exists in whale (for dependency tracking)
1257        if self.runtime.whale.get(&sentinel).is_none() {
1258            let _ =
1259                self.runtime
1260                    .whale
1261                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1262        }
1263
1264        self.deps.borrow_mut().push(sentinel);
1265
1266        // Return all registered asset keys
1267        self.runtime.asset_key_registry.get_all::<K>()
1268    }
1269}
1270
1271impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1272    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1273        QueryContext::query(self, query)
1274    }
1275
1276    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1277        QueryContext::asset(self, key)
1278    }
1279
1280    fn list_queries<Q: Query>(&self) -> Vec<Q> {
1281        QueryContext::list_queries(self)
1282    }
1283
1284    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1285        QueryContext::list_asset_keys(self)
1286    }
1287}
1288
1289#[cfg(test)]
1290mod tests {
1291    use super::*;
1292
1293    #[test]
1294    fn test_simple_query() {
1295        #[derive(Clone)]
1296        struct Add {
1297            a: i32,
1298            b: i32,
1299        }
1300
1301        impl Query for Add {
1302            type CacheKey = (i32, i32);
1303            type Output = i32;
1304
1305            fn cache_key(&self) -> Self::CacheKey {
1306                (self.a, self.b)
1307            }
1308
1309            fn query(self, _db: &impl Db) -> Result<Self::Output, QueryError> {
1310                Ok(self.a + self.b)
1311            }
1312
1313            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1314                old == new
1315            }
1316        }
1317
1318        let runtime = QueryRuntime::new();
1319
1320        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
1321        assert_eq!(*result, 3);
1322
1323        // Second query should be cached
1324        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
1325        assert_eq!(*result2, 3);
1326    }
1327
1328    #[test]
1329    fn test_dependent_queries() {
1330        #[derive(Clone)]
1331        struct Base {
1332            value: i32,
1333        }
1334
1335        impl Query for Base {
1336            type CacheKey = i32;
1337            type Output = i32;
1338
1339            fn cache_key(&self) -> Self::CacheKey {
1340                self.value
1341            }
1342
1343            fn query(self, _db: &impl Db) -> Result<Self::Output, QueryError> {
1344                Ok(self.value * 2)
1345            }
1346
1347            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1348                old == new
1349            }
1350        }
1351
1352        #[derive(Clone)]
1353        struct Derived {
1354            base_value: i32,
1355        }
1356
1357        impl Query for Derived {
1358            type CacheKey = i32;
1359            type Output = i32;
1360
1361            fn cache_key(&self) -> Self::CacheKey {
1362                self.base_value
1363            }
1364
1365            fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1366                let base = db.query(Base {
1367                    value: self.base_value,
1368                })?;
1369                Ok(*base + 10)
1370            }
1371
1372            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1373                old == new
1374            }
1375        }
1376
1377        let runtime = QueryRuntime::new();
1378
1379        let result = runtime.query(Derived { base_value: 5 }).unwrap();
1380        assert_eq!(*result, 20); // 5 * 2 + 10
1381    }
1382
1383    #[test]
1384    fn test_cycle_detection() {
1385        #[derive(Clone)]
1386        struct CycleA {
1387            id: i32,
1388        }
1389
1390        #[derive(Clone)]
1391        struct CycleB {
1392            id: i32,
1393        }
1394
1395        impl Query for CycleA {
1396            type CacheKey = i32;
1397            type Output = i32;
1398
1399            fn cache_key(&self) -> Self::CacheKey {
1400                self.id
1401            }
1402
1403            fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1404                let b = db.query(CycleB { id: self.id })?;
1405                Ok(*b + 1)
1406            }
1407
1408            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1409                old == new
1410            }
1411        }
1412
1413        impl Query for CycleB {
1414            type CacheKey = i32;
1415            type Output = i32;
1416
1417            fn cache_key(&self) -> Self::CacheKey {
1418                self.id
1419            }
1420
1421            fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1422                let a = db.query(CycleA { id: self.id })?;
1423                Ok(*a + 1)
1424            }
1425
1426            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1427                old == new
1428            }
1429        }
1430
1431        let runtime = QueryRuntime::new();
1432
1433        let result = runtime.query(CycleA { id: 1 });
1434        assert!(matches!(result, Err(QueryError::Cycle { .. })));
1435    }
1436
1437    #[test]
1438    fn test_fallible_query() {
1439        #[derive(Clone)]
1440        struct ParseInt {
1441            input: String,
1442        }
1443
1444        impl Query for ParseInt {
1445            type CacheKey = String;
1446            type Output = Result<i32, std::num::ParseIntError>;
1447
1448            fn cache_key(&self) -> Self::CacheKey {
1449                self.input.clone()
1450            }
1451
1452            fn query(self, _db: &impl Db) -> Result<Self::Output, QueryError> {
1453                Ok(self.input.parse())
1454            }
1455
1456            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1457                old == new
1458            }
1459        }
1460
1461        let runtime = QueryRuntime::new();
1462
1463        // Valid parse
1464        let result = runtime
1465            .query(ParseInt {
1466                input: "42".to_string(),
1467            })
1468            .unwrap();
1469        assert_eq!(*result, Ok(42));
1470
1471        // Invalid parse - system succeeds, user error in output
1472        let result = runtime
1473            .query(ParseInt {
1474                input: "not_a_number".to_string(),
1475            })
1476            .unwrap();
1477        assert!(result.is_err());
1478    }
1479
1480    // Macro tests
1481    mod macro_tests {
1482        use super::*;
1483        use crate::query;
1484
1485        #[query]
1486        fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
1487            let _ = db; // silence unused warning
1488            Ok(a + b)
1489        }
1490
1491        #[test]
1492        fn test_macro_basic() {
1493            let runtime = QueryRuntime::new();
1494            let result = runtime.query(Add::new(1, 2)).unwrap();
1495            assert_eq!(*result, 3);
1496        }
1497
1498        #[query(durability = 2)]
1499        fn with_durability(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1500            let _ = db;
1501            Ok(x * 2)
1502        }
1503
1504        #[test]
1505        fn test_macro_durability() {
1506            let runtime = QueryRuntime::new();
1507            let result = runtime.query(WithDurability::new(5)).unwrap();
1508            assert_eq!(*result, 10);
1509        }
1510
1511        #[query(keys(id))]
1512        fn with_key_selection(
1513            db: &impl Db,
1514            id: u32,
1515            include_extra: bool,
1516        ) -> Result<String, QueryError> {
1517            let _ = db;
1518            Ok(format!("id={}, extra={}", id, include_extra))
1519        }
1520
1521        #[test]
1522        fn test_macro_key_selection() {
1523            let runtime = QueryRuntime::new();
1524
1525            // Same id, different include_extra - should return cached
1526            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
1527            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
1528
1529            // Both should have same value because only `id` is the key
1530            assert_eq!(*r1, "id=1, extra=true");
1531            assert_eq!(*r2, "id=1, extra=true"); // Cached!
1532        }
1533
1534        #[query]
1535        fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
1536            let sum = db.query(Add::new(a, b))?;
1537            Ok(*sum * 2)
1538        }
1539
1540        #[test]
1541        fn test_macro_dependencies() {
1542            let runtime = QueryRuntime::new();
1543            let result = runtime.query(Dependent::new(3, 4)).unwrap();
1544            assert_eq!(*result, 14); // (3 + 4) * 2
1545        }
1546
1547        #[query(output_eq)]
1548        fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1549            let _ = db;
1550            Ok(x * 2)
1551        }
1552
1553        #[test]
1554        fn test_macro_output_eq() {
1555            let runtime = QueryRuntime::new();
1556            let result = runtime.query(WithOutputEq::new(5)).unwrap();
1557            assert_eq!(*result, 10);
1558        }
1559
1560        #[query(name = "CustomName")]
1561        fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1562            let _ = db;
1563            Ok(x)
1564        }
1565
1566        #[test]
1567        fn test_macro_custom_name() {
1568            let runtime = QueryRuntime::new();
1569            let result = runtime.query(CustomName::new(42)).unwrap();
1570            assert_eq!(*result, 42);
1571        }
1572
1573        // Test that attribute macros like #[tracing::instrument] are preserved
1574        // We use #[allow(unused_variables)] and #[inline] as test attributes since
1575        // they don't require external dependencies.
1576        #[allow(unused_variables)]
1577        #[inline]
1578        #[query]
1579        fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1580            // This would warn without #[allow(unused_variables)] on the generated method
1581            let unused_var = 42;
1582            Ok(x * 2)
1583        }
1584
1585        #[test]
1586        fn test_macro_preserves_attributes() {
1587            let runtime = QueryRuntime::new();
1588            // If attributes weren't preserved, this might warn about unused_var
1589            let result = runtime.query(WithAttributes::new(5)).unwrap();
1590            assert_eq!(*result, 10);
1591        }
1592    }
1593
1594    // Tests for poll() and changed_at()
1595    mod poll_tests {
1596        use super::*;
1597
1598        #[derive(Clone)]
1599        struct Counter {
1600            id: i32,
1601        }
1602
1603        impl Query for Counter {
1604            type CacheKey = i32;
1605            type Output = i32;
1606
1607            fn cache_key(&self) -> Self::CacheKey {
1608                self.id
1609            }
1610
1611            fn query(self, _db: &impl Db) -> Result<Self::Output, QueryError> {
1612                Ok(self.id * 10)
1613            }
1614
1615            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1616                old == new
1617            }
1618        }
1619
1620        #[test]
1621        fn test_poll_returns_value_and_revision() {
1622            let runtime = QueryRuntime::new();
1623
1624            let result = runtime.poll(Counter { id: 1 }).unwrap();
1625
1626            // Value should be correct - access through Result and Arc
1627            assert_eq!(**result.value.as_ref().unwrap(), 10);
1628
1629            // Revision should be non-zero after first execution
1630            assert!(result.revision > 0);
1631        }
1632
1633        #[test]
1634        fn test_poll_revision_stable_on_cache_hit() {
1635            let runtime = QueryRuntime::new();
1636
1637            // First poll
1638            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1639            let rev1 = result1.revision;
1640
1641            // Second poll (cache hit)
1642            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1643            let rev2 = result2.revision;
1644
1645            // Revision should be the same (no change)
1646            assert_eq!(rev1, rev2);
1647        }
1648
1649        #[test]
1650        fn test_poll_revision_changes_on_invalidate() {
1651            let runtime = QueryRuntime::new();
1652
1653            // First poll
1654            let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1655            let rev1 = result1.revision;
1656
1657            // Invalidate and poll again
1658            runtime.invalidate::<Counter>(&1);
1659            let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1660            let rev2 = result2.revision;
1661
1662            // Revision should increase (value was recomputed)
1663            // Note: Since output_eq returns true (same value), this might not change
1664            // depending on early cutoff behavior. Let's verify the value is still correct.
1665            assert_eq!(**result2.value.as_ref().unwrap(), 10);
1666
1667            // With early cutoff, revision might stay the same if value didn't change
1668            // This is expected behavior
1669            assert!(rev2 >= rev1);
1670        }
1671
1672        #[test]
1673        fn test_changed_at_returns_none_for_unexecuted_query() {
1674            let runtime = QueryRuntime::new();
1675
1676            // Query has never been executed
1677            let rev = runtime.changed_at::<Counter>(&1);
1678            assert!(rev.is_none());
1679        }
1680
1681        #[test]
1682        fn test_changed_at_returns_revision_after_execution() {
1683            let runtime = QueryRuntime::new();
1684
1685            // Execute the query
1686            let _ = runtime.query(Counter { id: 1 }).unwrap();
1687
1688            // Now changed_at should return Some
1689            let rev = runtime.changed_at::<Counter>(&1);
1690            assert!(rev.is_some());
1691            assert!(rev.unwrap() > 0);
1692        }
1693
1694        #[test]
1695        fn test_changed_at_matches_poll_revision() {
1696            let runtime = QueryRuntime::new();
1697
1698            // Poll the query
1699            let result = runtime.poll(Counter { id: 1 }).unwrap();
1700
1701            // changed_at should match the revision from poll
1702            let rev = runtime.changed_at::<Counter>(&1);
1703            assert_eq!(rev, Some(result.revision));
1704        }
1705
1706        #[test]
1707        fn test_poll_value_access() {
1708            let runtime = QueryRuntime::new();
1709
1710            let result = runtime.poll(Counter { id: 5 }).unwrap();
1711
1712            // Access through Result and Arc
1713            let value: &i32 = result.value.as_ref().unwrap();
1714            assert_eq!(*value, 50);
1715
1716            // Access Arc directly via field after unwrapping Result
1717            let arc: &Arc<i32> = result.value.as_ref().unwrap();
1718            assert_eq!(**arc, 50);
1719        }
1720
1721        #[test]
1722        fn test_subscription_pattern() {
1723            let runtime = QueryRuntime::new();
1724
1725            // Simulate subscription pattern
1726            let mut last_revision: RevisionCounter = 0;
1727            let mut notifications = 0;
1728
1729            // First poll - should notify (new value)
1730            let result = runtime.poll(Counter { id: 1 }).unwrap();
1731            if result.revision > last_revision {
1732                notifications += 1;
1733                last_revision = result.revision;
1734            }
1735
1736            // Second poll - should NOT notify (no change)
1737            let result = runtime.poll(Counter { id: 1 }).unwrap();
1738            if result.revision > last_revision {
1739                notifications += 1;
1740                last_revision = result.revision;
1741            }
1742
1743            // Third poll - should NOT notify (no change)
1744            let result = runtime.poll(Counter { id: 1 }).unwrap();
1745            if result.revision > last_revision {
1746                notifications += 1;
1747                #[allow(unused_assignments)]
1748                {
1749                    last_revision = result.revision;
1750                }
1751            }
1752
1753            // Only the first poll should have triggered a notification
1754            assert_eq!(notifications, 1);
1755        }
1756    }
1757}