query_flow/runtime.rs
1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
12use crate::db::Db;
13use crate::key::FullCacheKey;
14use crate::loading::AssetLoadingState;
15use crate::query::Query;
16use crate::storage::{
17 AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
18 QueryRegistry, VerifierStorage,
19};
20use crate::tracer::{
21 ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
22 TracerAssetState, TracerQueryKey,
23};
24use crate::QueryError;
25
26/// Function type for comparing user errors during early cutoff.
27///
28/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
29/// `QueryError::UserError` values are compared for caching purposes.
30pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
31
32/// Number of durability levels (matches whale's default).
33const DURABILITY_LEVELS: usize = 4;
34
35// Thread-local query execution stack for cycle detection.
36thread_local! {
37 static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
38
39 /// Current consistency tracker for leaf asset checks.
40 /// Set during query execution, used by all nested locator calls.
41 /// Uses Rc since thread-local is single-threaded.
42 static CONSISTENCY_TRACKER: RefCell<Option<Rc<ConsistencyTracker>>> = const { RefCell::new(None) };
43}
44
45/// Check a leaf asset against the current consistency tracker (if any).
46/// Returns Ok if no tracker is set or if the check passes.
47fn check_leaf_asset_consistency(dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
48 CONSISTENCY_TRACKER.with(|tracker| {
49 if let Some(ref t) = *tracker.borrow() {
50 t.check_leaf_asset(dep_changed_at)
51 } else {
52 Ok(())
53 }
54 })
55}
56
57/// RAII guard that sets the consistency tracker for the current thread.
58struct ConsistencyTrackerGuard {
59 previous: Option<Rc<ConsistencyTracker>>,
60}
61
62impl ConsistencyTrackerGuard {
63 fn new(tracker: Rc<ConsistencyTracker>) -> Self {
64 let previous = CONSISTENCY_TRACKER.with(|t| t.borrow_mut().replace(tracker));
65 Self { previous }
66 }
67}
68
69impl Drop for ConsistencyTrackerGuard {
70 fn drop(&mut self) {
71 CONSISTENCY_TRACKER.with(|t| {
72 *t.borrow_mut() = self.previous.take();
73 });
74 }
75}
76
77/// Check for cycles in the query stack and return error if detected.
78fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
79 let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
80 if cycle_detected {
81 let path = QUERY_STACK.with(|stack| {
82 let stack = stack.borrow();
83 let mut path: Vec<String> = stack.iter().map(|k| k.debug_repr().to_string()).collect();
84 path.push(key.debug_repr().to_string());
85 path
86 });
87 return Err(QueryError::Cycle { path });
88 }
89 Ok(())
90}
91
92/// RAII guard for pushing/popping from query stack.
93struct StackGuard;
94
95impl StackGuard {
96 fn push(key: FullCacheKey) -> Self {
97 QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
98 StackGuard
99 }
100}
101
102impl Drop for StackGuard {
103 fn drop(&mut self) {
104 QUERY_STACK.with(|stack| {
105 stack.borrow_mut().pop();
106 });
107 }
108}
109
110/// Execution context passed through query execution.
111///
112/// Contains a span ID for tracing correlation.
113#[derive(Clone, Copy)]
114pub struct ExecutionContext {
115 span_id: SpanId,
116}
117
118impl ExecutionContext {
119 /// Create a new execution context with the given span ID.
120 #[inline]
121 pub fn new(span_id: SpanId) -> Self {
122 Self { span_id }
123 }
124
125 /// Get the span ID for this execution context.
126 #[inline]
127 pub fn span_id(&self) -> SpanId {
128 self.span_id
129 }
130}
131
132/// Result of polling a query, containing the value and its revision.
133///
134/// This is returned by [`QueryRuntime::poll`] and provides both the query result
135/// and its change revision, enabling efficient change detection for subscription
136/// patterns.
137///
138/// # Example
139///
140/// ```ignore
141/// let result = runtime.poll(MyQuery::new())?;
142///
143/// // Access the value via Deref
144/// println!("{:?}", *result);
145///
146/// // Check if changed since last poll
147/// if result.revision > last_known_revision {
148/// notify_subscribers(&result.value);
149/// last_known_revision = result.revision;
150/// }
151/// ```
152#[derive(Debug, Clone)]
153pub struct Polled<T> {
154 /// The query result value.
155 pub value: T,
156 /// The revision at which this value was last changed.
157 ///
158 /// Compare this with a previously stored revision to detect changes.
159 pub revision: RevisionCounter,
160}
161
162impl<T: Deref> Deref for Polled<T> {
163 type Target = T::Target;
164
165 fn deref(&self) -> &Self::Target {
166 &self.value
167 }
168}
169
170/// The query runtime manages query execution, caching, and dependency tracking.
171///
172/// This is cheap to clone - all data is behind `Arc`.
173///
174/// # Type Parameter
175///
176/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
177/// for zero-cost when tracing is not needed.
178///
179/// # Example
180///
181/// ```ignore
182/// // Without tracing (default)
183/// let runtime = QueryRuntime::new();
184///
185/// // With tracing
186/// let tracer = MyTracer::new();
187/// let runtime = QueryRuntime::with_tracer(tracer);
188///
189/// // Sync query execution
190/// let result = runtime.query(MyQuery { ... })?;
191/// ```
192pub struct QueryRuntime<T: Tracer = NoopTracer> {
193 /// Whale runtime for dependency tracking and cache storage.
194 /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
195 whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
196 /// Registered asset locators
197 locators: Arc<LocatorStorage<T>>,
198 /// Pending asset requests
199 pending: Arc<PendingStorage>,
200 /// Registry for tracking query instances (for list_queries)
201 query_registry: Arc<QueryRegistry>,
202 /// Registry for tracking asset keys (for list_asset_keys)
203 asset_key_registry: Arc<AssetKeyRegistry>,
204 /// Verifiers for re-executing queries (for verify-then-decide pattern)
205 verifiers: Arc<VerifierStorage>,
206 /// Comparator for user errors during early cutoff
207 error_comparator: ErrorComparator,
208 /// Tracer for observability
209 tracer: Arc<T>,
210}
211
212impl Default for QueryRuntime<NoopTracer> {
213 fn default() -> Self {
214 Self::new()
215 }
216}
217
218impl<T: Tracer> Clone for QueryRuntime<T> {
219 fn clone(&self) -> Self {
220 Self {
221 whale: self.whale.clone(),
222 locators: self.locators.clone(),
223 pending: self.pending.clone(),
224 query_registry: self.query_registry.clone(),
225 asset_key_registry: self.asset_key_registry.clone(),
226 verifiers: self.verifiers.clone(),
227 error_comparator: self.error_comparator,
228 tracer: self.tracer.clone(),
229 }
230 }
231}
232
233/// Default error comparator that treats all errors as different.
234///
235/// This is conservative - it always triggers recomputation when an error occurs.
236fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
237 false
238}
239
240impl<T: Tracer> QueryRuntime<T> {
241 /// Get cached output along with its revision (single atomic access).
242 fn get_cached_with_revision<Q: Query>(
243 &self,
244 key: &FullCacheKey,
245 ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
246 let node = self.whale.get(key)?;
247 let revision = node.changed_at;
248 let entry = node.data.as_ref()?;
249 let cached = entry.to_cached_value::<Q::Output>()?;
250 Some((cached, revision))
251 }
252
253 /// Get a reference to the tracer.
254 #[inline]
255 pub fn tracer(&self) -> &T {
256 &self.tracer
257 }
258}
259
260impl QueryRuntime<NoopTracer> {
261 /// Create a new query runtime with default settings.
262 pub fn new() -> Self {
263 Self::with_tracer(NoopTracer)
264 }
265
266 /// Create a builder for customizing the runtime.
267 ///
268 /// # Example
269 ///
270 /// ```ignore
271 /// let runtime = QueryRuntime::builder()
272 /// .error_comparator(|a, b| {
273 /// // Custom error comparison logic
274 /// match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
275 /// (Some(a), Some(b)) => a == b,
276 /// _ => false,
277 /// }
278 /// })
279 /// .build();
280 /// ```
281 pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
282 QueryRuntimeBuilder::new()
283 }
284}
285
286impl<T: Tracer> QueryRuntime<T> {
287 /// Create a new query runtime with the specified tracer.
288 pub fn with_tracer(tracer: T) -> Self {
289 QueryRuntimeBuilder::new().tracer(tracer).build()
290 }
291
292 /// Execute a query synchronously.
293 ///
294 /// Returns the cached result if valid, otherwise executes the query.
295 ///
296 /// # Errors
297 ///
298 /// - `QueryError::Suspend` - Query is waiting for async loading
299 /// - `QueryError::Cycle` - Dependency cycle detected
300 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
301 self.query_internal(query)
302 .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
303 }
304
305 /// Internal implementation shared by query() and poll().
306 ///
307 /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
308 /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
309 #[allow(clippy::type_complexity)]
310 fn query_internal<Q: Query>(
311 &self,
312 query: Q,
313 ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
314 let key = query.cache_key();
315 let full_key = FullCacheKey::new::<Q, _>(&key);
316
317 // Emit query key event for GC tracking (before other events)
318 self.tracer.on_query_key(&full_key);
319
320 // Create execution context and emit start event
321 let span_id = self.tracer.new_span_id();
322 let exec_ctx = ExecutionContext::new(span_id);
323 let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
324
325 self.tracer.on_query_start(span_id, query_key.clone());
326
327 // Check for cycles using thread-local stack
328 let cycle_detected = QUERY_STACK.with(|stack| {
329 let stack = stack.borrow();
330 stack.iter().any(|k| k == &full_key)
331 });
332
333 if cycle_detected {
334 let path = QUERY_STACK.with(|stack| {
335 let stack = stack.borrow();
336 let mut path: Vec<String> =
337 stack.iter().map(|k| k.debug_repr().to_string()).collect();
338 path.push(full_key.debug_repr().to_string());
339 path
340 });
341
342 self.tracer.on_cycle_detected(
343 path.iter()
344 .map(|s| TracerQueryKey::new("", s.clone()))
345 .collect(),
346 );
347 self.tracer
348 .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
349
350 return Err(QueryError::Cycle { path });
351 }
352
353 // Check if cached and valid (with verify-then-decide pattern)
354 let current_rev = self.whale.current_revision();
355
356 // Fast path: already verified at current revision
357 if self.whale.is_verified_at(&full_key, ¤t_rev) {
358 // Single atomic access to get both cached value and revision
359 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
360 self.tracer.on_cache_check(span_id, query_key.clone(), true);
361 self.tracer
362 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
363
364 return match cached {
365 CachedValue::Ok(output) => Ok((Ok(output), revision)),
366 CachedValue::UserError(err) => Ok((Err(err), revision)),
367 };
368 }
369 }
370
371 // Check shallow validity (deps' changed_at ok) and try verify-then-decide
372 if self.whale.is_valid(&full_key) {
373 // Single atomic access to get both cached value and revision
374 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
375 // Shallow valid but not verified - verify deps first
376 let mut deps_verified = true;
377 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
378 for dep in deps {
379 if let Some(verifier) = self.verifiers.get(&dep) {
380 // Re-run query/asset to verify it (triggers recursive verification)
381 // For assets, this re-accesses the asset which may re-run the locator
382 if verifier.verify(self as &dyn std::any::Any).is_err() {
383 deps_verified = false;
384 break;
385 }
386 }
387 // Note: deps without verifiers are assumed valid (they're verified
388 // by the final is_valid check if their changed_at increased)
389 }
390 }
391
392 // Re-check validity after deps are verified
393 if deps_verified && self.whale.is_valid(&full_key) {
394 // Deps didn't change their changed_at, mark as verified and use cached
395 self.whale.mark_verified(&full_key, ¤t_rev);
396
397 self.tracer.on_cache_check(span_id, query_key.clone(), true);
398 self.tracer
399 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
400
401 return match cached {
402 CachedValue::Ok(output) => Ok((Ok(output), revision)),
403 CachedValue::UserError(err) => Ok((Err(err), revision)),
404 };
405 }
406 // A dep's changed_at increased, fall through to execute
407 }
408 }
409
410 self.tracer
411 .on_cache_check(span_id, query_key.clone(), false);
412
413 // Execute the query with cycle tracking
414 let _guard = StackGuard::push(full_key.clone());
415 let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
416 drop(_guard);
417
418 // Emit end event
419 let exec_result = match &result {
420 Ok((_, true, _)) => ExecutionResult::Changed,
421 Ok((_, false, _)) => ExecutionResult::Unchanged,
422 Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
423 Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
424 Err(e) => ExecutionResult::Error {
425 message: format!("{:?}", e),
426 },
427 };
428 self.tracer
429 .on_query_end(span_id, query_key.clone(), exec_result);
430
431 result.map(|(inner_result, _, revision)| (inner_result, revision))
432 }
433
434 /// Execute a query, caching the result if appropriate.
435 ///
436 /// Returns (result, output_changed, revision) tuple.
437 /// - `result`: Ok(output) for success, Err(user_error) for user errors
438 /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
439 #[allow(clippy::type_complexity)]
440 fn execute_query<Q: Query>(
441 &self,
442 query: &Q,
443 full_key: &FullCacheKey,
444 exec_ctx: ExecutionContext,
445 ) -> Result<
446 (
447 Result<Arc<Q::Output>, Arc<anyhow::Error>>,
448 bool,
449 RevisionCounter,
450 ),
451 QueryError,
452 > {
453 // Capture current global revision at query start for consistency checking
454 let start_revision = self.whale.current_revision().get(Durability::volatile());
455
456 // Create consistency tracker for this query execution
457 let tracker = Rc::new(ConsistencyTracker::new(start_revision));
458
459 // Set thread-local tracker for nested locator calls
460 let _tracker_guard = ConsistencyTrackerGuard::new(tracker);
461
462 // Create context for this query execution
463 let ctx = QueryContext {
464 runtime: self,
465 current_key: full_key.clone(),
466 parent_query_type: std::any::type_name::<Q>(),
467 exec_ctx,
468 deps: RefCell::new(Vec::new()),
469 };
470
471 // Execute the query (clone because query() takes ownership)
472 let result = query.clone().query(&ctx);
473
474 // Get collected dependencies
475 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
476
477 // Query durability defaults to stable - Whale will automatically reduce
478 // the effective durability to min(requested, min(dep_durabilities)).
479 // A pure query with no dependencies remains stable.
480 // A query depending on volatile assets becomes volatile.
481 let durability = Durability::stable();
482
483 match result {
484 Ok(output) => {
485 // Check if output changed (for early cutoff)
486 // existing_revision is Some only when output is unchanged (can reuse revision)
487 let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
488 self.get_cached_with_revision::<Q>(full_key)
489 {
490 if Q::output_eq(&*old, &*output) {
491 Some(rev) // Same output - reuse revision
492 } else {
493 None // Different output
494 }
495 } else {
496 None // No previous Ok value
497 };
498 let output_changed = existing_revision.is_none();
499
500 // Emit early cutoff check event
501 self.tracer.on_early_cutoff_check(
502 exec_ctx.span_id(),
503 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
504 output_changed,
505 );
506
507 // Update whale with cached entry (atomic update of value + dependency state)
508 let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
509 let revision = if let Some(existing_rev) = existing_revision {
510 // confirm_unchanged doesn't change changed_at, use existing
511 let _ = self.whale.confirm_unchanged(full_key, deps);
512 existing_rev
513 } else {
514 // Use new_rev from register result
515 match self
516 .whale
517 .register(full_key.clone(), Some(entry), durability, deps)
518 {
519 Ok(result) => result.new_rev,
520 Err(missing) => {
521 return Err(QueryError::DependenciesRemoved {
522 missing_keys: missing,
523 })
524 }
525 }
526 };
527
528 // Register query in registry for list_queries
529 let is_new_query = self.query_registry.register(query);
530 if is_new_query {
531 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
532 let _ = self
533 .whale
534 .register(sentinel, None, Durability::stable(), vec![]);
535 }
536
537 // Store verifier for this query (for verify-then-decide pattern)
538 self.verifiers
539 .insert::<Q, T>(full_key.clone(), query.clone());
540
541 Ok((Ok(output), output_changed, revision))
542 }
543 Err(QueryError::UserError(err)) => {
544 // Check if error changed (for early cutoff)
545 // existing_revision is Some only when error is unchanged (can reuse revision)
546 let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
547 self.get_cached_with_revision::<Q>(full_key)
548 {
549 if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
550 Some(rev) // Same error - reuse revision
551 } else {
552 None // Different error
553 }
554 } else {
555 None // No previous UserError
556 };
557 let output_changed = existing_revision.is_none();
558
559 // Emit early cutoff check event
560 self.tracer.on_early_cutoff_check(
561 exec_ctx.span_id(),
562 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
563 output_changed,
564 );
565
566 // Update whale with cached error (atomic update of value + dependency state)
567 let entry = CachedEntry::UserError(err.clone());
568 let revision = if let Some(existing_rev) = existing_revision {
569 // confirm_unchanged doesn't change changed_at, use existing
570 let _ = self.whale.confirm_unchanged(full_key, deps);
571 existing_rev
572 } else {
573 // Use new_rev from register result
574 match self
575 .whale
576 .register(full_key.clone(), Some(entry), durability, deps)
577 {
578 Ok(result) => result.new_rev,
579 Err(missing) => {
580 return Err(QueryError::DependenciesRemoved {
581 missing_keys: missing,
582 })
583 }
584 }
585 };
586
587 // Register query in registry for list_queries
588 let is_new_query = self.query_registry.register(query);
589 if is_new_query {
590 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
591 let _ = self
592 .whale
593 .register(sentinel, None, Durability::stable(), vec![]);
594 }
595
596 // Store verifier for this query (for verify-then-decide pattern)
597 self.verifiers
598 .insert::<Q, T>(full_key.clone(), query.clone());
599
600 Ok((Err(err), output_changed, revision))
601 }
602 Err(e) => {
603 // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
604 Err(e)
605 }
606 }
607 }
608
609 /// Invalidate a query, forcing recomputation on next access.
610 ///
611 /// This also invalidates any queries that depend on this one.
612 pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
613 let full_key = FullCacheKey::new::<Q, _>(key);
614
615 self.tracer.on_query_invalidated(
616 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
617 InvalidationReason::ManualInvalidation,
618 );
619
620 // Update whale to invalidate dependents (register with None to clear cached value)
621 // Use stable durability to increment all revision counters, ensuring queries
622 // at any durability level will see this as a change.
623 let _ = self
624 .whale
625 .register(full_key, None, Durability::stable(), vec![]);
626 }
627
628 /// Remove a query from the cache entirely, freeing memory.
629 ///
630 /// Use this for GC when a query is no longer needed.
631 /// Unlike `invalidate`, this removes all traces of the query from storage.
632 /// The query will be recomputed from scratch on next access.
633 ///
634 /// This also invalidates any queries that depend on this one.
635 pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
636 let full_key = FullCacheKey::new::<Q, _>(key);
637
638 self.tracer.on_query_invalidated(
639 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
640 InvalidationReason::ManualInvalidation,
641 );
642
643 // Remove verifier if exists
644 self.verifiers.remove(&full_key);
645
646 // Remove from whale storage (this also handles dependent invalidation)
647 self.whale.remove(&full_key);
648
649 // Remove from registry and update sentinel for list_queries
650 if self.query_registry.remove::<Q>(key) {
651 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
652 let _ = self
653 .whale
654 .register(sentinel, None, Durability::stable(), vec![]);
655 }
656 }
657
658 /// Clear all cached values by removing all nodes from whale.
659 ///
660 /// Note: This is a relatively expensive operation as it iterates through all keys.
661 pub fn clear_cache(&self) {
662 let keys = self.whale.keys();
663 for key in keys {
664 self.whale.remove(&key);
665 }
666 }
667
668 /// Poll a query, returning both the result and its change revision.
669 ///
670 /// This is useful for implementing subscription patterns where you need to
671 /// detect changes efficiently. Compare the returned `revision` with a
672 /// previously stored value to determine if the query result has changed.
673 ///
674 /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
675 /// as its value, allowing you to track revision changes for both success and
676 /// user error cases.
677 ///
678 /// # Example
679 ///
680 /// ```ignore
681 /// struct Subscription<Q: Query> {
682 /// query: Q,
683 /// last_revision: RevisionCounter,
684 /// tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
685 /// }
686 ///
687 /// // Polling loop
688 /// for sub in &mut subscriptions {
689 /// let result = runtime.poll(sub.query.clone())?;
690 /// if result.revision > sub.last_revision {
691 /// sub.tx.send(result.value.clone())?;
692 /// sub.last_revision = result.revision;
693 /// }
694 /// }
695 /// ```
696 ///
697 /// # Errors
698 ///
699 /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
700 /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
701 #[allow(clippy::type_complexity)]
702 pub fn poll<Q: Query>(
703 &self,
704 query: Q,
705 ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
706 let (value, revision) = self.query_internal(query)?;
707 Ok(Polled { value, revision })
708 }
709
710 /// Get the change revision of a query without executing it.
711 ///
712 /// Returns `None` if the query has never been executed.
713 ///
714 /// This is useful for checking if a query has changed since the last poll
715 /// without the cost of executing the query.
716 ///
717 /// # Example
718 ///
719 /// ```ignore
720 /// // Check if query has changed before deciding to poll
721 /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
722 /// if rev > last_known_revision {
723 /// let result = runtime.query(MyQuery::new(key))?;
724 /// // Process result...
725 /// }
726 /// }
727 /// ```
728 pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
729 let full_key = FullCacheKey::new::<Q, _>(key);
730 self.whale.get(&full_key).map(|node| node.changed_at)
731 }
732}
733
734// ============================================================================
735// GC (Garbage Collection) API
736// ============================================================================
737
738impl<T: Tracer> QueryRuntime<T> {
739 /// Get all query keys currently in the cache.
740 ///
741 /// This is useful for implementing custom garbage collection strategies.
742 /// Use this in combination with [`Tracer::on_query_key`] to track access
743 /// times and implement LRU, TTL, or other GC algorithms externally.
744 ///
745 /// # Example
746 ///
747 /// ```ignore
748 /// // Collect all keys that haven't been accessed recently
749 /// let stale_keys: Vec<_> = runtime.query_keys()
750 /// .filter(|key| tracker.is_stale(key))
751 /// .collect();
752 ///
753 /// // Remove stale queries that have no dependents
754 /// for key in stale_keys {
755 /// runtime.remove_if_unused(&key);
756 /// }
757 /// ```
758 pub fn query_keys(&self) -> Vec<FullCacheKey> {
759 self.whale.keys()
760 }
761
762 /// Remove a query if it has no dependents.
763 ///
764 /// Returns `true` if the query was removed, `false` if it has dependents
765 /// or doesn't exist. This is the safe way to remove queries during GC,
766 /// as it won't break queries that depend on this one.
767 ///
768 /// # Example
769 ///
770 /// ```ignore
771 /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
772 /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
773 /// println!("Query removed");
774 /// } else {
775 /// println!("Query has dependents, not removed");
776 /// }
777 /// ```
778 pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
779 let full_key = FullCacheKey::new::<Q, _>(key);
780 self.remove_if_unused(&full_key)
781 }
782
783 /// Remove a query by its [`FullCacheKey`].
784 ///
785 /// This is the type-erased version of [`remove_query`](Self::remove_query).
786 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
787 /// or [`Tracer::on_query_key`].
788 ///
789 /// Returns `true` if the query was removed, `false` if it doesn't exist.
790 ///
791 /// # Warning
792 ///
793 /// This forcibly removes the query even if other queries depend on it.
794 /// Dependent queries will be recomputed on next access. For safe GC,
795 /// use [`remove_if_unused`](Self::remove_if_unused) instead.
796 pub fn remove(&self, key: &FullCacheKey) -> bool {
797 // Remove verifier if exists
798 self.verifiers.remove(key);
799
800 // Remove from whale storage
801 self.whale.remove(key).is_some()
802 }
803
804 /// Remove a query by its [`FullCacheKey`] if it has no dependents.
805 ///
806 /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
807 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
808 /// or [`Tracer::on_query_key`].
809 ///
810 /// Returns `true` if the query was removed, `false` if it has dependents
811 /// or doesn't exist.
812 ///
813 /// # Example
814 ///
815 /// ```ignore
816 /// // Implement LRU GC
817 /// for key in runtime.query_keys() {
818 /// if tracker.is_expired(&key) {
819 /// runtime.remove_if_unused(&key);
820 /// }
821 /// }
822 /// ```
823 pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
824 if self.whale.remove_if_unused(key.clone()).is_some() {
825 // Successfully removed - clean up verifier
826 self.verifiers.remove(key);
827 true
828 } else {
829 false
830 }
831 }
832}
833
834// ============================================================================
835// Builder
836// ============================================================================
837
838/// Builder for [`QueryRuntime`] with customizable settings.
839///
840/// # Example
841///
842/// ```ignore
843/// let runtime = QueryRuntime::builder()
844/// .error_comparator(|a, b| {
845/// // Treat all errors of the same type as equal
846/// a.downcast_ref::<std::io::Error>().is_some()
847/// == b.downcast_ref::<std::io::Error>().is_some()
848/// })
849/// .build();
850/// ```
851pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
852 error_comparator: ErrorComparator,
853 tracer: T,
854}
855
856impl Default for QueryRuntimeBuilder<NoopTracer> {
857 fn default() -> Self {
858 Self::new()
859 }
860}
861
862impl QueryRuntimeBuilder<NoopTracer> {
863 /// Create a new builder with default settings.
864 pub fn new() -> Self {
865 Self {
866 error_comparator: default_error_comparator,
867 tracer: NoopTracer,
868 }
869 }
870}
871
872impl<T: Tracer> QueryRuntimeBuilder<T> {
873 /// Set the error comparator function for early cutoff optimization.
874 ///
875 /// When a query returns `QueryError::UserError`, this function is used
876 /// to compare it with the previously cached error. If they are equal,
877 /// downstream queries can skip recomputation (early cutoff).
878 ///
879 /// The default comparator returns `false` for all errors, meaning errors
880 /// are always considered different (conservative, always recomputes).
881 ///
882 /// # Example
883 ///
884 /// ```ignore
885 /// // Treat errors as equal if they have the same display message
886 /// let runtime = QueryRuntime::builder()
887 /// .error_comparator(|a, b| a.to_string() == b.to_string())
888 /// .build();
889 /// ```
890 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
891 self.error_comparator = f;
892 self
893 }
894
895 /// Set the tracer for observability.
896 pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
897 QueryRuntimeBuilder {
898 error_comparator: self.error_comparator,
899 tracer,
900 }
901 }
902
903 /// Build the runtime with the configured settings.
904 pub fn build(self) -> QueryRuntime<T> {
905 QueryRuntime {
906 whale: WhaleRuntime::new(),
907 locators: Arc::new(LocatorStorage::new()),
908 pending: Arc::new(PendingStorage::new()),
909 query_registry: Arc::new(QueryRegistry::new()),
910 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
911 verifiers: Arc::new(VerifierStorage::new()),
912 error_comparator: self.error_comparator,
913 tracer: Arc::new(self.tracer),
914 }
915 }
916}
917
918// ============================================================================
919// Asset API
920// ============================================================================
921
922impl<T: Tracer> QueryRuntime<T> {
923 /// Register an asset locator for a specific asset key type.
924 ///
925 /// Only one locator can be registered per key type. Later registrations
926 /// replace earlier ones.
927 ///
928 /// # Example
929 ///
930 /// ```ignore
931 /// let runtime = QueryRuntime::new();
932 /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
933 /// ```
934 pub fn register_asset_locator<K, L>(&self, locator: L)
935 where
936 K: AssetKey,
937 L: AssetLocator<K>,
938 {
939 self.locators.insert::<K, L>(locator);
940 }
941
942 /// Get an iterator over pending asset requests.
943 ///
944 /// Returns assets that have been requested but not yet resolved.
945 /// The user should fetch these externally and call `resolve_asset()`.
946 ///
947 /// # Example
948 ///
949 /// ```ignore
950 /// for pending in runtime.pending_assets() {
951 /// if let Some(path) = pending.key::<FilePath>() {
952 /// let content = fetch_file(path);
953 /// runtime.resolve_asset(path.clone(), content);
954 /// }
955 /// }
956 /// ```
957 pub fn pending_assets(&self) -> Vec<PendingAsset> {
958 self.pending.get_all()
959 }
960
961 /// Get pending assets filtered by key type.
962 pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
963 self.pending.get_of_type::<K>()
964 }
965
966 /// Check if there are any pending assets.
967 pub fn has_pending_assets(&self) -> bool {
968 !self.pending.is_empty()
969 }
970
971 /// Resolve an asset with its loaded value.
972 ///
973 /// This marks the asset as ready and invalidates any queries that
974 /// depend on it (if the value changed), triggering recomputation on next access.
975 ///
976 /// This method is idempotent - resolving with the same value (via `asset_eq`)
977 /// will not trigger downstream recomputation.
978 ///
979 /// # Arguments
980 ///
981 /// * `key` - The asset key identifying this resource
982 /// * `value` - The loaded asset value
983 /// * `durability` - How frequently this asset is expected to change
984 ///
985 /// # Example
986 ///
987 /// ```ignore
988 /// let content = std::fs::read_to_string(&path)?;
989 /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
990 /// ```
991 pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
992 self.resolve_asset_internal(key, value, durability);
993 }
994
995 /// Resolve an asset with an error.
996 ///
997 /// This marks the asset as errored and caches the error. Queries depending
998 /// on this asset will receive `Err(QueryError::UserError(...))`.
999 ///
1000 /// Use this when async loading fails (e.g., network error, file not found,
1001 /// access denied).
1002 ///
1003 /// # Arguments
1004 ///
1005 /// * `key` - The asset key identifying this resource
1006 /// * `error` - The error to cache (will be wrapped in `Arc`)
1007 /// * `durability` - How frequently this error state is expected to change
1008 ///
1009 /// # Example
1010 ///
1011 /// ```ignore
1012 /// match fetch_file(&path) {
1013 /// Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
1014 /// Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
1015 /// }
1016 /// ```
1017 pub fn resolve_asset_error<K: AssetKey>(
1018 &self,
1019 key: K,
1020 error: impl Into<anyhow::Error>,
1021 durability: DurabilityLevel,
1022 ) {
1023 let full_asset_key = FullAssetKey::new(&key);
1024 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1025
1026 // Remove from pending BEFORE registering the error
1027 self.pending.remove(&full_asset_key);
1028
1029 // Prepare the error entry
1030 let error_arc = Arc::new(error.into());
1031 let entry = CachedEntry::AssetError(error_arc.clone());
1032 let durability =
1033 Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
1034
1035 // Atomic compare-and-update (errors are always considered changed for now)
1036 let result = self
1037 .whale
1038 .update_with_compare(
1039 full_cache_key,
1040 Some(entry),
1041 |old_data, _new_data| {
1042 // Compare old and new errors using error_comparator
1043 match old_data.and_then(|d| d.as_ref()) {
1044 Some(CachedEntry::AssetError(old_err)) => {
1045 !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
1046 }
1047 _ => true, // Loading, Ready, or not present -> changed
1048 }
1049 },
1050 durability,
1051 vec![],
1052 )
1053 .expect("update_with_compare with no dependencies cannot fail");
1054
1055 // Emit asset resolved event (with changed status)
1056 self.tracer.on_asset_resolved(
1057 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1058 result.changed,
1059 );
1060
1061 // Register asset key in registry for list_asset_keys
1062 let is_new_asset = self.asset_key_registry.register(&key);
1063 if is_new_asset {
1064 // Update sentinel to invalidate list_asset_keys dependents
1065 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1066 let _ = self
1067 .whale
1068 .register(sentinel, None, Durability::stable(), vec![]);
1069 }
1070 }
1071
1072 fn resolve_asset_internal<K: AssetKey>(
1073 &self,
1074 key: K,
1075 value: K::Asset,
1076 durability_level: DurabilityLevel,
1077 ) {
1078 let full_asset_key = FullAssetKey::new(&key);
1079 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1080
1081 // Remove from pending BEFORE registering the value
1082 self.pending.remove(&full_asset_key);
1083
1084 // Prepare the new entry
1085 let value_arc: Arc<K::Asset> = Arc::new(value);
1086 let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1087 let durability =
1088 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1089
1090 // Atomic compare-and-update
1091 let result = self
1092 .whale
1093 .update_with_compare(
1094 full_cache_key,
1095 Some(entry),
1096 |old_data, _new_data| {
1097 // Compare old and new values
1098 match old_data.and_then(|d| d.as_ref()) {
1099 Some(CachedEntry::AssetReady(old_arc)) => {
1100 match old_arc.clone().downcast::<K::Asset>() {
1101 Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1102 Err(_) => true, // Type mismatch, treat as changed
1103 }
1104 }
1105 _ => true, // Loading, NotFound, or not present -> changed
1106 }
1107 },
1108 durability,
1109 vec![],
1110 )
1111 .expect("update_with_compare with no dependencies cannot fail");
1112
1113 // Emit asset resolved event
1114 self.tracer.on_asset_resolved(
1115 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1116 result.changed,
1117 );
1118
1119 // Register asset key in registry for list_asset_keys
1120 let is_new_asset = self.asset_key_registry.register(&key);
1121 if is_new_asset {
1122 // Update sentinel to invalidate list_asset_keys dependents
1123 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1124 let _ = self
1125 .whale
1126 .register(sentinel, None, Durability::stable(), vec![]);
1127 }
1128 }
1129
1130 /// Invalidate an asset, forcing queries to re-request it.
1131 ///
1132 /// The asset will be marked as loading and added to pending assets.
1133 /// Dependent queries will suspend until the asset is resolved again.
1134 ///
1135 /// # Example
1136 ///
1137 /// ```ignore
1138 /// // File was modified externally
1139 /// runtime.invalidate_asset(&FilePath("config.json".into()));
1140 /// // Queries depending on this asset will now suspend
1141 /// // User should fetch the new value and call resolve_asset
1142 /// ```
1143 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1144 let full_asset_key = FullAssetKey::new(key);
1145 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1146
1147 // Emit asset invalidated event
1148 self.tracer.on_asset_invalidated(TracerAssetKey::new(
1149 std::any::type_name::<K>(),
1150 format!("{:?}", key),
1151 ));
1152
1153 // Add to pending FIRST (before clearing whale state)
1154 // This ensures: readers see either old value, or Loading+pending
1155 self.pending.insert::<K>(full_asset_key, key.clone());
1156
1157 // Atomic: clear cached value + invalidate dependents
1158 // Using None for data means "needs to be loaded"
1159 // Use stable durability to ensure queries at any durability level see the change.
1160 let _ = self
1161 .whale
1162 .register(full_cache_key, None, Durability::stable(), vec![]);
1163 }
1164
1165 /// Remove an asset from the cache entirely.
1166 ///
1167 /// Unlike `invalidate_asset`, this removes all traces of the asset.
1168 /// Dependent queries will go through the locator again on next access.
1169 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1170 let full_asset_key = FullAssetKey::new(key);
1171 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1172
1173 // Remove from pending first
1174 self.pending.remove(&full_asset_key);
1175
1176 // Remove from whale (this also cleans up dependency edges)
1177 // whale.remove() invalidates dependents before removing
1178 self.whale.remove(&full_cache_key);
1179
1180 // Remove from registry and update sentinel for list_asset_keys
1181 if self.asset_key_registry.remove::<K>(key) {
1182 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1183 let _ = self
1184 .whale
1185 .register(sentinel, None, Durability::stable(), vec![]);
1186 }
1187 }
1188
1189 /// Get an asset by key without tracking dependencies.
1190 ///
1191 /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1192 /// as a dependent of the asset. Use this for direct asset access outside
1193 /// of query execution.
1194 ///
1195 /// # Returns
1196 ///
1197 /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1198 /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1199 /// - `Err(QueryError::MissingDependency)` - Asset was not found
1200 pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1201 self.get_asset_internal(key)
1202 }
1203
1204 /// Internal: Get asset state and its changed_at revision atomically.
1205 ///
1206 /// This is the "direct" version called from QueryRuntime::asset (no dependency tracking).
1207 /// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
1208 ///
1209 /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
1210 /// whale node that provided the asset value.
1211 fn get_asset_with_revision<K: AssetKey>(
1212 &self,
1213 key: K,
1214 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1215 let full_asset_key = FullAssetKey::new(&key);
1216 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1217
1218 // Helper to emit AssetRequested event
1219 let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1220 tracer.on_asset_requested(
1221 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1222 state,
1223 );
1224 };
1225
1226 // Check whale cache first (single atomic read)
1227 if let Some(node) = self.whale.get(&full_cache_key) {
1228 let changed_at = node.changed_at;
1229 // Check if valid at current revision (shallow check)
1230 if self.whale.is_valid(&full_cache_key) {
1231 // Verify dependencies recursively (like query path does)
1232 let mut deps_verified = true;
1233 if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1234 for dep in deps {
1235 if let Some(verifier) = self.verifiers.get(&dep) {
1236 // Re-run query/asset to verify it (triggers recursive verification)
1237 if verifier.verify(self as &dyn std::any::Any).is_err() {
1238 deps_verified = false;
1239 break;
1240 }
1241 }
1242 }
1243 }
1244
1245 // Re-check validity after deps are verified
1246 if deps_verified && self.whale.is_valid(&full_cache_key) {
1247 // For cached entries, check consistency for leaf assets (no locator deps).
1248 // This detects if resolve_asset/resolve_asset_error was called during query execution.
1249 let has_locator_deps = self
1250 .whale
1251 .get_dependency_ids(&full_cache_key)
1252 .is_some_and(|deps| !deps.is_empty());
1253
1254 match &node.data {
1255 Some(CachedEntry::AssetReady(arc)) => {
1256 // Check consistency for cached leaf assets
1257 if !has_locator_deps {
1258 check_leaf_asset_consistency(changed_at)?;
1259 }
1260 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1261 match arc.clone().downcast::<K::Asset>() {
1262 Ok(value) => {
1263 return Ok((AssetLoadingState::ready(key, value), changed_at))
1264 }
1265 Err(_) => {
1266 return Err(QueryError::MissingDependency {
1267 description: format!("Asset type mismatch: {:?}", key),
1268 })
1269 }
1270 }
1271 }
1272 Some(CachedEntry::AssetError(err)) => {
1273 // Check consistency for cached leaf errors
1274 if !has_locator_deps {
1275 check_leaf_asset_consistency(changed_at)?;
1276 }
1277 emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1278 return Err(QueryError::UserError(err.clone()));
1279 }
1280 None => {
1281 // Loading state - no value to be inconsistent with
1282 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1283 return Ok((AssetLoadingState::loading(key), changed_at));
1284 }
1285 _ => {
1286 // Query-related entries (Ok, UserError) shouldn't be here
1287 // Fall through to locator
1288 }
1289 }
1290 }
1291 }
1292 }
1293
1294 // Not in cache or invalid - try locator
1295 // Use LocatorContext to track deps on the asset itself
1296 check_cycle(&full_cache_key)?;
1297 let _guard = StackGuard::push(full_cache_key.clone());
1298
1299 let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1300 let locator_result =
1301 self.locators
1302 .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1303
1304 if let Some(result) = locator_result {
1305 // Get collected dependencies from the locator context
1306 let locator_deps = locator_ctx.into_deps();
1307 match result {
1308 Ok(ErasedLocateResult::Ready {
1309 value: arc,
1310 durability: durability_level,
1311 }) => {
1312 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1313
1314 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1315 Ok(v) => v,
1316 Err(_) => {
1317 return Err(QueryError::MissingDependency {
1318 description: format!("Asset type mismatch: {:?}", key),
1319 });
1320 }
1321 };
1322
1323 // Store in whale atomically with early cutoff
1324 // Include locator's dependencies so the asset is invalidated when they change
1325 let entry = CachedEntry::AssetReady(typed_value.clone());
1326 let durability = Durability::new(durability_level.as_u8() as usize)
1327 .unwrap_or(Durability::volatile());
1328 let new_value = typed_value.clone();
1329 let result = self
1330 .whale
1331 .update_with_compare(
1332 full_cache_key.clone(),
1333 Some(entry),
1334 |old_data, _new_data| {
1335 let Some(CachedEntry::AssetReady(old_arc)) =
1336 old_data.and_then(|d| d.as_ref())
1337 else {
1338 return true;
1339 };
1340 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1341 return true;
1342 };
1343 !K::asset_eq(&old_value, &new_value)
1344 },
1345 durability,
1346 locator_deps,
1347 )
1348 .expect("update_with_compare should succeed");
1349
1350 // Register verifier for this asset (for verify-then-decide pattern)
1351 self.verifiers
1352 .insert_asset::<K, T>(full_cache_key, key.clone());
1353
1354 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1355 }
1356 Ok(ErasedLocateResult::Pending) => {
1357 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1358
1359 // Add to pending list for Pending result
1360 self.pending.insert::<K>(full_asset_key, key.clone());
1361 match self
1362 .whale
1363 .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1364 .expect("get_or_insert should succeed")
1365 {
1366 GetOrInsertResult::Inserted(node) => {
1367 return Ok((AssetLoadingState::loading(key), node.changed_at));
1368 }
1369 GetOrInsertResult::Existing(node) => {
1370 let changed_at = node.changed_at;
1371 match &node.data {
1372 Some(CachedEntry::AssetReady(arc)) => {
1373 match arc.clone().downcast::<K::Asset>() {
1374 Ok(value) => {
1375 return Ok((
1376 AssetLoadingState::ready(key, value),
1377 changed_at,
1378 ))
1379 }
1380 Err(_) => {
1381 return Ok((
1382 AssetLoadingState::loading(key),
1383 changed_at,
1384 ))
1385 }
1386 }
1387 }
1388 Some(CachedEntry::AssetError(err)) => {
1389 return Err(QueryError::UserError(err.clone()));
1390 }
1391 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1392 }
1393 }
1394 }
1395 }
1396 Err(QueryError::UserError(err)) => {
1397 // Locator returned a user error - cache it as AssetError
1398 let entry = CachedEntry::AssetError(err.clone());
1399 let _ = self.whale.register(
1400 full_cache_key,
1401 Some(entry),
1402 Durability::volatile(),
1403 locator_deps,
1404 );
1405 return Err(QueryError::UserError(err));
1406 }
1407 Err(e) => {
1408 // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1409 return Err(e);
1410 }
1411 }
1412 }
1413
1414 // No locator registered or locator returned None - mark as pending
1415 // (no locator was called, so no deps to track)
1416 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1417 self.pending
1418 .insert::<K>(full_asset_key.clone(), key.clone());
1419
1420 match self
1421 .whale
1422 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1423 .expect("get_or_insert with no dependencies cannot fail")
1424 {
1425 GetOrInsertResult::Inserted(node) => {
1426 Ok((AssetLoadingState::loading(key), node.changed_at))
1427 }
1428 GetOrInsertResult::Existing(node) => {
1429 let changed_at = node.changed_at;
1430 match &node.data {
1431 Some(CachedEntry::AssetReady(arc)) => {
1432 match arc.clone().downcast::<K::Asset>() {
1433 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1434 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1435 }
1436 }
1437 Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1438 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1439 }
1440 }
1441 }
1442 }
1443
1444 /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1445 ///
1446 /// This version is called from QueryContext::asset. Consistency checking for
1447 /// cached leaf assets is done inside this function before returning.
1448 fn get_asset_with_revision_ctx<K: AssetKey>(
1449 &self,
1450 key: K,
1451 _ctx: &QueryContext<'_, T>,
1452 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1453 let full_asset_key = FullAssetKey::new(&key);
1454 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1455
1456 // Helper to emit AssetRequested event
1457 let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1458 tracer.on_asset_requested(
1459 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1460 state,
1461 );
1462 };
1463
1464 // Check whale cache first (single atomic read)
1465 if let Some(node) = self.whale.get(&full_cache_key) {
1466 let changed_at = node.changed_at;
1467 // Check if valid at current revision (shallow check)
1468 if self.whale.is_valid(&full_cache_key) {
1469 // Verify dependencies recursively (like query path does)
1470 let mut deps_verified = true;
1471 if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1472 for dep in deps {
1473 if let Some(verifier) = self.verifiers.get(&dep) {
1474 // Re-run query/asset to verify it (triggers recursive verification)
1475 if verifier.verify(self as &dyn std::any::Any).is_err() {
1476 deps_verified = false;
1477 break;
1478 }
1479 }
1480 }
1481 }
1482
1483 // Re-check validity after deps are verified
1484 if deps_verified && self.whale.is_valid(&full_cache_key) {
1485 // For cached entries, check consistency for leaf assets (no locator deps).
1486 // This detects if resolve_asset/resolve_asset_error was called during query execution.
1487 let has_locator_deps = self
1488 .whale
1489 .get_dependency_ids(&full_cache_key)
1490 .is_some_and(|deps| !deps.is_empty());
1491
1492 match &node.data {
1493 Some(CachedEntry::AssetReady(arc)) => {
1494 // Check consistency for cached leaf assets
1495 if !has_locator_deps {
1496 check_leaf_asset_consistency(changed_at)?;
1497 }
1498 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1499 match arc.clone().downcast::<K::Asset>() {
1500 Ok(value) => {
1501 return Ok((AssetLoadingState::ready(key, value), changed_at))
1502 }
1503 Err(_) => {
1504 return Err(QueryError::MissingDependency {
1505 description: format!("Asset type mismatch: {:?}", key),
1506 })
1507 }
1508 }
1509 }
1510 Some(CachedEntry::AssetError(err)) => {
1511 // Check consistency for cached leaf errors
1512 if !has_locator_deps {
1513 check_leaf_asset_consistency(changed_at)?;
1514 }
1515 emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1516 return Err(QueryError::UserError(err.clone()));
1517 }
1518 None => {
1519 // Loading state - no value to be inconsistent with
1520 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1521 return Ok((AssetLoadingState::loading(key), changed_at));
1522 }
1523 _ => {
1524 // Query-related entries (Ok, UserError) shouldn't be here
1525 // Fall through to locator
1526 }
1527 }
1528 }
1529 }
1530 }
1531
1532 // Not in cache or invalid - try locator
1533 // Use LocatorContext to track deps on the asset itself (not the calling query)
1534 // Consistency tracking is handled via thread-local storage
1535 check_cycle(&full_cache_key)?;
1536 let _guard = StackGuard::push(full_cache_key.clone());
1537
1538 let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1539 let locator_result =
1540 self.locators
1541 .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1542
1543 if let Some(result) = locator_result {
1544 // Get collected dependencies from the locator context
1545 let locator_deps = locator_ctx.into_deps();
1546 match result {
1547 Ok(ErasedLocateResult::Ready {
1548 value: arc,
1549 durability: durability_level,
1550 }) => {
1551 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1552
1553 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1554 Ok(v) => v,
1555 Err(_) => {
1556 return Err(QueryError::MissingDependency {
1557 description: format!("Asset type mismatch: {:?}", key),
1558 });
1559 }
1560 };
1561
1562 // Store in whale atomically with early cutoff
1563 // Include locator's dependencies so the asset is invalidated when they change
1564 let entry = CachedEntry::AssetReady(typed_value.clone());
1565 let durability = Durability::new(durability_level.as_u8() as usize)
1566 .unwrap_or(Durability::volatile());
1567 let new_value = typed_value.clone();
1568 let result = self
1569 .whale
1570 .update_with_compare(
1571 full_cache_key.clone(),
1572 Some(entry),
1573 |old_data, _new_data| {
1574 let Some(CachedEntry::AssetReady(old_arc)) =
1575 old_data.and_then(|d| d.as_ref())
1576 else {
1577 return true;
1578 };
1579 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1580 return true;
1581 };
1582 !K::asset_eq(&old_value, &new_value)
1583 },
1584 durability,
1585 locator_deps,
1586 )
1587 .expect("update_with_compare should succeed");
1588
1589 // Register verifier for this asset (for verify-then-decide pattern)
1590 self.verifiers
1591 .insert_asset::<K, T>(full_cache_key, key.clone());
1592
1593 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1594 }
1595 Ok(ErasedLocateResult::Pending) => {
1596 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1597
1598 // Add to pending list for Pending result
1599 self.pending.insert::<K>(full_asset_key, key.clone());
1600 match self
1601 .whale
1602 .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1603 .expect("get_or_insert should succeed")
1604 {
1605 GetOrInsertResult::Inserted(node) => {
1606 return Ok((AssetLoadingState::loading(key), node.changed_at));
1607 }
1608 GetOrInsertResult::Existing(node) => {
1609 let changed_at = node.changed_at;
1610 match &node.data {
1611 Some(CachedEntry::AssetReady(arc)) => {
1612 match arc.clone().downcast::<K::Asset>() {
1613 Ok(value) => {
1614 return Ok((
1615 AssetLoadingState::ready(key, value),
1616 changed_at,
1617 ));
1618 }
1619 Err(_) => {
1620 return Ok((
1621 AssetLoadingState::loading(key),
1622 changed_at,
1623 ))
1624 }
1625 }
1626 }
1627 Some(CachedEntry::AssetError(err)) => {
1628 return Err(QueryError::UserError(err.clone()));
1629 }
1630 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1631 }
1632 }
1633 }
1634 }
1635 Err(QueryError::UserError(err)) => {
1636 // Locator returned a user error - cache it as AssetError
1637 let entry = CachedEntry::AssetError(err.clone());
1638 let _ = self.whale.register(
1639 full_cache_key,
1640 Some(entry),
1641 Durability::volatile(),
1642 locator_deps,
1643 );
1644 return Err(QueryError::UserError(err));
1645 }
1646 Err(e) => {
1647 // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1648 return Err(e);
1649 }
1650 }
1651 }
1652
1653 // No locator registered or locator returned None - mark as pending
1654 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1655 self.pending
1656 .insert::<K>(full_asset_key.clone(), key.clone());
1657
1658 match self
1659 .whale
1660 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1661 .expect("get_or_insert with no dependencies cannot fail")
1662 {
1663 GetOrInsertResult::Inserted(node) => {
1664 Ok((AssetLoadingState::loading(key), node.changed_at))
1665 }
1666 GetOrInsertResult::Existing(node) => {
1667 let changed_at = node.changed_at;
1668 match &node.data {
1669 Some(CachedEntry::AssetReady(arc)) => {
1670 match arc.clone().downcast::<K::Asset>() {
1671 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1672 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1673 }
1674 }
1675 Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1676 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1677 }
1678 }
1679 }
1680 }
1681
1682 /// Internal: Get asset state, checking cache and locator.
1683 fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1684 self.get_asset_with_revision(key).map(|(state, _)| state)
1685 }
1686}
1687
1688impl<T: Tracer> Db for QueryRuntime<T> {
1689 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1690 QueryRuntime::query(self, query)
1691 }
1692
1693 fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1694 self.get_asset_internal(key)
1695 }
1696
1697 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1698 self.query_registry.get_all::<Q>()
1699 }
1700
1701 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1702 self.asset_key_registry.get_all::<K>()
1703 }
1704}
1705
1706/// Tracks consistency of leaf asset accesses during query execution.
1707///
1708/// A "leaf" asset is one without dependencies (externally resolved via `resolve_asset`).
1709/// This tracker ensures that all leaf assets accessed during a query execution
1710/// (including those accessed by locators) are consistent - i.e., none were modified
1711/// via `resolve_asset` mid-execution.
1712///
1713/// The tracker is shared across `QueryContext` and `LocatorContext` to propagate
1714/// consistency checking through the entire execution tree.
1715#[derive(Debug)]
1716pub(crate) struct ConsistencyTracker {
1717 /// Global revision at query start. All leaf assets must have changed_at <= this.
1718 start_revision: RevisionCounter,
1719}
1720
1721impl ConsistencyTracker {
1722 /// Create a new tracker with the given start revision.
1723 pub fn new(start_revision: RevisionCounter) -> Self {
1724 Self { start_revision }
1725 }
1726
1727 /// Check consistency for a leaf asset access.
1728 ///
1729 /// A leaf asset is consistent if its changed_at <= start_revision.
1730 /// This detects if resolve_asset was called during query execution.
1731 ///
1732 /// Returns Ok(()) if consistent, Err if inconsistent.
1733 pub fn check_leaf_asset(&self, dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
1734 if dep_changed_at > self.start_revision {
1735 Err(QueryError::InconsistentAssetResolution)
1736 } else {
1737 Ok(())
1738 }
1739 }
1740}
1741
1742/// Context provided to queries during execution.
1743///
1744/// Use this to access dependencies via `query()`.
1745pub struct QueryContext<'a, T: Tracer = NoopTracer> {
1746 runtime: &'a QueryRuntime<T>,
1747 current_key: FullCacheKey,
1748 parent_query_type: &'static str,
1749 exec_ctx: ExecutionContext,
1750 deps: RefCell<Vec<FullCacheKey>>,
1751}
1752
1753impl<'a, T: Tracer> QueryContext<'a, T> {
1754 /// Query a dependency.
1755 ///
1756 /// The dependency is automatically tracked for invalidation.
1757 ///
1758 /// # Example
1759 ///
1760 /// ```ignore
1761 /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1762 /// let dep_result = db.query(OtherQuery { id: self.id })?;
1763 /// Ok(process(&dep_result))
1764 /// }
1765 /// ```
1766 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1767 let key = query.cache_key();
1768 let full_key = FullCacheKey::new::<Q, _>(&key);
1769
1770 // Emit dependency registered event
1771 self.runtime.tracer.on_dependency_registered(
1772 self.exec_ctx.span_id(),
1773 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1774 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1775 );
1776
1777 // Record this as a dependency
1778 self.deps.borrow_mut().push(full_key.clone());
1779
1780 // Execute the query
1781 self.runtime.query(query)
1782 }
1783
1784 /// Access an asset, tracking it as a dependency.
1785 ///
1786 /// Returns `AssetLoadingState<K>`:
1787 /// - `is_loading()` if the asset is still being loaded
1788 /// - `is_ready()` if the asset is available
1789 ///
1790 /// Use `.suspend()?` to convert to `Result<Arc<K::Asset>, QueryError>`,
1791 /// which returns `Err(QueryError::Suspend { asset })` if still loading.
1792 ///
1793 /// # Example
1794 ///
1795 /// ```ignore
1796 /// #[query]
1797 /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1798 /// let content = db.asset(path)?.suspend()?;
1799 /// // Process content...
1800 /// Ok(output)
1801 /// }
1802 /// ```
1803 ///
1804 /// # Errors
1805 ///
1806 /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1807 pub fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1808 let full_asset_key = FullAssetKey::new(&key);
1809 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1810
1811 // 1. Emit asset dependency registered event
1812 self.runtime.tracer.on_asset_dependency_registered(
1813 self.exec_ctx.span_id(),
1814 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1815 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1816 );
1817
1818 // 2. Record dependency on this asset
1819 self.deps.borrow_mut().push(full_cache_key);
1820
1821 // 3. Get asset from cache/locator
1822 // Consistency checking for cached leaf assets is done inside get_asset_with_revision_ctx
1823 let (state, _changed_at) = self.runtime.get_asset_with_revision_ctx(key, self)?;
1824
1825 Ok(state)
1826 }
1827
1828 /// List all query instances of type Q that have been registered.
1829 ///
1830 /// This method establishes a dependency on the "set" of queries of type Q.
1831 /// The calling query will be invalidated when:
1832 /// - A new query of type Q is first executed (added to set)
1833 ///
1834 /// The calling query will NOT be invalidated when:
1835 /// - An individual query of type Q has its value change
1836 ///
1837 /// # Example
1838 ///
1839 /// ```ignore
1840 /// #[query]
1841 /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1842 /// let queries = db.list_queries::<MyQuery>();
1843 /// let mut results = Vec::new();
1844 /// for q in queries {
1845 /// results.push(*db.query(q)?);
1846 /// }
1847 /// Ok(results)
1848 /// }
1849 /// ```
1850 pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1851 // Record dependency on the sentinel (set-level dependency)
1852 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1853
1854 self.runtime.tracer.on_dependency_registered(
1855 self.exec_ctx.span_id(),
1856 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1857 TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1858 );
1859
1860 // Ensure sentinel exists in whale (for dependency tracking)
1861 if self.runtime.whale.get(&sentinel).is_none() {
1862 let _ =
1863 self.runtime
1864 .whale
1865 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1866 }
1867
1868 self.deps.borrow_mut().push(sentinel);
1869
1870 // Return all registered queries
1871 self.runtime.query_registry.get_all::<Q>()
1872 }
1873
1874 /// List all asset keys of type K that have been registered.
1875 ///
1876 /// This method establishes a dependency on the "set" of asset keys of type K.
1877 /// The calling query will be invalidated when:
1878 /// - A new asset of type K is resolved for the first time (added to set)
1879 /// - An asset of type K is removed via remove_asset
1880 ///
1881 /// The calling query will NOT be invalidated when:
1882 /// - An individual asset's value changes (use `db.asset()` for that)
1883 ///
1884 /// # Example
1885 ///
1886 /// ```ignore
1887 /// #[query]
1888 /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1889 /// let keys = db.list_asset_keys::<ConfigFile>();
1890 /// let mut contents = Vec::new();
1891 /// for key in keys {
1892 /// let content = db.asset(&key)?.suspend()?;
1893 /// contents.push((*content).clone());
1894 /// }
1895 /// Ok(contents)
1896 /// }
1897 /// ```
1898 pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1899 // Record dependency on the sentinel (set-level dependency)
1900 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1901
1902 self.runtime.tracer.on_asset_dependency_registered(
1903 self.exec_ctx.span_id(),
1904 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1905 TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1906 );
1907
1908 // Ensure sentinel exists in whale (for dependency tracking)
1909 if self.runtime.whale.get(&sentinel).is_none() {
1910 let _ =
1911 self.runtime
1912 .whale
1913 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1914 }
1915
1916 self.deps.borrow_mut().push(sentinel);
1917
1918 // Return all registered asset keys
1919 self.runtime.asset_key_registry.get_all::<K>()
1920 }
1921}
1922
1923impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1924 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1925 QueryContext::query(self, query)
1926 }
1927
1928 fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1929 QueryContext::asset(self, key)
1930 }
1931
1932 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1933 QueryContext::list_queries(self)
1934 }
1935
1936 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1937 QueryContext::list_asset_keys(self)
1938 }
1939}
1940
1941/// Context for collecting dependencies during asset locator execution.
1942///
1943/// Unlike `QueryContext`, this is specifically for locators and does not
1944/// register dependencies on any parent query. Dependencies collected here
1945/// are stored with the asset itself.
1946pub(crate) struct LocatorContext<'a, T: Tracer> {
1947 runtime: &'a QueryRuntime<T>,
1948 deps: RefCell<Vec<FullCacheKey>>,
1949}
1950
1951impl<'a, T: Tracer> LocatorContext<'a, T> {
1952 /// Create a new locator context for the given asset key.
1953 ///
1954 /// Consistency tracking is handled via thread-local storage, so leaf asset
1955 /// accesses will be checked against any active tracker from a parent query.
1956 pub(crate) fn new(runtime: &'a QueryRuntime<T>, _asset_key: FullCacheKey) -> Self {
1957 Self {
1958 runtime,
1959 deps: RefCell::new(Vec::new()),
1960 }
1961 }
1962
1963 /// Consume this context and return the collected dependencies.
1964 pub(crate) fn into_deps(self) -> Vec<FullCacheKey> {
1965 self.deps.into_inner()
1966 }
1967}
1968
1969impl<T: Tracer> Db for LocatorContext<'_, T> {
1970 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1971 let key = query.cache_key();
1972 let full_key = FullCacheKey::new::<Q, _>(&key);
1973
1974 // Record this as a dependency of the asset being located
1975 self.deps.borrow_mut().push(full_key);
1976
1977 // Execute the query via the runtime
1978 self.runtime.query(query)
1979 }
1980
1981 fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1982 let full_asset_key = FullAssetKey::new(&key);
1983 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1984
1985 // Record this as a dependency of the asset being located
1986 self.deps.borrow_mut().push(full_cache_key);
1987
1988 // Access the asset - consistency checking is done inside get_asset_with_revision
1989 let (state, _changed_at) = self.runtime.get_asset_with_revision(key)?;
1990
1991 Ok(state)
1992 }
1993
1994 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1995 self.runtime.list_queries()
1996 }
1997
1998 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1999 self.runtime.list_asset_keys()
2000 }
2001}
2002
2003#[cfg(test)]
2004mod tests {
2005 use super::*;
2006
2007 #[test]
2008 fn test_simple_query() {
2009 #[derive(Clone)]
2010 struct Add {
2011 a: i32,
2012 b: i32,
2013 }
2014
2015 impl Query for Add {
2016 type CacheKey = (i32, i32);
2017 type Output = i32;
2018
2019 fn cache_key(&self) -> Self::CacheKey {
2020 (self.a, self.b)
2021 }
2022
2023 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2024 Ok(Arc::new(self.a + self.b))
2025 }
2026
2027 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2028 old == new
2029 }
2030 }
2031
2032 let runtime = QueryRuntime::new();
2033
2034 let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
2035 assert_eq!(*result, 3);
2036
2037 // Second query should be cached
2038 let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
2039 assert_eq!(*result2, 3);
2040 }
2041
2042 #[test]
2043 fn test_dependent_queries() {
2044 #[derive(Clone)]
2045 struct Base {
2046 value: i32,
2047 }
2048
2049 impl Query for Base {
2050 type CacheKey = i32;
2051 type Output = i32;
2052
2053 fn cache_key(&self) -> Self::CacheKey {
2054 self.value
2055 }
2056
2057 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2058 Ok(Arc::new(self.value * 2))
2059 }
2060
2061 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2062 old == new
2063 }
2064 }
2065
2066 #[derive(Clone)]
2067 struct Derived {
2068 base_value: i32,
2069 }
2070
2071 impl Query for Derived {
2072 type CacheKey = i32;
2073 type Output = i32;
2074
2075 fn cache_key(&self) -> Self::CacheKey {
2076 self.base_value
2077 }
2078
2079 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2080 let base = db.query(Base {
2081 value: self.base_value,
2082 })?;
2083 Ok(Arc::new(*base + 10))
2084 }
2085
2086 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2087 old == new
2088 }
2089 }
2090
2091 let runtime = QueryRuntime::new();
2092
2093 let result = runtime.query(Derived { base_value: 5 }).unwrap();
2094 assert_eq!(*result, 20); // 5 * 2 + 10
2095 }
2096
2097 #[test]
2098 fn test_cycle_detection() {
2099 #[derive(Clone)]
2100 struct CycleA {
2101 id: i32,
2102 }
2103
2104 #[derive(Clone)]
2105 struct CycleB {
2106 id: i32,
2107 }
2108
2109 impl Query for CycleA {
2110 type CacheKey = i32;
2111 type Output = i32;
2112
2113 fn cache_key(&self) -> Self::CacheKey {
2114 self.id
2115 }
2116
2117 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2118 let b = db.query(CycleB { id: self.id })?;
2119 Ok(Arc::new(*b + 1))
2120 }
2121
2122 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2123 old == new
2124 }
2125 }
2126
2127 impl Query for CycleB {
2128 type CacheKey = i32;
2129 type Output = i32;
2130
2131 fn cache_key(&self) -> Self::CacheKey {
2132 self.id
2133 }
2134
2135 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2136 let a = db.query(CycleA { id: self.id })?;
2137 Ok(Arc::new(*a + 1))
2138 }
2139
2140 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2141 old == new
2142 }
2143 }
2144
2145 let runtime = QueryRuntime::new();
2146
2147 let result = runtime.query(CycleA { id: 1 });
2148 assert!(matches!(result, Err(QueryError::Cycle { .. })));
2149 }
2150
2151 #[test]
2152 fn test_fallible_query() {
2153 #[derive(Clone)]
2154 struct ParseInt {
2155 input: String,
2156 }
2157
2158 impl Query for ParseInt {
2159 type CacheKey = String;
2160 type Output = Result<i32, std::num::ParseIntError>;
2161
2162 fn cache_key(&self) -> Self::CacheKey {
2163 self.input.clone()
2164 }
2165
2166 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2167 Ok(Arc::new(self.input.parse()))
2168 }
2169
2170 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2171 old == new
2172 }
2173 }
2174
2175 let runtime = QueryRuntime::new();
2176
2177 // Valid parse
2178 let result = runtime
2179 .query(ParseInt {
2180 input: "42".to_string(),
2181 })
2182 .unwrap();
2183 assert_eq!(*result, Ok(42));
2184
2185 // Invalid parse - system succeeds, user error in output
2186 let result = runtime
2187 .query(ParseInt {
2188 input: "not_a_number".to_string(),
2189 })
2190 .unwrap();
2191 assert!(result.is_err());
2192 }
2193
2194 // Macro tests
2195 mod macro_tests {
2196 use super::*;
2197 use crate::query;
2198
2199 #[query]
2200 fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2201 let _ = db; // silence unused warning
2202 Ok(a + b)
2203 }
2204
2205 #[test]
2206 fn test_macro_basic() {
2207 let runtime = QueryRuntime::new();
2208 let result = runtime.query(Add::new(1, 2)).unwrap();
2209 assert_eq!(*result, 3);
2210 }
2211
2212 #[query]
2213 fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2214 let _ = db;
2215 Ok(x * 2)
2216 }
2217
2218 #[test]
2219 fn test_macro_simple() {
2220 let runtime = QueryRuntime::new();
2221 let result = runtime.query(SimpleDouble::new(5)).unwrap();
2222 assert_eq!(*result, 10);
2223 }
2224
2225 #[query(keys(id))]
2226 fn with_key_selection(
2227 db: &impl Db,
2228 id: u32,
2229 include_extra: bool,
2230 ) -> Result<String, QueryError> {
2231 let _ = db;
2232 Ok(format!("id={}, extra={}", id, include_extra))
2233 }
2234
2235 #[test]
2236 fn test_macro_key_selection() {
2237 let runtime = QueryRuntime::new();
2238
2239 // Same id, different include_extra - should return cached
2240 let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2241 let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2242
2243 // Both should have same value because only `id` is the key
2244 assert_eq!(*r1, "id=1, extra=true");
2245 assert_eq!(*r2, "id=1, extra=true"); // Cached!
2246 }
2247
2248 #[query]
2249 fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2250 let sum = db.query(Add::new(a, b))?;
2251 Ok(*sum * 2)
2252 }
2253
2254 #[test]
2255 fn test_macro_dependencies() {
2256 let runtime = QueryRuntime::new();
2257 let result = runtime.query(Dependent::new(3, 4)).unwrap();
2258 assert_eq!(*result, 14); // (3 + 4) * 2
2259 }
2260
2261 #[query(output_eq)]
2262 fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2263 let _ = db;
2264 Ok(x * 2)
2265 }
2266
2267 #[test]
2268 fn test_macro_output_eq() {
2269 let runtime = QueryRuntime::new();
2270 let result = runtime.query(WithOutputEq::new(5)).unwrap();
2271 assert_eq!(*result, 10);
2272 }
2273
2274 #[query(name = "CustomName")]
2275 fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2276 let _ = db;
2277 Ok(x)
2278 }
2279
2280 #[test]
2281 fn test_macro_custom_name() {
2282 let runtime = QueryRuntime::new();
2283 let result = runtime.query(CustomName::new(42)).unwrap();
2284 assert_eq!(*result, 42);
2285 }
2286
2287 // Test that attribute macros like #[tracing::instrument] are preserved
2288 // We use #[allow(unused_variables)] and #[inline] as test attributes since
2289 // they don't require external dependencies.
2290 #[allow(unused_variables)]
2291 #[inline]
2292 #[query]
2293 fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2294 // This would warn without #[allow(unused_variables)] on the generated method
2295 let unused_var = 42;
2296 Ok(x * 2)
2297 }
2298
2299 #[test]
2300 fn test_macro_preserves_attributes() {
2301 let runtime = QueryRuntime::new();
2302 // If attributes weren't preserved, this might warn about unused_var
2303 let result = runtime.query(WithAttributes::new(5)).unwrap();
2304 assert_eq!(*result, 10);
2305 }
2306 }
2307
2308 // Tests for poll() and changed_at()
2309 mod poll_tests {
2310 use super::*;
2311
2312 #[derive(Clone)]
2313 struct Counter {
2314 id: i32,
2315 }
2316
2317 impl Query for Counter {
2318 type CacheKey = i32;
2319 type Output = i32;
2320
2321 fn cache_key(&self) -> Self::CacheKey {
2322 self.id
2323 }
2324
2325 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2326 Ok(Arc::new(self.id * 10))
2327 }
2328
2329 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2330 old == new
2331 }
2332 }
2333
2334 #[test]
2335 fn test_poll_returns_value_and_revision() {
2336 let runtime = QueryRuntime::new();
2337
2338 let result = runtime.poll(Counter { id: 1 }).unwrap();
2339
2340 // Value should be correct - access through Result and Arc
2341 assert_eq!(**result.value.as_ref().unwrap(), 10);
2342
2343 // Revision should be non-zero after first execution
2344 assert!(result.revision > 0);
2345 }
2346
2347 #[test]
2348 fn test_poll_revision_stable_on_cache_hit() {
2349 let runtime = QueryRuntime::new();
2350
2351 // First poll
2352 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2353 let rev1 = result1.revision;
2354
2355 // Second poll (cache hit)
2356 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2357 let rev2 = result2.revision;
2358
2359 // Revision should be the same (no change)
2360 assert_eq!(rev1, rev2);
2361 }
2362
2363 #[test]
2364 fn test_poll_revision_changes_on_invalidate() {
2365 let runtime = QueryRuntime::new();
2366
2367 // First poll
2368 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2369 let rev1 = result1.revision;
2370
2371 // Invalidate and poll again
2372 runtime.invalidate::<Counter>(&1);
2373 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2374 let rev2 = result2.revision;
2375
2376 // Revision should increase (value was recomputed)
2377 // Note: Since output_eq returns true (same value), this might not change
2378 // depending on early cutoff behavior. Let's verify the value is still correct.
2379 assert_eq!(**result2.value.as_ref().unwrap(), 10);
2380
2381 // With early cutoff, revision might stay the same if value didn't change
2382 // This is expected behavior
2383 assert!(rev2 >= rev1);
2384 }
2385
2386 #[test]
2387 fn test_changed_at_returns_none_for_unexecuted_query() {
2388 let runtime = QueryRuntime::new();
2389
2390 // Query has never been executed
2391 let rev = runtime.changed_at::<Counter>(&1);
2392 assert!(rev.is_none());
2393 }
2394
2395 #[test]
2396 fn test_changed_at_returns_revision_after_execution() {
2397 let runtime = QueryRuntime::new();
2398
2399 // Execute the query
2400 let _ = runtime.query(Counter { id: 1 }).unwrap();
2401
2402 // Now changed_at should return Some
2403 let rev = runtime.changed_at::<Counter>(&1);
2404 assert!(rev.is_some());
2405 assert!(rev.unwrap() > 0);
2406 }
2407
2408 #[test]
2409 fn test_changed_at_matches_poll_revision() {
2410 let runtime = QueryRuntime::new();
2411
2412 // Poll the query
2413 let result = runtime.poll(Counter { id: 1 }).unwrap();
2414
2415 // changed_at should match the revision from poll
2416 let rev = runtime.changed_at::<Counter>(&1);
2417 assert_eq!(rev, Some(result.revision));
2418 }
2419
2420 #[test]
2421 fn test_poll_value_access() {
2422 let runtime = QueryRuntime::new();
2423
2424 let result = runtime.poll(Counter { id: 5 }).unwrap();
2425
2426 // Access through Result and Arc
2427 let value: &i32 = result.value.as_ref().unwrap();
2428 assert_eq!(*value, 50);
2429
2430 // Access Arc directly via field after unwrapping Result
2431 let arc: &Arc<i32> = result.value.as_ref().unwrap();
2432 assert_eq!(**arc, 50);
2433 }
2434
2435 #[test]
2436 fn test_subscription_pattern() {
2437 let runtime = QueryRuntime::new();
2438
2439 // Simulate subscription pattern
2440 let mut last_revision: RevisionCounter = 0;
2441 let mut notifications = 0;
2442
2443 // First poll - should notify (new value)
2444 let result = runtime.poll(Counter { id: 1 }).unwrap();
2445 if result.revision > last_revision {
2446 notifications += 1;
2447 last_revision = result.revision;
2448 }
2449
2450 // Second poll - should NOT notify (no change)
2451 let result = runtime.poll(Counter { id: 1 }).unwrap();
2452 if result.revision > last_revision {
2453 notifications += 1;
2454 last_revision = result.revision;
2455 }
2456
2457 // Third poll - should NOT notify (no change)
2458 let result = runtime.poll(Counter { id: 1 }).unwrap();
2459 if result.revision > last_revision {
2460 notifications += 1;
2461 #[allow(unused_assignments)]
2462 {
2463 last_revision = result.revision;
2464 }
2465 }
2466
2467 // Only the first poll should have triggered a notification
2468 assert_eq!(notifications, 1);
2469 }
2470 }
2471
2472 // Tests for GC APIs
2473 mod gc_tests {
2474 use super::*;
2475 use std::collections::HashSet;
2476 use std::sync::atomic::{AtomicUsize, Ordering};
2477 use std::sync::Mutex;
2478
2479 #[derive(Clone)]
2480 struct Leaf {
2481 id: i32,
2482 }
2483
2484 impl Query for Leaf {
2485 type CacheKey = i32;
2486 type Output = i32;
2487
2488 fn cache_key(&self) -> Self::CacheKey {
2489 self.id
2490 }
2491
2492 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2493 Ok(Arc::new(self.id * 10))
2494 }
2495
2496 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2497 old == new
2498 }
2499 }
2500
2501 #[derive(Clone)]
2502 struct Parent {
2503 child_id: i32,
2504 }
2505
2506 impl Query for Parent {
2507 type CacheKey = i32;
2508 type Output = i32;
2509
2510 fn cache_key(&self) -> Self::CacheKey {
2511 self.child_id
2512 }
2513
2514 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2515 let child = db.query(Leaf { id: self.child_id })?;
2516 Ok(Arc::new(*child + 1))
2517 }
2518
2519 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2520 old == new
2521 }
2522 }
2523
2524 #[test]
2525 fn test_query_keys_returns_all_cached_queries() {
2526 let runtime = QueryRuntime::new();
2527
2528 // Execute some queries
2529 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2530 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2531 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2532
2533 // Get all keys
2534 let keys = runtime.query_keys();
2535
2536 // Should have at least 3 keys (might have more due to sentinels)
2537 assert!(keys.len() >= 3);
2538 }
2539
2540 #[test]
2541 fn test_remove_removes_query() {
2542 let runtime = QueryRuntime::new();
2543
2544 // Execute a query
2545 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2546
2547 // Get the key
2548 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2549
2550 // Query should exist
2551 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2552
2553 // Remove it
2554 assert!(runtime.remove(&full_key));
2555
2556 // Query should no longer exist
2557 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2558 }
2559
2560 #[test]
2561 fn test_remove_if_unused_removes_leaf_query() {
2562 let runtime = QueryRuntime::new();
2563
2564 // Execute a leaf query (no dependents)
2565 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2566
2567 // Should be removable since no other query depends on it
2568 assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2569
2570 // Query should no longer exist
2571 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2572 }
2573
2574 #[test]
2575 fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2576 let runtime = QueryRuntime::new();
2577
2578 // Execute parent query (which depends on Leaf)
2579 let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2580
2581 // Leaf query should not be removable since Parent depends on it
2582 assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2583
2584 // Leaf query should still exist
2585 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2586
2587 // But Parent should be removable (no dependents)
2588 assert!(runtime.remove_query_if_unused::<Parent>(&1));
2589 }
2590
2591 #[test]
2592 fn test_remove_if_unused_with_full_cache_key() {
2593 let runtime = QueryRuntime::new();
2594
2595 // Execute a leaf query
2596 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2597
2598 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2599
2600 // Should be removable via FullCacheKey
2601 assert!(runtime.remove_if_unused(&full_key));
2602
2603 // Query should no longer exist
2604 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2605 }
2606
2607 // Test tracer receives on_query_key calls
2608 struct GcTracker {
2609 accessed_keys: Mutex<HashSet<String>>,
2610 access_count: AtomicUsize,
2611 }
2612
2613 impl GcTracker {
2614 fn new() -> Self {
2615 Self {
2616 accessed_keys: Mutex::new(HashSet::new()),
2617 access_count: AtomicUsize::new(0),
2618 }
2619 }
2620 }
2621
2622 impl Tracer for GcTracker {
2623 fn new_span_id(&self) -> SpanId {
2624 SpanId(1)
2625 }
2626
2627 fn on_query_key(&self, full_key: &FullCacheKey) {
2628 self.accessed_keys
2629 .lock()
2630 .unwrap()
2631 .insert(full_key.debug_repr().to_string());
2632 self.access_count.fetch_add(1, Ordering::Relaxed);
2633 }
2634 }
2635
2636 #[test]
2637 fn test_tracer_receives_on_query_key() {
2638 let tracker = GcTracker::new();
2639 let runtime = QueryRuntime::with_tracer(tracker);
2640
2641 // Execute some queries
2642 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2643 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2644
2645 // Tracer should have received on_query_key calls
2646 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2647 assert_eq!(count, 2);
2648
2649 // Check that the keys were recorded
2650 let keys = runtime.tracer().accessed_keys.lock().unwrap();
2651 assert!(keys.iter().any(|k| k.contains("Leaf")));
2652 }
2653
2654 #[test]
2655 fn test_tracer_receives_on_query_key_for_cache_hits() {
2656 let tracker = GcTracker::new();
2657 let runtime = QueryRuntime::with_tracer(tracker);
2658
2659 // Execute query twice (second is cache hit)
2660 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2661 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2662
2663 // Tracer should have received on_query_key for both calls
2664 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2665 assert_eq!(count, 2);
2666 }
2667
2668 #[test]
2669 fn test_gc_workflow() {
2670 let tracker = GcTracker::new();
2671 let runtime = QueryRuntime::with_tracer(tracker);
2672
2673 // Execute some queries
2674 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2675 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2676 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2677
2678 // Simulate GC: remove all queries that are not in use
2679 let mut removed = 0;
2680 for key in runtime.query_keys() {
2681 if runtime.remove_if_unused(&key) {
2682 removed += 1;
2683 }
2684 }
2685
2686 // All leaf queries should be removable
2687 assert!(removed >= 3);
2688
2689 // Queries should no longer exist
2690 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2691 assert!(runtime.changed_at::<Leaf>(&2).is_none());
2692 assert!(runtime.changed_at::<Leaf>(&3).is_none());
2693 }
2694 }
2695}