1use std::any::TypeId;
4use std::cell::RefCell;
5use std::sync::Arc;
6
7use std::ops::Deref;
8
9use whale::{Durability, RevisionCounter, Runtime as WhaleRuntime};
10
11use crate::asset::{AssetKey, AssetLocator, DurabilityLevel, FullAssetKey, PendingAsset};
12use crate::key::FullCacheKey;
13use crate::loading::AssetLoadingState;
14use crate::query::Query;
15use crate::storage::{
16 AssetKeyRegistry, AssetState, AssetStorage, CachedEntry, CachedValue, LocatorStorage,
17 PendingStorage, QueryRegistry, VerifierStorage,
18};
19use crate::QueryError;
20
/// Signature of the function the runtime uses to decide whether two user errors are equal.
///
/// `execute_query` calls it with the previously cached error and the newly produced one;
/// when it returns `true` the cached revision is kept instead of registering a new entry.
pub type ErrorComparator = fn(&anyhow::Error, &anyhow::Error) -> bool;
26
27#[cfg(feature = "inspector")]
28use query_flow_inspector::{EventSink, FlowEvent, SpanId};
29
/// Number of durability levels; used as the const parameter of the whale runtime type.
const DURABILITY_LEVELS: usize = 4;
32
// Per-thread stack of the cache keys of queries that are currently executing.
// `query_internal` scans it to detect cycles and pushes/pops around each execution.
thread_local! {
    static QUERY_STACK: RefCell<Vec<FullCacheKey>> = const { RefCell::new(Vec::new()) };
}
37
/// Per-execution context handed to a running query.
///
/// With the `inspector` feature it carries the span id used to correlate emitted events.
#[cfg(feature = "inspector")]
#[derive(Clone, Copy)]
pub struct ExecutionContext {
    // Span id attached to every event emitted for this execution.
    span_id: SpanId,
}

/// Per-execution context handed to a running query.
///
/// Without the `inspector` feature it is a unit struct carrying no data.
#[cfg(not(feature = "inspector"))]
#[derive(Clone, Copy)]
pub struct ExecutionContext;
51
impl ExecutionContext {
    /// Creates a context with a fresh span id obtained from `query_flow_inspector::new_span_id()`.
    #[cfg(feature = "inspector")]
    pub fn new() -> Self {
        Self {
            span_id: query_flow_inspector::new_span_id(),
        }
    }

    /// Creates the (data-less) context used when the `inspector` feature is disabled.
    #[cfg(not(feature = "inspector"))]
    #[inline(always)]
    pub fn new() -> Self {
        Self
    }

