1use std::any::TypeId;
4use std::cell::RefCell;
5use std::sync::Arc;
6
7use whale::{Durability, Runtime as WhaleRuntime};
8
9use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
10use crate::key::FullCacheKey;
11use crate::loading::LoadingState;
12use crate::query::Query;
13use crate::storage::{
14 AssetKeyRegistry, AssetState, AssetStorage, CacheStorage, CachedValue, LocatorStorage,
15 PendingStorage, QueryRegistry,
16};
17use crate::QueryError;
18
/// Signature of the function used to compare two user errors.
///
/// The runtime calls it with the previously cached error and the newly
/// produced one; returning `true` means the two are considered equal, so the
/// query result is treated as unchanged.
pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
24
25#[cfg(feature = "inspector")]
26use query_flow_inspector::{EventSink, FlowEvent, SpanId};
27
/// Number of durability levels the underlying `WhaleRuntime` is parameterized with.
const DURABILITY_LEVELS: usize = 4;
30
thread_local! {
    /// Keys of the queries currently executing on this thread, outermost first.
    ///
    /// `QueryRuntime::query` pushes a key before executing a query and pops it
    /// afterwards; a key that is already on the stack is reported as a cycle.
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
}
35
/// Entry point for executing queries and managing assets.
///
/// The `Arc`-wrapped storages and registries are shared between clones of a
/// runtime (see the `Clone` impl).
pub struct QueryRuntime {
    /// Dependency tracker: queries and assets are registered here together with
    /// their durability and dependencies, and it is asked whether a key is valid.
    whale: WhaleRuntime<FullCacheKey, (), DURABILITY_LEVELS>,
    /// Cached query results (successful outputs and user errors).
    cache: Arc<CacheStorage>,
    /// Known asset states, keyed by full asset key.
    assets: Arc<AssetStorage>,
    /// Registered asset locators, looked up by the asset key's `TypeId`.
    locators: Arc<LocatorStorage>,
    /// Assets that were requested but are still loading.
    pending: Arc<PendingStorage>,
    /// Registry of executed query instances; backs `QueryContext::list_queries`.
    query_registry: Arc<QueryRegistry>,
    /// Registry of resolved asset keys; backs `QueryContext::list_asset_keys`.
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Decides whether a freshly produced user error equals the cached one.
    error_comparator: ErrorComparator,
    /// Optional sink receiving inspector events; `None` disables emission.
    #[cfg(feature = "inspector")]
    sink: Arc<parking_lot::RwLock<Option<Arc<dyn EventSink>>>>,
}
72
73impl Default for QueryRuntime {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl Clone for QueryRuntime {
80 fn clone(&self) -> Self {
81 Self {
82 whale: self.whale.clone(),
83 cache: self.cache.clone(),
84 assets: self.assets.clone(),
85 locators: self.locators.clone(),
86 pending: self.pending.clone(),
87 query_registry: self.query_registry.clone(),
88 asset_key_registry: self.asset_key_registry.clone(),
89 error_comparator: self.error_comparator,
90 #[cfg(feature = "inspector")]
91 sink: self.sink.clone(),
92 }
93 }
94}
95
96fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
100 false
101}
102
103impl QueryRuntime {
104 pub fn new() -> Self {
106 Self::builder().build()
107 }
108
109 pub fn builder() -> QueryRuntimeBuilder {
125 QueryRuntimeBuilder::new()
126 }
127
128 #[cfg(feature = "inspector")]
144 pub fn set_sink(&self, sink: Option<Arc<dyn EventSink>>) {
145 *self.sink.write() = sink;
146 }
147
148 #[cfg(feature = "inspector")]
150 pub fn sink(&self) -> Option<Arc<dyn EventSink>> {
151 self.sink.read().clone()
152 }
153
154 #[cfg(feature = "inspector")]
156 #[inline]
157 fn emit<F: FnOnce() -> FlowEvent>(&self, event: F) {
158 let guard = self.sink.read();
159 if let Some(ref sink) = *guard {
160 sink.emit(event());
161 }
162 }
163
164 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
173 let key = query.cache_key();
174 let full_key = FullCacheKey::new::<Q, _>(&key);
175
176 #[cfg(feature = "inspector")]
178 let span_id = query_flow_inspector::new_span_id();
179 #[cfg(feature = "inspector")]
180 let start_time = std::time::Instant::now();
181 #[cfg(feature = "inspector")]
182 let query_key =
183 query_flow_inspector::QueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
184
185 #[cfg(feature = "inspector")]
186 self.emit(|| FlowEvent::QueryStart {
187 span_id,
188 query: query_key.clone(),
189 });
190
191 let cycle_detected = QUERY_STACK.with(|stack| {
193 let stack = stack.borrow();
194 stack.iter().any(|k| k == &full_key)
195 });
196
197 if cycle_detected {
198 let path = QUERY_STACK.with(|stack| {
199 let stack = stack.borrow();
200 let mut path: Vec<String> =
201 stack.iter().map(|k| k.debug_repr().to_string()).collect();
202 path.push(full_key.debug_repr().to_string());
203 path
204 });
205
206 #[cfg(feature = "inspector")]
207 self.emit(|| FlowEvent::CycleDetected {
208 path: path
209 .iter()
210 .map(|s| query_flow_inspector::QueryKey::new("", s.clone()))
211 .collect(),
212 });
213
214 #[cfg(feature = "inspector")]
215 self.emit(|| FlowEvent::QueryEnd {
216 span_id,
217 query: query_key.clone(),
218 result: query_flow_inspector::ExecutionResult::CycleDetected,
219 duration: start_time.elapsed(),
220 });
221
222 return Err(QueryError::Cycle { path });
223 }
224
225 if let Some(cached) = self.get_cached_if_valid::<Q>(&full_key) {
227 #[cfg(feature = "inspector")]
228 self.emit(|| FlowEvent::CacheCheck {
229 span_id,
230 query: query_key.clone(),
231 valid: true,
232 });
233
234 #[cfg(feature = "inspector")]
235 self.emit(|| FlowEvent::QueryEnd {
236 span_id,
237 query: query_key.clone(),
238 result: query_flow_inspector::ExecutionResult::CacheHit,
239 duration: start_time.elapsed(),
240 });
241
242 return match cached {
243 CachedValue::Ok(output) => Ok(output),
244 CachedValue::UserError(err) => Err(QueryError::UserError(err)),
245 };
246 }
247
248 #[cfg(feature = "inspector")]
249 self.emit(|| FlowEvent::CacheCheck {
250 span_id,
251 query: query_key.clone(),
252 valid: false,
253 });
254
255 QUERY_STACK.with(|stack| {
257 stack.borrow_mut().push(full_key.clone());
258 });
259
260 #[cfg(feature = "inspector")]
261 let result = self.execute_query::<Q>(&query, &full_key, span_id);
262 #[cfg(not(feature = "inspector"))]
263 let result = self.execute_query::<Q>(&query, &full_key);
264
265 QUERY_STACK.with(|stack| {
266 stack.borrow_mut().pop();
267 });
268
269 #[cfg(feature = "inspector")]
271 {
272 let exec_result = match &result {
273 Ok((_, true)) => query_flow_inspector::ExecutionResult::Changed,
274 Ok((_, false)) => query_flow_inspector::ExecutionResult::Unchanged,
275 Err(QueryError::Suspend) => query_flow_inspector::ExecutionResult::Suspended,
276 Err(QueryError::Cycle { .. }) => {
277 query_flow_inspector::ExecutionResult::CycleDetected
278 }
279 Err(e) => query_flow_inspector::ExecutionResult::Error {
280 message: format!("{:?}", e),
281 },
282 };
283 self.emit(|| FlowEvent::QueryEnd {
284 span_id,
285 query: query_key.clone(),
286 result: exec_result,
287 duration: start_time.elapsed(),
288 });
289 }
290
291 result.map(|(output, _)| output)
292 }
293
294 #[cfg(feature = "inspector")]
298 fn execute_query<Q: Query>(
299 &self,
300 query: &Q,
301 full_key: &FullCacheKey,
302 span_id: SpanId,
303 ) -> Result<(Arc<Q::Output>, bool), QueryError> {
304 let mut ctx = QueryContext {
306 runtime: self,
307 current_key: full_key.clone(),
308 parent_query_type: std::any::type_name::<Q>(),
309 span_id,
310 deps: RefCell::new(Vec::new()),
311 };
312
313 let result = query.query(&mut ctx);
315
316 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
318
319 let durability =
321 Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
322
323 match result {
324 Ok(output) => {
325 let output = Arc::new(output);
326
327 let output_changed =
329 if let Some(CachedValue::Ok(old)) = self.cache.get_cached::<Q>(full_key) {
330 !Q::output_eq(&old, &output)
331 } else {
332 true };
334
335 self.emit(|| FlowEvent::EarlyCutoffCheck {
337 span_id,
338 query: query_flow_inspector::QueryKey::new(
339 std::any::type_name::<Q>(),
340 full_key.debug_repr(),
341 ),
342 output_changed,
343 });
344
345 self.cache.insert_ok::<Q>(full_key.clone(), output.clone());
347
348 if output_changed {
350 let _ = self.whale.register(full_key.clone(), (), durability, deps);
351 } else {
352 let _ = self.whale.confirm_unchanged(full_key, deps);
353 }
354
355 let is_new_query = self.query_registry.register(query);
357 if is_new_query {
358 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
359 let _ = self
360 .whale
361 .register(sentinel, (), Durability::volatile(), vec![]);
362 }
363
364 Ok((output, output_changed))
365 }
366 Err(QueryError::UserError(err)) => {
367 let output_changed = if let Some(CachedValue::UserError(old_err)) =
369 self.cache.get_cached::<Q>(full_key)
370 {
371 !(self.error_comparator)(old_err.as_ref(), err.as_ref())
372 } else {
373 true };
375
376 self.emit(|| FlowEvent::EarlyCutoffCheck {
378 span_id,
379 query: query_flow_inspector::QueryKey::new(
380 std::any::type_name::<Q>(),
381 full_key.debug_repr(),
382 ),
383 output_changed,
384 });
385
386 self.cache.insert_error(full_key.clone(), err.clone());
388
389 if output_changed {
391 let _ = self.whale.register(full_key.clone(), (), durability, deps);
392 } else {
393 let _ = self.whale.confirm_unchanged(full_key, deps);
394 }
395
396 let is_new_query = self.query_registry.register(query);
398 if is_new_query {
399 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
400 let _ = self
401 .whale
402 .register(sentinel, (), Durability::volatile(), vec![]);
403 }
404
405 Err(QueryError::UserError(err))
406 }
407 Err(e) => {
408 Err(e)
410 }
411 }
412 }
413
414 #[cfg(not(feature = "inspector"))]
418 fn execute_query<Q: Query>(
419 &self,
420 query: &Q,
421 full_key: &FullCacheKey,
422 ) -> Result<(Arc<Q::Output>, bool), QueryError> {
423 let mut ctx = QueryContext {
425 runtime: self,
426 current_key: full_key.clone(),
427 deps: RefCell::new(Vec::new()),
428 };
429
430 let result = query.query(&mut ctx);
432
433 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
435
436 let durability =
438 Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
439
440 match result {
441 Ok(output) => {
442 let output = Arc::new(output);
443
444 let output_changed =
446 if let Some(CachedValue::Ok(old)) = self.cache.get_cached::<Q>(full_key) {
447 !Q::output_eq(&old, &output)
448 } else {
449 true };
451
452 self.cache.insert_ok::<Q>(full_key.clone(), output.clone());
454
455 if output_changed {
457 let _ = self.whale.register(full_key.clone(), (), durability, deps);
458 } else {
459 let _ = self.whale.confirm_unchanged(full_key, deps);
460 }
461
462 let is_new_query = self.query_registry.register(query);
464 if is_new_query {
465 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
466 let _ = self
467 .whale
468 .register(sentinel, (), Durability::volatile(), vec![]);
469 }
470
471 Ok((output, output_changed))
472 }
473 Err(QueryError::UserError(err)) => {
474 let output_changed = if let Some(CachedValue::UserError(old_err)) =
476 self.cache.get_cached::<Q>(full_key)
477 {
478 !(self.error_comparator)(old_err.as_ref(), err.as_ref())
479 } else {
480 true };
482
483 self.cache.insert_error(full_key.clone(), err.clone());
485
486 if output_changed {
488 let _ = self.whale.register(full_key.clone(), (), durability, deps);
489 } else {
490 let _ = self.whale.confirm_unchanged(full_key, deps);
491 }
492
493 let is_new_query = self.query_registry.register(query);
495 if is_new_query {
496 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
497 let _ = self
498 .whale
499 .register(sentinel, (), Durability::volatile(), vec![]);
500 }
501
502 Err(QueryError::UserError(err))
503 }
504 Err(e) => {
505 Err(e)
507 }
508 }
509 }
510
511 fn get_cached_if_valid<Q: Query>(
513 &self,
514 full_key: &FullCacheKey,
515 ) -> Option<CachedValue<Arc<Q::Output>>> {
516 if !self.whale.is_valid(full_key) {
518 return None;
519 }
520
521 self.cache.get_cached::<Q>(full_key)
523 }
524
525 pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
529 let full_key = FullCacheKey::new::<Q, _>(key);
530
531 #[cfg(feature = "inspector")]
532 self.emit(|| FlowEvent::QueryInvalidated {
533 query: query_flow_inspector::QueryKey::new(
534 std::any::type_name::<Q>(),
535 full_key.debug_repr(),
536 ),
537 reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
538 });
539
540 self.cache.remove(&full_key);
542
543 let _ = self
545 .whale
546 .register(full_key, (), Durability::volatile(), vec![]);
547 }
548
    /// Clears the query result cache.
    ///
    /// Only `self.cache` is touched; the dependency tracker, assets and
    /// registries are left as they are.
    pub fn clear_cache(&self) {
        self.cache.clear();
    }
