1use std::any::TypeId;
4use std::cell::RefCell;
5use std::sync::Arc;
6
7use whale::{Durability, Runtime as WhaleRuntime};
8
9use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
10use crate::key::FullCacheKey;
11use crate::loading::LoadingState;
12use crate::query::Query;
13use crate::storage::{
14 AssetKeyRegistry, AssetState, AssetStorage, CacheStorage, CachedValue, LocatorStorage,
15 PendingStorage, QueryRegistry, VerifierStorage,
16};
17use crate::QueryError;
18
/// Signature of the function used to decide whether two user errors are
/// equivalent.
///
/// The runtime calls it with the previously cached error first and the newly
/// produced error second; a `true` result means the error is treated as
/// unchanged when recording the outcome of a re-executed query.
pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
24
25#[cfg(feature = "inspector")]
26use query_flow_inspector::{EventSink, FlowEvent, SpanId};
27
/// Number of durability levels; supplied as the const generic argument of the
/// underlying `WhaleRuntime`.
const DURABILITY_LEVELS: usize = 4;
30
thread_local! {
    /// Cache keys of the queries currently executing on this thread, outermost
    /// first. `QueryRuntime::query` pushes a key before executing a query and
    /// pops it afterwards, and scans the stack to detect dependency cycles.
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
}
35
/// Entry point for executing queries and managing assets.
///
/// All storages are held behind `Arc`, so clones of a runtime share them
/// (see the `Clone` impl).
pub struct QueryRuntime {
    /// Dependency-tracking runtime, keyed by `FullCacheKey`; consulted for
    /// revisions, validity and dependency ids.
    whale: WhaleRuntime<FullCacheKey, (), DURABILITY_LEVELS>,
    /// Cached query outputs and cached user errors.
    cache: Arc<CacheStorage>,
    /// Current state of every known asset.
    assets: Arc<AssetStorage>,
    /// Asset locators registered via `register_asset_locator`.
    locators: Arc<LocatorStorage>,
    /// Assets that were requested but are not resolved yet.
    pending: Arc<PendingStorage>,
    /// Query instances that have been executed; backs `QueryContext::list_queries`.
    query_registry: Arc<QueryRegistry>,
    /// Asset keys that have been resolved; backs `QueryContext::list_asset_keys`.
    asset_key_registry: Arc<AssetKeyRegistry>,
    /// Per-key verifiers used to re-check the dependencies of a cached query.
    verifiers: Arc<VerifierStorage>,
    /// Decides whether a newly produced user error equals the cached one.
    error_comparator: ErrorComparator,
    /// Optional inspector sink receiving `FlowEvent`s (only with the
    /// `inspector` feature).
    #[cfg(feature = "inspector")]
    sink: Arc<parking_lot::RwLock<Option<Arc<dyn EventSink>>>>,
}
74
75impl Default for QueryRuntime {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81impl Clone for QueryRuntime {
82 fn clone(&self) -> Self {
83 Self {
84 whale: self.whale.clone(),
85 cache: self.cache.clone(),
86 assets: self.assets.clone(),
87 locators: self.locators.clone(),
88 pending: self.pending.clone(),
89 query_registry: self.query_registry.clone(),
90 asset_key_registry: self.asset_key_registry.clone(),
91 verifiers: self.verifiers.clone(),
92 error_comparator: self.error_comparator,
93 #[cfg(feature = "inspector")]
94 sink: self.sink.clone(),
95 }
96 }
97}
98
/// Default [`ErrorComparator`]: never considers two errors equal, so every
/// re-executed query that fails with a user error is recorded as changed.
fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
    false
}
105
106impl QueryRuntime {
107 pub fn new() -> Self {
109 Self::builder().build()
110 }
111
    /// Returns a fresh [`QueryRuntimeBuilder`] for configuring a runtime before
    /// it is built.
    pub fn builder() -> QueryRuntimeBuilder {
        QueryRuntimeBuilder::new()
    }
130
131 #[cfg(feature = "inspector")]
147 pub fn set_sink(&self, sink: Option<Arc<dyn EventSink>>) {
148 *self.sink.write() = sink;
149 }
150
151 #[cfg(feature = "inspector")]
153 pub fn sink(&self) -> Option<Arc<dyn EventSink>> {
154 self.sink.read().clone()
155 }
156
157 #[cfg(feature = "inspector")]
159 #[inline]
160 fn emit<F: FnOnce() -> FlowEvent>(&self, event: F) {
161 let guard = self.sink.read();
162 if let Some(ref sink) = *guard {
163 sink.emit(event());
164 }
165 }
166
167 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
176 let key = query.cache_key();
177 let full_key = FullCacheKey::new::<Q, _>(&key);
178
179 #[cfg(feature = "inspector")]
181 let span_id = query_flow_inspector::new_span_id();
182 #[cfg(feature = "inspector")]
183 let start_time = std::time::Instant::now();
184 #[cfg(feature = "inspector")]
185 let query_key =
186 query_flow_inspector::QueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
187
188 #[cfg(feature = "inspector")]
189 self.emit(|| FlowEvent::QueryStart {
190 span_id,
191 query: query_key.clone(),
192 });
193
194 let cycle_detected = QUERY_STACK.with(|stack| {
196 let stack = stack.borrow();
197 stack.iter().any(|k| k == &full_key)
198 });
199
200 if cycle_detected {
201 let path = QUERY_STACK.with(|stack| {
202 let stack = stack.borrow();
203 let mut path: Vec<String> =
204 stack.iter().map(|k| k.debug_repr().to_string()).collect();
205 path.push(full_key.debug_repr().to_string());
206 path
207 });
208
209 #[cfg(feature = "inspector")]
210 self.emit(|| FlowEvent::CycleDetected {
211 path: path
212 .iter()
213 .map(|s| query_flow_inspector::QueryKey::new("", s.clone()))
214 .collect(),
215 });
216
217 #[cfg(feature = "inspector")]
218 self.emit(|| FlowEvent::QueryEnd {
219 span_id,
220 query: query_key.clone(),
221 result: query_flow_inspector::ExecutionResult::CycleDetected,
222 duration: start_time.elapsed(),
223 });
224
225 return Err(QueryError::Cycle { path });
226 }
227
228 let current_rev = self.whale.current_revision();
230
231 if self.whale.is_verified_at(&full_key, ¤t_rev) {
233 if let Some(cached) = self.cache.get_cached::<Q>(&full_key) {
234 #[cfg(feature = "inspector")]
235 self.emit(|| FlowEvent::CacheCheck {
236 span_id,
237 query: query_key.clone(),
238 valid: true,
239 });
240
241 #[cfg(feature = "inspector")]
242 self.emit(|| FlowEvent::QueryEnd {
243 span_id,
244 query: query_key.clone(),
245 result: query_flow_inspector::ExecutionResult::CacheHit,
246 duration: start_time.elapsed(),
247 });
248
249 return match cached {
250 CachedValue::Ok(output) => Ok(output),
251 CachedValue::UserError(err) => Err(QueryError::UserError(err)),
252 };
253 }
254 }
255
256 if self.whale.is_valid(&full_key) {
258 if let Some(cached) = self.cache.get_cached::<Q>(&full_key) {
259 let mut deps_verified = true;
261 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
262 for dep in deps {
263 if let Some(verifier) = self.verifiers.get(&dep) {
264 if verifier.verify(self).is_err() {
266 deps_verified = false;
267 break;
268 }
269 }
270 }
271 }
272
273 if deps_verified && self.whale.is_valid(&full_key) {
275 self.whale.mark_verified(&full_key, ¤t_rev);
277
278 #[cfg(feature = "inspector")]
279 self.emit(|| FlowEvent::CacheCheck {
280 span_id,
281 query: query_key.clone(),
282 valid: true,
283 });
284
285 #[cfg(feature = "inspector")]
286 self.emit(|| FlowEvent::QueryEnd {
287 span_id,
288 query: query_key.clone(),
289 result: query_flow_inspector::ExecutionResult::CacheHit,
290 duration: start_time.elapsed(),
291 });
292
293 return match cached {
294 CachedValue::Ok(output) => Ok(output),
295 CachedValue::UserError(err) => Err(QueryError::UserError(err)),
296 };
297 }
298 }
300 }
301
302 #[cfg(feature = "inspector")]
303 self.emit(|| FlowEvent::CacheCheck {
304 span_id,
305 query: query_key.clone(),
306 valid: false,
307 });
308
309 QUERY_STACK.with(|stack| {
311 stack.borrow_mut().push(full_key.clone());
312 });
313
314 #[cfg(feature = "inspector")]
315 let result = self.execute_query::<Q>(&query, &full_key, span_id);
316 #[cfg(not(feature = "inspector"))]
317 let result = self.execute_query::<Q>(&query, &full_key);
318
319 QUERY_STACK.with(|stack| {
320 stack.borrow_mut().pop();
321 });
322
323 #[cfg(feature = "inspector")]
325 {
326 let exec_result = match &result {
327 Ok((_, true)) => query_flow_inspector::ExecutionResult::Changed,
328 Ok((_, false)) => query_flow_inspector::ExecutionResult::Unchanged,
329 Err(QueryError::Suspend) => query_flow_inspector::ExecutionResult::Suspended,
330 Err(QueryError::Cycle { .. }) => {
331 query_flow_inspector::ExecutionResult::CycleDetected
332 }
333 Err(e) => query_flow_inspector::ExecutionResult::Error {
334 message: format!("{:?}", e),
335 },
336 };
337 self.emit(|| FlowEvent::QueryEnd {
338 span_id,
339 query: query_key.clone(),
340 result: exec_result,
341 duration: start_time.elapsed(),
342 });
343 }
344
345 result.map(|(output, _)| output)
346 }
347
348 #[cfg(feature = "inspector")]
352 fn execute_query<Q: Query>(
353 &self,
354 query: &Q,
355 full_key: &FullCacheKey,
356 span_id: SpanId,
357 ) -> Result<(Arc<Q::Output>, bool), QueryError> {
358 let mut ctx = QueryContext {
360 runtime: self,
361 current_key: full_key.clone(),
362 parent_query_type: std::any::type_name::<Q>(),
363 span_id,
364 deps: RefCell::new(Vec::new()),
365 };
366
367 let result = query.query(&mut ctx);
369
370 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
372
373 let durability =
375 Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
376
377 match result {
378 Ok(output) => {
379 let output = Arc::new(output);
380
381 let output_changed =
383 if let Some(CachedValue::Ok(old)) = self.cache.get_cached::<Q>(full_key) {
384 !Q::output_eq(&old, &output)
385 } else {
386 true };
388
389 self.emit(|| FlowEvent::EarlyCutoffCheck {
391 span_id,
392 query: query_flow_inspector::QueryKey::new(
393 std::any::type_name::<Q>(),
394 full_key.debug_repr(),
395 ),
396 output_changed,
397 });
398
399 self.cache.insert_ok::<Q>(full_key.clone(), output.clone());
401
402 if output_changed {
404 let _ = self.whale.register(full_key.clone(), (), durability, deps);
405 } else {
406 let _ = self.whale.confirm_unchanged(full_key, deps);
407 }
408
409 let is_new_query = self.query_registry.register(query);
411 if is_new_query {
412 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
413 let _ = self
414 .whale
415 .register(sentinel, (), Durability::volatile(), vec![]);
416 }
417
418 self.verifiers.insert(full_key.clone(), query.clone());
420
421 Ok((output, output_changed))
422 }
423 Err(QueryError::UserError(err)) => {
424 let output_changed = if let Some(CachedValue::UserError(old_err)) =
426 self.cache.get_cached::<Q>(full_key)
427 {
428 !(self.error_comparator)(old_err.as_ref(), err.as_ref())
429 } else {
430 true };
432
433 self.emit(|| FlowEvent::EarlyCutoffCheck {
435 span_id,
436 query: query_flow_inspector::QueryKey::new(
437 std::any::type_name::<Q>(),
438 full_key.debug_repr(),
439 ),
440 output_changed,
441 });
442
443 self.cache.insert_error(full_key.clone(), err.clone());
445
446 if output_changed {
448 let _ = self.whale.register(full_key.clone(), (), durability, deps);
449 } else {
450 let _ = self.whale.confirm_unchanged(full_key, deps);
451 }
452
453 let is_new_query = self.query_registry.register(query);
455 if is_new_query {
456 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
457 let _ = self
458 .whale
459 .register(sentinel, (), Durability::volatile(), vec![]);
460 }
461
462 self.verifiers.insert(full_key.clone(), query.clone());
464
465 Err(QueryError::UserError(err))
466 }
467 Err(e) => {
468 Err(e)
470 }
471 }
472 }
473
474 #[cfg(not(feature = "inspector"))]
478 fn execute_query<Q: Query>(
479 &self,
480 query: &Q,
481 full_key: &FullCacheKey,
482 ) -> Result<(Arc<Q::Output>, bool), QueryError> {
483 let mut ctx = QueryContext {
485 runtime: self,
486 current_key: full_key.clone(),
487 deps: RefCell::new(Vec::new()),
488 };
489
490 let result = query.query(&mut ctx);
492
493 let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();
495
496 let durability =
498 Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());
499
500 match result {
501 Ok(output) => {
502 let output = Arc::new(output);
503
504 let output_changed =
506 if let Some(CachedValue::Ok(old)) = self.cache.get_cached::<Q>(full_key) {
507 !Q::output_eq(&old, &output)
508 } else {
509 true };
511
512 self.cache.insert_ok::<Q>(full_key.clone(), output.clone());
514
515 if output_changed {
517 let _ = self.whale.register(full_key.clone(), (), durability, deps);
518 } else {
519 let _ = self.whale.confirm_unchanged(full_key, deps);
520 }
521
522 let is_new_query = self.query_registry.register(query);
524 if is_new_query {
525 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
526 let _ = self
527 .whale
528 .register(sentinel, (), Durability::volatile(), vec![]);
529 }
530
531 self.verifiers.insert(full_key.clone(), query.clone());
533
534 Ok((output, output_changed))
535 }
536 Err(QueryError::UserError(err)) => {
537 let output_changed = if let Some(CachedValue::UserError(old_err)) =
539 self.cache.get_cached::<Q>(full_key)
540 {
541 !(self.error_comparator)(old_err.as_ref(), err.as_ref())
542 } else {
543 true };
545
546 self.cache.insert_error(full_key.clone(), err.clone());
548
549 if output_changed {
551 let _ = self.whale.register(full_key.clone(), (), durability, deps);
552 } else {
553 let _ = self.whale.confirm_unchanged(full_key, deps);
554 }
555
556 let is_new_query = self.query_registry.register(query);
558 if is_new_query {
559 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
560 let _ = self
561 .whale
562 .register(sentinel, (), Durability::volatile(), vec![]);
563 }
564
565 self.verifiers.insert(full_key.clone(), query.clone());
567
568 Err(QueryError::UserError(err))
569 }
570 Err(e) => {
571 Err(e)
573 }
574 }
575 }
576
577 pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
581 let full_key = FullCacheKey::new::<Q, _>(key);
582
583 #[cfg(feature = "inspector")]
584 self.emit(|| FlowEvent::QueryInvalidated {
585 query: query_flow_inspector::QueryKey::new(
586 std::any::type_name::<Q>(),
587 full_key.debug_repr(),
588 ),
589 reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
590 });
591
592 self.cache.remove(&full_key);
594
595 let _ = self
597 .whale
598 .register(full_key, (), Durability::volatile(), vec![]);
599 }
600
    /// Drops every cached query value. Only the value cache is cleared; the
    /// dependency graph is not touched by this method.
    pub fn clear_cache(&self) {
        self.cache.clear();
    }
