Skip to main content

hyperstack_interpreter/
vm.rs

1use crate::ast::{
2    BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
15/// Context metadata for blockchain updates (accounts and instructions)
16/// This structure is designed to be extended over time with additional metadata
17#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19    /// Blockchain slot number
20    pub slot: Option<u64>,
21    /// Transaction signature
22    pub signature: Option<String>,
23    /// Unix timestamp (seconds since epoch)
24    /// If not provided, will default to current system time when accessed
25    pub timestamp: Option<i64>,
26    /// Write version for account updates (monotonically increasing per account within a slot)
27    /// Used for staleness detection to reject out-of-order updates
28    pub write_version: Option<u64>,
29    /// Transaction index for instruction updates (orders transactions within a slot)
30    /// Used for staleness detection to reject out-of-order updates
31    pub txn_index: Option<u64>,
32    /// Additional custom metadata that can be added without breaking changes
33    pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37    /// Create a new UpdateContext with slot and signature
38    pub fn new(slot: u64, signature: String) -> Self {
39        Self {
40            slot: Some(slot),
41            signature: Some(signature),
42            timestamp: None,
43            write_version: None,
44            txn_index: None,
45            metadata: HashMap::new(),
46        }
47    }
48
49    /// Create a new UpdateContext with slot, signature, and timestamp
50    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51        Self {
52            slot: Some(slot),
53            signature: Some(signature),
54            timestamp: Some(timestamp),
55            write_version: None,
56            txn_index: None,
57            metadata: HashMap::new(),
58        }
59    }
60
61    /// Create context for account updates with write_version for staleness detection
62    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63        Self {
64            slot: Some(slot),
65            signature: Some(signature),
66            timestamp: None,
67            write_version: Some(write_version),
68            txn_index: None,
69            metadata: HashMap::new(),
70        }
71    }
72
73    /// Create context for instruction updates with txn_index for staleness detection
74    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75        Self {
76            slot: Some(slot),
77            signature: Some(signature),
78            timestamp: None,
79            write_version: None,
80            txn_index: Some(txn_index),
81            metadata: HashMap::new(),
82        }
83    }
84
85    /// Get the timestamp, falling back to current system time if not set
86    pub fn timestamp(&self) -> i64 {
87        self.timestamp.unwrap_or_else(|| {
88            std::time::SystemTime::now()
89                .duration_since(std::time::UNIX_EPOCH)
90                .unwrap()
91                .as_secs() as i64
92        })
93    }
94
95    /// Create an empty context (for testing or when context is not available)
96    pub fn empty() -> Self {
97        Self::default()
98    }
99
100    /// Add custom metadata
101    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
102        self.metadata.insert(key, value);
103        self
104    }
105
106    /// Get metadata value
107    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
108        self.metadata.get(key)
109    }
110
111    /// Convert context to JSON value for injection into event data
112    pub fn to_value(&self) -> Value {
113        let mut obj = serde_json::Map::new();
114        if let Some(slot) = self.slot {
115            obj.insert("slot".to_string(), json!(slot));
116        }
117        if let Some(ref sig) = self.signature {
118            obj.insert("signature".to_string(), json!(sig));
119        }
120        // Always include timestamp (use current time if not set)
121        obj.insert("timestamp".to_string(), json!(self.timestamp()));
122        for (key, value) in &self.metadata {
123            obj.insert(key.clone(), value.clone());
124        }
125        Value::Object(obj)
126    }
127}
128
/// Index of a VM register (a position in the VM's register vector).
pub type Register = usize;
/// Result alias used by the VM; the error is any boxed `std::error::Error`.
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Contents of a single VM register: an arbitrary JSON value.
pub type RegisterValue = Value;
133
/// Trait for evaluating computed fields
/// Implement this in your generated spec to enable computed field evaluation
pub trait ComputedFieldsEvaluator {
    /// Evaluates computed fields against `state`.
    ///
    /// `state` is passed mutably, presumably so results can be written back
    /// into it (confirm against an implementor). Returns `Err` on failure.
    fn evaluate(&self, state: &mut Value) -> Result<()>;
}
139
// Pending queue configuration
// NOTE(review): these three are not referenced in the visible part of the file;
// the descriptions below are inferred from the names - confirm at the use sites.
// Presumably the cap on queued updates across all PDAs.
const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
// Presumably the cap on queued updates for a single PDA address.
const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
// Presumably the age after which a queued update is discarded.
const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes

// Temporal index configuration - prevents unbounded history growth
// On every TemporalIndex insert, entries older than (inserted timestamp - TTL) are dropped.
const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, matches pending queue TTL
// Per-key history cap; the oldest entries are dropped once it is exceeded.
const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;

// State table configuration - aligned with downstream EntityCache (500 per view)
// Default for StateTableConfig::max_entries.
const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
// Default for StateTableConfig::max_array_length.
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;

// Capacity used by LookupIndex::new().
const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;

// Capacity used by VersionTracker::new().
const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;

// Capacity (number of keys) used by TemporalIndex::new().
const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;

// Presumably the default capacity passed to PdaReverseLookup::new - confirm at the call site.
const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
160
161/// Estimate the size of a JSON value in bytes
162fn estimate_json_size(value: &Value) -> usize {
163    match value {
164        Value::Null => 4,
165        Value::Bool(_) => 5,
166        Value::Number(_) => 8,
167        Value::String(s) => s.len() + 2,
168        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
169        Value::Object(obj) => {
170            2 + obj
171                .iter()
172                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
173                .sum::<usize>()
174        }
175    }
176}
177
/// A dotted field path split into its segments once, so it can be reused cheaply.
///
/// The segments live behind an `Arc`, so cloning a `CompiledPath` does not copy them.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    pub segments: std::sync::Arc<[String]>,
}

impl CompiledPath {
    /// Splits `path` on `.` into owned segments.
    ///
    /// An empty `path` yields a single empty segment, and consecutive dots
    /// yield empty segments between them.
    pub fn new(path: &str) -> Self {
        Self {
            segments: path.split('.').map(String::from).collect(),
        }
    }

