query_flow/runtime.rs
1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
12use crate::db::Db;
13use crate::key::FullCacheKey;
14use crate::loading::AssetLoadingState;
15use crate::query::Query;
16use crate::storage::{
17 AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
18 QueryRegistry, VerifierStorage,
19};
20use crate::tracer::{
21 ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
22 TracerAssetState, TracerQueryKey,
23};
24use crate::QueryError;
25
26/// Function type for comparing user errors during early cutoff.
27///
28/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
29/// `QueryError::UserError` values are compared for caching purposes.
30pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
31
32/// Number of durability levels (matches whale's default).
33const DURABILITY_LEVELS: usize = 4;
34
// Per-thread state shared by every query executing on the current thread.
thread_local! {
    /// Keys of the queries currently executing on this thread, outermost first.
    ///
    /// `StackGuard` pushes and pops entries; cycle detection scans this stack
    /// for a key that is already present.
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };

    /// Consistency tracker currently active on this thread, if any.
    ///
    /// Installed by `ConsistencyTrackerGuard` and consulted by
    /// `check_leaf_asset_consistency`. An `Rc` is sufficient because the
    /// value never leaves the thread that owns this slot.
    static CONSISTENCY_TRACKER: RefCell<Option<Rc<ConsistencyTracker>>> = const { RefCell::new(None) };
}
44
45/// Check a leaf asset against the current consistency tracker (if any).
46/// Returns Ok if no tracker is set or if the check passes.
47fn check_leaf_asset_consistency(dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
48 CONSISTENCY_TRACKER.with(|tracker| {
49 if let Some(ref t) = *tracker.borrow() {
50 t.check_leaf_asset(dep_changed_at)
51 } else {
52 Ok(())
53 }
54 })
55}
56
57/// RAII guard that sets the consistency tracker for the current thread.
58struct ConsistencyTrackerGuard {
59 previous: Option<Rc<ConsistencyTracker>>,
60}
61
62impl ConsistencyTrackerGuard {
63 fn new(tracker: Rc<ConsistencyTracker>) -> Self {
64 let previous = CONSISTENCY_TRACKER.with(|t| t.borrow_mut().replace(tracker));
65 Self { previous }
66 }
67}
68
69impl Drop for ConsistencyTrackerGuard {
70 fn drop(&mut self) {
71 CONSISTENCY_TRACKER.with(|t| {
72 *t.borrow_mut() = self.previous.take();
73 });
74 }
75}
76
77/// Check for cycles in the query stack and return error if detected.
78fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
79 let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
80 if cycle_detected {
81 let path = QUERY_STACK.with(|stack| {
82 let stack = stack.borrow();
83 let mut path: Vec<String> = stack.iter().map(|k| k.debug_repr().to_string()).collect();
84 path.push(key.debug_repr().to_string());
85 path
86 });
87 return Err(QueryError::Cycle { path });
88 }
89 Ok(())
90}
91
92/// RAII guard for pushing/popping from query stack.
93struct StackGuard;
94
95impl StackGuard {
96 fn push(key: FullCacheKey) -> Self {
97 QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
98 StackGuard
99 }
100}
101
102impl Drop for StackGuard {
103 fn drop(&mut self) {
104 QUERY_STACK.with(|stack| {
105 stack.borrow_mut().pop();
106 });
107 }
108}
109
110/// Execution context passed through query execution.
111///
112/// Contains a span ID for tracing correlation.
113#[derive(Clone, Copy)]
114pub struct ExecutionContext {
115 span_id: SpanId,
116}
117
118impl ExecutionContext {
119 /// Create a new execution context with the given span ID.
120 #[inline]
121 pub fn new(span_id: SpanId) -> Self {
122 Self { span_id }
123 }
124
125 /// Get the span ID for this execution context.
126 #[inline]
127 pub fn span_id(&self) -> SpanId {
128 self.span_id
129 }
130}
131
132/// Result of polling a query, containing the value and its revision.
133///
134/// This is returned by [`QueryRuntime::poll`] and provides both the query result
135/// and its change revision, enabling efficient change detection for subscription
136/// patterns.
137///
138/// # Example
139///
140/// ```ignore
141/// let result = runtime.poll(MyQuery::new())?;
142///
143/// // Access the value via Deref
144/// println!("{:?}", *result);
145///
146/// // Check if changed since last poll
147/// if result.revision > last_known_revision {
148/// notify_subscribers(&result.value);
149/// last_known_revision = result.revision;
150/// }
151/// ```
152#[derive(Debug, Clone)]
153pub struct Polled<T> {
154 /// The query result value.
155 pub value: T,
156 /// The revision at which this value was last changed.
157 ///
158 /// Compare this with a previously stored revision to detect changes.
159 pub revision: RevisionCounter,
160}
161
162impl<T: Deref> Deref for Polled<T> {
163 type Target = T::Target;
164
165 fn deref(&self) -> &Self::Target {
166 &self.value
167 }
168}
169
/// The query runtime manages query execution, caching, and dependency tracking.
///
/// Cloning copies the `Arc` handles, the error-comparator function pointer and
/// the whale handle (presumably itself a cheap shared handle — confirm against
/// whale's `Clone` impl).
///
/// # Type Parameter
///
/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
///   when tracing is not needed.
///
/// # Example
///
/// ```ignore
/// // Without tracing (default)
/// let runtime = QueryRuntime::new();
///
/// // With tracing
/// let tracer = MyTracer::new();
/// let runtime = QueryRuntime::with_tracer(tracer);
///
/// // Sync query execution
/// let result = runtime.query(MyQuery { ... })?;
/// ```
pub struct QueryRuntime<T: Tracer = NoopTracer> {
    /// Whale runtime for dependency tracking and cache storage.
    /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>;
    /// `None` marks a node without a cached value (e.g. after `invalidate`).
    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
    /// Registered asset locators
    locators: Arc<LocatorStorage<T>>,
    /// Pending asset requests
    pending: Arc<PendingStorage>,
    /// Registry for tracking query instances (for list_queries)
    query_registry: Arc<QueryRegistry>,
    /// Registry for tracking asset keys (for list_asset_keys)
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Verifiers for re-executing queries (for verify-then-decide pattern)
    verifiers: Arc<VerifierStorage>,
    /// Comparator for user errors during early cutoff
    error_comparator: ErrorComparator,
    /// Tracer for observability
    tracer: Arc<T>,
}
211
/// Fails to compile if the default runtime ever stops being `Send + Sync`.
#[test]
fn test_runtime_send_sync() {
    fn require_send_sync<R: Send + Sync>() {}

    require_send_sync::<QueryRuntime<NoopTracer>>();
}
217
218impl Default for QueryRuntime<NoopTracer> {
219 fn default() -> Self {
220 Self::new()
221 }
222}
223
224impl<T: Tracer> Clone for QueryRuntime<T> {
225 fn clone(&self) -> Self {
226 Self {
227 whale: self.whale.clone(),
228 locators: self.locators.clone(),
229 pending: self.pending.clone(),
230 query_registry: self.query_registry.clone(),
231 asset_key_registry: self.asset_key_registry.clone(),
232 verifiers: self.verifiers.clone(),
233 error_comparator: self.error_comparator,
234 tracer: self.tracer.clone(),
235 }
236 }
237}
238
239/// Default error comparator that treats all errors as different.
240///
241/// This is conservative - it always triggers recomputation when an error occurs.
242fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
243 false
244}
245
246impl<T: Tracer> QueryRuntime<T> {
247 /// Get cached output along with its revision (single atomic access).
248 fn get_cached_with_revision<Q: Query>(
249 &self,
250 key: &FullCacheKey,
251 ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
252 let node = self.whale.get(key)?;
253 let revision = node.changed_at;
254 let entry = node.data.as_ref()?;
255 let cached = entry.to_cached_value::<Q::Output>()?;
256 Some((cached, revision))
257 }
258
259 /// Get a reference to the tracer.
260 #[inline]
261 pub fn tracer(&self) -> &T {
262 &self.tracer
263 }
264}
265
266impl QueryRuntime<NoopTracer> {
267 /// Create a new query runtime with default settings.
268 pub fn new() -> Self {
269 Self::with_tracer(NoopTracer)
270 }
271
272 /// Create a builder for customizing the runtime.
273 ///
274 /// # Example
275 ///
276 /// ```ignore
277 /// let runtime = QueryRuntime::builder()
278 /// .error_comparator(|a, b| {
279 /// // Custom error comparison logic
280 /// match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
281 /// (Some(a), Some(b)) => a == b,
282 /// _ => false,
283 /// }
284 /// })
285 /// .build();
286 /// ```
287 pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
288 QueryRuntimeBuilder::new()
289 }
290}
291
292impl<T: Tracer> QueryRuntime<T> {
293 /// Create a new query runtime with the specified tracer.
294 pub fn with_tracer(tracer: T) -> Self {
295 QueryRuntimeBuilder::new().tracer(tracer).build()
296 }
297
298 /// Execute a query synchronously.
299 ///
300 /// Returns the cached result if valid, otherwise executes the query.
301 ///
302 /// # Errors
303 ///
304 /// - `QueryError::Suspend` - Query is waiting for async loading
305 /// - `QueryError::Cycle` - Dependency cycle detected
306 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
307 self.query_internal(query)
308 .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
309 }
310
311 /// Internal implementation shared by query() and poll().
312 ///
313 /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
314 /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
315 #[allow(clippy::type_complexity)]
316 fn query_internal<Q: Query>(
317 &self,
318 query: Q,
319 ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
320 let key = query.cache_key();
321 let full_key = FullCacheKey::new::<Q, _>(&key);
322
323 // Emit query key event for GC tracking (before other events)
324 self.tracer.on_query_key(&full_key);
325
326 // Create execution context and emit start event
327 let span_id = self.tracer.new_span_id();
328 let exec_ctx = ExecutionContext::new(span_id);
329 let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
330
331 self.tracer.on_query_start(span_id, query_key.clone());
332
333 // Check for cycles using thread-local stack
334 let cycle_detected = QUERY_STACK.with(|stack| {
335 let stack = stack.borrow();
336 stack.iter().any(|k| k == &full_key)
337 });
338
339 if cycle_detected {
340 let path = QUERY_STACK.with(|stack| {
341 let stack = stack.borrow();
342 let mut path: Vec<String> =
343 stack.iter().map(|k| k.debug_repr().to_string()).collect();
344 path.push(full_key.debug_repr().to_string());
345 path
346 });
347
348 self.tracer.on_cycle_detected(
349 path.iter()
350 .map(|s| TracerQueryKey::new("", s.clone()))
351 .collect(),
352 );
353 self.tracer
354 .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
355
356 return Err(QueryError::Cycle { path });
357 }
358
359 // Check if cached and valid (with verify-then-decide pattern)
360 let current_rev = self.whale.current_revision();
361
362 // Fast path: already verified at current revision
363 if self.whale.is_verified_at(&full_key, ¤t_rev) {
364 // Single atomic access to get both cached value and revision
365 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
366 self.tracer.on_cache_check(span_id, query_key.clone(), true);
367 self.tracer
368 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
369
370 return match cached {
371 CachedValue::Ok(output) => Ok((Ok(output), revision)),
372 CachedValue::UserError(err) => Ok((Err(err), revision)),
373 };
374 }
375 }
376
377 // Check shallow validity (deps' changed_at ok) and try verify-then-decide
378 if self.whale.is_valid(&full_key) {
379 // Single atomic access to get both cached value and revision
380 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
381 // Shallow valid but not verified - verify deps first
382 let mut deps_verified = true;
383 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
384 for dep in deps {
385 if let Some(verifier) = self.verifiers.get(&dep) {
386 // Re-run query/asset to verify it (triggers recursive verification)
387 // For assets, this re-accesses the asset which may re-run the locator
388 if verifier.verify(self as &dyn std::any::Any).is_err() {
389 deps_verified = false;
390 break;
391 }
392 }
393 // Note: deps without verifiers are assumed valid (they're verified
394 // by the final is_valid check if their changed_at increased)
395 }
396 }
397
398 // Re-check validity after deps are verified
399 if deps_verified && self.whale.is_valid(&full_key) {
400 // Deps didn't change their changed_at, mark as verified and use cached
401 self.whale.mark_verified(&full_key, ¤t_rev);
402
403 self.tracer.on_cache_check(span_id, query_key.clone(), true);
404 self.tracer
405 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
406
407 return match cached {
408 CachedValue::Ok(output) => Ok((Ok(output), revision)),
409 CachedValue::UserError(err) => Ok((Err(err), revision)),
410 };
411 }
412 // A dep's changed_at increased, fall through to execute
413 }
414 }
415
416 self.tracer
417 .on_cache_check(span_id, query_key.clone(), false);
418
419 // Execute the query with cycle tracking
420 let _guard = StackGuard::push(full_key.clone());
421 let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
422 drop(_guard);
423
424 // Emit end event
425 let exec_result = match &result {
426 Ok((_, true, _)) => ExecutionResult::Changed,
427 Ok((_, false, _)) => ExecutionResult::Unchanged,
428 Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
429 Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
430 Err(e) => ExecutionResult::Error {
431 message: format!("{:?}", e),
432 },
433 };
434 self.tracer
435 .on_query_end(span_id, query_key.clone(), exec_result);
436
437 result.map(|(inner_result, _, revision)| (inner_result, revision))
438 }
439
440 /// Execute a query, caching the result if appropriate.
441 ///
442 /// Returns (result, output_changed, revision) tuple.
443 /// - `result`: Ok(output) for success, Err(user_error) for user errors
444 /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
445 #[allow(clippy::type_complexity)]
446 fn execute_query<Q: Query>(
447 &self,
448 query: &Q,
449 full_key: &FullCacheKey,
450 exec_ctx: ExecutionContext,
451 ) -> Result<
452 (
453 Result<Arc<Q::Output>, Arc<anyhow::Error>>,
454 bool,
455 RevisionCounter,
456 ),
457 QueryError,
458 > {
459 // Capture current global revision at query start for consistency checking
460 let start_revision = self.whale.current_revision().get(Durability::volatile());
461
462 // Create consistency tracker for this query execution
463 let tracker = Rc::new(ConsistencyTracker::new(start_revision));
464
465 // Set thread-local tracker for nested locator calls
466 let _tracker_guard = ConsistencyTrackerGuard::new(tracker);
467
468 // Create context for this query execution
469 let ctx = QueryContext {
470 runtime: self,
471 current_key: full_key.clone(),
472 parent_query_type: std::any::type_name::<Q>(),
473 exec_ctx,
474 deps: RefCell::new(Vec::new()),
475 };
476
477 // Execute the query (clone because query() takes ownership)
478 let result = query.clone().query(&ctx);
479
480 // Get collected dependencies
481 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
482
483 // Query durability defaults to stable - Whale will automatically reduce
484 // the effective durability to min(requested, min(dep_durabilities)).
485 // A pure query with no dependencies remains stable.
486 // A query depending on volatile assets becomes volatile.
487 let durability = Durability::stable();
488
489 match result {
490 Ok(output) => {
491 // Check if output changed (for early cutoff)
492 // existing_revision is Some only when output is unchanged (can reuse revision)
493 let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
494 self.get_cached_with_revision::<Q>(full_key)
495 {
496 if Q::output_eq(&*old, &*output) {
497 Some(rev) // Same output - reuse revision
498 } else {
499 None // Different output
500 }
501 } else {
502 None // No previous Ok value
503 };
504 let output_changed = existing_revision.is_none();
505
506 // Emit early cutoff check event
507 self.tracer.on_early_cutoff_check(
508 exec_ctx.span_id(),
509 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
510 output_changed,
511 );
512
513 // Update whale with cached entry (atomic update of value + dependency state)
514 let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
515 let revision = if let Some(existing_rev) = existing_revision {
516 // confirm_unchanged doesn't change changed_at, use existing
517 let _ = self.whale.confirm_unchanged(full_key, deps);
518 existing_rev
519 } else {
520 // Use new_rev from register result
521 match self
522 .whale
523 .register(full_key.clone(), Some(entry), durability, deps)
524 {
525 Ok(result) => result.new_rev,
526 Err(missing) => {
527 return Err(QueryError::DependenciesRemoved {
528 missing_keys: missing,
529 })
530 }
531 }
532 };
533
534 // Register query in registry for list_queries
535 let is_new_query = self.query_registry.register(query);
536 if is_new_query {
537 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
538 let _ = self
539 .whale
540 .register(sentinel, None, Durability::stable(), vec![]);
541 }
542
543 // Store verifier for this query (for verify-then-decide pattern)
544 self.verifiers
545 .insert::<Q, T>(full_key.clone(), query.clone());
546
547 Ok((Ok(output), output_changed, revision))
548 }
549 Err(QueryError::UserError(err)) => {
550 // Check if error changed (for early cutoff)
551 // existing_revision is Some only when error is unchanged (can reuse revision)
552 let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
553 self.get_cached_with_revision::<Q>(full_key)
554 {
555 if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
556 Some(rev) // Same error - reuse revision
557 } else {
558 None // Different error
559 }
560 } else {
561 None // No previous UserError
562 };
563 let output_changed = existing_revision.is_none();
564
565 // Emit early cutoff check event
566 self.tracer.on_early_cutoff_check(
567 exec_ctx.span_id(),
568 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
569 output_changed,
570 );
571
572 // Update whale with cached error (atomic update of value + dependency state)
573 let entry = CachedEntry::UserError(err.clone());
574 let revision = if let Some(existing_rev) = existing_revision {
575 // confirm_unchanged doesn't change changed_at, use existing
576 let _ = self.whale.confirm_unchanged(full_key, deps);
577 existing_rev
578 } else {
579 // Use new_rev from register result
580 match self
581 .whale
582 .register(full_key.clone(), Some(entry), durability, deps)
583 {
584 Ok(result) => result.new_rev,
585 Err(missing) => {
586 return Err(QueryError::DependenciesRemoved {
587 missing_keys: missing,
588 })
589 }
590 }
591 };
592
593 // Register query in registry for list_queries
594 let is_new_query = self.query_registry.register(query);
595 if is_new_query {
596 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
597 let _ = self
598 .whale
599 .register(sentinel, None, Durability::stable(), vec![]);
600 }
601
602 // Store verifier for this query (for verify-then-decide pattern)
603 self.verifiers
604 .insert::<Q, T>(full_key.clone(), query.clone());
605
606 Ok((Err(err), output_changed, revision))
607 }
608 Err(e) => {
609 // System errors (Suspend, Cycle, Cancelled) are not cached
610 Err(e)
611 }
612 }
613 }
614
615 /// Invalidate a query, forcing recomputation on next access.
616 ///
617 /// This also invalidates any queries that depend on this one.
618 pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
619 let full_key = FullCacheKey::new::<Q, _>(key);
620
621 self.tracer.on_query_invalidated(
622 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
623 InvalidationReason::ManualInvalidation,
624 );
625
626 // Update whale to invalidate dependents (register with None to clear cached value)
627 // Use stable durability to increment all revision counters, ensuring queries
628 // at any durability level will see this as a change.
629 let _ = self
630 .whale
631 .register(full_key, None, Durability::stable(), vec![]);
632 }
633
634 /// Remove a query from the cache entirely, freeing memory.
635 ///
636 /// Use this for GC when a query is no longer needed.
637 /// Unlike `invalidate`, this removes all traces of the query from storage.
638 /// The query will be recomputed from scratch on next access.
639 ///
640 /// This also invalidates any queries that depend on this one.
641 pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
642 let full_key = FullCacheKey::new::<Q, _>(key);
643
644 self.tracer.on_query_invalidated(
645 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
646 InvalidationReason::ManualInvalidation,
647 );
648
649 // Remove verifier if exists
650 self.verifiers.remove(&full_key);
651
652 // Remove from whale storage (this also handles dependent invalidation)
653 self.whale.remove(&full_key);
654
655 // Remove from registry and update sentinel for list_queries
656 if self.query_registry.remove::<Q>(key) {
657 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
658 let _ = self
659 .whale
660 .register(sentinel, None, Durability::stable(), vec![]);
661 }
662 }
663
664 /// Clear all cached values by removing all nodes from whale.
665 ///
666 /// Note: This is a relatively expensive operation as it iterates through all keys.
667 pub fn clear_cache(&self) {
668 let keys = self.whale.keys();
669 for key in keys {
670 self.whale.remove(&key);
671 }
672 }
673
674 /// Poll a query, returning both the result and its change revision.
675 ///
676 /// This is useful for implementing subscription patterns where you need to
677 /// detect changes efficiently. Compare the returned `revision` with a
678 /// previously stored value to determine if the query result has changed.
679 ///
680 /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
681 /// as its value, allowing you to track revision changes for both success and
682 /// user error cases.
683 ///
684 /// # Example
685 ///
686 /// ```ignore
687 /// struct Subscription<Q: Query> {
688 /// query: Q,
689 /// last_revision: RevisionCounter,
690 /// tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
691 /// }
692 ///
693 /// // Polling loop
694 /// for sub in &mut subscriptions {
695 /// let result = runtime.poll(sub.query.clone())?;
696 /// if result.revision > sub.last_revision {
697 /// sub.tx.send(result.value.clone())?;
698 /// sub.last_revision = result.revision;
699 /// }
700 /// }
701 /// ```
702 ///
703 /// # Errors
704 ///
705 /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
706 /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
707 #[allow(clippy::type_complexity)]
708 pub fn poll<Q: Query>(
709 &self,
710 query: Q,
711 ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
712 let (value, revision) = self.query_internal(query)?;
713 Ok(Polled { value, revision })
714 }
715
716 /// Get the change revision of a query without executing it.
717 ///
718 /// Returns `None` if the query has never been executed.
719 ///
720 /// This is useful for checking if a query has changed since the last poll
721 /// without the cost of executing the query.
722 ///
723 /// # Example
724 ///
725 /// ```ignore
726 /// // Check if query has changed before deciding to poll
727 /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
728 /// if rev > last_known_revision {
729 /// let result = runtime.query(MyQuery::new(key))?;
730 /// // Process result...
731 /// }
732 /// }
733 /// ```
734 pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
735 let full_key = FullCacheKey::new::<Q, _>(key);
736 self.whale.get(&full_key).map(|node| node.changed_at)
737 }
738}
739
740// ============================================================================
741// GC (Garbage Collection) API
742// ============================================================================
743
744impl<T: Tracer> QueryRuntime<T> {
745 /// Get all query keys currently in the cache.
746 ///
747 /// This is useful for implementing custom garbage collection strategies.
748 /// Use this in combination with [`Tracer::on_query_key`] to track access
749 /// times and implement LRU, TTL, or other GC algorithms externally.
750 ///
751 /// # Example
752 ///
753 /// ```ignore
754 /// // Collect all keys that haven't been accessed recently
755 /// let stale_keys: Vec<_> = runtime.query_keys()
756 /// .filter(|key| tracker.is_stale(key))
757 /// .collect();
758 ///
759 /// // Remove stale queries that have no dependents
760 /// for key in stale_keys {
761 /// runtime.remove_if_unused(&key);
762 /// }
763 /// ```
764 pub fn query_keys(&self) -> Vec<FullCacheKey> {
765 self.whale.keys()
766 }
767
768 /// Remove a query if it has no dependents.
769 ///
770 /// Returns `true` if the query was removed, `false` if it has dependents
771 /// or doesn't exist. This is the safe way to remove queries during GC,
772 /// as it won't break queries that depend on this one.
773 ///
774 /// # Example
775 ///
776 /// ```ignore
777 /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
778 /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
779 /// println!("Query removed");
780 /// } else {
781 /// println!("Query has dependents, not removed");
782 /// }
783 /// ```
784 pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
785 let full_key = FullCacheKey::new::<Q, _>(key);
786 self.remove_if_unused(&full_key)
787 }
788
789 /// Remove a query by its [`FullCacheKey`].
790 ///
791 /// This is the type-erased version of [`remove_query`](Self::remove_query).
792 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
793 /// or [`Tracer::on_query_key`].
794 ///
795 /// Returns `true` if the query was removed, `false` if it doesn't exist.
796 ///
797 /// # Warning
798 ///
799 /// This forcibly removes the query even if other queries depend on it.
800 /// Dependent queries will be recomputed on next access. For safe GC,
801 /// use [`remove_if_unused`](Self::remove_if_unused) instead.
802 pub fn remove(&self, key: &FullCacheKey) -> bool {
803 // Remove verifier if exists
804 self.verifiers.remove(key);
805
806 // Remove from whale storage
807 self.whale.remove(key).is_some()
808 }
809
810 /// Remove a query by its [`FullCacheKey`] if it has no dependents.
811 ///
812 /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
813 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
814 /// or [`Tracer::on_query_key`].
815 ///
816 /// Returns `true` if the query was removed, `false` if it has dependents
817 /// or doesn't exist.
818 ///
819 /// # Example
820 ///
821 /// ```ignore
822 /// // Implement LRU GC
823 /// for key in runtime.query_keys() {
824 /// if tracker.is_expired(&key) {
825 /// runtime.remove_if_unused(&key);
826 /// }
827 /// }
828 /// ```
829 pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
830 if self.whale.remove_if_unused(key.clone()).is_some() {
831 // Successfully removed - clean up verifier
832 self.verifiers.remove(key);
833 true
834 } else {
835 false
836 }
837 }
838}
839
840// ============================================================================
841// Builder
842// ============================================================================
843
844/// Builder for [`QueryRuntime`] with customizable settings.
845///
846/// # Example
847///
848/// ```ignore
849/// let runtime = QueryRuntime::builder()
850/// .error_comparator(|a, b| {
851/// // Treat all errors of the same type as equal
852/// a.downcast_ref::<std::io::Error>().is_some()
853/// == b.downcast_ref::<std::io::Error>().is_some()
854/// })
855/// .build();
856/// ```
857pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
858 error_comparator: ErrorComparator,
859 tracer: T,
860}
861
862impl Default for QueryRuntimeBuilder<NoopTracer> {
863 fn default() -> Self {
864 Self::new()
865 }
866}
867
868impl QueryRuntimeBuilder<NoopTracer> {
869 /// Create a new builder with default settings.
870 pub fn new() -> Self {
871 Self {
872 error_comparator: default_error_comparator,
873 tracer: NoopTracer,
874 }
875 }
876}
877
878impl<T: Tracer> QueryRuntimeBuilder<T> {
879 /// Set the error comparator function for early cutoff optimization.
880 ///
881 /// When a query returns `QueryError::UserError`, this function is used
882 /// to compare it with the previously cached error. If they are equal,
883 /// downstream queries can skip recomputation (early cutoff).
884 ///
885 /// The default comparator returns `false` for all errors, meaning errors
886 /// are always considered different (conservative, always recomputes).
887 ///
888 /// # Example
889 ///
890 /// ```ignore
891 /// // Treat errors as equal if they have the same display message
892 /// let runtime = QueryRuntime::builder()
893 /// .error_comparator(|a, b| a.to_string() == b.to_string())
894 /// .build();
895 /// ```
896 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
897 self.error_comparator = f;
898 self
899 }
900
901 /// Set the tracer for observability.
902 pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
903 QueryRuntimeBuilder {
904 error_comparator: self.error_comparator,
905 tracer,
906 }
907 }
908
909 /// Build the runtime with the configured settings.
910 pub fn build(self) -> QueryRuntime<T> {
911 QueryRuntime {
912 whale: WhaleRuntime::new(),
913 locators: Arc::new(LocatorStorage::new()),
914 pending: Arc::new(PendingStorage::new()),
915 query_registry: Arc::new(QueryRegistry::new()),
916 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
917 verifiers: Arc::new(VerifierStorage::new()),
918 error_comparator: self.error_comparator,
919 tracer: Arc::new(self.tracer),
920 }
921 }
922}
923
924// ============================================================================
925// Asset API
926// ============================================================================
927
928impl<T: Tracer> QueryRuntime<T> {
    /// Register an asset locator for a specific asset key type.
    ///
    /// The locator is stored in the runtime's locator storage under the key
    /// type `K`. Only one locator is kept per key type; later registrations
    /// presumably replace earlier ones (behavior of `LocatorStorage::insert`,
    /// not shown here — confirm).
    ///
    /// # Type Parameters
    ///
    /// * `K` - The asset key type the locator handles
    /// * `L` - The locator implementation for `K`
    ///
    /// # Example
    ///
    /// ```ignore
    /// let runtime = QueryRuntime::new();
    /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
    /// ```
    pub fn register_asset_locator<K, L>(&self, locator: L)
    where
        K: AssetKey,
        L: AssetLocator<K>,
    {
        self.locators.insert::<K, L>(locator);
    }
947
    /// Get a snapshot of the pending asset requests.
    ///
    /// Returns, as a `Vec`, the assets that have been requested but not yet
    /// resolved. The user should fetch these externally and call
    /// `resolve_asset()`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// for pending in runtime.pending_assets() {
    ///     if let Some(path) = pending.key::<FilePath>() {
    ///         let content = fetch_file(path);
    ///         runtime.resolve_asset(path.clone(), content, DurabilityLevel::Volatile);
    ///     }
    /// }
    /// ```
    pub fn pending_assets(&self) -> Vec<PendingAsset> {
        self.pending.get_all()
    }
966
    /// Get pending assets filtered by key type.
    ///
    /// Returns the keys of type `K` found in the pending storage.
    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
        self.pending.get_of_type::<K>()
    }