605}
606
/// Builder for [`QueryRuntime`], obtained from `QueryRuntime::builder()`.
pub struct QueryRuntimeBuilder {
    /// Comparator handed to the runtime for comparing user errors.
    error_comparator: ErrorComparator,
}
627
/// The default builder is the same as [`QueryRuntimeBuilder::new`].
impl Default for QueryRuntimeBuilder {
    fn default() -> Self {
        Self::new()
    }
}
633
634impl QueryRuntimeBuilder {
635 pub fn new() -> Self {
637 Self {
638 error_comparator: default_error_comparator,
639 }
640 }
641
642 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
660 self.error_comparator = f;
661 self
662 }
663
664 pub fn build(self) -> QueryRuntime {
666 QueryRuntime {
667 whale: WhaleRuntime::new(),
668 cache: Arc::new(CacheStorage::new()),
669 assets: Arc::new(AssetStorage::new()),
670 locators: Arc::new(LocatorStorage::new()),
671 pending: Arc::new(PendingStorage::new()),
672 query_registry: Arc::new(QueryRegistry::new()),
673 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
674 verifiers: Arc::new(VerifierStorage::new()),
675 error_comparator: self.error_comparator,
676 #[cfg(feature = "inspector")]
677 sink: Arc::new(parking_lot::RwLock::new(None)),
678 }
679 }
680}
681
682impl QueryRuntime {
    /// Registers `locator` as the locator for asset keys of type `K`.
    ///
    /// Registered locators are consulted when an asset is requested and no
    /// valid stored state exists for it.
    pub fn register_asset_locator<K, L>(&self, locator: L)
    where
        K: AssetKey,
        L: AssetLocator<K>,
    {
        self.locators.insert::<K, L>(locator);
    }
705
    /// Returns every asset currently recorded as pending.
    pub fn pending_assets(&self) -> Vec<PendingAsset> {
        self.pending.get_all()
    }
724
    /// Returns the pending asset keys of type `K`.
    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
        self.pending.get_of_type::<K>()
    }
