query_flow/runtime.rs
1//! Query runtime and context.
2
3use std::any::{Any, TypeId};
4use std::cell::{Cell, RefCell};
5use std::ops::Deref;
6use std::sync::Arc;
7
8use whale::{Durability, GetOrInsertResult, RevisionCounter, Runtime as WhaleRuntime};
9
10use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
11use crate::db::Db;
12use crate::key::FullCacheKey;
13use crate::loading::AssetLoadingState;
14use crate::query::Query;
15use crate::storage::{
16 AssetKeyRegistry, CachedEntry, CachedValue, ErasedLocateResult, LocatorStorage, PendingStorage,
17 QueryRegistry, VerifierStorage,
18};
19use crate::tracer::{
20 ExecutionResult, InvalidationReason, NoopTracer, SpanId, Tracer, TracerAssetKey,
21 TracerAssetState, TracerQueryKey,
22};
23use crate::QueryError;
24
25/// Function type for comparing user errors during early cutoff.
26///
27/// Used by `QueryRuntimeBuilder::error_comparator` to customize how
28/// `QueryError::UserError` values are compared for caching purposes.
29pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
30
31/// Number of durability levels (matches whale's default).
32const DURABILITY_LEVELS: usize = 4;
33
34// Thread-local query execution stack for cycle detection.
35thread_local! {
36 static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
37}
38
39/// Execution context passed through query execution.
40///
41/// Contains a span ID for tracing correlation.
42#[derive(Clone, Copy)]
43pub struct ExecutionContext {
44 span_id: SpanId,
45}
46
47impl ExecutionContext {
48 /// Create a new execution context with the given span ID.
49 #[inline]
50 pub fn new(span_id: SpanId) -> Self {
51 Self { span_id }
52 }
53
54 /// Get the span ID for this execution context.
55 #[inline]
56 pub fn span_id(&self) -> SpanId {
57 self.span_id
58 }
59}
60
61/// Result of polling a query, containing the value and its revision.
62///
63/// This is returned by [`QueryRuntime::poll`] and provides both the query result
64/// and its change revision, enabling efficient change detection for subscription
65/// patterns.
66///
67/// # Example
68///
69/// ```ignore
70/// let result = runtime.poll(MyQuery::new())?;
71///
72/// // Access the value via Deref
73/// println!("{:?}", *result);
74///
75/// // Check if changed since last poll
76/// if result.revision > last_known_revision {
77/// notify_subscribers(&result.value);
78/// last_known_revision = result.revision;
79/// }
80/// ```
81#[derive(Debug, Clone)]
82pub struct Polled<T> {
83 /// The query result value.
84 pub value: T,
85 /// The revision at which this value was last changed.
86 ///
87 /// Compare this with a previously stored revision to detect changes.
88 pub revision: RevisionCounter,
89}
90
91impl<T: Deref> Deref for Polled<T> {
92 type Target = T::Target;
93
94 fn deref(&self) -> &Self::Target {
95 &self.value
96 }
97}
98
99/// The query runtime manages query execution, caching, and dependency tracking.
100///
101/// This is cheap to clone - all data is behind `Arc`.
102///
103/// # Type Parameter
104///
105/// - `T: Tracer` - The tracer type for observability. Use `NoopTracer` (default)
106/// for zero-cost when tracing is not needed.
107///
108/// # Example
109///
110/// ```ignore
111/// // Without tracing (default)
112/// let runtime = QueryRuntime::new();
113///
114/// // With tracing
115/// let tracer = MyTracer::new();
116/// let runtime = QueryRuntime::with_tracer(tracer);
117///
118/// // Sync query execution
119/// let result = runtime.query(MyQuery { ... })?;
120/// ```
121pub struct QueryRuntime<T: Tracer = NoopTracer> {
122 /// Whale runtime for dependency tracking and cache storage.
123 /// Query outputs and asset values are stored in Node.data as Option<CachedEntry>.
124 whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
125 /// Registered asset locators
126 locators: Arc<LocatorStorage<T>>,
127 /// Pending asset requests
128 pending: Arc<PendingStorage>,
129 /// Registry for tracking query instances (for list_queries)
130 query_registry: Arc<QueryRegistry>,
131 /// Registry for tracking asset keys (for list_asset_keys)
132 asset_key_registry: Arc<AssetKeyRegistry>,
133 /// Verifiers for re-executing queries (for verify-then-decide pattern)
134 verifiers: Arc<VerifierStorage>,
135 /// Comparator for user errors during early cutoff
136 error_comparator: ErrorComparator,
137 /// Tracer for observability
138 tracer: Arc<T>,
139}
140
141impl Default for QueryRuntime<NoopTracer> {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147impl<T: Tracer> Clone for QueryRuntime<T> {
148 fn clone(&self) -> Self {
149 Self {
150 whale: self.whale.clone(),
151 locators: self.locators.clone(),
152 pending: self.pending.clone(),
153 query_registry: self.query_registry.clone(),
154 asset_key_registry: self.asset_key_registry.clone(),
155 verifiers: self.verifiers.clone(),
156 error_comparator: self.error_comparator,
157 tracer: self.tracer.clone(),
158 }
159 }
160}
161
162/// Default error comparator that treats all errors as different.
163///
164/// This is conservative - it always triggers recomputation when an error occurs.
165fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
166 false
167}
168
169impl<T: Tracer> QueryRuntime<T> {
170 /// Get cached output along with its revision (single atomic access).
171 fn get_cached_with_revision<Q: Query>(
172 &self,
173 key: &FullCacheKey,
174 ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
175 let node = self.whale.get(key)?;
176 let revision = node.changed_at;
177 let entry = node.data.as_ref()?;
178 let cached = entry.to_cached_value::<Q::Output>()?;
179 Some((cached, revision))
180 }
181
182 /// Get a reference to the tracer.
183 #[inline]
184 pub fn tracer(&self) -> &T {
185 &self.tracer
186 }
187}
188
189impl QueryRuntime<NoopTracer> {
190 /// Create a new query runtime with default settings.
191 pub fn new() -> Self {
192 Self::with_tracer(NoopTracer)
193 }
194
195 /// Create a builder for customizing the runtime.
196 ///
197 /// # Example
198 ///
199 /// ```ignore
200 /// let runtime = QueryRuntime::builder()
201 /// .error_comparator(|a, b| {
202 /// // Custom error comparison logic
203 /// match (a.downcast_ref::<MyError>(), b.downcast_ref::<MyError>()) {
204 /// (Some(a), Some(b)) => a == b,
205 /// _ => false,
206 /// }
207 /// })
208 /// .build();
209 /// ```
210 pub fn builder() -> QueryRuntimeBuilder<NoopTracer> {
211 QueryRuntimeBuilder::new()
212 }
213}
214
215impl<T: Tracer> QueryRuntime<T> {
216 /// Create a new query runtime with the specified tracer.
217 pub fn with_tracer(tracer: T) -> Self {
218 QueryRuntimeBuilder::new().tracer(tracer).build()
219 }
220
221 /// Execute a query synchronously.
222 ///
223 /// Returns the cached result if valid, otherwise executes the query.
224 ///
225 /// # Errors
226 ///
227 /// - `QueryError::Suspend` - Query is waiting for async loading
228 /// - `QueryError::Cycle` - Dependency cycle detected
229 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
230 self.query_internal(query)
231 .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
232 }
233
234 /// Internal implementation shared by query() and poll().
235 ///
236 /// Returns (result, revision) tuple where result is either Ok(output) or Err(user_error).
237 /// System errors (Suspend, Cycle, etc.) are returned as the outer Err.
238 #[allow(clippy::type_complexity)]
239 fn query_internal<Q: Query>(
240 &self,
241 query: Q,
242 ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
243 let key = query.cache_key();
244 let full_key = FullCacheKey::new::<Q, _>(&key);
245
246 // Emit query key event for GC tracking (before other events)
247 self.tracer.on_query_key(&full_key);
248
249 // Create execution context and emit start event
250 let span_id = self.tracer.new_span_id();
251 let exec_ctx = ExecutionContext::new(span_id);
252 let query_key = TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
253
254 self.tracer.on_query_start(span_id, query_key.clone());
255
256 // Check for cycles using thread-local stack
257 let cycle_detected = QUERY_STACK.with(|stack| {
258 let stack = stack.borrow();
259 stack.iter().any(|k| k == &full_key)
260 });
261
262 if cycle_detected {
263 let path = QUERY_STACK.with(|stack| {
264 let stack = stack.borrow();
265 let mut path: Vec<String> =
266 stack.iter().map(|k| k.debug_repr().to_string()).collect();
267 path.push(full_key.debug_repr().to_string());
268 path
269 });
270
271 self.tracer.on_cycle_detected(
272 path.iter()
273 .map(|s| TracerQueryKey::new("", s.clone()))
274 .collect(),
275 );
276 self.tracer
277 .on_query_end(span_id, query_key.clone(), ExecutionResult::CycleDetected);
278
279 return Err(QueryError::Cycle { path });
280 }
281
282 // Check if cached and valid (with verify-then-decide pattern)
283 let current_rev = self.whale.current_revision();
284
285 // Fast path: already verified at current revision
286 if self.whale.is_verified_at(&full_key, ¤t_rev) {
287 // Single atomic access to get both cached value and revision
288 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
289 self.tracer.on_cache_check(span_id, query_key.clone(), true);
290 self.tracer
291 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
292
293 return match cached {
294 CachedValue::Ok(output) => Ok((Ok(output), revision)),
295 CachedValue::UserError(err) => Ok((Err(err), revision)),
296 };
297 }
298 }
299
300 // Check shallow validity (deps' changed_at ok) and try verify-then-decide
301 if self.whale.is_valid(&full_key) {
302 // Single atomic access to get both cached value and revision
303 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
304 // Shallow valid but not verified - verify deps first
305 let mut deps_verified = true;
306 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
307 for dep in deps {
308 if let Some(verifier) = self.verifiers.get(&dep) {
309 // Re-query dep to verify it (triggers recursive verification)
310 if verifier.verify(self as &dyn std::any::Any).is_err() {
311 deps_verified = false;
312 break;
313 }
314 }
315 }
316 }
317
318 // Re-check validity after deps are verified
319 if deps_verified && self.whale.is_valid(&full_key) {
320 // Deps didn't change their changed_at, mark as verified and use cached
321 self.whale.mark_verified(&full_key, ¤t_rev);
322
323 self.tracer.on_cache_check(span_id, query_key.clone(), true);
324 self.tracer
325 .on_query_end(span_id, query_key.clone(), ExecutionResult::CacheHit);
326
327 return match cached {
328 CachedValue::Ok(output) => Ok((Ok(output), revision)),
329 CachedValue::UserError(err) => Ok((Err(err), revision)),
330 };
331 }
332 // A dep's changed_at increased, fall through to execute
333 }
334 }
335
336 self.tracer
337 .on_cache_check(span_id, query_key.clone(), false);
338
339 // Execute the query with cycle tracking
340 QUERY_STACK.with(|stack| {
341 stack.borrow_mut().push(full_key.clone());
342 });
343
344 let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
345
346 QUERY_STACK.with(|stack| {
347 stack.borrow_mut().pop();
348 });
349
350 // Emit end event
351 let exec_result = match &result {
352 Ok((_, true, _)) => ExecutionResult::Changed,
353 Ok((_, false, _)) => ExecutionResult::Unchanged,
354 Err(QueryError::Suspend { .. }) => ExecutionResult::Suspended,
355 Err(QueryError::Cycle { .. }) => ExecutionResult::CycleDetected,
356 Err(e) => ExecutionResult::Error {
357 message: format!("{:?}", e),
358 },
359 };
360 self.tracer
361 .on_query_end(span_id, query_key.clone(), exec_result);
362
363 result.map(|(inner_result, _, revision)| (inner_result, revision))
364 }
365
366 /// Execute a query, caching the result if appropriate.
367 ///
368 /// Returns (result, output_changed, revision) tuple.
369 /// - `result`: Ok(output) for success, Err(user_error) for user errors
370 /// - System errors (Suspend, Cycle, etc.) are returned as outer Err
371 #[allow(clippy::type_complexity)]
372 fn execute_query<Q: Query>(
373 &self,
374 query: &Q,
375 full_key: &FullCacheKey,
376 exec_ctx: ExecutionContext,
377 ) -> Result<
378 (
379 Result<Arc<Q::Output>, Arc<anyhow::Error>>,
380 bool,
381 RevisionCounter,
382 ),
383 QueryError,
384 > {
385 // Create context for this query execution
386 let ctx = QueryContext {
387 runtime: self,
388 current_key: full_key.clone(),
389 parent_query_type: std::any::type_name::<Q>(),
390 exec_ctx,
391 deps: RefCell::new(Vec::new()),
392 first_access_revision: Cell::new(None),
393 };
394
395 // Execute the query (clone because query() takes ownership)
396 let result = query.clone().query(&ctx);
397
398 // Get collected dependencies
399 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
400
401 // Query durability defaults to stable - Whale will automatically reduce
402 // the effective durability to min(requested, min(dep_durabilities)).
403 // A pure query with no dependencies remains stable.
404 // A query depending on volatile assets becomes volatile.
405 let durability = Durability::stable();
406
407 match result {
408 Ok(output) => {
409 // Check if output changed (for early cutoff)
410 // existing_revision is Some only when output is unchanged (can reuse revision)
411 let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
412 self.get_cached_with_revision::<Q>(full_key)
413 {
414 if Q::output_eq(&*old, &*output) {
415 Some(rev) // Same output - reuse revision
416 } else {
417 None // Different output
418 }
419 } else {
420 None // No previous Ok value
421 };
422 let output_changed = existing_revision.is_none();
423
424 // Emit early cutoff check event
425 self.tracer.on_early_cutoff_check(
426 exec_ctx.span_id(),
427 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
428 output_changed,
429 );
430
431 // Update whale with cached entry (atomic update of value + dependency state)
432 let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
433 let revision = if let Some(existing_rev) = existing_revision {
434 // confirm_unchanged doesn't change changed_at, use existing
435 let _ = self.whale.confirm_unchanged(full_key, deps);
436 existing_rev
437 } else {
438 // Use new_rev from register result
439 match self
440 .whale
441 .register(full_key.clone(), Some(entry), durability, deps)
442 {
443 Ok(result) => result.new_rev,
444 Err(missing) => {
445 return Err(QueryError::DependenciesRemoved {
446 missing_keys: missing,
447 })
448 }
449 }
450 };
451
452 // Register query in registry for list_queries
453 let is_new_query = self.query_registry.register(query);
454 if is_new_query {
455 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
456 let _ = self
457 .whale
458 .register(sentinel, None, Durability::stable(), vec![]);
459 }
460
461 // Store verifier for this query (for verify-then-decide pattern)
462 self.verifiers
463 .insert::<Q, T>(full_key.clone(), query.clone());
464
465 Ok((Ok(output), output_changed, revision))
466 }
467 Err(QueryError::UserError(err)) => {
468 // Check if error changed (for early cutoff)
469 // existing_revision is Some only when error is unchanged (can reuse revision)
470 let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
471 self.get_cached_with_revision::<Q>(full_key)
472 {
473 if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
474 Some(rev) // Same error - reuse revision
475 } else {
476 None // Different error
477 }
478 } else {
479 None // No previous UserError
480 };
481 let output_changed = existing_revision.is_none();
482
483 // Emit early cutoff check event
484 self.tracer.on_early_cutoff_check(
485 exec_ctx.span_id(),
486 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
487 output_changed,
488 );
489
490 // Update whale with cached error (atomic update of value + dependency state)
491 let entry = CachedEntry::UserError(err.clone());
492 let revision = if let Some(existing_rev) = existing_revision {
493 // confirm_unchanged doesn't change changed_at, use existing
494 let _ = self.whale.confirm_unchanged(full_key, deps);
495 existing_rev
496 } else {
497 // Use new_rev from register result
498 match self
499 .whale
500 .register(full_key.clone(), Some(entry), durability, deps)
501 {
502 Ok(result) => result.new_rev,
503 Err(missing) => {
504 return Err(QueryError::DependenciesRemoved {
505 missing_keys: missing,
506 })
507 }
508 }
509 };
510
511 // Register query in registry for list_queries
512 let is_new_query = self.query_registry.register(query);
513 if is_new_query {
514 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
515 let _ = self
516 .whale
517 .register(sentinel, None, Durability::stable(), vec![]);
518 }
519
520 // Store verifier for this query (for verify-then-decide pattern)
521 self.verifiers
522 .insert::<Q, T>(full_key.clone(), query.clone());
523
524 Ok((Err(err), output_changed, revision))
525 }
526 Err(e) => {
527 // System errors (Suspend, Cycle, Cancelled, MissingDependency) are not cached
528 Err(e)
529 }
530 }
531 }
532
533 /// Invalidate a query, forcing recomputation on next access.
534 ///
535 /// This also invalidates any queries that depend on this one.
536 pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
537 let full_key = FullCacheKey::new::<Q, _>(key);
538
539 self.tracer.on_query_invalidated(
540 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
541 InvalidationReason::ManualInvalidation,
542 );
543
544 // Update whale to invalidate dependents (register with None to clear cached value)
545 // Use stable durability to increment all revision counters, ensuring queries
546 // at any durability level will see this as a change.
547 let _ = self
548 .whale
549 .register(full_key, None, Durability::stable(), vec![]);
550 }
551
552 /// Remove a query from the cache entirely, freeing memory.
553 ///
554 /// Use this for GC when a query is no longer needed.
555 /// Unlike `invalidate`, this removes all traces of the query from storage.
556 /// The query will be recomputed from scratch on next access.
557 ///
558 /// This also invalidates any queries that depend on this one.
559 pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
560 let full_key = FullCacheKey::new::<Q, _>(key);
561
562 self.tracer.on_query_invalidated(
563 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
564 InvalidationReason::ManualInvalidation,
565 );
566
567 // Remove verifier if exists
568 self.verifiers.remove(&full_key);
569
570 // Remove from whale storage (this also handles dependent invalidation)
571 self.whale.remove(&full_key);
572
573 // Remove from registry and update sentinel for list_queries
574 if self.query_registry.remove::<Q>(key) {
575 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
576 let _ = self
577 .whale
578 .register(sentinel, None, Durability::stable(), vec![]);
579 }
580 }
581
582 /// Clear all cached values by removing all nodes from whale.
583 ///
584 /// Note: This is a relatively expensive operation as it iterates through all keys.
585 pub fn clear_cache(&self) {
586 let keys = self.whale.keys();
587 for key in keys {
588 self.whale.remove(&key);
589 }
590 }
591
592 /// Poll a query, returning both the result and its change revision.
593 ///
594 /// This is useful for implementing subscription patterns where you need to
595 /// detect changes efficiently. Compare the returned `revision` with a
596 /// previously stored value to determine if the query result has changed.
597 ///
598 /// The returned `Polled` contains a `Result<Arc<Q::Output>, Arc<anyhow::Error>>`
599 /// as its value, allowing you to track revision changes for both success and
600 /// user error cases.
601 ///
602 /// # Example
603 ///
604 /// ```ignore
605 /// struct Subscription<Q: Query> {
606 /// query: Q,
607 /// last_revision: RevisionCounter,
608 /// tx: Sender<Result<Arc<Q::Output>, Arc<anyhow::Error>>>,
609 /// }
610 ///
611 /// // Polling loop
612 /// for sub in &mut subscriptions {
613 /// let result = runtime.poll(sub.query.clone())?;
614 /// if result.revision > sub.last_revision {
615 /// sub.tx.send(result.value.clone())?;
616 /// sub.last_revision = result.revision;
617 /// }
618 /// }
619 /// ```
620 ///
621 /// # Errors
622 ///
623 /// Returns `Err` only for system errors (Suspend, Cycle, etc.).
624 /// User errors are returned as `Ok(Polled { value: Err(error), ... })`.
625 #[allow(clippy::type_complexity)]
626 pub fn poll<Q: Query>(
627 &self,
628 query: Q,
629 ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
630 let (value, revision) = self.query_internal(query)?;
631 Ok(Polled { value, revision })
632 }
633
634 /// Get the change revision of a query without executing it.
635 ///
636 /// Returns `None` if the query has never been executed.
637 ///
638 /// This is useful for checking if a query has changed since the last poll
639 /// without the cost of executing the query.
640 ///
641 /// # Example
642 ///
643 /// ```ignore
644 /// // Check if query has changed before deciding to poll
645 /// if let Some(rev) = runtime.changed_at::<MyQuery>(&key) {
646 /// if rev > last_known_revision {
647 /// let result = runtime.query(MyQuery::new(key))?;
648 /// // Process result...
649 /// }
650 /// }
651 /// ```
652 pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
653 let full_key = FullCacheKey::new::<Q, _>(key);
654 self.whale.get(&full_key).map(|node| node.changed_at)
655 }
656}
657
658// ============================================================================
659// GC (Garbage Collection) API
660// ============================================================================
661
662impl<T: Tracer> QueryRuntime<T> {
663 /// Get all query keys currently in the cache.
664 ///
665 /// This is useful for implementing custom garbage collection strategies.
666 /// Use this in combination with [`Tracer::on_query_key`] to track access
667 /// times and implement LRU, TTL, or other GC algorithms externally.
668 ///
669 /// # Example
670 ///
671 /// ```ignore
672 /// // Collect all keys that haven't been accessed recently
673 /// let stale_keys: Vec<_> = runtime.query_keys()
674 /// .filter(|key| tracker.is_stale(key))
675 /// .collect();
676 ///
677 /// // Remove stale queries that have no dependents
678 /// for key in stale_keys {
679 /// runtime.remove_if_unused(&key);
680 /// }
681 /// ```
682 pub fn query_keys(&self) -> Vec<FullCacheKey> {
683 self.whale.keys()
684 }
685
686 /// Remove a query if it has no dependents.
687 ///
688 /// Returns `true` if the query was removed, `false` if it has dependents
689 /// or doesn't exist. This is the safe way to remove queries during GC,
690 /// as it won't break queries that depend on this one.
691 ///
692 /// # Example
693 ///
694 /// ```ignore
695 /// let key = FullCacheKey::new::<MyQuery, _>(&cache_key);
696 /// if runtime.remove_query_if_unused::<MyQuery>(&cache_key) {
697 /// println!("Query removed");
698 /// } else {
699 /// println!("Query has dependents, not removed");
700 /// }
701 /// ```
702 pub fn remove_query_if_unused<Q: Query>(&self, key: &Q::CacheKey) -> bool {
703 let full_key = FullCacheKey::new::<Q, _>(key);
704 self.remove_if_unused(&full_key)
705 }
706
707 /// Remove a query by its [`FullCacheKey`].
708 ///
709 /// This is the type-erased version of [`remove_query`](Self::remove_query).
710 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
711 /// or [`Tracer::on_query_key`].
712 ///
713 /// Returns `true` if the query was removed, `false` if it doesn't exist.
714 ///
715 /// # Warning
716 ///
717 /// This forcibly removes the query even if other queries depend on it.
718 /// Dependent queries will be recomputed on next access. For safe GC,
719 /// use [`remove_if_unused`](Self::remove_if_unused) instead.
720 pub fn remove(&self, key: &FullCacheKey) -> bool {
721 // Remove verifier if exists
722 self.verifiers.remove(key);
723
724 // Remove from whale storage
725 self.whale.remove(key).is_some()
726 }
727
728 /// Remove a query by its [`FullCacheKey`] if it has no dependents.
729 ///
730 /// This is the type-erased version of [`remove_query_if_unused`](Self::remove_query_if_unused).
731 /// Use this when you have a `FullCacheKey` from [`query_keys`](Self::query_keys)
732 /// or [`Tracer::on_query_key`].
733 ///
734 /// Returns `true` if the query was removed, `false` if it has dependents
735 /// or doesn't exist.
736 ///
737 /// # Example
738 ///
739 /// ```ignore
740 /// // Implement LRU GC
741 /// for key in runtime.query_keys() {
742 /// if tracker.is_expired(&key) {
743 /// runtime.remove_if_unused(&key);
744 /// }
745 /// }
746 /// ```
747 pub fn remove_if_unused(&self, key: &FullCacheKey) -> bool {
748 if self.whale.remove_if_unused(key.clone()).is_some() {
749 // Successfully removed - clean up verifier
750 self.verifiers.remove(key);
751 true
752 } else {
753 false
754 }
755 }
756}
757
758// ============================================================================
759// Builder
760// ============================================================================
761
762/// Builder for [`QueryRuntime`] with customizable settings.
763///
764/// # Example
765///
766/// ```ignore
767/// let runtime = QueryRuntime::builder()
768/// .error_comparator(|a, b| {
769/// // Treat all errors of the same type as equal
770/// a.downcast_ref::<std::io::Error>().is_some()
771/// == b.downcast_ref::<std::io::Error>().is_some()
772/// })
773/// .build();
774/// ```
775pub struct QueryRuntimeBuilder<T: Tracer = NoopTracer> {
776 error_comparator: ErrorComparator,
777 tracer: T,
778}
779
780impl Default for QueryRuntimeBuilder<NoopTracer> {
781 fn default() -> Self {
782 Self::new()
783 }
784}
785
786impl QueryRuntimeBuilder<NoopTracer> {
787 /// Create a new builder with default settings.
788 pub fn new() -> Self {
789 Self {
790 error_comparator: default_error_comparator,
791 tracer: NoopTracer,
792 }
793 }
794}
795
796impl<T: Tracer> QueryRuntimeBuilder<T> {
797 /// Set the error comparator function for early cutoff optimization.
798 ///
799 /// When a query returns `QueryError::UserError`, this function is used
800 /// to compare it with the previously cached error. If they are equal,
801 /// downstream queries can skip recomputation (early cutoff).
802 ///
803 /// The default comparator returns `false` for all errors, meaning errors
804 /// are always considered different (conservative, always recomputes).
805 ///
806 /// # Example
807 ///
808 /// ```ignore
809 /// // Treat errors as equal if they have the same display message
810 /// let runtime = QueryRuntime::builder()
811 /// .error_comparator(|a, b| a.to_string() == b.to_string())
812 /// .build();
813 /// ```
814 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
815 self.error_comparator = f;
816 self
817 }
818
819 /// Set the tracer for observability.
820 pub fn tracer<U: Tracer>(self, tracer: U) -> QueryRuntimeBuilder<U> {
821 QueryRuntimeBuilder {
822 error_comparator: self.error_comparator,
823 tracer,
824 }
825 }
826
827 /// Build the runtime with the configured settings.
828 pub fn build(self) -> QueryRuntime<T> {
829 QueryRuntime {
830 whale: WhaleRuntime::new(),
831 locators: Arc::new(LocatorStorage::new()),
832 pending: Arc::new(PendingStorage::new()),
833 query_registry: Arc::new(QueryRegistry::new()),
834 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
835 verifiers: Arc::new(VerifierStorage::new()),
836 error_comparator: self.error_comparator,
837 tracer: Arc::new(self.tracer),
838 }
839 }
840}
841
842// ============================================================================
843// Asset API
844// ============================================================================
845
846impl<T: Tracer> QueryRuntime<T> {
847 /// Register an asset locator for a specific asset key type.
848 ///
849 /// Only one locator can be registered per key type. Later registrations
850 /// replace earlier ones.
851 ///
852 /// # Example
853 ///
854 /// ```ignore
855 /// let runtime = QueryRuntime::new();
856 /// runtime.register_asset_locator(FileSystemLocator::new("/assets"));
857 /// ```
858 pub fn register_asset_locator<K, L>(&self, locator: L)
859 where
860 K: AssetKey,
861 L: AssetLocator<K>,
862 {
863 self.locators.insert::<K, L>(locator);
864 }
865
866 /// Get an iterator over pending asset requests.
867 ///
868 /// Returns assets that have been requested but not yet resolved.
869 /// The user should fetch these externally and call `resolve_asset()`.
870 ///
871 /// # Example
872 ///
873 /// ```ignore
874 /// for pending in runtime.pending_assets() {
875 /// if let Some(path) = pending.key::<FilePath>() {
876 /// let content = fetch_file(path);
877 /// runtime.resolve_asset(path.clone(), content);
878 /// }
879 /// }
880 /// ```
881 pub fn pending_assets(&self) -> Vec<PendingAsset> {
882 self.pending.get_all()
883 }
884
885 /// Get pending assets filtered by key type.
886 pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
887 self.pending.get_of_type::<K>()
888 }
889
890 /// Check if there are any pending assets.
891 pub fn has_pending_assets(&self) -> bool {
892 !self.pending.is_empty()
893 }
894
895 /// Resolve an asset with its loaded value.
896 ///
897 /// This marks the asset as ready and invalidates any queries that
898 /// depend on it (if the value changed), triggering recomputation on next access.
899 ///
900 /// This method is idempotent - resolving with the same value (via `asset_eq`)
901 /// will not trigger downstream recomputation.
902 ///
903 /// # Arguments
904 ///
905 /// * `key` - The asset key identifying this resource
906 /// * `value` - The loaded asset value
907 /// * `durability` - How frequently this asset is expected to change
908 ///
909 /// # Example
910 ///
911 /// ```ignore
912 /// let content = std::fs::read_to_string(&path)?;
913 /// runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile);
914 /// ```
915 pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset, durability: DurabilityLevel) {
916 self.resolve_asset_internal(key, value, durability);
917 }
918
919 /// Resolve an asset with an error.
920 ///
921 /// This marks the asset as errored and caches the error. Queries depending
922 /// on this asset will receive `Err(QueryError::UserError(...))`.
923 ///
924 /// Use this when async loading fails (e.g., network error, file not found,
925 /// access denied).
926 ///
927 /// # Arguments
928 ///
929 /// * `key` - The asset key identifying this resource
930 /// * `error` - The error to cache (will be wrapped in `Arc`)
931 /// * `durability` - How frequently this error state is expected to change
932 ///
933 /// # Example
934 ///
935 /// ```ignore
936 /// match fetch_file(&path) {
937 /// Ok(content) => runtime.resolve_asset(FilePath(path), content, DurabilityLevel::Volatile),
938 /// Err(e) => runtime.resolve_asset_error(FilePath(path), e, DurabilityLevel::Volatile),
939 /// }
940 /// ```
941 pub fn resolve_asset_error<K: AssetKey>(
942 &self,
943 key: K,
944 error: impl Into<anyhow::Error>,
945 durability: DurabilityLevel,
946 ) {
947 let full_asset_key = FullAssetKey::new(&key);
948 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
949
950 // Remove from pending BEFORE registering the error
951 self.pending.remove(&full_asset_key);
952
953 // Prepare the error entry
954 let error_arc = Arc::new(error.into());
955 let entry = CachedEntry::AssetError(error_arc.clone());
956 let durability =
957 Durability::new(durability.as_u8() as usize).unwrap_or(Durability::volatile());
958
959 // Atomic compare-and-update (errors are always considered changed for now)
960 let result = self
961 .whale
962 .update_with_compare(
963 full_cache_key,
964 Some(entry),
965 |old_data, _new_data| {
966 // Compare old and new errors using error_comparator
967 match old_data.and_then(|d| d.as_ref()) {
968 Some(CachedEntry::AssetError(old_err)) => {
969 !(self.error_comparator)(old_err.as_ref(), error_arc.as_ref())
970 }
971 _ => true, // Loading, Ready, or not present -> changed
972 }
973 },
974 durability,
975 vec![],
976 )
977 .expect("update_with_compare with no dependencies cannot fail");
978
979 // Emit asset resolved event (with changed status)
980 self.tracer.on_asset_resolved(
981 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
982 result.changed,
983 );
984
985 // Register asset key in registry for list_asset_keys
986 let is_new_asset = self.asset_key_registry.register(&key);
987 if is_new_asset {
988 // Update sentinel to invalidate list_asset_keys dependents
989 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
990 let _ = self
991 .whale
992 .register(sentinel, None, Durability::stable(), vec![]);
993 }
994 }
995
996 fn resolve_asset_internal<K: AssetKey>(
997 &self,
998 key: K,
999 value: K::Asset,
1000 durability_level: DurabilityLevel,
1001 ) {
1002 let full_asset_key = FullAssetKey::new(&key);
1003 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1004
1005 // Remove from pending BEFORE registering the value
1006 self.pending.remove(&full_asset_key);
1007
1008 // Prepare the new entry
1009 let value_arc: Arc<K::Asset> = Arc::new(value);
1010 let entry = CachedEntry::AssetReady(value_arc.clone() as Arc<dyn Any + Send + Sync>);
1011 let durability =
1012 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
1013
1014 // Atomic compare-and-update
1015 let result = self
1016 .whale
1017 .update_with_compare(
1018 full_cache_key,
1019 Some(entry),
1020 |old_data, _new_data| {
1021 // Compare old and new values
1022 match old_data.and_then(|d| d.as_ref()) {
1023 Some(CachedEntry::AssetReady(old_arc)) => {
1024 match old_arc.clone().downcast::<K::Asset>() {
1025 Ok(old_value) => !K::asset_eq(&old_value, &value_arc),
1026 Err(_) => true, // Type mismatch, treat as changed
1027 }
1028 }
1029 _ => true, // Loading, NotFound, or not present -> changed
1030 }
1031 },
1032 durability,
1033 vec![],
1034 )
1035 .expect("update_with_compare with no dependencies cannot fail");
1036
1037 // Emit asset resolved event
1038 self.tracer.on_asset_resolved(
1039 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1040 result.changed,
1041 );
1042
1043 // Register asset key in registry for list_asset_keys
1044 let is_new_asset = self.asset_key_registry.register(&key);
1045 if is_new_asset {
1046 // Update sentinel to invalidate list_asset_keys dependents
1047 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1048 let _ = self
1049 .whale
1050 .register(sentinel, None, Durability::stable(), vec![]);
1051 }
1052 }
1053
1054 /// Invalidate an asset, forcing queries to re-request it.
1055 ///
1056 /// The asset will be marked as loading and added to pending assets.
1057 /// Dependent queries will suspend until the asset is resolved again.
1058 ///
1059 /// # Example
1060 ///
1061 /// ```ignore
1062 /// // File was modified externally
1063 /// runtime.invalidate_asset(&FilePath("config.json".into()));
1064 /// // Queries depending on this asset will now suspend
1065 /// // User should fetch the new value and call resolve_asset
1066 /// ```
1067 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
1068 let full_asset_key = FullAssetKey::new(key);
1069 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1070
1071 // Emit asset invalidated event
1072 self.tracer.on_asset_invalidated(TracerAssetKey::new(
1073 std::any::type_name::<K>(),
1074 format!("{:?}", key),
1075 ));
1076
1077 // Add to pending FIRST (before clearing whale state)
1078 // This ensures: readers see either old value, or Loading+pending
1079 self.pending.insert::<K>(full_asset_key, key.clone());
1080
1081 // Atomic: clear cached value + invalidate dependents
1082 // Using None for data means "needs to be loaded"
1083 // Use stable durability to ensure queries at any durability level see the change.
1084 let _ = self
1085 .whale
1086 .register(full_cache_key, None, Durability::stable(), vec![]);
1087 }
1088
1089 /// Remove an asset from the cache entirely.
1090 ///
1091 /// Unlike `invalidate_asset`, this removes all traces of the asset.
1092 /// Dependent queries will go through the locator again on next access.
1093 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1094 let full_asset_key = FullAssetKey::new(key);
1095 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1096
1097 // Remove from pending first
1098 self.pending.remove(&full_asset_key);
1099
1100 // Remove from whale (this also cleans up dependency edges)
1101 // whale.remove() invalidates dependents before removing
1102 self.whale.remove(&full_cache_key);
1103
1104 // Remove from registry and update sentinel for list_asset_keys
1105 if self.asset_key_registry.remove::<K>(key) {
1106 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1107 let _ = self
1108 .whale
1109 .register(sentinel, None, Durability::stable(), vec![]);
1110 }
1111 }
1112
1113 /// Get an asset by key without tracking dependencies.
1114 ///
1115 /// Unlike `QueryContext::asset()`, this method does NOT register the caller
1116 /// as a dependent of the asset. Use this for direct asset access outside
1117 /// of query execution.
1118 ///
1119 /// # Returns
1120 ///
1121 /// - `Ok(AssetLoadingState::ready(...))` - Asset is loaded and ready
1122 /// - `Ok(AssetLoadingState::loading(...))` - Asset is still loading (added to pending)
1123 /// - `Err(QueryError::MissingDependency)` - Asset was not found
1124 pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1125 self.get_asset_internal(key)
1126 }
1127
1128 /// Internal: Get asset state and its changed_at revision atomically.
1129 ///
1130 /// This is the "direct" version called from QueryRuntime::asset (no dependency tracking).
1131 /// For calls from QueryContext::asset, use `get_asset_with_revision_ctx`.
1132 ///
1133 /// Returns (AssetLoadingState, changed_at) where changed_at is from the same
1134 /// whale node that provided the asset value.
1135 fn get_asset_with_revision<K: AssetKey>(
1136 &self,
1137 key: K,
1138 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1139 let full_asset_key = FullAssetKey::new(&key);
1140 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1141
1142 // Helper to emit AssetRequested event
1143 let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1144 tracer.on_asset_requested(
1145 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1146 state,
1147 );
1148 };
1149
1150 // Check whale cache first (single atomic read)
1151 if let Some(node) = self.whale.get(&full_cache_key) {
1152 let changed_at = node.changed_at;
1153 // Check if valid at current revision
1154 if self.whale.is_valid(&full_cache_key) {
1155 match &node.data {
1156 Some(CachedEntry::AssetReady(arc)) => {
1157 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1158 match arc.clone().downcast::<K::Asset>() {
1159 Ok(value) => {
1160 return Ok((AssetLoadingState::ready(key, value), changed_at))
1161 }
1162 Err(_) => {
1163 return Err(QueryError::MissingDependency {
1164 description: format!("Asset type mismatch: {:?}", key),
1165 })
1166 }
1167 }
1168 }
1169 Some(CachedEntry::AssetError(err)) => {
1170 emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1171 return Err(QueryError::UserError(err.clone()));
1172 }
1173 None => {
1174 // Loading state
1175 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1176 return Ok((AssetLoadingState::loading(key), changed_at));
1177 }
1178 _ => {
1179 // Query-related entries (Ok, UserError) shouldn't be here
1180 // Fall through to locator
1181 }
1182 }
1183 }
1184 }
1185
1186 // Not in cache or invalid - try locator
1187 if let Some(result) = self
1188 .locators
1189 .locate_with_runtime(TypeId::of::<K>(), self, &key)
1190 {
1191 match result {
1192 Ok(ErasedLocateResult::Ready {
1193 value: arc,
1194 durability: durability_level,
1195 }) => {
1196 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1197
1198 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1199 Ok(v) => v,
1200 Err(_) => {
1201 return Err(QueryError::MissingDependency {
1202 description: format!("Asset type mismatch: {:?}", key),
1203 });
1204 }
1205 };
1206
1207 // Store in whale atomically with early cutoff
1208 let entry = CachedEntry::AssetReady(typed_value.clone());
1209 let durability = Durability::new(durability_level.as_u8() as usize)
1210 .unwrap_or(Durability::volatile());
1211 let new_value = typed_value.clone();
1212 let result = self
1213 .whale
1214 .update_with_compare(
1215 full_cache_key,
1216 Some(entry),
1217 |old_data, _new_data| {
1218 let Some(CachedEntry::AssetReady(old_arc)) =
1219 old_data.and_then(|d| d.as_ref())
1220 else {
1221 return true;
1222 };
1223 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1224 return true;
1225 };
1226 !K::asset_eq(&old_value, &new_value)
1227 },
1228 durability,
1229 vec![],
1230 )
1231 .expect("update_with_compare with no dependencies cannot fail");
1232
1233 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1234 }
1235 Ok(ErasedLocateResult::Pending) => {
1236 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1237
1238 // Add to pending list for Pending result
1239 self.pending.insert::<K>(full_asset_key, key.clone());
1240 match self
1241 .whale
1242 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1243 .expect("get_or_insert with no dependencies cannot fail")
1244 {
1245 GetOrInsertResult::Inserted(node) => {
1246 return Ok((AssetLoadingState::loading(key), node.changed_at));
1247 }
1248 GetOrInsertResult::Existing(node) => {
1249 let changed_at = node.changed_at;
1250 match &node.data {
1251 Some(CachedEntry::AssetReady(arc)) => {
1252 match arc.clone().downcast::<K::Asset>() {
1253 Ok(value) => {
1254 return Ok((
1255 AssetLoadingState::ready(key, value),
1256 changed_at,
1257 ))
1258 }
1259 Err(_) => {
1260 return Ok((
1261 AssetLoadingState::loading(key),
1262 changed_at,
1263 ))
1264 }
1265 }
1266 }
1267 Some(CachedEntry::AssetError(err)) => {
1268 return Err(QueryError::UserError(err.clone()));
1269 }
1270 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1271 }
1272 }
1273 }
1274 }
1275 Err(QueryError::UserError(err)) => {
1276 // Locator returned a user error - cache it as AssetError
1277 let entry = CachedEntry::AssetError(err.clone());
1278 let _ = self.whale.register(
1279 full_cache_key,
1280 Some(entry),
1281 Durability::volatile(),
1282 vec![],
1283 );
1284 return Err(QueryError::UserError(err));
1285 }
1286 Err(e) => {
1287 // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1288 return Err(e);
1289 }
1290 }
1291 }
1292
1293 // No locator registered or locator returned None - mark as pending
1294 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1295 self.pending
1296 .insert::<K>(full_asset_key.clone(), key.clone());
1297
1298 match self
1299 .whale
1300 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1301 .expect("get_or_insert with no dependencies cannot fail")
1302 {
1303 GetOrInsertResult::Inserted(node) => {
1304 Ok((AssetLoadingState::loading(key), node.changed_at))
1305 }
1306 GetOrInsertResult::Existing(node) => {
1307 let changed_at = node.changed_at;
1308 match &node.data {
1309 Some(CachedEntry::AssetReady(arc)) => {
1310 match arc.clone().downcast::<K::Asset>() {
1311 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1312 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1313 }
1314 }
1315 Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1316 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1317 }
1318 }
1319 }
1320 }
1321
1322 /// Internal: Get asset state and its changed_at revision atomically (with QueryContext).
1323 ///
1324 /// This version is called from QueryContext::asset and enables dependency tracking
1325 /// within the locator's db.query() and db.asset() calls.
1326 fn get_asset_with_revision_ctx<K: AssetKey>(
1327 &self,
1328 key: K,
1329 ctx: &QueryContext<'_, T>,
1330 ) -> Result<(AssetLoadingState<K>, RevisionCounter), QueryError> {
1331 let full_asset_key = FullAssetKey::new(&key);
1332 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1333
1334 // Helper to emit AssetRequested event
1335 let emit_requested = |tracer: &T, key: &K, state: TracerAssetState| {
1336 tracer.on_asset_requested(
1337 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1338 state,
1339 );
1340 };
1341
1342 // Check whale cache first (single atomic read)
1343 if let Some(node) = self.whale.get(&full_cache_key) {
1344 let changed_at = node.changed_at;
1345 // Check if valid at current revision
1346 if self.whale.is_valid(&full_cache_key) {
1347 match &node.data {
1348 Some(CachedEntry::AssetReady(arc)) => {
1349 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1350 match arc.clone().downcast::<K::Asset>() {
1351 Ok(value) => {
1352 return Ok((AssetLoadingState::ready(key, value), changed_at))
1353 }
1354 Err(_) => {
1355 return Err(QueryError::MissingDependency {
1356 description: format!("Asset type mismatch: {:?}", key),
1357 })
1358 }
1359 }
1360 }
1361 Some(CachedEntry::AssetError(err)) => {
1362 emit_requested(&self.tracer, &key, TracerAssetState::NotFound);
1363 return Err(QueryError::UserError(err.clone()));
1364 }
1365 None => {
1366 // Loading state
1367 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1368 return Ok((AssetLoadingState::loading(key), changed_at));
1369 }
1370 _ => {
1371 // Query-related entries (Ok, UserError) shouldn't be here
1372 // Fall through to locator
1373 }
1374 }
1375 }
1376 }
1377
1378 // Not in cache or invalid - try locator (with context for dependency tracking)
1379 // First, check for cycles using the query stack (assets share the same stack as queries)
1380 let cycle_detected = QUERY_STACK.with(|stack| {
1381 let stack = stack.borrow();
1382 stack.iter().any(|k| k == &full_cache_key)
1383 });
1384
1385 if cycle_detected {
1386 let path = QUERY_STACK.with(|stack| {
1387 let stack = stack.borrow();
1388 let mut path: Vec<String> =
1389 stack.iter().map(|k| k.debug_repr().to_string()).collect();
1390 path.push(full_cache_key.debug_repr().to_string());
1391 path
1392 });
1393 return Err(QueryError::Cycle { path });
1394 }
1395
1396 // Push asset to stack before calling locator
1397 QUERY_STACK.with(|stack| {
1398 stack.borrow_mut().push(full_cache_key.clone());
1399 });
1400
1401 let locator_result = self.locators.locate_with_ctx(TypeId::of::<K>(), ctx, &key);
1402
1403 // Pop asset from stack after locator returns
1404 QUERY_STACK.with(|stack| {
1405 stack.borrow_mut().pop();
1406 });
1407
1408 if let Some(result) = locator_result {
1409 match result {
1410 Ok(ErasedLocateResult::Ready {
1411 value: arc,
1412 durability: durability_level,
1413 }) => {
1414 emit_requested(&self.tracer, &key, TracerAssetState::Ready);
1415
1416 let typed_value: Arc<K::Asset> = match arc.downcast::<K::Asset>() {
1417 Ok(v) => v,
1418 Err(_) => {
1419 return Err(QueryError::MissingDependency {
1420 description: format!("Asset type mismatch: {:?}", key),
1421 });
1422 }
1423 };
1424
1425 // Store in whale atomically with early cutoff
1426 let entry = CachedEntry::AssetReady(typed_value.clone());
1427 let durability = Durability::new(durability_level.as_u8() as usize)
1428 .unwrap_or(Durability::volatile());
1429 let new_value = typed_value.clone();
1430 let result = self
1431 .whale
1432 .update_with_compare(
1433 full_cache_key,
1434 Some(entry),
1435 |old_data, _new_data| {
1436 let Some(CachedEntry::AssetReady(old_arc)) =
1437 old_data.and_then(|d| d.as_ref())
1438 else {
1439 return true;
1440 };
1441 let Ok(old_value) = old_arc.clone().downcast::<K::Asset>() else {
1442 return true;
1443 };
1444 !K::asset_eq(&old_value, &new_value)
1445 },
1446 durability,
1447 vec![],
1448 )
1449 .expect("update_with_compare with no dependencies cannot fail");
1450
1451 return Ok((AssetLoadingState::ready(key, typed_value), result.revision));
1452 }
1453 Ok(ErasedLocateResult::Pending) => {
1454 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1455
1456 // Add to pending list for Pending result
1457 self.pending.insert::<K>(full_asset_key, key.clone());
1458 match self
1459 .whale
1460 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1461 .expect("get_or_insert with no dependencies cannot fail")
1462 {
1463 GetOrInsertResult::Inserted(node) => {
1464 return Ok((AssetLoadingState::loading(key), node.changed_at));
1465 }
1466 GetOrInsertResult::Existing(node) => {
1467 let changed_at = node.changed_at;
1468 match &node.data {
1469 Some(CachedEntry::AssetReady(arc)) => {
1470 match arc.clone().downcast::<K::Asset>() {
1471 Ok(value) => {
1472 return Ok((
1473 AssetLoadingState::ready(key, value),
1474 changed_at,
1475 ))
1476 }
1477 Err(_) => {
1478 return Ok((
1479 AssetLoadingState::loading(key),
1480 changed_at,
1481 ))
1482 }
1483 }
1484 }
1485 Some(CachedEntry::AssetError(err)) => {
1486 return Err(QueryError::UserError(err.clone()));
1487 }
1488 _ => return Ok((AssetLoadingState::loading(key), changed_at)),
1489 }
1490 }
1491 }
1492 }
1493 Err(QueryError::UserError(err)) => {
1494 // Locator returned a user error - cache it as AssetError
1495 let entry = CachedEntry::AssetError(err.clone());
1496 let _ = self.whale.register(
1497 full_cache_key,
1498 Some(entry),
1499 Durability::volatile(),
1500 vec![],
1501 );
1502 return Err(QueryError::UserError(err));
1503 }
1504 Err(e) => {
1505 // Other errors (Cycle, Suspended, etc.) - do NOT cache, propagate directly
1506 return Err(e);
1507 }
1508 }
1509 }
1510
1511 // No locator registered or locator returned None - mark as pending
1512 emit_requested(&self.tracer, &key, TracerAssetState::Loading);
1513 self.pending
1514 .insert::<K>(full_asset_key.clone(), key.clone());
1515
1516 match self
1517 .whale
1518 .get_or_insert(full_cache_key, None, Durability::volatile(), vec![])
1519 .expect("get_or_insert with no dependencies cannot fail")
1520 {
1521 GetOrInsertResult::Inserted(node) => {
1522 Ok((AssetLoadingState::loading(key), node.changed_at))
1523 }
1524 GetOrInsertResult::Existing(node) => {
1525 let changed_at = node.changed_at;
1526 match &node.data {
1527 Some(CachedEntry::AssetReady(arc)) => {
1528 match arc.clone().downcast::<K::Asset>() {
1529 Ok(value) => Ok((AssetLoadingState::ready(key, value), changed_at)),
1530 Err(_) => Ok((AssetLoadingState::loading(key), changed_at)),
1531 }
1532 }
1533 Some(CachedEntry::AssetError(err)) => Err(QueryError::UserError(err.clone())),
1534 _ => Ok((AssetLoadingState::loading(key), changed_at)),
1535 }
1536 }
1537 }
1538 }
1539
1540 /// Internal: Get asset state, checking cache and locator.
1541 fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1542 self.get_asset_with_revision(key).map(|(state, _)| state)
1543 }
1544}
1545
1546impl<T: Tracer> Db for QueryRuntime<T> {
1547 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1548 QueryRuntime::query(self, query)
1549 }
1550
1551 fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1552 self.get_asset_internal(key)
1553 }
1554
1555 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1556 self.query_registry.get_all::<Q>()
1557 }
1558
1559 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1560 self.asset_key_registry.get_all::<K>()
1561 }
1562}
1563
1564/// Context provided to queries during execution.
1565///
1566/// Use this to access dependencies via `query()`.
1567pub struct QueryContext<'a, T: Tracer = NoopTracer> {
1568 runtime: &'a QueryRuntime<T>,
1569 current_key: FullCacheKey,
1570 parent_query_type: &'static str,
1571 exec_ctx: ExecutionContext,
1572 deps: RefCell<Vec<FullCacheKey>>,
1573 /// Revision at first asset access, used for consistency checking.
1574 /// Assets with changed_at > this value were modified during query execution.
1575 first_access_revision: Cell<Option<RevisionCounter>>,
1576}
1577
1578impl<'a, T: Tracer> QueryContext<'a, T> {
1579 /// Ensure consistency of asset access.
1580 ///
1581 /// On first asset access: records max(current_global, dep_changed_at) as baseline.
1582 /// On subsequent asset accesses: checks dep_changed_at <= baseline.
1583 ///
1584 /// IMPORTANT: current_global must be obtained BEFORE accessing the asset.
1585 #[inline]
1586 fn ensure_consistent(
1587 &self,
1588 current_global: RevisionCounter,
1589 dep_changed_at: RevisionCounter,
1590 ) -> Result<(), QueryError> {
1591 match self.first_access_revision.get() {
1592 None => {
1593 // First asset access: record max(current_global, dep_changed_at)
1594 // Using max ensures we don't get false positives when only one asset is accessed
1595 let first = current_global.max(dep_changed_at);
1596 self.first_access_revision.set(Some(first));
1597 Ok(())
1598 }
1599 Some(first) => {
1600 // Subsequent asset accesses: check consistency
1601 if dep_changed_at > first {
1602 Err(QueryError::InconsistentAssetResolution)
1603 } else {
1604 Ok(())
1605 }
1606 }
1607 }
1608 }
1609
1610 /// Query a dependency.
1611 ///
1612 /// The dependency is automatically tracked for invalidation.
1613 ///
1614 /// # Example
1615 ///
1616 /// ```ignore
1617 /// fn query(self, db: &impl Db) -> Result<Self::Output, QueryError> {
1618 /// let dep_result = db.query(OtherQuery { id: self.id })?;
1619 /// Ok(process(&dep_result))
1620 /// }
1621 /// ```
1622 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1623 let key = query.cache_key();
1624 let full_key = FullCacheKey::new::<Q, _>(&key);
1625
1626 // Emit dependency registered event
1627 self.runtime.tracer.on_dependency_registered(
1628 self.exec_ctx.span_id(),
1629 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1630 TracerQueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr()),
1631 );
1632
1633 // Record this as a dependency
1634 self.deps.borrow_mut().push(full_key.clone());
1635
1636 // Execute the query
1637 self.runtime.query(query)
1638 }
1639
1640 /// Access an asset, tracking it as a dependency.
1641 ///
1642 /// Returns `AssetLoadingState<K>`:
1643 /// - `is_loading()` if the asset is still being loaded
1644 /// - `is_ready()` if the asset is available
1645 ///
1646 /// Use `.suspend()?` to convert to `Result<Arc<K::Asset>, QueryError>`,
1647 /// which returns `Err(QueryError::Suspend { asset })` if still loading.
1648 ///
1649 /// # Example
1650 ///
1651 /// ```ignore
1652 /// #[query]
1653 /// fn process_file(db: &impl Db, path: FilePath) -> Result<Output, QueryError> {
1654 /// let content = db.asset(path)?.suspend()?;
1655 /// // Process content...
1656 /// Ok(output)
1657 /// }
1658 /// ```
1659 ///
1660 /// # Errors
1661 ///
1662 /// Returns `Err(QueryError::MissingDependency)` if the asset was not found.
1663 pub fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1664 let full_asset_key = FullAssetKey::new(&key);
1665 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1666
1667 // 1. Get current_global FIRST (before accessing the asset)
1668 let current_global = self
1669 .runtime
1670 .whale
1671 .current_revision()
1672 .get(Durability::volatile());
1673
1674 // 2. Emit asset dependency registered event
1675 self.runtime.tracer.on_asset_dependency_registered(
1676 self.exec_ctx.span_id(),
1677 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1678 TracerAssetKey::new(std::any::type_name::<K>(), format!("{:?}", key)),
1679 );
1680
1681 // 3. Record dependency on this asset
1682 self.deps.borrow_mut().push(full_cache_key);
1683
1684 // 4. Get asset AND changed_at from the same whale access (atomic)
1685 // Use the context-aware version to enable dependency tracking in locators
1686 let (result, dep_changed_at) = match self.runtime.get_asset_with_revision_ctx(key, self) {
1687 Ok((state, rev)) => (Ok(state), rev),
1688 Err(e) => return Err(e),
1689 };
1690
1691 // 5. Check consistency - detects if resolve_asset was called during query execution
1692 self.ensure_consistent(current_global, dep_changed_at)?;
1693
1694 // Emit missing dependency event on error
1695 if let Err(QueryError::MissingDependency { ref description }) = result {
1696 self.runtime.tracer.on_missing_dependency(
1697 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1698 description.clone(),
1699 );
1700 }
1701
1702 result
1703 }
1704
1705 /// List all query instances of type Q that have been registered.
1706 ///
1707 /// This method establishes a dependency on the "set" of queries of type Q.
1708 /// The calling query will be invalidated when:
1709 /// - A new query of type Q is first executed (added to set)
1710 ///
1711 /// The calling query will NOT be invalidated when:
1712 /// - An individual query of type Q has its value change
1713 ///
1714 /// # Example
1715 ///
1716 /// ```ignore
1717 /// #[query]
1718 /// fn all_results(db: &impl Db) -> Result<Vec<i32>, QueryError> {
1719 /// let queries = db.list_queries::<MyQuery>();
1720 /// let mut results = Vec::new();
1721 /// for q in queries {
1722 /// results.push(*db.query(q)?);
1723 /// }
1724 /// Ok(results)
1725 /// }
1726 /// ```
1727 pub fn list_queries<Q: Query>(&self) -> Vec<Q> {
1728 // Record dependency on the sentinel (set-level dependency)
1729 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1730
1731 self.runtime.tracer.on_dependency_registered(
1732 self.exec_ctx.span_id(),
1733 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1734 TracerQueryKey::new("QuerySet", sentinel.debug_repr()),
1735 );
1736
1737 // Ensure sentinel exists in whale (for dependency tracking)
1738 if self.runtime.whale.get(&sentinel).is_none() {
1739 let _ =
1740 self.runtime
1741 .whale
1742 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1743 }
1744
1745 self.deps.borrow_mut().push(sentinel);
1746
1747 // Return all registered queries
1748 self.runtime.query_registry.get_all::<Q>()
1749 }
1750
1751 /// List all asset keys of type K that have been registered.
1752 ///
1753 /// This method establishes a dependency on the "set" of asset keys of type K.
1754 /// The calling query will be invalidated when:
1755 /// - A new asset of type K is resolved for the first time (added to set)
1756 /// - An asset of type K is removed via remove_asset
1757 ///
1758 /// The calling query will NOT be invalidated when:
1759 /// - An individual asset's value changes (use `db.asset()` for that)
1760 ///
1761 /// # Example
1762 ///
1763 /// ```ignore
1764 /// #[query]
1765 /// fn all_configs(db: &impl Db) -> Result<Vec<String>, QueryError> {
1766 /// let keys = db.list_asset_keys::<ConfigFile>();
1767 /// let mut contents = Vec::new();
1768 /// for key in keys {
1769 /// let content = db.asset(&key)?.suspend()?;
1770 /// contents.push((*content).clone());
1771 /// }
1772 /// Ok(contents)
1773 /// }
1774 /// ```
1775 pub fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1776 // Record dependency on the sentinel (set-level dependency)
1777 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1778
1779 self.runtime.tracer.on_asset_dependency_registered(
1780 self.exec_ctx.span_id(),
1781 TracerQueryKey::new(self.parent_query_type, self.current_key.debug_repr()),
1782 TracerAssetKey::new("AssetKeySet", sentinel.debug_repr()),
1783 );
1784
1785 // Ensure sentinel exists in whale (for dependency tracking)
1786 if self.runtime.whale.get(&sentinel).is_none() {
1787 let _ =
1788 self.runtime
1789 .whale
1790 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1791 }
1792
1793 self.deps.borrow_mut().push(sentinel);
1794
1795 // Return all registered asset keys
1796 self.runtime.asset_key_registry.get_all::<K>()
1797 }
1798}
1799
1800impl<'a, T: Tracer> Db for QueryContext<'a, T> {
1801 fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1802 QueryContext::query(self, query)
1803 }
1804
1805 fn asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
1806 QueryContext::asset(self, key)
1807 }
1808
1809 fn list_queries<Q: Query>(&self) -> Vec<Q> {
1810 QueryContext::list_queries(self)
1811 }
1812
1813 fn list_asset_keys<K: AssetKey>(&self) -> Vec<K> {
1814 QueryContext::list_asset_keys(self)
1815 }
1816}
1817
1818#[cfg(test)]
1819mod tests {
1820 use super::*;
1821
1822 #[test]
1823 fn test_simple_query() {
1824 #[derive(Clone)]
1825 struct Add {
1826 a: i32,
1827 b: i32,
1828 }
1829
1830 impl Query for Add {
1831 type CacheKey = (i32, i32);
1832 type Output = i32;
1833
1834 fn cache_key(&self) -> Self::CacheKey {
1835 (self.a, self.b)
1836 }
1837
1838 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1839 Ok(Arc::new(self.a + self.b))
1840 }
1841
1842 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1843 old == new
1844 }
1845 }
1846
1847 let runtime = QueryRuntime::new();
1848
1849 let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
1850 assert_eq!(*result, 3);
1851
1852 // Second query should be cached
1853 let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
1854 assert_eq!(*result2, 3);
1855 }
1856
1857 #[test]
1858 fn test_dependent_queries() {
1859 #[derive(Clone)]
1860 struct Base {
1861 value: i32,
1862 }
1863
1864 impl Query for Base {
1865 type CacheKey = i32;
1866 type Output = i32;
1867
1868 fn cache_key(&self) -> Self::CacheKey {
1869 self.value
1870 }
1871
1872 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1873 Ok(Arc::new(self.value * 2))
1874 }
1875
1876 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1877 old == new
1878 }
1879 }
1880
1881 #[derive(Clone)]
1882 struct Derived {
1883 base_value: i32,
1884 }
1885
1886 impl Query for Derived {
1887 type CacheKey = i32;
1888 type Output = i32;
1889
1890 fn cache_key(&self) -> Self::CacheKey {
1891 self.base_value
1892 }
1893
1894 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1895 let base = db.query(Base {
1896 value: self.base_value,
1897 })?;
1898 Ok(Arc::new(*base + 10))
1899 }
1900
1901 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1902 old == new
1903 }
1904 }
1905
1906 let runtime = QueryRuntime::new();
1907
1908 let result = runtime.query(Derived { base_value: 5 }).unwrap();
1909 assert_eq!(*result, 20); // 5 * 2 + 10
1910 }
1911
1912 #[test]
1913 fn test_cycle_detection() {
1914 #[derive(Clone)]
1915 struct CycleA {
1916 id: i32,
1917 }
1918
1919 #[derive(Clone)]
1920 struct CycleB {
1921 id: i32,
1922 }
1923
1924 impl Query for CycleA {
1925 type CacheKey = i32;
1926 type Output = i32;
1927
1928 fn cache_key(&self) -> Self::CacheKey {
1929 self.id
1930 }
1931
1932 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1933 let b = db.query(CycleB { id: self.id })?;
1934 Ok(Arc::new(*b + 1))
1935 }
1936
1937 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1938 old == new
1939 }
1940 }
1941
1942 impl Query for CycleB {
1943 type CacheKey = i32;
1944 type Output = i32;
1945
1946 fn cache_key(&self) -> Self::CacheKey {
1947 self.id
1948 }
1949
1950 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1951 let a = db.query(CycleA { id: self.id })?;
1952 Ok(Arc::new(*a + 1))
1953 }
1954
1955 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1956 old == new
1957 }
1958 }
1959
1960 let runtime = QueryRuntime::new();
1961
1962 let result = runtime.query(CycleA { id: 1 });
1963 assert!(matches!(result, Err(QueryError::Cycle { .. })));
1964 }
1965
1966 #[test]
1967 fn test_fallible_query() {
1968 #[derive(Clone)]
1969 struct ParseInt {
1970 input: String,
1971 }
1972
1973 impl Query for ParseInt {
1974 type CacheKey = String;
1975 type Output = Result<i32, std::num::ParseIntError>;
1976
1977 fn cache_key(&self) -> Self::CacheKey {
1978 self.input.clone()
1979 }
1980
1981 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
1982 Ok(Arc::new(self.input.parse()))
1983 }
1984
1985 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1986 old == new
1987 }
1988 }
1989
1990 let runtime = QueryRuntime::new();
1991
1992 // Valid parse
1993 let result = runtime
1994 .query(ParseInt {
1995 input: "42".to_string(),
1996 })
1997 .unwrap();
1998 assert_eq!(*result, Ok(42));
1999
2000 // Invalid parse - system succeeds, user error in output
2001 let result = runtime
2002 .query(ParseInt {
2003 input: "not_a_number".to_string(),
2004 })
2005 .unwrap();
2006 assert!(result.is_err());
2007 }
2008
2009 // Macro tests
2010 mod macro_tests {
2011 use super::*;
2012 use crate::query;
2013
2014 #[query]
2015 fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2016 let _ = db; // silence unused warning
2017 Ok(a + b)
2018 }
2019
2020 #[test]
2021 fn test_macro_basic() {
2022 let runtime = QueryRuntime::new();
2023 let result = runtime.query(Add::new(1, 2)).unwrap();
2024 assert_eq!(*result, 3);
2025 }
2026
2027 #[query]
2028 fn simple_double(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2029 let _ = db;
2030 Ok(x * 2)
2031 }
2032
2033 #[test]
2034 fn test_macro_simple() {
2035 let runtime = QueryRuntime::new();
2036 let result = runtime.query(SimpleDouble::new(5)).unwrap();
2037 assert_eq!(*result, 10);
2038 }
2039
2040 #[query(keys(id))]
2041 fn with_key_selection(
2042 db: &impl Db,
2043 id: u32,
2044 include_extra: bool,
2045 ) -> Result<String, QueryError> {
2046 let _ = db;
2047 Ok(format!("id={}, extra={}", id, include_extra))
2048 }
2049
2050 #[test]
2051 fn test_macro_key_selection() {
2052 let runtime = QueryRuntime::new();
2053
2054 // Same id, different include_extra - should return cached
2055 let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
2056 let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
2057
2058 // Both should have same value because only `id` is the key
2059 assert_eq!(*r1, "id=1, extra=true");
2060 assert_eq!(*r2, "id=1, extra=true"); // Cached!
2061 }
2062
2063 #[query]
2064 fn dependent(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
2065 let sum = db.query(Add::new(a, b))?;
2066 Ok(*sum * 2)
2067 }
2068
2069 #[test]
2070 fn test_macro_dependencies() {
2071 let runtime = QueryRuntime::new();
2072 let result = runtime.query(Dependent::new(3, 4)).unwrap();
2073 assert_eq!(*result, 14); // (3 + 4) * 2
2074 }
2075
2076 #[query(output_eq)]
2077 fn with_output_eq(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2078 let _ = db;
2079 Ok(x * 2)
2080 }
2081
2082 #[test]
2083 fn test_macro_output_eq() {
2084 let runtime = QueryRuntime::new();
2085 let result = runtime.query(WithOutputEq::new(5)).unwrap();
2086 assert_eq!(*result, 10);
2087 }
2088
2089 #[query(name = "CustomName")]
2090 fn original_name(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2091 let _ = db;
2092 Ok(x)
2093 }
2094
2095 #[test]
2096 fn test_macro_custom_name() {
2097 let runtime = QueryRuntime::new();
2098 let result = runtime.query(CustomName::new(42)).unwrap();
2099 assert_eq!(*result, 42);
2100 }
2101
2102 // Test that attribute macros like #[tracing::instrument] are preserved
2103 // We use #[allow(unused_variables)] and #[inline] as test attributes since
2104 // they don't require external dependencies.
2105 #[allow(unused_variables)]
2106 #[inline]
2107 #[query]
2108 fn with_attributes(db: &impl Db, x: i32) -> Result<i32, QueryError> {
2109 // This would warn without #[allow(unused_variables)] on the generated method
2110 let unused_var = 42;
2111 Ok(x * 2)
2112 }
2113
2114 #[test]
2115 fn test_macro_preserves_attributes() {
2116 let runtime = QueryRuntime::new();
2117 // If attributes weren't preserved, this might warn about unused_var
2118 let result = runtime.query(WithAttributes::new(5)).unwrap();
2119 assert_eq!(*result, 10);
2120 }
2121 }
2122
2123 // Tests for poll() and changed_at()
2124 mod poll_tests {
2125 use super::*;
2126
2127 #[derive(Clone)]
2128 struct Counter {
2129 id: i32,
2130 }
2131
2132 impl Query for Counter {
2133 type CacheKey = i32;
2134 type Output = i32;
2135
2136 fn cache_key(&self) -> Self::CacheKey {
2137 self.id
2138 }
2139
2140 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2141 Ok(Arc::new(self.id * 10))
2142 }
2143
2144 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2145 old == new
2146 }
2147 }
2148
2149 #[test]
2150 fn test_poll_returns_value_and_revision() {
2151 let runtime = QueryRuntime::new();
2152
2153 let result = runtime.poll(Counter { id: 1 }).unwrap();
2154
2155 // Value should be correct - access through Result and Arc
2156 assert_eq!(**result.value.as_ref().unwrap(), 10);
2157
2158 // Revision should be non-zero after first execution
2159 assert!(result.revision > 0);
2160 }
2161
2162 #[test]
2163 fn test_poll_revision_stable_on_cache_hit() {
2164 let runtime = QueryRuntime::new();
2165
2166 // First poll
2167 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2168 let rev1 = result1.revision;
2169
2170 // Second poll (cache hit)
2171 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2172 let rev2 = result2.revision;
2173
2174 // Revision should be the same (no change)
2175 assert_eq!(rev1, rev2);
2176 }
2177
2178 #[test]
2179 fn test_poll_revision_changes_on_invalidate() {
2180 let runtime = QueryRuntime::new();
2181
2182 // First poll
2183 let result1 = runtime.poll(Counter { id: 1 }).unwrap();
2184 let rev1 = result1.revision;
2185
2186 // Invalidate and poll again
2187 runtime.invalidate::<Counter>(&1);
2188 let result2 = runtime.poll(Counter { id: 1 }).unwrap();
2189 let rev2 = result2.revision;
2190
2191 // Revision should increase (value was recomputed)
2192 // Note: Since output_eq returns true (same value), this might not change
2193 // depending on early cutoff behavior. Let's verify the value is still correct.
2194 assert_eq!(**result2.value.as_ref().unwrap(), 10);
2195
2196 // With early cutoff, revision might stay the same if value didn't change
2197 // This is expected behavior
2198 assert!(rev2 >= rev1);
2199 }
2200
2201 #[test]
2202 fn test_changed_at_returns_none_for_unexecuted_query() {
2203 let runtime = QueryRuntime::new();
2204
2205 // Query has never been executed
2206 let rev = runtime.changed_at::<Counter>(&1);
2207 assert!(rev.is_none());
2208 }
2209
2210 #[test]
2211 fn test_changed_at_returns_revision_after_execution() {
2212 let runtime = QueryRuntime::new();
2213
2214 // Execute the query
2215 let _ = runtime.query(Counter { id: 1 }).unwrap();
2216
2217 // Now changed_at should return Some
2218 let rev = runtime.changed_at::<Counter>(&1);
2219 assert!(rev.is_some());
2220 assert!(rev.unwrap() > 0);
2221 }
2222
2223 #[test]
2224 fn test_changed_at_matches_poll_revision() {
2225 let runtime = QueryRuntime::new();
2226
2227 // Poll the query
2228 let result = runtime.poll(Counter { id: 1 }).unwrap();
2229
2230 // changed_at should match the revision from poll
2231 let rev = runtime.changed_at::<Counter>(&1);
2232 assert_eq!(rev, Some(result.revision));
2233 }
2234
2235 #[test]
2236 fn test_poll_value_access() {
2237 let runtime = QueryRuntime::new();
2238
2239 let result = runtime.poll(Counter { id: 5 }).unwrap();
2240
2241 // Access through Result and Arc
2242 let value: &i32 = result.value.as_ref().unwrap();
2243 assert_eq!(*value, 50);
2244
2245 // Access Arc directly via field after unwrapping Result
2246 let arc: &Arc<i32> = result.value.as_ref().unwrap();
2247 assert_eq!(**arc, 50);
2248 }
2249
2250 #[test]
2251 fn test_subscription_pattern() {
2252 let runtime = QueryRuntime::new();
2253
2254 // Simulate subscription pattern
2255 let mut last_revision: RevisionCounter = 0;
2256 let mut notifications = 0;
2257
2258 // First poll - should notify (new value)
2259 let result = runtime.poll(Counter { id: 1 }).unwrap();
2260 if result.revision > last_revision {
2261 notifications += 1;
2262 last_revision = result.revision;
2263 }
2264
2265 // Second poll - should NOT notify (no change)
2266 let result = runtime.poll(Counter { id: 1 }).unwrap();
2267 if result.revision > last_revision {
2268 notifications += 1;
2269 last_revision = result.revision;
2270 }
2271
2272 // Third poll - should NOT notify (no change)
2273 let result = runtime.poll(Counter { id: 1 }).unwrap();
2274 if result.revision > last_revision {
2275 notifications += 1;
2276 #[allow(unused_assignments)]
2277 {
2278 last_revision = result.revision;
2279 }
2280 }
2281
2282 // Only the first poll should have triggered a notification
2283 assert_eq!(notifications, 1);
2284 }
2285 }
2286
2287 // Tests for GC APIs
2288 mod gc_tests {
2289 use super::*;
2290 use std::collections::HashSet;
2291 use std::sync::atomic::{AtomicUsize, Ordering};
2292 use std::sync::Mutex;
2293
2294 #[derive(Clone)]
2295 struct Leaf {
2296 id: i32,
2297 }
2298
2299 impl Query for Leaf {
2300 type CacheKey = i32;
2301 type Output = i32;
2302
2303 fn cache_key(&self) -> Self::CacheKey {
2304 self.id
2305 }
2306
2307 fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2308 Ok(Arc::new(self.id * 10))
2309 }
2310
2311 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2312 old == new
2313 }
2314 }
2315
2316 #[derive(Clone)]
2317 struct Parent {
2318 child_id: i32,
2319 }
2320
2321 impl Query for Parent {
2322 type CacheKey = i32;
2323 type Output = i32;
2324
2325 fn cache_key(&self) -> Self::CacheKey {
2326 self.child_id
2327 }
2328
2329 fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
2330 let child = db.query(Leaf { id: self.child_id })?;
2331 Ok(Arc::new(*child + 1))
2332 }
2333
2334 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
2335 old == new
2336 }
2337 }
2338
2339 #[test]
2340 fn test_query_keys_returns_all_cached_queries() {
2341 let runtime = QueryRuntime::new();
2342
2343 // Execute some queries
2344 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2345 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2346 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2347
2348 // Get all keys
2349 let keys = runtime.query_keys();
2350
2351 // Should have at least 3 keys (might have more due to sentinels)
2352 assert!(keys.len() >= 3);
2353 }
2354
2355 #[test]
2356 fn test_remove_removes_query() {
2357 let runtime = QueryRuntime::new();
2358
2359 // Execute a query
2360 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2361
2362 // Get the key
2363 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2364
2365 // Query should exist
2366 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2367
2368 // Remove it
2369 assert!(runtime.remove(&full_key));
2370
2371 // Query should no longer exist
2372 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2373 }
2374
2375 #[test]
2376 fn test_remove_if_unused_removes_leaf_query() {
2377 let runtime = QueryRuntime::new();
2378
2379 // Execute a leaf query (no dependents)
2380 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2381
2382 // Should be removable since no other query depends on it
2383 assert!(runtime.remove_query_if_unused::<Leaf>(&1));
2384
2385 // Query should no longer exist
2386 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2387 }
2388
2389 #[test]
2390 fn test_remove_if_unused_does_not_remove_query_with_dependents() {
2391 let runtime = QueryRuntime::new();
2392
2393 // Execute parent query (which depends on Leaf)
2394 let _ = runtime.query(Parent { child_id: 1 }).unwrap();
2395
2396 // Leaf query should not be removable since Parent depends on it
2397 assert!(!runtime.remove_query_if_unused::<Leaf>(&1));
2398
2399 // Leaf query should still exist
2400 assert!(runtime.changed_at::<Leaf>(&1).is_some());
2401
2402 // But Parent should be removable (no dependents)
2403 assert!(runtime.remove_query_if_unused::<Parent>(&1));
2404 }
2405
2406 #[test]
2407 fn test_remove_if_unused_with_full_cache_key() {
2408 let runtime = QueryRuntime::new();
2409
2410 // Execute a leaf query
2411 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2412
2413 let full_key = FullCacheKey::new::<Leaf, _>(&1);
2414
2415 // Should be removable via FullCacheKey
2416 assert!(runtime.remove_if_unused(&full_key));
2417
2418 // Query should no longer exist
2419 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2420 }
2421
2422 // Test tracer receives on_query_key calls
2423 struct GcTracker {
2424 accessed_keys: Mutex<HashSet<String>>,
2425 access_count: AtomicUsize,
2426 }
2427
2428 impl GcTracker {
2429 fn new() -> Self {
2430 Self {
2431 accessed_keys: Mutex::new(HashSet::new()),
2432 access_count: AtomicUsize::new(0),
2433 }
2434 }
2435 }
2436
2437 impl Tracer for GcTracker {
2438 fn new_span_id(&self) -> SpanId {
2439 SpanId(1)
2440 }
2441
2442 fn on_query_key(&self, full_key: &FullCacheKey) {
2443 self.accessed_keys
2444 .lock()
2445 .unwrap()
2446 .insert(full_key.debug_repr().to_string());
2447 self.access_count.fetch_add(1, Ordering::Relaxed);
2448 }
2449 }
2450
2451 #[test]
2452 fn test_tracer_receives_on_query_key() {
2453 let tracker = GcTracker::new();
2454 let runtime = QueryRuntime::with_tracer(tracker);
2455
2456 // Execute some queries
2457 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2458 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2459
2460 // Tracer should have received on_query_key calls
2461 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2462 assert_eq!(count, 2);
2463
2464 // Check that the keys were recorded
2465 let keys = runtime.tracer().accessed_keys.lock().unwrap();
2466 assert!(keys.iter().any(|k| k.contains("Leaf")));
2467 }
2468
2469 #[test]
2470 fn test_tracer_receives_on_query_key_for_cache_hits() {
2471 let tracker = GcTracker::new();
2472 let runtime = QueryRuntime::with_tracer(tracker);
2473
2474 // Execute query twice (second is cache hit)
2475 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2476 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2477
2478 // Tracer should have received on_query_key for both calls
2479 let count = runtime.tracer().access_count.load(Ordering::Relaxed);
2480 assert_eq!(count, 2);
2481 }
2482
2483 #[test]
2484 fn test_gc_workflow() {
2485 let tracker = GcTracker::new();
2486 let runtime = QueryRuntime::with_tracer(tracker);
2487
2488 // Execute some queries
2489 let _ = runtime.query(Leaf { id: 1 }).unwrap();
2490 let _ = runtime.query(Leaf { id: 2 }).unwrap();
2491 let _ = runtime.query(Leaf { id: 3 }).unwrap();
2492
2493 // Simulate GC: remove all queries that are not in use
2494 let mut removed = 0;
2495 for key in runtime.query_keys() {
2496 if runtime.remove_if_unused(&key) {
2497 removed += 1;
2498 }
2499 }
2500
2501 // All leaf queries should be removable
2502 assert!(removed >= 3);
2503
2504 // Queries should no longer exist
2505 assert!(runtime.changed_at::<Leaf>(&1).is_none());
2506 assert!(runtime.changed_at::<Leaf>(&2).is_none());
2507 assert!(runtime.changed_at::<Leaf>(&3).is_none());
2508 }
2509 }
2510}