query_flow/runtime.rs
1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::RefCell;
5use std::ops::Deref;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, PendingAsset};
12use crate::db::Db;
13use crate::key::{
14 AssetCacheKey, AssetKeySetSentinelKey, FullCacheKey, QueryCacheKey, QuerySetSentinelKey,
15};
16use crate::loading::AssetLoadingState;
17use crate::query::Query;
18use crate::storage::{
19 AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
20 QueryRegistry, VerifierStorage,
21};
22use crate::tracer::{
23 ExecutionResult, InvalidationReason, NoopTracer, SpanContext, SpanId, TraceId, Tracer,
24 TracerAssetState,
25};
26use crate::QueryError;
27
28/// Function type for comparing user errors during early cutoff.
29///
30/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
31/// `QueryError::UserError` values are compared for caching purposes.
32pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
33
34/// Number of durability levels (matches whale's default).
35const DURABILITY_LEVELS: usize = 4;
36
// Thread-local state shared by every query executed on the current thread.
thread_local! {
    /// Keys of the queries currently executing on this thread, in push order.
    /// `StackGuard` pushes/pops entries and `check_cycle` scans it for an
    /// equal key to detect dependency cycles.
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };

    /// Current consistency tracker for leaf asset checks.
    /// Installed by `ConsistencyTrackerGuard` and read by
    /// `check_leaf_asset_consistency`; `None` when no tracker is installed.
    /// Uses `Rc` since a thread-local is only touched from its own thread.
    static CONSISTENCY_TRACKER: RefCell<Option<Rc<ConsistencyTracker>>> = const { RefCell::new(None) };

    /// Stack for tracking parent-child query relationships (see `SpanStack`).
    static SPAN_STACK: RefCell<SpanStack> = const { RefCell::new(SpanStack::Empty) };
}
49
/// Thread-local span stack state.
///
/// `SpanStackGuard` switches this to `Active` when the first span is pushed
/// and back to `Empty` once the last span has been popped.
enum SpanStack {
    /// No span has been pushed on this thread.
    Empty,
    /// Trace id recorded when the first span was pushed, plus the spans pushed
    /// so far (most recently pushed last).
    Active(TraceId, Vec<SpanId>),
}
56
57/// Check a leaf asset against the current consistency tracker (if any).
58/// Returns Ok if no tracker is set or if the check passes.
59fn check_leaf_asset_consistency(dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
60 CONSISTENCY_TRACKER.with(|tracker| {
61 if let Some(ref t) = *tracker.borrow() {
62 t.check_leaf_asset(dep_changed_at)
63 } else {
64 Ok(())
65 }
66 })
67}
68
69/// RAII guard that sets the consistency tracker for the current thread.
70struct ConsistencyTrackerGuard {
71 previous: Option<Rc<ConsistencyTracker>>,
72}
73
74impl ConsistencyTrackerGuard {
75 fn new(tracker: Rc<ConsistencyTracker>) -> Self {
76 let previous = CONSISTENCY_TRACKER.with(|t| t.borrow_mut().replace(tracker));
77 Self { previous }
78 }
79}
80
81impl Drop for ConsistencyTrackerGuard {
82 fn drop(&mut self) {
83 CONSISTENCY_TRACKER.with(|t| {
84 *t.borrow_mut() = self.previous.take();
85 });
86 }
87}
88
89/// Check for cycles in the query stack and return error if detected.
90fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
91 let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
92 if cycle_detected {
93 let path = QUERY_STACK.with(|stack| {
94 let stack = stack.borrow();
95 let mut path: Vec<FullCacheKey> = stack.iter().cloned().collect();
96 path.push(key.clone());
97 path
98 });
99 return Err(QueryError::Cycle { path });
100 }
101 Ok(())
102}
103
104/// RAII guard for pushing/popping from query stack.
105struct StackGuard;
106
107impl StackGuard {
108 fn push(key: FullCacheKey) -> Self {
109 QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
110 StackGuard
111 }
112}
113
114impl Drop for StackGuard {
115 fn drop(&mut self) {
116 QUERY_STACK.with(|stack| {
117 stack.borrow_mut().pop();
118 });
119 }
120}
121
122/// RAII guard for pushing/popping from span stack.
123struct SpanStackGuard;
124
125impl SpanStackGuard {
126 /// Push a span onto the stack. Sets trace_id if this is the root span.
127 fn push(trace_id: TraceId, span_id: SpanId) -> Self {
128 SPAN_STACK.with(|stack| {
129 let mut s = stack.borrow_mut();
130 match &mut *s {
131 SpanStack::Empty => *s = SpanStack::Active(trace_id, vec![span_id]),
132 SpanStack::Active(_, spans) => spans.push(span_id),
133 }
134 });
135 SpanStackGuard
136 }
137}
138
139impl Drop for SpanStackGuard {
140 fn drop(&mut self) {
141 SPAN_STACK.with(|stack| {
142 let mut s = stack.borrow_mut();
143 if let SpanStack::Active(_, spans) = &mut *s {
144 spans.pop();
145 if spans.is_empty() {
146 *s = SpanStack::Empty;
147 }
148 }
149 });
150 }
151}
152
153/// Execution context passed through query execution.
154///
155/// Contains a SpanContext for tracing correlation with parent-child relationships.
156#[derive(Clone, Copy)]
157pub struct ExecutionContext {
158 span_ctx: SpanContext,
159}
160
161impl ExecutionContext {
162 /// Create a new execution context with the given span context.
163 #[inline]
164 pub fn new(span_ctx: SpanContext) -> Self {
165 Self { span_ctx }
166 }
167
168 /// Get the span context for this execution context.
169 #[inline]
170 pub fn span_ctx(&self) -> &SpanContext {
171 &self.span_ctx
172 }
173}
174
/// Result of polling a query, containing the value and its revision.
///
/// This is returned by [`QueryRuntime::poll`] and provides both the query result
/// and its change revision, enabling efficient change detection for subscription
/// patterns.
///
/// # Example
///
/// ```ignore
/// let result = runtime.poll(MyQuery::new())?;
///
/// // `poll` wraps a `Result`, which is not `Deref`, so read it via `value`
/// println!("{:?}", result.value);
///
/// // Check if changed since last poll
/// if result.revision > last_known_revision {
///     notify_subscribers(&result.value);
///     last_known_revision = result.revision;
/// }
/// ```
#[derive(Debug, Clone)]
pub struct Polled<T> {
    /// The query result value.
    pub value: T,
    /// The revision at which this value was last changed.
    ///
    /// Compare this with a previously stored revision to detect changes.
    pub revision: RevisionCounter,
}

/// Dereferences *through* the wrapped value: only available when `T` itself
/// implements `Deref` (e.g. `Polled<Arc<X>>` derefs to `X`).
impl<T: Deref> Deref for Polled<T> {
    type Target = T::Target;