729
    /// Returns `true` when at least one asset is recorded as pending.
    pub fn has_pending_assets(&self) -> bool {
        !self.pending.is_empty()
    }
734
    /// Supplies the value for asset `key`, using the durability reported by the
    /// key itself.
    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset) {
        // Read the durability first: `key` is moved into the internal call.
        let durability = key.durability();
        self.resolve_asset_internal(key, value, durability);
    }
755
    /// Supplies the value for asset `key` with an explicit `durability`,
    /// ignoring the durability the key would report itself.
    pub fn resolve_asset_with_durability<K: AssetKey>(
        &self,
        key: K,
        value: K::Asset,
        durability: DurabilityLevel,
    ) {
        self.resolve_asset_internal(key, value, durability);
    }
767
768 fn resolve_asset_internal<K: AssetKey>(
769 &self,
770 key: K,
771 value: K::Asset,
772 durability_level: DurabilityLevel,
773 ) {
774 let full_asset_key = FullAssetKey::new(&key);
775 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
776
777 let changed = if let Some(old_value) = self.assets.get_ready::<K>(&full_asset_key) {
779 !K::asset_eq(&old_value, &value)
780 } else {
781 true };
783
784 #[cfg(feature = "inspector")]
786 self.emit(|| FlowEvent::AssetResolved {
787 asset: query_flow_inspector::AssetKey::new(
788 std::any::type_name::<K>(),
789 format!("{:?}", key),
790 ),
791 changed,
792 });
793
794 self.assets
796 .insert_ready::<K>(full_asset_key.clone(), Arc::new(value));
797
798 self.pending.remove(&full_asset_key);
800
801 let durability =
803 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
804
805 if changed {
806 let _ = self.whale.register(full_cache_key, (), durability, vec![]);
808 } else {
809 let _ = self.whale.confirm_unchanged(&full_cache_key, vec![]);
811 }
812
813 let is_new_asset = self.asset_key_registry.register(&key);
815 if is_new_asset {
816 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
818 let _ = self
819 .whale
820 .register(sentinel, (), Durability::volatile(), vec![]);
821 }
822 }
823
824 pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
838 let full_asset_key = FullAssetKey::new(key);
839 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
840
841 #[cfg(feature = "inspector")]
843 self.emit(|| FlowEvent::AssetInvalidated {
844 asset: query_flow_inspector::AssetKey::new(
845 std::any::type_name::<K>(),
846 format!("{:?}", key),
847 ),
848 });
849
850 self.assets
852 .insert(full_asset_key.clone(), AssetState::Loading);
853
854 self.pending.insert::<K>(full_asset_key, key.clone());
856
857 let _ = self
859 .whale
860 .register(full_cache_key, (), Durability::volatile(), vec![]);
861 }
862
863 pub fn remove_asset<K: AssetKey>(&self, key: &K) {
868 let full_asset_key = FullAssetKey::new(key);
869 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
870
871 let _ = self
874 .whale
875 .register(full_cache_key.clone(), (), Durability::volatile(), vec![]);
876
877 self.assets.remove(&full_asset_key);
879 self.pending.remove(&full_asset_key);
880
881 self.whale.remove(&full_cache_key);
883
884 if self.asset_key_registry.remove::<K>(key) {
886 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
887 let _ = self
888 .whale
889 .register(sentinel, (), Durability::volatile(), vec![]);
890 }
891 }
892
    /// Looks up asset `key` outside of any query, i.e. without recording a
    /// dependency. Thin public wrapper around `get_asset_internal`.
    ///
    /// # Errors
    /// Returns `QueryError::MissingDependency` when the asset is known to be
    /// missing or its stored value has an unexpected type.
    pub fn get_asset<K: AssetKey>(
        &self,
        key: &K,
    ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
        self.get_asset_internal(key)
    }