    /// Returns the span id assigned to this execution.
    #[cfg(feature = "inspector")]
    pub fn span_id(&self) -> SpanId {
        self.span_id
    }
}
74
/// `Default` delegates to [`ExecutionContext::new`].
impl Default for ExecutionContext {
    fn default() -> Self {
        Self::new()
    }
}
80
/// A value paired with the revision it was reported at.
///
/// Returned by `QueryRuntime::poll`, which fills both fields from `query_internal`.
#[derive(Debug, Clone)]
pub struct Polled<T> {
    /// The polled value.
    pub value: T,
    /// Revision counter reported together with `value`.
    pub revision: RevisionCounter,
}
110
111impl<T: Deref> Deref for Polled<T> {
112 type Target = T::Target;
113
114 fn deref(&self) -> &Self::Target {
115 &self.value
116 }
117}
118
/// Runtime that executes queries, caches their results and tracks assets.
pub struct QueryRuntime {
    // Dependency-tracking store keyed by `FullCacheKey`; each node holds an optional cached entry.
    whale: WhaleRuntime<FullCacheKey, Option<CachedEntry>, DURABILITY_LEVELS>,
    // Current state of each known asset.
    assets: Arc<AssetStorage>,
    // Registered asset locators, looked up by the asset key's `TypeId`.
    locators: Arc<LocatorStorage>,
    // Assets that were requested but are still loading.
    pending: Arc<PendingStorage>,
    // Queries recorded after execution; backs `QueryContext::list_queries`.
    query_registry: Arc<QueryRegistry>,
    // Asset keys recorded on resolution; backs `QueryContext::list_asset_keys`.
    asset_key_registry: Arc<AssetKeyRegistry>,
    // Per-key verifiers used to re-check a cached query's dependencies.
    verifiers: Arc<VerifierStorage>,
    // Decides whether two user errors are equal for early cutoff.
    error_comparator: ErrorComparator,
    // Optional sink receiving inspector events.
    #[cfg(feature = "inspector")]
    sink: Arc<parking_lot::RwLock<Option<Arc<dyn EventSink>>>>,
}
156
/// `Default` delegates to [`QueryRuntime::new`].
impl Default for QueryRuntime {
    fn default() -> Self {
        Self::new()
    }
}
162
163impl Clone for QueryRuntime {
164 fn clone(&self) -> Self {
165 Self {
166 whale: self.whale.clone(),
167 assets: self.assets.clone(),
168 locators: self.locators.clone(),
169 pending: self.pending.clone(),
170 query_registry: self.query_registry.clone(),
171 asset_key_registry: self.asset_key_registry.clone(),
172 verifiers: self.verifiers.clone(),
173 error_comparator: self.error_comparator,
174 #[cfg(feature = "inspector")]
175 sink: self.sink.clone(),
176 }
177 }
178}
179
/// Default [`ErrorComparator`]: treats every pair of errors as different.
///
/// Because it always returns `false`, a re-executed query that fails again is always
/// recorded as changed unless a custom comparator is installed via the builder.
fn default_error_comparator(_a: &anyhow::Error, _b: &anyhow::Error) -> bool {
    false
}
186
187impl QueryRuntime {
188 fn get_cached_with_revision<Q: Query>(
190 &self,
191 key: &FullCacheKey,
192 ) -> Option<(CachedValue<Arc<Q::Output>>, RevisionCounter)> {
193 let node = self.whale.get(key)?;
194 let revision = node.changed_at;
195 let entry = node.data.as_ref()?;
196 let cached = entry.to_cached_value::<Q::Output>()?;
197 Some((cached, revision))
198 }
199}
200
201impl QueryRuntime {
    /// Creates a runtime with the builder's default settings.
    pub fn new() -> Self {
        Self::builder().build()
    }

    /// Returns a [`QueryRuntimeBuilder`] for configuring a runtime before constructing it.
    pub fn builder() -> QueryRuntimeBuilder {
        QueryRuntimeBuilder::new()
    }
225
    /// Installs (`Some`) or removes (`None`) the event sink that receives inspector events.
    #[cfg(feature = "inspector")]
    pub fn set_sink(&self, sink: Option<Arc<dyn EventSink>>) {
        *self.sink.write() = sink;
    }

    /// Returns a clone of the currently installed event sink, if any.
    #[cfg(feature = "inspector")]
    pub fn sink(&self) -> Option<Arc<dyn EventSink>> {
        self.sink.read().clone()
    }
251
252 #[cfg(feature = "inspector")]
254 #[inline]
255 fn emit<F: FnOnce() -> FlowEvent>(&self, event: F) {
256 let guard = self.sink.read();
257 if let Some(ref sink) = *guard {
258 sink.emit(event());
259 }
260 }
261
262 pub fn query<Q: Query>(&self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
271 self.query_internal(query)
272 .and_then(|(inner_result, _)| inner_result.map_err(QueryError::UserError))
273 }
274
275 #[allow(clippy::type_complexity)]
280 fn query_internal<Q: Query>(
281 &self,
282 query: Q,
283 ) -> Result<(Result<Arc<Q::Output>, Arc<anyhow::Error>>, RevisionCounter), QueryError> {
284 let key = query.cache_key();
285 let full_key = FullCacheKey::new::<Q, _>(&key);
286
287 let exec_ctx = ExecutionContext::new();
289 #[cfg(feature = "inspector")]
290 let start_time = std::time::Instant::now();
291 #[cfg(feature = "inspector")]
292 let query_key =
293 query_flow_inspector::QueryKey::new(std::any::type_name::<Q>(), full_key.debug_repr());
294
295 #[cfg(feature = "inspector")]
296 self.emit(|| FlowEvent::QueryStart {
297 span_id: exec_ctx.span_id(),
298 query: query_key.clone(),
299 });
300
301 let cycle_detected = QUERY_STACK.with(|stack| {
303 let stack = stack.borrow();
304 stack.iter().any(|k| k == &full_key)
305 });
306
307 if cycle_detected {
308 let path = QUERY_STACK.with(|stack| {
309 let stack = stack.borrow();
310 let mut path: Vec<String> =
311 stack.iter().map(|k| k.debug_repr().to_string()).collect();
312 path.push(full_key.debug_repr().to_string());
313 path
314 });
315
316 #[cfg(feature = "inspector")]
317 self.emit(|| FlowEvent::CycleDetected {
318 path: path
319 .iter()
320 .map(|s| query_flow_inspector::QueryKey::new("", s.clone()))
321 .collect(),
322 });
323
324 #[cfg(feature = "inspector")]
325 self.emit(|| FlowEvent::QueryEnd {
326 span_id: exec_ctx.span_id(),
327 query: query_key.clone(),
328 result: query_flow_inspector::ExecutionResult::CycleDetected,
329 duration: start_time.elapsed(),
330 });
331
332 return Err(QueryError::Cycle { path });
333 }
334
335 let current_rev = self.whale.current_revision();
337
338 if self.whale.is_verified_at(&full_key, ¤t_rev) {
340 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
342 #[cfg(feature = "inspector")]
343 self.emit(|| FlowEvent::CacheCheck {
344 span_id: exec_ctx.span_id(),
345 query: query_key.clone(),
346 valid: true,
347 });
348
349 #[cfg(feature = "inspector")]
350 self.emit(|| FlowEvent::QueryEnd {
351 span_id: exec_ctx.span_id(),
352 query: query_key.clone(),
353 result: query_flow_inspector::ExecutionResult::CacheHit,
354 duration: start_time.elapsed(),
355 });
356
357 return match cached {
358 CachedValue::Ok(output) => Ok((Ok(output), revision)),
359 CachedValue::UserError(err) => Ok((Err(err), revision)),
360 };
361 }
362 }
363
364 if self.whale.is_valid(&full_key) {
366 if let Some((cached, revision)) = self.get_cached_with_revision::<Q>(&full_key) {
368 let mut deps_verified = true;
370 if let Some(deps) = self.whale.get_dependency_ids(&full_key) {
371 for dep in deps {
372 if let Some(verifier) = self.verifiers.get(&dep) {
373 if verifier.verify(self).is_err() {
375 deps_verified = false;
376 break;
377 }
378 }
379 }
380 }
381
382 if deps_verified && self.whale.is_valid(&full_key) {
384 self.whale.mark_verified(&full_key, ¤t_rev);
386
387 #[cfg(feature = "inspector")]
388 self.emit(|| FlowEvent::CacheCheck {
389 span_id: exec_ctx.span_id(),
390 query: query_key.clone(),
391 valid: true,
392 });
393
394 #[cfg(feature = "inspector")]
395 self.emit(|| FlowEvent::QueryEnd {
396 span_id: exec_ctx.span_id(),
397 query: query_key.clone(),
398 result: query_flow_inspector::ExecutionResult::CacheHit,
399 duration: start_time.elapsed(),
400 });
401
402 return match cached {
403 CachedValue::Ok(output) => Ok((Ok(output), revision)),
404 CachedValue::UserError(err) => Ok((Err(err), revision)),
405 };
406 }
407 }
409 }
410
411 #[cfg(feature = "inspector")]
412 self.emit(|| FlowEvent::CacheCheck {
413 span_id: exec_ctx.span_id(),
414 query: query_key.clone(),
415 valid: false,
416 });
417
418 QUERY_STACK.with(|stack| {
420 stack.borrow_mut().push(full_key.clone());
421 });
422
423 let result = self.execute_query::<Q>(&query, &full_key, exec_ctx);
424
425 QUERY_STACK.with(|stack| {
426 stack.borrow_mut().pop();
427 });
428
429 #[cfg(feature = "inspector")]
431 {
432 let exec_result = match &result {
433 Ok((_, true, _)) => query_flow_inspector::ExecutionResult::Changed,
434 Ok((_, false, _)) => query_flow_inspector::ExecutionResult::Unchanged,
435 Err(QueryError::Suspend { .. }) => query_flow_inspector::ExecutionResult::Suspended,
436 Err(QueryError::Cycle { .. }) => {
437 query_flow_inspector::ExecutionResult::CycleDetected
438 }
439 Err(e) => query_flow_inspector::ExecutionResult::Error {
440 message: format!("{:?}", e),
441 },
442 };
443 self.emit(|| FlowEvent::QueryEnd {
444 span_id: exec_ctx.span_id(),
445 query: query_key.clone(),
446 result: exec_result,
447 duration: start_time.elapsed(),
448 });
449 }
450
451 result.map(|(inner_result, _, revision)| (inner_result, revision))
452 }
453
    /// Runs the body of `query` and records the outcome in the cache.
    ///
    /// Returns `(outcome, output_changed, revision)`: `outcome` is the query's value or
    /// user error, `output_changed` is `false` when the outcome compared equal to the
    /// cached one, and `revision` is the revision the outcome is stored under.
    ///
    /// # Errors
    /// Returns `QueryError::DependenciesRemoved` when registering the result fails, and
    /// passes through any error from the query body other than `QueryError::UserError`
    /// without caching it.
    #[allow(clippy::type_complexity)]
    fn execute_query<Q: Query>(
        &self,
        query: &Q,
        full_key: &FullCacheKey,
        exec_ctx: ExecutionContext,
    ) -> Result<
        (
            Result<Arc<Q::Output>, Arc<anyhow::Error>>,
            bool,
            RevisionCounter,
        ),
        QueryError,
    > {
        // The context collects every dependency the query body requests.
        let mut ctx = QueryContext {
            runtime: self,
            current_key: full_key.clone(),
            #[cfg(feature = "inspector")]
            parent_query_type: std::any::type_name::<Q>(),
            #[cfg(feature = "inspector")]
            exec_ctx,
            deps: RefCell::new(Vec::new()),
        };
        // Silences the unused-parameter warning when the inspector is compiled out.
        #[cfg(not(feature = "inspector"))]
        let _ = exec_ctx;

        let result = query.query(&mut ctx);

        let deps: Vec<FullCacheKey> = ctx.deps.borrow().clone();

        // Falls back to the volatile durability when `Durability::new` rejects the level.
        let durability =
            Durability::new(query.durability() as usize).unwrap_or(Durability::volatile());

        match result {
            Ok(output) => {
                let output = Arc::new(output);

                // Early cutoff: keep the old revision when the output equals the cached one.
                let existing_revision = if let Some((CachedValue::Ok(old), rev)) =
                    self.get_cached_with_revision::<Q>(full_key)
                {
                    if Q::output_eq(&old, &output) {
                        Some(rev)
                    } else {
                        None
                    }
                } else {
                    None
                };
                let output_changed = existing_revision.is_none();

                #[cfg(feature = "inspector")]
                self.emit(|| FlowEvent::EarlyCutoffCheck {
                    span_id: exec_ctx.span_id(),
                    query: query_flow_inspector::QueryKey::new(
                        std::any::type_name::<Q>(),
                        full_key.debug_repr(),
                    ),
                    output_changed,
                });

                let entry = CachedEntry::Ok(output.clone() as Arc<dyn std::any::Any + Send + Sync>);
                let revision = if let Some(existing_rev) = existing_revision {
                    // Unchanged output: the result of `confirm_unchanged` is ignored.
                    let _ = self.whale.confirm_unchanged(full_key, deps);
                    existing_rev
                } else {
                    match self
                        .whale
                        .register(full_key.clone(), Some(entry), durability, deps)
                    {
                        Ok(result) => result.new_rev,
                        Err(missing) => {
                            return Err(QueryError::DependenciesRemoved {
                                missing_keys: missing,
                            })
                        }
                    }
                };

                // A query seen for the first time re-registers the per-type sentinel key,
                // which `QueryContext::list_queries` records as a dependency.
                let is_new_query = self.query_registry.register(query);
                if is_new_query {
                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
                    let _ = self
                        .whale
                        .register(sentinel, None, Durability::volatile(), vec![]);
                }

                // Remember the query so it can be used as a verifier for this key.
                self.verifiers.insert(full_key.clone(), query.clone());

                Ok((Ok(output), output_changed, revision))
            }
            Err(QueryError::UserError(err)) => {
                // User errors are cached like outputs; equality is decided by the comparator.
                let existing_revision = if let Some((CachedValue::UserError(old_err), rev)) =
                    self.get_cached_with_revision::<Q>(full_key)
                {
                    if (self.error_comparator)(old_err.as_ref(), err.as_ref()) {
                        Some(rev)
                    } else {
                        None
                    }
                } else {
                    None
                };
                let output_changed = existing_revision.is_none();

                #[cfg(feature = "inspector")]
                self.emit(|| FlowEvent::EarlyCutoffCheck {
                    span_id: exec_ctx.span_id(),
                    query: query_flow_inspector::QueryKey::new(
                        std::any::type_name::<Q>(),
                        full_key.debug_repr(),
                    ),
                    output_changed,
                });

                let entry = CachedEntry::UserError(err.clone());
                let revision = if let Some(existing_rev) = existing_revision {
                    let _ = self.whale.confirm_unchanged(full_key, deps);
                    existing_rev
                } else {
                    match self
                        .whale
                        .register(full_key.clone(), Some(entry), durability, deps)
                    {
                        Ok(result) => result.new_rev,
                        Err(missing) => {
                            return Err(QueryError::DependenciesRemoved {
                                missing_keys: missing,
                            })
                        }
                    }
                };

                let is_new_query = self.query_registry.register(query);
                if is_new_query {
                    let sentinel = FullCacheKey::query_set_sentinel::<Q>();
                    let _ = self
                        .whale
                        .register(sentinel, None, Durability::volatile(), vec![]);
                }

                self.verifiers.insert(full_key.clone(), query.clone());

                Ok((Err(err), output_changed, revision))
            }
            Err(e) => {
                // Any other error is returned without being cached.
                Err(e)
            }
        }
    }
