Skip to main content

hyperstack_interpreter/
vm.rs

1use crate::ast::{
2    BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14/// Context metadata for blockchain updates (accounts and instructions)
15/// This structure is designed to be extended over time with additional metadata
16#[derive(Debug, Clone, Default)]
17pub struct UpdateContext {
18    /// Blockchain slot number
19    pub slot: Option<u64>,
20    /// Transaction signature
21    pub signature: Option<String>,
22    /// Unix timestamp (seconds since epoch)
23    /// If not provided, will default to current system time when accessed
24    pub timestamp: Option<i64>,
25    /// Write version for account updates (monotonically increasing per account within a slot)
26    /// Used for staleness detection to reject out-of-order updates
27    pub write_version: Option<u64>,
28    /// Transaction index for instruction updates (orders transactions within a slot)
29    /// Used for staleness detection to reject out-of-order updates
30    pub txn_index: Option<u64>,
31    /// Additional custom metadata that can be added without breaking changes
32    pub metadata: HashMap<String, Value>,
33}
34
35impl UpdateContext {
36    /// Create a new UpdateContext with slot and signature
37    pub fn new(slot: u64, signature: String) -> Self {
38        Self {
39            slot: Some(slot),
40            signature: Some(signature),
41            timestamp: None,
42            write_version: None,
43            txn_index: None,
44            metadata: HashMap::new(),
45        }
46    }
47
48    /// Create a new UpdateContext with slot, signature, and timestamp
49    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
50        Self {
51            slot: Some(slot),
52            signature: Some(signature),
53            timestamp: Some(timestamp),
54            write_version: None,
55            txn_index: None,
56            metadata: HashMap::new(),
57        }
58    }
59
60    /// Create context for account updates with write_version for staleness detection
61    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
62        Self {
63            slot: Some(slot),
64            signature: Some(signature),
65            timestamp: None,
66            write_version: Some(write_version),
67            txn_index: None,
68            metadata: HashMap::new(),
69        }
70    }
71
72    /// Create context for instruction updates with txn_index for staleness detection
73    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
74        Self {
75            slot: Some(slot),
76            signature: Some(signature),
77            timestamp: None,
78            write_version: None,
79            txn_index: Some(txn_index),
80            metadata: HashMap::new(),
81        }
82    }
83
84    /// Get the timestamp, falling back to current system time if not set
85    pub fn timestamp(&self) -> i64 {
86        self.timestamp.unwrap_or_else(|| {
87            std::time::SystemTime::now()
88                .duration_since(std::time::UNIX_EPOCH)
89                .unwrap()
90                .as_secs() as i64
91        })
92    }
93
94    /// Create an empty context (for testing or when context is not available)
95    pub fn empty() -> Self {
96        Self::default()
97    }
98
99    /// Add custom metadata
100    /// Returns true if this is an account update context (has write_version, no txn_index)
101    pub fn is_account_update(&self) -> bool {
102        self.write_version.is_some() && self.txn_index.is_none()
103    }
104
105    /// Returns true if this is an instruction update context (has txn_index, no write_version)
106    pub fn is_instruction_update(&self) -> bool {
107        self.txn_index.is_some() && self.write_version.is_none()
108    }
109
110    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
111        self.metadata.insert(key, value);
112        self
113    }
114
115    /// Get metadata value
116    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
117        self.metadata.get(key)
118    }
119
120    /// Convert context to JSON value for injection into event data
121    pub fn to_value(&self) -> Value {
122        let mut obj = serde_json::Map::new();
123        if let Some(slot) = self.slot {
124            obj.insert("slot".to_string(), json!(slot));
125        }
126        if let Some(ref sig) = self.signature {
127            obj.insert("signature".to_string(), json!(sig));
128        }
129        // Always include timestamp (use current time if not set)
130        obj.insert("timestamp".to_string(), json!(self.timestamp()));
131        for (key, value) in &self.metadata {
132            obj.insert(key.clone(), value.clone());
133        }
134        Value::Object(obj)
135    }
136}
137
138pub type Register = usize;
139pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
140
141pub type RegisterValue = Value;
142
143/// Trait for evaluating computed fields
144/// Implement this in your generated spec to enable computed field evaluation
145pub trait ComputedFieldsEvaluator {
146    fn evaluate(&self, state: &mut Value) -> Result<()>;
147}
148
149// Pending queue configuration
150const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
151const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
152const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes
153
154// Temporal index configuration - prevents unbounded history growth
155const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, matches pending queue TTL
156const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
157
158// State table configuration - aligned with downstream EntityCache (500 per view)
159const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
160const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
161
162const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
163
164const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
165
166// Smaller cache for instruction deduplication - provides shorter effective TTL
167// since we don't expect instruction duplicates to arrive much later
168const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;
169
170const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
171
172const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
173
174/// Estimate the size of a JSON value in bytes
175fn estimate_json_size(value: &Value) -> usize {
176    match value {
177        Value::Null => 4,
178        Value::Bool(_) => 5,
179        Value::Number(_) => 8,
180        Value::String(s) => s.len() + 2,
181        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
182        Value::Object(obj) => {
183            2 + obj
184                .iter()
185                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
186                .sum::<usize>()
187        }
188    }
189}
190
191#[derive(Debug, Clone)]
192pub struct CompiledPath {
193    pub segments: std::sync::Arc<[String]>,
194}
195
196impl CompiledPath {
197    pub fn new(path: &str) -> Self {
198        let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
199        CompiledPath {
200            segments: segments.into(),
201        }
202    }
203
204    fn segments(&self) -> &[String] {
205        &self.segments
206    }
207}
208
209/// Represents the type of change made to a field for granular dirty tracking.
210/// This enables emitting only the actual changes rather than entire field values.
211#[derive(Debug, Clone)]
212pub enum FieldChange {
213    /// Field was replaced with a new value (emit the full value from state)
214    Replaced,
215    /// Items were appended to an array field (emit only the new items)
216    Appended(Vec<Value>),
217}
218
219/// Tracks field modifications during handler execution with granular change information.
220/// This replaces the simple HashSet<String> approach to enable delta-only emissions.
221#[derive(Debug, Clone, Default)]
222pub struct DirtyTracker {
223    changes: HashMap<String, FieldChange>,
224}
225
226impl DirtyTracker {
227    /// Create a new empty DirtyTracker
228    pub fn new() -> Self {
229        Self {
230            changes: HashMap::new(),
231        }
232    }
233
234    /// Mark a field as replaced (full value will be emitted)
235    pub fn mark_replaced(&mut self, path: &str) {
236        // If there was an append, it's now superseded by a full replacement
237        self.changes.insert(path.to_string(), FieldChange::Replaced);
238    }
239
240    /// Record an appended value for a field
241    pub fn mark_appended(&mut self, path: &str, value: Value) {
242        match self.changes.get_mut(path) {
243            Some(FieldChange::Appended(values)) => {
244                // Add to existing appended values
245                values.push(value);
246            }
247            Some(FieldChange::Replaced) => {
248                // Field was already replaced, keep it as replaced
249                // (the full value including the append will be emitted)
250            }
251            None => {
252                // First append to this field
253                self.changes
254                    .insert(path.to_string(), FieldChange::Appended(vec![value]));
255            }
256        }
257    }
258
259    /// Check if there are any changes tracked
260    pub fn is_empty(&self) -> bool {
261        self.changes.is_empty()
262    }
263
264    /// Get the number of changed fields
265    pub fn len(&self) -> usize {
266        self.changes.len()
267    }
268
269    /// Iterate over all changes
270    pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
271        self.changes.iter()
272    }
273
274    /// Get a set of all dirty field paths (for backward compatibility)
275    pub fn dirty_paths(&self) -> HashSet<String> {
276        self.changes.keys().cloned().collect()
277    }
278
279    /// Consume the tracker and return the changes map
280    pub fn into_changes(self) -> HashMap<String, FieldChange> {
281        self.changes
282    }
283
284    /// Get a reference to the changes map
285    pub fn changes(&self) -> &HashMap<String, FieldChange> {
286        &self.changes
287    }
288
289    /// Get paths that were appended (not replaced)
290    pub fn appended_paths(&self) -> Vec<String> {
291        self.changes
292            .iter()
293            .filter_map(|(path, change)| match change {
294                FieldChange::Appended(_) => Some(path.clone()),
295                FieldChange::Replaced => None,
296            })
297            .collect()
298    }
299}
300
301pub struct VmContext {
302    registers: Vec<RegisterValue>,
303    states: HashMap<u32, StateTable>,
304    pub instructions_executed: u64,
305    pub cache_hits: u64,
306    path_cache: HashMap<String, CompiledPath>,
307    pub pda_cache_hits: u64,
308    pub pda_cache_misses: u64,
309    pub pending_queue_size: u64,
310    current_context: Option<UpdateContext>,
311    warnings: Vec<String>,
312    last_pda_lookup_miss: Option<String>,
313    last_pda_registered: Option<String>,
314    last_lookup_index_keys: Vec<String>,
315}
316
317#[derive(Debug)]
318pub struct LookupIndex {
319    index: std::sync::Mutex<LruCache<String, Value>>,
320}
321
322impl LookupIndex {
323    pub fn new() -> Self {
324        Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
325    }
326
327    pub fn with_capacity(capacity: usize) -> Self {
328        LookupIndex {
329            index: std::sync::Mutex::new(LruCache::new(
330                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
331            )),
332        }
333    }
334
335    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
336        let key = value_to_cache_key(lookup_value);
337        self.index.lock().unwrap().get(&key).cloned()
338    }
339
340    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
341        let key = value_to_cache_key(&lookup_value);
342        self.index.lock().unwrap().put(key, primary_key);
343    }
344
345    pub fn len(&self) -> usize {
346        self.index.lock().unwrap().len()
347    }
348
349    pub fn is_empty(&self) -> bool {
350        self.index.lock().unwrap().is_empty()
351    }
352}
353
354impl Default for LookupIndex {
355    fn default() -> Self {
356        Self::new()
357    }
358}
359
360fn value_to_cache_key(value: &Value) -> String {
361    match value {
362        Value::String(s) => s.clone(),
363        Value::Number(n) => n.to_string(),
364        Value::Bool(b) => b.to_string(),
365        Value::Null => "null".to_string(),
366        _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
367    }
368}
369
370#[derive(Debug)]
371pub struct TemporalIndex {
372    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
373}
374
375impl Default for TemporalIndex {
376    fn default() -> Self {
377        Self::new()
378    }
379}
380
381impl TemporalIndex {
382    pub fn new() -> Self {
383        Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
384    }
385
386    pub fn with_capacity(capacity: usize) -> Self {
387        TemporalIndex {
388            index: std::sync::Mutex::new(LruCache::new(
389                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
390            )),
391        }
392    }
393
394    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
395        let key = value_to_cache_key(lookup_value);
396        let mut cache = self.index.lock().unwrap();
397        if let Some(entries) = cache.get(&key) {
398            for i in (0..entries.len()).rev() {
399                if entries[i].1 <= timestamp {
400                    return Some(entries[i].0.clone());
401                }
402            }
403        }
404        None
405    }
406
407    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
408        let key = value_to_cache_key(lookup_value);
409        let mut cache = self.index.lock().unwrap();
410        if let Some(entries) = cache.get(&key) {
411            if let Some(last) = entries.last() {
412                return Some(last.0.clone());
413            }
414        }
415        None
416    }
417
418    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
419        let key = value_to_cache_key(&lookup_value);
420        let mut cache = self.index.lock().unwrap();
421
422        let entries = cache.get_or_insert_mut(key, Vec::new);
423        entries.push((primary_key, timestamp));
424        entries.sort_by_key(|(_, ts)| *ts);
425
426        let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
427        entries.retain(|(_, ts)| *ts >= cutoff);
428
429        if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
430            let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
431            entries.drain(0..excess);
432        }
433    }
434
435    pub fn len(&self) -> usize {
436        self.index.lock().unwrap().len()
437    }
438
439    pub fn is_empty(&self) -> bool {
440        self.index.lock().unwrap().is_empty()
441    }
442
443    pub fn total_entries(&self) -> usize {
444        self.index
445            .lock()
446            .unwrap()
447            .iter()
448            .map(|(_, entries)| entries.len())
449            .sum()
450    }
451
452    pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
453        let mut cache = self.index.lock().unwrap();
454        let mut total_removed = 0;
455
456        for (_, entries) in cache.iter_mut() {
457            let original_len = entries.len();
458            entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
459            total_removed += original_len - entries.len();
460        }
461
462        total_removed
463    }
464}
465
466#[derive(Debug)]
467pub struct PdaReverseLookup {
468    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
469    index: LruCache<String, String>,
470}
471
472impl PdaReverseLookup {
473    pub fn new(capacity: usize) -> Self {
474        PdaReverseLookup {
475            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
476        }
477    }
478
479    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
480        self.index.get(pda_address).cloned()
481    }
482
483    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
484        let evicted = if self.index.len() >= self.index.cap().get() {
485            self.index.peek_lru().map(|(k, _)| k.clone())
486        } else {
487            None
488        };
489
490        self.index.put(pda_address, seed_value);
491        evicted
492    }
493
494    pub fn len(&self) -> usize {
495        self.index.len()
496    }
497
498    pub fn is_empty(&self) -> bool {
499        self.index.is_empty()
500    }
501}
502
503/// Input for queueing an account update.
504#[derive(Debug, Clone)]
505pub struct QueuedAccountUpdate {
506    pub pda_address: String,
507    pub account_type: String,
508    pub account_data: Value,
509    pub slot: u64,
510    pub write_version: u64,
511    pub signature: String,
512}
513
514/// Internal representation of a pending account update with queue metadata.
515#[derive(Debug, Clone)]
516pub struct PendingAccountUpdate {
517    pub account_type: String,
518    pub pda_address: String,
519    pub account_data: Value,
520    pub slot: u64,
521    pub write_version: u64,
522    pub signature: String,
523    pub queued_at: i64,
524}
525
526/// Input for queueing an instruction event when PDA lookup fails.
527#[derive(Debug, Clone)]
528pub struct QueuedInstructionEvent {
529    pub pda_address: String,
530    pub event_type: String,
531    pub event_data: Value,
532    pub slot: u64,
533    pub signature: String,
534}
535
536/// Internal representation of a pending instruction event with queue metadata.
537#[derive(Debug, Clone)]
538pub struct PendingInstructionEvent {
539    pub event_type: String,
540    pub pda_address: String,
541    pub event_data: Value,
542    pub slot: u64,
543    pub signature: String,
544    pub queued_at: i64,
545}
546
547#[derive(Debug, Clone)]
548pub struct PendingQueueStats {
549    pub total_updates: usize,
550    pub unique_pdas: usize,
551    pub oldest_age_seconds: i64,
552    pub largest_pda_queue_size: usize,
553    pub estimated_memory_bytes: usize,
554}
555
556#[derive(Debug, Clone, Default)]
557pub struct VmMemoryStats {
558    pub state_table_entity_count: usize,
559    pub state_table_max_entries: usize,
560    pub state_table_at_capacity: bool,
561    pub lookup_index_count: usize,
562    pub lookup_index_total_entries: usize,
563    pub temporal_index_count: usize,
564    pub temporal_index_total_entries: usize,
565    pub pda_reverse_lookup_count: usize,
566    pub pda_reverse_lookup_total_entries: usize,
567    pub version_tracker_entries: usize,
568    pub pending_queue_stats: Option<PendingQueueStats>,
569    pub path_cache_size: usize,
570}
571
572#[derive(Debug, Clone, Default)]
573pub struct CleanupResult {
574    pub pending_updates_removed: usize,
575    pub temporal_entries_removed: usize,
576}
577
578#[derive(Debug, Clone)]
579pub struct CapacityWarning {
580    pub current_entries: usize,
581    pub max_entries: usize,
582    pub entries_over_limit: usize,
583}
584
585#[derive(Debug, Clone)]
586pub struct StateTableConfig {
587    pub max_entries: usize,
588    pub max_array_length: usize,
589}
590
591impl Default for StateTableConfig {
592    fn default() -> Self {
593        Self {
594            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
595            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
596        }
597    }
598}
599
600#[derive(Debug)]
601pub struct VersionTracker {
602    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
603}
604
605impl VersionTracker {
606    pub fn new() -> Self {
607        Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
608    }
609
610    pub fn with_capacity(capacity: usize) -> Self {
611        VersionTracker {
612            cache: std::sync::Mutex::new(LruCache::new(
613                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
614            )),
615        }
616    }
617
618    fn make_key(primary_key: &Value, event_type: &str) -> String {
619        format!("{}:{}", primary_key, event_type)
620    }
621
622    pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
623        let key = Self::make_key(primary_key, event_type);
624        self.cache.lock().unwrap().get(&key).copied()
625    }
626
627    pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
628        let key = Self::make_key(primary_key, event_type);
629        self.cache.lock().unwrap().put(key, (slot, ordering_value));
630    }
631
632    pub fn len(&self) -> usize {
633        self.cache.lock().unwrap().len()
634    }
635
636    pub fn is_empty(&self) -> bool {
637        self.cache.lock().unwrap().is_empty()
638    }
639}
640
641impl Default for VersionTracker {
642    fn default() -> Self {
643        Self::new()
644    }
645}
646
647#[derive(Debug)]
648pub struct StateTable {
649    pub data: DashMap<Value, Value>,
650    access_times: DashMap<Value, i64>,
651    pub lookup_indexes: HashMap<String, LookupIndex>,
652    pub temporal_indexes: HashMap<String, TemporalIndex>,
653    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
654    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
655    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
656    version_tracker: VersionTracker,
657    instruction_dedup_cache: VersionTracker,
658    config: StateTableConfig,
659    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
660    entity_name: String,
661}
662
663impl StateTable {
664    pub fn is_at_capacity(&self) -> bool {
665        self.data.len() >= self.config.max_entries
666    }
667
668    pub fn entries_over_limit(&self) -> usize {
669        self.data.len().saturating_sub(self.config.max_entries)
670    }
671
672    pub fn max_array_length(&self) -> usize {
673        self.config.max_array_length
674    }
675
676    fn touch(&self, key: &Value) {
677        let now = std::time::SystemTime::now()
678            .duration_since(std::time::UNIX_EPOCH)
679            .unwrap()
680            .as_secs() as i64;
681        self.access_times.insert(key.clone(), now);
682    }
683
684    fn evict_lru(&self, count: usize) -> usize {
685        if count == 0 || self.data.is_empty() {
686            return 0;
687        }
688
689        let mut entries: Vec<(Value, i64)> = self
690            .access_times
691            .iter()
692            .map(|entry| (entry.key().clone(), *entry.value()))
693            .collect();
694
695        entries.sort_by_key(|(_, ts)| *ts);
696
697        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
698
699        let mut evicted = 0;
700        for key in to_evict {
701            self.data.remove(&key);
702            self.access_times.remove(&key);
703            evicted += 1;
704        }
705
706        #[cfg(feature = "otel")]
707        if evicted > 0 {
708            crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
709        }
710
711        evicted
712    }
713
714    pub fn insert_with_eviction(&self, key: Value, value: Value) {
715        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
716            #[cfg(feature = "otel")]
717            crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
718            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
719            self.evict_lru(to_evict.max(1));
720        }
721        self.data.insert(key.clone(), value);
722        self.touch(&key);
723    }
724
725    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
726        let result = self.data.get(key).map(|v| v.clone());
727        if result.is_some() {
728            self.touch(key);
729        }
730        result
731    }
732
733    /// Check if an update is fresh and update the version tracker.
734    /// Returns true if the update should be processed (is fresh).
735    /// Returns false if the update is stale and should be skipped.
736    ///
737    /// Comparison is lexicographic on (slot, ordering_value):
738    /// (100, 5) > (100, 3) > (99, 999)
739    pub fn is_fresh_update(
740        &self,
741        primary_key: &Value,
742        event_type: &str,
743        slot: u64,
744        ordering_value: u64,
745    ) -> bool {
746        let dominated = self
747            .version_tracker
748            .get(primary_key, event_type)
749            .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
750            .unwrap_or(false);
751
752        if dominated {
753            return false;
754        }
755
756        self.version_tracker
757            .insert(primary_key, event_type, slot, ordering_value);
758        true
759    }
760
761    /// Check if an instruction is a duplicate of one we've seen recently.
762    /// Returns true if this exact instruction has been seen before (is a duplicate).
763    /// Returns false if this is a new instruction that should be processed.
764    ///
765    /// Unlike account updates, instructions don't use recency checks - all
766    /// unique instructions are processed. Only exact duplicates are skipped.
767    /// Uses a smaller cache capacity for shorter effective TTL.
768    pub fn is_duplicate_instruction(
769        &self,
770        primary_key: &Value,
771        event_type: &str,
772        slot: u64,
773        txn_index: u64,
774    ) -> bool {
775        // Check if we've seen this exact instruction before
776        let is_duplicate = self
777            .instruction_dedup_cache
778            .get(primary_key, event_type)
779            .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
780            .unwrap_or(false);
781
782        if is_duplicate {
783            return true;
784        }
785
786        // Record this instruction for deduplication
787        self.instruction_dedup_cache
788            .insert(primary_key, event_type, slot, txn_index);
789        false
790    }
791}
792
793impl VmContext {
794    pub fn new() -> Self {
795        let mut vm = VmContext {
796            registers: vec![Value::Null; 256],
797            states: HashMap::new(),
798            instructions_executed: 0,
799            cache_hits: 0,
800            path_cache: HashMap::new(),
801            pda_cache_hits: 0,
802            pda_cache_misses: 0,
803            pending_queue_size: 0,
804            current_context: None,
805            warnings: Vec::new(),
806            last_pda_lookup_miss: None,
807            last_pda_registered: None,
808            last_lookup_index_keys: Vec::new(),
809        };
810        vm.states.insert(
811            0,
812            StateTable {
813                data: DashMap::new(),
814                access_times: DashMap::new(),
815                lookup_indexes: HashMap::new(),
816                temporal_indexes: HashMap::new(),
817                pda_reverse_lookups: HashMap::new(),
818                pending_updates: DashMap::new(),
819                pending_instruction_events: DashMap::new(),
820                version_tracker: VersionTracker::new(),
821                instruction_dedup_cache: VersionTracker::with_capacity(
822                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
823                ),
824                config: StateTableConfig::default(),
825                entity_name: "default".to_string(),
826            },
827        );
828        vm
829    }
830
831    pub fn new_with_config(state_config: StateTableConfig) -> Self {
832        let mut vm = VmContext {
833            registers: vec![Value::Null; 256],
834            states: HashMap::new(),
835            instructions_executed: 0,
836            cache_hits: 0,
837            path_cache: HashMap::new(),
838            pda_cache_hits: 0,
839            pda_cache_misses: 0,
840            pending_queue_size: 0,
841            current_context: None,
842            warnings: Vec::new(),
843            last_pda_lookup_miss: None,
844            last_pda_registered: None,
845            last_lookup_index_keys: Vec::new(),
846        };
847        vm.states.insert(
848            0,
849            StateTable {
850                data: DashMap::new(),
851                access_times: DashMap::new(),
852                lookup_indexes: HashMap::new(),
853                temporal_indexes: HashMap::new(),
854                pda_reverse_lookups: HashMap::new(),
855                pending_updates: DashMap::new(),
856                pending_instruction_events: DashMap::new(),
857                version_tracker: VersionTracker::new(),
858                instruction_dedup_cache: VersionTracker::with_capacity(
859                    DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
860                ),
861                config: state_config,
862                entity_name: "default".to_string(),
863            },
864        );
865        vm
866    }
867
868    /// Get a mutable reference to a state table by ID
869    /// Returns None if the state ID doesn't exist
870    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
871        self.states.get_mut(&state_id)
872    }
873
874    /// Get public access to registers (for metrics context)
875    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
876        &mut self.registers
877    }
878
879    /// Get public access to path cache (for metrics context)
880    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
881        &self.path_cache
882    }
883
884    /// Get the current update context
885    pub fn current_context(&self) -> Option<&UpdateContext> {
886        self.current_context.as_ref()
887    }
888
889    fn add_warning(&mut self, msg: String) {
890        self.warnings.push(msg);
891    }
892
893    pub fn take_warnings(&mut self) -> Vec<String> {
894        std::mem::take(&mut self.warnings)
895    }
896
897    pub fn has_warnings(&self) -> bool {
898        !self.warnings.is_empty()
899    }
900
901    pub fn update_state_from_register(
902        &mut self,
903        state_id: u32,
904        key: Value,
905        register: Register,
906    ) -> Result<()> {
907        let state = self.states.get(&state_id).ok_or("State table not found")?;
908        let value = self.registers[register].clone();
909        state.insert_with_eviction(key, value);
910        Ok(())
911    }
912
913    fn reset_registers(&mut self) {
914        for reg in &mut self.registers {
915            *reg = Value::Null;
916        }
917    }
918
919    /// Extract only the dirty fields from state (public for use by instruction hooks)
920    pub fn extract_partial_state(
921        &self,
922        state_reg: Register,
923        dirty_fields: &HashSet<String>,
924    ) -> Result<Value> {
925        let full_state = &self.registers[state_reg];
926
927        if dirty_fields.is_empty() {
928            return Ok(json!({}));
929        }
930
931        let mut partial = serde_json::Map::new();
932
933        for path in dirty_fields {
934            let segments: Vec<&str> = path.split('.').collect();
935
936            let mut current = full_state;
937            let mut found = true;
938
939            for segment in &segments {
940                match current.get(segment) {
941                    Some(v) => current = v,
942                    None => {
943                        found = false;
944                        break;
945                    }
946                }
947            }
948
949            if !found {
950                continue;
951            }
952
953            let mut target = &mut partial;
954            for (i, segment) in segments.iter().enumerate() {
955                if i == segments.len() - 1 {
956                    target.insert(segment.to_string(), current.clone());
957                } else {
958                    target
959                        .entry(segment.to_string())
960                        .or_insert_with(|| json!({}));
961                    target = target
962                        .get_mut(*segment)
963                        .and_then(|v| v.as_object_mut())
964                        .ok_or("Failed to build nested structure")?;
965                }
966            }
967        }
968
969        Ok(Value::Object(partial))
970    }
971
972    /// Extract a patch from state based on the DirtyTracker.
973    /// For Replaced fields: extracts the full value from state.
974    /// For Appended fields: emits only the appended values as an array.
975    pub fn extract_partial_state_with_tracker(
976        &self,
977        state_reg: Register,
978        tracker: &DirtyTracker,
979    ) -> Result<Value> {
980        let full_state = &self.registers[state_reg];
981
982        if tracker.is_empty() {
983            return Ok(json!({}));
984        }
985
986        let mut partial = serde_json::Map::new();
987
988        for (path, change) in tracker.iter() {
989            let segments: Vec<&str> = path.split('.').collect();
990
991            let value_to_insert = match change {
992                FieldChange::Replaced => {
993                    let mut current = full_state;
994                    let mut found = true;
995
996                    for segment in &segments {
997                        match current.get(*segment) {
998                            Some(v) => current = v,
999                            None => {
1000                                found = false;
1001                                break;
1002                            }
1003                        }
1004                    }
1005
1006                    if !found {
1007                        continue;
1008                    }
1009                    current.clone()
1010                }
1011                FieldChange::Appended(values) => Value::Array(values.clone()),
1012            };
1013
1014            let mut target = &mut partial;
1015            for (i, segment) in segments.iter().enumerate() {
1016                if i == segments.len() - 1 {
1017                    target.insert(segment.to_string(), value_to_insert.clone());
1018                } else {
1019                    target
1020                        .entry(segment.to_string())
1021                        .or_insert_with(|| json!({}));
1022                    target = target
1023                        .get_mut(*segment)
1024                        .and_then(|v| v.as_object_mut())
1025                        .ok_or("Failed to build nested structure")?;
1026                }
1027            }
1028        }
1029
1030        Ok(Value::Object(partial))
1031    }
1032
1033    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1034        if let Some(compiled) = self.path_cache.get(path) {
1035            self.cache_hits += 1;
1036            #[cfg(feature = "otel")]
1037            crate::vm_metrics::record_path_cache_hit();
1038            return compiled.clone();
1039        }
1040        #[cfg(feature = "otel")]
1041        crate::vm_metrics::record_path_cache_miss();
1042        let compiled = CompiledPath::new(path);
1043        self.path_cache.insert(path.to_string(), compiled.clone());
1044        compiled
1045    }
1046
1047    /// Process an event with optional context metadata
1048    #[cfg_attr(feature = "otel", instrument(
1049        name = "vm.process_event",
1050        skip(self, bytecode, event_value, log),
1051        level = "info",
1052        fields(
1053            event_type = %event_type,
1054            slot = context.as_ref().and_then(|c| c.slot),
1055        )
1056    ))]
1057    pub fn process_event(
1058        &mut self,
1059        bytecode: &MultiEntityBytecode,
1060        event_value: Value,
1061        event_type: &str,
1062        context: Option<&UpdateContext>,
1063        mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1064    ) -> Result<Vec<Mutation>> {
1065        self.current_context = context.cloned();
1066
1067        let mut event_value = event_value;
1068        if let Some(ctx) = context {
1069            if let Some(obj) = event_value.as_object_mut() {
1070                obj.insert("__update_context".to_string(), ctx.to_value());
1071            }
1072        }
1073
1074        let mut all_mutations = Vec::new();
1075
1076        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1077            for entity_name in entity_names {
1078                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1079                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1080                        if let Some(ref mut log) = log {
1081                            log.set("entity", entity_name.clone());
1082                            log.inc("handlers", 1);
1083                        }
1084
1085                        let opcodes_before = self.instructions_executed;
1086                        let cache_before = self.cache_hits;
1087                        let pda_hits_before = self.pda_cache_hits;
1088                        let pda_misses_before = self.pda_cache_misses;
1089
1090                        let mutations = self.execute_handler(
1091                            handler,
1092                            &event_value,
1093                            event_type,
1094                            entity_bytecode.state_id,
1095                            entity_name,
1096                            entity_bytecode.computed_fields_evaluator.as_ref(),
1097                        )?;
1098
1099                        if let Some(ref mut log) = log {
1100                            log.inc(
1101                                "opcodes",
1102                                (self.instructions_executed - opcodes_before) as i64,
1103                            );
1104                            log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1105                            log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1106                            log.inc(
1107                                "pda_misses",
1108                                (self.pda_cache_misses - pda_misses_before) as i64,
1109                            );
1110                        }
1111
1112                        if mutations.is_empty() {
1113                            if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1114                                if event_type.ends_with("IxState") {
1115                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1116                                    let signature = context
1117                                        .and_then(|c| c.signature.clone())
1118                                        .unwrap_or_default();
1119                                    let _ = self.queue_instruction_event(
1120                                        entity_bytecode.state_id,
1121                                        QueuedInstructionEvent {
1122                                            pda_address: missed_pda,
1123                                            event_type: event_type.to_string(),
1124                                            event_data: event_value.clone(),
1125                                            slot,
1126                                            signature,
1127                                        },
1128                                    );
1129                                }
1130                            }
1131                        }
1132
1133                        all_mutations.extend(mutations);
1134
1135                        if let Some(registered_pda) = self.take_last_pda_registered() {
1136                            let pending_events = self.flush_pending_instruction_events(
1137                                entity_bytecode.state_id,
1138                                &registered_pda,
1139                            );
1140                            for pending in pending_events {
1141                                if let Some(pending_handler) =
1142                                    entity_bytecode.handlers.get(&pending.event_type)
1143                                {
1144                                    if let Ok(reprocessed_mutations) = self.execute_handler(
1145                                        pending_handler,
1146                                        &pending.event_data,
1147                                        &pending.event_type,
1148                                        entity_bytecode.state_id,
1149                                        entity_name,
1150                                        entity_bytecode.computed_fields_evaluator.as_ref(),
1151                                    ) {
1152                                        all_mutations.extend(reprocessed_mutations);
1153                                    }
1154                                }
1155                            }
1156                        }
1157
1158                        let lookup_keys = self.take_last_lookup_index_keys();
1159                        for lookup_key in lookup_keys {
1160                            if let Ok(pending_updates) =
1161                                self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1162                            {
1163                                for pending in pending_updates {
1164                                    if let Some(pending_handler) =
1165                                        entity_bytecode.handlers.get(&pending.account_type)
1166                                    {
1167                                        self.current_context = Some(UpdateContext::new_account(
1168                                            pending.slot,
1169                                            pending.signature.clone(),
1170                                            pending.write_version,
1171                                        ));
1172                                        if let Ok(reprocessed) = self.execute_handler(
1173                                            pending_handler,
1174                                            &pending.account_data,
1175                                            &pending.account_type,
1176                                            entity_bytecode.state_id,
1177                                            entity_name,
1178                                            entity_bytecode.computed_fields_evaluator.as_ref(),
1179                                        ) {
1180                                            all_mutations.extend(reprocessed);
1181                                        }
1182                                    }
1183                                }
1184                            }
1185                        }
1186                    } else if let Some(ref mut log) = log {
1187                        log.set("skip_reason", "no_handler");
1188                    }
1189                } else if let Some(ref mut log) = log {
1190                    log.set("skip_reason", "entity_not_found");
1191                }
1192            }
1193        } else if let Some(ref mut log) = log {
1194            log.set("skip_reason", "no_event_routing");
1195        }
1196
1197        if let Some(log) = log {
1198            log.set("mutations", all_mutations.len() as i64);
1199            if let Some(first) = all_mutations.first() {
1200                if let Some(key_str) = first.key.as_str() {
1201                    log.set("primary_key", key_str);
1202                } else if let Some(key_num) = first.key.as_u64() {
1203                    log.set("primary_key", key_num as i64);
1204                }
1205            }
1206            if let Some(state) = self.states.get(&0) {
1207                log.set("state_table_size", state.data.len() as i64);
1208            }
1209
1210            let warnings = self.take_warnings();
1211            if !warnings.is_empty() {
1212                log.set("warnings", warnings.len() as i64);
1213                log.set(
1214                    "warning_messages",
1215                    Value::Array(warnings.into_iter().map(Value::String).collect()),
1216                );
1217                log.set_level(crate::canonical_log::LogLevel::Warn);
1218            }
1219        } else {
1220            self.warnings.clear();
1221        }
1222
1223        Ok(all_mutations)
1224    }
1225
1226    pub fn process_any(
1227        &mut self,
1228        bytecode: &MultiEntityBytecode,
1229        any: prost_types::Any,
1230    ) -> Result<Vec<Mutation>> {
1231        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1232        self.process_event(bytecode, event_value, &event_type, None, None)
1233    }
1234
1235    #[cfg_attr(feature = "otel", instrument(
1236        name = "vm.execute_handler",
1237        skip(self, handler, event_value, entity_evaluator),
1238        level = "debug",
1239        fields(
1240            event_type = %event_type,
1241            handler_opcodes = handler.len(),
1242        )
1243    ))]
1244    #[allow(clippy::type_complexity)]
1245    fn execute_handler(
1246        &mut self,
1247        handler: &[OpCode],
1248        event_value: &Value,
1249        event_type: &str,
1250        override_state_id: u32,
1251        entity_name: &str,
1252        entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1253    ) -> Result<Vec<Mutation>> {
1254        self.reset_registers();
1255        self.last_pda_lookup_miss = None;
1256
1257        let mut pc: usize = 0;
1258        let mut output = Vec::new();
1259        let mut dirty_tracker = DirtyTracker::new();
1260
1261        while pc < handler.len() {
1262            match &handler[pc] {
1263                OpCode::LoadEventField {
1264                    path,
1265                    dest,
1266                    default,
1267                } => {
1268                    let value = self.load_field(event_value, path, default.as_ref())?;
1269                    self.registers[*dest] = value;
1270                    pc += 1;
1271                }
1272                OpCode::LoadConstant { value, dest } => {
1273                    self.registers[*dest] = value.clone();
1274                    pc += 1;
1275                }
1276                OpCode::CopyRegister { source, dest } => {
1277                    self.registers[*dest] = self.registers[*source].clone();
1278                    pc += 1;
1279                }
1280                OpCode::CopyRegisterIfNull { source, dest } => {
1281                    if self.registers[*dest].is_null() {
1282                        self.registers[*dest] = self.registers[*source].clone();
1283                    }
1284                    pc += 1;
1285                }
1286                OpCode::GetEventType { dest } => {
1287                    self.registers[*dest] = json!(event_type);
1288                    pc += 1;
1289                }
1290                OpCode::CreateObject { dest } => {
1291                    self.registers[*dest] = json!({});
1292                    pc += 1;
1293                }
1294                OpCode::SetField {
1295                    object,
1296                    path,
1297                    value,
1298                } => {
1299                    self.set_field_auto_vivify(*object, path, *value)?;
1300                    dirty_tracker.mark_replaced(path);
1301                    pc += 1;
1302                }
1303                OpCode::SetFields { object, fields } => {
1304                    for (path, value_reg) in fields {
1305                        self.set_field_auto_vivify(*object, path, *value_reg)?;
1306                        dirty_tracker.mark_replaced(path);
1307                    }
1308                    pc += 1;
1309                }
1310                OpCode::GetField { object, path, dest } => {
1311                    let value = self.get_field(*object, path)?;
1312                    self.registers[*dest] = value;
1313                    pc += 1;
1314                }
1315                OpCode::ReadOrInitState {
1316                    state_id: _,
1317                    key,
1318                    default,
1319                    dest,
1320                } => {
1321                    let actual_state_id = override_state_id;
1322                    let entity_name_owned = entity_name.to_string();
1323                    self.states
1324                        .entry(actual_state_id)
1325                        .or_insert_with(|| StateTable {
1326                            data: DashMap::new(),
1327                            access_times: DashMap::new(),
1328                            lookup_indexes: HashMap::new(),
1329                            temporal_indexes: HashMap::new(),
1330                            pda_reverse_lookups: HashMap::new(),
1331                            pending_updates: DashMap::new(),
1332                            pending_instruction_events: DashMap::new(),
1333                            version_tracker: VersionTracker::new(),
1334                            instruction_dedup_cache: VersionTracker::with_capacity(
1335                                DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1336                            ),
1337                            config: StateTableConfig::default(),
1338                            entity_name: entity_name_owned,
1339                        });
1340                    let key_value = self.registers[*key].clone();
1341                    let warn_null_key = key_value.is_null()
1342                        && event_type.ends_with("State")
1343                        && !event_type.ends_with("IxState");
1344
1345                    if warn_null_key {
1346                        self.add_warning(format!(
1347                            "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1348                            key, event_type
1349                        ));
1350                    }
1351
1352                    let state = self
1353                        .states
1354                        .get(&actual_state_id)
1355                        .ok_or("State table not found")?;
1356
1357                    if !key_value.is_null() {
1358                        if let Some(ctx) = &self.current_context {
1359                            // Account updates: use recency check to discard stale updates
1360                            if ctx.is_account_update() {
1361                                if let (Some(slot), Some(write_version)) =
1362                                    (ctx.slot, ctx.write_version)
1363                                {
1364                                    if !state.is_fresh_update(
1365                                        &key_value,
1366                                        event_type,
1367                                        slot,
1368                                        write_version,
1369                                    ) {
1370                                        self.add_warning(format!(
1371                                            "Stale account update skipped: slot={}, write_version={}",
1372                                            slot, write_version
1373                                        ));
1374                                        return Ok(Vec::new());
1375                                    }
1376                                }
1377                            }
1378                            // Instruction updates: process all, but skip exact duplicates
1379                            else if ctx.is_instruction_update() {
1380                                if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
1381                                    if state.is_duplicate_instruction(
1382                                        &key_value, event_type, slot, txn_index,
1383                                    ) {
1384                                        self.add_warning(format!(
1385                                            "Duplicate instruction skipped: slot={}, txn_index={}",
1386                                            slot, txn_index
1387                                        ));
1388                                        return Ok(Vec::new());
1389                                    }
1390                                }
1391                            }
1392                        }
1393                    }
1394                    let value = state
1395                        .get_and_touch(&key_value)
1396                        .unwrap_or_else(|| default.clone());
1397
1398                    self.registers[*dest] = value;
1399                    pc += 1;
1400                }
1401                OpCode::UpdateState {
1402                    state_id: _,
1403                    key,
1404                    value,
1405                } => {
1406                    let actual_state_id = override_state_id;
1407                    let state = self
1408                        .states
1409                        .get(&actual_state_id)
1410                        .ok_or("State table not found")?;
1411                    let key_value = self.registers[*key].clone();
1412                    let value_data = self.registers[*value].clone();
1413
1414                    state.insert_with_eviction(key_value, value_data);
1415                    pc += 1;
1416                }
1417                OpCode::AppendToArray {
1418                    object,
1419                    path,
1420                    value,
1421                } => {
1422                    let appended_value = self.registers[*value].clone();
1423                    let max_len = self
1424                        .states
1425                        .get(&override_state_id)
1426                        .map(|s| s.max_array_length())
1427                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1428                    self.append_to_array(*object, path, *value, max_len)?;
1429                    dirty_tracker.mark_appended(path, appended_value);
1430                    pc += 1;
1431                }
1432                OpCode::GetCurrentTimestamp { dest } => {
1433                    let timestamp = std::time::SystemTime::now()
1434                        .duration_since(std::time::UNIX_EPOCH)
1435                        .unwrap()
1436                        .as_secs() as i64;
1437                    self.registers[*dest] = json!(timestamp);
1438                    pc += 1;
1439                }
1440                OpCode::CreateEvent { dest, event_value } => {
1441                    let timestamp = std::time::SystemTime::now()
1442                        .duration_since(std::time::UNIX_EPOCH)
1443                        .unwrap()
1444                        .as_secs() as i64;
1445
1446                    // Filter out __update_context from the event data
1447                    let mut event_data = self.registers[*event_value].clone();
1448                    if let Some(obj) = event_data.as_object_mut() {
1449                        obj.remove("__update_context");
1450                    }
1451
1452                    // Create event with timestamp, data, and optional slot/signature from context
1453                    let mut event = serde_json::Map::new();
1454                    event.insert("timestamp".to_string(), json!(timestamp));
1455                    event.insert("data".to_string(), event_data);
1456
1457                    // Add slot and signature if available from current context
1458                    if let Some(ref ctx) = self.current_context {
1459                        if let Some(slot) = ctx.slot {
1460                            event.insert("slot".to_string(), json!(slot));
1461                        }
1462                        if let Some(ref signature) = ctx.signature {
1463                            event.insert("signature".to_string(), json!(signature));
1464                        }
1465                    }
1466
1467                    self.registers[*dest] = Value::Object(event);
1468                    pc += 1;
1469                }
1470                OpCode::CreateCapture {
1471                    dest,
1472                    capture_value,
1473                } => {
1474                    let timestamp = std::time::SystemTime::now()
1475                        .duration_since(std::time::UNIX_EPOCH)
1476                        .unwrap()
1477                        .as_secs() as i64;
1478
1479                    // Get the capture data (already filtered by load_field)
1480                    let capture_data = self.registers[*capture_value].clone();
1481
1482                    // Extract account_address from the original event if available
1483                    let account_address = event_value
1484                        .get("__account_address")
1485                        .and_then(|v| v.as_str())
1486                        .unwrap_or("")
1487                        .to_string();
1488
1489                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
1490                    let mut capture = serde_json::Map::new();
1491                    capture.insert("timestamp".to_string(), json!(timestamp));
1492                    capture.insert("account_address".to_string(), json!(account_address));
1493                    capture.insert("data".to_string(), capture_data);
1494
1495                    // Add slot and signature if available from current context
1496                    if let Some(ref ctx) = self.current_context {
1497                        if let Some(slot) = ctx.slot {
1498                            capture.insert("slot".to_string(), json!(slot));
1499                        }
1500                        if let Some(ref signature) = ctx.signature {
1501                            capture.insert("signature".to_string(), json!(signature));
1502                        }
1503                    }
1504
1505                    self.registers[*dest] = Value::Object(capture);
1506                    pc += 1;
1507                }
1508                OpCode::Transform {
1509                    source,
1510                    dest,
1511                    transformation,
1512                } => {
1513                    if source == dest {
1514                        self.transform_in_place(*source, transformation)?;
1515                    } else {
1516                        let source_value = &self.registers[*source];
1517                        let value = self.apply_transformation(source_value, transformation)?;
1518                        self.registers[*dest] = value;
1519                    }
1520                    pc += 1;
1521                }
1522                OpCode::EmitMutation {
1523                    entity_name,
1524                    key,
1525                    state,
1526                } => {
1527                    let primary_key = self.registers[*key].clone();
1528
1529                    if primary_key.is_null() || dirty_tracker.is_empty() {
1530                        let reason = if dirty_tracker.is_empty() {
1531                            "no_fields_modified"
1532                        } else {
1533                            "null_primary_key"
1534                        };
1535                        self.add_warning(format!(
1536                            "Skipping mutation for entity '{}': {} (dirty_fields={})",
1537                            entity_name,
1538                            reason,
1539                            dirty_tracker.len()
1540                        ));
1541                    } else {
1542                        let patch =
1543                            self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1544
1545                        let append = dirty_tracker.appended_paths();
1546                        let mutation = Mutation {
1547                            export: entity_name.clone(),
1548                            key: primary_key,
1549                            patch,
1550                            append,
1551                        };
1552                        output.push(mutation);
1553                    }
1554                    pc += 1;
1555                }
1556                OpCode::SetFieldIfNull {
1557                    object,
1558                    path,
1559                    value,
1560                } => {
1561                    let was_set = self.set_field_if_null(*object, path, *value)?;
1562                    if was_set {
1563                        dirty_tracker.mark_replaced(path);
1564                    }
1565                    pc += 1;
1566                }
1567                OpCode::SetFieldMax {
1568                    object,
1569                    path,
1570                    value,
1571                } => {
1572                    let was_updated = self.set_field_max(*object, path, *value)?;
1573                    if was_updated {
1574                        dirty_tracker.mark_replaced(path);
1575                    }
1576                    pc += 1;
1577                }
1578                OpCode::UpdateTemporalIndex {
1579                    state_id: _,
1580                    index_name,
1581                    lookup_value,
1582                    primary_key,
1583                    timestamp,
1584                } => {
1585                    let actual_state_id = override_state_id;
1586                    let state = self
1587                        .states
1588                        .get_mut(&actual_state_id)
1589                        .ok_or("State table not found")?;
1590                    let index = state
1591                        .temporal_indexes
1592                        .entry(index_name.clone())
1593                        .or_insert_with(TemporalIndex::new);
1594
1595                    let lookup_val = self.registers[*lookup_value].clone();
1596                    let pk_val = self.registers[*primary_key].clone();
1597                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1598                        val
1599                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
1600                        val as i64
1601                    } else {
1602                        return Err(format!(
1603                            "Timestamp must be a number (i64 or u64), got: {:?}",
1604                            self.registers[*timestamp]
1605                        )
1606                        .into());
1607                    };
1608
1609                    index.insert(lookup_val, pk_val, ts_val);
1610                    pc += 1;
1611                }
1612                OpCode::LookupTemporalIndex {
1613                    state_id: _,
1614                    index_name,
1615                    lookup_value,
1616                    timestamp,
1617                    dest,
1618                } => {
1619                    let actual_state_id = override_state_id;
1620                    let state = self
1621                        .states
1622                        .get(&actual_state_id)
1623                        .ok_or("State table not found")?;
1624                    let lookup_val = &self.registers[*lookup_value];
1625
1626                    let result = if self.registers[*timestamp].is_null() {
1627                        if let Some(index) = state.temporal_indexes.get(index_name) {
1628                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1629                        } else {
1630                            Value::Null
1631                        }
1632                    } else {
1633                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1634                            val
1635                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
1636                            val as i64
1637                        } else {
1638                            return Err(format!(
1639                                "Timestamp must be a number (i64 or u64), got: {:?}",
1640                                self.registers[*timestamp]
1641                            )
1642                            .into());
1643                        };
1644
1645                        if let Some(index) = state.temporal_indexes.get(index_name) {
1646                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1647                        } else {
1648                            Value::Null
1649                        }
1650                    };
1651
1652                    self.registers[*dest] = result;
1653                    pc += 1;
1654                }
1655                OpCode::UpdateLookupIndex {
1656                    state_id: _,
1657                    index_name,
1658                    lookup_value,
1659                    primary_key,
1660                } => {
1661                    let actual_state_id = override_state_id;
1662                    let state = self
1663                        .states
1664                        .get_mut(&actual_state_id)
1665                        .ok_or("State table not found")?;
1666                    let index = state
1667                        .lookup_indexes
1668                        .entry(index_name.clone())
1669                        .or_insert_with(LookupIndex::new);
1670
1671                    let lookup_val = self.registers[*lookup_value].clone();
1672                    let pk_val = self.registers[*primary_key].clone();
1673
1674                    index.insert(lookup_val.clone(), pk_val);
1675
1676                    // Track lookup keys so process_event can flush queued account updates
1677                    if let Some(key_str) = lookup_val.as_str() {
1678                        self.last_lookup_index_keys.push(key_str.to_string());
1679                    }
1680
1681                    pc += 1;
1682                }
1683                OpCode::LookupIndex {
1684                    state_id: _,
1685                    index_name,
1686                    lookup_value,
1687                    dest,
1688                } => {
1689                    let actual_state_id = override_state_id;
1690                    let lookup_val = self.registers[*lookup_value].clone();
1691
1692                    let result = {
1693                        let state = self
1694                            .states
1695                            .get(&actual_state_id)
1696                            .ok_or("State table not found")?;
1697
1698                        if let Some(index) = state.lookup_indexes.get(index_name) {
1699                            let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1700                            #[cfg(feature = "otel")]
1701                            if found.is_null() {
1702                                crate::vm_metrics::record_lookup_index_miss(index_name);
1703                            } else {
1704                                crate::vm_metrics::record_lookup_index_hit(index_name);
1705                            }
1706                            found
1707                        } else {
1708                            Value::Null
1709                        }
1710                    };
1711
1712                    let final_result = if result.is_null() {
1713                        if let Some(pda_str) = lookup_val.as_str() {
1714                            let state = self
1715                                .states
1716                                .get_mut(&actual_state_id)
1717                                .ok_or("State table not found")?;
1718
1719                            if let Some(pda_lookup) =
1720                                state.pda_reverse_lookups.get_mut("default_pda_lookup")
1721                            {
1722                                if let Some(resolved) = pda_lookup.lookup(pda_str) {
1723                                    Value::String(resolved)
1724                                } else {
1725                                    self.last_pda_lookup_miss = Some(pda_str.to_string());
1726                                    Value::Null
1727                                }
1728                            } else {
1729                                self.last_pda_lookup_miss = Some(pda_str.to_string());
1730                                Value::Null
1731                            }
1732                        } else {
1733                            Value::Null
1734                        }
1735                    } else {
1736                        result
1737                    };
1738
1739                    self.registers[*dest] = final_result;
1740                    pc += 1;
1741                }
1742                OpCode::SetFieldSum {
1743                    object,
1744                    path,
1745                    value,
1746                } => {
1747                    let was_updated = self.set_field_sum(*object, path, *value)?;
1748                    if was_updated {
1749                        dirty_tracker.mark_replaced(path);
1750                    }
1751                    pc += 1;
1752                }
1753                OpCode::SetFieldIncrement { object, path } => {
1754                    let was_updated = self.set_field_increment(*object, path)?;
1755                    if was_updated {
1756                        dirty_tracker.mark_replaced(path);
1757                    }
1758                    pc += 1;
1759                }
1760                OpCode::SetFieldMin {
1761                    object,
1762                    path,
1763                    value,
1764                } => {
1765                    let was_updated = self.set_field_min(*object, path, *value)?;
1766                    if was_updated {
1767                        dirty_tracker.mark_replaced(path);
1768                    }
1769                    pc += 1;
1770                }
1771                OpCode::AddToUniqueSet {
1772                    state_id: _,
1773                    set_name,
1774                    value,
1775                    count_object,
1776                    count_path,
1777                } => {
1778                    let value_to_add = self.registers[*value].clone();
1779
1780                    // Store the unique set within the entity object, not in the state table
1781                    // This ensures each entity instance has its own unique set
1782                    let set_field_path = format!("__unique_set:{}", set_name);
1783
1784                    // Get or create the unique set from the entity object
1785                    let mut set: HashSet<Value> =
1786                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1787                            if !existing.is_null() {
1788                                serde_json::from_value(existing).unwrap_or_default()
1789                            } else {
1790                                HashSet::new()
1791                            }
1792                        } else {
1793                            HashSet::new()
1794                        };
1795
1796                    // Add value to set
1797                    let was_new = set.insert(value_to_add);
1798
1799                    // Store updated set back in the entity object
1800                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1801                    self.registers[100] = serde_json::to_value(set_as_vec)?;
1802                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1803
1804                    // Update the count field in the object
1805                    if was_new {
1806                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1807                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
1808                        dirty_tracker.mark_replaced(count_path);
1809                    }
1810
1811                    pc += 1;
1812                }
1813                OpCode::ConditionalSetField {
1814                    object,
1815                    path,
1816                    value,
1817                    condition_field,
1818                    condition_op,
1819                    condition_value,
1820                } => {
1821                    let field_value = self.load_field(event_value, condition_field, None)?;
1822                    let condition_met =
1823                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1824
1825                    if condition_met {
1826                        self.set_field_auto_vivify(*object, path, *value)?;
1827                        dirty_tracker.mark_replaced(path);
1828                    }
1829                    pc += 1;
1830                }
1831                OpCode::ConditionalIncrement {
1832                    object,
1833                    path,
1834                    condition_field,
1835                    condition_op,
1836                    condition_value,
1837                } => {
1838                    let field_value = self.load_field(event_value, condition_field, None)?;
1839                    let condition_met =
1840                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1841
1842                    if condition_met {
1843                        let was_updated = self.set_field_increment(*object, path)?;
1844                        if was_updated {
1845                            dirty_tracker.mark_replaced(path);
1846                        }
1847                    }
1848                    pc += 1;
1849                }
1850                OpCode::EvaluateComputedFields {
1851                    state,
1852                    computed_paths,
1853                } => {
1854                    if let Some(evaluator) = entity_evaluator {
1855                        let old_values: Vec<_> = computed_paths
1856                            .iter()
1857                            .map(|path| Self::get_value_at_path(&self.registers[*state], path))
1858                            .collect();
1859
1860                        let state_value = &mut self.registers[*state];
1861                        let eval_result = evaluator(state_value);
1862
1863                        if eval_result.is_ok() {
1864                            for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
1865                                let new_value =
1866                                    Self::get_value_at_path(&self.registers[*state], path);
1867
1868                                if new_value != *old_value {
1869                                    dirty_tracker.mark_replaced(path);
1870                                }
1871                            }
1872                        }
1873                    }
1874                    pc += 1;
1875                }
1876                OpCode::UpdatePdaReverseLookup {
1877                    state_id: _,
1878                    lookup_name,
1879                    pda_address,
1880                    primary_key,
1881                } => {
1882                    let actual_state_id = override_state_id;
1883                    let state = self
1884                        .states
1885                        .get_mut(&actual_state_id)
1886                        .ok_or("State table not found")?;
1887
1888                    let pda_val = self.registers[*pda_address].clone();
1889                    let pk_val = self.registers[*primary_key].clone();
1890
1891                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1892                        let pda_lookup = state
1893                            .pda_reverse_lookups
1894                            .entry(lookup_name.clone())
1895                            .or_insert_with(|| {
1896                                PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
1897                            });
1898
1899                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1900                        self.last_pda_registered = Some(pda_str.to_string());
1901                    } else if !pk_val.is_null() {
1902                        if let Some(pk_num) = pk_val.as_u64() {
1903                            if let Some(pda_str) = pda_val.as_str() {
1904                                let pda_lookup = state
1905                                    .pda_reverse_lookups
1906                                    .entry(lookup_name.clone())
1907                                    .or_insert_with(|| {
1908                                        PdaReverseLookup::new(
1909                                            DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
1910                                        )
1911                                    });
1912
1913                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1914                                self.last_pda_registered = Some(pda_str.to_string());
1915                            }
1916                        }
1917                    }
1918
1919                    pc += 1;
1920                }
1921            }
1922
1923            self.instructions_executed += 1;
1924        }
1925
1926        Ok(output)
1927    }
1928
1929    fn load_field(
1930        &self,
1931        event_value: &Value,
1932        path: &FieldPath,
1933        default: Option<&Value>,
1934    ) -> Result<Value> {
1935        if path.segments.is_empty() {
1936            if let Some(obj) = event_value.as_object() {
1937                let filtered: serde_json::Map<String, Value> = obj
1938                    .iter()
1939                    .filter(|(k, _)| !k.starts_with("__"))
1940                    .map(|(k, v)| (k.clone(), v.clone()))
1941                    .collect();
1942                return Ok(Value::Object(filtered));
1943            }
1944            return Ok(event_value.clone());
1945        }
1946
1947        let mut current = event_value;
1948        for segment in path.segments.iter() {
1949            current = match current.get(segment) {
1950                Some(v) => v,
1951                None => return Ok(default.cloned().unwrap_or(Value::Null)),
1952            };
1953        }
1954
1955        Ok(current.clone())
1956    }
1957
1958    fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
1959        let mut current = value;
1960        for segment in path.split('.') {
1961            current = current.get(segment)?;
1962        }
1963        Some(current.clone())
1964    }
1965
1966    fn set_field_auto_vivify(
1967        &mut self,
1968        object_reg: Register,
1969        path: &str,
1970        value_reg: Register,
1971    ) -> Result<()> {
1972        let compiled = self.get_compiled_path(path);
1973        let segments = compiled.segments();
1974        let value = self.registers[value_reg].clone();
1975
1976        if !self.registers[object_reg].is_object() {
1977            self.registers[object_reg] = json!({});
1978        }
1979
1980        let obj = self.registers[object_reg]
1981            .as_object_mut()
1982            .ok_or("Not an object")?;
1983
1984        let mut current = obj;
1985        for (i, segment) in segments.iter().enumerate() {
1986            if i == segments.len() - 1 {
1987                current.insert(segment.to_string(), value);
1988                return Ok(());
1989            } else {
1990                current
1991                    .entry(segment.to_string())
1992                    .or_insert_with(|| json!({}));
1993                current = current
1994                    .get_mut(segment)
1995                    .and_then(|v| v.as_object_mut())
1996                    .ok_or("Path collision: expected object")?;
1997            }
1998        }
1999
2000        Ok(())
2001    }
2002
2003    fn set_field_if_null(
2004        &mut self,
2005        object_reg: Register,
2006        path: &str,
2007        value_reg: Register,
2008    ) -> Result<bool> {
2009        let compiled = self.get_compiled_path(path);
2010        let segments = compiled.segments();
2011        let value = self.registers[value_reg].clone();
2012
2013        // SetOnce should only set meaningful values. A null source typically means
2014        // the field doesn't exist in this event type (e.g., instruction events don't
2015        // have account data). Skip to preserve any existing value.
2016        if value.is_null() {
2017            return Ok(false);
2018        }
2019
2020        if !self.registers[object_reg].is_object() {
2021            self.registers[object_reg] = json!({});
2022        }
2023
2024        let obj = self.registers[object_reg]
2025            .as_object_mut()
2026            .ok_or("Not an object")?;
2027
2028        let mut current = obj;
2029        for (i, segment) in segments.iter().enumerate() {
2030            if i == segments.len() - 1 {
2031                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
2032                    current.insert(segment.to_string(), value);
2033                    return Ok(true);
2034                }
2035                return Ok(false);
2036            } else {
2037                current
2038                    .entry(segment.to_string())
2039                    .or_insert_with(|| json!({}));
2040                current = current
2041                    .get_mut(segment)
2042                    .and_then(|v| v.as_object_mut())
2043                    .ok_or("Path collision: expected object")?;
2044            }
2045        }
2046
2047        Ok(false)
2048    }
2049
2050    fn set_field_max(
2051        &mut self,
2052        object_reg: Register,
2053        path: &str,
2054        value_reg: Register,
2055    ) -> Result<bool> {
2056        let compiled = self.get_compiled_path(path);
2057        let segments = compiled.segments();
2058        let new_value = self.registers[value_reg].clone();
2059
2060        if !self.registers[object_reg].is_object() {
2061            self.registers[object_reg] = json!({});
2062        }
2063
2064        let obj = self.registers[object_reg]
2065            .as_object_mut()
2066            .ok_or("Not an object")?;
2067
2068        let mut current = obj;
2069        for (i, segment) in segments.iter().enumerate() {
2070            if i == segments.len() - 1 {
2071                let should_update = if let Some(current_value) = current.get(segment) {
2072                    if current_value.is_null() {
2073                        true
2074                    } else {
2075                        match (current_value.as_i64(), new_value.as_i64()) {
2076                            (Some(current_val), Some(new_val)) => new_val > current_val,
2077                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2078                                new_value.as_u64().unwrap() as i64 > current_val
2079                            }
2080                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2081                                new_val > current_value.as_u64().unwrap() as i64
2082                            }
2083                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2084                                (Some(current_val), Some(new_val)) => new_val > current_val,
2085                                _ => match (current_value.as_f64(), new_value.as_f64()) {
2086                                    (Some(current_val), Some(new_val)) => new_val > current_val,
2087                                    _ => false,
2088                                },
2089                            },
2090                            _ => false,
2091                        }
2092                    }
2093                } else {
2094                    true
2095                };
2096
2097                if should_update {
2098                    current.insert(segment.to_string(), new_value);
2099                    return Ok(true);
2100                }
2101                return Ok(false);
2102            } else {
2103                current
2104                    .entry(segment.to_string())
2105                    .or_insert_with(|| json!({}));
2106                current = current
2107                    .get_mut(segment)
2108                    .and_then(|v| v.as_object_mut())
2109                    .ok_or("Path collision: expected object")?;
2110            }
2111        }
2112
2113        Ok(false)
2114    }
2115
2116    fn set_field_sum(
2117        &mut self,
2118        object_reg: Register,
2119        path: &str,
2120        value_reg: Register,
2121    ) -> Result<bool> {
2122        let compiled = self.get_compiled_path(path);
2123        let segments = compiled.segments();
2124        let new_value = &self.registers[value_reg];
2125
2126        // Extract numeric value before borrowing object_reg mutably
2127        let new_val_num = new_value
2128            .as_i64()
2129            .or_else(|| new_value.as_u64().map(|n| n as i64))
2130            .ok_or("Sum requires numeric value")?;
2131
2132        if !self.registers[object_reg].is_object() {
2133            self.registers[object_reg] = json!({});
2134        }
2135
2136        let obj = self.registers[object_reg]
2137            .as_object_mut()
2138            .ok_or("Not an object")?;
2139
2140        let mut current = obj;
2141        for (i, segment) in segments.iter().enumerate() {
2142            if i == segments.len() - 1 {
2143                let current_val = current
2144                    .get(segment)
2145                    .and_then(|v| {
2146                        if v.is_null() {
2147                            None
2148                        } else {
2149                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2150                        }
2151                    })
2152                    .unwrap_or(0);
2153
2154                let sum = current_val + new_val_num;
2155                current.insert(segment.to_string(), json!(sum));
2156                return Ok(true);
2157            } else {
2158                current
2159                    .entry(segment.to_string())
2160                    .or_insert_with(|| json!({}));
2161                current = current
2162                    .get_mut(segment)
2163                    .and_then(|v| v.as_object_mut())
2164                    .ok_or("Path collision: expected object")?;
2165            }
2166        }
2167
2168        Ok(false)
2169    }
2170
2171    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2172        let compiled = self.get_compiled_path(path);
2173        let segments = compiled.segments();
2174
2175        if !self.registers[object_reg].is_object() {
2176            self.registers[object_reg] = json!({});
2177        }
2178
2179        let obj = self.registers[object_reg]
2180            .as_object_mut()
2181            .ok_or("Not an object")?;
2182
2183        let mut current = obj;
2184        for (i, segment) in segments.iter().enumerate() {
2185            if i == segments.len() - 1 {
2186                // Get current value (default to 0 if null/missing)
2187                let current_val = current
2188                    .get(segment)
2189                    .and_then(|v| {
2190                        if v.is_null() {
2191                            None
2192                        } else {
2193                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2194                        }
2195                    })
2196                    .unwrap_or(0);
2197
2198                let incremented = current_val + 1;
2199                current.insert(segment.to_string(), json!(incremented));
2200                return Ok(true);
2201            } else {
2202                current
2203                    .entry(segment.to_string())
2204                    .or_insert_with(|| json!({}));
2205                current = current
2206                    .get_mut(segment)
2207                    .and_then(|v| v.as_object_mut())
2208                    .ok_or("Path collision: expected object")?;
2209            }
2210        }
2211
2212        Ok(false)
2213    }
2214
2215    fn set_field_min(
2216        &mut self,
2217        object_reg: Register,
2218        path: &str,
2219        value_reg: Register,
2220    ) -> Result<bool> {
2221        let compiled = self.get_compiled_path(path);
2222        let segments = compiled.segments();
2223        let new_value = self.registers[value_reg].clone();
2224
2225        if !self.registers[object_reg].is_object() {
2226            self.registers[object_reg] = json!({});
2227        }
2228
2229        let obj = self.registers[object_reg]
2230            .as_object_mut()
2231            .ok_or("Not an object")?;
2232
2233        let mut current = obj;
2234        for (i, segment) in segments.iter().enumerate() {
2235            if i == segments.len() - 1 {
2236                let should_update = if let Some(current_value) = current.get(segment) {
2237                    if current_value.is_null() {
2238                        true
2239                    } else {
2240                        match (current_value.as_i64(), new_value.as_i64()) {
2241                            (Some(current_val), Some(new_val)) => new_val < current_val,
2242                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2243                                (new_value.as_u64().unwrap() as i64) < current_val
2244                            }
2245                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2246                                new_val < current_value.as_u64().unwrap() as i64
2247                            }
2248                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2249                                (Some(current_val), Some(new_val)) => new_val < current_val,
2250                                _ => match (current_value.as_f64(), new_value.as_f64()) {
2251                                    (Some(current_val), Some(new_val)) => new_val < current_val,
2252                                    _ => false,
2253                                },
2254                            },
2255                            _ => false,
2256                        }
2257                    }
2258                } else {
2259                    true
2260                };
2261
2262                if should_update {
2263                    current.insert(segment.to_string(), new_value);
2264                    return Ok(true);
2265                }
2266                return Ok(false);
2267            } else {
2268                current
2269                    .entry(segment.to_string())
2270                    .or_insert_with(|| json!({}));
2271                current = current
2272                    .get_mut(segment)
2273                    .and_then(|v| v.as_object_mut())
2274                    .ok_or("Path collision: expected object")?;
2275            }
2276        }
2277
2278        Ok(false)
2279    }
2280
2281    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2282        let compiled = self.get_compiled_path(path);
2283        let segments = compiled.segments();
2284        let mut current = &self.registers[object_reg];
2285
2286        for segment in segments {
2287            current = current
2288                .get(segment)
2289                .ok_or_else(|| format!("Field not found: {}", segment))?;
2290        }
2291
2292        Ok(current.clone())
2293    }
2294
2295    fn append_to_array(
2296        &mut self,
2297        object_reg: Register,
2298        path: &str,
2299        value_reg: Register,
2300        max_length: usize,
2301    ) -> Result<()> {
2302        let compiled = self.get_compiled_path(path);
2303        let segments = compiled.segments();
2304        let value = self.registers[value_reg].clone();
2305
2306        if !self.registers[object_reg].is_object() {
2307            self.registers[object_reg] = json!({});
2308        }
2309
2310        let obj = self.registers[object_reg]
2311            .as_object_mut()
2312            .ok_or("Not an object")?;
2313
2314        let mut current = obj;
2315        for (i, segment) in segments.iter().enumerate() {
2316            if i == segments.len() - 1 {
2317                current
2318                    .entry(segment.to_string())
2319                    .or_insert_with(|| json!([]));
2320                let arr = current
2321                    .get_mut(segment)
2322                    .and_then(|v| v.as_array_mut())
2323                    .ok_or("Path is not an array")?;
2324                arr.push(value.clone());
2325
2326                if arr.len() > max_length {
2327                    let excess = arr.len() - max_length;
2328                    arr.drain(0..excess);
2329                }
2330            } else {
2331                current
2332                    .entry(segment.to_string())
2333                    .or_insert_with(|| json!({}));
2334                current = current
2335                    .get_mut(segment)
2336                    .and_then(|v| v.as_object_mut())
2337                    .ok_or("Path collision: expected object")?;
2338            }
2339        }
2340
2341        Ok(())
2342    }
2343
2344    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2345        let value = &self.registers[reg];
2346        let transformed = self.apply_transformation(value, transformation)?;
2347        self.registers[reg] = transformed;
2348        Ok(())
2349    }
2350
2351    fn apply_transformation(
2352        &self,
2353        value: &Value,
2354        transformation: &Transformation,
2355    ) -> Result<Value> {
2356        match transformation {
2357            Transformation::HexEncode => {
2358                if let Some(arr) = value.as_array() {
2359                    let bytes: Vec<u8> = arr
2360                        .iter()
2361                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2362                        .collect();
2363                    let hex = hex::encode(&bytes);
2364                    Ok(json!(hex))
2365                } else {
2366                    Err("HexEncode requires an array of numbers".into())
2367                }
2368            }
2369            Transformation::HexDecode => {
2370                if let Some(s) = value.as_str() {
2371                    let s = s.strip_prefix("0x").unwrap_or(s);
2372                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2373                    Ok(json!(bytes))
2374                } else {
2375                    Err("HexDecode requires a string".into())
2376                }
2377            }
2378            Transformation::Base58Encode => {
2379                if let Some(arr) = value.as_array() {
2380                    let bytes: Vec<u8> = arr
2381                        .iter()
2382                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2383                        .collect();
2384                    let encoded = bs58::encode(&bytes).into_string();
2385                    Ok(json!(encoded))
2386                } else if value.is_string() {
2387                    Ok(value.clone())
2388                } else {
2389                    Err("Base58Encode requires an array of numbers".into())
2390                }
2391            }
2392            Transformation::Base58Decode => {
2393                if let Some(s) = value.as_str() {
2394                    let bytes = bs58::decode(s)
2395                        .into_vec()
2396                        .map_err(|e| format!("Base58 decode error: {}", e))?;
2397                    Ok(json!(bytes))
2398                } else {
2399                    Err("Base58Decode requires a string".into())
2400                }
2401            }
2402            Transformation::ToString => Ok(json!(value.to_string())),
2403            Transformation::ToNumber => {
2404                if let Some(s) = value.as_str() {
2405                    let n = s
2406                        .parse::<i64>()
2407                        .map_err(|e| format!("Parse error: {}", e))?;
2408                    Ok(json!(n))
2409                } else {
2410                    Ok(value.clone())
2411                }
2412            }
2413        }
2414    }
2415
2416    fn evaluate_comparison(
2417        &self,
2418        field_value: &Value,
2419        op: &ComparisonOp,
2420        condition_value: &Value,
2421    ) -> Result<bool> {
2422        use ComparisonOp::*;
2423
2424        match op {
2425            Equal => Ok(field_value == condition_value),
2426            NotEqual => Ok(field_value != condition_value),
2427            GreaterThan => {
2428                // Try to compare as numbers
2429                match (field_value.as_i64(), condition_value.as_i64()) {
2430                    (Some(a), Some(b)) => Ok(a > b),
2431                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
2432                        (Some(a), Some(b)) => Ok(a > b),
2433                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
2434                            (Some(a), Some(b)) => Ok(a > b),
2435                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2436                        },
2437                    },
2438                }
2439            }
2440            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2441                (Some(a), Some(b)) => Ok(a >= b),
2442                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2443                    (Some(a), Some(b)) => Ok(a >= b),
2444                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2445                        (Some(a), Some(b)) => Ok(a >= b),
2446                        _ => {
2447                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2448                        }
2449                    },
2450                },
2451            },
2452            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2453                (Some(a), Some(b)) => Ok(a < b),
2454                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2455                    (Some(a), Some(b)) => Ok(a < b),
2456                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2457                        (Some(a), Some(b)) => Ok(a < b),
2458                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
2459                    },
2460                },
2461            },
2462            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2463                (Some(a), Some(b)) => Ok(a <= b),
2464                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2465                    (Some(a), Some(b)) => Ok(a <= b),
2466                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2467                        (Some(a), Some(b)) => Ok(a <= b),
2468                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2469                    },
2470                },
2471            },
2472        }
2473    }
2474
2475    /// Update a PDA reverse lookup and return pending updates for reprocessing.
2476    /// Returns any pending account updates that were queued for this PDA.
2477    /// ```ignore
2478    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2479    /// for update in pending {
2480    ///     vm.process_event(&bytecode, update.account_data, &update.account_type, None, None)?;
2481    /// }
2482    /// ```
2483    #[cfg_attr(feature = "otel", instrument(
2484        name = "vm.update_pda_lookup",
2485        skip(self),
2486        fields(
2487            pda = %pda_address,
2488            seed = %seed_value,
2489        )
2490    ))]
2491    pub fn update_pda_reverse_lookup(
2492        &mut self,
2493        state_id: u32,
2494        lookup_name: &str,
2495        pda_address: String,
2496        seed_value: String,
2497    ) -> Result<Vec<PendingAccountUpdate>> {
2498        let state = self
2499            .states
2500            .get_mut(&state_id)
2501            .ok_or("State table not found")?;
2502
2503        let lookup = state
2504            .pda_reverse_lookups
2505            .entry(lookup_name.to_string())
2506            .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2507
2508        let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2509
2510        if let Some(ref evicted) = evicted_pda {
2511            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2512                let count = evicted_updates.len();
2513                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2514            }
2515        }
2516
2517        // Flush and return pending updates for this PDA
2518        self.flush_pending_updates(state_id, &pda_address)
2519    }
2520
2521    /// Clean up expired pending updates that are older than the TTL
2522    ///
2523    /// Returns the number of updates that were removed.
2524    /// This should be called periodically to prevent memory leaks from orphaned updates.
2525    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2526        let state = match self.states.get_mut(&state_id) {
2527            Some(s) => s,
2528            None => return 0,
2529        };
2530
2531        let now = std::time::SystemTime::now()
2532            .duration_since(std::time::UNIX_EPOCH)
2533            .unwrap()
2534            .as_secs() as i64;
2535
2536        let mut removed_count = 0;
2537
2538        // Iterate through all pending updates and remove expired ones
2539        state.pending_updates.retain(|_pda_address, updates| {
2540            let original_len = updates.len();
2541
2542            updates.retain(|update| {
2543                let age = now - update.queued_at;
2544                age <= PENDING_UPDATE_TTL_SECONDS
2545            });
2546
2547            removed_count += original_len - updates.len();
2548
2549            // Remove the entry entirely if no updates remain
2550            !updates.is_empty()
2551        });
2552
2553        // Update the global counter
2554        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2555
2556        if removed_count > 0 {
2557            #[cfg(feature = "otel")]
2558            crate::vm_metrics::record_pending_updates_expired(
2559                removed_count as u64,
2560                &state.entity_name,
2561            );
2562        }
2563
2564        removed_count
2565    }
2566
2567    /// Queue an account update for later processing when PDA reverse lookup is not yet available
2568    ///
2569    /// # Workflow
2570    ///
2571    /// This implements a deferred processing pattern for account updates when the PDA reverse
2572    /// lookup needed to resolve the primary key is not yet available:
2573    ///
2574    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
2575    ///    is not available, call `queue_account_update()` to queue it for later.
2576    ///
2577    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
2578    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
2579    ///
2580    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
2581    ///    ```ignore
2582    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2583    ///    for update in pending {
2584    ///        let mutations = vm.process_event(
2585    ///            &bytecode, update.account_data, &update.account_type, None, None
2586    ///        )?;
2587    ///    }
2588    ///    ```
2589    ///
2590    /// # Arguments
2591    ///
2592    /// * `state_id` - The state table ID
2593    /// * `pda_address` - The PDA address that needs reverse lookup
2594    /// * `account_type` - The event type name for reprocessing
2595    /// * `account_data` - The account data (event value) for reprocessing
2596    /// * `slot` - The slot number when this update occurred
2597    /// * `signature` - The transaction signature
2598    #[cfg_attr(feature = "otel", instrument(
2599        name = "vm.queue_account_update",
2600        skip(self, update),
2601        fields(
2602            pda = %update.pda_address,
2603            account_type = %update.account_type,
2604            slot = update.slot,
2605        )
2606    ))]
2607    pub fn queue_account_update(
2608        &mut self,
2609        state_id: u32,
2610        update: QueuedAccountUpdate,
2611    ) -> Result<()> {
2612        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2613            self.cleanup_expired_pending_updates(state_id);
2614            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2615                self.drop_oldest_pending_update(state_id)?;
2616            }
2617        }
2618
2619        let state = self
2620            .states
2621            .get_mut(&state_id)
2622            .ok_or("State table not found")?;
2623
2624        let pending = PendingAccountUpdate {
2625            account_type: update.account_type,
2626            pda_address: update.pda_address.clone(),
2627            account_data: update.account_data,
2628            slot: update.slot,
2629            write_version: update.write_version,
2630            signature: update.signature,
2631            queued_at: std::time::SystemTime::now()
2632                .duration_since(std::time::UNIX_EPOCH)
2633                .unwrap()
2634                .as_secs() as i64,
2635        };
2636
2637        let pda_address = pending.pda_address.clone();
2638        let slot = pending.slot;
2639
2640        let mut updates = state
2641            .pending_updates
2642            .entry(pda_address.clone())
2643            .or_insert_with(Vec::new);
2644
2645        let original_len = updates.len();
2646        updates.retain(|existing| existing.slot > slot);
2647        let removed_by_dedup = original_len - updates.len();
2648
2649        if removed_by_dedup > 0 {
2650            self.pending_queue_size = self
2651                .pending_queue_size
2652                .saturating_sub(removed_by_dedup as u64);
2653        }
2654
2655        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2656            updates.remove(0);
2657            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2658        }
2659
2660        updates.push(pending);
2661        #[cfg(feature = "otel")]
2662        crate::vm_metrics::record_pending_update_queued(&state.entity_name);
2663
2664        Ok(())
2665    }
2666
2667    pub fn queue_instruction_event(
2668        &mut self,
2669        state_id: u32,
2670        event: QueuedInstructionEvent,
2671    ) -> Result<()> {
2672        let state = self
2673            .states
2674            .get_mut(&state_id)
2675            .ok_or("State table not found")?;
2676
2677        let pda_address = event.pda_address.clone();
2678
2679        let pending = PendingInstructionEvent {
2680            event_type: event.event_type,
2681            pda_address: event.pda_address,
2682            event_data: event.event_data,
2683            slot: event.slot,
2684            signature: event.signature,
2685            queued_at: std::time::SystemTime::now()
2686                .duration_since(std::time::UNIX_EPOCH)
2687                .unwrap()
2688                .as_secs() as i64,
2689        };
2690
2691        let mut events = state
2692            .pending_instruction_events
2693            .entry(pda_address)
2694            .or_insert_with(Vec::new);
2695
2696        if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
2697            events.remove(0);
2698        }
2699
2700        events.push(pending);
2701
2702        Ok(())
2703    }
2704
2705    pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
2706        self.last_pda_lookup_miss.take()
2707    }
2708
2709    pub fn take_last_pda_registered(&mut self) -> Option<String> {
2710        self.last_pda_registered.take()
2711    }
2712
2713    pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
2714        std::mem::take(&mut self.last_lookup_index_keys)
2715    }
2716
2717    pub fn flush_pending_instruction_events(
2718        &mut self,
2719        state_id: u32,
2720        pda_address: &str,
2721    ) -> Vec<PendingInstructionEvent> {
2722        let state = match self.states.get_mut(&state_id) {
2723            Some(s) => s,
2724            None => return Vec::new(),
2725        };
2726
2727        if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
2728            events
2729        } else {
2730            Vec::new()
2731        }
2732    }
2733
2734    /// Get statistics about the pending queue for monitoring
2735    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2736        let state = self.states.get(&state_id)?;
2737
2738        let now = std::time::SystemTime::now()
2739            .duration_since(std::time::UNIX_EPOCH)
2740            .unwrap()
2741            .as_secs() as i64;
2742
2743        let mut total_updates = 0;
2744        let mut oldest_timestamp = now;
2745        let mut largest_pda_queue = 0;
2746        let mut estimated_memory = 0;
2747
2748        for entry in state.pending_updates.iter() {
2749            let (_, updates) = entry.pair();
2750            total_updates += updates.len();
2751            largest_pda_queue = largest_pda_queue.max(updates.len());
2752
2753            for update in updates.iter() {
2754                oldest_timestamp = oldest_timestamp.min(update.queued_at);
2755                // Rough memory estimate
2756                estimated_memory += update.account_type.len() +
2757                                   update.pda_address.len() +
2758                                   update.signature.len() +
2759                                   16 + // slot + queued_at
2760                                   estimate_json_size(&update.account_data);
2761            }
2762        }
2763
2764        Some(PendingQueueStats {
2765            total_updates,
2766            unique_pdas: state.pending_updates.len(),
2767            oldest_age_seconds: now - oldest_timestamp,
2768            largest_pda_queue_size: largest_pda_queue,
2769            estimated_memory_bytes: estimated_memory,
2770        })
2771    }
2772
2773    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2774        let mut stats = VmMemoryStats {
2775            path_cache_size: self.path_cache.len(),
2776            ..Default::default()
2777        };
2778
2779        if let Some(state) = self.states.get(&state_id) {
2780            stats.state_table_entity_count = state.data.len();
2781            stats.state_table_max_entries = state.config.max_entries;
2782            stats.state_table_at_capacity = state.is_at_capacity();
2783
2784            stats.lookup_index_count = state.lookup_indexes.len();
2785            stats.lookup_index_total_entries =
2786                state.lookup_indexes.values().map(|idx| idx.len()).sum();
2787
2788            stats.temporal_index_count = state.temporal_indexes.len();
2789            stats.temporal_index_total_entries = state
2790                .temporal_indexes
2791                .values()
2792                .map(|idx| idx.total_entries())
2793                .sum();
2794
2795            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2796            stats.pda_reverse_lookup_total_entries = state
2797                .pda_reverse_lookups
2798                .values()
2799                .map(|lookup| lookup.len())
2800                .sum();
2801
2802            stats.version_tracker_entries = state.version_tracker.len();
2803
2804            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2805        }
2806
2807        stats
2808    }
2809
2810    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2811        let pending_removed = self.cleanup_expired_pending_updates(state_id);
2812        let temporal_removed = self.cleanup_temporal_indexes(state_id);
2813
2814        #[cfg(feature = "otel")]
2815        if let Some(state) = self.states.get(&state_id) {
2816            crate::vm_metrics::record_cleanup(
2817                pending_removed,
2818                temporal_removed,
2819                &state.entity_name,
2820            );
2821        }
2822
2823        CleanupResult {
2824            pending_updates_removed: pending_removed,
2825            temporal_entries_removed: temporal_removed,
2826        }
2827    }
2828
2829    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2830        let state = match self.states.get_mut(&state_id) {
2831            Some(s) => s,
2832            None => return 0,
2833        };
2834
2835        let now = std::time::SystemTime::now()
2836            .duration_since(std::time::UNIX_EPOCH)
2837            .unwrap()
2838            .as_secs() as i64;
2839
2840        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2841        let mut total_removed = 0;
2842
2843        for (_, index) in state.temporal_indexes.iter_mut() {
2844            total_removed += index.cleanup_expired(cutoff);
2845        }
2846
2847        total_removed
2848    }
2849
2850    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2851        let state = self.states.get(&state_id)?;
2852
2853        if state.is_at_capacity() {
2854            Some(CapacityWarning {
2855                current_entries: state.data.len(),
2856                max_entries: state.config.max_entries,
2857                entries_over_limit: state.entries_over_limit(),
2858            })
2859        } else {
2860            None
2861        }
2862    }
2863
2864    /// Drop the oldest pending update across all PDAs
2865    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2866        let state = self
2867            .states
2868            .get_mut(&state_id)
2869            .ok_or("State table not found")?;
2870
2871        let mut oldest_pda: Option<String> = None;
2872        let mut oldest_timestamp = i64::MAX;
2873
2874        // Find the PDA with the oldest update
2875        for entry in state.pending_updates.iter() {
2876            let (pda, updates) = entry.pair();
2877            if let Some(update) = updates.first() {
2878                if update.queued_at < oldest_timestamp {
2879                    oldest_timestamp = update.queued_at;
2880                    oldest_pda = Some(pda.clone());
2881                }
2882            }
2883        }
2884
2885        // Remove the oldest update
2886        if let Some(pda) = oldest_pda {
2887            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2888                if !updates.is_empty() {
2889                    updates.remove(0);
2890                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2891
2892                    // Remove the entry if it's now empty
2893                    if updates.is_empty() {
2894                        drop(updates);
2895                        state.pending_updates.remove(&pda);
2896                    }
2897                }
2898            }
2899        }
2900
2901        Ok(())
2902    }
2903
2904    /// Flush and return pending updates for a PDA for external reprocessing
2905    ///
2906    /// Returns the pending updates that were queued for this PDA address.
2907    /// The caller should reprocess these through the VM using process_event().
2908    fn flush_pending_updates(
2909        &mut self,
2910        state_id: u32,
2911        pda_address: &str,
2912    ) -> Result<Vec<PendingAccountUpdate>> {
2913        let state = self
2914            .states
2915            .get_mut(&state_id)
2916            .ok_or("State table not found")?;
2917
2918        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2919            let count = pending_updates.len();
2920            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2921            #[cfg(feature = "otel")]
2922            crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
2923            Ok(pending_updates)
2924        } else {
2925            Ok(Vec::new())
2926        }
2927    }
2928
2929    /// Try to resolve a primary key via PDA reverse lookup
2930    pub fn try_pda_reverse_lookup(
2931        &mut self,
2932        state_id: u32,
2933        lookup_name: &str,
2934        pda_address: &str,
2935    ) -> Option<String> {
2936        let state = self.states.get_mut(&state_id)?;
2937
2938        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2939            if let Some(value) = lookup.lookup(pda_address) {
2940                self.pda_cache_hits += 1;
2941                return Some(value);
2942            }
2943        }
2944
2945        self.pda_cache_misses += 1;
2946        None
2947    }
2948
2949    // ============================================================================
2950    // Computed Expression Evaluator (Task 5)
2951    // ============================================================================
2952
2953    /// Evaluate a computed expression AST against the current state
2954    /// This is the core runtime evaluator for computed fields from the AST
2955    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2956        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2957    }
2958
2959    /// Evaluate a computed expression with a variable environment (for let bindings)
2960    fn evaluate_computed_expr_with_env(
2961        &self,
2962        expr: &ComputedExpr,
2963        state: &Value,
2964        env: &std::collections::HashMap<String, Value>,
2965    ) -> Result<Value> {
2966        match expr {
2967            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2968
2969            ComputedExpr::Var { name } => env
2970                .get(name)
2971                .cloned()
2972                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2973
2974            ComputedExpr::Let { name, value, body } => {
2975                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2976                let mut new_env = env.clone();
2977                new_env.insert(name.clone(), val);
2978                self.evaluate_computed_expr_with_env(body, state, &new_env)
2979            }
2980
2981            ComputedExpr::If {
2982                condition,
2983                then_branch,
2984                else_branch,
2985            } => {
2986                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2987                if self.value_to_bool(&cond_val) {
2988                    self.evaluate_computed_expr_with_env(then_branch, state, env)
2989                } else {
2990                    self.evaluate_computed_expr_with_env(else_branch, state, env)
2991                }
2992            }
2993
2994            ComputedExpr::None => Ok(Value::Null),
2995
2996            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2997
2998            ComputedExpr::Slice { expr, start, end } => {
2999                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3000                match val {
3001                    Value::Array(arr) => {
3002                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
3003                        Ok(Value::Array(slice))
3004                    }
3005                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
3006                }
3007            }
3008
3009            ComputedExpr::Index { expr, index } => {
3010                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3011                match val {
3012                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
3013                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
3014                }
3015            }
3016
3017            ComputedExpr::U64FromLeBytes { bytes } => {
3018                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3019                let byte_vec = self.value_to_bytes(&val)?;
3020                if byte_vec.len() < 8 {
3021                    return Err(format!(
3022                        "u64::from_le_bytes requires 8 bytes, got {}",
3023                        byte_vec.len()
3024                    )
3025                    .into());
3026                }
3027                let arr: [u8; 8] = byte_vec[..8]
3028                    .try_into()
3029                    .map_err(|_| "Failed to convert to [u8; 8]")?;
3030                Ok(json!(u64::from_le_bytes(arr)))
3031            }
3032
3033            ComputedExpr::U64FromBeBytes { bytes } => {
3034                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3035                let byte_vec = self.value_to_bytes(&val)?;
3036                if byte_vec.len() < 8 {
3037                    return Err(format!(
3038                        "u64::from_be_bytes requires 8 bytes, got {}",
3039                        byte_vec.len()
3040                    )
3041                    .into());
3042                }
3043                let arr: [u8; 8] = byte_vec[..8]
3044                    .try_into()
3045                    .map_err(|_| "Failed to convert to [u8; 8]")?;
3046                Ok(json!(u64::from_be_bytes(arr)))
3047            }
3048
3049            ComputedExpr::ByteArray { bytes } => {
3050                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3051            }
3052
3053            ComputedExpr::Closure { param, body } => {
3054                // Closures are stored as-is; they're evaluated when used in map()
3055                // Return a special representation
3056                Ok(json!({
3057                    "__closure": {
3058                        "param": param,
3059                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
3060                    }
3061                }))
3062            }
3063
3064            ComputedExpr::Unary { op, expr } => {
3065                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3066                self.apply_unary_op(op, &val)
3067            }
3068
3069            ComputedExpr::JsonToBytes { expr } => {
3070                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3071                // Convert JSON array of numbers to byte array
3072                let bytes = self.value_to_bytes(&val)?;
3073                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3074            }
3075
3076            ComputedExpr::UnwrapOr { expr, default } => {
3077                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3078                if val.is_null() {
3079                    Ok(default.clone())
3080                } else {
3081                    Ok(val)
3082                }
3083            }
3084
3085            ComputedExpr::Binary { op, left, right } => {
3086                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
3087                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
3088                self.apply_binary_op(op, &l, &r)
3089            }
3090
3091            ComputedExpr::Cast { expr, to_type } => {
3092                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3093                self.apply_cast(&val, to_type)
3094            }
3095
3096            ComputedExpr::MethodCall { expr, method, args } => {
3097                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3098                // Special handling for map() with closures
3099                if method == "map" && args.len() == 1 {
3100                    if let ComputedExpr::Closure { param, body } = &args[0] {
3101                        // If the value is null, return null (Option::None.map returns None)
3102                        if val.is_null() {
3103                            return Ok(Value::Null);
3104                        }
3105                        // Evaluate the closure body with the value bound to param
3106                        let mut closure_env = env.clone();
3107                        closure_env.insert(param.clone(), val);
3108                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
3109                    }
3110                }
3111                let evaluated_args: Vec<Value> = args
3112                    .iter()
3113                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
3114                    .collect::<Result<Vec<_>>>()?;
3115                self.apply_method_call(&val, method, &evaluated_args)
3116            }
3117
3118            ComputedExpr::Literal { value } => Ok(value.clone()),
3119
3120            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
3121        }
3122    }
3123
3124    /// Convert a JSON value to a byte vector
3125    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
3126        match val {
3127            Value::Array(arr) => arr
3128                .iter()
3129                .map(|v| {
3130                    v.as_u64()
3131                        .map(|n| n as u8)
3132                        .ok_or_else(|| "Array element not a valid byte".into())
3133                })
3134                .collect(),
3135            Value::String(s) => {
3136                // Try to decode as hex
3137                if s.starts_with("0x") || s.starts_with("0X") {
3138                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
3139                } else {
3140                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
3141                }
3142            }
3143            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
3144        }
3145    }
3146
3147    /// Apply a unary operation
3148    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
3149        use crate::ast::UnaryOp;
3150        match op {
3151            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
3152            UnaryOp::ReverseBits => match val.as_u64() {
3153                Some(n) => Ok(json!(n.reverse_bits())),
3154                None => match val.as_i64() {
3155                    Some(n) => Ok(json!((n as u64).reverse_bits())),
3156                    None => Err("reverse_bits requires an integer".into()),
3157                },
3158            },
3159        }
3160    }
3161
3162    /// Get a field value from state by path (e.g., "section.field" or just "field")
3163    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
3164        let segments: Vec<&str> = path.split('.').collect();
3165        let mut current = state;
3166
3167        for segment in segments {
3168            match current.get(segment) {
3169                Some(v) => current = v,
3170                None => return Ok(Value::Null),
3171            }
3172        }
3173
3174        Ok(current.clone())
3175    }
3176
3177    /// Apply a binary operation to two values
3178    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
3179        match op {
3180            // Arithmetic operations
3181            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
3182            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
3183            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
3184            BinaryOp::Div => {
3185                // Check for division by zero
3186                if let Some(r) = right.as_i64() {
3187                    if r == 0 {
3188                        return Err("Division by zero".into());
3189                    }
3190                }
3191                if let Some(r) = right.as_f64() {
3192                    if r == 0.0 {
3193                        return Err("Division by zero".into());
3194                    }
3195                }
3196                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
3197            }
3198            BinaryOp::Mod => {
3199                // Modulo - only for integers
3200                match (left.as_i64(), right.as_i64()) {
3201                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3202                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
3203                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3204                        _ => Err("Modulo requires non-zero integer operands".into()),
3205                    },
3206                    _ => Err("Modulo by zero".into()),
3207                }
3208            }
3209
3210            // Comparison operations
3211            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
3212            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
3213            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
3214            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
3215            BinaryOp::Eq => Ok(json!(left == right)),
3216            BinaryOp::Ne => Ok(json!(left != right)),
3217
3218            // Logical operations
3219            BinaryOp::And => {
3220                let l_bool = self.value_to_bool(left);
3221                let r_bool = self.value_to_bool(right);
3222                Ok(json!(l_bool && r_bool))
3223            }
3224            BinaryOp::Or => {
3225                let l_bool = self.value_to_bool(left);
3226                let r_bool = self.value_to_bool(right);
3227                Ok(json!(l_bool || r_bool))
3228            }
3229
3230            // Bitwise operations
3231            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
3232                (Some(a), Some(b)) => Ok(json!(a ^ b)),
3233                _ => match (left.as_i64(), right.as_i64()) {
3234                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
3235                    _ => Err("XOR requires integer operands".into()),
3236                },
3237            },
3238            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
3239                (Some(a), Some(b)) => Ok(json!(a & b)),
3240                _ => match (left.as_i64(), right.as_i64()) {
3241                    (Some(a), Some(b)) => Ok(json!(a & b)),
3242                    _ => Err("BitAnd requires integer operands".into()),
3243                },
3244            },
3245            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
3246                (Some(a), Some(b)) => Ok(json!(a | b)),
3247                _ => match (left.as_i64(), right.as_i64()) {
3248                    (Some(a), Some(b)) => Ok(json!(a | b)),
3249                    _ => Err("BitOr requires integer operands".into()),
3250                },
3251            },
3252            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
3253                (Some(a), Some(b)) => Ok(json!(a << b)),
3254                _ => match (left.as_i64(), right.as_i64()) {
3255                    (Some(a), Some(b)) => Ok(json!(a << b)),
3256                    _ => Err("Shl requires integer operands".into()),
3257                },
3258            },
3259            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
3260                (Some(a), Some(b)) => Ok(json!(a >> b)),
3261                _ => match (left.as_i64(), right.as_i64()) {
3262                    (Some(a), Some(b)) => Ok(json!(a >> b)),
3263                    _ => Err("Shr requires integer operands".into()),
3264                },
3265            },
3266        }
3267    }
3268
3269    /// Helper for numeric operations that can work on integers or floats
3270    fn numeric_op<F1, F2>(
3271        &self,
3272        left: &Value,
3273        right: &Value,
3274        int_op: F1,
3275        float_op: F2,
3276    ) -> Result<Value>
3277    where
3278        F1: Fn(i64, i64) -> i64,
3279        F2: Fn(f64, f64) -> f64,
3280    {
3281        // Try i64 first
3282        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3283            return Ok(json!(int_op(a, b)));
3284        }
3285
3286        // Try u64
3287        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3288            // For u64, we need to be careful with underflow in subtraction
3289            return Ok(json!(int_op(a as i64, b as i64)));
3290        }
3291
3292        // Try f64
3293        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3294            return Ok(json!(float_op(a, b)));
3295        }
3296
3297        // If either is null, return null
3298        if left.is_null() || right.is_null() {
3299            return Ok(Value::Null);
3300        }
3301
3302        Err(format!(
3303            "Cannot perform numeric operation on {:?} and {:?}",
3304            left, right
3305        )
3306        .into())
3307    }
3308
3309    /// Helper for comparison operations
3310    fn comparison_op<F1, F2>(
3311        &self,
3312        left: &Value,
3313        right: &Value,
3314        int_cmp: F1,
3315        float_cmp: F2,
3316    ) -> Result<Value>
3317    where
3318        F1: Fn(i64, i64) -> bool,
3319        F2: Fn(f64, f64) -> bool,
3320    {
3321        // Try i64 first
3322        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3323            return Ok(json!(int_cmp(a, b)));
3324        }
3325
3326        // Try u64
3327        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3328            return Ok(json!(int_cmp(a as i64, b as i64)));
3329        }
3330
3331        // Try f64
3332        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3333            return Ok(json!(float_cmp(a, b)));
3334        }
3335
3336        // If either is null, comparison returns false
3337        if left.is_null() || right.is_null() {
3338            return Ok(json!(false));
3339        }
3340
3341        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3342    }
3343
3344    /// Convert a value to boolean for logical operations
3345    fn value_to_bool(&self, value: &Value) -> bool {
3346        match value {
3347            Value::Null => false,
3348            Value::Bool(b) => *b,
3349            Value::Number(n) => {
3350                if let Some(i) = n.as_i64() {
3351                    i != 0
3352                } else if let Some(f) = n.as_f64() {
3353                    f != 0.0
3354                } else {
3355                    true
3356                }
3357            }
3358            Value::String(s) => !s.is_empty(),
3359            Value::Array(arr) => !arr.is_empty(),
3360            Value::Object(obj) => !obj.is_empty(),
3361        }
3362    }
3363
3364    /// Apply a type cast to a value
3365    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3366        match to_type {
3367            "i8" | "i16" | "i32" | "i64" | "isize" => {
3368                if let Some(n) = value.as_i64() {
3369                    Ok(json!(n))
3370                } else if let Some(n) = value.as_u64() {
3371                    Ok(json!(n as i64))
3372                } else if let Some(n) = value.as_f64() {
3373                    Ok(json!(n as i64))
3374                } else if let Some(s) = value.as_str() {
3375                    s.parse::<i64>()
3376                        .map(|n| json!(n))
3377                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3378                } else {
3379                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3380                }
3381            }
3382            "u8" | "u16" | "u32" | "u64" | "usize" => {
3383                if let Some(n) = value.as_u64() {
3384                    Ok(json!(n))
3385                } else if let Some(n) = value.as_i64() {
3386                    Ok(json!(n as u64))
3387                } else if let Some(n) = value.as_f64() {
3388                    Ok(json!(n as u64))
3389                } else if let Some(s) = value.as_str() {
3390                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3391                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3392                    })
3393                } else {
3394                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3395                }
3396            }
3397            "f32" | "f64" => {
3398                if let Some(n) = value.as_f64() {
3399                    Ok(json!(n))
3400                } else if let Some(n) = value.as_i64() {
3401                    Ok(json!(n as f64))
3402                } else if let Some(n) = value.as_u64() {
3403                    Ok(json!(n as f64))
3404                } else if let Some(s) = value.as_str() {
3405                    s.parse::<f64>()
3406                        .map(|n| json!(n))
3407                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3408                } else {
3409                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3410                }
3411            }
3412            "String" | "string" => Ok(json!(value.to_string())),
3413            "bool" => Ok(json!(self.value_to_bool(value))),
3414            _ => {
3415                // Unknown type, return value as-is
3416                Ok(value.clone())
3417            }
3418        }
3419    }
3420
3421    /// Apply a method call to a value
3422    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3423        match method {
3424            "unwrap_or" => {
3425                if value.is_null() && !args.is_empty() {
3426                    Ok(args[0].clone())
3427                } else {
3428                    Ok(value.clone())
3429                }
3430            }
3431            "unwrap_or_default" => {
3432                if value.is_null() {
3433                    // Return default for common types
3434                    Ok(json!(0))
3435                } else {
3436                    Ok(value.clone())
3437                }
3438            }
3439            "is_some" => Ok(json!(!value.is_null())),
3440            "is_none" => Ok(json!(value.is_null())),
3441            "abs" => {
3442                if let Some(n) = value.as_i64() {
3443                    Ok(json!(n.abs()))
3444                } else if let Some(n) = value.as_f64() {
3445                    Ok(json!(n.abs()))
3446                } else {
3447                    Err(format!("Cannot call abs() on {:?}", value).into())
3448                }
3449            }
3450            "len" => {
3451                if let Some(s) = value.as_str() {
3452                    Ok(json!(s.len()))
3453                } else if let Some(arr) = value.as_array() {
3454                    Ok(json!(arr.len()))
3455                } else if let Some(obj) = value.as_object() {
3456                    Ok(json!(obj.len()))
3457                } else {
3458                    Err(format!("Cannot call len() on {:?}", value).into())
3459                }
3460            }
3461            "to_string" => Ok(json!(value.to_string())),
3462            "min" => {
3463                if args.is_empty() {
3464                    return Err("min() requires an argument".into());
3465                }
3466                let other = &args[0];
3467                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3468                    Ok(json!(a.min(b)))
3469                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3470                    Ok(json!(a.min(b)))
3471                } else {
3472                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3473                }
3474            }
3475            "max" => {
3476                if args.is_empty() {
3477                    return Err("max() requires an argument".into());
3478                }
3479                let other = &args[0];
3480                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3481                    Ok(json!(a.max(b)))
3482                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3483                    Ok(json!(a.max(b)))
3484                } else {
3485                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3486                }
3487            }
3488            "saturating_add" => {
3489                if args.is_empty() {
3490                    return Err("saturating_add() requires an argument".into());
3491                }
3492                let other = &args[0];
3493                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3494                    Ok(json!(a.saturating_add(b)))
3495                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3496                    Ok(json!(a.saturating_add(b)))
3497                } else {
3498                    Err(format!(
3499                        "Cannot call saturating_add() on {:?} and {:?}",
3500                        value, other
3501                    )
3502                    .into())
3503                }
3504            }
3505            "saturating_sub" => {
3506                if args.is_empty() {
3507                    return Err("saturating_sub() requires an argument".into());
3508                }
3509                let other = &args[0];
3510                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3511                    Ok(json!(a.saturating_sub(b)))
3512                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3513                    Ok(json!(a.saturating_sub(b)))
3514                } else {
3515                    Err(format!(
3516                        "Cannot call saturating_sub() on {:?} and {:?}",
3517                        value, other
3518                    )
3519                    .into())
3520                }
3521            }
3522            _ => Err(format!("Unknown method call: {}()", method).into()),
3523        }
3524    }
3525
3526    /// Evaluate all computed fields for an entity and update the state
3527    /// This takes a list of ComputedFieldSpec from the AST and applies them
3528    pub fn evaluate_computed_fields_from_ast(
3529        &self,
3530        state: &mut Value,
3531        computed_field_specs: &[ComputedFieldSpec],
3532    ) -> Result<Vec<String>> {
3533        let mut updated_paths = Vec::new();
3534
3535        for spec in computed_field_specs {
3536            if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3537                self.set_field_in_state(state, &spec.target_path, result)?;
3538                updated_paths.push(spec.target_path.clone());
3539            }
3540        }
3541
3542        Ok(updated_paths)
3543    }
3544
3545    /// Set a field value in state by path (e.g., "section.field")
3546    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3547        let segments: Vec<&str> = path.split('.').collect();
3548
3549        if segments.is_empty() {
3550            return Err("Empty path".into());
3551        }
3552
3553        // Navigate to parent, creating intermediate objects as needed
3554        let mut current = state;
3555        for (i, segment) in segments.iter().enumerate() {
3556            if i == segments.len() - 1 {
3557                // Last segment - set the value
3558                if let Some(obj) = current.as_object_mut() {
3559                    obj.insert(segment.to_string(), value);
3560                    return Ok(());
3561                } else {
3562                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
3563                }
3564            } else {
3565                // Intermediate segment - navigate or create
3566                if !current.is_object() {
3567                    *current = json!({});
3568                }
3569                let obj = current.as_object_mut().unwrap();
3570                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3571            }
3572        }
3573
3574        Ok(())
3575    }
3576
3577    /// Create a computed fields evaluator closure from AST specs
3578    /// This returns a function that can be passed to the bytecode builder
3579    pub fn create_evaluator_from_specs(
3580        specs: Vec<ComputedFieldSpec>,
3581    ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3582        move |state: &mut Value| {
3583            // Create a temporary VmContext just for evaluation
3584            // (We only need the expression evaluation methods)
3585            let vm = VmContext::new();
3586            vm.evaluate_computed_fields_from_ast(state, &specs)?;
3587            Ok(())
3588        }
3589    }
3590}
3591
3592impl Default for VmContext {
3593    fn default() -> Self {
3594        Self::new()
3595    }
3596}
3597
3598// Implement the ReverseLookupUpdater trait for VmContext
3599impl crate::resolvers::ReverseLookupUpdater for VmContext {
3600    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3601        // Use default state_id=0 and default lookup name
3602        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3603            .unwrap_or_else(|e| {
3604                tracing::error!("Failed to update PDA reverse lookup: {}", e);
3605                Vec::new()
3606            })
3607    }
3608
3609    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3610        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
3611        self.flush_pending_updates(0, pda_address)
3612            .unwrap_or_else(|e| {
3613                tracing::error!("Failed to flush pending updates: {}", e);
3614                Vec::new()
3615            })
3616    }
3617}
3618
3619#[cfg(test)]
3620mod tests {
3621    use super::*;
3622    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
3623
3624    #[test]
3625    fn test_computed_field_preserves_integer_type() {
3626        let vm = VmContext::new();
3627
3628        let mut state = serde_json::json!({
3629            "trading": {
3630                "total_buy_volume": 20000000000_i64,
3631                "total_sell_volume": 17951316474_i64
3632            }
3633        });
3634
3635        let spec = ComputedFieldSpec {
3636            target_path: "trading.total_volume".to_string(),
3637            result_type: "Option<u64>".to_string(),
3638            expression: ComputedExpr::Binary {
3639                op: BinaryOp::Add,
3640                left: Box::new(ComputedExpr::UnwrapOr {
3641                    expr: Box::new(ComputedExpr::FieldRef {
3642                        path: "trading.total_buy_volume".to_string(),
3643                    }),
3644                    default: serde_json::json!(0),
3645                }),
3646                right: Box::new(ComputedExpr::UnwrapOr {
3647                    expr: Box::new(ComputedExpr::FieldRef {
3648                        path: "trading.total_sell_volume".to_string(),
3649                    }),
3650                    default: serde_json::json!(0),
3651                }),
3652            },
3653        };
3654
3655        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
3656            .unwrap();
3657
3658        let total_volume = state
3659            .get("trading")
3660            .and_then(|t| t.get("total_volume"))
3661            .expect("total_volume should exist");
3662
3663        let serialized = serde_json::to_string(total_volume).unwrap();
3664        assert!(
3665            !serialized.contains('.'),
3666            "Integer should not have decimal point: {}",
3667            serialized
3668        );
3669        assert_eq!(
3670            total_volume.as_i64(),
3671            Some(37951316474),
3672            "Value should be correct sum"
3673        );
3674    }
3675
3676    #[test]
3677    fn test_set_field_sum_preserves_integer_type() {
3678        let mut vm = VmContext::new();
3679        vm.registers[0] = serde_json::json!({});
3680        vm.registers[1] = serde_json::json!(20000000000_i64);
3681        vm.registers[2] = serde_json::json!(17951316474_i64);
3682
3683        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
3684        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
3685
3686        let state = &vm.registers[0];
3687        let buy_vol = state
3688            .get("trading")
3689            .and_then(|t| t.get("total_buy_volume"))
3690            .unwrap();
3691        let sell_vol = state
3692            .get("trading")
3693            .and_then(|t| t.get("total_sell_volume"))
3694            .unwrap();
3695
3696        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
3697        let sell_serialized = serde_json::to_string(sell_vol).unwrap();
3698
3699        assert!(
3700            !buy_serialized.contains('.'),
3701            "Buy volume should not have decimal: {}",
3702            buy_serialized
3703        );
3704        assert!(
3705            !sell_serialized.contains('.'),
3706            "Sell volume should not have decimal: {}",
3707            sell_serialized
3708        );
3709    }
3710}