hyperstack_interpreter/
vm.rs

1use crate::ast::{
2    BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
15/// Context metadata for blockchain updates (accounts and instructions)
16/// This structure is designed to be extended over time with additional metadata
17#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19    /// Blockchain slot number
20    pub slot: Option<u64>,
21    /// Transaction signature
22    pub signature: Option<String>,
23    /// Unix timestamp (seconds since epoch)
24    /// If not provided, will default to current system time when accessed
25    pub timestamp: Option<i64>,
26    /// Write version for account updates (monotonically increasing per account within a slot)
27    /// Used for staleness detection to reject out-of-order updates
28    pub write_version: Option<u64>,
29    /// Transaction index for instruction updates (orders transactions within a slot)
30    /// Used for staleness detection to reject out-of-order updates
31    pub txn_index: Option<u64>,
32    /// Additional custom metadata that can be added without breaking changes
33    pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37    /// Create a new UpdateContext with slot and signature
38    pub fn new(slot: u64, signature: String) -> Self {
39        Self {
40            slot: Some(slot),
41            signature: Some(signature),
42            timestamp: None,
43            write_version: None,
44            txn_index: None,
45            metadata: HashMap::new(),
46        }
47    }
48
49    /// Create a new UpdateContext with slot, signature, and timestamp
50    pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51        Self {
52            slot: Some(slot),
53            signature: Some(signature),
54            timestamp: Some(timestamp),
55            write_version: None,
56            txn_index: None,
57            metadata: HashMap::new(),
58        }
59    }
60
61    /// Create context for account updates with write_version for staleness detection
62    pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63        Self {
64            slot: Some(slot),
65            signature: Some(signature),
66            timestamp: None,
67            write_version: Some(write_version),
68            txn_index: None,
69            metadata: HashMap::new(),
70        }
71    }
72
73    /// Create context for instruction updates with txn_index for staleness detection
74    pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75        Self {
76            slot: Some(slot),
77            signature: Some(signature),
78            timestamp: None,
79            write_version: None,
80            txn_index: Some(txn_index),
81            metadata: HashMap::new(),
82        }
83    }
84
85    /// Get the timestamp, falling back to current system time if not set
86    pub fn timestamp(&self) -> i64 {
87        self.timestamp.unwrap_or_else(|| {
88            std::time::SystemTime::now()
89                .duration_since(std::time::UNIX_EPOCH)
90                .unwrap()
91                .as_secs() as i64
92        })
93    }
94
95    /// Create an empty context (for testing or when context is not available)
96    pub fn empty() -> Self {
97        Self::default()
98    }
99
100    /// Add custom metadata
101    pub fn with_metadata(mut self, key: String, value: Value) -> Self {
102        self.metadata.insert(key, value);
103        self
104    }
105
106    /// Get metadata value
107    pub fn get_metadata(&self, key: &str) -> Option<&Value> {
108        self.metadata.get(key)
109    }
110
111    /// Convert context to JSON value for injection into event data
112    pub fn to_value(&self) -> Value {
113        let mut obj = serde_json::Map::new();
114        if let Some(slot) = self.slot {
115            obj.insert("slot".to_string(), json!(slot));
116        }
117        if let Some(ref sig) = self.signature {
118            obj.insert("signature".to_string(), json!(sig));
119        }
120        // Always include timestamp (use current time if not set)
121        obj.insert("timestamp".to_string(), json!(self.timestamp()));
122        for (key, value) in &self.metadata {
123            obj.insert(key.clone(), value.clone());
124        }
125        Value::Object(obj)
126    }
127}
128
129pub type Register = usize;
130pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
131
132pub type RegisterValue = Value;
133
134/// Trait for evaluating computed fields
135/// Implement this in your generated spec to enable computed field evaluation
136pub trait ComputedFieldsEvaluator {
137    fn evaluate(&self, state: &mut Value) -> Result<()>;
138}
139
140// Pending queue configuration
141const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
142const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
143const PENDING_UPDATE_TTL_SECONDS: i64 = 300; // 5 minutes
144
145// Temporal index configuration - prevents unbounded history growth
146const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; // 5 minutes, matches pending queue TTL
147const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
148
149// State table configuration - aligned with downstream EntityCache (500 per view)
150const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
151const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
152
153const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
154
155const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
156
157const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
158
159const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
160
161/// Estimate the size of a JSON value in bytes
162fn estimate_json_size(value: &Value) -> usize {
163    match value {
164        Value::Null => 4,
165        Value::Bool(_) => 5,
166        Value::Number(_) => 8,
167        Value::String(s) => s.len() + 2,
168        Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
169        Value::Object(obj) => {
170            2 + obj
171                .iter()
172                .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
173                .sum::<usize>()
174        }
175    }
176}
177
178#[derive(Debug, Clone)]
179pub struct CompiledPath {
180    pub segments: std::sync::Arc<[String]>,
181}
182
183impl CompiledPath {
184    pub fn new(path: &str) -> Self {
185        let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
186        CompiledPath {
187            segments: segments.into(),
188        }
189    }
190
191    fn segments(&self) -> &[String] {
192        &self.segments
193    }
194}
195
196/// Represents the type of change made to a field for granular dirty tracking.
197/// This enables emitting only the actual changes rather than entire field values.
198#[derive(Debug, Clone)]
199pub enum FieldChange {
200    /// Field was replaced with a new value (emit the full value from state)
201    Replaced,
202    /// Items were appended to an array field (emit only the new items)
203    Appended(Vec<Value>),
204}
205
206/// Tracks field modifications during handler execution with granular change information.
207/// This replaces the simple HashSet<String> approach to enable delta-only emissions.
208#[derive(Debug, Clone, Default)]
209pub struct DirtyTracker {
210    changes: HashMap<String, FieldChange>,
211}
212
213impl DirtyTracker {
214    /// Create a new empty DirtyTracker
215    pub fn new() -> Self {
216        Self {
217            changes: HashMap::new(),
218        }
219    }
220
221    /// Mark a field as replaced (full value will be emitted)
222    pub fn mark_replaced(&mut self, path: &str) {
223        // If there was an append, it's now superseded by a full replacement
224        self.changes.insert(path.to_string(), FieldChange::Replaced);
225    }
226
227    /// Record an appended value for a field
228    pub fn mark_appended(&mut self, path: &str, value: Value) {
229        match self.changes.get_mut(path) {
230            Some(FieldChange::Appended(values)) => {
231                // Add to existing appended values
232                values.push(value);
233            }
234            Some(FieldChange::Replaced) => {
235                // Field was already replaced, keep it as replaced
236                // (the full value including the append will be emitted)
237            }
238            None => {
239                // First append to this field
240                self.changes
241                    .insert(path.to_string(), FieldChange::Appended(vec![value]));
242            }
243        }
244    }
245
246    /// Check if there are any changes tracked
247    pub fn is_empty(&self) -> bool {
248        self.changes.is_empty()
249    }
250
251    /// Get the number of changed fields
252    pub fn len(&self) -> usize {
253        self.changes.len()
254    }
255
256    /// Iterate over all changes
257    pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
258        self.changes.iter()
259    }
260
261    /// Get a set of all dirty field paths (for backward compatibility)
262    pub fn dirty_paths(&self) -> HashSet<String> {
263        self.changes.keys().cloned().collect()
264    }
265
266    /// Consume the tracker and return the changes map
267    pub fn into_changes(self) -> HashMap<String, FieldChange> {
268        self.changes
269    }
270
271    /// Get a reference to the changes map
272    pub fn changes(&self) -> &HashMap<String, FieldChange> {
273        &self.changes
274    }
275
276    /// Get paths that were appended (not replaced)
277    pub fn appended_paths(&self) -> Vec<String> {
278        self.changes
279            .iter()
280            .filter_map(|(path, change)| match change {
281                FieldChange::Appended(_) => Some(path.clone()),
282                FieldChange::Replaced => None,
283            })
284            .collect()
285    }
286}
287
288pub struct VmContext {
289    registers: Vec<RegisterValue>,
290    states: HashMap<u32, StateTable>,
291    pub instructions_executed: u64,
292    pub cache_hits: u64,
293    path_cache: HashMap<String, CompiledPath>,
294    pub pda_cache_hits: u64,
295    pub pda_cache_misses: u64,
296    pub pending_queue_size: u64,
297    current_context: Option<UpdateContext>,
298    warnings: Vec<String>,
299    last_pda_lookup_miss: Option<String>,
300    last_pda_registered: Option<String>,
301}
302
303#[derive(Debug)]
304pub struct LookupIndex {
305    index: std::sync::Mutex<LruCache<String, Value>>,
306}
307
308impl LookupIndex {
309    pub fn new() -> Self {
310        Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
311    }
312
313    pub fn with_capacity(capacity: usize) -> Self {
314        LookupIndex {
315            index: std::sync::Mutex::new(LruCache::new(
316                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
317            )),
318        }
319    }
320
321    pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
322        let key = value_to_cache_key(lookup_value);
323        self.index.lock().unwrap().get(&key).cloned()
324    }
325
326    pub fn insert(&self, lookup_value: Value, primary_key: Value) {
327        let key = value_to_cache_key(&lookup_value);
328        self.index.lock().unwrap().put(key, primary_key);
329    }
330
331    pub fn len(&self) -> usize {
332        self.index.lock().unwrap().len()
333    }
334
335    pub fn is_empty(&self) -> bool {
336        self.index.lock().unwrap().is_empty()
337    }
338}
339
340impl Default for LookupIndex {
341    fn default() -> Self {
342        Self::new()
343    }
344}
345
346fn value_to_cache_key(value: &Value) -> String {
347    match value {
348        Value::String(s) => s.clone(),
349        Value::Number(n) => n.to_string(),
350        Value::Bool(b) => b.to_string(),
351        Value::Null => "null".to_string(),
352        _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
353    }
354}
355
356#[derive(Debug)]
357pub struct TemporalIndex {
358    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
359}
360
361impl Default for TemporalIndex {
362    fn default() -> Self {
363        Self::new()
364    }
365}
366
367impl TemporalIndex {
368    pub fn new() -> Self {
369        Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
370    }
371
372    pub fn with_capacity(capacity: usize) -> Self {
373        TemporalIndex {
374            index: std::sync::Mutex::new(LruCache::new(
375                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
376            )),
377        }
378    }
379
380    pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
381        let key = value_to_cache_key(lookup_value);
382        let mut cache = self.index.lock().unwrap();
383        if let Some(entries) = cache.get(&key) {
384            for i in (0..entries.len()).rev() {
385                if entries[i].1 <= timestamp {
386                    return Some(entries[i].0.clone());
387                }
388            }
389        }
390        None
391    }
392
393    pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
394        let key = value_to_cache_key(lookup_value);
395        let mut cache = self.index.lock().unwrap();
396        if let Some(entries) = cache.get(&key) {
397            if let Some(last) = entries.last() {
398                return Some(last.0.clone());
399            }
400        }
401        None
402    }
403
404    pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
405        let key = value_to_cache_key(&lookup_value);
406        let mut cache = self.index.lock().unwrap();
407
408        let entries = cache.get_or_insert_mut(key, Vec::new);
409        entries.push((primary_key, timestamp));
410        entries.sort_by_key(|(_, ts)| *ts);
411
412        let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
413        entries.retain(|(_, ts)| *ts >= cutoff);
414
415        if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
416            let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
417            entries.drain(0..excess);
418        }
419    }
420
421    pub fn len(&self) -> usize {
422        self.index.lock().unwrap().len()
423    }
424
425    pub fn is_empty(&self) -> bool {
426        self.index.lock().unwrap().is_empty()
427    }
428
429    pub fn total_entries(&self) -> usize {
430        self.index
431            .lock()
432            .unwrap()
433            .iter()
434            .map(|(_, entries)| entries.len())
435            .sum()
436    }
437
438    pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
439        let mut cache = self.index.lock().unwrap();
440        let mut total_removed = 0;
441
442        for (_, entries) in cache.iter_mut() {
443            let original_len = entries.len();
444            entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
445            total_removed += original_len - entries.len();
446        }
447
448        total_removed
449    }
450}
451
452#[derive(Debug)]
453pub struct PdaReverseLookup {
454    // Maps: PDA address -> seed value (e.g., bonding_curve_addr -> mint)
455    index: LruCache<String, String>,
456}
457
458impl PdaReverseLookup {
459    pub fn new(capacity: usize) -> Self {
460        PdaReverseLookup {
461            index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
462        }
463    }
464
465    pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
466        self.index.get(pda_address).cloned()
467    }
468
469    pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
470        let evicted = if self.index.len() >= self.index.cap().get() {
471            self.index.peek_lru().map(|(k, _)| k.clone())
472        } else {
473            None
474        };
475
476        self.index.put(pda_address, seed_value);
477        evicted
478    }
479
480    pub fn len(&self) -> usize {
481        self.index.len()
482    }
483
484    pub fn is_empty(&self) -> bool {
485        self.index.is_empty()
486    }
487}
488
489/// Input for queueing an account update.
490#[derive(Debug, Clone)]
491pub struct QueuedAccountUpdate {
492    pub pda_address: String,
493    pub account_type: String,
494    pub account_data: Value,
495    pub slot: u64,
496    pub write_version: u64,
497    pub signature: String,
498}
499
500/// Internal representation of a pending account update with queue metadata.
501#[derive(Debug, Clone)]
502pub struct PendingAccountUpdate {
503    pub account_type: String,
504    pub pda_address: String,
505    pub account_data: Value,
506    pub slot: u64,
507    pub write_version: u64,
508    pub signature: String,
509    pub queued_at: i64,
510}
511
512/// Input for queueing an instruction event when PDA lookup fails.
513#[derive(Debug, Clone)]
514pub struct QueuedInstructionEvent {
515    pub pda_address: String,
516    pub event_type: String,
517    pub event_data: Value,
518    pub slot: u64,
519    pub signature: String,
520}
521
522/// Internal representation of a pending instruction event with queue metadata.
523#[derive(Debug, Clone)]
524pub struct PendingInstructionEvent {
525    pub event_type: String,
526    pub pda_address: String,
527    pub event_data: Value,
528    pub slot: u64,
529    pub signature: String,
530    pub queued_at: i64,
531}
532
533#[derive(Debug, Clone)]
534pub struct PendingQueueStats {
535    pub total_updates: usize,
536    pub unique_pdas: usize,
537    pub oldest_age_seconds: i64,
538    pub largest_pda_queue_size: usize,
539    pub estimated_memory_bytes: usize,
540}
541
542#[derive(Debug, Clone, Default)]
543pub struct VmMemoryStats {
544    pub state_table_entity_count: usize,
545    pub state_table_max_entries: usize,
546    pub state_table_at_capacity: bool,
547    pub lookup_index_count: usize,
548    pub lookup_index_total_entries: usize,
549    pub temporal_index_count: usize,
550    pub temporal_index_total_entries: usize,
551    pub pda_reverse_lookup_count: usize,
552    pub pda_reverse_lookup_total_entries: usize,
553    pub version_tracker_entries: usize,
554    pub pending_queue_stats: Option<PendingQueueStats>,
555    pub path_cache_size: usize,
556}
557
558#[derive(Debug, Clone, Default)]
559pub struct CleanupResult {
560    pub pending_updates_removed: usize,
561    pub temporal_entries_removed: usize,
562}
563
564#[derive(Debug, Clone)]
565pub struct CapacityWarning {
566    pub current_entries: usize,
567    pub max_entries: usize,
568    pub entries_over_limit: usize,
569}
570
571#[derive(Debug, Clone)]
572pub struct StateTableConfig {
573    pub max_entries: usize,
574    pub max_array_length: usize,
575}
576
577impl Default for StateTableConfig {
578    fn default() -> Self {
579        Self {
580            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
581            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
582        }
583    }
584}
585
586#[derive(Debug)]
587pub struct VersionTracker {
588    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
589}
590
591impl VersionTracker {
592    pub fn new() -> Self {
593        Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
594    }
595
596    pub fn with_capacity(capacity: usize) -> Self {
597        VersionTracker {
598            cache: std::sync::Mutex::new(LruCache::new(
599                NonZeroUsize::new(capacity).expect("capacity must be > 0"),
600            )),
601        }
602    }
603
604    fn make_key(primary_key: &Value, event_type: &str) -> String {
605        format!("{}:{}", primary_key, event_type)
606    }
607
608    pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
609        let key = Self::make_key(primary_key, event_type);
610        self.cache.lock().unwrap().get(&key).copied()
611    }
612
613    pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
614        let key = Self::make_key(primary_key, event_type);
615        self.cache.lock().unwrap().put(key, (slot, ordering_value));
616    }
617
618    pub fn len(&self) -> usize {
619        self.cache.lock().unwrap().len()
620    }
621
622    pub fn is_empty(&self) -> bool {
623        self.cache.lock().unwrap().is_empty()
624    }
625}
626
627impl Default for VersionTracker {
628    fn default() -> Self {
629        Self::new()
630    }
631}
632
633#[derive(Debug)]
634pub struct StateTable {
635    pub data: DashMap<Value, Value>,
636    access_times: DashMap<Value, i64>,
637    pub lookup_indexes: HashMap<String, LookupIndex>,
638    pub temporal_indexes: HashMap<String, TemporalIndex>,
639    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
640    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
641    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
642    version_tracker: VersionTracker,
643    config: StateTableConfig,
644    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
645    entity_name: String,
646}
647
648impl StateTable {
649    pub fn is_at_capacity(&self) -> bool {
650        self.data.len() >= self.config.max_entries
651    }
652
653    pub fn entries_over_limit(&self) -> usize {
654        self.data.len().saturating_sub(self.config.max_entries)
655    }
656
657    pub fn max_array_length(&self) -> usize {
658        self.config.max_array_length
659    }
660
661    fn touch(&self, key: &Value) {
662        let now = std::time::SystemTime::now()
663            .duration_since(std::time::UNIX_EPOCH)
664            .unwrap()
665            .as_secs() as i64;
666        self.access_times.insert(key.clone(), now);
667    }
668
669    fn evict_lru(&self, count: usize) -> usize {
670        if count == 0 || self.data.is_empty() {
671            return 0;
672        }
673
674        let mut entries: Vec<(Value, i64)> = self
675            .access_times
676            .iter()
677            .map(|entry| (entry.key().clone(), *entry.value()))
678            .collect();
679
680        entries.sort_by_key(|(_, ts)| *ts);
681
682        let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
683
684        let mut evicted = 0;
685        for key in to_evict {
686            self.data.remove(&key);
687            self.access_times.remove(&key);
688            evicted += 1;
689        }
690
691        #[cfg(feature = "otel")]
692        if evicted > 0 {
693            crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
694        }
695
696        evicted
697    }
698
699    pub fn insert_with_eviction(&self, key: Value, value: Value) {
700        if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
701            #[cfg(feature = "otel")]
702            crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
703            let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
704            self.evict_lru(to_evict.max(1));
705        }
706        self.data.insert(key.clone(), value);
707        self.touch(&key);
708    }
709
710    pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
711        let result = self.data.get(key).map(|v| v.clone());
712        if result.is_some() {
713            self.touch(key);
714        }
715        result
716    }
717
718    /// Check if an update is fresh and update the version tracker.
719    /// Returns true if the update should be processed (is fresh).
720    /// Returns false if the update is stale and should be skipped.
721    ///
722    /// Comparison is lexicographic on (slot, ordering_value):
723    /// (100, 5) > (100, 3) > (99, 999)
724    pub fn is_fresh_update(
725        &self,
726        primary_key: &Value,
727        event_type: &str,
728        slot: u64,
729        ordering_value: u64,
730    ) -> bool {
731        let dominated = self
732            .version_tracker
733            .get(primary_key, event_type)
734            .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
735            .unwrap_or(false);
736
737        if dominated {
738            return false;
739        }
740
741        self.version_tracker
742            .insert(primary_key, event_type, slot, ordering_value);
743        true
744    }
745}
746
747impl VmContext {
748    pub fn new() -> Self {
749        let mut vm = VmContext {
750            registers: vec![Value::Null; 256],
751            states: HashMap::new(),
752            instructions_executed: 0,
753            cache_hits: 0,
754            path_cache: HashMap::new(),
755            pda_cache_hits: 0,
756            pda_cache_misses: 0,
757            pending_queue_size: 0,
758            current_context: None,
759            warnings: Vec::new(),
760            last_pda_lookup_miss: None,
761            last_pda_registered: None,
762        };
763        vm.states.insert(
764            0,
765            StateTable {
766                data: DashMap::new(),
767                access_times: DashMap::new(),
768                lookup_indexes: HashMap::new(),
769                temporal_indexes: HashMap::new(),
770                pda_reverse_lookups: HashMap::new(),
771                pending_updates: DashMap::new(),
772                pending_instruction_events: DashMap::new(),
773                version_tracker: VersionTracker::new(),
774                config: StateTableConfig::default(),
775                entity_name: "default".to_string(),
776            },
777        );
778        vm
779    }
780
781    pub fn new_with_config(state_config: StateTableConfig) -> Self {
782        let mut vm = VmContext {
783            registers: vec![Value::Null; 256],
784            states: HashMap::new(),
785            instructions_executed: 0,
786            cache_hits: 0,
787            path_cache: HashMap::new(),
788            pda_cache_hits: 0,
789            pda_cache_misses: 0,
790            pending_queue_size: 0,
791            current_context: None,
792            warnings: Vec::new(),
793            last_pda_lookup_miss: None,
794            last_pda_registered: None,
795        };
796        vm.states.insert(
797            0,
798            StateTable {
799                data: DashMap::new(),
800                access_times: DashMap::new(),
801                lookup_indexes: HashMap::new(),
802                temporal_indexes: HashMap::new(),
803                pda_reverse_lookups: HashMap::new(),
804                pending_updates: DashMap::new(),
805                pending_instruction_events: DashMap::new(),
806                version_tracker: VersionTracker::new(),
807                config: state_config,
808                entity_name: "default".to_string(),
809            },
810        );
811        vm
812    }
813
814    /// Get a mutable reference to a state table by ID
815    /// Returns None if the state ID doesn't exist
816    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
817        self.states.get_mut(&state_id)
818    }
819
820    /// Get public access to registers (for metrics context)
821    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
822        &mut self.registers
823    }
824
825    /// Get public access to path cache (for metrics context)
826    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
827        &self.path_cache
828    }
829
830    /// Get the current update context
831    pub fn current_context(&self) -> Option<&UpdateContext> {
832        self.current_context.as_ref()
833    }
834
835    fn add_warning(&mut self, msg: String) {
836        self.warnings.push(msg);
837    }
838
839    pub fn take_warnings(&mut self) -> Vec<String> {
840        std::mem::take(&mut self.warnings)
841    }
842
843    pub fn has_warnings(&self) -> bool {
844        !self.warnings.is_empty()
845    }
846
847    pub fn update_state_from_register(
848        &mut self,
849        state_id: u32,
850        key: Value,
851        register: Register,
852    ) -> Result<()> {
853        let state = self.states.get(&state_id).ok_or("State table not found")?;
854        let value = self.registers[register].clone();
855        state.insert_with_eviction(key, value);
856        Ok(())
857    }
858
859    fn reset_registers(&mut self) {
860        for reg in &mut self.registers {
861            *reg = Value::Null;
862        }
863    }
864
865    /// Extract only the dirty fields from state (public for use by instruction hooks)
866    pub fn extract_partial_state(
867        &self,
868        state_reg: Register,
869        dirty_fields: &HashSet<String>,
870    ) -> Result<Value> {
871        let full_state = &self.registers[state_reg];
872
873        if dirty_fields.is_empty() {
874            return Ok(json!({}));
875        }
876
877        let mut partial = serde_json::Map::new();
878
879        for path in dirty_fields {
880            let segments: Vec<&str> = path.split('.').collect();
881
882            let mut current = full_state;
883            let mut found = true;
884
885            for segment in &segments {
886                match current.get(segment) {
887                    Some(v) => current = v,
888                    None => {
889                        found = false;
890                        break;
891                    }
892                }
893            }
894
895            if !found {
896                continue;
897            }
898
899            let mut target = &mut partial;
900            for (i, segment) in segments.iter().enumerate() {
901                if i == segments.len() - 1 {
902                    target.insert(segment.to_string(), current.clone());
903                } else {
904                    target
905                        .entry(segment.to_string())
906                        .or_insert_with(|| json!({}));
907                    target = target
908                        .get_mut(*segment)
909                        .and_then(|v| v.as_object_mut())
910                        .ok_or("Failed to build nested structure")?;
911                }
912            }
913        }
914
915        Ok(Value::Object(partial))
916    }
917
918    /// Extract a patch from state based on the DirtyTracker.
919    /// For Replaced fields: extracts the full value from state.
920    /// For Appended fields: emits only the appended values as an array.
921    pub fn extract_partial_state_with_tracker(
922        &self,
923        state_reg: Register,
924        tracker: &DirtyTracker,
925    ) -> Result<Value> {
926        let full_state = &self.registers[state_reg];
927
928        if tracker.is_empty() {
929            return Ok(json!({}));
930        }
931
932        let mut partial = serde_json::Map::new();
933
934        for (path, change) in tracker.iter() {
935            let segments: Vec<&str> = path.split('.').collect();
936
937            let value_to_insert = match change {
938                FieldChange::Replaced => {
939                    let mut current = full_state;
940                    let mut found = true;
941
942                    for segment in &segments {
943                        match current.get(*segment) {
944                            Some(v) => current = v,
945                            None => {
946                                found = false;
947                                break;
948                            }
949                        }
950                    }
951
952                    if !found {
953                        continue;
954                    }
955                    current.clone()
956                }
957                FieldChange::Appended(values) => Value::Array(values.clone()),
958            };
959
960            let mut target = &mut partial;
961            for (i, segment) in segments.iter().enumerate() {
962                if i == segments.len() - 1 {
963                    target.insert(segment.to_string(), value_to_insert.clone());
964                } else {
965                    target
966                        .entry(segment.to_string())
967                        .or_insert_with(|| json!({}));
968                    target = target
969                        .get_mut(*segment)
970                        .and_then(|v| v.as_object_mut())
971                        .ok_or("Failed to build nested structure")?;
972                }
973            }
974        }
975
976        Ok(Value::Object(partial))
977    }
978
979    fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
980        if let Some(compiled) = self.path_cache.get(path) {
981            self.cache_hits += 1;
982            #[cfg(feature = "otel")]
983            crate::vm_metrics::record_path_cache_hit();
984            return compiled.clone();
985        }
986        #[cfg(feature = "otel")]
987        crate::vm_metrics::record_path_cache_miss();
988        let compiled = CompiledPath::new(path);
989        self.path_cache.insert(path.to_string(), compiled.clone());
990        compiled
991    }
992
993    /// Process an event with optional context metadata
994    #[cfg_attr(feature = "otel", instrument(
995        name = "vm.process_event",
996        skip(self, bytecode, event_value, log),
997        level = "info",
998        fields(
999            event_type = %event_type,
1000            slot = context.as_ref().and_then(|c| c.slot),
1001        )
1002    ))]
1003    pub fn process_event(
1004        &mut self,
1005        bytecode: &MultiEntityBytecode,
1006        event_value: Value,
1007        event_type: &str,
1008        context: Option<&UpdateContext>,
1009        mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1010    ) -> Result<Vec<Mutation>> {
1011        self.current_context = context.cloned();
1012
1013        let mut event_value = event_value;
1014        if let Some(ctx) = context {
1015            if let Some(obj) = event_value.as_object_mut() {
1016                obj.insert("__update_context".to_string(), ctx.to_value());
1017            }
1018        }
1019
1020        let mut all_mutations = Vec::new();
1021
1022        if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1023            for entity_name in entity_names {
1024                if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1025                    if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1026                        if let Some(ref mut log) = log {
1027                            log.set("entity", entity_name.clone());
1028                            log.inc("handlers", 1);
1029                        }
1030
1031                        let opcodes_before = self.instructions_executed;
1032                        let cache_before = self.cache_hits;
1033                        let pda_hits_before = self.pda_cache_hits;
1034                        let pda_misses_before = self.pda_cache_misses;
1035
1036                        let mutations = self.execute_handler(
1037                            handler,
1038                            &event_value,
1039                            event_type,
1040                            entity_bytecode.state_id,
1041                            entity_name,
1042                            entity_bytecode.computed_fields_evaluator.as_ref(),
1043                        )?;
1044
1045                        if let Some(ref mut log) = log {
1046                            log.inc(
1047                                "opcodes",
1048                                (self.instructions_executed - opcodes_before) as i64,
1049                            );
1050                            log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1051                            log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1052                            log.inc(
1053                                "pda_misses",
1054                                (self.pda_cache_misses - pda_misses_before) as i64,
1055                            );
1056                        }
1057
1058                        if mutations.is_empty() {
1059                            if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1060                                if event_type.ends_with("IxState") {
1061                                    let slot = context.and_then(|c| c.slot).unwrap_or(0);
1062                                    let signature = context
1063                                        .and_then(|c| c.signature.clone())
1064                                        .unwrap_or_default();
1065                                    let _ = self.queue_instruction_event(
1066                                        entity_bytecode.state_id,
1067                                        QueuedInstructionEvent {
1068                                            pda_address: missed_pda,
1069                                            event_type: event_type.to_string(),
1070                                            event_data: event_value.clone(),
1071                                            slot,
1072                                            signature,
1073                                        },
1074                                    );
1075                                }
1076                            }
1077                        }
1078
1079                        all_mutations.extend(mutations);
1080
1081                        if let Some(registered_pda) = self.take_last_pda_registered() {
1082                            let pending_events = self.flush_pending_instruction_events(
1083                                entity_bytecode.state_id,
1084                                &registered_pda,
1085                            );
1086                            for pending in pending_events {
1087                                if let Some(pending_handler) =
1088                                    entity_bytecode.handlers.get(&pending.event_type)
1089                                {
1090                                    if let Ok(reprocessed_mutations) = self.execute_handler(
1091                                        pending_handler,
1092                                        &pending.event_data,
1093                                        &pending.event_type,
1094                                        entity_bytecode.state_id,
1095                                        entity_name,
1096                                        entity_bytecode.computed_fields_evaluator.as_ref(),
1097                                    ) {
1098                                        all_mutations.extend(reprocessed_mutations);
1099                                    }
1100                                }
1101                            }
1102                        }
1103                    } else if let Some(ref mut log) = log {
1104                        log.set("skip_reason", "no_handler");
1105                    }
1106                } else if let Some(ref mut log) = log {
1107                    log.set("skip_reason", "entity_not_found");
1108                }
1109            }
1110        } else if let Some(ref mut log) = log {
1111            log.set("skip_reason", "no_event_routing");
1112        }
1113
1114        if let Some(log) = log {
1115            log.set("mutations", all_mutations.len() as i64);
1116            if let Some(first) = all_mutations.first() {
1117                if let Some(key_str) = first.key.as_str() {
1118                    log.set("primary_key", key_str);
1119                } else if let Some(key_num) = first.key.as_u64() {
1120                    log.set("primary_key", key_num as i64);
1121                }
1122            }
1123            if let Some(state) = self.states.get(&0) {
1124                log.set("state_table_size", state.data.len() as i64);
1125            }
1126
1127            let warnings = self.take_warnings();
1128            if !warnings.is_empty() {
1129                log.set("warnings", warnings.len() as i64);
1130                log.set(
1131                    "warning_messages",
1132                    Value::Array(warnings.into_iter().map(Value::String).collect()),
1133                );
1134                log.set_level(crate::canonical_log::LogLevel::Warn);
1135            }
1136        } else {
1137            self.warnings.clear();
1138        }
1139
1140        Ok(all_mutations)
1141    }
1142
1143    pub fn process_any(
1144        &mut self,
1145        bytecode: &MultiEntityBytecode,
1146        any: prost_types::Any,
1147    ) -> Result<Vec<Mutation>> {
1148        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1149        self.process_event(bytecode, event_value, &event_type, None, None)
1150    }
1151
1152    #[cfg_attr(feature = "otel", instrument(
1153        name = "vm.execute_handler",
1154        skip(self, handler, event_value, entity_evaluator),
1155        level = "debug",
1156        fields(
1157            event_type = %event_type,
1158            handler_opcodes = handler.len(),
1159        )
1160    ))]
1161    #[allow(clippy::type_complexity)]
1162    fn execute_handler(
1163        &mut self,
1164        handler: &[OpCode],
1165        event_value: &Value,
1166        event_type: &str,
1167        override_state_id: u32,
1168        entity_name: &str,
1169        entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1170    ) -> Result<Vec<Mutation>> {
1171        self.reset_registers();
1172        self.last_pda_lookup_miss = None;
1173
1174        let mut pc: usize = 0;
1175        let mut output = Vec::new();
1176        let mut dirty_tracker = DirtyTracker::new();
1177
1178        while pc < handler.len() {
1179            match &handler[pc] {
1180                OpCode::LoadEventField {
1181                    path,
1182                    dest,
1183                    default,
1184                } => {
1185                    let value = self.load_field(event_value, path, default.as_ref())?;
1186                    self.registers[*dest] = value;
1187                    pc += 1;
1188                }
1189                OpCode::LoadConstant { value, dest } => {
1190                    self.registers[*dest] = value.clone();
1191                    pc += 1;
1192                }
1193                OpCode::CopyRegister { source, dest } => {
1194                    self.registers[*dest] = self.registers[*source].clone();
1195                    pc += 1;
1196                }
1197                OpCode::CopyRegisterIfNull { source, dest } => {
1198                    if self.registers[*dest].is_null() {
1199                        self.registers[*dest] = self.registers[*source].clone();
1200                    }
1201                    pc += 1;
1202                }
1203                OpCode::GetEventType { dest } => {
1204                    self.registers[*dest] = json!(event_type);
1205                    pc += 1;
1206                }
1207                OpCode::CreateObject { dest } => {
1208                    self.registers[*dest] = json!({});
1209                    pc += 1;
1210                }
1211                OpCode::SetField {
1212                    object,
1213                    path,
1214                    value,
1215                } => {
1216                    self.set_field_auto_vivify(*object, path, *value)?;
1217                    dirty_tracker.mark_replaced(path);
1218                    pc += 1;
1219                }
1220                OpCode::SetFields { object, fields } => {
1221                    for (path, value_reg) in fields {
1222                        self.set_field_auto_vivify(*object, path, *value_reg)?;
1223                        dirty_tracker.mark_replaced(path);
1224                    }
1225                    pc += 1;
1226                }
1227                OpCode::GetField { object, path, dest } => {
1228                    let value = self.get_field(*object, path)?;
1229                    self.registers[*dest] = value;
1230                    pc += 1;
1231                }
1232                OpCode::ReadOrInitState {
1233                    state_id: _,
1234                    key,
1235                    default,
1236                    dest,
1237                } => {
1238                    let actual_state_id = override_state_id;
1239                    let entity_name_owned = entity_name.to_string();
1240                    self.states
1241                        .entry(actual_state_id)
1242                        .or_insert_with(|| StateTable {
1243                            data: DashMap::new(),
1244                            access_times: DashMap::new(),
1245                            lookup_indexes: HashMap::new(),
1246                            temporal_indexes: HashMap::new(),
1247                            pda_reverse_lookups: HashMap::new(),
1248                            pending_updates: DashMap::new(),
1249                            pending_instruction_events: DashMap::new(),
1250                            version_tracker: VersionTracker::new(),
1251                            config: StateTableConfig::default(),
1252                            entity_name: entity_name_owned,
1253                        });
1254                    let key_value = self.registers[*key].clone();
1255                    let warn_null_key = key_value.is_null()
1256                        && event_type.ends_with("State")
1257                        && !event_type.ends_with("IxState");
1258
1259                    if warn_null_key {
1260                        self.add_warning(format!(
1261                            "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1262                            key, event_type
1263                        ));
1264                    }
1265
1266                    let state = self
1267                        .states
1268                        .get(&actual_state_id)
1269                        .ok_or("State table not found")?;
1270
1271                    if !key_value.is_null() {
1272                        if let Some(ctx) = &self.current_context {
1273                            let ordering_value = ctx.write_version.or(ctx.txn_index);
1274                            if let (Some(slot), Some(version)) = (ctx.slot, ordering_value) {
1275                                if !state.is_fresh_update(&key_value, event_type, slot, version) {
1276                                    self.add_warning(format!(
1277                                        "Stale update skipped: slot={}, version={}",
1278                                        slot, version
1279                                    ));
1280                                    return Ok(Vec::new());
1281                                }
1282                            }
1283                        }
1284                    }
1285                    let value = state
1286                        .get_and_touch(&key_value)
1287                        .unwrap_or_else(|| default.clone());
1288
1289                    self.registers[*dest] = value;
1290                    pc += 1;
1291                }
1292                OpCode::UpdateState {
1293                    state_id: _,
1294                    key,
1295                    value,
1296                } => {
1297                    let actual_state_id = override_state_id;
1298                    let state = self
1299                        .states
1300                        .get(&actual_state_id)
1301                        .ok_or("State table not found")?;
1302                    let key_value = self.registers[*key].clone();
1303                    let value_data = self.registers[*value].clone();
1304
1305                    state.insert_with_eviction(key_value, value_data);
1306                    pc += 1;
1307                }
1308                OpCode::AppendToArray {
1309                    object,
1310                    path,
1311                    value,
1312                } => {
1313                    let appended_value = self.registers[*value].clone();
1314                    let max_len = self
1315                        .states
1316                        .get(&override_state_id)
1317                        .map(|s| s.max_array_length())
1318                        .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1319                    self.append_to_array(*object, path, *value, max_len)?;
1320                    dirty_tracker.mark_appended(path, appended_value);
1321                    pc += 1;
1322                }
1323                OpCode::GetCurrentTimestamp { dest } => {
1324                    let timestamp = std::time::SystemTime::now()
1325                        .duration_since(std::time::UNIX_EPOCH)
1326                        .unwrap()
1327                        .as_secs() as i64;
1328                    self.registers[*dest] = json!(timestamp);
1329                    pc += 1;
1330                }
1331                OpCode::CreateEvent { dest, event_value } => {
1332                    let timestamp = std::time::SystemTime::now()
1333                        .duration_since(std::time::UNIX_EPOCH)
1334                        .unwrap()
1335                        .as_secs() as i64;
1336
1337                    // Filter out __update_context from the event data
1338                    let mut event_data = self.registers[*event_value].clone();
1339                    if let Some(obj) = event_data.as_object_mut() {
1340                        obj.remove("__update_context");
1341                    }
1342
1343                    // Create event with timestamp, data, and optional slot/signature from context
1344                    let mut event = serde_json::Map::new();
1345                    event.insert("timestamp".to_string(), json!(timestamp));
1346                    event.insert("data".to_string(), event_data);
1347
1348                    // Add slot and signature if available from current context
1349                    if let Some(ref ctx) = self.current_context {
1350                        if let Some(slot) = ctx.slot {
1351                            event.insert("slot".to_string(), json!(slot));
1352                        }
1353                        if let Some(ref signature) = ctx.signature {
1354                            event.insert("signature".to_string(), json!(signature));
1355                        }
1356                    }
1357
1358                    self.registers[*dest] = Value::Object(event);
1359                    pc += 1;
1360                }
1361                OpCode::CreateCapture {
1362                    dest,
1363                    capture_value,
1364                } => {
1365                    let timestamp = std::time::SystemTime::now()
1366                        .duration_since(std::time::UNIX_EPOCH)
1367                        .unwrap()
1368                        .as_secs() as i64;
1369
1370                    // Get the capture data (already filtered by load_field)
1371                    let capture_data = self.registers[*capture_value].clone();
1372
1373                    // Extract account_address from the original event if available
1374                    let account_address = event_value
1375                        .get("__account_address")
1376                        .and_then(|v| v.as_str())
1377                        .unwrap_or("")
1378                        .to_string();
1379
1380                    // Create capture wrapper with timestamp, account_address, data, and optional slot/signature
1381                    let mut capture = serde_json::Map::new();
1382                    capture.insert("timestamp".to_string(), json!(timestamp));
1383                    capture.insert("account_address".to_string(), json!(account_address));
1384                    capture.insert("data".to_string(), capture_data);
1385
1386                    // Add slot and signature if available from current context
1387                    if let Some(ref ctx) = self.current_context {
1388                        if let Some(slot) = ctx.slot {
1389                            capture.insert("slot".to_string(), json!(slot));
1390                        }
1391                        if let Some(ref signature) = ctx.signature {
1392                            capture.insert("signature".to_string(), json!(signature));
1393                        }
1394                    }
1395
1396                    self.registers[*dest] = Value::Object(capture);
1397                    pc += 1;
1398                }
1399                OpCode::Transform {
1400                    source,
1401                    dest,
1402                    transformation,
1403                } => {
1404                    if source == dest {
1405                        self.transform_in_place(*source, transformation)?;
1406                    } else {
1407                        let source_value = &self.registers[*source];
1408                        let value = self.apply_transformation(source_value, transformation)?;
1409                        self.registers[*dest] = value;
1410                    }
1411                    pc += 1;
1412                }
1413                OpCode::EmitMutation {
1414                    entity_name,
1415                    key,
1416                    state,
1417                } => {
1418                    let primary_key = self.registers[*key].clone();
1419
1420                    if primary_key.is_null() || dirty_tracker.is_empty() {
1421                        let reason = if dirty_tracker.is_empty() {
1422                            "no_fields_modified"
1423                        } else {
1424                            "null_primary_key"
1425                        };
1426                        self.add_warning(format!(
1427                            "Skipping mutation for entity '{}': {} (dirty_fields={})",
1428                            entity_name,
1429                            reason,
1430                            dirty_tracker.len()
1431                        ));
1432                    } else {
1433                        let patch =
1434                            self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1435                        let append = dirty_tracker.appended_paths();
1436                        let mutation = Mutation {
1437                            export: entity_name.clone(),
1438                            key: primary_key,
1439                            patch,
1440                            append,
1441                        };
1442                        output.push(mutation);
1443                    }
1444                    pc += 1;
1445                }
1446                OpCode::SetFieldIfNull {
1447                    object,
1448                    path,
1449                    value,
1450                } => {
1451                    let was_set = self.set_field_if_null(*object, path, *value)?;
1452                    if was_set {
1453                        dirty_tracker.mark_replaced(path);
1454                    }
1455                    pc += 1;
1456                }
1457                OpCode::SetFieldMax {
1458                    object,
1459                    path,
1460                    value,
1461                } => {
1462                    let was_updated = self.set_field_max(*object, path, *value)?;
1463                    if was_updated {
1464                        dirty_tracker.mark_replaced(path);
1465                    }
1466                    pc += 1;
1467                }
1468                OpCode::UpdateTemporalIndex {
1469                    state_id: _,
1470                    index_name,
1471                    lookup_value,
1472                    primary_key,
1473                    timestamp,
1474                } => {
1475                    let actual_state_id = override_state_id;
1476                    let state = self
1477                        .states
1478                        .get_mut(&actual_state_id)
1479                        .ok_or("State table not found")?;
1480                    let index = state
1481                        .temporal_indexes
1482                        .entry(index_name.clone())
1483                        .or_insert_with(TemporalIndex::new);
1484
1485                    let lookup_val = self.registers[*lookup_value].clone();
1486                    let pk_val = self.registers[*primary_key].clone();
1487                    let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1488                        val
1489                    } else if let Some(val) = self.registers[*timestamp].as_u64() {
1490                        val as i64
1491                    } else {
1492                        return Err(format!(
1493                            "Timestamp must be a number (i64 or u64), got: {:?}",
1494                            self.registers[*timestamp]
1495                        )
1496                        .into());
1497                    };
1498
1499                    index.insert(lookup_val, pk_val, ts_val);
1500                    pc += 1;
1501                }
1502                OpCode::LookupTemporalIndex {
1503                    state_id: _,
1504                    index_name,
1505                    lookup_value,
1506                    timestamp,
1507                    dest,
1508                } => {
1509                    let actual_state_id = override_state_id;
1510                    let state = self
1511                        .states
1512                        .get(&actual_state_id)
1513                        .ok_or("State table not found")?;
1514                    let lookup_val = &self.registers[*lookup_value];
1515
1516                    let result = if self.registers[*timestamp].is_null() {
1517                        if let Some(index) = state.temporal_indexes.get(index_name) {
1518                            index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1519                        } else {
1520                            Value::Null
1521                        }
1522                    } else {
1523                        let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1524                            val
1525                        } else if let Some(val) = self.registers[*timestamp].as_u64() {
1526                            val as i64
1527                        } else {
1528                            return Err(format!(
1529                                "Timestamp must be a number (i64 or u64), got: {:?}",
1530                                self.registers[*timestamp]
1531                            )
1532                            .into());
1533                        };
1534
1535                        if let Some(index) = state.temporal_indexes.get(index_name) {
1536                            index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1537                        } else {
1538                            Value::Null
1539                        }
1540                    };
1541
1542                    self.registers[*dest] = result;
1543                    pc += 1;
1544                }
1545                OpCode::UpdateLookupIndex {
1546                    state_id: _,
1547                    index_name,
1548                    lookup_value,
1549                    primary_key,
1550                } => {
1551                    let actual_state_id = override_state_id;
1552                    let state = self
1553                        .states
1554                        .get_mut(&actual_state_id)
1555                        .ok_or("State table not found")?;
1556                    let index = state
1557                        .lookup_indexes
1558                        .entry(index_name.clone())
1559                        .or_insert_with(LookupIndex::new);
1560
1561                    let lookup_val = self.registers[*lookup_value].clone();
1562                    let pk_val = self.registers[*primary_key].clone();
1563
1564                    index.insert(lookup_val, pk_val);
1565                    pc += 1;
1566                }
1567                OpCode::LookupIndex {
1568                    state_id: _,
1569                    index_name,
1570                    lookup_value,
1571                    dest,
1572                } => {
1573                    let actual_state_id = override_state_id;
1574                    let lookup_val = self.registers[*lookup_value].clone();
1575
1576                    let result = {
1577                        let state = self
1578                            .states
1579                            .get(&actual_state_id)
1580                            .ok_or("State table not found")?;
1581
1582                        if let Some(index) = state.lookup_indexes.get(index_name) {
1583                            let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1584                            #[cfg(feature = "otel")]
1585                            if found.is_null() {
1586                                crate::vm_metrics::record_lookup_index_miss(index_name);
1587                            } else {
1588                                crate::vm_metrics::record_lookup_index_hit(index_name);
1589                            }
1590                            found
1591                        } else {
1592                            Value::Null
1593                        }
1594                    };
1595
1596                    let final_result = if result.is_null() {
1597                        if let Some(pda_str) = lookup_val.as_str() {
1598                            let state = self
1599                                .states
1600                                .get_mut(&actual_state_id)
1601                                .ok_or("State table not found")?;
1602
1603                            if let Some(pda_lookup) =
1604                                state.pda_reverse_lookups.get_mut("default_pda_lookup")
1605                            {
1606                                if let Some(resolved) = pda_lookup.lookup(pda_str) {
1607                                    Value::String(resolved)
1608                                } else {
1609                                    self.last_pda_lookup_miss = Some(pda_str.to_string());
1610                                    Value::Null
1611                                }
1612                            } else {
1613                                self.last_pda_lookup_miss = Some(pda_str.to_string());
1614                                Value::Null
1615                            }
1616                        } else {
1617                            Value::Null
1618                        }
1619                    } else {
1620                        result
1621                    };
1622
1623                    self.registers[*dest] = final_result;
1624                    pc += 1;
1625                }
1626                OpCode::SetFieldSum {
1627                    object,
1628                    path,
1629                    value,
1630                } => {
1631                    let was_updated = self.set_field_sum(*object, path, *value)?;
1632                    if was_updated {
1633                        dirty_tracker.mark_replaced(path);
1634                    }
1635                    pc += 1;
1636                }
1637                OpCode::SetFieldIncrement { object, path } => {
1638                    let was_updated = self.set_field_increment(*object, path)?;
1639                    if was_updated {
1640                        dirty_tracker.mark_replaced(path);
1641                    }
1642                    pc += 1;
1643                }
1644                OpCode::SetFieldMin {
1645                    object,
1646                    path,
1647                    value,
1648                } => {
1649                    let was_updated = self.set_field_min(*object, path, *value)?;
1650                    if was_updated {
1651                        dirty_tracker.mark_replaced(path);
1652                    }
1653                    pc += 1;
1654                }
1655                OpCode::AddToUniqueSet {
1656                    state_id: _,
1657                    set_name,
1658                    value,
1659                    count_object,
1660                    count_path,
1661                } => {
1662                    let value_to_add = self.registers[*value].clone();
1663
1664                    // Store the unique set within the entity object, not in the state table
1665                    // This ensures each entity instance has its own unique set
1666                    let set_field_path = format!("__unique_set:{}", set_name);
1667
1668                    // Get or create the unique set from the entity object
1669                    let mut set: HashSet<Value> =
1670                        if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1671                            if !existing.is_null() {
1672                                serde_json::from_value(existing).unwrap_or_default()
1673                            } else {
1674                                HashSet::new()
1675                            }
1676                        } else {
1677                            HashSet::new()
1678                        };
1679
1680                    // Add value to set
1681                    let was_new = set.insert(value_to_add);
1682
1683                    // Store updated set back in the entity object
1684                    let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1685                    self.registers[100] = serde_json::to_value(set_as_vec)?;
1686                    self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1687
1688                    // Update the count field in the object
1689                    if was_new {
1690                        self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1691                        self.set_field_auto_vivify(*count_object, count_path, 100)?;
1692                        dirty_tracker.mark_replaced(count_path);
1693                    }
1694
1695                    pc += 1;
1696                }
1697                OpCode::ConditionalSetField {
1698                    object,
1699                    path,
1700                    value,
1701                    condition_field,
1702                    condition_op,
1703                    condition_value,
1704                } => {
1705                    let field_value = self.load_field(event_value, condition_field, None)?;
1706                    let condition_met =
1707                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1708
1709                    if condition_met {
1710                        self.set_field_auto_vivify(*object, path, *value)?;
1711                        dirty_tracker.mark_replaced(path);
1712                    }
1713                    pc += 1;
1714                }
1715                OpCode::ConditionalIncrement {
1716                    object,
1717                    path,
1718                    condition_field,
1719                    condition_op,
1720                    condition_value,
1721                } => {
1722                    let field_value = self.load_field(event_value, condition_field, None)?;
1723                    let condition_met =
1724                        self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1725
1726                    if condition_met {
1727                        let was_updated = self.set_field_increment(*object, path)?;
1728                        if was_updated {
1729                            dirty_tracker.mark_replaced(path);
1730                        }
1731                    }
1732                    pc += 1;
1733                }
1734                OpCode::EvaluateComputedFields {
1735                    state,
1736                    computed_paths,
1737                } => {
1738                    if let Some(evaluator) = entity_evaluator {
1739                        let old_values: Vec<_> = computed_paths
1740                            .iter()
1741                            .map(|path| Self::get_value_at_path(&self.registers[*state], path))
1742                            .collect();
1743
1744                        let state_value = &mut self.registers[*state];
1745                        if evaluator(state_value).is_ok() {
1746                            for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
1747                                let new_value =
1748                                    Self::get_value_at_path(&self.registers[*state], path);
1749                                if new_value != *old_value {
1750                                    dirty_tracker.mark_replaced(path);
1751                                }
1752                            }
1753                        }
1754                    }
1755                    pc += 1;
1756                }
1757                OpCode::UpdatePdaReverseLookup {
1758                    state_id: _,
1759                    lookup_name,
1760                    pda_address,
1761                    primary_key,
1762                } => {
1763                    let actual_state_id = override_state_id;
1764                    let state = self
1765                        .states
1766                        .get_mut(&actual_state_id)
1767                        .ok_or("State table not found")?;
1768
1769                    let pda_val = self.registers[*pda_address].clone();
1770                    let pk_val = self.registers[*primary_key].clone();
1771
1772                    if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1773                        let pda_lookup = state
1774                            .pda_reverse_lookups
1775                            .entry(lookup_name.clone())
1776                            .or_insert_with(|| {
1777                                PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
1778                            });
1779
1780                        pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1781                        self.last_pda_registered = Some(pda_str.to_string());
1782                    } else if !pk_val.is_null() {
1783                        if let Some(pk_num) = pk_val.as_u64() {
1784                            if let Some(pda_str) = pda_val.as_str() {
1785                                let pda_lookup = state
1786                                    .pda_reverse_lookups
1787                                    .entry(lookup_name.clone())
1788                                    .or_insert_with(|| {
1789                                        PdaReverseLookup::new(
1790                                            DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
1791                                        )
1792                                    });
1793
1794                                pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1795                                self.last_pda_registered = Some(pda_str.to_string());
1796                            }
1797                        }
1798                    }
1799
1800                    pc += 1;
1801                }
1802            }
1803
1804            self.instructions_executed += 1;
1805        }
1806
1807        Ok(output)
1808    }
1809
1810    fn load_field(
1811        &self,
1812        event_value: &Value,
1813        path: &FieldPath,
1814        default: Option<&Value>,
1815    ) -> Result<Value> {
1816        if path.segments.is_empty() {
1817            if let Some(obj) = event_value.as_object() {
1818                let filtered: serde_json::Map<String, Value> = obj
1819                    .iter()
1820                    .filter(|(k, _)| !k.starts_with("__"))
1821                    .map(|(k, v)| (k.clone(), v.clone()))
1822                    .collect();
1823                return Ok(Value::Object(filtered));
1824            }
1825            return Ok(event_value.clone());
1826        }
1827
1828        let mut current = event_value;
1829        for segment in path.segments.iter() {
1830            current = match current.get(segment) {
1831                Some(v) => v,
1832                None => return Ok(default.cloned().unwrap_or(Value::Null)),
1833            };
1834        }
1835
1836        Ok(current.clone())
1837    }
1838
1839    fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
1840        let mut current = value;
1841        for segment in path.split('.') {
1842            current = current.get(segment)?;
1843        }
1844        Some(current.clone())
1845    }
1846
1847    fn set_field_auto_vivify(
1848        &mut self,
1849        object_reg: Register,
1850        path: &str,
1851        value_reg: Register,
1852    ) -> Result<()> {
1853        let compiled = self.get_compiled_path(path);
1854        let segments = compiled.segments();
1855        let value = self.registers[value_reg].clone();
1856
1857        if !self.registers[object_reg].is_object() {
1858            self.registers[object_reg] = json!({});
1859        }
1860
1861        let obj = self.registers[object_reg]
1862            .as_object_mut()
1863            .ok_or("Not an object")?;
1864
1865        let mut current = obj;
1866        for (i, segment) in segments.iter().enumerate() {
1867            if i == segments.len() - 1 {
1868                current.insert(segment.to_string(), value);
1869                return Ok(());
1870            } else {
1871                current
1872                    .entry(segment.to_string())
1873                    .or_insert_with(|| json!({}));
1874                current = current
1875                    .get_mut(segment)
1876                    .and_then(|v| v.as_object_mut())
1877                    .ok_or("Path collision: expected object")?;
1878            }
1879        }
1880
1881        Ok(())
1882    }
1883
1884    fn set_field_if_null(
1885        &mut self,
1886        object_reg: Register,
1887        path: &str,
1888        value_reg: Register,
1889    ) -> Result<bool> {
1890        let compiled = self.get_compiled_path(path);
1891        let segments = compiled.segments();
1892        let value = self.registers[value_reg].clone();
1893
1894        if !self.registers[object_reg].is_object() {
1895            self.registers[object_reg] = json!({});
1896        }
1897
1898        let obj = self.registers[object_reg]
1899            .as_object_mut()
1900            .ok_or("Not an object")?;
1901
1902        let mut current = obj;
1903        for (i, segment) in segments.iter().enumerate() {
1904            if i == segments.len() - 1 {
1905                if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1906                    current.insert(segment.to_string(), value);
1907                    return Ok(true);
1908                }
1909                return Ok(false);
1910            } else {
1911                current
1912                    .entry(segment.to_string())
1913                    .or_insert_with(|| json!({}));
1914                current = current
1915                    .get_mut(segment)
1916                    .and_then(|v| v.as_object_mut())
1917                    .ok_or("Path collision: expected object")?;
1918            }
1919        }
1920
1921        Ok(false)
1922    }
1923
1924    fn set_field_max(
1925        &mut self,
1926        object_reg: Register,
1927        path: &str,
1928        value_reg: Register,
1929    ) -> Result<bool> {
1930        let compiled = self.get_compiled_path(path);
1931        let segments = compiled.segments();
1932        let new_value = self.registers[value_reg].clone();
1933
1934        if !self.registers[object_reg].is_object() {
1935            self.registers[object_reg] = json!({});
1936        }
1937
1938        let obj = self.registers[object_reg]
1939            .as_object_mut()
1940            .ok_or("Not an object")?;
1941
1942        let mut current = obj;
1943        for (i, segment) in segments.iter().enumerate() {
1944            if i == segments.len() - 1 {
1945                let should_update = if let Some(current_value) = current.get(segment) {
1946                    if current_value.is_null() {
1947                        true
1948                    } else {
1949                        match (current_value.as_i64(), new_value.as_i64()) {
1950                            (Some(current_val), Some(new_val)) => new_val > current_val,
1951                            (Some(current_val), None) if new_value.as_u64().is_some() => {
1952                                new_value.as_u64().unwrap() as i64 > current_val
1953                            }
1954                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
1955                                new_val > current_value.as_u64().unwrap() as i64
1956                            }
1957                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1958                                (Some(current_val), Some(new_val)) => new_val > current_val,
1959                                _ => match (current_value.as_f64(), new_value.as_f64()) {
1960                                    (Some(current_val), Some(new_val)) => new_val > current_val,
1961                                    _ => false,
1962                                },
1963                            },
1964                            _ => false,
1965                        }
1966                    }
1967                } else {
1968                    true
1969                };
1970
1971                if should_update {
1972                    current.insert(segment.to_string(), new_value);
1973                    return Ok(true);
1974                }
1975                return Ok(false);
1976            } else {
1977                current
1978                    .entry(segment.to_string())
1979                    .or_insert_with(|| json!({}));
1980                current = current
1981                    .get_mut(segment)
1982                    .and_then(|v| v.as_object_mut())
1983                    .ok_or("Path collision: expected object")?;
1984            }
1985        }
1986
1987        Ok(false)
1988    }
1989
1990    fn set_field_sum(
1991        &mut self,
1992        object_reg: Register,
1993        path: &str,
1994        value_reg: Register,
1995    ) -> Result<bool> {
1996        let compiled = self.get_compiled_path(path);
1997        let segments = compiled.segments();
1998        let new_value = &self.registers[value_reg];
1999
2000        // Extract numeric value before borrowing object_reg mutably
2001        let new_val_num = new_value
2002            .as_i64()
2003            .or_else(|| new_value.as_u64().map(|n| n as i64))
2004            .ok_or("Sum requires numeric value")?;
2005
2006        if !self.registers[object_reg].is_object() {
2007            self.registers[object_reg] = json!({});
2008        }
2009
2010        let obj = self.registers[object_reg]
2011            .as_object_mut()
2012            .ok_or("Not an object")?;
2013
2014        let mut current = obj;
2015        for (i, segment) in segments.iter().enumerate() {
2016            if i == segments.len() - 1 {
2017                let current_val = current
2018                    .get(segment)
2019                    .and_then(|v| {
2020                        if v.is_null() {
2021                            None
2022                        } else {
2023                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2024                        }
2025                    })
2026                    .unwrap_or(0);
2027
2028                let sum = current_val + new_val_num;
2029                current.insert(segment.to_string(), json!(sum));
2030                return Ok(true);
2031            } else {
2032                current
2033                    .entry(segment.to_string())
2034                    .or_insert_with(|| json!({}));
2035                current = current
2036                    .get_mut(segment)
2037                    .and_then(|v| v.as_object_mut())
2038                    .ok_or("Path collision: expected object")?;
2039            }
2040        }
2041
2042        Ok(false)
2043    }
2044
2045    fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2046        let compiled = self.get_compiled_path(path);
2047        let segments = compiled.segments();
2048
2049        if !self.registers[object_reg].is_object() {
2050            self.registers[object_reg] = json!({});
2051        }
2052
2053        let obj = self.registers[object_reg]
2054            .as_object_mut()
2055            .ok_or("Not an object")?;
2056
2057        let mut current = obj;
2058        for (i, segment) in segments.iter().enumerate() {
2059            if i == segments.len() - 1 {
2060                // Get current value (default to 0 if null/missing)
2061                let current_val = current
2062                    .get(segment)
2063                    .and_then(|v| {
2064                        if v.is_null() {
2065                            None
2066                        } else {
2067                            v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2068                        }
2069                    })
2070                    .unwrap_or(0);
2071
2072                let incremented = current_val + 1;
2073                current.insert(segment.to_string(), json!(incremented));
2074                return Ok(true);
2075            } else {
2076                current
2077                    .entry(segment.to_string())
2078                    .or_insert_with(|| json!({}));
2079                current = current
2080                    .get_mut(segment)
2081                    .and_then(|v| v.as_object_mut())
2082                    .ok_or("Path collision: expected object")?;
2083            }
2084        }
2085
2086        Ok(false)
2087    }
2088
2089    fn set_field_min(
2090        &mut self,
2091        object_reg: Register,
2092        path: &str,
2093        value_reg: Register,
2094    ) -> Result<bool> {
2095        let compiled = self.get_compiled_path(path);
2096        let segments = compiled.segments();
2097        let new_value = self.registers[value_reg].clone();
2098
2099        if !self.registers[object_reg].is_object() {
2100            self.registers[object_reg] = json!({});
2101        }
2102
2103        let obj = self.registers[object_reg]
2104            .as_object_mut()
2105            .ok_or("Not an object")?;
2106
2107        let mut current = obj;
2108        for (i, segment) in segments.iter().enumerate() {
2109            if i == segments.len() - 1 {
2110                let should_update = if let Some(current_value) = current.get(segment) {
2111                    if current_value.is_null() {
2112                        true
2113                    } else {
2114                        match (current_value.as_i64(), new_value.as_i64()) {
2115                            (Some(current_val), Some(new_val)) => new_val < current_val,
2116                            (Some(current_val), None) if new_value.as_u64().is_some() => {
2117                                (new_value.as_u64().unwrap() as i64) < current_val
2118                            }
2119                            (None, Some(new_val)) if current_value.as_u64().is_some() => {
2120                                new_val < current_value.as_u64().unwrap() as i64
2121                            }
2122                            (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2123                                (Some(current_val), Some(new_val)) => new_val < current_val,
2124                                _ => match (current_value.as_f64(), new_value.as_f64()) {
2125                                    (Some(current_val), Some(new_val)) => new_val < current_val,
2126                                    _ => false,
2127                                },
2128                            },
2129                            _ => false,
2130                        }
2131                    }
2132                } else {
2133                    true
2134                };
2135
2136                if should_update {
2137                    current.insert(segment.to_string(), new_value);
2138                    return Ok(true);
2139                }
2140                return Ok(false);
2141            } else {
2142                current
2143                    .entry(segment.to_string())
2144                    .or_insert_with(|| json!({}));
2145                current = current
2146                    .get_mut(segment)
2147                    .and_then(|v| v.as_object_mut())
2148                    .ok_or("Path collision: expected object")?;
2149            }
2150        }
2151
2152        Ok(false)
2153    }
2154
2155    fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2156        let compiled = self.get_compiled_path(path);
2157        let segments = compiled.segments();
2158        let mut current = &self.registers[object_reg];
2159
2160        for segment in segments {
2161            current = current
2162                .get(segment)
2163                .ok_or_else(|| format!("Field not found: {}", segment))?;
2164        }
2165
2166        Ok(current.clone())
2167    }
2168
2169    fn append_to_array(
2170        &mut self,
2171        object_reg: Register,
2172        path: &str,
2173        value_reg: Register,
2174        max_length: usize,
2175    ) -> Result<()> {
2176        let compiled = self.get_compiled_path(path);
2177        let segments = compiled.segments();
2178        let value = self.registers[value_reg].clone();
2179
2180        if !self.registers[object_reg].is_object() {
2181            self.registers[object_reg] = json!({});
2182        }
2183
2184        let obj = self.registers[object_reg]
2185            .as_object_mut()
2186            .ok_or("Not an object")?;
2187
2188        let mut current = obj;
2189        for (i, segment) in segments.iter().enumerate() {
2190            if i == segments.len() - 1 {
2191                current
2192                    .entry(segment.to_string())
2193                    .or_insert_with(|| json!([]));
2194                let arr = current
2195                    .get_mut(segment)
2196                    .and_then(|v| v.as_array_mut())
2197                    .ok_or("Path is not an array")?;
2198                arr.push(value.clone());
2199
2200                if arr.len() > max_length {
2201                    let excess = arr.len() - max_length;
2202                    arr.drain(0..excess);
2203                }
2204            } else {
2205                current
2206                    .entry(segment.to_string())
2207                    .or_insert_with(|| json!({}));
2208                current = current
2209                    .get_mut(segment)
2210                    .and_then(|v| v.as_object_mut())
2211                    .ok_or("Path collision: expected object")?;
2212            }
2213        }
2214
2215        Ok(())
2216    }
2217
2218    fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2219        let value = &self.registers[reg];
2220        let transformed = self.apply_transformation(value, transformation)?;
2221        self.registers[reg] = transformed;
2222        Ok(())
2223    }
2224
2225    fn apply_transformation(
2226        &self,
2227        value: &Value,
2228        transformation: &Transformation,
2229    ) -> Result<Value> {
2230        match transformation {
2231            Transformation::HexEncode => {
2232                if let Some(arr) = value.as_array() {
2233                    let bytes: Vec<u8> = arr
2234                        .iter()
2235                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2236                        .collect();
2237                    let hex = hex::encode(&bytes);
2238                    Ok(json!(hex))
2239                } else {
2240                    Err("HexEncode requires an array of numbers".into())
2241                }
2242            }
2243            Transformation::HexDecode => {
2244                if let Some(s) = value.as_str() {
2245                    let s = s.strip_prefix("0x").unwrap_or(s);
2246                    let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2247                    Ok(json!(bytes))
2248                } else {
2249                    Err("HexDecode requires a string".into())
2250                }
2251            }
2252            Transformation::Base58Encode => {
2253                if let Some(arr) = value.as_array() {
2254                    let bytes: Vec<u8> = arr
2255                        .iter()
2256                        .filter_map(|v| v.as_u64().map(|n| n as u8))
2257                        .collect();
2258                    let encoded = bs58::encode(&bytes).into_string();
2259                    Ok(json!(encoded))
2260                } else if value.is_string() {
2261                    Ok(value.clone())
2262                } else {
2263                    Err("Base58Encode requires an array of numbers".into())
2264                }
2265            }
2266            Transformation::Base58Decode => {
2267                if let Some(s) = value.as_str() {
2268                    let bytes = bs58::decode(s)
2269                        .into_vec()
2270                        .map_err(|e| format!("Base58 decode error: {}", e))?;
2271                    Ok(json!(bytes))
2272                } else {
2273                    Err("Base58Decode requires a string".into())
2274                }
2275            }
2276            Transformation::ToString => Ok(json!(value.to_string())),
2277            Transformation::ToNumber => {
2278                if let Some(s) = value.as_str() {
2279                    let n = s
2280                        .parse::<i64>()
2281                        .map_err(|e| format!("Parse error: {}", e))?;
2282                    Ok(json!(n))
2283                } else {
2284                    Ok(value.clone())
2285                }
2286            }
2287        }
2288    }
2289
2290    fn evaluate_comparison(
2291        &self,
2292        field_value: &Value,
2293        op: &ComparisonOp,
2294        condition_value: &Value,
2295    ) -> Result<bool> {
2296        use ComparisonOp::*;
2297
2298        match op {
2299            Equal => Ok(field_value == condition_value),
2300            NotEqual => Ok(field_value != condition_value),
2301            GreaterThan => {
2302                // Try to compare as numbers
2303                match (field_value.as_i64(), condition_value.as_i64()) {
2304                    (Some(a), Some(b)) => Ok(a > b),
2305                    _ => match (field_value.as_u64(), condition_value.as_u64()) {
2306                        (Some(a), Some(b)) => Ok(a > b),
2307                        _ => match (field_value.as_f64(), condition_value.as_f64()) {
2308                            (Some(a), Some(b)) => Ok(a > b),
2309                            _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2310                        },
2311                    },
2312                }
2313            }
2314            GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2315                (Some(a), Some(b)) => Ok(a >= b),
2316                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2317                    (Some(a), Some(b)) => Ok(a >= b),
2318                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2319                        (Some(a), Some(b)) => Ok(a >= b),
2320                        _ => {
2321                            Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2322                        }
2323                    },
2324                },
2325            },
2326            LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2327                (Some(a), Some(b)) => Ok(a < b),
2328                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2329                    (Some(a), Some(b)) => Ok(a < b),
2330                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2331                        (Some(a), Some(b)) => Ok(a < b),
2332                        _ => Err("Cannot compare non-numeric values with LessThan".into()),
2333                    },
2334                },
2335            },
2336            LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2337                (Some(a), Some(b)) => Ok(a <= b),
2338                _ => match (field_value.as_u64(), condition_value.as_u64()) {
2339                    (Some(a), Some(b)) => Ok(a <= b),
2340                    _ => match (field_value.as_f64(), condition_value.as_f64()) {
2341                        (Some(a), Some(b)) => Ok(a <= b),
2342                        _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2343                    },
2344                },
2345            },
2346        }
2347    }
2348
2349    /// Update a PDA reverse lookup and return pending updates for reprocessing.
2350    /// Returns any pending account updates that were queued for this PDA.
2351    /// ```ignore
2352    /// let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2353    /// for update in pending {
2354    ///     vm.process_event(&bytecode, update.account_data, &update.account_type, None, None)?;
2355    /// }
2356    /// ```
2357    #[cfg_attr(feature = "otel", instrument(
2358        name = "vm.update_pda_lookup",
2359        skip(self),
2360        fields(
2361            pda = %pda_address,
2362            seed = %seed_value,
2363        )
2364    ))]
2365    pub fn update_pda_reverse_lookup(
2366        &mut self,
2367        state_id: u32,
2368        lookup_name: &str,
2369        pda_address: String,
2370        seed_value: String,
2371    ) -> Result<Vec<PendingAccountUpdate>> {
2372        let state = self
2373            .states
2374            .get_mut(&state_id)
2375            .ok_or("State table not found")?;
2376
2377        let lookup = state
2378            .pda_reverse_lookups
2379            .entry(lookup_name.to_string())
2380            .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2381
2382        let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2383
2384        if let Some(ref evicted) = evicted_pda {
2385            if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2386                let count = evicted_updates.len();
2387                self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2388            }
2389        }
2390
2391        // Flush and return pending updates for this PDA
2392        self.flush_pending_updates(state_id, &pda_address)
2393    }
2394
2395    /// Clean up expired pending updates that are older than the TTL
2396    ///
2397    /// Returns the number of updates that were removed.
2398    /// This should be called periodically to prevent memory leaks from orphaned updates.
2399    pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2400        let state = match self.states.get_mut(&state_id) {
2401            Some(s) => s,
2402            None => return 0,
2403        };
2404
2405        let now = std::time::SystemTime::now()
2406            .duration_since(std::time::UNIX_EPOCH)
2407            .unwrap()
2408            .as_secs() as i64;
2409
2410        let mut removed_count = 0;
2411
2412        // Iterate through all pending updates and remove expired ones
2413        state.pending_updates.retain(|_pda_address, updates| {
2414            let original_len = updates.len();
2415
2416            updates.retain(|update| {
2417                let age = now - update.queued_at;
2418                age <= PENDING_UPDATE_TTL_SECONDS
2419            });
2420
2421            removed_count += original_len - updates.len();
2422
2423            // Remove the entry entirely if no updates remain
2424            !updates.is_empty()
2425        });
2426
2427        // Update the global counter
2428        self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2429
2430        if removed_count > 0 {
2431            #[cfg(feature = "otel")]
2432            crate::vm_metrics::record_pending_updates_expired(
2433                removed_count as u64,
2434                &state.entity_name,
2435            );
2436        }
2437
2438        removed_count
2439    }
2440
2441    /// Queue an account update for later processing when PDA reverse lookup is not yet available
2442    ///
2443    /// # Workflow
2444    ///
2445    /// This implements a deferred processing pattern for account updates when the PDA reverse
2446    /// lookup needed to resolve the primary key is not yet available:
2447    ///
2448    /// 1. **Initial Account Update**: When an account update arrives but the PDA reverse lookup
2449    ///    is not available, call `queue_account_update()` to queue it for later.
2450    ///
2451    /// 2. **Register PDA Mapping**: When the instruction that establishes the PDA mapping is
2452    ///    processed, call `update_pda_reverse_lookup()` which returns pending updates.
2453    ///
2454    /// 3. **Reprocess Pending Updates**: Process the returned pending updates through the VM:
2455    ///    ```ignore
2456    ///    let pending = vm.update_pda_reverse_lookup(state_id, lookup_name, pda_addr, seed)?;
2457    ///    for update in pending {
2458    ///        let mutations = vm.process_event(
2459    ///            &bytecode, update.account_data, &update.account_type, None, None
2460    ///        )?;
2461    ///    }
2462    ///    ```
2463    ///
2464    /// # Arguments
2465    ///
2466    /// * `state_id` - The state table ID
2467    /// * `pda_address` - The PDA address that needs reverse lookup
2468    /// * `account_type` - The event type name for reprocessing
2469    /// * `account_data` - The account data (event value) for reprocessing
2470    /// * `slot` - The slot number when this update occurred
2471    /// * `signature` - The transaction signature
2472    #[cfg_attr(feature = "otel", instrument(
2473        name = "vm.queue_account_update",
2474        skip(self, update),
2475        fields(
2476            pda = %update.pda_address,
2477            account_type = %update.account_type,
2478            slot = update.slot,
2479        )
2480    ))]
2481    pub fn queue_account_update(
2482        &mut self,
2483        state_id: u32,
2484        update: QueuedAccountUpdate,
2485    ) -> Result<()> {
2486        if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2487            self.cleanup_expired_pending_updates(state_id);
2488            if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2489                self.drop_oldest_pending_update(state_id)?;
2490            }
2491        }
2492
2493        let state = self
2494            .states
2495            .get_mut(&state_id)
2496            .ok_or("State table not found")?;
2497
2498        let pending = PendingAccountUpdate {
2499            account_type: update.account_type,
2500            pda_address: update.pda_address.clone(),
2501            account_data: update.account_data,
2502            slot: update.slot,
2503            write_version: update.write_version,
2504            signature: update.signature,
2505            queued_at: std::time::SystemTime::now()
2506                .duration_since(std::time::UNIX_EPOCH)
2507                .unwrap()
2508                .as_secs() as i64,
2509        };
2510
2511        let pda_address = pending.pda_address.clone();
2512        let slot = pending.slot;
2513
2514        let mut updates = state
2515            .pending_updates
2516            .entry(pda_address.clone())
2517            .or_insert_with(Vec::new);
2518
2519        let original_len = updates.len();
2520        updates.retain(|existing| existing.slot > slot);
2521        let removed_by_dedup = original_len - updates.len();
2522
2523        if removed_by_dedup > 0 {
2524            self.pending_queue_size = self
2525                .pending_queue_size
2526                .saturating_sub(removed_by_dedup as u64);
2527        }
2528
2529        if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2530            updates.remove(0);
2531            self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2532        }
2533
2534        updates.push(pending);
2535        #[cfg(feature = "otel")]
2536        crate::vm_metrics::record_pending_update_queued(&state.entity_name);
2537
2538        Ok(())
2539    }
2540
2541    pub fn queue_instruction_event(
2542        &mut self,
2543        state_id: u32,
2544        event: QueuedInstructionEvent,
2545    ) -> Result<()> {
2546        let state = self
2547            .states
2548            .get_mut(&state_id)
2549            .ok_or("State table not found")?;
2550
2551        let pda_address = event.pda_address.clone();
2552
2553        let pending = PendingInstructionEvent {
2554            event_type: event.event_type,
2555            pda_address: event.pda_address,
2556            event_data: event.event_data,
2557            slot: event.slot,
2558            signature: event.signature,
2559            queued_at: std::time::SystemTime::now()
2560                .duration_since(std::time::UNIX_EPOCH)
2561                .unwrap()
2562                .as_secs() as i64,
2563        };
2564
2565        let mut events = state
2566            .pending_instruction_events
2567            .entry(pda_address)
2568            .or_insert_with(Vec::new);
2569
2570        if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
2571            events.remove(0);
2572        }
2573
2574        events.push(pending);
2575
2576        Ok(())
2577    }
2578
2579    pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
2580        self.last_pda_lookup_miss.take()
2581    }
2582
2583    pub fn take_last_pda_registered(&mut self) -> Option<String> {
2584        self.last_pda_registered.take()
2585    }
2586
2587    pub fn flush_pending_instruction_events(
2588        &mut self,
2589        state_id: u32,
2590        pda_address: &str,
2591    ) -> Vec<PendingInstructionEvent> {
2592        let state = match self.states.get_mut(&state_id) {
2593            Some(s) => s,
2594            None => return Vec::new(),
2595        };
2596
2597        if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
2598            events
2599        } else {
2600            Vec::new()
2601        }
2602    }
2603
2604    /// Get statistics about the pending queue for monitoring
2605    pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2606        let state = self.states.get(&state_id)?;
2607
2608        let now = std::time::SystemTime::now()
2609            .duration_since(std::time::UNIX_EPOCH)
2610            .unwrap()
2611            .as_secs() as i64;
2612
2613        let mut total_updates = 0;
2614        let mut oldest_timestamp = now;
2615        let mut largest_pda_queue = 0;
2616        let mut estimated_memory = 0;
2617
2618        for entry in state.pending_updates.iter() {
2619            let (_, updates) = entry.pair();
2620            total_updates += updates.len();
2621            largest_pda_queue = largest_pda_queue.max(updates.len());
2622
2623            for update in updates.iter() {
2624                oldest_timestamp = oldest_timestamp.min(update.queued_at);
2625                // Rough memory estimate
2626                estimated_memory += update.account_type.len() +
2627                                   update.pda_address.len() +
2628                                   update.signature.len() +
2629                                   16 + // slot + queued_at
2630                                   estimate_json_size(&update.account_data);
2631            }
2632        }
2633
2634        Some(PendingQueueStats {
2635            total_updates,
2636            unique_pdas: state.pending_updates.len(),
2637            oldest_age_seconds: now - oldest_timestamp,
2638            largest_pda_queue_size: largest_pda_queue,
2639            estimated_memory_bytes: estimated_memory,
2640        })
2641    }
2642
2643    pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2644        let mut stats = VmMemoryStats {
2645            path_cache_size: self.path_cache.len(),
2646            ..Default::default()
2647        };
2648
2649        if let Some(state) = self.states.get(&state_id) {
2650            stats.state_table_entity_count = state.data.len();
2651            stats.state_table_max_entries = state.config.max_entries;
2652            stats.state_table_at_capacity = state.is_at_capacity();
2653
2654            stats.lookup_index_count = state.lookup_indexes.len();
2655            stats.lookup_index_total_entries =
2656                state.lookup_indexes.values().map(|idx| idx.len()).sum();
2657
2658            stats.temporal_index_count = state.temporal_indexes.len();
2659            stats.temporal_index_total_entries = state
2660                .temporal_indexes
2661                .values()
2662                .map(|idx| idx.total_entries())
2663                .sum();
2664
2665            stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2666            stats.pda_reverse_lookup_total_entries = state
2667                .pda_reverse_lookups
2668                .values()
2669                .map(|lookup| lookup.len())
2670                .sum();
2671
2672            stats.version_tracker_entries = state.version_tracker.len();
2673
2674            stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2675        }
2676
2677        stats
2678    }
2679
2680    pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2681        let pending_removed = self.cleanup_expired_pending_updates(state_id);
2682        let temporal_removed = self.cleanup_temporal_indexes(state_id);
2683
2684        #[cfg(feature = "otel")]
2685        if let Some(state) = self.states.get(&state_id) {
2686            crate::vm_metrics::record_cleanup(
2687                pending_removed,
2688                temporal_removed,
2689                &state.entity_name,
2690            );
2691        }
2692
2693        CleanupResult {
2694            pending_updates_removed: pending_removed,
2695            temporal_entries_removed: temporal_removed,
2696        }
2697    }
2698
2699    fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2700        let state = match self.states.get_mut(&state_id) {
2701            Some(s) => s,
2702            None => return 0,
2703        };
2704
2705        let now = std::time::SystemTime::now()
2706            .duration_since(std::time::UNIX_EPOCH)
2707            .unwrap()
2708            .as_secs() as i64;
2709
2710        let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2711        let mut total_removed = 0;
2712
2713        for (_, index) in state.temporal_indexes.iter_mut() {
2714            total_removed += index.cleanup_expired(cutoff);
2715        }
2716
2717        total_removed
2718    }
2719
2720    pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2721        let state = self.states.get(&state_id)?;
2722
2723        if state.is_at_capacity() {
2724            Some(CapacityWarning {
2725                current_entries: state.data.len(),
2726                max_entries: state.config.max_entries,
2727                entries_over_limit: state.entries_over_limit(),
2728            })
2729        } else {
2730            None
2731        }
2732    }
2733
2734    /// Drop the oldest pending update across all PDAs
2735    fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2736        let state = self
2737            .states
2738            .get_mut(&state_id)
2739            .ok_or("State table not found")?;
2740
2741        let mut oldest_pda: Option<String> = None;
2742        let mut oldest_timestamp = i64::MAX;
2743
2744        // Find the PDA with the oldest update
2745        for entry in state.pending_updates.iter() {
2746            let (pda, updates) = entry.pair();
2747            if let Some(update) = updates.first() {
2748                if update.queued_at < oldest_timestamp {
2749                    oldest_timestamp = update.queued_at;
2750                    oldest_pda = Some(pda.clone());
2751                }
2752            }
2753        }
2754
2755        // Remove the oldest update
2756        if let Some(pda) = oldest_pda {
2757            if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2758                if !updates.is_empty() {
2759                    updates.remove(0);
2760                    self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2761
2762                    // Remove the entry if it's now empty
2763                    if updates.is_empty() {
2764                        drop(updates);
2765                        state.pending_updates.remove(&pda);
2766                    }
2767                }
2768            }
2769        }
2770
2771        Ok(())
2772    }
2773
2774    /// Flush and return pending updates for a PDA for external reprocessing
2775    ///
2776    /// Returns the pending updates that were queued for this PDA address.
2777    /// The caller should reprocess these through the VM using process_event().
2778    fn flush_pending_updates(
2779        &mut self,
2780        state_id: u32,
2781        pda_address: &str,
2782    ) -> Result<Vec<PendingAccountUpdate>> {
2783        let state = self
2784            .states
2785            .get_mut(&state_id)
2786            .ok_or("State table not found")?;
2787
2788        if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2789            let count = pending_updates.len();
2790            self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2791            #[cfg(feature = "otel")]
2792            crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
2793            Ok(pending_updates)
2794        } else {
2795            Ok(Vec::new())
2796        }
2797    }
2798
2799    /// Try to resolve a primary key via PDA reverse lookup
2800    pub fn try_pda_reverse_lookup(
2801        &mut self,
2802        state_id: u32,
2803        lookup_name: &str,
2804        pda_address: &str,
2805    ) -> Option<String> {
2806        let state = self.states.get_mut(&state_id)?;
2807
2808        if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2809            if let Some(value) = lookup.lookup(pda_address) {
2810                self.pda_cache_hits += 1;
2811                return Some(value);
2812            }
2813        }
2814
2815        self.pda_cache_misses += 1;
2816        None
2817    }
2818
2819    // ============================================================================
2820    // Computed Expression Evaluator (Task 5)
2821    // ============================================================================
2822
2823    /// Evaluate a computed expression AST against the current state
2824    /// This is the core runtime evaluator for computed fields from the AST
2825    pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2826        self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2827    }
2828
2829    /// Evaluate a computed expression with a variable environment (for let bindings)
2830    fn evaluate_computed_expr_with_env(
2831        &self,
2832        expr: &ComputedExpr,
2833        state: &Value,
2834        env: &std::collections::HashMap<String, Value>,
2835    ) -> Result<Value> {
2836        match expr {
2837            ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2838
2839            ComputedExpr::Var { name } => env
2840                .get(name)
2841                .cloned()
2842                .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2843
2844            ComputedExpr::Let { name, value, body } => {
2845                let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2846                let mut new_env = env.clone();
2847                new_env.insert(name.clone(), val);
2848                self.evaluate_computed_expr_with_env(body, state, &new_env)
2849            }
2850
2851            ComputedExpr::If {
2852                condition,
2853                then_branch,
2854                else_branch,
2855            } => {
2856                let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2857                if self.value_to_bool(&cond_val) {
2858                    self.evaluate_computed_expr_with_env(then_branch, state, env)
2859                } else {
2860                    self.evaluate_computed_expr_with_env(else_branch, state, env)
2861                }
2862            }
2863
2864            ComputedExpr::None => Ok(Value::Null),
2865
2866            ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2867
2868            ComputedExpr::Slice { expr, start, end } => {
2869                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2870                match val {
2871                    Value::Array(arr) => {
2872                        let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2873                        Ok(Value::Array(slice))
2874                    }
2875                    _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2876                }
2877            }
2878
2879            ComputedExpr::Index { expr, index } => {
2880                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2881                match val {
2882                    Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2883                    _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2884                }
2885            }
2886
2887            ComputedExpr::U64FromLeBytes { bytes } => {
2888                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2889                let byte_vec = self.value_to_bytes(&val)?;
2890                if byte_vec.len() < 8 {
2891                    return Err(format!(
2892                        "u64::from_le_bytes requires 8 bytes, got {}",
2893                        byte_vec.len()
2894                    )
2895                    .into());
2896                }
2897                let arr: [u8; 8] = byte_vec[..8]
2898                    .try_into()
2899                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2900                Ok(json!(u64::from_le_bytes(arr)))
2901            }
2902
2903            ComputedExpr::U64FromBeBytes { bytes } => {
2904                let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2905                let byte_vec = self.value_to_bytes(&val)?;
2906                if byte_vec.len() < 8 {
2907                    return Err(format!(
2908                        "u64::from_be_bytes requires 8 bytes, got {}",
2909                        byte_vec.len()
2910                    )
2911                    .into());
2912                }
2913                let arr: [u8; 8] = byte_vec[..8]
2914                    .try_into()
2915                    .map_err(|_| "Failed to convert to [u8; 8]")?;
2916                Ok(json!(u64::from_be_bytes(arr)))
2917            }
2918
2919            ComputedExpr::ByteArray { bytes } => {
2920                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2921            }
2922
2923            ComputedExpr::Closure { param, body } => {
2924                // Closures are stored as-is; they're evaluated when used in map()
2925                // Return a special representation
2926                Ok(json!({
2927                    "__closure": {
2928                        "param": param,
2929                        "body": serde_json::to_value(body).unwrap_or(Value::Null)
2930                    }
2931                }))
2932            }
2933
2934            ComputedExpr::Unary { op, expr } => {
2935                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2936                self.apply_unary_op(op, &val)
2937            }
2938
2939            ComputedExpr::JsonToBytes { expr } => {
2940                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2941                // Convert JSON array of numbers to byte array
2942                let bytes = self.value_to_bytes(&val)?;
2943                Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2944            }
2945
2946            ComputedExpr::UnwrapOr { expr, default } => {
2947                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2948                if val.is_null() {
2949                    Ok(default.clone())
2950                } else {
2951                    Ok(val)
2952                }
2953            }
2954
2955            ComputedExpr::Binary { op, left, right } => {
2956                let l = self.evaluate_computed_expr_with_env(left, state, env)?;
2957                let r = self.evaluate_computed_expr_with_env(right, state, env)?;
2958                self.apply_binary_op(op, &l, &r)
2959            }
2960
2961            ComputedExpr::Cast { expr, to_type } => {
2962                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2963                self.apply_cast(&val, to_type)
2964            }
2965
2966            ComputedExpr::MethodCall { expr, method, args } => {
2967                let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2968                // Special handling for map() with closures
2969                if method == "map" && args.len() == 1 {
2970                    if let ComputedExpr::Closure { param, body } = &args[0] {
2971                        // If the value is null, return null (Option::None.map returns None)
2972                        if val.is_null() {
2973                            return Ok(Value::Null);
2974                        }
2975                        // Evaluate the closure body with the value bound to param
2976                        let mut closure_env = env.clone();
2977                        closure_env.insert(param.clone(), val);
2978                        return self.evaluate_computed_expr_with_env(body, state, &closure_env);
2979                    }
2980                }
2981                let evaluated_args: Vec<Value> = args
2982                    .iter()
2983                    .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
2984                    .collect::<Result<Vec<_>>>()?;
2985                self.apply_method_call(&val, method, &evaluated_args)
2986            }
2987
2988            ComputedExpr::Literal { value } => Ok(value.clone()),
2989
2990            ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
2991        }
2992    }
2993
2994    /// Convert a JSON value to a byte vector
2995    fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
2996        match val {
2997            Value::Array(arr) => arr
2998                .iter()
2999                .map(|v| {
3000                    v.as_u64()
3001                        .map(|n| n as u8)
3002                        .ok_or_else(|| "Array element not a valid byte".into())
3003                })
3004                .collect(),
3005            Value::String(s) => {
3006                // Try to decode as hex
3007                if s.starts_with("0x") || s.starts_with("0X") {
3008                    hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
3009                } else {
3010                    hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
3011                }
3012            }
3013            _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
3014        }
3015    }
3016
3017    /// Apply a unary operation
3018    fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
3019        use crate::ast::UnaryOp;
3020        match op {
3021            UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
3022            UnaryOp::ReverseBits => match val.as_u64() {
3023                Some(n) => Ok(json!(n.reverse_bits())),
3024                None => match val.as_i64() {
3025                    Some(n) => Ok(json!((n as u64).reverse_bits())),
3026                    None => Err("reverse_bits requires an integer".into()),
3027                },
3028            },
3029        }
3030    }
3031
3032    /// Get a field value from state by path (e.g., "section.field" or just "field")
3033    fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
3034        let segments: Vec<&str> = path.split('.').collect();
3035        let mut current = state;
3036
3037        for segment in segments {
3038            match current.get(segment) {
3039                Some(v) => current = v,
3040                None => return Ok(Value::Null),
3041            }
3042        }
3043
3044        Ok(current.clone())
3045    }
3046
3047    /// Apply a binary operation to two values
3048    fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
3049        match op {
3050            // Arithmetic operations
3051            BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
3052            BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
3053            BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
3054            BinaryOp::Div => {
3055                // Check for division by zero
3056                if let Some(r) = right.as_i64() {
3057                    if r == 0 {
3058                        return Err("Division by zero".into());
3059                    }
3060                }
3061                if let Some(r) = right.as_f64() {
3062                    if r == 0.0 {
3063                        return Err("Division by zero".into());
3064                    }
3065                }
3066                self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
3067            }
3068            BinaryOp::Mod => {
3069                // Modulo - only for integers
3070                match (left.as_i64(), right.as_i64()) {
3071                    (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3072                    (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
3073                        (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3074                        _ => Err("Modulo requires non-zero integer operands".into()),
3075                    },
3076                    _ => Err("Modulo by zero".into()),
3077                }
3078            }
3079
3080            // Comparison operations
3081            BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
3082            BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
3083            BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
3084            BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
3085            BinaryOp::Eq => Ok(json!(left == right)),
3086            BinaryOp::Ne => Ok(json!(left != right)),
3087
3088            // Logical operations
3089            BinaryOp::And => {
3090                let l_bool = self.value_to_bool(left);
3091                let r_bool = self.value_to_bool(right);
3092                Ok(json!(l_bool && r_bool))
3093            }
3094            BinaryOp::Or => {
3095                let l_bool = self.value_to_bool(left);
3096                let r_bool = self.value_to_bool(right);
3097                Ok(json!(l_bool || r_bool))
3098            }
3099
3100            // Bitwise operations
3101            BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
3102                (Some(a), Some(b)) => Ok(json!(a ^ b)),
3103                _ => match (left.as_i64(), right.as_i64()) {
3104                    (Some(a), Some(b)) => Ok(json!(a ^ b)),
3105                    _ => Err("XOR requires integer operands".into()),
3106                },
3107            },
3108            BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
3109                (Some(a), Some(b)) => Ok(json!(a & b)),
3110                _ => match (left.as_i64(), right.as_i64()) {
3111                    (Some(a), Some(b)) => Ok(json!(a & b)),
3112                    _ => Err("BitAnd requires integer operands".into()),
3113                },
3114            },
3115            BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
3116                (Some(a), Some(b)) => Ok(json!(a | b)),
3117                _ => match (left.as_i64(), right.as_i64()) {
3118                    (Some(a), Some(b)) => Ok(json!(a | b)),
3119                    _ => Err("BitOr requires integer operands".into()),
3120                },
3121            },
3122            BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
3123                (Some(a), Some(b)) => Ok(json!(a << b)),
3124                _ => match (left.as_i64(), right.as_i64()) {
3125                    (Some(a), Some(b)) => Ok(json!(a << b)),
3126                    _ => Err("Shl requires integer operands".into()),
3127                },
3128            },
3129            BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
3130                (Some(a), Some(b)) => Ok(json!(a >> b)),
3131                _ => match (left.as_i64(), right.as_i64()) {
3132                    (Some(a), Some(b)) => Ok(json!(a >> b)),
3133                    _ => Err("Shr requires integer operands".into()),
3134                },
3135            },
3136        }
3137    }
3138
3139    /// Helper for numeric operations that can work on integers or floats
3140    fn numeric_op<F1, F2>(
3141        &self,
3142        left: &Value,
3143        right: &Value,
3144        int_op: F1,
3145        float_op: F2,
3146    ) -> Result<Value>
3147    where
3148        F1: Fn(i64, i64) -> i64,
3149        F2: Fn(f64, f64) -> f64,
3150    {
3151        // Try i64 first
3152        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3153            return Ok(json!(int_op(a, b)));
3154        }
3155
3156        // Try u64
3157        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3158            // For u64, we need to be careful with underflow in subtraction
3159            return Ok(json!(int_op(a as i64, b as i64)));
3160        }
3161
3162        // Try f64
3163        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3164            return Ok(json!(float_op(a, b)));
3165        }
3166
3167        // If either is null, return null
3168        if left.is_null() || right.is_null() {
3169            return Ok(Value::Null);
3170        }
3171
3172        Err(format!(
3173            "Cannot perform numeric operation on {:?} and {:?}",
3174            left, right
3175        )
3176        .into())
3177    }
3178
3179    /// Helper for comparison operations
3180    fn comparison_op<F1, F2>(
3181        &self,
3182        left: &Value,
3183        right: &Value,
3184        int_cmp: F1,
3185        float_cmp: F2,
3186    ) -> Result<Value>
3187    where
3188        F1: Fn(i64, i64) -> bool,
3189        F2: Fn(f64, f64) -> bool,
3190    {
3191        // Try i64 first
3192        if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3193            return Ok(json!(int_cmp(a, b)));
3194        }
3195
3196        // Try u64
3197        if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3198            return Ok(json!(int_cmp(a as i64, b as i64)));
3199        }
3200
3201        // Try f64
3202        if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3203            return Ok(json!(float_cmp(a, b)));
3204        }
3205
3206        // If either is null, comparison returns false
3207        if left.is_null() || right.is_null() {
3208            return Ok(json!(false));
3209        }
3210
3211        Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3212    }
3213
3214    /// Convert a value to boolean for logical operations
3215    fn value_to_bool(&self, value: &Value) -> bool {
3216        match value {
3217            Value::Null => false,
3218            Value::Bool(b) => *b,
3219            Value::Number(n) => {
3220                if let Some(i) = n.as_i64() {
3221                    i != 0
3222                } else if let Some(f) = n.as_f64() {
3223                    f != 0.0
3224                } else {
3225                    true
3226                }
3227            }
3228            Value::String(s) => !s.is_empty(),
3229            Value::Array(arr) => !arr.is_empty(),
3230            Value::Object(obj) => !obj.is_empty(),
3231        }
3232    }
3233
3234    /// Apply a type cast to a value
3235    fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3236        match to_type {
3237            "i8" | "i16" | "i32" | "i64" | "isize" => {
3238                if let Some(n) = value.as_i64() {
3239                    Ok(json!(n))
3240                } else if let Some(n) = value.as_u64() {
3241                    Ok(json!(n as i64))
3242                } else if let Some(n) = value.as_f64() {
3243                    Ok(json!(n as i64))
3244                } else if let Some(s) = value.as_str() {
3245                    s.parse::<i64>()
3246                        .map(|n| json!(n))
3247                        .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3248                } else {
3249                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3250                }
3251            }
3252            "u8" | "u16" | "u32" | "u64" | "usize" => {
3253                if let Some(n) = value.as_u64() {
3254                    Ok(json!(n))
3255                } else if let Some(n) = value.as_i64() {
3256                    Ok(json!(n as u64))
3257                } else if let Some(n) = value.as_f64() {
3258                    Ok(json!(n as u64))
3259                } else if let Some(s) = value.as_str() {
3260                    s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3261                        format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3262                    })
3263                } else {
3264                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3265                }
3266            }
3267            "f32" | "f64" => {
3268                if let Some(n) = value.as_f64() {
3269                    Ok(json!(n))
3270                } else if let Some(n) = value.as_i64() {
3271                    Ok(json!(n as f64))
3272                } else if let Some(n) = value.as_u64() {
3273                    Ok(json!(n as f64))
3274                } else if let Some(s) = value.as_str() {
3275                    s.parse::<f64>()
3276                        .map(|n| json!(n))
3277                        .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3278                } else {
3279                    Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3280                }
3281            }
3282            "String" | "string" => Ok(json!(value.to_string())),
3283            "bool" => Ok(json!(self.value_to_bool(value))),
3284            _ => {
3285                // Unknown type, return value as-is
3286                Ok(value.clone())
3287            }
3288        }
3289    }
3290
3291    /// Apply a method call to a value
3292    fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3293        match method {
3294            "unwrap_or" => {
3295                if value.is_null() && !args.is_empty() {
3296                    Ok(args[0].clone())
3297                } else {
3298                    Ok(value.clone())
3299                }
3300            }
3301            "unwrap_or_default" => {
3302                if value.is_null() {
3303                    // Return default for common types
3304                    Ok(json!(0))
3305                } else {
3306                    Ok(value.clone())
3307                }
3308            }
3309            "is_some" => Ok(json!(!value.is_null())),
3310            "is_none" => Ok(json!(value.is_null())),
3311            "abs" => {
3312                if let Some(n) = value.as_i64() {
3313                    Ok(json!(n.abs()))
3314                } else if let Some(n) = value.as_f64() {
3315                    Ok(json!(n.abs()))
3316                } else {
3317                    Err(format!("Cannot call abs() on {:?}", value).into())
3318                }
3319            }
3320            "len" => {
3321                if let Some(s) = value.as_str() {
3322                    Ok(json!(s.len()))
3323                } else if let Some(arr) = value.as_array() {
3324                    Ok(json!(arr.len()))
3325                } else if let Some(obj) = value.as_object() {
3326                    Ok(json!(obj.len()))
3327                } else {
3328                    Err(format!("Cannot call len() on {:?}", value).into())
3329                }
3330            }
3331            "to_string" => Ok(json!(value.to_string())),
3332            "min" => {
3333                if args.is_empty() {
3334                    return Err("min() requires an argument".into());
3335                }
3336                let other = &args[0];
3337                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3338                    Ok(json!(a.min(b)))
3339                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3340                    Ok(json!(a.min(b)))
3341                } else {
3342                    Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3343                }
3344            }
3345            "max" => {
3346                if args.is_empty() {
3347                    return Err("max() requires an argument".into());
3348                }
3349                let other = &args[0];
3350                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3351                    Ok(json!(a.max(b)))
3352                } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3353                    Ok(json!(a.max(b)))
3354                } else {
3355                    Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3356                }
3357            }
3358            "saturating_add" => {
3359                if args.is_empty() {
3360                    return Err("saturating_add() requires an argument".into());
3361                }
3362                let other = &args[0];
3363                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3364                    Ok(json!(a.saturating_add(b)))
3365                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3366                    Ok(json!(a.saturating_add(b)))
3367                } else {
3368                    Err(format!(
3369                        "Cannot call saturating_add() on {:?} and {:?}",
3370                        value, other
3371                    )
3372                    .into())
3373                }
3374            }
3375            "saturating_sub" => {
3376                if args.is_empty() {
3377                    return Err("saturating_sub() requires an argument".into());
3378                }
3379                let other = &args[0];
3380                if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3381                    Ok(json!(a.saturating_sub(b)))
3382                } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3383                    Ok(json!(a.saturating_sub(b)))
3384                } else {
3385                    Err(format!(
3386                        "Cannot call saturating_sub() on {:?} and {:?}",
3387                        value, other
3388                    )
3389                    .into())
3390                }
3391            }
3392            _ => Err(format!("Unknown method call: {}()", method).into()),
3393        }
3394    }
3395
3396    /// Evaluate all computed fields for an entity and update the state
3397    /// This takes a list of ComputedFieldSpec from the AST and applies them
3398    pub fn evaluate_computed_fields_from_ast(
3399        &self,
3400        state: &mut Value,
3401        computed_field_specs: &[ComputedFieldSpec],
3402    ) -> Result<Vec<String>> {
3403        let mut updated_paths = Vec::new();
3404
3405        for spec in computed_field_specs {
3406            if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3407                self.set_field_in_state(state, &spec.target_path, result)?;
3408                updated_paths.push(spec.target_path.clone());
3409            }
3410        }
3411
3412        Ok(updated_paths)
3413    }
3414
3415    /// Set a field value in state by path (e.g., "section.field")
3416    fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3417        let segments: Vec<&str> = path.split('.').collect();
3418
3419        if segments.is_empty() {
3420            return Err("Empty path".into());
3421        }
3422
3423        // Navigate to parent, creating intermediate objects as needed
3424        let mut current = state;
3425        for (i, segment) in segments.iter().enumerate() {
3426            if i == segments.len() - 1 {
3427                // Last segment - set the value
3428                if let Some(obj) = current.as_object_mut() {
3429                    obj.insert(segment.to_string(), value);
3430                    return Ok(());
3431                } else {
3432                    return Err(format!("Cannot set field '{}' on non-object", segment).into());
3433                }
3434            } else {
3435                // Intermediate segment - navigate or create
3436                if !current.is_object() {
3437                    *current = json!({});
3438                }
3439                let obj = current.as_object_mut().unwrap();
3440                current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3441            }
3442        }
3443
3444        Ok(())
3445    }
3446
3447    /// Create a computed fields evaluator closure from AST specs
3448    /// This returns a function that can be passed to the bytecode builder
3449    pub fn create_evaluator_from_specs(
3450        specs: Vec<ComputedFieldSpec>,
3451    ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3452        move |state: &mut Value| {
3453            // Create a temporary VmContext just for evaluation
3454            // (We only need the expression evaluation methods)
3455            let vm = VmContext::new();
3456            vm.evaluate_computed_fields_from_ast(state, &specs)?;
3457            Ok(())
3458        }
3459    }
3460}
3461
3462impl Default for VmContext {
3463    fn default() -> Self {
3464        Self::new()
3465    }
3466}
3467
3468// Implement the ReverseLookupUpdater trait for VmContext
3469impl crate::resolvers::ReverseLookupUpdater for VmContext {
3470    fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3471        // Use default state_id=0 and default lookup name
3472        self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3473            .unwrap_or_else(|e| {
3474                tracing::error!("Failed to update PDA reverse lookup: {}", e);
3475                Vec::new()
3476            })
3477    }
3478
3479    fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3480        // Flush is handled inside update_pda_reverse_lookup, but we can also call it directly
3481        self.flush_pending_updates(0, pda_address)
3482            .unwrap_or_else(|e| {
3483                tracing::error!("Failed to flush pending updates: {}", e);
3484                Vec::new()
3485            })
3486    }
3487}
3488
3489#[cfg(test)]
3490mod tests {
3491    use super::*;
3492    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
3493
3494    #[test]
3495    fn test_computed_field_preserves_integer_type() {
3496        let vm = VmContext::new();
3497
3498        let mut state = serde_json::json!({
3499            "trading": {
3500                "total_buy_volume": 20000000000_i64,
3501                "total_sell_volume": 17951316474_i64
3502            }
3503        });
3504
3505        let spec = ComputedFieldSpec {
3506            target_path: "trading.total_volume".to_string(),
3507            result_type: "Option<u64>".to_string(),
3508            expression: ComputedExpr::Binary {
3509                op: BinaryOp::Add,
3510                left: Box::new(ComputedExpr::UnwrapOr {
3511                    expr: Box::new(ComputedExpr::FieldRef {
3512                        path: "trading.total_buy_volume".to_string(),
3513                    }),
3514                    default: serde_json::json!(0),
3515                }),
3516                right: Box::new(ComputedExpr::UnwrapOr {
3517                    expr: Box::new(ComputedExpr::FieldRef {
3518                        path: "trading.total_sell_volume".to_string(),
3519                    }),
3520                    default: serde_json::json!(0),
3521                }),
3522            },
3523        };
3524
3525        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
3526            .unwrap();
3527
3528        let total_volume = state
3529            .get("trading")
3530            .and_then(|t| t.get("total_volume"))
3531            .expect("total_volume should exist");
3532
3533        let serialized = serde_json::to_string(total_volume).unwrap();
3534        assert!(
3535            !serialized.contains('.'),
3536            "Integer should not have decimal point: {}",
3537            serialized
3538        );
3539        assert_eq!(
3540            total_volume.as_i64(),
3541            Some(37951316474),
3542            "Value should be correct sum"
3543        );
3544    }
3545
3546    #[test]
3547    fn test_set_field_sum_preserves_integer_type() {
3548        let mut vm = VmContext::new();
3549        vm.registers[0] = serde_json::json!({});
3550        vm.registers[1] = serde_json::json!(20000000000_i64);
3551        vm.registers[2] = serde_json::json!(17951316474_i64);
3552
3553        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
3554        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
3555
3556        let state = &vm.registers[0];
3557        let buy_vol = state
3558            .get("trading")
3559            .and_then(|t| t.get("total_buy_volume"))
3560            .unwrap();
3561        let sell_vol = state
3562            .get("trading")
3563            .and_then(|t| t.get("total_sell_volume"))
3564            .unwrap();
3565
3566        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
3567        let sell_serialized = serde_json::to_string(sell_vol).unwrap();
3568
3569        assert!(
3570            !buy_serialized.contains('.'),
3571            "Buy volume should not have decimal: {}",
3572            buy_serialized
3573        );
3574        assert!(
3575            !sell_serialized.contains('.'),
3576            "Sell volume should not have decimal: {}",
3577            sell_serialized
3578        );
3579    }
3580}