1use std::any::TypeId;
4use std::cell::RefCell;
5use std::sync::Arc;
6
7use std::ops::Deref;
8
9use whale::{Durability, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
12use crate::key::FullCacheKey;
13use crate::loading::LoadingState;
14use crate::query::Query;
15use crate::storage::{
16 AssetKeyRegistry, AssetState, AssetStorage, CachedEntry, CachedValue, LocatorStorage,
17 PendingStorage, QueryRegistry, VerifierStorage,
18};
19use crate::QueryError;
20
/// Function used to decide whether two user errors count as "the same".
///
/// `execute_query` calls it with the previously cached error and the newly
/// produced one; when it returns `true` the existing revision is kept instead
/// of registering a change.
pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
26
27#[cfg(feature = "inspector")]
28use query_flow_inspector::{EventSink, FlowEvent, SpanId};
29
/// Number of durability levels the whale runtime is instantiated with.
const DURABILITY_LEVELS: usize = 4;

thread_local! {
    // Keys of the queries currently executing on this thread, outermost first.
    // `query_internal` pushes before executing a query, pops afterwards, and
    // scans the stack to detect cycles.
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
}
37
/// Handle identifying one query execution for tracing purposes.
///
/// With the `inspector` feature enabled it carries the span id attached to
/// the events emitted for that execution.
#[cfg(feature = "inspector")]
#[derive(Clone, Copy)]
pub struct ExecutionContext {
    span_id: SpanId,
}

/// Handle identifying one query execution for tracing purposes.
///
/// The `inspector` feature is disabled, so this is a field-less marker.
#[cfg(not(feature = "inspector"))]
#[derive(Clone, Copy)]
pub struct ExecutionContext;

impl ExecutionContext {
    /// Creates a context with a span id obtained from the inspector crate.
    #[cfg(feature = "inspector")]
    pub fn new() -> Self {
        let span_id = query_flow_inspector::new_span_id();
        Self { span_id }
    }

    /// Creates the marker context.
    #[cfg(not(feature = "inspector"))]
    #[inline(always)]
    pub fn new() -> Self {
        ExecutionContext
    }

    /// Returns the span id of this execution.
    #[cfg(feature = "inspector")]
    pub fn span_id(&self) -> SpanId {
        let Self { span_id } = *self;
        span_id
    }
}

impl Default for ExecutionContext {
    /// Equivalent to [`ExecutionContext::new`].
    fn default() -> Self {
        ExecutionContext::new()
    }
}
80
81#[derive(Debug, Clone)]
102pub struct Polled<T> {
103 pub value: T,
105 pub revision: RevisionCounter,
109}
110
111impl<T: Deref> Deref for Polled<T> {
112 type Target = T::Target;
113
114 fn deref(&self) -> &Self::Target {
115 &self.value
116 }
117}
118
/// Runtime that executes queries, caches their results and tracks assets.
///
/// All storages except `whale` and `error_comparator` are held behind `Arc`,
/// so clones produced by the `Clone` impl share them.
pub struct QueryRuntime {
    /// Dependency/revision store keyed by `FullCacheKey`; node data is the
    /// optional cached entry for that key.
    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
    /// Current state of every known asset.
    assets: Arc<AssetStorage>,
    /// Registered asset locators, looked up by the asset key's `TypeId`.
    locators: Arc<LocatorStorage>,
    /// Assets that have been requested but not resolved yet.
    pending: Arc<PendingStorage>,
    /// Query instances that have been executed; backs `list_queries`.
    query_registry: Arc<QueryRegistry>,
    /// Asset keys that have been resolved; backs `list_asset_keys`.
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Per-key verifiers used to re-verify dependencies before a cached
    /// value is reused.
    verifiers: Arc<VerifierStorage>,
    /// Decides whether a new user error equals the cached one.
    error_comparator: ErrorComparator,
    /// Optional destination for inspector events.
    #[cfg(feature = "inspector")]
    sink: Arc<parking_lot::RwLock<Option<Arc<dyn EventSink>>>>,
}
156
157impl Default for QueryRuntime {
158 fn default() -> Self {
159 Self::new()
160 }
161}
162
163impl Clone for QueryRuntime {
164 fn clone(&self) -> Self {
165 Self {
166 whale: self.whale.clone(),
167 assets: self.assets.clone(),
168 locators: self.locators.clone(),
169 pending: self.pending.clone(),
170 query_registry: self.query_registry.clone(),
171 asset_key_registry: self.asset_key_registry.clone(),
172 verifiers: self.verifiers.clone(),
173 error_comparator: self.error_comparator,
174 #[cfg(feature = "inspector")]
175 sink: self.sink.clone(),
176 }
177 }
178}
179
/// Default [`ErrorComparator`]: treats every pair of errors as different, so
/// a query that returns a user error is always recorded as changed.
fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
    false
}
186
187impl QueryRuntime {
188 fn get_cached_with_revision<Q: Query>(
190 &self,
191 key: &FullCacheKey,
192 ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
193 let node = self.whale.get(key)?;
194 let revision = node.changed_at;
195 let entry = node.data.as_ref()?;
196 let cached = entry.to_cached_value::<Q::Output>()?;
197 Some((cached, revision))
198 }
199}
200
201impl QueryRuntime {
202 pub fn new() -> Self {
204 Self::builder().build()
205 }
206
207 pub fn builder() -> QueryRuntimeBuilder {
223 QueryRuntimeBuilder::new()
224 }
225
226 #[cfg(feature = "inspector")]
242 pub fn set_sink(&self, sink: Option<Arc<dyn EventSink>>) {
243 *self.sink.write() = sink;
244 }
245
246 #[cfg(feature = "inspector")]
248 pub fn sink(&self) -> Option<Arc<dyn EventSink>> {
249 self.sink.read().clone()
250 }
251
252 #[cfg(feature = "inspector")]
254 #[inline]
255 fn emit<F: FnOnce() -> FlowEvent>(&self, event: F) {
256 let guard = self.sink.read();
257 if let Some(ref sink) = *guard {
258 sink.emit(event());
259 }
260 }
261
262 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
271 self.query_internal(query)
272 .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
273 }
274
275 #[allow(clippy::type_complexity)]
280 fn query_internal<Q: Query>(
281 &self,
282 query: Q,
283 ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
284 let key = query.cache_key();
285 let full_key = FullCacheKey::new::<Q, _>(&key);
286
287 let exec_ctx = ExecutionContext::new();
289 #[cfg(feature = "inspector")]
290 let start_time = std::time::Instant::now();
291 #[cfg(feature = "inspector")]
292 let query_key =
293 query_flow_inspector::QueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
294
295 #[cfg(feature = "inspector")]
296 self.emit(|| FlowEvent::QueryStart {
297 span_id: exec_ctx.span_id(),
298 query: query_key.clone(),
299 });
300
301 let cycle_detected = QUERY_STACK.with(|stack| {
303 let stack = stack.borrow();
304 stack.iter().any(|k| k == &full_key)
305 });
306
307 if cycle_detected {
308 let path = QUERY_STACK.with(|stack| {
309 let stack = stack.borrow();
310 let mut path: Vec<String> =
311 stack.iter().map(|k| k.debug_repr().to_string()).collect();
312 path.push(full_key.debug_repr().to_string());
313 path
314 });
315
316 #[cfg(feature = "inspector")]
317 self.emit(|| FlowEvent::CycleDetected {
318 path: path
319 .iter()
320 .map(|s| query_flow_inspector::QueryKey::new("", s.clone()))
321 .collect(),
322 });
323
324 #[cfg(feature = "inspector")]
325 self.emit(|| FlowEvent::QueryEnd {
326 span_id: exec_ctx.span_id(),
327 query: query_key.clone(),
328 result: query_flow_inspector::ExecutionResult::CycleDetected,
329 duration: start_time.elapsed(),
330 });
331
332 return Err(QueryError::Cycle { path });
333 }
334
335 let current_rev = self.whale.current_revision();
337
338 if self.whale.is_verified_at(&full_key, ¤t_rev) {
340 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
342 #[cfg(feature = "inspector")]
343 self.emit(|| FlowEvent::CacheCheck {
344 span_id: exec_ctx.span_id(),
345 query: query_key.clone(),
346 valid: true,
347 });
348
349 #[cfg(feature = "inspector")]
350 self.emit(|| FlowEvent::QueryEnd {
351 span_id: exec_ctx.span_id(),
352 query: query_key.clone(),
353 result: query_flow_inspector::ExecutionResult::CacheHit,
354 duration: start_time.elapsed(),
355 });
356
357 return match cached {
358 CachedValue::Ok(output) => Ok((Ok(output), revision)),
359 CachedValue::UserError(err) => Ok((Err(err), revision)),
360 };
361 }
362 }
363
364 if self.whale.is_valid(&full_key) {
366 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
368 let mut deps_verified = true;
370 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
371 for dep in deps {
372 if let Some(verifier) = self.verifiers.get(&dep) {
373 if verifier.verify(self).is_err() {
375 deps_verified = false;
376 break;
377 }
378 }
379 }
380 }
381
382 if deps_verified && self.whale.is_valid(&full_key) {
384 self.whale.mark_verified(&full_key, ¤t_rev);
386
387 #[cfg(feature = "inspector")]
388 self.emit(|| FlowEvent::CacheCheck {
389 span_id: exec_ctx.span_id(),
390 query: query_key.clone(),
391 valid: true,
392 });
393
394 #[cfg(feature = "inspector")]
395 self.emit(|| FlowEvent::QueryEnd {
396 span_id: exec_ctx.span_id(),
397 query: query_key.clone(),
398 result: query_flow_inspector::ExecutionResult::CacheHit,
399 duration: start_time.elapsed(),
400 });
401
402 return match cached {
403 CachedValue::Ok(output) => Ok((Ok(output), revision)),
404 CachedValue::UserError(err) => Ok((Err(err), revision)),
405 };
406 }
407 }
409 }
410
411 #[cfg(feature = "inspector")]
412 self.emit(|| FlowEvent::CacheCheck {
413 span_id: exec_ctx.span_id(),
414 query: query_key.clone(),
415 valid: false,
416 });
417
418 QUERY_STACK.with(|stack| {
420 stack.borrow_mut().push(full_key.clone());
421 });
422
423 let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
424
425 QUERY_STACK.with(|stack| {
426 stack.borrow_mut().pop();
427 });
428
429 #[cfg(feature = "inspector")]
431 {
432 let exec_result = match &result {
433 Ok((_, true, _)) => query_flow_inspector::ExecutionResult::Changed,
434 Ok((_, false, _)) => query_flow_inspector::ExecutionResult::Unchanged,
435 Err(QueryError::Suspend) => query_flow_inspector::ExecutionResult::Suspended,
436 Err(QueryError::Cycle { .. }) => {
437 query_flow_inspector::ExecutionResult::CycleDetected
438 }
439 Err(e) => query_flow_inspector::ExecutionResult::Error {
440 message: format!("{:?}", e),
441 },
442 };
443 self.emit(|| FlowEvent::QueryEnd {
444 span_id: exec_ctx.span_id(),
445 query: query_key.clone(),
446 result: exec_result,
447 duration: start_time.elapsed(),
448 });
449 }
450
451 result.map(|(inner_result, _, revision)| (inner_result, revision))
452 }
453
454 #[allow(clippy::type_complexity)]
460 fn execute_query<Q: Query>(
461 &self,
462 query: &Q,
463 full_key: &FullCacheKey,
464 exec_ctx: ExecutionContext,
465 ) -> Result<
466 (
467 Result<Arc<Q::Output>, Arc<anyhow::Error>>,
468 bool,
469 RevisionCounter,
470 ),
471 QueryError,
472 > {
473 let mut ctx = QueryContext {
475 runtime: self,
476 current_key: full_key.clone(),
477 #[cfg(feature = "inspector")]
478 parent_query_type: std::any::type_name::<Q>(),
479 #[cfg(feature = "inspector")]
480 exec_ctx,
481 deps: RefCell::new(Vec::new()),
482 };
483 #[cfg(not(feature = "inspector"))]
485 let _ = exec_ctx;
486
487 let result = query.query(&mut ctx);
489
490 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
492
493 let durability =
495 Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
496
497 match result {
498 Ok(output) => {
499 let output = Arc::new(output);
500
501 let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
504 self.get_cached_with_revision::<Q>(full_key)
505 {
506 if Q::output_eq(&old, &output) {
507 Some(rev) } else {
509 None }
511 } else {
512 None };
514 let output_changed = existing_revision.is_none();
515
516 #[cfg(feature = "inspector")]
518 self.emit(|| FlowEvent::EarlyCutoffCheck {
519 span_id: exec_ctx.span_id(),
520 query: query_flow_inspector::QueryKey::new(
521 std::any::type_name::<Q>(),
522 full_key.debug_repr(),
523 ),
524 output_changed,
525 });
526
527 let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
529 let revision = if let Some(existing_rev) = existing_revision {
530 let _ = self.whale.confirm_unchanged(full_key, deps);
532 existing_rev
533 } else {
534 match self
536 .whale
537 .register(full_key.clone(), Some(entry), durability, deps)
538 {
539 Ok(result) => result.new_rev,
540 Err(missing) => {
541 return Err(QueryError::DependenciesRemoved {
542 missing_keys: missing,
543 })
544 }
545 }
546 };
547
548 let is_new_query = self.query_registry.register(query);
550 if is_new_query {
551 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
552 let _ = self
553 .whale
554 .register(sentinel, None, Durability::volatile(), vec![]);
555 }
556
557 self.verifiers.insert(full_key.clone(), query.clone());
559
560 Ok((Ok(output), output_changed, revision))
561 }
562 Err(QueryError::UserError(err)) => {
563 let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
566 self.get_cached_with_revision::<Q>(full_key)
567 {
568 if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
569 Some(rev) } else {
571 None }
573 } else {
574 None };
576 let output_changed = existing_revision.is_none();
577
578 #[cfg(feature = "inspector")]
580 self.emit(|| FlowEvent::EarlyCutoffCheck {
581 span_id: exec_ctx.span_id(),
582 query: query_flow_inspector::QueryKey::new(
583 std::any::type_name::<Q>(),
584 full_key.debug_repr(),
585 ),
586 output_changed,
587 });
588
589 let entry = CachedEntry::UserError(err.clone());
591 let revision = if let Some(existing_rev) = existing_revision {
592 let _ = self.whale.confirm_unchanged(full_key, deps);
594 existing_rev
595 } else {
596 match self
598 .whale
599 .register(full_key.clone(), Some(entry), durability, deps)
600 {
601 Ok(result) => result.new_rev,
602 Err(missing) => {
603 return Err(QueryError::DependenciesRemoved {
604 missing_keys: missing,
605 })
606 }
607 }
608 };
609
610 let is_new_query = self.query_registry.register(query);
612 if is_new_query {
613 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
614 let _ = self
615 .whale
616 .register(sentinel, None, Durability::volatile(), vec![]);
617 }
618
619 self.verifiers.insert(full_key.clone(), query.clone());
621
622 Ok((Err(err), output_changed, revision))
623 }
624 Err(e) => {
625 Err(e)
627 }
628 }
629 }
630
631 pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
635 let full_key = FullCacheKey::new::<Q, _>(key);
636
637 #[cfg(feature = "inspector")]
638 self.emit(|| FlowEvent::QueryInvalidated {
639 query: query_flow_inspector::QueryKey::new(
640 std::any::type_name::<Q>(),
641 full_key.debug_repr(),
642 ),
643 reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
644 });
645
646 let _ = self
648 .whale
649 .register(full_key, None, Durability::volatile(), vec![]);
650 }
651
652 pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
660 let full_key = FullCacheKey::new::<Q, _>(key);
661
662 #[cfg(feature = "inspector")]
663 self.emit(|| FlowEvent::QueryInvalidated {
664 query: query_flow_inspector::QueryKey::new(
665 std::any::type_name::<Q>(),
666 full_key.debug_repr(),
667 ),
668 reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
669 });
670
671 self.verifiers.remove(&full_key);
673
674 self.whale.remove(&full_key);
676
677 if self.query_registry.remove::<Q>(key) {
679 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
680 let _ = self
681 .whale
682 .register(sentinel, None, Durability::volatile(), vec![]);
683 }
684 }
685
686 pub fn clear_cache(&self) {
690 let keys = self.whale.keys();
691 for key in keys {
692 self.whale.remove(&key);
693 }
694 }
695
696 #[allow(clippy::type_complexity)]
730 pub fn poll<Q: Query>(
731 &self,
732 query: Q,
733 ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
734 let (value, revision) = self.query_internal(query)?;
735 Ok(Polled { value, revision })
736 }
737
738 pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
757 let full_key = FullCacheKey::new::<Q, _>(key);
758 self.whale.get(&full_key).map(|node| node.changed_at)
759 }
760}
761
762pub struct QueryRuntimeBuilder {
780 error_comparator: ErrorComparator,
781}
782
783impl Default for QueryRuntimeBuilder {
784 fn default() -> Self {
785 Self::new()
786 }
787}
788
789impl QueryRuntimeBuilder {
790 pub fn new() -> Self {
792 Self {
793 error_comparator: default_error_comparator,
794 }
795 }
796
797 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
815 self.error_comparator = f;
816 self
817 }
818
819 pub fn build(self) -> QueryRuntime {
821 QueryRuntime {
822 whale: WhaleRuntime::new(),
823 assets: Arc::new(AssetStorage::new()),
824 locators: Arc::new(LocatorStorage::new()),
825 pending: Arc::new(PendingStorage::new()),
826 query_registry: Arc::new(QueryRegistry::new()),
827 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
828 verifiers: Arc::new(VerifierStorage::new()),
829 error_comparator: self.error_comparator,
830 #[cfg(feature = "inspector")]
831 sink: Arc::new(parking_lot::RwLock::new(None)),
832 }
833 }
834}
835
836impl QueryRuntime {
    /// Registers `locator` as the locator for asset keys of type `K`.
    pub fn register_asset_locator<K, L>(&self, locator: L)
    where
        K: AssetKey,
        L: AssetLocator<K>,
    {
        self.locators.insert::<K, L>(locator);
    }
859
    /// Returns every asset currently recorded as pending.
    pub fn pending_assets(&self) -> Vec<PendingAsset> {
        self.pending.get_all()
    }
878
    /// Returns the pending asset keys of type `K`.
    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
        self.pending.get_of_type::<K>()
    }