910
    /// Resolves the current state of asset `key`.
    ///
    /// Lookup order:
    /// 1. the stored state, but only while the asset's graph entry is valid;
    /// 2. the locator registered for `K`, whose answer is stored;
    /// 3. otherwise the asset is recorded as `Loading` and added to the
    ///    pending set.
    ///
    /// # Errors
    /// Returns `QueryError::MissingDependency` when the state is `NotFound` or
    /// when a ready value cannot be downcast to `K::Asset`.
    fn get_asset_internal<K: AssetKey>(
        &self,
        key: &K,
    ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
        let full_asset_key = FullAssetKey::new(key);
        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);

        // Helper that reports the observed state to the inspector.
        #[cfg(feature = "inspector")]
        let emit_requested = |runtime: &Self, state: query_flow_inspector::AssetState| {
            runtime.emit(|| FlowEvent::AssetRequested {
                asset: query_flow_inspector::AssetKey::new(
                    std::any::type_name::<K>(),
                    format!("{:?}", key),
                ),
                state,
            });
        };

        // 1. Stored state - used only if the graph still considers it valid.
        if let Some(state) = self.assets.get(&full_asset_key) {
            if self.whale.is_valid(&full_cache_key) {
                return match state {
                    AssetState::Loading => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Loading);
                        Ok(LoadingState::Loading)
                    }
                    AssetState::Ready(arc) => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Ready);
                        match arc.downcast::<K::Asset>() {
                            Ok(value) => Ok(LoadingState::Ready(value)),
                            Err(_) => Err(QueryError::MissingDependency {
                                description: format!("Asset type mismatch: {:?}", key),
                            }),
                        }
                    }
                    AssetState::NotFound => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::NotFound);
                        Err(QueryError::MissingDependency {
                            description: format!("Asset not found: {:?}", key),
                        })
                    }
                };
            }
        }

        // 2. Ask the locator registered for this key type, if there is one.
        if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
            if let Some(state) = locator.locate_any(key) {
                // Whatever the locator answered becomes the stored state.
                self.assets.insert(full_asset_key.clone(), state.clone());

                match state {
                    AssetState::Ready(arc) => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Ready);

                        // Only a ready value is registered in the graph here.
                        let durability = Durability::new(key.durability().as_u8() as usize)
                            .unwrap_or(Durability::volatile());
                        let _ = self.whale.register(full_cache_key, (), durability, vec![]);

                        match arc.downcast::<K::Asset>() {
                            Ok(value) => return Ok(LoadingState::Ready(value)),
                            Err(_) => {
                                return Err(QueryError::MissingDependency {
                                    description: format!("Asset type mismatch: {:?}", key),
                                })
                            }
                        }
                    }
                    AssetState::Loading => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::Loading);
                        self.pending.insert::<K>(full_asset_key, key.clone());
                        return Ok(LoadingState::Loading);
                    }
                    AssetState::NotFound => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, query_flow_inspector::AssetState::NotFound);
                        return Err(QueryError::MissingDependency {
                            description: format!("Asset not found: {:?}", key),
                        });
                    }
                }
            }
        }

        // 3. Nothing usable is known: record the asset as loading and pending.
        #[cfg(feature = "inspector")]
        emit_requested(self, query_flow_inspector::AssetState::Loading);
        self.assets
            .insert(full_asset_key.clone(), AssetState::Loading);
        self.pending.insert::<K>(full_asset_key, key.clone());
        Ok(LoadingState::Loading)
    }
