query_flow/runtime.rs
1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::{Cell, RefCell};
5use std::ops::Deref;
6use std::sync::Arc;
7
8use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
9
10use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
11use crate::db::Db;
12use crate::key::FullCacheKey;
13use crate::loading::AssetLoadingState;
14use crate::query::Query;
15use crate::storage::{
16 AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
17 QueryRegistry, VerifierStorage,
18};
19use crate::tracer::{
20 ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
21 TracerAssetState, TracerQueryKey,
22};
23use crate::QueryError;
24
/// Function type for comparing user errors during early cutoff.
///
/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
/// `QueryError::UserError` values are compared for caching purposes.
///
/// The runtime calls the comparator with the previously cached error first
/// and the newly produced error second. Returning `true` means the two errors
/// are treated as equal (the cached revision is reused); returning `false`
/// means the error is treated as changed.
pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
30
/// Number of durability levels (matches whale's default).
///
/// Used as the const generic argument of the `WhaleRuntime` stored in
/// `QueryRuntime`.
const DURABILITY_LEVELS: usize = 4;
33
// Thread-local query execution stack for cycle detection.
//
// Holds the keys of the queries currently executing on this thread:
// `StackGuard::push` appends a key and dropping the guard pops it again.
// A key that is already on the stack when it is requested again is reported
// as a dependency cycle.
thread_local! {
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
}
38
39/// Check for cycles in the query stack and return error if detected.
40fn check_cycle(key: &FullCacheKey) -> Result<(), QueryError> {
41 let cycle_detected = QUERY_STACK.with(|stack| stack.borrow().iter().any(|k| k == key));
42 if cycle_detected {
43 let path = QUERY_STACK.with(|stack| {
44 let stack = stack.borrow();
45 let mut path: Vec<String> = stack.iter().map(|k| k.debug_repr().to_string()).collect();
46 path.push(key.debug_repr().to_string());
47 path
48 });
49 return Err(QueryError::Cycle { path });
50 }
51 Ok(())
52}
53
54/// RAII guard for pushing/popping from query stack.
55struct StackGuard;
56
57impl StackGuard {
58 fn push(key: FullCacheKey) -> Self {
59 QUERY_STACK.with(|stack| stack.borrow_mut().push(key));
60 StackGuard
61 }
62}
63
64impl Drop for StackGuard {
65 fn drop(&mut self) {
66 QUERY_STACK.with(|stack| {
67 stack.borrow_mut().pop();
68 });
69 }
70}
71
72/// Execution context passed through query execution.
73///
74/// Contains a span ID for tracing correlation.
75#[derive(Clone, Copy)]
76pub struct ExecutionContext {
77 span_id: SpanId,
78}
79
80impl ExecutionContext {
81 /// Create a new execution context with the given span ID.
82 #[inline]
83 pub fn new(span_id: SpanId) -> Self {
84 Self { span_id }
85 }
86
87 /// Get the span ID for this execution context.
88 #[inline]
89 pub fn span_id(&self) -> SpanId {
90 self.span_id
91 }
92}
93
94/// Result of polling a query, containing the value and its revision.
95///
96/// This is returned by [`QueryRuntime::poll`] and provides both the query result
97/// and its change revision, enabling efficient change detection for subscription
98/// patterns.
99///
100/// # Example
101///
102/// ```ignore
103/// let result = runtime.poll(MyQuery::new())?;
104///
105/// // Access the value via Deref
106/// println!("{:?}", *result);
107///
108/// // Check if changed since last poll
109/// if result.revision > last_known_revision {
110/// notify_subscribers(&result.value);
111/// last_known_revision = result.revision;
112/// }
113/// ```
114#[derive(Debug, Clone)]
115pub struct Polled<T> {
116 /// The query result value.
117 pub value: T,
118 /// The revision at which this value was last changed.
119 ///
120 /// Compare this with a previously stored revision to detect changes.
121 pub revision: RevisionCounter,
122}
123
124impl<T: Deref> Deref for Polled<T> {
125 type Target = T::Target;
126
127 fn deref(&self) -> &Self::Target {
128 &self.value
129 }
130}
131
/// The query runtime manages query execution, caching, and dependency tracking.
///
/// This is cheap to clone: the `Clone` impl clones the whale handle, clones
/// each `Arc` field, and copies the comparator function pointer.
///
/// # Type Parameter
///
/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
///   for zero-cost when tracing is not needed.
///
/// # Example
///
/// ```ignore
/// // Without tracing (default)
/// let runtime = QueryRuntime::new();
///
/// // With tracing
/// let tracer = MyTracer::new();
/// let runtime = QueryRuntime::with_tracer(tracer);
///
/// // Sync query execution
/// let result = runtime.query(MyQuery { ... })?;
/// ```
pub struct QueryRuntime<T: Tracer = NoopTracer> {
    /// Whale runtime for dependency tracking and cache storage.
    /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
    /// A `None` entry is what `invalidate` and the sentinel updates register.
    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
    /// Registered asset locators
    locators: Arc<LocatorStorage<T>>,
    /// Pending asset requests
    pending: Arc<PendingStorage>,
    /// Registry for tracking query instances (for list_queries)
    query_registry: Arc<QueryRegistry>,
    /// Registry for tracking asset keys (for list_asset_keys)
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Verifiers for re-executing queries (for verify-then-decide pattern)
    verifiers: Arc<VerifierStorage>,
    /// Comparator for user errors during early cutoff
    /// (returns `true` when two errors count as equal)
    error_comparator: ErrorComparator,
    /// Tracer for observability
    tracer: Arc<T>,
}
173
174impl Default for QueryRuntime<NoopTracer> {
175 fn default() -> Self {
176 Self::new()
177 }
178}
179
180impl<T: Tracer> Clone for QueryRuntime<T> {
181 fn clone(&self) -> Self {
182 Self {
183 whale: self.whale.clone(),
184 locators: self.locators.clone(),
185 pending: self.pending.clone(),
186 query_registry: self.query_registry.clone(),
187 asset_key_registry: self.asset_key_registry.clone(),
188 verifiers: self.verifiers.clone(),
189 error_comparator: self.error_comparator,
190 tracer: self.tracer.clone(),
191 }
192 }
193}
194
/// Default error comparator that treats all errors as different.
///
/// Always returns `false`, so a user error is never considered equal to the
/// previously cached one. This is conservative: a query that produces a user
/// error is always recorded as changed and receives a new revision.
fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
    false
}
201
202impl<T: Tracer> QueryRuntime<T> {
203 /// Get cached output along with its revision (single atomic access).
204 fn get_cached_with_revision<Q: Query>(
205 &self,
206 key: &FullCacheKey,
207 ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
208 let node = self.whale.get(key)?;
209 let revision = node.changed_at;
210 let entry = node.data.as_ref()?;
211 let cached = entry.to_cached_value::<Q::Output>()?;
212 Some((cached, revision))
213 }
214
215 /// Get a reference to the tracer.
216 #[inline]
217 pub fn tracer(&self) -> &T {
218 &self.tracer
219 }
220}
221
222impl QueryRuntime<NoopTracer> {
223 /// Create a new query runtime with default settings.
224 pub fn new() -> Self {
225 Self::with_tracer(NoopTracer)
226 }
227
228 /// Create a builder for customizing the runtime.
229 ///
230 /// # Example
231 ///
232 /// ```ignore
233 /// let runtime = QueryRuntime::builder()
234 /// .error_comparator(|a, b| {
235 /// // Custom error comparison logic
236 /// match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
237 /// (Some(a), Some(b)) => a == b,
238 /// _ => false,
239 /// }
240 /// })
241 /// .build();
242 /// ```
243 pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
244 QueryRuntimeBuilder::new()
245 }
246}
247
248impl<T: Tracer> QueryRuntime<T> {
249 /// Create a new query runtime with the specified tracer.
250 pub fn with_tracer(tracer: T) -> Self {
251 QueryRuntimeBuilder::new().tracer(tracer).build()
252 }
253
254 /// Execute a query synchronously.
255 ///
256 /// Returns the cached result if valid, otherwise executes the query.
257 ///
258 /// # Errors
259 ///
260 /// - `QueryError::Suspend` - Query is waiting for async loading
261 /// - `QueryError::Cycle` - Dependency cycle detected
262 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
263 self.query_internal(query)
264 .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
265 }
266
267 /// Internal implementation shared by query() and poll().
268 ///
269 /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
270 /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
271 #[allow(clippy::type_complexity)]
272 fn query_internal<Q: Query>(
273 &self,
274 query: Q,
275 ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
276 let key = query.cache_key();
277 let full_key = FullCacheKey::new::<Q, _>(&key);
278
279 // Emit query key event for GC tracking (before other events)
280 self.tracer.on_query_key(&full_key);
281
282 // Create execution context and emit start event
283 let span_id = self.tracer.new_span_id();
284 let exec_ctx = ExecutionContext::new(span_id);
285 let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
286
287 self.tracer.on_query_start(span_id, query_key.clone());
288
289 // Check for cycles using thread-local stack
290 let cycle_detected = QUERY_STACK.with(|stack| {
291 let stack = stack.borrow();
292 stack.iter().any(|k| k == &full_key)
293 });
294
295 if cycle_detected {
296 let path = QUERY_STACK.with(|stack| {
297 let stack = stack.borrow();
298 let mut path: Vec<String> =
299 stack.iter().map(|k| k.debug_repr().to_string()).collect();
300 path.push(full_key.debug_repr().to_string());
301 path
302 });
303
304 self.tracer.on_cycle_detected(
305 path.iter()
306 .map(|s| TracerQueryKey::new("", s.clone()))
307 .collect(),
308 );
309 self.tracer
310 .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
311
312 return Err(QueryError::Cycle { path });
313 }
314
315 // Check if cached and valid (with verify-then-decide pattern)
316 let current_rev = self.whale.current_revision();
317
318 // Fast path: already verified at current revision
319 if self.whale.is_verified_at(&full_key, ¤t_rev) {
320 // Single atomic access to get both cached value and revision
321 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
322 self.tracer.on_cache_check(span_id, query_key.clone(), true);
323 self.tracer
324 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
325
326 return match cached {
327 CachedValue::Ok(output) => Ok((Ok(output), revision)),
328 CachedValue::UserError(err) => Ok((Err(err), revision)),
329 };
330 }
331 }
332
333 // Check shallow validity (deps' changed_at ok) and try verify-then-decide
334 if self.whale.is_valid(&full_key) {
335 // Single atomic access to get both cached value and revision
336 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
337 // Shallow valid but not verified - verify deps first
338 let mut deps_verified = true;
339 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
340 for dep in deps {
341 if let Some(verifier) = self.verifiers.get(&dep) {
342 // Re-query dep to verify it (triggers recursive verification)
343 if verifier.verify(self as &dyn std::any::Any).is_err() {
344 deps_verified = false;
345 break;
346 }
347 }
348 }
349 }
350
351 // Re-check validity after deps are verified
352 if deps_verified && self.whale.is_valid(&full_key) {
353 // Deps didn't change their changed_at, mark as verified and use cached
354 self.whale.mark_verified(&full_key, ¤t_rev);
355
356 self.tracer.on_cache_check(span_id, query_key.clone(), true);
357 self.tracer
358 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
359
360 return match cached {
361 CachedValue::Ok(output) => Ok((Ok(output), revision)),
362 CachedValue::UserError(err) => Ok((Err(err), revision)),
363 };
364 }
365 // A dep's changed_at increased, fall through to execute
366 }
367 }
368
369 self.tracer
370 .on_cache_check(span_id, query_key.clone(), false);
371
372 // Execute the query with cycle tracking
373 let _guard = StackGuard::push(full_key.clone());
374 let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
375 drop(_guard);
376
377 // Emit end event
378 let exec_result = match &result {
379 Ok((_, true, _)) => ExecutionResult::Changed,
380 Ok((_, false, _)) => ExecutionResult::Unchanged,
381 Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
382 Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
383 Err(e) => ExecutionResult::Error {
384 message: format!("{:?}", e),
385 },
386 };
387 self.tracer
388 .on_query_end(span_id, query_key.clone(), exec_result);
389
390 result.map(|(inner_result, _, revision)| (inner_result, revision))
391 }
392
393 /// Execute a query, caching the result if appropriate.
394 ///
395 /// Returns (result, output_changed, revision) tuple.
396 /// - `result`: Ok(output) for success, Err(user_error) for user errors
397 /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
398 #[allow(clippy::type_complexity)]
399 fn execute_query<Q: Query>(
400 &self,
401 query: &Q,
402 full_key: &FullCacheKey,
403 exec_ctx: ExecutionContext,
404 ) -> Result<
405 (
406 Result<Arc<Q::Output>, Arc<anyhow::Error>>,
407 bool,
408 RevisionCounter,
409 ),
410 QueryError,
411 > {
412 // Create context for this query execution
413 let ctx = QueryContext {
414 runtime: self,
415 current_key: full_key.clone(),
416 parent_query_type: std::any::type_name::<Q>(),
417 exec_ctx,
418 deps: RefCell::new(Vec::new()),
419 first_access_revision: Cell::new(None),
420 };
421
422 // Execute the query (clone because query() takes ownership)
423 let result = query.clone().query(&ctx);
424
425 // Get collected dependencies
426 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
427
428 // Query durability defaults to stable - Whale will automatically reduce
429 // the effective durability to min(requested, min(dep_durabilities)).
430 // A pure query with no dependencies remains stable.
431 // A query depending on volatile assets becomes volatile.
432 let durability = Durability::stable();
433
434 match result {
435 Ok(output) => {
436 // Check if output changed (for early cutoff)
437 // existing_revision is Some only when output is unchanged (can reuse revision)
438 let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
439 self.get_cached_with_revision::<Q>(full_key)
440 {
441 if Q::output_eq(&*old, &*output) {
442 Some(rev) // Same output - reuse revision
443 } else {
444 None // Different output
445 }
446 } else {
447 None // No previous Ok value
448 };
449 let output_changed = existing_revision.is_none();
450
451 // Emit early cutoff check event
452 self.tracer.on_early_cutoff_check(
453 exec_ctx.span_id(),
454 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
455 output_changed,
456 );
457
458 // Update whale with cached entry (atomic update of value + dependency state)
459 let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
460 let revision = if let Some(existing_rev) = existing_revision {
461 // confirm_unchanged doesn't change changed_at, use existing
462 let _ = self.whale.confirm_unchanged(full_key, deps);
463 existing_rev
464 } else {
465 // Use new_rev from register result
466 match self
467 .whale
468 .register(full_key.clone(), Some(entry), durability, deps)
469 {
470 Ok(result) => result.new_rev,
471 Err(missing) => {
472 return Err(QueryError::DependenciesRemoved {
473 missing_keys: missing,
474 })
475 }
476 }
477 };
478
479 // Register query in registry for list_queries
480 let is_new_query = self.query_registry.register(query);
481 if is_new_query {
482 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
483 let _ = self
484 .whale
485 .register(sentinel, None, Durability::stable(), vec![]);
486 }
487
488 // Store verifier for this query (for verify-then-decide pattern)
489 self.verifiers
490 .insert::<Q, T>(full_key.clone(), query.clone());
491
492 Ok((Ok(output), output_changed, revision))
493 }
494 Err(QueryError::UserError(err)) => {
495 // Check if error changed (for early cutoff)
496 // existing_revision is Some only when error is unchanged (can reuse revision)
497 let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
498 self.get_cached_with_revision::<Q>(full_key)
499 {
500 if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
501 Some(rev) // Same error - reuse revision
502 } else {
503 None // Different error
504 }
505 } else {
506 None // No previous UserError
507 };
508 let output_changed = existing_revision.is_none();
509
510 // Emit early cutoff check event
511 self.tracer.on_early_cutoff_check(
512 exec_ctx.span_id(),
513 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
514 output_changed,
515 );
516
517 // Update whale with cached error (atomic update of value + dependency state)
518 let entry = CachedEntry::UserError(err.clone());
519 let revision = if let Some(existing_rev) = existing_revision {
520 // confirm_unchanged doesn't change changed_at, use existing
521 let _ = self.whale.confirm_unchanged(full_key, deps);
522 existing_rev
523 } else {
524 // Use new_rev from register result
525 match self
526 .whale
527 .register(full_key.clone(), Some(entry), durability, deps)
528 {
529 Ok(result) => result.new_rev,
530 Err(missing) => {
531 return Err(QueryError::DependenciesRemoved {
532 missing_keys: missing,
533 })
534 }
535 }
536 };
537
538 // Register query in registry for list_queries
539 let is_new_query = self.query_registry.register(query);
540 if is_new_query {
541 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
542 let _ = self
543 .whale
544 .register(sentinel, None, Durability::stable(), vec![]);
545 }
546
547 // Store verifier for this query (for verify-then-decide pattern)
548 self.verifiers
549 .insert::<Q, T>(full_key.clone(), query.clone());
550
551 Ok((Err(err), output_changed, revision))
552 }
553 Err(e) => {
554 // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
555 Err(e)
556 }
557 }
558 }
559
560 /// Invalidate a query, forcing recomputation on next access.
561 ///
562 /// This also invalidates any queries that depend on this one.
563 pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
564 let full_key = FullCacheKey::new::<Q, _>(key);
565
566 self.tracer.on_query_invalidated(
567 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
568 InvalidationReason::ManualInvalidation,
569 );
570
571 // Update whale to invalidate dependents (register with None to clear cached value)
572 // Use stable durability to increment all revision counters, ensuring queries
573 // at any durability level will see this as a change.
574 let _ = self
575 .whale
576 .register(full_key, None, Durability::stable(), vec![]);
577 }
578
579 /// Remove a query from the cache entirely, freeing memory.
580 ///
581 /// Use this for GC when a query is no longer needed.
582 /// Unlike `invalidate`, this removes all traces of the query from storage.
583 /// The query will be recomputed from scratch on next access.
584 ///
585 /// This also invalidates any queries that depend on this one.
586 pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
587 let full_key = FullCacheKey::new::<Q, _>(key);
588
589 self.tracer.on_query_invalidated(
590 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
591 InvalidationReason::ManualInvalidation,
592 );
593
594 // Remove verifier if exists
595 self.verifiers.remove(&full_key);
596
597 // Remove from whale storage (this also handles dependent invalidation)
598 self.whale.remove(&full_key);
599
600 // Remove from registry and update sentinel for list_queries
601 if self.query_registry.remove::<Q>(key) {
602 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
603 let _ = self
604 .whale
605 .register(sentinel, None, Durability::stable(), vec![]);
606 }
607 }
608
609 /// Clear all cached values by removing all nodes from whale.
610 ///
611 /// Note: This is a relatively expensive operation as it iterates through all keys.
612 pub fn clear_cache(&self) {
613 let keys = self.whale.keys();
614 for key in keys {
615 self.whale.remove(&key);
616 }
617 }
618
619 /// Poll a query, returning both the result and its change revision.
620 ///
621 /// This is useful for implementing subscription patterns where you need to
622 /// detect changes efficiently. Compare the returned `revision` with a
623 /// previously stored value to determine if the query result has changed.
624 ///
625 /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
626 /// as its value, allowing you to track revision changes for both success and
627 /// user error cases.
628 ///
629 /// # Example
630 ///
631 /// ```ignore
632 /// struct Subscription<Q: Query> {
633 /// query: Q,
634 /// last_revision: RevisionCounter,
635 /// tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
636 /// }
637 ///
638 /// // Polling loop
639 /// for sub in &mut subscriptions {
640 /// let result = runtime.poll(sub.query.clone())?;
641 /// if result.revision > sub.last_revision {
642 /// sub.tx.send(result.value.clone())?;
643 /// sub.last_revision = result.revision;
644 /// }
645 /// }
646 /// ```
647 ///
648 /// # Errors
649 ///
650 /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
651 /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
652 #[allow(clippy::type_complexity)]
653 pub fn poll<Q: Query>(
654 &self,
655 query: Q,
656 ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
657 let (value, revision) = self.query_internal(query)?;
658 Ok(Polled { value, revision })
659 }
660
661 /// Get the change revision of a query without executing it.
662 ///
663 /// Returns `None` if the query has never been executed.
664 ///
665 /// This is useful for checking if a query has changed since the last poll
666 /// without the cost of executing the query.
667 ///
668 /// # Example
669 ///
670 /// ```ignore
671 /// // Check if query has changed before deciding to poll
672 /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
673 /// if rev > last_known_revision {
674 /// let result = runtime.query(MyQuery::new(key))?;
675 /// // Process result...
676 /// }
677 /// }
678 /// ```
679 pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
680 let full_key = FullCacheKey::new::<Q, _>(key);
681 self.whale.get(&full_key).map(|node| node.changed_at)
682 }
683}
684
685// ============================================================================
686// GC (Garbage Collection) API
687// ============================================================================
688
689impl<T: Tracer> QueryRuntime<T> {
/// Get all query keys currently in the cache.
///
/// Returns every key held by the whale store as an owned `Vec`.
/// NOTE(review): asset entries and the list sentinels are registered in the
/// same store, so they presumably show up here as well - confirm.
///
/// This is useful for implementing custom garbage collection strategies.
/// Use this in combination with [`Tracer::on_query_key`] to track access
/// times and implement LRU, TTL, or other GC algorithms externally.
///
/// # Example
///
/// ```ignore
/// // Collect all keys that haven't been accessed recently
/// let stale_keys: Vec<_> = runtime.query_keys()
///     .into_iter()
///     .filter(|key| tracker.is_stale(key))
///     .collect();
///
/// // Remove stale queries that have no dependents
/// for key in stale_keys {
///     runtime.remove_if_unused(&key);
/// }
/// ```
pub fn query_keys(&self) -> Vec<FullCacheKey> {
    self.whale.keys()
}
712
713 /// Remove a query if it has no dependents.
714 ///
715 /// Returns `true` if the query was removed, `false` if it has dependents
716 /// or doesn't exist. This is the safe way to remove queries during GC,
717 /// as it won't break queries that depend on this one.
718 ///
719 /// # Example
720 ///
721 /// ```ignore
722 /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
723 /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
724 /// println!("Query removed");
725 /// } else {
726 /// println!("Query has dependents, not removed");
727 /// }
728 /// ```
729 pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
730 let full_key = FullCacheKey::new::<Q, _>(key);
731 self.remove_if_unused(&full_key)
732 }
733
734 /// Remove a query by its [`FullCacheKey`].
735 ///
736 /// This is the type-erased version of [`remove_query`](Self::remove_query).
737 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
738 /// or [`Tracer::on_query_key`].
739 ///
740 /// Returns `true` if the query was removed, `false` if it doesn't exist.
741 ///
742 /// # Warning
743 ///
744 /// This forcibly removes the query even if other queries depend on it.
745 /// Dependent queries will be recomputed on next access. For safe GC,
746 /// use [`remove_if_unused`](Self::remove_if_unused) instead.
747 pub fn remove(&self, key: &FullCacheKey) -> bool {
748 // Remove verifier if exists
749 self.verifiers.remove(key);
750
751 // Remove from whale storage
752 self.whale.remove(key).is_some()
753 }
754
755 /// Remove a query by its [`FullCacheKey`] if it has no dependents.
756 ///
757 /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
758 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
759 /// or [`Tracer::on_query_key`].
760 ///
761 /// Returns `true` if the query was removed, `false` if it has dependents
762 /// or doesn't exist.
763 ///
764 /// # Example
765 ///
766 /// ```ignore
767 /// // Implement LRU GC
768 /// for key in runtime.query_keys() {
769 /// if tracker.is_expired(&key) {
770 /// runtime.remove_if_unused(&key);
771 /// }
772 /// }
773 /// ```
774 pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
775 if self.whale.remove_if_unused(key.clone()).is_some() {
776 // Successfully removed - clean up verifier
777 self.verifiers.remove(key);
778 true
779 } else {
780 false
781 }
782 }
783}
784
785// ============================================================================
786// Builder
787// ============================================================================
788
/// Builder for [`QueryRuntime`] with customizable settings.
///
/// Collects the error comparator and the tracer; `build` creates all other
/// runtime state fresh.
///
/// # Example
///
/// ```ignore
/// let runtime = QueryRuntime::builder()
///     .error_comparator(|a, b| {
///         // Treat all errors of the same type as equal
///         a.downcast_ref::<std::io::Error>().is_some()
///             == b.downcast_ref::<std::io::Error>().is_some()
///     })
///     .build();
/// ```
pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
    // Comparator handed to the runtime; `default_error_comparator` unless overridden.
    error_comparator: ErrorComparator,
    // Tracer the built runtime will report to.
    tracer: T,
}
806
807impl Default for QueryRuntimeBuilder<NoopTracer> {
808 fn default() -> Self {
809 Self::new()
810 }
811}
812
813impl QueryRuntimeBuilder<NoopTracer> {
814 /// Create a new builder with default settings.
815 pub fn new() -> Self {
816 Self {
817 error_comparator: default_error_comparator,
818 tracer: NoopTracer,
819 }
820 }
821}
822
823impl<T: Tracer> QueryRuntimeBuilder<T> {
824 /// Set the error comparator function for early cutoff optimization.
825 ///
826 /// When a query returns `QueryError::UserError`, this function is used
827 /// to compare it with the previously cached error. If they are equal,
828 /// downstream queries can skip recomputation (early cutoff).
829 ///
830 /// The default comparator returns `false` for all errors, meaning errors
831 /// are always considered different (conservative, always recomputes).
832 ///
833 /// # Example
834 ///
835 /// ```ignore
836 /// // Treat errors as equal if they have the same display message
837 /// let runtime = QueryRuntime::builder()
838 /// .error_comparator(|a, b| a.to_string() == b.to_string())
839 /// .build();
840 /// ```
841 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
842 self.error_comparator = f;
843 self
844 }
845
846 /// Set the tracer for observability.
847 pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
848 QueryRuntimeBuilder {
849 error_comparator: self.error_comparator,
850 tracer,
851 }
852 }
853
854 /// Build the runtime with the configured settings.
855 pub fn build(self) -> QueryRuntime<T> {
856 QueryRuntime {
857 whale: WhaleRuntime::new(),
858 locators: Arc::new(LocatorStorage::new()),
859 pending: Arc::new(PendingStorage::new()),
860 query_registry: Arc::new(QueryRegistry::new()),
861 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
862 verifiers: Arc::new(VerifierStorage::new()),
863 error_comparator: self.error_comparator,
864 tracer: Arc::new(self.tracer),
865 }
866 }
867}
868
869// ============================================================================
870// Asset API
871// ============================================================================
872
873impl<T: Tracer> QueryRuntime<T> {
/// Register an asset locator for a specific asset key type.
///
/// The locator is stored in the runtime's locator storage under the
/// `(K, L)` type pair.
///
/// Only one locator can be registered per key type. Later registrations
/// replace earlier ones.
///
/// # Example
///
/// ```ignore
/// let runtime = QueryRuntime::new();
/// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
/// ```
pub fn register_asset_locator<K, L>(&self, locator: L)
where
    K: AssetKey,
    L: AssetLocator<K>,
{
    self.locators.insert::<K, L>(locator);
}
892
/// Get the pending asset requests.
///
/// Returns, as an owned `Vec`, the assets that have been requested but not
/// yet resolved. The user should fetch these externally and call
/// `resolve_asset()` (or `resolve_asset_error()` on failure).
///
/// # Example
///
/// ```ignore
/// for pending in runtime.pending_assets() {
///     if let Some(path) = pending.key::<FilePath>() {
///         let content = fetch_file(path);
///         runtime.resolve_asset(path.clone(), content, DurabilityLevel::Volatile);
///     }
/// }
/// ```
pub fn pending_assets(&self) -> Vec<PendingAsset> {
    self.pending.get_all()
}
911
/// Get pending assets filtered by key type.
///
/// Returns the pending keys of type `K` as an owned `Vec`.
pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
    self.pending.get_of_type::<K>()
}
916
/// Check if there are any pending assets.
///
/// Returns `true` when the pending-asset storage is not empty.
pub fn has_pending_assets(&self) -> bool {
    !self.pending.is_empty()
}
921
/// Resolve an asset with its loaded value.
///
/// Public entry point that forwards to `resolve_asset_internal`.
///
/// This marks the asset as ready and invalidates any queries that
/// depend on it (if the value changed), triggering recomputation on next access.
///
/// This method is idempotent - resolving with the same value (via `asset_eq`)
/// will not trigger downstream recomputation.
///
/// # Arguments
///
/// * `key` - The asset key identifying this resource
/// * `value` - The loaded asset value
/// * `durability` - How frequently this asset is expected to change
///
/// # Example
///
/// ```ignore
/// let content = std::fs::read_to_string(&path)?;
/// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
/// ```
pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
    self.resolve_asset_internal(key, value, durability);
}
945
/// Resolve an asset with an error.
///
/// This marks the asset as errored and caches the error. Queries depending
/// on this asset will receive `Err(QueryError::UserError(...))`.
///
/// If the asset already holds a cached error, the configured
/// `error_comparator` decides whether the new error counts as a change;
/// any other previous state always counts as changed.
///
/// Use this when async loading fails (e.g., network error, file not found,
/// access denied).
///
/// # Arguments
///
/// * `key` - The asset key identifying this resource
/// * `error` - The error to cache (will be wrapped in `Arc`)
/// * `durability` - How frequently this error state is expected to change
///
/// # Example
///
/// ```ignore
/// match fetch_file(&path) {
///     Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
///     Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
/// }
/// ```
pub fn resolve_asset_error<K: AssetKey>(
    &self,
    key: K,
    error: impl Into<anyhow::Error>,
    durability: DurabilityLevel,
) {
    let full_asset_key = FullAssetKey::new(&key);
    let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);

    // Remove from pending BEFORE registering the error
    self.pending.remove(&full_asset_key);

    // Prepare the error entry
    let error_arc = Arc::new(error.into());
    let entry = CachedEntry::AssetError(error_arc.clone());
    // Fall back to volatile when the level cannot be converted to a whale durability
    let durability =
        Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());

    // Compare-and-update: the closure returns `true` when the entry should be
    // treated as changed.
    let result = self
        .whale
        .update_with_compare(
            full_cache_key,
            Some(entry),
            |old_data, _new_data| {
                // Compare old and new errors using error_comparator
                match old_data.and_then(|d| d.as_ref()) {
                    Some(CachedEntry::AssetError(old_err)) => {
                        // Comparator says "equal" -> not changed
                        !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
                    }
                    _ => true, // Loading, Ready, or not present -> changed
                }
            },
            durability,
            vec![],
        )
        .expect("update_with_compare with no dependencies cannot fail");

    // Emit asset resolved event (with changed status)
    self.tracer.on_asset_resolved(
        TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
        result.changed,
    );

    // Register asset key in registry for list_asset_keys
    let is_new_asset = self.asset_key_registry.register(&key);
    if is_new_asset {
        // Update sentinel to invalidate list_asset_keys dependents
        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
        let _ = self
            .whale
            .register(sentinel, None, Durability::stable(), vec![]);
    }
}
1022
1023 fn resolve_asset_internal<K: AssetKey>(
1024 &self,
1025 key: K,
1026 value: K::Asset,
1027 durability_level: DurabilityLevel,
1028 ) {
1029 let full_asset_key = FullAssetKey::new(&key);
1030 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1031
1032 // Remove from pending BEFORE registering the value
1033 self.pending.remove(&full_asset_key);
1034
1035 // Prepare the new entry
1036 let value_arc: Arc<K::Asset> = Arc::new(value);
1037 let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1038 let durability =
1039 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1040
1041 // Atomic compare-and-update
1042 let result = self
1043 .whale
1044 .update_with_compare(
1045 full_cache_key,
1046 Some(entry),
1047 |old_data, _new_data| {
1048 // Compare old and new values
1049 match old_data.and_then(|d| d.as_ref()) {
1050 Some(CachedEntry::AssetReady(old_arc)) => {
1051 match old_arc.clone().downcast::<K::Asset>() {
1052 Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1053 Err(_) => true, // Type mismatch, treat as changed
1054 }
1055 }
1056 _ => true, // Loading, NotFound, or not present -> changed
1057 }
1058 },
1059 durability,
1060 vec![],
1061 )
1062 .expect("update_with_compare with no dependencies cannot fail");
1063
1064 // Emit asset resolved event
1065 self.tracer.on_asset_resolved(
1066 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1067 result.changed,
1068 );
1069
1070 // Register asset key in registry for list_asset_keys
1071 let is_new_asset = self.asset_key_registry.register(&key);
1072 if is_new_asset {
1073 // Update sentinel to invalidate list_asset_keys dependents
1074 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1075 let _ = self
1076 .whale
1077 .register(sentinel, None, Durability::stable(), vec![]);
1078 }
1079 }
1080
1081 /// Invalidate an asset, forcing queries to re-request it.
1082 ///
1083 /// The asset will be marked as loading and added to pending assets.
1084 /// Dependent queries will suspend until the asset is resolved again.
1085 ///
1086 /// # Example
1087 ///
1088 /// ```ignore
1089 /// // File was modified externally
1090 /// runtime.invalidate_asset(&FilePath("config.json".into()));
1091 /// // Queries depending on this asset will now suspend
1092 /// // User should fetch the new value and call resolve_asset
1093 /// ```
1094 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1095 let full_asset_key = FullAssetKey::new(key);
1096 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1097
1098 // Emit asset invalidated event
1099 self.tracer.on_asset_invalidated(TracerAssetKey::new(
1100 std::any::type_name::<K>(),
1101 format!("{:?}", key),
1102 ));
1103
1104 // Add to pending FIRST (before clearing whale state)
1105 // This ensures: readers see either old value, or Loading+pending
1106 self.pending.insert::<K>(full_asset_key, key.clone());
1107
1108 // Atomic: clear cached value + invalidate dependents
1109 // Using None for data means "needs to be loaded"
1110 // Use stable durability to ensure queries at any durability level see the change.
1111 let _ = self
1112 .whale
1113 .register(full_cache_key, None, Durability::stable(), vec![]);
1114 }
1115
1116 /// Remove an asset from the cache entirely.
1117 ///
1118 /// Unlike `invalidate_asset`, this removes all traces of the asset.
1119 /// Dependent queries will go through the locator again on next access.
1120 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1121 let full_asset_key = FullAssetKey::new(key);
1122 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1123
1124 // Remove from pending first
1125 self.pending.remove(&full_asset_key);
1126
1127 // Remove from whale (this also cleans up dependency edges)
1128 // whale.remove() invalidates dependents before removing
1129 self.whale.remove(&full_cache_key);
1130
1131 // Remove from registry and update sentinel for list_asset_keys
1132 if self.asset_key_registry.remove::<K>(key) {
1133 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1134 let _ = self
1135 .whale
1136 .register(sentinel, None, Durability::stable(), vec![]);
1137 }
1138 }
1139
/// Get an asset by key without tracking dependencies.
///
/// Unlike `QueryContext::asset()`, this method does NOT register the caller
/// as a dependent of the asset. Use this for direct asset access outside
/// of query execution.
///
/// # Returns
///
/// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
/// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
/// - `Err(QueryError::UserError(...))` - An error is cached for the asset, or the
///   locator reported a user error
/// - `Err(QueryError::MissingDependency)` - The stored or located value could not be
///   downcast to `K::Asset`
///
/// Any other error returned by the locator or by cycle detection is propagated as-is.
pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
    // Thin public wrapper; the lookup itself lives in `get_asset_internal`.
    self.get_asset_internal(key)
}
1154
1155 /// Internal: Get asset state and its changed_at revision atomically.
1156 ///
1157 /// This is the "direct" version called from QueryRuntime::asset (no dependency tracking).
1158 /// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
1159 ///
1160 /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
1161 /// whale node that provided the asset value.
1162 fn get_asset_with_revision<K: AssetKey>(
1163 &self,
1164 key: K,
1165 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1166 let full_asset_key = FullAssetKey::new(&key);
1167 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1168
1169 // Helper to emit AssetRequested event
1170 let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1171 tracer.on_asset_requested(
1172 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1173 state,
1174 );
1175 };
1176
1177 // Check whale cache first (single atomic read)
1178 if let Some(node) = self.whale.get(&full_cache_key) {
1179 let changed_at = node.changed_at;
1180 // Check if valid at current revision
1181 if self.whale.is_valid(&full_cache_key) {
1182 match &node.data {
1183 Some(CachedEntry::AssetReady(arc)) => {
1184 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1185 match arc.clone().downcast::<K::Asset>() {
1186 Ok(value) => {
1187 return Ok((AssetLoadingState::ready(key, value), changed_at))
1188 }
1189 Err(_) => {
1190 return Err(QueryError::MissingDependency {
1191 description: format!("Asset type mismatch: {:?}", key),
1192 })
1193 }
1194 }
1195 }
1196 Some(CachedEntry::AssetError(err)) => {
1197 emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1198 return Err(QueryError::UserError(err.clone()));
1199 }
1200 None => {
1201 // Loading state
1202 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1203 return Ok((AssetLoadingState::loading(key), changed_at));
1204 }
1205 _ => {
1206 // Query-related entries (Ok, UserError) shouldn't be here
1207 // Fall through to locator
1208 }
1209 }
1210 }
1211 }
1212
1213 // Not in cache or invalid - try locator
1214 check_cycle(&full_cache_key)?;
1215 let _guard = StackGuard::push(full_cache_key.clone());
1216
1217 let locator_result = self
1218 .locators
1219 .locate_with_runtime(TypeId::of::<K>(), self, &key);
1220
1221 if let Some(result) = locator_result {
1222 match result {
1223 Ok(ErasedLocateResult::Ready {
1224 value: arc,
1225 durability: durability_level,
1226 }) => {
1227 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1228
1229 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1230 Ok(v) => v,
1231 Err(_) => {
1232 return Err(QueryError::MissingDependency {
1233 description: format!("Asset type mismatch: {:?}", key),
1234 });
1235 }
1236 };
1237
1238 // Store in whale atomically with early cutoff
1239 let entry = CachedEntry::AssetReady(typed_value.clone());
1240 let durability = Durability::new(durability_level.as_u8() as usize)
1241 .unwrap_or(Durability::volatile());
1242 let new_value = typed_value.clone();
1243 let result = self
1244 .whale
1245 .update_with_compare(
1246 full_cache_key,
1247 Some(entry),
1248 |old_data, _new_data| {
1249 let Some(CachedEntry::AssetReady(old_arc)) =
1250 old_data.and_then(|d| d.as_ref())
1251 else {
1252 return true;
1253 };
1254 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1255 return true;
1256 };
1257 !K::asset_eq(&old_value, &new_value)
1258 },
1259 durability,
1260 vec![],
1261 )
1262 .expect("update_with_compare with no dependencies cannot fail");
1263
1264 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1265 }
1266 Ok(ErasedLocateResult::Pending) => {
1267 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1268
1269 // Add to pending list for Pending result
1270 self.pending.insert::<K>(full_asset_key, key.clone());
1271 match self
1272 .whale
1273 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1274 .expect("get_or_insert with no dependencies cannot fail")
1275 {
1276 GetOrInsertResult::Inserted(node) => {
1277 return Ok((AssetLoadingState::loading(key), node.changed_at));
1278 }
1279 GetOrInsertResult::Existing(node) => {
1280 let changed_at = node.changed_at;
1281 match &node.data {
1282 Some(CachedEntry::AssetReady(arc)) => {
1283 match arc.clone().downcast::<K::Asset>() {
1284 Ok(value) => {
1285 return Ok((
1286 AssetLoadingState::ready(key, value),
1287 changed_at,
1288 ))
1289 }
1290 Err(_) => {
1291 return Ok((
1292 AssetLoadingState::loading(key),
1293 changed_at,
1294 ))
1295 }
1296 }
1297 }
1298 Some(CachedEntry::AssetError(err)) => {
1299 return Err(QueryError::UserError(err.clone()));
1300 }
1301 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1302 }
1303 }
1304 }
1305 }
1306 Err(QueryError::UserError(err)) => {
1307 // Locator returned a user error - cache it as AssetError
1308 let entry = CachedEntry::AssetError(err.clone());
1309 let _ = self.whale.register(
1310 full_cache_key,
1311 Some(entry),
1312 Durability::volatile(),
1313 vec![],
1314 );
1315 return Err(QueryError::UserError(err));
1316 }
1317 Err(e) => {
1318 // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1319 return Err(e);
1320 }
1321 }
1322 }
1323
1324 // No locator registered or locator returned None - mark as pending
1325 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1326 self.pending
1327 .insert::<K>(full_asset_key.clone(), key.clone());
1328
1329 match self
1330 .whale
1331 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1332 .expect("get_or_insert with no dependencies cannot fail")
1333 {
1334 GetOrInsertResult::Inserted(node) => {
1335 Ok((AssetLoadingState::loading(key), node.changed_at))
1336 }
1337 GetOrInsertResult::Existing(node) => {
1338 let changed_at = node.changed_at;
1339 match &node.data {
1340 Some(CachedEntry::AssetReady(arc)) => {
1341 match arc.clone().downcast::<K::Asset>() {
1342 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1343 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1344 }
1345 }
1346 Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1347 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1348 }
1349 }
1350 }
1351 }
1352
1353 /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1354 ///
1355 /// This version is called from QueryContext::asset and enables dependency tracking
1356 /// within the locator's db.query() and db.asset() calls.
1357 fn get_asset_with_revision_ctx<K: AssetKey>(
1358 &self,
1359 key: K,
1360 ctx: &QueryContext<'_, T>,
1361 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1362 let full_asset_key = FullAssetKey::new(&key);
1363 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1364
1365 // Helper to emit AssetRequested event
1366 let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1367 tracer.on_asset_requested(
1368 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1369 state,
1370 );
1371 };
1372
1373 // Check whale cache first (single atomic read)
1374 if let Some(node) = self.whale.get(&full_cache_key) {
1375 let changed_at = node.changed_at;
1376 // Check if valid at current revision
1377 if self.whale.is_valid(&full_cache_key) {
1378 match &node.data {
1379 Some(CachedEntry::AssetReady(arc)) => {
1380 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1381 match arc.clone().downcast::<K::Asset>() {
1382 Ok(value) => {
1383 return Ok((AssetLoadingState::ready(key, value), changed_at))
1384 }
1385 Err(_) => {
1386 return Err(QueryError::MissingDependency {
1387 description: format!("Asset type mismatch: {:?}", key),
1388 })
1389 }
1390 }
1391 }
1392 Some(CachedEntry::AssetError(err)) => {
1393 emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1394 return Err(QueryError::UserError(err.clone()));
1395 }
1396 None => {
1397 // Loading state
1398 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1399 return Ok((AssetLoadingState::loading(key), changed_at));
1400 }
1401 _ => {
1402 // Query-related entries (Ok, UserError) shouldn't be here
1403 // Fall through to locator
1404 }
1405 }
1406 }
1407 }
1408
1409 // Not in cache or invalid - try locator (with context for dependency tracking)
1410 check_cycle(&full_cache_key)?;
1411 let _guard = StackGuard::push(full_cache_key.clone());
1412
1413 let locator_result = self.locators.locate_with_ctx(TypeId::of::<K>(), ctx, &key);
1414
1415 if let Some(result) = locator_result {
1416 match result {
1417 Ok(ErasedLocateResult::Ready {
1418 value: arc,
1419 durability: durability_level,
1420 }) => {
1421 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1422
1423 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1424 Ok(v) => v,
1425 Err(_) => {
1426 return Err(QueryError::MissingDependency {
1427 description: format!("Asset type mismatch: {:?}", key),
1428 });
1429 }
1430 };
1431
1432 // Store in whale atomically with early cutoff
1433 let entry = CachedEntry::AssetReady(typed_value.clone());
1434 let durability = Durability::new(durability_level.as_u8() as usize)
1435 .unwrap_or(Durability::volatile());
1436 let new_value = typed_value.clone();
1437 let result = self
1438 .whale
1439 .update_with_compare(
1440 full_cache_key,
1441 Some(entry),
1442 |old_data, _new_data| {
1443 let Some(CachedEntry::AssetReady(old_arc)) =
1444 old_data.and_then(|d| d.as_ref())
1445 else {
1446 return true;
1447 };
1448 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1449 return true;
1450 };
1451 !K::asset_eq(&old_value, &new_value)
1452 },
1453 durability,
1454 vec![],
1455 )
1456 .expect("update_with_compare with no dependencies cannot fail");
1457
1458 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1459 }
1460 Ok(ErasedLocateResult::Pending) => {
1461 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1462
1463 // Add to pending list for Pending result
1464 self.pending.insert::<K>(full_asset_key, key.clone());
1465 match self
1466 .whale
1467 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1468 .expect("get_or_insert with no dependencies cannot fail")
1469 {
1470 GetOrInsertResult::Inserted(node) => {
1471 return Ok((AssetLoadingState::loading(key), node.changed_at));
1472 }
1473 GetOrInsertResult::Existing(node) => {
1474 let changed_at = node.changed_at;
1475 match &node.data {
1476 Some(CachedEntry::AssetReady(arc)) => {
1477 match arc.clone().downcast::<K::Asset>() {
1478 Ok(value) => {
1479 return Ok((
1480 AssetLoadingState::ready(key, value),
1481 changed_at,
1482 ))
1483 }
1484 Err(_) => {
1485 return Ok((
1486 AssetLoadingState::loading(key),
1487 changed_at,
1488 ))
1489 }
1490 }
1491 }
1492 Some(CachedEntry::AssetError(err)) => {
1493 return Err(QueryError::UserError(err.clone()));
1494 }
1495 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1496 }
1497 }
1498 }
1499 }
1500 Err(QueryError::UserError(err)) => {
1501 // Locator returned a user error - cache it as AssetError
1502 let entry = CachedEntry::AssetError(err.clone());
1503 let _ = self.whale.register(
1504 full_cache_key,
1505 Some(entry),
1506 Durability::volatile(),
1507 vec![],
1508 );
1509 return Err(QueryError::UserError(err));
1510 }
1511 Err(e) => {
1512 // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1513 return Err(e);
1514 }
1515 }
1516 }
1517
1518 // No locator registered or locator returned None - mark as pending
1519 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1520 self.pending
1521 .insert::<K>(full_asset_key.clone(), key.clone());
1522
1523 match self
1524 .whale
1525 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1526 .expect("get_or_insert with no dependencies cannot fail")
1527 {
1528 GetOrInsertResult::Inserted(node) => {
1529 Ok((AssetLoadingState::loading(key), node.changed_at))
1530 }
1531 GetOrInsertResult::Existing(node) => {
1532 let changed_at = node.changed_at;
1533 match &node.data {
1534 Some(CachedEntry::AssetReady(arc)) => {
1535 match arc.clone().downcast::<K::Asset>() {
1536 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1537 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1538 }
1539 }
1540 Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1541 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1542 }
1543 }
1544 }
1545 }
1546
1547 /// Internal: Get asset state, checking cache and locator.
1548 fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1549 self.get_asset_with_revision(key).map(|(state, _)| state)
1550 }
1551}
1552
/// `Db` implementation for direct (untracked) access through the runtime.
///
/// Every method forwards to the runtime itself; no dependencies are recorded
/// here. Dependency recording is done by the `Db` implementation for
/// `QueryContext`.
impl<T: Tracer> Db for QueryRuntime<T> {
    /// Runs the query through the runtime's inherent `query` method.
    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
        // Path syntax makes it explicit that the inherent method is called.
        QueryRuntime::query(self, query)
    }

    /// Looks up an asset via the cache and locator, without dependency tracking.
    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
        self.get_asset_internal(key)
    }

    /// Returns every query of type `Q` held by the query registry.
    fn list_queries<Q: Query>(&self) -> Vec<Q> {
        self.query_registry.get_all::<Q>()
    }

    /// Returns every asset key of type `K` held by the asset key registry.
    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
        self.asset_key_registry.get_all::<K>()
    }
}
1570
/// Context provided to queries during execution.
///
/// Use this to access dependencies via `query()`, `asset()`, `list_queries()`
/// and `list_asset_keys()`; each of these appends the accessed key to `deps`.
pub struct QueryContext<'a, T: Tracer = NoopTracer> {
    /// Runtime that executes nested queries and stores assets.
    runtime: &'a QueryRuntime<T>,
    /// Cache key reported as the dependent ("parent") side in tracer events;
    /// presumably the key of the query being executed with this context.
    current_key: FullCacheKey,
    /// Query type name paired with `current_key` when building tracer keys.
    parent_query_type: &'static str,
    /// Execution context; supplies the span id attached to tracer events.
    exec_ctx: ExecutionContext,
    /// Cache keys of all dependencies recorded so far (queries, assets, and
    /// set sentinels), in access order.
    deps: RefCell<Vec<FullCacheKey>>,
    /// Revision at first asset access, used for consistency checking.
    /// Assets with changed_at > this value were modified during query execution.
    first_access_revision: Cell<Option<RevisionCounter>>,
}
1584
1585impl<'a, T: Tracer> QueryContext<'a, T> {
1586 /// Ensure consistency of asset access.
1587 ///
1588 /// On first asset access: records max(current_global, dep_changed_at) as baseline.
1589 /// On subsequent asset accesses: checks dep_changed_at <= baseline.
1590 ///
1591 /// IMPORTANT: current_global must be obtained BEFORE accessing the asset.
1592 #[inline]
1593 fn ensure_consistent(
1594 &self,
1595 current_global: RevisionCounter,
1596 dep_changed_at: RevisionCounter,
1597 ) -> Result<(), QueryError> {
1598 match self.first_access_revision.get() {
1599 None => {
1600 // First asset access: record max(current_global, dep_changed_at)
1601 // Using max ensures we don't get false positives when only one asset is accessed
1602 let first = current_global.max(dep_changed_at);
1603 self.first_access_revision.set(Some(first));
1604 Ok(())
1605 }
1606 Some(first) => {
1607 // Subsequent asset accesses: check consistency
1608 if dep_changed_at > first {
1609 Err(QueryError::InconsistentAssetResolution)
1610 } else {
1611 Ok(())
1612 }
1613 }
1614 }
1615 }
1616
1617 /// Query a dependency.
1618 ///
1619 /// The dependency is automatically tracked for invalidation.
1620 ///
1621 /// # Example
1622 ///
1623 /// ```ignore
1624 /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1625 /// let dep_result = db.query(OtherQuery { id: self.id })?;
1626 /// Ok(process(&dep_result))
1627 /// }
1628 /// ```
1629 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1630 let key = query.cache_key();
1631 let full_key = FullCacheKey::new::<Q, _>(&key);
1632
1633 // Emit dependency registered event
1634 self.runtime.tracer.on_dependency_registered(
1635 self.exec_ctx.span_id(),
1636 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1637 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1638 );
1639
1640 // Record this as a dependency
1641 self.deps.borrow_mut().push(full_key.clone());
1642
1643 // Execute the query
1644 self.runtime.query(query)
1645 }
1646
1647 /// Access an asset, tracking it as a dependency.
1648 ///
1649 /// Returns `AssetLoadingState<K>`:
1650 /// - `is_loading()` if the asset is still being loaded
1651 /// - `is_ready()` if the asset is available
1652 ///
1653 /// Use `.suspend()?` to convert to `Result<Arc<K::Asset>, QueryError>`,
1654 /// which returns `Err(QueryError::Suspend { asset })` if still loading.
1655 ///
1656 /// # Example
1657 ///
1658 /// ```ignore
1659 /// #[query]
1660 /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1661 /// let content = db.asset(path)?.suspend()?;
1662 /// // Process content...
1663 /// Ok(output)
1664 /// }
1665 /// ```
1666 ///
1667 /// # Errors
1668 ///
1669 /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1670 pub fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1671 let full_asset_key = FullAssetKey::new(&key);
1672 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1673
1674 // 1. Get current_global FIRST (before accessing the asset)
1675 let current_global = self
1676 .runtime
1677 .whale
1678 .current_revision()
1679 .get(Durability::volatile());
1680
1681 // 2. Emit asset dependency registered event
1682 self.runtime.tracer.on_asset_dependency_registered(
1683 self.exec_ctx.span_id(),
1684 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1685 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1686 );
1687
1688 // 3. Record dependency on this asset
1689 self.deps.borrow_mut().push(full_cache_key);
1690
1691 // 4. Get asset AND changed_at from the same whale access (atomic)
1692 // Use the context-aware version to enable dependency tracking in locators
1693 let (result, dep_changed_at) = match self.runtime.get_asset_with_revision_ctx(key, self) {
1694 Ok((state, rev)) => (Ok(state), rev),
1695 Err(e) => return Err(e),
1696 };
1697
1698 // 5. Check consistency - detects if resolve_asset was called during query execution
1699 self.ensure_consistent(current_global, dep_changed_at)?;
1700
1701 // Emit missing dependency event on error
1702 if let Err(QueryError::MissingDependency { ref description }) = result {
1703 self.runtime.tracer.on_missing_dependency(
1704 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1705 description.clone(),
1706 );
1707 }
1708
1709 result
1710 }
1711
1712 /// List all query instances of type Q that have been registered.
1713 ///
1714 /// This method establishes a dependency on the "set" of queries of type Q.
1715 /// The calling query will be invalidated when:
1716 /// - A new query of type Q is first executed (added to set)
1717 ///
1718 /// The calling query will NOT be invalidated when:
1719 /// - An individual query of type Q has its value change
1720 ///
1721 /// # Example
1722 ///
1723 /// ```ignore
1724 /// #[query]
1725 /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1726 /// let queries = db.list_queries::<MyQuery>();
1727 /// let mut results = Vec::new();
1728 /// for q in queries {
1729 /// results.push(*db.query(q)?);
1730 /// }
1731 /// Ok(results)
1732 /// }
1733 /// ```
1734 pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1735 // Record dependency on the sentinel (set-level dependency)
1736 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1737
1738 self.runtime.tracer.on_dependency_registered(
1739 self.exec_ctx.span_id(),
1740 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1741 TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1742 );
1743
1744 // Ensure sentinel exists in whale (for dependency tracking)
1745 if self.runtime.whale.get(&sentinel).is_none() {
1746 let _ =
1747 self.runtime
1748 .whale
1749 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1750 }
1751
1752 self.deps.borrow_mut().push(sentinel);
1753
1754 // Return all registered queries
1755 self.runtime.query_registry.get_all::<Q>()
1756 }
1757
1758 /// List all asset keys of type K that have been registered.
1759 ///
1760 /// This method establishes a dependency on the "set" of asset keys of type K.
1761 /// The calling query will be invalidated when:
1762 /// - A new asset of type K is resolved for the first time (added to set)
1763 /// - An asset of type K is removed via remove_asset
1764 ///
1765 /// The calling query will NOT be invalidated when:
1766 /// - An individual asset's value changes (use `db.asset()` for that)
1767 ///
1768 /// # Example
1769 ///
1770 /// ```ignore
1771 /// #[query]
1772 /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1773 /// let keys = db.list_asset_keys::<ConfigFile>();
1774 /// let mut contents = Vec::new();
1775 /// for key in keys {
1776 /// let content = db.asset(&key)?.suspend()?;
1777 /// contents.push((*content).clone());
1778 /// }
1779 /// Ok(contents)
1780 /// }
1781 /// ```
1782 pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1783 // Record dependency on the sentinel (set-level dependency)
1784 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1785
1786 self.runtime.tracer.on_asset_dependency_registered(
1787 self.exec_ctx.span_id(),
1788 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1789 TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1790 );
1791
1792 // Ensure sentinel exists in whale (for dependency tracking)
1793 if self.runtime.whale.get(&sentinel).is_none() {
1794 let _ =
1795 self.runtime
1796 .whale
1797 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1798 }
1799
1800 self.deps.borrow_mut().push(sentinel);
1801
1802 // Return all registered asset keys
1803 self.runtime.asset_key_registry.get_all::<K>()
1804 }
1805}
1806
/// `Db` implementation used inside query execution.
///
/// Each method forwards to the inherent `QueryContext` method of the same
/// name, which records the access as a dependency of the running query.
impl<'a, T: Tracer> Db for QueryContext<'a, T> {
    /// Runs a dependency query and records it as a dependency.
    fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
        // Path syntax makes it explicit that the inherent method is called.
        QueryContext::query(self, query)
    }

    /// Accesses an asset and records it as a dependency.
    fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
        QueryContext::asset(self, key)
    }

    /// Lists registered queries of type `Q`, depending on the set as a whole.
    fn list_queries<Q: Query>(&self) -> Vec<Q> {
        QueryContext::list_queries(self)
    }

    /// Lists registered asset keys of type `K`, depending on the set as a whole.
    fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
        QueryContext::list_asset_keys(self)
    }
}
1824
1825#[cfg(test)]
1826mod tests {
1827 use super::*;
1828
#[test]
fn test_simple_query() {
    // Leaf query with no dependencies: the sum of its two fields.
    #[derive(Clone)]
    struct Add {
        a: i32,
        b: i32,
    }

    impl Query for Add {
        type Output = i32;
        type CacheKey = (i32, i32);

        fn cache_key(&self) -> Self::CacheKey {
            (self.a, self.b)
        }

        fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let sum = self.a + self.b;
            Ok(Arc::new(sum))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();

    // First execution computes the value.
    let first = runtime.query(Add { a: 1, b: 2 }).unwrap();
    assert_eq!(*first, 3);

    // Asking again with the same key must yield the same value
    // (expected to be served from the cache).
    let second = runtime.query(Add { a: 1, b: 2 }).unwrap();
    assert_eq!(*second, 3);
}
1863
#[test]
fn test_dependent_queries() {
    // Leaf query: doubles its input.
    #[derive(Clone)]
    struct Base {
        value: i32,
    }

    impl Query for Base {
        type Output = i32;
        type CacheKey = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.value
        }

        fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let doubled = self.value * 2;
            Ok(Arc::new(doubled))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    // Dependent query: reads `Base` through the db and adds ten.
    #[derive(Clone)]
    struct Derived {
        base_value: i32,
    }

    impl Query for Derived {
        type Output = i32;
        type CacheKey = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.base_value
        }

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let dependency = Base {
                value: self.base_value,
            };
            let base = db.query(dependency)?;
            Ok(Arc::new(*base + 10))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();

    let derived = runtime.query(Derived { base_value: 5 }).unwrap();
    assert_eq!(*derived, 20); // 5 * 2 + 10
}
1918
#[test]
fn test_cycle_detection() {
    // Two queries that depend on each other: A -> B -> A -> ...
    #[derive(Clone)]
    struct CycleA {
        id: i32,
    }

    #[derive(Clone)]
    struct CycleB {
        id: i32,
    }

    impl Query for CycleA {
        type Output = i32;
        type CacheKey = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.id
        }

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let from_b = db.query(CycleB { id: self.id })?;
            Ok(Arc::new(*from_b + 1))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    impl Query for CycleB {
        type Output = i32;
        type CacheKey = i32;

        fn cache_key(&self) -> Self::CacheKey {
            self.id
        }

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let from_a = db.query(CycleA { id: self.id })?;
            Ok(Arc::new(*from_a + 1))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();

    // Entering the cycle from either side must surface a `Cycle` error.
    let outcome = runtime.query(CycleA { id: 1 });
    assert!(matches!(outcome, Err(QueryError::Cycle { .. })));
}
1972
1973 #[test]
1974 fn test_fallible_query() {
1975 #[derive(Clone)]
1976 struct ParseInt {
1977 input: String,
1978 }
1979
1980 impl Query for ParseInt {
1981 type CacheKey = String;
1982 type Output = Result<i32, std::num::ParseIntError>;
1983
1984 fn cache_key(&self) -> Self::CacheKey {
1985 self.input.clone()
1986 }
1987
1988 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1989 Ok(Arc::new(self.input.parse()))
1990 }
1991
1992 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1993 old == new
1994 }
1995 }
1996
1997 let runtime = QueryRuntime::new();
1998
1999 // Valid parse
2000 let result = runtime
2001 .query(ParseInt {
2002 input: "42".to_string(),
2003 })
2004 .unwrap();
2005 assert_eq!(*result, Ok(42));
2006
2007 // Invalid parse - system succeeds, user error in output
2008 let result = runtime
2009 .query(ParseInt {
2010 input: "not_a_number".to_string(),
2011 })
2012 .unwrap();
2013 assert!(result.is_err());
2014 }
2015
2016 // Macro tests
2017 mod macro_tests {
2018 use super::*;
2019 use crate::query;
2020
2021 #[query]
2022 fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2023 let _ = db; // silence unused warning
2024 Ok(a + b)
2025 }
2026
2027 #[test]
2028 fn test_macro_basic() {
2029 let runtime = QueryRuntime::new();
2030 let result = runtime.query(Add::new(1, 2)).unwrap();
2031 assert_eq!(*result, 3);
2032 }
2033
2034 #[query]
2035 fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2036 let _ = db;
2037 Ok(x * 2)
2038 }
2039
2040 #[test]
2041 fn test_macro_simple() {
2042 let runtime = QueryRuntime::new();
2043 let result = runtime.query(SimpleDouble::new(5)).unwrap();
2044 assert_eq!(*result, 10);
2045 }
2046
2047 #[query(keys(id))]
2048 fn with_key_selection(
2049 db: &impl Db,
2050 id: u32,
2051 include_extra: bool,
2052 ) -> Result<String, QueryError> {
2053 let _ = db;
2054 Ok(format!("id={}, extra={}", id, include_extra))
2055 }
2056
2057 #[test]
2058 fn test_macro_key_selection() {
2059 let runtime = QueryRuntime::new();
2060
2061 // Same id, different include_extra - should return cached
2062 let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2063 let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2064
2065 // Both should have same value because only `id` is the key
2066 assert_eq!(*r1, "id=1, extra=true");
2067 assert_eq!(*r2, "id=1, extra=true"); // Cached!
2068 }
2069
2070 #[query]
2071 fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2072 let sum = db.query(Add::new(a, b))?;
2073 Ok(*sum * 2)
2074 }
2075
2076 #[test]
2077 fn test_macro_dependencies() {
2078 let runtime = QueryRuntime::new();
2079 let result = runtime.query(Dependent::new(3, 4)).unwrap();
2080 assert_eq!(*result, 14); // (3 + 4) * 2
2081 }
2082
2083 #[query(output_eq)]
2084 fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2085 let _ = db;
2086 Ok(x * 2)
2087 }
2088
2089 #[test]
2090 fn test_macro_output_eq() {
2091 let runtime = QueryRuntime::new();
2092 let result = runtime.query(WithOutputEq::new(5)).unwrap();
2093 assert_eq!(*result, 10);
2094 }
2095
2096 #[query(name = "CustomName")]
2097 fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2098 let _ = db;
2099 Ok(x)
2100 }
2101
2102 #[test]
2103 fn test_macro_custom_name() {
2104 let runtime = QueryRuntime::new();
2105 let result = runtime.query(CustomName::new(42)).unwrap();
2106 assert_eq!(*result, 42);
2107 }
2108
2109 // Test that attribute macros like #[tracing::instrument] are preserved
2110 // We use #[allow(unused_variables)] and #[inline] as test attributes since
2111 // they don't require external dependencies.
2112 #[allow(unused_variables)]
2113 #[inline]
2114 #[query]
2115 fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2116 // This would warn without #[allow(unused_variables)] on the generated method
2117 let unused_var = 42;
2118 Ok(x * 2)
2119 }
2120
2121 #[test]
2122 fn test_macro_preserves_attributes() {
2123 let runtime = QueryRuntime::new();
2124 // If attributes weren't preserved, this might warn about unused_var
2125 let result = runtime.query(WithAttributes::new(5)).unwrap();
2126 assert_eq!(*result, 10);
2127 }
2128 }
2129
2130 // Tests for poll() and changed_at()
2131 mod poll_tests {
2132 use super::*;
2133
2134 #[derive(Clone)]
2135 struct Counter {
2136 id: i32,
2137 }
2138
2139 impl Query for Counter {
2140 type CacheKey = i32;
2141 type Output = i32;
2142
2143 fn cache_key(&self) -> Self::CacheKey {
2144 self.id
2145 }
2146
2147 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2148 Ok(Arc::new(self.id * 10))
2149 }
2150
2151 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2152 old == new
2153 }
2154 }
2155
2156 #[test]
2157 fn test_poll_returns_value_and_revision() {
2158 let runtime = QueryRuntime::new();
2159
2160 let result = runtime.poll(Counter { id: 1 }).unwrap();
2161
2162 // Value should be correct - access through Result and Arc
2163 assert_eq!(**result.value.as_ref().unwrap(), 10);
2164
2165 // Revision should be non-zero after first execution
2166 assert!(result.revision > 0);
2167 }
2168
2169 #[test]
2170 fn test_poll_revision_stable_on_cache_hit() {
2171 let runtime = QueryRuntime::new();
2172
2173 // First poll
2174 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2175 let rev1 = result1.revision;
2176
2177 // Second poll (cache hit)
2178 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2179 let rev2 = result2.revision;
2180
2181 // Revision should be the same (no change)
2182 assert_eq!(rev1, rev2);
2183 }
2184
2185 #[test]
2186 fn test_poll_revision_changes_on_invalidate() {
2187 let runtime = QueryRuntime::new();
2188
2189 // First poll
2190 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2191 let rev1 = result1.revision;
2192
2193 // Invalidate and poll again
2194 runtime.invalidate::<Counter>(&1);
2195 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2196 let rev2 = result2.revision;
2197
2198 // Revision should increase (value was recomputed)
2199 // Note: Since output_eq returns true (same value), this might not change
2200 // depending on early cutoff behavior. Let's verify the value is still correct.
2201 assert_eq!(**result2.value.as_ref().unwrap(), 10);
2202
2203 // With early cutoff, revision might stay the same if value didn't change
2204 // This is expected behavior
2205 assert!(rev2 >= rev1);
2206 }
2207
2208 #[test]
2209 fn test_changed_at_returns_none_for_unexecuted_query() {
2210 let runtime = QueryRuntime::new();
2211
2212 // Query has never been executed
2213 let rev = runtime.changed_at::<Counter>(&1);
2214 assert!(rev.is_none());
2215 }
2216
2217 #[test]
2218 fn test_changed_at_returns_revision_after_execution() {
2219 let runtime = QueryRuntime::new();
2220
2221 // Execute the query
2222 let _ = runtime.query(Counter { id: 1 }).unwrap();
2223
2224 // Now changed_at should return Some
2225 let rev = runtime.changed_at::<Counter>(&1);
2226 assert!(rev.is_some());
2227 assert!(rev.unwrap() > 0);
2228 }
2229
2230 #[test]
2231 fn test_changed_at_matches_poll_revision() {
2232 let runtime = QueryRuntime::new();
2233
2234 // Poll the query
2235 let result = runtime.poll(Counter { id: 1 }).unwrap();
2236
2237 // changed_at should match the revision from poll
2238 let rev = runtime.changed_at::<Counter>(&1);
2239 assert_eq!(rev, Some(result.revision));
2240 }
2241
2242 #[test]
2243 fn test_poll_value_access() {
2244 let runtime = QueryRuntime::new();
2245
2246 let result = runtime.poll(Counter { id: 5 }).unwrap();
2247
2248 // Access through Result and Arc
2249 let value: &i32 = result.value.as_ref().unwrap();
2250 assert_eq!(*value, 50);
2251
2252 // Access Arc directly via field after unwrapping Result
2253 let arc: &Arc<i32> = result.value.as_ref().unwrap();
2254 assert_eq!(**arc, 50);
2255 }
2256
2257 #[test]
2258 fn test_subscription_pattern() {
2259 let runtime = QueryRuntime::new();
2260
2261 // Simulate subscription pattern
2262 let mut last_revision: RevisionCounter = 0;
2263 let mut notifications = 0;
2264
2265 // First poll - should notify (new value)
2266 let result = runtime.poll(Counter { id: 1 }).unwrap();
2267 if result.revision > last_revision {
2268 notifications += 1;
2269 last_revision = result.revision;
2270 }
2271
2272 // Second poll - should NOT notify (no change)
2273 let result = runtime.poll(Counter { id: 1 }).unwrap();
2274 if result.revision > last_revision {
2275 notifications += 1;
2276 last_revision = result.revision;
2277 }
2278
2279 // Third poll - should NOT notify (no change)
2280 let result = runtime.poll(Counter { id: 1 }).unwrap();
2281 if result.revision > last_revision {
2282 notifications += 1;
2283 #[allow(unused_assignments)]
2284 {
2285 last_revision = result.revision;
2286 }
2287 }
2288
2289 // Only the first poll should have triggered a notification
2290 assert_eq!(notifications, 1);
2291 }
2292 }
2293
2294 // Tests for GC APIs
2295 mod gc_tests {
2296 use super::*;
2297 use std::collections::HashSet;
2298 use std::sync::atomic::{AtomicUsize, Ordering};
2299 use std::sync::Mutex;
2300
2301 #[derive(Clone)]
2302 struct Leaf {
2303 id: i32,
2304 }
2305
2306 impl Query for Leaf {
2307 type CacheKey = i32;
2308 type Output = i32;
2309
2310 fn cache_key(&self) -> Self::CacheKey {
2311 self.id
2312 }
2313
2314 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2315 Ok(Arc::new(self.id * 10))
2316 }
2317
2318 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2319 old == new
2320 }
2321 }
2322
2323 #[derive(Clone)]
2324 struct Parent {
2325 child_id: i32,
2326 }
2327
2328 impl Query for Parent {
2329 type CacheKey = i32;
2330 type Output = i32;
2331
2332 fn cache_key(&self) -> Self::CacheKey {
2333 self.child_id
2334 }
2335
2336 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2337 let child = db.query(Leaf { id: self.child_id })?;
2338 Ok(Arc::new(*child + 1))
2339 }
2340
2341 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2342 old == new
2343 }
2344 }
2345
2346 #[test]
2347 fn test_query_keys_returns_all_cached_queries() {
2348 let runtime = QueryRuntime::new();
2349
2350 // Execute some queries
2351 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2352 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2353 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2354
2355 // Get all keys
2356 let keys = runtime.query_keys();
2357
2358 // Should have at least 3 keys (might have more due to sentinels)
2359 assert!(keys.len() >= 3);
2360 }
2361
2362 #[test]
2363 fn test_remove_removes_query() {
2364 let runtime = QueryRuntime::new();
2365
2366 // Execute a query
2367 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2368
2369 // Get the key
2370 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2371
2372 // Query should exist
2373 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2374
2375 // Remove it
2376 assert!(runtime.remove(&full_key));
2377
2378 // Query should no longer exist
2379 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2380 }
2381
2382 #[test]
2383 fn test_remove_if_unused_removes_leaf_query() {
2384 let runtime = QueryRuntime::new();
2385
2386 // Execute a leaf query (no dependents)
2387 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2388
2389 // Should be removable since no other query depends on it
2390 assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2391
2392 // Query should no longer exist
2393 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2394 }
2395
2396 #[test]
2397 fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2398 let runtime = QueryRuntime::new();
2399
2400 // Execute parent query (which depends on Leaf)
2401 let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2402
2403 // Leaf query should not be removable since Parent depends on it
2404 assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2405
2406 // Leaf query should still exist
2407 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2408
2409 // But Parent should be removable (no dependents)
2410 assert!(runtime.remove_query_if_unused::<Parent>(&1));
2411 }
2412
2413 #[test]
2414 fn test_remove_if_unused_with_full_cache_key() {
2415 let runtime = QueryRuntime::new();
2416
2417 // Execute a leaf query
2418 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2419
2420 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2421
2422 // Should be removable via FullCacheKey
2423 assert!(runtime.remove_if_unused(&full_key));
2424
2425 // Query should no longer exist
2426 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2427 }
2428
2429 // Test tracer receives on_query_key calls
2430 struct GcTracker {
2431 accessed_keys: Mutex<HashSet<String>>,
2432 access_count: AtomicUsize,
2433 }
2434
2435 impl GcTracker {
2436 fn new() -> Self {
2437 Self {
2438 accessed_keys: Mutex::new(HashSet::new()),
2439 access_count: AtomicUsize::new(0),
2440 }
2441 }
2442 }
2443
2444 impl Tracer for GcTracker {
2445 fn new_span_id(&self) -> SpanId {
2446 SpanId(1)
2447 }
2448
2449 fn on_query_key(&self, full_key: &FullCacheKey) {
2450 self.accessed_keys
2451 .lock()
2452 .unwrap()
2453 .insert(full_key.debug_repr().to_string());
2454 self.access_count.fetch_add(1, Ordering::Relaxed);
2455 }
2456 }
2457
2458 #[test]
2459 fn test_tracer_receives_on_query_key() {
2460 let tracker = GcTracker::new();
2461 let runtime = QueryRuntime::with_tracer(tracker);
2462
2463 // Execute some queries
2464 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2465 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2466
2467 // Tracer should have received on_query_key calls
2468 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2469 assert_eq!(count, 2);
2470
2471 // Check that the keys were recorded
2472 let keys = runtime.tracer().accessed_keys.lock().unwrap();
2473 assert!(keys.iter().any(|k| k.contains("Leaf")));
2474 }
2475
2476 #[test]
2477 fn test_tracer_receives_on_query_key_for_cache_hits() {
2478 let tracker = GcTracker::new();
2479 let runtime = QueryRuntime::with_tracer(tracker);
2480
2481 // Execute query twice (second is cache hit)
2482 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2483 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2484
2485 // Tracer should have received on_query_key for both calls
2486 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2487 assert_eq!(count, 2);
2488 }
2489
2490 #[test]
2491 fn test_gc_workflow() {
2492 let tracker = GcTracker::new();
2493 let runtime = QueryRuntime::with_tracer(tracker);
2494
2495 // Execute some queries
2496 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2497 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2498 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2499
2500 // Simulate GC: remove all queries that are not in use
2501 let mut removed = 0;
2502 for key in runtime.query_keys() {
2503 if runtime.remove_if_unused(&key) {
2504 removed += 1;
2505 }
2506 }
2507
2508 // All leaf queries should be removable
2509 assert!(removed >= 3);
2510
2511 // Queries should no longer exist
2512 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2513 assert!(runtime.changed_at::<Leaf>(&2).is_none());
2514 assert!(runtime.changed_at::<Leaf>(&3).is_none());
2515 }
2516 }
2517}