query_flow/runtime.rs
1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
12use crate::db::Db;
13use crate::key::FullCacheKey;
14use crate::loading::AssetLoadingState;
15use crate::query::Query;
16use crate::storage::{
17 AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
18 QueryRegistry, VerifierStorage,
19};
20use crate::tracer::{
21 ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
22 TracerAssetState, TracerQueryKey,
23};
24use crate::QueryError;
25
26/// Function type for comparing user errors during early cutoff.
27///
28/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
29/// `QueryError::UserError` values are compared for caching purposes.
30pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
31
/// Durability level count used to parameterize the whale runtime.
///
/// NOTE(review): expected to equal whale's default level count — confirm
/// against the whale crate when upgrading it.
const DURABILITY_LEVELS: usize = 4;
34
35// Thread-local query execution stack for cycle detection.
36thread_local! {
37 static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
38
39 /// Current consistency tracker for leaf asset checks.
40 /// Set during query execution, used by all nested locator calls.
41 /// Uses Rc since thread-local is single-threaded.
42 static CONSISTENCY_TRACKER: RefCell<Option<Rc<ConsistencyTracker>>> = const { RefCell::new(None) };
43}
44
45/// Check a leaf asset against the current consistency tracker (if any).
46/// Returns Ok if no tracker is set or if the check passes.
47fn check_leaf_asset_consistency(dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
48 CONSISTENCY_TRACKER.with(|tracker| {
49 if let Some(ref t) = *tracker.borrow() {
50 t.check_leaf_asset(dep_changed_at)
51 } else {
52 Ok(())
53 }
54 })
55}
56
57/// RAII guard that sets the consistency tracker for the current thread.
58struct ConsistencyTrackerGuard {
59 previous: Option<Rc<ConsistencyTracker>>,
60}
61
62impl ConsistencyTrackerGuard {
63 fn new(tracker: Rc<ConsistencyTracker>) -> Self {
64 let previous = CONSISTENCY_TRACKER.with(|t| t.borrow_mut().replace(tracker));
65 Self { previous }
66 }
67}
68
69impl Drop for ConsistencyTrackerGuard {
70 fn drop(&mut self) {
71 CONSISTENCY_TRACKER.with(|t| {
72 *t.borrow_mut() = self.previous.take();
73 });
74 }
75}
76
77/// Check for cycles in the query stack and return error if detected.
78fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
79 let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
80 if cycle_detected {
81 let path = QUERY_STACK.with(|stack| {
82 let stack = stack.borrow();
83 let mut path: Vec<String> = stack.iter().map(|k| k.debug_repr().to_string()).collect();
84 path.push(key.debug_repr().to_string());
85 path
86 });
87 return Err(QueryError::Cycle { path });
88 }
89 Ok(())
90}
91
92/// RAII guard for pushing/popping from query stack.
93struct StackGuard;
94
95impl StackGuard {
96 fn push(key: FullCacheKey) -> Self {
97 QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
98 StackGuard
99 }
100}
101
102impl Drop for StackGuard {
103 fn drop(&mut self) {
104 QUERY_STACK.with(|stack| {
105 stack.borrow_mut().pop();
106 });
107 }
108}
109
110/// Execution context passed through query execution.
111///
112/// Contains a span ID for tracing correlation.
113#[derive(Clone, Copy)]
114pub struct ExecutionContext {
115 span_id: SpanId,
116}
117
118impl ExecutionContext {
119 /// Create a new execution context with the given span ID.
120 #[inline]
121 pub fn new(span_id: SpanId) -> Self {
122 Self { span_id }
123 }
124
125 /// Get the span ID for this execution context.
126 #[inline]
127 pub fn span_id(&self) -> SpanId {
128 self.span_id
129 }
130}
131
132/// Result of polling a query, containing the value and its revision.
133///
134/// This is returned by [`QueryRuntime::poll`] and provides both the query result
135/// and its change revision, enabling efficient change detection for subscription
136/// patterns.
137///
138/// # Example
139///
140/// ```ignore
141/// let result = runtime.poll(MyQuery::new())?;
142///
143/// // Access the value via Deref
144/// println!("{:?}", *result);
145///
146/// // Check if changed since last poll
147/// if result.revision > last_known_revision {
148/// notify_subscribers(&result.value);
149/// last_known_revision = result.revision;
150/// }
151/// ```
152#[derive(Debug, Clone)]
153pub struct Polled<T> {
154 /// The query result value.
155 pub value: T,
156 /// The revision at which this value was last changed.
157 ///
158 /// Compare this with a previously stored revision to detect changes.
159 pub revision: RevisionCounter,
160}
161
162impl<T: Deref> Deref for Polled<T> {
163 type Target = T::Target;
164
165 fn deref(&self) -> &Self::Target {
166 &self.value
167 }
168}
169
170/// The query runtime manages query execution, caching, and dependency tracking.
171///
172/// This is cheap to clone - all data is behind `Arc`.
173///
174/// # Type Parameter
175///
176/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
177/// for zero-cost when tracing is not needed.
178///
179/// # Example
180///
181/// ```ignore
182/// // Without tracing (default)
183/// let runtime = QueryRuntime::new();
184///
185/// // With tracing
186/// let tracer = MyTracer::new();
187/// let runtime = QueryRuntime::with_tracer(tracer);
188///
189/// // Sync query execution
190/// let result = runtime.query(MyQuery { ... })?;
191/// ```
192pub struct QueryRuntime<T: Tracer = NoopTracer> {
193 /// Whale runtime for dependency tracking and cache storage.
194 /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
195 whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
196 /// Registered asset locators
197 locators: Arc<LocatorStorage<T>>,
198 /// Pending asset requests
199 pending: Arc<PendingStorage>,
200 /// Registry for tracking query instances (for list_queries)
201 query_registry: Arc<QueryRegistry>,
202 /// Registry for tracking asset keys (for list_asset_keys)
203 asset_key_registry: Arc<AssetKeyRegistry>,
204 /// Verifiers for re-executing queries (for verify-then-decide pattern)
205 verifiers: Arc<VerifierStorage>,
206 /// Comparator for user errors during early cutoff
207 error_comparator: ErrorComparator,
208 /// Tracer for observability
209 tracer: Arc<T>,
210}
211
/// Compile-time check that the default runtime can be shared across threads.
#[test]
fn test_runtime_send_sync() {
    fn require_send_sync<R: Send + Sync>() {}
    require_send_sync::<QueryRuntime<NoopTracer>>();
}
217
218impl Default for QueryRuntime<NoopTracer> {
219 fn default() -> Self {
220 Self::new()
221 }
222}
223
224impl<T: Tracer> Clone for QueryRuntime<T> {
225 fn clone(&self) -> Self {
226 Self {
227 whale: self.whale.clone(),
228 locators: self.locators.clone(),
229 pending: self.pending.clone(),
230 query_registry: self.query_registry.clone(),
231 asset_key_registry: self.asset_key_registry.clone(),
232 verifiers: self.verifiers.clone(),
233 error_comparator: self.error_comparator,
234 tracer: self.tracer.clone(),
235 }
236 }
237}
238
239/// Default error comparator that treats all errors as different.
240///
241/// This is conservative - it always triggers recomputation when an error occurs.
242fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
243 false
244}
245
246impl<T: Tracer> QueryRuntime<T> {
247 /// Get cached output along with its revision (single atomic access).
248 fn get_cached_with_revision<Q: Query>(
249 &self,
250 key: &FullCacheKey,
251 ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
252 let node = self.whale.get(key)?;
253 let revision = node.changed_at;
254 let entry = node.data.as_ref()?;
255 let cached = entry.to_cached_value::<Q::Output>()?;
256 Some((cached, revision))
257 }
258
259 /// Get a reference to the tracer.
260 #[inline]
261 pub fn tracer(&self) -> &T {
262 &self.tracer
263 }
264}
265
266impl QueryRuntime<NoopTracer> {
267 /// Create a new query runtime with default settings.
268 pub fn new() -> Self {
269 Self::with_tracer(NoopTracer)
270 }
271
272 /// Create a builder for customizing the runtime.
273 ///
274 /// # Example
275 ///
276 /// ```ignore
277 /// let runtime = QueryRuntime::builder()
278 /// .error_comparator(|a, b| {
279 /// // Custom error comparison logic
280 /// match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
281 /// (Some(a), Some(b)) => a == b,
282 /// _ => false,
283 /// }
284 /// })
285 /// .build();
286 /// ```
287 pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
288 QueryRuntimeBuilder::new()
289 }
290}
291
292impl<T: Tracer> QueryRuntime<T> {
293 /// Create a new query runtime with the specified tracer.
294 pub fn with_tracer(tracer: T) -> Self {
295 QueryRuntimeBuilder::new().tracer(tracer).build()
296 }
297
298 /// Execute a query synchronously.
299 ///
300 /// Returns the cached result if valid, otherwise executes the query.
301 ///
302 /// # Errors
303 ///
304 /// - `QueryError::Suspend` - Query is waiting for async loading
305 /// - `QueryError::Cycle` - Dependency cycle detected
306 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
307 self.query_internal(query)
308 .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
309 }
310
311 /// Internal implementation shared by query() and poll().
312 ///
313 /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
314 /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
315 #[allow(clippy::type_complexity)]
316 fn query_internal<Q: Query>(
317 &self,
318 query: Q,
319 ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
320 let key = query.cache_key();
321 let full_key = FullCacheKey::new::<Q, _>(&key);
322
323 // Emit query key event for GC tracking (before other events)
324 self.tracer.on_query_key(&full_key);
325
326 // Create execution context and emit start event
327 let span_id = self.tracer.new_span_id();
328 let exec_ctx = ExecutionContext::new(span_id);
329 let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
330
331 self.tracer.on_query_start(span_id, query_key.clone());
332
333 // Check for cycles using thread-local stack
334 let cycle_detected = QUERY_STACK.with(|stack| {
335 let stack = stack.borrow();
336 stack.iter().any(|k| k == &full_key)
337 });
338
339 if cycle_detected {
340 let path = QUERY_STACK.with(|stack| {
341 let stack = stack.borrow();
342 let mut path: Vec<String> =
343 stack.iter().map(|k| k.debug_repr().to_string()).collect();
344 path.push(full_key.debug_repr().to_string());
345 path
346 });
347
348 self.tracer.on_cycle_detected(
349 path.iter()
350 .map(|s| TracerQueryKey::new("", s.clone()))
351 .collect(),
352 );
353 self.tracer
354 .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
355
356 return Err(QueryError::Cycle { path });
357 }
358
359 // Check if cached and valid (with verify-then-decide pattern)
360 let current_rev = self.whale.current_revision();
361
362 // Fast path: already verified at current revision
363 if self.whale.is_verified_at(&full_key, ¤t_rev) {
364 // Single atomic access to get both cached value and revision
365 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
366 self.tracer.on_cache_check(span_id, query_key.clone(), true);
367 self.tracer
368 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
369
370 return match cached {
371 CachedValue::Ok(output) => Ok((Ok(output), revision)),
372 CachedValue::UserError(err) => Ok((Err(err), revision)),
373 };
374 }
375 }
376
377 // Check shallow validity (deps' changed_at ok) and try verify-then-decide
378 if self.whale.is_valid(&full_key) {
379 // Single atomic access to get both cached value and revision
380 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
381 // Shallow valid but not verified - verify deps first
382 let mut deps_verified = true;
383 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
384 for dep in deps {
385 if let Some(verifier) = self.verifiers.get(&dep) {
386 // Re-run query/asset to verify it (triggers recursive verification)
387 // For assets, this re-accesses the asset which may re-run the locator
388 if verifier.verify(self as &dyn std::any::Any).is_err() {
389 deps_verified = false;
390 break;
391 }
392 }
393 // Note: deps without verifiers are assumed valid (they're verified
394 // by the final is_valid check if their changed_at increased)
395 }
396 }
397
398 // Re-check validity after deps are verified
399 if deps_verified && self.whale.is_valid(&full_key) {
400 // Deps didn't change their changed_at, mark as verified and use cached
401 self.whale.mark_verified(&full_key, ¤t_rev);
402
403 self.tracer.on_cache_check(span_id, query_key.clone(), true);
404 self.tracer
405 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
406
407 return match cached {
408 CachedValue::Ok(output) => Ok((Ok(output), revision)),
409 CachedValue::UserError(err) => Ok((Err(err), revision)),
410 };
411 }
412 // A dep's changed_at increased, fall through to execute
413 }
414 }
415
416 self.tracer
417 .on_cache_check(span_id, query_key.clone(), false);
418
419 // Execute the query with cycle tracking
420 let _guard = StackGuard::push(full_key.clone());
421 let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
422 drop(_guard);
423
424 // Emit end event
425 let exec_result = match &result {
426 Ok((_, true, _)) => ExecutionResult::Changed,
427 Ok((_, false, _)) => ExecutionResult::Unchanged,
428 Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
429 Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
430 Err(e) => ExecutionResult::Error {
431 message: format!("{:?}", e),
432 },
433 };
434 self.tracer
435 .on_query_end(span_id, query_key.clone(), exec_result);
436
437 result.map(|(inner_result, _, revision)| (inner_result, revision))
438 }
439
440 /// Execute a query, caching the result if appropriate.
441 ///
442 /// Returns (result, output_changed, revision) tuple.
443 /// - `result`: Ok(output) for success, Err(user_error) for user errors
444 /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
445 #[allow(clippy::type_complexity)]
446 fn execute_query<Q: Query>(
447 &self,
448 query: &Q,
449 full_key: &FullCacheKey,
450 exec_ctx: ExecutionContext,
451 ) -> Result<
452 (
453 Result<Arc<Q::Output>, Arc<anyhow::Error>>,
454 bool,
455 RevisionCounter,
456 ),
457 QueryError,
458 > {
459 // Capture current global revision at query start for consistency checking
460 let start_revision = self.whale.current_revision().get(Durability::volatile());
461
462 // Create consistency tracker for this query execution
463 let tracker = Rc::new(ConsistencyTracker::new(start_revision));
464
465 // Set thread-local tracker for nested locator calls
466 let _tracker_guard = ConsistencyTrackerGuard::new(tracker);
467
468 // Create context for this query execution
469 let ctx = QueryContext {
470 runtime: self,
471 current_key: full_key.clone(),
472 parent_query_type: std::any::type_name::<Q>(),
473 exec_ctx,
474 deps: RefCell::new(Vec::new()),
475 };
476
477 // Execute the query (clone because query() takes ownership)
478 let result = query.clone().query(&ctx);
479
480 // Get collected dependencies
481 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
482
483 // Query durability defaults to stable - Whale will automatically reduce
484 // the effective durability to min(requested, min(dep_durabilities)).
485 // A pure query with no dependencies remains stable.
486 // A query depending on volatile assets becomes volatile.
487 let durability = Durability::stable();
488
489 match result {
490 Ok(output) => {
491 // Check if output changed (for early cutoff)
492 // existing_revision is Some only when output is unchanged (can reuse revision)
493 let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
494 self.get_cached_with_revision::<Q>(full_key)
495 {
496 if Q::output_eq(&*old, &*output) {
497 Some(rev) // Same output - reuse revision
498 } else {
499 None // Different output
500 }
501 } else {
502 None // No previous Ok value
503 };
504 let output_changed = existing_revision.is_none();
505
506 // Emit early cutoff check event
507 self.tracer.on_early_cutoff_check(
508 exec_ctx.span_id(),
509 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
510 output_changed,
511 );
512
513 // Update whale with cached entry (atomic update of value + dependency state)
514 let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
515 let revision = if let Some(existing_rev) = existing_revision {
516 // confirm_unchanged doesn't change changed_at, use existing
517 let _ = self.whale.confirm_unchanged(full_key, deps);
518 existing_rev
519 } else {
520 // Use new_rev from register result
521 match self
522 .whale
523 .register(full_key.clone(), Some(entry), durability, deps)
524 {
525 Ok(result) => result.new_rev,
526 Err(missing) => {
527 return Err(QueryError::DependenciesRemoved {
528 missing_keys: missing,
529 })
530 }
531 }
532 };
533
534 // Register query in registry for list_queries
535 let is_new_query = self.query_registry.register(query);
536 if is_new_query {
537 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
538 let _ = self
539 .whale
540 .register(sentinel, None, Durability::stable(), vec![]);
541 }
542
543 // Store verifier for this query (for verify-then-decide pattern)
544 self.verifiers
545 .insert::<Q, T>(full_key.clone(), query.clone());
546
547 Ok((Ok(output), output_changed, revision))
548 }
549 Err(QueryError::UserError(err)) => {
550 // Check if error changed (for early cutoff)
551 // existing_revision is Some only when error is unchanged (can reuse revision)
552 let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
553 self.get_cached_with_revision::<Q>(full_key)
554 {
555 if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
556 Some(rev) // Same error - reuse revision
557 } else {
558 None // Different error
559 }
560 } else {
561 None // No previous UserError
562 };
563 let output_changed = existing_revision.is_none();
564
565 // Emit early cutoff check event
566 self.tracer.on_early_cutoff_check(
567 exec_ctx.span_id(),
568 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
569 output_changed,
570 );
571
572 // Update whale with cached error (atomic update of value + dependency state)
573 let entry = CachedEntry::UserError(err.clone());
574 let revision = if let Some(existing_rev) = existing_revision {
575 // confirm_unchanged doesn't change changed_at, use existing
576 let _ = self.whale.confirm_unchanged(full_key, deps);
577 existing_rev
578 } else {
579 // Use new_rev from register result
580 match self
581 .whale
582 .register(full_key.clone(), Some(entry), durability, deps)
583 {
584 Ok(result) => result.new_rev,
585 Err(missing) => {
586 return Err(QueryError::DependenciesRemoved {
587 missing_keys: missing,
588 })
589 }
590 }
591 };
592
593 // Register query in registry for list_queries
594 let is_new_query = self.query_registry.register(query);
595 if is_new_query {
596 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
597 let _ = self
598 .whale
599 .register(sentinel, None, Durability::stable(), vec![]);
600 }
601
602 // Store verifier for this query (for verify-then-decide pattern)
603 self.verifiers
604 .insert::<Q, T>(full_key.clone(), query.clone());
605
606 Ok((Err(err), output_changed, revision))
607 }
608 Err(e) => {
609 // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
610 Err(e)
611 }
612 }
613 }
614
615 /// Invalidate a query, forcing recomputation on next access.
616 ///
617 /// This also invalidates any queries that depend on this one.
618 pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
619 let full_key = FullCacheKey::new::<Q, _>(key);
620
621 self.tracer.on_query_invalidated(
622 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
623 InvalidationReason::ManualInvalidation,
624 );
625
626 // Update whale to invalidate dependents (register with None to clear cached value)
627 // Use stable durability to increment all revision counters, ensuring queries
628 // at any durability level will see this as a change.
629 let _ = self
630 .whale
631 .register(full_key, None, Durability::stable(), vec![]);
632 }
633
634 /// Remove a query from the cache entirely, freeing memory.
635 ///
636 /// Use this for GC when a query is no longer needed.
637 /// Unlike `invalidate`, this removes all traces of the query from storage.
638 /// The query will be recomputed from scratch on next access.
639 ///
640 /// This also invalidates any queries that depend on this one.
641 pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
642 let full_key = FullCacheKey::new::<Q, _>(key);
643
644 self.tracer.on_query_invalidated(
645 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
646 InvalidationReason::ManualInvalidation,
647 );
648
649 // Remove verifier if exists
650 self.verifiers.remove(&full_key);
651
652 // Remove from whale storage (this also handles dependent invalidation)
653 self.whale.remove(&full_key);
654
655 // Remove from registry and update sentinel for list_queries
656 if self.query_registry.remove::<Q>(key) {
657 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
658 let _ = self
659 .whale
660 .register(sentinel, None, Durability::stable(), vec![]);
661 }
662 }
663
664 /// Clear all cached values by removing all nodes from whale.
665 ///
666 /// Note: This is a relatively expensive operation as it iterates through all keys.
667 pub fn clear_cache(&self) {
668 let keys = self.whale.keys();
669 for key in keys {
670 self.whale.remove(&key);
671 }
672 }
673
674 /// Poll a query, returning both the result and its change revision.
675 ///
676 /// This is useful for implementing subscription patterns where you need to
677 /// detect changes efficiently. Compare the returned `revision` with a
678 /// previously stored value to determine if the query result has changed.
679 ///
680 /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
681 /// as its value, allowing you to track revision changes for both success and
682 /// user error cases.
683 ///
684 /// # Example
685 ///
686 /// ```ignore
687 /// struct Subscription<Q: Query> {
688 /// query: Q,
689 /// last_revision: RevisionCounter,
690 /// tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
691 /// }
692 ///
693 /// // Polling loop
694 /// for sub in &mut subscriptions {
695 /// let result = runtime.poll(sub.query.clone())?;
696 /// if result.revision > sub.last_revision {
697 /// sub.tx.send(result.value.clone())?;
698 /// sub.last_revision = result.revision;
699 /// }
700 /// }
701 /// ```
702 ///
703 /// # Errors
704 ///
705 /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
706 /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
707 #[allow(clippy::type_complexity)]
708 pub fn poll<Q: Query>(
709 &self,
710 query: Q,
711 ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
712 let (value, revision) = self.query_internal(query)?;
713 Ok(Polled { value, revision })
714 }
715
716 /// Get the change revision of a query without executing it.
717 ///
718 /// Returns `None` if the query has never been executed.
719 ///
720 /// This is useful for checking if a query has changed since the last poll
721 /// without the cost of executing the query.
722 ///
723 /// # Example
724 ///
725 /// ```ignore
726 /// // Check if query has changed before deciding to poll
727 /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
728 /// if rev > last_known_revision {
729 /// let result = runtime.query(MyQuery::new(key))?;
730 /// // Process result...
731 /// }
732 /// }
733 /// ```
734 pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
735 let full_key = FullCacheKey::new::<Q, _>(key);
736 self.whale.get(&full_key).map(|node| node.changed_at)
737 }
738}
739
740// ============================================================================
741// GC (Garbage Collection) API
742// ============================================================================
743
744impl<T: Tracer> QueryRuntime<T> {
745 /// Get all query keys currently in the cache.
746 ///
747 /// This is useful for implementing custom garbage collection strategies.
748 /// Use this in combination with [`Tracer::on_query_key`] to track access
749 /// times and implement LRU, TTL, or other GC algorithms externally.
750 ///
751 /// # Example
752 ///
753 /// ```ignore
754 /// // Collect all keys that haven't been accessed recently
755 /// let stale_keys: Vec<_> = runtime.query_keys()
756 /// .filter(|key| tracker.is_stale(key))
757 /// .collect();
758 ///
759 /// // Remove stale queries that have no dependents
760 /// for key in stale_keys {
761 /// runtime.remove_if_unused(&key);
762 /// }
763 /// ```
764 pub fn query_keys(&self) -> Vec<FullCacheKey> {
765 self.whale.keys()
766 }
767
768 /// Remove a query if it has no dependents.
769 ///
770 /// Returns `true` if the query was removed, `false` if it has dependents
771 /// or doesn't exist. This is the safe way to remove queries during GC,
772 /// as it won't break queries that depend on this one.
773 ///
774 /// # Example
775 ///
776 /// ```ignore
777 /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
778 /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
779 /// println!("Query removed");
780 /// } else {
781 /// println!("Query has dependents, not removed");
782 /// }
783 /// ```
784 pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
785 let full_key = FullCacheKey::new::<Q, _>(key);
786 self.remove_if_unused(&full_key)
787 }
788
789 /// Remove a query by its [`FullCacheKey`].
790 ///
791 /// This is the type-erased version of [`remove_query`](Self::remove_query).
792 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
793 /// or [`Tracer::on_query_key`].
794 ///
795 /// Returns `true` if the query was removed, `false` if it doesn't exist.
796 ///
797 /// # Warning
798 ///
799 /// This forcibly removes the query even if other queries depend on it.
800 /// Dependent queries will be recomputed on next access. For safe GC,
801 /// use [`remove_if_unused`](Self::remove_if_unused) instead.
802 pub fn remove(&self, key: &FullCacheKey) -> bool {
803 // Remove verifier if exists
804 self.verifiers.remove(key);
805
806 // Remove from whale storage
807 self.whale.remove(key).is_some()
808 }
809
810 /// Remove a query by its [`FullCacheKey`] if it has no dependents.
811 ///
812 /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
813 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
814 /// or [`Tracer::on_query_key`].
815 ///
816 /// Returns `true` if the query was removed, `false` if it has dependents
817 /// or doesn't exist.
818 ///
819 /// # Example
820 ///
821 /// ```ignore
822 /// // Implement LRU GC
823 /// for key in runtime.query_keys() {
824 /// if tracker.is_expired(&key) {
825 /// runtime.remove_if_unused(&key);
826 /// }
827 /// }
828 /// ```
829 pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
830 if self.whale.remove_if_unused(key.clone()).is_some() {
831 // Successfully removed - clean up verifier
832 self.verifiers.remove(key);
833 true
834 } else {
835 false
836 }
837 }
838}
839
840// ============================================================================
841// Builder
842// ============================================================================
843
844/// Builder for [`QueryRuntime`] with customizable settings.
845///
846/// # Example
847///
848/// ```ignore
849/// let runtime = QueryRuntime::builder()
850/// .error_comparator(|a, b| {
851/// // Treat all errors of the same type as equal
852/// a.downcast_ref::<std::io::Error>().is_some()
853/// == b.downcast_ref::<std::io::Error>().is_some()
854/// })
855/// .build();
856/// ```
857pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
858 error_comparator: ErrorComparator,
859 tracer: T,
860}
861
862impl Default for QueryRuntimeBuilder<NoopTracer> {
863 fn default() -> Self {
864 Self::new()
865 }
866}
867
868impl QueryRuntimeBuilder<NoopTracer> {
869 /// Create a new builder with default settings.
870 pub fn new() -> Self {
871 Self {
872 error_comparator: default_error_comparator,
873 tracer: NoopTracer,
874 }
875 }
876}
877
878impl<T: Tracer> QueryRuntimeBuilder<T> {
879 /// Set the error comparator function for early cutoff optimization.
880 ///
881 /// When a query returns `QueryError::UserError`, this function is used
882 /// to compare it with the previously cached error. If they are equal,
883 /// downstream queries can skip recomputation (early cutoff).
884 ///
885 /// The default comparator returns `false` for all errors, meaning errors
886 /// are always considered different (conservative, always recomputes).
887 ///
888 /// # Example
889 ///
890 /// ```ignore
891 /// // Treat errors as equal if they have the same display message
892 /// let runtime = QueryRuntime::builder()
893 /// .error_comparator(|a, b| a.to_string() == b.to_string())
894 /// .build();
895 /// ```
896 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
897 self.error_comparator = f;
898 self
899 }
900
901 /// Set the tracer for observability.
902 pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
903 QueryRuntimeBuilder {
904 error_comparator: self.error_comparator,
905 tracer,
906 }
907 }
908
909 /// Build the runtime with the configured settings.
910 pub fn build(self) -> QueryRuntime<T> {
911 QueryRuntime {
912 whale: WhaleRuntime::new(),
913 locators: Arc::new(LocatorStorage::new()),
914 pending: Arc::new(PendingStorage::new()),
915 query_registry: Arc::new(QueryRegistry::new()),
916 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
917 verifiers: Arc::new(VerifierStorage::new()),
918 error_comparator: self.error_comparator,
919 tracer: Arc::new(self.tracer),
920 }
921 }
922}
923
924// ============================================================================
925// Asset API
926// ============================================================================
927
928impl<T: Tracer> QueryRuntime<T> {
929 /// Register an asset locator for a specific asset key type.
930 ///
931 /// Only one locator can be registered per key type. Later registrations
932 /// replace earlier ones.
933 ///
934 /// # Example
935 ///
936 /// ```ignore
937 /// let runtime = QueryRuntime::new();
938 /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
939 /// ```
940 pub fn register_asset_locator<K, L>(&self, locator: L)
941 where
942 K: AssetKey,
943 L: AssetLocator<K>,
944 {
945 self.locators.insert::<K, L>(locator);
946 }
947
948 /// Get an iterator over pending asset requests.
949 ///
950 /// Returns assets that have been requested but not yet resolved.
951 /// The user should fetch these externally and call `resolve_asset()`.
952 ///
953 /// # Example
954 ///
955 /// ```ignore
956 /// for pending in runtime.pending_assets() {
957 /// if let Some(path) = pending.key::<FilePath>() {
958 /// let content = fetch_file(path);
959 /// runtime.resolve_asset(path.clone(), content);
960 /// }
961 /// }
962 /// ```
963 pub fn pending_assets(&self) -> Vec<PendingAsset> {
964 self.pending.get_all()
965 }
966
/// Get pending assets filtered by key type.
///
/// Returns the pending keys of type `K` as typed `K` values (compare
/// `pending_assets`, which returns `PendingAsset` entries).
pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
    self.pending.get_of_type::<K>()
}
971
/// Check if there are any pending assets.
///
/// Returns `true` when the pending storage is non-empty, i.e. at least one
/// requested asset has not yet been removed from it by `resolve_asset`,
/// `resolve_asset_error`, or `remove_asset`.
pub fn has_pending_assets(&self) -> bool {
    !self.pending.is_empty()
}
976
/// Resolve an asset with its loaded value.
///
/// This marks the asset as ready and invalidates any queries that
/// depend on it (if the value changed), triggering recomputation on next access.
/// The key is also removed from the pending set.
///
/// This method is idempotent - resolving with the same value (via `asset_eq`)
/// will not trigger downstream recomputation.
///
/// This is a thin public wrapper; all of the work happens in
/// `resolve_asset_internal`.
///
/// # Arguments
///
/// * `key` - The asset key identifying this resource
/// * `value` - The loaded asset value
/// * `durability` - How frequently this asset is expected to change
///
/// # Example
///
/// ```ignore
/// let content = std::fs::read_to_string(&path)?;
/// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
/// ```
pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
    self.resolve_asset_internal(key, value, durability);
}
1000
1001 /// Resolve an asset with an error.
1002 ///
1003 /// This marks the asset as errored and caches the error. Queries depending
1004 /// on this asset will receive `Err(QueryError::UserError(...))`.
1005 ///
1006 /// Use this when async loading fails (e.g., network error, file not found,
1007 /// access denied).
1008 ///
1009 /// # Arguments
1010 ///
1011 /// * `key` - The asset key identifying this resource
1012 /// * `error` - The error to cache (will be wrapped in `Arc`)
1013 /// * `durability` - How frequently this error state is expected to change
1014 ///
1015 /// # Example
1016 ///
1017 /// ```ignore
1018 /// match fetch_file(&path) {
1019 /// Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
1020 /// Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
1021 /// }
1022 /// ```
1023 pub fn resolve_asset_error<K: AssetKey>(
1024 &self,
1025 key: K,
1026 error: impl Into<anyhow::Error>,
1027 durability: DurabilityLevel,
1028 ) {
1029 let full_asset_key = FullAssetKey::new(&key);
1030 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1031
1032 // Remove from pending BEFORE registering the error
1033 self.pending.remove(&full_asset_key);
1034
1035 // Prepare the error entry
1036 let error_arc = Arc::new(error.into());
1037 let entry = CachedEntry::AssetError(error_arc.clone());
1038 let durability =
1039 Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
1040
1041 // Atomic compare-and-update (errors are always considered changed for now)
1042 let result = self
1043 .whale
1044 .update_with_compare(
1045 full_cache_key,
1046 Some(entry),
1047 |old_data, _new_data| {
1048 // Compare old and new errors using error_comparator
1049 match old_data.and_then(|d| d.as_ref()) {
1050 Some(CachedEntry::AssetError(old_err)) => {
1051 !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
1052 }
1053 _ => true, // Loading, Ready, or not present -> changed
1054 }
1055 },
1056 durability,
1057 vec![],
1058 )
1059 .expect("update_with_compare with no dependencies cannot fail");
1060
1061 // Emit asset resolved event (with changed status)
1062 self.tracer.on_asset_resolved(
1063 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1064 result.changed,
1065 );
1066
1067 // Register asset key in registry for list_asset_keys
1068 let is_new_asset = self.asset_key_registry.register(&key);
1069 if is_new_asset {
1070 // Update sentinel to invalidate list_asset_keys dependents
1071 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1072 let _ = self
1073 .whale
1074 .register(sentinel, None, Durability::stable(), vec![]);
1075 }
1076 }
1077
1078 fn resolve_asset_internal<K: AssetKey>(
1079 &self,
1080 key: K,
1081 value: K::Asset,
1082 durability_level: DurabilityLevel,
1083 ) {
1084 let full_asset_key = FullAssetKey::new(&key);
1085 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1086
1087 // Remove from pending BEFORE registering the value
1088 self.pending.remove(&full_asset_key);
1089
1090 // Prepare the new entry
1091 let value_arc: Arc<K::Asset> = Arc::new(value);
1092 let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1093 let durability =
1094 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1095
1096 // Atomic compare-and-update
1097 let result = self
1098 .whale
1099 .update_with_compare(
1100 full_cache_key,
1101 Some(entry),
1102 |old_data, _new_data| {
1103 // Compare old and new values
1104 match old_data.and_then(|d| d.as_ref()) {
1105 Some(CachedEntry::AssetReady(old_arc)) => {
1106 match old_arc.clone().downcast::<K::Asset>() {
1107 Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1108 Err(_) => true, // Type mismatch, treat as changed
1109 }
1110 }
1111 _ => true, // Loading, NotFound, or not present -> changed
1112 }
1113 },
1114 durability,
1115 vec![],
1116 )
1117 .expect("update_with_compare with no dependencies cannot fail");
1118
1119 // Emit asset resolved event
1120 self.tracer.on_asset_resolved(
1121 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1122 result.changed,
1123 );
1124
1125 // Register asset key in registry for list_asset_keys
1126 let is_new_asset = self.asset_key_registry.register(&key);
1127 if is_new_asset {
1128 // Update sentinel to invalidate list_asset_keys dependents
1129 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1130 let _ = self
1131 .whale
1132 .register(sentinel, None, Durability::stable(), vec![]);
1133 }
1134 }
1135
1136 /// Invalidate an asset, forcing queries to re-request it.
1137 ///
1138 /// The asset will be marked as loading and added to pending assets.
1139 /// Dependent queries will suspend until the asset is resolved again.
1140 ///
1141 /// # Example
1142 ///
1143 /// ```ignore
1144 /// // File was modified externally
1145 /// runtime.invalidate_asset(&FilePath("config.json".into()));
1146 /// // Queries depending on this asset will now suspend
1147 /// // User should fetch the new value and call resolve_asset
1148 /// ```
1149 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1150 let full_asset_key = FullAssetKey::new(key);
1151 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1152
1153 // Emit asset invalidated event
1154 self.tracer.on_asset_invalidated(TracerAssetKey::new(
1155 std::any::type_name::<K>(),
1156 format!("{:?}", key),
1157 ));
1158
1159 // Add to pending FIRST (before clearing whale state)
1160 // This ensures: readers see either old value, or Loading+pending
1161 self.pending.insert::<K>(full_asset_key, key.clone());
1162
1163 // Atomic: clear cached value + invalidate dependents
1164 // Using None for data means "needs to be loaded"
1165 // Use stable durability to ensure queries at any durability level see the change.
1166 let _ = self
1167 .whale
1168 .register(full_cache_key, None, Durability::stable(), vec![]);
1169 }
1170
1171 /// Remove an asset from the cache entirely.
1172 ///
1173 /// Unlike `invalidate_asset`, this removes all traces of the asset.
1174 /// Dependent queries will go through the locator again on next access.
1175 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1176 let full_asset_key = FullAssetKey::new(key);
1177 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1178
1179 // Remove from pending first
1180 self.pending.remove(&full_asset_key);
1181
1182 // Remove from whale (this also cleans up dependency edges)
1183 // whale.remove() invalidates dependents before removing
1184 self.whale.remove(&full_cache_key);
1185
1186 // Remove from registry and update sentinel for list_asset_keys
1187 if self.asset_key_registry.remove::<K>(key) {
1188 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1189 let _ = self
1190 .whale
1191 .register(sentinel, None, Durability::stable(), vec![]);
1192 }
1193 }
1194
/// Get an asset by key without tracking dependencies.
///
/// Unlike `QueryContext::asset()`, this method does NOT register the caller
/// as a dependent of the asset. Use this for direct asset access outside
/// of query execution.
///
/// # Returns
///
/// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
/// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
/// - `Err(QueryError::UserError(...))` - Asset was resolved with an error, or its
///   locator returned a user error
/// - `Err(QueryError::MissingDependency)` - The stored or located value is not of
///   type `K::Asset` ("Asset type mismatch")
/// - Other errors returned by the cycle check or the locator are propagated unchanged
pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
    self.get_asset_internal(key)
}
1209
/// Internal: Get asset state and its changed_at revision atomically.
///
/// This is the "direct" version (no dependency tracking); `get_asset_internal`
/// calls it and discards the revision.
/// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
///
/// The lookup proceeds in three stages:
///
/// 1. Cache: if whale holds a valid node whose dependencies all verify, the
///    cached state (ready / error / loading) is returned.
/// 2. Locator: otherwise the locator registered for `K` (if any) is run and
///    its result is stored in whale together with the dependencies it collected.
/// 3. Fallback: with no locator result, the key is added to the pending set
///    and a loading node is inserted (or the existing node is reported).
///
/// Returns (AssetLoadingState, changed_at) where changed_at is from the same
/// whale node that provided the asset value.
///
/// # Errors
///
/// - `QueryError::UserError` for a cached or located asset error.
/// - `QueryError::MissingDependency` when a value cannot be downcast to `K::Asset`.
/// - Any error from `check_leaf_asset_consistency`, `check_cycle`, or the
///   locator is propagated unchanged.
fn get_asset_with_revision<K: AssetKey>(
    &self,
    key: K,
) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
    let full_asset_key = FullAssetKey::new(&key);
    let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);

    // Helper to emit AssetRequested event
    let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
        tracer.on_asset_requested(
            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
            state,
        );
    };

    // Check whale cache first (single atomic read)
    if let Some(node) = self.whale.get(&full_cache_key) {
        let changed_at = node.changed_at;
        // Check if valid at current revision (shallow check)
        if self.whale.is_valid(&full_cache_key) {
            // Verify dependencies recursively (like query path does)
            let mut deps_verified = true;
            if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
                for dep in deps {
                    // Dependencies without a registered verifier are skipped.
                    if let Some(verifier) = self.verifiers.get(&dep) {
                        // Re-run query/asset to verify it (triggers recursive verification)
                        if verifier.verify(self as &dyn std::any::Any).is_err() {
                            deps_verified = false;
                            break;
                        }
                    }
                }
            }

            // Re-check validity after deps are verified
            if deps_verified && self.whale.is_valid(&full_cache_key) {
                // For cached entries, check consistency for leaf assets (no locator deps).
                // This detects if resolve_asset/resolve_asset_error was called during query execution.
                let has_locator_deps = self
                    .whale
                    .get_dependency_ids(&full_cache_key)
                    .is_some_and(|deps| !deps.is_empty());

                // NOTE: `node` is the snapshot read at the top of this block; the
                // data returned below comes from that snapshot.
                match &node.data {
                    Some(CachedEntry::AssetReady(arc)) => {
                        // Check consistency for cached leaf assets
                        if !has_locator_deps {
                            check_leaf_asset_consistency(changed_at)?;
                        }
                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
                        match arc.clone().downcast::<K::Asset>() {
                            Ok(value) => {
                                return Ok((AssetLoadingState::ready(key, value), changed_at))
                            }
                            Err(_) => {
                                return Err(QueryError::MissingDependency {
                                    description: format!("Asset type mismatch: {:?}", key),
                                })
                            }
                        }
                    }
                    Some(CachedEntry::AssetError(err)) => {
                        // Check consistency for cached leaf errors
                        if !has_locator_deps {
                            check_leaf_asset_consistency(changed_at)?;
                        }
                        // A cached error is reported to the tracer as `NotFound`.
                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
                        return Err(QueryError::UserError(err.clone()));
                    }
                    None => {
                        // Loading state - no value to be inconsistent with
                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
                        return Ok((AssetLoadingState::loading(key), changed_at));
                    }
                    _ => {
                        // Query-related entries (Ok, UserError) shouldn't be here
                        // Fall through to locator
                    }
                }
            }
        }
    }

    // Not in cache or invalid - try locator
    // Use LocatorContext to track deps on the asset itself
    // The guard is held until this function returns.
    // NOTE(review): presumably `check_cycle`/`StackGuard` operate on the
    // thread-local QUERY_STACK - confirm against their definitions.
    check_cycle(&full_cache_key)?;
    let _guard = StackGuard::push(full_cache_key.clone());

    let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
    let locator_result =
        self.locators
            .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);

    if let Some(result) = locator_result {
        // Get collected dependencies from the locator context
        let locator_deps = locator_ctx.into_deps();
        match result {
            Ok(ErasedLocateResult::Ready {
                value: arc,
                durability: durability_level,
            }) => {
                emit_requested(&self.tracer, &key, TracerAssetState::Ready);

                let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
                    Ok(v) => v,
                    Err(_) => {
                        return Err(QueryError::MissingDependency {
                            description: format!("Asset type mismatch: {:?}", key),
                        });
                    }
                };

                // Store in whale atomically with early cutoff
                // Include locator's dependencies so the asset is invalidated when they change
                let entry = CachedEntry::AssetReady(typed_value.clone());
                let durability = Durability::new(durability_level.as_u8() as usize)
                    .unwrap_or(Durability::volatile());
                let new_value = typed_value.clone();
                let result = self
                    .whale
                    .update_with_compare(
                        full_cache_key.clone(),
                        Some(entry),
                        |old_data, _new_data| {
                            // Anything other than a previous ready value of the
                            // same type counts as changed.
                            let Some(CachedEntry::AssetReady(old_arc)) =
                                old_data.and_then(|d| d.as_ref())
                            else {
                                return true;
                            };
                            let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
                                return true;
                            };
                            !K::asset_eq(&old_value, &new_value)
                        },
                        durability,
                        locator_deps,
                    )
                    .expect("update_with_compare should succeed");

                // Register verifier for this asset (for verify-then-decide pattern)
                self.verifiers
                    .insert_asset::<K, T>(full_cache_key, key.clone());

                // NOTE(review): this path returns `result.revision`, while every
                // other path returns the node's `changed_at` - confirm the two
                // are interchangeable for callers.
                return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
            }
            Ok(ErasedLocateResult::Pending) => {
                emit_requested(&self.tracer, &key, TracerAssetState::Loading);

                // Add to pending list for Pending result
                self.pending.insert::<K>(full_asset_key, key.clone());
                // Insert a loading node unless one already exists; an existing
                // node is reported as-is.
                match self
                    .whale
                    .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
                    .expect("get_or_insert should succeed")
                {
                    GetOrInsertResult::Inserted(node) => {
                        return Ok((AssetLoadingState::loading(key), node.changed_at));
                    }
                    GetOrInsertResult::Existing(node) => {
                        let changed_at = node.changed_at;
                        match &node.data {
                            Some(CachedEntry::AssetReady(arc)) => {
                                // A value of the wrong type is reported as loading here
                                // (not as a type-mismatch error).
                                match arc.clone().downcast::<K::Asset>() {
                                    Ok(value) => {
                                        return Ok((
                                            AssetLoadingState::ready(key, value),
                                            changed_at,
                                        ))
                                    }
                                    Err(_) => {
                                        return Ok((
                                            AssetLoadingState::loading(key),
                                            changed_at,
                                        ))
                                    }
                                }
                            }
                            Some(CachedEntry::AssetError(err)) => {
                                return Err(QueryError::UserError(err.clone()));
                            }
                            _ => return Ok((AssetLoadingState::loading(key), changed_at)),
                        }
                    }
                }
            }
            Err(QueryError::UserError(err)) => {
                // Locator returned a user error - cache it as AssetError
                // Unlike the Ready path, no AssetRequested event is emitted and no
                // verifier is registered here; the entry is stored as volatile.
                let entry = CachedEntry::AssetError(err.clone());
                let _ = self.whale.register(
                    full_cache_key,
                    Some(entry),
                    Durability::volatile(),
                    locator_deps,
                );
                return Err(QueryError::UserError(err));
            }
            Err(e) => {
                // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
                return Err(e);
            }
        }
    }

    // No locator registered or locator returned None - mark as pending
    // (no locator was called, so no deps to track)
    emit_requested(&self.tracer, &key, TracerAssetState::Loading);
    self.pending
        .insert::<K>(full_asset_key.clone(), key.clone());

    // Same insert-or-report handling as the locator's Pending branch, but with
    // no dependencies attached to the node.
    match self
        .whale
        .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
        .expect("get_or_insert with no dependencies cannot fail")
    {
        GetOrInsertResult::Inserted(node) => {
            Ok((AssetLoadingState::loading(key), node.changed_at))
        }
        GetOrInsertResult::Existing(node) => {
            let changed_at = node.changed_at;
            match &node.data {
                Some(CachedEntry::AssetReady(arc)) => {
                    match arc.clone().downcast::<K::Asset>() {
                        Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
                        Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
                    }
                }
                Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
                _ => Ok((AssetLoadingState::loading(key), changed_at)),
            }
        }
    }
}
1449
1450 /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1451 ///
1452 /// This version is called from QueryContext::asset. Consistency checking for
1453 /// cached leaf assets is done inside this function before returning.
1454 fn get_asset_with_revision_ctx<K: AssetKey>(
1455 &self,
1456 key: K,
1457 _ctx: &QueryContext<'_, T>,
1458 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1459 let full_asset_key = FullAssetKey::new(&key);
1460 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1461
1462 // Helper to emit AssetRequested event
1463 let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1464 tracer.on_asset_requested(
1465 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1466 state,
1467 );
1468 };
1469
1470 // Check whale cache first (single atomic read)
1471 if let Some(node) = self.whale.get(&full_cache_key) {
1472 let changed_at = node.changed_at;
1473 // Check if valid at current revision (shallow check)
1474 if self.whale.is_valid(&full_cache_key) {
1475 // Verify dependencies recursively (like query path does)
1476 let mut deps_verified = true;
1477 if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1478 for dep in deps {
1479 if let Some(verifier) = self.verifiers.get(&dep) {
1480 // Re-run query/asset to verify it (triggers recursive verification)
1481 if verifier.verify(self as &dyn std::any::Any).is_err() {
1482 deps_verified = false;
1483 break;
1484 }
1485 }
1486 }
1487 }
1488
1489 // Re-check validity after deps are verified
1490 if deps_verified && self.whale.is_valid(&full_cache_key) {
1491 // For cached entries, check consistency for leaf assets (no locator deps).
1492 // This detects if resolve_asset/resolve_asset_error was called during query execution.
1493 let has_locator_deps = self
1494 .whale
1495 .get_dependency_ids(&full_cache_key)
1496 .is_some_and(|deps| !deps.is_empty());
1497
1498 match &node.data {
1499 Some(CachedEntry::AssetReady(arc)) => {
1500 // Check consistency for cached leaf assets
1501 if !has_locator_deps {
1502 check_leaf_asset_consistency(changed_at)?;
1503 }
1504 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1505 match arc.clone().downcast::<K::Asset>() {
1506 Ok(value) => {
1507 return Ok((AssetLoadingState::ready(key, value), changed_at))
1508 }
1509 Err(_) => {
1510 return Err(QueryError::MissingDependency {
1511 description: format!("Asset type mismatch: {:?}", key),
1512 })
1513 }
1514 }
1515 }
1516 Some(CachedEntry::AssetError(err)) => {
1517 // Check consistency for cached leaf errors
1518 if !has_locator_deps {
1519 check_leaf_asset_consistency(changed_at)?;
1520 }
1521 emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1522 return Err(QueryError::UserError(err.clone()));
1523 }
1524 None => {
1525 // Loading state - no value to be inconsistent with
1526 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1527 return Ok((AssetLoadingState::loading(key), changed_at));
1528 }
1529 _ => {
1530 // Query-related entries (Ok, UserError) shouldn't be here
1531 // Fall through to locator
1532 }
1533 }
1534 }
1535 }
1536 }
1537
1538 // Not in cache or invalid - try locator
1539 // Use LocatorContext to track deps on the asset itself (not the calling query)
1540 // Consistency tracking is handled via thread-local storage
1541 check_cycle(&full_cache_key)?;
1542 let _guard = StackGuard::push(full_cache_key.clone());
1543
1544 let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1545 let locator_result =
1546 self.locators
1547 .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1548
1549 if let Some(result) = locator_result {
1550 // Get collected dependencies from the locator context
1551 let locator_deps = locator_ctx.into_deps();
1552 match result {
1553 Ok(ErasedLocateResult::Ready {
1554 value: arc,
1555 durability: durability_level,
1556 }) => {
1557 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1558
1559 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1560 Ok(v) => v,
1561 Err(_) => {
1562 return Err(QueryError::MissingDependency {
1563 description: format!("Asset type mismatch: {:?}", key),
1564 });
1565 }
1566 };
1567
1568 // Store in whale atomically with early cutoff
1569 // Include locator's dependencies so the asset is invalidated when they change
1570 let entry = CachedEntry::AssetReady(typed_value.clone());
1571 let durability = Durability::new(durability_level.as_u8() as usize)
1572 .unwrap_or(Durability::volatile());
1573 let new_value = typed_value.clone();
1574 let result = self
1575 .whale
1576 .update_with_compare(
1577 full_cache_key.clone(),
1578 Some(entry),
1579 |old_data, _new_data| {
1580 let Some(CachedEntry::AssetReady(old_arc)) =
1581 old_data.and_then(|d| d.as_ref())
1582 else {
1583 return true;
1584 };
1585 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1586 return true;
1587 };
1588 !K::asset_eq(&old_value, &new_value)
1589 },
1590 durability,
1591 locator_deps,
1592 )
1593 .expect("update_with_compare should succeed");
1594
1595 // Register verifier for this asset (for verify-then-decide pattern)
1596 self.verifiers
1597 .insert_asset::<K, T>(full_cache_key, key.clone());
1598
1599 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1600 }
1601 Ok(ErasedLocateResult::Pending) => {
1602 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1603
1604 // Add to pending list for Pending result
1605 self.pending.insert::<K>(full_asset_key, key.clone());
1606 match self
1607 .whale
1608 .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1609 .expect("get_or_insert should succeed")
1610 {
1611 GetOrInsertResult::Inserted(node) => {
1612 return Ok((AssetLoadingState::loading(key), node.changed_at));
1613 }
1614 GetOrInsertResult::Existing(node) => {
1615 let changed_at = node.changed_at;
1616 match &node.data {
1617 Some(CachedEntry::AssetReady(arc)) => {
1618 match arc.clone().downcast::<K::Asset>() {
1619 Ok(value) => {
1620 return Ok((
1621 AssetLoadingState::ready(key, value),
1622 changed_at,
1623 ));
1624 }
1625 Err(_) => {
1626 return Ok((
1627 AssetLoadingState::loading(key),
1628 changed_at,
1629 ))
1630 }
1631 }
1632 }
1633 Some(CachedEntry::AssetError(err)) => {
1634 return Err(QueryError::UserError(err.clone()));
1635 }
1636 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1637 }
1638 }
1639 }
1640 }
1641 Err(QueryError::UserError(err)) => {
1642 // Locator returned a user error - cache it as AssetError
1643 let entry = CachedEntry::AssetError(err.clone());
1644 let _ = self.whale.register(
1645 full_cache_key,
1646 Some(entry),
1647 Durability::volatile(),
1648 locator_deps,
1649 );
1650 return Err(QueryError::UserError(err));
1651 }
1652 Err(e) => {
1653 // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1654 return Err(e);
1655 }
1656 }
1657 }
1658
1659 // No locator registered or locator returned None - mark as pending
1660 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1661 self.pending
1662 .insert::<K>(full_asset_key.clone(), key.clone());
1663
1664 match self
1665 .whale
1666 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1667 .expect("get_or_insert with no dependencies cannot fail")
1668 {
1669 GetOrInsertResult::Inserted(node) => {
1670 Ok((AssetLoadingState::loading(key), node.changed_at))
1671 }
1672 GetOrInsertResult::Existing(node) => {
1673 let changed_at = node.changed_at;
1674 match &node.data {
1675 Some(CachedEntry::AssetReady(arc)) => {
1676 match arc.clone().downcast::<K::Asset>() {
1677 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1678 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1679 }
1680 }
1681 Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1682 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1683 }
1684 }
1685 }
1686 }
1687
1688 /// Internal: Get asset state, checking cache and locator.
1689 fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1690 self.get_asset_with_revision(key).map(|(state, _)| state)
1691 }
1692}
1693
1694impl<T: Tracer> Db for QueryRuntime<T> {
1695 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1696 QueryRuntime::query(self, query)
1697 }
1698
1699 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1700 self.get_asset_internal(key)?.suspend()
1701 }
1702
1703 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1704 self.get_asset_internal(key)
1705 }
1706
1707 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1708 self.query_registry.get_all::<Q>()
1709 }
1710
1711 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1712 self.asset_key_registry.get_all::<K>()
1713 }
1714}
1715
1716/// Tracks consistency of leaf asset accesses during query execution.
1717///
1718/// A "leaf" asset is one without dependencies (externally resolved via `resolve_asset`).
1719/// This tracker ensures that all leaf assets accessed during a query execution
1720/// (including those accessed by locators) are consistent - i.e., none were modified
1721/// via `resolve_asset` mid-execution.
1722///
1723/// The tracker is shared across `QueryContext` and `LocatorContext` to propagate
1724/// consistency checking through the entire execution tree.
1725#[derive(Debug)]
1726pub(crate) struct ConsistencyTracker {
1727 /// Global revision at query start. All leaf assets must have changed_at <= this.
1728 start_revision: RevisionCounter,
1729}
1730
1731impl ConsistencyTracker {
1732 /// Create a new tracker with the given start revision.
1733 pub fn new(start_revision: RevisionCounter) -> Self {
1734 Self { start_revision }
1735 }
1736
1737 /// Check consistency for a leaf asset access.
1738 ///
1739 /// A leaf asset is consistent if its changed_at <= start_revision.
1740 /// This detects if resolve_asset was called during query execution.
1741 ///
1742 /// Returns Ok(()) if consistent, Err if inconsistent.
1743 pub fn check_leaf_asset(&self, dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
1744 if dep_changed_at > self.start_revision {
1745 Err(QueryError::InconsistentAssetResolution)
1746 } else {
1747 Ok(())
1748 }
1749 }
1750}
1751
/// Context provided to queries during execution.
///
/// Use this to access dependencies via `query()`, `asset()`, `asset_state()`
/// and `list_queries()`; each access is recorded in `deps`.
pub struct QueryContext<'a, T: Tracer = NoopTracer> {
    /// Runtime that executes dependency queries and serves assets.
    runtime: &'a QueryRuntime<T>,
    /// Cache key reported (via `debug_repr()`) as the parent in tracer dependency events.
    current_key: FullCacheKey,
    /// Type name reported together with `current_key` in tracer dependency events.
    parent_query_type: &'static str,
    /// Supplies the span id passed to tracer dependency events.
    exec_ctx: ExecutionContext,
    /// Cache keys of the queries, assets, and set sentinels accessed through
    /// this context, in access order.
    deps: RefCell<Vec<FullCacheKey>>,
}
1762
1763impl<'a, T: Tracer> QueryContext<'a, T> {
1764 /// Query a dependency.
1765 ///
1766 /// The dependency is automatically tracked for invalidation.
1767 ///
1768 /// # Example
1769 ///
1770 /// ```ignore
1771 /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1772 /// let dep_result = db.query(OtherQuery { id: self.id })?;
1773 /// Ok(process(&dep_result))
1774 /// }
1775 /// ```
1776 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1777 let key = query.cache_key();
1778 let full_key = FullCacheKey::new::<Q, _>(&key);
1779
1780 // Emit dependency registered event
1781 self.runtime.tracer.on_dependency_registered(
1782 self.exec_ctx.span_id(),
1783 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1784 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1785 );
1786
1787 // Record this as a dependency
1788 self.deps.borrow_mut().push(full_key.clone());
1789
1790 // Execute the query
1791 self.runtime.query(query)
1792 }
1793
1794 /// Access an asset, tracking it as a dependency.
1795 ///
1796 /// Returns the asset value if ready, or `Err(QueryError::Suspend)` if still loading.
1797 /// Use this with the `?` operator for automatic suspension on loading.
1798 ///
1799 /// # Example
1800 ///
1801 /// ```ignore
1802 /// #[query]
1803 /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1804 /// let content = db.asset(path)?;
1805 /// // Process content...
1806 /// Ok(output)
1807 /// }
1808 /// ```
1809 ///
1810 /// # Errors
1811 ///
1812 /// - Returns `Err(QueryError::Suspend)` if the asset is still loading.
1813 /// - Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1814 pub fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1815 self.asset_state(key)?.suspend()
1816 }
1817
1818 /// Access an asset's loading state, tracking it as a dependency.
1819 ///
1820 /// Unlike [`asset()`](Self::asset), this method returns the full loading state,
1821 /// allowing you to check if an asset is loading without triggering suspension.
1822 ///
1823 /// # Example
1824 ///
1825 /// ```ignore
1826 /// let state = db.asset_state(key)?;
1827 /// if state.is_loading() {
1828 /// // Handle loading case explicitly
1829 /// } else {
1830 /// let value = state.get().unwrap();
1831 /// }
1832 /// ```
1833 ///
1834 /// # Errors
1835 ///
1836 /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1837 pub fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1838 let full_asset_key = FullAssetKey::new(&key);
1839 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1840
1841 // 1. Emit asset dependency registered event
1842 self.runtime.tracer.on_asset_dependency_registered(
1843 self.exec_ctx.span_id(),
1844 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1845 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1846 );
1847
1848 // 2. Record dependency on this asset
1849 self.deps.borrow_mut().push(full_cache_key);
1850
1851 // 3. Get asset from cache/locator
1852 // Consistency checking for cached leaf assets is done inside get_asset_with_revision_ctx
1853 let (state, _changed_at) = self.runtime.get_asset_with_revision_ctx(key, self)?;
1854
1855 Ok(state)
1856 }
1857
1858 /// List all query instances of type Q that have been registered.
1859 ///
1860 /// This method establishes a dependency on the "set" of queries of type Q.
1861 /// The calling query will be invalidated when:
1862 /// - A new query of type Q is first executed (added to set)
1863 ///
1864 /// The calling query will NOT be invalidated when:
1865 /// - An individual query of type Q has its value change
1866 ///
1867 /// # Example
1868 ///
1869 /// ```ignore
1870 /// #[query]
1871 /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1872 /// let queries = db.list_queries::<MyQuery>();
1873 /// let mut results = Vec::new();
1874 /// for q in queries {
1875 /// results.push(*db.query(q)?);
1876 /// }
1877 /// Ok(results)
1878 /// }
1879 /// ```
1880 pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1881 // Record dependency on the sentinel (set-level dependency)
1882 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1883
1884 self.runtime.tracer.on_dependency_registered(
1885 self.exec_ctx.span_id(),
1886 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1887 TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1888 );
1889
1890 // Ensure sentinel exists in whale (for dependency tracking)
1891 if self.runtime.whale.get(&sentinel).is_none() {
1892 let _ =
1893 self.runtime
1894 .whale
1895 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1896 }
1897
1898 self.deps.borrow_mut().push(sentinel);
1899
1900 // Return all registered queries
1901 self.runtime.query_registry.get_all::<Q>()
1902 }
1903
1904 /// List all asset keys of type K that have been registered.
1905 ///
1906 /// This method establishes a dependency on the "set" of asset keys of type K.
1907 /// The calling query will be invalidated when:
1908 /// - A new asset of type K is resolved for the first time (added to set)
1909 /// - An asset of type K is removed via remove_asset
1910 ///
1911 /// The calling query will NOT be invalidated when:
1912 /// - An individual asset's value changes (use `db.asset()` for that)
1913 ///
1914 /// # Example
1915 ///
1916 /// ```ignore
1917 /// #[query]
1918 /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1919 /// let keys = db.list_asset_keys::<ConfigFile>();
1920 /// let mut contents = Vec::new();
1921 /// for key in keys {
1922 /// let content = db.asset(key)?;
1923 /// contents.push((*content).clone());
1924 /// }
1925 /// Ok(contents)
1926 /// }
1927 /// ```
1928 pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1929 // Record dependency on the sentinel (set-level dependency)
1930 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1931
1932 self.runtime.tracer.on_asset_dependency_registered(
1933 self.exec_ctx.span_id(),
1934 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1935 TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1936 );
1937
1938 // Ensure sentinel exists in whale (for dependency tracking)
1939 if self.runtime.whale.get(&sentinel).is_none() {
1940 let _ =
1941 self.runtime
1942 .whale
1943 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1944 }
1945
1946 self.deps.borrow_mut().push(sentinel);
1947
1948 // Return all registered asset keys
1949 self.runtime.asset_key_registry.get_all::<K>()
1950 }
1951}
1952
1953impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1954 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1955 QueryContext::query(self, query)
1956 }
1957
1958 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1959 QueryContext::asset(self, key)
1960 }
1961
1962 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1963 QueryContext::asset_state(self, key)
1964 }
1965
1966 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1967 QueryContext::list_queries(self)
1968 }
1969
1970 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1971 QueryContext::list_asset_keys(self)
1972 }
1973}
1974
/// Context for collecting dependencies during asset locator execution.
///
/// Unlike `QueryContext`, this is specifically for locators and does not
/// register dependencies on any parent query. Dependencies collected here
/// are stored with the asset itself.
pub(crate) struct LocatorContext<'a, T: Tracer> {
    /// Runtime that queries and asset lookups made through this context are
    /// forwarded to.
    runtime: &'a QueryRuntime<T>,
    /// Cache keys of the queries/assets requested through this context, in
    /// the order they were requested. Retrieved with `into_deps`.
    deps: RefCell<Vec<FullCacheKey>>,
}
1984
1985impl<'a, T: Tracer> LocatorContext<'a, T> {
1986 /// Create a new locator context for the given asset key.
1987 ///
1988 /// Consistency tracking is handled via thread-local storage, so leaf asset
1989 /// accesses will be checked against any active tracker from a parent query.
1990 pub(crate) fn new(runtime: &'a QueryRuntime<T>, _asset_key: FullCacheKey) -> Self {
1991 Self {
1992 runtime,
1993 deps: RefCell::new(Vec::new()),
1994 }
1995 }
1996
1997 /// Consume this context and return the collected dependencies.
1998 pub(crate) fn into_deps(self) -> Vec<FullCacheKey> {
1999 self.deps.into_inner()
2000 }
2001}
2002
2003impl<T: Tracer> Db for LocatorContext<'_, T> {
2004 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2005 let key = query.cache_key();
2006 let full_key = FullCacheKey::new::<Q, _>(&key);
2007
2008 // Record this as a dependency of the asset being located
2009 self.deps.borrow_mut().push(full_key);
2010
2011 // Execute the query via the runtime
2012 self.runtime.query(query)
2013 }
2014
2015 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2016 self.asset_state(key)?.suspend()
2017 }
2018
2019 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2020 let full_asset_key = FullAssetKey::new(&key);
2021 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
2022
2023 // Record this as a dependency of the asset being located
2024 self.deps.borrow_mut().push(full_cache_key);
2025
2026 // Access the asset - consistency checking is done inside get_asset_with_revision
2027 let (state, _changed_at) = self.runtime.get_asset_with_revision(key)?;
2028
2029 Ok(state)
2030 }
2031
2032 fn list_queries<Q: Query>(&self) -> Vec<Q> {
2033 self.runtime.list_queries()
2034 }
2035
2036 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2037 self.runtime.list_asset_keys()
2038 }
2039}
2040
2041#[cfg(test)]
2042mod tests {
2043 use super::*;
2044
/// A dependency-free query returns its computed value, both on first
/// execution and when asked again with the same key.
#[test]
fn test_simple_query() {
    /// Query that sums its two operands.
    #[derive(Clone)]
    struct Add {
        a: i32,
        b: i32,
    }

    impl Query for Add {
        type CacheKey = (i32, i32);
        type Output = i32;

        fn cache_key(&self) -> Self::CacheKey {
            let Self { a, b } = self;
            (*a, *b)
        }

        fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let sum = self.a + self.b;
            Ok(Arc::new(sum))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            old == new
        }
    }

    let rt = QueryRuntime::new();

    let first = rt.query(Add { a: 1, b: 2 }).unwrap();
    assert_eq!(*first, 3);

    // Same key again: this one is expected to be served from the cache.
    let second = rt.query(Add { a: 1, b: 2 }).unwrap();
    assert_eq!(*second, 3);
}
2079
/// A query that calls another query through `db` sees that query's result.
#[test]
fn test_dependent_queries() {
    /// Doubles its input.
    #[derive(Clone)]
    struct Base {
        value: i32,
    }

    impl Query for Base {
        type CacheKey = i32;
        type Output = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.value
        }

        fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let doubled = self.value * 2;
            Ok(Arc::new(doubled))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            old == new
        }
    }

    /// Adds 10 to the result of `Base` for the same input.
    #[derive(Clone)]
    struct Derived {
        base_value: i32,
    }

    impl Query for Derived {
        type CacheKey = i32;
        type Output = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.base_value
        }

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let Self { base_value } = self;
            let base = db.query(Base { value: base_value })?;
            Ok(Arc::new(*base + 10))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            old == new
        }
    }

    let rt = QueryRuntime::new();

    // 5 * 2 + 10
    let derived = rt.query(Derived { base_value: 5 }).unwrap();
    assert_eq!(*derived, 20);
}
2134
/// Two queries that call each other must surface `QueryError::Cycle`.
#[test]
fn test_cycle_detection() {
    /// Depends on `CycleB` with the same id.
    #[derive(Clone)]
    struct CycleA {
        id: i32,
    }

    /// Depends on `CycleA` with the same id, closing the loop.
    #[derive(Clone)]
    struct CycleB {
        id: i32,
    }

    impl Query for CycleA {
        type CacheKey = i32;
        type Output = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.id
        }

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let from_b = db.query(CycleB { id: self.id })?;
            Ok(Arc::new(*from_b + 1))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            old == new
        }
    }

    impl Query for CycleB {
        type CacheKey = i32;
        type Output = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.id
        }

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let from_a = db.query(CycleA { id: self.id })?;
            Ok(Arc::new(*from_a + 1))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            old == new
        }
    }

    let rt = QueryRuntime::new();

    let outcome = rt.query(CycleA { id: 1 });
    assert!(matches!(outcome, Err(QueryError::Cycle { .. })));
}
2188
/// A query whose `Output` is itself a `Result` succeeds at the system level
/// even when the user-level result is an error.
#[test]
fn test_fallible_query() {
    /// Parses its input string as an `i32`.
    #[derive(Clone)]
    struct ParseInt {
        input: String,
    }

    impl Query for ParseInt {
        type CacheKey = String;
        type Output = Result<i32, std::num::ParseIntError>;

        fn cache_key(&self) -> Self::CacheKey {
            self.input.clone()
        }

        fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let parsed = self.input.parse();
            Ok(Arc::new(parsed))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            old == new
        }
    }

    let rt = QueryRuntime::new();

    // Parsable input: the user-level result is `Ok`.
    let valid = ParseInt {
        input: String::from("42"),
    };
    let parsed = rt.query(valid).unwrap();
    assert_eq!(*parsed, Ok(42));

    // Unparsable input: the query itself still succeeds (the `unwrap`
    // passes); the parse failure is carried inside the output.
    let invalid = ParseInt {
        input: String::from("not_a_number"),
    };
    let parsed = rt.query(invalid).unwrap();
    assert!(parsed.is_err());
}
2231
2232 // Macro tests
2233 mod macro_tests {
2234 use super::*;
2235 use crate::query;
2236
2237 #[query]
2238 fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2239 let _ = db; // silence unused warning
2240 Ok(a + b)
2241 }
2242
2243 #[test]
2244 fn test_macro_basic() {
2245 let runtime = QueryRuntime::new();
2246 let result = runtime.query(Add::new(1, 2)).unwrap();
2247 assert_eq!(*result, 3);
2248 }
2249
2250 #[query]
2251 fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2252 let _ = db;
2253 Ok(x * 2)
2254 }
2255
2256 #[test]
2257 fn test_macro_simple() {
2258 let runtime = QueryRuntime::new();
2259 let result = runtime.query(SimpleDouble::new(5)).unwrap();
2260 assert_eq!(*result, 10);
2261 }
2262
2263 #[query(keys(id))]
2264 fn with_key_selection(
2265 db: &impl Db,
2266 id: u32,
2267 include_extra: bool,
2268 ) -> Result<String, QueryError> {
2269 let _ = db;
2270 Ok(format!("id={}, extra={}", id, include_extra))
2271 }
2272
2273 #[test]
2274 fn test_macro_key_selection() {
2275 let runtime = QueryRuntime::new();
2276
2277 // Same id, different include_extra - should return cached
2278 let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2279 let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2280
2281 // Both should have same value because only `id` is the key
2282 assert_eq!(*r1, "id=1, extra=true");
2283 assert_eq!(*r2, "id=1, extra=true"); // Cached!
2284 }
2285
2286 #[query]
2287 fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2288 let sum = db.query(Add::new(a, b))?;
2289 Ok(*sum * 2)
2290 }
2291
2292 #[test]
2293 fn test_macro_dependencies() {
2294 let runtime = QueryRuntime::new();
2295 let result = runtime.query(Dependent::new(3, 4)).unwrap();
2296 assert_eq!(*result, 14); // (3 + 4) * 2
2297 }
2298
2299 #[query(output_eq)]
2300 fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2301 let _ = db;
2302 Ok(x * 2)
2303 }
2304
2305 #[test]
2306 fn test_macro_output_eq() {
2307 let runtime = QueryRuntime::new();
2308 let result = runtime.query(WithOutputEq::new(5)).unwrap();
2309 assert_eq!(*result, 10);
2310 }
2311
2312 #[query(name = "CustomName")]
2313 fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2314 let _ = db;
2315 Ok(x)
2316 }
2317
2318 #[test]
2319 fn test_macro_custom_name() {
2320 let runtime = QueryRuntime::new();
2321 let result = runtime.query(CustomName::new(42)).unwrap();
2322 assert_eq!(*result, 42);
2323 }
2324
2325 // Test that attribute macros like #[tracing::instrument] are preserved
2326 // We use #[allow(unused_variables)] and #[inline] as test attributes since
2327 // they don't require external dependencies.
2328 #[allow(unused_variables)]
2329 #[inline]
2330 #[query]
2331 fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2332 // This would warn without #[allow(unused_variables)] on the generated method
2333 let unused_var = 42;
2334 Ok(x * 2)
2335 }
2336
2337 #[test]
2338 fn test_macro_preserves_attributes() {
2339 let runtime = QueryRuntime::new();
2340 // If attributes weren't preserved, this might warn about unused_var
2341 let result = runtime.query(WithAttributes::new(5)).unwrap();
2342 assert_eq!(*result, 10);
2343 }
2344 }
2345
2346 // Tests for poll() and changed_at()
2347 mod poll_tests {
2348 use super::*;
2349
2350 #[derive(Clone)]
2351 struct Counter {
2352 id: i32,
2353 }
2354
2355 impl Query for Counter {
2356 type CacheKey = i32;
2357 type Output = i32;
2358
2359 fn cache_key(&self) -> Self::CacheKey {
2360 self.id
2361 }
2362
2363 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2364 Ok(Arc::new(self.id * 10))
2365 }
2366
2367 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2368 old == new
2369 }
2370 }
2371
2372 #[test]
2373 fn test_poll_returns_value_and_revision() {
2374 let runtime = QueryRuntime::new();
2375
2376 let result = runtime.poll(Counter { id: 1 }).unwrap();
2377
2378 // Value should be correct - access through Result and Arc
2379 assert_eq!(**result.value.as_ref().unwrap(), 10);
2380
2381 // Revision should be non-zero after first execution
2382 assert!(result.revision > 0);
2383 }
2384
2385 #[test]
2386 fn test_poll_revision_stable_on_cache_hit() {
2387 let runtime = QueryRuntime::new();
2388
2389 // First poll
2390 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2391 let rev1 = result1.revision;
2392
2393 // Second poll (cache hit)
2394 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2395 let rev2 = result2.revision;
2396
2397 // Revision should be the same (no change)
2398 assert_eq!(rev1, rev2);
2399 }
2400
2401 #[test]
2402 fn test_poll_revision_changes_on_invalidate() {
2403 let runtime = QueryRuntime::new();
2404
2405 // First poll
2406 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2407 let rev1 = result1.revision;
2408
2409 // Invalidate and poll again
2410 runtime.invalidate::<Counter>(&1);
2411 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2412 let rev2 = result2.revision;
2413
2414 // Revision should increase (value was recomputed)
2415 // Note: Since output_eq returns true (same value), this might not change
2416 // depending on early cutoff behavior. Let's verify the value is still correct.
2417 assert_eq!(**result2.value.as_ref().unwrap(), 10);
2418
2419 // With early cutoff, revision might stay the same if value didn't change
2420 // This is expected behavior
2421 assert!(rev2 >= rev1);
2422 }
2423
2424 #[test]
2425 fn test_changed_at_returns_none_for_unexecuted_query() {
2426 let runtime = QueryRuntime::new();
2427
2428 // Query has never been executed
2429 let rev = runtime.changed_at::<Counter>(&1);
2430 assert!(rev.is_none());
2431 }
2432
2433 #[test]
2434 fn test_changed_at_returns_revision_after_execution() {
2435 let runtime = QueryRuntime::new();
2436
2437 // Execute the query
2438 let _ = runtime.query(Counter { id: 1 }).unwrap();
2439
2440 // Now changed_at should return Some
2441 let rev = runtime.changed_at::<Counter>(&1);
2442 assert!(rev.is_some());
2443 assert!(rev.unwrap() > 0);
2444 }
2445
2446 #[test]
2447 fn test_changed_at_matches_poll_revision() {
2448 let runtime = QueryRuntime::new();
2449
2450 // Poll the query
2451 let result = runtime.poll(Counter { id: 1 }).unwrap();
2452
2453 // changed_at should match the revision from poll
2454 let rev = runtime.changed_at::<Counter>(&1);
2455 assert_eq!(rev, Some(result.revision));
2456 }
2457
2458 #[test]
2459 fn test_poll_value_access() {
2460 let runtime = QueryRuntime::new();
2461
2462 let result = runtime.poll(Counter { id: 5 }).unwrap();
2463
2464 // Access through Result and Arc
2465 let value: &i32 = result.value.as_ref().unwrap();
2466 assert_eq!(*value, 50);
2467
2468 // Access Arc directly via field after unwrapping Result
2469 let arc: &Arc<i32> = result.value.as_ref().unwrap();
2470 assert_eq!(**arc, 50);
2471 }
2472
2473 #[test]
2474 fn test_subscription_pattern() {
2475 let runtime = QueryRuntime::new();
2476
2477 // Simulate subscription pattern
2478 let mut last_revision: RevisionCounter = 0;
2479 let mut notifications = 0;
2480
2481 // First poll - should notify (new value)
2482 let result = runtime.poll(Counter { id: 1 }).unwrap();
2483 if result.revision > last_revision {
2484 notifications += 1;
2485 last_revision = result.revision;
2486 }
2487
2488 // Second poll - should NOT notify (no change)
2489 let result = runtime.poll(Counter { id: 1 }).unwrap();
2490 if result.revision > last_revision {
2491 notifications += 1;
2492 last_revision = result.revision;
2493 }
2494
2495 // Third poll - should NOT notify (no change)
2496 let result = runtime.poll(Counter { id: 1 }).unwrap();
2497 if result.revision > last_revision {
2498 notifications += 1;
2499 #[allow(unused_assignments)]
2500 {
2501 last_revision = result.revision;
2502 }
2503 }
2504
2505 // Only the first poll should have triggered a notification
2506 assert_eq!(notifications, 1);
2507 }
2508 }
2509
2510 // Tests for GC APIs
2511 mod gc_tests {
2512 use super::*;
2513 use std::collections::HashSet;
2514 use std::sync::atomic::{AtomicUsize, Ordering};
2515 use std::sync::Mutex;
2516
2517 #[derive(Clone)]
2518 struct Leaf {
2519 id: i32,
2520 }
2521
2522 impl Query for Leaf {
2523 type CacheKey = i32;
2524 type Output = i32;
2525
2526 fn cache_key(&self) -> Self::CacheKey {
2527 self.id
2528 }
2529
2530 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2531 Ok(Arc::new(self.id * 10))
2532 }
2533
2534 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2535 old == new
2536 }
2537 }
2538
2539 #[derive(Clone)]
2540 struct Parent {
2541 child_id: i32,
2542 }
2543
2544 impl Query for Parent {
2545 type CacheKey = i32;
2546 type Output = i32;
2547
2548 fn cache_key(&self) -> Self::CacheKey {
2549 self.child_id
2550 }
2551
2552 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2553 let child = db.query(Leaf { id: self.child_id })?;
2554 Ok(Arc::new(*child + 1))
2555 }
2556
2557 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2558 old == new
2559 }
2560 }
2561
2562 #[test]
2563 fn test_query_keys_returns_all_cached_queries() {
2564 let runtime = QueryRuntime::new();
2565
2566 // Execute some queries
2567 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2568 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2569 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2570
2571 // Get all keys
2572 let keys = runtime.query_keys();
2573
2574 // Should have at least 3 keys (might have more due to sentinels)
2575 assert!(keys.len() >= 3);
2576 }
2577
2578 #[test]
2579 fn test_remove_removes_query() {
2580 let runtime = QueryRuntime::new();
2581
2582 // Execute a query
2583 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2584
2585 // Get the key
2586 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2587
2588 // Query should exist
2589 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2590
2591 // Remove it
2592 assert!(runtime.remove(&full_key));
2593
2594 // Query should no longer exist
2595 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2596 }
2597
2598 #[test]
2599 fn test_remove_if_unused_removes_leaf_query() {
2600 let runtime = QueryRuntime::new();
2601
2602 // Execute a leaf query (no dependents)
2603 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2604
2605 // Should be removable since no other query depends on it
2606 assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2607
2608 // Query should no longer exist
2609 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2610 }
2611
2612 #[test]
2613 fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2614 let runtime = QueryRuntime::new();
2615
2616 // Execute parent query (which depends on Leaf)
2617 let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2618
2619 // Leaf query should not be removable since Parent depends on it
2620 assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2621
2622 // Leaf query should still exist
2623 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2624
2625 // But Parent should be removable (no dependents)
2626 assert!(runtime.remove_query_if_unused::<Parent>(&1));
2627 }
2628
2629 #[test]
2630 fn test_remove_if_unused_with_full_cache_key() {
2631 let runtime = QueryRuntime::new();
2632
2633 // Execute a leaf query
2634 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2635
2636 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2637
2638 // Should be removable via FullCacheKey
2639 assert!(runtime.remove_if_unused(&full_key));
2640
2641 // Query should no longer exist
2642 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2643 }
2644
2645 // Test tracer receives on_query_key calls
2646 struct GcTracker {
2647 accessed_keys: Mutex<HashSet<String>>,
2648 access_count: AtomicUsize,
2649 }
2650
2651 impl GcTracker {
2652 fn new() -> Self {
2653 Self {
2654 accessed_keys: Mutex::new(HashSet::new()),
2655 access_count: AtomicUsize::new(0),
2656 }
2657 }
2658 }
2659
2660 impl Tracer for GcTracker {
2661 fn new_span_id(&self) -> SpanId {
2662 SpanId(1)
2663 }
2664
2665 fn on_query_key(&self, full_key: &FullCacheKey) {
2666 self.accessed_keys
2667 .lock()
2668 .unwrap()
2669 .insert(full_key.debug_repr().to_string());
2670 self.access_count.fetch_add(1, Ordering::Relaxed);
2671 }
2672 }
2673
2674 #[test]
2675 fn test_tracer_receives_on_query_key() {
2676 let tracker = GcTracker::new();
2677 let runtime = QueryRuntime::with_tracer(tracker);
2678
2679 // Execute some queries
2680 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2681 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2682
2683 // Tracer should have received on_query_key calls
2684 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2685 assert_eq!(count, 2);
2686
2687 // Check that the keys were recorded
2688 let keys = runtime.tracer().accessed_keys.lock().unwrap();
2689 assert!(keys.iter().any(|k| k.contains("Leaf")));
2690 }
2691
2692 #[test]
2693 fn test_tracer_receives_on_query_key_for_cache_hits() {
2694 let tracker = GcTracker::new();
2695 let runtime = QueryRuntime::with_tracer(tracker);
2696
2697 // Execute query twice (second is cache hit)
2698 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2699 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2700
2701 // Tracer should have received on_query_key for both calls
2702 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2703 assert_eq!(count, 2);
2704 }
2705
2706 #[test]
2707 fn test_gc_workflow() {
2708 let tracker = GcTracker::new();
2709 let runtime = QueryRuntime::with_tracer(tracker);
2710
2711 // Execute some queries
2712 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2713 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2714 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2715
2716 // Simulate GC: remove all queries that are not in use
2717 let mut removed = 0;
2718 for key in runtime.query_keys() {
2719 if runtime.remove_if_unused(&key) {
2720 removed += 1;
2721 }
2722 }
2723
2724 // All leaf queries should be removable
2725 assert!(removed >= 3);
2726
2727 // Queries should no longer exist
2728 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2729 assert!(runtime.changed_at::<Leaf>(&2).is_none());
2730 assert!(runtime.changed_at::<Leaf>(&3).is_none());
2731 }
2732 }
2733}