630
    /// Invalidates the cached result of query type `Q` for `key`.
    ///
    /// Re-registers the key in the whale runtime with no data, volatile durability and no
    /// dependencies; the result of that registration is ignored.
    pub fn invalidate<Q: Query>(&self, key: &Q::CacheKey) {
        let full_key = FullCacheKey::new::<Q, _>(key);

        #[cfg(feature = "inspector")]
        self.emit(|| FlowEvent::QueryInvalidated {
            query: query_flow_inspector::QueryKey::new(
                std::any::type_name::<Q>(),
                full_key.debug_repr(),
            ),
            reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
        });

        let _ = self
            .whale
            .register(full_key, None, Durability::volatile(), vec![]);
    }
651
    /// Removes query type `Q` with `key` from the runtime entirely.
    ///
    /// Drops its verifier, removes its node from the whale runtime and unregisters it from
    /// the query registry. When the registry reports a removal, the per-type sentinel key is
    /// re-registered (the key `QueryContext::list_queries` depends on).
    pub fn remove_query<Q: Query>(&self, key: &Q::CacheKey) {
        let full_key = FullCacheKey::new::<Q, _>(key);

        #[cfg(feature = "inspector")]
        self.emit(|| FlowEvent::QueryInvalidated {
            query: query_flow_inspector::QueryKey::new(
                std::any::type_name::<Q>(),
                full_key.debug_repr(),
            ),
            reason: query_flow_inspector::InvalidationReason::ManualInvalidation,
        });

        self.verifiers.remove(&full_key);

        self.whale.remove(&full_key);

        if self.query_registry.remove::<Q>(key) {
            let sentinel = FullCacheKey::query_set_sentinel::<Q>();
            let _ = self
                .whale
                .register(sentinel, None, Durability::volatile(), vec![]);
        }
    }