883
    /// Returns `true` when at least one asset is pending.
    pub fn has_pending_assets(&self) -> bool {
        !self.pending.is_empty()
    }
888
889 pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset) {
906 let durability = key.durability();
907 self.resolve_asset_internal(key, value, durability);
908 }
909
    /// Supplies the value for asset `key` with an explicit durability level
    /// instead of the one reported by the key.
    pub fn resolve_asset_with_durability<K: AssetKey>(
        &self,
        key: K,
        value: K::Asset,
        durability: DurabilityLevel,
    ) {
        self.resolve_asset_internal(key, value, durability);
    }
921
922 fn resolve_asset_internal<K: AssetKey>(
923 &self,
924 key: K,
925 value: K::Asset,
926 durability_level: DurabilityLevel,
927 ) {
928 let full_asset_key = FullAssetKey::new(&key);
929 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
930
931 let changed = if let Some(old_value) = self.assets.get_ready::<K>(&full_asset_key) {
933 !K::asset_eq(&old_value, &value)
934 } else {
935 true };
937
938 #[cfg(feature = "inspector")]
940 self.emit(|| FlowEvent::AssetResolved {
941 asset: query_flow_inspector::AssetKey::new(
942 std::any::type_name::<K>(),
943 format!("{:?}", key),
944 ),
945 changed,
946 });
947
948 self.assets
950 .insert_ready::<K>(full_asset_key.clone(), Arc::new(value));
951
952 self.pending.remove(&full_asset_key);
954
955 let durability =
957 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
958
959 if changed {
960 let _ = self
962 .whale
963 .register(full_cache_key, None, durability, vec![]);
964 } else {
965 let _ = self.whale.confirm_unchanged(&full_cache_key, vec![]);
967 }
968
969 let is_new_asset = self.asset_key_registry.register(&key);
971 if is_new_asset {
972 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
974 let _ = self
975 .whale
976 .register(sentinel, None, Durability::volatile(), vec![]);
977 }
978 }
979
980 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
994 let full_asset_key = FullAssetKey::new(key);
995 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
996
997 #[cfg(feature = "inspector")]
999 self.emit(|| FlowEvent::AssetInvalidated {
1000 asset: query_flow_inspector::AssetKey::new(
1001 std::any::type_name::<K>(),
1002 format!("{:?}", key),
1003 ),
1004 });
1005
1006 self.assets
1008 .insert(full_asset_key.clone(), AssetState::Loading);
1009
1010 self.pending.insert::<K>(full_asset_key, key.clone());
1012
1013 let _ = self
1015 .whale
1016 .register(full_cache_key, None, Durability::volatile(), vec![]);
1017 }
1018
1019 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
1024 let full_asset_key = FullAssetKey::new(key);
1025 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1026
1027 let _ = self
1030 .whale
1031 .register(full_cache_key.clone(), None, Durability::volatile(), vec![]);
1032
1033 self.assets.remove(&full_asset_key);
1035 self.pending.remove(&full_asset_key);
1036
1037 self.whale.remove(&full_cache_key);
1039
1040 if self.asset_key_registry.remove::<K>(key) {
1042 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1043 let _ = self
1044 .whale
1045 .register(sentinel, None, Durability::volatile(), vec![]);
1046 }
1047 }
1048
    /// Looks up asset `key` directly on the runtime.
    ///
    /// Delegates to `get_asset_internal`; unlike `QueryContext::asset`, no
    /// dependency is recorded here.
    pub fn get_asset<K: AssetKey>(
        &self,
        key: &K,
    ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
        self.get_asset_internal(key)
    }
