1mod config;
11mod data_access;
12mod data_cache;
13mod registries;
14mod scope;
15mod variables;
16
17pub use data_cache::DataLoadMode;
19pub use variables::Variable;
20
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use super::alerts::AlertRouter;
25use super::annotation_context::{AnnotationContext, AnnotationRegistry};
26use super::data::DataFrame;
27use super::event_queue::{SharedEventQueue, SuspensionState};
28use super::lookahead_guard::LookAheadGuard;
29use super::metadata::MetadataRegistry;
30use super::type_methods::TypeMethodRegistry;
31use super::type_schema::TypeSchemaRegistry;
32use crate::data::Timeframe;
33use crate::snapshot::{
34 ContextSnapshot, SnapshotStore, SuspensionStateSnapshot, TypeAliasRuntimeEntrySnapshot,
35 VariableSnapshot,
36};
37use anyhow::{Result, anyhow};
38use chrono::{DateTime, Utc};
39use shape_value::KindedSlot;
40
41#[derive(Clone)]
43pub struct ExecutionContext {
44 data_provider: Option<Arc<dyn std::any::Any + Send + Sync>>,
46 pub(crate) data_cache: Option<crate::data::DataCache>,
49 provider_registry: Arc<super::provider_registry::ProviderRegistry>,
51 type_mapping_registry: Arc<super::type_mapping::TypeMappingRegistry>,
53 type_schema_registry: Arc<TypeSchemaRegistry>,
55 metadata_registry: Arc<MetadataRegistry>,
57 data_load_mode: DataLoadMode,
59 current_id: Option<String>,
61 current_row_index: usize,
63 variable_scopes: Vec<HashMap<String, Variable>>,
65 reference_datetime: Option<DateTime<Utc>>,
70 current_timeframe: Option<Timeframe>,
72 base_timeframe: Option<Timeframe>,
74 lookahead_guard: Option<LookAheadGuard>,
76 type_method_registry: Arc<TypeMethodRegistry>,
78 date_range: Option<(DateTime<Utc>, DateTime<Utc>)>,
80 range_start: usize,
82 range_end: usize,
84 range_active: bool,
86 pattern_registry: HashMap<String, super::closure::Closure>,
90 annotation_context: AnnotationContext,
92 annotation_registry: AnnotationRegistry,
94 event_queue: Option<SharedEventQueue>,
96 suspension_state: Option<SuspensionState>,
98 alert_pipeline: Option<Arc<AlertRouter>>,
100 output_adapter: Box<dyn crate::output_adapter::OutputAdapter>,
102 type_alias_registry: HashMap<String, TypeAliasRuntimeEntry>,
105 enum_registry: EnumRegistry,
107 struct_type_registry: HashMap<String, shape_ast::ast::StructTypeDef>,
110 progress_registry: Option<Arc<super::progress::ProgressRegistry>>,
112}
113
114#[derive(Debug, Clone)]
116pub struct TypeAliasRuntimeEntry {
117 pub base_type: String,
119 pub overrides: Option<HashMap<String, KindedSlot>>,
122}
123
124#[derive(Debug, Clone, Default)]
130pub struct EnumRegistry {
131 enums: HashMap<String, shape_ast::ast::EnumDef>,
133}
134
135impl EnumRegistry {
136 pub fn new() -> Self {
138 Self {
139 enums: HashMap::new(),
140 }
141 }
142
143 pub fn register(&mut self, enum_def: shape_ast::ast::EnumDef) {
145 self.enums.insert(enum_def.name.clone(), enum_def);
146 }
147
148 pub fn get(&self, name: &str) -> Option<&shape_ast::ast::EnumDef> {
150 self.enums.get(name)
151 }
152
153 pub fn contains(&self, name: &str) -> bool {
155 self.enums.contains_key(name)
156 }
157
158 pub fn names(&self) -> impl Iterator<Item = &String> {
160 self.enums.keys()
161 }
162
163 pub fn value_matches_type(&self, value_enum_name: &str, type_name: &str) -> bool {
169 if value_enum_name == type_name {
171 return true;
172 }
173 false
176 }
177}
178
179impl std::fmt::Debug for ExecutionContext {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 f.debug_struct("ExecutionContext")
182 .field("data_provider", &"<DataProvider>")
183 .field("current_id", &self.current_id)
184 .field("current_row_index", &self.current_row_index)
185 .field("variable_scopes", &self.variable_scopes)
186 .field("reference_datetime", &self.reference_datetime)
187 .field("current_timeframe", &self.current_timeframe)
188 .field("lookahead_guard", &self.lookahead_guard)
189 .finish()
190 }
191}
192
193impl ExecutionContext {
194 pub fn new_with_registry(
196 data: &DataFrame,
197 type_method_registry: Arc<TypeMethodRegistry>,
198 ) -> Self {
199 let current_row_index = if data.row_count() == 0 {
201 0
202 } else {
203 data.row_count() - 1
204 };
205
206 Self {
207 data_provider: None,
208 data_cache: None,
209 provider_registry: Arc::new(super::provider_registry::ProviderRegistry::new()),
210 type_mapping_registry: Arc::new(super::type_mapping::TypeMappingRegistry::new()),
211 type_schema_registry: Arc::new(TypeSchemaRegistry::with_stdlib_types()),
212 metadata_registry: Arc::new(MetadataRegistry::new()),
213 data_load_mode: DataLoadMode::default(),
214 current_id: Some(data.id.clone()),
215 current_row_index,
216 variable_scopes: vec![HashMap::new()], reference_datetime: None,
219 current_timeframe: Some(data.timeframe),
220 base_timeframe: Some(data.timeframe),
221 lookahead_guard: None,
222 type_method_registry,
223 date_range: None,
224 range_start: 0,
225 range_end: usize::MAX,
226 range_active: false,
227 pattern_registry: HashMap::new(),
228 annotation_context: AnnotationContext::new(),
229 annotation_registry: AnnotationRegistry::new(),
230 event_queue: None,
231 suspension_state: None,
232 alert_pipeline: None,
233 output_adapter: Box::new(crate::output_adapter::StdoutAdapter),
234 type_alias_registry: HashMap::new(),
235 enum_registry: EnumRegistry::new(),
236 struct_type_registry: HashMap::new(),
237 progress_registry: None,
238 }
239 }
240
241 pub fn new(data: &DataFrame) -> Self {
243 Self::new_with_registry(data, Arc::new(TypeMethodRegistry::new()))
244 }
245
246 pub fn new_empty_with_registry(type_method_registry: Arc<TypeMethodRegistry>) -> Self {
248 Self {
249 data_provider: None,
250 data_cache: None,
251 provider_registry: Arc::new(super::provider_registry::ProviderRegistry::new()),
252 type_mapping_registry: Arc::new(super::type_mapping::TypeMappingRegistry::new()),
253 type_schema_registry: Arc::new(TypeSchemaRegistry::with_stdlib_types()),
254 metadata_registry: Arc::new(MetadataRegistry::new()),
255 data_load_mode: DataLoadMode::default(),
256 current_id: None,
257 current_row_index: 0,
258 variable_scopes: vec![HashMap::new()], reference_datetime: None,
261 current_timeframe: None,
262 base_timeframe: None,
263 lookahead_guard: None,
264 type_method_registry,
265 date_range: None,
266 range_start: 0,
267 range_end: usize::MAX,
268 range_active: false,
269 pattern_registry: HashMap::new(),
270 annotation_context: AnnotationContext::new(),
271 annotation_registry: AnnotationRegistry::new(),
272 event_queue: None,
273 suspension_state: None,
274 alert_pipeline: None,
275 output_adapter: Box::new(crate::output_adapter::StdoutAdapter),
276 type_alias_registry: HashMap::new(),
277 enum_registry: EnumRegistry::new(),
278 struct_type_registry: HashMap::new(),
279 progress_registry: None,
280 }
281 }
282
283 pub fn new_empty() -> Self {
285 Self::new_empty_with_registry(Arc::new(TypeMethodRegistry::new()))
286 }
287
288 pub fn with_data_provider_and_registry(
290 data_provider: Arc<dyn std::any::Any + Send + Sync>,
291 type_method_registry: Arc<TypeMethodRegistry>,
292 ) -> Self {
293 Self {
294 data_provider: Some(data_provider),
295 data_cache: None,
296 provider_registry: Arc::new(super::provider_registry::ProviderRegistry::new()),
297 type_mapping_registry: Arc::new(super::type_mapping::TypeMappingRegistry::new()),
298 type_schema_registry: Arc::new(TypeSchemaRegistry::with_stdlib_types()),
299 metadata_registry: Arc::new(MetadataRegistry::new()),
300 data_load_mode: DataLoadMode::default(),
301 current_id: None,
302 current_row_index: 0,
303 variable_scopes: vec![HashMap::new()],
304 reference_datetime: None,
306 current_timeframe: None,
307 base_timeframe: None,
308 lookahead_guard: None,
309 type_method_registry,
310 date_range: None,
311 range_start: 0,
312 range_end: usize::MAX,
313 range_active: false,
314 pattern_registry: HashMap::new(),
315 annotation_context: AnnotationContext::new(),
316 annotation_registry: AnnotationRegistry::new(),
317 event_queue: None,
318 suspension_state: None,
319 alert_pipeline: None,
320 output_adapter: Box::new(crate::output_adapter::StdoutAdapter),
321 type_alias_registry: HashMap::new(),
322 enum_registry: EnumRegistry::new(),
323 struct_type_registry: HashMap::new(),
324 progress_registry: None,
325 }
326 }
327
328 pub fn with_data_provider(data_provider: Arc<dyn std::any::Any + Send + Sync>) -> Self {
330 Self::with_data_provider_and_registry(data_provider, Arc::new(TypeMethodRegistry::new()))
331 }
332
333 pub fn with_async_provider(
338 provider: crate::data::SharedAsyncProvider,
339 runtime: tokio::runtime::Handle,
340 ) -> Self {
341 let data_cache = crate::data::DataCache::new(provider, runtime);
342 Self {
343 data_provider: None,
344 data_cache: Some(data_cache),
345 provider_registry: Arc::new(super::provider_registry::ProviderRegistry::new()),
346 type_mapping_registry: Arc::new(super::type_mapping::TypeMappingRegistry::new()),
347 type_schema_registry: Arc::new(TypeSchemaRegistry::with_stdlib_types()),
348 metadata_registry: Arc::new(MetadataRegistry::new()),
349 data_load_mode: DataLoadMode::default(),
350 current_id: None,
351 current_row_index: 0,
352 variable_scopes: vec![HashMap::new()],
353 reference_datetime: None,
355 current_timeframe: None,
356 base_timeframe: None,
357 lookahead_guard: None,
358 type_method_registry: Arc::new(TypeMethodRegistry::new()),
359 date_range: None,
360 range_start: 0,
361 range_end: usize::MAX,
362 range_active: false,
363 pattern_registry: HashMap::new(),
364 annotation_context: AnnotationContext::new(),
365 annotation_registry: AnnotationRegistry::new(),
366 event_queue: None,
367 suspension_state: None,
368 alert_pipeline: None,
369 output_adapter: Box::new(crate::output_adapter::StdoutAdapter),
370 type_alias_registry: HashMap::new(),
371 enum_registry: EnumRegistry::new(),
372 struct_type_registry: HashMap::new(),
373 progress_registry: None,
374 }
375 }
376
377 pub fn set_output_adapter(&mut self, adapter: Box<dyn crate::output_adapter::OutputAdapter>) {
379 self.output_adapter = adapter;
380 }
381
382 pub fn output_adapter_mut(&mut self) -> &mut Box<dyn crate::output_adapter::OutputAdapter> {
384 &mut self.output_adapter
385 }
386
387 pub fn metadata_registry(&self) -> &Arc<MetadataRegistry> {
389 &self.metadata_registry
390 }
391
392 pub fn register_type_alias(
402 &mut self,
403 alias_name: &str,
404 base_type: &str,
405 overrides: Option<HashMap<String, KindedSlot>>,
406 ) {
407 self.type_alias_registry.insert(
408 alias_name.to_string(),
409 TypeAliasRuntimeEntry {
410 base_type: base_type.to_string(),
411 overrides,
412 },
413 );
414 }
415
416 pub fn lookup_type_alias(&self, name: &str) -> Option<&TypeAliasRuntimeEntry> {
420 self.type_alias_registry.get(name)
421 }
422
423 pub fn resolve_type_for_format(
429 &self,
430 type_name: &str,
431 ) -> (String, Option<HashMap<String, KindedSlot>>) {
432 if let Some(entry) = self.type_alias_registry.get(type_name) {
433 (entry.base_type.clone(), entry.overrides.clone())
434 } else {
435 (type_name.to_string(), None)
436 }
437 }
438
439 pub fn snapshot(&self, _store: &SnapshotStore) -> Result<ContextSnapshot> {
453 let _ = (
454 &self.variable_scopes,
455 &self.type_alias_registry,
456 &self.enum_registry,
457 &self.struct_type_registry,
458 &self.data_cache,
459 );
460 let _: Option<SuspensionStateSnapshot> = None;
461 let _: Option<TypeAliasRuntimeEntrySnapshot> = None;
462 let _: Option<VariableSnapshot> = None;
463 Err(anyhow!(
464 "ExecutionContext::snapshot: W17-snapshot-resume surface — \
465 kind-threaded `slot_to_serializable(slot, store)` replacement \
466 for the deleted `nanboxed_to_serializable` has not landed. \
467 Tracked as W17-snapshot-resume per \
468 docs/cluster-audits/phase-2d-playbook.md §3. ADR-006 §2.7.4 \
469 (snapshot serialization deferral) + §2.7.5.1 (post-proof \
470 wire-format shape for new HeapKinds: HashSet, Iterator, \
471 Result, Option, Deque, Channel, PriorityQueue, Range, \
472 Reference, FilterExpr, SharedCell)."
473 ))
474 }
475
476 pub fn restore_from_snapshot(
480 &mut self,
481 _snapshot: ContextSnapshot,
482 _store: &SnapshotStore,
483 ) -> Result<()> {
484 Err(anyhow!(
485 "ExecutionContext::restore_from_snapshot: W17-snapshot-resume \
486 surface — symmetric to `snapshot()`. The kinded \
487 `serializable_to_slot(sv, expected_kind, store)` inverse \
488 reconstructs scope-binding parallel kind tracks from the \
489 persisted discriminator. Tracked as W17-snapshot-resume per \
490 docs/cluster-audits/phase-2d-playbook.md §3. ADR-006 §2.7.4 \
491 + §2.7.5.1."
492 ))
493 }
494
495 pub fn set_event_queue(&mut self, queue: SharedEventQueue) {
499 self.event_queue = Some(queue);
500 }
501
502 pub fn event_queue(&self) -> Option<&SharedEventQueue> {
504 self.event_queue.as_ref()
505 }
506
507 pub fn event_queue_mut(&mut self) -> Option<&mut SharedEventQueue> {
509 self.event_queue.as_mut()
510 }
511
512 pub fn set_suspension_state(&mut self, state: SuspensionState) {
514 self.suspension_state = Some(state);
515 }
516
517 pub fn suspension_state(&self) -> Option<&SuspensionState> {
519 self.suspension_state.as_ref()
520 }
521
522 pub fn clear_suspension_state(&mut self) -> Option<SuspensionState> {
524 self.suspension_state.take()
525 }
526
527 pub fn is_suspended(&self) -> bool {
529 self.suspension_state.is_some()
530 }
531
532 pub fn set_alert_pipeline(&mut self, pipeline: Arc<AlertRouter>) {
534 self.alert_pipeline = Some(pipeline);
535 }
536
537 pub fn alert_pipeline(&self) -> Option<&Arc<AlertRouter>> {
539 self.alert_pipeline.as_ref()
540 }
541
542 pub fn emit_alert(&self, alert: super::alerts::Alert) {
544 if let Some(pipeline) = &self.alert_pipeline {
545 pipeline.emit(alert);
546 }
547 }
548
549 pub fn set_progress_registry(&mut self, registry: Arc<super::progress::ProgressRegistry>) {
551 self.progress_registry = Some(registry);
552 }
553
554 pub fn progress_registry(&self) -> Option<&Arc<super::progress::ProgressRegistry>> {
556 self.progress_registry.as_ref()
557 }
558
559}
560
561#[cfg(test)]
562mod tests {
563 use super::*;
564 use crate::data::{AsyncDataProvider, CacheKey, DataQuery, Timeframe};
565 use shape_ast::ast::VarKind;
566 use std::collections::HashMap;
567 use std::sync::Arc;
568 use std::sync::atomic::{AtomicUsize, Ordering};
569
570 #[test]
571 fn test_execution_context_new_empty() {
572 let ctx = ExecutionContext::new_empty();
573 assert_eq!(ctx.current_row_index(), 0);
574 }
575
576 #[test]
577 fn test_execution_context_set_current_row() {
578 let mut ctx = ExecutionContext::new_empty();
579 ctx.set_current_row(5).unwrap();
580 assert_eq!(ctx.current_row_index(), 5);
581 }
582
583 #[test]
584 fn test_execution_context_variable_scope() {
585 let mut ctx = ExecutionContext::new_empty();
586
587 ctx.set_variable("x", KindedSlot::from_number(10.0))
589 .unwrap_or_else(|_| {
590 });
593 }
594
595 #[test]
600 fn test_type_alias_registry_basic() {
601 let mut ctx = ExecutionContext::new_empty();
602
603 let mut overrides = HashMap::new();
605 overrides.insert("decimals".to_string(), KindedSlot::from_number(4.0));
606 ctx.register_type_alias("Percent4", "Percent", Some(overrides));
607
608 let entry = ctx.lookup_type_alias("Percent4");
610 assert!(entry.is_some());
611 let entry = entry.unwrap();
612 assert_eq!(entry.base_type, "Percent");
613 assert!(entry.overrides.is_some());
614
615 let overrides = entry.overrides.as_ref().unwrap();
616 assert_eq!(
617 overrides.get("decimals").map(|v| v.slot().as_f64()),
618 Some(4.0)
619 );
620 }
621
622 #[test]
623 fn test_type_alias_registry_no_overrides() {
624 let mut ctx = ExecutionContext::new_empty();
625
626 ctx.register_type_alias("MyPercent", "Percent", None);
628
629 let entry = ctx.lookup_type_alias("MyPercent");
630 assert!(entry.is_some());
631 let entry = entry.unwrap();
632 assert_eq!(entry.base_type, "Percent");
633 assert!(entry.overrides.is_none());
634 }
635
636 #[test]
637 fn test_type_alias_registry_unknown_type() {
638 let ctx = ExecutionContext::new_empty();
639
640 let entry = ctx.lookup_type_alias("NonExistent");
642 assert!(entry.is_none());
643 }
644
645 #[test]
646 fn test_resolve_type_for_format_alias() {
647 let mut ctx = ExecutionContext::new_empty();
648
649 let mut overrides = HashMap::new();
651 overrides.insert("decimals".to_string(), KindedSlot::from_number(4.0));
652 ctx.register_type_alias("Percent4", "Percent", Some(overrides.clone()));
653
654 let (base_type, resolved_overrides) = ctx.resolve_type_for_format("Percent4");
656 assert_eq!(base_type, "Percent");
657 assert!(resolved_overrides.is_some());
658 assert_eq!(
659 resolved_overrides
660 .unwrap()
661 .get("decimals")
662 .map(|v| v.slot().as_f64()),
663 Some(4.0)
664 );
665 }
666
667 #[test]
668 fn test_resolve_type_for_format_non_alias() {
669 let ctx = ExecutionContext::new_empty();
670
671 let (base_type, resolved_overrides) = ctx.resolve_type_for_format("Number");
673 assert_eq!(base_type, "Number");
674 assert!(resolved_overrides.is_none());
675 }
676
677 #[derive(Clone)]
678 struct TestAsyncProvider {
679 frames: Arc<HashMap<CacheKey, DataFrame>>,
680 load_calls: Arc<AtomicUsize>,
681 }
682
683 impl AsyncDataProvider for TestAsyncProvider {
684 fn load<'a>(
685 &'a self,
686 query: &'a DataQuery,
687 ) -> std::pin::Pin<
688 Box<
689 dyn std::future::Future<Output = Result<DataFrame, crate::data::AsyncDataError>>
690 + Send
691 + 'a,
692 >,
693 > {
694 let key = CacheKey::new(query.id.clone(), query.timeframe);
695 let frames = self.frames.clone();
696 let calls = self.load_calls.clone();
697 Box::pin(async move {
698 calls.fetch_add(1, Ordering::SeqCst);
699 frames
700 .get(&key)
701 .cloned()
702 .ok_or_else(|| crate::data::AsyncDataError::SymbolNotFound(query.id.clone()))
703 })
704 }
705
706 fn has_data(&self, symbol: &str, timeframe: &Timeframe) -> bool {
707 let key = CacheKey::new(symbol.to_string(), *timeframe);
708 self.frames.contains_key(&key)
709 }
710
711 fn symbols(&self) -> Vec<String> {
712 self.frames.keys().map(|k| k.id.clone()).collect()
713 }
714 }
715
716 #[allow(dead_code)]
721 fn _unused_snapshot_imports(
722 _provider: TestAsyncProvider,
723 _df: DataFrame,
724 _query: DataQuery,
725 _key: CacheKey,
726 _tf: Timeframe,
727 _kind: VarKind,
728 _kinded: KindedSlot,
729 _arc: Arc<()>,
730 _hashmap: HashMap<String, KindedSlot>,
731 _atomic: AtomicUsize,
732 _ordering: Ordering,
733 ) {
734 }
735
736 #[test]
741 fn test_w17_execution_context_snapshot_returns_structured_error() {
742 let tmp = tempfile::tempdir().expect("tempdir");
743 let store = SnapshotStore::new(tmp.path()).expect("snapshot store");
744 let ctx = ExecutionContext::new_empty();
745
746 let result = ctx.snapshot(&store);
747 let err = result.expect_err("expected Err, got Ok");
748 let msg = format!("{err}");
749 assert!(
750 msg.contains("W17-snapshot-resume surface"),
751 "missing W17 marker; got: {msg}"
752 );
753 assert!(
754 msg.contains("§2.7.4"),
755 "missing ADR-006 §2.7.4 cite; got: {msg}"
756 );
757 }
758}