685
686 pub fn clear_cache(&self) {
690 let keys = self.whale.keys();
691 for key in keys {
692 self.whale.remove(&key);
693 }
694 }
695
696 #[allow(clippy::type_complexity)]
730 pub fn poll<Q: Query>(
731 &self,
732 query: Q,
733 ) -> Result<Polled<Result<Arc<Q::Output>, Arc<anyhow::Error>>>, QueryError> {
734 let (value, revision) = self.query_internal(query)?;
735 Ok(Polled { value, revision })
736 }
737
738 pub fn changed_at<Q: Query>(&self, key: &Q::CacheKey) -> Option<RevisionCounter> {
757 let full_key = FullCacheKey::new::<Q, _>(key);
758 self.whale.get(&full_key).map(|node| node.changed_at)
759 }
760}
761
/// Builder for [`QueryRuntime`].
pub struct QueryRuntimeBuilder {
    // Comparator installed into the runtime for deciding user-error equality.
    error_comparator: ErrorComparator,
}
782
/// `Default` delegates to [`QueryRuntimeBuilder::new`].
impl Default for QueryRuntimeBuilder {
    fn default() -> Self {
        Self::new()
    }
}
788
789impl QueryRuntimeBuilder {
790 pub fn new() -> Self {
792 Self {
793 error_comparator: default_error_comparator,
794 }
795 }
796
797 pub fn error_comparator(mut self, f: ErrorComparator) -> Self {
815 self.error_comparator = f;
816 self
817 }
818
819 pub fn build(self) -> QueryRuntime {
821 QueryRuntime {
822 whale: WhaleRuntime::new(),
823 assets: Arc::new(AssetStorage::new()),
824 locators: Arc::new(LocatorStorage::new()),
825 pending: Arc::new(PendingStorage::new()),
826 query_registry: Arc::new(QueryRegistry::new()),
827 asset_key_registry: Arc::new(AssetKeyRegistry::new()),
828 verifiers: Arc::new(VerifierStorage::new()),
829 error_comparator: self.error_comparator,
830 #[cfg(feature = "inspector")]
831 sink: Arc::new(parking_lot::RwLock::new(None)),
832 }
833 }
834}
835
836impl QueryRuntime {
    /// Registers `locator` as the locator for asset keys of type `K`.
    pub fn register_asset_locator<K, L>(&self, locator: L)
    where
        K: AssetKey,
        L: AssetLocator<K>,
    {
        self.locators.insert::<K, L>(locator);
    }
859
    /// Returns all assets currently recorded as pending.
    pub fn pending_assets(&self) -> Vec<PendingAsset> {
        self.pending.get_all()
    }

    /// Returns the pending asset keys of type `K`.
    pub fn pending_assets_of<K: AssetKey>(&self) -> Vec<K> {
        self.pending.get_of_type::<K>()
    }

    /// Returns `true` when at least one asset is pending.
    pub fn has_pending_assets(&self) -> bool {
        !self.pending.is_empty()
    }
888
    /// Supplies the value for asset `key`, using the durability reported by the key itself.
    pub fn resolve_asset<K: AssetKey>(&self, key: K, value: K::Asset) {
        // Read the durability before `key` is moved into the internal call.
        let durability = key.durability();
        self.resolve_asset_internal(key, value, durability);
    }

    /// Supplies the value for asset `key` with an explicitly chosen durability level.
    pub fn resolve_asset_with_durability<K: AssetKey>(
        &self,
        key: K,
        value: K::Asset,
        durability: DurabilityLevel,
    ) {
        self.resolve_asset_internal(key, value, durability);
    }
921
922 fn resolve_asset_internal<K: AssetKey>(
923 &self,
924 key: K,
925 value: K::Asset,
926 durability_level: DurabilityLevel,
927 ) {
928 let full_asset_key = FullAssetKey::new(&key);
929 let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);
930
931 let changed = if let Some(old_value) = self.assets.get_ready::<K>(&full_asset_key) {
933 !K::asset_eq(&old_value, &value)
934 } else {
935 true };
937
938 #[cfg(feature = "inspector")]
940 self.emit(|| FlowEvent::AssetResolved {
941 asset: query_flow_inspector::AssetKey::new(
942 std::any::type_name::<K>(),
943 format!("{:?}", key),
944 ),
945 changed,
946 });
947
948 self.assets
950 .insert_ready::<K>(full_asset_key.clone(), Arc::new(value));
951
952 self.pending.remove(&full_asset_key);
954
955 let durability =
957 Durability::new(durability_level.as_u8() as usize).unwrap_or(Durability::volatile());
958
959 if changed {
960 let _ = self
962 .whale
963 .register(full_cache_key, None, durability, vec![]);
964 } else {
965 let _ = self.whale.confirm_unchanged(&full_cache_key, vec![]);
967 }
968
969 let is_new_asset = self.asset_key_registry.register(&key);
971 if is_new_asset {
972 let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
974 let _ = self
975 .whale
976 .register(sentinel, None, Durability::volatile(), vec![]);
977 }
978 }
979
    /// Marks asset `key` as needing to be loaded again.
    ///
    /// Sets its state to `AssetState::Loading`, records it as pending and re-registers its
    /// cache key with no data and volatile durability.
    pub fn invalidate_asset<K: AssetKey>(&self, key: &K) {
        let full_asset_key = FullAssetKey::new(key);
        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);

        #[cfg(feature = "inspector")]
        self.emit(|| FlowEvent::AssetInvalidated {
            asset: query_flow_inspector::AssetKey::new(
                std::any::type_name::<K>(),
                format!("{:?}", key),
            ),
        });

        self.assets
            .insert(full_asset_key.clone(), AssetState::Loading);

        self.pending.insert::<K>(full_asset_key, key.clone());

        let _ = self
            .whale
            .register(full_cache_key, None, Durability::volatile(), vec![]);
    }
