Skip to main content

hyperstack_interpreter/
vm.rs

1use crate::ast::{
2    self, BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, ResolveStrategy,
3    ResolverExtractSpec, ResolverType, Transformation,
4};
5use crate::compiler::{MultiEntityBytecode, OpCode};
6use crate::Mutation;
7use dashmap::DashMap;
8use lru::LruCache;
9use serde_json::{json, Value};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::num::NonZeroUsize;
12
13#[cfg(feature = "otel")]
14use tracing::instrument;
15/// Context metadata for blockchain updates (accounts and instructions)
16/// This structure is designed to be extended over time with additional metadata
17#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19    /// Blockchain slot number
20    pub slot: Option<u64>,
21    /// Transaction signature
22    pub signature: Option<String>,
23    /// Unix timestamp (seconds since epoch)
24    /// If not provided, will default to current system time when accessed
25    pub timestamp: Option<i64>,
26    /// Write version for account updates (monotonically increasing per account within a slot)
27    /// Used for staleness detection to reject out-of-order updates
28    pub write_version: Option<u64>,
29    /// Transaction index for instruction updates (orders transactions within a slot)
30    /// Used for staleness detection to reject out-of-order updates
31    pub txn_index: Option<u64>,
32    /// When true, QueueResolver opcodes are skipped during handler execution.
33    /// Set for reprocessed cached data from PDA mapping changes to prevent
34    /// stale data from triggering resolvers or locking in wrong values via SetOnce.
35    pub skip_resolvers: bool,
36    /// Additional custom metadata that can be added without breaking changes
37    pub metadata: HashMap<String, Value>,
38}
39
40impl UpdateContext {
41    /// Create a new UpdateContext with slot and signature
42    pub fn new(slot: u64, signature: String) -> Self {
43        Self {
44            slot: Some(slot),
45            signature: Some(signature),
46            timestamp: None,
47            write_version: None,
48            txn_index: None,
49            skip_resolvers: false,
50            metadata: HashMap::new(),
51        }
52    }
53
54    /// Create a new UpdateContext with slot, signature, and timestamp
55    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
56        Self {
57            slot: Some(slot),
58            signature: Some(signature),
59            timestamp: Some(timestamp),
60            write_version: None,
61            txn_index: None,
62            skip_resolvers: false,
63            metadata: HashMap::new(),
64        }
65    }
66
67    /// Create context for account updates with write_version for staleness detection
68    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
69        Self {
70            slot: Some(slot),
71            signature: Some(signature),
72            timestamp: None,
73            write_version: Some(write_version),
74            txn_index: None,
75            skip_resolvers: false,
76            metadata: HashMap::new(),
77        }
78    }
79
80    /// Create context for instruction updates with txn_index for staleness detection
81    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
82        Self {
83            slot: Some(slot),
84            signature: Some(signature),
85            timestamp: None,
86            write_version: None,
87            txn_index: Some(txn_index),
88            skip_resolvers: false,
89            metadata: HashMap::new(),
90        }
91    }
92
93    /// Create context for reprocessed cached account data from PDA mapping changes.
94    /// Uses empty signature to prevent `when` guards from matching stale instructions,
95    /// and sets skip_resolvers to prevent stale scheduling/SetOnce lock-in.
96    pub fn new_reprocessed(slot: u64, write_version: u64) -> Self {
97        Self {
98            slot: Some(slot),
99            signature: None,
100            timestamp: None,
101            write_version: Some(write_version),
102            txn_index: None,
103            skip_resolvers: true,
104            metadata: HashMap::new(),
105        }
106    }
107
108    /// Get the timestamp, falling back to current system time if not set
109    pub fn timestamp(&self) -> i64 {
110        self.timestamp.unwrap_or_else(|| {
111            std::time::SystemTime::now()
112                .duration_since(std::time::UNIX_EPOCH)
113                .unwrap()
114                .as_secs() as i64
115        })
116    }
117
118    /// Create an empty context (for testing or when context is not available)
119    pub fn empty() -> Self {
120        Self::default()
121    }
122
123    /// Add custom metadata
124    /// Returns true if this is an account update context (has write_version, no txn_index)
125    pub fn is_account_update(&self) -> bool {
126        self.write_version.is_some() && self.txn_index.is_none()
127    }
128
129    /// Returns true if this is an instruction update context (has txn_index, no write_version)
130    pub fn is_instruction_update(&self) -> bool {
131        self.txn_index.is_some() && self.write_version.is_none()
132    }
133
134    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
135        self.metadata.insert(key, value);
136        self
137    }
138
139    /// Get metadata value
140    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
141        self.metadata.get(key)
142    }
143
144    /// Convert context to JSON value for injection into event data
145    pub fn to_value(&self) -> Value {
146        let mut obj = serde_json::Map::new();
147        if let Some(slot) = self.slot {
148            obj.insert("slot".to_string(), json!(slot));
149        }
150        if let Some(ref sig) = self.signature {
151            obj.insert("signature".to_string(), json!(sig));
152        }
153        // Always include timestamp (use current time if not set)
154        obj.insert("timestamp".to_string(), json!(self.timestamp()));
155        for (key, value) in &self.metadata {
156            obj.insert(key.clone(), value.clone());
157        }
158        Value::Object(obj)
159    }
160}
161
162pub type Register = usize;
163pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
164
165pub type RegisterValue = Value;
166
167/// Trait for evaluating computed fields
168/// Implement this in your generated spec to enable computed field evaluation
169pub trait ComputedFieldsEvaluator {
170    fn evaluate(&self, state: &mut Value) -> Result<()>;
171}
172
// ---- Pending queue limits ----
const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes

// ---- Temporal index limits (keep history from growing without bound) ----
const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, same as the pending queue TTL
const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;

// ---- State table defaults - aligned with downstream EntityCache (500 per view) ----
const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;

const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;

const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;

// The instruction deduplication cache is deliberately smaller: a smaller LRU
// gives a shorter effective TTL, and instruction duplicates are not expected
// to arrive much later.
const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;

const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;

const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;

const DEFAULT_MAX_RESOLVER_CACHE_ENTRIES: usize = 5_000;
199
200/// Estimate the size of a JSON value in bytes
201fn estimate_json_size(value: &Value) -> usize {
202    match value {
203        Value::Null => 4,
204        Value::Bool(_) => 5,
205        Value::Number(_) => 8,
206        Value::String(s) => s.len() + 2,
207        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
208        Value::Object(obj) => {
209            2 + obj
210                .iter()
211                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
212                .sum::<usize>()
213        }
214    }
215}
216
/// A dotted path that has been split into its segments once.
///
/// The segments live behind an `Arc`, so cloning a `CompiledPath` only bumps
/// a reference count.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    /// The path's components, in order, as produced by splitting on `.`.
    pub segments: std::sync::Arc<[String]>,
}

impl CompiledPath {
    /// Split `path` on `.` and store the resulting segments.
    ///
    /// An empty `path` yields a single empty segment.
    pub fn new(path: &str) -> Self {
        Self {
            segments: path.split('.').map(str::to_owned).collect(),
        }
    }

    /// Borrow the segments as a slice.
    fn segments(&self) -> &[String] {
        self.segments.as_ref()
    }
}
234
235/// Represents the type of change made to a field for granular dirty tracking.
236/// This enables emitting only the actual changes rather than entire field values.
237#[derive(Debug, Clone)]
238pub enum FieldChange {
239    /// Field was replaced with a new value (emit the full value from state)
240    Replaced,
241    /// Items were appended to an array field (emit only the new items)
242    Appended(Vec<Value>),
243}
244
245/// Tracks field modifications during handler execution with granular change information.
246/// This replaces the simple HashSet<String> approach to enable delta-only emissions.
247#[derive(Debug, Clone, Default)]
248pub struct DirtyTracker {
249    changes: HashMap<String, FieldChange>,
250}
251
252impl DirtyTracker {
253    /// Create a new empty DirtyTracker
254    pub fn new() -> Self {
255        Self {
256            changes: HashMap::new(),
257        }
258    }
259
260    /// Mark a field as replaced (full value will be emitted)
261    pub fn mark_replaced(&mut self, path: &str) {
262        // If there was an append, it's now superseded by a full replacement
263        self.changes.insert(path.to_string(), FieldChange::Replaced);
264    }
265
266    /// Record an appended value for a field
267    pub fn mark_appended(&mut self, path: &str, value: Value) {
268        match self.changes.get_mut(path) {
269            Some(FieldChange::Appended(values)) => {
270                // Add to existing appended values
271                values.push(value);
272            }
273            Some(FieldChange::Replaced) => {
274                // Field was already replaced, keep it as replaced
275                // (the full value including the append will be emitted)
276            }
277            None => {
278                // First append to this field
279                self.changes
280                    .insert(path.to_string(), FieldChange::Appended(vec![value]));
281            }
282        }
283    }
284
285    /// Check if there are any changes tracked
286    pub fn is_empty(&self) -> bool {
287        self.changes.is_empty()
288    }
289
290    /// Get the number of changed fields
291    pub fn len(&self) -> usize {
292        self.changes.len()
293    }
294
295    /// Iterate over all changes
296    pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
297        self.changes.iter()
298    }
299
300    /// Get a set of all dirty field paths (for backward compatibility)
301    pub fn dirty_paths(&self) -> HashSet<String> {
302        self.changes.keys().cloned().collect()
303    }
304
305    /// Consume the tracker and return the changes map
306    pub fn into_changes(self) -> HashMap<String, FieldChange> {
307        self.changes
308    }
309
310    /// Get a reference to the changes map
311    pub fn changes(&self) -> &HashMap<String, FieldChange> {
312        &self.changes
313    }
314
315    /// Get paths that were appended (not replaced)
316    pub fn appended_paths(&self) -> Vec<String> {
317        self.changes
318            .iter()
319            .filter_map(|(path, change)| match change {
320                FieldChange::Appended(_) => Some(path.clone()),
321                FieldChange::Replaced => None,
322            })
323            .collect()
324    }
325}
326
327pub struct VmContext {
328    registers: Vec<RegisterValue>,
329    states: HashMap<u32, StateTable>,
330    pub instructions_executed: u64,
331    pub cache_hits: u64,
332    path_cache: HashMap<String, CompiledPath>,
333    pub pda_cache_hits: u64,
334    pub pda_cache_misses: u64,
335    pub pending_queue_size: u64,
336    resolver_requests: VecDeque<ResolverRequest>,
337    resolver_pending: HashMap<String, PendingResolverEntry>,
338    resolver_cache: LruCache<String, Value>,
339    pub resolver_cache_hits: u64,
340    pub resolver_cache_misses: u64,
341    current_context: Option<UpdateContext>,
342    warnings: Vec<String>,
343    last_pda_lookup_miss: Option<String>,
344    last_lookup_index_miss: Option<String>,
345    last_pda_registered: Option<String>,
346    last_lookup_index_keys: Vec<String>,
347    scheduled_callbacks: Vec<(u64, ScheduledCallback)>,
348}
349
350#[derive(Debug)]
351pub struct LookupIndex {
352    index: std::sync::Mutex<LruCache<String, Value>>,
353}
354
355impl LookupIndex {
356    pub fn new() -> Self {
357        Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
358    }
359
360    pub fn with_capacity(capacity: usize) -> Self {
361        LookupIndex {
362            index: std::sync::Mutex::new(LruCache::new(
363                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
364            )),
365        }
366    }
367
368    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
369        let key = value_to_cache_key(lookup_value);
370        self.index.lock().unwrap().get(&key).cloned()
371    }
372
373    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
374        let key = value_to_cache_key(&lookup_value);
375        self.index.lock().unwrap().put(key, primary_key);
376    }
377
378    /// Remove an entry by lookup value.  Used when a PDA mapping changes to
379    /// clear stale entries that would otherwise shadow the updated PDA mapping.
380    pub fn remove(&self, lookup_value: &Value) {
381        let key = value_to_cache_key(lookup_value);
382        self.index.lock().unwrap().pop(&key);
383    }
384
385    pub fn len(&self) -> usize {
386        self.index.lock().unwrap().len()
387    }
388
389    pub fn is_empty(&self) -> bool {
390        self.index.lock().unwrap().is_empty()
391    }
392}
393
394impl Default for LookupIndex {
395    fn default() -> Self {
396        Self::new()
397    }
398}
399
400fn value_to_cache_key(value: &Value) -> String {
401    match value {
402        Value::String(s) => s.clone(),
403        Value::Number(n) => n.to_string(),
404        Value::Bool(b) => b.to_string(),
405        Value::Null => "null".to_string(),
406        _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
407    }
408}
409
410fn resolver_type_key(resolver: &ResolverType) -> String {
411    match resolver {
412        ResolverType::Token => "token".to_string(),
413        ResolverType::Url(config) => match &config.url_source {
414            ast::UrlSource::FieldPath(path) => format!("url:{}", path),
415            ast::UrlSource::Template(parts) => {
416                let key: String = parts
417                    .iter()
418                    .map(|p| match p {
419                        ast::UrlTemplatePart::Literal(s) => s.clone(),
420                        ast::UrlTemplatePart::FieldRef(f) => format!("{{{}}}", f),
421                    })
422                    .collect();
423                format!("url:{}", key)
424            }
425        },
426    }
427}
428
429fn resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
430    format!(
431        "{}:{}",
432        resolver_type_key(resolver),
433        value_to_cache_key(input)
434    )
435}
436
437#[derive(Debug)]
438pub struct TemporalIndex {
439    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
440}
441
442impl Default for TemporalIndex {
443    fn default() -> Self {
444        Self::new()
445    }
446}
447
448impl TemporalIndex {
449    pub fn new() -> Self {
450        Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
451    }
452
453    pub fn with_capacity(capacity: usize) -> Self {
454        TemporalIndex {
455            index: std::sync::Mutex::new(LruCache::new(
456                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
457            )),
458        }
459    }
460
461    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
462        let key = value_to_cache_key(lookup_value);
463        let mut cache = self.index.lock().unwrap();
464        if let Some(entries) = cache.get(&key) {
465            for i in (0..entries.len()).rev() {
466                if entries[i].1 <= timestamp {
467                    return Some(entries[i].0.clone());
468                }
469            }
470        }
471        None
472    }
473
474    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
475        let key = value_to_cache_key(lookup_value);
476        let mut cache = self.index.lock().unwrap();
477        if let Some(entries) = cache.get(&key) {
478            if let Some(last) = entries.last() {
479                return Some(last.0.clone());
480            }
481        }
482        None
483    }
484
485    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
486        let key = value_to_cache_key(&lookup_value);
487        let mut cache = self.index.lock().unwrap();
488
489        let entries = cache.get_or_insert_mut(key, Vec::new);
490        entries.push((primary_key, timestamp));
491        entries.sort_by_key(|(_, ts)| *ts);
492
493        let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
494        entries.retain(|(_, ts)| *ts >= cutoff);
495
496        if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
497            let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
498            entries.drain(0..excess);
499        }
500    }
501
502    pub fn len(&self) -> usize {
503        self.index.lock().unwrap().len()
504    }
505
506    pub fn is_empty(&self) -> bool {
507        self.index.lock().unwrap().is_empty()
508    }
509
510    pub fn total_entries(&self) -> usize {
511        self.index
512            .lock()
513            .unwrap()
514            .iter()
515            .map(|(_, entries)| entries.len())
516            .sum()
517    }
518
519    pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
520        let mut cache = self.index.lock().unwrap();
521        let mut total_removed = 0;
522
523        for (_, entries) in cache.iter_mut() {
524            let original_len = entries.len();
525            entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
526            total_removed += original_len - entries.len();
527        }
528
529        total_removed
530    }
531}
532
533#[derive(Debug)]
534pub struct PdaReverseLookup {
535    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
536    index: LruCache<String, String>,
537}
538
539impl PdaReverseLookup {
540    pub fn new(capacity: usize) -> Self {
541        PdaReverseLookup {
542            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
543        }
544    }
545
546    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
547        self.index.get(pda_address).cloned()
548    }
549
550    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
551        let evicted = if self.index.len() >= self.index.cap().get() {
552            self.index.peek_lru().map(|(k, _)| k.clone())
553        } else {
554            None
555        };
556
557        self.index.put(pda_address, seed_value);
558        evicted
559    }
560
561    pub fn len(&self) -> usize {
562        self.index.len()
563    }
564
565    pub fn is_empty(&self) -> bool {
566        self.index.is_empty()
567    }
568
569    pub fn contains(&self, pda_address: &str) -> bool {
570        self.index.peek(pda_address).is_some()
571    }
572}
573
574/// Input for queueing an account update.
575#[derive(Debug, Clone)]
576pub struct QueuedAccountUpdate {
577    pub pda_address: String,
578    pub account_type: String,
579    pub account_data: Value,
580    pub slot: u64,
581    pub write_version: u64,
582    pub signature: String,
583}
584
585/// Internal representation of a pending account update with queue metadata.
586#[derive(Debug, Clone)]
587pub struct PendingAccountUpdate {
588    pub account_type: String,
589    pub pda_address: String,
590    pub account_data: Value,
591    pub slot: u64,
592    pub write_version: u64,
593    pub signature: String,
594    pub queued_at: i64,
595    /// When true, this update was pulled from the `last_account_data` cache
596    /// during a PDA mapping change. It carries stale data from a previous
597    /// entity mapping and should use `UpdateContext::new_reprocessed` to
598    /// prevent `when` guards from matching stale instruction signatures
599    /// and to skip resolver scheduling.
600    pub is_stale_reprocess: bool,
601}
602
603/// Input for queueing an instruction event when PDA lookup fails.
604#[derive(Debug, Clone)]
605pub struct QueuedInstructionEvent {
606    pub pda_address: String,
607    pub event_type: String,
608    pub event_data: Value,
609    pub slot: u64,
610    pub signature: String,
611}
612
613/// Internal representation of a pending instruction event with queue metadata.
614#[derive(Debug, Clone)]
615pub struct PendingInstructionEvent {
616    pub event_type: String,
617    pub pda_address: String,
618    pub event_data: Value,
619    pub slot: u64,
620    pub signature: String,
621    pub queued_at: i64,
622}
623
624#[derive(Debug, Clone)]
625pub struct DeferredWhenOperation {
626    pub entity_name: String,
627    pub primary_key: Value,
628    pub field_path: String,
629    pub field_value: Value,
630    pub when_instruction: String,
631    pub signature: String,
632    pub slot: u64,
633    pub deferred_at: i64,
634    pub emit: bool,
635}
636
637#[derive(Debug, Clone)]
638pub struct ResolverRequest {
639    pub cache_key: String,
640    pub resolver: ResolverType,
641    pub input: Value,
642}
643
644#[derive(Debug, Clone)]
645pub struct ResolverTarget {
646    pub state_id: u32,
647    pub entity_name: String,
648    pub primary_key: Value,
649    pub extracts: Vec<ResolverExtractSpec>,
650}
651
652#[derive(Debug, Clone)]
653pub struct PendingResolverEntry {
654    pub resolver: ResolverType,
655    pub input: Value,
656    pub targets: Vec<ResolverTarget>,
657    pub queued_at: i64,
658}
659
660#[derive(Debug, Clone, PartialEq)]
661pub struct ScheduledCallback {
662    pub state_id: u32,
663    pub entity_name: String,
664    pub primary_key: Value,
665    pub resolver: ResolverType,
666    pub url_template: Option<Vec<ast::UrlTemplatePart>>,
667    pub input_value: Option<Value>,
668    pub input_path: Option<String>,
669    pub condition: Option<ast::ResolverCondition>,
670    pub strategy: ResolveStrategy,
671    pub extracts: Vec<ResolverExtractSpec>,
672    pub retry_count: u32,
673}
674
675impl PendingResolverEntry {
676    fn add_target(&mut self, target: ResolverTarget) {
677        if let Some(existing) = self.targets.iter_mut().find(|t| {
678            t.state_id == target.state_id
679                && t.entity_name == target.entity_name
680                && t.primary_key == target.primary_key
681        }) {
682            let mut seen = HashSet::new();
683            for extract in &existing.extracts {
684                seen.insert((extract.target_path.clone(), extract.source_path.clone()));
685            }
686
687            for extract in target.extracts {
688                let key = (extract.target_path.clone(), extract.source_path.clone());
689                if seen.insert(key) {
690                    existing.extracts.push(extract);
691                }
692            }
693        } else {
694            self.targets.push(target);
695        }
696    }
697}
698
/// Snapshot of pending-queue statistics.
#[derive(Debug, Clone)]
pub struct PendingQueueStats {
    /// Total number of queued updates.
    pub total_updates: usize,
    /// Number of distinct PDAs with queued updates.
    pub unique_pdas: usize,
    /// Age of the oldest queued update, in seconds.
    pub oldest_age_seconds: i64,
    /// Size of the largest per-PDA queue.
    pub largest_pda_queue_size: usize,
    /// Estimated memory used by the queue, in bytes.
    pub estimated_memory_bytes: usize,
}
707
708#[derive(Debug, Clone, Default)]
709pub struct VmMemoryStats {
710    pub state_table_entity_count: usize,
711    pub state_table_max_entries: usize,
712    pub state_table_at_capacity: bool,
713    pub lookup_index_count: usize,
714    pub lookup_index_total_entries: usize,
715    pub temporal_index_count: usize,
716    pub temporal_index_total_entries: usize,
717    pub pda_reverse_lookup_count: usize,
718    pub pda_reverse_lookup_total_entries: usize,
719    pub version_tracker_entries: usize,
720    pub pending_queue_stats: Option<PendingQueueStats>,
721    pub path_cache_size: usize,
722}
723
/// Counts of what a cleanup pass removed. Both counters default to zero.
#[derive(Debug, Clone, Default)]
pub struct CleanupResult {
    /// Number of pending updates removed.
    pub pending_updates_removed: usize,
    /// Number of temporal index entries removed.
    pub temporal_entries_removed: usize,
}
729
/// Details of a capacity warning: current size, limit, and the overshoot.
#[derive(Debug, Clone)]
pub struct CapacityWarning {
    /// Number of entries currently held.
    pub current_entries: usize,
    /// Configured maximum number of entries.
    pub max_entries: usize,
    /// Number of entries beyond the limit.
    pub entries_over_limit: usize,
}
736
737#[derive(Debug, Clone)]
738pub struct StateTableConfig {
739    pub max_entries: usize,
740    pub max_array_length: usize,
741}
742
743impl Default for StateTableConfig {
744    fn default() -> Self {
745        Self {
746            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
747            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
748        }
749    }
750}
751
752#[derive(Debug)]
753pub struct VersionTracker {
754    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
755}
756
757impl VersionTracker {
758    pub fn new() -> Self {
759        Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
760    }
761
762    pub fn with_capacity(capacity: usize) -> Self {
763        VersionTracker {
764            cache: std::sync::Mutex::new(LruCache::new(
765                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
766            )),
767        }
768    }
769
770    fn make_key(primary_key: &Value, event_type: &str) -> String {
771        format!("{}:{}", primary_key, event_type)
772    }
773
774    pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
775        let key = Self::make_key(primary_key, event_type);
776        self.cache.lock().unwrap().get(&key).copied()
777    }
778
779    pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
780        let key = Self::make_key(primary_key, event_type);
781        self.cache.lock().unwrap().put(key, (slot, ordering_value));
782    }
783
784    pub fn len(&self) -> usize {
785        self.cache.lock().unwrap().len()
786    }
787
788    pub fn is_empty(&self) -> bool {
789        self.cache.lock().unwrap().is_empty()
790    }
791}
792
793impl Default for VersionTracker {
794    fn default() -> Self {
795        Self::new()
796    }
797}
798
799#[derive(Debug)]
800pub struct StateTable {
801    pub data: DashMap<Value, Value>,
802    access_times: DashMap<Value, i64>,
803    pub lookup_indexes: HashMap<String, LookupIndex>,
804    pub temporal_indexes: HashMap<String, TemporalIndex>,
805    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
806    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
807    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
808    /// Cache of the most recent account data per PDA address.  When a PDA
809    /// mapping changes (same PDA, different seed) the cached data is returned
810    /// for reprocessing so cross-account Lookup handlers resolve to the new key.
811    pub last_account_data: DashMap<String, PendingAccountUpdate>,
812    version_tracker: VersionTracker,
813    instruction_dedup_cache: VersionTracker,
814    config: StateTableConfig,
815    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
816    entity_name: String,
817    pub recent_tx_instructions:
818        std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
819    pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
820}
821
822impl StateTable {
823    pub fn is_at_capacity(&self) -> bool {
824        self.data.len() >= self.config.max_entries
825    }
826
827    pub fn entries_over_limit(&self) -> usize {
828        self.data.len().saturating_sub(self.config.max_entries)
829    }
830
831    pub fn max_array_length(&self) -> usize {
832        self.config.max_array_length
833    }
834
835    fn touch(&self, key: &Value) {
836        let now = std::time::SystemTime::now()
837            .duration_since(std::time::UNIX_EPOCH)
838            .unwrap()
839            .as_secs() as i64;
840        self.access_times.insert(key.clone(), now);
841    }
842
843    fn evict_lru(&self, count: usize) -> usize {
844        if count == 0 || self.data.is_empty() {
845            return 0;
846        }
847
848        let mut entries: Vec<(Value, i64)> = self
849            .access_times
850            .iter()
851            .map(|entry| (entry.key().clone(), *entry.value()))
852            .collect();
853
854        entries.sort_by_key(|(_, ts)| *ts);
855
856        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
857
858        let mut evicted = 0;
859        for key in to_evict {
860            self.data.remove(&key);
861            self.access_times.remove(&key);
862            evicted += 1;
863        }
864
865        #[cfg(feature = "otel")]
866        if evicted > 0 {
867            crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
868        }
869
870        evicted
871    }
872
873    pub fn insert_with_eviction(&self, key: Value, value: Value) {
874        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
875            #[cfg(feature = "otel")]
876            crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
877            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
878            self.evict_lru(to_evict.max(1));
879        }
880        self.data.insert(key.clone(), value);
881        self.touch(&key);
882    }
883
884    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
885        let result = self.data.get(key).map(|v| v.clone());
886        if result.is_some() {
887            self.touch(key);
888        }
889        result
890    }
891
892    /// Check if an update is fresh and update the version tracker.
893    /// Returns true if the update should be processed (is fresh).
894    /// Returns false if the update is stale and should be skipped.
895    ///
896    /// Comparison is lexicographic on (slot, ordering_value):
897    /// (100, 5) > (100, 3) > (99, 999)
898    pub fn is_fresh_update(
899        &self,
900        primary_key: &Value,
901        event_type: &str,
902        slot: u64,
903        ordering_value: u64,
904    ) -> bool {
905        let dominated = self
906            .version_tracker
907            .get(primary_key, event_type)
908            .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
909            .unwrap_or(false);
910
911        if dominated {
912            return false;
913        }
914
915        self.version_tracker
916            .insert(primary_key, event_type, slot, ordering_value);
917        true
918    }
919
920    /// Check if an instruction is a duplicate of one we've seen recently.
921    /// Returns true if this exact instruction has been seen before (is a duplicate).
922    /// Returns false if this is a new instruction that should be processed.
923    ///
924    /// Unlike account updates, instructions don't use recency checks - all
925    /// unique instructions are processed. Only exact duplicates are skipped.
926    /// Uses a smaller cache capacity for shorter effective TTL.
927    pub fn is_duplicate_instruction(
928        &self,
929        primary_key: &Value,
930        event_type: &str,
931        slot: u64,
932        txn_index: u64,
933    ) -> bool {
934        // Check if we've seen this exact instruction before
935        let is_duplicate = self
936            .instruction_dedup_cache
937            .get(primary_key, event_type)
938            .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
939            .unwrap_or(false);
940
941        if is_duplicate {
942            return true;
943        }
944
945        // Record this instruction for deduplication
946        self.instruction_dedup_cache
947            .insert(primary_key, event_type, slot, txn_index);
948        false
949    }
950}
951
952impl VmContext {
953    pub fn new() -> Self {
954        let mut vm = VmContext {
955            registers: vec![Value::Null; 256],
956            states: HashMap::new(),
957            instructions_executed: 0,
958            cache_hits: 0,
959            path_cache: HashMap::new(),
960            pda_cache_hits: 0,
961            pda_cache_misses: 0,
962            pending_queue_size: 0,
963            resolver_requests: VecDeque::new(),
964            resolver_pending: HashMap::new(),
965            resolver_cache: LruCache::new(
966                NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
967                    .expect("capacity must be > 0"),
968            ),
969            resolver_cache_hits: 0,
970            resolver_cache_misses: 0,
971            current_context: None,
972            warnings: Vec::new(),
973            last_pda_lookup_miss: None,
974            last_lookup_index_miss: None,
975            last_pda_registered: None,
976            last_lookup_index_keys: Vec::new(),
977            scheduled_callbacks: Vec::new(),
978        };
979        vm.states.insert(
980            0,
981            StateTable {
982                data: DashMap::new(),
983                access_times: DashMap::new(),
984                lookup_indexes: HashMap::new(),
985                temporal_indexes: HashMap::new(),
986                pda_reverse_lookups: HashMap::new(),
987                pending_updates: DashMap::new(),
988                pending_instruction_events: DashMap::new(),
989                last_account_data: DashMap::new(),
990                version_tracker: VersionTracker::new(),
991                instruction_dedup_cache: VersionTracker::with_capacity(
992                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
993                ),
994                config: StateTableConfig::default(),
995                entity_name: String::new(),
996                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
997                    NonZeroUsize::new(1000).unwrap(),
998                )),
999                deferred_when_ops: DashMap::new(),
1000            },
1001        );
1002
1003        vm
1004    }
1005
1006    /// Create a new VmContext specifically for multi-entity operation.
1007    pub fn new_multi_entity() -> Self {
1008        VmContext {
1009            registers: vec![Value::Null; 256],
1010            states: HashMap::new(),
1011            instructions_executed: 0,
1012            cache_hits: 0,
1013            path_cache: HashMap::new(),
1014            pda_cache_hits: 0,
1015            pda_cache_misses: 0,
1016            pending_queue_size: 0,
1017            resolver_requests: VecDeque::new(),
1018            resolver_pending: HashMap::new(),
1019            resolver_cache: LruCache::new(
1020                NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
1021                    .expect("capacity must be > 0"),
1022            ),
1023            resolver_cache_hits: 0,
1024            resolver_cache_misses: 0,
1025            current_context: None,
1026            warnings: Vec::new(),
1027            last_pda_lookup_miss: None,
1028            last_lookup_index_miss: None,
1029            last_pda_registered: None,
1030            last_lookup_index_keys: Vec::new(),
1031            scheduled_callbacks: Vec::new(),
1032        }
1033    }
1034
1035    pub fn new_with_config(state_config: StateTableConfig) -> Self {
1036        let mut vm = VmContext {
1037            registers: vec![Value::Null; 256],
1038            states: HashMap::new(),
1039            instructions_executed: 0,
1040            cache_hits: 0,
1041            path_cache: HashMap::new(),
1042            pda_cache_hits: 0,
1043            pda_cache_misses: 0,
1044            pending_queue_size: 0,
1045            resolver_requests: VecDeque::new(),
1046            resolver_pending: HashMap::new(),
1047            resolver_cache: LruCache::new(
1048                NonZeroUsize::new(DEFAULT_MAX_RESOLVER_CACHE_ENTRIES)
1049                    .expect("capacity must be > 0"),
1050            ),
1051            resolver_cache_hits: 0,
1052            resolver_cache_misses: 0,
1053            current_context: None,
1054            warnings: Vec::new(),
1055            last_pda_lookup_miss: None,
1056            last_lookup_index_miss: None,
1057            last_pda_registered: None,
1058            last_lookup_index_keys: Vec::new(),
1059            scheduled_callbacks: Vec::new(),
1060        };
1061        vm.states.insert(
1062            0,
1063            StateTable {
1064                data: DashMap::new(),
1065                access_times: DashMap::new(),
1066                lookup_indexes: HashMap::new(),
1067                temporal_indexes: HashMap::new(),
1068                pda_reverse_lookups: HashMap::new(),
1069                pending_updates: DashMap::new(),
1070                pending_instruction_events: DashMap::new(),
1071                last_account_data: DashMap::new(),
1072                version_tracker: VersionTracker::new(),
1073                instruction_dedup_cache: VersionTracker::with_capacity(
1074                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1075                ),
1076                config: state_config,
1077                entity_name: "default".to_string(),
1078                recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1079                    NonZeroUsize::new(1000).unwrap(),
1080                )),
1081                deferred_when_ops: DashMap::new(),
1082            },
1083        );
1084        vm
1085    }
1086
1087    pub fn take_resolver_requests(&mut self) -> Vec<ResolverRequest> {
1088        self.resolver_requests.drain(..).collect()
1089    }
1090
    /// Returns all scheduled callbacks accumulated so far and leaves the
    /// internal list empty.
    ///
    /// Each element pairs a `u64` with its `ScheduledCallback`.
    // NOTE(review): the meaning of the `u64` (slot? timestamp?) is not visible
    // here — confirm against the code that pushes into `scheduled_callbacks`.
    pub fn take_scheduled_callbacks(&mut self) -> Vec<(u64, ScheduledCallback)> {
        std::mem::take(&mut self.scheduled_callbacks)
    }