553}
554
/// Builder for [`QueryRuntime`].
pub struct QueryRuntimeBuilder {
    /// Comparator handed to the built runtime; defaults to
    /// `default_error_comparator`.
    error_comparator: ErrorComparator,
}
575
576impl Default for QueryRuntimeBuilder {
577 fn default() -> Self {
578 Self::new()
579 }
580}
581
582impl QueryRuntimeBuilder {
583 pub fn new() -> Self {
585 Self {
586 error_comparator: default_error_comparator,
587 }
588 }
589
590 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
608 self.error_comparator = f;
609 self
610 }
611
612 pub fn build(self) -> QueryRuntime {
614 QueryRuntime {
615 whale: WhaleRuntime::new(),
616 cache: Arc::new(CacheStorage::new()),
617 assets: Arc::new(AssetStorage::new()),
618 locators: Arc::new(LocatorStorage::new()),
619 pending: Arc::new(PendingStorage::new()),
620 query_registry: Arc::new(QueryRegistry::new()),
621 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
622 error_comparator: self.error_comparator,
623 #[cfg(feature = "inspector")]
624 sink: Arc::new(parking_lot::RwLock::new(None)),
625 }
626 }
627}
628
629impl QueryRuntime {
    /// Registers `locator` as the locator for asset keys of type `K`.
    ///
    /// Locators are consulted by `get_asset_internal` when no valid asset state
    /// is stored for a requested key.
    pub fn register_asset_locator<K, L>(&self, locator: L)
    where
        K: AssetKey,
        L: AssetLocator<K>,
    {
        self.locators.insert::<K, L>(locator);
    }
652
    /// Returns every asset currently recorded in the pending storage.
    pub fn pending_assets(&self) -> Vec<PendingAsset> {
        self.pending.get_all()
    }
671
    /// Returns the pending asset keys of type `K`.
    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
        self.pending.get_of_type::<K>()
    }