1018
    /// Removes asset `key` from the runtime entirely.
    ///
    /// Drops its stored state and pending record, removes its cache node, and re-registers
    /// the per-type asset-key sentinel when the registry reports a removal.
    pub fn remove_asset<K: AssetKey>(&self, key: &K) {
        let full_asset_key = FullAssetKey::new(key);
        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);

        // Re-register before removing the node below — presumably so dependents observe
        // a change first; confirm against whale's semantics.
        let _ = self
            .whale
            .register(full_cache_key.clone(), None, Durability::volatile(), vec![]);

        self.assets.remove(&full_asset_key);
        self.pending.remove(&full_asset_key);

        self.whale.remove(&full_cache_key);

        if self.asset_key_registry.remove::<K>(key) {
            let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();
            let _ = self
                .whale
                .register(sentinel, None, Durability::volatile(), vec![]);
        }
    }
1048
    /// Returns the loading state of asset `key` without recording a dependency.
    ///
    /// # Errors
    /// Returns `QueryError::MissingDependency` when the asset is not found or its stored
    /// value has an unexpected type.
    pub fn get_asset<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
        self.get_asset_internal(key)
    }
1063
    /// Resolves the loading state of asset `key`.
    ///
    /// Lookup order:
    /// 1. The stored state, when one exists and the asset's cache key is still valid.
    /// 2. The locator registered for `K`, when it returns a state; that state is stored.
    /// 3. Otherwise the asset is marked `Loading` and recorded as pending.
    ///
    /// # Errors
    /// Returns `QueryError::MissingDependency` when the state is `NotFound` or the ready
    /// value cannot be downcast to `K::Asset`.
    ///
    /// # Panics
    /// Panics if registering the asset's cache key (with no dependencies) fails.
    fn get_asset_internal<K: AssetKey>(&self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
        let full_asset_key = FullAssetKey::new(&key);
        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);

        // Helper that reports which state the request observed.
        #[cfg(feature = "inspector")]
        let emit_requested = |runtime: &Self, key: &K, state: query_flow_inspector::AssetState| {
            runtime.emit(|| FlowEvent::AssetRequested {
                asset: query_flow_inspector::AssetKey::new(
                    std::any::type_name::<K>(),
                    format!("{:?}", key),
                ),
                state,
            });
        };

        // 1. Serve from storage while the cache key is still valid.
        if let Some(state) = self.assets.get(&full_asset_key) {
            if self.whale.is_valid(&full_cache_key) {
                return match state {
                    AssetState::Loading => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, &key, query_flow_inspector::AssetState::Loading);
                        Ok(AssetLoadingState::loading(key))
                    }
                    AssetState::Ready(arc) => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, &key, query_flow_inspector::AssetState::Ready);
                        match arc.downcast::<K::Asset>() {
                            Ok(value) => Ok(AssetLoadingState::ready(key, value)),
                            Err(_) => Err(QueryError::MissingDependency {
                                description: format!("Asset type mismatch: {:?}", key),
                            }),
                        }
                    }
                    AssetState::NotFound => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, &key, query_flow_inspector::AssetState::NotFound);
                        Err(QueryError::MissingDependency {
                            description: format!("Asset not found: {:?}", key),
                        })
                    }
                };
            }
        }

        // 2. Ask the locator registered for this key type, if any.
        if let Some(locator) = self.locators.get(TypeId::of::<K>()) {
            if let Some(state) = locator.locate_any(&key) {
                self.assets.insert(full_asset_key.clone(), state.clone());

                match state {
                    AssetState::Ready(arc) => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, &key, query_flow_inspector::AssetState::Ready);

                        // Falls back to volatile when `Durability::new` rejects the level.
                        let durability = Durability::new(key.durability().as_u8() as usize)
                            .unwrap_or(Durability::volatile());
                        self.whale
                            .register(full_cache_key, None, durability, vec![])
                            .expect("register with no dependencies cannot fail");

                        match arc.downcast::<K::Asset>() {
                            Ok(value) => return Ok(AssetLoadingState::ready(key, value)),
                            Err(_) => {
                                return Err(QueryError::MissingDependency {
                                    description: format!("Asset type mismatch: {:?}", key),
                                })
                            }
                        }
                    }
                    AssetState::Loading => {
                        #[cfg(feature = "inspector")]
                        emit_requested(self, &key, query_flow_inspector::AssetState::Loading);
                        self.pending.insert::<K>(full_asset_key, key.clone());

                        self.whale
                            .register(full_cache_key, None, Durability::volatile(), vec![])
                            .expect("register with no dependencies cannot fail");

                        return Ok(AssetLoadingState::loading(key));
                    }
                    AssetState::NotFound => {
                        // Note: this branch does not register the cache key.
                        #[cfg(feature = "inspector")]
                        emit_requested(self, &key, query_flow_inspector::AssetState::NotFound);
                        return Err(QueryError::MissingDependency {
                            description: format!("Asset not found: {:?}", key),
                        });
                    }
                }
            }
        }

        // 3. Nothing stored and no locator answer: mark as loading and pending.
        #[cfg(feature = "inspector")]
        emit_requested(self, &key, query_flow_inspector::AssetState::Loading);
        self.assets
            .insert(full_asset_key.clone(), AssetState::Loading);
        self.pending
            .insert::<K>(full_asset_key.clone(), key.clone());

        self.whale
            .register(full_cache_key, None, Durability::volatile(), vec![])
            .expect("register with no dependencies cannot fail");

        Ok(AssetLoadingState::loading(key))
    }