1094
1095    pub fn get_entity_state(&self, state_id: u32, key: &Value) -> Option<Value> {
1096        self.states.get(&state_id)?.get_and_touch(key)
1097    }
1098
1099    pub fn restore_resolver_requests(&mut self, requests: Vec<ResolverRequest>) {
1100        if requests.is_empty() {
1101            return;
1102        }
1103
1104        self.resolver_requests.extend(requests);
1105    }
1106
1107    pub fn apply_resolver_result(
1108        &mut self,
1109        bytecode: &MultiEntityBytecode,
1110        cache_key: &str,
1111        resolved_value: Value,
1112    ) -> Result<Vec<Mutation>> {
1113        self.resolver_cache
1114            .put(cache_key.to_string(), resolved_value.clone());
1115
1116        let entry = match self.resolver_pending.remove(cache_key) {
1117            Some(entry) => entry,
1118            None => return Ok(Vec::new()),
1119        };
1120
1121        let mut mutations = Vec::new();
1122
1123        for target in entry.targets {
1124            if target.primary_key.is_null() {
1125                continue;
1126            }
1127
1128            let entity_bytecode = match bytecode.entities.get(&target.entity_name) {
1129                Some(bc) => bc,
1130                None => continue,
1131            };
1132
1133            let state = match self.states.get(&target.state_id) {
1134                Some(s) => s,
1135                None => continue,
1136            };
1137
1138            let mut entity_state = state
1139                .get_and_touch(&target.primary_key)
1140                .unwrap_or_else(|| json!({}));
1141
1142            let mut dirty_tracker = DirtyTracker::new();
1143            let should_emit = |path: &str| !entity_bytecode.non_emitted_fields.contains(path);
1144
1145            Self::apply_resolver_extractions_to_value(
1146                &mut entity_state,
1147                &resolved_value,
1148                &target.extracts,
1149                &mut dirty_tracker,
1150                &should_emit,
1151            )?;
1152
1153            if let Some(evaluator) = entity_bytecode.computed_fields_evaluator.as_ref() {
1154                let old_values: Vec<_> = entity_bytecode
1155                    .computed_paths
1156                    .iter()
1157                    .map(|path| Self::get_value_at_path(&entity_state, path))
1158                    .collect();
1159
1160                let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
1161                let context_timestamp = self
1162                    .current_context
1163                    .as_ref()
1164                    .map(|c| c.timestamp())
1165                    .unwrap_or_else(|| {
1166                        std::time::SystemTime::now()
1167                            .duration_since(std::time::UNIX_EPOCH)
1168                            .unwrap()
1169                            .as_secs() as i64
1170                    });
1171                let eval_result = evaluator(&mut entity_state, context_slot, context_timestamp);
1172
1173                if eval_result.is_ok() {
1174                    let mut changed_fields = Vec::new();
1175                    for (path, old_value) in
1176                        entity_bytecode.computed_paths.iter().zip(old_values.iter())
1177                    {
1178                        let new_value = Self::get_value_at_path(&entity_state, path);
1179                        let changed = new_value != *old_value;
1180                        let will_emit = should_emit(path);
1181
1182                        if changed && will_emit {
1183                            dirty_tracker.mark_replaced(path);
1184                            changed_fields.push(path.clone());
1185                        }
1186                    }
1187                }
1188            }
1189
1190            state.insert_with_eviction(target.primary_key.clone(), entity_state.clone());
1191
1192            if dirty_tracker.is_empty() {
1193                continue;
1194            }
1195
1196            let patch = Self::build_partial_state_from_value(&entity_state, &dirty_tracker)?;
1197
1198            mutations.push(Mutation {
1199                export: target.entity_name.clone(),
1200                key: target.primary_key.clone(),
1201                patch,
1202                append: vec![],
1203            });
1204        }
1205
1206        Ok(mutations)
1207    }
1208
1209    pub fn enqueue_resolver_request(
1210        &mut self,
1211        cache_key: String,
1212        resolver: ResolverType,
1213        input: Value,
1214        target: ResolverTarget,
1215    ) {
1216        if let Some(entry) = self.resolver_pending.get_mut(&cache_key) {
1217            entry.add_target(target);
1218            return;
1219        }
1220
1221        let queued_at = std::time::SystemTime::now()
1222            .duration_since(std::time::UNIX_EPOCH)
1223            .unwrap()
1224            .as_secs() as i64;
1225
1226        self.resolver_pending.insert(
1227            cache_key.clone(),
1228            PendingResolverEntry {
1229                resolver: resolver.clone(),
1230                input: input.clone(),
1231                targets: vec![target],
1232                queued_at,
1233            },
1234        );
1235
1236        self.resolver_requests.push_back(ResolverRequest {
1237            cache_key,
1238            resolver,
1239            input,
1240        });
1241    }
1242
1243    fn apply_resolver_extractions_to_value<F>(
1244        state: &mut Value,
1245        resolved_value: &Value,
1246        extracts: &[ResolverExtractSpec],
1247        dirty_tracker: &mut DirtyTracker,
1248        should_emit: &F,
1249    ) -> Result<()>
1250    where
1251        F: Fn(&str) -> bool,
1252    {
1253        for extract in extracts {
1254            let resolved = match extract.source_path.as_deref() {
1255                Some(path) => Self::get_value_at_path(resolved_value, path),
1256                None => Some(resolved_value.clone()),
1257            };
1258
1259            let Some(value) = resolved else {
1260                continue;
1261            };
1262
1263            let value = if let Some(transform) = &extract.transform {
1264                Self::apply_transformation(&value, transform)?
1265            } else {
1266                value
1267            };
1268
1269            Self::set_nested_field_value(state, &extract.target_path, value)?;
1270            if should_emit(&extract.target_path) {
1271                dirty_tracker.mark_replaced(&extract.target_path);
1272            }
1273        }
1274
1275        Ok(())
1276    }
1277
1278    fn build_partial_state_from_value(state: &Value, tracker: &DirtyTracker) -> Result<Value> {
1279        if tracker.is_empty() {
1280            return Ok(json!({}));
1281        }
1282
1283        let mut partial = serde_json::Map::new();
1284
1285        for (path, change) in tracker.iter() {
1286            let segments: Vec<&str> = path.split('.').collect();
1287
1288            let value_to_insert = match change {
1289                FieldChange::Replaced => {
1290                    let mut current = state;
1291                    let mut found = true;
1292
1293                    for segment in &segments {
1294                        match current.get(*segment) {
1295                            Some(v) => current = v,
1296                            None => {
1297                                found = false;
1298                                break;
1299                            }
1300                        }
1301                    }
1302
1303                    if !found {
1304                        continue;
1305                    }
1306                    current.clone()
1307                }
1308                FieldChange::Appended(values) => Value::Array(values.clone()),
1309            };
1310
1311            let mut target = &mut partial;
1312            for (i, segment) in segments.iter().enumerate() {
1313                if i == segments.len() - 1 {
1314                    target.insert(segment.to_string(), value_to_insert.clone());
1315                } else {
1316                    target
1317                        .entry(segment.to_string())
1318                        .or_insert_with(|| json!({}));
1319                    target = target
1320                        .get_mut(*segment)
1321                        .and_then(|v| v.as_object_mut())
1322                        .ok_or("Failed to build nested structure")?;
1323                }
1324            }
1325        }
1326
1327        Ok(Value::Object(partial))
1328    }
1329
    /// Returns a mutable reference to the state table registered under
    /// `state_id`, or `None` if no table exists for that id.
    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
        self.states.get_mut(&state_id)
    }
1335
    /// Returns a mutable reference to the VM's register vector (exposed for
    /// the metrics context).
    ///
    /// Callers can resize or overwrite registers through this reference.
    // NOTE(review): other methods index `self.registers` directly, so
    // shrinking the vector here could make those accesses panic — confirm
    // callers never do so.
    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
        &mut self.registers
    }
1340
    /// Returns a read-only view of the compiled-path cache, keyed by the
    /// original path string (exposed for the metrics context).
    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
        &self.path_cache
    }
1345
    /// Returns the update context currently stored on the VM, or `None` if
    /// none is set.
    pub fn current_context(&self) -> Option<&UpdateContext> {
        self.current_context.as_ref()
    }
1350
    /// Replaces the VM's current update context, which is read during
    /// computed field evaluation.
    ///
    /// Passing `None` clears the context.
    pub fn set_current_context(&mut self, context: Option<UpdateContext>) {
        self.current_context = context;
    }
1355
    /// Appends `msg` to the VM's list of accumulated warnings.
    fn add_warning(&mut self, msg: String) {
        self.warnings.push(msg);
    }
1359
    /// Returns all warnings accumulated so far, in the order they were added,
    /// and leaves the internal list empty.
    pub fn take_warnings(&mut self) -> Vec<String> {
        std::mem::take(&mut self.warnings)
    }
1363
    /// Returns `true` when at least one warning is waiting to be taken.
    pub fn has_warnings(&self) -> bool {
        !self.warnings.is_empty()
    }