    /// Borrows the segments as a slice.
    fn segments(&self) -> &[String] {
        &self.segments[..]
    }
}
195
/// Represents the type of change made to a field for granular dirty tracking.
/// This enables emitting only the actual changes rather than entire field values.
#[derive(Debug, Clone)]
pub enum FieldChange {
    /// Field was replaced with a new value (emit the full value from state)
    Replaced,
    /// Items were appended to an array field (emit only the new items).
    /// The vector holds the appended items in the order they were recorded.
    Appended(Vec<Value>),
}
205
206/// Tracks field modifications during handler execution with granular change information.
207/// This replaces the simple HashSet<String> approach to enable delta-only emissions.
208#[derive(Debug, Clone, Default)]
209pub struct DirtyTracker {
210    changes: HashMap<String, FieldChange>,
211}
212
213impl DirtyTracker {
214    /// Create a new empty DirtyTracker
215    pub fn new() -> Self {
216        Self {
217            changes: HashMap::new(),
218        }
219    }
220
221    /// Mark a field as replaced (full value will be emitted)
222    pub fn mark_replaced(&mut self, path: &str) {
223        // If there was an append, it's now superseded by a full replacement
224        self.changes.insert(path.to_string(), FieldChange::Replaced);
225    }
226
227    /// Record an appended value for a field
228    pub fn mark_appended(&mut self, path: &str, value: Value) {
229        match self.changes.get_mut(path) {
230            Some(FieldChange::Appended(values)) => {
231                // Add to existing appended values
232                values.push(value);
233            }
234            Some(FieldChange::Replaced) => {
235                // Field was already replaced, keep it as replaced
236                // (the full value including the append will be emitted)
237            }
238            None => {
239                // First append to this field
240                self.changes
241                    .insert(path.to_string(), FieldChange::Appended(vec![value]));
242            }
243        }
244    }
245
246    /// Check if there are any changes tracked
247    pub fn is_empty(&self) -> bool {
248        self.changes.is_empty()
249    }
250
251    /// Get the number of changed fields
252    pub fn len(&self) -> usize {
253        self.changes.len()
254    }
255
256    /// Iterate over all changes
257    pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
258        self.changes.iter()
259    }
260
261    /// Get a set of all dirty field paths (for backward compatibility)
262    pub fn dirty_paths(&self) -> HashSet<String> {
263        self.changes.keys().cloned().collect()
264    }
265
266    /// Consume the tracker and return the changes map
267    pub fn into_changes(self) -> HashMap<String, FieldChange> {
268        self.changes
269    }
270
271    /// Get a reference to the changes map
272    pub fn changes(&self) -> &HashMap<String, FieldChange> {
273        &self.changes
274    }
275
276    /// Get paths that were appended (not replaced)
277    pub fn appended_paths(&self) -> Vec<String> {
278        self.changes
279            .iter()
280            .filter_map(|(path, change)| match change {
281                FieldChange::Appended(_) => Some(path.clone()),
282                FieldChange::Replaced => None,
283            })
284            .collect()
285    }
286}
287
/// Execution state of the VM: registers, per-entity state tables, caches and counters.
pub struct VmContext {
    /// Register file; the constructors allocate 256 registers initialised to `Value::Null`.
    registers: Vec<RegisterValue>,
    /// State tables keyed by state ID; the constructors create table `0`.
    states: HashMap<u32, StateTable>,
    /// NOTE(review): presumably a count of executed opcodes - the increment is outside this view.
    pub instructions_executed: u64,
    /// Number of path-cache hits (incremented in `get_compiled_path`).
    pub cache_hits: u64,
    /// Compiled field paths keyed by their dotted path string.
    path_cache: HashMap<String, CompiledPath>,
    /// NOTE(review): PDA lookup hit counter - updated outside this view.
    pub pda_cache_hits: u64,
    /// NOTE(review): PDA lookup miss counter - updated outside this view.
    pub pda_cache_misses: u64,
    /// NOTE(review): presumably the current size of the pending queue - updated outside this view.
    pub pending_queue_size: u64,
    /// Context of the update being processed, if any (see `current_context`).
    current_context: Option<UpdateContext>,
    /// Warnings accumulated via `add_warning` and drained by `take_warnings`.
    warnings: Vec<String>,
    /// NOTE(review): presumably the last PDA address whose lookup missed - set outside this view.
    last_pda_lookup_miss: Option<String>,
    /// NOTE(review): presumably the last PDA address that was registered - set outside this view.
    last_pda_registered: Option<String>,
}
302
303#[derive(Debug)]
304pub struct LookupIndex {
305    index: std::sync::Mutex<LruCache<String, Value>>,
306}
307
308impl LookupIndex {
309    pub fn new() -> Self {
310        Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
311    }
312
313    pub fn with_capacity(capacity: usize) -> Self {
314        LookupIndex {
315            index: std::sync::Mutex::new(LruCache::new(
316                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
317            )),
318        }
319    }
320
321    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
322        let key = value_to_cache_key(lookup_value);
323        self.index.lock().unwrap().get(&key).cloned()
324    }
325
326    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
327        let key = value_to_cache_key(&lookup_value);
328        self.index.lock().unwrap().put(key, primary_key);
329    }
330
331    pub fn len(&self) -> usize {
332        self.index.lock().unwrap().len()
333    }
334
335    pub fn is_empty(&self) -> bool {
336        self.index.lock().unwrap().is_empty()
337    }
338}
339
340impl Default for LookupIndex {
341    fn default() -> Self {
342        Self::new()
343    }
344}
345
346fn value_to_cache_key(value: &Value) -> String {
347    match value {
348        Value::String(s) => s.clone(),
349        Value::Number(n) => n.to_string(),
350        Value::Bool(b) => b.to_string(),
351        Value::Null => "null".to_string(),
352        _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
353    }
354}
355
356#[derive(Debug)]
357pub struct TemporalIndex {
358    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
359}
360
361impl Default for TemporalIndex {
362    fn default() -> Self {
363        Self::new()
364    }
365}
366
367impl TemporalIndex {
368    pub fn new() -> Self {
369        Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
370    }
371
372    pub fn with_capacity(capacity: usize) -> Self {
373        TemporalIndex {
374            index: std::sync::Mutex::new(LruCache::new(
375                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
376            )),
377        }
378    }
379
380    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
381        let key = value_to_cache_key(lookup_value);
382        let mut cache = self.index.lock().unwrap();
383        if let Some(entries) = cache.get(&key) {
384            for i in (0..entries.len()).rev() {
385                if entries[i].1 <= timestamp {
386                    return Some(entries[i].0.clone());
387                }
388            }
389        }
390        None
391    }
392
393    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
394        let key = value_to_cache_key(lookup_value);
395        let mut cache = self.index.lock().unwrap();
396        if let Some(entries) = cache.get(&key) {
397            if let Some(last) = entries.last() {
398                return Some(last.0.clone());
399            }
400        }
401        None
402    }
403
404    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
405        let key = value_to_cache_key(&lookup_value);
406        let mut cache = self.index.lock().unwrap();
407
408        let entries = cache.get_or_insert_mut(key, Vec::new);
409        entries.push((primary_key, timestamp));
410        entries.sort_by_key(|(_, ts)| *ts);
411
412        let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
413        entries.retain(|(_, ts)| *ts >= cutoff);
414
415        if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
416            let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
417            entries.drain(0..excess);
418        }
419    }
420
421    pub fn len(&self) -> usize {
422        self.index.lock().unwrap().len()
423    }
424
425    pub fn is_empty(&self) -> bool {
426        self.index.lock().unwrap().is_empty()
427    }
428
429    pub fn total_entries(&self) -> usize {
430        self.index
431            .lock()
432            .unwrap()
433            .iter()
434            .map(|(_, entries)| entries.len())
435            .sum()
436    }
437
438    pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
439        let mut cache = self.index.lock().unwrap();
440        let mut total_removed = 0;
441
442        for (_, entries) in cache.iter_mut() {
443            let original_len = entries.len();
444            entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
445            total_removed += original_len - entries.len();
446        }
447
448        total_removed
449    }
450}
451
452#[derive(Debug)]
453pub struct PdaReverseLookup {
454    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
455    index: LruCache<String, String>,
456}
457
458impl PdaReverseLookup {
459    pub fn new(capacity: usize) -> Self {
460        PdaReverseLookup {
461            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
462        }
463    }
464
465    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
466        self.index.get(pda_address).cloned()
467    }
468
469    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
470        let evicted = if self.index.len() >= self.index.cap().get() {
471            self.index.peek_lru().map(|(k, _)| k.clone())
472        } else {
473            None
474        };
475
476        self.index.put(pda_address, seed_value);
477        evicted
478    }
479
480    pub fn len(&self) -> usize {
481        self.index.len()
482    }
483
484    pub fn is_empty(&self) -> bool {
485        self.index.is_empty()
486    }
487}
488
/// Input for queueing an account update.
///
/// Carries the same fields as `PendingAccountUpdate` except `queued_at`.
#[derive(Debug, Clone)]
pub struct QueuedAccountUpdate {
    /// Address of the PDA the update refers to.
    pub pda_address: String,
    /// Name of the account type.
    pub account_type: String,
    /// Account payload as JSON.
    pub account_data: Value,
    /// Slot in which the update was observed.
    pub slot: u64,
    /// Write version of the account update.
    pub write_version: u64,
    /// Signature of the transaction that produced the update.
    pub signature: String,
}
499
/// Internal representation of a pending account update with queue metadata.
#[derive(Debug, Clone)]
pub struct PendingAccountUpdate {
    /// Name of the account type.
    pub account_type: String,
    /// Address of the PDA the update refers to.
    pub pda_address: String,
    /// Account payload as JSON.
    pub account_data: Value,
    /// Slot in which the update was observed.
    pub slot: u64,
    /// Write version of the account update.
    pub write_version: u64,
    /// Signature of the transaction that produced the update.
    pub signature: String,
    /// Time the update was queued.
    /// NOTE(review): presumably Unix seconds, matching PENDING_UPDATE_TTL_SECONDS - confirm.
    pub queued_at: i64,
}
511
/// Input for queueing an instruction event when PDA lookup fails.
///
/// Carries the same fields as `PendingInstructionEvent` except `queued_at`.
#[derive(Debug, Clone)]
pub struct QueuedInstructionEvent {
    /// Address of the PDA whose lookup failed.
    pub pda_address: String,
    /// Name of the event type.
    pub event_type: String,
    /// Event payload as JSON.
    pub event_data: Value,
    /// Slot in which the event was observed.
    pub slot: u64,
    /// Signature of the transaction that produced the event.
    pub signature: String,
}
521
/// Internal representation of a pending instruction event with queue metadata.
#[derive(Debug, Clone)]
pub struct PendingInstructionEvent {
    /// Name of the event type.
    pub event_type: String,
    /// Address of the PDA the event is waiting on.
    pub pda_address: String,
    /// Event payload as JSON.
    pub event_data: Value,
    /// Slot in which the event was observed.
    pub slot: u64,
    /// Signature of the transaction that produced the event.
    pub signature: String,
    /// Time the event was queued.
    /// NOTE(review): presumably Unix seconds - confirm against the code that fills it.
    pub queued_at: i64,
}
532
/// Summary statistics about the pending-update queue.
///
/// NOTE(review): the code that fills this struct is outside this view; the
/// field descriptions are inferred from the names and should be confirmed.
#[derive(Debug, Clone)]
pub struct PendingQueueStats {
    /// Total number of queued updates.
    pub total_updates: usize,
    /// Number of distinct PDA addresses with queued updates.
    pub unique_pdas: usize,
    /// Age of the oldest queued update, in seconds.
    pub oldest_age_seconds: i64,
    /// Size of the largest per-PDA queue.
    pub largest_pda_queue_size: usize,
    /// Estimated memory used by the queue, in bytes.
    pub estimated_memory_bytes: usize,
}
541
/// Snapshot of how much data the VM's in-memory structures hold.
///
/// NOTE(review): the code that fills this struct is outside this view; the
/// field descriptions are inferred from the names and should be confirmed.
#[derive(Debug, Clone, Default)]
pub struct VmMemoryStats {
    /// Number of entities stored in the state table.
    pub state_table_entity_count: usize,
    /// Configured maximum number of state table entries.
    pub state_table_max_entries: usize,
    /// Whether the state table has reached its configured maximum.
    pub state_table_at_capacity: bool,
    /// Number of lookup indexes.
    pub lookup_index_count: usize,
    /// Total entries across all lookup indexes.
    pub lookup_index_total_entries: usize,
    /// Number of temporal indexes.
    pub temporal_index_count: usize,
    /// Total entries across all temporal indexes.
    pub temporal_index_total_entries: usize,
    /// Number of PDA reverse lookups.
    pub pda_reverse_lookup_count: usize,
    /// Total entries across all PDA reverse lookups.
    pub pda_reverse_lookup_total_entries: usize,
    /// Number of entries in the version tracker.
    pub version_tracker_entries: usize,
    /// Pending queue statistics, when available.
    pub pending_queue_stats: Option<PendingQueueStats>,
    /// Number of compiled paths in the path cache.
    pub path_cache_size: usize,
}
557
/// Counts of items removed by a cleanup pass.
///
/// NOTE(review): the cleanup routine that produces this is outside this view.
#[derive(Debug, Clone, Default)]
pub struct CleanupResult {
    /// Number of pending updates removed.
    pub pending_updates_removed: usize,
    /// Number of temporal index entries removed.
    pub temporal_entries_removed: usize,
}
563
/// Describes a table that has reached or exceeded its configured size.
///
/// NOTE(review): the code that constructs this is outside this view; field
/// descriptions are inferred from the names.
#[derive(Debug, Clone)]
pub struct CapacityWarning {
    /// Number of entries currently stored.
    pub current_entries: usize,
    /// Configured maximum number of entries.
    pub max_entries: usize,
    /// How many entries exceed the maximum.
    pub entries_over_limit: usize,
}
570
571#[derive(Debug, Clone)]
572pub struct StateTableConfig {
573    pub max_entries: usize,
574    pub max_array_length: usize,
575}
576
577impl Default for StateTableConfig {
578    fn default() -> Self {
579        Self {
580            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
581            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
582        }
583    }
584}
585
586#[derive(Debug)]
587pub struct VersionTracker {
588    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
589}
590
591impl VersionTracker {
592    pub fn new() -> Self {
593        Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
594    }
595
596    pub fn with_capacity(capacity: usize) -> Self {
597        VersionTracker {
598            cache: std::sync::Mutex::new(LruCache::new(
599                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
600            )),
601        }
602    }
603
604    fn make_key(primary_key: &Value, event_type: &str) -> String {
605        format!("{}:{}", primary_key, event_type)
606    }
607
608    pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
609        let key = Self::make_key(primary_key, event_type);
610        self.cache.lock().unwrap().get(&key).copied()
611    }
612
613    pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
614        let key = Self::make_key(primary_key, event_type);
615        self.cache.lock().unwrap().put(key, (slot, ordering_value));
616    }
617
618    pub fn len(&self) -> usize {
619        self.cache.lock().unwrap().len()
620    }
621
622    pub fn is_empty(&self) -> bool {
623        self.cache.lock().unwrap().is_empty()
624    }
625}
626
627impl Default for VersionTracker {
628    fn default() -> Self {
629        Self::new()
630    }
631}
632
633#[derive(Debug)]
634pub struct StateTable {
635    pub data: DashMap<Value, Value>,
636    access_times: DashMap<Value, i64>,
637    pub lookup_indexes: HashMap<String, LookupIndex>,
638    pub temporal_indexes: HashMap<String, TemporalIndex>,
639    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
640    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
641    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
642    version_tracker: VersionTracker,
643    config: StateTableConfig,
644    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
645    entity_name: String,
646}
647
648impl StateTable {
649    pub fn is_at_capacity(&self) -> bool {
650        self.data.len() >= self.config.max_entries
651    }
652
653    pub fn entries_over_limit(&self) -> usize {
654        self.data.len().saturating_sub(self.config.max_entries)
655    }
656
657    pub fn max_array_length(&self) -> usize {
658        self.config.max_array_length
659    }
660
661    fn touch(&self, key: &Value) {
662        let now = std::time::SystemTime::now()
663            .duration_since(std::time::UNIX_EPOCH)
664            .unwrap()
665            .as_secs() as i64;
666        self.access_times.insert(key.clone(), now);
667    }
668
669    fn evict_lru(&self, count: usize) -> usize {
670        if count == 0 || self.data.is_empty() {
671            return 0;
672        }
673
674        let mut entries: Vec<(Value, i64)> = self
675            .access_times
676            .iter()
677            .map(|entry| (entry.key().clone(), *entry.value()))
678            .collect();
679
680        entries.sort_by_key(|(_, ts)| *ts);
681
682        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
683
684        let mut evicted = 0;
685        for key in to_evict {
686            self.data.remove(&key);
687            self.access_times.remove(&key);
688            evicted += 1;
689        }
690
691        #[cfg(feature = "otel")]
692        if evicted > 0 {
693            crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
694        }
695
696        evicted
697    }
698
699    pub fn insert_with_eviction(&self, key: Value, value: Value) {
700        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
701            #[cfg(feature = "otel")]
702            crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
703            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
704            self.evict_lru(to_evict.max(1));
705        }
706        self.data.insert(key.clone(), value);
707        self.touch(&key);
708    }
709
710    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
711        let result = self.data.get(key).map(|v| v.clone());
712        if result.is_some() {
713            self.touch(key);
714        }
715        result
716    }
717
718    /// Check if an update is fresh and update the version tracker.
719    /// Returns true if the update should be processed (is fresh).
720    /// Returns false if the update is stale and should be skipped.
721    ///
722    /// Comparison is lexicographic on (slot, ordering_value):
723    /// (100, 5) > (100, 3) > (99, 999)
724    pub fn is_fresh_update(
725        &self,
726        primary_key: &Value,
727        event_type: &str,
728        slot: u64,
729        ordering_value: u64,
730    ) -> bool {
731        let dominated = self
732            .version_tracker
733            .get(primary_key, event_type)
734            .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
735            .unwrap_or(false);
736
737        if dominated {
738            return false;
739        }
740
741        self.version_tracker
742            .insert(primary_key, event_type, slot, ordering_value);
743        true
744    }
745}
746
747impl VmContext {
748    pub fn new() -> Self {
749        let mut vm = VmContext {
750            registers: vec![Value::Null; 256],
751            states: HashMap::new(),
752            instructions_executed: 0,
753            cache_hits: 0,
754            path_cache: HashMap::new(),
755            pda_cache_hits: 0,
756            pda_cache_misses: 0,
757            pending_queue_size: 0,
758            current_context: None,
759            warnings: Vec::new(),
760            last_pda_lookup_miss: None,
761            last_pda_registered: None,
762        };
763        vm.states.insert(
764            0,
765            StateTable {
766                data: DashMap::new(),
767                access_times: DashMap::new(),
768                lookup_indexes: HashMap::new(),
769                temporal_indexes: HashMap::new(),
770                pda_reverse_lookups: HashMap::new(),
771                pending_updates: DashMap::new(),
772                pending_instruction_events: DashMap::new(),
773                version_tracker: VersionTracker::new(),
774                config: StateTableConfig::default(),
775                entity_name: "default".to_string(),
776            },
777        );
778        vm
779    }
780
781    pub fn new_with_config(state_config: StateTableConfig) -> Self {
782        let mut vm = VmContext {
783            registers: vec![Value::Null; 256],
784            states: HashMap::new(),
785            instructions_executed: 0,
786            cache_hits: 0,
787            path_cache: HashMap::new(),
788            pda_cache_hits: 0,
789            pda_cache_misses: 0,
790            pending_queue_size: 0,
791            current_context: None,
792            warnings: Vec::new(),
793            last_pda_lookup_miss: None,
794            last_pda_registered: None,
795        };
796        vm.states.insert(
797            0,
798            StateTable {
799                data: DashMap::new(),
800                access_times: DashMap::new(),
801                lookup_indexes: HashMap::new(),
802                temporal_indexes: HashMap::new(),
803                pda_reverse_lookups: HashMap::new(),
804                pending_updates: DashMap::new(),
805                pending_instruction_events: DashMap::new(),
806                version_tracker: VersionTracker::new(),
807                config: state_config,
808                entity_name: "default".to_string(),
809            },
810        );
811        vm
812    }
813
    /// Returns a mutable reference to the state table registered under `state_id`,
    /// or `None` if no table with that ID exists.
    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
        self.states.get_mut(&state_id)
    }
819
    /// Returns mutable access to the register vector (for metrics context).
    ///
    /// The caller receives the `Vec` itself, so it can read, overwrite, or
    /// resize the registers.
    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
        &mut self.registers
    }
824
    /// Returns read-only access to the path cache (for metrics context).
    ///
    /// The map is keyed by the dotted path string and holds its compiled form.
    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
        &self.path_cache
    }
829
    /// Returns the current update context, or `None` when no context is set.
    pub fn current_context(&self) -> Option<&UpdateContext> {
        self.current_context.as_ref()
    }
834
    /// Appends `msg` to the accumulated warnings; retrieve them with `take_warnings`.
    fn add_warning(&mut self, msg: String) {
        self.warnings.push(msg);
    }
838
    /// Returns all accumulated warnings and leaves the VM's warning list empty.
    pub fn take_warnings(&mut self) -> Vec<String> {
        std::mem::take(&mut self.warnings)
    }
842
    /// Returns `true` if at least one warning has accumulated since the last `take_warnings`.
    pub fn has_warnings(&self) -> bool {
        !self.warnings.is_empty()
    }