1012}
1013
/// Context handed to a query body while it executes (inspector build).
///
/// Every dependency requested through the context is recorded in `deps`.
#[cfg(feature = "inspector")]
pub struct QueryContext<'a> {
    /// Runtime that is executing the query.
    runtime: &'a QueryRuntime,
    /// Cache key of the query being executed.
    current_key: FullCacheKey,
    /// Type name of the executing query, used in inspector events.
    parent_query_type: &'static str,
    /// Inspector span of the executing query.
    span_id: SpanId,
    /// Dependencies recorded so far, in request order.
    deps: RefCell<Vec<FullCacheKey>>,
}
1025
/// Context handed to a query body while it executes (build without
/// inspector).
///
/// Every dependency requested through the context is recorded in `deps`.
#[cfg(not(feature = "inspector"))]
pub struct QueryContext<'a> {
    /// Runtime that is executing the query.
    runtime: &'a QueryRuntime,
    /// Cache key of the query being executed; in this file it is only read by
    /// inspector-gated code, hence the `allow`.
    #[allow(dead_code)]
    current_key: FullCacheKey,
    /// Dependencies recorded so far, in request order.
    deps: RefCell<Vec<FullCacheKey>>,
}
1036
1037impl<'a> QueryContext<'a> {
1038 pub fn query<Q: Query>(&mut self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1051 let key = query.cache_key();
1052 let full_key = FullCacheKey::new::<Q, _>(&key);
1053
1054 #[cfg(feature = "inspector")]
1056 self.runtime.emit(|| FlowEvent::DependencyRegistered {
1057 span_id: self.span_id,
1058 parent: query_flow_inspector::QueryKey::new(
1059 self.parent_query_type,
1060 self.current_key.debug_repr(),
1061 ),
1062 dependency: query_flow_inspector::QueryKey::new(
1063 std::any::type_name::<Q>(),
1064 full_key.debug_repr(),
1065 ),
1066 });
1067
1068 self.deps.borrow_mut().push(full_key.clone());
1070
1071 self.runtime.query(query)
1073 }
1074
1075 pub fn asset<K: AssetKey>(
1096 &mut self,
1097 key: &K,
1098 ) -> Result<LoadingState<Arc<K::Asset>>, QueryError> {
1099 let full_asset_key = FullAssetKey::new(key);
1100 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
1101
1102 #[cfg(feature = "inspector")]
1104 self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1105 span_id: self.span_id,
1106 parent: query_flow_inspector::QueryKey::new(
1107 self.parent_query_type,
1108 self.current_key.debug_repr(),
1109 ),
1110 asset: query_flow_inspector::AssetKey::new(
1111 std::any::type_name::<K>(),
1112 format!("{:?}", key),
1113 ),
1114 });
1115
1116 self.deps.borrow_mut().push(full_cache_key);
1118
1119 let result = self.runtime.get_asset_internal(key);
1121
1122 #[cfg(feature = "inspector")]
1124 if let Err(QueryError::MissingDependency { ref description }) = result {
1125 self.runtime.emit(|| FlowEvent::MissingDependency {
1126 query: query_flow_inspector::QueryKey::new(
1127 self.parent_query_type,
1128 self.current_key.debug_repr(),
1129 ),
1130 dependency_description: description.clone(),
1131 });
1132 }
1133
1134 result
1135 }
1136
1137 pub fn list_queries<Q: Query>(&mut self) -> Vec<Q> {
1160 let sentinel = FullCacheKey::query_set_sentinel::<Q>();
1162
1163 #[cfg(feature = "inspector")]
1164 self.runtime.emit(|| FlowEvent::DependencyRegistered {
1165 span_id: self.span_id,
1166 parent: query_flow_inspector::QueryKey::new(
1167 self.parent_query_type,
1168 self.current_key.debug_repr(),
1169 ),
1170 dependency: query_flow_inspector::QueryKey::new("QuerySet", sentinel.debug_repr()),
1171 });
1172
1173 self.deps.borrow_mut().push(sentinel);
1174
1175 self.runtime.query_registry.get_all::<Q>()
1177 }
1178
1179 pub fn list_asset_keys<K: AssetKey>(&mut self) -> Vec<K> {
1204 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
1206
1207 #[cfg(feature = "inspector")]
1208 self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
1209 span_id: self.span_id,
1210 parent: query_flow_inspector::QueryKey::new(
1211 self.parent_query_type,
1212 self.current_key.debug_repr(),
1213 ),
1214 asset: query_flow_inspector::AssetKey::new("AssetKeySet", sentinel.debug_repr()),
1215 });
1216
1217 self.deps.borrow_mut().push(sentinel);
1218
1219 self.runtime.asset_key_registry.get_all::<K>()
1221 }
1222}
1223
1224#[cfg(test)]
1225mod tests {
1226 use super::*;
1227
    /// A hand-written query with no dependencies returns its result, and
    /// asking for the same key a second time yields the same value.
    #[test]
    fn test_simple_query() {
        #[derive(Clone)]
        struct Add {
            a: i32,
            b: i32,
        }

        impl Query for Add {
            type CacheKey = (i32, i32);
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                (self.a, self.b)
            }

            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                Ok(self.a + self.b)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
        assert_eq!(*result, 3);

        // Same key again; only the returned value is checked, not whether the
        // cache was actually used.
        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
        assert_eq!(*result2, 3);
    }
1262
    /// A query can request another query through its context and build on the
    /// nested result.
    #[test]
    fn test_dependent_queries() {
        #[derive(Clone)]
        struct Base {
            value: i32,
        }

        impl Query for Base {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.value
            }

            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                Ok(self.value * 2)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        #[derive(Clone)]
        struct Derived {
            base_value: i32,
        }

        impl Query for Derived {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.base_value
            }

            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let base = ctx.query(Base {
                    value: self.base_value,
                })?;
                Ok(*base + 10)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        let result = runtime.query(Derived { base_value: 5 }).unwrap();
        // Base yields 5 * 2 = 10, Derived adds 10.
        assert_eq!(*result, 20);
    }
1317
    /// Two queries that request each other must fail with `QueryError::Cycle`
    /// rather than recursing forever.
    #[test]
    fn test_cycle_detection() {
        #[derive(Clone)]
        struct CycleA {
            id: i32,
        }

        #[derive(Clone)]
        struct CycleB {
            id: i32,
        }

        impl Query for CycleA {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.id
            }

            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                // A depends on B ...
                let b = ctx.query(CycleB { id: self.id })?;
                Ok(*b + 1)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        impl Query for CycleB {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.id
            }

            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                // ... and B depends on A with the same id, closing the cycle.
                let a = ctx.query(CycleA { id: self.id })?;
                Ok(*a + 1)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        let result = runtime.query(CycleA { id: 1 });
        assert!(matches!(result, Err(QueryError::Cycle { .. })));
    }
1371
    /// A query whose `Output` is itself a `Result` succeeds at the runtime
    /// level; the domain-level failure is carried inside the output value.
    #[test]
    fn test_fallible_query() {
        #[derive(Clone)]
        struct ParseInt {
            input: String,
        }

        impl Query for ParseInt {
            type CacheKey = String;
            type Output = Result<i32, std::num::ParseIntError>;

            fn cache_key(&self) -> Self::CacheKey {
                self.input.clone()
            }

            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                // The parse result is wrapped in `Ok`: a parse failure is data,
                // not a query error.
                Ok(self.input.parse())
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        // Valid input: the inner result is `Ok`.
        let result = runtime
            .query(ParseInt {
                input: "42".to_string(),
            })
            .unwrap();
        assert_eq!(*result, Ok(42));

        // Invalid input: the query still succeeds, the inner result is `Err`.
        let result = runtime
            .query(ParseInt {
                input: "not_a_number".to_string(),
            })
            .unwrap();
        assert!(result.is_err());
    }
1414
    /// Tests for the `#[query]` attribute macro. As the tests below show, each
    /// annotated function yields a query type named after the function in
    /// CamelCase (or the `name = "..."` override) with a `new` constructor.
    mod macro_tests {
        use super::*;
        use crate::query;

        #[query]
        fn add(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
            // The context is unused here; bind it to silence the warning.
            let _ = ctx;
            Ok(a + b)
        }

        /// The plain macro form produces a runnable `Add` query.
        #[test]
        fn test_macro_basic() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(Add::new(1, 2)).unwrap();
            assert_eq!(*result, 3);
        }

        #[query(durability = 2)]
        fn with_durability(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(x * 2)
        }

        /// A query declared with an explicit durability still executes normally.
        #[test]
        fn test_macro_durability() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(WithDurability::new(5)).unwrap();
            assert_eq!(*result, 10);
        }

        #[query(keys(id))]
        fn with_key_selection(
            ctx: &mut QueryContext,
            id: u32,
            include_extra: bool,
        ) -> Result<String, QueryError> {
            let _ = ctx;
            Ok(format!("id={}, extra={}", id, include_extra))
        }

        /// With `keys(id)` two calls that differ only in `include_extra` share
        /// one cache entry, so the second call returns the first call's result.
        #[test]
        fn test_macro_key_selection() {
            let runtime = QueryRuntime::new();

            let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
            let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();

            assert_eq!(*r1, "id=1, extra=true");
            // Cached result of the first call, not "extra=false".
            assert_eq!(*r2, "id=1, extra=true");
        }

        #[query]
        fn dependent(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
            let sum = ctx.query(Add::new(*a, *b))?;
            Ok(*sum * 2)
        }

        /// A macro-defined query can depend on another macro-defined query.
        #[test]
        fn test_macro_dependencies() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(Dependent::new(3, 4)).unwrap();
            // (3 + 4) * 2
            assert_eq!(*result, 14);
        }

        #[query(output_eq)]
        fn with_output_eq(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(*x * 2)
        }

        /// The `output_eq` option is accepted and the query runs.
        #[test]
        fn test_macro_output_eq() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(WithOutputEq::new(5)).unwrap();
            assert_eq!(*result, 10);
        }

        #[query(name = "CustomName")]
        fn original_name(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let _ = ctx;
            Ok(*x)
        }

        /// `name = "..."` overrides the generated type name.
        #[test]
        fn test_macro_custom_name() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(CustomName::new(42)).unwrap();
            assert_eq!(*result, 42);
        }

        // The attributes above `#[query]` must survive macro expansion; the
        // deliberately unused local would otherwise trigger a warning.
        #[allow(unused_variables)]
        #[inline]
        #[query]
        fn with_attributes(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
            let unused_var = 42;
            Ok(*x * 2)
        }

        /// A query carrying extra attributes compiles and runs.
        #[test]
        fn test_macro_preserves_attributes() {
            let runtime = QueryRuntime::new();
            let result = runtime.query(WithAttributes::new(5)).unwrap();
            assert_eq!(*result, 10);
        }
    }
1528}