    fn deref(&self) -> &Self::Target {
        // `&self.value` is `&T`; deref coercion turns it into `&T::Target`.
        &self.value
    }
}
212
/// The query runtime manages query execution, caching, and dependency tracking.
///
/// Cloning copies the `Arc` handles below and clones `whale` through its own
/// `Clone` impl.
/// NOTE(review): the original doc says cloning is cheap because "all data is
/// behind `Arc`"; that holds for the `Arc` fields, and presumably for `whale`
/// as well — confirm against the whale crate.
///
/// # Type Parameter
///
/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
///   for zero-cost when tracing is not needed.
///
/// # Example
///
/// ```ignore
/// // Without tracing (default)
/// let runtime = QueryRuntime::new();
///
/// // With tracing
/// let tracer = MyTracer::new();
/// let runtime = QueryRuntime::with_tracer(tracer);
///
/// // Sync query execution
/// let result = runtime.query(MyQuery { ... })?;
/// ```
pub struct QueryRuntime<T: Tracer = NoopTracer> {
    /// Whale runtime for dependency tracking and cache storage.
    /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
    /// Registered asset locators
    locators: Arc<LocatorStorage<T>>,
    /// Pending asset requests
    pending: Arc<PendingStorage>,
    /// Registry for tracking query instances (for list_queries)
    query_registry: Arc<QueryRegistry>,
    /// Registry for tracking asset keys (for list_asset_keys)
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Verifiers for re-executing queries (for verify-then-decide pattern)
    verifiers: Arc<VerifierStorage>,
    /// Comparator for user errors during early cutoff
    error_comparator: ErrorComparator,
    /// Tracer for observability
    tracer: Arc<T>,
}
254
/// Compile-time check that the default runtime can be shared across threads.
#[test]
fn test_runtime_send_sync() {
    fn require_send<S: Send>() {}
    fn require_sync<S: Sync>() {}

    require_send::<QueryRuntime<NoopTracer>>();
    require_sync::<QueryRuntime<NoopTracer>>();
}
260
261impl Default for QueryRuntime<NoopTracer> {
262 fn default() -> Self {
263 Self::new()
264 }
265}
266
267impl<T: Tracer> Clone for QueryRuntime<T> {
268 fn clone(&self) -> Self {
269 Self {
270 whale: self.whale.clone(),
271 locators: self.locators.clone(),
272 pending: self.pending.clone(),
273 query_registry: self.query_registry.clone(),
274 asset_key_registry: self.asset_key_registry.clone(),
275 verifiers: self.verifiers.clone(),
276 error_comparator: self.error_comparator,
277 tracer: self.tracer.clone(),
278 }
279 }
280}
281
282/// Default error comparator that treats all errors as different.
283///
284/// This is conservative - it always triggers recomputation when an error occurs.
285fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
286 false
287}
288
289impl<T: Tracer> QueryRuntime<T> {
290 /// Get cached output along with its revision (single atomic access).
291 fn get_cached_with_revision<Q: Query>(
292 &self,
293 key: &FullCacheKey,
294 ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
295 let node = self.whale.get(key)?;
296 let revision = node.changed_at;
297 let entry = node.data.as_ref()?;
298 let cached = entry.to_cached_value::<Q::Output>()?;
299 Some((cached, revision))
300 }
301
302 /// Get a reference to the tracer.
303 #[inline]
304 pub fn tracer(&self) -> &T {
305 &self.tracer
306 }
307}
308
309impl QueryRuntime<NoopTracer> {
310 /// Create a new query runtime with default settings.
311 pub fn new() -> Self {
312 Self::with_tracer(NoopTracer)
313 }
314
315 /// Create a builder for customizing the runtime.
316 ///
317 /// # Example
318 ///
319 /// ```ignore
320 /// let runtime = QueryRuntime::builder()
321 /// .error_comparator(|a, b| {
322 /// // Custom error comparison logic
323 /// match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
324 /// (Some(a), Some(b)) => a == b,
325 /// _ => false,
326 /// }
327 /// })
328 /// .build();
329 /// ```
330 pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
331 QueryRuntimeBuilder::new()
332 }
333}
334
335impl<T: Tracer> QueryRuntime<T> {
336 /// Create a new query runtime with the specified tracer.
337 pub fn with_tracer(tracer: T) -> Self {
338 QueryRuntimeBuilder::new().tracer(tracer).build()
339 }
340
341 /// Execute a query synchronously.
342 ///
343 /// Returns the cached result if valid, otherwise executes the query.
344 ///
345 /// # Errors
346 ///
347 /// - `QueryError::Suspend` - Query is waiting for async loading
348 /// - `QueryError::Cycle` - Dependency cycle detected
349 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
350 self.query_internal(query)
351 .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
352 }
353
354 /// Internal implementation shared by query() and poll().
355 ///
356 /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
357 /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
358 #[allow(clippy::type_complexity)]
359 fn query_internal<Q: Query>(
360 &self,
361 query: Q,
362 ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
363 let query_cache_key = QueryCacheKey::new(query.clone());
364 let full_key: FullCacheKey = query_cache_key.clone().into();
365
366 // Create SpanContext with parent relationship from SPAN_STACK
367 let span_id = self.tracer.new_span_id();
368 let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
369 SpanStack::Empty => (self.tracer.new_trace_id(), None),
370 SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
371 });
372 let span_ctx = SpanContext {
373 span_id,
374 trace_id,
375 parent_span_id,
376 };
377
378 // Push to span stack and create execution context
379 let _span_guard = SpanStackGuard::push(trace_id, span_id);
380 let exec_ctx = ExecutionContext::new(span_ctx);
381
382 self.tracer.on_query_start(&span_ctx, &query_cache_key);
383
384 // Check for cycles using thread-local stack
385 let cycle_detected = QUERY_STACK.with(|stack| {
386 let stack = stack.borrow();
387 stack.iter().any(|k| k == &full_key)
388 });
389
390 if cycle_detected {
391 let path = QUERY_STACK.with(|stack| {
392 let stack = stack.borrow();
393 let mut path: Vec<FullCacheKey> = stack.iter().cloned().collect();
394 path.push(full_key.clone());
395 path
396 });
397
398 self.tracer.on_cycle_detected(&path);
399 self.tracer
400 .on_query_end(&span_ctx, &query_cache_key, ExecutionResult::CycleDetected);
401
402 return Err(QueryError::Cycle { path });
403 }
404
405 // Check if cached and valid (with verify-then-decide pattern)
406 let current_rev = self.whale.current_revision();
407
408 // Fast path: already verified at current revision
409 if self.whale.is_verified_at(&full_key, ¤t_rev) {
410 // Single atomic access to get both cached value and revision
411 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
412 self.tracer
413 .on_cache_check(&span_ctx, &query_cache_key, true);
414 self.tracer
415 .on_query_end(&span_ctx, &query_cache_key, ExecutionResult::CacheHit);
416
417 return match cached {
418 CachedValue::Ok(output) => Ok((Ok(output), revision)),
419 CachedValue::UserError(err) => Ok((Err(err), revision)),
420 };
421 }
422 }
423
424 // Check shallow validity (deps' changed_at ok) and try verify-then-decide
425 if self.whale.is_valid(&full_key) {
426 // Single atomic access to get both cached value and revision
427 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
428 // Shallow valid but not verified - verify deps first
429 let mut deps_verified = true;
430 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
431 for dep in deps {
432 if let Some(verifier) = self.verifiers.get(&dep) {
433 // Re-run query/asset to verify it (triggers recursive verification)
434 // For assets, this re-accesses the asset which may re-run the locator
435 if verifier.verify(self as &dyn std::any::Any).is_err() {
436 deps_verified = false;
437 break;
438 }
439 }
440 // Note: deps without verifiers are assumed valid (they're verified
441 // by the final is_valid check if their changed_at increased)
442 }
443 }
444
445 // Re-check validity after deps are verified
446 if deps_verified && self.whale.is_valid(&full_key) {
447 // Deps didn't change their changed_at, mark as verified and use cached
448 self.whale.mark_verified(&full_key, ¤t_rev);
449
450 self.tracer
451 .on_cache_check(&span_ctx, &query_cache_key, true);
452 self.tracer.on_query_end(
453 &span_ctx,
454 &query_cache_key,
455 ExecutionResult::CacheHit,
456 );
457
458 return match cached {
459 CachedValue::Ok(output) => Ok((Ok(output), revision)),
460 CachedValue::UserError(err) => Ok((Err(err), revision)),
461 };
462 }
463 // A dep's changed_at increased, fall through to execute
464 }
465 }
466
467 self.tracer
468 .on_cache_check(&span_ctx, &query_cache_key, false);
469
470 // Execute the query with cycle tracking
471 let _guard = StackGuard::push(full_key.clone());
472 let result = self.execute_query::<Q>(&query, &query_cache_key, &full_key, exec_ctx);
473 drop(_guard);
474
475 // Emit end event
476 let exec_result = match &result {
477 Ok((_, true, _)) => ExecutionResult::Changed,
478 Ok((_, false, _)) => ExecutionResult::Unchanged,
479 Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
480 Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
481 Err(e) => ExecutionResult::Error {
482 message: format!("{:?}", e),
483 },
484 };
485 self.tracer
486 .on_query_end(&span_ctx, &query_cache_key, exec_result);
487
488 result.map(|(inner_result, _, revision)| (inner_result, revision))
489 }
490
491 /// Execute a query, caching the result if appropriate.
492 ///
493 /// Returns (result, output_changed, revision) tuple.
494 /// - `result`: Ok(output) for success, Err(user_error) for user errors
495 /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
496 #[allow(clippy::type_complexity)]
497 fn execute_query<Q: Query>(
498 &self,
499 query: &Q,
500 query_cache_key: &QueryCacheKey,
501 full_key: &FullCacheKey,
502 exec_ctx: ExecutionContext,
503 ) -> Result<
504 (
505 Result<Arc<Q::Output>, Arc<anyhow::Error>>,
506 bool,
507 RevisionCounter,
508 ),
509 QueryError,
510 > {
511 // Capture current global revision at query start for consistency checking
512 let start_revision = self.whale.current_revision().get(Durability::volatile());
513
514 // Create consistency tracker for this query execution
515 let tracker = Rc::new(ConsistencyTracker::new(start_revision));
516
517 // Set thread-local tracker for nested locator calls
518 let _tracker_guard = ConsistencyTrackerGuard::new(tracker);
519
520 // Create context for this query execution
521 let ctx = QueryContext {
522 runtime: self,
523 current_key: full_key.clone(),
524 exec_ctx,
525 deps: RefCell::new(Vec::new()),
526 };
527
528 // Execute the query (clone because query() takes ownership)
529 let result = query.clone().query(&ctx);
530
531 // Get collected dependencies
532 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
533
534 // Query durability defaults to stable - Whale will automatically reduce
535 // the effective durability to min(requested, min(dep_durabilities)).
536 // A pure query with no dependencies remains stable.
537 // A query depending on volatile assets becomes volatile.
538 let durability = Durability::stable();
539
540 match result {
541 Ok(output) => {
542 // Check if output changed (for early cutoff)
543 // existing_revision is Some only when output is unchanged (can reuse revision)
544 let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
545 self.get_cached_with_revision::<Q>(full_key)
546 {
547 if Q::output_eq(&*old, &*output) {
548 Some(rev) // Same output - reuse revision
549 } else {
550 None // Different output
551 }
552 } else {
553 None // No previous Ok value
554 };
555 let output_changed = existing_revision.is_none();
556
557 // Emit early cutoff check event
558 self.tracer.on_early_cutoff_check(
559 exec_ctx.span_ctx(),
560 query_cache_key,
561 output_changed,
562 );
563
564 // Update whale with cached entry (atomic update of value + dependency state)
565 let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
566 let revision = if let Some(existing_rev) = existing_revision {
567 // confirm_unchanged doesn't change changed_at, use existing
568 let _ = self.whale.confirm_unchanged(full_key, deps);
569 existing_rev
570 } else {
571 // Use new_rev from register result
572 match self
573 .whale
574 .register(full_key.clone(), Some(entry), durability, deps)
575 {
576 Ok(result) => result.new_rev,
577 Err(missing) => {
578 return Err(QueryError::DependenciesRemoved {
579 missing_keys: missing,
580 })
581 }
582 }
583 };
584
585 // Register query in registry for list_queries
586 let is_new_query = self.query_registry.register(query);
587 if is_new_query {
588 let sentinel = QuerySetSentinelKey::new::<Q>().into();
589 let _ = self
590 .whale
591 .register(sentinel, None, Durability::stable(), vec![]);
592 }
593
594 // Store verifier for this query (for verify-then-decide pattern)
595 self.verifiers
596 .insert::<Q, T>(full_key.clone(), query.clone());
597
598 Ok((Ok(output), output_changed, revision))
599 }
600 Err(QueryError::UserError(err)) => {
601 // Check if error changed (for early cutoff)
602 // existing_revision is Some only when error is unchanged (can reuse revision)
603 let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
604 self.get_cached_with_revision::<Q>(full_key)
605 {
606 if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
607 Some(rev) // Same error - reuse revision
608 } else {
609 None // Different error
610 }
611 } else {
612 None // No previous UserError
613 };
614 let output_changed = existing_revision.is_none();
615
616 // Emit early cutoff check event
617 self.tracer.on_early_cutoff_check(
618 exec_ctx.span_ctx(),
619 query_cache_key,
620 output_changed,
621 );
622
623 // Update whale with cached error (atomic update of value + dependency state)
624 let entry = CachedEntry::UserError(err.clone());
625 let revision = if let Some(existing_rev) = existing_revision {
626 // confirm_unchanged doesn't change changed_at, use existing
627 let _ = self.whale.confirm_unchanged(full_key, deps);
628 existing_rev
629 } else {
630 // Use new_rev from register result
631 match self
632 .whale
633 .register(full_key.clone(), Some(entry), durability, deps)
634 {
635 Ok(result) => result.new_rev,
636 Err(missing) => {
637 return Err(QueryError::DependenciesRemoved {
638 missing_keys: missing,
639 })
640 }
641 }
642 };
643
644 // Register query in registry for list_queries
645 let is_new_query = self.query_registry.register(query);
646 if is_new_query {
647 let sentinel = QuerySetSentinelKey::new::<Q>().into();
648 let _ = self
649 .whale
650 .register(sentinel, None, Durability::stable(), vec![]);
651 }
652
653 // Store verifier for this query (for verify-then-decide pattern)
654 self.verifiers
655 .insert::<Q, T>(full_key.clone(), query.clone());
656
657 Ok((Err(err), output_changed, revision))
658 }
659 Err(e) => {
660 // System errors (Suspend, Cycle, Cancelled) are not cached
661 Err(e)
662 }
663 }
664 }
665
666 /// Invalidate a query, forcing recomputation on next access.
667 ///
668 /// This also invalidates any queries that depend on this one.
669 pub fn invalidate<Q: Query>(&self, query: &Q) {
670 let query_cache_key = QueryCacheKey::new(query.clone());
671 let full_key: FullCacheKey = query_cache_key.clone().into();
672
673 self.tracer
674 .on_query_invalidated(&query_cache_key, InvalidationReason::ManualInvalidation);
675
676 // Update whale to invalidate dependents (register with None to clear cached value)
677 // Use stable durability to increment all revision counters, ensuring queries
678 // at any durability level will see this as a change.
679 let _ = self
680 .whale
681 .register(full_key, None, Durability::stable(), vec![]);
682 }
683
684 /// Remove a query from the cache entirely, freeing memory.
685 ///
686 /// Use this for GC when a query is no longer needed.
687 /// Unlike `invalidate`, this removes all traces of the query from storage.
688 /// The query will be recomputed from scratch on next access.
689 ///
690 /// This also invalidates any queries that depend on this one.
691 pub fn remove_query<Q: Query>(&self, query: &Q) {
692 let query_cache_key = QueryCacheKey::new(query.clone());
693 let full_key: FullCacheKey = query_cache_key.clone().into();
694
695 self.tracer
696 .on_query_invalidated(&query_cache_key, InvalidationReason::ManualInvalidation);
697
698 // Remove verifier if exists
699 self.verifiers.remove(&full_key);
700
701 // Remove from whale storage (this also handles dependent invalidation)
702 self.whale.remove(&full_key);
703
704 // Remove from registry and update sentinel for list_queries
705 if self.query_registry.remove::<Q>(query) {
706 let sentinel = QuerySetSentinelKey::new::<Q>().into();
707 let _ = self
708 .whale
709 .register(sentinel, None, Durability::stable(), vec![]);
710 }
711 }
712
713 /// Clear all cached values by removing all nodes from whale.
714 ///
715 /// Note: This is a relatively expensive operation as it iterates through all keys.
716 pub fn clear_cache(&self) {
717 let keys = self.whale.keys();
718 for key in keys {
719 self.whale.remove(&key);
720 }
721 }
722
723 /// Poll a query, returning both the result and its change revision.
724 ///
725 /// This is useful for implementing subscription patterns where you need to
726 /// detect changes efficiently. Compare the returned `revision` with a
727 /// previously stored value to determine if the query result has changed.
728 ///
729 /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
730 /// as its value, allowing you to track revision changes for both success and
731 /// user error cases.
732 ///
733 /// # Example
734 ///
735 /// ```ignore
736 /// struct Subscription<Q: Query> {
737 /// query: Q,
738 /// last_revision: RevisionCounter,
739 /// tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
740 /// }
741 ///
742 /// // Polling loop
743 /// for sub in &mut subscriptions {
744 /// let result = runtime.poll(sub.query.clone())?;
745 /// if result.revision > sub.last_revision {
746 /// sub.tx.send(result.value.clone())?;
747 /// sub.last_revision = result.revision;
748 /// }
749 /// }
750 /// ```
751 ///
752 /// # Errors
753 ///
754 /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
755 /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
756 #[allow(clippy::type_complexity)]
757 pub fn poll<Q: Query>(
758 &self,
759 query: Q,
760 ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
761 let (value, revision) = self.query_internal(query)?;
762 Ok(Polled { value, revision })
763 }
764
765 /// Get the change revision of a query without executing it.
766 ///
767 /// Returns `None` if the query has never been executed.
768 ///
769 /// This is useful for checking if a query has changed since the last poll
770 /// without the cost of executing the query.
771 ///
772 /// # Example
773 ///
774 /// ```ignore
775 /// // Check if query has changed before deciding to poll
776 /// if let Some(rev) = runtime.changed_at(&MyQuery::new(key)) {
777 /// if rev > last_known_revision {
778 /// let result = runtime.query(MyQuery::new(key))?;
779 /// // Process result...
780 /// }
781 /// }
782 /// ```
783 pub fn changed_at<Q: Query>(&self, query: &Q) -> Option<RevisionCounter> {
784 let full_key = QueryCacheKey::new(query.clone()).into();
785 self.whale.get(&full_key).map(|node| node.changed_at)
786 }
787}
788
789// ============================================================================
790// GC (Garbage Collection) API
791// ============================================================================
792
793impl<T: Tracer> QueryRuntime<T> {
    /// Get all keys currently stored in the cache.
    ///
    /// This is useful for implementing custom garbage collection strategies.
    /// Use this in combination with [`Tracer::on_query_key`] to track access
    /// times and implement LRU, TTL, or other GC algorithms externally.
    ///
    /// The result is a snapshot `Vec` of whatever `whale.keys()` reports.
    /// NOTE(review): sentinel nodes are registered in the same whale runtime,
    /// so presumably their keys show up here too — confirm.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Collect all keys that haven't been accessed recently
    /// let stale_keys: Vec<_> = runtime.query_keys()
    ///     .into_iter()
    ///     .filter(|key| tracker.is_stale(key))
    ///     .collect();
    ///
    /// // Remove stale queries that have no dependents
    /// for key in stale_keys {
    ///     runtime.remove_if_unused(&key);
    /// }
    /// ```
    pub fn query_keys(&self) -> Vec<FullCacheKey> {
        self.whale.keys()
    }
816
817 /// Remove a query if it has no dependents.
818 ///
819 /// Returns `true` if the query was removed, `false` if it has dependents
820 /// or doesn't exist. This is the safe way to remove queries during GC,
821 /// as it won't break queries that depend on this one.
822 ///
823 /// # Example
824 ///
825 /// ```ignore
826 /// let query = MyQuery::new(cache_key);
827 /// if runtime.remove_query_if_unused(&query) {
828 /// println!("Query removed");
829 /// } else {
830 /// println!("Query has dependents, not removed");
831 /// }
832 /// ```
833 pub fn remove_query_if_unused<Q: Query>(&self, query: &Q) -> bool {
834 let full_key = QueryCacheKey::new(query.clone()).into();
835 self.remove_if_unused(&full_key)
836 }
837
838 /// Remove a query by its [`FullCacheKey`].
839 ///
840 /// This is the type-erased version of [`remove_query`](Self::remove_query).
841 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
842 /// or [`Tracer::on_query_key`].
843 ///
844 /// Returns `true` if the query was removed, `false` if it doesn't exist.
845 ///
846 /// # Warning
847 ///
848 /// This forcibly removes the query even if other queries depend on it.
849 /// Dependent queries will be recomputed on next access. For safe GC,
850 /// use [`remove_if_unused`](Self::remove_if_unused) instead.
851 pub fn remove(&self, key: &FullCacheKey) -> bool {
852 // Remove verifier if exists
853 self.verifiers.remove(key);
854
855 // Remove from whale storage
856 self.whale.remove(key).is_some()
857 }
858
859 /// Remove a query by its [`FullCacheKey`] if it has no dependents.
860 ///
861 /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
862 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
863 /// or [`Tracer::on_query_key`].
864 ///
865 /// Returns `true` if the query was removed, `false` if it has dependents
866 /// or doesn't exist.
867 ///
868 /// # Example
869 ///
870 /// ```ignore
871 /// // Implement LRU GC
872 /// for key in runtime.query_keys() {
873 /// if tracker.is_expired(&key) {
874 /// runtime.remove_if_unused(&key);
875 /// }
876 /// }
877 /// ```
878 pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
879 if self.whale.remove_if_unused(key.clone()).is_some() {
880 // Successfully removed - clean up verifier
881 self.verifiers.remove(key);
882 true
883 } else {
884 false
885 }
886 }
887}
888
889// ============================================================================
890// Builder
891// ============================================================================
892
893/// Builder for [`QueryRuntime`] with customizable settings.
894///
895/// # Example
896///
897/// ```ignore
898/// let runtime = QueryRuntime::builder()
899/// .error_comparator(|a, b| {
900/// // Treat all errors of the same type as equal
901/// a.downcast_ref::<std::io::Error>().is_some()
902/// == b.downcast_ref::<std::io::Error>().is_some()
903/// })
904/// .build();
905/// ```
906pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
907 error_comparator: ErrorComparator,
908 tracer: T,
909}
910
911impl Default for QueryRuntimeBuilder<NoopTracer> {
912 fn default() -> Self {
913 Self::new()
914 }
915}
916
917impl QueryRuntimeBuilder<NoopTracer> {
918 /// Create a new builder with default settings.
919 pub fn new() -> Self {
920 Self {
921 error_comparator: default_error_comparator,
922 tracer: NoopTracer,
923 }
924 }
925}
926
927impl<T: Tracer> QueryRuntimeBuilder<T> {
928 /// Set the error comparator function for early cutoff optimization.
929 ///
930 /// When a query returns `QueryError::UserError`, this function is used
931 /// to compare it with the previously cached error. If they are equal,
932 /// downstream queries can skip recomputation (early cutoff).
933 ///
934 /// The default comparator returns `false` for all errors, meaning errors
935 /// are always considered different (conservative, always recomputes).
936 ///
937 /// # Example
938 ///
939 /// ```ignore
940 /// // Treat errors as equal if they have the same display message
941 /// let runtime = QueryRuntime::builder()
942 /// .error_comparator(|a, b| a.to_string() == b.to_string())
943 /// .build();
944 /// ```
945 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
946 self.error_comparator = f;
947 self
948 }
949
950 /// Set the tracer for observability.
951 pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
952 QueryRuntimeBuilder {
953 error_comparator: self.error_comparator,
954 tracer,
955 }
956 }
957
958 /// Build the runtime with the configured settings.
959 pub fn build(self) -> QueryRuntime<T> {
960 QueryRuntime {
961 whale: WhaleRuntime::new(),
962 locators: Arc::new(LocatorStorage::new()),
963 pending: Arc::new(PendingStorage::new()),
964 query_registry: Arc::new(QueryRegistry::new()),
965 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
966 verifiers: Arc::new(VerifierStorage::new()),
967 error_comparator: self.error_comparator,
968 tracer: Arc::new(self.tracer),
969 }
970 }
971}
972
973// ============================================================================
974// Asset API
975// ============================================================================
976
977impl<T: Tracer> QueryRuntime<T> {
978 /// Register an asset locator for a specific asset key type.
979 ///
980 /// Only one locator can be registered per key type. Later registrations
981 /// replace earlier ones.
982 ///
983 /// # Example
984 ///
985 /// ```ignore
986 /// let runtime = QueryRuntime::new();
987 /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
988 /// ```
989 pub fn register_asset_locator<K, L>(&self, locator: L)
990 where
991 K: AssetKey,
992 L: AssetLocator<K>,
993 {
994 self.locators.insert::<K, L>(locator);
995 }
996
997 /// Get an iterator over pending asset requests.
998 ///
999 /// Returns assets that have been requested but not yet resolved.
1000 /// The user should fetch these externally and call `resolve_asset()`.
1001 ///
1002 /// # Example
1003 ///
1004 /// ```ignore
1005 /// for pending in runtime.pending_assets() {
1006 /// if let Some(path) = pending.key::<FilePath>() {
1007 /// let content = fetch_file(path);
1008 /// runtime.resolve_asset(path.clone(), content);
1009 /// }
1010 /// }
1011 /// ```
1012 pub fn pending_assets(&self) -> Vec<PendingAsset> {
1013 self.pending.get_all()
1014 }
1015
1016 /// Get pending assets filtered by key type.
1017 pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
1018 self.pending.get_of_type::<K>()
1019 }
1020
1021 /// Check if there are any pending assets.
1022 pub fn has_pending_assets(&self) -> bool {
1023 !self.pending.is_empty()
1024 }
1025
1026 /// Resolve an asset with its loaded value.
1027 ///
1028 /// This marks the asset as ready and invalidates any queries that
1029 /// depend on it (if the value changed), triggering recomputation on next access.
1030 ///
1031 /// This method is idempotent - resolving with the same value (via `asset_eq`)
1032 /// will not trigger downstream recomputation.
1033 ///
1034 /// # Arguments
1035 ///
1036 /// * `key` - The asset key identifying this resource
1037 /// * `value` - The loaded asset value
1038 /// * `durability` - How frequently this asset is expected to change
1039 ///
1040 /// # Example
1041 ///
1042 /// ```ignore
1043 /// let content = std::fs::read_to_string(&path)?;
1044 /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
1045 /// ```
1046 pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
1047 self.resolve_asset_internal(key, value, durability);
1048 }
1049
1050 /// Resolve an asset with an error.
1051 ///
1052 /// This marks the asset as errored and caches the error. Queries depending
1053 /// on this asset will receive `Err(QueryError::UserError(...))`.
1054 ///
1055 /// Use this when async loading fails (e.g., network error, file not found,
1056 /// access denied).
1057 ///
1058 /// # Arguments
1059 ///
1060 /// * `key` - The asset key identifying this resource
1061 /// * `error` - The error to cache (will be wrapped in `Arc`)
1062 /// * `durability` - How frequently this error state is expected to change
1063 ///
1064 /// # Example
1065 ///
1066 /// ```ignore
1067 /// match fetch_file(&path) {
1068 /// Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
1069 /// Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
1070 /// }
1071 /// ```
1072 pub fn resolve_asset_error<K: AssetKey>(
1073 &self,
1074 key: K,
1075 error: impl Into<anyhow::Error>,
1076 durability: DurabilityLevel,
1077 ) {
1078 let asset_cache_key = AssetCacheKey::new(key.clone());
1079
1080 // Remove from pending BEFORE registering the error
1081 self.pending.remove(&asset_cache_key);
1082
1083 // Prepare the error entry
1084 let error_arc = Arc::new(error.into());
1085 let entry = CachedEntry::AssetError(error_arc.clone());
1086 let durability =
1087 Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
1088
1089 // Atomic compare-and-update (errors are always considered changed for now)
1090 let result = self
1091 .whale
1092 .update_with_compare(
1093 asset_cache_key.into(),
1094 Some(entry),
1095 |old_data, _new_data| {
1096 // Compare old and new errors using error_comparator
1097 match old_data.and_then(|d| d.as_ref()) {
1098 Some(CachedEntry::AssetError(old_err)) => {
1099 !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
1100 }
1101 _ => true, // Loading, Ready, or not present -> changed
1102 }
1103 },
1104 durability,
1105 vec![],
1106 )
1107 .expect("update_with_compare with no dependencies cannot fail");
1108
1109 // Emit asset resolved event (with changed status)
1110 let asset_cache_key = AssetCacheKey::new(key.clone());
1111 self.tracer
1112 .on_asset_resolved(&asset_cache_key, result.changed);
1113
1114 // Register asset key in registry for list_asset_keys
1115 let is_new_asset = self.asset_key_registry.register(&key);
1116 if is_new_asset {
1117 // Update sentinel to invalidate list_asset_keys dependents
1118 let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1119 let _ = self
1120 .whale
1121 .register(sentinel, None, Durability::stable(), vec![]);
1122 }
1123 }
1124
1125 fn resolve_asset_internal<K: AssetKey>(
1126 &self,
1127 key: K,
1128 value: K::Asset,
1129 durability_level: DurabilityLevel,
1130 ) {
1131 let asset_cache_key = AssetCacheKey::new(key.clone());
1132
1133 // Remove from pending BEFORE registering the value
1134 self.pending.remove(&asset_cache_key);
1135
1136 // Prepare the new entry
1137 let value_arc: Arc<K::Asset> = Arc::new(value);
1138 let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1139 let durability =
1140 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1141
1142 // Atomic compare-and-update
1143 let result = self
1144 .whale
1145 .update_with_compare(
1146 asset_cache_key.into(),
1147 Some(entry),
1148 |old_data, _new_data| {
1149 // Compare old and new values
1150 match old_data.and_then(|d| d.as_ref()) {
1151 Some(CachedEntry::AssetReady(old_arc)) => {
1152 match old_arc.clone().downcast::<K::Asset>() {
1153 Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1154 Err(_) => true, // Type mismatch, treat as changed
1155 }
1156 }
1157 _ => true, // Loading, NotFound, or not present -> changed
1158 }
1159 },
1160 durability,
1161 vec![],
1162 )
1163 .expect("update_with_compare with no dependencies cannot fail");
1164
1165 // Emit asset resolved event
1166 let asset_cache_key = AssetCacheKey::new(key.clone());
1167 self.tracer
1168 .on_asset_resolved(&asset_cache_key, result.changed);
1169
1170 // Register asset key in registry for list_asset_keys
1171 let is_new_asset = self.asset_key_registry.register(&key);
1172 if is_new_asset {
1173 // Update sentinel to invalidate list_asset_keys dependents
1174 let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1175 let _ = self
1176 .whale
1177 .register(sentinel, None, Durability::stable(), vec![]);
1178 }
1179 }
1180
1181 /// Invalidate an asset, forcing queries to re-request it.
1182 ///
1183 /// The asset will be marked as loading and added to pending assets.
1184 /// Dependent queries will suspend until the asset is resolved again.
1185 ///
1186 /// # Example
1187 ///
1188 /// ```ignore
1189 /// // File was modified externally
1190 /// runtime.invalidate_asset(&FilePath("config.json".into()));
1191 /// // Queries depending on this asset will now suspend
1192 /// // User should fetch the new value and call resolve_asset
1193 /// ```
1194 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1195 let asset_cache_key = AssetCacheKey::new(key.clone());
1196 let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1197
1198 // Emit asset invalidated event
1199 self.tracer.on_asset_invalidated(&asset_cache_key);
1200
1201 // Add to pending FIRST (before clearing whale state)
1202 // This ensures: readers see either old value, or Loading+pending
1203 self.pending
1204 .insert::<K>(asset_cache_key.clone(), key.clone());
1205
1206 // Atomic: clear cached value + invalidate dependents
1207 // Using None for data means "needs to be loaded"
1208 // Use stable durability to ensure queries at any durability level see the change.
1209 let _ = self
1210 .whale
1211 .register(full_cache_key, None, Durability::stable(), vec![]);
1212 }
1213
1214 /// Remove an asset from the cache entirely.
1215 ///
1216 /// Unlike `invalidate_asset`, this removes all traces of the asset.
1217 /// Dependent queries will go through the locator again on next access.
1218 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1219 let asset_cache_key = AssetCacheKey::new(key.clone());
1220 let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1221
1222 // Remove from pending first
1223 self.pending.remove(&asset_cache_key);
1224
1225 // Remove from whale (this also cleans up dependency edges)
1226 // whale.remove() invalidates dependents before removing
1227 self.whale.remove(&full_cache_key);
1228
1229 // Remove from registry and update sentinel for list_asset_keys
1230 if self.asset_key_registry.remove::<K>(key) {
1231 let sentinel = AssetKeySetSentinelKey::new::<K>().into();
1232 let _ = self
1233 .whale
1234 .register(sentinel, None, Durability::stable(), vec![]);
1235 }
1236 }
1237
1238 /// Get an asset by key without tracking dependencies.
1239 ///
1240 /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1241 /// as a dependent of the asset. Use this for direct asset access outside
1242 /// of query execution.
1243 ///
1244 /// # Returns
1245 ///
1246 /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1247 /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1248 /// - `Err(QueryError::UserError)` - Asset was not found or locator returned an error
1249 pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1250 self.get_asset_internal(key)
1251 }
1252
    /// Internal: Get asset state and its changed_at revision atomically.
    ///
    /// This is the "direct" version reached through `get_asset_internal`
    /// (no dependency tracking on the caller).
    /// For calls from QueryContext::asset, see `get_asset_with_revision_ctx`.
    ///
    /// # Flow
    ///
    /// 1. Build the cache keys, allocate a tracer span for this request and push
    ///    it on the thread-local span stack (popped when `_span_guard` drops).
    /// 2. Cache probe: if whale has a node for the key and it is valid, verify
    ///    every dependency that has a registered verifier, re-check validity,
    ///    and answer from the cached entry (ready / error / loading).
    /// 3. Otherwise run the locator for `K` (after a cycle check) and store the
    ///    outcome: a ready value via `update_with_compare`, a pending marker via
    ///    `get_or_insert`, or a cached `AssetError` via `register`.
    /// 4. If no locator produced a result, mark the asset pending and return
    ///    whatever node `get_or_insert` yields.
    ///
    /// # Returns
    ///
    /// `(AssetLoadingState, revision)`. For answers taken from a whale node the
    /// revision is that node's `changed_at`; for a value freshly returned by a
    /// locator it is the `revision` reported by `update_with_compare`.
    ///
    /// # Errors
    ///
    /// - `QueryError::UserError` when an error is cached for the asset or the
    ///   locator returned one.
    /// - Whatever `check_leaf_asset_consistency` or `check_cycle` return.
    /// - Any other error from the locator, propagated without caching.
    fn get_asset_with_revision<K: AssetKey>(
        &self,
        key: K,
    ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
        let asset_cache_key = AssetCacheKey::new(key.clone());
        let full_cache_key: FullCacheKey = asset_cache_key.clone().into();

        // Create a span for this asset request (like queries do)
        // This ensures child queries called from locators show as children of this asset
        let asset_span_id = self.tracer.new_span_id();
        // Reuse the active trace (parent = innermost span) or start a new trace
        // when nothing is executing on this thread.
        let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
            SpanStack::Empty => (self.tracer.new_trace_id(), None),
            SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
        });
        let span_ctx = SpanContext {
            span_id: asset_span_id,
            trace_id,
            parent_span_id,
        };

        // Push asset span to stack so child queries see this asset as their parent
        let _span_guard = SpanStackGuard::push(trace_id, asset_span_id);

        // Check whale cache first (single atomic read)
        if let Some(node) = self.whale.get(&full_cache_key) {
            let changed_at = node.changed_at;
            // Check if valid at current revision (shallow check)
            if self.whale.is_valid(&full_cache_key) {
                // Verify dependencies recursively (like query path does)
                let mut deps_verified = true;
                if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
                    for dep in deps {
                        // Dependencies without a registered verifier are skipped.
                        if let Some(verifier) = self.verifiers.get(&dep) {
                            // Re-run query/asset to verify it (triggers recursive verification)
                            if verifier.verify(self as &dyn std::any::Any).is_err() {
                                deps_verified = false;
                                break;
                            }
                        }
                    }
                }

                // Re-check validity after deps are verified
                if deps_verified && self.whale.is_valid(&full_cache_key) {
                    // For cached entries, check consistency for leaf assets (no locator deps).
                    // This detects if resolve_asset/resolve_asset_error was called during query execution.
                    let has_locator_deps = self
                        .whale
                        .get_dependency_ids(&full_cache_key)
                        .is_some_and(|deps| !deps.is_empty());

                    // NOTE: `node` is the snapshot read before verification; only
                    // validity (not the data) is re-read after the verifiers ran.
                    match &node.data {
                        Some(CachedEntry::AssetReady(arc)) => {
                            // Check consistency for cached leaf assets
                            if !has_locator_deps {
                                check_leaf_asset_consistency(changed_at)?;
                            }
                            // Cache hit: start + end immediately (no locator runs)
                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
                            self.tracer.on_asset_located(
                                &span_ctx,
                                &asset_cache_key,
                                TracerAssetState::Ready,
                            );
                            match arc.clone().downcast::<K::Asset>() {
                                Ok(value) => {
                                    return Ok((AssetLoadingState::ready(key, value), changed_at))
                                }
                                Err(_) => {
                                    // A value stored under this key with another type is
                                    // treated as an internal invariant violation (panics).
                                    unreachable!("Asset type mismatch: {:?}", key)
                                }
                            }
                        }
                        Some(CachedEntry::AssetError(err)) => {
                            // Check consistency for cached leaf errors
                            if !has_locator_deps {
                                check_leaf_asset_consistency(changed_at)?;
                            }
                            // Cache hit: start + end immediately (no locator runs)
                            // A cached error is reported to the tracer as `NotFound`.
                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
                            self.tracer.on_asset_located(
                                &span_ctx,
                                &asset_cache_key,
                                TracerAssetState::NotFound,
                            );
                            return Err(QueryError::UserError(err.clone()));
                        }
                        None => {
                            // Loading state - no value to be inconsistent with
                            // Cache hit: start + end immediately (no locator runs)
                            self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
                            self.tracer.on_asset_located(
                                &span_ctx,
                                &asset_cache_key,
                                TracerAssetState::Loading,
                            );
                            return Ok((AssetLoadingState::loading(key), changed_at));
                        }
                        _ => {
                            // Query-related entries (Ok, UserError) shouldn't be here
                            // Fall through to locator
                        }
                    }
                }
            }
        }

        // Not in cache or invalid - try locator
        // Use LocatorContext to track deps on the asset itself
        check_cycle(&full_cache_key)?;
        // Keep this asset on the execution stack while the locator runs so a
        // re-entrant request for the same key is reported by `check_cycle`.
        let _guard = StackGuard::push(full_cache_key.clone());

        // Notify tracer BEFORE locator runs (START event) so child queries appear as children
        self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);

        let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
        let locator_result =
            self.locators
                .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);

        if let Some(result) = locator_result {
            // Get collected dependencies from the locator context
            let locator_deps = locator_ctx.into_deps();
            match result {
                Ok(ErasedLocateResult::Ready {
                    value: arc,
                    durability: durability_level,
                }) => {
                    // END event after locator completes
                    self.tracer.on_asset_located(
                        &span_ctx,
                        &asset_cache_key,
                        TracerAssetState::Ready,
                    );

                    // The locator returns a type-erased value; recover `K::Asset`.
                    let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
                        Ok(v) => v,
                        Err(_) => {
                            unreachable!("Asset type mismatch: {:?}", key);
                        }
                    };

                    // Store in whale atomically with early cutoff
                    // Include locator's dependencies so the asset is invalidated when they change
                    let entry = CachedEntry::AssetReady(typed_value.clone());
                    let durability = Durability::new(durability_level.as_u8() as usize)
                        .unwrap_or(Durability::volatile());
                    let new_value = typed_value.clone();
                    let result = self
                        .whale
                        .update_with_compare(
                            full_cache_key.clone(),
                            Some(entry),
                            // Returns `true` when the value counts as changed: any
                            // previous state other than an `asset_eq`-equal ready value.
                            |old_data, _new_data| {
                                let Some(CachedEntry::AssetReady(old_arc)) =
                                    old_data.and_then(|d| d.as_ref())
                                else {
                                    return true;
                                };
                                let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
                                    return true;
                                };
                                !K::asset_eq(&old_value, &new_value)
                            },
                            durability,
                            locator_deps,
                        )
                        .expect("update_with_compare should succeed");

                    // Register verifier for this asset (for verify-then-decide pattern)
                    self.verifiers
                        .insert_asset::<K, T>(full_cache_key, key.clone());

                    return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
                }
                Ok(ErasedLocateResult::Pending) => {
                    // END event after locator completes with pending
                    self.tracer.on_asset_located(
                        &span_ctx,
                        &asset_cache_key,
                        TracerAssetState::Loading,
                    );

                    // Add to pending list for Pending result
                    self.pending
                        .insert::<K>(asset_cache_key.clone(), key.clone());
                    // Insert a `None` (loading) node unless one already exists; an
                    // existing node's data is reported as-is.
                    match self
                        .whale
                        .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
                        .expect("get_or_insert should succeed")
                    {
                        GetOrInsertResult::Inserted(node) => {
                            return Ok((AssetLoadingState::loading(key), node.changed_at));
                        }
                        GetOrInsertResult::Existing(node) => {
                            let changed_at = node.changed_at;
                            match &node.data {
                                Some(CachedEntry::AssetReady(arc)) => {
                                    match arc.clone().downcast::<K::Asset>() {
                                        Ok(value) => {
                                            return Ok((
                                                AssetLoadingState::ready(key, value),
                                                changed_at,
                                            ))
                                        }
                                        // Unlike the cache-hit path, a type mismatch
                                        // here degrades to "loading" instead of panicking.
                                        Err(_) => {
                                            return Ok((
                                                AssetLoadingState::loading(key),
                                                changed_at,
                                            ))
                                        }
                                    }
                                }
                                Some(CachedEntry::AssetError(err)) => {
                                    return Err(QueryError::UserError(err.clone()));
                                }
                                _ => return Ok((AssetLoadingState::loading(key), changed_at)),
                            }
                        }
                    }
                }
                Err(QueryError::UserError(err)) => {
                    // END event after locator completes with error
                    self.tracer.on_asset_located(
                        &span_ctx,
                        &asset_cache_key,
                        TracerAssetState::NotFound,
                    );
                    // Locator returned a user error - cache it as AssetError
                    // (always volatile durability; the result of `register` is ignored)
                    let entry = CachedEntry::AssetError(err.clone());
                    let _ = self.whale.register(
                        full_cache_key,
                        Some(entry),
                        Durability::volatile(),
                        locator_deps,
                    );
                    return Err(QueryError::UserError(err));
                }
                Err(e) => {
                    // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
                    // NOTE(review): no END (`on_asset_located`) event is emitted on this path.
                    return Err(e);
                }
            }
        }

        // No locator registered or locator returned None - mark as pending
        // (no locator was called, so no deps to track)
        // END event - no locator ran
        self.tracer
            .on_asset_located(&span_ctx, &asset_cache_key, TracerAssetState::Loading);
        self.pending
            .insert::<K>(asset_cache_key.clone(), key.clone());

        // Same "insert loading marker or report the existing node" handling as
        // the locator's Pending branch, but with an empty dependency list.
        match self
            .whale
            .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
            .expect("get_or_insert with no dependencies cannot fail")
        {
            GetOrInsertResult::Inserted(node) => {
                Ok((AssetLoadingState::loading(key), node.changed_at))
            }
            GetOrInsertResult::Existing(node) => {
                let changed_at = node.changed_at;
                match &node.data {
                    Some(CachedEntry::AssetReady(arc)) => {
                        match arc.clone().downcast::<K::Asset>() {
                            Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
                            Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
                        }
                    }
                    Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
                    _ => Ok((AssetLoadingState::loading(key), changed_at)),
                }
            }
        }
    }