1177}
1178
/// Context passed to a query body; records every dependency the body requests.
pub struct QueryContext<'a> {
    // Runtime used to resolve nested queries and assets.
    runtime: &'a QueryRuntime,
    // Cache key of the query being executed (only read by inspector events).
    #[cfg_attr(not(feature = "inspector"), allow(dead_code))]
    current_key: FullCacheKey,
    // Type name of the executing query, used in inspector events.
    #[cfg(feature = "inspector")]
    parent_query_type: &'static str,
    // Execution context supplying the span id for inspector events.
    #[cfg(feature = "inspector")]
    exec_ctx: ExecutionContext,
    // Keys of everything requested so far; read back by `execute_query`.
    deps: RefCell<Vec<FullCacheKey>>,
}
1192
1193impl<'a> QueryContext<'a> {
1194 pub fn query<Q: Query>(&mut self, query: Q) -> Result<Arc<Q::Output>, QueryError> {
1207 let key = query.cache_key();
1208 let full_key = FullCacheKey::new::<Q, _>(&key);
1209
1210 #[cfg(feature = "inspector")]
1212 self.runtime.emit(|| FlowEvent::DependencyRegistered {
1213 span_id: self.exec_ctx.span_id(),
1214 parent: query_flow_inspector::QueryKey::new(
1215 self.parent_query_type,
1216 self.current_key.debug_repr(),
1217 ),
1218 dependency: query_flow_inspector::QueryKey::new(
1219 std::any::type_name::<Q>(),
1220 full_key.debug_repr(),
1221 ),
1222 });
1223
1224 self.deps.borrow_mut().push(full_key.clone());
1226
1227 self.runtime.query(query)
1229 }
1230
    /// Requests asset `key` as a dependency of the query that owns this context.
    ///
    /// The asset's cache key is recorded as a dependency before the lookup, so it is
    /// tracked regardless of the outcome.
    ///
    /// # Errors
    /// Propagates the error from `QueryRuntime::get_asset_internal`.
    pub fn asset<K: AssetKey>(&mut self, key: K) -> Result<AssetLoadingState<K>, QueryError> {
        let full_asset_key = FullAssetKey::new(&key);
        let full_cache_key = FullCacheKey::from_asset_key(&full_asset_key);

        #[cfg(feature = "inspector")]
        self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
            span_id: self.exec_ctx.span_id(),
            parent: query_flow_inspector::QueryKey::new(
                self.parent_query_type,
                self.current_key.debug_repr(),
            ),
            asset: query_flow_inspector::AssetKey::new(
                std::any::type_name::<K>(),
                format!("{:?}", key),
            ),
        });

        self.deps.borrow_mut().push(full_cache_key);

        let result = self.runtime.get_asset_internal(key);

        // Report a missing dependency to the inspector before handing the error back.
        #[cfg(feature = "inspector")]
        if let Err(QueryError::MissingDependency { ref description }) = result {
            self.runtime.emit(|| FlowEvent::MissingDependency {
                query: query_flow_inspector::QueryKey::new(
                    self.parent_query_type,
                    self.current_key.debug_repr(),
                ),
                dependency_description: description.clone(),
            });
        }

        result
    }
1292
    /// Returns every registered query of type `Q` and records a dependency on the set.
    ///
    /// The dependency is on the per-type sentinel key, which the runtime re-registers when
    /// a query of type `Q` is first executed or removed.
    pub fn list_queries<Q: Query>(&mut self) -> Vec<Q> {
        let sentinel = FullCacheKey::query_set_sentinel::<Q>();

        #[cfg(feature = "inspector")]
        self.runtime.emit(|| FlowEvent::DependencyRegistered {
            span_id: self.exec_ctx.span_id(),
            parent: query_flow_inspector::QueryKey::new(
                self.parent_query_type,
                self.current_key.debug_repr(),
            ),
            dependency: query_flow_inspector::QueryKey::new("QuerySet", sentinel.debug_repr()),
        });

        // Make sure the sentinel has a node before it is recorded as a dependency.
        if self.runtime.whale.get(&sentinel).is_none() {
            let _ =
                self.runtime
                    .whale
                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
        }

        self.deps.borrow_mut().push(sentinel);

        self.runtime.query_registry.get_all::<Q>()
    }