971
972 /// Check if there are any pending assets.
973 pub fn has_pending_assets(&self) -> bool {
974 !self.pending.is_empty()
975 }
976
977 /// Resolve an asset with its loaded value.
978 ///
979 /// This marks the asset as ready and invalidates any queries that
980 /// depend on it (if the value changed), triggering recomputation on next access.
981 ///
982 /// This method is idempotent - resolving with the same value (via `asset_eq`)
983 /// will not trigger downstream recomputation.
984 ///
985 /// # Arguments
986 ///
987 /// * `key` - The asset key identifying this resource
988 /// * `value` - The loaded asset value
989 /// * `durability` - How frequently this asset is expected to change
990 ///
991 /// # Example
992 ///
993 /// ```ignore
994 /// let content = std::fs::read_to_string(&path)?;
995 /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
996 /// ```
997 pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
998 self.resolve_asset_internal(key, value, durability);
999 }
1000
1001 /// Resolve an asset with an error.
1002 ///
1003 /// This marks the asset as errored and caches the error. Queries depending
1004 /// on this asset will receive `Err(QueryError::UserError(...))`.
1005 ///
1006 /// Use this when async loading fails (e.g., network error, file not found,
1007 /// access denied).
1008 ///
1009 /// # Arguments
1010 ///
1011 /// * `key` - The asset key identifying this resource
1012 /// * `error` - The error to cache (will be wrapped in `Arc`)
1013 /// * `durability` - How frequently this error state is expected to change
1014 ///
1015 /// # Example
1016 ///
1017 /// ```ignore
1018 /// match fetch_file(&path) {
1019 /// Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
1020 /// Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
1021 /// }
1022 /// ```
1023 pub fn resolve_asset_error<K: AssetKey>(
1024 &self,
1025 key: K,
1026 error: impl Into<anyhow::Error>,
1027 durability: DurabilityLevel,
1028 ) {
1029 let full_asset_key = FullAssetKey::new(&key);
1030 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1031
1032 // Remove from pending BEFORE registering the error
1033 self.pending.remove(&full_asset_key);
1034
1035 // Prepare the error entry
1036 let error_arc = Arc::new(error.into());
1037 let entry = CachedEntry::AssetError(error_arc.clone());
1038 let durability =
1039 Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
1040
1041 // Atomic compare-and-update (errors are always considered changed for now)
1042 let result = self
1043 .whale
1044 .update_with_compare(
1045 full_cache_key,
1046 Some(entry),
1047 |old_data, _new_data| {
1048 // Compare old and new errors using error_comparator
1049 match old_data.and_then(|d| d.as_ref()) {
1050 Some(CachedEntry::AssetError(old_err)) => {
1051 !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
1052 }
1053 _ => true, // Loading, Ready, or not present -> changed
1054 }
1055 },
1056 durability,
1057 vec![],
1058 )
1059 .expect("update_with_compare with no dependencies cannot fail");
1060
1061 // Emit asset resolved event (with changed status)
1062 self.tracer.on_asset_resolved(
1063 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1064 result.changed,
1065 );
1066
1067 // Register asset key in registry for list_asset_keys
1068 let is_new_asset = self.asset_key_registry.register(&key);
1069 if is_new_asset {
1070 // Update sentinel to invalidate list_asset_keys dependents
1071 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1072 let _ = self
1073 .whale
1074 .register(sentinel, None, Durability::stable(), vec![]);
1075 }
1076 }
1077
1078 fn resolve_asset_internal<K: AssetKey>(
1079 &self,
1080 key: K,
1081 value: K::Asset,
1082 durability_level: DurabilityLevel,
1083 ) {
1084 let full_asset_key = FullAssetKey::new(&key);
1085 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1086
1087 // Remove from pending BEFORE registering the value
1088 self.pending.remove(&full_asset_key);
1089
1090 // Prepare the new entry
1091 let value_arc: Arc<K::Asset> = Arc::new(value);
1092 let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1093 let durability =
1094 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1095
1096 // Atomic compare-and-update
1097 let result = self
1098 .whale
1099 .update_with_compare(
1100 full_cache_key,
1101 Some(entry),
1102 |old_data, _new_data| {
1103 // Compare old and new values
1104 match old_data.and_then(|d| d.as_ref()) {
1105 Some(CachedEntry::AssetReady(old_arc)) => {
1106 match old_arc.clone().downcast::<K::Asset>() {
1107 Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1108 Err(_) => true, // Type mismatch, treat as changed
1109 }
1110 }
1111 _ => true, // Loading, NotFound, or not present -> changed
1112 }
1113 },
1114 durability,
1115 vec![],
1116 )
1117 .expect("update_with_compare with no dependencies cannot fail");
1118
1119 // Emit asset resolved event
1120 self.tracer.on_asset_resolved(
1121 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1122 result.changed,
1123 );
1124
1125 // Register asset key in registry for list_asset_keys
1126 let is_new_asset = self.asset_key_registry.register(&key);
1127 if is_new_asset {
1128 // Update sentinel to invalidate list_asset_keys dependents
1129 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1130 let _ = self
1131 .whale
1132 .register(sentinel, None, Durability::stable(), vec![]);
1133 }
1134 }
1135
1136 /// Invalidate an asset, forcing queries to re-request it.
1137 ///
1138 /// The asset will be marked as loading and added to pending assets.
1139 /// Dependent queries will suspend until the asset is resolved again.
1140 ///
1141 /// # Example
1142 ///
1143 /// ```ignore
1144 /// // File was modified externally
1145 /// runtime.invalidate_asset(&FilePath("config.json".into()));
1146 /// // Queries depending on this asset will now suspend
1147 /// // User should fetch the new value and call resolve_asset
1148 /// ```
1149 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1150 let full_asset_key = FullAssetKey::new(key);
1151 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1152
1153 // Emit asset invalidated event
1154 self.tracer.on_asset_invalidated(TracerAssetKey::new(
1155 std::any::type_name::<K>(),
1156 format!("{:?}", key),
1157 ));
1158
1159 // Add to pending FIRST (before clearing whale state)
1160 // This ensures: readers see either old value, or Loading+pending
1161 self.pending.insert::<K>(full_asset_key, key.clone());
1162
1163 // Atomic: clear cached value + invalidate dependents
1164 // Using None for data means "needs to be loaded"
1165 // Use stable durability to ensure queries at any durability level see the change.
1166 let _ = self
1167 .whale
1168 .register(full_cache_key, None, Durability::stable(), vec![]);
1169 }
1170
1171 /// Remove an asset from the cache entirely.
1172 ///
1173 /// Unlike `invalidate_asset`, this removes all traces of the asset.
1174 /// Dependent queries will go through the locator again on next access.
1175 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1176 let full_asset_key = FullAssetKey::new(key);
1177 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1178
1179 // Remove from pending first
1180 self.pending.remove(&full_asset_key);
1181
1182 // Remove from whale (this also cleans up dependency edges)
1183 // whale.remove() invalidates dependents before removing
1184 self.whale.remove(&full_cache_key);
1185
1186 // Remove from registry and update sentinel for list_asset_keys
1187 if self.asset_key_registry.remove::<K>(key) {
1188 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1189 let _ = self
1190 .whale
1191 .register(sentinel, None, Durability::stable(), vec![]);
1192 }
1193 }
1194
1195 /// Get an asset by key without tracking dependencies.
1196 ///
1197 /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1198 /// as a dependent of the asset. Use this for direct asset access outside
1199 /// of query execution.
1200 ///
1201 /// # Returns
1202 ///
1203 /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1204 /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1205 /// - `Err(QueryError::UserError)` - Asset was not found or locator returned an error
1206 pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1207 self.get_asset_internal(key)
1208 }
1209
/// Internal: Get asset state and its changed_at revision atomically.
///
/// This is the "direct" version reached through `get_asset_internal`
/// (no dependency tracking is done here).
/// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
///
/// The lookup proceeds in three stages:
///
/// 1. Cache: if whale holds a valid node for the key and its dependencies
///    verify, answer from that node.
/// 2. Locator: otherwise run the registered locator for `K` (guarded by
///    cycle detection) and store what it reports.
/// 3. Fallback: if no locator produced a result, record the key as pending
///    and report it as loading (or whatever an already-existing node holds).
///
/// Returns (AssetLoadingState, revision). For answers taken from a whale
/// node the revision is that node's `changed_at`; for a fresh locator
/// `Ready` result it is the `revision` reported by `update_with_compare`.
///
/// # Errors
///
/// - `QueryError::UserError` when the cached or located state is an error.
/// - Whatever `check_leaf_asset_consistency`, `check_cycle`, or the locator
///   itself return as an error is propagated unchanged.
fn get_asset_with_revision<K: AssetKey>(
    &self,
    key: K,
) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
    let full_asset_key = FullAssetKey::new(&key);
    let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);

    // Helper to emit AssetRequested event
    let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
        tracer.on_asset_requested(
            TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
            state,
        );
    };

    // ---- Stage 1: cache ----
    // Check whale cache first (single atomic read)
    if let Some(node) = self.whale.get(&full_cache_key) {
        let changed_at = node.changed_at;
        // Check if valid at current revision (shallow check)
        if self.whale.is_valid(&full_cache_key) {
            // Verify dependencies recursively (like query path does)
            let mut deps_verified = true;
            if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
                for dep in deps {
                    // Dependencies without a registered verifier are skipped.
                    if let Some(verifier) = self.verifiers.get(&dep) {
                        // Re-run query/asset to verify it (triggers recursive verification)
                        if verifier.verify(self as &dyn std::any::Any).is_err() {
                            deps_verified = false;
                            break;
                        }
                    }
                }
            }

            // Re-check validity after deps are verified
            if deps_verified && self.whale.is_valid(&full_cache_key) {
                // For cached entries, check consistency for leaf assets (no locator deps).
                // This detects if resolve_asset/resolve_asset_error was called during query execution.
                let has_locator_deps = self
                    .whale
                    .get_dependency_ids(&full_cache_key)
                    .is_some_and(|deps| !deps.is_empty());

                match &node.data {
                    Some(CachedEntry::AssetReady(arc)) => {
                        // Check consistency for cached leaf assets
                        if !has_locator_deps {
                            check_leaf_asset_consistency(changed_at)?;
                        }
                        emit_requested(&self.tracer, &key, TracerAssetState::Ready);
                        match arc.clone().downcast::<K::Asset>() {
                            Ok(value) => {
                                return Ok((AssetLoadingState::ready(key, value), changed_at))
                            }
                            Err(_) => {
                                // A value stored under this key with another type is
                                // treated as a broken invariant (panics).
                                unreachable!("Asset type mismatch: {:?}", key)
                            }
                        }
                    }
                    Some(CachedEntry::AssetError(err)) => {
                        // Check consistency for cached leaf errors
                        if !has_locator_deps {
                            check_leaf_asset_consistency(changed_at)?;
                        }
                        // Cached errors are reported to the tracer as `NotFound`.
                        emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
                        return Err(QueryError::UserError(err.clone()));
                    }
                    None => {
                        // Loading state - no value to be inconsistent with
                        emit_requested(&self.tracer, &key, TracerAssetState::Loading);
                        return Ok((AssetLoadingState::loading(key), changed_at));
                    }
                    _ => {
                        // Query-related entries (Ok, UserError) shouldn't be here
                        // Fall through to locator
                    }
                }
            }
        }
    }

    // ---- Stage 2: locator ----
    // Not in cache or invalid - try locator
    // Use LocatorContext to track deps on the asset itself
    check_cycle(&full_cache_key)?;
    // Keeps this key on the execution stack until the function returns.
    let _guard = StackGuard::push(full_cache_key.clone());

    let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
    let locator_result =
        self.locators
            .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);

    if let Some(result) = locator_result {
        // Get collected dependencies from the locator context
        let locator_deps = locator_ctx.into_deps();
        match result {
            Ok(ErasedLocateResult::Ready {
                value: arc,
                durability: durability_level,
            }) => {
                emit_requested(&self.tracer, &key, TracerAssetState::Ready);

                let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
                    Ok(v) => v,
                    Err(_) => {
                        unreachable!("Asset type mismatch: {:?}", key);
                    }
                };

                // Store in whale atomically with early cutoff
                // Include locator's dependencies so the asset is invalidated when they change
                let entry = CachedEntry::AssetReady(typed_value.clone());
                // Fall back to volatile if the level cannot be converted.
                let durability = Durability::new(durability_level.as_u8() as usize)
                    .unwrap_or(Durability::volatile());
                let new_value = typed_value.clone();
                let result = self
                    .whale
                    .update_with_compare(
                        full_cache_key.clone(),
                        Some(entry),
                        |old_data, _new_data| {
                            // "Changed" unless an existing ready value of the same
                            // type compares equal via `K::asset_eq`.
                            let Some(CachedEntry::AssetReady(old_arc)) =
                                old_data.and_then(|d| d.as_ref())
                            else {
                                return true;
                            };
                            let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
                                return true;
                            };
                            !K::asset_eq(&old_value, &new_value)
                        },
                        durability,
                        locator_deps,
                    )
                    .expect("update_with_compare should succeed");

                // Register verifier for this asset (for verify-then-decide pattern)
                self.verifiers
                    .insert_asset::<K, T>(full_cache_key, key.clone());

                // NOTE(review): this path reports `result.revision`, while every
                // other path reports a node's `changed_at` - confirm that is intended.
                return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
            }
            Ok(ErasedLocateResult::Pending) => {
                emit_requested(&self.tracer, &key, TracerAssetState::Loading);

                // Add to pending list for Pending result
                self.pending.insert::<K>(full_asset_key, key.clone());
                match self
                    .whale
                    .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
                    .expect("get_or_insert should succeed")
                {
                    GetOrInsertResult::Inserted(node) => {
                        return Ok((AssetLoadingState::loading(key), node.changed_at));
                    }
                    GetOrInsertResult::Existing(node) => {
                        // A node already exists under this key: report what it holds.
                        let changed_at = node.changed_at;
                        match &node.data {
                            Some(CachedEntry::AssetReady(arc)) => {
                                match arc.clone().downcast::<K::Asset>() {
                                    Ok(value) => {
                                        return Ok((
                                            AssetLoadingState::ready(key, value),
                                            changed_at,
                                        ))
                                    }
                                    // Unlike the cache path above, a type mismatch here
                                    // is reported as loading rather than panicking.
                                    Err(_) => {
                                        return Ok((
                                            AssetLoadingState::loading(key),
                                            changed_at,
                                        ))
                                    }
                                }
                            }
                            Some(CachedEntry::AssetError(err)) => {
                                return Err(QueryError::UserError(err.clone()));
                            }
                            _ => return Ok((AssetLoadingState::loading(key), changed_at)),
                        }
                    }
                }
            }
            Err(QueryError::UserError(err)) => {
                // Locator returned a user error - cache it as AssetError
                // NOTE(review): no `on_asset_requested` event is emitted on this
                // path, unlike the other outcomes - confirm that is intended.
                let entry = CachedEntry::AssetError(err.clone());
                let _ = self.whale.register(
                    full_cache_key,
                    Some(entry),
                    Durability::volatile(),
                    locator_deps,
                );
                return Err(QueryError::UserError(err));
            }
            Err(e) => {
                // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
                return Err(e);
            }
        }
    }

    // ---- Stage 3: fallback ----
    // No locator registered or locator returned None - mark as pending
    // (no locator was called, so no deps to track)
    emit_requested(&self.tracer, &key, TracerAssetState::Loading);
    self.pending
        .insert::<K>(full_asset_key.clone(), key.clone());

    match self
        .whale
        .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
        .expect("get_or_insert with no dependencies cannot fail")
    {
        GetOrInsertResult::Inserted(node) => {
            Ok((AssetLoadingState::loading(key), node.changed_at))
        }
        GetOrInsertResult::Existing(node) => {
            // A node already exists under this key: report what it holds.
            let changed_at = node.changed_at;
            match &node.data {
                Some(CachedEntry::AssetReady(arc)) => {
                    match arc.clone().downcast::<K::Asset>() {
                        Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
                        // Type mismatch is reported as loading, not as a panic.
                        Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
                    }
                }
                Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
                _ => Ok((AssetLoadingState::loading(key), changed_at)),
            }
        }
    }
}
1445
1446 /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1447 ///
1448 /// This version is called from QueryContext::asset. Consistency checking for
1449 /// cached leaf assets is done inside this function before returning.
1450 fn get_asset_with_revision_ctx<K: AssetKey>(
1451 &self,
1452 key: K,
1453 _ctx: &QueryContext<'_, T>,
1454 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1455 let full_asset_key = FullAssetKey::new(&key);
1456 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1457
1458 // Helper to emit AssetRequested event
1459 let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1460 tracer.on_asset_requested(
1461 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1462 state,
1463 );
1464 };
1465
1466 // Check whale cache first (single atomic read)
1467 if let Some(node) = self.whale.get(&full_cache_key) {
1468 let changed_at = node.changed_at;
1469 // Check if valid at current revision (shallow check)
1470 if self.whale.is_valid(&full_cache_key) {
1471 // Verify dependencies recursively (like query path does)
1472 let mut deps_verified = true;
1473 if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1474 for dep in deps {
1475 if let Some(verifier) = self.verifiers.get(&dep) {
1476 // Re-run query/asset to verify it (triggers recursive verification)
1477 if verifier.verify(self as &dyn std::any::Any).is_err() {
1478 deps_verified = false;
1479 break;
1480 }
1481 }
1482 }
1483 }
1484
1485 // Re-check validity after deps are verified
1486 if deps_verified && self.whale.is_valid(&full_cache_key) {
1487 // For cached entries, check consistency for leaf assets (no locator deps).
1488 // This detects if resolve_asset/resolve_asset_error was called during query execution.
1489 let has_locator_deps = self
1490 .whale
1491 .get_dependency_ids(&full_cache_key)
1492 .is_some_and(|deps| !deps.is_empty());
1493
1494 match &node.data {
1495 Some(CachedEntry::AssetReady(arc)) => {
1496 // Check consistency for cached leaf assets
1497 if !has_locator_deps {
1498 check_leaf_asset_consistency(changed_at)?;
1499 }
1500 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1501 match arc.clone().downcast::<K::Asset>() {
1502 Ok(value) => {
1503 return Ok((AssetLoadingState::ready(key, value), changed_at))
1504 }
1505 Err(_) => {
1506 unreachable!("Asset type mismatch: {:?}", key)
1507 }
1508 }
1509 }
1510 Some(CachedEntry::AssetError(err)) => {
1511 // Check consistency for cached leaf errors
1512 if !has_locator_deps {
1513 check_leaf_asset_consistency(changed_at)?;
1514 }
1515 emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1516 return Err(QueryError::UserError(err.clone()));
1517 }
1518 None => {
1519 // Loading state - no value to be inconsistent with
1520 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1521 return Ok((AssetLoadingState::loading(key), changed_at));
1522 }
1523 _ => {
1524 // Query-related entries (Ok, UserError) shouldn't be here
1525 // Fall through to locator
1526 }
1527 }
1528 }
1529 }
1530 }
1531
1532 // Not in cache or invalid - try locator
1533 // Use LocatorContext to track deps on the asset itself (not the calling query)
1534 // Consistency tracking is handled via thread-local storage
1535 check_cycle(&full_cache_key)?;
1536 let _guard = StackGuard::push(full_cache_key.clone());
1537
1538 let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1539 let locator_result =
1540 self.locators
1541 .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1542
1543 if let Some(result) = locator_result {
1544 // Get collected dependencies from the locator context
1545 let locator_deps = locator_ctx.into_deps();
1546 match result {
1547 Ok(ErasedLocateResult::Ready {
1548 value: arc,
1549 durability: durability_level,
1550 }) => {
1551 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1552
1553 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1554 Ok(v) => v,
1555 Err(_) => {
1556 unreachable!("Asset type mismatch: {:?}", key);
1557 }
1558 };
1559
1560 // Store in whale atomically with early cutoff
1561 // Include locator's dependencies so the asset is invalidated when they change
1562 let entry = CachedEntry::AssetReady(typed_value.clone());
1563 let durability = Durability::new(durability_level.as_u8() as usize)
1564 .unwrap_or(Durability::volatile());
1565 let new_value = typed_value.clone();
1566 let result = self
1567 .whale
1568 .update_with_compare(
1569 full_cache_key.clone(),
1570 Some(entry),
1571 |old_data, _new_data| {
1572 let Some(CachedEntry::AssetReady(old_arc)) =
1573 old_data.and_then(|d| d.as_ref())
1574 else {
1575 return true;
1576 };
1577 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1578 return true;
1579 };
1580 !K::asset_eq(&old_value, &new_value)
1581 },
1582 durability,
1583 locator_deps,
1584 )
1585 .expect("update_with_compare should succeed");
1586
1587 // Register verifier for this asset (for verify-then-decide pattern)
1588 self.verifiers
1589 .insert_asset::<K, T>(full_cache_key, key.clone());
1590
1591 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1592 }
1593 Ok(ErasedLocateResult::Pending) => {
1594 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1595
1596 // Add to pending list for Pending result
1597 self.pending.insert::<K>(full_asset_key, key.clone());
1598 match self
1599 .whale
1600 .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1601 .expect("get_or_insert should succeed")
1602 {
1603 GetOrInsertResult::Inserted(node) => {
1604 return Ok((AssetLoadingState::loading(key), node.changed_at));
1605 }
1606 GetOrInsertResult::Existing(node) => {
1607 let changed_at = node.changed_at;
1608 match &node.data {
1609 Some(CachedEntry::AssetReady(arc)) => {
1610 match arc.clone().downcast::<K::Asset>() {
1611 Ok(value) => {
1612 return Ok((
1613 AssetLoadingState::ready(key, value),
1614 changed_at,
1615 ));
1616 }
1617 Err(_) => {
1618 return Ok((
1619 AssetLoadingState::loading(key),
1620 changed_at,
1621 ))
1622 }
1623 }
1624 }
1625 Some(CachedEntry::AssetError(err)) => {
1626 return Err(QueryError::UserError(err.clone()));
1627 }
1628 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1629 }
1630 }
1631 }
1632 }
1633 Err(QueryError::UserError(err)) => {
1634 // Locator returned a user error - cache it as AssetError
1635 let entry = CachedEntry::AssetError(err.clone());
1636 let _ = self.whale.register(
1637 full_cache_key,
1638 Some(entry),
1639 Durability::volatile(),
1640 locator_deps,
1641 );
1642 return Err(QueryError::UserError(err));
1643 }
1644 Err(e) => {
1645 // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1646 return Err(e);
1647 }
1648 }
1649 }
1650
1651 // No locator registered or locator returned None - mark as pending
1652 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1653 self.pending
1654 .insert::<K>(full_asset_key.clone(), key.clone());
1655
1656 match self
1657 .whale
1658 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1659 .expect("get_or_insert with no dependencies cannot fail")
1660 {
1661 GetOrInsertResult::Inserted(node) => {
1662 Ok((AssetLoadingState::loading(key), node.changed_at))
1663 }
1664 GetOrInsertResult::Existing(node) => {
1665 let changed_at = node.changed_at;
1666 match &node.data {
1667 Some(CachedEntry::AssetReady(arc)) => {
1668 match arc.clone().downcast::<K::Asset>() {
1669 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1670 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1671 }
1672 }
1673 Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1674 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1675 }
1676 }
1677 }
1678 }
1679
1680 /// Internal: Get asset state, checking cache and locator.
1681 fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1682 self.get_asset_with_revision(key).map(|(state, _)| state)
1683 }
1684}
1685
1686impl<T: Tracer> Db for QueryRuntime<T> {
1687 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1688 QueryRuntime::query(self, query)
1689 }
1690
1691 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1692 self.get_asset_internal(key)?.suspend()
1693 }
1694
1695 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1696 self.get_asset_internal(key)
1697 }
1698
1699 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1700 self.query_registry.get_all::<Q>()
1701 }
1702
1703 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1704 self.asset_key_registry.get_all::<K>()
1705 }
1706}
1707
1708/// Tracks consistency of leaf asset accesses during query execution.
1709///
1710/// A "leaf" asset is one without dependencies (externally resolved via `resolve_asset`).
1711/// This tracker ensures that all leaf assets accessed during a query execution
1712/// (including those accessed by locators) are consistent - i.e., none were modified
1713/// via `resolve_asset` mid-execution.
1714///
1715/// The tracker is shared across `QueryContext` and `LocatorContext` to propagate
1716/// consistency checking through the entire execution tree.
1717#[derive(Debug)]
1718pub(crate) struct ConsistencyTracker {
1719 /// Global revision at query start. All leaf assets must have changed_at <= this.
1720 start_revision: RevisionCounter,
1721}
1722
1723impl ConsistencyTracker {
1724 /// Create a new tracker with the given start revision.
1725 pub fn new(start_revision: RevisionCounter) -> Self {
1726 Self { start_revision }
1727 }
1728
1729 /// Check consistency for a leaf asset access.
1730 ///
1731 /// A leaf asset is consistent if its changed_at <= start_revision.
1732 /// This detects if resolve_asset was called during query execution.
1733 ///
1734 /// Returns Ok(()) if consistent, Err if inconsistent.
1735 pub fn check_leaf_asset(&self, dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
1736 if dep_changed_at > self.start_revision {
1737 Err(QueryError::InconsistentAssetResolution)
1738 } else {
1739 Ok(())
1740 }
1741 }
1742}
1743
/// Context provided to queries during execution.
///
/// Use this to access dependencies via `query()`. Asset accessors and the
/// `list_*` methods on this type record their dependencies the same way.
pub struct QueryContext<'a, T: Tracer = NoopTracer> {
    /// Runtime that executes nested queries, serves assets, and owns the tracer.
    runtime: &'a QueryRuntime<T>,
    /// Cache key of the query this context belongs to; used as the parent
    /// key in tracer events.
    current_key: FullCacheKey,
    /// Type name reported for the parent query in tracer events.
    parent_query_type: &'static str,
    /// Execution context; supplies the span id passed to tracer events.
    exec_ctx: ExecutionContext,
    /// Dependency keys recorded so far (queries, assets, and set sentinels).
    deps: RefCell<Vec<FullCacheKey>>,
}
1754
1755impl<'a, T: Tracer> QueryContext<'a, T> {
1756 /// Query a dependency.
1757 ///
1758 /// The dependency is automatically tracked for invalidation.
1759 ///
1760 /// # Example
1761 ///
1762 /// ```ignore
1763 /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1764 /// let dep_result = db.query(OtherQuery { id: self.id })?;
1765 /// Ok(process(&dep_result))
1766 /// }
1767 /// ```
1768 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1769 let key = query.cache_key();
1770 let full_key = FullCacheKey::new::<Q, _>(&key);
1771
1772 // Emit dependency registered event
1773 self.runtime.tracer.on_dependency_registered(
1774 self.exec_ctx.span_id(),
1775 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1776 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1777 );
1778
1779 // Record this as a dependency
1780 self.deps.borrow_mut().push(full_key.clone());
1781
1782 // Execute the query
1783 self.runtime.query(query)
1784 }
1785
1786 /// Access an asset, tracking it as a dependency.
1787 ///
1788 /// Returns the asset value if ready, or `Err(QueryError::Suspend)` if still loading.
1789 /// Use this with the `?` operator for automatic suspension on loading.
1790 ///
1791 /// # Example
1792 ///
1793 /// ```ignore
1794 /// #[query]
1795 /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1796 /// let content = db.asset(path)?;
1797 /// // Process content...
1798 /// Ok(output)
1799 /// }
1800 /// ```
1801 ///
1802 /// # Errors
1803 ///
1804 /// - Returns `Err(QueryError::Suspend)` if the asset is still loading.
1805 /// - Returns `Err(QueryError::UserError)` if the asset was not found.
1806 pub fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1807 self.asset_state(key)?.suspend()
1808 }
1809
1810 /// Access an asset's loading state, tracking it as a dependency.
1811 ///
1812 /// Unlike [`asset()`](Self::asset), this method returns the full loading state,
1813 /// allowing you to check if an asset is loading without triggering suspension.
1814 ///
1815 /// # Example
1816 ///
1817 /// ```ignore
1818 /// let state = db.asset_state(key)?;
1819 /// if state.is_loading() {
1820 /// // Handle loading case explicitly
1821 /// } else {
1822 /// let value = state.get().unwrap();
1823 /// }
1824 /// ```
1825 ///
1826 /// # Errors
1827 ///
1828 /// Returns `Err(QueryError::UserError)` if the asset was not found.
1829 pub fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1830 let full_asset_key = FullAssetKey::new(&key);
1831 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1832
1833 // 1. Emit asset dependency registered event
1834 self.runtime.tracer.on_asset_dependency_registered(
1835 self.exec_ctx.span_id(),
1836 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1837 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1838 );
1839
1840 // 2. Record dependency on this asset
1841 self.deps.borrow_mut().push(full_cache_key);
1842
1843 // 3. Get asset from cache/locator
1844 // Consistency checking for cached leaf assets is done inside get_asset_with_revision_ctx
1845 let (state, _changed_at) = self.runtime.get_asset_with_revision_ctx(key, self)?;
1846
1847 Ok(state)
1848 }
1849
1850 /// List all query instances of type Q that have been registered.
1851 ///
1852 /// This method establishes a dependency on the "set" of queries of type Q.
1853 /// The calling query will be invalidated when:
1854 /// - A new query of type Q is first executed (added to set)
1855 ///
1856 /// The calling query will NOT be invalidated when:
1857 /// - An individual query of type Q has its value change
1858 ///
1859 /// # Example
1860 ///
1861 /// ```ignore
1862 /// #[query]
1863 /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1864 /// let queries = db.list_queries::<MyQuery>();
1865 /// let mut results = Vec::new();
1866 /// for q in queries {
1867 /// results.push(*db.query(q)?);
1868 /// }
1869 /// Ok(results)
1870 /// }
1871 /// ```
1872 pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1873 // Record dependency on the sentinel (set-level dependency)
1874 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1875
1876 self.runtime.tracer.on_dependency_registered(
1877 self.exec_ctx.span_id(),
1878 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1879 TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1880 );
1881
1882 // Ensure sentinel exists in whale (for dependency tracking)
1883 if self.runtime.whale.get(&sentinel).is_none() {
1884 let _ =
1885 self.runtime
1886 .whale
1887 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1888 }
1889
1890 self.deps.borrow_mut().push(sentinel);
1891
1892 // Return all registered queries
1893 self.runtime.query_registry.get_all::<Q>()
1894 }
1895
1896 /// List all asset keys of type K that have been registered.
1897 ///
1898 /// This method establishes a dependency on the "set" of asset keys of type K.
1899 /// The calling query will be invalidated when:
1900 /// - A new asset of type K is resolved for the first time (added to set)
1901 /// - An asset of type K is removed via remove_asset
1902 ///
1903 /// The calling query will NOT be invalidated when:
1904 /// - An individual asset's value changes (use `db.asset()` for that)
1905 ///
1906 /// # Example
1907 ///
1908 /// ```ignore
1909 /// #[query]
1910 /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1911 /// let keys = db.list_asset_keys::<ConfigFile>();
1912 /// let mut contents = Vec::new();
1913 /// for key in keys {
1914 /// let content = db.asset(key)?;
1915 /// contents.push((*content).clone());
1916 /// }
1917 /// Ok(contents)
1918 /// }
1919 /// ```
1920 pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1921 // Record dependency on the sentinel (set-level dependency)
1922 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1923
1924 self.runtime.tracer.on_asset_dependency_registered(
1925 self.exec_ctx.span_id(),
1926 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1927 TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1928 );
1929
1930 // Ensure sentinel exists in whale (for dependency tracking)
1931 if self.runtime.whale.get(&sentinel).is_none() {
1932 let _ =
1933 self.runtime
1934 .whale
1935 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1936 }
1937
1938 self.deps.borrow_mut().push(sentinel);
1939
1940 // Return all registered asset keys
1941 self.runtime.asset_key_registry.get_all::<K>()
1942 }
1943}
1944
1945impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1946 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1947 QueryContext::query(self, query)
1948 }
1949
1950 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1951 QueryContext::asset(self, key)
1952 }
1953
1954 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1955 QueryContext::asset_state(self, key)
1956 }
1957
1958 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1959 QueryContext::list_queries(self)
1960 }
1961
1962 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1963 QueryContext::list_asset_keys(self)
1964 }
1965}
1966
1967/// Context for collecting dependencies during asset locator execution.
1968///
1969/// Unlike `QueryContext`, this is specifically for locators and does not
1970/// register dependencies on any parent query. Dependencies collected here
1971/// are stored with the asset itself.
1972pub(crate) struct LocatorContext<'a, T: Tracer> {
1973 runtime: &'a QueryRuntime<T>,
1974 deps: RefCell<Vec<FullCacheKey>>,
1975}
1976
1977impl<'a, T: Tracer> LocatorContext<'a, T> {
1978 /// Create a new locator context for the given asset key.
1979 ///
1980 /// Consistency tracking is handled via thread-local storage, so leaf asset
1981 /// accesses will be checked against any active tracker from a parent query.
1982 pub(crate) fn new(runtime: &'a QueryRuntime<T>, _asset_key: FullCacheKey) -> Self {
1983 Self {
1984 runtime,
1985 deps: RefCell::new(Vec::new()),
1986 }
1987 }
1988
1989 /// Consume this context and return the collected dependencies.
1990 pub(crate) fn into_deps(self) -> Vec<FullCacheKey> {
1991 self.deps.into_inner()
1992 }
1993}
1994
1995impl<T: Tracer> Db for LocatorContext<'_, T> {
1996 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1997 let key = query.cache_key();
1998 let full_key = FullCacheKey::new::<Q, _>(&key);
1999
2000 // Record this as a dependency of the asset being located
2001 self.deps.borrow_mut().push(full_key);
2002
2003 // Execute the query via the runtime
2004 self.runtime.query(query)
2005 }
2006
2007 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2008 self.asset_state(key)?.suspend()
2009 }
2010
2011 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2012 let full_asset_key = FullAssetKey::new(&key);
2013 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
2014
2015 // Record this as a dependency of the asset being located
2016 self.deps.borrow_mut().push(full_cache_key);
2017
2018 // Access the asset - consistency checking is done inside get_asset_with_revision
2019 let (state, _changed_at) = self.runtime.get_asset_with_revision(key)?;
2020
2021 Ok(state)
2022 }
2023
2024 fn list_queries<Q: Query>(&self) -> Vec<Q> {
2025 self.runtime.list_queries()
2026 }
2027
2028 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2029 self.runtime.list_asset_keys()
2030 }
2031}
2032
2033#[cfg(test)]
2034mod tests {
2035 use super::*;
2036
2037 #[test]
2038 fn test_simple_query() {
2039 #[derive(Clone)]
2040 struct Add {
2041 a: i32,
2042 b: i32,
2043 }
2044
2045 impl Query for Add {
2046 type CacheKey = (i32, i32);
2047 type Output = i32;
2048
2049 fn cache_key(&self) -> Self::CacheKey {
2050 (self.a, self.b)
2051 }
2052
2053 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2054 Ok(Arc::new(self.a + self.b))
2055 }
2056
2057 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2058 old == new
2059 }
2060 }
2061
2062 let runtime = QueryRuntime::new();
2063
2064 let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
2065 assert_eq!(*result, 3);
2066
2067 // Second query should be cached
2068 let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
2069 assert_eq!(*result2, 3);
2070 }
2071
2072 #[test]
2073 fn test_dependent_queries() {
2074 #[derive(Clone)]
2075 struct Base {
2076 value: i32,
2077 }
2078
2079 impl Query for Base {
2080 type CacheKey = i32;
2081 type Output = i32;
2082
2083 fn cache_key(&self) -> Self::CacheKey {
2084 self.value
2085 }
2086
2087 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2088 Ok(Arc::new(self.value * 2))
2089 }
2090
2091 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2092 old == new
2093 }
2094 }
2095
2096 #[derive(Clone)]
2097 struct Derived {
2098 base_value: i32,
2099 }
2100
2101 impl Query for Derived {
2102 type CacheKey = i32;
2103 type Output = i32;
2104
2105 fn cache_key(&self) -> Self::CacheKey {
2106 self.base_value
2107 }
2108
2109 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2110 let base = db.query(Base {
2111 value: self.base_value,
2112 })?;
2113 Ok(Arc::new(*base + 10))
2114 }
2115
2116 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2117 old == new
2118 }
2119 }
2120
2121 let runtime = QueryRuntime::new();
2122
2123 let result = runtime.query(Derived { base_value: 5 }).unwrap();
2124 assert_eq!(*result, 20); // 5 * 2 + 10
2125 }
2126
2127 #[test]
2128 fn test_cycle_detection() {
2129 #[derive(Clone)]
2130 struct CycleA {
2131 id: i32,
2132 }
2133
2134 #[derive(Clone)]
2135 struct CycleB {
2136 id: i32,
2137 }
2138
2139 impl Query for CycleA {
2140 type CacheKey = i32;
2141 type Output = i32;
2142
2143 fn cache_key(&self) -> Self::CacheKey {
2144 self.id
2145 }
2146
2147 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2148 let b = db.query(CycleB { id: self.id })?;
2149 Ok(Arc::new(*b + 1))
2150 }
2151
2152 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2153 old == new
2154 }
2155 }
2156
2157 impl Query for CycleB {
2158 type CacheKey = i32;
2159 type Output = i32;
2160
2161 fn cache_key(&self) -> Self::CacheKey {
2162 self.id
2163 }
2164
2165 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2166 let a = db.query(CycleA { id: self.id })?;
2167 Ok(Arc::new(*a + 1))
2168 }
2169
2170 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2171 old == new
2172 }
2173 }
2174
2175 let runtime = QueryRuntime::new();
2176
2177 let result = runtime.query(CycleA { id: 1 });
2178 assert!(matches!(result, Err(QueryError::Cycle { .. })));
2179 }
2180
2181 #[test]
2182 fn test_fallible_query() {
2183 #[derive(Clone)]
2184 struct ParseInt {
2185 input: String,
2186 }
2187
2188 impl Query for ParseInt {
2189 type CacheKey = String;
2190 type Output = Result<i32, std::num::ParseIntError>;
2191
2192 fn cache_key(&self) -> Self::CacheKey {
2193 self.input.clone()
2194 }
2195
2196 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2197 Ok(Arc::new(self.input.parse()))
2198 }
2199
2200 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2201 old == new
2202 }
2203 }
2204
2205 let runtime = QueryRuntime::new();
2206
2207 // Valid parse
2208 let result = runtime
2209 .query(ParseInt {
2210 input: "42".to_string(),
2211 })
2212 .unwrap();
2213 assert_eq!(*result, Ok(42));
2214
2215 // Invalid parse - system succeeds, user error in output
2216 let result = runtime
2217 .query(ParseInt {
2218 input: "not_a_number".to_string(),
2219 })
2220 .unwrap();
2221 assert!(result.is_err());
2222 }
2223
2224 // Macro tests
2225 mod macro_tests {
2226 use super::*;
2227 use crate::query;
2228
2229 #[query]
2230 fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2231 let _ = db; // silence unused warning
2232 Ok(a + b)
2233 }
2234
2235 #[test]
2236 fn test_macro_basic() {
2237 let runtime = QueryRuntime::new();
2238 let result = runtime.query(Add::new(1, 2)).unwrap();
2239 assert_eq!(*result, 3);
2240 }
2241
2242 #[query]
2243 fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2244 let _ = db;
2245 Ok(x * 2)
2246 }
2247
2248 #[test]
2249 fn test_macro_simple() {
2250 let runtime = QueryRuntime::new();
2251 let result = runtime.query(SimpleDouble::new(5)).unwrap();
2252 assert_eq!(*result, 10);
2253 }
2254
2255 #[query(keys(id))]
2256 fn with_key_selection(
2257 db: &impl Db,
2258 id: u32,
2259 include_extra: bool,
2260 ) -> Result<String, QueryError> {
2261 let _ = db;
2262 Ok(format!("id={}, extra={}", id, include_extra))
2263 }
2264
2265 #[test]
2266 fn test_macro_key_selection() {
2267 let runtime = QueryRuntime::new();
2268
2269 // Same id, different include_extra - should return cached
2270 let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2271 let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2272
2273 // Both should have same value because only `id` is the key
2274 assert_eq!(*r1, "id=1, extra=true");
2275 assert_eq!(*r2, "id=1, extra=true"); // Cached!
2276 }
2277
2278 #[query]
2279 fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2280 let sum = db.query(Add::new(a, b))?;
2281 Ok(*sum * 2)
2282 }
2283
2284 #[test]
2285 fn test_macro_dependencies() {
2286 let runtime = QueryRuntime::new();
2287 let result = runtime.query(Dependent::new(3, 4)).unwrap();
2288 assert_eq!(*result, 14); // (3 + 4) * 2
2289 }
2290
2291 #[query(output_eq)]
2292 fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2293 let _ = db;
2294 Ok(x * 2)
2295 }
2296
2297 #[test]
2298 fn test_macro_output_eq() {
2299 let runtime = QueryRuntime::new();
2300 let result = runtime.query(WithOutputEq::new(5)).unwrap();
2301 assert_eq!(*result, 10);
2302 }
2303
2304 #[query(name = "CustomName")]
2305 fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2306 let _ = db;
2307 Ok(x)
2308 }
2309
2310 #[test]
2311 fn test_macro_custom_name() {
2312 let runtime = QueryRuntime::new();
2313 let result = runtime.query(CustomName::new(42)).unwrap();
2314 assert_eq!(*result, 42);
2315 }
2316
2317 // Test that attribute macros like #[tracing::instrument] are preserved
2318 // We use #[allow(unused_variables)] and #[inline] as test attributes since
2319 // they don't require external dependencies.
2320 #[allow(unused_variables)]
2321 #[inline]
2322 #[query]
2323 fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2324 // This would warn without #[allow(unused_variables)] on the generated method
2325 let unused_var = 42;
2326 Ok(x * 2)
2327 }
2328
2329 #[test]
2330 fn test_macro_preserves_attributes() {
2331 let runtime = QueryRuntime::new();
2332 // If attributes weren't preserved, this might warn about unused_var
2333 let result = runtime.query(WithAttributes::new(5)).unwrap();
2334 assert_eq!(*result, 10);
2335 }
2336 }
2337
2338 // Tests for poll() and changed_at()
2339 mod poll_tests {
2340 use super::*;
2341
2342 #[derive(Clone)]
2343 struct Counter {
2344 id: i32,
2345 }
2346
2347 impl Query for Counter {
2348 type CacheKey = i32;
2349 type Output = i32;
2350
2351 fn cache_key(&self) -> Self::CacheKey {
2352 self.id
2353 }
2354
2355 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2356 Ok(Arc::new(self.id * 10))
2357 }
2358
2359 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2360 old == new
2361 }
2362 }
2363
2364 #[test]
2365 fn test_poll_returns_value_and_revision() {
2366 let runtime = QueryRuntime::new();
2367
2368 let result = runtime.poll(Counter { id: 1 }).unwrap();
2369
2370 // Value should be correct - access through Result and Arc
2371 assert_eq!(**result.value.as_ref().unwrap(), 10);
2372
2373 // Revision should be non-zero after first execution
2374 assert!(result.revision > 0);
2375 }
2376
2377 #[test]
2378 fn test_poll_revision_stable_on_cache_hit() {
2379 let runtime = QueryRuntime::new();
2380
2381 // First poll
2382 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2383 let rev1 = result1.revision;
2384
2385 // Second poll (cache hit)
2386 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2387 let rev2 = result2.revision;
2388
2389 // Revision should be the same (no change)
2390 assert_eq!(rev1, rev2);
2391 }
2392
2393 #[test]
2394 fn test_poll_revision_changes_on_invalidate() {
2395 let runtime = QueryRuntime::new();
2396
2397 // First poll
2398 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2399 let rev1 = result1.revision;
2400
2401 // Invalidate and poll again
2402 runtime.invalidate::<Counter>(&1);
2403 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2404 let rev2 = result2.revision;
2405
2406 // Revision should increase (value was recomputed)
2407 // Note: Since output_eq returns true (same value), this might not change
2408 // depending on early cutoff behavior. Let's verify the value is still correct.
2409 assert_eq!(**result2.value.as_ref().unwrap(), 10);
2410
2411 // With early cutoff, revision might stay the same if value didn't change
2412 // This is expected behavior
2413 assert!(rev2 >= rev1);
2414 }
2415
2416 #[test]
2417 fn test_changed_at_returns_none_for_unexecuted_query() {
2418 let runtime = QueryRuntime::new();
2419
2420 // Query has never been executed
2421 let rev = runtime.changed_at::<Counter>(&1);
2422 assert!(rev.is_none());
2423 }
2424
2425 #[test]
2426 fn test_changed_at_returns_revision_after_execution() {
2427 let runtime = QueryRuntime::new();
2428
2429 // Execute the query
2430 let _ = runtime.query(Counter { id: 1 }).unwrap();
2431
2432 // Now changed_at should return Some
2433 let rev = runtime.changed_at::<Counter>(&1);
2434 assert!(rev.is_some());
2435 assert!(rev.unwrap() > 0);
2436 }
2437
2438 #[test]
2439 fn test_changed_at_matches_poll_revision() {
2440 let runtime = QueryRuntime::new();
2441
2442 // Poll the query
2443 let result = runtime.poll(Counter { id: 1 }).unwrap();
2444
2445 // changed_at should match the revision from poll
2446 let rev = runtime.changed_at::<Counter>(&1);
2447 assert_eq!(rev, Some(result.revision));
2448 }
2449
2450 #[test]
2451 fn test_poll_value_access() {
2452 let runtime = QueryRuntime::new();
2453
2454 let result = runtime.poll(Counter { id: 5 }).unwrap();
2455
2456 // Access through Result and Arc
2457 let value: &i32 = result.value.as_ref().unwrap();
2458 assert_eq!(*value, 50);
2459
2460 // Access Arc directly via field after unwrapping Result
2461 let arc: &Arc<i32> = result.value.as_ref().unwrap();
2462 assert_eq!(**arc, 50);
2463 }
2464
2465 #[test]
2466 fn test_subscription_pattern() {
2467 let runtime = QueryRuntime::new();
2468
2469 // Simulate subscription pattern
2470 let mut last_revision: RevisionCounter = 0;
2471 let mut notifications = 0;
2472
2473 // First poll - should notify (new value)
2474 let result = runtime.poll(Counter { id: 1 }).unwrap();
2475 if result.revision > last_revision {
2476 notifications += 1;
2477 last_revision = result.revision;
2478 }
2479
2480 // Second poll - should NOT notify (no change)
2481 let result = runtime.poll(Counter { id: 1 }).unwrap();
2482 if result.revision > last_revision {
2483 notifications += 1;
2484 last_revision = result.revision;
2485 }
2486
2487 // Third poll - should NOT notify (no change)
2488 let result = runtime.poll(Counter { id: 1 }).unwrap();
2489 if result.revision > last_revision {
2490 notifications += 1;
2491 #[allow(unused_assignments)]
2492 {
2493 last_revision = result.revision;
2494 }
2495 }
2496
2497 // Only the first poll should have triggered a notification
2498 assert_eq!(notifications, 1);
2499 }
2500 }
2501
2502 // Tests for GC APIs
2503 mod gc_tests {
2504 use super::*;
2505 use std::collections::HashSet;
2506 use std::sync::atomic::{AtomicUsize, Ordering};
2507 use std::sync::Mutex;
2508
2509 #[derive(Clone)]
2510 struct Leaf {
2511 id: i32,
2512 }
2513
2514 impl Query for Leaf {
2515 type CacheKey = i32;
2516 type Output = i32;
2517
2518 fn cache_key(&self) -> Self::CacheKey {
2519 self.id
2520 }
2521
2522 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2523 Ok(Arc::new(self.id * 10))
2524 }
2525
2526 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2527 old == new
2528 }
2529 }
2530
2531 #[derive(Clone)]
2532 struct Parent {
2533 child_id: i32,
2534 }
2535
2536 impl Query for Parent {
2537 type CacheKey = i32;
2538 type Output = i32;
2539
2540 fn cache_key(&self) -> Self::CacheKey {
2541 self.child_id
2542 }
2543
2544 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2545 let child = db.query(Leaf { id: self.child_id })?;
2546 Ok(Arc::new(*child + 1))
2547 }
2548
2549 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2550 old == new
2551 }
2552 }
2553
2554 #[test]
2555 fn test_query_keys_returns_all_cached_queries() {
2556 let runtime = QueryRuntime::new();
2557
2558 // Execute some queries
2559 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2560 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2561 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2562
2563 // Get all keys
2564 let keys = runtime.query_keys();
2565
2566 // Should have at least 3 keys (might have more due to sentinels)
2567 assert!(keys.len() >= 3);
2568 }
2569
2570 #[test]
2571 fn test_remove_removes_query() {
2572 let runtime = QueryRuntime::new();
2573
2574 // Execute a query
2575 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2576
2577 // Get the key
2578 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2579
2580 // Query should exist
2581 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2582
2583 // Remove it
2584 assert!(runtime.remove(&full_key));
2585
2586 // Query should no longer exist
2587 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2588 }
2589
2590 #[test]
2591 fn test_remove_if_unused_removes_leaf_query() {
2592 let runtime = QueryRuntime::new();
2593
2594 // Execute a leaf query (no dependents)
2595 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2596
2597 // Should be removable since no other query depends on it
2598 assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2599
2600 // Query should no longer exist
2601 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2602 }
2603
2604 #[test]
2605 fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2606 let runtime = QueryRuntime::new();
2607
2608 // Execute parent query (which depends on Leaf)
2609 let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2610
2611 // Leaf query should not be removable since Parent depends on it
2612 assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2613
2614 // Leaf query should still exist
2615 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2616
2617 // But Parent should be removable (no dependents)
2618 assert!(runtime.remove_query_if_unused::<Parent>(&1));
2619 }
2620
2621 #[test]
2622 fn test_remove_if_unused_with_full_cache_key() {
2623 let runtime = QueryRuntime::new();
2624
2625 // Execute a leaf query
2626 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2627
2628 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2629
2630 // Should be removable via FullCacheKey
2631 assert!(runtime.remove_if_unused(&full_key));
2632
2633 // Query should no longer exist
2634 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2635 }
2636
2637 // Test tracer receives on_query_key calls
2638 struct GcTracker {
2639 accessed_keys: Mutex<HashSet<String>>,
2640 access_count: AtomicUsize,
2641 }
2642
2643 impl GcTracker {
2644 fn new() -> Self {
2645 Self {
2646 accessed_keys: Mutex::new(HashSet::new()),
2647 access_count: AtomicUsize::new(0),
2648 }
2649 }
2650 }
2651
2652 impl Tracer for GcTracker {
2653 fn new_span_id(&self) -> SpanId {
2654 SpanId(1)
2655 }
2656
2657 fn on_query_key(&self, full_key: &FullCacheKey) {
2658 self.accessed_keys
2659 .lock()
2660 .unwrap()
2661 .insert(full_key.debug_repr().to_string());
2662 self.access_count.fetch_add(1, Ordering::Relaxed);
2663 }
2664 }
2665
2666 #[test]
2667 fn test_tracer_receives_on_query_key() {
2668 let tracker = GcTracker::new();
2669 let runtime = QueryRuntime::with_tracer(tracker);
2670
2671 // Execute some queries
2672 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2673 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2674
2675 // Tracer should have received on_query_key calls
2676 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2677 assert_eq!(count, 2);
2678
2679 // Check that the keys were recorded
2680 let keys = runtime.tracer().accessed_keys.lock().unwrap();
2681 assert!(keys.iter().any(|k| k.contains("Leaf")));
2682 }
2683
2684 #[test]
2685 fn test_tracer_receives_on_query_key_for_cache_hits() {
2686 let tracker = GcTracker::new();
2687 let runtime = QueryRuntime::with_tracer(tracker);
2688
2689 // Execute query twice (second is cache hit)
2690 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2691 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2692
2693 // Tracer should have received on_query_key for both calls
2694 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2695 assert_eq!(count, 2);
2696 }
2697
2698 #[test]
2699 fn test_gc_workflow() {
2700 let tracker = GcTracker::new();
2701 let runtime = QueryRuntime::with_tracer(tracker);
2702
2703 // Execute some queries
2704 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2705 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2706 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2707
2708 // Simulate GC: remove all queries that are not in use
2709 let mut removed = 0;
2710 for key in runtime.query_keys() {
2711 if runtime.remove_if_unused(&key) {
2712 removed += 1;
2713 }
2714 }
2715
2716 // All leaf queries should be removable
2717 assert!(removed >= 3);
2718
2719 // Queries should no longer exist
2720 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2721 assert!(runtime.changed_at::<Leaf>(&2).is_none());
2722 assert!(runtime.changed_at::<Leaf>(&3).is_none());
2723 }
2724 }
2725}