1367
1368    pub fn update_state_from_register(
1369        &mut self,
1370        state_id: u32,
1371        key: Value,
1372        register: Register,
1373    ) -> Result<()> {
1374        let state = self.states.get(&state_id).ok_or("State table not found")?;
1375        let value = self.registers[register].clone();
1376        state.insert_with_eviction(key, value);
1377        Ok(())
1378    }
1379
1380    fn reset_registers(&mut self) {
1381        for reg in &mut self.registers {
1382            *reg = Value::Null;
1383        }
1384    }
1385
1386    /// Extract only the dirty fields from state (public for use by instruction hooks)
1387    pub fn extract_partial_state(
1388        &self,
1389        state_reg: Register,
1390        dirty_fields: &HashSet<String>,
1391    ) -> Result<Value> {
1392        let full_state = &self.registers[state_reg];
1393
1394        if dirty_fields.is_empty() {
1395            return Ok(json!({}));
1396        }
1397
1398        let mut partial = serde_json::Map::new();
1399
1400        for path in dirty_fields {
1401            let segments: Vec<&str> = path.split('.').collect();
1402
1403            let mut current = full_state;
1404            let mut found = true;
1405
1406            for segment in &segments {
1407                match current.get(segment) {
1408                    Some(v) => current = v,
1409                    None => {
1410                        found = false;
1411                        break;
1412                    }
1413                }
1414            }
1415
1416            if !found {
1417                continue;
1418            }
1419
1420            let mut target = &mut partial;
1421            for (i, segment) in segments.iter().enumerate() {
1422                if i == segments.len() - 1 {
1423                    target.insert(segment.to_string(), current.clone());
1424                } else {
1425                    target
1426                        .entry(segment.to_string())
1427                        .or_insert_with(|| json!({}));
1428                    target = target
1429                        .get_mut(*segment)
1430                        .and_then(|v| v.as_object_mut())
1431                        .ok_or("Failed to build nested structure")?;
1432                }
1433            }
1434        }
1435
1436        Ok(Value::Object(partial))
1437    }
1438
1439    /// Extract a patch from state based on the DirtyTracker.
1440    /// For Replaced fields: extracts the full value from state.
1441    /// For Appended fields: emits only the appended values as an array.
1442    pub fn extract_partial_state_with_tracker(
1443        &self,
1444        state_reg: Register,
1445        tracker: &DirtyTracker,
1446    ) -> Result<Value> {
1447        let full_state = &self.registers[state_reg];
1448
1449        if tracker.is_empty() {
1450            return Ok(json!({}));
1451        }
1452
1453        let mut partial = serde_json::Map::new();
1454
1455        for (path, change) in tracker.iter() {
1456            let segments: Vec<&str> = path.split('.').collect();
1457
1458            let value_to_insert = match change {
1459                FieldChange::Replaced => {
1460                    let mut current = full_state;
1461                    let mut found = true;
1462
1463                    for segment in &segments {
1464                        match current.get(*segment) {
1465                            Some(v) => current = v,
1466                            None => {
1467                                found = false;
1468                                break;
1469                            }
1470                        }
1471                    }
1472
1473                    if !found {
1474                        continue;
1475                    }
1476                    current.clone()
1477                }
1478                FieldChange::Appended(values) => Value::Array(values.clone()),
1479            };
1480
1481            let mut target = &mut partial;
1482            for (i, segment) in segments.iter().enumerate() {
1483                if i == segments.len() - 1 {
1484                    target.insert(segment.to_string(), value_to_insert.clone());
1485                } else {
1486                    target
1487                        .entry(segment.to_string())
1488                        .or_insert_with(|| json!({}));
1489                    target = target
1490                        .get_mut(*segment)
1491                        .and_then(|v| v.as_object_mut())
1492                        .ok_or("Failed to build nested structure")?;
1493                }
1494            }
1495        }
1496
1497        Ok(Value::Object(partial))
1498    }
1499
1500    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1501        if let Some(compiled) = self.path_cache.get(path) {
1502            self.cache_hits += 1;
1503            #[cfg(feature = "otel")]
1504            crate::vm_metrics::record_path_cache_hit();
1505            return compiled.clone();
1506        }
1507        #[cfg(feature = "otel")]
1508        crate::vm_metrics::record_path_cache_miss();
1509        let compiled = CompiledPath::new(path);
1510        self.path_cache.insert(path.to_string(), compiled.clone());
1511        compiled
1512    }
1513
1514    /// Process an event with optional context metadata
1515    #[cfg_attr(feature = "otel", instrument(
1516        name = "vm.process_event",
1517        skip(self, bytecode, event_value, log),
1518        level = "info",
1519        fields(
1520            event_type = %event_type,
1521            slot = context.as_ref().and_then(|c| c.slot),
1522        )
1523    ))]
1524    pub fn process_event(
1525        &mut self,
1526        bytecode: &MultiEntityBytecode,
1527        event_value: Value,
1528        event_type: &str,
1529        context: Option<&UpdateContext>,
1530        mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1531    ) -> Result<Vec<Mutation>> {
1532        self.current_context = context.cloned();
1533
1534        let mut event_value = event_value;
1535        if let Some(ctx) = context {
1536            if let Some(obj) = event_value.as_object_mut() {
1537                obj.insert("__update_context".to_string(), ctx.to_value());
1538            }
1539        }
1540
1541        let mut all_mutations = Vec::new();
1542
1543        if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1544            if let Some(ctx) = context {
1545                if let Some(signature) = ctx.signature.clone() {
1546                    let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1547                    for state_id in state_ids {
1548                        if let Some(state) = self.states.get(&state_id) {
1549                            {
1550                                let mut cache = state.recent_tx_instructions.lock().unwrap();
1551                                let entry =
1552                                    cache.get_or_insert_mut(signature.clone(), HashSet::new);
1553                                entry.insert(event_type.to_string());
1554                            }
1555
1556                            let key = (signature.clone(), event_type.to_string());
1557                            if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1558                                tracing::debug!(
1559                                    event_type = %event_type,
1560                                    signature = %signature,
1561                                    deferred_count = deferred_ops.len(),
1562                                    "flushing deferred when-ops"
1563                                );
1564                                for op in deferred_ops {
1565                                    // Look up the entity bytecode to get the computed fields evaluator
1566                                    let (evaluator, computed_paths) = bytecode
1567                                        .entities
1568                                        .get(&op.entity_name)
1569                                        .map(|eb| {
1570                                            (
1571                                                eb.computed_fields_evaluator.as_ref(),
1572                                                eb.computed_paths.as_slice(),
1573                                            )
1574                                        })
1575                                        .unwrap_or((None, &[]));
1576
1577                                    match self.apply_deferred_when_op(
1578                                        state_id,
1579                                        &op,
1580                                        evaluator,
1581                                        Some(computed_paths),
1582                                    ) {
1583                                        Ok(mutations) => all_mutations.extend(mutations),
1584                                        Err(e) => tracing::warn!(
1585                                            "Failed to apply deferred when-op: {}",
1586                                            e
1587                                        ),
1588                                    }
1589                                }
1590                            }
1591                        }
1592                    }
1593                }
1594            }
1595        }
1596
1597        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1598            for entity_name in entity_names {
1599                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1600                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1601                        if let Some(ref mut log) = log {
1602                            log.set("entity", entity_name.clone());
1603                            log.inc("handlers", 1);
1604                        }
1605
1606                        let opcodes_before = self.instructions_executed;
1607                        let cache_before = self.cache_hits;
1608                        let pda_hits_before = self.pda_cache_hits;
1609                        let pda_misses_before = self.pda_cache_misses;
1610
1611                        let mutations = self.execute_handler(
1612                            handler,
1613                            &event_value,
1614                            event_type,
1615                            entity_bytecode.state_id,
1616                            entity_name,
1617                            entity_bytecode.computed_fields_evaluator.as_ref(),
1618                            Some(&entity_bytecode.non_emitted_fields),
1619                        )?;
1620
1621                        if let Some(ref mut log) = log {
1622                            log.inc(
1623                                "opcodes",
1624                                (self.instructions_executed - opcodes_before) as i64,
1625                            );
1626                            log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1627                            log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1628                            log.inc(
1629                                "pda_misses",
1630                                (self.pda_cache_misses - pda_misses_before) as i64,
1631                            );
1632                        }
1633
1634                        if mutations.is_empty() {
1635                            // CPI events (suffix "CpiEvent") are transaction-scoped like instructions
1636                            // (suffix "IxState") and should be queued the same way when PDA lookup fails.
1637                            let is_tx_event =
1638                                event_type.ends_with("IxState") || event_type.ends_with("CpiEvent");
1639                            if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1640                                if is_tx_event {
1641                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1642                                    let signature = context
1643                                        .and_then(|c| c.signature.clone())
1644                                        .unwrap_or_default();
1645                                    let _ = self.queue_instruction_event(
1646                                        entity_bytecode.state_id,
1647                                        QueuedInstructionEvent {
1648                                            pda_address: missed_pda,
1649                                            event_type: event_type.to_string(),
1650                                            event_data: event_value.clone(),
1651                                            slot,
1652                                            signature,
1653                                        },
1654                                    );
1655                                } else {
1656                                    // Queue account updates (e.g. BondingCurve) when PDA
1657                                    // reverse lookup fails. These will be flushed when an
1658                                    // instruction registers the PDA mapping via
1659                                    // UpdateLookupIndex.
1660                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1661                                    let signature = context
1662                                        .and_then(|c| c.signature.clone())
1663                                        .unwrap_or_default();
1664                                    if let Some(write_version) =
1665                                        context.and_then(|c| c.write_version)
1666                                    {
1667                                        let _ = self.queue_account_update(
1668                                            entity_bytecode.state_id,
1669                                            QueuedAccountUpdate {
1670                                                pda_address: missed_pda,
1671                                                account_type: event_type.to_string(),
1672                                                account_data: event_value.clone(),
1673                                                slot,
1674                                                write_version,
1675                                                signature,
1676                                            },
1677                                        );
1678                                    } else {
1679                                        tracing::warn!(
1680                                            event_type = %event_type,
1681                                            "Dropping queued account update: write_version missing from context"
1682                                        );
1683                                    }
1684                                }
1685                            }
1686                            if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1687                                if !is_tx_event {
1688                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1689                                    let signature = context
1690                                        .and_then(|c| c.signature.clone())
1691                                        .unwrap_or_default();
1692                                    if let Some(write_version) =
1693                                        context.and_then(|c| c.write_version)
1694                                    {
1695                                        let _ = self.queue_account_update(
1696                                            entity_bytecode.state_id,
1697                                            QueuedAccountUpdate {
1698                                                pda_address: missed_lookup,
1699                                                account_type: event_type.to_string(),
1700                                                account_data: event_value.clone(),
1701                                                slot,
1702                                                write_version,
1703                                                signature,
1704                                            },
1705                                        );
1706                                    } else {
1707                                        tracing::trace!(
1708                                            event_type = %event_type,
1709                                            "Discarding lookup_index_miss for tx-scoped event (IxState/CpiEvent do not use lookup-index queuing)"
1710                                        );
1711                                    }
1712                                }
1713                            }
1714                        }
1715
1716                        all_mutations.extend(mutations);
1717
1718                        if event_type.ends_with("IxState") || event_type.ends_with("CpiEvent") {
1719                            if let Some(ctx) = context {
1720                                if let Some(ref signature) = ctx.signature {
1721                                    if let Some(state) = self.states.get(&entity_bytecode.state_id)
1722                                    {
1723                                        {
1724                                            let mut cache =
1725                                                state.recent_tx_instructions.lock().unwrap();
1726                                            let entry = cache
1727                                                .get_or_insert_mut(signature.clone(), HashSet::new);
1728                                            entry.insert(event_type.to_string());
1729                                        }
1730
1731                                        let key = (signature.clone(), event_type.to_string());
1732                                        if let Some((_, deferred_ops)) =
1733                                            state.deferred_when_ops.remove(&key)
1734                                        {
1735                                            tracing::debug!(
1736                                                event_type = %event_type,
1737                                                signature = %signature,
1738                                                deferred_count = deferred_ops.len(),
1739                                                "flushing deferred when-ops"
1740                                            );
1741                                            for op in deferred_ops {
1742                                                match self.apply_deferred_when_op(
1743                                                    entity_bytecode.state_id,
1744                                                    &op,
1745                                                    entity_bytecode
1746                                                        .computed_fields_evaluator
1747                                                        .as_ref(),
1748                                                    Some(&entity_bytecode.computed_paths),
1749                                                ) {
1750                                                    Ok(mutations) => {
1751                                                        all_mutations.extend(mutations)
1752                                                    }
1753                                                    Err(e) => {
1754                                                        tracing::warn!(
1755                                                            "Failed to apply deferred when-op: {}",
1756                                                            e
1757                                                        );
1758                                                    }
1759                                                }
1760                                            }
1761                                        }
1762                                    }
1763                                }
1764                            }
1765                        }
1766
1767                        if let Some(registered_pda) = self.take_last_pda_registered() {
1768                            let pending_events = self.flush_pending_instruction_events(
1769                                entity_bytecode.state_id,
1770                                &registered_pda,
1771                            );
1772                            for pending in pending_events {
1773                                if let Some(pending_handler) =
1774                                    entity_bytecode.handlers.get(&pending.event_type)
1775                                {
1776                                    if let Ok(reprocessed_mutations) = self.execute_handler(
1777                                        pending_handler,
1778                                        &pending.event_data,
1779                                        &pending.event_type,
1780                                        entity_bytecode.state_id,
1781                                        entity_name,
1782                                        entity_bytecode.computed_fields_evaluator.as_ref(),
1783                                        Some(&entity_bytecode.non_emitted_fields),
1784                                    ) {
1785                                        all_mutations.extend(reprocessed_mutations);
1786                                    }
1787                                }
1788                            }
1789                        }
1790
1791                        let lookup_keys = self.take_last_lookup_index_keys();
1792                        if !lookup_keys.is_empty() {
1793                            tracing::info!(
1794                                keys = ?lookup_keys,
1795                                entity = %entity_name,
1796                                "vm.process_event: flushing pending updates for lookup_keys"
1797                            );
1798                        }
1799                        for lookup_key in lookup_keys {
1800                            if let Ok(pending_updates) =
1801                                self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1802                            {
1803                                for pending in pending_updates {
1804                                    if let Some(pending_handler) =
1805                                        entity_bytecode.handlers.get(&pending.account_type)
1806                                    {
1807                                        self.current_context = Some(UpdateContext::new_account(
1808                                            pending.slot,
1809                                            pending.signature.clone(),
1810                                            pending.write_version,
1811                                        ));
1812                                        match self.execute_handler(
1813                                            pending_handler,
1814                                            &pending.account_data,
1815                                            &pending.account_type,
1816                                            entity_bytecode.state_id,
1817                                            entity_name,
1818                                            entity_bytecode.computed_fields_evaluator.as_ref(),
1819                                            Some(&entity_bytecode.non_emitted_fields),
1820                                        ) {
1821                                            Ok(reprocessed) => {
1822                                                all_mutations.extend(reprocessed);
1823                                            }
1824                                            Err(e) => {
1825                                                tracing::warn!(
1826                                                    error = %e,
1827                                                    account_type = %pending.account_type,
1828                                                    "Flushed event reprocessing failed"
1829                                                );
1830                                            }
1831                                        }
1832                                    }
1833                                }
1834                            }
1835                        }
1836                    } else if let Some(ref mut log) = log {
1837                        log.set("skip_reason", "no_handler");
1838                    }
1839                } else if let Some(ref mut log) = log {
1840                    log.set("skip_reason", "entity_not_found");
1841                }
1842            }
1843        } else if let Some(ref mut log) = log {
1844            log.set("skip_reason", "no_event_routing");
1845        }
1846
1847        if let Some(log) = log {
1848            log.set("mutations", all_mutations.len() as i64);
1849            if let Some(first) = all_mutations.first() {
1850                if let Some(key_str) = first.key.as_str() {
1851                    log.set("primary_key", key_str);
1852                } else if let Some(key_num) = first.key.as_u64() {
1853                    log.set("primary_key", key_num as i64);
1854                }
1855            }
1856            if let Some(state) = self.states.get(&0) {
1857                log.set("state_table_size", state.data.len() as i64);
1858            }
1859
1860            let warnings = self.take_warnings();
1861            if !warnings.is_empty() {
1862                log.set("warnings", warnings.len() as i64);
1863                log.set(
1864                    "warning_messages",
1865                    Value::Array(warnings.into_iter().map(Value::String).collect()),
1866                );
1867                log.set_level(crate::canonical_log::LogLevel::Warn);
1868            }
1869        } else {
1870            self.warnings.clear();
1871        }
1872
1873        if self.instructions_executed.is_multiple_of(1000) {
1874            let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1875            for state_id in state_ids {
1876                let expired = self.cleanup_expired_when_ops(state_id, 60);
1877                if expired > 0 {
1878                    tracing::debug!(
1879                        "Cleaned up {} expired deferred when-ops for state {}",
1880                        expired,
1881                        state_id
1882                    );
1883                }
1884            }
1885        }
1886
1887        Ok(all_mutations)
1888    }
1889
1890    pub fn process_any(
1891        &mut self,
1892        bytecode: &MultiEntityBytecode,
1893        any: prost_types::Any,
1894    ) -> Result<Vec<Mutation>> {
1895        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1896        self.process_event(bytecode, event_value, &event_type, None, None)
1897    }
1898
1899    #[cfg_attr(feature = "otel", instrument(
1900        name = "vm.execute_handler",
1901        skip(self, handler, event_value, entity_evaluator),
1902        level = "debug",
1903        fields(
1904            event_type = %event_type,
1905            handler_opcodes = handler.len(),
1906        )
1907    ))]
1908    #[allow(clippy::type_complexity, clippy::too_many_arguments)]
1909    fn execute_handler(
1910        &mut self,
1911        handler: &[OpCode],
1912        event_value: &Value,
1913        event_type: &str,
1914        override_state_id: u32,
1915        entity_name: &str,
1916        entity_evaluator: Option<
1917            &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
1918        >,
1919        non_emitted_fields: Option<&HashSet<String>>,
1920    ) -> Result<Vec<Mutation>> {
1921        self.reset_registers();
1922        self.last_pda_lookup_miss = None;
1923
1924        let mut pc: usize = 0;
1925        let mut output = Vec::new();
1926        let mut dirty_tracker = DirtyTracker::new();
1927        let should_emit = |path: &str| {
1928            non_emitted_fields
1929                .map(|fields| !fields.contains(path))
1930                .unwrap_or(true)
1931        };
1932
1933        while pc < handler.len() {
1934            match &handler[pc] {
1935                OpCode::LoadEventField {
1936                    path,
1937                    dest,
1938                    default,
1939                } => {
1940                    let value = self.load_field(event_value, path, default.as_ref())?;
1941                    self.registers[*dest] = value;
1942                    pc += 1;
1943                }
1944                OpCode::LoadConstant { value, dest } => {
1945                    self.registers[*dest] = value.clone();
1946                    pc += 1;
1947                }
1948                OpCode::CopyRegister { source, dest } => {
1949                    self.registers[*dest] = self.registers[*source].clone();
1950                    pc += 1;
1951                }
1952                OpCode::CopyRegisterIfNull { source, dest } => {
1953                    if self.registers[*dest].is_null() {
1954                        self.registers[*dest] = self.registers[*source].clone();
1955                    }
1956                    pc += 1;
1957                }
1958                OpCode::GetEventType { dest } => {
1959                    self.registers[*dest] = json!(event_type);
1960                    pc += 1;
1961                }
1962                OpCode::CreateObject { dest } => {
1963                    self.registers[*dest] = json!({});
1964                    pc += 1;
1965                }
1966                OpCode::SetField {
1967                    object,
1968                    path,
1969                    value,
1970                } => {
1971                    self.set_field_auto_vivify(*object, path, *value)?;
1972                    if should_emit(path) {
1973                        dirty_tracker.mark_replaced(path);
1974                    }
1975                    pc += 1;
1976                }
1977                OpCode::SetFields { object, fields } => {
1978                    for (path, value_reg) in fields {
1979                        self.set_field_auto_vivify(*object, path, *value_reg)?;
1980                        if should_emit(path) {
1981                            dirty_tracker.mark_replaced(path);
1982                        }
1983                    }
1984                    pc += 1;
1985                }
1986                OpCode::GetField { object, path, dest } => {
1987                    let value = self.get_field(*object, path)?;
1988                    self.registers[*dest] = value;
1989                    pc += 1;
1990                }
1991                OpCode::AbortIfNullKey {
1992                    key,
1993                    is_account_event,
1994                } => {
1995                    let key_value = &self.registers[*key];
1996                    if key_value.is_null() && *is_account_event {
1997                        tracing::debug!(
1998                            event_type = %event_type,
1999                            "AbortIfNullKey: key is null for account state event, \
2000                             returning empty mutations for queueing"
2001                        );
2002                        return Ok(Vec::new());
2003                    }
2004
2005                    pc += 1;
2006                }
2007                OpCode::ReadOrInitState {
2008                    state_id: _,
2009                    key,
2010                    default,
2011                    dest,
2012                } => {
2013                    let actual_state_id = override_state_id;
2014                    let entity_name_owned = entity_name.to_string();
2015                    self.states
2016                        .entry(actual_state_id)
2017                        .or_insert_with(|| StateTable {
2018                            data: DashMap::new(),
2019                            access_times: DashMap::new(),
2020                            lookup_indexes: HashMap::new(),
2021                            temporal_indexes: HashMap::new(),
2022                            pda_reverse_lookups: HashMap::new(),
2023                            pending_updates: DashMap::new(),
2024                            pending_instruction_events: DashMap::new(),
2025                            last_account_data: DashMap::new(),
2026                            version_tracker: VersionTracker::new(),
2027                            instruction_dedup_cache: VersionTracker::with_capacity(
2028                                DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
2029                            ),
2030                            config: StateTableConfig::default(),
2031                            entity_name: entity_name_owned,
2032                            recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
2033                                NonZeroUsize::new(1000).unwrap(),
2034                            )),
2035                            deferred_when_ops: DashMap::new(),
2036                        });
2037                    let key_value = self.registers[*key].clone();
2038                    // Warn if key is null for account state events (not instruction events or CPI events)
2039                    let warn_null_key = key_value.is_null()
2040                        && event_type.ends_with("State")
2041                        && !event_type.ends_with("IxState")
2042                        && !event_type.ends_with("CpiEvent");
2043
2044                    if warn_null_key {
2045                        self.add_warning(format!(
2046                            "ReadOrInitState: key register {} is NULL for account state, event_type={}",
2047                            key, event_type
2048                        ));
2049                    }
2050
2051                    let state = self
2052                        .states
2053                        .get(&actual_state_id)
2054                        .ok_or("State table not found")?;
2055
2056                    if !key_value.is_null() {
2057                        if let Some(ctx) = &self.current_context {
2058                            // Account updates: use recency check to discard stale updates
2059                            // IMPORTANT: Use account address (PDA) as the key, not entity key,
2060                            // because multiple accounts can update the same entity and each
2061                            // has its own independent write_version
2062                            if ctx.is_account_update() {
2063                                if let (Some(slot), Some(write_version)) =
2064                                    (ctx.slot, ctx.write_version)
2065                                {
2066                                    // Get account address from event_value for proper tracking
2067                                    let account_address = event_value
2068                                        .get("__account_address")
2069                                        .cloned()
2070                                        .unwrap_or_else(|| key_value.clone());
2071
2072                                    if !state.is_fresh_update(
2073                                        &account_address,
2074                                        event_type,
2075                                        slot,
2076                                        write_version,
2077                                    ) {
2078                                        self.add_warning(format!(
2079                                            "Stale account update skipped: slot={}, write_version={}, account={}",
2080                                            slot, write_version, account_address
2081                                        ));
2082                                        return Ok(Vec::new());
2083                                    }
2084                                }
2085                            }
2086                            // Instruction updates: process all, but skip exact duplicates
2087                            else if ctx.is_instruction_update() {
2088                                if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
2089                                    if state.is_duplicate_instruction(
2090                                        &key_value, event_type, slot, txn_index,
2091                                    ) {
2092                                        self.add_warning(format!(
2093                                            "Duplicate instruction skipped: slot={}, txn_index={}",
2094                                            slot, txn_index
2095                                        ));
2096                                        return Ok(Vec::new());
2097                                    }
2098                                }
2099                            }
2100                        }
2101                    }
2102                    let value = state
2103                        .get_and_touch(&key_value)
2104                        .unwrap_or_else(|| default.clone());
2105
2106                    self.registers[*dest] = value;
2107                    pc += 1;
2108                }
2109                OpCode::UpdateState {
2110                    state_id: _,
2111                    key,
2112                    value,
2113                } => {
2114                    let actual_state_id = override_state_id;
2115                    let state = self
2116                        .states
2117                        .get(&actual_state_id)
2118                        .ok_or("State table not found")?;
2119                    let key_value = self.registers[*key].clone();
2120                    let value_data = self.registers[*value].clone();
2121
2122                    state.insert_with_eviction(key_value, value_data);
2123                    pc += 1;
2124                }
2125                OpCode::AppendToArray {
2126                    object,
2127                    path,
2128                    value,
2129                } => {
2130                    let appended_value = self.registers[*value].clone();
2131                    let max_len = self
2132                        .states
2133                        .get(&override_state_id)
2134                        .map(|s| s.max_array_length())
2135                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
2136                    self.append_to_array(*object, path, *value, max_len)?;
2137                    if should_emit(path) {
2138                        dirty_tracker.mark_appended(path, appended_value);
2139                    }
2140                    pc += 1;
2141                }
2142                OpCode::GetCurrentTimestamp { dest } => {
2143                    let timestamp = std::time::SystemTime::now()
2144                        .duration_since(std::time::UNIX_EPOCH)
2145                        .unwrap()
2146                        .as_secs() as i64;
2147                    self.registers[*dest] = json!(timestamp);
2148                    pc += 1;
2149                }
2150                OpCode::CreateEvent { dest, event_value } => {
2151                    let timestamp = std::time::SystemTime::now()
2152                        .duration_since(std::time::UNIX_EPOCH)
2153                        .unwrap()
2154                        .as_secs() as i64;
2155
2156                    // Filter out __update_context from the event data
2157                    let mut event_data = self.registers[*event_value].clone();
2158                    if let Some(obj) = event_data.as_object_mut() {
2159                        obj.remove("__update_context");
2160                    }
2161
2162                    // Create event with timestamp, data, and optional slot/signature from context
2163                    let mut event = serde_json::Map::new();
2164                    event.insert("timestamp".to_string(), json!(timestamp));
2165                    event.insert("data".to_string(), event_data);
2166
2167                    // Add slot and signature if available from current context
2168                    if let Some(ref ctx) = self.current_context {
2169                        if let Some(slot) = ctx.slot {
2170                            event.insert("slot".to_string(), json!(slot));
2171                        }
2172                        if let Some(ref signature) = ctx.signature {
2173                            event.insert("signature".to_string(), json!(signature));
2174                        }
2175                    }
2176
2177                    self.registers[*dest] = Value::Object(event);
2178                    pc += 1;
2179                }
2180                OpCode::CreateCapture {
2181                    dest,
2182                    capture_value,
2183                } => {
2184                    let timestamp = std::time::SystemTime::now()
2185                        .duration_since(std::time::UNIX_EPOCH)
2186                        .unwrap()
2187                        .as_secs() as i64;
2188
2189                    // Get the capture data (already filtered by load_field)
2190                    let capture_data = self.registers[*capture_value].clone();
2191
2192                    // Extract account_address from the original event if available
2193                    let account_address = event_value
2194                        .get("__account_address")
2195                        .and_then(|v| v.as_str())
2196                        .unwrap_or("")
2197                        .to_string();
2198
2199                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
2200                    let mut capture = serde_json::Map::new();
2201                    capture.insert("timestamp".to_string(), json!(timestamp));
2202                    capture.insert("account_address".to_string(), json!(account_address));
2203                    capture.insert("data".to_string(), capture_data);
2204
2205                    // Add slot and signature if available from current context
2206                    if let Some(ref ctx) = self.current_context {
2207                        if let Some(slot) = ctx.slot {
2208                            capture.insert("slot".to_string(), json!(slot));
2209                        }
2210                        if let Some(ref signature) = ctx.signature {
2211                            capture.insert("signature".to_string(), json!(signature));
2212                        }
2213                    }
2214
2215                    self.registers[*dest] = Value::Object(capture);
2216                    pc += 1;
2217                }
2218                OpCode::Transform {
2219                    source,
2220                    dest,
2221                    transformation,
2222                } => {
2223                    if source == dest {
2224                        self.transform_in_place(*source, transformation)?;
2225                    } else {
2226                        let source_value = &self.registers[*source];
2227                        let value = Self::apply_transformation(source_value, transformation)?;
2228                        self.registers[*dest] = value;
2229                    }
2230                    pc += 1;
2231                }
2232                OpCode::EmitMutation {
2233                    entity_name,
2234                    key,
2235                    state,
2236                } => {
2237                    let primary_key = self.registers[*key].clone();
2238
2239                    if primary_key.is_null() || dirty_tracker.is_empty() {
2240                        let reason = if dirty_tracker.is_empty() {
2241                            "no_fields_modified"
2242                        } else {
2243                            "null_primary_key"
2244                        };
2245                        self.add_warning(format!(
2246                            "Skipping mutation for entity '{}': {} (dirty_fields={})",
2247                            entity_name,
2248                            reason,
2249                            dirty_tracker.len()
2250                        ));
2251                    } else {
2252                        let patch =
2253                            self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
2254
2255                        let append = dirty_tracker.appended_paths();
2256                        let mutation = Mutation {
2257                            export: entity_name.clone(),
2258                            key: primary_key,
2259                            patch,
2260                            append,
2261                        };
2262                        output.push(mutation);
2263                    }
2264                    pc += 1;
2265                }
2266                OpCode::SetFieldIfNull {
2267                    object,
2268                    path,
2269                    value,
2270                } => {
2271                    let was_set = self.set_field_if_null(*object, path, *value)?;
2272                    if was_set && should_emit(path) {
2273                        dirty_tracker.mark_replaced(path);
2274                    }
2275                    pc += 1;
2276                }
2277                OpCode::SetFieldMax {
2278                    object,
2279                    path,
2280                    value,
2281                } => {
2282                    let was_updated = self.set_field_max(*object, path, *value)?;
2283                    if was_updated && should_emit(path) {
2284                        dirty_tracker.mark_replaced(path);
2285                    }
2286                    pc += 1;
2287                }
2288                OpCode::UpdateTemporalIndex {
2289                    state_id: _,
2290                    index_name,
2291                    lookup_value,
2292                    primary_key,
2293                    timestamp,
2294                } => {
2295                    let actual_state_id = override_state_id;
2296                    let state = self
2297                        .states
2298                        .get_mut(&actual_state_id)
2299                        .ok_or("State table not found")?;
2300                    let index = state
2301                        .temporal_indexes
2302                        .entry(index_name.clone())
2303                        .or_insert_with(TemporalIndex::new);
2304
2305                    let lookup_val = self.registers[*lookup_value].clone();
2306                    let pk_val = self.registers[*primary_key].clone();
2307                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2308                        val
2309                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
2310                        val as i64
2311                    } else {
2312                        return Err(format!(
2313                            "Timestamp must be a number (i64 or u64), got: {:?}",
2314                            self.registers[*timestamp]
2315                        )
2316                        .into());
2317                    };
2318
2319                    index.insert(lookup_val, pk_val, ts_val);
2320                    pc += 1;
2321                }
2322                OpCode::LookupTemporalIndex {
2323                    state_id: _,
2324                    index_name,
2325                    lookup_value,
2326                    timestamp,
2327                    dest,
2328                } => {
2329                    let actual_state_id = override_state_id;
2330                    let state = self
2331                        .states
2332                        .get(&actual_state_id)
2333                        .ok_or("State table not found")?;
2334                    let lookup_val = &self.registers[*lookup_value];
2335
2336                    let result = if self.registers[*timestamp].is_null() {
2337                        if let Some(index) = state.temporal_indexes.get(index_name) {
2338                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
2339                        } else {
2340                            Value::Null
2341                        }
2342                    } else {
2343                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
2344                            val
2345                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
2346                            val as i64
2347                        } else {
2348                            return Err(format!(
2349                                "Timestamp must be a number (i64 or u64), got: {:?}",
2350                                self.registers[*timestamp]
2351                            )
2352                            .into());
2353                        };
2354
2355                        if let Some(index) = state.temporal_indexes.get(index_name) {
2356                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
2357                        } else {
2358                            Value::Null
2359                        }
2360                    };
2361
2362                    self.registers[*dest] = result;
2363                    pc += 1;
2364                }
2365                OpCode::UpdateLookupIndex {
2366                    state_id: _,
2367                    index_name,
2368                    lookup_value,
2369                    primary_key,
2370                } => {
2371                    let actual_state_id = override_state_id;
2372                    let state = self
2373                        .states
2374                        .get_mut(&actual_state_id)
2375                        .ok_or("State table not found")?;
2376                    let index = state
2377                        .lookup_indexes
2378                        .entry(index_name.clone())
2379                        .or_insert_with(LookupIndex::new);
2380
2381                    let lookup_val = self.registers[*lookup_value].clone();
2382                    let pk_val = self.registers[*primary_key].clone();
2383
2384                    index.insert(lookup_val.clone(), pk_val);
2385
2386                    // Track lookup keys so process_event can flush queued account updates
2387                    if let Some(key_str) = lookup_val.as_str() {
2388                        self.last_lookup_index_keys.push(key_str.to_string());
2389                    }
2390
2391                    pc += 1;
2392                }
2393                OpCode::LookupIndex {
2394                    state_id: _,
2395                    index_name,
2396                    lookup_value,
2397                    dest,
2398                } => {
2399                    let actual_state_id = override_state_id;
2400                    let mut current_value = self.registers[*lookup_value].clone();
2401
2402                    const MAX_CHAIN_DEPTH: usize = 5;
2403                    let mut iterations = 0;
2404
2405                    let final_result = if self.states.contains_key(&actual_state_id) {
2406                        loop {
2407                            iterations += 1;
2408                            if iterations > MAX_CHAIN_DEPTH {
2409                                break current_value;
2410                            }
2411
2412                            let resolved = self
2413                                .states
2414                                .get(&actual_state_id)
2415                                .and_then(|state| {
2416                                    if let Some(index) = state.lookup_indexes.get(index_name) {
2417                                        if let Some(found) = index.lookup(&current_value) {
2418                                            return Some(found);
2419                                        }
2420                                    }
2421
2422                                    for (name, index) in state.lookup_indexes.iter() {
2423                                        if name == index_name {
2424                                            continue;
2425                                        }
2426                                        if let Some(found) = index.lookup(&current_value) {
2427                                            return Some(found);
2428                                        }
2429                                    }
2430
2431                                    None
2432                                })
2433                                .unwrap_or(Value::Null);
2434
2435                            let mut resolved_from_pda = false;
2436                            let resolved = if resolved.is_null() {
2437                                if let Some(pda_str) = current_value.as_str() {
2438                                    resolved_from_pda = true;
2439                                    self.states
2440                                        .get_mut(&actual_state_id)
2441                                        .and_then(|state_mut| {
2442                                            state_mut
2443                                                .pda_reverse_lookups
2444                                                .get_mut("default_pda_lookup")
2445                                        })
2446                                        .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
2447                                        .map(Value::String)
2448                                        .unwrap_or(Value::Null)
2449                                } else {
2450                                    Value::Null
2451                                }
2452                            } else {
2453                                resolved
2454                            };
2455
2456                            if resolved.is_null() {
2457                                if iterations == 1 {
2458                                    if let Some(pda_str) = current_value.as_str() {
2459                                        self.last_pda_lookup_miss = Some(pda_str.to_string());
2460                                    }
2461                                }
2462                                break Value::Null;
2463                            }
2464
2465                            let can_chain =
2466                                self.can_resolve_further(&resolved, actual_state_id, index_name);
2467
2468                            if !can_chain {
2469                                if resolved_from_pda {
2470                                    if let Some(resolved_str) = resolved.as_str() {
2471                                        self.last_lookup_index_miss =
2472                                            Some(resolved_str.to_string());
2473                                    }
2474                                    break Value::Null;
2475                                }
2476                                break resolved;
2477                            }
2478
2479                            current_value = resolved;
2480                        }
2481                    } else {
2482                        Value::Null
2483                    };
2484
2485                    self.registers[*dest] = final_result;
2486                    pc += 1;
2487                }
2488                OpCode::SetFieldSum {
2489                    object,
2490                    path,
2491                    value,
2492                } => {
2493                    let was_updated = self.set_field_sum(*object, path, *value)?;
2494                    if was_updated && should_emit(path) {
2495                        dirty_tracker.mark_replaced(path);
2496                    }
2497                    pc += 1;
2498                }
2499                OpCode::SetFieldIncrement { object, path } => {
2500                    let was_updated = self.set_field_increment(*object, path)?;
2501                    if was_updated && should_emit(path) {
2502                        dirty_tracker.mark_replaced(path);
2503                    }
2504                    pc += 1;
2505                }
2506                OpCode::SetFieldMin {
2507                    object,
2508                    path,
2509                    value,
2510                } => {
2511                    let was_updated = self.set_field_min(*object, path, *value)?;
2512                    if was_updated && should_emit(path) {
2513                        dirty_tracker.mark_replaced(path);
2514                    }
2515                    pc += 1;
2516                }
2517                OpCode::AddToUniqueSet {
2518                    state_id: _,
2519                    set_name,
2520                    value,
2521                    count_object,
2522                    count_path,
2523                } => {
2524                    let value_to_add = self.registers[*value].clone();
2525
2526                    // Store the unique set within the entity object, not in the state table
2527                    // This ensures each entity instance has its own unique set
2528                    let set_field_path = format!("__unique_set:{}", set_name);
2529
2530                    // Get or create the unique set from the entity object
2531                    let mut set: HashSet<Value> =
2532                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
2533                            if !existing.is_null() {
2534                                serde_json::from_value(existing).unwrap_or_default()
2535                            } else {
2536                                HashSet::new()
2537                            }
2538                        } else {
2539                            HashSet::new()
2540                        };
2541
2542                    // Add value to set
2543                    let was_new = set.insert(value_to_add);
2544
2545                    // Store updated set back in the entity object
2546                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
2547                    self.registers[100] = serde_json::to_value(set_as_vec)?;
2548                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
2549
2550                    // Update the count field in the object
2551                    if was_new {
2552                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2553                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
2554                        if should_emit(count_path) {
2555                            dirty_tracker.mark_replaced(count_path);
2556                        }
2557                    }
2558
2559                    pc += 1;
2560                }
2561                OpCode::ConditionalSetField {
2562                    object,
2563                    path,
2564                    value,
2565                    condition_field,
2566                    condition_op,
2567                    condition_value,
2568                } => {
2569                    let field_value = self.load_field(event_value, condition_field, None)?;
2570                    let condition_met =
2571                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2572
2573                    if condition_met {
2574                        self.set_field_auto_vivify(*object, path, *value)?;
2575                        if should_emit(path) {
2576                            dirty_tracker.mark_replaced(path);
2577                        }
2578                    }
2579                    pc += 1;
2580                }
2581                OpCode::SetFieldWhen {
2582                    object,
2583                    path,
2584                    value,
2585                    when_instruction,
2586                    entity_name,
2587                    key_reg,
2588                    condition_field,
2589                    condition_op,
2590                    condition_value,
2591                } => {
2592                    let actual_state_id = override_state_id;
2593                    let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2594                        condition_field.as_ref(),
2595                        condition_op.as_ref(),
2596                        condition_value.as_ref(),
2597                    ) {
2598                        let field_value = self.load_field(event_value, field, None)?;
2599                        self.evaluate_comparison(&field_value, op, cond_value)?
2600                    } else {
2601                        true
2602                    };
2603
2604                    if !condition_met {
2605                        pc += 1;
2606                        continue;
2607                    }
2608
2609                    let signature = self
2610                        .current_context
2611                        .as_ref()
2612                        .and_then(|c| c.signature.clone())
2613                        .unwrap_or_default();
2614
2615                    let emit = should_emit(path);
2616
2617                    let instruction_seen = if !signature.is_empty() {
2618                        if let Some(state) = self.states.get(&actual_state_id) {
2619                            let mut cache = state.recent_tx_instructions.lock().unwrap();
2620                            cache
2621                                .get(&signature)
2622                                .map(|set| set.contains(when_instruction))
2623                                .unwrap_or(false)
2624                        } else {
2625                            false
2626                        }
2627                    } else {
2628                        false
2629                    };
2630
2631                    if instruction_seen {
2632                        self.set_field_auto_vivify(*object, path, *value)?;
2633                        if emit {
2634                            dirty_tracker.mark_replaced(path);
2635                        }
2636                    } else if !signature.is_empty() {
2637                        let deferred = DeferredWhenOperation {
2638                            entity_name: entity_name.clone(),
2639                            primary_key: self.registers[*key_reg].clone(),
2640                            field_path: path.clone(),
2641                            field_value: self.registers[*value].clone(),
2642                            when_instruction: when_instruction.clone(),
2643                            signature: signature.clone(),
2644                            slot: self
2645                                .current_context
2646                                .as_ref()
2647                                .and_then(|c| c.slot)
2648                                .unwrap_or(0),
2649                            deferred_at: std::time::SystemTime::now()
2650                                .duration_since(std::time::UNIX_EPOCH)
2651                                .unwrap()
2652                                .as_secs() as i64,
2653                            emit,
2654                        };
2655
2656                        if let Some(state) = self.states.get(&actual_state_id) {
2657                            let key = (signature, when_instruction.clone());
2658                            state
2659                                .deferred_when_ops
2660                                .entry(key)
2661                                .or_insert_with(Vec::new)
2662                                .push(deferred);
2663                        }
2664                    }
2665
2666                    pc += 1;
2667                }
2668                OpCode::SetFieldUnlessStopped {
2669                    object,
2670                    path,
2671                    value,
2672                    stop_field,
2673                    stop_instruction,
2674                    entity_name,
2675                    key_reg: _,
2676                } => {
2677                    let stop_value = self.get_field(*object, stop_field).unwrap_or(Value::Null);
2678                    let stopped = stop_value.as_bool().unwrap_or(false);
2679
2680                    if stopped {
2681                        tracing::debug!(
2682                            entity = %entity_name,
2683                            field = %path,
2684                            stop_field = %stop_field,
2685                            stop_instruction = %stop_instruction,
2686                            "stop flag set; skipping field update"
2687                        );
2688                        pc += 1;
2689                        continue;
2690                    }
2691
2692                    self.set_field_auto_vivify(*object, path, *value)?;
2693                    if should_emit(path) {
2694                        dirty_tracker.mark_replaced(path);
2695                    }
2696                    pc += 1;
2697                }
2698                OpCode::ConditionalIncrement {
2699                    object,
2700                    path,
2701                    condition_field,
2702                    condition_op,
2703                    condition_value,
2704                } => {
2705                    let field_value = self.load_field(event_value, condition_field, None)?;
2706                    let condition_met =
2707                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2708
2709                    if condition_met {
2710                        let was_updated = self.set_field_increment(*object, path)?;
2711                        if was_updated && should_emit(path) {
2712                            dirty_tracker.mark_replaced(path);
2713                        }
2714                    }
2715                    pc += 1;
2716                }
2717                OpCode::EvaluateComputedFields {
2718                    state,
2719                    computed_paths,
2720                } => {
2721                    if let Some(evaluator) = entity_evaluator {
2722                        let old_values: Vec<_> = computed_paths
2723                            .iter()
2724                            .map(|path| Self::get_value_at_path(&self.registers[*state], path))
2725                            .collect();
2726
2727                        let state_value = &mut self.registers[*state];
2728                        let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
2729                        let context_timestamp = self
2730                            .current_context
2731                            .as_ref()
2732                            .map(|c| c.timestamp())
2733                            .unwrap_or_else(|| {
2734                                std::time::SystemTime::now()
2735                                    .duration_since(std::time::UNIX_EPOCH)
2736                                    .unwrap()
2737                                    .as_secs() as i64
2738                            });
2739                        let eval_result = evaluator(state_value, context_slot, context_timestamp);
2740
2741                        if eval_result.is_ok() {
2742                            for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
2743                                let new_value =
2744                                    Self::get_value_at_path(&self.registers[*state], path);
2745
2746                                if new_value != *old_value && should_emit(path) {
2747                                    dirty_tracker.mark_replaced(path);
2748                                }
2749                            }
2750                        }
2751                    }
2752                    pc += 1;
2753                }
2754                OpCode::QueueResolver {
2755                    state_id: _,
2756                    entity_name,
2757                    resolver,
2758                    input_path,
2759                    input_value,
2760                    url_template,
2761                    strategy,
2762                    extracts,
2763                    condition,
2764                    schedule_at,
2765                    state,
2766                    key,
2767                } => {
2768                    let actual_state_id = override_state_id;
2769
2770                    // Skip resolvers for reprocessed cached data from PDA mapping changes.
2771                    // Stale data can carry wrong field values (e.g. old entropy_value) and
2772                    // wrong schedule_at slots that would lock in incorrect results via SetOnce.
2773                    if self
2774                        .current_context
2775                        .as_ref()
2776                        .map(|c| c.skip_resolvers)
2777                        .unwrap_or(false)
2778                    {
2779                        pc += 1;
2780                        continue;
2781                    }
2782
2783                    // Evaluate condition if present
2784                    if let Some(cond) = condition {
2785                        let field_val =
2786                            Self::get_value_at_path(&self.registers[*state], &cond.field_path)
2787                                .unwrap_or(Value::Null);
2788                        let condition_met =
2789                            self.evaluate_comparison(&field_val, &cond.op, &cond.value)?;
2790
2791                        if !condition_met {
2792                            pc += 1;
2793                            continue;
2794                        }
2795                    }
2796
2797                    // Check schedule_at: defer if target slot is in the future
2798                    if let Some(schedule_path) = schedule_at {
2799                        let target_val =
2800                            Self::get_value_at_path(&self.registers[*state], schedule_path);
2801
2802                        match target_val.and_then(|v| v.as_u64()) {
2803                            Some(target_slot) => {
2804                                let current_slot = self
2805                                    .current_context
2806                                    .as_ref()
2807                                    .and_then(|ctx| ctx.slot)
2808                                    .unwrap_or(0);
2809                                if current_slot < target_slot {
2810                                    let key_value = &self.registers[*key];
2811                                    if !key_value.is_null() {
2812                                        self.scheduled_callbacks.push((
2813                                            target_slot,
2814                                            ScheduledCallback {
2815                                                state_id: actual_state_id,
2816                                                entity_name: entity_name.clone(),
2817                                                primary_key: key_value.clone(),
2818                                                resolver: resolver.clone(),
2819                                                url_template: url_template.clone(),
2820                                                input_value: input_value.clone(),
2821                                                input_path: input_path.clone(),
2822                                                condition: condition.clone(),
2823                                                strategy: strategy.clone(),
2824                                                extracts: extracts.clone(),
2825                                                retry_count: 0,
2826                                            },
2827                                        ));
2828                                    }
2829                                    pc += 1;
2830                                    continue;
2831                                }
2832                                // current_slot >= target_slot: fall through to immediate resolution
2833                            }
2834                            None => {
2835                                // schedule_at path is missing or value is not a u64 —
2836                                // skip resolver entirely rather than executing immediately,
2837                                // since the state likely hasn't been fully populated yet.
2838                                pc += 1;
2839                                continue;
2840                            }
2841                        }
2842                    }
2843
2844                    // Build resolver input from template, literal value, or field path
2845                    let resolved_input = if let Some(template) = url_template {
2846                        crate::scheduler::build_url_from_template(template, &self.registers[*state])
2847                            .map(Value::String)
2848                    } else if let Some(value) = input_value {
2849                        Some(value.clone())
2850                    } else if let Some(path) = input_path.as_ref() {
2851                        Self::get_value_at_path(&self.registers[*state], path)
2852                    } else {
2853                        None
2854                    };
2855
2856                    if let Some(input) = resolved_input {
2857                        let key_value = &self.registers[*key];
2858
2859                        if input.is_null() || key_value.is_null() {
2860                            pc += 1;
2861                            continue;
2862                        }
2863
2864                        if matches!(strategy, ResolveStrategy::SetOnce)
2865                            // all(): fire resolver if any target is still null.
2866                            // Already-set fields are protected from overwrite by
2867                            // set_value_at_path's SetOnce null-source guard.
2868                            && extracts.iter().all(|extract| {
2869                                match Self::get_value_at_path(
2870                                    &self.registers[*state],
2871                                    &extract.target_path,
2872                                ) {
2873                                    Some(value) => !value.is_null(),
2874                                    None => false,
2875                                }
2876                            })
2877                        {
2878                            pc += 1;
2879                            continue;
2880                        }
2881
2882                        let cache_key = resolver_cache_key(resolver, &input);
2883
2884                        let cached_value = self.resolver_cache.get(&cache_key).cloned();
2885                        if let Some(cached) = cached_value {
2886                            self.resolver_cache_hits += 1;
2887                            Self::apply_resolver_extractions_to_value(
2888                                &mut self.registers[*state],
2889                                &cached,
2890                                extracts,
2891                                &mut dirty_tracker,
2892                                &should_emit,
2893                            )?;
2894                        } else {
2895                            self.resolver_cache_misses += 1;
2896                            let target = ResolverTarget {
2897                                state_id: actual_state_id,
2898                                entity_name: entity_name.clone(),
2899                                primary_key: self.registers[*key].clone(),
2900                                extracts: extracts.clone(),
2901                            };
2902
2903                            self.enqueue_resolver_request(
2904                                cache_key,
2905                                resolver.clone(),
2906                                input,
2907                                target,
2908                            );
2909                        }
2910                    }
2911
2912                    pc += 1;
2913                }
2914                OpCode::UpdatePdaReverseLookup {
2915                    state_id: _,
2916                    lookup_name,
2917                    pda_address,
2918                    primary_key,
2919                } => {
2920                    let actual_state_id = override_state_id;
2921                    let state = self
2922                        .states
2923                        .get_mut(&actual_state_id)
2924                        .ok_or("State table not found")?;
2925
2926                    let pda_val = self.registers[*pda_address].clone();
2927                    let pk_val = self.registers[*primary_key].clone();
2928
2929                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
2930                        let pda_lookup = state
2931                            .pda_reverse_lookups
2932                            .entry(lookup_name.clone())
2933                            .or_insert_with(|| {
2934                                PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
2935                            });
2936
2937                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
2938                        self.last_pda_registered = Some(pda_str.to_string());
2939                    } else if !pk_val.is_null() {
2940                        if let Some(pk_num) = pk_val.as_u64() {
2941                            if let Some(pda_str) = pda_val.as_str() {
2942                                let pda_lookup = state
2943                                    .pda_reverse_lookups
2944                                    .entry(lookup_name.clone())
2945                                    .or_insert_with(|| {
2946                                        PdaReverseLookup::new(
2947                                            DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
2948                                        )
2949                                    });
2950
2951                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
2952                                self.last_pda_registered = Some(pda_str.to_string());
2953                            }
2954                        }
2955                    }
2956
2957                    pc += 1;
2958                }
2959            }
2960
2961            self.instructions_executed += 1;
2962        }
2963
2964        Ok(output)
2965    }
2966
2967    fn load_field(
2968        &self,
2969        event_value: &Value,
2970        path: &FieldPath,
2971        default: Option<&Value>,
2972    ) -> Result<Value> {
2973        if path.segments.is_empty() {
2974            if let Some(obj) = event_value.as_object() {
2975                let filtered: serde_json::Map<String, Value> = obj
2976                    .iter()
2977                    .filter(|(k, _)| !k.starts_with("__"))
2978                    .map(|(k, v)| (k.clone(), v.clone()))
2979                    .collect();
2980                return Ok(Value::Object(filtered));
2981            }
2982            return Ok(event_value.clone());
2983        }
2984
2985        let mut current = event_value;
2986        for segment in path.segments.iter() {
2987            current = match current.get(segment) {
2988                Some(v) => v,
2989                None => {
2990                    tracing::trace!(
2991                        "load_field: segment={:?} not found in {:?}, returning default",
2992                        segment,
2993                        current
2994                    );
2995                    return Ok(default.cloned().unwrap_or(Value::Null));
2996                }
2997            };
2998        }
2999
3000        tracing::trace!("load_field: path={:?}, result={:?}", path.segments, current);
3001        Ok(current.clone())
3002    }
3003
3004    fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
3005        let mut current = value;
3006        for segment in path.split('.') {
3007            current = current.get(segment)?;
3008        }
3009        Some(current.clone())
3010    }
3011
3012    fn set_field_auto_vivify(
3013        &mut self,
3014        object_reg: Register,
3015        path: &str,
3016        value_reg: Register,
3017    ) -> Result<()> {
3018        let compiled = self.get_compiled_path(path);
3019        let segments = compiled.segments();
3020        let value = self.registers[value_reg].clone();
3021
3022        if !self.registers[object_reg].is_object() {
3023            self.registers[object_reg] = json!({});
3024        }
3025
3026        let obj = self.registers[object_reg]
3027            .as_object_mut()
3028            .ok_or("Not an object")?;
3029
3030        let mut current = obj;
3031        for (i, segment) in segments.iter().enumerate() {
3032            if i == segments.len() - 1 {
3033                current.insert(segment.to_string(), value);
3034                return Ok(());
3035            } else {
3036                current
3037                    .entry(segment.to_string())
3038                    .or_insert_with(|| json!({}));
3039                current = current
3040                    .get_mut(segment)
3041                    .and_then(|v| v.as_object_mut())
3042                    .ok_or("Path collision: expected object")?;
3043            }
3044        }
3045
3046        Ok(())
3047    }
3048
3049    fn set_field_if_null(
3050        &mut self,
3051        object_reg: Register,
3052        path: &str,
3053        value_reg: Register,
3054    ) -> Result<bool> {
3055        let compiled = self.get_compiled_path(path);
3056        let segments = compiled.segments();
3057        let value = self.registers[value_reg].clone();
3058
3059        // SetOnce should only set meaningful values. A null source typically means
3060        // the field doesn't exist in this event type (e.g., instruction events don't
3061        // have account data). Skip to preserve any existing value.
3062        if value.is_null() {
3063            return Ok(false);
3064        }
3065
3066        if !self.registers[object_reg].is_object() {
3067            self.registers[object_reg] = json!({});
3068        }
3069
3070        let obj = self.registers[object_reg]
3071            .as_object_mut()
3072            .ok_or("Not an object")?;
3073
3074        let mut current = obj;
3075        for (i, segment) in segments.iter().enumerate() {
3076            if i == segments.len() - 1 {
3077                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
3078                    current.insert(segment.to_string(), value);
3079                    return Ok(true);
3080                }
3081                return Ok(false);
3082            } else {
3083                current
3084                    .entry(segment.to_string())
3085                    .or_insert_with(|| json!({}));
3086                current = current
3087                    .get_mut(segment)
3088                    .and_then(|v| v.as_object_mut())
3089                    .ok_or("Path collision: expected object")?;
3090            }
3091        }
3092
3093        Ok(false)
3094    }
3095
3096    fn set_field_max(
3097        &mut self,
3098        object_reg: Register,
3099        path: &str,
3100        value_reg: Register,
3101    ) -> Result<bool> {
3102        let compiled = self.get_compiled_path(path);
3103        let segments = compiled.segments();
3104        let new_value = self.registers[value_reg].clone();
3105
3106        if !self.registers[object_reg].is_object() {
3107            self.registers[object_reg] = json!({});
3108        }
3109
3110        let obj = self.registers[object_reg]
3111            .as_object_mut()
3112            .ok_or("Not an object")?;
3113
3114        let mut current = obj;
3115        for (i, segment) in segments.iter().enumerate() {
3116            if i == segments.len() - 1 {
3117                let should_update = if let Some(current_value) = current.get(segment) {
3118                    if current_value.is_null() {
3119                        true
3120                    } else {
3121                        match (current_value.as_i64(), new_value.as_i64()) {
3122                            (Some(current_val), Some(new_val)) => new_val > current_val,
3123                            (Some(current_val), None) if new_value.as_u64().is_some() => {
3124                                new_value.as_u64().unwrap() as i64 > current_val
3125                            }
3126                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
3127                                new_val > current_value.as_u64().unwrap() as i64
3128                            }
3129                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3130                                (Some(current_val), Some(new_val)) => new_val > current_val,
3131                                _ => match (current_value.as_f64(), new_value.as_f64()) {
3132                                    (Some(current_val), Some(new_val)) => new_val > current_val,
3133                                    _ => false,
3134                                },
3135                            },
3136                            _ => false,
3137                        }
3138                    }
3139                } else {
3140                    true
3141                };
3142
3143                if should_update {
3144                    current.insert(segment.to_string(), new_value);
3145                    return Ok(true);
3146                }
3147                return Ok(false);
3148            } else {
3149                current
3150                    .entry(segment.to_string())
3151                    .or_insert_with(|| json!({}));
3152                current = current
3153                    .get_mut(segment)
3154                    .and_then(|v| v.as_object_mut())
3155                    .ok_or("Path collision: expected object")?;
3156            }
3157        }
3158
3159        Ok(false)
3160    }
3161
3162    fn set_field_sum(
3163        &mut self,
3164        object_reg: Register,
3165        path: &str,
3166        value_reg: Register,
3167    ) -> Result<bool> {
3168        let compiled = self.get_compiled_path(path);
3169        let segments = compiled.segments();
3170        let new_value = &self.registers[value_reg];
3171
3172        // Extract numeric value before borrowing object_reg mutably
3173        tracing::trace!(
3174            "set_field_sum: path={:?}, value={:?}, value_type={}",
3175            path,
3176            new_value,
3177            match new_value {
3178                serde_json::Value::Null => "null",
3179                serde_json::Value::Bool(_) => "bool",
3180                serde_json::Value::Number(_) => "number",
3181                serde_json::Value::String(_) => "string",
3182                serde_json::Value::Array(_) => "array",
3183                serde_json::Value::Object(_) => "object",
3184            }
3185        );
3186        let new_val_num = new_value
3187            .as_i64()
3188            .or_else(|| new_value.as_u64().map(|n| n as i64))
3189            .ok_or("Sum requires numeric value")?;
3190
3191        if !self.registers[object_reg].is_object() {
3192            self.registers[object_reg] = json!({});
3193        }
3194
3195        let obj = self.registers[object_reg]
3196            .as_object_mut()
3197            .ok_or("Not an object")?;
3198
3199        let mut current = obj;
3200        for (i, segment) in segments.iter().enumerate() {
3201            if i == segments.len() - 1 {
3202                let current_val = current
3203                    .get(segment)
3204                    .and_then(|v| {
3205                        if v.is_null() {
3206                            None
3207                        } else {
3208                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3209                        }
3210                    })
3211                    .unwrap_or(0);
3212
3213                let sum = current_val + new_val_num;
3214                current.insert(segment.to_string(), json!(sum));
3215                return Ok(true);
3216            } else {
3217                current
3218                    .entry(segment.to_string())
3219                    .or_insert_with(|| json!({}));
3220                current = current
3221                    .get_mut(segment)
3222                    .and_then(|v| v.as_object_mut())
3223                    .ok_or("Path collision: expected object")?;
3224            }
3225        }
3226
3227        Ok(false)
3228    }
3229
3230    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
3231        let compiled = self.get_compiled_path(path);
3232        let segments = compiled.segments();
3233
3234        if !self.registers[object_reg].is_object() {
3235            self.registers[object_reg] = json!({});
3236        }
3237
3238        let obj = self.registers[object_reg]
3239            .as_object_mut()
3240            .ok_or("Not an object")?;
3241
3242        let mut current = obj;
3243        for (i, segment) in segments.iter().enumerate() {
3244            if i == segments.len() - 1 {
3245                // Get current value (default to 0 if null/missing)
3246                let current_val = current
3247                    .get(segment)
3248                    .and_then(|v| {
3249                        if v.is_null() {
3250                            None
3251                        } else {
3252                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
3253                        }
3254                    })
3255                    .unwrap_or(0);
3256
3257                let incremented = current_val + 1;
3258                current.insert(segment.to_string(), json!(incremented));
3259                return Ok(true);
3260            } else {
3261                current
3262                    .entry(segment.to_string())
3263                    .or_insert_with(|| json!({}));
3264                current = current
3265                    .get_mut(segment)
3266                    .and_then(|v| v.as_object_mut())
3267                    .ok_or("Path collision: expected object")?;
3268            }
3269        }
3270
3271        Ok(false)
3272    }
3273
3274    fn set_field_min(
3275        &mut self,
3276        object_reg: Register,
3277        path: &str,
3278        value_reg: Register,
3279    ) -> Result<bool> {
3280        let compiled = self.get_compiled_path(path);
3281        let segments = compiled.segments();
3282        let new_value = self.registers[value_reg].clone();
3283
3284        if !self.registers[object_reg].is_object() {
3285            self.registers[object_reg] = json!({});
3286        }
3287
3288        let obj = self.registers[object_reg]
3289            .as_object_mut()
3290            .ok_or("Not an object")?;
3291
3292        let mut current = obj;
3293        for (i, segment) in segments.iter().enumerate() {
3294            if i == segments.len() - 1 {
3295                let should_update = if let Some(current_value) = current.get(segment) {
3296                    if current_value.is_null() {
3297                        true
3298                    } else {
3299                        match (current_value.as_i64(), new_value.as_i64()) {
3300                            (Some(current_val), Some(new_val)) => new_val < current_val,
3301                            (Some(current_val), None) if new_value.as_u64().is_some() => {
3302                                (new_value.as_u64().unwrap() as i64) < current_val
3303                            }
3304                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
3305                                new_val < current_value.as_u64().unwrap() as i64
3306                            }
3307                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
3308                                (Some(current_val), Some(new_val)) => new_val < current_val,
3309                                _ => match (current_value.as_f64(), new_value.as_f64()) {
3310                                    (Some(current_val), Some(new_val)) => new_val < current_val,
3311                                    _ => false,
3312                                },
3313                            },
3314                            _ => false,
3315                        }
3316                    }
3317                } else {
3318                    true
3319                };
3320
3321                if should_update {
3322                    current.insert(segment.to_string(), new_value);
3323                    return Ok(true);
3324                }
3325                return Ok(false);
3326            } else {
3327                current
3328                    .entry(segment.to_string())
3329                    .or_insert_with(|| json!({}));
3330                current = current
3331                    .get_mut(segment)
3332                    .and_then(|v| v.as_object_mut())
3333                    .ok_or("Path collision: expected object")?;
3334            }
3335        }
3336
3337        Ok(false)
3338    }
3339
3340    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
3341        let compiled = self.get_compiled_path(path);
3342        let segments = compiled.segments();
3343        let mut current = &self.registers[object_reg];
3344
3345        for segment in segments {
3346            current = current
3347                .get(segment)
3348                .ok_or_else(|| format!("Field not found: {}", segment))?;
3349        }
3350
3351        Ok(current.clone())
3352    }
3353
3354    fn append_to_array(
3355        &mut self,
3356        object_reg: Register,
3357        path: &str,
3358        value_reg: Register,
3359        max_length: usize,
3360    ) -> Result<()> {
3361        let compiled = self.get_compiled_path(path);
3362        let segments = compiled.segments();
3363        let value = self.registers[value_reg].clone();
3364
3365        if !self.registers[object_reg].is_object() {
3366            self.registers[object_reg] = json!({});
3367        }
3368
3369        let obj = self.registers[object_reg]
3370            .as_object_mut()
3371            .ok_or("Not an object")?;
3372
3373        let mut current = obj;
3374        for (i, segment) in segments.iter().enumerate() {
3375            if i == segments.len() - 1 {
3376                current
3377                    .entry(segment.to_string())
3378                    .or_insert_with(|| json!([]));
3379                let arr = current
3380                    .get_mut(segment)
3381                    .and_then(|v| v.as_array_mut())
3382                    .ok_or("Path is not an array")?;
3383                arr.push(value.clone());
3384
3385                if arr.len() > max_length {
3386                    let excess = arr.len() - max_length;
3387                    arr.drain(0..excess);
3388                }
3389            } else {
3390                current
3391                    .entry(segment.to_string())
3392                    .or_insert_with(|| json!({}));
3393                current = current
3394                    .get_mut(segment)
3395                    .and_then(|v| v.as_object_mut())
3396                    .ok_or("Path collision: expected object")?;
3397            }
3398        }
3399
3400        Ok(())
3401    }
3402
3403    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
3404        let value = &self.registers[reg];
3405        let transformed = Self::apply_transformation(value, transformation)?;
3406        self.registers[reg] = transformed;
3407        Ok(())
3408    }
3409
3410    fn apply_transformation(value: &Value, transformation: &Transformation) -> Result<Value> {
3411        match transformation {
3412            Transformation::HexEncode => {
3413                if let Some(arr) = value.as_array() {
3414                    let bytes: Vec<u8> = arr
3415                        .iter()
3416                        .filter_map(|v| v.as_u64().map(|n| n as u8))
3417                        .collect();
3418                    let hex = hex::encode(&bytes);
3419                    Ok(json!(hex))
3420                } else if value.is_string() {
3421                    Ok(value.clone())
3422                } else {
3423                    Err("HexEncode requires an array of numbers".into())
3424                }
3425            }
3426            Transformation::HexDecode => {
3427                if let Some(s) = value.as_str() {
3428                    let s = s.strip_prefix("0x").unwrap_or(s);
3429                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
3430                    Ok(json!(bytes))
3431                } else {
3432                    Err("HexDecode requires a string".into())
3433                }
3434            }
3435            Transformation::Base58Encode => {
3436                if let Some(arr) = value.as_array() {
3437                    let bytes: Vec<u8> = arr
3438                        .iter()
3439                        .filter_map(|v| v.as_u64().map(|n| n as u8))
3440                        .collect();
3441                    let encoded = bs58::encode(&bytes).into_string();
3442                    Ok(json!(encoded))
3443                } else if value.is_string() {
3444                    Ok(value.clone())
3445                } else {
3446                    Err("Base58Encode requires an array of numbers".into())
3447                }
3448            }
3449            Transformation::Base58Decode => {
3450                if let Some(s) = value.as_str() {
3451                    let bytes = bs58::decode(s)
3452                        .into_vec()
3453                        .map_err(|e| format!("Base58 decode error: {}", e))?;
3454                    Ok(json!(bytes))
3455                } else {
3456                    Err("Base58Decode requires a string".into())
3457                }
3458            }
3459            Transformation::ToString => Ok(json!(value.to_string())),
3460            Transformation::ToNumber => {
3461                if let Some(s) = value.as_str() {
3462                    let n = s
3463                        .parse::<i64>()
3464                        .map_err(|e| format!("Parse error: {}", e))?;
3465                    Ok(json!(n))
3466                } else {
3467                    Ok(value.clone())
3468                }
3469            }
3470        }
3471    }
3472
3473    fn evaluate_comparison(
3474        &self,
3475        field_value: &Value,
3476        op: &ComparisonOp,
3477        condition_value: &Value,
3478    ) -> Result<bool> {
3479        use ComparisonOp::*;
3480
3481        match op {
3482            Equal => Ok(field_value == condition_value),
3483            NotEqual => Ok(field_value != condition_value),
3484            GreaterThan => {
3485                // Try to compare as numbers
3486                match (field_value.as_i64(), condition_value.as_i64()) {
3487                    (Some(a), Some(b)) => Ok(a > b),
3488                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
3489                        (Some(a), Some(b)) => Ok(a > b),
3490                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
3491                            (Some(a), Some(b)) => Ok(a > b),
3492                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
3493                        },
3494                    },
3495                }
3496            }
3497            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3498                (Some(a), Some(b)) => Ok(a >= b),
3499                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3500                    (Some(a), Some(b)) => Ok(a >= b),
3501                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3502                        (Some(a), Some(b)) => Ok(a >= b),
3503                        _ => {
3504                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
3505                        }
3506                    },
3507                },
3508            },
3509            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
3510                (Some(a), Some(b)) => Ok(a < b),
3511                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3512                    (Some(a), Some(b)) => Ok(a < b),
3513                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3514                        (Some(a), Some(b)) => Ok(a < b),
3515                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
3516                    },
3517                },
3518            },
3519            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
3520                (Some(a), Some(b)) => Ok(a <= b),
3521                _ => match (field_value.as_u64(), condition_value.as_u64()) {
3522                    (Some(a), Some(b)) => Ok(a <= b),
3523                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
3524                        (Some(a), Some(b)) => Ok(a <= b),
3525                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
3526                    },
3527                },
3528            },
3529        }
3530    }
3531
3532    fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
3533        if let Some(state) = self.states.get(&state_id) {
3534            if let Some(index) = state.lookup_indexes.get(index_name) {
3535                if index.lookup(value).is_some() {
3536                    return true;
3537                }
3538            }
3539
3540            for (name, index) in state.lookup_indexes.iter() {
3541                if name == index_name {
3542                    continue;
3543                }
3544                if index.lookup(value).is_some() {
3545                    return true;
3546                }
3547            }
3548
3549            if let Some(pda_str) = value.as_str() {
3550                if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
3551                    if pda_lookup.contains(pda_str) {
3552                        return true;
3553                    }
3554                }
3555            }
3556        }
3557
3558        false
3559    }
3560
3561    #[allow(clippy::type_complexity)]
3562    fn apply_deferred_when_op(
3563        &mut self,
3564        state_id: u32,
3565        op: &DeferredWhenOperation,
3566        entity_evaluator: Option<
3567            &Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync>,
3568        >,
3569        computed_paths: Option<&[String]>,
3570    ) -> Result<Vec<Mutation>> {
3571        let state = self.states.get(&state_id).ok_or("State not found")?;
3572
3573        if op.primary_key.is_null() {
3574            return Ok(vec![]);
3575        }
3576
3577        let mut entity_state = state
3578            .get_and_touch(&op.primary_key)
3579            .unwrap_or_else(|| json!({}));
3580
3581        // Track old values of computed fields before setting the new value
3582        let old_computed_values: Vec<_> = computed_paths
3583            .map(|paths| {
3584                paths
3585                    .iter()
3586                    .map(|path| Self::get_value_at_path(&entity_state, path))
3587                    .collect()
3588            })
3589            .unwrap_or_default();
3590
3591        Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
3592
3593        // Re-evaluate computed fields if an evaluator is provided
3594        if let Some(evaluator) = entity_evaluator {
3595            let context_slot = self.current_context.as_ref().and_then(|c| c.slot);
3596            let context_timestamp = self
3597                .current_context
3598                .as_ref()
3599                .map(|c| c.timestamp())
3600                .unwrap_or_else(|| {
3601                    std::time::SystemTime::now()
3602                        .duration_since(std::time::UNIX_EPOCH)
3603                        .unwrap()
3604                        .as_secs() as i64
3605                });
3606
3607            tracing::debug!(
3608                entity_name = %op.entity_name,
3609                primary_key = %op.primary_key,
3610                field_path = %op.field_path,
3611                "Re-evaluating computed fields after deferred when-op"
3612            );
3613
3614            if let Err(e) = evaluator(&mut entity_state, context_slot, context_timestamp) {
3615                tracing::warn!(
3616                    entity_name = %op.entity_name,
3617                    primary_key = %op.primary_key,
3618                    error = %e,
3619                    "Failed to evaluate computed fields after deferred when-op"
3620                );
3621            }
3622        }
3623
3624        state.insert_with_eviction(op.primary_key.clone(), entity_state.clone());
3625
3626        if !op.emit {
3627            return Ok(vec![]);
3628        }
3629
3630        let mut patch = json!({});
3631        Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
3632
3633        // Add computed field changes to the patch
3634        if let Some(paths) = computed_paths {
3635            tracing::debug!(
3636                entity_name = %op.entity_name,
3637                primary_key = %op.primary_key,
3638                computed_paths_count = paths.len(),
3639                "Checking computed fields for changes after deferred when-op"
3640            );
3641            for (path, old_value) in paths.iter().zip(old_computed_values.iter()) {
3642                let new_value = Self::get_value_at_path(&entity_state, path);
3643                tracing::debug!(
3644                    entity_name = %op.entity_name,
3645                    primary_key = %op.primary_key,
3646                    field_path = %path,
3647                    old_value = ?old_value,
3648                    new_value = ?new_value,
3649                    "Comparing computed field values"
3650                );
3651                if let Some(ref new_val) = new_value {
3652                    if Some(new_val) != old_value.as_ref() {
3653                        Self::set_nested_field_value(&mut patch, path, new_val.clone())?;
3654                        tracing::info!(
3655                            entity_name = %op.entity_name,
3656                            primary_key = %op.primary_key,
3657                            field_path = %path,
3658                            "Computed field changed after deferred when-op, including in mutation"
3659                        );
3660                    }
3661                }
3662            }
3663        }
3664
3665        Ok(vec![Mutation {
3666            export: op.entity_name.clone(),
3667            key: op.primary_key.clone(),
3668            patch,
3669            append: vec![],
3670        }])
3671    }
3672
3673    fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
3674        let parts: Vec<&str> = path.split('.').collect();
3675        let mut current = obj;
3676
3677        for (i, part) in parts.iter().enumerate() {
3678            if i == parts.len() - 1 {
3679                if let Some(map) = current.as_object_mut() {
3680                    map.insert(part.to_string(), value);
3681                    return Ok(());
3682                }
3683                return Err("Cannot set field on non-object".into());
3684            }
3685
3686            if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
3687                if let Some(map) = current.as_object_mut() {
3688                    map.insert(part.to_string(), json!({}));
3689                }
3690            }
3691
3692            current = current.get_mut(*part).ok_or("Path navigation failed")?;
3693        }
3694
3695        Ok(())
3696    }
3697
3698    pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
3699        let now = std::time::SystemTime::now()
3700            .duration_since(std::time::UNIX_EPOCH)
3701            .unwrap()
3702            .as_secs() as i64;
3703
3704        let state = match self.states.get(&state_id) {
3705            Some(s) => s,
3706            None => return 0,
3707        };
3708
3709        let mut removed = 0;
3710        state.deferred_when_ops.retain(|_, ops| {
3711            let before = ops.len();
3712            ops.retain(|op| now - op.deferred_at < max_age_secs);
3713            removed += before - ops.len();
3714            !ops.is_empty()
3715        });
3716
3717        removed
3718    }
3719
3720    /// Update a PDA reverse lookup and return pending updates for reprocessing.
3721    /// Returns any pending account updates that were queued for this PDA.
3722    /// ```ignore
3723    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
3724    /// for update in pending {
3725    ///     vm.process_event(&bytecode, update.account_data, &update.account_type, None, None)?;
3726    /// }
3727    /// ```
3728    #[cfg_attr(feature = "otel", instrument(
3729        name = "vm.update_pda_lookup",
3730        skip(self),
3731        fields(
3732            pda = %pda_address,
3733            seed = %seed_value,
3734        )
3735    ))]
3736    pub fn update_pda_reverse_lookup(
3737        &mut self,
3738        state_id: u32,
3739        lookup_name: &str,
3740        pda_address: String,
3741        seed_value: String,
3742    ) -> Result<Vec<PendingAccountUpdate>> {
3743        let state = self
3744            .states
3745            .get_mut(&state_id)
3746            .ok_or("State table not found")?;
3747
3748        let lookup = state
3749            .pda_reverse_lookups
3750            .entry(lookup_name.to_string())
3751            .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
3752
3753        // Detect if the PDA mapping is CHANGING (same PDA, different seed).
3754        // This happens at round boundaries when e.g. entropyVar is remapped
3755        // from old_round to new_round by a Reset instruction.
3756        let old_seed = lookup.index.peek(&pda_address).cloned();
3757        let mapping_changed = old_seed
3758            .as_ref()
3759            .map(|old| old != &seed_value)
3760            .unwrap_or(false);
3761
3762        if !mapping_changed && old_seed.is_none() {
3763            tracing::info!(
3764                pda = %pda_address,
3765                seed = %seed_value,
3766                "[PDA] First-time PDA reverse lookup established"
3767            );
3768        } else if !mapping_changed {
3769            tracing::debug!(
3770                pda = %pda_address,
3771                seed = %seed_value,
3772                "[PDA] PDA reverse lookup re-registered (same mapping)"
3773            );
3774        }
3775
3776        let evicted_pda = lookup.insert(pda_address.clone(), seed_value.clone());
3777
3778        if let Some(ref evicted) = evicted_pda {
3779            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
3780                let count = evicted_updates.len();
3781                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3782            }
3783        }
3784
3785        // Flush pending updates from QueueUntil for this PDA
3786        let mut pending = self.flush_pending_updates(state_id, &pda_address)?;
3787
3788        // When the mapping changed, the last account update for this PDA was
3789        // processed with the OLD seed (wrong key).  We need to:
3790        // 1. Remove stale lookup-index entries that map this PDA address to the
3791        //    old primary key — otherwise LookupIndex resolves the stale entry
3792        //    before PDA reverse lookup is even tried.
3793        // 2. Pull the cached account data and return it for reprocessing with
3794        //    the new mapping.
3795        if mapping_changed {
3796            if let Some(state) = self.states.get(&state_id) {
3797                // Clear stale lookup-index entries for this PDA address
3798                for index in state.lookup_indexes.values() {
3799                    index.remove(&Value::String(pda_address.clone()));
3800                }
3801
3802                if let Some((_, mut cached)) = state.last_account_data.remove(&pda_address) {
3803                    tracing::info!(
3804                        pda = %pda_address,
3805                        old_seed = ?old_seed,
3806                        new_seed = %seed_value,
3807                        account_type = %cached.account_type,
3808                        "PDA mapping changed — clearing stale indexes and reprocessing cached data"
3809                    );
3810                    cached.is_stale_reprocess = true;
3811                    pending.push(cached);
3812                }
3813            }
3814        }
3815
3816        Ok(pending)
3817    }
3818
3819    /// Clean up expired pending updates that are older than the TTL
3820    ///
3821    /// Returns the number of updates that were removed.
3822    /// This should be called periodically to prevent memory leaks from orphaned updates.
3823    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
3824        let state = match self.states.get_mut(&state_id) {
3825            Some(s) => s,
3826            None => return 0,
3827        };
3828
3829        let now = std::time::SystemTime::now()
3830            .duration_since(std::time::UNIX_EPOCH)
3831            .unwrap()
3832            .as_secs() as i64;
3833
3834        let mut removed_count = 0;
3835
3836        // Iterate through all pending updates and remove expired ones
3837        state.pending_updates.retain(|_pda_address, updates| {
3838            let original_len = updates.len();
3839
3840            updates.retain(|update| {
3841                let age = now - update.queued_at;
3842                age <= PENDING_UPDATE_TTL_SECONDS
3843            });
3844
3845            removed_count += original_len - updates.len();
3846
3847            // Remove the entry entirely if no updates remain
3848            !updates.is_empty()
3849        });
3850
3851        // Update the global counter
3852        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
3853
3854        if removed_count > 0 {
3855            #[cfg(feature = "otel")]
3856            crate::vm_metrics::record_pending_updates_expired(
3857                removed_count as u64,
3858                &state.entity_name,
3859            );
3860        }
3861
3862        removed_count
3863    }
3864
3865    /// Queue an account update for later processing when PDA reverse lookup is not yet available
3866    ///
3867    /// # Workflow
3868    ///
3869    /// This implements a deferred processing pattern for account updates when the PDA reverse
3870    /// lookup needed to resolve the primary key is not yet available:
3871    ///
3872    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
3873    ///    is not available, call `queue_account_update()` to queue it for later.
3874    ///
3875    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
3876    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
3877    ///
3878    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
3879    ///    ```ignore
3880    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
3881    ///    for update in pending {
3882    ///        let mutations = vm.process_event(
3883    ///            &bytecode, update.account_data, &update.account_type, None, None
3884    ///        )?;
3885    ///    }
3886    ///    ```
3887    ///
3888    /// # Arguments
3889    ///
3890    /// * `state_id` - The state table ID
3891    /// * `pda_address` - The PDA address that needs reverse lookup
3892    /// * `account_type` - The event type name for reprocessing
3893    /// * `account_data` - The account data (event value) for reprocessing
3894    /// * `slot` - The slot number when this update occurred
3895    /// * `signature` - The transaction signature
3896    #[cfg_attr(feature = "otel", instrument(
3897        name = "vm.queue_account_update",
3898        skip(self, update),
3899        fields(
3900            pda = %update.pda_address,
3901            account_type = %update.account_type,
3902            slot = update.slot,
3903        )
3904    ))]
3905    pub fn queue_account_update(
3906        &mut self,
3907        state_id: u32,
3908        update: QueuedAccountUpdate,
3909    ) -> Result<()> {
3910        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3911            self.cleanup_expired_pending_updates(state_id);
3912            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3913                self.drop_oldest_pending_update(state_id)?;
3914            }
3915        }
3916
3917        let state = self
3918            .states
3919            .get_mut(&state_id)
3920            .ok_or("State table not found")?;
3921
3922        let pending = PendingAccountUpdate {
3923            account_type: update.account_type,
3924            pda_address: update.pda_address.clone(),
3925            account_data: update.account_data,
3926            slot: update.slot,
3927            write_version: update.write_version,
3928            signature: update.signature,
3929            queued_at: std::time::SystemTime::now()
3930                .duration_since(std::time::UNIX_EPOCH)
3931                .unwrap()
3932                .as_secs() as i64,
3933            is_stale_reprocess: false,
3934        };
3935
3936        let pda_address = pending.pda_address.clone();
3937        let slot = pending.slot;
3938
3939        let mut updates = state
3940            .pending_updates
3941            .entry(pda_address.clone())
3942            .or_insert_with(Vec::new);
3943
3944        let original_len = updates.len();
3945        updates.retain(|existing| existing.slot > slot);
3946        let removed_by_dedup = original_len - updates.len();
3947
3948        if removed_by_dedup > 0 {
3949            self.pending_queue_size = self
3950                .pending_queue_size
3951                .saturating_sub(removed_by_dedup as u64);
3952        }
3953
3954        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
3955            updates.remove(0);
3956            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3957        }
3958
3959        updates.push(pending);
3960        #[cfg(feature = "otel")]
3961        crate::vm_metrics::record_pending_update_queued(&state.entity_name);
3962
3963        Ok(())
3964    }
3965
3966    pub fn queue_instruction_event(
3967        &mut self,
3968        state_id: u32,
3969        event: QueuedInstructionEvent,
3970    ) -> Result<()> {
3971        let state = self
3972            .states
3973            .get_mut(&state_id)
3974            .ok_or("State table not found")?;
3975
3976        let pda_address = event.pda_address.clone();
3977
3978        let pending = PendingInstructionEvent {
3979            event_type: event.event_type,
3980            pda_address: event.pda_address,
3981            event_data: event.event_data,
3982            slot: event.slot,
3983            signature: event.signature,
3984            queued_at: std::time::SystemTime::now()
3985                .duration_since(std::time::UNIX_EPOCH)
3986                .unwrap()
3987                .as_secs() as i64,
3988        };
3989
3990        let mut events = state
3991            .pending_instruction_events
3992            .entry(pda_address)
3993            .or_insert_with(Vec::new);
3994
3995        if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
3996            events.remove(0);
3997        }
3998
3999        events.push(pending);
4000
4001        Ok(())
4002    }
4003
4004    pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
4005        self.last_pda_lookup_miss.take()
4006    }
4007
4008    pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
4009        self.last_lookup_index_miss.take()
4010    }
4011
4012    pub fn take_last_pda_registered(&mut self) -> Option<String> {
4013        self.last_pda_registered.take()
4014    }
4015
4016    pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
4017        std::mem::take(&mut self.last_lookup_index_keys)
4018    }
4019
4020    pub fn flush_pending_instruction_events(
4021        &mut self,
4022        state_id: u32,
4023        pda_address: &str,
4024    ) -> Vec<PendingInstructionEvent> {
4025        let state = match self.states.get_mut(&state_id) {
4026            Some(s) => s,
4027            None => return Vec::new(),
4028        };
4029
4030        if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
4031            events
4032        } else {
4033            Vec::new()
4034        }
4035    }
4036
4037    /// Get statistics about the pending queue for monitoring
4038    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
4039        let state = self.states.get(&state_id)?;
4040
4041        let now = std::time::SystemTime::now()
4042            .duration_since(std::time::UNIX_EPOCH)
4043            .unwrap()
4044            .as_secs() as i64;
4045
4046        let mut total_updates = 0;
4047        let mut oldest_timestamp = now;
4048        let mut largest_pda_queue = 0;
4049        let mut estimated_memory = 0;
4050
4051        for entry in state.pending_updates.iter() {
4052            let (_, updates) = entry.pair();
4053            total_updates += updates.len();
4054            largest_pda_queue = largest_pda_queue.max(updates.len());
4055
4056            for update in updates.iter() {
4057                oldest_timestamp = oldest_timestamp.min(update.queued_at);
4058                // Rough memory estimate
4059                estimated_memory += update.account_type.len() +
4060                                   update.pda_address.len() +
4061                                   update.signature.len() +
4062                                   16 + // slot + queued_at
4063                                   estimate_json_size(&update.account_data);
4064            }
4065        }
4066
4067        Some(PendingQueueStats {
4068            total_updates,
4069            unique_pdas: state.pending_updates.len(),
4070            oldest_age_seconds: now - oldest_timestamp,
4071            largest_pda_queue_size: largest_pda_queue,
4072            estimated_memory_bytes: estimated_memory,
4073        })
4074    }
4075
4076    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
4077        let mut stats = VmMemoryStats {
4078            path_cache_size: self.path_cache.len(),
4079            ..Default::default()
4080        };
4081
4082        if let Some(state) = self.states.get(&state_id) {
4083            stats.state_table_entity_count = state.data.len();
4084            stats.state_table_max_entries = state.config.max_entries;
4085            stats.state_table_at_capacity = state.is_at_capacity();
4086
4087            stats.lookup_index_count = state.lookup_indexes.len();
4088            stats.lookup_index_total_entries =
4089                state.lookup_indexes.values().map(|idx| idx.len()).sum();
4090
4091            stats.temporal_index_count = state.temporal_indexes.len();
4092            stats.temporal_index_total_entries = state
4093                .temporal_indexes
4094                .values()
4095                .map(|idx| idx.total_entries())
4096                .sum();
4097
4098            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
4099            stats.pda_reverse_lookup_total_entries = state
4100                .pda_reverse_lookups
4101                .values()
4102                .map(|lookup| lookup.len())
4103                .sum();
4104
4105            stats.version_tracker_entries = state.version_tracker.len();
4106
4107            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
4108        }
4109
4110        stats
4111    }
4112
4113    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
4114        let pending_removed = self.cleanup_expired_pending_updates(state_id);
4115        let temporal_removed = self.cleanup_temporal_indexes(state_id);
4116
4117        #[cfg(feature = "otel")]
4118        if let Some(state) = self.states.get(&state_id) {
4119            crate::vm_metrics::record_cleanup(
4120                pending_removed,
4121                temporal_removed,
4122                &state.entity_name,
4123            );
4124        }
4125
4126        CleanupResult {
4127            pending_updates_removed: pending_removed,
4128            temporal_entries_removed: temporal_removed,
4129        }
4130    }
4131
4132    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
4133        let state = match self.states.get_mut(&state_id) {
4134            Some(s) => s,
4135            None => return 0,
4136        };
4137
4138        let now = std::time::SystemTime::now()
4139            .duration_since(std::time::UNIX_EPOCH)
4140            .unwrap()
4141            .as_secs() as i64;
4142
4143        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
4144        let mut total_removed = 0;
4145
4146        for (_, index) in state.temporal_indexes.iter_mut() {
4147            total_removed += index.cleanup_expired(cutoff);
4148        }
4149
4150        total_removed
4151    }
4152
4153    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
4154        let state = self.states.get(&state_id)?;
4155
4156        if state.is_at_capacity() {
4157            Some(CapacityWarning {
4158                current_entries: state.data.len(),
4159                max_entries: state.config.max_entries,
4160                entries_over_limit: state.entries_over_limit(),
4161            })
4162        } else {
4163            None
4164        }
4165    }
4166
4167    /// Drop the oldest pending update across all PDAs
4168    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
4169        let state = self
4170            .states
4171            .get_mut(&state_id)
4172            .ok_or("State table not found")?;
4173
4174        let mut oldest_pda: Option<String> = None;
4175        let mut oldest_timestamp = i64::MAX;
4176
4177        // Find the PDA with the oldest update
4178        for entry in state.pending_updates.iter() {
4179            let (pda, updates) = entry.pair();
4180            if let Some(update) = updates.first() {
4181                if update.queued_at < oldest_timestamp {
4182                    oldest_timestamp = update.queued_at;
4183                    oldest_pda = Some(pda.clone());
4184                }
4185            }
4186        }
4187
4188        // Remove the oldest update
4189        if let Some(pda) = oldest_pda {
4190            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
4191                if !updates.is_empty() {
4192                    updates.remove(0);
4193                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
4194
4195                    // Remove the entry if it's now empty
4196                    if updates.is_empty() {
4197                        drop(updates);
4198                        state.pending_updates.remove(&pda);
4199                    }
4200                }
4201            }
4202        }
4203
4204        Ok(())
4205    }
4206
4207    /// Flush and return pending updates for a PDA for external reprocessing
4208    ///
4209    /// Returns the pending updates that were queued for this PDA address.
4210    /// The caller should reprocess these through the VM using process_event().
4211    fn flush_pending_updates(
4212        &mut self,
4213        state_id: u32,
4214        pda_address: &str,
4215    ) -> Result<Vec<PendingAccountUpdate>> {
4216        let state = self
4217            .states
4218            .get_mut(&state_id)
4219            .ok_or("State table not found")?;
4220
4221        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
4222            let count = pending_updates.len();
4223            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
4224            #[cfg(feature = "otel")]
4225            crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
4226            Ok(pending_updates)
4227        } else {
4228            Ok(Vec::new())
4229        }
4230    }
4231
4232    /// Cache the most recent account data for a PDA address.
4233    /// Called by the vixen runtime after a Lookup-handler account update is
4234    /// successfully processed.  When a PDA mapping later changes (e.g. at a
4235    /// round boundary), the cached data is returned for reprocessing with the
4236    /// new mapping.
4237    pub fn cache_last_account_data(
4238        &mut self,
4239        state_id: u32,
4240        pda_address: &str,
4241        update: PendingAccountUpdate,
4242    ) {
4243        if let Some(state) = self.states.get(&state_id) {
4244            state
4245                .last_account_data
4246                .insert(pda_address.to_string(), update);
4247        }
4248    }
4249
4250    /// Try to resolve a primary key via PDA reverse lookup
4251    pub fn try_pda_reverse_lookup(
4252        &mut self,
4253        state_id: u32,
4254        lookup_name: &str,
4255        pda_address: &str,
4256    ) -> Option<String> {
4257        let state = self.states.get_mut(&state_id)?;
4258
4259        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
4260            if let Some(value) = lookup.lookup(pda_address) {
4261                self.pda_cache_hits += 1;
4262                return Some(value);
4263            }
4264        }
4265
4266        self.pda_cache_misses += 1;
4267        None
4268    }
4269
4270    /// Try to resolve a value through lookup indexes.
4271    /// This attempts to resolve the value using any lookup index in the state.
4272    pub fn try_lookup_index_resolution(&self, state_id: u32, value: &Value) -> Option<Value> {
4273        let state = self.states.get(&state_id)?;
4274
4275        for index in state.lookup_indexes.values() {
4276            if let Some(resolved) = index.lookup(value) {
4277                return Some(resolved);
4278            }
4279        }
4280
4281        None
4282    }
4283
4284    /// Try to resolve a primary key via chained PDA + lookup index resolution.
4285    /// First tries PDA reverse lookup, then tries to resolve the result through lookup indexes.
4286    /// This is useful when the PDA maps to an intermediate value (e.g., round address)
4287    /// that needs to be resolved to the actual primary key (e.g., round_id).
4288    pub fn try_chained_pda_lookup(
4289        &mut self,
4290        state_id: u32,
4291        lookup_name: &str,
4292        pda_address: &str,
4293    ) -> Option<String> {
4294        // First, try PDA reverse lookup
4295        let pda_result = self.try_pda_reverse_lookup(state_id, lookup_name, pda_address)?;
4296
4297        // Try to resolve the PDA result through lookup indexes
4298        // (e.g., round address -> round_id)
4299        let pda_value = Value::String(pda_result.clone());
4300        if let Some(resolved) = self.try_lookup_index_resolution(state_id, &pda_value) {
4301            resolved.as_str().map(|s| s.to_string())
4302        } else {
4303            // Return the PDA result if we can't resolve further
4304            pda_value.as_str().map(|s| s.to_string())
4305        }
4306    }
4307
4308    // ============================================================================
4309    // Computed Expression Evaluator (Task 5)
4310    // ============================================================================
4311
4312    /// Evaluate a computed expression AST against the current state
4313    /// This is the core runtime evaluator for computed fields from the AST
4314    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
4315        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
4316    }
4317
4318    /// Evaluate a computed expression with a variable environment (for let bindings)
4319    fn evaluate_computed_expr_with_env(
4320        &self,
4321        expr: &ComputedExpr,
4322        state: &Value,
4323        env: &std::collections::HashMap<String, Value>,
4324    ) -> Result<Value> {
4325        match expr {
4326            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
4327
4328            ComputedExpr::Var { name } => env
4329                .get(name)
4330                .cloned()
4331                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
4332
4333            ComputedExpr::Let { name, value, body } => {
4334                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
4335                let mut new_env = env.clone();
4336                new_env.insert(name.clone(), val);
4337                self.evaluate_computed_expr_with_env(body, state, &new_env)
4338            }
4339
4340            ComputedExpr::If {
4341                condition,
4342                then_branch,
4343                else_branch,
4344            } => {
4345                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
4346                if self.value_to_bool(&cond_val) {
4347                    self.evaluate_computed_expr_with_env(then_branch, state, env)
4348                } else {
4349                    self.evaluate_computed_expr_with_env(else_branch, state, env)
4350                }
4351            }
4352
4353            ComputedExpr::None => Ok(Value::Null),
4354
4355            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
4356
4357            ComputedExpr::Slice { expr, start, end } => {
4358                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4359                match val {
4360                    Value::Array(arr) => {
4361                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
4362                        Ok(Value::Array(slice))
4363                    }
4364                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
4365                }
4366            }
4367
4368            ComputedExpr::Index { expr, index } => {
4369                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4370                match val {
4371                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
4372                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
4373                }
4374            }
4375
4376            ComputedExpr::U64FromLeBytes { bytes } => {
4377                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4378                let byte_vec = self.value_to_bytes(&val)?;
4379                if byte_vec.len() < 8 {
4380                    return Err(format!(
4381                        "u64::from_le_bytes requires 8 bytes, got {}",
4382                        byte_vec.len()
4383                    )
4384                    .into());
4385                }
4386                let arr: [u8; 8] = byte_vec[..8]
4387                    .try_into()
4388                    .map_err(|_| "Failed to convert to [u8; 8]")?;
4389                Ok(Value::Number(serde_json::Number::from(u64::from_le_bytes(
4390                    arr,
4391                ))))
4392            }
4393
4394            ComputedExpr::U64FromBeBytes { bytes } => {
4395                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
4396                let byte_vec = self.value_to_bytes(&val)?;
4397                if byte_vec.len() < 8 {
4398                    return Err(format!(
4399                        "u64::from_be_bytes requires 8 bytes, got {}",
4400                        byte_vec.len()
4401                    )
4402                    .into());
4403                }
4404                let arr: [u8; 8] = byte_vec[..8]
4405                    .try_into()
4406                    .map_err(|_| "Failed to convert to [u8; 8]")?;
4407                Ok(Value::Number(serde_json::Number::from(u64::from_be_bytes(
4408                    arr,
4409                ))))
4410            }
4411
4412            ComputedExpr::ByteArray { bytes } => {
4413                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4414            }
4415
4416            ComputedExpr::Closure { param, body } => {
4417                // Closures are stored as-is; they're evaluated when used in map()
4418                // Return a special representation
4419                Ok(json!({
4420                    "__closure": {
4421                        "param": param,
4422                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
4423                    }
4424                }))
4425            }
4426
4427            ComputedExpr::Unary { op, expr } => {
4428                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4429                self.apply_unary_op(op, &val)
4430            }
4431
4432            ComputedExpr::JsonToBytes { expr } => {
4433                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4434                // Convert JSON array of numbers to byte array
4435                let bytes = self.value_to_bytes(&val)?;
4436                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
4437            }
4438
4439            ComputedExpr::UnwrapOr { expr, default } => {
4440                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4441                if val.is_null() {
4442                    Ok(default.clone())
4443                } else {
4444                    Ok(val)
4445                }
4446            }
4447
4448            ComputedExpr::Binary { op, left, right } => {
4449                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
4450                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
4451                self.apply_binary_op(op, &l, &r)
4452            }
4453
4454            ComputedExpr::Cast { expr, to_type } => {
4455                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4456                self.apply_cast(&val, to_type)
4457            }
4458
4459            ComputedExpr::ResolverComputed {
4460                resolver,
4461                method,
4462                args,
4463            } => {
4464                let evaluated_args: Vec<Value> = args
4465                    .iter()
4466                    .map(|arg| self.evaluate_computed_expr_with_env(arg, state, env))
4467                    .collect::<Result<Vec<_>>>()?;
4468                crate::resolvers::evaluate_resolver_computed(resolver, method, &evaluated_args)
4469            }
4470
4471            ComputedExpr::MethodCall { expr, method, args } => {
4472                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4473                // Special handling for map() with closures
4474                if method == "map" && args.len() == 1 {
4475                    if let ComputedExpr::Closure { param, body } = &args[0] {
4476                        // If the value is null, return null (Option::None.map returns None)
4477                        if val.is_null() {
4478                            return Ok(Value::Null);
4479                        }
4480
4481                        if let Value::Array(arr) = &val {
4482                            let results: Result<Vec<Value>> = arr
4483                                .iter()
4484                                .map(|elem| {
4485                                    let mut closure_env = env.clone();
4486                                    closure_env.insert(param.clone(), elem.clone());
4487                                    self.evaluate_computed_expr_with_env(body, state, &closure_env)
4488                                })
4489                                .collect();
4490                            return Ok(Value::Array(results?));
4491                        }
4492
4493                        let mut closure_env = env.clone();
4494                        closure_env.insert(param.clone(), val);
4495                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
4496                    }
4497                }
4498                let evaluated_args: Vec<Value> = args
4499                    .iter()
4500                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
4501                    .collect::<Result<Vec<_>>>()?;
4502                self.apply_method_call(&val, method, &evaluated_args)
4503            }
4504
4505            ComputedExpr::Literal { value } => Ok(value.clone()),
4506
4507            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
4508
4509            ComputedExpr::ContextSlot => Ok(self
4510                .current_context
4511                .as_ref()
4512                .and_then(|ctx| ctx.slot)
4513                .map(|s| json!(s))
4514                .unwrap_or(Value::Null)),
4515
4516            ComputedExpr::ContextTimestamp => Ok(self
4517                .current_context
4518                .as_ref()
4519                .map(|ctx| json!(ctx.timestamp()))
4520                .unwrap_or(Value::Null)),
4521
4522            ComputedExpr::Keccak256 { expr } => {
4523                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
4524                let bytes = self.value_to_bytes(&val)?;
4525                use sha3::{Digest, Keccak256};
4526                let hash = Keccak256::digest(&bytes);
4527                Ok(Value::Array(
4528                    hash.to_vec().iter().map(|b| json!(*b)).collect(),
4529                ))
4530            }
4531        }
4532    }
4533
4534    /// Convert a JSON value to a byte vector
4535    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
4536        match val {
4537            Value::Array(arr) => arr
4538                .iter()
4539                .map(|v| {
4540                    v.as_u64()
4541                        .map(|n| n as u8)
4542                        .ok_or_else(|| "Array element not a valid byte".into())
4543                })
4544                .collect(),
4545            Value::String(s) => {
4546                // Try to decode as hex
4547                if s.starts_with("0x") || s.starts_with("0X") {
4548                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
4549                } else {
4550                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
4551                }
4552            }
4553            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
4554        }
4555    }
4556
4557    /// Apply a unary operation
4558    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
4559        use crate::ast::UnaryOp;
4560        match op {
4561            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
4562            UnaryOp::ReverseBits => match val.as_u64() {
4563                Some(n) => Ok(json!(n.reverse_bits())),
4564                None => match val.as_i64() {
4565                    Some(n) => Ok(json!((n as u64).reverse_bits())),
4566                    None => Err("reverse_bits requires an integer".into()),
4567                },
4568            },
4569        }
4570    }
4571
4572    /// Get a field value from state by path (e.g., "section.field" or just "field")
4573    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
4574        let segments: Vec<&str> = path.split('.').collect();
4575        let mut current = state;
4576
4577        for segment in segments {
4578            match current.get(segment) {
4579                Some(v) => current = v,
4580                None => return Ok(Value::Null),
4581            }
4582        }
4583
4584        Ok(current.clone())
4585    }
4586
4587    /// Apply a binary operation to two values
4588    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
4589        match op {
4590            // Arithmetic operations
4591            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
4592            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
4593            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
4594            BinaryOp::Div => {
4595                // Check for division by zero
4596                if let Some(r) = right.as_i64() {
4597                    if r == 0 {
4598                        return Err("Division by zero".into());
4599                    }
4600                }
4601                if let Some(r) = right.as_f64() {
4602                    if r == 0.0 {
4603                        return Err("Division by zero".into());
4604                    }
4605                }
4606                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
4607            }
4608            BinaryOp::Mod => {
4609                // Modulo - only for integers
4610                match (left.as_i64(), right.as_i64()) {
4611                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4612                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
4613                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
4614                        _ => Err("Modulo requires non-zero integer operands".into()),
4615                    },
4616                    _ => Err("Modulo by zero".into()),
4617                }
4618            }
4619
4620            // Comparison operations
4621            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
4622            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
4623            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
4624            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
4625            BinaryOp::Eq => Ok(json!(left == right)),
4626            BinaryOp::Ne => Ok(json!(left != right)),
4627
4628            // Logical operations
4629            BinaryOp::And => {
4630                let l_bool = self.value_to_bool(left);
4631                let r_bool = self.value_to_bool(right);
4632                Ok(json!(l_bool && r_bool))
4633            }
4634            BinaryOp::Or => {
4635                let l_bool = self.value_to_bool(left);
4636                let r_bool = self.value_to_bool(right);
4637                Ok(json!(l_bool || r_bool))
4638            }
4639
4640            // Bitwise operations
4641            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
4642                (Some(a), Some(b)) => Ok(json!(a ^ b)),
4643                _ => match (left.as_i64(), right.as_i64()) {
4644                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
4645                    _ => Err("XOR requires integer operands".into()),
4646                },
4647            },
4648            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
4649                (Some(a), Some(b)) => Ok(json!(a & b)),
4650                _ => match (left.as_i64(), right.as_i64()) {
4651                    (Some(a), Some(b)) => Ok(json!(a & b)),
4652                    _ => Err("BitAnd requires integer operands".into()),
4653                },
4654            },
4655            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
4656                (Some(a), Some(b)) => Ok(json!(a | b)),
4657                _ => match (left.as_i64(), right.as_i64()) {
4658                    (Some(a), Some(b)) => Ok(json!(a | b)),
4659                    _ => Err("BitOr requires integer operands".into()),
4660                },
4661            },
4662            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
4663                (Some(a), Some(b)) => Ok(json!(a << b)),
4664                _ => match (left.as_i64(), right.as_i64()) {
4665                    (Some(a), Some(b)) => Ok(json!(a << b)),
4666                    _ => Err("Shl requires integer operands".into()),
4667                },
4668            },
4669            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
4670                (Some(a), Some(b)) => Ok(json!(a >> b)),
4671                _ => match (left.as_i64(), right.as_i64()) {
4672                    (Some(a), Some(b)) => Ok(json!(a >> b)),
4673                    _ => Err("Shr requires integer operands".into()),
4674                },
4675            },
4676        }
4677    }
4678
4679    /// Helper for numeric operations that can work on integers or floats
4680    fn numeric_op<F1, F2>(
4681        &self,
4682        left: &Value,
4683        right: &Value,
4684        int_op: F1,
4685        float_op: F2,
4686    ) -> Result<Value>
4687    where
4688        F1: Fn(i64, i64) -> i64,
4689        F2: Fn(f64, f64) -> f64,
4690    {
4691        // Try i64 first
4692        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4693            return Ok(json!(int_op(a, b)));
4694        }
4695
4696        // Try u64
4697        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4698            // For u64, we need to be careful with underflow in subtraction
4699            return Ok(json!(int_op(a as i64, b as i64)));
4700        }
4701
4702        // Try f64
4703        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4704            return Ok(json!(float_op(a, b)));
4705        }
4706
4707        // If either is null, return null
4708        if left.is_null() || right.is_null() {
4709            return Ok(Value::Null);
4710        }
4711
4712        Err(format!(
4713            "Cannot perform numeric operation on {:?} and {:?}",
4714            left, right
4715        )
4716        .into())
4717    }
4718
4719    /// Helper for comparison operations
4720    fn comparison_op<F1, F2>(
4721        &self,
4722        left: &Value,
4723        right: &Value,
4724        int_cmp: F1,
4725        float_cmp: F2,
4726    ) -> Result<Value>
4727    where
4728        F1: Fn(i64, i64) -> bool,
4729        F2: Fn(f64, f64) -> bool,
4730    {
4731        // Try i64 first
4732        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
4733            return Ok(json!(int_cmp(a, b)));
4734        }
4735
4736        // Try u64
4737        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
4738            return Ok(json!(int_cmp(a as i64, b as i64)));
4739        }
4740
4741        // Try f64
4742        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
4743            return Ok(json!(float_cmp(a, b)));
4744        }
4745
4746        // If either is null, comparison returns false
4747        if left.is_null() || right.is_null() {
4748            return Ok(json!(false));
4749        }
4750
4751        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
4752    }
4753
4754    /// Convert a value to boolean for logical operations
4755    fn value_to_bool(&self, value: &Value) -> bool {
4756        match value {
4757            Value::Null => false,
4758            Value::Bool(b) => *b,
4759            Value::Number(n) => {
4760                if let Some(i) = n.as_i64() {
4761                    i != 0
4762                } else if let Some(f) = n.as_f64() {
4763                    f != 0.0
4764                } else {
4765                    true
4766                }
4767            }
4768            Value::String(s) => !s.is_empty(),
4769            Value::Array(arr) => !arr.is_empty(),
4770            Value::Object(obj) => !obj.is_empty(),
4771        }
4772    }
4773
4774    /// Apply a type cast to a value
4775    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
4776        match to_type {
4777            "i8" | "i16" | "i32" | "i64" | "isize" => {
4778                if let Some(n) = value.as_i64() {
4779                    Ok(json!(n))
4780                } else if let Some(n) = value.as_u64() {
4781                    Ok(json!(n as i64))
4782                } else if let Some(n) = value.as_f64() {
4783                    Ok(json!(n as i64))
4784                } else if let Some(s) = value.as_str() {
4785                    s.parse::<i64>()
4786                        .map(|n| json!(n))
4787                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
4788                } else {
4789                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4790                }
4791            }
4792            "u8" | "u16" | "u32" | "u64" | "usize" => {
4793                if let Some(n) = value.as_u64() {
4794                    Ok(json!(n))
4795                } else if let Some(n) = value.as_i64() {
4796                    Ok(json!(n as u64))
4797                } else if let Some(n) = value.as_f64() {
4798                    Ok(json!(n as u64))
4799                } else if let Some(s) = value.as_str() {
4800                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
4801                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
4802                    })
4803                } else {
4804                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4805                }
4806            }
4807            "f32" | "f64" => {
4808                if let Some(n) = value.as_f64() {
4809                    Ok(json!(n))
4810                } else if let Some(n) = value.as_i64() {
4811                    Ok(json!(n as f64))
4812                } else if let Some(n) = value.as_u64() {
4813                    Ok(json!(n as f64))
4814                } else if let Some(s) = value.as_str() {
4815                    s.parse::<f64>()
4816                        .map(|n| json!(n))
4817                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
4818                } else {
4819                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
4820                }
4821            }
4822            "String" | "string" => Ok(json!(value.to_string())),
4823            "bool" => Ok(json!(self.value_to_bool(value))),
4824            _ => {
4825                // Unknown type, return value as-is
4826                Ok(value.clone())
4827            }
4828        }
4829    }
4830
4831    /// Apply a method call to a value
4832    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
4833        match method {
4834            "unwrap_or" => {
4835                if value.is_null() && !args.is_empty() {
4836                    Ok(args[0].clone())
4837                } else {
4838                    Ok(value.clone())
4839                }
4840            }
4841            "unwrap_or_default" => {
4842                if value.is_null() {
4843                    // Return default for common types
4844                    Ok(json!(0))
4845                } else {
4846                    Ok(value.clone())
4847                }
4848            }
4849            "is_some" => Ok(json!(!value.is_null())),
4850            "is_none" => Ok(json!(value.is_null())),
4851            "abs" => {
4852                if let Some(n) = value.as_i64() {
4853                    Ok(json!(n.abs()))
4854                } else if let Some(n) = value.as_f64() {
4855                    Ok(json!(n.abs()))
4856                } else {
4857                    Err(format!("Cannot call abs() on {:?}", value).into())
4858                }
4859            }
4860            "len" => {
4861                if let Some(s) = value.as_str() {
4862                    Ok(json!(s.len()))
4863                } else if let Some(arr) = value.as_array() {
4864                    Ok(json!(arr.len()))
4865                } else if let Some(obj) = value.as_object() {
4866                    Ok(json!(obj.len()))
4867                } else {
4868                    Err(format!("Cannot call len() on {:?}", value).into())
4869                }
4870            }
4871            "to_string" => Ok(json!(value.to_string())),
4872            "min" => {
4873                if args.is_empty() {
4874                    return Err("min() requires an argument".into());
4875                }
4876                let other = &args[0];
4877                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4878                    Ok(json!(a.min(b)))
4879                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4880                    Ok(json!(a.min(b)))
4881                } else {
4882                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
4883                }
4884            }
4885            "max" => {
4886                if args.is_empty() {
4887                    return Err("max() requires an argument".into());
4888                }
4889                let other = &args[0];
4890                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4891                    Ok(json!(a.max(b)))
4892                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
4893                    Ok(json!(a.max(b)))
4894                } else {
4895                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
4896                }
4897            }
4898            "saturating_add" => {
4899                if args.is_empty() {
4900                    return Err("saturating_add() requires an argument".into());
4901                }
4902                let other = &args[0];
4903                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4904                    Ok(json!(a.saturating_add(b)))
4905                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4906                    Ok(json!(a.saturating_add(b)))
4907                } else {
4908                    Err(format!(
4909                        "Cannot call saturating_add() on {:?} and {:?}",
4910                        value, other
4911                    )
4912                    .into())
4913                }
4914            }
4915            "saturating_sub" => {
4916                if args.is_empty() {
4917                    return Err("saturating_sub() requires an argument".into());
4918                }
4919                let other = &args[0];
4920                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
4921                    Ok(json!(a.saturating_sub(b)))
4922                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
4923                    Ok(json!(a.saturating_sub(b)))
4924                } else {
4925                    Err(format!(
4926                        "Cannot call saturating_sub() on {:?} and {:?}",
4927                        value, other
4928                    )
4929                    .into())
4930                }
4931            }
4932            _ => Err(format!("Unknown method call: {}()", method).into()),
4933        }
4934    }
4935
4936    /// Evaluate all computed fields for an entity and update the state
4937    /// This takes a list of ComputedFieldSpec from the AST and applies them
4938    pub fn evaluate_computed_fields_from_ast(
4939        &self,
4940        state: &mut Value,
4941        computed_field_specs: &[ComputedFieldSpec],
4942    ) -> Result<Vec<String>> {
4943        let mut updated_paths = Vec::new();
4944
4945        crate::resolvers::validate_resolver_computed_specs(computed_field_specs)?;
4946
4947        for spec in computed_field_specs {
4948            match self.evaluate_computed_expr(&spec.expression, state) {
4949                Ok(result) => {
4950                    self.set_field_in_state(state, &spec.target_path, result)?;
4951                    updated_paths.push(spec.target_path.clone());
4952                }
4953                Err(e) => {
4954                    tracing::warn!(
4955                        target_path = %spec.target_path,
4956                        error = %e,
4957                        "Failed to evaluate computed field"
4958                    );
4959                }
4960            }
4961        }
4962
4963        Ok(updated_paths)
4964    }
4965
4966    /// Set a field value in state by path (e.g., "section.field")
4967    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
4968        let segments: Vec<&str> = path.split('.').collect();
4969
4970        if segments.is_empty() {
4971            return Err("Empty path".into());
4972        }
4973
4974        // Navigate to parent, creating intermediate objects as needed
4975        let mut current = state;
4976        for (i, segment) in segments.iter().enumerate() {
4977            if i == segments.len() - 1 {
4978                // Last segment - set the value
4979                if let Some(obj) = current.as_object_mut() {
4980                    obj.insert(segment.to_string(), value);
4981                    return Ok(());
4982                } else {
4983                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
4984                }
4985            } else {
4986                // Intermediate segment - navigate or create
4987                if !current.is_object() {
4988                    *current = json!({});
4989                }
4990                let obj = current.as_object_mut().unwrap();
4991                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
4992            }
4993        }
4994
4995        Ok(())
4996    }
4997
4998    /// Create a computed fields evaluator closure from AST specs
4999    /// This returns a function that can be passed to the bytecode builder
5000    pub fn create_evaluator_from_specs(
5001        specs: Vec<ComputedFieldSpec>,
5002    ) -> impl Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync + 'static {
5003        move |state: &mut Value, context_slot: Option<u64>, context_timestamp: i64| {
5004            let mut vm = VmContext::new();
5005            vm.current_context = Some(UpdateContext {
5006                slot: context_slot,
5007                timestamp: Some(context_timestamp),
5008                ..Default::default()
5009            });
5010            vm.evaluate_computed_fields_from_ast(state, &specs)?;
5011            Ok(())
5012        }
5013    }
5014}
5015
5016impl Default for VmContext {
5017    fn default() -> Self {
5018        Self::new()
5019    }
5020}
5021
5022// Implement the ReverseLookupUpdater trait for VmContext
5023impl crate::resolvers::ReverseLookupUpdater for VmContext {
5024    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
5025        // Use default state_id=0 and default lookup name
5026        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
5027            .unwrap_or_else(|e| {
5028                tracing::error!("Failed to update PDA reverse lookup: {}", e);
5029                Vec::new()
5030            })
5031    }
5032
5033    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
5034        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
5035        self.flush_pending_updates(0, pda_address)
5036            .unwrap_or_else(|e| {
5037                tracing::error!("Failed to flush pending updates: {}", e);
5038                Vec::new()
5039            })
5040    }
5041}
5042
5043#[cfg(test)]
5044mod tests {
5045    use super::*;
5046    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
5047
5048    #[test]
5049    fn test_computed_field_preserves_integer_type() {
5050        let vm = VmContext::new();
5051
5052        let mut state = serde_json::json!({
5053            "trading": {
5054                "total_buy_volume": 20000000000_i64,
5055                "total_sell_volume": 17951316474_i64
5056            }
5057        });
5058
5059        let spec = ComputedFieldSpec {
5060            target_path: "trading.total_volume".to_string(),
5061            result_type: "Option<u64>".to_string(),
5062            expression: ComputedExpr::Binary {
5063                op: BinaryOp::Add,
5064                left: Box::new(ComputedExpr::UnwrapOr {
5065                    expr: Box::new(ComputedExpr::FieldRef {
5066                        path: "trading.total_buy_volume".to_string(),
5067                    }),
5068                    default: serde_json::json!(0),
5069                }),
5070                right: Box::new(ComputedExpr::UnwrapOr {
5071                    expr: Box::new(ComputedExpr::FieldRef {
5072                        path: "trading.total_sell_volume".to_string(),
5073                    }),
5074                    default: serde_json::json!(0),
5075                }),
5076            },
5077        };
5078
5079        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
5080            .unwrap();
5081
5082        let total_volume = state
5083            .get("trading")
5084            .and_then(|t| t.get("total_volume"))
5085            .expect("total_volume should exist");
5086
5087        let serialized = serde_json::to_string(total_volume).unwrap();
5088        assert!(
5089            !serialized.contains('.'),
5090            "Integer should not have decimal point: {}",
5091            serialized
5092        );
5093        assert_eq!(
5094            total_volume.as_i64(),
5095            Some(37951316474),
5096            "Value should be correct sum"
5097        );
5098    }
5099
5100    #[test]
5101    fn test_set_field_sum_preserves_integer_type() {
5102        let mut vm = VmContext::new();
5103        vm.registers[0] = serde_json::json!({});
5104        vm.registers[1] = serde_json::json!(20000000000_i64);
5105        vm.registers[2] = serde_json::json!(17951316474_i64);
5106
5107        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
5108        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
5109
5110        let state = &vm.registers[0];
5111        let buy_vol = state
5112            .get("trading")
5113            .and_then(|t| t.get("total_buy_volume"))
5114            .unwrap();
5115        let sell_vol = state
5116            .get("trading")
5117            .and_then(|t| t.get("total_sell_volume"))
5118            .unwrap();
5119
5120        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
5121        let sell_serialized = serde_json::to_string(sell_vol).unwrap();
5122
5123        assert!(
5124            !buy_serialized.contains('.'),
5125            "Buy volume should not have decimal: {}",
5126            buy_serialized
5127        );
5128        assert!(
5129            !sell_serialized.contains('.'),
5130            "Sell volume should not have decimal: {}",
5131            sell_serialized
5132        );
5133    }
5134
5135    #[test]
5136    fn test_lookup_index_chaining() {
5137        let mut vm = VmContext::new();
5138
5139        let state = vm.states.get_mut(&0).unwrap();
5140
5141        state
5142            .pda_reverse_lookups
5143            .entry("default_pda_lookup".to_string())
5144            .or_insert_with(|| PdaReverseLookup::new(1000))
5145            .insert("pda_123".to_string(), "addr_456".to_string());
5146
5147        state
5148            .lookup_indexes
5149            .entry("round_address_lookup_index".to_string())
5150            .or_insert_with(LookupIndex::new)
5151            .insert(json!("addr_456"), json!(789));
5152
5153        let handler = vec![
5154            OpCode::LoadConstant {
5155                value: json!("pda_123"),
5156                dest: 0,
5157            },
5158            OpCode::LookupIndex {
5159                state_id: 0,
5160                index_name: "round_address_lookup_index".to_string(),
5161                lookup_value: 0,
5162                dest: 1,
5163            },
5164        ];
5165
5166        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
5167            .unwrap();
5168
5169        assert_eq!(vm.registers[1], json!(789));
5170    }
5171
5172    #[test]
5173    fn test_lookup_index_no_chain() {
5174        let mut vm = VmContext::new();
5175
5176        let state = vm.states.get_mut(&0).unwrap();
5177        state
5178            .lookup_indexes
5179            .entry("test_index".to_string())
5180            .or_insert_with(LookupIndex::new)
5181            .insert(json!("key_abc"), json!(42));
5182
5183        let handler = vec![
5184            OpCode::LoadConstant {
5185                value: json!("key_abc"),
5186                dest: 0,
5187            },
5188            OpCode::LookupIndex {
5189                state_id: 0,
5190                index_name: "test_index".to_string(),
5191                lookup_value: 0,
5192                dest: 1,
5193            },
5194        ];
5195
5196        vm.execute_handler(&handler, &json!({}), "test", 0, "TestEntity", None, None)
5197            .unwrap();
5198
5199        assert_eq!(vm.registers[1], json!(42));
5200    }
5201
5202    #[test]
5203    fn test_conditional_set_field_with_zero_array() {
5204        let mut vm = VmContext::new();
5205
5206        let event_zeros = json!({
5207            "value": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5208                      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
5209        });
5210
5211        let event_nonzero = json!({
5212            "value": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
5213                      17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]
5214        });
5215
5216        let zero_32: Value = json!([
5217            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5218            0, 0, 0
5219        ]);
5220
5221        let handler = vec![
5222            OpCode::CreateObject { dest: 2 },
5223            OpCode::LoadEventField {
5224                path: FieldPath::new(&["value"]),
5225                dest: 10,
5226                default: None,
5227            },
5228            OpCode::ConditionalSetField {
5229                object: 2,
5230                path: "captured_value".to_string(),
5231                value: 10,
5232                condition_field: FieldPath::new(&["value"]),
5233                condition_op: ComparisonOp::NotEqual,
5234                condition_value: zero_32,
5235            },
5236        ];
5237
5238        vm.execute_handler(&handler, &event_zeros, "test", 0, "Test", None, None)
5239            .unwrap();
5240        assert!(
5241            vm.registers[2].get("captured_value").is_none(),
5242            "Field should not be set when value is all zeros"
5243        );
5244
5245        vm.reset_registers();
5246        vm.execute_handler(&handler, &event_nonzero, "test", 0, "Test", None, None)
5247            .unwrap();
5248        assert!(
5249            vm.registers[2].get("captured_value").is_some(),
5250            "Field should be set when value is non-zero"
5251        );
5252    }
5253
5254    #[test]
5255    fn test_when_instruction_arrives_first() {
5256        let mut vm = VmContext::new();
5257
5258        let signature = "test_sig_123".to_string();
5259
5260        {
5261            let state = vm.states.get(&0).unwrap();
5262            let mut cache = state.recent_tx_instructions.lock().unwrap();
5263            let mut set = HashSet::new();
5264            set.insert("RevealIxState".to_string());
5265            cache.put(signature.clone(), set);
5266        }
5267
5268        vm.current_context = Some(UpdateContext::new(100, signature.clone()));
5269
5270        let handler = vec![
5271            OpCode::CreateObject { dest: 2 },
5272            OpCode::LoadConstant {
5273                value: json!("primary_key_value"),
5274                dest: 1,
5275            },
5276            OpCode::LoadConstant {
5277                value: json!("the_revealed_value"),
5278                dest: 10,
5279            },
5280            OpCode::SetFieldWhen {
5281                object: 2,
5282                path: "entropy_value".to_string(),
5283                value: 10,
5284                when_instruction: "RevealIxState".to_string(),
5285                entity_name: "TestEntity".to_string(),
5286                key_reg: 1,
5287                condition_field: None,
5288                condition_op: None,
5289                condition_value: None,
5290            },
5291        ];
5292
5293        vm.execute_handler(
5294            &handler,
5295            &json!({}),
5296            "VarState",
5297            0,
5298            "TestEntity",
5299            None,
5300            None,
5301        )
5302        .unwrap();
5303
5304        assert_eq!(
5305            vm.registers[2].get("entropy_value").unwrap(),
5306            "the_revealed_value",
5307            "Field should be set when instruction was already seen"
5308        );
5309    }
5310
/// Checks the "account first" ordering for `SetFieldWhen`: when the awaited
/// instruction has not been recorded yet, the write is queued under
/// `(signature, instruction)` instead of being applied, and replaying the
/// queued operation afterwards stores the value on the entity.
#[test]
fn test_when_account_arrives_first() {
    let mut vm = VmContext::new();
    let tx_sig = String::from("test_sig_456");

    // Unlike the instruction-first case, nothing is put into the
    // recent-instruction cache before the handler runs.
    vm.current_context = Some(UpdateContext::new(100, tx_sig.clone()));

    let ops = vec![
        OpCode::CreateObject { dest: 2 },
        OpCode::LoadConstant {
            value: json!("pk_123"),
            dest: 1,
        },
        OpCode::LoadConstant {
            value: json!("deferred_value"),
            dest: 10,
        },
        OpCode::SetFieldWhen {
            object: 2,
            path: "entropy_value".to_string(),
            value: 10,
            when_instruction: "RevealIxState".to_string(),
            entity_name: "TestEntity".to_string(),
            key_reg: 1,
            condition_field: None,
            condition_op: None,
            condition_value: None,
        },
    ];

    vm.execute_handler(&ops, &json!({}), "VarState", 0, "TestEntity", None, None)
        .unwrap();

    // Phase 1: the field is absent from the object register...
    let immediate = vm.registers[2].get("entropy_value");
    assert!(
        immediate.is_none(),
        "Field should not be set when instruction hasn't been seen"
    );

    // ...and the operation sits in the deferred queue instead.
    let table = vm.states.get(&0).unwrap();
    let queue_key = (tx_sig.clone(), "RevealIxState".to_string());
    let is_queued = table.deferred_when_ops.contains_key(&queue_key);
    assert!(is_queued, "Operation should be queued");

    // Phase 2: record the instruction for this signature, as if it had just
    // arrived.
    let seen: HashSet<String> = std::iter::once("RevealIxState".to_string()).collect();
    table
        .recent_tx_instructions
        .lock()
        .unwrap()
        .put(tx_sig, seen);

    // Phase 3: drain the queue entry and replay every pending operation.
    let (_, pending_ops) = table.deferred_when_ops.remove(&queue_key).unwrap();
    for pending in &pending_ops {
        vm.apply_deferred_when_op(0, pending, None, None).unwrap();
    }

    // The replay must have written the value into the stored entity.
    let table = vm.states.get(&0).unwrap();
    let entity = table.data.get(&json!("pk_123")).unwrap();
    let stored = entity.get("entropy_value").unwrap();
    assert_eq!(
        stored, "deferred_value",
        "Field should be set after instruction arrives"
    );
}
5385
/// Checks that `cleanup_expired_when_ops` removes a deferred operation whose
/// `deferred_at` is 0 and reports how many operations it dropped.
#[test]
fn test_when_cleanup_expired() {
    let mut vm = VmContext::new();

    // A single queued operation stamped with `deferred_at: 0`.
    let stale_op = DeferredWhenOperation {
        entity_name: "Test".to_string(),
        primary_key: json!("pk"),
        field_path: "field".to_string(),
        field_value: json!("value"),
        when_instruction: "SomeIxState".to_string(),
        signature: "old_sig".to_string(),
        slot: 0,
        deferred_at: 0,
        emit: true,
    };
    let queue_key = (String::from("old_sig"), String::from("SomeIxState"));

    let table = vm.states.get(&0).unwrap();
    table.deferred_when_ops.insert(queue_key, vec![stale_op]);

    // NOTE(review): the second argument is presumably a maximum age in
    // seconds — confirm against `cleanup_expired_when_ops`.
    let removed = vm.cleanup_expired_when_ops(0, 60);

    assert_eq!(removed, 1, "Should have removed 1 expired op");

    let queue_is_empty = vm.states.get(&0).unwrap().deferred_when_ops.is_empty();
    assert!(queue_is_empty, "Deferred ops should be empty after cleanup");
}
5415
/// Checks that applying a deferred when-op together with a computed-field
/// evaluator sets the target field, recomputes the chained computed fields,
/// and includes those computed fields in the emitted mutation patch.
#[test]
fn test_deferred_when_op_recomputes_dependent_fields() {
    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};

    let mut vm = VmContext::new();

    // Two chained computed fields, modelled on the ore stack:
    //   results.pre_reveal_rng            <- entropy.base_value
    //   results.pre_reveal_winning_square <- results.pre_reveal_rng.map(|r| r % 25)
    let rng_spec = ComputedFieldSpec {
        target_path: "results.pre_reveal_rng".to_string(),
        result_type: "Option<u64>".to_string(),
        expression: ComputedExpr::FieldRef {
            path: "entropy.base_value".to_string(),
        },
    };

    let modulo_25 = ComputedExpr::Binary {
        op: BinaryOp::Mod,
        left: Box::new(ComputedExpr::Var {
            name: "r".to_string(),
        }),
        right: Box::new(ComputedExpr::Literal { value: json!(25) }),
    };
    let winning_square_spec = ComputedFieldSpec {
        target_path: "results.pre_reveal_winning_square".to_string(),
        result_type: "Option<u64>".to_string(),
        expression: ComputedExpr::MethodCall {
            expr: Box::new(ComputedExpr::FieldRef {
                path: "results.pre_reveal_rng".to_string(),
            }),
            method: "map".to_string(),
            args: vec![ComputedExpr::Closure {
                param: "r".to_string(),
                body: Box::new(modulo_25),
            }],
        },
    };

    let evaluator: Box<dyn Fn(&mut Value, Option<u64>, i64) -> Result<()> + Send + Sync> =
        Box::new(VmContext::create_evaluator_from_specs(vec![
            rng_spec,
            winning_square_spec,
        ]));

    // The deferred operation writes `entropy.base_value`, the input that both
    // computed fields ultimately derive from.
    let pk = json!("test_pk");
    let pending = DeferredWhenOperation {
        entity_name: "TestEntity".to_string(),
        primary_key: pk.clone(),
        field_path: "entropy.base_value".to_string(),
        field_value: json!(100),
        when_instruction: "TestIxState".to_string(),
        signature: "test_sig".to_string(),
        slot: 100,
        deferred_at: 0,
        emit: true,
    };

    // The entity must already be stored before the operation is applied.
    vm.states
        .get(&0)
        .unwrap()
        .insert_with_eviction(pk.clone(), json!({ "results": {} }));

    // Apply the operation, passing the evaluator and the computed paths.
    let computed_paths: &[String] = &[
        "results.pre_reveal_rng".to_string(),
        "results.pre_reveal_winning_square".to_string(),
    ];
    let mutations = vm
        .apply_deferred_when_op(0, &pending, Some(&evaluator), Some(computed_paths))
        .unwrap();

    // Inspect the stored entity.
    let table = vm.states.get(&0).unwrap();
    let entity = table.data.get(&pk).unwrap();

    let pretty = serde_json::to_string_pretty(&*entity).unwrap();
    println!("Entity state: {}", pretty);

    // The directly written field is present.
    let base_value = entity.get("entropy").and_then(|e| e.get("base_value"));
    assert_eq!(base_value, Some(&json!(100)), "Base value should be set");

    // Both computed fields were derived from it.
    let computed = |field: &str| entity.get("results").and_then(|r| r.get(field)).cloned();
    assert_eq!(
        computed("pre_reveal_rng"),
        Some(json!(100)),
        "pre_reveal_rng should be computed"
    );
    assert_eq!(
        computed("pre_reveal_winning_square"),
        Some(json!(0)),
        "pre_reveal_winning_square should be 100 % 25 = 0"
    );

    // The first emitted mutation carries the computed fields in its patch.
    assert!(!mutations.is_empty(), "Should have mutations");
    let patch = &mutations[0].patch;
    let in_patch = |field: &str| patch.get("results").and_then(|r| r.get(field)).is_some();

    assert!(
        in_patch("pre_reveal_rng"),
        "Mutation should include pre_reveal_rng"
    );
    assert!(
        in_patch("pre_reveal_winning_square"),
        "Mutation should include pre_reveal_winning_square"
    );
}
5554}