1342
    /// Returns every registered asset key of type `K` and records a dependency on the set.
    ///
    /// The dependency is on the per-type sentinel key, which the runtime re-registers when
    /// an asset key of type `K` is first resolved or removed.
    pub fn list_asset_keys<K: AssetKey>(&mut self) -> Vec<K> {
        let sentinel = FullCacheKey::asset_key_set_sentinel::<K>();

        #[cfg(feature = "inspector")]
        self.runtime.emit(|| FlowEvent::AssetDependencyRegistered {
            span_id: self.exec_ctx.span_id(),
            parent: query_flow_inspector::QueryKey::new(
                self.parent_query_type,
                self.current_key.debug_repr(),
            ),
            asset: query_flow_inspector::AssetKey::new("AssetKeySet", sentinel.debug_repr()),
        });

        // Make sure the sentinel has a node before it is recorded as a dependency.
        if self.runtime.whale.get(&sentinel).is_none() {
            let _ =
                self.runtime
                    .whale
                    .register(sentinel.clone(), None, Durability::volatile(), vec![]);
        }

        self.deps.borrow_mut().push(sentinel);

        self.runtime.asset_key_registry.get_all::<K>()
    }
1394}
1395
1396#[cfg(test)]
1397mod tests {
1398 use super::*;
1399
    /// A query with no dependencies returns its computed value, and asking again for the
    /// same key returns the same value.
    #[test]
    fn test_simple_query() {
        #[derive(Clone)]
        struct Add {
            a: i32,
            b: i32,
        }

        impl Query for Add {
            type CacheKey = (i32, i32);
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                (self.a, self.b)
            }

            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                Ok(self.a + self.b)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        let result = runtime.query(Add { a: 1, b: 2 }).unwrap();
        assert_eq!(*result, 3);

        // Second request for the same key.
        let result2 = runtime.query(Add { a: 1, b: 2 }).unwrap();
        assert_eq!(*result2, 3);
    }
1434
    /// A query that requests another query through its context sees that query's output.
    #[test]
    fn test_dependent_queries() {
        #[derive(Clone)]
        struct Base {
            value: i32,
        }

        impl Query for Base {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.value
            }

            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                Ok(self.value * 2)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        #[derive(Clone)]
        struct Derived {
            base_value: i32,
        }

        impl Query for Derived {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.base_value
            }

            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let base = ctx.query(Base {
                    value: self.base_value,
                })?;
                Ok(*base + 10)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        let result = runtime.query(Derived { base_value: 5 }).unwrap();
        // 5 * 2 from `Base`, plus 10 from `Derived`.
        assert_eq!(*result, 20);
    }
1489
    /// Two queries that request each other produce `QueryError::Cycle` instead of
    /// recursing forever.
    #[test]
    fn test_cycle_detection() {
        #[derive(Clone)]
        struct CycleA {
            id: i32,
        }

        #[derive(Clone)]
        struct CycleB {
            id: i32,
        }

        impl Query for CycleA {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.id
            }

            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let b = ctx.query(CycleB { id: self.id })?;
                Ok(*b + 1)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        impl Query for CycleB {
            type CacheKey = i32;
            type Output = i32;

            fn cache_key(&self) -> Self::CacheKey {
                self.id
            }

            fn query(&self, ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                let a = ctx.query(CycleA { id: self.id })?;
                Ok(*a + 1)
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        let result = runtime.query(CycleA { id: 1 });
        assert!(matches!(result, Err(QueryError::Cycle { .. })));
    }
1543
    /// A query whose `Output` is itself a `Result` succeeds at the runtime level; the
    /// parse failure is carried inside the output value.
    #[test]
    fn test_fallible_query() {
        #[derive(Clone)]
        struct ParseInt {
            input: String,
        }

        impl Query for ParseInt {
            type CacheKey = String;
            type Output = Result<i32, std::num::ParseIntError>;

            fn cache_key(&self) -> Self::CacheKey {
                self.input.clone()
            }

            fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
                Ok(self.input.parse())
            }

            fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
                old == new
            }
        }

        let runtime = QueryRuntime::new();

        // Valid input: the inner result is `Ok`.
        let result = runtime
            .query(ParseInt {
                input: "42".to_string(),
            })
            .unwrap();
        assert_eq!(*result, Ok(42));

        // Invalid input: the query still succeeds, the inner result is `Err`.
        let result = runtime
            .query(ParseInt {
                input: "not_a_number".to_string(),
            })
            .unwrap();
        assert!(result.is_err());
    }
1586
1587 mod macro_tests {
1589 use super::*;
1590 use crate::query;
1591
1592 #[query]
1593 fn add(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
1594 let _ = ctx; Ok(a + b)
1596 }
1597
1598 #[test]
1599 fn test_macro_basic() {
1600 let runtime = QueryRuntime::new();
1601 let result = runtime.query(Add::new(1, 2)).unwrap();
1602 assert_eq!(*result, 3);
1603 }
1604
1605 #[query(durability = 2)]
1606 fn with_durability(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1607 let _ = ctx;
1608 Ok(x * 2)
1609 }
1610
1611 #[test]
1612 fn test_macro_durability() {
1613 let runtime = QueryRuntime::new();
1614 let result = runtime.query(WithDurability::new(5)).unwrap();
1615 assert_eq!(*result, 10);
1616 }
1617
1618 #[query(keys(id))]
1619 fn with_key_selection(
1620 ctx: &mut QueryContext,
1621 id: u32,
1622 include_extra: bool,
1623 ) -> Result<String, QueryError> {
1624 let _ = ctx;
1625 Ok(format!("id={}, extra={}", id, include_extra))
1626 }
1627
1628 #[test]
1629 fn test_macro_key_selection() {
1630 let runtime = QueryRuntime::new();
1631
1632 let r1 = runtime.query(WithKeySelection::new(1, true)).unwrap();
1634 let r2 = runtime.query(WithKeySelection::new(1, false)).unwrap();
1635
1636 assert_eq!(*r1, "id=1, extra=true");
1638 assert_eq!(*r2, "id=1, extra=true"); }
1640
1641 #[query]
1642 fn dependent(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
1643 let sum = ctx.query(Add::new(*a, *b))?;
1644 Ok(*sum * 2)
1645 }
1646
// A macro-defined query can call another one through its context.
#[test]
fn test_macro_dependencies() {
    let rt = QueryRuntime::new();
    let doubled_sum = rt.query(Dependent::new(3, 4)).unwrap();
    // (3 + 4) * 2
    assert_eq!(*doubled_sum, 14);
}
1653
1654 #[query(output_eq)]
1655 fn with_output_eq(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1656 let _ = ctx;
1657 Ok(*x * 2)
1658 }
1659
// A query declared with the `output_eq` option must still execute normally.
#[test]
fn test_macro_output_eq() {
    let rt = QueryRuntime::new();
    let doubled = rt.query(WithOutputEq::new(5)).unwrap();
    assert_eq!(*doubled, 10);
}
1666
1667 #[query(name = "CustomName")]
1668 fn original_name(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1669 let _ = ctx;
1670 Ok(*x)
1671 }
1672
// The query declared with `name = "CustomName"` is constructed under that name.
#[test]
fn test_macro_custom_name() {
    let rt = QueryRuntime::new();
    let echoed = rt.query(CustomName::new(42)).unwrap();
    assert_eq!(*echoed, 42);
}
1679
1680 #[allow(unused_variables)]
1684 #[inline]
1685 #[query]
1686 fn with_attributes(ctx: &mut QueryContext, x: i32) -> Result<i32, QueryError> {
1687 let unused_var = 42;
1689 Ok(*x * 2)
1690 }
1691
// A query declared alongside other attributes must still build and run.
#[test]
fn test_macro_preserves_attributes() {
    let rt = QueryRuntime::new();
    let doubled = rt.query(WithAttributes::new(5)).unwrap();
    assert_eq!(*doubled, 10);
}
1699 }
1700
1701 mod poll_tests {
1703 use super::*;
1704
1705 #[derive(Clone)]
1706 struct Counter {
1707 id: i32,
1708 }
1709
1710 impl Query for Counter {
1711 type CacheKey = i32;
1712 type Output = i32;
1713
1714 fn cache_key(&self) -> Self::CacheKey {
1715 self.id
1716 }
1717
1718 fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
1719 Ok(self.id * 10)
1720 }
1721
1722 fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
1723 old == new
1724 }
1725 }
1726
// `poll` must hand back both the computed value and a non-zero revision.
#[test]
fn test_poll_returns_value_and_revision() {
    let rt = QueryRuntime::new();

    let polled = rt.poll(Counter { id: 1 }).unwrap();

    // The value is wrapped in a result around a shared pointer; unwrap both layers.
    let value = **polled.value.as_ref().unwrap();
    assert_eq!(value, 10);

    assert!(polled.revision > 0);
}
1739
// Polling the same query twice with no change in between must report the
// same revision both times.
#[test]
fn test_poll_revision_stable_on_cache_hit() {
    let rt = QueryRuntime::new();

    let first_poll = rt.poll(Counter { id: 1 }).unwrap();
    let first_revision = first_poll.revision;

    let second_poll = rt.poll(Counter { id: 1 }).unwrap();
    let second_revision = second_poll.revision;

    assert_eq!(first_revision, second_revision);
}
1755
// After an explicit invalidation the query still yields the same value, and
// the reported revision never goes backwards.
#[test]
fn test_poll_revision_changes_on_invalidate() {
    let rt = QueryRuntime::new();

    let before = rt.poll(Counter { id: 1 }).unwrap();
    let revision_before = before.revision;

    rt.invalidate::<Counter>(&1);
    let after = rt.poll(Counter { id: 1 }).unwrap();
    let revision_after = after.revision;

    // The output for id 1 is still 10 after the invalidation.
    assert_eq!(**after.value.as_ref().unwrap(), 10);

    // Only monotonicity is required here.
    // NOTE(review): presumably the revision may stay put because the
    // recomputed output compares equal via `output_eq` - confirm.
    assert!(revision_after >= revision_before);
}
1778
// `changed_at` has nothing to report for a query that has never been run.
#[test]
fn test_changed_at_returns_none_for_unexecuted_query() {
    let rt = QueryRuntime::new();

    let recorded = rt.changed_at::<Counter>(&1);
    assert!(recorded.is_none());
}
1787
// Once a query has been executed, `changed_at` reports a non-zero revision for it.
#[test]
fn test_changed_at_returns_revision_after_execution() {
    let rt = QueryRuntime::new();

    // Run the query once; only the side effect on the runtime matters here.
    let _ = rt.query(Counter { id: 1 }).unwrap();

    let recorded = rt.changed_at::<Counter>(&1);
    assert!(recorded.is_some());
    let revision = recorded.unwrap();
    assert!(revision > 0);
}
1800
// The revision returned by `poll` and the one reported by `changed_at` for
// the same key must agree.
#[test]
fn test_changed_at_matches_poll_revision() {
    let rt = QueryRuntime::new();

    let polled = rt.poll(Counter { id: 1 }).unwrap();

    let recorded = rt.changed_at::<Counter>(&1);
    assert_eq!(recorded, Some(polled.revision));
}
1812
// The polled value can be read either as a plain `&i32` or as the `&Arc<i32>`
// that holds it.
#[test]
fn test_poll_value_access() {
    let rt = QueryRuntime::new();

    let polled = rt.poll(Counter { id: 5 }).unwrap();

    // Access through an `&i32` binding.
    let as_int: &i32 = polled.value.as_ref().unwrap();
    assert_eq!(*as_int, 50);

    // Access through the shared pointer itself.
    let as_arc: &Arc<i32> = polled.value.as_ref().unwrap();
    assert_eq!(**as_arc, 50);
}
1827
// Models a subscriber that remembers the last revision it saw and counts a
// notification only when a poll reports a newer one. Three polls of an
// unchanged query must therefore produce exactly one notification.
#[test]
fn test_subscription_pattern() {
    let rt = QueryRuntime::new();

    let mut last_seen: RevisionCounter = 0;
    let mut notifications = 0;

    // Poll the same query three times in a row with no change in between.
    for _ in 0..3 {
        let polled = rt.poll(Counter { id: 1 }).unwrap();
        if polled.revision > last_seen {
            notifications += 1;
            last_seen = polled.revision;
        }
    }

    assert_eq!(notifications, 1);
}
1863 }
1864}