1536
1537 /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1538 ///
1539 /// This version is called from QueryContext::asset. Consistency checking for
1540 /// cached leaf assets is done inside this function before returning.
1541 fn get_asset_with_revision_ctx<K: AssetKey>(
1542 &self,
1543 key: K,
1544 _ctx: &QueryContext<'_, T>,
1545 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1546 let asset_cache_key = AssetCacheKey::new(key.clone());
1547 let full_cache_key: FullCacheKey = asset_cache_key.clone().into();
1548
1549 // Create a span for this asset request (like queries do)
1550 // This ensures child queries called from locators show as children of this asset
1551 let asset_span_id = self.tracer.new_span_id();
1552 let (trace_id, parent_span_id) = SPAN_STACK.with(|stack| match &*stack.borrow() {
1553 SpanStack::Empty => (self.tracer.new_trace_id(), None),
1554 SpanStack::Active(tid, spans) => (*tid, spans.last().copied()),
1555 });
1556 let span_ctx = SpanContext {
1557 span_id: asset_span_id,
1558 trace_id,
1559 parent_span_id,
1560 };
1561
1562 // Push asset span to stack so child queries see this asset as their parent
1563 let _span_guard = SpanStackGuard::push(trace_id, asset_span_id);
1564
1565 // Check whale cache first (single atomic read)
1566 if let Some(node) = self.whale.get(&full_cache_key) {
1567 let changed_at = node.changed_at;
1568 // Check if valid at current revision (shallow check)
1569 if self.whale.is_valid(&full_cache_key) {
1570 // Verify dependencies recursively (like query path does)
1571 let mut deps_verified = true;
1572 if let Some(deps) = self.whale.get_dependency_ids(&full_cache_key) {
1573 for dep in deps {
1574 if let Some(verifier) = self.verifiers.get(&dep) {
1575 // Re-run query/asset to verify it (triggers recursive verification)
1576 if verifier.verify(self as &dyn std::any::Any).is_err() {
1577 deps_verified = false;
1578 break;
1579 }
1580 }
1581 }
1582 }
1583
1584 // Re-check validity after deps are verified
1585 if deps_verified && self.whale.is_valid(&full_cache_key) {
1586 // For cached entries, check consistency for leaf assets (no locator deps).
1587 // This detects if resolve_asset/resolve_asset_error was called during query execution.
1588 let has_locator_deps = self
1589 .whale
1590 .get_dependency_ids(&full_cache_key)
1591 .is_some_and(|deps| !deps.is_empty());
1592
1593 match &node.data {
1594 Some(CachedEntry::AssetReady(arc)) => {
1595 // Check consistency for cached leaf assets
1596 if !has_locator_deps {
1597 check_leaf_asset_consistency(changed_at)?;
1598 }
1599 // Cache hit: start + end immediately (no locator runs)
1600 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1601 self.tracer.on_asset_located(
1602 &span_ctx,
1603 &asset_cache_key,
1604 TracerAssetState::Ready,
1605 );
1606 match arc.clone().downcast::<K::Asset>() {
1607 Ok(value) => {
1608 return Ok((AssetLoadingState::ready(key, value), changed_at))
1609 }
1610 Err(_) => {
1611 unreachable!("Asset type mismatch: {:?}", key)
1612 }
1613 }
1614 }
1615 Some(CachedEntry::AssetError(err)) => {
1616 // Check consistency for cached leaf errors
1617 if !has_locator_deps {
1618 check_leaf_asset_consistency(changed_at)?;
1619 }
1620 // Cache hit: start + end immediately (no locator runs)
1621 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1622 self.tracer.on_asset_located(
1623 &span_ctx,
1624 &asset_cache_key,
1625 TracerAssetState::NotFound,
1626 );
1627 return Err(QueryError::UserError(err.clone()));
1628 }
1629 None => {
1630 // Loading state - no value to be inconsistent with
1631 // Cache hit: start + end immediately (no locator runs)
1632 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1633 self.tracer.on_asset_located(
1634 &span_ctx,
1635 &asset_cache_key,
1636 TracerAssetState::Loading,
1637 );
1638 return Ok((AssetLoadingState::loading(key), changed_at));
1639 }
1640 _ => {
1641 // Query-related entries (Ok, UserError) shouldn't be here
1642 // Fall through to locator
1643 }
1644 }
1645 }
1646 }
1647 }
1648
1649 // Not in cache or invalid - try locator
1650 // Use LocatorContext to track deps on the asset itself (not the calling query)
1651 // Consistency tracking is handled via thread-local storage
1652 check_cycle(&full_cache_key)?;
1653 let _guard = StackGuard::push(full_cache_key.clone());
1654
1655 // START event before locator runs
1656 self.tracer.on_asset_requested(&span_ctx, &asset_cache_key);
1657
1658 let locator_ctx = LocatorContext::new(self, full_cache_key.clone());
1659 let locator_result =
1660 self.locators
1661 .locate_with_locator_ctx(TypeId::of::<K>(), &locator_ctx, &key);
1662
1663 if let Some(result) = locator_result {
1664 // Get collected dependencies from the locator context
1665 let locator_deps = locator_ctx.into_deps();
1666 match result {
1667 Ok(ErasedLocateResult::Ready {
1668 value: arc,
1669 durability: durability_level,
1670 }) => {
1671 // END event after locator completes
1672 self.tracer.on_asset_located(
1673 &span_ctx,
1674 &asset_cache_key,
1675 TracerAssetState::Ready,
1676 );
1677
1678 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1679 Ok(v) => v,
1680 Err(_) => {
1681 unreachable!("Asset type mismatch: {:?}", key);
1682 }
1683 };
1684
1685 // Store in whale atomically with early cutoff
1686 // Include locator's dependencies so the asset is invalidated when they change
1687 let entry = CachedEntry::AssetReady(typed_value.clone());
1688 let durability = Durability::new(durability_level.as_u8() as usize)
1689 .unwrap_or(Durability::volatile());
1690 let new_value = typed_value.clone();
1691 let result = self
1692 .whale
1693 .update_with_compare(
1694 full_cache_key.clone(),
1695 Some(entry),
1696 |old_data, _new_data| {
1697 let Some(CachedEntry::AssetReady(old_arc)) =
1698 old_data.and_then(|d| d.as_ref())
1699 else {
1700 return true;
1701 };
1702 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1703 return true;
1704 };
1705 !K::asset_eq(&old_value, &new_value)
1706 },
1707 durability,
1708 locator_deps,
1709 )
1710 .expect("update_with_compare should succeed");
1711
1712 // Register verifier for this asset (for verify-then-decide pattern)
1713 self.verifiers
1714 .insert_asset::<K, T>(full_cache_key, key.clone());
1715
1716 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1717 }
1718 Ok(ErasedLocateResult::Pending) => {
1719 // END event after locator completes with pending
1720 self.tracer.on_asset_located(
1721 &span_ctx,
1722 &asset_cache_key,
1723 TracerAssetState::Loading,
1724 );
1725
1726 // Add to pending list for Pending result
1727 self.pending
1728 .insert::<K>(asset_cache_key.clone(), key.clone());
1729 match self
1730 .whale
1731 .get_or_insert(full_cache_key, None, Durability::volatile(), locator_deps)
1732 .expect("get_or_insert should succeed")
1733 {
1734 GetOrInsertResult::Inserted(node) => {
1735 return Ok((AssetLoadingState::loading(key), node.changed_at));
1736 }
1737 GetOrInsertResult::Existing(node) => {
1738 let changed_at = node.changed_at;
1739 match &node.data {
1740 Some(CachedEntry::AssetReady(arc)) => {
1741 match arc.clone().downcast::<K::Asset>() {
1742 Ok(value) => {
1743 return Ok((
1744 AssetLoadingState::ready(key, value),
1745 changed_at,
1746 ));
1747 }
1748 Err(_) => {
1749 return Ok((
1750 AssetLoadingState::loading(key),
1751 changed_at,
1752 ))
1753 }
1754 }
1755 }
1756 Some(CachedEntry::AssetError(err)) => {
1757 return Err(QueryError::UserError(err.clone()));
1758 }
1759 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1760 }
1761 }
1762 }
1763 }
1764 Err(QueryError::UserError(err)) => {
1765 // END event after locator completes with error
1766 self.tracer.on_asset_located(
1767 &span_ctx,
1768 &asset_cache_key,
1769 TracerAssetState::NotFound,
1770 );
1771 // Locator returned a user error - cache it as AssetError
1772 let entry = CachedEntry::AssetError(err.clone());
1773 let _ = self.whale.register(
1774 full_cache_key,
1775 Some(entry),
1776 Durability::volatile(),
1777 locator_deps,
1778 );
1779 return Err(QueryError::UserError(err));
1780 }
1781 Err(e) => {
1782 // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1783 return Err(e);
1784 }
1785 }
1786 }
1787
1788 // No locator registered or locator returned None - mark as pending
1789 // END event - no locator ran
1790 self.tracer
1791 .on_asset_located(&span_ctx, &asset_cache_key, TracerAssetState::Loading);
1792 self.pending
1793 .insert::<K>(asset_cache_key.clone(), key.clone());
1794
1795 match self
1796 .whale
1797 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1798 .expect("get_or_insert with no dependencies cannot fail")
1799 {
1800 GetOrInsertResult::Inserted(node) => {
1801 Ok((AssetLoadingState::loading(key), node.changed_at))
1802 }
1803 GetOrInsertResult::Existing(node) => {
1804 let changed_at = node.changed_at;
1805 match &node.data {
1806 Some(CachedEntry::AssetReady(arc)) => {
1807 match arc.clone().downcast::<K::Asset>() {
1808 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1809 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1810 }
1811 }
1812 Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1813 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1814 }
1815 }
1816 }
1817 }
1818
1819 /// Internal: Get asset state, checking cache and locator.
1820 fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1821 self.get_asset_with_revision(key).map(|(state, _)| state)
1822 }
1823}
1824
1825impl<T: Tracer> Db for QueryRuntime<T> {
1826 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1827 QueryRuntime::query(self, query)
1828 }
1829
1830 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1831 self.get_asset_internal(key)?.suspend()
1832 }
1833
1834 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1835 self.get_asset_internal(key)
1836 }
1837
1838 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1839 self.query_registry.get_all::<Q>()
1840 }
1841
1842 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1843 self.asset_key_registry.get_all::<K>()
1844 }
1845}
1846
1847/// Tracks consistency of leaf asset accesses during query execution.
1848///
1849/// A "leaf" asset is one without dependencies (externally resolved via `resolve_asset`).
1850/// This tracker ensures that all leaf assets accessed during a query execution
1851/// (including those accessed by locators) are consistent - i.e., none were modified
1852/// via `resolve_asset` mid-execution.
1853///
1854/// The tracker is shared across `QueryContext` and `LocatorContext` to propagate
1855/// consistency checking through the entire execution tree.
1856#[derive(Debug)]
1857pub(crate) struct ConsistencyTracker {
1858 /// Global revision at query start. All leaf assets must have changed_at <= this.
1859 start_revision: RevisionCounter,
1860}
1861
1862impl ConsistencyTracker {
1863 /// Create a new tracker with the given start revision.
1864 pub fn new(start_revision: RevisionCounter) -> Self {
1865 Self { start_revision }
1866 }
1867
1868 /// Check consistency for a leaf asset access.
1869 ///
1870 /// A leaf asset is consistent if its changed_at <= start_revision.
1871 /// This detects if resolve_asset was called during query execution.
1872 ///
1873 /// Returns Ok(()) if consistent, Err if inconsistent.
1874 pub fn check_leaf_asset(&self, dep_changed_at: RevisionCounter) -> Result<(), QueryError> {
1875 if dep_changed_at > self.start_revision {
1876 Err(QueryError::InconsistentAssetResolution)
1877 } else {
1878 Ok(())
1879 }
1880 }
1881}
1882
/// Context provided to queries during execution.
///
/// Use this to access dependencies via `query()` and assets via `asset()`;
/// queries requested through `query()` are recorded in `deps`.
pub struct QueryContext<'a, T: Tracer = NoopTracer> {
    /// The runtime that executes nested queries and owns the tracer.
    runtime: &'a QueryRuntime<T>,
    /// Cache key reported to the tracer as the dependent side when a
    /// dependency is registered (presumably the key of the executing query).
    current_key: FullCacheKey,
    /// Execution context; supplies the span context passed to tracer events.
    exec_ctx: ExecutionContext,
    /// Cache keys of the dependencies requested so far, in request order.
    /// `RefCell` because dependencies are recorded through `&self`.
    deps: RefCell<Vec<FullCacheKey>>,
}
1892
1893impl<'a, T: Tracer> QueryContext<'a, T> {
1894 /// Query a dependency.
1895 ///
1896 /// The dependency is automatically tracked for invalidation.
1897 ///
1898 /// # Example
1899 ///
1900 /// ```ignore
1901 /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1902 /// let dep_result = db.query(OtherQuery { id: self.id })?;
1903 /// Ok(process(&dep_result))
1904 /// }
1905 /// ```
1906 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1907 let full_key: FullCacheKey = QueryCacheKey::new(query.clone()).into();
1908
1909 // Emit dependency registered event
1910 self.runtime.tracer.on_dependency_registered(
1911 self.exec_ctx.span_ctx(),
1912 &self.current_key,
1913 &full_key,
1914 );
1915
1916 // Record this as a dependency
1917 self.deps.borrow_mut().push(full_key);
1918
1919 // Execute the query
1920 self.runtime.query(query)
1921 }
1922
1923 /// Access an asset, tracking it as a dependency.
1924 ///
1925 /// Returns the asset value if ready, or `Err(QueryError::Suspend)` if still loading.
1926 /// Use this with the `?` operator for automatic suspension on loading.
1927 ///
1928 /// # Example
1929 ///
1930 /// ```ignore
1931 /// #[query]
1932 /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1933 /// let content = db.asset(path)?;
1934 /// // Process content...
1935 /// Ok(output)
1936 /// }
1937 /// ```
1938 ///
1939 /// # Errors
1940 ///
1941 /// - Returns `Err(QueryError::Suspend)` if the asset is still loading.
1942 /// - Returns `Err(QueryError::UserError)` if the asset was not found.
1943 pub fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
1944 self.asset_state(key)?.suspend()
1945 }
1946
1947 /// Access an asset's loading state, tracking it as a dependency.
1948 ///
1949 /// Unlike [`asset()`](Self::asset), this method returns the full loading state,
1950 /// allowing you to check if an asset is loading without triggering suspension.
1951 ///
1952 /// # Example
1953 ///
1954 /// ```ignore
1955 /// let state = db.asset_state(key)?;
1956 /// if state.is_loading() {
1957 /// // Handle loading case explicitly
1958 /// } else {
1959 /// let value = state.get().unwrap();
1960 /// }
1961 /// ```
1962 ///
1963 /// # Errors
1964 ///
1965 /// Returns `Err(QueryError::UserError)` if the asset was not found.
1966 pub fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1967 let full_cache_key: FullCacheKey = AssetCacheKey::new(key.clone()).into();
1968
1969 // 1. Emit asset dependency registered event
1970 self.runtime.tracer.on_asset_dependency_registered(
1971 self.exec_ctx.span_ctx(),
1972 &self.current_key,
1973 &full_cache_key,
1974 );
1975
1976 // 2. Record dependency on this asset
1977 self.deps.borrow_mut().push(full_cache_key);
1978
1979 // 3. Get asset from cache/locator
1980 // Consistency checking for cached leaf assets is done inside get_asset_with_revision_ctx
1981 let (state, _changed_at) = self.runtime.get_asset_with_revision_ctx(key, self)?;
1982
1983 Ok(state)
1984 }
1985
1986 /// List all query instances of type Q that have been registered.
1987 ///
1988 /// This method establishes a dependency on the "set" of queries of type Q.
1989 /// The calling query will be invalidated when:
1990 /// - A new query of type Q is first executed (added to set)
1991 ///
1992 /// The calling query will NOT be invalidated when:
1993 /// - An individual query of type Q has its value change
1994 ///
1995 /// # Example
1996 ///
1997 /// ```ignore
1998 /// #[query]
1999 /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
2000 /// let queries = db.list_queries::<MyQuery>();
2001 /// let mut results = Vec::new();
2002 /// for q in queries {
2003 /// results.push(*db.query(q)?);
2004 /// }
2005 /// Ok(results)
2006 /// }
2007 /// ```
2008 pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
2009 // Record dependency on the sentinel (set-level dependency)
2010 let sentinel: FullCacheKey = QuerySetSentinelKey::new::<Q>().into();
2011
2012 self.runtime.tracer.on_dependency_registered(
2013 self.exec_ctx.span_ctx(),
2014 &self.current_key,
2015 &sentinel,
2016 );
2017
2018 // Ensure sentinel exists in whale (for dependency tracking)
2019 if self.runtime.whale.get(&sentinel).is_none() {
2020 let _ =
2021 self.runtime
2022 .whale
2023 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
2024 }
2025
2026 self.deps.borrow_mut().push(sentinel);
2027
2028 // Return all registered queries
2029 self.runtime.query_registry.get_all::<Q>()
2030 }
2031
2032 /// List all asset keys of type K that have been registered.
2033 ///
2034 /// This method establishes a dependency on the "set" of asset keys of type K.
2035 /// The calling query will be invalidated when:
2036 /// - A new asset of type K is resolved for the first time (added to set)
2037 /// - An asset of type K is removed via remove_asset
2038 ///
2039 /// The calling query will NOT be invalidated when:
2040 /// - An individual asset's value changes (use `db.asset()` for that)
2041 ///
2042 /// # Example
2043 ///
2044 /// ```ignore
2045 /// #[query]
2046 /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
2047 /// let keys = db.list_asset_keys::<ConfigFile>();
2048 /// let mut contents = Vec::new();
2049 /// for key in keys {
2050 /// let content = db.asset(key)?;
2051 /// contents.push((*content).clone());
2052 /// }
2053 /// Ok(contents)
2054 /// }
2055 /// ```
2056 pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2057 // Record dependency on the sentinel (set-level dependency)
2058 let sentinel: FullCacheKey = AssetKeySetSentinelKey::new::<K>().into();
2059
2060 self.runtime.tracer.on_asset_dependency_registered(
2061 self.exec_ctx.span_ctx(),
2062 &self.current_key,
2063 &sentinel,
2064 );
2065
2066 // Ensure sentinel exists in whale (for dependency tracking)
2067 if self.runtime.whale.get(&sentinel).is_none() {
2068 let _ =
2069 self.runtime
2070 .whale
2071 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
2072 }
2073
2074 self.deps.borrow_mut().push(sentinel);
2075
2076 // Return all registered asset keys
2077 self.runtime.asset_key_registry.get_all::<K>()
2078 }
2079}
2080
2081impl<'a, T: Tracer> Db for QueryContext<'a, T> {
2082 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2083 QueryContext::query(self, query)
2084 }
2085
2086 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2087 QueryContext::asset(self, key)
2088 }
2089
2090 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2091 QueryContext::asset_state(self, key)
2092 }
2093
2094 fn list_queries<Q: Query>(&self) -> Vec<Q> {
2095 QueryContext::list_queries(self)
2096 }
2097
2098 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2099 QueryContext::list_asset_keys(self)
2100 }
2101}
2102
2103/// Context for collecting dependencies during asset locator execution.
2104///
2105/// Unlike `QueryContext`, this is specifically for locators and does not
2106/// register dependencies on any parent query. Dependencies collected here
2107/// are stored with the asset itself.
2108pub(crate) struct LocatorContext<'a, T: Tracer> {
2109 runtime: &'a QueryRuntime<T>,
2110 deps: RefCell<Vec<FullCacheKey>>,
2111}
2112
2113impl<'a, T: Tracer> LocatorContext<'a, T> {
2114 /// Create a new locator context for the given asset key.
2115 ///
2116 /// Consistency tracking is handled via thread-local storage, so leaf asset
2117 /// accesses will be checked against any active tracker from a parent query.
2118 pub(crate) fn new(runtime: &'a QueryRuntime<T>, _asset_key: FullCacheKey) -> Self {
2119 Self {
2120 runtime,
2121 deps: RefCell::new(Vec::new()),
2122 }
2123 }
2124
2125 /// Consume this context and return the collected dependencies.
2126 pub(crate) fn into_deps(self) -> Vec<FullCacheKey> {
2127 self.deps.into_inner()
2128 }
2129}
2130
2131impl<T: Tracer> Db for LocatorContext<'_, T> {
2132 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
2133 let full_key = QueryCacheKey::new(query.clone()).into();
2134
2135 // Record this as a dependency of the asset being located
2136 self.deps.borrow_mut().push(full_key);
2137
2138 // Execute the query via the runtime
2139 self.runtime.query(query)
2140 }
2141
2142 fn asset<K: AssetKey>(&self, key: K) -> Result<Arc<K::Asset>, QueryError> {
2143 self.asset_state(key)?.suspend()
2144 }
2145
2146 fn asset_state<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
2147 let full_cache_key = AssetCacheKey::new(key.clone()).into();
2148
2149 // Record this as a dependency of the asset being located
2150 self.deps.borrow_mut().push(full_cache_key);
2151
2152 // Access the asset - consistency checking is done inside get_asset_with_revision
2153 let (state, _changed_at) = self.runtime.get_asset_with_revision(key)?;
2154
2155 Ok(state)
2156 }
2157
2158 fn list_queries<Q: Query>(&self) -> Vec<Q> {
2159 self.runtime.list_queries()
2160 }
2161
2162 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
2163 self.runtime.list_asset_keys()
2164 }
2165}
2166
2167#[cfg(test)]
2168mod tests {
2169 use super::*;
2170
/// A dependency-free query returns its computed value, both on first
/// execution and when the same key is requested again.
#[test]
fn test_simple_query() {
    /// Adds its two operands.
    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct Add {
        a: i32,
        b: i32,
    }

    impl Query for Add {
        type Output = i32;

        fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let Add { a, b } = self;
            Ok(Arc::new(a + b))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();
    let request = Add { a: 1, b: 2 };

    let first = runtime.query(request.clone()).unwrap();
    assert_eq!(*first, 3);

    // Second query should be cached
    let second = runtime.query(request).unwrap();
    assert_eq!(*second, 3);
}
2200
/// A query that calls another query through `db.query` sees that
/// dependency's output.
#[test]
fn test_dependent_queries() {
    /// Doubles `value`.
    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct Base {
        value: i32,
    }

    impl Query for Base {
        type Output = i32;

        fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let doubled = self.value * 2;
            Ok(Arc::new(doubled))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    /// Adds 10 to the output of `Base { value: base_value }`.
    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct Derived {
        base_value: i32,
    }

    impl Query for Derived {
        type Output = i32;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let dependency = Base {
                value: self.base_value,
            };
            let base = db.query(dependency)?;
            Ok(Arc::new(*base + 10))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();

    let derived = runtime.query(Derived { base_value: 5 }).unwrap();
    assert_eq!(*derived, 20); // 5 * 2 + 10
}
2245
/// Two queries that depend on each other must produce `QueryError::Cycle`.
#[test]
fn test_cycle_detection() {
    /// Depends on `CycleB` with the same id.
    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct CycleA {
        id: i32,
    }

    /// Depends on `CycleA` with the same id, closing the loop.
    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct CycleB {
        id: i32,
    }

    impl Query for CycleA {
        type Output = i32;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let from_b = db.query(CycleB { id: self.id })?;
            Ok(Arc::new(*from_b + 1))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    impl Query for CycleB {
        type Output = i32;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let from_a = db.query(CycleA { id: self.id })?;
            Ok(Arc::new(*from_a + 1))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();

    let outcome = runtime.query(CycleA { id: 1 });
    assert!(matches!(outcome, Err(QueryError::Cycle { .. })));
}
2289
/// A query whose *output* is a `Result` succeeds at the system level even
/// when that output is an `Err`.
#[test]
fn test_fallible_query() {
    /// Parses `input` as an `i32`; the parse outcome is the query output.
    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct ParseInt {
        input: String,
    }

    impl Query for ParseInt {
        type Output = Result<i32, std::num::ParseIntError>;

        fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let parsed = self.input.parse();
            Ok(Arc::new(parsed))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();

    // Valid parse
    let valid = ParseInt {
        input: "42".to_string(),
    };
    let parsed = runtime.query(valid).unwrap();
    assert_eq!(*parsed, Ok(42));

    // Invalid parse - system succeeds, user error in output
    let invalid = ParseInt {
        input: "not_a_number".to_string(),
    };
    let parsed = runtime.query(invalid).unwrap();
    assert!(parsed.is_err());
}
2327
2328 // Macro tests
2329 mod macro_tests {
2330 use super::*;
2331 use crate::query;
2332
2333 #[query]
2334 fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2335 let _ = db; // silence unused warning
2336 Ok(a + b)
2337 }
2338
2339 #[test]
2340 fn test_macro_basic() {
2341 let runtime = QueryRuntime::new();
2342 let result = runtime.query(Add::new(1, 2)).unwrap();
2343 assert_eq!(*result, 3);
2344 }
2345
2346 #[query]
2347 fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2348 let _ = db;
2349 Ok(x * 2)
2350 }
2351
2352 #[test]
2353 fn test_macro_simple() {
2354 let runtime = QueryRuntime::new();
2355 let result = runtime.query(SimpleDouble::new(5)).unwrap();
2356 assert_eq!(*result, 10);
2357 }
2358
2359 #[query(keys(id))]
2360 fn with_key_selection(
2361 db: &impl Db,
2362 id: u32,
2363 include_extra: bool,
2364 ) -> Result<String, QueryError> {
2365 let _ = db;
2366 Ok(format!("id={}, extra={}", id, include_extra))
2367 }
2368
2369 #[test]
2370 fn test_macro_key_selection() {
2371 let runtime = QueryRuntime::new();
2372
2373 // Same id, different include_extra - should return cached
2374 let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2375 let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2376
2377 // Both should have same value because only `id` is the key
2378 assert_eq!(*r1, "id=1, extra=true");
2379 assert_eq!(*r2, "id=1, extra=true"); // Cached!
2380 }
2381
2382 #[query]
2383 fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2384 let sum = db.query(Add::new(a, b))?;
2385 Ok(*sum * 2)
2386 }
2387
2388 #[test]
2389 fn test_macro_dependencies() {
2390 let runtime = QueryRuntime::new();
2391 let result = runtime.query(Dependent::new(3, 4)).unwrap();
2392 assert_eq!(*result, 14); // (3 + 4) * 2
2393 }
2394
2395 #[query(output_eq)]
2396 fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2397 let _ = db;
2398 Ok(x * 2)
2399 }
2400
2401 #[test]
2402 fn test_macro_output_eq() {
2403 let runtime = QueryRuntime::new();
2404 let result = runtime.query(WithOutputEq::new(5)).unwrap();
2405 assert_eq!(*result, 10);
2406 }
2407
2408 #[query(name = "CustomName")]
2409 fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2410 let _ = db;
2411 Ok(x)
2412 }
2413
2414 #[test]
2415 fn test_macro_custom_name() {
2416 let runtime = QueryRuntime::new();
2417 let result = runtime.query(CustomName::new(42)).unwrap();
2418 assert_eq!(*result, 42);
2419 }
2420
2421 // Test that attribute macros like #[tracing::instrument] are preserved
2422 // We use #[allow(unused_variables)] and #[inline] as test attributes since
2423 // they don't require external dependencies.
2424 #[allow(unused_variables)]
2425 #[inline]
2426 #[query]
2427 fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2428 // This would warn without #[allow(unused_variables)] on the generated method
2429 let unused_var = 42;
2430 Ok(x * 2)
2431 }
2432
2433 #[test]
2434 fn test_macro_preserves_attributes() {
2435 let runtime = QueryRuntime::new();
2436 // If attributes weren't preserved, this might warn about unused_var
2437 let result = runtime.query(WithAttributes::new(5)).unwrap();
2438 assert_eq!(*result, 10);
2439 }
2440 }
2441
2442 // Tests for poll() and changed_at()
2443 mod poll_tests {
2444 use super::*;
2445
2446 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2447 struct Counter {
2448 id: i32,
2449 }
2450
2451 impl Query for Counter {
2452 type Output = i32;
2453
2454 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2455 Ok(Arc::new(self.id * 10))
2456 }
2457
2458 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2459 old == new
2460 }
2461 }
2462
2463 #[test]
2464 fn test_poll_returns_value_and_revision() {
2465 let runtime = QueryRuntime::new();
2466
2467 let result = runtime.poll(Counter { id: 1 }).unwrap();
2468
2469 // Value should be correct - access through Result and Arc
2470 assert_eq!(**result.value.as_ref().unwrap(), 10);
2471
2472 // Revision should be non-zero after first execution
2473 assert!(result.revision > 0);
2474 }
2475
2476 #[test]
2477 fn test_poll_revision_stable_on_cache_hit() {
2478 let runtime = QueryRuntime::new();
2479
2480 // First poll
2481 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2482 let rev1 = result1.revision;
2483
2484 // Second poll (cache hit)
2485 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2486 let rev2 = result2.revision;
2487
2488 // Revision should be the same (no change)
2489 assert_eq!(rev1, rev2);
2490 }
2491
2492 #[test]
2493 fn test_poll_revision_changes_on_invalidate() {
2494 let runtime = QueryRuntime::new();
2495
2496 // First poll
2497 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2498 let rev1 = result1.revision;
2499
2500 // Invalidate and poll again
2501 runtime.invalidate(&Counter { id: 1 });
2502 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2503 let rev2 = result2.revision;
2504
2505 // Revision should increase (value was recomputed)
2506 // Note: Since output_eq returns true (same value), this might not change
2507 // depending on early cutoff behavior. Let's verify the value is still correct.
2508 assert_eq!(**result2.value.as_ref().unwrap(), 10);
2509
2510 // With early cutoff, revision might stay the same if value didn't change
2511 // This is expected behavior
2512 assert!(rev2 >= rev1);
2513 }
2514
2515 #[test]
2516 fn test_changed_at_returns_none_for_unexecuted_query() {
2517 let runtime = QueryRuntime::new();
2518
2519 // Query has never been executed
2520 let rev = runtime.changed_at(&Counter { id: 1 });
2521 assert!(rev.is_none());
2522 }
2523
2524 #[test]
2525 fn test_changed_at_returns_revision_after_execution() {
2526 let runtime = QueryRuntime::new();
2527
2528 // Execute the query
2529 let _ = runtime.query(Counter { id: 1 }).unwrap();
2530
2531 // Now changed_at should return Some
2532 let rev = runtime.changed_at(&Counter { id: 1 });
2533 assert!(rev.is_some());
2534 assert!(rev.unwrap() > 0);
2535 }
2536
2537 #[test]
2538 fn test_changed_at_matches_poll_revision() {
2539 let runtime = QueryRuntime::new();
2540
2541 // Poll the query
2542 let result = runtime.poll(Counter { id: 1 }).unwrap();
2543
2544 // changed_at should match the revision from poll
2545 let rev = runtime.changed_at(&Counter { id: 1 });
2546 assert_eq!(rev, Some(result.revision));
2547 }
2548
2549 #[test]
2550 fn test_poll_value_access() {
2551 let runtime = QueryRuntime::new();
2552
2553 let result = runtime.poll(Counter { id: 5 }).unwrap();
2554
2555 // Access through Result and Arc
2556 let value: &i32 = result.value.as_ref().unwrap();
2557 assert_eq!(*value, 50);
2558
2559 // Access Arc directly via field after unwrapping Result
2560 let arc: &Arc<i32> = result.value.as_ref().unwrap();
2561 assert_eq!(**arc, 50);
2562 }
2563
2564 #[test]
2565 fn test_subscription_pattern() {
2566 let runtime = QueryRuntime::new();
2567
2568 // Simulate subscription pattern
2569 let mut last_revision: RevisionCounter = 0;
2570 let mut notifications = 0;
2571
2572 // First poll - should notify (new value)
2573 let result = runtime.poll(Counter { id: 1 }).unwrap();
2574 if result.revision > last_revision {
2575 notifications += 1;
2576 last_revision = result.revision;
2577 }
2578
2579 // Second poll - should NOT notify (no change)
2580 let result = runtime.poll(Counter { id: 1 }).unwrap();
2581 if result.revision > last_revision {
2582 notifications += 1;
2583 last_revision = result.revision;
2584 }
2585
2586 // Third poll - should NOT notify (no change)
2587 let result = runtime.poll(Counter { id: 1 }).unwrap();
2588 if result.revision > last_revision {
2589 notifications += 1;
2590 #[allow(unused_assignments)]
2591 {
2592 last_revision = result.revision;
2593 }
2594 }
2595
2596 // Only the first poll should have triggered a notification
2597 assert_eq!(notifications, 1);
2598 }
2599 }
2600
2601 // Tests for GC APIs
2602 mod gc_tests {
2603 use super::*;
2604 use crate::tracer::{SpanContext, SpanId, TraceId};
2605 use std::collections::HashSet;
2606 use std::sync::atomic::{AtomicUsize, Ordering};
2607 use std::sync::Mutex;
2608
2609 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2610 struct Leaf {
2611 id: i32,
2612 }
2613
2614 impl Query for Leaf {
2615 type Output = i32;
2616
2617 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2618 Ok(Arc::new(self.id * 10))
2619 }
2620
2621 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2622 old == new
2623 }
2624 }
2625
2626 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
2627 struct Parent {
2628 child_id: i32,
2629 }
2630
2631 impl Query for Parent {
2632 type Output = i32;
2633
2634 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2635 let child = db.query(Leaf { id: self.child_id })?;
2636 Ok(Arc::new(*child + 1))
2637 }
2638
2639 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2640 old == new
2641 }
2642 }
2643
2644 #[test]
2645 fn test_query_keys_returns_all_cached_queries() {
2646 let runtime = QueryRuntime::new();
2647
2648 // Execute some queries
2649 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2650 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2651 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2652
2653 // Get all keys
2654 let keys = runtime.query_keys();
2655
2656 // Should have at least 3 keys (might have more due to sentinels)
2657 assert!(keys.len() >= 3);
2658 }
2659
2660 #[test]
2661 fn test_remove_removes_query() {
2662 let runtime = QueryRuntime::new();
2663
2664 // Execute a query
2665 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2666
2667 // Get the key
2668 let full_key = QueryCacheKey::new(Leaf { id: 1 }).into();
2669
2670 // Query should exist
2671 assert!(runtime.changed_at(&Leaf { id: 1 }).is_some());
2672
2673 // Remove it
2674 assert!(runtime.remove(&full_key));
2675
2676 // Query should no longer exist
2677 assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2678 }
2679
2680 #[test]
2681 fn test_remove_if_unused_removes_leaf_query() {
2682 let runtime = QueryRuntime::new();
2683
2684 // Execute a leaf query (no dependents)
2685 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2686
2687 // Should be removable since no other query depends on it
2688 assert!(runtime.remove_query_if_unused(&Leaf { id: 1 }));
2689
2690 // Query should no longer exist
2691 assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2692 }
2693
2694 #[test]
2695 fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2696 let runtime = QueryRuntime::new();
2697
2698 // Execute parent query (which depends on Leaf)
2699 let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2700
2701 // Leaf query should not be removable since Parent depends on it
2702 assert!(!runtime.remove_query_if_unused(&Leaf { id: 1 }));
2703
2704 // Leaf query should still exist
2705 assert!(runtime.changed_at(&Leaf { id: 1 }).is_some());
2706
2707 // But Parent should be removable (no dependents)
2708 assert!(runtime.remove_query_if_unused(&Parent { child_id: 1 }));
2709 }
2710
2711 #[test]
2712 fn test_remove_if_unused_with_full_cache_key() {
2713 let runtime = QueryRuntime::new();
2714
2715 // Execute a leaf query
2716 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2717
2718 let full_key = QueryCacheKey::new(Leaf { id: 1 }).into();
2719
2720 // Should be removable via FullCacheKey
2721 assert!(runtime.remove_if_unused(&full_key));
2722
2723 // Query should no longer exist
2724 assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2725 }
2726
2727 // Test tracer receives on_query_start calls (for GC tracking)
2728 struct GcTracker {
2729 accessed_keys: Mutex<HashSet<String>>,
2730 access_count: AtomicUsize,
2731 }
2732
2733 impl GcTracker {
2734 fn new() -> Self {
2735 Self {
2736 accessed_keys: Mutex::new(HashSet::new()),
2737 access_count: AtomicUsize::new(0),
2738 }
2739 }
2740 }
2741
2742 impl Tracer for GcTracker {
2743 fn new_span_id(&self) -> SpanId {
2744 SpanId(1)
2745 }
2746
2747 fn new_trace_id(&self) -> TraceId {
2748 TraceId(1)
2749 }
2750
2751 fn on_query_start(&self, _ctx: &SpanContext, query_key: &QueryCacheKey) {
2752 self.accessed_keys
2753 .lock()
2754 .unwrap()
2755 .insert(query_key.debug_repr().to_string());
2756 self.access_count.fetch_add(1, Ordering::Relaxed);
2757 }
2758 }
2759
2760 #[test]
2761 fn test_tracer_receives_on_query_start() {
2762 let tracker = GcTracker::new();
2763 let runtime = QueryRuntime::with_tracer(tracker);
2764
2765 // Execute some queries
2766 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2767 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2768
2769 // Tracer should have received on_query_start calls
2770 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2771 assert_eq!(count, 2);
2772
2773 // Check that the keys were recorded
2774 let keys = runtime.tracer().accessed_keys.lock().unwrap();
2775 assert!(keys.iter().any(|k| k.contains("Leaf")));
2776 }
2777
2778 #[test]
2779 fn test_tracer_receives_on_query_start_for_cache_hits() {
2780 let tracker = GcTracker::new();
2781 let runtime = QueryRuntime::with_tracer(tracker);
2782
2783 // Execute query twice (second is cache hit)
2784 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2785 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2786
2787 // Tracer should have received on_query_start for both calls
2788 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2789 assert_eq!(count, 2);
2790 }
2791
2792 #[test]
2793 fn test_gc_workflow() {
2794 let tracker = GcTracker::new();
2795 let runtime = QueryRuntime::with_tracer(tracker);
2796
2797 // Execute some queries
2798 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2799 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2800 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2801
2802 // Simulate GC: remove all queries that are not in use
2803 let mut removed = 0;
2804 for key in runtime.query_keys() {
2805 if runtime.remove_if_unused(&key) {
2806 removed += 1;
2807 }
2808 }
2809
2810 // All leaf queries should be removable
2811 assert!(removed >= 3);
2812
2813 // Queries should no longer exist
2814 assert!(runtime.changed_at(&Leaf { id: 1 }).is_none());
2815 assert!(runtime.changed_at(&Leaf { id: 2 }).is_none());
2816 assert!(runtime.changed_at(&Leaf { id: 3 }).is_none());
2817 }
2818 }
2819}