query_flow/runtime.rs
1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, PendingAsset};
12use crate::db::Db;
13use crate::key::{
14 AssetCacheKey, AssetKeySetSentinelKey, FullCacheKey, QueryCacheKey, QuerySetSentinelKey,
15};
16use crate::loading::AssetLoadingState;
17use crate::query::Query;
18use crate::storage::{
19 AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
20 QueryRegistry, VerifierStorage,
21};
22use crate::tracer::{
23 ExecutionResult, InvalidationReason, NoopTracer, SpanContext, SpanId, TraceId, Tracer,
24 TracerAssetState,
25};
26use crate::QueryError;
27
28/// Function type for comparing user errors during early cutoff.
29///
30/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
31/// `QueryError::UserError` values are compared for caching purposes.
32pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
33
34/// Number of durability levels (matches whale's default).
35const DURABILITY_LEVELS: usize = 4;
36
37// Thread-local query execution stack for cycle detection.
38thread_local! {
39 static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
40
41 /// Current consistency tracker for leaf asset checks.
42 /// Set during query execution, used by all nested locator calls.
43 /// Uses Rc since thread-local is single-threaded.
44 static CONSISTENCY_TRACKER: RefCell<Option<Rc<ConsistencyTracker>>> = const { RefCell::new(None) };
45
46 /// Stack for tracking parent-child query relationships.
47 static SPAN_STACK: RefCell<SpanStack> = const { RefCell::new(SpanStack::Empty) };
48}
49
50/// Thread-local span stack state.
51/// Empty when no query is executing; Active with trace_id and span stack during execution.
52enum SpanStack {
53 Empty,
54 Active(TraceId, Vec<SpanId>),
55}
56
57/// Check a leaf asset against the current consistency tracker (if any).
58/// Returns Ok if no tracker is set or if the check passes.
59fn check_leaf_asset_consistency(dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
60 CONSISTENCY_TRACKER.with(|tracker| {
61 if let Some(ref t) = *tracker.borrow() {
62 t.check_leaf_asset(dep_changed_at)
63 } else {
64 Ok(())
65 }
66 })
67}
68
69/// RAII guard that sets the consistency tracker for the current thread.
70struct ConsistencyTrackerGuard {
71 previous: Option<Rc<ConsistencyTracker>>,
72}
73
74impl ConsistencyTrackerGuard {
75 fn new(tracker: Rc<ConsistencyTracker>) -> Self {
76 let previous = CONSISTENCY_TRACKER.with(|t| t.borrow_mut().replace(tracker));
77 Self { previous }
78 }
79}
80
81impl Drop for ConsistencyTrackerGuard {
82 fn drop(&mut self) {
83 CONSISTENCY_TRACKER.with(|t| {
84 *t.borrow_mut() = self.previous.take();
85 });
86 }
87}
88
89/// Check for cycles in the query stack and return error if detected.
90fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
91 let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
92 if cycle_detected {
93 let path = QUERY_STACK.with(|stack| {
94 let stack = stack.borrow();
95 let mut path: Vec<FullCacheKey> = stack.iter().cloned().collect();
96 path.push(key.clone());
97 path
98 });
99 return Err(QueryError::Cycle { path });
100 }
101 Ok(())
102}
103
104/// RAII guard for pushing/popping from query stack.
105struct StackGuard;
106
107impl StackGuard {
108 fn push(key: FullCacheKey) -> Self {
109 QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
110 StackGuard
111 }
112}
113
114impl Drop for StackGuard {
115 fn drop(&mut self) {
116 QUERY_STACK.with(|stack| {
117 stack.borrow_mut().pop();
118 });
119 }
120}
121
122/// RAII guard for pushing/popping from span stack.
123struct SpanStackGuard;
124
125impl SpanStackGuard {
126 /// Push a span onto the stack. Sets trace_id if this is the root span.
127 fn push(trace_id: TraceId, span_id: SpanId) -> Self {
128 SPAN_STACK.with(|stack| {
129 let mut s = stack.borrow_mut();
130 match &mut *s {
131 SpanStack::Empty => *s = SpanStack::Active(trace_id, vec![span_id]),
132 SpanStack::Active(_, spans) => spans.push(span_id),
133 }
134 });
135 SpanStackGuard
136 }
137}
138
139impl Drop for SpanStackGuard {
140 fn drop(&mut self) {
141 SPAN_STACK.with(|stack| {
142 let mut s = stack.borrow_mut();
143 if let SpanStack::Active(_, spans) = &mut *s {
144 spans.pop();
145 if spans.is_empty() {
146 *s = SpanStack::Empty;
147 }
148 }
149 });
150 }
151}
152
153/// Execution context passed through query execution.
154///
155/// Contains a SpanContext for tracing correlation with parent-child relationships.
156#[derive(Clone, Copy)]
157pub struct ExecutionContext {
158 span_ctx: SpanContext,
159}
160
161impl ExecutionContext {
162 /// Create a new execution context with the given span context.
163 #[inline]
164 pub fn new(span_ctx: SpanContext) -> Self {
165 Self { span_ctx }
166 }
167
168 /// Get the span context for this execution context.
169 #[inline]
170 pub fn span_ctx(&self) -> &SpanContext {
171 &self.span_ctx
172 }
173}
174
175/// Result of polling a query, containing the value and its revision.
176///
177/// This is returned by [`QueryRuntime::poll`] and provides both the query result
178/// and its change revision, enabling efficient change detection for subscription
179/// patterns.
180///
181/// # Example
182///
183/// ```ignore
184/// let result = runtime.poll(MyQuery::new())?;
185///
186/// // Access the value via Deref
187/// println!("{:?}", *result);
188///
189/// // Check if changed since last poll
190/// if result.revision > last_known_revision {
191/// notify_subscribers(&result.value);
192/// last_known_revision = result.revision;
193/// }
194/// ```
195#[derive(Debug, Clone)]
196pub struct Polled<T> {
197 /// The query result value.
198 pub value: T,
199 /// The revision at which this value was last changed.
200 ///
201 /// Compare this with a previously stored revision to detect changes.
202 pub revision: RevisionCounter,
203}
204
205impl<T: Deref> Deref for Polled<T> {
206 type Target = T::Target;
207
208 fn deref(&self) -> &Self::Target {
209 &self.value
210 }
211}
212
213/// The query runtime manages query execution, caching, and dependency tracking.
214///
215/// This is cheap to clone - all data is behind `Arc`.
216///
217/// # Type Parameter
218///
219/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
220/// for zero-cost when tracing is not needed.
221///
222/// # Example
223///
224/// ```ignore
225/// // Without tracing (default)
226/// let runtime = QueryRuntime::new();
227///
228/// // With tracing
229/// let tracer = MyTracer::new();
230/// let runtime = QueryRuntime::with_tracer(tracer);
231///
232/// // Sync query execution
233/// let result = runtime.query(MyQuery { ... })?;
234/// ```
235pub struct QueryRuntime<T: Tracer = NoopTracer> {
236 /// Whale runtime for dependency tracking and cache storage.
237 /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
238 whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
239 /// Registered asset locators
240 locators: Arc<LocatorStorage<T>>,
241 /// Pending asset requests
242 pending: Arc<PendingStorage>,
243 /// Registry for tracking query instances (for list_queries)
244 query_registry: Arc<QueryRegistry>,
245 /// Registry for tracking asset keys (for list_asset_keys)
246 asset_key_registry: Arc<AssetKeyRegistry>,
247 /// Verifiers for re-executing queries (for verify-then-decide pattern)
248 verifiers: Arc<VerifierStorage>,
249 /// Comparator for user errors during early cutoff
250 error_comparator: ErrorComparator,
251 /// Tracer for observability
252 tracer: Arc<T>,
253}
254
255#[test]
256fn test_runtime_send_sync() {
257 fn assert_send_sync<T: Send + Sync>() {}
258 assert_send_sync::<QueryRuntime<NoopTracer>>();
259}
260
261impl Default for QueryRuntime<NoopTracer> {
262 fn default() -> Self {
263 Self::new()
264 }
265}
266
267impl<T: Tracer> Clone for QueryRuntime<T> {
268 fn clone(&self) -> Self {
269 Self {
270 whale: self.whale.clone(),
271 locators: self.locators.clone(),
272 pending: self.pending.clone(),
273 query_registry: self.query_registry.clone(),
274 asset_key_registry: self.asset_key_registry.clone(),
275 verifiers: self.verifiers.clone(),
276 error_comparator: self.error_comparator,
277 tracer: self.tracer.clone(),
278 }
279 }
280}
281
282/// Default error comparator that treats all errors as different.
283///
284/// This is conservative - it always triggers recomputation when an error occurs.
285fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
286 false
287}
288
289impl<T: Tracer> QueryRuntime<T> {
290 /// Get cached output along with its revision (single atomic access).
291 fn get_cached_with_revision<Q: Query>(
292 &self,
293 key: &FullCacheKey,
294 ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
295 let node = self.whale.get(key)?;
296 let revision = node.changed_at;
297 let entry = node.data.as_ref()?;
298 let cached = entry.to_cached_value::<Q::Output>()?;
299 Some((cached, revision))
300 }
301
302 /// Get a reference to the tracer.
303 #[inline]
304 pub fn tracer(&self) -> &T {
305 &self.tracer
306 }
307}
308
309impl QueryRuntime<NoopTracer> {
310 /// Create a new query runtime with default settings.
311 pub fn new() -> Self {
312 Self::with_tracer(NoopTracer)
313 }
314
315 /// Create a builder for customizing the runtime.
316 ///
317 /// # Example
318 ///
319 /// ```ignore
320 /// let runtime = QueryRuntime::builder()
321 /// .error_comparator(|a, b| {
322 /// // Custom error comparison logic
323 /// match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
324 /// (Some(a), Some(b)) => a == b,
325 /// _ => false,
326 /// }
327 /// })
328 /// .build();
329 /// ```
330 pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
331 QueryRuntimeBuilder::new()
332 }
333}
334
335impl<T: Tracer> QueryRuntime<T> {
336 /// Create a new query runtime with the specified tracer.
337 pub fn with_tracer(tracer: T) -> Self {
338 QueryRuntimeBuilder::new().tracer(tracer).build()
339 }
340
341 /// Execute a query synchronously.
342 ///
343 /// Returns the cached result if valid, otherwise executes the query.
344 ///
345 /// # Errors
346 ///
347 /// - `QueryError::Suspend` - Query is waiting for async loading
348 /// - `QueryError::Cycle` - Dependency cycle detected
349 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
350 self.query_internal(query)
351 .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
352 }
353
354 /// Internal implementation shared by query() and poll().
355 ///
356 /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
357 /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
358 #[allow(clippy::type_complexity)]
359 fn query_internal<Q: Query>(
360 &self,
361 query: Q,
362 ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
363 let query_cache_key = QueryCacheKey::new(query.clone());
364 let full_key: FullCacheKey = query_cache_key.clone().into();
365
366 // Create SpanContext with parent relationship from SPAN_STACK
367 let span_id = self.tracer.new_span_id();
368 let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
369 SpanStack::Empty => (self.tracer.new_trace_id(), None),
370 SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
371 });
372 let span_ctx = SpanContext {
373 span_id,
374 trace_id,
375 parent_span_id,
376 };
377
378 // Push to span stack and create execution context
379 let _span_guard = SpanStackGuard::push(trace_id, span_id);
380 let exec_ctx = ExecutionContext::new(span_ctx);
381
382 self.tracer.on_query_start(&span_ctx, &query_cache_key);
383
384 // Check for cycles using thread-local stack
385 let cycle_detected = QUERY_STACK.with(|stack| {
386 let stack = stack.borrow();
387 stack.iter().any(|k| k == &full_key)
388 });
389
390 if cycle_detected {
391 let path = QUERY_STACK.with(|stack| {
392 let stack = stack.borrow();
393 let mut path: Vec<FullCacheKey> = stack.iter().cloned().collect();
394 path.push(full_key.clone());
395 path
396 });
397
398 self.tracer.on_cycle_detected(&path);
399 self.tracer
400 .on_query_end(&span_ctx, &query_cache_key, ExecutionResult::CycleDetected);
401
402 return Err(QueryError::Cycle { path });
403 }
404
405 // Check if cached and valid (with verify-then-decide pattern)
406 let current_rev = self.whale.current_revision();
407
408 // Fast path: already verified at current revision
409 if self.whale.is_verified_at(&full_key, ¤t_rev) {
410 // Single atomic access to get both cached value and revision
411 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
412 self.tracer
413 .on_cache_check(&span_ctx, &query_cache_key, true);
414 self.tracer
415 .on_query_end(&span_ctx, &query_cache_key, ExecutionResult::CacheHit);
416
417 return match cached {
418 CachedValue::Ok(output) => Ok((Ok(output), revision)),
419 CachedValue::UserError(err) => Ok((Err(err), revision)),
420 };
421 }
422 }
423
424 // Check shallow validity (deps' changed_at ok) and try verify-then-decide
425 if self.whale.is_valid(&full_key) {
426 // Single atomic access to get both cached value and revision
427 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
428 // Shallow valid but not verified - verify deps first
429 let mut deps_verified = true;
430 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
431 for dep in deps {
432 if let Some(verifier) = self.verifiers.get(&dep) {
433 // Re-run query/asset to verify it (triggers recursive verification)
434 // For assets, this re-accesses the asset which may re-run the locator
435 if verifier.verify(self as &dyn std::any::Any).is_err() {
436 deps_verified = false;
437 break;
438 }
439 }
440 // Note: deps without verifiers are assumed valid (they're verified
441 // by the final is_valid check if their changed_at increased)
442 }
443 }
444
445 // Re-check validity after deps are verified
446 if deps_verified && self.whale.is_valid(&full_key) {
447 // Deps didn't change their changed_at, mark as verified and use cached
448 self.whale.mark_verified(&full_key, ¤t_rev);
449
450 self.tracer
451 .on_cache_check(&span_ctx, &query_cache_key, true);
452 self.tracer.on_query_end(
453 &span_ctx,
454 &query_cache_key,
455 ExecutionResult::CacheHit,
456 );
457
458 return match cached {
459 CachedValue::Ok(output) => Ok((Ok(output), revision)),
460 CachedValue::UserError(err) => Ok((Err(err), revision)),
461 };
462 }
463 // A dep's changed_at increased, fall through to execute
464 }
465 }
466
467 self.tracer
468 .on_cache_check(&span_ctx, &query_cache_key, false);
469
470 // Execute the query with cycle tracking
471 let _guard = StackGuard::push(full_key.clone());
472 let result = self.execute_query::<Q>(&query, &query_cache_key, &full_key, exec_ctx);
473 drop(_guard);
474
475 // Emit end event
476 let exec_result = match &result {
477 Ok((_, true, _)) => ExecutionResult::Changed,
478 Ok((_, false, _)) => ExecutionResult::Unchanged,
479 Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
480 Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
481 Err(e) => ExecutionResult::Error {
482 message: format!("{:?}", e),
483 },
484 };
485 self.tracer
486 .on_query_end(&span_ctx, &query_cache_key, exec_result);
487
488 result.map(|(inner_result, _, revision)| (inner_result, revision))
489 }
490
491 /// Execute a query, caching the result if appropriate.
492 ///
493 /// Returns (result, output_changed, revision) tuple.
494 /// - `result`: Ok(output) for success, Err(user_error) for user errors
495 /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
496 #[allow(clippy::type_complexity)]
497 fn execute_query<Q: Query>(
498 &self,
499 query: &Q,
500 query_cache_key: &QueryCacheKey,
501 full_key: &FullCacheKey,
502 exec_ctx: ExecutionContext,
503 ) -> Result<
504 (
505 Result<Arc<Q::Output>, Arc<anyhow::Error>>,
506 bool,
507 RevisionCounter,
508 ),
509 QueryError,
510 > {
511 // Capture current global revision at query start for consistency checking
512 let start_revision = self.whale.current_revision().get(Durability::volatile());
513
514 // Create consistency tracker for this query execution
515 let tracker = Rc::new(ConsistencyTracker::new(start_revision));
516
517 // Set thread-local tracker for nested locator calls
518 let _tracker_guard = ConsistencyTrackerGuard::new(tracker);
519
520 // Create context for this query execution
521 let ctx = QueryContext {
522 runtime: self,
523 current_key: full_key.clone(),
524 exec_ctx,
525 deps: RefCell::new(Vec::new()),
526 };
527
528 // Execute the query (clone because query() takes ownership)
529 let db = DbDispatch::QueryContext(&ctx);
530 let result = query.clone().query(&db);
531
532 // Get collected dependencies
533 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
534
535 // Query durability defaults to stable - Whale will automatically reduce
536 // the effective durability to min(requested, min(dep_durabilities)).
537 // A pure query with no dependencies remains stable.
538 // A query depending on volatile assets becomes volatile.
539 let durability = Durability::stable();
540
541 match result {
542 Ok(output) => {
543 // Check if output changed (for early cutoff)
544 // existing_revision is Some only when output is unchanged (can reuse revision)
545 let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
546 self.get_cached_with_revision::<Q>(full_key)
547 {
548 if Q::output_eq(&*old, &*output) {
549 Some(rev) // Same output - reuse revision
550 } else {
551 None // Different output
552 }
553 } else {
554 None // No previous Ok value
555 };
556 let output_changed = existing_revision.is_none();
557
558 // Emit early cutoff check event
559 self.tracer.on_early_cutoff_check(
560 exec_ctx.span_ctx(),
561 query_cache_key,
562 output_changed,
563 );
564
565 // Update whale with cached entry (atomic update of value + dependency state)
566 let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
567 let revision = if let Some(existing_rev) = existing_revision {
568 // confirm_unchanged doesn't change changed_at, use existing
569 let _ = self.whale.confirm_unchanged(full_key, deps);
570 existing_rev
571 } else {
572 // Use new_rev from register result
573 match self
574 .whale
575 .register(full_key.clone(), Some(entry), durability, deps)
576 {
577 Ok(result) => result.new_rev,
578 Err(missing) => {
579 return Err(QueryError::DependenciesRemoved {
580 missing_keys: missing,
581 })
582 }
583 }
584 };
585
586 // Register query in registry for list_queries
587 let is_new_query = self.query_registry.register(query);
588 if is_new_query {
589 let sentinel = QuerySetSentinelKey::new::<Q>().into();
590 let _ = self
591 .whale
592 .register(sentinel, None, Durability::stable(), vec![]);
593 }
594
595 // Store verifier for this query (for verify-then-decide pattern)
596 self.verifiers
597 .insert::<Q, T>(full_key.clone(), query.clone());
598
599 Ok((Ok(output), output_changed, revision))
600 }
601 Err(QueryError::UserError(err)) => {
602 // Check if error changed (for early cutoff)
603 // existing_revision is Some only when error is unchanged (can reuse revision)
604 let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
605 self.get_cached_with_revision::<Q>(full_key)
606 {
607 if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
608 Some(rev) // Same error - reuse revision
609 } else {
610 None // Different error
611 }
612 } else {
613 None // No previous UserError
614 };
615 let output_changed = existing_revision.is_none();
616
617 // Emit early cutoff check event
618 self.tracer.on_early_cutoff_check(
619 exec_ctx.span_ctx(),
620 query_cache_key,
621 output_changed,
622 );
623
624 // Update whale with cached error (atomic update of value + dependency state)
625 let entry = CachedEntry::UserError(err.clone());
626 let revision = if let Some(existing_rev) = existing_revision {
627 // confirm_unchanged doesn't change changed_at, use existing
628 let _ = self.whale.confirm_unchanged(full_key, deps);
629 existing_rev
630 } else {
631 // Use new_rev from register result
632 match self
633 .whale
634 .register(full_key.clone(), Some(entry), durability, deps)
635 {
636 Ok(result) => result.new_rev,
637 Err(missing) => {
638 return Err(QueryError::DependenciesRemoved {
639 missing_keys: missing,
640 })
641 }
642 }
643 };
644
645 // Register query in registry for list_queries
646 let is_new_query = self.query_registry.register(query);
647 if is_new_query {
648 let sentinel = QuerySetSentinelKey::new::<Q>().into();
649 let _ = self
650 .whale
651 .register(sentinel, None, Durability::stable(), vec![]);
652 }
653
654 // Store verifier for this query (for verify-then-decide pattern)
655 self.verifiers
656 .insert::<Q, T>(full_key.clone(), query.clone());
657
658 Ok((Err(err), output_changed, revision))
659 }
660 Err(e) => {
661 // System errors (Suspend, Cycle, Cancelled) are not cached
662 Err(e)
663 }
664 }
665 }
666
667 /// Invalidate a query, forcing recomputation on next access.
668 ///
669 /// This also invalidates any queries that depend on this one.
670 pub fn invalidate<Q: Query>(&self, query: &Q) {
671 let query_cache_key = QueryCacheKey::new(query.clone());
672 let full_key: FullCacheKey = query_cache_key.clone().into();
673
674 self.tracer
675 .on_query_invalidated(&query_cache_key, InvalidationReason::ManualInvalidation);
676
677 // Update whale to invalidate dependents (register with None to clear cached value)
678 // Use stable durability to increment all revision counters, ensuring queries
679 // at any durability level will see this as a change.
680 let _ = self
681 .whale
682 .register(full_key, None, Durability::stable(), vec![]);
683 }
684
685 /// Remove a query from the cache entirely, freeing memory.
686 ///
687 /// Use this for GC when a query is no longer needed.
688 /// Unlike `invalidate`, this removes all traces of the query from storage.
689 /// The query will be recomputed from scratch on next access.
690 ///
691 /// This also invalidates any queries that depend on this one.
692 pub fn remove_query<Q: Query>(&self, query: &Q) {
693 let query_cache_key = QueryCacheKey::new(query.clone());
694 let full_key: FullCacheKey = query_cache_key.clone().into();
695
696 self.tracer
697 .on_query_invalidated(&query_cache_key, InvalidationReason::ManualInvalidation);
698
699 // Remove verifier if exists
700 self.verifiers.remove(&full_key);
701
702 // Remove from whale storage (this also handles dependent invalidation)
703 self.whale.remove(&full_key);
704
705 // Remove from registry and update sentinel for list_queries
706 if self.query_registry.remove::<Q>(query) {
707 let sentinel = QuerySetSentinelKey::new::<Q>().into();
708 let _ = self
709 .whale
710 .register(sentinel, None, Durability::stable(), vec![]);
711 }
712 }
713
714 /// Clear all cached values by removing all nodes from whale.
715 ///
716 /// Note: This is a relatively expensive operation as it iterates through all keys.
717 pub fn clear_cache(&self) {
718 let keys = self.whale.keys();
719 for key in keys {
720 self.whale.remove(&key);
721 }
722 }
723
724 /// Poll a query, returning both the result and its change revision.
725 ///
726 /// This is useful for implementing subscription patterns where you need to
727 /// detect changes efficiently. Compare the returned `revision` with a
728 /// previously stored value to determine if the query result has changed.
729 ///
730 /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
731 /// as its value, allowing you to track revision changes for both success and
732 /// user error cases.
733 ///
734 /// # Example
735 ///
736 /// ```ignore
737 /// struct Subscription<Q: Query> {
738 /// query: Q,
739 /// last_revision: RevisionCounter,
740 /// tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
741 /// }
742 ///
743 /// // Polling loop
744 /// for sub in &mut subscriptions {
745 /// let result = runtime.poll(sub.query.clone())?;
746 /// if result.revision > sub.last_revision {
747 /// sub.tx.send(result.value.clone())?;
748 /// sub.last_revision = result.revision;
749 /// }
750 /// }
751 /// ```
752 ///
753 /// # Errors
754 ///
755 /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
756 /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
757 #[allow(clippy::type_complexity)]
758 pub fn poll<Q: Query>(
759 &self,
760 query: Q,
761 ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
762 let (value, revision) = self.query_internal(query)?;
763 Ok(Polled { value, revision })
764 }
765
766 /// Get the change revision of a query without executing it.
767 ///
768 /// Returns `None` if the query has never been executed.
769 ///
770 /// This is useful for checking if a query has changed since the last poll
771 /// without the cost of executing the query.
772 ///
773 /// # Example
774 ///
775 /// ```ignore
776 /// // Check if query has changed before deciding to poll
777 /// if let Some(rev) = runtime.changed_at(&MyQuery::new(key)) {
778 /// if rev > last_known_revision {
779 /// let result = runtime.query(MyQuery::new(key))?;
780 /// // Process result...
781 /// }
782 /// }
783 /// ```
784 pub fn changed_at<Q: Query>(&self, query: &Q) -> Option<RevisionCounter> {
785 let full_key = QueryCacheKey::new(query.clone()).into();
786 self.whale.get(&full_key).map(|node| node.changed_at)
787 }
788}
789
790// ============================================================================
791// GC (Garbage Collection) API
792// ============================================================================
793
794impl<T: Tracer> QueryRuntime<T> {
795 /// Get all query keys currently in the cache.
796 ///
797 /// This is useful for implementing custom garbage collection strategies.
798 /// Use this in combination with [`Tracer::on_query_key`] to track access
799 /// times and implement LRU, TTL, or other GC algorithms externally.
800 ///
801 /// # Example
802 ///
803 /// ```ignore
804 /// // Collect all keys that haven't been accessed recently
805 /// let stale_keys: Vec<_> = runtime.query_keys()
806 /// .filter(|key| tracker.is_stale(key))
807 /// .collect();
808 ///
809 /// // Remove stale queries that have no dependents
810 /// for key in stale_keys {
811 /// runtime.remove_if_unused(&key);
812 /// }
813 /// ```
814 pub fn query_keys(&self) -> Vec<FullCacheKey> {
815 self.whale.keys()
816 }
817
818 /// Remove a query if it has no dependents.
819 ///
820 /// Returns `true` if the query was removed, `false` if it has dependents
821 /// or doesn't exist. This is the safe way to remove queries during GC,
822 /// as it won't break queries that depend on this one.
823 ///
824 /// # Example
825 ///
826 /// ```ignore
827 /// let query = MyQuery::new(cache_key);
828 /// if runtime.remove_query_if_unused(&query) {
829 /// println!("Query removed");
830 /// } else {
831 /// println!("Query has dependents, not removed");
832 /// }
833 /// ```
834 pub fn remove_query_if_unused<Q: Query>(&self, query: &Q) -> bool {
835 let full_key = QueryCacheKey::new(query.clone()).into();
836 self.remove_if_unused(&full_key)
837 }
838
839 /// Remove a query by its [`FullCacheKey`].
840 ///
841 /// This is the type-erased version of [`remove_query`](Self::remove_query).
842 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
843 /// or [`Tracer::on_query_key`].
844 ///
845 /// Returns `true` if the query was removed, `false` if it doesn't exist.
846 ///
847 /// # Warning
848 ///
849 /// This forcibly removes the query even if other queries depend on it.
850 /// Dependent queries will be recomputed on next access. For safe GC,
851 /// use [`remove_if_unused`](Self::remove_if_unused) instead.
852 pub fn remove(&self, key: &FullCacheKey) -> bool {
853 // Remove verifier if exists
854 self.verifiers.remove(key);
855
856 // Remove from whale storage
857 self.whale.remove(key).is_some()
858 }
859
860 /// Remove a query by its [`FullCacheKey`] if it has no dependents.
861 ///
862 /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
863 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
864 /// or [`Tracer::on_query_key`].
865 ///
866 /// Returns `true` if the query was removed, `false` if it has dependents
867 /// or doesn't exist.
868 ///
869 /// # Example
870 ///
871 /// ```ignore
872 /// // Implement LRU GC
873 /// for key in runtime.query_keys() {
874 /// if tracker.is_expired(&key) {
875 /// runtime.remove_if_unused(&key);
876 /// }
877 /// }
878 /// ```
879 pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
880 if self.whale.remove_if_unused(key.clone()).is_some() {
881 // Successfully removed - clean up verifier
882 self.verifiers.remove(key);
883 true
884 } else {
885 false
886 }
887 }
888}
889
890// ============================================================================
891// Builder
892// ============================================================================
893
894/// Builder for [`QueryRuntime`] with customizable settings.
895///
896/// # Example
897///
898/// ```ignore
899/// let runtime = QueryRuntime::builder()
900/// .error_comparator(|a, b| {
901/// // Treat all errors of the same type as equal
902/// a.downcast_ref::<std::io::Error>().is_some()
903/// == b.downcast_ref::<std::io::Error>().is_some()
904/// })
905/// .build();
906/// ```
907pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
908 error_comparator: ErrorComparator,
909 tracer: T,
910}
911
912impl Default for QueryRuntimeBuilder<NoopTracer> {
913 fn default() -> Self {
914 Self::new()
915 }
916}
917
918impl QueryRuntimeBuilder<NoopTracer> {
919 /// Create a new builder with default settings.
920 pub fn new() -> Self {
921 Self {
922 error_comparator: default_error_comparator,
923 tracer: NoopTracer,
924 }
925 }
926}
927
928impl<T: Tracer> QueryRuntimeBuilder<T> {
929 /// Set the error comparator function for early cutoff optimization.
930 ///
931 /// When a query returns `QueryError::UserError`, this function is used
932 /// to compare it with the previously cached error. If they are equal,
933 /// downstream queries can skip recomputation (early cutoff).
934 ///
935 /// The default comparator returns `false` for all errors, meaning errors
936 /// are always considered different (conservative, always recomputes).
937 ///
938 /// # Example
939 ///
940 /// ```ignore
941 /// // Treat errors as equal if they have the same display message
942 /// let runtime = QueryRuntime::builder()
943 /// .error_comparator(|a, b| a.to_string() == b.to_string())
944 /// .build();
945 /// ```
946 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
947 self.error_comparator = f;
948 self
949 }
950
951 /// Set the tracer for observability.
952 pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
953 QueryRuntimeBuilder {
954 error_comparator: self.error_comparator,
955 tracer,
956 }
957 }
958
959 /// Build the runtime with the configured settings.
960 pub fn build(self) -> QueryRuntime<T> {
961 QueryRuntime {
962 whale: WhaleRuntime::new(),
963 locators: Arc::new(LocatorStorage::new()),
964 pending: Arc::new(PendingStorage::new()),
965 query_registry: Arc::new(QueryRegistry::new()),
966 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
967 verifiers: Arc::new(VerifierStorage::new()),
968 error_comparator: self.error_comparator,
969 tracer: Arc::new(self.tracer),
970 }
971 }
972}
973
974// ============================================================================
975// Asset API
976// ============================================================================
977
978impl<T: Tracer> QueryRuntime<T> {
979 /// Register an asset locator for a specific asset key type.
980 ///
981 /// Only one locator can be registered per key type. Later registrations
982 /// replace earlier ones.
983 ///
984 /// # Example
985 ///
986 /// ```ignore
987 /// let runtime = QueryRuntime::new();
988 /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
989 /// ```
990 pub fn register_asset_locator<K, L>(&self, locator: L)
991 where
992 K: AssetKey,
993 L: AssetLocator<K>,
994 {
995 self.locators.insert::<K, L>(locator);
996 }
997
998 /// Get an iterator over pending asset requests.
999 ///
1000 /// Returns assets that have been requested but not yet resolved.
1001 /// The user should fetch these externally and call `resolve_asset()`.
1002 ///
1003 /// # Example
1004 ///
1005 /// ```ignore
1006 /// for pending in runtime.pending_assets() {
1007 /// if let Some(path) = pending.key::<FilePath>() {
1008 /// let content = fetch_file(path);
1009 /// runtime.resolve_asset(path.clone(), content);
1010 /// }
1011 /// }
1012 /// ```
1013 pub fn pending_assets(&self) -> Vec<PendingAsset> {
1014 self.pending.get_all()
1015 }
1016
1017 /// Get pending assets filtered by key type.
1018 pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
1019 self.pending.get_of_type::<K>()
1020 }
1021
1022 /// Check if there are any pending assets.
1023 pub fn has_pending_assets(&self) -> bool {
1024 !self.pending.is_empty()
1025 }
1026
1027 /// Resolve an asset with its loaded value.
1028 ///
1029 /// This marks the asset as ready and invalidates any queries that
1030 /// depend on it (if the value changed), triggering recomputation on next access.
1031 ///
1032 /// This method is idempotent - resolving with the same value (via `asset_eq`)
1033 /// will not trigger downstream recomputation.
1034 ///
1035 /// # Arguments
1036 ///
1037 /// * `key` - The asset key identifying this resource
1038 /// * `value` - The loaded asset value
1039 /// * `durability` - How frequently this asset is expected to change
1040 ///
1041 /// # Example
1042 ///
1043 /// ```ignore
1044 /// let content = std::fs::read_to_string(&path)?;
1045 /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
1046 /// ```
1047 pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
1048 self.resolve_asset_internal(key, value, durability);
1049 }
1050
1051 /// Resolve an asset with an error.
1052 ///
1053 /// This marks the asset as errored and caches the error. Queries depending
1054 /// on this asset will receive `Err(QueryError::UserError(...))`.
1055 ///
1056 /// Use this when async loading fails (e.g., network error, file not found,
1057 /// access denied).
1058 ///
1059 /// # Arguments
1060 ///
1061 /// * `key` - The asset key identifying this resource
1062 /// * `error` - The error to cache (will be wrapped in `Arc`)
1063 /// * `durability` - How frequently this error state is expected to change
1064 ///
1065 /// # Example
1066 ///
1067 /// ```ignore
1068 /// match fetch_file(&path) {
1069 /// Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
1070 /// Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
1071 /// }
1072 /// ```
1073 pub fn resolve_asset_error<K: AssetKey>(
1074 &self,
1075 key: K,
1076 error: impl Into<anyhow::Error>,
1077 durability: DurabilityLevel,
1078 ) {
1079 let asset_cache_key = AssetCacheKey::new(key.clone());
1080
1081 // Remove from pending BEFORE registering the error
1082 self.pending.remove(&asset_cache_key);
1083
1084 // Prepare the error entry
1085 let error_arc = Arc::new(error.into());
1086 let entry = CachedEntry::AssetError(error_arc.clone());
1087 let durability =
1088 Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
1089
1090 // Atomic compare-and-update (errors are always considered changed for now)
1091 let result = self
1092 .whale
1093 .update_with_compare(
1094 asset_cache_key.into(),
1095 Some(entry),
1096 |old_data, _new_data| {
1097 // Compare old and new errors using error_comparator
1098 match old_data.and_then(|d| d.as_ref()) {
1099 Some(CachedEntry::AssetError(old_err)) => {
1100 !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
1101 }
1102 _ => true, // Loading, Ready, or not present -> changed
1103 }
1104 },
1105 durability,
1106 vec![],
1107 )
1108 .expect("update_with_compare with no dependencies cannot fail");
1109
1110 // Emit asset resolved event (with changed status)
1111 let asset_cache_key = AssetCacheKey::new(key.clone());
1112 self.tracer
1113 .on_asset_resolved(&asset_cache_key, result.changed);
1114
1115 // Register asset key in registry for list_asset_keys
1116 let is_new_asset = self.asset_key_registry.register(&key);
1117 if is_new_asset {
1118 // Update sentinel to invalidate list_asset_keys dependents
1119 let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1120 let _ = self
1121 .whale
1122 .register(sentinel, None, Durability::stable(), vec![]);
1123 }
1124 }
1125
1126 fn resolve_asset_internal<K: AssetKey>(
1127 &self,
1128 key: K,
1129 value: K::Asset,
1130 durability_level: DurabilityLevel,
1131 ) {
1132 let asset_cache_key = AssetCacheKey::new(key.clone());
1133
1134 // Remove from pending BEFORE registering the value
1135 self.pending.remove(&asset_cache_key);
1136
1137 // Prepare the new entry
1138 let value_arc: Arc<K::Asset> = Arc::new(value);
1139 let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1140 let durability =
1141 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1142
1143 // Atomic compare-and-update
1144 let result = self
1145 .whale
1146 .update_with_compare(
1147 asset_cache_key.into(),
1148 Some(entry),
1149 |old_data, _new_data| {
1150 // Compare old and new values
1151 match old_data.and_then(|d| d.as_ref()) {
1152 Some(CachedEntry::AssetReady(old_arc)) => {
1153 match old_arc.clone().downcast::<K::Asset>() {
1154 Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1155 Err(_) => true, // Type mismatch, treat as changed
1156 }
1157 }
1158 _ => true, // Loading, NotFound, or not present -> changed
1159 }
1160 },
1161 durability,
1162 vec![],
1163 )
1164 .expect("update_with_compare with no dependencies cannot fail");
1165
1166 // Emit asset resolved event
1167 let asset_cache_key = AssetCacheKey::new(key.clone());
1168 self.tracer
1169 .on_asset_resolved(&asset_cache_key, result.changed);
1170
1171 // Register asset key in registry for list_asset_keys
1172 let is_new_asset = self.asset_key_registry.register(&key);
1173 if is_new_asset {
1174 // Update sentinel to invalidate list_asset_keys dependents
1175 let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1176 let _ = self
1177 .whale
1178 .register(sentinel, None, Durability::stable(), vec![]);
1179 }
1180 }
1181
1182 /// Invalidate an asset, forcing queries to re-request it.
1183 ///
1184 /// The asset will be marked as loading and added to pending assets.
1185 /// Dependent queries will suspend until the asset is resolved again.
1186 ///
1187 /// # Example
1188 ///
1189 /// ```ignore
1190 /// // File was modified externally
1191 /// runtime.invalidate_asset(&FilePath("config.json".into()));
1192 /// // Queries depending on this asset will now suspend
1193 /// // User should fetch the new value and call resolve_asset
1194 /// ```
1195 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1196 let asset_cache_key = AssetCacheKey::new(key.clone());
1197 let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1198
1199 // Emit asset invalidated event
1200 self.tracer.on_asset_invalidated(&asset_cache_key);
1201
1202 // Add to pending FIRST (before clearing whale state)
1203 // This ensures: readers see either old value, or Loading+pending
1204 self.pending
1205 .insert::<K>(asset_cache_key.clone(), key.clone());
1206
1207 // Atomic: clear cached value + invalidate dependents
1208 // Using None for data means "needs to be loaded"
1209 // Use stable durability to ensure queries at any durability level see the change.
1210 let _ = self
1211 .whale
1212 .register(full_cache_key, None, Durability::stable(), vec![]);
1213 }
1214
1215 /// Remove an asset from the cache entirely.
1216 ///
1217 /// Unlike `invalidate_asset`, this removes all traces of the asset.
1218 /// Dependent queries will go through the locator again on next access.
1219 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1220 let asset_cache_key = AssetCacheKey::new(key.clone());
1221 let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1222
1223 // Remove from pending first
1224 self.pending.remove(&asset_cache_key);
1225
1226 // Remove from whale (this also cleans up dependency edges)
1227 // whale.remove() invalidates dependents before removing
1228 self.whale.remove(&full_cache_key);
1229
1230 // Remove from registry and update sentinel for list_asset_keys
1231 if self.asset_key_registry.remove::<K>(key) {
1232 let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1233 let _ = self
1234 .whale
1235 .register(sentinel, None, Durability::stable(), vec![]);
1236 }
1237 }
1238
1239 /// Get an asset by key without tracking dependencies.
1240 ///
1241 /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1242 /// as a dependent of the asset. Use this for direct asset access outside
1243 /// of query execution.
1244 ///
1245 /// # Returns
1246 ///
1247 /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1248 /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1249 /// - `Err(QueryError::UserError)` - Asset was not found or locator returned an error
1250 pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1251 self.get_asset_internal(key)
1252 }
1253
1254 /// Internal: Get asset state and its changed_at revision atomically.
1255 ///
1256 /// This is the "direct" version called from QueryRuntime::asset (no dependency tracking).
1257 /// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
1258 ///
1259 /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
1260 /// whale node that provided the asset value.
1261 fn get_asset_with_revision<K: AssetKey>(
1262 &self,
1263 key: K,
1264 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1265 let asset_cache_key = AssetCacheKey::new(key.clone());
1266 let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1267
1268 // Create a span for this asset request (like queries do)
1269 // This ensures child queries called from locators show as children of this asset
1270 let asset_span_id = self.tracer.new_span_id();
1271 let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
1272 SpanStack::Empty => (self.tracer.new_trace_id(), None),
1273 SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
1274 });
1275 let span_ctx = SpanContext {
1276 span_id: asset_span_id,
1277 trace_id,
1278 parent_span_id,
1279 };
1280
1281 // Push asset span to stack so child queries see this asset as their parent
1282 let _span_guard = SpanStackGuard::push(trace_id, asset_span_id);
1283
1284 // Check whale cache first (single atomic read)
1285 if let Some(node) = self.whale.get(&full_cache_key) {
1286 let changed_at = node.changed_at;
1287 // Check if valid at current revision (shallow check)
1288 if self.whale.is_valid(&full_cache_key) {
1289 // Verify dependencies recursively (like query path does)
1290 let mut deps_verified = true;
1291 if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1292 for dep in deps {
1293 if let Some(verifier) = self.verifiers.get(&dep) {
1294 // Re-run query/asset to verify it (triggers recursive verification)
1295 if verifier.verify(self as &dyn std::any::Any).is_err() {
1296 deps_verified = false;
1297 break;
1298 }
1299 }
1300 }
1301 }
1302
1303 // Re-check validity after deps are verified
1304 if deps_verified && self.whale.is_valid(&full_cache_key) {
1305 // For cached entries, check consistency for leaf assets (no locator deps).
1306 // This detects if resolve_asset/resolve_asset_error was called during query execution.
1307 let has_locator_deps = self
1308 .whale
1309 .get_dependency_ids(&full_cache_key)
1310 .is_some_and(|deps| !deps.is_empty());
1311
1312 match &node.data {
1313 Some(CachedEntry::AssetReady(arc)) => {
1314 // Check consistency for cached leaf assets
1315 if !has_locator_deps {
1316 check_leaf_asset_consistency(changed_at)?;
1317 }
1318 // Cache hit: start + end immediately (no locator runs)
1319 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1320 self.tracer.on_asset_located(
1321 &span_ctx,
1322 &asset_cache_key,
1323 TracerAssetState::Ready,
1324 );
1325 match arc.clone().downcast::<K::Asset>() {
1326 Ok(value) => {
1327 return Ok((AssetLoadingState::ready(key, value), changed_at))
1328 }
1329 Err(_) => {
1330 unreachable!("Asset type mismatch: {:?}", key)
1331 }
1332 }
1333 }
1334 Some(CachedEntry::AssetError(err)) => {
1335 // Check consistency for cached leaf errors
1336 if !has_locator_deps {
1337 check_leaf_asset_consistency(changed_at)?;
1338 }
1339 // Cache hit: start + end immediately (no locator runs)
1340 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1341 self.tracer.on_asset_located(
1342 &span_ctx,
1343 &asset_cache_key,
1344 TracerAssetState::NotFound,
1345 );
1346 return Err(QueryError::UserError(err.clone()));
1347 }
1348 None => {
1349 // Loading state - no value to be inconsistent with
1350 // Cache hit: start + end immediately (no locator runs)
1351 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1352 self.tracer.on_asset_located(
1353 &span_ctx,
1354 &asset_cache_key,
1355 TracerAssetState::Loading,
1356 );
1357 return Ok((AssetLoadingState::loading(key), changed_at));
1358 }
1359 _ => {
1360 // Query-related entries (Ok, UserError) shouldn't be here
1361 // Fall through to locator
1362 }
1363 }
1364 }
1365 }
1366 }
1367
1368 // Not in cache or invalid - try locator
1369 // Use LocatorContext to track deps on the asset itself
1370 check_cycle(&full_cache_key)?;
1371 let _guard = StackGuard::push(full_cache_key.clone());
1372
1373 // Notify tracer BEFORE locator runs (START event) so child queries appear as children
1374 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1375
1376 let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1377 let locator_result =
1378 self.locators
1379 .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1380
1381 if let Some(result) = locator_result {
1382 // Get collected dependencies from the locator context
1383 let locator_deps = locator_ctx.into_deps();
1384 match result {
1385 Ok(ErasedLocateResult::Ready {
1386 value: arc,
1387 durability: durability_level,
1388 }) => {
1389 // END event after locator completes
1390 self.tracer.on_asset_located(
1391 &span_ctx,
1392 &asset_cache_key,
1393 TracerAssetState::Ready,
1394 );
1395
1396 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1397 Ok(v) => v,
1398 Err(_) => {
1399 unreachable!("Asset type mismatch: {:?}", key);
1400 }
1401 };
1402
1403 // Store in whale atomically with early cutoff
1404 // Include locator's dependencies so the asset is invalidated when they change
1405 let entry = CachedEntry::AssetReady(typed_value.clone());
1406 let durability = Durability::new(durability_level.as_u8() as usize)
1407 .unwrap_or(Durability::volatile());
1408 let new_value = typed_value.clone();
1409 let result = self
1410 .whale
1411 .update_with_compare(
1412 full_cache_key.clone(),
1413 Some(entry),
1414 |old_data, _new_data| {
1415 let Some(CachedEntry::AssetReady(old_arc)) =
1416 old_data.and_then(|d| d.as_ref())
1417 else {
1418 return true;
1419 };
1420 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1421 return true;
1422 };
1423 !K::asset_eq(&old_value, &new_value)
1424 },
1425 durability,
1426 locator_deps,
1427 )
1428 .expect("update_with_compare should succeed");
1429
1430 // Register verifier for this asset (for verify-then-decide pattern)
1431 self.verifiers
1432 .insert_asset::<K, T>(full_cache_key, key.clone());
1433
1434 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1435 }
1436 Ok(ErasedLocateResult::Pending) => {
1437 // END event after locator completes with pending
1438 self.tracer.on_asset_located(
1439 &span_ctx,
1440 &asset_cache_key,
1441 TracerAssetState::Loading,
1442 );
1443
1444 // Add to pending list for Pending result
1445 self.pending
1446 .insert::<K>(asset_cache_key.clone(), key.clone());
1447 match self
1448 .whale
1449 .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1450 .expect("get_or_insert should succeed")
1451 {
1452 GetOrInsertResult::Inserted(node) => {
1453 return Ok((AssetLoadingState::loading(key), node.changed_at));
1454 }
1455 GetOrInsertResult::Existing(node) => {
1456 let changed_at = node.changed_at;
1457 match &node.data {
1458 Some(CachedEntry::AssetReady(arc)) => {
1459 match arc.clone().downcast::<K::Asset>() {
1460 Ok(value) => {
1461 return Ok((
1462 AssetLoadingState::ready(key, value),
1463 changed_at,
1464 ))
1465 }
1466 Err(_) => {
1467 return Ok((
1468 AssetLoadingState::loading(key),
1469 changed_at,
1470 ))
1471 }
1472 }
1473 }
1474 Some(CachedEntry::AssetError(err)) => {
1475 return Err(QueryError::UserError(err.clone()));
1476 }
1477 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1478 }
1479 }
1480 }
1481 }
1482 Err(QueryError::UserError(err)) => {
1483 // END event after locator completes with error
1484 self.tracer.on_asset_located(
1485 &span_ctx,
1486 &asset_cache_key,
1487 TracerAssetState::NotFound,
1488 );
1489 // Locator returned a user error - cache it as AssetError
1490 let entry = CachedEntry::AssetError(err.clone());
1491 let _ = self.whale.register(
1492 full_cache_key,
1493 Some(entry),
1494 Durability::volatile(),
1495 locator_deps,
1496 );
1497 return Err(QueryError::UserError(err));
1498 }
1499 Err(e) => {
1500 // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1501 return Err(e);
1502 }
1503 }
1504 }
1505
1506 // No locator registered or locator returned None - mark as pending
1507 // (no locator was called, so no deps to track)
1508 // END event - no locator ran
1509 self.tracer
1510 .on_asset_located(&span_ctx, &asset_cache_key, TracerAssetState::Loading);
1511 self.pending
1512 .insert::<K>(asset_cache_key.clone(), key.clone());
1513
1514 match self
1515 .whale
1516 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1517 .expect("get_or_insert with no dependencies cannot fail")
1518 {
1519 GetOrInsertResult::Inserted(node) => {
1520 Ok((AssetLoadingState::loading(key), node.changed_at))
1521 }
1522 GetOrInsertResult::Existing(node) => {
1523 let changed_at = node.changed_at;
1524 match &node.data {
1525 Some(CachedEntry::AssetReady(arc)) => {
1526 match arc.clone().downcast::<K::Asset>() {
1527 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1528 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1529 }
1530 }
1531 Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1532 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1533 }
1534 }
1535 }
1536 }
1537
1538 /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1539 ///
1540 /// This version is called from QueryContext::asset. Consistency checking for
1541 /// cached leaf assets is done inside this function before returning.
1542 fn get_asset_with_revision_ctx<K: AssetKey>(
1543 &self,
1544 key: K,
1545 _ctx: &QueryContext<'_, T>,
1546 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1547 let asset_cache_key = AssetCacheKey::new(key.clone());
1548 let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1549
1550 // Create a span for this asset request (like queries do)
1551 // This ensures child queries called from locators show as children of this asset
1552 let asset_span_id = self.tracer.new_span_id();
1553 let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
1554 SpanStack::Empty => (self.tracer.new_trace_id(), None),
1555 SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
1556 });
1557 let span_ctx = SpanContext {
1558 span_id: asset_span_id,
1559 trace_id,
1560 parent_span_id,
1561 };
1562
1563 // Push asset span to stack so child queries see this asset as their parent
1564 let _span_guard = SpanStackGuard::push(trace_id, asset_span_id);
1565
1566 // Check whale cache first (single atomic read)
1567 if let Some(node) = self.whale.get(&full_cache_key) {
1568 let changed_at = node.changed_at;
1569 // Check if valid at current revision (shallow check)
1570 if self.whale.is_valid(&full_cache_key) {
1571 // Verify dependencies recursively (like query path does)
1572 let mut deps_verified = true;
1573 if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1574 for dep in deps {
1575 if let Some(verifier) = self.verifiers.get(&dep) {
1576 // Re-run query/asset to verify it (triggers recursive verification)
1577 if verifier.verify(self as &dyn std::any::Any).is_err() {
1578 deps_verified = false;
1579 break;
1580 }
1581 }
1582 }
1583 }
1584
1585 // Re-check validity after deps are verified
1586 if deps_verified && self.whale.is_valid(&full_cache_key) {
1587 // For cached entries, check consistency for leaf assets (no locator deps).
1588 // This detects if resolve_asset/resolve_asset_error was called during query execution.
1589 let has_locator_deps = self
1590 .whale
1591 .get_dependency_ids(&full_cache_key)
1592 .is_some_and(|deps| !deps.is_empty());
1593
1594 match &node.data {
1595 Some(CachedEntry::AssetReady(arc)) => {
1596 // Check consistency for cached leaf assets
1597 if !has_locator_deps {
1598 check_leaf_asset_consistency(changed_at)?;
1599 }
1600 // Cache hit: start + end immediately (no locator runs)
1601 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1602 self.tracer.on_asset_located(
1603 &span_ctx,
1604 &asset_cache_key,
1605 TracerAssetState::Ready,
1606 );
1607 match arc.clone().downcast::<K::Asset>() {
1608 Ok(value) => {
1609 return Ok((AssetLoadingState::ready(key, value), changed_at))
1610 }
1611 Err(_) => {
1612 unreachable!("Asset type mismatch: {:?}", key)
1613 }
1614 }
1615 }
1616 Some(CachedEntry::AssetError(err)) => {
1617 // Check consistency for cached leaf errors
1618 if !has_locator_deps {
1619 check_leaf_asset_consistency(changed_at)?;
1620 }
1621 // Cache hit: start + end immediately (no locator runs)
1622 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1623 self.tracer.on_asset_located(
1624 &span_ctx,
1625 &asset_cache_key,
1626 TracerAssetState::NotFound,
1627 );
1628 return Err(QueryError::UserError(err.clone()));
1629 }
1630 None => {
1631 // Loading state - no value to be inconsistent with
1632 // Cache hit: start + end immediately (no locator runs)
1633 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1634 self.tracer.on_asset_located(
1635 &span_ctx,
1636 &asset_cache_key,
1637 TracerAssetState::Loading,
1638 );
1639 return Ok((AssetLoadingState::loading(key), changed_at));
1640 }
1641 _ => {
1642 // Query-related entries (Ok, UserError) shouldn't be here
1643 // Fall through to locator
1644 }
1645 }
1646 }
1647 }
1648 }
1649
1650 // Not in cache or invalid - try locator
1651 // Use LocatorContext to track deps on the asset itself (not the calling query)
1652 // Consistency tracking is handled via thread-local storage
1653 check_cycle(&full_cache_key)?;
1654 let _guard = StackGuard::push(full_cache_key.clone());
1655
1656 // START event before locator runs
1657 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1658
1659 let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1660 let locator_result =
1661 self.locators
1662 .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1663
1664 if let Some(result) = locator_result {
1665 // Get collected dependencies from the locator context
1666 let locator_deps = locator_ctx.into_deps();
1667 match result {
1668 Ok(ErasedLocateResult::Ready {
1669 value: arc,
1670 durability: durability_level,
1671 }) => {
1672 // END event after locator completes
1673 self.tracer.on_asset_located(
1674 &span_ctx,
1675 &asset_cache_key,
1676 TracerAssetState::Ready,
1677 );
1678
1679 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1680 Ok(v) => v,
1681 Err(_) => {
1682 unreachable!("Asset type mismatch: {:?}", key);
1683 }
1684 };
1685
1686 // Store in whale atomically with early cutoff
1687 // Include locator's dependencies so the asset is invalidated when they change
1688 let entry = CachedEntry::AssetReady(typed_value.clone());
1689 let durability = Durability::new(durability_level.as_u8() as usize)
1690 .unwrap_or(Durability::volatile());
1691 let new_value = typed_value.clone();
1692 let result = self
1693 .whale
1694 .update_with_compare(
1695 full_cache_key.clone(),
1696 Some(entry),
1697 |old_data, _new_data| {
1698 let Some(CachedEntry::AssetReady(old_arc)) =
1699 old_data.and_then(|d| d.as_ref())
1700 else {
1701 return true;
1702 };
1703 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1704 return true;
1705 };
1706 !K::asset_eq(&old_value, &new_value)
1707 },
1708 durability,
1709 locator_deps,
1710 )
1711 .expect("update_with_compare should succeed");
1712
1713 // Register verifier for this asset (for verify-then-decide pattern)
1714 self.verifiers
1715 .insert_asset::<K, T>(full_cache_key, key.clone());
1716
1717 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1718 }
1719 Ok(ErasedLocateResult::Pending) => {
1720 // END event after locator completes with pending
1721 self.tracer.on_asset_located(
1722 &span_ctx,
1723 &asset_cache_key,
1724 TracerAssetState::Loading,
1725 );
1726
1727 // Add to pending list for Pending result
1728 self.pending
1729 .insert::<K>(asset_cache_key.clone(), key.clone());
1730 match self
1731 .whale
1732 .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1733 .expect("get_or_insert should succeed")
1734 {
1735 GetOrInsertResult::Inserted(node) => {
1736 return Ok((AssetLoadingState::loading(key), node.changed_at));
1737 }
1738 GetOrInsertResult::Existing(node) => {
1739 let changed_at = node.changed_at;
1740 match &node.data {
1741 Some(CachedEntry::AssetReady(arc)) => {
1742 match arc.clone().downcast::<K::Asset>() {
1743 Ok(value) => {
1744 return Ok((
1745 AssetLoadingState::ready(key, value),
1746 changed_at,
1747 ));
1748 }
1749 Err(_) => {
1750 return Ok((
1751 AssetLoadingState::loading(key),
1752 changed_at,
1753 ))
1754 }
1755 }
1756 }
1757 Some(CachedEntry::AssetError(err)) => {
1758 return Err(QueryError::UserError(err.clone()));
1759 }
1760 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1761 }
1762 }
1763 }
1764 }
1765 Err(QueryError::UserError(err)) => {
1766 // END event after locator completes with error
1767 self.tracer.on_asset_located(
1768 &span_ctx,
1769 &asset_cache_key,
1770 TracerAssetState::NotFound,
1771 );
1772 // Locator returned a user error - cache it as AssetError
1773 let entry = CachedEntry::AssetError(err.clone());
1774 let _ = self.whale.register(
1775 full_cache_key,
1776 Some(entry),
1777 Durability::volatile(),
1778 locator_deps,
1779 );
1780 return Err(QueryError::UserError(err));
1781 }
1782 Err(e) => {
1783 // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1784 return Err(e);
1785 }
1786 }
1787 }
1788
1789 // No locator registered or locator returned None - mark as pending
1790 // END event - no locator ran
1791 self.tracer
1792 .on_asset_located(&span_ctx, &asset_cache_key, TracerAssetState::Loading);
1793 self.pending
1794 .insert::<K>(asset_cache_key.clone(), key.clone());
1795
1796 match self
1797 .whale
1798 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1799 .expect("get_or_insert with no dependencies cannot fail")
1800 {
1801 GetOrInsertResult::Inserted(node) => {
1802 Ok((AssetLoadingState::loading(key), node.changed_at))
1803 }
1804 GetOrInsertResult::Existing(node) => {
1805 let changed_at = node.changed_at;
1806 match &node.data {
1807 Some(CachedEntry::AssetReady(arc)) => {
1808 match arc.clone().downcast::<K::Asset>() {
1809 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1810 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1811 }
1812 }
1813 Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1814 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1815 }
1816 }
1817 }
1818 }
1819
1820 /// Internal: Get asset state, checking cache and locator.
1821 fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1822 self.get_asset_with_revision(key).map(|(state, _)| state)
1823 }
1824}
1825
1826impl<T: Tracer> Db for QueryRuntime<T> {
1827 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1828 QueryRuntime::query(self, query)
1829 }
1830
1831 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1832 self.get_asset_internal(key)?.suspend()
1833 }
1834
1835 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1836 self.get_asset_internal(key)
1837 }
1838
1839 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1840 self.query_registry.get_all::<Q>()
1841 }
1842
1843 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1844 self.asset_key_registry.get_all::<K>()
1845 }
1846}
1847
1848/// Tracks consistency of leaf asset accesses during query execution.
1849///
1850/// A "leaf" asset is one without dependencies (externally resolved via `resolve_asset`).
1851/// This tracker ensures that all leaf assets accessed during a query execution
1852/// (including those accessed by locators) are consistent - i.e., none were modified
1853/// via `resolve_asset` mid-execution.
1854///
1855/// The tracker is shared across `QueryContext` and `LocatorContext` to propagate
1856/// consistency checking through the entire execution tree.
1857#[derive(Debug)]
1858pub(crate) struct ConsistencyTracker {
1859 /// Global revision at query start. All leaf assets must have changed_at <= this.
1860 start_revision: RevisionCounter,
1861}
1862
1863impl ConsistencyTracker {
1864 /// Create a new tracker with the given start revision.
1865 pub fn new(start_revision: RevisionCounter) -> Self {
1866 Self { start_revision }
1867 }
1868
1869 /// Check consistency for a leaf asset access.
1870 ///
1871 /// A leaf asset is consistent if its changed_at <= start_revision.
1872 /// This detects if resolve_asset was called during query execution.
1873 ///
1874 /// Returns Ok(()) if consistent, Err if inconsistent.
1875 pub fn check_leaf_asset(&self, dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
1876 if dep_changed_at > self.start_revision {
1877 Err(QueryError::InconsistentAssetResolution)
1878 } else {
1879 Ok(())
1880 }
1881 }
1882}
1883
1884/// Context provided to queries during execution.
1885///
1886/// Use this to access dependencies via `query()`.
1887pub(crate) struct QueryContext<'a, T: Tracer = NoopTracer> {
1888 runtime: &'a QueryRuntime<T>,
1889 current_key: FullCacheKey,
1890 exec_ctx: ExecutionContext,
1891 deps: RefCell<Vec<FullCacheKey>>,
1892}
1893
1894impl<'a, T: Tracer> QueryContext<'a, T> {
1895 /// Query a dependency.
1896 ///
1897 /// The dependency is automatically tracked for invalidation.
1898 ///
1899 /// # Example
1900 ///
1901 /// ```ignore
1902 /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1903 /// let dep_result = db.query(OtherQuery { id: self.id })?;
1904 /// Ok(process(&dep_result))
1905 /// }
1906 /// ```
1907 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1908 let full_key: FullCacheKey = QueryCacheKey::new(query.clone()).into();
1909
1910 // Emit dependency registered event
1911 self.runtime.tracer.on_dependency_registered(
1912 self.exec_ctx.span_ctx(),
1913 &self.current_key,
1914 &full_key,
1915 );
1916
1917 // Record this as a dependency
1918 self.deps.borrow_mut().push(full_key);
1919
1920 // Execute the query
1921 self.runtime.query(query)
1922 }
1923
1924 /// Access an asset, tracking it as a dependency.
1925 ///
1926 /// Returns the asset value if ready, or `Err(QueryError::Suspend)` if still loading.
1927 /// Use this with the `?` operator for automatic suspension on loading.
1928 ///
1929 /// # Example
1930 ///
1931 /// ```ignore
1932 /// #[query]
1933 /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1934 /// let content = db.asset(path)?;
1935 /// // Process content...
1936 /// Ok(output)
1937 /// }
1938 /// ```
1939 ///
1940 /// # Errors
1941 ///
1942 /// - Returns `Err(QueryError::Suspend)` if the asset is still loading.
1943 /// - Returns `Err(QueryError::UserError)` if the asset was not found.
1944 pub fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1945 self.asset_state(key)?.suspend()
1946 }
1947
1948 /// Access an asset's loading state, tracking it as a dependency.
1949 ///
1950 /// Unlike [`asset()`](Self::asset), this method returns the full loading state,
1951 /// allowing you to check if an asset is loading without triggering suspension.
1952 ///
1953 /// # Example
1954 ///
1955 /// ```ignore
1956 /// let state = db.asset_state(key)?;
1957 /// if state.is_loading() {
1958 /// // Handle loading case explicitly
1959 /// } else {
1960 /// let value = state.get().unwrap();
1961 /// }
1962 /// ```
1963 ///
1964 /// # Errors
1965 ///
1966 /// Returns `Err(QueryError::UserError)` if the asset was not found.
1967 pub fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1968 let full_cache_key: FullCacheKey = AssetCacheKey::new(key.clone()).into();
1969
1970 // 1. Emit asset dependency registered event
1971 self.runtime.tracer.on_asset_dependency_registered(
1972 self.exec_ctx.span_ctx(),
1973 &self.current_key,
1974 &full_cache_key,
1975 );
1976
1977 // 2. Record dependency on this asset
1978 self.deps.borrow_mut().push(full_cache_key);
1979
1980 // 3. Get asset from cache/locator
1981 // Consistency checking for cached leaf assets is done inside get_asset_with_revision_ctx
1982 let (state, _changed_at) = self.runtime.get_asset_with_revision_ctx(key, self)?;
1983
1984 Ok(state)
1985 }
1986
1987 /// List all query instances of type Q that have been registered.
1988 ///
1989 /// This method establishes a dependency on the "set" of queries of type Q.
1990 /// The calling query will be invalidated when:
1991 /// - A new query of type Q is first executed (added to set)
1992 ///
1993 /// The calling query will NOT be invalidated when:
1994 /// - An individual query of type Q has its value change
1995 ///
1996 /// # Example
1997 ///
1998 /// ```ignore
1999 /// #[query]
2000 /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
2001 /// let queries = db.list_queries::<MyQuery>();
2002 /// let mut results = Vec::new();
2003 /// for q in queries {
2004 /// results.push(*db.query(q)?);
2005 /// }
2006 /// Ok(results)
2007 /// }
2008 /// ```
2009 pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
2010 // Record dependency on the sentinel (set-level dependency)
2011 let sentinel: FullCacheKey = QuerySetSentinelKey::new::<Q>().into();
2012
2013 self.runtime.tracer.on_dependency_registered(
2014 self.exec_ctx.span_ctx(),
2015 &self.current_key,
2016 &sentinel,
2017 );
2018
2019 // Ensure sentinel exists in whale (for dependency tracking)
2020 if self.runtime.whale.get(&sentinel).is_none() {
2021 let _ =
2022 self.runtime
2023 .whale
2024 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
2025 }
2026
2027 self.deps.borrow_mut().push(sentinel);
2028
2029 // Return all registered queries
2030 self.runtime.query_registry.get_all::<Q>()
2031 }
2032
2033 /// List all asset keys of type K that have been registered.
2034 ///
2035 /// This method establishes a dependency on the "set" of asset keys of type K.
2036 /// The calling query will be invalidated when:
2037 /// - A new asset of type K is resolved for the first time (added to set)
2038 /// - An asset of type K is removed via remove_asset
2039 ///
2040 /// The calling query will NOT be invalidated when:
2041 /// - An individual asset's value changes (use `db.asset()` for that)
2042 ///
2043 /// # Example
2044 ///
2045 /// ```ignore
2046 /// #[query]
2047 /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
2048 /// let keys = db.list_asset_keys::<ConfigFile>();
2049 /// let mut contents = Vec::new();
2050 /// for key in keys {
2051 /// let content = db.asset(key)?;
2052 /// contents.push((*content).clone());
2053 /// }
2054 /// Ok(contents)
2055 /// }
2056 /// ```
2057 pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2058 // Record dependency on the sentinel (set-level dependency)
2059 let sentinel: FullCacheKey = AssetKeySetSentinelKey::new::<K>().into();
2060
2061 self.runtime.tracer.on_asset_dependency_registered(
2062 self.exec_ctx.span_ctx(),
2063 &self.current_key,
2064 &sentinel,
2065 );
2066
2067 // Ensure sentinel exists in whale (for dependency tracking)
2068 if self.runtime.whale.get(&sentinel).is_none() {
2069 let _ =
2070 self.runtime
2071 .whale
2072 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
2073 }
2074
2075 self.deps.borrow_mut().push(sentinel);
2076
2077 // Return all registered asset keys
2078 self.runtime.asset_key_registry.get_all::<K>()
2079 }
2080}
2081
2082impl<'a, T: Tracer> Db for QueryContext<'a, T> {
2083 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2084 QueryContext::query(self, query)
2085 }
2086
2087 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2088 QueryContext::asset(self, key)
2089 }
2090
2091 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2092 QueryContext::asset_state(self, key)
2093 }
2094
2095 fn list_queries<Q: Query>(&self) -> Vec<Q> {
2096 QueryContext::list_queries(self)
2097 }
2098
2099 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2100 QueryContext::list_asset_keys(self)
2101 }
2102}
2103
2104/// Context for collecting dependencies during asset locator execution.
2105///
2106/// Unlike `QueryContext`, this is specifically for locators and does not
2107/// register dependencies on any parent query. Dependencies collected here
2108/// are stored with the asset itself.
2109pub(crate) struct LocatorContext<'a, T: Tracer> {
2110 runtime: &'a QueryRuntime<T>,
2111 deps: RefCell<Vec<FullCacheKey>>,
2112}
2113
2114impl<'a, T: Tracer> LocatorContext<'a, T> {
2115 /// Create a new locator context for the given asset key.
2116 ///
2117 /// Consistency tracking is handled via thread-local storage, so leaf asset
2118 /// accesses will be checked against any active tracker from a parent query.
2119 pub(crate) fn new(runtime: &'a QueryRuntime<T>, _asset_key: FullCacheKey) -> Self {
2120 Self {
2121 runtime,
2122 deps: RefCell::new(Vec::new()),
2123 }
2124 }
2125
2126 /// Consume this context and return the collected dependencies.
2127 pub(crate) fn into_deps(self) -> Vec<FullCacheKey> {
2128 self.deps.into_inner()
2129 }
2130}
2131
2132impl<T: Tracer> Db for LocatorContext<'_, T> {
2133 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2134 let full_key = QueryCacheKey::new(query.clone()).into();
2135
2136 // Record this as a dependency of the asset being located
2137 self.deps.borrow_mut().push(full_key);
2138
2139 // Execute the query via the runtime
2140 self.runtime.query(query)
2141 }
2142
2143 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2144 self.asset_state(key)?.suspend()
2145 }
2146
2147 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2148 let full_cache_key = AssetCacheKey::new(key.clone()).into();
2149
2150 // Record this as a dependency of the asset being located
2151 self.deps.borrow_mut().push(full_cache_key);
2152
2153 // Access the asset - consistency checking is done inside get_asset_with_revision
2154 let (state, _changed_at) = self.runtime.get_asset_with_revision(key)?;
2155
2156 Ok(state)
2157 }
2158
2159 fn list_queries<Q: Query>(&self) -> Vec<Q> {
2160 self.runtime.list_queries()
2161 }
2162
2163 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2164 self.runtime.list_asset_keys()
2165 }
2166}
2167
2168/// Enum dispatch wrapper for Db implementations.
2169///
2170/// Reduces monomorphization by providing a single concrete type
2171/// for `&impl Db` parameters in user code.
2172pub(crate) enum DbDispatch<'a, T: Tracer = NoopTracer> {
2173 /// Query execution context (tracks query dependencies)
2174 QueryContext(&'a QueryContext<'a, T>),
2175 /// Locator execution context (tracks asset dependencies)
2176 LocatorContext(&'a LocatorContext<'a, T>),
2177}
2178
2179impl<T: Tracer> Db for DbDispatch<'_, T> {
2180 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2181 match self {
2182 DbDispatch::QueryContext(ctx) => ctx.query(query),
2183 DbDispatch::LocatorContext(ctx) => ctx.query(query),
2184 }
2185 }
2186
2187 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2188 match self {
2189 DbDispatch::QueryContext(ctx) => ctx.asset(key),
2190 DbDispatch::LocatorContext(ctx) => ctx.asset(key),
2191 }
2192 }
2193
2194 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2195 match self {
2196 DbDispatch::QueryContext(ctx) => ctx.asset_state(key),
2197 DbDispatch::LocatorContext(ctx) => ctx.asset_state(key),
2198 }
2199 }
2200
2201 fn list_queries<Q: Query>(&self) -> Vec<Q> {
2202 match self {
2203 DbDispatch::QueryContext(ctx) => ctx.list_queries(),
2204 DbDispatch::LocatorContext(ctx) => ctx.list_queries(),
2205 }
2206 }
2207
2208 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2209 match self {
2210 DbDispatch::QueryContext(ctx) => ctx.list_asset_keys(),
2211 DbDispatch::LocatorContext(ctx) => ctx.list_asset_keys(),
2212 }
2213 }
2214}
2215
2216#[cfg(test)]
2217mod tests {
2218 use super::*;
2219
2220 #[test]
2221 fn test_simple_query() {
2222 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2223 struct Add {
2224 a: i32,
2225 b: i32,
2226 }
2227
2228 impl Query for Add {
2229 type Output = i32;
2230
2231 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2232 Ok(Arc::new(self.a + self.b))
2233 }
2234
2235 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2236 old == new
2237 }
2238 }
2239
2240 let runtime = QueryRuntime::new();
2241
2242 let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
2243 assert_eq!(*result, 3);
2244
2245 // Second query should be cached
2246 let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
2247 assert_eq!(*result2, 3);
2248 }
2249
2250 #[test]
2251 fn test_dependent_queries() {
2252 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2253 struct Base {
2254 value: i32,
2255 }
2256
2257 impl Query for Base {
2258 type Output = i32;
2259
2260 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2261 Ok(Arc::new(self.value * 2))
2262 }
2263
2264 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2265 old == new
2266 }
2267 }
2268
2269 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2270 struct Derived {
2271 base_value: i32,
2272 }
2273
2274 impl Query for Derived {
2275 type Output = i32;
2276
2277 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2278 let base = db.query(Base {
2279 value: self.base_value,
2280 })?;
2281 Ok(Arc::new(*base + 10))
2282 }
2283
2284 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2285 old == new
2286 }
2287 }
2288
2289 let runtime = QueryRuntime::new();
2290
2291 let result = runtime.query(Derived { base_value: 5 }).unwrap();
2292 assert_eq!(*result, 20); // 5 * 2 + 10
2293 }
2294
2295 #[test]
2296 fn test_cycle_detection() {
2297 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2298 struct CycleA {
2299 id: i32,
2300 }
2301
2302 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2303 struct CycleB {
2304 id: i32,
2305 }
2306
2307 impl Query for CycleA {
2308 type Output = i32;
2309
2310 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2311 let b = db.query(CycleB { id: self.id })?;
2312 Ok(Arc::new(*b + 1))
2313 }
2314
2315 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2316 old == new
2317 }
2318 }
2319
2320 impl Query for CycleB {
2321 type Output = i32;
2322
2323 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2324 let a = db.query(CycleA { id: self.id })?;
2325 Ok(Arc::new(*a + 1))
2326 }
2327
2328 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2329 old == new
2330 }
2331 }
2332
2333 let runtime = QueryRuntime::new();
2334
2335 let result = runtime.query(CycleA { id: 1 });
2336 assert!(matches!(result, Err(QueryError::Cycle { .. })));
2337 }
2338
2339 #[test]
2340 fn test_fallible_query() {
2341 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2342 struct ParseInt {
2343 input: String,
2344 }
2345
2346 impl Query for ParseInt {
2347 type Output = Result<i32, std::num::ParseIntError>;
2348
2349 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2350 Ok(Arc::new(self.input.parse()))
2351 }
2352
2353 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2354 old == new
2355 }
2356 }
2357
2358 let runtime = QueryRuntime::new();
2359
2360 // Valid parse
2361 let result = runtime
2362 .query(ParseInt {
2363 input: "42".to_string(),
2364 })
2365 .unwrap();
2366 assert_eq!(*result, Ok(42));
2367
2368 // Invalid parse - system succeeds, user error in output
2369 let result = runtime
2370 .query(ParseInt {
2371 input: "not_a_number".to_string(),
2372 })
2373 .unwrap();
2374 assert!(result.is_err());
2375 }
2376
2377 // Macro tests
2378 mod macro_tests {
2379 use super::*;
2380 use crate::query;
2381
2382 #[query]
2383 fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2384 let _ = db; // silence unused warning
2385 Ok(a + b)
2386 }
2387
2388 #[test]
2389 fn test_macro_basic() {
2390 let runtime = QueryRuntime::new();
2391 let result = runtime.query(Add::new(1, 2)).unwrap();
2392 assert_eq!(*result, 3);
2393 }
2394
2395 #[query]
2396 fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2397 let _ = db;
2398 Ok(x * 2)
2399 }
2400
2401 #[test]
2402 fn test_macro_simple() {
2403 let runtime = QueryRuntime::new();
2404 let result = runtime.query(SimpleDouble::new(5)).unwrap();
2405 assert_eq!(*result, 10);
2406 }
2407
2408 #[query(keys(id))]
2409 fn with_key_selection(
2410 db: &impl Db,
2411 id: u32,
2412 include_extra: bool,
2413 ) -> Result<String, QueryError> {
2414 let _ = db;
2415 Ok(format!("id={}, extra={}", id, include_extra))
2416 }
2417
2418 #[test]
2419 fn test_macro_key_selection() {
2420 let runtime = QueryRuntime::new();
2421
2422 // Same id, different include_extra - should return cached
2423 let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2424 let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2425
2426 // Both should have same value because only `id` is the key
2427 assert_eq!(*r1, "id=1, extra=true");
2428 assert_eq!(*r2, "id=1, extra=true"); // Cached!
2429 }
2430
2431 #[query]
2432 fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2433 let sum = db.query(Add::new(a, b))?;
2434 Ok(*sum * 2)
2435 }
2436
2437 #[test]
2438 fn test_macro_dependencies() {
2439 let runtime = QueryRuntime::new();
2440 let result = runtime.query(Dependent::new(3, 4)).unwrap();
2441 assert_eq!(*result, 14); // (3 + 4) * 2
2442 }
2443
2444 #[query(output_eq)]
2445 fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2446 let _ = db;
2447 Ok(x * 2)
2448 }
2449
2450 #[test]
2451 fn test_macro_output_eq() {
2452 let runtime = QueryRuntime::new();
2453 let result = runtime.query(WithOutputEq::new(5)).unwrap();
2454 assert_eq!(*result, 10);
2455 }
2456
2457 #[query(name = "CustomName")]
2458 fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2459 let _ = db;
2460 Ok(x)
2461 }
2462
2463 #[test]
2464 fn test_macro_custom_name() {
2465 let runtime = QueryRuntime::new();
2466 let result = runtime.query(CustomName::new(42)).unwrap();
2467 assert_eq!(*result, 42);
2468 }
2469
2470 // Test that attribute macros like #[tracing::instrument] are preserved
2471 // We use #[allow(unused_variables)] and #[inline] as test attributes since
2472 // they don't require external dependencies.
2473 #[allow(unused_variables)]
2474 #[inline]
2475 #[query]
2476 fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2477 // This would warn without #[allow(unused_variables)] on the generated method
2478 let unused_var = 42;
2479 Ok(x * 2)
2480 }
2481
2482 #[test]
2483 fn test_macro_preserves_attributes() {
2484 let runtime = QueryRuntime::new();
2485 // If attributes weren't preserved, this might warn about unused_var
2486 let result = runtime.query(WithAttributes::new(5)).unwrap();
2487 assert_eq!(*result, 10);
2488 }
2489 }
2490
2491 // Tests for poll() and changed_at()
2492 mod poll_tests {
2493 use super::*;
2494
2495 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2496 struct Counter {
2497 id: i32,
2498 }
2499
2500 impl Query for Counter {
2501 type Output = i32;
2502
2503 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2504 Ok(Arc::new(self.id * 10))
2505 }
2506
2507 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2508 old == new
2509 }
2510 }
2511
2512 #[test]
2513 fn test_poll_returns_value_and_revision() {
2514 let runtime = QueryRuntime::new();
2515
2516 let result = runtime.poll(Counter { id: 1 }).unwrap();
2517
2518 // Value should be correct - access through Result and Arc
2519 assert_eq!(**result.value.as_ref().unwrap(), 10);
2520
2521 // Revision should be non-zero after first execution
2522 assert!(result.revision > 0);
2523 }
2524
2525 #[test]
2526 fn test_poll_revision_stable_on_cache_hit() {
2527 let runtime = QueryRuntime::new();
2528
2529 // First poll
2530 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2531 let rev1 = result1.revision;
2532
2533 // Second poll (cache hit)
2534 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2535 let rev2 = result2.revision;
2536
2537 // Revision should be the same (no change)
2538 assert_eq!(rev1, rev2);
2539 }
2540
2541 #[test]
2542 fn test_poll_revision_changes_on_invalidate() {
2543 let runtime = QueryRuntime::new();
2544
2545 // First poll
2546 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2547 let rev1 = result1.revision;
2548
2549 // Invalidate and poll again
2550 runtime.invalidate(&Counter { id: 1 });
2551 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2552 let rev2 = result2.revision;
2553
2554 // Revision should increase (value was recomputed)
2555 // Note: Since output_eq returns true (same value), this might not change
2556 // depending on early cutoff behavior. Let's verify the value is still correct.
2557 assert_eq!(**result2.value.as_ref().unwrap(), 10);
2558
2559 // With early cutoff, revision might stay the same if value didn't change
2560 // This is expected behavior
2561 assert!(rev2 >= rev1);
2562 }
2563
2564 #[test]
2565 fn test_changed_at_returns_none_for_unexecuted_query() {
2566 let runtime = QueryRuntime::new();
2567
2568 // Query has never been executed
2569 let rev = runtime.changed_at(&Counter { id: 1 });
2570 assert!(rev.is_none());
2571 }
2572
2573 #[test]
2574 fn test_changed_at_returns_revision_after_execution() {
2575 let runtime = QueryRuntime::new();
2576
2577 // Execute the query
2578 let _ = runtime.query(Counter { id: 1 }).unwrap();
2579
2580 // Now changed_at should return Some
2581 let rev = runtime.changed_at(&Counter { id: 1 });
2582 assert!(rev.is_some());
2583 assert!(rev.unwrap() > 0);
2584 }
2585
2586 #[test]
2587 fn test_changed_at_matches_poll_revision() {
2588 let runtime = QueryRuntime::new();
2589
2590 // Poll the query
2591 let result = runtime.poll(Counter { id: 1 }).unwrap();
2592
2593 // changed_at should match the revision from poll
2594 let rev = runtime.changed_at(&Counter { id: 1 });
2595 assert_eq!(rev, Some(result.revision));
2596 }
2597
2598 #[test]
2599 fn test_poll_value_access() {
2600 let runtime = QueryRuntime::new();
2601
2602 let result = runtime.poll(Counter { id: 5 }).unwrap();
2603
2604 // Access through Result and Arc
2605 let value: &i32 = result.value.as_ref().unwrap();
2606 assert_eq!(*value, 50);
2607
2608 // Access Arc directly via field after unwrapping Result
2609 let arc: &Arc<i32> = result.value.as_ref().unwrap();
2610 assert_eq!(**arc, 50);
2611 }
2612
2613 #[test]
2614 fn test_subscription_pattern() {
2615 let runtime = QueryRuntime::new();
2616
2617 // Simulate subscription pattern
2618 let mut last_revision: RevisionCounter = 0;
2619 let mut notifications = 0;
2620
2621 // First poll - should notify (new value)
2622 let result = runtime.poll(Counter { id: 1 }).unwrap();
2623 if result.revision > last_revision {
2624 notifications += 1;
2625 last_revision = result.revision;
2626 }
2627
2628 // Second poll - should NOT notify (no change)
2629 let result = runtime.poll(Counter { id: 1 }).unwrap();
2630 if result.revision > last_revision {
2631 notifications += 1;
2632 last_revision = result.revision;
2633 }
2634
2635 // Third poll - should NOT notify (no change)
2636 let result = runtime.poll(Counter { id: 1 }).unwrap();
2637 if result.revision > last_revision {
2638 notifications += 1;
2639 #[allow(unused_assignments)]
2640 {
2641 last_revision = result.revision;
2642 }
2643 }
2644
2645 // Only the first poll should have triggered a notification
2646 assert_eq!(notifications, 1);
2647 }
2648 }
2649
2650 // Tests for GC APIs
2651 mod gc_tests {
2652 use super::*;
2653 use crate::tracer::{SpanContext, SpanId, TraceId};
2654 use std::collections::HashSet;
2655 use std::sync::atomic::{AtomicUsize, Ordering};
2656 use std::sync::Mutex;
2657
2658 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2659 struct Leaf {
2660 id: i32,
2661 }
2662
2663 impl Query for Leaf {
2664 type Output = i32;
2665
2666 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2667 Ok(Arc::new(self.id * 10))
2668 }
2669
2670 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2671 old == new
2672 }
2673 }
2674
2675 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2676 struct Parent {
2677 child_id: i32,
2678 }
2679
2680 impl Query for Parent {
2681 type Output = i32;
2682
2683 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2684 let child = db.query(Leaf { id: self.child_id })?;
2685 Ok(Arc::new(*child + 1))
2686 }
2687
2688 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2689 old == new
2690 }
2691 }
2692
2693 #[test]
2694 fn test_query_keys_returns_all_cached_queries() {
2695 let runtime = QueryRuntime::new();
2696
2697 // Execute some queries
2698 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2699 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2700 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2701
2702 // Get all keys
2703 let keys = runtime.query_keys();
2704
2705 // Should have at least 3 keys (might have more due to sentinels)
2706 assert!(keys.len() >= 3);
2707 }
2708
2709 #[test]
2710 fn test_remove_removes_query() {
2711 let runtime = QueryRuntime::new();
2712
2713 // Execute a query
2714 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2715
2716 // Get the key
2717 let full_key = QueryCacheKey::new(Leaf { id: 1 }).into();
2718
2719 // Query should exist
2720 assert!(runtime.changed_at(&Leaf { id: 1 }).is_some());
2721
2722 // Remove it
2723 assert!(runtime.remove(&full_key));
2724
2725 // Query should no longer exist
2726 assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2727 }
2728
2729 #[test]
2730 fn test_remove_if_unused_removes_leaf_query() {
2731 let runtime = QueryRuntime::new();
2732
2733 // Execute a leaf query (no dependents)
2734 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2735
2736 // Should be removable since no other query depends on it
2737 assert!(runtime.remove_query_if_unused(&Leaf { id: 1 }));
2738
2739 // Query should no longer exist
2740 assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2741 }
2742
2743 #[test]
2744 fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2745 let runtime = QueryRuntime::new();
2746
2747 // Execute parent query (which depends on Leaf)
2748 let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2749
2750 // Leaf query should not be removable since Parent depends on it
2751 assert!(!runtime.remove_query_if_unused(&Leaf { id: 1 }));
2752
2753 // Leaf query should still exist
2754 assert!(runtime.changed_at(&Leaf { id: 1 }).is_some());
2755
2756 // But Parent should be removable (no dependents)
2757 assert!(runtime.remove_query_if_unused(&Parent { child_id: 1 }));
2758 }
2759
2760 #[test]
2761 fn test_remove_if_unused_with_full_cache_key() {
2762 let runtime = QueryRuntime::new();
2763
2764 // Execute a leaf query
2765 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2766
2767 let full_key = QueryCacheKey::new(Leaf { id: 1 }).into();
2768
2769 // Should be removable via FullCacheKey
2770 assert!(runtime.remove_if_unused(&full_key));
2771
2772 // Query should no longer exist
2773 assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2774 }
2775
2776 // Test tracer receives on_query_start calls (for GC tracking)
2777 struct GcTracker {
2778 accessed_keys: Mutex<HashSet<String>>,
2779 access_count: AtomicUsize,
2780 }
2781
2782 impl GcTracker {
2783 fn new() -> Self {
2784 Self {
2785 accessed_keys: Mutex::new(HashSet::new()),
2786 access_count: AtomicUsize::new(0),
2787 }
2788 }
2789 }
2790
2791 impl Tracer for GcTracker {
2792 fn new_span_id(&self) -> SpanId {
2793 SpanId(1)
2794 }
2795
2796 fn new_trace_id(&self) -> TraceId {
2797 TraceId(1)
2798 }
2799
2800 fn on_query_start(&self, _ctx: &SpanContext, query_key: &QueryCacheKey) {
2801 self.accessed_keys
2802 .lock()
2803 .unwrap()
2804 .insert(query_key.debug_repr().to_string());
2805 self.access_count.fetch_add(1, Ordering::Relaxed);
2806 }
2807 }
2808
2809 #[test]
2810 fn test_tracer_receives_on_query_start() {
2811 let tracker = GcTracker::new();
2812 let runtime = QueryRuntime::with_tracer(tracker);
2813
2814 // Execute some queries
2815 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2816 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2817
2818 // Tracer should have received on_query_start calls
2819 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2820 assert_eq!(count, 2);
2821
2822 // Check that the keys were recorded
2823 let keys = runtime.tracer().accessed_keys.lock().unwrap();
2824 assert!(keys.iter().any(|k| k.contains("Leaf")));
2825 }
2826
2827 #[test]
2828 fn test_tracer_receives_on_query_start_for_cache_hits() {
2829 let tracker = GcTracker::new();
2830 let runtime = QueryRuntime::with_tracer(tracker);
2831
2832 // Execute query twice (second is cache hit)
2833 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2834 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2835
2836 // Tracer should have received on_query_start for both calls
2837 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2838 assert_eq!(count, 2);
2839 }
2840
2841 #[test]
2842 fn test_gc_workflow() {
2843 let tracker = GcTracker::new();
2844 let runtime = QueryRuntime::with_tracer(tracker);
2845
2846 // Execute some queries
2847 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2848 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2849 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2850
2851 // Simulate GC: remove all queries that are not in use
2852 let mut removed = 0;
2853 for key in runtime.query_keys() {
2854 if runtime.remove_if_unused(&key) {
2855 removed += 1;
2856 }
2857 }
2858
2859 // All leaf queries should be removable
2860 assert!(removed >= 3);
2861
2862 // Queries should no longer exist
2863 assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2864 assert!(runtime.changed_at(&Leaf { id: 2 }).is_none());
2865 assert!(runtime.changed_at(&Leaf { id: 3 }).is_none());
2866 }
2867 }
2868}