mod config;
mod data_access;
mod data_cache;
mod registries;
mod scope;
mod variables;
pub use data_cache::DataLoadMode;
pub use variables::Variable;
use std::collections::HashMap;
use std::sync::Arc;
use super::alerts::AlertRouter;
use super::annotation_context::{AnnotationContext, AnnotationRegistry};
use super::data::DataFrame;
use super::event_queue::{SharedEventQueue, SuspensionState};
use super::lookahead_guard::LookAheadGuard;
use super::metadata::MetadataRegistry;
use super::type_methods::TypeMethodRegistry;
use super::type_schema::TypeSchemaRegistry;
use crate::data::Timeframe;
use crate::snapshot::{
ContextSnapshot, SnapshotStore, SuspensionStateSnapshot, TypeAliasRuntimeEntrySnapshot,
VariableSnapshot,
};
use anyhow::{Result, anyhow};
use chrono::{DateTime, Utc};
use shape_value::KindedSlot;
#[derive(Clone)]
pub struct ExecutionContext {
data_provider: Option<Arc<dyn std::any::Any + Send + Sync>>,
pub(crate) data_cache: Option<crate::data::DataCache>,
provider_registry: Arc<super::provider_registry::ProviderRegistry>,
type_mapping_registry: Arc<super::type_mapping::TypeMappingRegistry>,
type_schema_registry: Arc<TypeSchemaRegistry>,
metadata_registry: Arc<MetadataRegistry>,
data_load_mode: DataLoadMode,
current_id: Option<String>,
current_row_index: usize,
variable_scopes: Vec<HashMap<String, Variable>>,
reference_datetime: Option<DateTime<Utc>>,
current_timeframe: Option<Timeframe>,
base_timeframe: Option<Timeframe>,
lookahead_guard: Option<LookAheadGuard>,
type_method_registry: Arc<TypeMethodRegistry>,
date_range: Option<(DateTime<Utc>, DateTime<Utc>)>,
range_start: usize,
range_end: usize,
range_active: bool,
pattern_registry: HashMap<String, super::closure::Closure>,
annotation_context: AnnotationContext,
annotation_registry: AnnotationRegistry,
event_queue: Option<SharedEventQueue>,
suspension_state: Option<SuspensionState>,
alert_pipeline: Option<Arc<AlertRouter>>,
output_adapter: Box<dyn crate::output_adapter::OutputAdapter>,
type_alias_registry: HashMap<String, TypeAliasRuntimeEntry>,
enum_registry: EnumRegistry,
struct_type_registry: HashMap<String, shape_ast::ast::StructTypeDef>,
progress_registry: Option<Arc<super::progress::ProgressRegistry>>,
}
#[derive(Debug, Clone)]
pub struct TypeAliasRuntimeEntry {
pub base_type: String,
pub overrides: Option<HashMap<String, KindedSlot>>,
}
#[derive(Debug, Clone, Default)]
pub struct EnumRegistry {
enums: HashMap<String, shape_ast::ast::EnumDef>,
}
impl EnumRegistry {
pub fn new() -> Self {
Self {
enums: HashMap::new(),
}
}
pub fn register(&mut self, enum_def: shape_ast::ast::EnumDef) {
self.enums.insert(enum_def.name.clone(), enum_def);
}
pub fn get(&self, name: &str) -> Option<&shape_ast::ast::EnumDef> {
self.enums.get(name)
}
pub fn contains(&self, name: &str) -> bool {
self.enums.contains_key(name)
}
pub fn names(&self) -> impl Iterator<Item = &String> {
self.enums.keys()
}
pub fn value_matches_type(&self, value_enum_name: &str, type_name: &str) -> bool {
if value_enum_name == type_name {
return true;
}
false
}
}
impl std::fmt::Debug for ExecutionContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExecutionContext")
.field("data_provider", &"<DataProvider>")
.field("current_id", &self.current_id)
.field("current_row_index", &self.current_row_index)
.field("variable_scopes", &self.variable_scopes)
.field("reference_datetime", &self.reference_datetime)
.field("current_timeframe", &self.current_timeframe)
.field("lookahead_guard", &self.lookahead_guard)
.finish()
}
}
impl ExecutionContext {
pub fn new_with_registry(
data: &DataFrame,
type_method_registry: Arc<TypeMethodRegistry>,
) -> Self {
let current_row_index = if data.row_count() == 0 {
0
} else {
data.row_count() - 1
};
Self {
data_provider: None,
data_cache: None,
provider_registry: Arc::new(super::provider_registry::ProviderRegistry::new()),
type_mapping_registry: Arc::new(super::type_mapping::TypeMappingRegistry::new()),
type_schema_registry: Arc::new(TypeSchemaRegistry::with_stdlib_types()),
metadata_registry: Arc::new(MetadataRegistry::new()),
data_load_mode: DataLoadMode::default(),
current_id: Some(data.id.clone()),
current_row_index,
variable_scopes: vec![HashMap::new()], reference_datetime: None,
current_timeframe: Some(data.timeframe),
base_timeframe: Some(data.timeframe),
lookahead_guard: None,
type_method_registry,
date_range: None,
range_start: 0,
range_end: usize::MAX,
range_active: false,
pattern_registry: HashMap::new(),
annotation_context: AnnotationContext::new(),
annotation_registry: AnnotationRegistry::new(),
event_queue: None,
suspension_state: None,
alert_pipeline: None,
output_adapter: Box::new(crate::output_adapter::StdoutAdapter),
type_alias_registry: HashMap::new(),
enum_registry: EnumRegistry::new(),
struct_type_registry: HashMap::new(),
progress_registry: None,
}
}
pub fn new(data: &DataFrame) -> Self {
Self::new_with_registry(data, Arc::new(TypeMethodRegistry::new()))
}
pub fn new_empty_with_registry(type_method_registry: Arc<TypeMethodRegistry>) -> Self {
Self {
data_provider: None,
data_cache: None,
provider_registry: Arc::new(super::provider_registry::ProviderRegistry::new()),
type_mapping_registry: Arc::new(super::type_mapping::TypeMappingRegistry::new()),
type_schema_registry: Arc::new(TypeSchemaRegistry::with_stdlib_types()),
metadata_registry: Arc::new(MetadataRegistry::new()),
data_load_mode: DataLoadMode::default(),
current_id: None,
current_row_index: 0,
variable_scopes: vec![HashMap::new()], reference_datetime: None,
current_timeframe: None,
base_timeframe: None,
lookahead_guard: None,
type_method_registry,
date_range: None,
range_start: 0,
range_end: usize::MAX,
range_active: false,
pattern_registry: HashMap::new(),
annotation_context: AnnotationContext::new(),
annotation_registry: AnnotationRegistry::new(),
event_queue: None,
suspension_state: None,
alert_pipeline: None,
output_adapter: Box::new(crate::output_adapter::StdoutAdapter),
type_alias_registry: HashMap::new(),
enum_registry: EnumRegistry::new(),
struct_type_registry: HashMap::new(),
progress_registry: None,
}
}
pub fn new_empty() -> Self {
Self::new_empty_with_registry(Arc::new(TypeMethodRegistry::new()))
}
pub fn with_data_provider_and_registry(
data_provider: Arc<dyn std::any::Any + Send + Sync>,
type_method_registry: Arc<TypeMethodRegistry>,
) -> Self {
Self {
data_provider: Some(data_provider),
data_cache: None,
provider_registry: Arc::new(super::provider_registry::ProviderRegistry::new()),
type_mapping_registry: Arc::new(super::type_mapping::TypeMappingRegistry::new()),
type_schema_registry: Arc::new(TypeSchemaRegistry::with_stdlib_types()),
metadata_registry: Arc::new(MetadataRegistry::new()),
data_load_mode: DataLoadMode::default(),
current_id: None,
current_row_index: 0,
variable_scopes: vec![HashMap::new()],
reference_datetime: None,
current_timeframe: None,
base_timeframe: None,
lookahead_guard: None,
type_method_registry,
date_range: None,
range_start: 0,
range_end: usize::MAX,
range_active: false,
pattern_registry: HashMap::new(),
annotation_context: AnnotationContext::new(),
annotation_registry: AnnotationRegistry::new(),
event_queue: None,
suspension_state: None,
alert_pipeline: None,
output_adapter: Box::new(crate::output_adapter::StdoutAdapter),
type_alias_registry: HashMap::new(),
enum_registry: EnumRegistry::new(),
struct_type_registry: HashMap::new(),
progress_registry: None,
}
}
pub fn with_data_provider(data_provider: Arc<dyn std::any::Any + Send + Sync>) -> Self {
Self::with_data_provider_and_registry(data_provider, Arc::new(TypeMethodRegistry::new()))
}
pub fn with_async_provider(
provider: crate::data::SharedAsyncProvider,
runtime: tokio::runtime::Handle,
) -> Self {
let data_cache = crate::data::DataCache::new(provider, runtime);
Self {
data_provider: None,
data_cache: Some(data_cache),
provider_registry: Arc::new(super::provider_registry::ProviderRegistry::new()),
type_mapping_registry: Arc::new(super::type_mapping::TypeMappingRegistry::new()),
type_schema_registry: Arc::new(TypeSchemaRegistry::with_stdlib_types()),
metadata_registry: Arc::new(MetadataRegistry::new()),
data_load_mode: DataLoadMode::default(),
current_id: None,
current_row_index: 0,
variable_scopes: vec![HashMap::new()],
reference_datetime: None,
current_timeframe: None,
base_timeframe: None,
lookahead_guard: None,
type_method_registry: Arc::new(TypeMethodRegistry::new()),
date_range: None,
range_start: 0,
range_end: usize::MAX,
range_active: false,
pattern_registry: HashMap::new(),
annotation_context: AnnotationContext::new(),
annotation_registry: AnnotationRegistry::new(),
event_queue: None,
suspension_state: None,
alert_pipeline: None,
output_adapter: Box::new(crate::output_adapter::StdoutAdapter),
type_alias_registry: HashMap::new(),
enum_registry: EnumRegistry::new(),
struct_type_registry: HashMap::new(),
progress_registry: None,
}
}
pub fn set_output_adapter(&mut self, adapter: Box<dyn crate::output_adapter::OutputAdapter>) {
self.output_adapter = adapter;
}
pub fn output_adapter_mut(&mut self) -> &mut Box<dyn crate::output_adapter::OutputAdapter> {
&mut self.output_adapter
}
pub fn metadata_registry(&self) -> &Arc<MetadataRegistry> {
&self.metadata_registry
}
pub fn register_type_alias(
&mut self,
alias_name: &str,
base_type: &str,
overrides: Option<HashMap<String, KindedSlot>>,
) {
self.type_alias_registry.insert(
alias_name.to_string(),
TypeAliasRuntimeEntry {
base_type: base_type.to_string(),
overrides,
},
);
}
pub fn lookup_type_alias(&self, name: &str) -> Option<&TypeAliasRuntimeEntry> {
self.type_alias_registry.get(name)
}
pub fn resolve_type_for_format(
&self,
type_name: &str,
) -> (String, Option<HashMap<String, KindedSlot>>) {
if let Some(entry) = self.type_alias_registry.get(type_name) {
(entry.base_type.clone(), entry.overrides.clone())
} else {
(type_name.to_string(), None)
}
}
pub fn snapshot(&self, _store: &SnapshotStore) -> Result<ContextSnapshot> {
let _ = (
&self.variable_scopes,
&self.type_alias_registry,
&self.enum_registry,
&self.struct_type_registry,
&self.data_cache,
);
let _: Option<SuspensionStateSnapshot> = None;
let _: Option<TypeAliasRuntimeEntrySnapshot> = None;
let _: Option<VariableSnapshot> = None;
Err(anyhow!(
"ExecutionContext::snapshot: W17-snapshot-resume surface — \
kind-threaded `slot_to_serializable(slot, store)` replacement \
for the deleted `nanboxed_to_serializable` has not landed. \
Tracked as W17-snapshot-resume per \
docs/cluster-audits/phase-2d-playbook.md §3. ADR-006 §2.7.4 \
(snapshot serialization deferral) + §2.7.5.1 (post-proof \
wire-format shape for new HeapKinds: HashSet, Iterator, \
Result, Option, Deque, Channel, PriorityQueue, Range, \
Reference, FilterExpr, SharedCell)."
))
}
pub fn restore_from_snapshot(
&mut self,
_snapshot: ContextSnapshot,
_store: &SnapshotStore,
) -> Result<()> {
Err(anyhow!(
"ExecutionContext::restore_from_snapshot: W17-snapshot-resume \
surface — symmetric to `snapshot()`. The kinded \
`serializable_to_slot(sv, expected_kind, store)` inverse \
reconstructs scope-binding parallel kind tracks from the \
persisted discriminator. Tracked as W17-snapshot-resume per \
docs/cluster-audits/phase-2d-playbook.md §3. ADR-006 §2.7.4 \
+ §2.7.5.1."
))
}
pub fn set_event_queue(&mut self, queue: SharedEventQueue) {
self.event_queue = Some(queue);
}
pub fn event_queue(&self) -> Option<&SharedEventQueue> {
self.event_queue.as_ref()
}
pub fn event_queue_mut(&mut self) -> Option<&mut SharedEventQueue> {
self.event_queue.as_mut()
}
pub fn set_suspension_state(&mut self, state: SuspensionState) {
self.suspension_state = Some(state);
}
pub fn suspension_state(&self) -> Option<&SuspensionState> {
self.suspension_state.as_ref()
}
pub fn clear_suspension_state(&mut self) -> Option<SuspensionState> {
self.suspension_state.take()
}
pub fn is_suspended(&self) -> bool {
self.suspension_state.is_some()
}
pub fn set_alert_pipeline(&mut self, pipeline: Arc<AlertRouter>) {
self.alert_pipeline = Some(pipeline);
}
pub fn alert_pipeline(&self) -> Option<&Arc<AlertRouter>> {
self.alert_pipeline.as_ref()
}
pub fn emit_alert(&self, alert: super::alerts::Alert) {
if let Some(pipeline) = &self.alert_pipeline {
pipeline.emit(alert);
}
}
pub fn set_progress_registry(&mut self, registry: Arc<super::progress::ProgressRegistry>) {
self.progress_registry = Some(registry);
}
pub fn progress_registry(&self) -> Option<&Arc<super::progress::ProgressRegistry>> {
self.progress_registry.as_ref()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data::{AsyncDataProvider, CacheKey, DataQuery, Timeframe};
use shape_ast::ast::VarKind;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn test_execution_context_new_empty() {
let ctx = ExecutionContext::new_empty();
assert_eq!(ctx.current_row_index(), 0);
}
#[test]
fn test_execution_context_set_current_row() {
let mut ctx = ExecutionContext::new_empty();
ctx.set_current_row(5).unwrap();
assert_eq!(ctx.current_row_index(), 5);
}
#[test]
fn test_execution_context_variable_scope() {
let mut ctx = ExecutionContext::new_empty();
ctx.set_variable("x", KindedSlot::from_number(10.0))
.unwrap_or_else(|_| {
});
}
#[test]
fn test_type_alias_registry_basic() {
let mut ctx = ExecutionContext::new_empty();
let mut overrides = HashMap::new();
overrides.insert("decimals".to_string(), KindedSlot::from_number(4.0));
ctx.register_type_alias("Percent4", "Percent", Some(overrides));
let entry = ctx.lookup_type_alias("Percent4");
assert!(entry.is_some());
let entry = entry.unwrap();
assert_eq!(entry.base_type, "Percent");
assert!(entry.overrides.is_some());
let overrides = entry.overrides.as_ref().unwrap();
assert_eq!(
overrides.get("decimals").map(|v| v.slot().as_f64()),
Some(4.0)
);
}
#[test]
fn test_type_alias_registry_no_overrides() {
let mut ctx = ExecutionContext::new_empty();
ctx.register_type_alias("MyPercent", "Percent", None);
let entry = ctx.lookup_type_alias("MyPercent");
assert!(entry.is_some());
let entry = entry.unwrap();
assert_eq!(entry.base_type, "Percent");
assert!(entry.overrides.is_none());
}
#[test]
fn test_type_alias_registry_unknown_type() {
let ctx = ExecutionContext::new_empty();
let entry = ctx.lookup_type_alias("NonExistent");
assert!(entry.is_none());
}
#[test]
fn test_resolve_type_for_format_alias() {
let mut ctx = ExecutionContext::new_empty();
let mut overrides = HashMap::new();
overrides.insert("decimals".to_string(), KindedSlot::from_number(4.0));
ctx.register_type_alias("Percent4", "Percent", Some(overrides.clone()));
let (base_type, resolved_overrides) = ctx.resolve_type_for_format("Percent4");
assert_eq!(base_type, "Percent");
assert!(resolved_overrides.is_some());
assert_eq!(
resolved_overrides
.unwrap()
.get("decimals")
.map(|v| v.slot().as_f64()),
Some(4.0)
);
}
#[test]
fn test_resolve_type_for_format_non_alias() {
let ctx = ExecutionContext::new_empty();
let (base_type, resolved_overrides) = ctx.resolve_type_for_format("Number");
assert_eq!(base_type, "Number");
assert!(resolved_overrides.is_none());
}
#[derive(Clone)]
struct TestAsyncProvider {
frames: Arc<HashMap<CacheKey, DataFrame>>,
load_calls: Arc<AtomicUsize>,
}
impl AsyncDataProvider for TestAsyncProvider {
fn load<'a>(
&'a self,
query: &'a DataQuery,
) -> std::pin::Pin<
Box<
dyn std::future::Future<Output = Result<DataFrame, crate::data::AsyncDataError>>
+ Send
+ 'a,
>,
> {
let key = CacheKey::new(query.id.clone(), query.timeframe);
let frames = self.frames.clone();
let calls = self.load_calls.clone();
Box::pin(async move {
calls.fetch_add(1, Ordering::SeqCst);
frames
.get(&key)
.cloned()
.ok_or_else(|| crate::data::AsyncDataError::SymbolNotFound(query.id.clone()))
})
}
fn has_data(&self, symbol: &str, timeframe: &Timeframe) -> bool {
let key = CacheKey::new(symbol.to_string(), *timeframe);
self.frames.contains_key(&key)
}
fn symbols(&self) -> Vec<String> {
self.frames.keys().map(|k| k.id.clone()).collect()
}
}
#[allow(dead_code)]
fn _unused_snapshot_imports(
_provider: TestAsyncProvider,
_df: DataFrame,
_query: DataQuery,
_key: CacheKey,
_tf: Timeframe,
_kind: VarKind,
_kinded: KindedSlot,
_arc: Arc<()>,
_hashmap: HashMap<String, KindedSlot>,
_atomic: AtomicUsize,
_ordering: Ordering,
) {
}
#[test]
fn test_w17_execution_context_snapshot_returns_structured_error() {
let tmp = tempfile::tempdir().expect("tempdir");
let store = SnapshotStore::new(tmp.path()).expect("snapshot store");
let ctx = ExecutionContext::new_empty();
let result = ctx.snapshot(&store);
let err = result.expect_err("expected Err, got Ok");
let msg = format!("{err}");
assert!(
msg.contains("W17-snapshot-resume surface"),
"missing W17 marker; got: {msg}"
);
assert!(
msg.contains("§2.7.4"),
"missing ADR-006 §2.7.4 cite; got: {msg}"
);
}
}