1066
    /// Resolves the current state of asset `key`.
    ///
    /// Lookup order:
    /// 1. a stored state whose whale node is still valid is returned as-is;
    /// 2. otherwise the locator registered for `K` (if any) is consulted and
    ///    its answer is stored and returned;
    /// 3. otherwise the asset is recorded as `Loading` and added to the
    ///    pending set.
    ///
    /// # Errors
    ///
    /// Returns `QueryError::MissingDependency` when the state is `NotFound`
    /// or when a ready value cannot be downcast to `K::Asset`.
    ///
    /// # Panics
    ///
    /// Panics if whale rejects a registration that has no dependencies.
    fn get_asset_internal<K: AssetKey>(
        &self,
        key: &K,
    ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
        let full_asset_key = FullAssetKey::new(key);
        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);

        // Helper that reports the observed state to the inspector sink.
        #[cfg(feature = "inspector")]
        let emit_requested = |runtime: &Self, state: query_flow_inspector::AssetState| {
            runtime.emit(|| FlowEvent::AssetRequested {
                asset: query_flow_inspector::AssetKey::new(
                    std::any::type_name::<K>(),
                    format!("{:?}", key),
                ),
                state,
            });
        };

        // 1. Stored state, trusted only while its whale node is valid.
        if let Some(state) = self.assets.get(&full_asset_key) {
            if self.whale.is_valid(&full_cache_key) {
                return match state {
                    AssetState::Loading => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Loading);
                        Ok(LoadingState::Loading)
                    }
                    AssetState::Ready(arc) => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Ready);
                        match arc.downcast::<K::Asset>() {
                            Ok(value) => Ok(LoadingState::Ready(value)),
                            Err(_) => Err(QueryError::MissingDependency {
                                description: format!("Asset type mismatch: {:?}", key),
                            }),
                        }
                    }
                    AssetState::NotFound => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::NotFound);
                        Err(QueryError::MissingDependency {
                            description: format!("Asset not found: {:?}", key),
                        })
                    }
                };
            }
        }

        // 2. Ask the locator registered for this key type, if there is one.
        if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
            if let Some(state) = locator.locate_any(key) {
                // Remember the locator's answer before acting on it.
                self.assets.insert(full_asset_key.clone(), state.clone());

                match state {
                    AssetState::Ready(arc) => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Ready);

                        // An out-of-range durability falls back to volatile.
                        let durability = Durability::new(key.durability().as_u8() as usize)
                            .unwrap_or(Durability::volatile());
                        self.whale
                            .register(full_cache_key, None, durability, vec![])
                            .expect("register with no dependencies cannot fail");

                        match arc.downcast::<K::Asset>() {
                            Ok(value) => return Ok(LoadingState::Ready(value)),
                            Err(_) => {
                                return Err(QueryError::MissingDependency {
                                    description: format!("Asset type mismatch: {:?}", key),
                                })
                            }
                        }
                    }
                    AssetState::Loading => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Loading);
                        self.pending.insert::<K>(full_asset_key, key.clone());

                        self.whale
                            .register(full_cache_key, None, Durability::volatile(), vec![])
                            .expect("register with no dependencies cannot fail");

                        return Ok(LoadingState::Loading);
                    }
                    AssetState::NotFound => {
                        // Note: no whale node is registered on this path.
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::NotFound);
                        return Err(QueryError::MissingDependency {
                            description: format!("Asset not found: {:?}", key),
                        });
                    }
                }
            }
        }

        // 3. Nothing known yet: record the asset as loading and pending.
        #[cfg(feature = "inspector")]
        emit_requested(self, query_flow_inspector::AssetState::Loading);
        self.assets
            .insert(full_asset_key.clone(), AssetState::Loading);
        self.pending
            .insert::<K>(full_asset_key.clone(), key.clone());

        self.whale
            .register(full_cache_key, None, Durability::volatile(), vec![])
            .expect("register with no dependencies cannot fail");

        Ok(LoadingState::Loading)
    }