676
    /// Returns `true` when at least one asset is recorded as pending.
    pub fn has_pending_assets(&self) -> bool {
        !self.pending.is_empty()
    }
681
    /// Supplies `value` for the asset identified by `key`, using the durability
    /// reported by the key itself.
    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset) {
        // Read the durability first: `key` is moved into the call below.
        let durability = key.durability();
        self.resolve_asset_internal(key, value, durability);
    }
702
    /// Supplies `value` for the asset identified by `key`, using the given
    /// `durability` instead of the one reported by the key.
    pub fn resolve_asset_with_durability<K: AssetKey>(
        &self,
        key: K,
        value: K::Asset,
        durability: DurabilityLevel,
    ) {
        self.resolve_asset_internal(key, value, durability);
    }
714
715 fn resolve_asset_internal<K: AssetKey>(
716 &self,
717 key: K,
718 value: K::Asset,
719 durability_level: DurabilityLevel,
720 ) {
721 let full_asset_key = FullAssetKey::new(&key);
722 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
723
724 let changed = if let Some(old_value) = self.assets.get_ready::<K>(&full_asset_key) {
726 !K::asset_eq(&old_value, &value)
727 } else {
728 true };
730
731 #[cfg(feature = "inspector")]
733 self.emit(|| FlowEvent::AssetResolved {
734 asset: query_flow_inspector::AssetKey::new(
735 std::any::type_name::<K>(),
736 format!("{:?}", key),
737 ),
738 changed,
739 });
740
741 self.assets
743 .insert_ready::<K>(full_asset_key.clone(), Arc::new(value));
744
745 self.pending.remove(&full_asset_key);
747
748 let durability =
750 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
751
752 if changed {
753 let _ = self.whale.register(full_cache_key, (), durability, vec![]);
755 } else {
756 let _ = self.whale.confirm_unchanged(&full_cache_key, vec![]);
758 }
759
760 let is_new_asset = self.asset_key_registry.register(&key);
762 if is_new_asset {
763 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
765 let _ = self
766 .whale
767 .register(sentinel, (), Durability::volatile(), vec![]);
768 }
769 }
770
771 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
785 let full_asset_key = FullAssetKey::new(key);
786 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
787
788 #[cfg(feature = "inspector")]
790 self.emit(|| FlowEvent::AssetInvalidated {
791 asset: query_flow_inspector::AssetKey::new(
792 std::any::type_name::<K>(),
793 format!("{:?}", key),
794 ),
795 });
796
797 self.assets
799 .insert(full_asset_key.clone(), AssetState::Loading);
800
801 self.pending.insert::<K>(full_asset_key, key.clone());
803
804 let _ = self
806 .whale
807 .register(full_cache_key, (), Durability::volatile(), vec![]);
808 }
809
810 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
815 let full_asset_key = FullAssetKey::new(key);
816 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
817
818 let _ = self
821 .whale
822 .register(full_cache_key.clone(), (), Durability::volatile(), vec![]);
823
824 self.assets.remove(&full_asset_key);
826 self.pending.remove(&full_asset_key);
827
828 self.whale.remove(&full_cache_key);
830
831 if self.asset_key_registry.remove::<K>(key) {
833 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
834 let _ = self
835 .whale
836 .register(sentinel, (), Durability::volatile(), vec![]);
837 }
838 }
839
840 fn get_asset_internal<K: AssetKey>(
842 &self,
843 key: &K,
844 ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
845 let full_asset_key = FullAssetKey::new(key);
846 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
847
848 #[cfg(feature = "inspector")]
850 let emit_requested = |runtime: &Self, state: query_flow_inspector::AssetState| {
851 runtime.emit(|| FlowEvent::AssetRequested {
852 asset: query_flow_inspector::AssetKey::new(
853 std::any::type_name::<K>(),
854 format!("{:?}", key),
855 ),
856 state,
857 });
858 };
859
860 if let Some(state) = self.assets.get(&full_asset_key) {
862 if self.whale.is_valid(&full_cache_key) {
864 return match state {
865 AssetState::Loading => {
866 #[cfg(feature = "inspector")]
867 emit_requested(self, query_flow_inspector::AssetState::Loading);
868 Ok(LoadingState::Loading)
869 }
870 AssetState::Ready(arc) => {
871 #[cfg(feature = "inspector")]
872 emit_requested(self, query_flow_inspector::AssetState::Ready);
873 match arc.downcast::<K::Asset>() {
874 Ok(value) => Ok(LoadingState::Ready(value)),
875 Err(_) => Err(QueryError::MissingDependency {
876 description: format!("Asset type mismatch: {:?}", key),
877 }),
878 }
879 }
880 AssetState::NotFound => {
881 #[cfg(feature = "inspector")]
882 emit_requested(self, query_flow_inspector::AssetState::NotFound);
883 Err(QueryError::MissingDependency {
884 description: format!("Asset not found: {:?}", key),
885 })
886 }
887 };
888 }
889 }
890
891 if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
893 if let Some(state) = locator.locate_any(key) {
894 self.assets.insert(full_asset_key.clone(), state.clone());
896
897 match state {
898 AssetState::Ready(arc) => {
899 #[cfg(feature = "inspector")]
900 emit_requested(self, query_flow_inspector::AssetState::Ready);
901
902 let durability = Durability::new(key.durability().as_u8() as usize)
904 .unwrap_or(Durability::volatile());
905 let _ = self.whale.register(full_cache_key, (), durability, vec![]);
906
907 match arc.downcast::<K::Asset>() {
908 Ok(value) => return Ok(LoadingState::Ready(value)),
909 Err(_) => {
910 return Err(QueryError::MissingDependency {
911 description: format!("Asset type mismatch: {:?}", key),
912 })
913 }
914 }
915 }
916 AssetState::Loading => {
917 #[cfg(feature = "inspector")]
918 emit_requested(self, query_flow_inspector::AssetState::Loading);
919 self.pending.insert::<K>(full_asset_key, key.clone());
920 return Ok(LoadingState::Loading);
921 }
922 AssetState::NotFound => {
923 #[cfg(feature = "inspector")]
924 emit_requested(self, query_flow_inspector::AssetState::NotFound);
925 return Err(QueryError::MissingDependency {
926 description: format!("Asset not found: {:?}", key),
927 });
928 }
929 }
930 }
931 }
932
933 #[cfg(feature = "inspector")]
935 emit_requested(self, query_flow_inspector::AssetState::Loading);
936 self.assets
937 .insert(full_asset_key.clone(), AssetState::Loading);
938 self.pending.insert::<K>(full_asset_key, key.clone());
939 Ok(LoadingState::Loading)
940 }
941}
942
/// Context handed to a query body while it executes (inspector build).
///
/// Every query or asset requested through the context is recorded in `deps`
/// and later registered as a dependency of the running query.
#[cfg(feature = "inspector")]
pub struct QueryContext<'a> {
    /// Runtime that executes nested queries and asset lookups.
    runtime: &'a QueryRuntime,
    /// Full cache key of the query this context belongs to.
    current_key: FullCacheKey,
    /// Type name of the running query, used when emitting inspector events.
    parent_query_type: &'static str,
    /// Span of the running query, attached to emitted inspector events.
    span_id: SpanId,
    /// Dependencies recorded so far, in request order.
    deps: RefCell<Vec<FullCacheKey>>,
}
954
/// Context handed to a query body while it executes (non-inspector build).
///
/// Every query or asset requested through the context is recorded in `deps`
/// and later registered as a dependency of the running query.
#[cfg(not(feature = "inspector"))]
pub struct QueryContext<'a> {
    /// Runtime that executes nested queries and asset lookups.
    runtime: &'a QueryRuntime,
    /// Full cache key of the query this context belongs to; not read in this
    /// configuration, hence the `dead_code` allowance.
    #[allow(dead_code)]
    current_key: FullCacheKey,
    /// Dependencies recorded so far, in request order.
    deps: RefCell<Vec<FullCacheKey>>,
}
965
966impl<'a> QueryContext<'a> {
967 pub fn query<Q: Query>(&mut self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
980 let key = query.cache_key();
981 let full_key = FullCacheKey::new::<Q, _>(&key);
982
983 #[cfg(feature = "inspector")]
985 self.runtime.emit(|| FlowEvent::DependencyRegistered {
986 span_id: self.span_id,
987 parent: query_flow_inspector::QueryKey::new(
988 self.parent_query_type,
989 self.current_key.debug_repr(),
990 ),
991 dependency: query_flow_inspector::QueryKey::new(
992 std::any::type_name::<Q>(),
993 full_key.debug_repr(),
994 ),
995 });
996
997 self.deps.borrow_mut().push(full_key.clone());
999
1000 self.runtime.query(query)
1002 }
1003
1004 pub fn asset<K: AssetKey>(
1025 &mut self,
1026 key: &K,
1027 ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
1028 let full_asset_key = FullAssetKey::new(key);
1029 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1030
1031 #[cfg(feature = "inspector")]
1033 self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1034 span_id: self.span_id,
1035 parent: query_flow_inspector::QueryKey::new(
1036 self.parent_query_type,
1037 self.current_key.debug_repr(),
1038 ),
1039 asset: query_flow_inspector::AssetKey::new(
1040 std::any::type_name::<K>(),
1041 format!("{:?}", key),
1042 ),
1043 });
1044
1045 self.deps.borrow_mut().push(full_cache_key);
1047
1048 let result = self.runtime.get_asset_internal(key);
1050
1051 #[cfg(feature = "inspector")]
1053 if let Err(QueryError::MissingDependency { ref description }) = result {
1054 self.runtime.emit(|| FlowEvent::MissingDependency {
1055 query: query_flow_inspector::QueryKey::new(
1056 self.parent_query_type,
1057 self.current_key.debug_repr(),
1058 ),
1059 dependency_description: description.clone(),
1060 });
1061 }
1062
1063 result
1064 }
1065
1066 pub fn list_queries<Q: Query>(&mut self) -> Vec<Q> {
1089 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1091
1092 #[cfg(feature = "inspector")]
1093 self.runtime.emit(|| FlowEvent::DependencyRegistered {
1094 span_id: self.span_id,
1095 parent: query_flow_inspector::QueryKey::new(
1096 self.parent_query_type,
1097 self.current_key.debug_repr(),
1098 ),
1099 dependency: query_flow_inspector::QueryKey::new("QuerySet", sentinel.debug_repr()),
1100 });
1101
1102 self.deps.borrow_mut().push(sentinel);
1103
1104 self.runtime.query_registry.get_all::<Q>()
1106 }
1107
1108 pub fn list_asset_keys<K: AssetKey>(&mut self) -> Vec<K> {
1133 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1135
1136 #[cfg(feature = "inspector")]
1137 self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1138 span_id: self.span_id,
1139 parent: query_flow_inspector::QueryKey::new(
1140 self.parent_query_type,
1141 self.current_key.debug_repr(),
1142 ),
1143 asset: query_flow_inspector::AssetKey::new("AssetKeySet", sentinel.debug_repr()),
1144 });
1145
1146 self.deps.borrow_mut().push(sentinel);
1147
1148 self.runtime.asset_key_registry.get_all::<K>()
1150 }
1151}
1152
1153#[cfg(test)]
1154mod tests {
1155 use super::*;
1156
1157 #[test]
1158 fn test_simple_query() {
1159 #[derive(Clone)]
1160 struct Add {
1161 a: i32,
1162 b: i32,
1163 }
1164
1165 impl Query for Add {
1166 type CacheKey = (i32, i32);
1167 type Output = i32;
1168
1169 fn cache_key(&self) -> Self::CacheKey {
1170 (self.a, self.b)
1171 }
1172
1173 fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1174 Ok(self.a + self.b)
1175 }
1176
1177 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1178 old == new
1179 }
1180 }
1181
1182 let runtime = QueryRuntime::new();
1183
1184 let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
1185 assert_eq!(*result, 3);
1186
1187 let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
1189 assert_eq!(*result2, 3);
1190 }
1191
1192 #[test]
1193 fn test_dependent_queries() {
1194 #[derive(Clone)]
1195 struct Base {
1196 value: i32,
1197 }
1198
1199 impl Query for Base {
1200 type CacheKey = i32;
1201 type Output = i32;
1202
1203 fn cache_key(&self) -> Self::CacheKey {
1204 self.value
1205 }
1206
1207 fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1208 Ok(self.value * 2)
1209 }
1210
1211 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1212 old == new
1213 }
1214 }
1215
1216 #[derive(Clone)]
1217 struct Derived {
1218 base_value: i32,
1219 }
1220
1221 impl Query for Derived {
1222 type CacheKey = i32;
1223 type Output = i32;
1224
1225 fn cache_key(&self) -> Self::CacheKey {
1226 self.base_value
1227 }
1228
1229 fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1230 let base = ctx.query(Base {
1231 value: self.base_value,
1232 })?;
1233 Ok(*base + 10)
1234 }
1235
1236 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1237 old == new
1238 }
1239 }
1240
1241 let runtime = QueryRuntime::new();
1242
1243 let result = runtime.query(Derived { base_value: 5 }).unwrap();
1244 assert_eq!(*result, 20); }
1246
1247 #[test]
1248 fn test_cycle_detection() {
1249 #[derive(Clone)]
1250 struct CycleA {
1251 id: i32,
1252 }
1253
1254 #[derive(Clone)]
1255 struct CycleB {
1256 id: i32,
1257 }
1258
1259 impl Query for CycleA {
1260 type CacheKey = i32;
1261 type Output = i32;
1262
1263 fn cache_key(&self) -> Self::CacheKey {
1264 self.id
1265 }
1266
1267 fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1268 let b = ctx.query(CycleB { id: self.id })?;
1269 Ok(*b + 1)
1270 }
1271
1272 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1273 old == new
1274 }
1275 }
1276
1277 impl Query for CycleB {
1278 type CacheKey = i32;
1279 type Output = i32;
1280
1281 fn cache_key(&self) -> Self::CacheKey {
1282 self.id
1283 }
1284
1285 fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1286 let a = ctx.query(CycleA { id: self.id })?;
1287 Ok(*a + 1)
1288 }
1289
1290 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1291 old == new
1292 }
1293 }
1294
1295 let runtime = QueryRuntime::new();
1296
1297 let result = runtime.query(CycleA { id: 1 });
1298 assert!(matches!(result, Err(QueryError::Cycle { .. })));
1299 }
1300
1301 #[test]
1302 fn test_fallible_query() {
1303 #[derive(Clone)]
1304 struct ParseInt {
1305 input: String,
1306 }
1307
1308 impl Query for ParseInt {
1309 type CacheKey = String;
1310 type Output = Result<i32, std::num::ParseIntError>;
1311
1312 fn cache_key(&self) -> Self::CacheKey {
1313 self.input.clone()
1314 }
1315
1316 fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1317 Ok(self.input.parse())
1318 }
1319
1320 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1321 old == new
1322 }
1323 }
1324
1325 let runtime = QueryRuntime::new();
1326
1327 let result = runtime
1329 .query(ParseInt {
1330 input: "42".to_string(),
1331 })
1332 .unwrap();
1333 assert_eq!(*result, Ok(42));
1334
1335 let result = runtime
1337 .query(ParseInt {
1338 input: "not_a_number".to_string(),
1339 })
1340 .unwrap();
1341 assert!(result.is_err());
1342 }
1343
    /// Tests for the `#[query]` attribute macro.
    ///
    /// Each test calls `new(...)` on a type named after the annotated function
    /// (e.g. `Add` for `fn add`), or after the `name = "..."` argument.
    mod macro_tests {
        use super::*;
        use crate::query;

        // Plain `#[query]`: used below as `Add::new(a, b)`.
        #[query]
        fn add(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
            // The context is not needed by this query.
            let _ = ctx;
            Ok(a + b)
        }

        #[test]
        fn test_macro_basic() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(Add::new(1, 2)).unwrap();
            assert_eq!(*result, 3);
        }

        // `durability = 2` argument; the test only checks the query still runs.
        #[query(durability = 2)]
        fn with_durability(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(x * 2)
        }

        #[test]
        fn test_macro_durability() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(WithDurability::new(5)).unwrap();
            assert_eq!(*result, 10);
        }

        // Only `id` is listed in `keys(...)`; `include_extra` is not.
        #[query(keys(id))]
        fn with_key_selection(
            ctx: &mut QueryContext,
            id: u32,
            include_extra: bool,
        ) -> Result<String, QueryError> {
            let _ = ctx;
            Ok(format!("id={}, extra={}", id, include_extra))
        }

        #[test]
        fn test_macro_key_selection() {
            let runtime = QueryRuntime::new();

            // Same `id`, different `include_extra`.
            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();

            assert_eq!(*r1, "id=1, extra=true");
            // The second call is expected to return the first call's result,
            // because `include_extra` is not part of the key.
            assert_eq!(*r2, "id=1, extra=true");
        }

        // A macro-defined query depending on another macro-defined query.
        #[query]
        fn dependent(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
            let sum = ctx.query(Add::new(*a, *b))?;
            Ok(*sum * 2)
        }

        #[test]
        fn test_macro_dependencies() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(Dependent::new(3, 4)).unwrap();
            // (3 + 4) * 2
            assert_eq!(*result, 14);
        }

        // `output_eq` argument; the test only checks the query still runs.
        #[query(output_eq)]
        fn with_output_eq(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(*x * 2)
        }

        #[test]
        fn test_macro_output_eq() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(WithOutputEq::new(5)).unwrap();
            assert_eq!(*result, 10);
        }

        // `name = "CustomName"`: the test refers to the query as `CustomName`
        // rather than by a name derived from `original_name`.
        #[query(name = "CustomName")]
        fn original_name(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(*x)
        }

        #[test]
        fn test_macro_custom_name() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(CustomName::new(42)).unwrap();
            assert_eq!(*result, 42);
        }

        // Extra attributes placed before `#[query]`; `unused_var` is left
        // unused on purpose so `allow(unused_variables)` has something to
        // silence.
        #[allow(unused_variables)]
        #[inline]
        #[query]
        fn with_attributes(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let unused_var = 42;
            Ok(*x * 2)
        }

        #[test]
        fn test_macro_preserves_attributes() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(WithAttributes::new(5)).unwrap();
            assert_eq!(*result, 10);
        }
    }
1457}