1use std::any::TypeId;
4use std::cell::RefCell;
5use std::sync::Arc;
6
7use whale::{Durability, Runtime as WhaleRuntime};
8
9use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
10use crate::key::FullCacheKey;
11use crate::loading::LoadingState;
12use crate::query::Query;
13use crate::storage::{
14 AssetKeyRegistry, AssetState, AssetStorage, CacheStorage, CachedValue, LocatorStorage,
15 PendingStorage, QueryRegistry, VerifierStorage,
16};
17use crate::QueryError;
18
19pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
24
25#[cfg(feature = "inspector")]
26use query_flow_inspector::{EventSink, FlowEvent, SpanId};
27
28const DURABILITY_LEVELS: usize = 4;
30
31thread_local! {
33 static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
34}
35
36pub struct QueryRuntime {
52 whale: WhaleRuntime<FullCacheKey, (), DURABILITY_LEVELS>,
54 cache: Arc<CacheStorage>,
56 assets: Arc<AssetStorage>,
58 locators: Arc<LocatorStorage>,
60 pending: Arc<PendingStorage>,
62 query_registry: Arc<QueryRegistry>,
64 asset_key_registry: Arc<AssetKeyRegistry>,
66 verifiers: Arc<VerifierStorage>,
68 error_comparator: ErrorComparator,
70 #[cfg(feature = "inspector")]
72 sink: Arc<parking_lot::RwLock<Option<Arc<dyn EventSink>>>>,
73}
74
75impl Default for QueryRuntime {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81impl Clone for QueryRuntime {
82 fn clone(&self) -> Self {
83 Self {
84 whale: self.whale.clone(),
85 cache: self.cache.clone(),
86 assets: self.assets.clone(),
87 locators: self.locators.clone(),
88 pending: self.pending.clone(),
89 query_registry: self.query_registry.clone(),
90 asset_key_registry: self.asset_key_registry.clone(),
91 verifiers: self.verifiers.clone(),
92 error_comparator: self.error_comparator,
93 #[cfg(feature = "inspector")]
94 sink: self.sink.clone(),
95 }
96 }
97}
98
99fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
103 false
104}
105
106impl QueryRuntime {
107 pub fn new() -> Self {
109 Self::builder().build()
110 }
111
112 pub fn builder() -> QueryRuntimeBuilder {
128 QueryRuntimeBuilder::new()
129 }
130
131 #[cfg(feature = "inspector")]
147 pub fn set_sink(&self, sink: Option<Arc<dyn EventSink>>) {
148 *self.sink.write() = sink;
149 }
150
151 #[cfg(feature = "inspector")]
153 pub fn sink(&self) -> Option<Arc<dyn EventSink>> {
154 self.sink.read().clone()
155 }
156
157 #[cfg(feature = "inspector")]
159 #[inline]
160 fn emit<F: FnOnce() -> FlowEvent>(&self, event: F) {
161 let guard = self.sink.read();
162 if let Some(ref sink) = *guard {
163 sink.emit(event());
164 }
165 }
166
167 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
176 let key = query.cache_key();
177 let full_key = FullCacheKey::new::<Q, _>(&key);
178
179 #[cfg(feature = "inspector")]
181 let span_id = query_flow_inspector::new_span_id();
182 #[cfg(feature = "inspector")]
183 let start_time = std::time::Instant::now();
184 #[cfg(feature = "inspector")]
185 let query_key =
186 query_flow_inspector::QueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
187
188 #[cfg(feature = "inspector")]
189 self.emit(|| FlowEvent::QueryStart {
190 span_id,
191 query: query_key.clone(),
192 });
193
194 let cycle_detected = QUERY_STACK.with(|stack| {
196 let stack = stack.borrow();
197 stack.iter().any(|k| k == &full_key)
198 });
199
200 if cycle_detected {
201 let path = QUERY_STACK.with(|stack| {
202 let stack = stack.borrow();
203 let mut path: Vec<String> =
204 stack.iter().map(|k| k.debug_repr().to_string()).collect();
205 path.push(full_key.debug_repr().to_string());
206 path
207 });
208
209 #[cfg(feature = "inspector")]
210 self.emit(|| FlowEvent::CycleDetected {
211 path: path
212 .iter()
213 .map(|s| query_flow_inspector::QueryKey::new("", s.clone()))
214 .collect(),
215 });
216
217 #[cfg(feature = "inspector")]
218 self.emit(|| FlowEvent::QueryEnd {
219 span_id,
220 query: query_key.clone(),
221 result: query_flow_inspector::ExecutionResult::CycleDetected,
222 duration: start_time.elapsed(),
223 });
224
225 return Err(QueryError::Cycle { path });
226 }
227
228 let current_rev = self.whale.current_revision();
230
231 if self.whale.is_verified_at(&full_key, ¤t_rev) {
233 if let Some(cached) = self.cache.get_cached::<Q>(&full_key) {
234 #[cfg(feature = "inspector")]
235 self.emit(|| FlowEvent::CacheCheck {
236 span_id,
237 query: query_key.clone(),
238 valid: true,
239 });
240
241 #[cfg(feature = "inspector")]
242 self.emit(|| FlowEvent::QueryEnd {
243 span_id,
244 query: query_key.clone(),
245 result: query_flow_inspector::ExecutionResult::CacheHit,
246 duration: start_time.elapsed(),
247 });
248
249 return match cached {
250 CachedValue::Ok(output) => Ok(output),
251 CachedValue::UserError(err) => Err(QueryError::UserError(err)),
252 };
253 }
254 }
255
256 if self.whale.is_valid(&full_key) {
258 if let Some(cached) = self.cache.get_cached::<Q>(&full_key) {
259 let mut deps_verified = true;
261 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
262 for dep in deps {
263 if let Some(verifier) = self.verifiers.get(&dep) {
264 if verifier.verify(self).is_err() {
266 deps_verified = false;
267 break;
268 }
269 }
270 }
271 }
272
273 if deps_verified && self.whale.is_valid(&full_key) {
275 self.whale.mark_verified(&full_key, ¤t_rev);
277
278 #[cfg(feature = "inspector")]
279 self.emit(|| FlowEvent::CacheCheck {
280 span_id,
281 query: query_key.clone(),
282 valid: true,
283 });
284
285 #[cfg(feature = "inspector")]
286 self.emit(|| FlowEvent::QueryEnd {
287 span_id,
288 query: query_key.clone(),
289 result: query_flow_inspector::ExecutionResult::CacheHit,
290 duration: start_time.elapsed(),
291 });
292
293 return match cached {
294 CachedValue::Ok(output) => Ok(output),
295 CachedValue::UserError(err) => Err(QueryError::UserError(err)),
296 };
297 }
298 }
300 }
301
302 #[cfg(feature = "inspector")]
303 self.emit(|| FlowEvent::CacheCheck {
304 span_id,
305 query: query_key.clone(),
306 valid: false,
307 });
308
309 QUERY_STACK.with(|stack| {
311 stack.borrow_mut().push(full_key.clone());
312 });
313
314 #[cfg(feature = "inspector")]
315 let result = self.execute_query::<Q>(&query, &full_key, span_id);
316 #[cfg(not(feature = "inspector"))]
317 let result = self.execute_query::<Q>(&query, &full_key);
318
319 QUERY_STACK.with(|stack| {
320 stack.borrow_mut().pop();
321 });
322
323 #[cfg(feature = "inspector")]
325 {
326 let exec_result = match &result {
327 Ok((_, true)) => query_flow_inspector::ExecutionResult::Changed,
328 Ok((_, false)) => query_flow_inspector::ExecutionResult::Unchanged,
329 Err(QueryError::Suspend) => query_flow_inspector::ExecutionResult::Suspended,
330 Err(QueryError::Cycle { .. }) => {
331 query_flow_inspector::ExecutionResult::CycleDetected
332 }
333 Err(e) => query_flow_inspector::ExecutionResult::Error {
334 message: format!("{:?}", e),
335 },
336 };
337 self.emit(|| FlowEvent::QueryEnd {
338 span_id,
339 query: query_key.clone(),
340 result: exec_result,
341 duration: start_time.elapsed(),
342 });
343 }
344
345 result.map(|(output, _)| output)
346 }
347
348 #[cfg(feature = "inspector")]
352 fn execute_query<Q: Query>(
353 &self,
354 query: &Q,
355 full_key: &FullCacheKey,
356 span_id: SpanId,
357 ) -> Result<(Arc<Q::Output>, bool), QueryError> {
358 let mut ctx = QueryContext {
360 runtime: self,
361 current_key: full_key.clone(),
362 parent_query_type: std::any::type_name::<Q>(),
363 span_id,
364 deps: RefCell::new(Vec::new()),
365 };
366
367 let result = query.query(&mut ctx);
369
370 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
372
373 let durability =
375 Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
376
377 match result {
378 Ok(output) => {
379 let output = Arc::new(output);
380
381 let output_changed =
383 if let Some(CachedValue::Ok(old)) = self.cache.get_cached::<Q>(full_key) {
384 !Q::output_eq(&old, &output)
385 } else {
386 true };
388
389 self.emit(|| FlowEvent::EarlyCutoffCheck {
391 span_id,
392 query: query_flow_inspector::QueryKey::new(
393 std::any::type_name::<Q>(),
394 full_key.debug_repr(),
395 ),
396 output_changed,
397 });
398
399 self.cache.insert_ok::<Q>(full_key.clone(), output.clone());
401
402 if output_changed {
404 let _ = self.whale.register(full_key.clone(), (), durability, deps);
405 } else {
406 let _ = self.whale.confirm_unchanged(full_key, deps);
407 }
408
409 let is_new_query = self.query_registry.register(query);
411 if is_new_query {
412 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
413 let _ = self
414 .whale
415 .register(sentinel, (), Durability::volatile(), vec![]);
416 }
417
418 self.verifiers.insert(full_key.clone(), query.clone());
420
421 Ok((output, output_changed))
422 }
423 Err(QueryError::UserError(err)) => {
424 let output_changed = if let Some(CachedValue::UserError(old_err)) =
426 self.cache.get_cached::<Q>(full_key)
427 {
428 !(self.error_comparator)(old_err.as_ref(), err.as_ref())
429 } else {
430 true };
432
433 self.emit(|| FlowEvent::EarlyCutoffCheck {
435 span_id,
436 query: query_flow_inspector::QueryKey::new(
437 std::any::type_name::<Q>(),
438 full_key.debug_repr(),
439 ),
440 output_changed,
441 });
442
443 self.cache.insert_error(full_key.clone(), err.clone());
445
446 if output_changed {
448 let _ = self.whale.register(full_key.clone(), (), durability, deps);
449 } else {
450 let _ = self.whale.confirm_unchanged(full_key, deps);
451 }
452
453 let is_new_query = self.query_registry.register(query);
455 if is_new_query {
456 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
457 let _ = self
458 .whale
459 .register(sentinel, (), Durability::volatile(), vec![]);
460 }
461
462 self.verifiers.insert(full_key.clone(), query.clone());
464
465 Err(QueryError::UserError(err))
466 }
467 Err(e) => {
468 Err(e)
470 }
471 }
472 }
473
474 #[cfg(not(feature = "inspector"))]
478 fn execute_query<Q: Query>(
479 &self,
480 query: &Q,
481 full_key: &FullCacheKey,
482 ) -> Result<(Arc<Q::Output>, bool), QueryError> {
483 let mut ctx = QueryContext {
485 runtime: self,
486 current_key: full_key.clone(),
487 deps: RefCell::new(Vec::new()),
488 };
489
490 let result = query.query(&mut ctx);
492
493 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
495
496 let durability =
498 Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
499
500 match result {
501 Ok(output) => {
502 let output = Arc::new(output);
503
504 let output_changed =
506 if let Some(CachedValue::Ok(old)) = self.cache.get_cached::<Q>(full_key) {
507 !Q::output_eq(&old, &output)
508 } else {
509 true };
511
512 self.cache.insert_ok::<Q>(full_key.clone(), output.clone());
514
515 if output_changed {
517 let _ = self.whale.register(full_key.clone(), (), durability, deps);
518 } else {
519 let _ = self.whale.confirm_unchanged(full_key, deps);
520 }
521
522 let is_new_query = self.query_registry.register(query);
524 if is_new_query {
525 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
526 let _ = self
527 .whale
528 .register(sentinel, (), Durability::volatile(), vec![]);
529 }
530
531 self.verifiers.insert(full_key.clone(), query.clone());
533
534 Ok((output, output_changed))
535 }
536 Err(QueryError::UserError(err)) => {
537 let output_changed = if let Some(CachedValue::UserError(old_err)) =
539 self.cache.get_cached::<Q>(full_key)
540 {
541 !(self.error_comparator)(old_err.as_ref(), err.as_ref())
542 } else {
543 true };
545
546 self.cache.insert_error(full_key.clone(), err.clone());
548
549 if output_changed {
551 let _ = self.whale.register(full_key.clone(), (), durability, deps);
552 } else {
553 let _ = self.whale.confirm_unchanged(full_key, deps);
554 }
555
556 let is_new_query = self.query_registry.register(query);
558 if is_new_query {
559 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
560 let _ = self
561 .whale
562 .register(sentinel, (), Durability::volatile(), vec![]);
563 }
564
565 self.verifiers.insert(full_key.clone(), query.clone());
567
568 Err(QueryError::UserError(err))
569 }
570 Err(e) => {
571 Err(e)
573 }
574 }
575 }
576
577 pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
581 let full_key = FullCacheKey::new::<Q, _>(key);
582
583 #[cfg(feature = "inspector")]
584 self.emit(|| FlowEvent::QueryInvalidated {
585 query: query_flow_inspector::QueryKey::new(
586 std::any::type_name::<Q>(),
587 full_key.debug_repr(),
588 ),
589 reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
590 });
591
592 self.cache.remove(&full_key);
594
595 let _ = self
597 .whale
598 .register(full_key, (), Durability::volatile(), vec![]);
599 }
600
601 pub fn clear_cache(&self) {
603 self.cache.clear();
604 }
605}
606
607pub struct QueryRuntimeBuilder {
625 error_comparator: ErrorComparator,
626}
627
628impl Default for QueryRuntimeBuilder {
629 fn default() -> Self {
630 Self::new()
631 }
632}
633
634impl QueryRuntimeBuilder {
635 pub fn new() -> Self {
637 Self {
638 error_comparator: default_error_comparator,
639 }
640 }
641
642 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
660 self.error_comparator = f;
661 self
662 }
663
664 pub fn build(self) -> QueryRuntime {
666 QueryRuntime {
667 whale: WhaleRuntime::new(),
668 cache: Arc::new(CacheStorage::new()),
669 assets: Arc::new(AssetStorage::new()),
670 locators: Arc::new(LocatorStorage::new()),
671 pending: Arc::new(PendingStorage::new()),
672 query_registry: Arc::new(QueryRegistry::new()),
673 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
674 verifiers: Arc::new(VerifierStorage::new()),
675 error_comparator: self.error_comparator,
676 #[cfg(feature = "inspector")]
677 sink: Arc::new(parking_lot::RwLock::new(None)),
678 }
679 }
680}
681
682impl QueryRuntime {
687 pub fn register_asset_locator<K, L>(&self, locator: L)
699 where
700 K: AssetKey,
701 L: AssetLocator<K>,
702 {
703 self.locators.insert::<K, L>(locator);
704 }
705
706 pub fn pending_assets(&self) -> Vec<PendingAsset> {
722 self.pending.get_all()
723 }
724
725 pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
727 self.pending.get_of_type::<K>()
728 }
729
730 pub fn has_pending_assets(&self) -> bool {
732 !self.pending.is_empty()
733 }
734
735 pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset) {
752 let durability = key.durability();
753 self.resolve_asset_internal(key, value, durability);
754 }
755
756 pub fn resolve_asset_with_durability<K: AssetKey>(
760 &self,
761 key: K,
762 value: K::Asset,
763 durability: DurabilityLevel,
764 ) {
765 self.resolve_asset_internal(key, value, durability);
766 }
767
768 fn resolve_asset_internal<K: AssetKey>(
769 &self,
770 key: K,
771 value: K::Asset,
772 durability_level: DurabilityLevel,
773 ) {
774 let full_asset_key = FullAssetKey::new(&key);
775 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
776
777 let changed = if let Some(old_value) = self.assets.get_ready::<K>(&full_asset_key) {
779 !K::asset_eq(&old_value, &value)
780 } else {
781 true };
783
784 #[cfg(feature = "inspector")]
786 self.emit(|| FlowEvent::AssetResolved {
787 asset: query_flow_inspector::AssetKey::new(
788 std::any::type_name::<K>(),
789 format!("{:?}", key),
790 ),
791 changed,
792 });
793
794 self.assets
796 .insert_ready::<K>(full_asset_key.clone(), Arc::new(value));
797
798 self.pending.remove(&full_asset_key);
800
801 let durability =
803 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
804
805 if changed {
806 let _ = self.whale.register(full_cache_key, (), durability, vec![]);
808 } else {
809 let _ = self.whale.confirm_unchanged(&full_cache_key, vec![]);
811 }
812
813 let is_new_asset = self.asset_key_registry.register(&key);
815 if is_new_asset {
816 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
818 let _ = self
819 .whale
820 .register(sentinel, (), Durability::volatile(), vec![]);
821 }
822 }
823
824 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
838 let full_asset_key = FullAssetKey::new(key);
839 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
840
841 #[cfg(feature = "inspector")]
843 self.emit(|| FlowEvent::AssetInvalidated {
844 asset: query_flow_inspector::AssetKey::new(
845 std::any::type_name::<K>(),
846 format!("{:?}", key),
847 ),
848 });
849
850 self.assets
852 .insert(full_asset_key.clone(), AssetState::Loading);
853
854 self.pending.insert::<K>(full_asset_key, key.clone());
856
857 let _ = self
859 .whale
860 .register(full_cache_key, (), Durability::volatile(), vec![]);
861 }
862
863 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
868 let full_asset_key = FullAssetKey::new(key);
869 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
870
871 let _ = self
874 .whale
875 .register(full_cache_key.clone(), (), Durability::volatile(), vec![]);
876
877 self.assets.remove(&full_asset_key);
879 self.pending.remove(&full_asset_key);
880
881 self.whale.remove(&full_cache_key);
883
884 if self.asset_key_registry.remove::<K>(key) {
886 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
887 let _ = self
888 .whale
889 .register(sentinel, (), Durability::volatile(), vec![]);
890 }
891 }
892
893 fn get_asset_internal<K: AssetKey>(
895 &self,
896 key: &K,
897 ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
898 let full_asset_key = FullAssetKey::new(key);
899 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
900
901 #[cfg(feature = "inspector")]
903 let emit_requested = |runtime: &Self, state: query_flow_inspector::AssetState| {
904 runtime.emit(|| FlowEvent::AssetRequested {
905 asset: query_flow_inspector::AssetKey::new(
906 std::any::type_name::<K>(),
907 format!("{:?}", key),
908 ),
909 state,
910 });
911 };
912
913 if let Some(state) = self.assets.get(&full_asset_key) {
915 if self.whale.is_valid(&full_cache_key) {
917 return match state {
918 AssetState::Loading => {
919 #[cfg(feature = "inspector")]
920 emit_requested(self, query_flow_inspector::AssetState::Loading);
921 Ok(LoadingState::Loading)
922 }
923 AssetState::Ready(arc) => {
924 #[cfg(feature = "inspector")]
925 emit_requested(self, query_flow_inspector::AssetState::Ready);
926 match arc.downcast::<K::Asset>() {
927 Ok(value) => Ok(LoadingState::Ready(value)),
928 Err(_) => Err(QueryError::MissingDependency {
929 description: format!("Asset type mismatch: {:?}", key),
930 }),
931 }
932 }
933 AssetState::NotFound => {
934 #[cfg(feature = "inspector")]
935 emit_requested(self, query_flow_inspector::AssetState::NotFound);
936 Err(QueryError::MissingDependency {
937 description: format!("Asset not found: {:?}", key),
938 })
939 }
940 };
941 }
942 }
943
944 if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
946 if let Some(state) = locator.locate_any(key) {
947 self.assets.insert(full_asset_key.clone(), state.clone());
949
950 match state {
951 AssetState::Ready(arc) => {
952 #[cfg(feature = "inspector")]
953 emit_requested(self, query_flow_inspector::AssetState::Ready);
954
955 let durability = Durability::new(key.durability().as_u8() as usize)
957 .unwrap_or(Durability::volatile());
958 let _ = self.whale.register(full_cache_key, (), durability, vec![]);
959
960 match arc.downcast::<K::Asset>() {
961 Ok(value) => return Ok(LoadingState::Ready(value)),
962 Err(_) => {
963 return Err(QueryError::MissingDependency {
964 description: format!("Asset type mismatch: {:?}", key),
965 })
966 }
967 }
968 }
969 AssetState::Loading => {
970 #[cfg(feature = "inspector")]
971 emit_requested(self, query_flow_inspector::AssetState::Loading);
972 self.pending.insert::<K>(full_asset_key, key.clone());
973 return Ok(LoadingState::Loading);
974 }
975 AssetState::NotFound => {
976 #[cfg(feature = "inspector")]
977 emit_requested(self, query_flow_inspector::AssetState::NotFound);
978 return Err(QueryError::MissingDependency {
979 description: format!("Asset not found: {:?}", key),
980 });
981 }
982 }
983 }
984 }
985
986 #[cfg(feature = "inspector")]
988 emit_requested(self, query_flow_inspector::AssetState::Loading);
989 self.assets
990 .insert(full_asset_key.clone(), AssetState::Loading);
991 self.pending.insert::<K>(full_asset_key, key.clone());
992 Ok(LoadingState::Loading)
993 }
994}
995
996#[cfg(feature = "inspector")]
1000pub struct QueryContext<'a> {
1001 runtime: &'a QueryRuntime,
1002 current_key: FullCacheKey,
1003 parent_query_type: &'static str,
1004 span_id: SpanId,
1005 deps: RefCell<Vec<FullCacheKey>>,
1006}
1007
1008#[cfg(not(feature = "inspector"))]
1012pub struct QueryContext<'a> {
1013 runtime: &'a QueryRuntime,
1014 #[allow(dead_code)]
1015 current_key: FullCacheKey,
1016 deps: RefCell<Vec<FullCacheKey>>,
1017}
1018
1019impl<'a> QueryContext<'a> {
1020 pub fn query<Q: Query>(&mut self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1033 let key = query.cache_key();
1034 let full_key = FullCacheKey::new::<Q, _>(&key);
1035
1036 #[cfg(feature = "inspector")]
1038 self.runtime.emit(|| FlowEvent::DependencyRegistered {
1039 span_id: self.span_id,
1040 parent: query_flow_inspector::QueryKey::new(
1041 self.parent_query_type,
1042 self.current_key.debug_repr(),
1043 ),
1044 dependency: query_flow_inspector::QueryKey::new(
1045 std::any::type_name::<Q>(),
1046 full_key.debug_repr(),
1047 ),
1048 });
1049
1050 self.deps.borrow_mut().push(full_key.clone());
1052
1053 self.runtime.query(query)
1055 }
1056
1057 pub fn asset<K: AssetKey>(
1078 &mut self,
1079 key: &K,
1080 ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
1081 let full_asset_key = FullAssetKey::new(key);
1082 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1083
1084 #[cfg(feature = "inspector")]
1086 self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1087 span_id: self.span_id,
1088 parent: query_flow_inspector::QueryKey::new(
1089 self.parent_query_type,
1090 self.current_key.debug_repr(),
1091 ),
1092 asset: query_flow_inspector::AssetKey::new(
1093 std::any::type_name::<K>(),
1094 format!("{:?}", key),
1095 ),
1096 });
1097
1098 self.deps.borrow_mut().push(full_cache_key);
1100
1101 let result = self.runtime.get_asset_internal(key);
1103
1104 #[cfg(feature = "inspector")]
1106 if let Err(QueryError::MissingDependency { ref description }) = result {
1107 self.runtime.emit(|| FlowEvent::MissingDependency {
1108 query: query_flow_inspector::QueryKey::new(
1109 self.parent_query_type,
1110 self.current_key.debug_repr(),
1111 ),
1112 dependency_description: description.clone(),
1113 });
1114 }
1115
1116 result
1117 }
1118
1119 pub fn list_queries<Q: Query>(&mut self) -> Vec<Q> {
1142 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1144
1145 #[cfg(feature = "inspector")]
1146 self.runtime.emit(|| FlowEvent::DependencyRegistered {
1147 span_id: self.span_id,
1148 parent: query_flow_inspector::QueryKey::new(
1149 self.parent_query_type,
1150 self.current_key.debug_repr(),
1151 ),
1152 dependency: query_flow_inspector::QueryKey::new("QuerySet", sentinel.debug_repr()),
1153 });
1154
1155 self.deps.borrow_mut().push(sentinel);
1156
1157 self.runtime.query_registry.get_all::<Q>()
1159 }
1160
1161 pub fn list_asset_keys<K: AssetKey>(&mut self) -> Vec<K> {
1186 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1188
1189 #[cfg(feature = "inspector")]
1190 self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1191 span_id: self.span_id,
1192 parent: query_flow_inspector::QueryKey::new(
1193 self.parent_query_type,
1194 self.current_key.debug_repr(),
1195 ),
1196 asset: query_flow_inspector::AssetKey::new("AssetKeySet", sentinel.debug_repr()),
1197 });
1198
1199 self.deps.borrow_mut().push(sentinel);
1200
1201 self.runtime.asset_key_registry.get_all::<K>()
1203 }
1204}
1205
1206#[cfg(test)]
1207mod tests {
1208 use super::*;
1209
1210 #[test]
1211 fn test_simple_query() {
1212 #[derive(Clone)]
1213 struct Add {
1214 a: i32,
1215 b: i32,
1216 }
1217
1218 impl Query for Add {
1219 type CacheKey = (i32, i32);
1220 type Output = i32;
1221
1222 fn cache_key(&self) -> Self::CacheKey {
1223 (self.a, self.b)
1224 }
1225
1226 fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1227 Ok(self.a + self.b)
1228 }
1229
1230 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1231 old == new
1232 }
1233 }
1234
1235 let runtime = QueryRuntime::new();
1236
1237 let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
1238 assert_eq!(*result, 3);
1239
1240 let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
1242 assert_eq!(*result2, 3);
1243 }
1244
1245 #[test]
1246 fn test_dependent_queries() {
1247 #[derive(Clone)]
1248 struct Base {
1249 value: i32,
1250 }
1251
1252 impl Query for Base {
1253 type CacheKey = i32;
1254 type Output = i32;
1255
1256 fn cache_key(&self) -> Self::CacheKey {
1257 self.value
1258 }
1259
1260 fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1261 Ok(self.value * 2)
1262 }
1263
1264 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1265 old == new
1266 }
1267 }
1268
1269 #[derive(Clone)]
1270 struct Derived {
1271 base_value: i32,
1272 }
1273
1274 impl Query for Derived {
1275 type CacheKey = i32;
1276 type Output = i32;
1277
1278 fn cache_key(&self) -> Self::CacheKey {
1279 self.base_value
1280 }
1281
1282 fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1283 let base = ctx.query(Base {
1284 value: self.base_value,
1285 })?;
1286 Ok(*base + 10)
1287 }
1288
1289 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1290 old == new
1291 }
1292 }
1293
1294 let runtime = QueryRuntime::new();
1295
1296 let result = runtime.query(Derived { base_value: 5 }).unwrap();
1297 assert_eq!(*result, 20); }
1299
1300 #[test]
1301 fn test_cycle_detection() {
1302 #[derive(Clone)]
1303 struct CycleA {
1304 id: i32,
1305 }
1306
1307 #[derive(Clone)]
1308 struct CycleB {
1309 id: i32,
1310 }
1311
1312 impl Query for CycleA {
1313 type CacheKey = i32;
1314 type Output = i32;
1315
1316 fn cache_key(&self) -> Self::CacheKey {
1317 self.id
1318 }
1319
1320 fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1321 let b = ctx.query(CycleB { id: self.id })?;
1322 Ok(*b + 1)
1323 }
1324
1325 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1326 old == new
1327 }
1328 }
1329
1330 impl Query for CycleB {
1331 type CacheKey = i32;
1332 type Output = i32;
1333
1334 fn cache_key(&self) -> Self::CacheKey {
1335 self.id
1336 }
1337
1338 fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1339 let a = ctx.query(CycleA { id: self.id })?;
1340 Ok(*a + 1)
1341 }
1342
1343 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1344 old == new
1345 }
1346 }
1347
1348 let runtime = QueryRuntime::new();
1349
1350 let result = runtime.query(CycleA { id: 1 });
1351 assert!(matches!(result, Err(QueryError::Cycle { .. })));
1352 }
1353
1354 #[test]
1355 fn test_fallible_query() {
1356 #[derive(Clone)]
1357 struct ParseInt {
1358 input: String,
1359 }
1360
1361 impl Query for ParseInt {
1362 type CacheKey = String;
1363 type Output = Result<i32, std::num::ParseIntError>;
1364
1365 fn cache_key(&self) -> Self::CacheKey {
1366 self.input.clone()
1367 }
1368
1369 fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1370 Ok(self.input.parse())
1371 }
1372
1373 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1374 old == new
1375 }
1376 }
1377
1378 let runtime = QueryRuntime::new();
1379
1380 let result = runtime
1382 .query(ParseInt {
1383 input: "42".to_string(),
1384 })
1385 .unwrap();
1386 assert_eq!(*result, Ok(42));
1387
1388 let result = runtime
1390 .query(ParseInt {
1391 input: "not_a_number".to_string(),
1392 })
1393 .unwrap();
1394 assert!(result.is_err());
1395 }
1396
1397 mod macro_tests {
1399 use super::*;
1400 use crate::query;
1401
1402 #[query]
1403 fn add(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
1404 let _ = ctx; Ok(a + b)
1406 }
1407
1408 #[test]
1409 fn test_macro_basic() {
1410 let runtime = QueryRuntime::new();
1411 let result = runtime.query(Add::new(1, 2)).unwrap();
1412 assert_eq!(*result, 3);
1413 }
1414
1415 #[query(durability = 2)]
1416 fn with_durability(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1417 let _ = ctx;
1418 Ok(x * 2)
1419 }
1420
1421 #[test]
1422 fn test_macro_durability() {
1423 let runtime = QueryRuntime::new();
1424 let result = runtime.query(WithDurability::new(5)).unwrap();
1425 assert_eq!(*result, 10);
1426 }
1427
1428 #[query(keys(id))]
1429 fn with_key_selection(
1430 ctx: &mut QueryContext,
1431 id: u32,
1432 include_extra: bool,
1433 ) -> Result<String, QueryError> {
1434 let _ = ctx;
1435 Ok(format!("id={}, extra={}", id, include_extra))
1436 }
1437
1438 #[test]
1439 fn test_macro_key_selection() {
1440 let runtime = QueryRuntime::new();
1441
1442 let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
1444 let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
1445
1446 assert_eq!(*r1, "id=1, extra=true");
1448 assert_eq!(*r2, "id=1, extra=true"); }
1450
1451 #[query]
1452 fn dependent(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
1453 let sum = ctx.query(Add::new(*a, *b))?;
1454 Ok(*sum * 2)
1455 }
1456
1457 #[test]
1458 fn test_macro_dependencies() {
1459 let runtime = QueryRuntime::new();
1460 let result = runtime.query(Dependent::new(3, 4)).unwrap();
1461 assert_eq!(*result, 14); }
1463
1464 #[query(output_eq)]
1465 fn with_output_eq(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1466 let _ = ctx;
1467 Ok(*x * 2)
1468 }
1469
1470 #[test]
1471 fn test_macro_output_eq() {
1472 let runtime = QueryRuntime::new();
1473 let result = runtime.query(WithOutputEq::new(5)).unwrap();
1474 assert_eq!(*result, 10);
1475 }
1476
1477 #[query(name = "CustomName")]
1478 fn original_name(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1479 let _ = ctx;
1480 Ok(*x)
1481 }
1482
1483 #[test]
1484 fn test_macro_custom_name() {
1485 let runtime = QueryRuntime::new();
1486 let result = runtime.query(CustomName::new(42)).unwrap();
1487 assert_eq!(*result, 42);
1488 }
1489
1490 #[allow(unused_variables)]
1494 #[inline]
1495 #[query]
1496 fn with_attributes(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1497 let unused_var = 42;
1499 Ok(*x * 2)
1500 }
1501
1502 #[test]
1503 fn test_macro_preserves_attributes() {
1504 let runtime = QueryRuntime::new();
1505 let result = runtime.query(WithAttributes::new(5)).unwrap();
1507 assert_eq!(*result, 10);
1508 }
1509 }
1510}