1183}
1184
/// Context handed to a query body while it executes.
///
/// Every query, asset or listing requested through it is recorded in `deps`,
/// which `execute_query` passes to whale as the query's dependencies.
pub struct QueryContext<'a> {
    /// Runtime the query is executing on.
    runtime: &'a QueryRuntime,
    /// Key of the executing query; only read when emitting inspector events.
    #[cfg_attr(not(feature = "inspector"), allow(dead_code))]
    current_key: FullCacheKey,
    /// Type name of the executing query, used in inspector events.
    #[cfg(feature = "inspector")]
    parent_query_type: &'static str,
    /// Execution context whose span id tags inspector events.
    #[cfg(feature = "inspector")]
    exec_ctx: ExecutionContext,
    /// Dependencies recorded so far, in request order.
    deps: RefCell<Vec<FullCacheKey>>,
}
1198
1199impl<'a> QueryContext<'a> {
1200 pub fn query<Q: Query>(&mut self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1213 let key = query.cache_key();
1214 let full_key = FullCacheKey::new::<Q, _>(&key);
1215
1216 #[cfg(feature = "inspector")]
1218 self.runtime.emit(|| FlowEvent::DependencyRegistered {
1219 span_id: self.exec_ctx.span_id(),
1220 parent: query_flow_inspector::QueryKey::new(
1221 self.parent_query_type,
1222 self.current_key.debug_repr(),
1223 ),
1224 dependency: query_flow_inspector::QueryKey::new(
1225 std::any::type_name::<Q>(),
1226 full_key.debug_repr(),
1227 ),
1228 });
1229
1230 self.deps.borrow_mut().push(full_key.clone());
1232
1233 self.runtime.query(query)
1235 }
1236
1237 pub fn asset<K: AssetKey>(
1258 &mut self,
1259 key: &K,
1260 ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
1261 let full_asset_key = FullAssetKey::new(key);
1262 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1263
1264 #[cfg(feature = "inspector")]
1266 self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1267 span_id: self.exec_ctx.span_id(),
1268 parent: query_flow_inspector::QueryKey::new(
1269 self.parent_query_type,
1270 self.current_key.debug_repr(),
1271 ),
1272 asset: query_flow_inspector::AssetKey::new(
1273 std::any::type_name::<K>(),
1274 format!("{:?}", key),
1275 ),
1276 });
1277
1278 self.deps.borrow_mut().push(full_cache_key);
1280
1281 let result = self.runtime.get_asset_internal(key);
1283
1284 #[cfg(feature = "inspector")]
1286 if let Err(QueryError::MissingDependency { ref description }) = result {
1287 self.runtime.emit(|| FlowEvent::MissingDependency {
1288 query: query_flow_inspector::QueryKey::new(
1289 self.parent_query_type,
1290 self.current_key.debug_repr(),
1291 ),
1292 dependency_description: description.clone(),
1293 });
1294 }
1295
1296 result
1297 }
1298
1299 pub fn list_queries<Q: Query>(&mut self) -> Vec<Q> {
1322 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1324
1325 #[cfg(feature = "inspector")]
1326 self.runtime.emit(|| FlowEvent::DependencyRegistered {
1327 span_id: self.exec_ctx.span_id(),
1328 parent: query_flow_inspector::QueryKey::new(
1329 self.parent_query_type,
1330 self.current_key.debug_repr(),
1331 ),
1332 dependency: query_flow_inspector::QueryKey::new("QuerySet", sentinel.debug_repr()),
1333 });
1334
1335 if self.runtime.whale.get(&sentinel).is_none() {
1337 let _ =
1338 self.runtime
1339 .whale
1340 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1341 }
1342
1343 self.deps.borrow_mut().push(sentinel);
1344
1345 self.runtime.query_registry.get_all::<Q>()
1347 }
1348
1349 pub fn list_asset_keys<K: AssetKey>(&mut self) -> Vec<K> {
1374 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1376
1377 #[cfg(feature = "inspector")]
1378 self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1379 span_id: self.exec_ctx.span_id(),
1380 parent: query_flow_inspector::QueryKey::new(
1381 self.parent_query_type,
1382 self.current_key.debug_repr(),
1383 ),
1384 asset: query_flow_inspector::AssetKey::new("AssetKeySet", sentinel.debug_repr()),
1385 });
1386
1387 if self.runtime.whale.get(&sentinel).is_none() {
1389 let _ =
1390 self.runtime
1391 .whale
1392 .register(sentinel.clone(), None, Durability::volatile(), vec![]);
1393 }
1394
1395 self.deps.borrow_mut().push(sentinel);
1396
1397 self.runtime.asset_key_registry.get_all::<K>()
1399 }
1400}
1401
1402#[cfg(test)]
1403mod tests {
1404 use super::*;
1405
    /// A dependency-free query returns its result, and asking again with the
    /// same key returns the same value.
    #[test]
    fn test_simple_query() {
        #[derive(Clone)]
        struct Add {
            a: i32,
            b: i32,
        }

        impl Query for Add {
            type CacheKey = (i32, i32);
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                (self.a, self.b)
            }

            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                Ok(self.a + self.b)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
        assert_eq!(*result, 3);

        // Same cache key as the first call.
        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
        assert_eq!(*result2, 3);
    }
1440
    /// A query can request another query through the context and build on
    /// its result.
    #[test]
    fn test_dependent_queries() {
        #[derive(Clone)]
        struct Base {
            value: i32,
        }

        impl Query for Base {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.value
            }

            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                Ok(self.value * 2)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        #[derive(Clone)]
        struct Derived {
            base_value: i32,
        }

        impl Query for Derived {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.base_value
            }

            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let base = ctx.query(Base {
                    value: self.base_value,
                })?;
                Ok(*base + 10)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        let result = runtime.query(Derived { base_value: 5 }).unwrap();
        // Base yields 5 * 2 = 10; Derived adds 10.
        assert_eq!(*result, 20);
    }
1495
    /// Two queries that request each other are reported as a cycle instead
    /// of recursing forever.
    #[test]
    fn test_cycle_detection() {
        #[derive(Clone)]
        struct CycleA {
            id: i32,
        }

        #[derive(Clone)]
        struct CycleB {
            id: i32,
        }

        impl Query for CycleA {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.id
            }

            // Depends on CycleB with the same id.
            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let b = ctx.query(CycleB { id: self.id })?;
                Ok(*b + 1)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        impl Query for CycleB {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.id
            }

            // Depends back on CycleA, closing the loop.
            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let a = ctx.query(CycleA { id: self.id })?;
                Ok(*a + 1)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        let result = runtime.query(CycleA { id: 1 });
        assert!(matches!(result, Err(QueryError::Cycle { .. })));
    }
1549
    /// A query whose *output type* is a `Result` succeeds at the runtime
    /// level even when the inner value is an `Err`.
    #[test]
    fn test_fallible_query() {
        #[derive(Clone)]
        struct ParseInt {
            input: String,
        }

        impl Query for ParseInt {
            type CacheKey = String;
            type Output = Result<i32, std::num::ParseIntError>;

            fn cache_key(&self) -> Self::CacheKey {
                self.input.clone()
            }

            // The parse failure is part of the output, not a `QueryError`.
            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                Ok(self.input.parse())
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        // Valid input: inner result is `Ok`.
        let result = runtime
            .query(ParseInt {
                input: "42".to_string(),
            })
            .unwrap();
        assert_eq!(*result, Ok(42));

        // Invalid input: the query still succeeds, the inner result is `Err`.
        let result = runtime
            .query(ParseInt {
                input: "not_a_number".to_string(),
            })
            .unwrap();
        assert!(result.is_err());
    }
1592
1593 mod macro_tests {
1595 use super::*;
1596 use crate::query;
1597
1598 #[query]
1599 fn add(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
1600 let _ = ctx; Ok(a + b)
1602 }
1603
1604 #[test]
1605 fn test_macro_basic() {
1606 let runtime = QueryRuntime::new();
1607 let result = runtime.query(Add::new(1, 2)).unwrap();
1608 assert_eq!(*result, 3);
1609 }
1610
1611 #[query(durability = 2)]
1612 fn with_durability(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1613 let _ = ctx;
1614 Ok(x * 2)
1615 }
1616
1617 #[test]
1618 fn test_macro_durability() {
1619 let runtime = QueryRuntime::new();
1620 let result = runtime.query(WithDurability::new(5)).unwrap();
1621 assert_eq!(*result, 10);
1622 }
1623
1624 #[query(keys(id))]
1625 fn with_key_selection(
1626 ctx: &mut QueryContext,
1627 id: u32,
1628 include_extra: bool,
1629 ) -> Result<String, QueryError> {
1630 let _ = ctx;
1631 Ok(format!("id={}, extra={}", id, include_extra))
1632 }
1633
1634 #[test]
1635 fn test_macro_key_selection() {
1636 let runtime = QueryRuntime::new();
1637
1638 let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
1640 let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
1641
1642 assert_eq!(*r1, "id=1, extra=true");
1644 assert_eq!(*r2, "id=1, extra=true"); }
1646
1647 #[query]
1648 fn dependent(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
1649 let sum = ctx.query(Add::new(*a, *b))?;
1650 Ok(*sum * 2)
1651 }
1652
/// A macro-defined query can call another macro-defined query.
#[test]
fn test_macro_dependencies() {
    let rt = QueryRuntime::new();
    let doubled_sum = rt.query(Dependent::new(3, 4)).unwrap();
    // (3 + 4) * 2
    assert_eq!(*doubled_sum, 14);
}
1659
1660 #[query(output_eq)]
1661 fn with_output_eq(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1662 let _ = ctx;
1663 Ok(*x * 2)
1664 }
1665
/// A query declared with the `output_eq` option still evaluates normally.
#[test]
fn test_macro_output_eq() {
    let rt = QueryRuntime::new();
    let doubled = rt.query(WithOutputEq::new(5)).unwrap();
    assert_eq!(*doubled, 10);
}
1672
1673 #[query(name = "CustomName")]
1674 fn original_name(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1675 let _ = ctx;
1676 Ok(*x)
1677 }
1678
/// The `name = "CustomName"` option makes the query available as `CustomName`.
#[test]
fn test_macro_custom_name() {
    let rt = QueryRuntime::new();
    let echoed = rt.query(CustomName::new(42)).unwrap();
    assert_eq!(*echoed, 42);
}
1685
1686 #[allow(unused_variables)]
1690 #[inline]
1691 #[query]
1692 fn with_attributes(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1693 let unused_var = 42;
1695 Ok(*x * 2)
1696 }
1697
/// A query function carrying additional attributes still compiles and
/// evaluates through the runtime.
#[test]
fn test_macro_preserves_attributes() {
    let rt = QueryRuntime::new();
    let doubled = rt.query(WithAttributes::new(5)).unwrap();
    assert_eq!(*doubled, 10);
}
1705 }
1706
1707 mod poll_tests {
1709 use super::*;
1710
/// Minimal query fixture for the poll/revision tests.
#[derive(Clone)]
struct Counter {
    /// Identifier; the `Query` impl uses it as the cache key and derives the
    /// output (`id * 10`) from it.
    id: i32,
}
1715
1716 impl Query for Counter {
1717 type CacheKey = i32;
1718 type Output = i32;
1719
1720 fn cache_key(&self) -> Self::CacheKey {
1721 self.id
1722 }
1723
1724 fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1725 Ok(self.id * 10)
1726 }
1727
1728 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1729 old == new
1730 }
1731 }
1732
/// `poll` yields both the query output and a non-zero revision.
#[test]
fn test_poll_returns_value_and_revision() {
    let rt = QueryRuntime::new();

    let polled = rt.poll(Counter { id: 1 }).unwrap();

    // `value` is a `Result` wrapping a pointer to the output, hence `**`.
    let output = polled.value.as_ref().unwrap();
    assert_eq!(**output, 10);

    // After the first execution the revision must be positive.
    assert!(polled.revision > 0);
}
1745
/// Polling the same query twice in a row reports the same revision.
#[test]
fn test_poll_revision_stable_on_cache_hit() {
    let rt = QueryRuntime::new();

    // First poll executes the query.
    let first_rev = rt.poll(Counter { id: 1 }).unwrap().revision;

    // Second poll of the identical query.
    let second_rev = rt.poll(Counter { id: 1 }).unwrap().revision;

    assert_eq!(first_rev, second_rev);
}
1761
/// After invalidating a query, polling again still yields the right value
/// and a revision that has not moved backwards.
#[test]
fn test_poll_revision_changes_on_invalidate() {
    let rt = QueryRuntime::new();

    let before = rt.poll(Counter { id: 1 }).unwrap();
    let rev_before = before.revision;

    // Invalidate the cached entry, then poll again.
    rt.invalidate::<Counter>(&1);
    let after = rt.poll(Counter { id: 1 }).unwrap();
    let rev_after = after.revision;

    // The recomputed output is unchanged.
    assert_eq!(**after.value.as_ref().unwrap(), 10);

    // Only monotonicity is asserted: the revision may stay the same.
    assert!(rev_after >= rev_before);
}
1784
/// `changed_at` reports `None` for a query that has never been run.
#[test]
fn test_changed_at_returns_none_for_unexecuted_query() {
    let rt = QueryRuntime::new();

    // Nothing has been queried on this fresh runtime.
    let changed = rt.changed_at::<Counter>(&1);
    assert!(changed.is_none());
}
1793
/// Once a query has been executed, `changed_at` reports a positive revision.
#[test]
fn test_changed_at_returns_revision_after_execution() {
    let rt = QueryRuntime::new();

    // Run the query once; only the side effect on the runtime matters.
    let _ = rt.query(Counter { id: 1 }).unwrap();

    let changed = rt.changed_at::<Counter>(&1);
    assert!(changed.is_some());
    assert!(changed.unwrap() > 0);
}
1806
/// The revision returned by `poll` agrees with what `changed_at` reports
/// for the same key.
#[test]
fn test_changed_at_matches_poll_revision() {
    let rt = QueryRuntime::new();

    let polled = rt.poll(Counter { id: 1 }).unwrap();

    let changed = rt.changed_at::<Counter>(&1);
    assert_eq!(changed, Some(polled.revision));
}
1818
/// The polled value can be borrowed either as the plain output or as the
/// `Arc` that holds it.
#[test]
fn test_poll_value_access() {
    let rt = QueryRuntime::new();

    let polled = rt.poll(Counter { id: 5 }).unwrap();

    // Borrow as `&i32`: the annotation lets the `&Arc<i32>` coerce.
    let as_plain: &i32 = polled.value.as_ref().unwrap();
    assert_eq!(*as_plain, 50);

    // Borrow the `Arc` itself.
    let as_arc: &Arc<i32> = polled.value.as_ref().unwrap();
    assert_eq!(**as_arc, 50);
}
1833
/// Simulates a subscriber that polls repeatedly and only "notifies" when the
/// revision advances past the last one it saw.
///
/// Three polls of the same query must produce exactly one notification: the
/// first poll advances the revision from 0, the later ones do not.
#[test]
fn test_subscription_pattern() {
    let rt = QueryRuntime::new();

    let mut last_seen: RevisionCounter = 0;
    let mut notifications = 0;

    // Poll three times, counting how often the revision moves forward.
    for _ in 0..3 {
        let polled = rt.poll(Counter { id: 1 }).unwrap();
        if polled.revision > last_seen {
            notifications += 1;
            last_seen = polled.revision;
        }
    }

    assert_eq!(notifications, 1);
}
1869 }
1870}