846
847    pub fn update_state_from_register(
848        &mut self,
849        state_id: u32,
850        key: Value,
851        register: Register,
852    ) -> Result<()> {
853        let state = self.states.get(&state_id).ok_or("State table not found")?;
854        let value = self.registers[register].clone();
855        state.insert_with_eviction(key, value);
856        Ok(())
857    }
858
859    fn reset_registers(&mut self) {
860        for reg in &mut self.registers {
861            *reg = Value::Null;
862        }
863    }
864
865    /// Extract only the dirty fields from state (public for use by instruction hooks)
866    pub fn extract_partial_state(
867        &self,
868        state_reg: Register,
869        dirty_fields: &HashSet<String>,
870    ) -> Result<Value> {
871        let full_state = &self.registers[state_reg];
872
873        if dirty_fields.is_empty() {
874            return Ok(json!({}));
875        }
876
877        let mut partial = serde_json::Map::new();
878
879        for path in dirty_fields {
880            let segments: Vec<&str> = path.split('.').collect();
881
882            let mut current = full_state;
883            let mut found = true;
884
885            for segment in &segments {
886                match current.get(segment) {
887                    Some(v) => current = v,
888                    None => {
889                        found = false;
890                        break;
891                    }
892                }
893            }
894
895            if !found {
896                continue;
897            }
898
899            let mut target = &mut partial;
900            for (i, segment) in segments.iter().enumerate() {
901                if i == segments.len() - 1 {
902                    target.insert(segment.to_string(), current.clone());
903                } else {
904                    target
905                        .entry(segment.to_string())
906                        .or_insert_with(|| json!({}));
907                    target = target
908                        .get_mut(*segment)
909                        .and_then(|v| v.as_object_mut())
910                        .ok_or("Failed to build nested structure")?;
911                }
912            }
913        }
914
915        Ok(Value::Object(partial))
916    }
917
918    /// Extract a patch from state based on the DirtyTracker.
919    /// For Replaced fields: extracts the full value from state.
920    /// For Appended fields: emits only the appended values as an array.
921    pub fn extract_partial_state_with_tracker(
922        &self,
923        state_reg: Register,
924        tracker: &DirtyTracker,
925    ) -> Result<Value> {
926        let full_state = &self.registers[state_reg];
927
928        if tracker.is_empty() {
929            return Ok(json!({}));
930        }
931
932        let mut partial = serde_json::Map::new();
933
934        for (path, change) in tracker.iter() {
935            let segments: Vec<&str> = path.split('.').collect();
936
937            let value_to_insert = match change {
938                FieldChange::Replaced => {
939                    let mut current = full_state;
940                    let mut found = true;
941
942                    for segment in &segments {
943                        match current.get(*segment) {
944                            Some(v) => current = v,
945                            None => {
946                                found = false;
947                                break;
948                            }
949                        }
950                    }
951
952                    if !found {
953                        continue;
954                    }
955                    current.clone()
956                }
957                FieldChange::Appended(values) => Value::Array(values.clone()),
958            };
959
960            let mut target = &mut partial;
961            for (i, segment) in segments.iter().enumerate() {
962                if i == segments.len() - 1 {
963                    target.insert(segment.to_string(), value_to_insert.clone());
964                } else {
965                    target
966                        .entry(segment.to_string())
967                        .or_insert_with(|| json!({}));
968                    target = target
969                        .get_mut(*segment)
970                        .and_then(|v| v.as_object_mut())
971                        .ok_or("Failed to build nested structure")?;
972                }
973            }
974        }
975
976        Ok(Value::Object(partial))
977    }
978
979    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
980        if let Some(compiled) = self.path_cache.get(path) {
981            self.cache_hits += 1;
982            #[cfg(feature = "otel")]
983            crate::vm_metrics::record_path_cache_hit();
984            return compiled.clone();
985        }
986        #[cfg(feature = "otel")]
987        crate::vm_metrics::record_path_cache_miss();
988        let compiled = CompiledPath::new(path);
989        self.path_cache.insert(path.to_string(), compiled.clone());
990        compiled
991    }
992
993    /// Process an event with optional context metadata
994    #[cfg_attr(feature = "otel", instrument(
995        name = "vm.process_event",
996        skip(self, bytecode, event_value, log),
997        level = "info",
998        fields(
999            event_type = %event_type,
1000            slot = context.as_ref().and_then(|c| c.slot),
1001        )
1002    ))]
1003    pub fn process_event(
1004        &mut self,
1005        bytecode: &MultiEntityBytecode,
1006        event_value: Value,
1007        event_type: &str,
1008        context: Option<&UpdateContext>,
1009        mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1010    ) -> Result<Vec<Mutation>> {
1011        self.current_context = context.cloned();
1012
1013        let mut event_value = event_value;
1014        if let Some(ctx) = context {
1015            if let Some(obj) = event_value.as_object_mut() {
1016                obj.insert("__update_context".to_string(), ctx.to_value());
1017            }
1018        }
1019
1020        let mut all_mutations = Vec::new();
1021
1022        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1023            for entity_name in entity_names {
1024                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1025                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1026                        if let Some(ref mut log) = log {
1027                            log.set("entity", entity_name.clone());
1028                            log.inc("handlers", 1);
1029                        }
1030
1031                        let opcodes_before = self.instructions_executed;
1032                        let cache_before = self.cache_hits;
1033                        let pda_hits_before = self.pda_cache_hits;
1034                        let pda_misses_before = self.pda_cache_misses;
1035
1036                        let mutations = self.execute_handler(
1037                            handler,
1038                            &event_value,
1039                            event_type,
1040                            entity_bytecode.state_id,
1041                            entity_name,
1042                            entity_bytecode.computed_fields_evaluator.as_ref(),
1043                        )?;
1044
1045                        if let Some(ref mut log) = log {
1046                            log.inc(
1047                                "opcodes",
1048                                (self.instructions_executed - opcodes_before) as i64,
1049                            );
1050                            log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1051                            log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1052                            log.inc(
1053                                "pda_misses",
1054                                (self.pda_cache_misses - pda_misses_before) as i64,
1055                            );
1056                        }
1057
1058                        if mutations.is_empty() {
1059                            if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1060                                if event_type.ends_with("IxState") {
1061                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1062                                    let signature = context
1063                                        .and_then(|c| c.signature.clone())
1064                                        .unwrap_or_default();
1065                                    let _ = self.queue_instruction_event(
1066                                        entity_bytecode.state_id,
1067                                        QueuedInstructionEvent {
1068                                            pda_address: missed_pda,
1069                                            event_type: event_type.to_string(),
1070                                            event_data: event_value.clone(),
1071                                            slot,
1072                                            signature,
1073                                        },
1074                                    );
1075                                }
1076                            }
1077                        }
1078
1079                        all_mutations.extend(mutations);
1080
1081                        if let Some(registered_pda) = self.take_last_pda_registered() {
1082                            let pending_events = self.flush_pending_instruction_events(
1083                                entity_bytecode.state_id,
1084                                &registered_pda,
1085                            );
1086                            for pending in pending_events {
1087                                if let Some(pending_handler) =
1088                                    entity_bytecode.handlers.get(&pending.event_type)
1089                                {
1090                                    if let Ok(reprocessed_mutations) = self.execute_handler(
1091                                        pending_handler,
1092                                        &pending.event_data,
1093                                        &pending.event_type,
1094                                        entity_bytecode.state_id,
1095                                        entity_name,
1096                                        entity_bytecode.computed_fields_evaluator.as_ref(),
1097                                    ) {
1098                                        all_mutations.extend(reprocessed_mutations);
1099                                    }
1100                                }
1101                            }
1102                        }
1103                    } else if let Some(ref mut log) = log {
1104                        log.set("skip_reason", "no_handler");
1105                    }
1106                } else if let Some(ref mut log) = log {
1107                    log.set("skip_reason", "entity_not_found");
1108                }
1109            }
1110        } else if let Some(ref mut log) = log {
1111            log.set("skip_reason", "no_event_routing");
1112        }
1113
1114        if let Some(log) = log {
1115            log.set("mutations", all_mutations.len() as i64);
1116            if let Some(first) = all_mutations.first() {
1117                if let Some(key_str) = first.key.as_str() {
1118                    log.set("primary_key", key_str);
1119                } else if let Some(key_num) = first.key.as_u64() {
1120                    log.set("primary_key", key_num as i64);
1121                }
1122            }
1123            if let Some(state) = self.states.get(&0) {
1124                log.set("state_table_size", state.data.len() as i64);
1125            }
1126
1127            let warnings = self.take_warnings();
1128            if !warnings.is_empty() {
1129                log.set("warnings", warnings.len() as i64);
1130                log.set(
1131                    "warning_messages",
1132                    Value::Array(warnings.into_iter().map(Value::String).collect()),
1133                );
1134                log.set_level(crate::canonical_log::LogLevel::Warn);
1135            }
1136        } else {
1137            self.warnings.clear();
1138        }
1139
1140        Ok(all_mutations)
1141    }
1142
1143    pub fn process_any(
1144        &mut self,
1145        bytecode: &MultiEntityBytecode,
1146        any: prost_types::Any,
1147    ) -> Result<Vec<Mutation>> {
1148        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1149        self.process_event(bytecode, event_value, &event_type, None, None)
1150    }
1151
1152    #[cfg_attr(feature = "otel", instrument(
1153        name = "vm.execute_handler",
1154        skip(self, handler, event_value, entity_evaluator),
1155        level = "debug",
1156        fields(
1157            event_type = %event_type,
1158            handler_opcodes = handler.len(),
1159        )
1160    ))]
1161    #[allow(clippy::type_complexity)]
1162    fn execute_handler(
1163        &mut self,
1164        handler: &[OpCode],
1165        event_value: &Value,
1166        event_type: &str,
1167        override_state_id: u32,
1168        entity_name: &str,
1169        entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1170    ) -> Result<Vec<Mutation>> {
1171        self.reset_registers();
1172        self.last_pda_lookup_miss = None;
1173
1174        let mut pc: usize = 0;
1175        let mut output = Vec::new();
1176        let mut dirty_tracker = DirtyTracker::new();
1177
1178        while pc < handler.len() {
1179            match &handler[pc] {
1180                OpCode::LoadEventField {
1181                    path,
1182                    dest,
1183                    default,
1184                } => {
1185                    let value = self.load_field(event_value, path, default.as_ref())?;
1186                    self.registers[*dest] = value;
1187                    pc += 1;
1188                }
1189                OpCode::LoadConstant { value, dest } => {
1190                    self.registers[*dest] = value.clone();
1191                    pc += 1;
1192                }
1193                OpCode::CopyRegister { source, dest } => {
1194                    self.registers[*dest] = self.registers[*source].clone();
1195                    pc += 1;
1196                }
1197                OpCode::CopyRegisterIfNull { source, dest } => {
1198                    if self.registers[*dest].is_null() {
1199                        self.registers[*dest] = self.registers[*source].clone();
1200                    }
1201                    pc += 1;
1202                }
1203                OpCode::GetEventType { dest } => {
1204                    self.registers[*dest] = json!(event_type);
1205                    pc += 1;
1206                }
1207                OpCode::CreateObject { dest } => {
1208                    self.registers[*dest] = json!({});
1209                    pc += 1;
1210                }
1211                OpCode::SetField {
1212                    object,
1213                    path,
1214                    value,
1215                } => {
1216                    self.set_field_auto_vivify(*object, path, *value)?;
1217                    dirty_tracker.mark_replaced(path);
1218                    pc += 1;
1219                }
1220                OpCode::SetFields { object, fields } => {
1221                    for (path, value_reg) in fields {
1222                        self.set_field_auto_vivify(*object, path, *value_reg)?;
1223                        dirty_tracker.mark_replaced(path);
1224                    }
1225                    pc += 1;
1226                }
1227                OpCode::GetField { object, path, dest } => {
1228                    let value = self.get_field(*object, path)?;
1229                    self.registers[*dest] = value;
1230                    pc += 1;
1231                }
1232                OpCode::ReadOrInitState {
1233                    state_id: _,
1234                    key,
1235                    default,
1236                    dest,
1237                } => {
1238                    let actual_state_id = override_state_id;
1239                    let entity_name_owned = entity_name.to_string();
1240                    self.states
1241                        .entry(actual_state_id)
1242                        .or_insert_with(|| StateTable {
1243                            data: DashMap::new(),
1244                            access_times: DashMap::new(),
1245                            lookup_indexes: HashMap::new(),
1246                            temporal_indexes: HashMap::new(),
1247                            pda_reverse_lookups: HashMap::new(),
1248                            pending_updates: DashMap::new(),
1249                            pending_instruction_events: DashMap::new(),
1250                            version_tracker: VersionTracker::new(),
1251                            config: StateTableConfig::default(),
1252                            entity_name: entity_name_owned,
1253                        });
1254                    let key_value = self.registers[*key].clone();
1255                    let warn_null_key = key_value.is_null()
1256                        && event_type.ends_with("State")
1257                        && !event_type.ends_with("IxState");
1258
1259                    if warn_null_key {
1260                        self.add_warning(format!(
1261                            "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1262                            key, event_type
1263                        ));
1264                    }
1265
1266                    let state = self
1267                        .states
1268                        .get(&actual_state_id)
1269                        .ok_or("State table not found")?;
1270
1271                    if !key_value.is_null() {
1272                        if let Some(ctx) = &self.current_context {
1273                            let ordering_value = ctx.write_version.or(ctx.txn_index);
1274                            if let (Some(slot), Some(version)) = (ctx.slot, ordering_value) {
1275                                if !state.is_fresh_update(&key_value, event_type, slot, version) {
1276                                    self.add_warning(format!(
1277                                        "Stale update skipped: slot={}, version={}",
1278                                        slot, version
1279                                    ));
1280                                    return Ok(Vec::new());
1281                                }
1282                            }
1283                        }
1284                    }
1285                    let value = state
1286                        .get_and_touch(&key_value)
1287                        .unwrap_or_else(|| default.clone());
1288
1289                    self.registers[*dest] = value;
1290                    pc += 1;
1291                }
1292                OpCode::UpdateState {
1293                    state_id: _,
1294                    key,
1295                    value,
1296                } => {
1297                    let actual_state_id = override_state_id;
1298                    let state = self
1299                        .states
1300                        .get(&actual_state_id)
1301                        .ok_or("State table not found")?;
1302                    let key_value = self.registers[*key].clone();
1303                    let value_data = self.registers[*value].clone();
1304
1305                    state.insert_with_eviction(key_value, value_data);
1306                    pc += 1;
1307                }
1308                OpCode::AppendToArray {
1309                    object,
1310                    path,
1311                    value,
1312                } => {
1313                    let appended_value = self.registers[*value].clone();
1314                    let max_len = self
1315                        .states
1316                        .get(&override_state_id)
1317                        .map(|s| s.max_array_length())
1318                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1319                    self.append_to_array(*object, path, *value, max_len)?;
1320                    dirty_tracker.mark_appended(path, appended_value);
1321                    pc += 1;
1322                }
1323                OpCode::GetCurrentTimestamp { dest } => {
1324                    let timestamp = std::time::SystemTime::now()
1325                        .duration_since(std::time::UNIX_EPOCH)
1326                        .unwrap()
1327                        .as_secs() as i64;
1328                    self.registers[*dest] = json!(timestamp);
1329                    pc += 1;
1330                }
1331                OpCode::CreateEvent { dest, event_value } => {
1332                    let timestamp = std::time::SystemTime::now()
1333                        .duration_since(std::time::UNIX_EPOCH)
1334                        .unwrap()
1335                        .as_secs() as i64;
1336
1337                    // Filter out __update_context from the event data
1338                    let mut event_data = self.registers[*event_value].clone();
1339                    if let Some(obj) = event_data.as_object_mut() {
1340                        obj.remove("__update_context");
1341                    }
1342
1343                    // Create event with timestamp, data, and optional slot/signature from context
1344                    let mut event = serde_json::Map::new();
1345                    event.insert("timestamp".to_string(), json!(timestamp));
1346                    event.insert("data".to_string(), event_data);
1347
1348                    // Add slot and signature if available from current context
1349                    if let Some(ref ctx) = self.current_context {
1350                        if let Some(slot) = ctx.slot {
1351                            event.insert("slot".to_string(), json!(slot));
1352                        }
1353                        if let Some(ref signature) = ctx.signature {
1354                            event.insert("signature".to_string(), json!(signature));
1355                        }
1356                    }
1357
1358                    self.registers[*dest] = Value::Object(event);
1359                    pc += 1;
1360                }
1361                OpCode::CreateCapture {
1362                    dest,
1363                    capture_value,
1364                } => {
1365                    let timestamp = std::time::SystemTime::now()
1366                        .duration_since(std::time::UNIX_EPOCH)
1367                        .unwrap()
1368                        .as_secs() as i64;
1369
1370                    // Get the capture data (already filtered by load_field)
1371                    let capture_data = self.registers[*capture_value].clone();
1372
1373                    // Extract account_address from the original event if available
1374                    let account_address = event_value
1375                        .get("__account_address")
1376                        .and_then(|v| v.as_str())
1377                        .unwrap_or("")
1378                        .to_string();
1379
1380                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
1381                    let mut capture = serde_json::Map::new();
1382                    capture.insert("timestamp".to_string(), json!(timestamp));
1383                    capture.insert("account_address".to_string(), json!(account_address));
1384                    capture.insert("data".to_string(), capture_data);
1385
1386                    // Add slot and signature if available from current context
1387                    if let Some(ref ctx) = self.current_context {
1388                        if let Some(slot) = ctx.slot {
1389                            capture.insert("slot".to_string(), json!(slot));
1390                        }
1391                        if let Some(ref signature) = ctx.signature {
1392                            capture.insert("signature".to_string(), json!(signature));
1393                        }
1394                    }
1395
1396                    self.registers[*dest] = Value::Object(capture);
1397                    pc += 1;
1398                }
1399                OpCode::Transform {
1400                    source,
1401                    dest,
1402                    transformation,
1403                } => {
1404                    if source == dest {
1405                        self.transform_in_place(*source, transformation)?;
1406                    } else {
1407                        let source_value = &self.registers[*source];
1408                        let value = self.apply_transformation(source_value, transformation)?;
1409                        self.registers[*dest] = value;
1410                    }
1411                    pc += 1;
1412                }
1413                OpCode::EmitMutation {
1414                    entity_name,
1415                    key,
1416                    state,
1417                } => {
1418                    let debug_computed = std::env::var("HYPERSTACK_DEBUG_COMPUTED").is_ok();
1419                    let primary_key = self.registers[*key].clone();
1420
1421                    if primary_key.is_null() || dirty_tracker.is_empty() {
1422                        let reason = if dirty_tracker.is_empty() {
1423                            "no_fields_modified"
1424                        } else {
1425                            "null_primary_key"
1426                        };
1427                        self.add_warning(format!(
1428                            "Skipping mutation for entity '{}': {} (dirty_fields={})",
1429                            entity_name,
1430                            reason,
1431                            dirty_tracker.len()
1432                        ));
1433                    } else {
1434                        let patch =
1435                            self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1436
1437                        if debug_computed {
1438                            if let Some(results) = patch.get("results") {
1439                                if results.get("rng").is_some()
1440                                    || results.get("winning_square").is_some()
1441                                    || results.get("did_hit_motherlode").is_some()
1442                                {
1443                                    tracing::warn!(
1444                                        "[VM_EMIT_MUTATION] entity={} key={} patch.results: rng={:?} winning_square={:?} did_hit_motherlode={:?}",
1445                                        entity_name,
1446                                        primary_key,
1447                                        results.get("rng"),
1448                                        results.get("winning_square"),
1449                                        results.get("did_hit_motherlode")
1450                                    );
1451                                }
1452                            }
1453                        }
1454
1455                        let append = dirty_tracker.appended_paths();
1456                        let mutation = Mutation {
1457                            export: entity_name.clone(),
1458                            key: primary_key,
1459                            patch,
1460                            append,
1461                        };
1462                        output.push(mutation);
1463                    }
1464                    pc += 1;
1465                }
1466                OpCode::SetFieldIfNull {
1467                    object,
1468                    path,
1469                    value,
1470                } => {
1471                    let was_set = self.set_field_if_null(*object, path, *value)?;
1472                    if was_set {
1473                        dirty_tracker.mark_replaced(path);
1474                    }
1475                    pc += 1;
1476                }
1477                OpCode::SetFieldMax {
1478                    object,
1479                    path,
1480                    value,
1481                } => {
1482                    let was_updated = self.set_field_max(*object, path, *value)?;
1483                    if was_updated {
1484                        dirty_tracker.mark_replaced(path);
1485                    }
1486                    pc += 1;
1487                }
1488                OpCode::UpdateTemporalIndex {
1489                    state_id: _,
1490                    index_name,
1491                    lookup_value,
1492                    primary_key,
1493                    timestamp,
1494                } => {
1495                    let actual_state_id = override_state_id;
1496                    let state = self
1497                        .states
1498                        .get_mut(&actual_state_id)
1499                        .ok_or("State table not found")?;
1500                    let index = state
1501                        .temporal_indexes
1502                        .entry(index_name.clone())
1503                        .or_insert_with(TemporalIndex::new);
1504
1505                    let lookup_val = self.registers[*lookup_value].clone();
1506                    let pk_val = self.registers[*primary_key].clone();
1507                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1508                        val
1509                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
1510                        val as i64
1511                    } else {
1512                        return Err(format!(
1513                            "Timestamp must be a number (i64 or u64), got: {:?}",
1514                            self.registers[*timestamp]
1515                        )
1516                        .into());
1517                    };
1518
1519                    index.insert(lookup_val, pk_val, ts_val);
1520                    pc += 1;
1521                }
1522                OpCode::LookupTemporalIndex {
1523                    state_id: _,
1524                    index_name,
1525                    lookup_value,
1526                    timestamp,
1527                    dest,
1528                } => {
1529                    let actual_state_id = override_state_id;
1530                    let state = self
1531                        .states
1532                        .get(&actual_state_id)
1533                        .ok_or("State table not found")?;
1534                    let lookup_val = &self.registers[*lookup_value];
1535
1536                    let result = if self.registers[*timestamp].is_null() {
1537                        if let Some(index) = state.temporal_indexes.get(index_name) {
1538                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1539                        } else {
1540                            Value::Null
1541                        }
1542                    } else {
1543                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1544                            val
1545                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
1546                            val as i64
1547                        } else {
1548                            return Err(format!(
1549                                "Timestamp must be a number (i64 or u64), got: {:?}",
1550                                self.registers[*timestamp]
1551                            )
1552                            .into());
1553                        };
1554
1555                        if let Some(index) = state.temporal_indexes.get(index_name) {
1556                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1557                        } else {
1558                            Value::Null
1559                        }
1560                    };
1561
1562                    self.registers[*dest] = result;
1563                    pc += 1;
1564                }
1565                OpCode::UpdateLookupIndex {
1566                    state_id: _,
1567                    index_name,
1568                    lookup_value,
1569                    primary_key,
1570                } => {
1571                    let actual_state_id = override_state_id;
1572                    let state = self
1573                        .states
1574                        .get_mut(&actual_state_id)
1575                        .ok_or("State table not found")?;
1576                    let index = state
1577                        .lookup_indexes
1578                        .entry(index_name.clone())
1579                        .or_insert_with(LookupIndex::new);
1580
1581                    let lookup_val = self.registers[*lookup_value].clone();
1582                    let pk_val = self.registers[*primary_key].clone();
1583
1584                    index.insert(lookup_val, pk_val);
1585                    pc += 1;
1586                }
1587                OpCode::LookupIndex {
1588                    state_id: _,
1589                    index_name,
1590                    lookup_value,
1591                    dest,
1592                } => {
1593                    let actual_state_id = override_state_id;
1594                    let lookup_val = self.registers[*lookup_value].clone();
1595
1596                    let result = {
1597                        let state = self
1598                            .states
1599                            .get(&actual_state_id)
1600                            .ok_or("State table not found")?;
1601
1602                        if let Some(index) = state.lookup_indexes.get(index_name) {
1603                            let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1604                            #[cfg(feature = "otel")]
1605                            if found.is_null() {
1606                                crate::vm_metrics::record_lookup_index_miss(index_name);
1607                            } else {
1608                                crate::vm_metrics::record_lookup_index_hit(index_name);
1609                            }
1610                            found
1611                        } else {
1612                            Value::Null
1613                        }
1614                    };
1615
1616                    let final_result = if result.is_null() {
1617                        if let Some(pda_str) = lookup_val.as_str() {
1618                            let state = self
1619                                .states
1620                                .get_mut(&actual_state_id)
1621                                .ok_or("State table not found")?;
1622
1623                            if let Some(pda_lookup) =
1624                                state.pda_reverse_lookups.get_mut("default_pda_lookup")
1625                            {
1626                                if let Some(resolved) = pda_lookup.lookup(pda_str) {
1627                                    Value::String(resolved)
1628                                } else {
1629                                    self.last_pda_lookup_miss = Some(pda_str.to_string());
1630                                    Value::Null
1631                                }
1632                            } else {
1633                                self.last_pda_lookup_miss = Some(pda_str.to_string());
1634                                Value::Null
1635                            }
1636                        } else {
1637                            Value::Null
1638                        }
1639                    } else {
1640                        result
1641                    };
1642
1643                    self.registers[*dest] = final_result;
1644                    pc += 1;
1645                }
1646                OpCode::SetFieldSum {
1647                    object,
1648                    path,
1649                    value,
1650                } => {
1651                    let was_updated = self.set_field_sum(*object, path, *value)?;
1652                    if was_updated {
1653                        dirty_tracker.mark_replaced(path);
1654                    }
1655                    pc += 1;
1656                }
1657                OpCode::SetFieldIncrement { object, path } => {
1658                    let was_updated = self.set_field_increment(*object, path)?;
1659                    if was_updated {
1660                        dirty_tracker.mark_replaced(path);
1661                    }
1662                    pc += 1;
1663                }
1664                OpCode::SetFieldMin {
1665                    object,
1666                    path,
1667                    value,
1668                } => {
1669                    let was_updated = self.set_field_min(*object, path, *value)?;
1670                    if was_updated {
1671                        dirty_tracker.mark_replaced(path);
1672                    }
1673                    pc += 1;
1674                }
1675                OpCode::AddToUniqueSet {
1676                    state_id: _,
1677                    set_name,
1678                    value,
1679                    count_object,
1680                    count_path,
1681                } => {
1682                    let value_to_add = self.registers[*value].clone();
1683
1684                    // Store the unique set within the entity object, not in the state table
1685                    // This ensures each entity instance has its own unique set
1686                    let set_field_path = format!("__unique_set:{}", set_name);
1687
1688                    // Get or create the unique set from the entity object
1689                    let mut set: HashSet<Value> =
1690                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1691                            if !existing.is_null() {
1692                                serde_json::from_value(existing).unwrap_or_default()
1693                            } else {
1694                                HashSet::new()
1695                            }
1696                        } else {
1697                            HashSet::new()
1698                        };
1699
1700                    // Add value to set
1701                    let was_new = set.insert(value_to_add);
1702
1703                    // Store updated set back in the entity object
1704                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1705                    self.registers[100] = serde_json::to_value(set_as_vec)?;
1706                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1707
1708                    // Update the count field in the object
1709                    if was_new {
1710                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1711                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
1712                        dirty_tracker.mark_replaced(count_path);
1713                    }
1714
1715                    pc += 1;
1716                }
1717                OpCode::ConditionalSetField {
1718                    object,
1719                    path,
1720                    value,
1721                    condition_field,
1722                    condition_op,
1723                    condition_value,
1724                } => {
1725                    let field_value = self.load_field(event_value, condition_field, None)?;
1726                    let condition_met =
1727                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1728
1729                    if condition_met {
1730                        self.set_field_auto_vivify(*object, path, *value)?;
1731                        dirty_tracker.mark_replaced(path);
1732                    }
1733                    pc += 1;
1734                }
1735                OpCode::ConditionalIncrement {
1736                    object,
1737                    path,
1738                    condition_field,
1739                    condition_op,
1740                    condition_value,
1741                } => {
1742                    let field_value = self.load_field(event_value, condition_field, None)?;
1743                    let condition_met =
1744                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1745
1746                    if condition_met {
1747                        let was_updated = self.set_field_increment(*object, path)?;
1748                        if was_updated {
1749                            dirty_tracker.mark_replaced(path);
1750                        }
1751                    }
1752                    pc += 1;
1753                }
1754                OpCode::EvaluateComputedFields {
1755                    state,
1756                    computed_paths,
1757                } => {
1758                    let debug_computed = std::env::var("HYPERSTACK_DEBUG_COMPUTED").is_ok();
1759
1760                    if let Some(evaluator) = entity_evaluator {
1761                        let old_values: Vec<_> = computed_paths
1762                            .iter()
1763                            .map(|path| Self::get_value_at_path(&self.registers[*state], path))
1764                            .collect();
1765
1766                        if debug_computed {
1767                            tracing::warn!(
1768                                "[VM_EVAL_COMPUTED] entity={} BEFORE evaluator: {:?}",
1769                                entity_name,
1770                                computed_paths
1771                                    .iter()
1772                                    .zip(old_values.iter())
1773                                    .map(|(p, v)| format!("{}={:?}", p, v))
1774                                    .collect::<Vec<_>>()
1775                            );
1776                        }
1777
1778                        let state_value = &mut self.registers[*state];
1779                        let eval_result = evaluator(state_value);
1780
1781                        if debug_computed {
1782                            if let Err(ref e) = eval_result {
1783                                tracing::error!(
1784                                    "[VM_EVAL_COMPUTED] entity={} evaluator FAILED: {:?}",
1785                                    entity_name,
1786                                    e
1787                                );
1788                            }
1789                        }
1790
1791                        if eval_result.is_ok() {
1792                            for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
1793                                let new_value =
1794                                    Self::get_value_at_path(&self.registers[*state], path);
1795
1796                                if debug_computed {
1797                                    tracing::warn!(
1798                                        "[VM_EVAL_COMPUTED] entity={} path={} old={:?} new={:?} changed={}",
1799                                        entity_name,
1800                                        path,
1801                                        old_value,
1802                                        new_value,
1803                                        new_value != *old_value
1804                                    );
1805                                }
1806
1807                                if new_value != *old_value {
1808                                    dirty_tracker.mark_replaced(path);
1809                                }
1810                            }
1811                        }
1812                    } else if debug_computed {
1813                        tracing::warn!(
1814                            "[VM_EVAL_COMPUTED] entity={} NO EVALUATOR - skipping computed fields",
1815                            entity_name
1816                        );
1817                    }
1818                    pc += 1;
1819                }
1820                OpCode::UpdatePdaReverseLookup {
1821                    state_id: _,
1822                    lookup_name,
1823                    pda_address,
1824                    primary_key,
1825                } => {
1826                    let actual_state_id = override_state_id;
1827                    let state = self
1828                        .states
1829                        .get_mut(&actual_state_id)
1830                        .ok_or("State table not found")?;
1831
1832                    let pda_val = self.registers[*pda_address].clone();
1833                    let pk_val = self.registers[*primary_key].clone();
1834
1835                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1836                        let pda_lookup = state
1837                            .pda_reverse_lookups
1838                            .entry(lookup_name.clone())
1839                            .or_insert_with(|| {
1840                                PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
1841                            });
1842
1843                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1844                        self.last_pda_registered = Some(pda_str.to_string());
1845                    } else if !pk_val.is_null() {
1846                        if let Some(pk_num) = pk_val.as_u64() {
1847                            if let Some(pda_str) = pda_val.as_str() {
1848                                let pda_lookup = state
1849                                    .pda_reverse_lookups
1850                                    .entry(lookup_name.clone())
1851                                    .or_insert_with(|| {
1852                                        PdaReverseLookup::new(
1853                                            DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
1854                                        )
1855                                    });
1856
1857                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1858                                self.last_pda_registered = Some(pda_str.to_string());
1859                            }
1860                        }
1861                    }
1862
1863                    pc += 1;
1864                }
1865            }
1866
1867            self.instructions_executed += 1;
1868        }
1869
1870        Ok(output)
1871    }
1872
1873    fn load_field(
1874        &self,
1875        event_value: &Value,
1876        path: &FieldPath,
1877        default: Option<&Value>,
1878    ) -> Result<Value> {
1879        if path.segments.is_empty() {
1880            if let Some(obj) = event_value.as_object() {
1881                let filtered: serde_json::Map<String, Value> = obj
1882                    .iter()
1883                    .filter(|(k, _)| !k.starts_with("__"))
1884                    .map(|(k, v)| (k.clone(), v.clone()))
1885                    .collect();
1886                return Ok(Value::Object(filtered));
1887            }
1888            return Ok(event_value.clone());
1889        }
1890
1891        let mut current = event_value;
1892        for segment in path.segments.iter() {
1893            current = match current.get(segment) {
1894                Some(v) => v,
1895                None => return Ok(default.cloned().unwrap_or(Value::Null)),
1896            };
1897        }
1898
1899        Ok(current.clone())
1900    }
1901
1902    fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
1903        let mut current = value;
1904        for segment in path.split('.') {
1905            current = current.get(segment)?;
1906        }
1907        Some(current.clone())
1908    }
1909
1910    fn set_field_auto_vivify(
1911        &mut self,
1912        object_reg: Register,
1913        path: &str,
1914        value_reg: Register,
1915    ) -> Result<()> {
1916        let compiled = self.get_compiled_path(path);
1917        let segments = compiled.segments();
1918        let value = self.registers[value_reg].clone();
1919
1920        if !self.registers[object_reg].is_object() {
1921            self.registers[object_reg] = json!({});
1922        }
1923
1924        let obj = self.registers[object_reg]
1925            .as_object_mut()
1926            .ok_or("Not an object")?;
1927
1928        let mut current = obj;
1929        for (i, segment) in segments.iter().enumerate() {
1930            if i == segments.len() - 1 {
1931                current.insert(segment.to_string(), value);
1932                return Ok(());
1933            } else {
1934                current
1935                    .entry(segment.to_string())
1936                    .or_insert_with(|| json!({}));
1937                current = current
1938                    .get_mut(segment)
1939                    .and_then(|v| v.as_object_mut())
1940                    .ok_or("Path collision: expected object")?;
1941            }
1942        }
1943
1944        Ok(())
1945    }
1946
1947    fn set_field_if_null(
1948        &mut self,
1949        object_reg: Register,
1950        path: &str,
1951        value_reg: Register,
1952    ) -> Result<bool> {
1953        let compiled = self.get_compiled_path(path);
1954        let segments = compiled.segments();
1955        let value = self.registers[value_reg].clone();
1956
1957        if !self.registers[object_reg].is_object() {
1958            self.registers[object_reg] = json!({});
1959        }
1960
1961        let obj = self.registers[object_reg]
1962            .as_object_mut()
1963            .ok_or("Not an object")?;
1964
1965        let mut current = obj;
1966        for (i, segment) in segments.iter().enumerate() {
1967            if i == segments.len() - 1 {
1968                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1969                    current.insert(segment.to_string(), value);
1970                    return Ok(true);
1971                }
1972                return Ok(false);
1973            } else {
1974                current
1975                    .entry(segment.to_string())
1976                    .or_insert_with(|| json!({}));
1977                current = current
1978                    .get_mut(segment)
1979                    .and_then(|v| v.as_object_mut())
1980                    .ok_or("Path collision: expected object")?;
1981            }
1982        }
1983
1984        Ok(false)
1985    }
1986
1987    fn set_field_max(
1988        &mut self,
1989        object_reg: Register,
1990        path: &str,
1991        value_reg: Register,
1992    ) -> Result<bool> {
1993        let compiled = self.get_compiled_path(path);
1994        let segments = compiled.segments();
1995        let new_value = self.registers[value_reg].clone();
1996
1997        if !self.registers[object_reg].is_object() {
1998            self.registers[object_reg] = json!({});
1999        }
2000
2001        let obj = self.registers[object_reg]
2002            .as_object_mut()
2003            .ok_or("Not an object")?;
2004
2005        let mut current = obj;
2006        for (i, segment) in segments.iter().enumerate() {
2007            if i == segments.len() - 1 {
2008                let should_update = if let Some(current_value) = current.get(segment) {
2009                    if current_value.is_null() {
2010                        true
2011                    } else {
2012                        match (current_value.as_i64(), new_value.as_i64()) {
2013                            (Some(current_val), Some(new_val)) => new_val > current_val,
2014                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2015                                new_value.as_u64().unwrap() as i64 > current_val
2016                            }
2017                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2018                                new_val > current_value.as_u64().unwrap() as i64
2019                            }
2020                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2021                                (Some(current_val), Some(new_val)) => new_val > current_val,
2022                                _ => match (current_value.as_f64(), new_value.as_f64()) {
2023                                    (Some(current_val), Some(new_val)) => new_val > current_val,
2024                                    _ => false,
2025                                },
2026                            },
2027                            _ => false,
2028                        }
2029                    }
2030                } else {
2031                    true
2032                };
2033
2034                if should_update {
2035                    current.insert(segment.to_string(), new_value);
2036                    return Ok(true);
2037                }
2038                return Ok(false);
2039            } else {
2040                current
2041                    .entry(segment.to_string())
2042                    .or_insert_with(|| json!({}));
2043                current = current
2044                    .get_mut(segment)
2045                    .and_then(|v| v.as_object_mut())
2046                    .ok_or("Path collision: expected object")?;
2047            }
2048        }
2049
2050        Ok(false)
2051    }
2052
2053    fn set_field_sum(
2054        &mut self,
2055        object_reg: Register,
2056        path: &str,
2057        value_reg: Register,
2058    ) -> Result<bool> {
2059        let compiled = self.get_compiled_path(path);
2060        let segments = compiled.segments();
2061        let new_value = &self.registers[value_reg];
2062
2063        // Extract numeric value before borrowing object_reg mutably
2064        let new_val_num = new_value
2065            .as_i64()
2066            .or_else(|| new_value.as_u64().map(|n| n as i64))
2067            .ok_or("Sum requires numeric value")?;
2068
2069        if !self.registers[object_reg].is_object() {
2070            self.registers[object_reg] = json!({});
2071        }
2072
2073        let obj = self.registers[object_reg]
2074            .as_object_mut()
2075            .ok_or("Not an object")?;
2076
2077        let mut current = obj;
2078        for (i, segment) in segments.iter().enumerate() {
2079            if i == segments.len() - 1 {
2080                let current_val = current
2081                    .get(segment)
2082                    .and_then(|v| {
2083                        if v.is_null() {
2084                            None
2085                        } else {
2086                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2087                        }
2088                    })
2089                    .unwrap_or(0);
2090
2091                let sum = current_val + new_val_num;
2092                current.insert(segment.to_string(), json!(sum));
2093                return Ok(true);
2094            } else {
2095                current
2096                    .entry(segment.to_string())
2097                    .or_insert_with(|| json!({}));
2098                current = current
2099                    .get_mut(segment)
2100                    .and_then(|v| v.as_object_mut())
2101                    .ok_or("Path collision: expected object")?;
2102            }
2103        }
2104
2105        Ok(false)
2106    }
2107
2108    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2109        let compiled = self.get_compiled_path(path);
2110        let segments = compiled.segments();
2111
2112        if !self.registers[object_reg].is_object() {
2113            self.registers[object_reg] = json!({});
2114        }
2115
2116        let obj = self.registers[object_reg]
2117            .as_object_mut()
2118            .ok_or("Not an object")?;
2119
2120        let mut current = obj;
2121        for (i, segment) in segments.iter().enumerate() {
2122            if i == segments.len() - 1 {
2123                // Get current value (default to 0 if null/missing)
2124                let current_val = current
2125                    .get(segment)
2126                    .and_then(|v| {
2127                        if v.is_null() {
2128                            None
2129                        } else {
2130                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2131                        }
2132                    })
2133                    .unwrap_or(0);
2134
2135                let incremented = current_val + 1;
2136                current.insert(segment.to_string(), json!(incremented));
2137                return Ok(true);
2138            } else {
2139                current
2140                    .entry(segment.to_string())
2141                    .or_insert_with(|| json!({}));
2142                current = current
2143                    .get_mut(segment)
2144                    .and_then(|v| v.as_object_mut())
2145                    .ok_or("Path collision: expected object")?;
2146            }
2147        }
2148
2149        Ok(false)
2150    }
2151
2152    fn set_field_min(
2153        &mut self,
2154        object_reg: Register,
2155        path: &str,
2156        value_reg: Register,
2157    ) -> Result<bool> {
2158        let compiled = self.get_compiled_path(path);
2159        let segments = compiled.segments();
2160        let new_value = self.registers[value_reg].clone();
2161
2162        if !self.registers[object_reg].is_object() {
2163            self.registers[object_reg] = json!({});
2164        }
2165
2166        let obj = self.registers[object_reg]
2167            .as_object_mut()
2168            .ok_or("Not an object")?;
2169
2170        let mut current = obj;
2171        for (i, segment) in segments.iter().enumerate() {
2172            if i == segments.len() - 1 {
2173                let should_update = if let Some(current_value) = current.get(segment) {
2174                    if current_value.is_null() {
2175                        true
2176                    } else {
2177                        match (current_value.as_i64(), new_value.as_i64()) {
2178                            (Some(current_val), Some(new_val)) => new_val < current_val,
2179                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2180                                (new_value.as_u64().unwrap() as i64) < current_val
2181                            }
2182                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2183                                new_val < current_value.as_u64().unwrap() as i64
2184                            }
2185                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2186                                (Some(current_val), Some(new_val)) => new_val < current_val,
2187                                _ => match (current_value.as_f64(), new_value.as_f64()) {
2188                                    (Some(current_val), Some(new_val)) => new_val < current_val,
2189                                    _ => false,
2190                                },
2191                            },
2192                            _ => false,
2193                        }
2194                    }
2195                } else {
2196                    true
2197                };
2198
2199                if should_update {
2200                    current.insert(segment.to_string(), new_value);
2201                    return Ok(true);
2202                }
2203                return Ok(false);
2204            } else {
2205                current
2206                    .entry(segment.to_string())
2207                    .or_insert_with(|| json!({}));
2208                current = current
2209                    .get_mut(segment)
2210                    .and_then(|v| v.as_object_mut())
2211                    .ok_or("Path collision: expected object")?;
2212            }
2213        }
2214
2215        Ok(false)
2216    }
2217
2218    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2219        let compiled = self.get_compiled_path(path);
2220        let segments = compiled.segments();
2221        let mut current = &self.registers[object_reg];
2222
2223        for segment in segments {
2224            current = current
2225                .get(segment)
2226                .ok_or_else(|| format!("Field not found: {}", segment))?;
2227        }
2228
2229        Ok(current.clone())
2230    }
2231
2232    fn append_to_array(
2233        &mut self,
2234        object_reg: Register,
2235        path: &str,
2236        value_reg: Register,
2237        max_length: usize,
2238    ) -> Result<()> {
2239        let compiled = self.get_compiled_path(path);
2240        let segments = compiled.segments();
2241        let value = self.registers[value_reg].clone();
2242
2243        if !self.registers[object_reg].is_object() {
2244            self.registers[object_reg] = json!({});
2245        }
2246
2247        let obj = self.registers[object_reg]
2248            .as_object_mut()
2249            .ok_or("Not an object")?;
2250
2251        let mut current = obj;
2252        for (i, segment) in segments.iter().enumerate() {
2253            if i == segments.len() - 1 {
2254                current
2255                    .entry(segment.to_string())
2256                    .or_insert_with(|| json!([]));
2257                let arr = current
2258                    .get_mut(segment)
2259                    .and_then(|v| v.as_array_mut())
2260                    .ok_or("Path is not an array")?;
2261                arr.push(value.clone());
2262
2263                if arr.len() > max_length {
2264                    let excess = arr.len() - max_length;
2265                    arr.drain(0..excess);
2266                }
2267            } else {
2268                current
2269                    .entry(segment.to_string())
2270                    .or_insert_with(|| json!({}));
2271                current = current
2272                    .get_mut(segment)
2273                    .and_then(|v| v.as_object_mut())
2274                    .ok_or("Path collision: expected object")?;
2275            }
2276        }
2277
2278        Ok(())
2279    }
2280
2281    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2282        let value = &self.registers[reg];
2283        let transformed = self.apply_transformation(value, transformation)?;
2284        self.registers[reg] = transformed;
2285        Ok(())
2286    }
2287
2288    fn apply_transformation(
2289        &self,
2290        value: &Value,
2291        transformation: &Transformation,
2292    ) -> Result<Value> {
2293        match transformation {
2294            Transformation::HexEncode => {
2295                if let Some(arr) = value.as_array() {
2296                    let bytes: Vec<u8> = arr
2297                        .iter()
2298                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2299                        .collect();
2300                    let hex = hex::encode(&bytes);
2301                    Ok(json!(hex))
2302                } else {
2303                    Err("HexEncode requires an array of numbers".into())
2304                }
2305            }
2306            Transformation::HexDecode => {
2307                if let Some(s) = value.as_str() {
2308                    let s = s.strip_prefix("0x").unwrap_or(s);
2309                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2310                    Ok(json!(bytes))
2311                } else {
2312                    Err("HexDecode requires a string".into())
2313                }
2314            }
2315            Transformation::Base58Encode => {
2316                if let Some(arr) = value.as_array() {
2317                    let bytes: Vec<u8> = arr
2318                        .iter()
2319                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2320                        .collect();
2321                    let encoded = bs58::encode(&bytes).into_string();
2322                    Ok(json!(encoded))
2323                } else if value.is_string() {
2324                    Ok(value.clone())
2325                } else {
2326                    Err("Base58Encode requires an array of numbers".into())
2327                }
2328            }
2329            Transformation::Base58Decode => {
2330                if let Some(s) = value.as_str() {
2331                    let bytes = bs58::decode(s)
2332                        .into_vec()
2333                        .map_err(|e| format!("Base58 decode error: {}", e))?;
2334                    Ok(json!(bytes))
2335                } else {
2336                    Err("Base58Decode requires a string".into())
2337                }
2338            }
2339            Transformation::ToString => Ok(json!(value.to_string())),
2340            Transformation::ToNumber => {
2341                if let Some(s) = value.as_str() {
2342                    let n = s
2343                        .parse::<i64>()
2344                        .map_err(|e| format!("Parse error: {}", e))?;
2345                    Ok(json!(n))
2346                } else {
2347                    Ok(value.clone())
2348                }
2349            }
2350        }
2351    }
2352
2353    fn evaluate_comparison(
2354        &self,
2355        field_value: &Value,
2356        op: &ComparisonOp,
2357        condition_value: &Value,
2358    ) -> Result<bool> {
2359        use ComparisonOp::*;
2360
2361        match op {
2362            Equal => Ok(field_value == condition_value),
2363            NotEqual => Ok(field_value != condition_value),
2364            GreaterThan => {
2365                // Try to compare as numbers
2366                match (field_value.as_i64(), condition_value.as_i64()) {
2367                    (Some(a), Some(b)) => Ok(a > b),
2368                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
2369                        (Some(a), Some(b)) => Ok(a > b),
2370                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
2371                            (Some(a), Some(b)) => Ok(a > b),
2372                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2373                        },
2374                    },
2375                }
2376            }
2377            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2378                (Some(a), Some(b)) => Ok(a >= b),
2379                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2380                    (Some(a), Some(b)) => Ok(a >= b),
2381                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2382                        (Some(a), Some(b)) => Ok(a >= b),
2383                        _ => {
2384                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2385                        }
2386                    },
2387                },
2388            },
2389            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2390                (Some(a), Some(b)) => Ok(a < b),
2391                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2392                    (Some(a), Some(b)) => Ok(a < b),
2393                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2394                        (Some(a), Some(b)) => Ok(a < b),
2395                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
2396                    },
2397                },
2398            },
2399            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2400                (Some(a), Some(b)) => Ok(a <= b),
2401                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2402                    (Some(a), Some(b)) => Ok(a <= b),
2403                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2404                        (Some(a), Some(b)) => Ok(a <= b),
2405                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2406                    },
2407                },
2408            },
2409        }
2410    }
2411
2412    /// Update a PDA reverse lookup and return pending updates for reprocessing.
2413    /// Returns any pending account updates that were queued for this PDA.
2414    /// ```ignore
2415    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2416    /// for update in pending {
2417    ///     vm.process_event(&bytecode, update.account_data, &update.account_type, None, None)?;
2418    /// }
2419    /// ```
2420    #[cfg_attr(feature = "otel", instrument(
2421        name = "vm.update_pda_lookup",
2422        skip(self),
2423        fields(
2424            pda = %pda_address,
2425            seed = %seed_value,
2426        )
2427    ))]
2428    pub fn update_pda_reverse_lookup(
2429        &mut self,
2430        state_id: u32,
2431        lookup_name: &str,
2432        pda_address: String,
2433        seed_value: String,
2434    ) -> Result<Vec<PendingAccountUpdate>> {
2435        let state = self
2436            .states
2437            .get_mut(&state_id)
2438            .ok_or("State table not found")?;
2439
2440        let lookup = state
2441            .pda_reverse_lookups
2442            .entry(lookup_name.to_string())
2443            .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2444
2445        let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2446
2447        if let Some(ref evicted) = evicted_pda {
2448            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2449                let count = evicted_updates.len();
2450                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2451            }
2452        }
2453
2454        // Flush and return pending updates for this PDA
2455        self.flush_pending_updates(state_id, &pda_address)
2456    }
2457
2458    /// Clean up expired pending updates that are older than the TTL
2459    ///
2460    /// Returns the number of updates that were removed.
2461    /// This should be called periodically to prevent memory leaks from orphaned updates.
2462    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2463        let state = match self.states.get_mut(&state_id) {
2464            Some(s) => s,
2465            None => return 0,
2466        };
2467
2468        let now = std::time::SystemTime::now()
2469            .duration_since(std::time::UNIX_EPOCH)
2470            .unwrap()
2471            .as_secs() as i64;
2472
2473        let mut removed_count = 0;
2474
2475        // Iterate through all pending updates and remove expired ones
2476        state.pending_updates.retain(|_pda_address, updates| {
2477            let original_len = updates.len();
2478
2479            updates.retain(|update| {
2480                let age = now - update.queued_at;
2481                age <= PENDING_UPDATE_TTL_SECONDS
2482            });
2483
2484            removed_count += original_len - updates.len();
2485
2486            // Remove the entry entirely if no updates remain
2487            !updates.is_empty()
2488        });
2489
2490        // Update the global counter
2491        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2492
2493        if removed_count > 0 {
2494            #[cfg(feature = "otel")]
2495            crate::vm_metrics::record_pending_updates_expired(
2496                removed_count as u64,
2497                &state.entity_name,
2498            );
2499        }
2500
2501        removed_count
2502    }
2503
2504    /// Queue an account update for later processing when PDA reverse lookup is not yet available
2505    ///
2506    /// # Workflow
2507    ///
2508    /// This implements a deferred processing pattern for account updates when the PDA reverse
2509    /// lookup needed to resolve the primary key is not yet available:
2510    ///
2511    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
2512    ///    is not available, call `queue_account_update()` to queue it for later.
2513    ///
2514    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
2515    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
2516    ///
2517    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
2518    ///    ```ignore
2519    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2520    ///    for update in pending {
2521    ///        let mutations = vm.process_event(
2522    ///            &bytecode, update.account_data, &update.account_type, None, None
2523    ///        )?;
2524    ///    }
2525    ///    ```
2526    ///
2527    /// # Arguments
2528    ///
2529    /// * `state_id` - The state table ID
2530    /// * `pda_address` - The PDA address that needs reverse lookup
2531    /// * `account_type` - The event type name for reprocessing
2532    /// * `account_data` - The account data (event value) for reprocessing
2533    /// * `slot` - The slot number when this update occurred
2534    /// * `signature` - The transaction signature
2535    #[cfg_attr(feature = "otel", instrument(
2536        name = "vm.queue_account_update",
2537        skip(self, update),
2538        fields(
2539            pda = %update.pda_address,
2540            account_type = %update.account_type,
2541            slot = update.slot,
2542        )
2543    ))]
2544    pub fn queue_account_update(
2545        &mut self,
2546        state_id: u32,
2547        update: QueuedAccountUpdate,
2548    ) -> Result<()> {
2549        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2550            self.cleanup_expired_pending_updates(state_id);
2551            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2552                self.drop_oldest_pending_update(state_id)?;
2553            }
2554        }
2555
2556        let state = self
2557            .states
2558            .get_mut(&state_id)
2559            .ok_or("State table not found")?;
2560
2561        let pending = PendingAccountUpdate {
2562            account_type: update.account_type,
2563            pda_address: update.pda_address.clone(),
2564            account_data: update.account_data,
2565            slot: update.slot,
2566            write_version: update.write_version,
2567            signature: update.signature,
2568            queued_at: std::time::SystemTime::now()
2569                .duration_since(std::time::UNIX_EPOCH)
2570                .unwrap()
2571                .as_secs() as i64,
2572        };
2573
2574        let pda_address = pending.pda_address.clone();
2575        let slot = pending.slot;
2576
2577        let mut updates = state
2578            .pending_updates
2579            .entry(pda_address.clone())
2580            .or_insert_with(Vec::new);
2581
2582        let original_len = updates.len();
2583        updates.retain(|existing| existing.slot > slot);
2584        let removed_by_dedup = original_len - updates.len();
2585
2586        if removed_by_dedup > 0 {
2587            self.pending_queue_size = self
2588                .pending_queue_size
2589                .saturating_sub(removed_by_dedup as u64);
2590        }
2591
2592        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2593            updates.remove(0);
2594            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2595        }
2596
2597        updates.push(pending);
2598        #[cfg(feature = "otel")]
2599        crate::vm_metrics::record_pending_update_queued(&state.entity_name);
2600
2601        Ok(())
2602    }
2603
2604    pub fn queue_instruction_event(
2605        &mut self,
2606        state_id: u32,
2607        event: QueuedInstructionEvent,
2608    ) -> Result<()> {
2609        let state = self
2610            .states
2611            .get_mut(&state_id)
2612            .ok_or("State table not found")?;
2613
2614        let pda_address = event.pda_address.clone();
2615
2616        let pending = PendingInstructionEvent {
2617            event_type: event.event_type,
2618            pda_address: event.pda_address,
2619            event_data: event.event_data,
2620            slot: event.slot,
2621            signature: event.signature,
2622            queued_at: std::time::SystemTime::now()
2623                .duration_since(std::time::UNIX_EPOCH)
2624                .unwrap()
2625                .as_secs() as i64,
2626        };
2627
2628        let mut events = state
2629            .pending_instruction_events
2630            .entry(pda_address)
2631            .or_insert_with(Vec::new);
2632
2633        if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
2634            events.remove(0);
2635        }
2636
2637        events.push(pending);
2638
2639        Ok(())
2640    }
2641
2642    pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
2643        self.last_pda_lookup_miss.take()
2644    }
2645
2646    pub fn take_last_pda_registered(&mut self) -> Option<String> {
2647        self.last_pda_registered.take()
2648    }
2649
2650    pub fn flush_pending_instruction_events(
2651        &mut self,
2652        state_id: u32,
2653        pda_address: &str,
2654    ) -> Vec<PendingInstructionEvent> {
2655        let state = match self.states.get_mut(&state_id) {
2656            Some(s) => s,
2657            None => return Vec::new(),
2658        };
2659
2660        if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
2661            events
2662        } else {
2663            Vec::new()
2664        }
2665    }
2666
2667    /// Get statistics about the pending queue for monitoring
2668    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2669        let state = self.states.get(&state_id)?;
2670
2671        let now = std::time::SystemTime::now()
2672            .duration_since(std::time::UNIX_EPOCH)
2673            .unwrap()
2674            .as_secs() as i64;
2675
2676        let mut total_updates = 0;
2677        let mut oldest_timestamp = now;
2678        let mut largest_pda_queue = 0;
2679        let mut estimated_memory = 0;
2680
2681        for entry in state.pending_updates.iter() {
2682            let (_, updates) = entry.pair();
2683            total_updates += updates.len();
2684            largest_pda_queue = largest_pda_queue.max(updates.len());
2685
2686            for update in updates.iter() {
2687                oldest_timestamp = oldest_timestamp.min(update.queued_at);
2688                // Rough memory estimate
2689                estimated_memory += update.account_type.len() +
2690                                   update.pda_address.len() +
2691                                   update.signature.len() +
2692                                   16 + // slot + queued_at
2693                                   estimate_json_size(&update.account_data);
2694            }
2695        }
2696
2697        Some(PendingQueueStats {
2698            total_updates,
2699            unique_pdas: state.pending_updates.len(),
2700            oldest_age_seconds: now - oldest_timestamp,
2701            largest_pda_queue_size: largest_pda_queue,
2702            estimated_memory_bytes: estimated_memory,
2703        })
2704    }
2705
2706    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2707        let mut stats = VmMemoryStats {
2708            path_cache_size: self.path_cache.len(),
2709            ..Default::default()
2710        };
2711
2712        if let Some(state) = self.states.get(&state_id) {
2713            stats.state_table_entity_count = state.data.len();
2714            stats.state_table_max_entries = state.config.max_entries;
2715            stats.state_table_at_capacity = state.is_at_capacity();
2716
2717            stats.lookup_index_count = state.lookup_indexes.len();
2718            stats.lookup_index_total_entries =
2719                state.lookup_indexes.values().map(|idx| idx.len()).sum();
2720
2721            stats.temporal_index_count = state.temporal_indexes.len();
2722            stats.temporal_index_total_entries = state
2723                .temporal_indexes
2724                .values()
2725                .map(|idx| idx.total_entries())
2726                .sum();
2727
2728            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2729            stats.pda_reverse_lookup_total_entries = state
2730                .pda_reverse_lookups
2731                .values()
2732                .map(|lookup| lookup.len())
2733                .sum();
2734
2735            stats.version_tracker_entries = state.version_tracker.len();
2736
2737            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2738        }
2739
2740        stats
2741    }
2742
2743    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2744        let pending_removed = self.cleanup_expired_pending_updates(state_id);
2745        let temporal_removed = self.cleanup_temporal_indexes(state_id);
2746
2747        #[cfg(feature = "otel")]
2748        if let Some(state) = self.states.get(&state_id) {
2749            crate::vm_metrics::record_cleanup(
2750                pending_removed,
2751                temporal_removed,
2752                &state.entity_name,
2753            );
2754        }
2755
2756        CleanupResult {
2757            pending_updates_removed: pending_removed,
2758            temporal_entries_removed: temporal_removed,
2759        }
2760    }
2761
2762    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2763        let state = match self.states.get_mut(&state_id) {
2764            Some(s) => s,
2765            None => return 0,
2766        };
2767
2768        let now = std::time::SystemTime::now()
2769            .duration_since(std::time::UNIX_EPOCH)
2770            .unwrap()
2771            .as_secs() as i64;
2772
2773        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2774        let mut total_removed = 0;
2775
2776        for (_, index) in state.temporal_indexes.iter_mut() {
2777            total_removed += index.cleanup_expired(cutoff);
2778        }
2779
2780        total_removed
2781    }
2782
2783    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2784        let state = self.states.get(&state_id)?;
2785
2786        if state.is_at_capacity() {
2787            Some(CapacityWarning {
2788                current_entries: state.data.len(),
2789                max_entries: state.config.max_entries,
2790                entries_over_limit: state.entries_over_limit(),
2791            })
2792        } else {
2793            None
2794        }
2795    }
2796
2797    /// Drop the oldest pending update across all PDAs
2798    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2799        let state = self
2800            .states
2801            .get_mut(&state_id)
2802            .ok_or("State table not found")?;
2803
2804        let mut oldest_pda: Option<String> = None;
2805        let mut oldest_timestamp = i64::MAX;
2806
2807        // Find the PDA with the oldest update
2808        for entry in state.pending_updates.iter() {
2809            let (pda, updates) = entry.pair();
2810            if let Some(update) = updates.first() {
2811                if update.queued_at < oldest_timestamp {
2812                    oldest_timestamp = update.queued_at;
2813                    oldest_pda = Some(pda.clone());
2814                }
2815            }
2816        }
2817
2818        // Remove the oldest update
2819        if let Some(pda) = oldest_pda {
2820            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2821                if !updates.is_empty() {
2822                    updates.remove(0);
2823                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2824
2825                    // Remove the entry if it's now empty
2826                    if updates.is_empty() {
2827                        drop(updates);
2828                        state.pending_updates.remove(&pda);
2829                    }
2830                }
2831            }
2832        }
2833
2834        Ok(())
2835    }
2836
2837    /// Flush and return pending updates for a PDA for external reprocessing
2838    ///
2839    /// Returns the pending updates that were queued for this PDA address.
2840    /// The caller should reprocess these through the VM using process_event().
2841    fn flush_pending_updates(
2842        &mut self,
2843        state_id: u32,
2844        pda_address: &str,
2845    ) -> Result<Vec<PendingAccountUpdate>> {
2846        let state = self
2847            .states
2848            .get_mut(&state_id)
2849            .ok_or("State table not found")?;
2850
2851        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2852            let count = pending_updates.len();
2853            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2854            #[cfg(feature = "otel")]
2855            crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
2856            Ok(pending_updates)
2857        } else {
2858            Ok(Vec::new())
2859        }
2860    }
2861
2862    /// Try to resolve a primary key via PDA reverse lookup
2863    pub fn try_pda_reverse_lookup(
2864        &mut self,
2865        state_id: u32,
2866        lookup_name: &str,
2867        pda_address: &str,
2868    ) -> Option<String> {
2869        let state = self.states.get_mut(&state_id)?;
2870
2871        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2872            if let Some(value) = lookup.lookup(pda_address) {
2873                self.pda_cache_hits += 1;
2874                return Some(value);
2875            }
2876        }
2877
2878        self.pda_cache_misses += 1;
2879        None
2880    }
2881
2882    // ============================================================================
2883    // Computed Expression Evaluator (Task 5)
2884    // ============================================================================
2885
2886    /// Evaluate a computed expression AST against the current state
2887    /// This is the core runtime evaluator for computed fields from the AST
2888    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2889        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2890    }
2891
2892    /// Evaluate a computed expression with a variable environment (for let bindings)
2893    fn evaluate_computed_expr_with_env(
2894        &self,
2895        expr: &ComputedExpr,
2896        state: &Value,
2897        env: &std::collections::HashMap<String, Value>,
2898    ) -> Result<Value> {
2899        match expr {
2900            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2901
2902            ComputedExpr::Var { name } => env
2903                .get(name)
2904                .cloned()
2905                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2906
2907            ComputedExpr::Let { name, value, body } => {
2908                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2909                let mut new_env = env.clone();
2910                new_env.insert(name.clone(), val);
2911                self.evaluate_computed_expr_with_env(body, state, &new_env)
2912            }
2913
2914            ComputedExpr::If {
2915                condition,
2916                then_branch,
2917                else_branch,
2918            } => {
2919                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2920                if self.value_to_bool(&cond_val) {
2921                    self.evaluate_computed_expr_with_env(then_branch, state, env)
2922                } else {
2923                    self.evaluate_computed_expr_with_env(else_branch, state, env)
2924                }
2925            }
2926
2927            ComputedExpr::None => Ok(Value::Null),
2928
2929            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2930
2931            ComputedExpr::Slice { expr, start, end } => {
2932                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2933                match val {
2934                    Value::Array(arr) => {
2935                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2936                        Ok(Value::Array(slice))
2937                    }
2938                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2939                }
2940            }
2941
2942            ComputedExpr::Index { expr, index } => {
2943                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2944                match val {
2945                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2946                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2947                }
2948            }
2949
2950            ComputedExpr::U64FromLeBytes { bytes } => {
2951                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2952                let byte_vec = self.value_to_bytes(&val)?;
2953                if byte_vec.len() < 8 {
2954                    return Err(format!(
2955                        "u64::from_le_bytes requires 8 bytes, got {}",
2956                        byte_vec.len()
2957                    )
2958                    .into());
2959                }
2960                let arr: [u8; 8] = byte_vec[..8]
2961                    .try_into()
2962                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2963                Ok(json!(u64::from_le_bytes(arr)))
2964            }
2965
2966            ComputedExpr::U64FromBeBytes { bytes } => {
2967                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2968                let byte_vec = self.value_to_bytes(&val)?;
2969                if byte_vec.len() < 8 {
2970                    return Err(format!(
2971                        "u64::from_be_bytes requires 8 bytes, got {}",
2972                        byte_vec.len()
2973                    )
2974                    .into());
2975                }
2976                let arr: [u8; 8] = byte_vec[..8]
2977                    .try_into()
2978                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2979                Ok(json!(u64::from_be_bytes(arr)))
2980            }
2981
2982            ComputedExpr::ByteArray { bytes } => {
2983                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2984            }
2985
2986            ComputedExpr::Closure { param, body } => {
2987                // Closures are stored as-is; they're evaluated when used in map()
2988                // Return a special representation
2989                Ok(json!({
2990                    "__closure": {
2991                        "param": param,
2992                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
2993                    }
2994                }))
2995            }
2996
2997            ComputedExpr::Unary { op, expr } => {
2998                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2999                self.apply_unary_op(op, &val)
3000            }
3001
3002            ComputedExpr::JsonToBytes { expr } => {
3003                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3004                // Convert JSON array of numbers to byte array
3005                let bytes = self.value_to_bytes(&val)?;
3006                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3007            }
3008
3009            ComputedExpr::UnwrapOr { expr, default } => {
3010                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3011                if val.is_null() {
3012                    Ok(default.clone())
3013                } else {
3014                    Ok(val)
3015                }
3016            }
3017
3018            ComputedExpr::Binary { op, left, right } => {
3019                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
3020                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
3021                self.apply_binary_op(op, &l, &r)
3022            }
3023
3024            ComputedExpr::Cast { expr, to_type } => {
3025                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3026                self.apply_cast(&val, to_type)
3027            }
3028
3029            ComputedExpr::MethodCall { expr, method, args } => {
3030                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3031                // Special handling for map() with closures
3032                if method == "map" && args.len() == 1 {
3033                    if let ComputedExpr::Closure { param, body } = &args[0] {
3034                        // If the value is null, return null (Option::None.map returns None)
3035                        if val.is_null() {
3036                            return Ok(Value::Null);
3037                        }
3038                        // Evaluate the closure body with the value bound to param
3039                        let mut closure_env = env.clone();
3040                        closure_env.insert(param.clone(), val);
3041                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
3042                    }
3043                }
3044                let evaluated_args: Vec<Value> = args
3045                    .iter()
3046                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
3047                    .collect::<Result<Vec<_>>>()?;
3048                self.apply_method_call(&val, method, &evaluated_args)
3049            }
3050
3051            ComputedExpr::Literal { value } => Ok(value.clone()),
3052
3053            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
3054        }
3055    }
3056
3057    /// Convert a JSON value to a byte vector
3058    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
3059        match val {
3060            Value::Array(arr) => arr
3061                .iter()
3062                .map(|v| {
3063                    v.as_u64()
3064                        .map(|n| n as u8)
3065                        .ok_or_else(|| "Array element not a valid byte".into())
3066                })
3067                .collect(),
3068            Value::String(s) => {
3069                // Try to decode as hex
3070                if s.starts_with("0x") || s.starts_with("0X") {
3071                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
3072                } else {
3073                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
3074                }
3075            }
3076            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
3077        }
3078    }
3079
3080    /// Apply a unary operation
3081    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
3082        use crate::ast::UnaryOp;
3083        match op {
3084            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
3085            UnaryOp::ReverseBits => match val.as_u64() {
3086                Some(n) => Ok(json!(n.reverse_bits())),
3087                None => match val.as_i64() {
3088                    Some(n) => Ok(json!((n as u64).reverse_bits())),
3089                    None => Err("reverse_bits requires an integer".into()),
3090                },
3091            },
3092        }
3093    }
3094
3095    /// Get a field value from state by path (e.g., "section.field" or just "field")
3096    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
3097        let segments: Vec<&str> = path.split('.').collect();
3098        let mut current = state;
3099
3100        for segment in segments {
3101            match current.get(segment) {
3102                Some(v) => current = v,
3103                None => return Ok(Value::Null),
3104            }
3105        }
3106
3107        Ok(current.clone())
3108    }
3109
3110    /// Apply a binary operation to two values
3111    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
3112        match op {
3113            // Arithmetic operations
3114            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
3115            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
3116            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
3117            BinaryOp::Div => {
3118                // Check for division by zero
3119                if let Some(r) = right.as_i64() {
3120                    if r == 0 {
3121                        return Err("Division by zero".into());
3122                    }
3123                }
3124                if let Some(r) = right.as_f64() {
3125                    if r == 0.0 {
3126                        return Err("Division by zero".into());
3127                    }
3128                }
3129                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
3130            }
3131            BinaryOp::Mod => {
3132                // Modulo - only for integers
3133                match (left.as_i64(), right.as_i64()) {
3134                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3135                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
3136                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3137                        _ => Err("Modulo requires non-zero integer operands".into()),
3138                    },
3139                    _ => Err("Modulo by zero".into()),
3140                }
3141            }
3142
3143            // Comparison operations
3144            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
3145            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
3146            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
3147            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
3148            BinaryOp::Eq => Ok(json!(left == right)),
3149            BinaryOp::Ne => Ok(json!(left != right)),
3150
3151            // Logical operations
3152            BinaryOp::And => {
3153                let l_bool = self.value_to_bool(left);
3154                let r_bool = self.value_to_bool(right);
3155                Ok(json!(l_bool && r_bool))
3156            }
3157            BinaryOp::Or => {
3158                let l_bool = self.value_to_bool(left);
3159                let r_bool = self.value_to_bool(right);
3160                Ok(json!(l_bool || r_bool))
3161            }
3162
3163            // Bitwise operations
3164            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
3165                (Some(a), Some(b)) => Ok(json!(a ^ b)),
3166                _ => match (left.as_i64(), right.as_i64()) {
3167                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
3168                    _ => Err("XOR requires integer operands".into()),
3169                },
3170            },
3171            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
3172                (Some(a), Some(b)) => Ok(json!(a & b)),
3173                _ => match (left.as_i64(), right.as_i64()) {
3174                    (Some(a), Some(b)) => Ok(json!(a & b)),
3175                    _ => Err("BitAnd requires integer operands".into()),
3176                },
3177            },
3178            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
3179                (Some(a), Some(b)) => Ok(json!(a | b)),
3180                _ => match (left.as_i64(), right.as_i64()) {
3181                    (Some(a), Some(b)) => Ok(json!(a | b)),
3182                    _ => Err("BitOr requires integer operands".into()),
3183                },
3184            },
3185            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
3186                (Some(a), Some(b)) => Ok(json!(a << b)),
3187                _ => match (left.as_i64(), right.as_i64()) {
3188                    (Some(a), Some(b)) => Ok(json!(a << b)),
3189                    _ => Err("Shl requires integer operands".into()),
3190                },
3191            },
3192            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
3193                (Some(a), Some(b)) => Ok(json!(a >> b)),
3194                _ => match (left.as_i64(), right.as_i64()) {
3195                    (Some(a), Some(b)) => Ok(json!(a >> b)),
3196                    _ => Err("Shr requires integer operands".into()),
3197                },
3198            },
3199        }
3200    }
3201
3202    /// Helper for numeric operations that can work on integers or floats
3203    fn numeric_op<F1, F2>(
3204        &self,
3205        left: &Value,
3206        right: &Value,
3207        int_op: F1,
3208        float_op: F2,
3209    ) -> Result<Value>
3210    where
3211        F1: Fn(i64, i64) -> i64,
3212        F2: Fn(f64, f64) -> f64,
3213    {
3214        // Try i64 first
3215        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3216            return Ok(json!(int_op(a, b)));
3217        }
3218
3219        // Try u64
3220        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3221            // For u64, we need to be careful with underflow in subtraction
3222            return Ok(json!(int_op(a as i64, b as i64)));
3223        }
3224
3225        // Try f64
3226        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3227            return Ok(json!(float_op(a, b)));
3228        }
3229
3230        // If either is null, return null
3231        if left.is_null() || right.is_null() {
3232            return Ok(Value::Null);
3233        }
3234
3235        Err(format!(
3236            "Cannot perform numeric operation on {:?} and {:?}",
3237            left, right
3238        )
3239        .into())
3240    }
3241
3242    /// Helper for comparison operations
3243    fn comparison_op<F1, F2>(
3244        &self,
3245        left: &Value,
3246        right: &Value,
3247        int_cmp: F1,
3248        float_cmp: F2,
3249    ) -> Result<Value>
3250    where
3251        F1: Fn(i64, i64) -> bool,
3252        F2: Fn(f64, f64) -> bool,
3253    {
3254        // Try i64 first
3255        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3256            return Ok(json!(int_cmp(a, b)));
3257        }
3258
3259        // Try u64
3260        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3261            return Ok(json!(int_cmp(a as i64, b as i64)));
3262        }
3263
3264        // Try f64
3265        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3266            return Ok(json!(float_cmp(a, b)));
3267        }
3268
3269        // If either is null, comparison returns false
3270        if left.is_null() || right.is_null() {
3271            return Ok(json!(false));
3272        }
3273
3274        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3275    }
3276
3277    /// Convert a value to boolean for logical operations
3278    fn value_to_bool(&self, value: &Value) -> bool {
3279        match value {
3280            Value::Null => false,
3281            Value::Bool(b) => *b,
3282            Value::Number(n) => {
3283                if let Some(i) = n.as_i64() {
3284                    i != 0
3285                } else if let Some(f) = n.as_f64() {
3286                    f != 0.0
3287                } else {
3288                    true
3289                }
3290            }
3291            Value::String(s) => !s.is_empty(),
3292            Value::Array(arr) => !arr.is_empty(),
3293            Value::Object(obj) => !obj.is_empty(),
3294        }
3295    }
3296
3297    /// Apply a type cast to a value
3298    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3299        match to_type {
3300            "i8" | "i16" | "i32" | "i64" | "isize" => {
3301                if let Some(n) = value.as_i64() {
3302                    Ok(json!(n))
3303                } else if let Some(n) = value.as_u64() {
3304                    Ok(json!(n as i64))
3305                } else if let Some(n) = value.as_f64() {
3306                    Ok(json!(n as i64))
3307                } else if let Some(s) = value.as_str() {
3308                    s.parse::<i64>()
3309                        .map(|n| json!(n))
3310                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3311                } else {
3312                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3313                }
3314            }
3315            "u8" | "u16" | "u32" | "u64" | "usize" => {
3316                if let Some(n) = value.as_u64() {
3317                    Ok(json!(n))
3318                } else if let Some(n) = value.as_i64() {
3319                    Ok(json!(n as u64))
3320                } else if let Some(n) = value.as_f64() {
3321                    Ok(json!(n as u64))
3322                } else if let Some(s) = value.as_str() {
3323                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3324                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3325                    })
3326                } else {
3327                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3328                }
3329            }
3330            "f32" | "f64" => {
3331                if let Some(n) = value.as_f64() {
3332                    Ok(json!(n))
3333                } else if let Some(n) = value.as_i64() {
3334                    Ok(json!(n as f64))
3335                } else if let Some(n) = value.as_u64() {
3336                    Ok(json!(n as f64))
3337                } else if let Some(s) = value.as_str() {
3338                    s.parse::<f64>()
3339                        .map(|n| json!(n))
3340                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3341                } else {
3342                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3343                }
3344            }
3345            "String" | "string" => Ok(json!(value.to_string())),
3346            "bool" => Ok(json!(self.value_to_bool(value))),
3347            _ => {
3348                // Unknown type, return value as-is
3349                Ok(value.clone())
3350            }
3351        }
3352    }
3353
3354    /// Apply a method call to a value
3355    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3356        match method {
3357            "unwrap_or" => {
3358                if value.is_null() && !args.is_empty() {
3359                    Ok(args[0].clone())
3360                } else {
3361                    Ok(value.clone())
3362                }
3363            }
3364            "unwrap_or_default" => {
3365                if value.is_null() {
3366                    // Return default for common types
3367                    Ok(json!(0))
3368                } else {
3369                    Ok(value.clone())
3370                }
3371            }
3372            "is_some" => Ok(json!(!value.is_null())),
3373            "is_none" => Ok(json!(value.is_null())),
3374            "abs" => {
3375                if let Some(n) = value.as_i64() {
3376                    Ok(json!(n.abs()))
3377                } else if let Some(n) = value.as_f64() {
3378                    Ok(json!(n.abs()))
3379                } else {
3380                    Err(format!("Cannot call abs() on {:?}", value).into())
3381                }
3382            }
3383            "len" => {
3384                if let Some(s) = value.as_str() {
3385                    Ok(json!(s.len()))
3386                } else if let Some(arr) = value.as_array() {
3387                    Ok(json!(arr.len()))
3388                } else if let Some(obj) = value.as_object() {
3389                    Ok(json!(obj.len()))
3390                } else {
3391                    Err(format!("Cannot call len() on {:?}", value).into())
3392                }
3393            }
3394            "to_string" => Ok(json!(value.to_string())),
3395            "min" => {
3396                if args.is_empty() {
3397                    return Err("min() requires an argument".into());
3398                }
3399                let other = &args[0];
3400                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3401                    Ok(json!(a.min(b)))
3402                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3403                    Ok(json!(a.min(b)))
3404                } else {
3405                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3406                }
3407            }
3408            "max" => {
3409                if args.is_empty() {
3410                    return Err("max() requires an argument".into());
3411                }
3412                let other = &args[0];
3413                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3414                    Ok(json!(a.max(b)))
3415                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3416                    Ok(json!(a.max(b)))
3417                } else {
3418                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3419                }
3420            }
3421            "saturating_add" => {
3422                if args.is_empty() {
3423                    return Err("saturating_add() requires an argument".into());
3424                }
3425                let other = &args[0];
3426                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3427                    Ok(json!(a.saturating_add(b)))
3428                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3429                    Ok(json!(a.saturating_add(b)))
3430                } else {
3431                    Err(format!(
3432                        "Cannot call saturating_add() on {:?} and {:?}",
3433                        value, other
3434                    )
3435                    .into())
3436                }
3437            }
3438            "saturating_sub" => {
3439                if args.is_empty() {
3440                    return Err("saturating_sub() requires an argument".into());
3441                }
3442                let other = &args[0];
3443                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3444                    Ok(json!(a.saturating_sub(b)))
3445                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3446                    Ok(json!(a.saturating_sub(b)))
3447                } else {
3448                    Err(format!(
3449                        "Cannot call saturating_sub() on {:?} and {:?}",
3450                        value, other
3451                    )
3452                    .into())
3453                }
3454            }
3455            _ => Err(format!("Unknown method call: {}()", method).into()),
3456        }
3457    }
3458
3459    /// Evaluate all computed fields for an entity and update the state
3460    /// This takes a list of ComputedFieldSpec from the AST and applies them
3461    pub fn evaluate_computed_fields_from_ast(
3462        &self,
3463        state: &mut Value,
3464        computed_field_specs: &[ComputedFieldSpec],
3465    ) -> Result<Vec<String>> {
3466        let mut updated_paths = Vec::new();
3467
3468        for spec in computed_field_specs {
3469            if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3470                self.set_field_in_state(state, &spec.target_path, result)?;
3471                updated_paths.push(spec.target_path.clone());
3472            }
3473        }
3474
3475        Ok(updated_paths)
3476    }
3477
3478    /// Set a field value in state by path (e.g., "section.field")
3479    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3480        let segments: Vec<&str> = path.split('.').collect();
3481
3482        if segments.is_empty() {
3483            return Err("Empty path".into());
3484        }
3485
3486        // Navigate to parent, creating intermediate objects as needed
3487        let mut current = state;
3488        for (i, segment) in segments.iter().enumerate() {
3489            if i == segments.len() - 1 {
3490                // Last segment - set the value
3491                if let Some(obj) = current.as_object_mut() {
3492                    obj.insert(segment.to_string(), value);
3493                    return Ok(());
3494                } else {
3495                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
3496                }
3497            } else {
3498                // Intermediate segment - navigate or create
3499                if !current.is_object() {
3500                    *current = json!({});
3501                }
3502                let obj = current.as_object_mut().unwrap();
3503                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3504            }
3505        }
3506
3507        Ok(())
3508    }
3509
3510    /// Create a computed fields evaluator closure from AST specs
3511    /// This returns a function that can be passed to the bytecode builder
3512    pub fn create_evaluator_from_specs(
3513        specs: Vec<ComputedFieldSpec>,
3514    ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3515        move |state: &mut Value| {
3516            // Create a temporary VmContext just for evaluation
3517            // (We only need the expression evaluation methods)
3518            let vm = VmContext::new();
3519            vm.evaluate_computed_fields_from_ast(state, &specs)?;
3520            Ok(())
3521        }
3522    }
3523}
3524
3525impl Default for VmContext {
3526    fn default() -> Self {
3527        Self::new()
3528    }
3529}
3530
3531// Implement the ReverseLookupUpdater trait for VmContext
3532impl crate::resolvers::ReverseLookupUpdater for VmContext {
3533    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3534        // Use default state_id=0 and default lookup name
3535        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3536            .unwrap_or_else(|e| {
3537                tracing::error!("Failed to update PDA reverse lookup: {}", e);
3538                Vec::new()
3539            })
3540    }
3541
3542    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3543        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
3544        self.flush_pending_updates(0, pda_address)
3545            .unwrap_or_else(|e| {
3546                tracing::error!("Failed to flush pending updates: {}", e);
3547                Vec::new()
3548            })
3549    }
3550}
3551
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};

    /// Build the expression `<path>.unwrap_or(0)`.
    fn field_or_zero(path: &str) -> ComputedExpr {
        ComputedExpr::UnwrapOr {
            expr: Box::new(ComputedExpr::FieldRef {
                path: path.to_string(),
            }),
            default: serde_json::json!(0),
        }
    }

    /// Look up `trading.<field>` in a state value.
    fn trading_field<'a>(
        state: &'a serde_json::Value,
        field: &str,
    ) -> Option<&'a serde_json::Value> {
        state.get("trading").and_then(|section| section.get(field))
    }

    /// Adding two integer fields must produce an integer, not a float.
    #[test]
    fn test_computed_field_preserves_integer_type() {
        let vm = VmContext::new();

        let mut state = serde_json::json!({
            "trading": {
                "total_buy_volume": 20000000000_i64,
                "total_sell_volume": 17951316474_i64
            }
        });

        let sum_expr = ComputedExpr::Binary {
            op: BinaryOp::Add,
            left: Box::new(field_or_zero("trading.total_buy_volume")),
            right: Box::new(field_or_zero("trading.total_sell_volume")),
        };
        let spec = ComputedFieldSpec {
            target_path: "trading.total_volume".to_string(),
            result_type: "Option<u64>".to_string(),
            expression: sum_expr,
        };

        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
            .unwrap();

        let total_volume =
            trading_field(&state, "total_volume").expect("total_volume should exist");

        let serialized = serde_json::to_string(total_volume).unwrap();
        assert!(
            !serialized.contains('.'),
            "Integer should not have decimal point: {}",
            serialized
        );
        assert_eq!(
            total_volume.as_i64(),
            Some(37951316474),
            "Value should be correct sum"
        );
    }

    /// `set_field_sum` must store integer inputs as integers.
    #[test]
    fn test_set_field_sum_preserves_integer_type() {
        let mut vm = VmContext::new();
        vm.registers[0] = serde_json::json!({});
        vm.registers[1] = serde_json::json!(20000000000_i64);
        vm.registers[2] = serde_json::json!(17951316474_i64);

        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();

        let state = &vm.registers[0];
        let buy_vol = trading_field(state, "total_buy_volume").unwrap();
        let sell_vol = trading_field(state, "total_sell_volume").unwrap();

        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
        let sell_serialized = serde_json::to_string(sell_vol).unwrap();

        assert!(
            !buy_serialized.contains('.'),
            "Buy volume should not have decimal: {}",
            buy_serialized
        );
        assert!(
            !sell_serialized.contains('.'),
            "Sell volume should not have decimal: {}",
            sell_serialized
        );
    }
}