query_flow/runtime.rs
1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::{Cell, RefCell};
5use std::ops::Deref;
6use std::sync::Arc;
7
8use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
9
10use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
11use crate::db::Db;
12use crate::key::FullCacheKey;
13use crate::loading::AssetLoadingState;
14use crate::query::Query;
15use crate::storage::{
16 AssetKeyRegistry, AssetState, CachedEntry, CachedValue, LocatorStorage, PendingStorage,
17 QueryRegistry, VerifierStorage,
18};
19use crate::tracer::{
20 ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
21 TracerAssetState, TracerQueryKey,
22};
23use crate::QueryError;
24
25/// Function type for comparing user errors during early cutoff.
26///
27/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
28/// `QueryError::UserError` values are compared for caching purposes.
29pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
30
31/// Number of durability levels (matches whale's default).
32const DURABILITY_LEVELS: usize = 4;
33
34// Thread-local query execution stack for cycle detection.
35thread_local! {
36 static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
37}
38
39/// Execution context passed through query execution.
40///
41/// Contains a span ID for tracing correlation.
42#[derive(Clone, Copy)]
43pub struct ExecutionContext {
44 span_id: SpanId,
45}
46
47impl ExecutionContext {
48 /// Create a new execution context with the given span ID.
49 #[inline]
50 pub fn new(span_id: SpanId) -> Self {
51 Self { span_id }
52 }
53
54 /// Get the span ID for this execution context.
55 #[inline]
56 pub fn span_id(&self) -> SpanId {
57 self.span_id
58 }
59}
60
61/// Result of polling a query, containing the value and its revision.
62///
63/// This is returned by [`QueryRuntime::poll`] and provides both the query result
64/// and its change revision, enabling efficient change detection for subscription
65/// patterns.
66///
67/// # Example
68///
69/// ```ignore
70/// let result = runtime.poll(MyQuery::new())?;
71///
72/// // Access the value via Deref
73/// println!("{:?}", *result);
74///
75/// // Check if changed since last poll
76/// if result.revision > last_known_revision {
77/// notify_subscribers(&result.value);
78/// last_known_revision = result.revision;
79/// }
80/// ```
81#[derive(Debug, Clone)]
82pub struct Polled<T> {
83 /// The query result value.
84 pub value: T,
85 /// The revision at which this value was last changed.
86 ///
87 /// Compare this with a previously stored revision to detect changes.
88 pub revision: RevisionCounter,
89}
90
91impl<T: Deref> Deref for Polled<T> {
92 type Target = T::Target;
93
94 fn deref(&self) -> &Self::Target {
95 &self.value
96 }
97}
98
99/// The query runtime manages query execution, caching, and dependency tracking.
100///
101/// This is cheap to clone - all data is behind `Arc`.
102///
103/// # Type Parameter
104///
105/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
106/// for zero-cost when tracing is not needed.
107///
108/// # Example
109///
110/// ```ignore
111/// // Without tracing (default)
112/// let runtime = QueryRuntime::new();
113///
114/// // With tracing
115/// let tracer = MyTracer::new();
116/// let runtime = QueryRuntime::with_tracer(tracer);
117///
118/// // Sync query execution
119/// let result = runtime.query(MyQuery { ... })?;
120/// ```
121pub struct QueryRuntime<T: Tracer = NoopTracer> {
122 /// Whale runtime for dependency tracking and cache storage.
123 /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
124 whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
125 /// Registered asset locators
126 locators: Arc<LocatorStorage>,
127 /// Pending asset requests
128 pending: Arc<PendingStorage>,
129 /// Registry for tracking query instances (for list_queries)
130 query_registry: Arc<QueryRegistry>,
131 /// Registry for tracking asset keys (for list_asset_keys)
132 asset_key_registry: Arc<AssetKeyRegistry>,
133 /// Verifiers for re-executing queries (for verify-then-decide pattern)
134 verifiers: Arc<VerifierStorage>,
135 /// Comparator for user errors during early cutoff
136 error_comparator: ErrorComparator,
137 /// Tracer for observability
138 tracer: Arc<T>,
139}
140
141impl Default for QueryRuntime<NoopTracer> {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147impl<T: Tracer> Clone for QueryRuntime<T> {
148 fn clone(&self) -> Self {
149 Self {
150 whale: self.whale.clone(),
151 locators: self.locators.clone(),
152 pending: self.pending.clone(),
153 query_registry: self.query_registry.clone(),
154 asset_key_registry: self.asset_key_registry.clone(),
155 verifiers: self.verifiers.clone(),
156 error_comparator: self.error_comparator,
157 tracer: self.tracer.clone(),
158 }
159 }
160}
161
162/// Default error comparator that treats all errors as different.
163///
164/// This is conservative - it always triggers recomputation when an error occurs.
165fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
166 false
167}
168
169impl<T: Tracer> QueryRuntime<T> {
170 /// Get cached output along with its revision (single atomic access).
171 fn get_cached_with_revision<Q: Query>(
172 &self,
173 key: &FullCacheKey,
174 ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
175 let node = self.whale.get(key)?;
176 let revision = node.changed_at;
177 let entry = node.data.as_ref()?;
178 let cached = entry.to_cached_value::<Q::Output>()?;
179 Some((cached, revision))
180 }
181
182 /// Get a reference to the tracer.
183 #[inline]
184 pub fn tracer(&self) -> &T {
185 &self.tracer
186 }
187}
188
189impl QueryRuntime<NoopTracer> {
190 /// Create a new query runtime with default settings.
191 pub fn new() -> Self {
192 Self::with_tracer(NoopTracer)
193 }
194
195 /// Create a builder for customizing the runtime.
196 ///
197 /// # Example
198 ///
199 /// ```ignore
200 /// let runtime = QueryRuntime::builder()
201 /// .error_comparator(|a, b| {
202 /// // Custom error comparison logic
203 /// match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
204 /// (Some(a), Some(b)) => a == b,
205 /// _ => false,
206 /// }
207 /// })
208 /// .build();
209 /// ```
210 pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
211 QueryRuntimeBuilder::new()
212 }
213}
214
215impl<T: Tracer> QueryRuntime<T> {
216 /// Create a new query runtime with the specified tracer.
217 pub fn with_tracer(tracer: T) -> Self {
218 QueryRuntimeBuilder::new().tracer(tracer).build()
219 }
220
221 /// Execute a query synchronously.
222 ///
223 /// Returns the cached result if valid, otherwise executes the query.
224 ///
225 /// # Errors
226 ///
227 /// - `QueryError::Suspend` - Query is waiting for async loading
228 /// - `QueryError::Cycle` - Dependency cycle detected
229 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
230 self.query_internal(query)
231 .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
232 }
233
234 /// Internal implementation shared by query() and poll().
235 ///
236 /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
237 /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
238 #[allow(clippy::type_complexity)]
239 fn query_internal<Q: Query>(
240 &self,
241 query: Q,
242 ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
243 let key = query.cache_key();
244 let full_key = FullCacheKey::new::<Q, _>(&key);
245
246 // Emit query key event for GC tracking (before other events)
247 self.tracer.on_query_key(&full_key);
248
249 // Create execution context and emit start event
250 let span_id = self.tracer.new_span_id();
251 let exec_ctx = ExecutionContext::new(span_id);
252 let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
253
254 self.tracer.on_query_start(span_id, query_key.clone());
255
256 // Check for cycles using thread-local stack
257 let cycle_detected = QUERY_STACK.with(|stack| {
258 let stack = stack.borrow();
259 stack.iter().any(|k| k == &full_key)
260 });
261
262 if cycle_detected {
263 let path = QUERY_STACK.with(|stack| {
264 let stack = stack.borrow();
265 let mut path: Vec<String> =
266 stack.iter().map(|k| k.debug_repr().to_string()).collect();
267 path.push(full_key.debug_repr().to_string());
268 path
269 });
270
271 self.tracer.on_cycle_detected(
272 path.iter()
273 .map(|s| TracerQueryKey::new("", s.clone()))
274 .collect(),
275 );
276 self.tracer
277 .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
278
279 return Err(QueryError::Cycle { path });
280 }
281
282 // Check if cached and valid (with verify-then-decide pattern)
283 let current_rev = self.whale.current_revision();
284
285 // Fast path: already verified at current revision
286 if self.whale.is_verified_at(&full_key, ¤t_rev) {
287 // Single atomic access to get both cached value and revision
288 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
289 self.tracer.on_cache_check(span_id, query_key.clone(), true);
290 self.tracer
291 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
292
293 return match cached {
294 CachedValue::Ok(output) => Ok((Ok(output), revision)),
295 CachedValue::UserError(err) => Ok((Err(err), revision)),
296 };
297 }
298 }
299
300 // Check shallow validity (deps' changed_at ok) and try verify-then-decide
301 if self.whale.is_valid(&full_key) {
302 // Single atomic access to get both cached value and revision
303 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
304 // Shallow valid but not verified - verify deps first
305 let mut deps_verified = true;
306 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
307 for dep in deps {
308 if let Some(verifier) = self.verifiers.get(&dep) {
309 // Re-query dep to verify it (triggers recursive verification)
310 if verifier.verify(self as &dyn std::any::Any).is_err() {
311 deps_verified = false;
312 break;
313 }
314 }
315 }
316 }
317
318 // Re-check validity after deps are verified
319 if deps_verified && self.whale.is_valid(&full_key) {
320 // Deps didn't change their changed_at, mark as verified and use cached
321 self.whale.mark_verified(&full_key, ¤t_rev);
322
323 self.tracer.on_cache_check(span_id, query_key.clone(), true);
324 self.tracer
325 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
326
327 return match cached {
328 CachedValue::Ok(output) => Ok((Ok(output), revision)),
329 CachedValue::UserError(err) => Ok((Err(err), revision)),
330 };
331 }
332 // A dep's changed_at increased, fall through to execute
333 }
334 }
335
336 self.tracer
337 .on_cache_check(span_id, query_key.clone(), false);
338
339 // Execute the query with cycle tracking
340 QUERY_STACK.with(|stack| {
341 stack.borrow_mut().push(full_key.clone());
342 });
343
344 let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
345
346 QUERY_STACK.with(|stack| {
347 stack.borrow_mut().pop();
348 });
349
350 // Emit end event
351 let exec_result = match &result {
352 Ok((_, true, _)) => ExecutionResult::Changed,
353 Ok((_, false, _)) => ExecutionResult::Unchanged,
354 Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
355 Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
356 Err(e) => ExecutionResult::Error {
357 message: format!("{:?}", e),
358 },
359 };
360 self.tracer
361 .on_query_end(span_id, query_key.clone(), exec_result);
362
363 result.map(|(inner_result, _, revision)| (inner_result, revision))
364 }
365
366 /// Execute a query, caching the result if appropriate.
367 ///
368 /// Returns (result, output_changed, revision) tuple.
369 /// - `result`: Ok(output) for success, Err(user_error) for user errors
370 /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
371 #[allow(clippy::type_complexity)]
372 fn execute_query<Q: Query>(
373 &self,
374 query: &Q,
375 full_key: &FullCacheKey,
376 exec_ctx: ExecutionContext,
377 ) -> Result<
378 (
379 Result<Arc<Q::Output>, Arc<anyhow::Error>>,
380 bool,
381 RevisionCounter,
382 ),
383 QueryError,
384 > {
385 // Create context for this query execution
386 let ctx = QueryContext {
387 runtime: self,
388 current_key: full_key.clone(),
389 parent_query_type: std::any::type_name::<Q>(),
390 exec_ctx,
391 deps: RefCell::new(Vec::new()),
392 first_access_revision: Cell::new(None),
393 };
394
395 // Execute the query (clone because query() takes ownership)
396 let result = query.clone().query(&ctx);
397
398 // Get collected dependencies
399 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
400
401 // Query durability defaults to stable - Whale will automatically reduce
402 // the effective durability to min(requested, min(dep_durabilities)).
403 // A pure query with no dependencies remains stable.
404 // A query depending on volatile assets becomes volatile.
405 let durability = Durability::stable();
406
407 match result {
408 Ok(output) => {
409 // Check if output changed (for early cutoff)
410 // existing_revision is Some only when output is unchanged (can reuse revision)
411 let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
412 self.get_cached_with_revision::<Q>(full_key)
413 {
414 if Q::output_eq(&*old, &*output) {
415 Some(rev) // Same output - reuse revision
416 } else {
417 None // Different output
418 }
419 } else {
420 None // No previous Ok value
421 };
422 let output_changed = existing_revision.is_none();
423
424 // Emit early cutoff check event
425 self.tracer.on_early_cutoff_check(
426 exec_ctx.span_id(),
427 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
428 output_changed,
429 );
430
431 // Update whale with cached entry (atomic update of value + dependency state)
432 let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
433 let revision = if let Some(existing_rev) = existing_revision {
434 // confirm_unchanged doesn't change changed_at, use existing
435 let _ = self.whale.confirm_unchanged(full_key, deps);
436 existing_rev
437 } else {
438 // Use new_rev from register result
439 match self
440 .whale
441 .register(full_key.clone(), Some(entry), durability, deps)
442 {
443 Ok(result) => result.new_rev,
444 Err(missing) => {
445 return Err(QueryError::DependenciesRemoved {
446 missing_keys: missing,
447 })
448 }
449 }
450 };
451
452 // Register query in registry for list_queries
453 let is_new_query = self.query_registry.register(query);
454 if is_new_query {
455 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
456 let _ = self
457 .whale
458 .register(sentinel, None, Durability::stable(), vec![]);
459 }
460
461 // Store verifier for this query (for verify-then-decide pattern)
462 self.verifiers
463 .insert::<Q, T>(full_key.clone(), query.clone());
464
465 Ok((Ok(output), output_changed, revision))
466 }
467 Err(QueryError::UserError(err)) => {
468 // Check if error changed (for early cutoff)
469 // existing_revision is Some only when error is unchanged (can reuse revision)
470 let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
471 self.get_cached_with_revision::<Q>(full_key)
472 {
473 if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
474 Some(rev) // Same error - reuse revision
475 } else {
476 None // Different error
477 }
478 } else {
479 None // No previous UserError
480 };
481 let output_changed = existing_revision.is_none();
482
483 // Emit early cutoff check event
484 self.tracer.on_early_cutoff_check(
485 exec_ctx.span_id(),
486 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
487 output_changed,
488 );
489
490 // Update whale with cached error (atomic update of value + dependency state)
491 let entry = CachedEntry::UserError(err.clone());
492 let revision = if let Some(existing_rev) = existing_revision {
493 // confirm_unchanged doesn't change changed_at, use existing
494 let _ = self.whale.confirm_unchanged(full_key, deps);
495 existing_rev
496 } else {
497 // Use new_rev from register result
498 match self
499 .whale
500 .register(full_key.clone(), Some(entry), durability, deps)
501 {
502 Ok(result) => result.new_rev,
503 Err(missing) => {
504 return Err(QueryError::DependenciesRemoved {
505 missing_keys: missing,
506 })
507 }
508 }
509 };
510
511 // Register query in registry for list_queries
512 let is_new_query = self.query_registry.register(query);
513 if is_new_query {
514 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
515 let _ = self
516 .whale
517 .register(sentinel, None, Durability::stable(), vec![]);
518 }
519
520 // Store verifier for this query (for verify-then-decide pattern)
521 self.verifiers
522 .insert::<Q, T>(full_key.clone(), query.clone());
523
524 Ok((Err(err), output_changed, revision))
525 }
526 Err(e) => {
527 // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
528 Err(e)
529 }
530 }
531 }
532
533 /// Invalidate a query, forcing recomputation on next access.
534 ///
535 /// This also invalidates any queries that depend on this one.
536 pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
537 let full_key = FullCacheKey::new::<Q, _>(key);
538
539 self.tracer.on_query_invalidated(
540 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
541 InvalidationReason::ManualInvalidation,
542 );
543
544 // Update whale to invalidate dependents (register with None to clear cached value)
545 // Use stable durability to increment all revision counters, ensuring queries
546 // at any durability level will see this as a change.
547 let _ = self
548 .whale
549 .register(full_key, None, Durability::stable(), vec![]);
550 }
551
552 /// Remove a query from the cache entirely, freeing memory.
553 ///
554 /// Use this for GC when a query is no longer needed.
555 /// Unlike `invalidate`, this removes all traces of the query from storage.
556 /// The query will be recomputed from scratch on next access.
557 ///
558 /// This also invalidates any queries that depend on this one.
559 pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
560 let full_key = FullCacheKey::new::<Q, _>(key);
561
562 self.tracer.on_query_invalidated(
563 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
564 InvalidationReason::ManualInvalidation,
565 );
566
567 // Remove verifier if exists
568 self.verifiers.remove(&full_key);
569
570 // Remove from whale storage (this also handles dependent invalidation)
571 self.whale.remove(&full_key);
572
573 // Remove from registry and update sentinel for list_queries
574 if self.query_registry.remove::<Q>(key) {
575 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
576 let _ = self
577 .whale
578 .register(sentinel, None, Durability::stable(), vec![]);
579 }
580 }
581
582 /// Clear all cached values by removing all nodes from whale.
583 ///
584 /// Note: This is a relatively expensive operation as it iterates through all keys.
585 pub fn clear_cache(&self) {
586 let keys = self.whale.keys();
587 for key in keys {
588 self.whale.remove(&key);
589 }
590 }
591
592 /// Poll a query, returning both the result and its change revision.
593 ///
594 /// This is useful for implementing subscription patterns where you need to
595 /// detect changes efficiently. Compare the returned `revision` with a
596 /// previously stored value to determine if the query result has changed.
597 ///
598 /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
599 /// as its value, allowing you to track revision changes for both success and
600 /// user error cases.
601 ///
602 /// # Example
603 ///
604 /// ```ignore
605 /// struct Subscription<Q: Query> {
606 /// query: Q,
607 /// last_revision: RevisionCounter,
608 /// tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
609 /// }
610 ///
611 /// // Polling loop
612 /// for sub in &mut subscriptions {
613 /// let result = runtime.poll(sub.query.clone())?;
614 /// if result.revision > sub.last_revision {
615 /// sub.tx.send(result.value.clone())?;
616 /// sub.last_revision = result.revision;
617 /// }
618 /// }
619 /// ```
620 ///
621 /// # Errors
622 ///
623 /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
624 /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
625 #[allow(clippy::type_complexity)]
626 pub fn poll<Q: Query>(
627 &self,
628 query: Q,
629 ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
630 let (value, revision) = self.query_internal(query)?;
631 Ok(Polled { value, revision })
632 }
633
634 /// Get the change revision of a query without executing it.
635 ///
636 /// Returns `None` if the query has never been executed.
637 ///
638 /// This is useful for checking if a query has changed since the last poll
639 /// without the cost of executing the query.
640 ///
641 /// # Example
642 ///
643 /// ```ignore
644 /// // Check if query has changed before deciding to poll
645 /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
646 /// if rev > last_known_revision {
647 /// let result = runtime.query(MyQuery::new(key))?;
648 /// // Process result...
649 /// }
650 /// }
651 /// ```
652 pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
653 let full_key = FullCacheKey::new::<Q, _>(key);
654 self.whale.get(&full_key).map(|node| node.changed_at)
655 }
656}
657
658// ============================================================================
659// GC (Garbage Collection) API
660// ============================================================================
661
662impl<T: Tracer> QueryRuntime<T> {
663 /// Get all query keys currently in the cache.
664 ///
665 /// This is useful for implementing custom garbage collection strategies.
666 /// Use this in combination with [`Tracer::on_query_key`] to track access
667 /// times and implement LRU, TTL, or other GC algorithms externally.
668 ///
669 /// # Example
670 ///
671 /// ```ignore
672 /// // Collect all keys that haven't been accessed recently
673 /// let stale_keys: Vec<_> = runtime.query_keys()
674 /// .filter(|key| tracker.is_stale(key))
675 /// .collect();
676 ///
677 /// // Remove stale queries that have no dependents
678 /// for key in stale_keys {
679 /// runtime.remove_if_unused(&key);
680 /// }
681 /// ```
682 pub fn query_keys(&self) -> Vec<FullCacheKey> {
683 self.whale.keys()
684 }
685
686 /// Remove a query if it has no dependents.
687 ///
688 /// Returns `true` if the query was removed, `false` if it has dependents
689 /// or doesn't exist. This is the safe way to remove queries during GC,
690 /// as it won't break queries that depend on this one.
691 ///
692 /// # Example
693 ///
694 /// ```ignore
695 /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
696 /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
697 /// println!("Query removed");
698 /// } else {
699 /// println!("Query has dependents, not removed");
700 /// }
701 /// ```
702 pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
703 let full_key = FullCacheKey::new::<Q, _>(key);
704 self.remove_if_unused(&full_key)
705 }
706
707 /// Remove a query by its [`FullCacheKey`].
708 ///
709 /// This is the type-erased version of [`remove_query`](Self::remove_query).
710 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
711 /// or [`Tracer::on_query_key`].
712 ///
713 /// Returns `true` if the query was removed, `false` if it doesn't exist.
714 ///
715 /// # Warning
716 ///
717 /// This forcibly removes the query even if other queries depend on it.
718 /// Dependent queries will be recomputed on next access. For safe GC,
719 /// use [`remove_if_unused`](Self::remove_if_unused) instead.
720 pub fn remove(&self, key: &FullCacheKey) -> bool {
721 // Remove verifier if exists
722 self.verifiers.remove(key);
723
724 // Remove from whale storage
725 self.whale.remove(key).is_some()
726 }
727
728 /// Remove a query by its [`FullCacheKey`] if it has no dependents.
729 ///
730 /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
731 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
732 /// or [`Tracer::on_query_key`].
733 ///
734 /// Returns `true` if the query was removed, `false` if it has dependents
735 /// or doesn't exist.
736 ///
737 /// # Example
738 ///
739 /// ```ignore
740 /// // Implement LRU GC
741 /// for key in runtime.query_keys() {
742 /// if tracker.is_expired(&key) {
743 /// runtime.remove_if_unused(&key);
744 /// }
745 /// }
746 /// ```
747 pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
748 if self.whale.remove_if_unused(key.clone()).is_some() {
749 // Successfully removed - clean up verifier
750 self.verifiers.remove(key);
751 true
752 } else {
753 false
754 }
755 }
756}
757
758// ============================================================================
759// Builder
760// ============================================================================
761
762/// Builder for [`QueryRuntime`] with customizable settings.
763///
764/// # Example
765///
766/// ```ignore
767/// let runtime = QueryRuntime::builder()
768/// .error_comparator(|a, b| {
769/// // Treat all errors of the same type as equal
770/// a.downcast_ref::<std::io::Error>().is_some()
771/// == b.downcast_ref::<std::io::Error>().is_some()
772/// })
773/// .build();
774/// ```
775pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
776 error_comparator: ErrorComparator,
777 tracer: T,
778}
779
780impl Default for QueryRuntimeBuilder<NoopTracer> {
781 fn default() -> Self {
782 Self::new()
783 }
784}
785
786impl QueryRuntimeBuilder<NoopTracer> {
787 /// Create a new builder with default settings.
788 pub fn new() -> Self {
789 Self {
790 error_comparator: default_error_comparator,
791 tracer: NoopTracer,
792 }
793 }
794}
795
796impl<T: Tracer> QueryRuntimeBuilder<T> {
797 /// Set the error comparator function for early cutoff optimization.
798 ///
799 /// When a query returns `QueryError::UserError`, this function is used
800 /// to compare it with the previously cached error. If they are equal,
801 /// downstream queries can skip recomputation (early cutoff).
802 ///
803 /// The default comparator returns `false` for all errors, meaning errors
804 /// are always considered different (conservative, always recomputes).
805 ///
806 /// # Example
807 ///
808 /// ```ignore
809 /// // Treat errors as equal if they have the same display message
810 /// let runtime = QueryRuntime::builder()
811 /// .error_comparator(|a, b| a.to_string() == b.to_string())
812 /// .build();
813 /// ```
814 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
815 self.error_comparator = f;
816 self
817 }
818
819 /// Set the tracer for observability.
820 pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
821 QueryRuntimeBuilder {
822 error_comparator: self.error_comparator,
823 tracer,
824 }
825 }
826
827 /// Build the runtime with the configured settings.
828 pub fn build(self) -> QueryRuntime<T> {
829 QueryRuntime {
830 whale: WhaleRuntime::new(),
831 locators: Arc::new(LocatorStorage::new()),
832 pending: Arc::new(PendingStorage::new()),
833 query_registry: Arc::new(QueryRegistry::new()),
834 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
835 verifiers: Arc::new(VerifierStorage::new()),
836 error_comparator: self.error_comparator,
837 tracer: Arc::new(self.tracer),
838 }
839 }
840}
841
842// ============================================================================
843// Asset API
844// ============================================================================
845
846impl<T: Tracer> QueryRuntime<T> {
847 /// Register an asset locator for a specific asset key type.
848 ///
849 /// Only one locator can be registered per key type. Later registrations
850 /// replace earlier ones.
851 ///
852 /// # Example
853 ///
854 /// ```ignore
855 /// let runtime = QueryRuntime::new();
856 /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
857 /// ```
858 pub fn register_asset_locator<K, L>(&self, locator: L)
859 where
860 K: AssetKey,
861 L: AssetLocator<K>,
862 {
863 self.locators.insert::<K, L>(locator);
864 }
865
866 /// Get an iterator over pending asset requests.
867 ///
868 /// Returns assets that have been requested but not yet resolved.
869 /// The user should fetch these externally and call `resolve_asset()`.
870 ///
871 /// # Example
872 ///
873 /// ```ignore
874 /// for pending in runtime.pending_assets() {
875 /// if let Some(path) = pending.key::<FilePath>() {
876 /// let content = fetch_file(path);
877 /// runtime.resolve_asset(path.clone(), content);
878 /// }
879 /// }
880 /// ```
881 pub fn pending_assets(&self) -> Vec<PendingAsset> {
882 self.pending.get_all()
883 }
884
885 /// Get pending assets filtered by key type.
886 pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
887 self.pending.get_of_type::<K>()
888 }
889
890 /// Check if there are any pending assets.
891 pub fn has_pending_assets(&self) -> bool {
892 !self.pending.is_empty()
893 }
894
895 /// Resolve an asset with its loaded value.
896 ///
897 /// This marks the asset as ready and invalidates any queries that
898 /// depend on it (if the value changed), triggering recomputation on next access.
899 ///
900 /// This method is idempotent - resolving with the same value (via `asset_eq`)
901 /// will not trigger downstream recomputation.
902 ///
903 /// # Arguments
904 ///
905 /// * `key` - The asset key identifying this resource
906 /// * `value` - The loaded asset value
907 /// * `durability` - How frequently this asset is expected to change
908 ///
909 /// # Example
910 ///
911 /// ```ignore
912 /// let content = std::fs::read_to_string(&path)?;
913 /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
914 /// ```
915 pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
916 self.resolve_asset_internal(key, value, durability);
917 }
918
919 fn resolve_asset_internal<K: AssetKey>(
920 &self,
921 key: K,
922 value: K::Asset,
923 durability_level: DurabilityLevel,
924 ) {
925 let full_asset_key = FullAssetKey::new(&key);
926 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
927
928 // Remove from pending BEFORE registering the value
929 self.pending.remove(&full_asset_key);
930
931 // Prepare the new entry
932 let value_arc: Arc<K::Asset> = Arc::new(value);
933 let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
934 let durability =
935 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
936
937 // Atomic compare-and-update
938 let result = self
939 .whale
940 .update_with_compare(
941 full_cache_key,
942 Some(entry),
943 |old_data, _new_data| {
944 // Compare old and new values
945 match old_data.and_then(|d| d.as_ref()) {
946 Some(CachedEntry::AssetReady(old_arc)) => {
947 match old_arc.clone().downcast::<K::Asset>() {
948 Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
949 Err(_) => true, // Type mismatch, treat as changed
950 }
951 }
952 _ => true, // Loading, NotFound, or not present -> changed
953 }
954 },
955 durability,
956 vec![],
957 )
958 .expect("update_with_compare with no dependencies cannot fail");
959
960 // Emit asset resolved event
961 self.tracer.on_asset_resolved(
962 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
963 result.changed,
964 );
965
966 // Register asset key in registry for list_asset_keys
967 let is_new_asset = self.asset_key_registry.register(&key);
968 if is_new_asset {
969 // Update sentinel to invalidate list_asset_keys dependents
970 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
971 let _ = self
972 .whale
973 .register(sentinel, None, Durability::stable(), vec![]);
974 }
975 }
976
977 /// Invalidate an asset, forcing queries to re-request it.
978 ///
979 /// The asset will be marked as loading and added to pending assets.
980 /// Dependent queries will suspend until the asset is resolved again.
981 ///
982 /// # Example
983 ///
984 /// ```ignore
985 /// // File was modified externally
986 /// runtime.invalidate_asset(&FilePath("config.json".into()));
987 /// // Queries depending on this asset will now suspend
988 /// // User should fetch the new value and call resolve_asset
989 /// ```
990 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
991 let full_asset_key = FullAssetKey::new(key);
992 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
993
994 // Emit asset invalidated event
995 self.tracer.on_asset_invalidated(TracerAssetKey::new(
996 std::any::type_name::<K>(),
997 format!("{:?}", key),
998 ));
999
1000 // Add to pending FIRST (before clearing whale state)
1001 // This ensures: readers see either old value, or Loading+pending
1002 self.pending.insert::<K>(full_asset_key, key.clone());
1003
1004 // Atomic: clear cached value + invalidate dependents
1005 // Using None for data means "needs to be loaded"
1006 // Use stable durability to ensure queries at any durability level see the change.
1007 let _ = self
1008 .whale
1009 .register(full_cache_key, None, Durability::stable(), vec![]);
1010 }
1011
1012 /// Remove an asset from the cache entirely.
1013 ///
1014 /// Unlike `invalidate_asset`, this removes all traces of the asset.
1015 /// Dependent queries will go through the locator again on next access.
1016 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1017 let full_asset_key = FullAssetKey::new(key);
1018 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1019
1020 // Remove from pending first
1021 self.pending.remove(&full_asset_key);
1022
1023 // Remove from whale (this also cleans up dependency edges)
1024 // whale.remove() invalidates dependents before removing
1025 self.whale.remove(&full_cache_key);
1026
1027 // Remove from registry and update sentinel for list_asset_keys
1028 if self.asset_key_registry.remove::<K>(key) {
1029 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1030 let _ = self
1031 .whale
1032 .register(sentinel, None, Durability::stable(), vec![]);
1033 }
1034 }
1035
1036 /// Get an asset by key without tracking dependencies.
1037 ///
1038 /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1039 /// as a dependent of the asset. Use this for direct asset access outside
1040 /// of query execution.
1041 ///
1042 /// # Returns
1043 ///
1044 /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1045 /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1046 /// - `Err(QueryError::MissingDependency)` - Asset was not found
1047 pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1048 self.get_asset_internal(key)
1049 }
1050
1051 /// Internal: Get asset state and its changed_at revision atomically.
1052 ///
1053 /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
1054 /// whale node that provided the asset value.
1055 fn get_asset_with_revision<K: AssetKey>(
1056 &self,
1057 key: K,
1058 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1059 let full_asset_key = FullAssetKey::new(&key);
1060 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1061
1062 // Helper to emit AssetRequested event
1063 let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1064 tracer.on_asset_requested(
1065 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1066 state,
1067 );
1068 };
1069
1070 // Check whale cache first (single atomic read)
1071 if let Some(node) = self.whale.get(&full_cache_key) {
1072 let changed_at = node.changed_at;
1073 // Check if valid at current revision
1074 if self.whale.is_valid(&full_cache_key) {
1075 match &node.data {
1076 Some(CachedEntry::AssetReady(arc)) => {
1077 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1078 match arc.clone().downcast::<K::Asset>() {
1079 Ok(value) => {
1080 return Ok((AssetLoadingState::ready(key, value), changed_at))
1081 }
1082 Err(_) => {
1083 return Err(QueryError::MissingDependency {
1084 description: format!("Asset type mismatch: {:?}", key),
1085 })
1086 }
1087 }
1088 }
1089 Some(CachedEntry::AssetNotFound) => {
1090 emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1091 return Err(QueryError::MissingDependency {
1092 description: format!("Asset not found: {:?}", key),
1093 });
1094 }
1095 None => {
1096 // Loading state
1097 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1098 return Ok((AssetLoadingState::loading(key), changed_at));
1099 }
1100 _ => {
1101 // Query-related entries (Ok, UserError) shouldn't be here
1102 // Fall through to locator
1103 }
1104 }
1105 }
1106 }
1107
1108 // Not in cache or invalid - try locator
1109 if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
1110 if let Some(state) = locator.locate_any(&key) {
1111 match state {
1112 AssetState::Ready {
1113 value: arc,
1114 durability: durability_level,
1115 } => {
1116 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1117
1118 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1119 Ok(v) => v,
1120 Err(_) => {
1121 return Err(QueryError::MissingDependency {
1122 description: format!("Asset type mismatch: {:?}", key),
1123 });
1124 }
1125 };
1126
1127 // Store in whale atomically with early cutoff
1128 // Use durability from LocateResult::Ready
1129 let entry = CachedEntry::AssetReady(typed_value.clone());
1130 let durability = Durability::new(durability_level.as_u8() as usize)
1131 .unwrap_or(Durability::volatile());
1132 let new_value = typed_value.clone();
1133 let result = self
1134 .whale
1135 .update_with_compare(
1136 full_cache_key,
1137 Some(entry),
1138 |old_data, _new_data| {
1139 let Some(CachedEntry::AssetReady(old_arc)) =
1140 old_data.and_then(|d| d.as_ref())
1141 else {
1142 return true;
1143 };
1144 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>()
1145 else {
1146 return true;
1147 };
1148 !K::asset_eq(&old_value, &new_value)
1149 },
1150 durability,
1151 vec![],
1152 )
1153 .expect("update_with_compare with no dependencies cannot fail");
1154
1155 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1156 }
1157 AssetState::Loading => {
1158 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1159
1160 self.pending.insert::<K>(full_asset_key, key.clone());
1161 match self
1162 .whale
1163 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1164 .expect("get_or_insert with no dependencies cannot fail")
1165 {
1166 GetOrInsertResult::Inserted(node) => {
1167 return Ok((AssetLoadingState::loading(key), node.changed_at));
1168 }
1169 GetOrInsertResult::Existing(node) => {
1170 let changed_at = node.changed_at;
1171 match &node.data {
1172 Some(CachedEntry::AssetReady(arc)) => {
1173 match arc.clone().downcast::<K::Asset>() {
1174 Ok(value) => {
1175 return Ok((
1176 AssetLoadingState::ready(key, value),
1177 changed_at,
1178 ))
1179 }
1180 Err(_) => {
1181 return Ok((
1182 AssetLoadingState::loading(key),
1183 changed_at,
1184 ))
1185 }
1186 }
1187 }
1188 Some(CachedEntry::AssetNotFound) => {
1189 return Err(QueryError::MissingDependency {
1190 description: format!("Asset not found: {:?}", key),
1191 });
1192 }
1193 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1194 }
1195 }
1196 }
1197 }
1198 AssetState::NotFound => {
1199 emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1200
1201 let entry = CachedEntry::AssetNotFound;
1202 let durability = Durability::volatile();
1203 let _ = self.whale.update_with_compare(
1204 full_cache_key,
1205 Some(entry),
1206 |old_data, _new_data| {
1207 !matches!(
1208 old_data.and_then(|d| d.as_ref()),
1209 Some(CachedEntry::AssetNotFound)
1210 )
1211 },
1212 durability,
1213 vec![],
1214 );
1215
1216 return Err(QueryError::MissingDependency {
1217 description: format!("Asset not found: {:?}", key),
1218 });
1219 }
1220 }
1221 }
1222 }
1223
1224 // No locator registered or locator returned None - mark as pending
1225 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1226 self.pending
1227 .insert::<K>(full_asset_key.clone(), key.clone());
1228
1229 match self
1230 .whale
1231 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1232 .expect("get_or_insert with no dependencies cannot fail")
1233 {
1234 GetOrInsertResult::Inserted(node) => {
1235 Ok((AssetLoadingState::loading(key), node.changed_at))
1236 }
1237 GetOrInsertResult::Existing(node) => {
1238 let changed_at = node.changed_at;
1239 match &node.data {
1240 Some(CachedEntry::AssetReady(arc)) => {
1241 match arc.clone().downcast::<K::Asset>() {
1242 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1243 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1244 }
1245 }
1246 Some(CachedEntry::AssetNotFound) => Err(QueryError::MissingDependency {
1247 description: format!("Asset not found: {:?}", key),
1248 }),
1249 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1250 }
1251 }
1252 }
1253 }
1254
1255 /// Internal: Get asset state, checking cache and locator.
1256 fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1257 self.get_asset_with_revision(key).map(|(state, _)| state)
1258 }
1259}
1260
1261impl<T: Tracer> Db for QueryRuntime<T> {
1262 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1263 QueryRuntime::query(self, query)
1264 }
1265
1266 fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1267 self.get_asset_internal(key)
1268 }
1269
1270 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1271 self.query_registry.get_all::<Q>()
1272 }
1273
1274 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1275 self.asset_key_registry.get_all::<K>()
1276 }
1277}
1278
1279/// Context provided to queries during execution.
1280///
1281/// Use this to access dependencies via `query()`.
1282pub struct QueryContext<'a, T: Tracer = NoopTracer> {
1283 runtime: &'a QueryRuntime<T>,
1284 current_key: FullCacheKey,
1285 parent_query_type: &'static str,
1286 exec_ctx: ExecutionContext,
1287 deps: RefCell<Vec<FullCacheKey>>,
1288 /// Revision at first asset access, used for consistency checking.
1289 /// Assets with changed_at > this value were modified during query execution.
1290 first_access_revision: Cell<Option<RevisionCounter>>,
1291}
1292
1293impl<'a, T: Tracer> QueryContext<'a, T> {
1294 /// Ensure consistency of asset access.
1295 ///
1296 /// On first asset access: records max(current_global, dep_changed_at) as baseline.
1297 /// On subsequent asset accesses: checks dep_changed_at <= baseline.
1298 ///
1299 /// IMPORTANT: current_global must be obtained BEFORE accessing the asset.
1300 #[inline]
1301 fn ensure_consistent(
1302 &self,
1303 current_global: RevisionCounter,
1304 dep_changed_at: RevisionCounter,
1305 ) -> Result<(), QueryError> {
1306 match self.first_access_revision.get() {
1307 None => {
1308 // First asset access: record max(current_global, dep_changed_at)
1309 // Using max ensures we don't get false positives when only one asset is accessed
1310 let first = current_global.max(dep_changed_at);
1311 self.first_access_revision.set(Some(first));
1312 Ok(())
1313 }
1314 Some(first) => {
1315 // Subsequent asset accesses: check consistency
1316 if dep_changed_at > first {
1317 Err(QueryError::InconsistentAssetResolution)
1318 } else {
1319 Ok(())
1320 }
1321 }
1322 }
1323 }
1324
1325 /// Query a dependency.
1326 ///
1327 /// The dependency is automatically tracked for invalidation.
1328 ///
1329 /// # Example
1330 ///
1331 /// ```ignore
1332 /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1333 /// let dep_result = db.query(OtherQuery { id: self.id })?;
1334 /// Ok(process(&dep_result))
1335 /// }
1336 /// ```
1337 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1338 let key = query.cache_key();
1339 let full_key = FullCacheKey::new::<Q, _>(&key);
1340
1341 // Emit dependency registered event
1342 self.runtime.tracer.on_dependency_registered(
1343 self.exec_ctx.span_id(),
1344 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1345 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1346 );
1347
1348 // Record this as a dependency
1349 self.deps.borrow_mut().push(full_key.clone());
1350
1351 // Execute the query
1352 self.runtime.query(query)
1353 }
1354
1355 /// Access an asset, tracking it as a dependency.
1356 ///
1357 /// Returns `AssetLoadingState<K>`:
1358 /// - `is_loading()` if the asset is still being loaded
1359 /// - `is_ready()` if the asset is available
1360 ///
1361 /// Use `.suspend()?` to convert to `Result<Arc<K::Asset>, QueryError>`,
1362 /// which returns `Err(QueryError::Suspend { asset })` if still loading.
1363 ///
1364 /// # Example
1365 ///
1366 /// ```ignore
1367 /// #[query]
1368 /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1369 /// let content = db.asset(path)?.suspend()?;
1370 /// // Process content...
1371 /// Ok(output)
1372 /// }
1373 /// ```
1374 ///
1375 /// # Errors
1376 ///
1377 /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1378 pub fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1379 let full_asset_key = FullAssetKey::new(&key);
1380 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1381
1382 // 1. Get current_global FIRST (before accessing the asset)
1383 let current_global = self
1384 .runtime
1385 .whale
1386 .current_revision()
1387 .get(Durability::volatile());
1388
1389 // 2. Emit asset dependency registered event
1390 self.runtime.tracer.on_asset_dependency_registered(
1391 self.exec_ctx.span_id(),
1392 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1393 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1394 );
1395
1396 // 3. Record dependency on this asset
1397 self.deps.borrow_mut().push(full_cache_key);
1398
1399 // 4. Get asset AND changed_at from the same whale access (atomic)
1400 let (result, dep_changed_at) = match self.runtime.get_asset_with_revision(key) {
1401 Ok((state, rev)) => (Ok(state), rev),
1402 Err(e) => return Err(e),
1403 };
1404
1405 // 5. Check consistency - detects if resolve_asset was called during query execution
1406 self.ensure_consistent(current_global, dep_changed_at)?;
1407
1408 // Emit missing dependency event on error
1409 if let Err(QueryError::MissingDependency { ref description }) = result {
1410 self.runtime.tracer.on_missing_dependency(
1411 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1412 description.clone(),
1413 );
1414 }
1415
1416 result
1417 }
1418
1419 /// List all query instances of type Q that have been registered.
1420 ///
1421 /// This method establishes a dependency on the "set" of queries of type Q.
1422 /// The calling query will be invalidated when:
1423 /// - A new query of type Q is first executed (added to set)
1424 ///
1425 /// The calling query will NOT be invalidated when:
1426 /// - An individual query of type Q has its value change
1427 ///
1428 /// # Example
1429 ///
1430 /// ```ignore
1431 /// #[query]
1432 /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1433 /// let queries = db.list_queries::<MyQuery>();
1434 /// let mut results = Vec::new();
1435 /// for q in queries {
1436 /// results.push(*db.query(q)?);
1437 /// }
1438 /// Ok(results)
1439 /// }
1440 /// ```
1441 pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1442 // Record dependency on the sentinel (set-level dependency)
1443 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1444
1445 self.runtime.tracer.on_dependency_registered(
1446 self.exec_ctx.span_id(),
1447 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1448 TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1449 );
1450
1451 // Ensure sentinel exists in whale (for dependency tracking)
1452 if self.runtime.whale.get(&sentinel).is_none() {
1453 let _ =
1454 self.runtime
1455 .whale
1456 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1457 }
1458
1459 self.deps.borrow_mut().push(sentinel);
1460
1461 // Return all registered queries
1462 self.runtime.query_registry.get_all::<Q>()
1463 }
1464
1465 /// List all asset keys of type K that have been registered.
1466 ///
1467 /// This method establishes a dependency on the "set" of asset keys of type K.
1468 /// The calling query will be invalidated when:
1469 /// - A new asset of type K is resolved for the first time (added to set)
1470 /// - An asset of type K is removed via remove_asset
1471 ///
1472 /// The calling query will NOT be invalidated when:
1473 /// - An individual asset's value changes (use `db.asset()` for that)
1474 ///
1475 /// # Example
1476 ///
1477 /// ```ignore
1478 /// #[query]
1479 /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1480 /// let keys = db.list_asset_keys::<ConfigFile>();
1481 /// let mut contents = Vec::new();
1482 /// for key in keys {
1483 /// let content = db.asset(&key)?.suspend()?;
1484 /// contents.push((*content).clone());
1485 /// }
1486 /// Ok(contents)
1487 /// }
1488 /// ```
1489 pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1490 // Record dependency on the sentinel (set-level dependency)
1491 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1492
1493 self.runtime.tracer.on_asset_dependency_registered(
1494 self.exec_ctx.span_id(),
1495 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1496 TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1497 );
1498
1499 // Ensure sentinel exists in whale (for dependency tracking)
1500 if self.runtime.whale.get(&sentinel).is_none() {
1501 let _ =
1502 self.runtime
1503 .whale
1504 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1505 }
1506
1507 self.deps.borrow_mut().push(sentinel);
1508
1509 // Return all registered asset keys
1510 self.runtime.asset_key_registry.get_all::<K>()
1511 }
1512}
1513
1514impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1515 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1516 QueryContext::query(self, query)
1517 }
1518
1519 fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1520 QueryContext::asset(self, key)
1521 }
1522
1523 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1524 QueryContext::list_queries(self)
1525 }
1526
1527 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1528 QueryContext::list_asset_keys(self)
1529 }
1530}
1531
1532#[cfg(test)]
1533mod tests {
1534 use super::*;
1535
1536 #[test]
1537 fn test_simple_query() {
1538 #[derive(Clone)]
1539 struct Add {
1540 a: i32,
1541 b: i32,
1542 }
1543
1544 impl Query for Add {
1545 type CacheKey = (i32, i32);
1546 type Output = i32;
1547
1548 fn cache_key(&self) -> Self::CacheKey {
1549 (self.a, self.b)
1550 }
1551
1552 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1553 Ok(Arc::new(self.a + self.b))
1554 }
1555
1556 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1557 old == new
1558 }
1559 }
1560
1561 let runtime = QueryRuntime::new();
1562
1563 let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
1564 assert_eq!(*result, 3);
1565
1566 // Second query should be cached
1567 let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
1568 assert_eq!(*result2, 3);
1569 }
1570
1571 #[test]
1572 fn test_dependent_queries() {
1573 #[derive(Clone)]
1574 struct Base {
1575 value: i32,
1576 }
1577
1578 impl Query for Base {
1579 type CacheKey = i32;
1580 type Output = i32;
1581
1582 fn cache_key(&self) -> Self::CacheKey {
1583 self.value
1584 }
1585
1586 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1587 Ok(Arc::new(self.value * 2))
1588 }
1589
1590 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1591 old == new
1592 }
1593 }
1594
1595 #[derive(Clone)]
1596 struct Derived {
1597 base_value: i32,
1598 }
1599
1600 impl Query for Derived {
1601 type CacheKey = i32;
1602 type Output = i32;
1603
1604 fn cache_key(&self) -> Self::CacheKey {
1605 self.base_value
1606 }
1607
1608 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1609 let base = db.query(Base {
1610 value: self.base_value,
1611 })?;
1612 Ok(Arc::new(*base + 10))
1613 }
1614
1615 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1616 old == new
1617 }
1618 }
1619
1620 let runtime = QueryRuntime::new();
1621
1622 let result = runtime.query(Derived { base_value: 5 }).unwrap();
1623 assert_eq!(*result, 20); // 5 * 2 + 10
1624 }
1625
1626 #[test]
1627 fn test_cycle_detection() {
1628 #[derive(Clone)]
1629 struct CycleA {
1630 id: i32,
1631 }
1632
1633 #[derive(Clone)]
1634 struct CycleB {
1635 id: i32,
1636 }
1637
1638 impl Query for CycleA {
1639 type CacheKey = i32;
1640 type Output = i32;
1641
1642 fn cache_key(&self) -> Self::CacheKey {
1643 self.id
1644 }
1645
1646 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1647 let b = db.query(CycleB { id: self.id })?;
1648 Ok(Arc::new(*b + 1))
1649 }
1650
1651 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1652 old == new
1653 }
1654 }
1655
1656 impl Query for CycleB {
1657 type CacheKey = i32;
1658 type Output = i32;
1659
1660 fn cache_key(&self) -> Self::CacheKey {
1661 self.id
1662 }
1663
1664 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1665 let a = db.query(CycleA { id: self.id })?;
1666 Ok(Arc::new(*a + 1))
1667 }
1668
1669 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1670 old == new
1671 }
1672 }
1673
1674 let runtime = QueryRuntime::new();
1675
1676 let result = runtime.query(CycleA { id: 1 });
1677 assert!(matches!(result, Err(QueryError::Cycle { .. })));
1678 }
1679
1680 #[test]
1681 fn test_fallible_query() {
1682 #[derive(Clone)]
1683 struct ParseInt {
1684 input: String,
1685 }
1686
1687 impl Query for ParseInt {
1688 type CacheKey = String;
1689 type Output = Result<i32, std::num::ParseIntError>;
1690
1691 fn cache_key(&self) -> Self::CacheKey {
1692 self.input.clone()
1693 }
1694
1695 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1696 Ok(Arc::new(self.input.parse()))
1697 }
1698
1699 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1700 old == new
1701 }
1702 }
1703
1704 let runtime = QueryRuntime::new();
1705
1706 // Valid parse
1707 let result = runtime
1708 .query(ParseInt {
1709 input: "42".to_string(),
1710 })
1711 .unwrap();
1712 assert_eq!(*result, Ok(42));
1713
1714 // Invalid parse - system succeeds, user error in output
1715 let result = runtime
1716 .query(ParseInt {
1717 input: "not_a_number".to_string(),
1718 })
1719 .unwrap();
1720 assert!(result.is_err());
1721 }
1722
1723 // Macro tests
1724 mod macro_tests {
1725 use super::*;
1726 use crate::query;
1727
1728 #[query]
1729 fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
1730 let _ = db; // silence unused warning
1731 Ok(a + b)
1732 }
1733
1734 #[test]
1735 fn test_macro_basic() {
1736 let runtime = QueryRuntime::new();
1737 let result = runtime.query(Add::new(1, 2)).unwrap();
1738 assert_eq!(*result, 3);
1739 }
1740
1741 #[query]
1742 fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1743 let _ = db;
1744 Ok(x * 2)
1745 }
1746
1747 #[test]
1748 fn test_macro_simple() {
1749 let runtime = QueryRuntime::new();
1750 let result = runtime.query(SimpleDouble::new(5)).unwrap();
1751 assert_eq!(*result, 10);
1752 }
1753
1754 #[query(keys(id))]
1755 fn with_key_selection(
1756 db: &impl Db,
1757 id: u32,
1758 include_extra: bool,
1759 ) -> Result<String, QueryError> {
1760 let _ = db;
1761 Ok(format!("id={}, extra={}", id, include_extra))
1762 }
1763
1764 #[test]
1765 fn test_macro_key_selection() {
1766 let runtime = QueryRuntime::new();
1767
1768 // Same id, different include_extra - should return cached
1769 let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
1770 let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
1771
1772 // Both should have same value because only `id` is the key
1773 assert_eq!(*r1, "id=1, extra=true");
1774 assert_eq!(*r2, "id=1, extra=true"); // Cached!
1775 }
1776
1777 #[query]
1778 fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
1779 let sum = db.query(Add::new(a, b))?;
1780 Ok(*sum * 2)
1781 }
1782
1783 #[test]
1784 fn test_macro_dependencies() {
1785 let runtime = QueryRuntime::new();
1786 let result = runtime.query(Dependent::new(3, 4)).unwrap();
1787 assert_eq!(*result, 14); // (3 + 4) * 2
1788 }
1789
1790 #[query(output_eq)]
1791 fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1792 let _ = db;
1793 Ok(x * 2)
1794 }
1795
1796 #[test]
1797 fn test_macro_output_eq() {
1798 let runtime = QueryRuntime::new();
1799 let result = runtime.query(WithOutputEq::new(5)).unwrap();
1800 assert_eq!(*result, 10);
1801 }
1802
1803 #[query(name = "CustomName")]
1804 fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1805 let _ = db;
1806 Ok(x)
1807 }
1808
1809 #[test]
1810 fn test_macro_custom_name() {
1811 let runtime = QueryRuntime::new();
1812 let result = runtime.query(CustomName::new(42)).unwrap();
1813 assert_eq!(*result, 42);
1814 }
1815
1816 // Test that attribute macros like #[tracing::instrument] are preserved
1817 // We use #[allow(unused_variables)] and #[inline] as test attributes since
1818 // they don't require external dependencies.
1819 #[allow(unused_variables)]
1820 #[inline]
1821 #[query]
1822 fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
1823 // This would warn without #[allow(unused_variables)] on the generated method
1824 let unused_var = 42;
1825 Ok(x * 2)
1826 }
1827
1828 #[test]
1829 fn test_macro_preserves_attributes() {
1830 let runtime = QueryRuntime::new();
1831 // If attributes weren't preserved, this might warn about unused_var
1832 let result = runtime.query(WithAttributes::new(5)).unwrap();
1833 assert_eq!(*result, 10);
1834 }
1835 }
1836
1837 // Tests for poll() and changed_at()
1838 mod poll_tests {
1839 use super::*;
1840
1841 #[derive(Clone)]
1842 struct Counter {
1843 id: i32,
1844 }
1845
1846 impl Query for Counter {
1847 type CacheKey = i32;
1848 type Output = i32;
1849
1850 fn cache_key(&self) -> Self::CacheKey {
1851 self.id
1852 }
1853
1854 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1855 Ok(Arc::new(self.id * 10))
1856 }
1857
1858 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1859 old == new
1860 }
1861 }
1862
1863 #[test]
1864 fn test_poll_returns_value_and_revision() {
1865 let runtime = QueryRuntime::new();
1866
1867 let result = runtime.poll(Counter { id: 1 }).unwrap();
1868
1869 // Value should be correct - access through Result and Arc
1870 assert_eq!(**result.value.as_ref().unwrap(), 10);
1871
1872 // Revision should be non-zero after first execution
1873 assert!(result.revision > 0);
1874 }
1875
1876 #[test]
1877 fn test_poll_revision_stable_on_cache_hit() {
1878 let runtime = QueryRuntime::new();
1879
1880 // First poll
1881 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1882 let rev1 = result1.revision;
1883
1884 // Second poll (cache hit)
1885 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1886 let rev2 = result2.revision;
1887
1888 // Revision should be the same (no change)
1889 assert_eq!(rev1, rev2);
1890 }
1891
1892 #[test]
1893 fn test_poll_revision_changes_on_invalidate() {
1894 let runtime = QueryRuntime::new();
1895
1896 // First poll
1897 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
1898 let rev1 = result1.revision;
1899
1900 // Invalidate and poll again
1901 runtime.invalidate::<Counter>(&1);
1902 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
1903 let rev2 = result2.revision;
1904
1905 // Revision should increase (value was recomputed)
1906 // Note: Since output_eq returns true (same value), this might not change
1907 // depending on early cutoff behavior. Let's verify the value is still correct.
1908 assert_eq!(**result2.value.as_ref().unwrap(), 10);
1909
1910 // With early cutoff, revision might stay the same if value didn't change
1911 // This is expected behavior
1912 assert!(rev2 >= rev1);
1913 }
1914
1915 #[test]
1916 fn test_changed_at_returns_none_for_unexecuted_query() {
1917 let runtime = QueryRuntime::new();
1918
1919 // Query has never been executed
1920 let rev = runtime.changed_at::<Counter>(&1);
1921 assert!(rev.is_none());
1922 }
1923
1924 #[test]
1925 fn test_changed_at_returns_revision_after_execution() {
1926 let runtime = QueryRuntime::new();
1927
1928 // Execute the query
1929 let _ = runtime.query(Counter { id: 1 }).unwrap();
1930
1931 // Now changed_at should return Some
1932 let rev = runtime.changed_at::<Counter>(&1);
1933 assert!(rev.is_some());
1934 assert!(rev.unwrap() > 0);
1935 }
1936
1937 #[test]
1938 fn test_changed_at_matches_poll_revision() {
1939 let runtime = QueryRuntime::new();
1940
1941 // Poll the query
1942 let result = runtime.poll(Counter { id: 1 }).unwrap();
1943
1944 // changed_at should match the revision from poll
1945 let rev = runtime.changed_at::<Counter>(&1);
1946 assert_eq!(rev, Some(result.revision));
1947 }
1948
1949 #[test]
1950 fn test_poll_value_access() {
1951 let runtime = QueryRuntime::new();
1952
1953 let result = runtime.poll(Counter { id: 5 }).unwrap();
1954
1955 // Access through Result and Arc
1956 let value: &i32 = result.value.as_ref().unwrap();
1957 assert_eq!(*value, 50);
1958
1959 // Access Arc directly via field after unwrapping Result
1960 let arc: &Arc<i32> = result.value.as_ref().unwrap();
1961 assert_eq!(**arc, 50);
1962 }
1963
1964 #[test]
1965 fn test_subscription_pattern() {
1966 let runtime = QueryRuntime::new();
1967
1968 // Simulate subscription pattern
1969 let mut last_revision: RevisionCounter = 0;
1970 let mut notifications = 0;
1971
1972 // First poll - should notify (new value)
1973 let result = runtime.poll(Counter { id: 1 }).unwrap();
1974 if result.revision > last_revision {
1975 notifications += 1;
1976 last_revision = result.revision;
1977 }
1978
1979 // Second poll - should NOT notify (no change)
1980 let result = runtime.poll(Counter { id: 1 }).unwrap();
1981 if result.revision > last_revision {
1982 notifications += 1;
1983 last_revision = result.revision;
1984 }
1985
1986 // Third poll - should NOT notify (no change)
1987 let result = runtime.poll(Counter { id: 1 }).unwrap();
1988 if result.revision > last_revision {
1989 notifications += 1;
1990 #[allow(unused_assignments)]
1991 {
1992 last_revision = result.revision;
1993 }
1994 }
1995
1996 // Only the first poll should have triggered a notification
1997 assert_eq!(notifications, 1);
1998 }
1999 }
2000
2001 // Tests for GC APIs
2002 mod gc_tests {
2003 use super::*;
2004 use std::collections::HashSet;
2005 use std::sync::atomic::{AtomicUsize, Ordering};
2006 use std::sync::Mutex;
2007
2008 #[derive(Clone)]
2009 struct Leaf {
2010 id: i32,
2011 }
2012
2013 impl Query for Leaf {
2014 type CacheKey = i32;
2015 type Output = i32;
2016
2017 fn cache_key(&self) -> Self::CacheKey {
2018 self.id
2019 }
2020
2021 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2022 Ok(Arc::new(self.id * 10))
2023 }
2024
2025 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2026 old == new
2027 }
2028 }
2029
2030 #[derive(Clone)]
2031 struct Parent {
2032 child_id: i32,
2033 }
2034
2035 impl Query for Parent {
2036 type CacheKey = i32;
2037 type Output = i32;
2038
2039 fn cache_key(&self) -> Self::CacheKey {
2040 self.child_id
2041 }
2042
2043 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2044 let child = db.query(Leaf { id: self.child_id })?;
2045 Ok(Arc::new(*child + 1))
2046 }
2047
2048 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2049 old == new
2050 }
2051 }
2052
2053 #[test]
2054 fn test_query_keys_returns_all_cached_queries() {
2055 let runtime = QueryRuntime::new();
2056
2057 // Execute some queries
2058 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2059 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2060 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2061
2062 // Get all keys
2063 let keys = runtime.query_keys();
2064
2065 // Should have at least 3 keys (might have more due to sentinels)
2066 assert!(keys.len() >= 3);
2067 }
2068
2069 #[test]
2070 fn test_remove_removes_query() {
2071 let runtime = QueryRuntime::new();
2072
2073 // Execute a query
2074 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2075
2076 // Get the key
2077 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2078
2079 // Query should exist
2080 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2081
2082 // Remove it
2083 assert!(runtime.remove(&full_key));
2084
2085 // Query should no longer exist
2086 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2087 }
2088
2089 #[test]
2090 fn test_remove_if_unused_removes_leaf_query() {
2091 let runtime = QueryRuntime::new();
2092
2093 // Execute a leaf query (no dependents)
2094 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2095
2096 // Should be removable since no other query depends on it
2097 assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2098
2099 // Query should no longer exist
2100 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2101 }
2102
2103 #[test]
2104 fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2105 let runtime = QueryRuntime::new();
2106
2107 // Execute parent query (which depends on Leaf)
2108 let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2109
2110 // Leaf query should not be removable since Parent depends on it
2111 assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2112
2113 // Leaf query should still exist
2114 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2115
2116 // But Parent should be removable (no dependents)
2117 assert!(runtime.remove_query_if_unused::<Parent>(&1));
2118 }
2119
2120 #[test]
2121 fn test_remove_if_unused_with_full_cache_key() {
2122 let runtime = QueryRuntime::new();
2123
2124 // Execute a leaf query
2125 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2126
2127 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2128
2129 // Should be removable via FullCacheKey
2130 assert!(runtime.remove_if_unused(&full_key));
2131
2132 // Query should no longer exist
2133 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2134 }
2135
2136 // Test tracer receives on_query_key calls
2137 struct GcTracker {
2138 accessed_keys: Mutex<HashSet<String>>,
2139 access_count: AtomicUsize,
2140 }
2141
2142 impl GcTracker {
2143 fn new() -> Self {
2144 Self {
2145 accessed_keys: Mutex::new(HashSet::new()),
2146 access_count: AtomicUsize::new(0),
2147 }
2148 }
2149 }
2150
2151 impl Tracer for GcTracker {
2152 fn new_span_id(&self) -> SpanId {
2153 SpanId(1)
2154 }
2155
2156 fn on_query_key(&self, full_key: &FullCacheKey) {
2157 self.accessed_keys
2158 .lock()
2159 .unwrap()
2160 .insert(full_key.debug_repr().to_string());
2161 self.access_count.fetch_add(1, Ordering::Relaxed);
2162 }
2163 }
2164
2165 #[test]
2166 fn test_tracer_receives_on_query_key() {
2167 let tracker = GcTracker::new();
2168 let runtime = QueryRuntime::with_tracer(tracker);
2169
2170 // Execute some queries
2171 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2172 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2173
2174 // Tracer should have received on_query_key calls
2175 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2176 assert_eq!(count, 2);
2177
2178 // Check that the keys were recorded
2179 let keys = runtime.tracer().accessed_keys.lock().unwrap();
2180 assert!(keys.iter().any(|k| k.contains("Leaf")));
2181 }
2182
2183 #[test]
2184 fn test_tracer_receives_on_query_key_for_cache_hits() {
2185 let tracker = GcTracker::new();
2186 let runtime = QueryRuntime::with_tracer(tracker);
2187
2188 // Execute query twice (second is cache hit)
2189 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2190 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2191
2192 // Tracer should have received on_query_key for both calls
2193 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2194 assert_eq!(count, 2);
2195 }
2196
2197 #[test]
2198 fn test_gc_workflow() {
2199 let tracker = GcTracker::new();
2200 let runtime = QueryRuntime::with_tracer(tracker);
2201
2202 // Execute some queries
2203 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2204 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2205 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2206
2207 // Simulate GC: remove all queries that are not in use
2208 let mut removed = 0;
2209 for key in runtime.query_keys() {
2210 if runtime.remove_if_unused(&key) {
2211 removed += 1;
2212 }
2213 }
2214
2215 // All leaf queries should be removable
2216 assert!(removed >= 3);
2217
2218 // Queries should no longer exist
2219 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2220 assert!(runtime.changed_at::<Leaf>(&2).is_none());
2221 assert!(runtime.changed_at::<Leaf>(&3).is_none());
2222 }
2223 }
2224}