1use crate::ast::{
2 BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14#[derive(Debug, Clone, Default)]
17pub struct UpdateContext {
18 pub slot: Option<u64>,
20 pub signature: Option<String>,
22 pub timestamp: Option<i64>,
25 pub write_version: Option<u64>,
28 pub txn_index: Option<u64>,
31 pub metadata: HashMap<String, Value>,
33}
34
35impl UpdateContext {
36 pub fn new(slot: u64, signature: String) -> Self {
38 Self {
39 slot: Some(slot),
40 signature: Some(signature),
41 timestamp: None,
42 write_version: None,
43 txn_index: None,
44 metadata: HashMap::new(),
45 }
46 }
47
48 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
50 Self {
51 slot: Some(slot),
52 signature: Some(signature),
53 timestamp: Some(timestamp),
54 write_version: None,
55 txn_index: None,
56 metadata: HashMap::new(),
57 }
58 }
59
60 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
62 Self {
63 slot: Some(slot),
64 signature: Some(signature),
65 timestamp: None,
66 write_version: Some(write_version),
67 txn_index: None,
68 metadata: HashMap::new(),
69 }
70 }
71
72 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
74 Self {
75 slot: Some(slot),
76 signature: Some(signature),
77 timestamp: None,
78 write_version: None,
79 txn_index: Some(txn_index),
80 metadata: HashMap::new(),
81 }
82 }
83
84 pub fn timestamp(&self) -> i64 {
86 self.timestamp.unwrap_or_else(|| {
87 std::time::SystemTime::now()
88 .duration_since(std::time::UNIX_EPOCH)
89 .unwrap()
90 .as_secs() as i64
91 })
92 }
93
94 pub fn empty() -> Self {
96 Self::default()
97 }
98
99 pub fn is_account_update(&self) -> bool {
102 self.write_version.is_some() && self.txn_index.is_none()
103 }
104
105 pub fn is_instruction_update(&self) -> bool {
107 self.txn_index.is_some() && self.write_version.is_none()
108 }
109
110 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
111 self.metadata.insert(key, value);
112 self
113 }
114
115 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
117 self.metadata.get(key)
118 }
119
120 pub fn to_value(&self) -> Value {
122 let mut obj = serde_json::Map::new();
123 if let Some(slot) = self.slot {
124 obj.insert("slot".to_string(), json!(slot));
125 }
126 if let Some(ref sig) = self.signature {
127 obj.insert("signature".to_string(), json!(sig));
128 }
129 obj.insert("timestamp".to_string(), json!(self.timestamp()));
131 for (key, value) in &self.metadata {
132 obj.insert(key.clone(), value.clone());
133 }
134 Value::Object(obj)
135 }
136}
137
138pub type Register = usize;
139pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
140
141pub type RegisterValue = Value;
142
143pub trait ComputedFieldsEvaluator {
146 fn evaluate(&self, state: &mut Value) -> Result<()>;
147}
148
149const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
151const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
152const PENDING_UPDATE_TTL_SECONDS: i64 = 300; const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
157
158const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
160const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
161
162const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
163
164const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
165
166const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;
169
170const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
171
172const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
173
174fn estimate_json_size(value: &Value) -> usize {
176 match value {
177 Value::Null => 4,
178 Value::Bool(_) => 5,
179 Value::Number(_) => 8,
180 Value::String(s) => s.len() + 2,
181 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
182 Value::Object(obj) => {
183 2 + obj
184 .iter()
185 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
186 .sum::<usize>()
187 }
188 }
189}
190
191#[derive(Debug, Clone)]
192pub struct CompiledPath {
193 pub segments: std::sync::Arc<[String]>,
194}
195
196impl CompiledPath {
197 pub fn new(path: &str) -> Self {
198 let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
199 CompiledPath {
200 segments: segments.into(),
201 }
202 }
203
204 fn segments(&self) -> &[String] {
205 &self.segments
206 }
207}
208
209#[derive(Debug, Clone)]
212pub enum FieldChange {
213 Replaced,
215 Appended(Vec<Value>),
217}
218
219#[derive(Debug, Clone, Default)]
222pub struct DirtyTracker {
223 changes: HashMap<String, FieldChange>,
224}
225
226impl DirtyTracker {
227 pub fn new() -> Self {
229 Self {
230 changes: HashMap::new(),
231 }
232 }
233
234 pub fn mark_replaced(&mut self, path: &str) {
236 self.changes.insert(path.to_string(), FieldChange::Replaced);
238 }
239
240 pub fn mark_appended(&mut self, path: &str, value: Value) {
242 match self.changes.get_mut(path) {
243 Some(FieldChange::Appended(values)) => {
244 values.push(value);
246 }
247 Some(FieldChange::Replaced) => {
248 }
251 None => {
252 self.changes
254 .insert(path.to_string(), FieldChange::Appended(vec![value]));
255 }
256 }
257 }
258
259 pub fn is_empty(&self) -> bool {
261 self.changes.is_empty()
262 }
263
264 pub fn len(&self) -> usize {
266 self.changes.len()
267 }
268
269 pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
271 self.changes.iter()
272 }
273
274 pub fn dirty_paths(&self) -> HashSet<String> {
276 self.changes.keys().cloned().collect()
277 }
278
279 pub fn into_changes(self) -> HashMap<String, FieldChange> {
281 self.changes
282 }
283
284 pub fn changes(&self) -> &HashMap<String, FieldChange> {
286 &self.changes
287 }
288
289 pub fn appended_paths(&self) -> Vec<String> {
291 self.changes
292 .iter()
293 .filter_map(|(path, change)| match change {
294 FieldChange::Appended(_) => Some(path.clone()),
295 FieldChange::Replaced => None,
296 })
297 .collect()
298 }
299}
300
301pub struct VmContext {
302 registers: Vec<RegisterValue>,
303 states: HashMap<u32, StateTable>,
304 pub instructions_executed: u64,
305 pub cache_hits: u64,
306 path_cache: HashMap<String, CompiledPath>,
307 pub pda_cache_hits: u64,
308 pub pda_cache_misses: u64,
309 pub pending_queue_size: u64,
310 current_context: Option<UpdateContext>,
311 warnings: Vec<String>,
312 last_pda_lookup_miss: Option<String>,
313 last_pda_registered: Option<String>,
314 last_lookup_index_keys: Vec<String>,
315}
316
317#[derive(Debug)]
318pub struct LookupIndex {
319 index: std::sync::Mutex<LruCache<String, Value>>,
320}
321
322impl LookupIndex {
323 pub fn new() -> Self {
324 Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
325 }
326
327 pub fn with_capacity(capacity: usize) -> Self {
328 LookupIndex {
329 index: std::sync::Mutex::new(LruCache::new(
330 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
331 )),
332 }
333 }
334
335 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
336 let key = value_to_cache_key(lookup_value);
337 self.index.lock().unwrap().get(&key).cloned()
338 }
339
340 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
341 let key = value_to_cache_key(&lookup_value);
342 self.index.lock().unwrap().put(key, primary_key);
343 }
344
345 pub fn len(&self) -> usize {
346 self.index.lock().unwrap().len()
347 }
348
349 pub fn is_empty(&self) -> bool {
350 self.index.lock().unwrap().is_empty()
351 }
352}
353
354impl Default for LookupIndex {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
360fn value_to_cache_key(value: &Value) -> String {
361 match value {
362 Value::String(s) => s.clone(),
363 Value::Number(n) => n.to_string(),
364 Value::Bool(b) => b.to_string(),
365 Value::Null => "null".to_string(),
366 _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
367 }
368}
369
370#[derive(Debug)]
371pub struct TemporalIndex {
372 index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
373}
374
375impl Default for TemporalIndex {
376 fn default() -> Self {
377 Self::new()
378 }
379}
380
381impl TemporalIndex {
382 pub fn new() -> Self {
383 Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
384 }
385
386 pub fn with_capacity(capacity: usize) -> Self {
387 TemporalIndex {
388 index: std::sync::Mutex::new(LruCache::new(
389 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
390 )),
391 }
392 }
393
394 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
395 let key = value_to_cache_key(lookup_value);
396 let mut cache = self.index.lock().unwrap();
397 if let Some(entries) = cache.get(&key) {
398 for i in (0..entries.len()).rev() {
399 if entries[i].1 <= timestamp {
400 return Some(entries[i].0.clone());
401 }
402 }
403 }
404 None
405 }
406
407 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
408 let key = value_to_cache_key(lookup_value);
409 let mut cache = self.index.lock().unwrap();
410 if let Some(entries) = cache.get(&key) {
411 if let Some(last) = entries.last() {
412 return Some(last.0.clone());
413 }
414 }
415 None
416 }
417
418 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
419 let key = value_to_cache_key(&lookup_value);
420 let mut cache = self.index.lock().unwrap();
421
422 let entries = cache.get_or_insert_mut(key, Vec::new);
423 entries.push((primary_key, timestamp));
424 entries.sort_by_key(|(_, ts)| *ts);
425
426 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
427 entries.retain(|(_, ts)| *ts >= cutoff);
428
429 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
430 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
431 entries.drain(0..excess);
432 }
433 }
434
435 pub fn len(&self) -> usize {
436 self.index.lock().unwrap().len()
437 }
438
439 pub fn is_empty(&self) -> bool {
440 self.index.lock().unwrap().is_empty()
441 }
442
443 pub fn total_entries(&self) -> usize {
444 self.index
445 .lock()
446 .unwrap()
447 .iter()
448 .map(|(_, entries)| entries.len())
449 .sum()
450 }
451
452 pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
453 let mut cache = self.index.lock().unwrap();
454 let mut total_removed = 0;
455
456 for (_, entries) in cache.iter_mut() {
457 let original_len = entries.len();
458 entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
459 total_removed += original_len - entries.len();
460 }
461
462 total_removed
463 }
464}
465
466#[derive(Debug)]
467pub struct PdaReverseLookup {
468 index: LruCache<String, String>,
470}
471
472impl PdaReverseLookup {
473 pub fn new(capacity: usize) -> Self {
474 PdaReverseLookup {
475 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
476 }
477 }
478
479 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
480 self.index.get(pda_address).cloned()
481 }
482
483 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
484 let evicted = if self.index.len() >= self.index.cap().get() {
485 self.index.peek_lru().map(|(k, _)| k.clone())
486 } else {
487 None
488 };
489
490 self.index.put(pda_address, seed_value);
491 evicted
492 }
493
494 pub fn len(&self) -> usize {
495 self.index.len()
496 }
497
498 pub fn is_empty(&self) -> bool {
499 self.index.is_empty()
500 }
501}
502
503#[derive(Debug, Clone)]
505pub struct QueuedAccountUpdate {
506 pub pda_address: String,
507 pub account_type: String,
508 pub account_data: Value,
509 pub slot: u64,
510 pub write_version: u64,
511 pub signature: String,
512}
513
514#[derive(Debug, Clone)]
516pub struct PendingAccountUpdate {
517 pub account_type: String,
518 pub pda_address: String,
519 pub account_data: Value,
520 pub slot: u64,
521 pub write_version: u64,
522 pub signature: String,
523 pub queued_at: i64,
524}
525
526#[derive(Debug, Clone)]
528pub struct QueuedInstructionEvent {
529 pub pda_address: String,
530 pub event_type: String,
531 pub event_data: Value,
532 pub slot: u64,
533 pub signature: String,
534}
535
536#[derive(Debug, Clone)]
538pub struct PendingInstructionEvent {
539 pub event_type: String,
540 pub pda_address: String,
541 pub event_data: Value,
542 pub slot: u64,
543 pub signature: String,
544 pub queued_at: i64,
545}
546
547#[derive(Debug, Clone)]
548pub struct PendingQueueStats {
549 pub total_updates: usize,
550 pub unique_pdas: usize,
551 pub oldest_age_seconds: i64,
552 pub largest_pda_queue_size: usize,
553 pub estimated_memory_bytes: usize,
554}
555
556#[derive(Debug, Clone, Default)]
557pub struct VmMemoryStats {
558 pub state_table_entity_count: usize,
559 pub state_table_max_entries: usize,
560 pub state_table_at_capacity: bool,
561 pub lookup_index_count: usize,
562 pub lookup_index_total_entries: usize,
563 pub temporal_index_count: usize,
564 pub temporal_index_total_entries: usize,
565 pub pda_reverse_lookup_count: usize,
566 pub pda_reverse_lookup_total_entries: usize,
567 pub version_tracker_entries: usize,
568 pub pending_queue_stats: Option<PendingQueueStats>,
569 pub path_cache_size: usize,
570}
571
572#[derive(Debug, Clone, Default)]
573pub struct CleanupResult {
574 pub pending_updates_removed: usize,
575 pub temporal_entries_removed: usize,
576}
577
578#[derive(Debug, Clone)]
579pub struct CapacityWarning {
580 pub current_entries: usize,
581 pub max_entries: usize,
582 pub entries_over_limit: usize,
583}
584
585#[derive(Debug, Clone)]
586pub struct StateTableConfig {
587 pub max_entries: usize,
588 pub max_array_length: usize,
589}
590
591impl Default for StateTableConfig {
592 fn default() -> Self {
593 Self {
594 max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
595 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
596 }
597 }
598}
599
600#[derive(Debug)]
601pub struct VersionTracker {
602 cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
603}
604
605impl VersionTracker {
606 pub fn new() -> Self {
607 Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
608 }
609
610 pub fn with_capacity(capacity: usize) -> Self {
611 VersionTracker {
612 cache: std::sync::Mutex::new(LruCache::new(
613 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
614 )),
615 }
616 }
617
618 fn make_key(primary_key: &Value, event_type: &str) -> String {
619 format!("{}:{}", primary_key, event_type)
620 }
621
622 pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
623 let key = Self::make_key(primary_key, event_type);
624 self.cache.lock().unwrap().get(&key).copied()
625 }
626
627 pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
628 let key = Self::make_key(primary_key, event_type);
629 self.cache.lock().unwrap().put(key, (slot, ordering_value));
630 }
631
632 pub fn len(&self) -> usize {
633 self.cache.lock().unwrap().len()
634 }
635
636 pub fn is_empty(&self) -> bool {
637 self.cache.lock().unwrap().is_empty()
638 }
639}
640
641impl Default for VersionTracker {
642 fn default() -> Self {
643 Self::new()
644 }
645}
646
647#[derive(Debug)]
648pub struct StateTable {
649 pub data: DashMap<Value, Value>,
650 access_times: DashMap<Value, i64>,
651 pub lookup_indexes: HashMap<String, LookupIndex>,
652 pub temporal_indexes: HashMap<String, TemporalIndex>,
653 pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
654 pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
655 pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
656 version_tracker: VersionTracker,
657 instruction_dedup_cache: VersionTracker,
658 config: StateTableConfig,
659 #[cfg_attr(not(feature = "otel"), allow(dead_code))]
660 entity_name: String,
661}
662
663impl StateTable {
664 pub fn is_at_capacity(&self) -> bool {
665 self.data.len() >= self.config.max_entries
666 }
667
668 pub fn entries_over_limit(&self) -> usize {
669 self.data.len().saturating_sub(self.config.max_entries)
670 }
671
672 pub fn max_array_length(&self) -> usize {
673 self.config.max_array_length
674 }
675
676 fn touch(&self, key: &Value) {
677 let now = std::time::SystemTime::now()
678 .duration_since(std::time::UNIX_EPOCH)
679 .unwrap()
680 .as_secs() as i64;
681 self.access_times.insert(key.clone(), now);
682 }
683
684 fn evict_lru(&self, count: usize) -> usize {
685 if count == 0 || self.data.is_empty() {
686 return 0;
687 }
688
689 let mut entries: Vec<(Value, i64)> = self
690 .access_times
691 .iter()
692 .map(|entry| (entry.key().clone(), *entry.value()))
693 .collect();
694
695 entries.sort_by_key(|(_, ts)| *ts);
696
697 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
698
699 let mut evicted = 0;
700 for key in to_evict {
701 self.data.remove(&key);
702 self.access_times.remove(&key);
703 evicted += 1;
704 }
705
706 #[cfg(feature = "otel")]
707 if evicted > 0 {
708 crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
709 }
710
711 evicted
712 }
713
714 pub fn insert_with_eviction(&self, key: Value, value: Value) {
715 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
716 #[cfg(feature = "otel")]
717 crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
718 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
719 self.evict_lru(to_evict.max(1));
720 }
721 self.data.insert(key.clone(), value);
722 self.touch(&key);
723 }
724
725 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
726 let result = self.data.get(key).map(|v| v.clone());
727 if result.is_some() {
728 self.touch(key);
729 }
730 result
731 }
732
733 pub fn is_fresh_update(
740 &self,
741 primary_key: &Value,
742 event_type: &str,
743 slot: u64,
744 ordering_value: u64,
745 ) -> bool {
746 let dominated = self
747 .version_tracker
748 .get(primary_key, event_type)
749 .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
750 .unwrap_or(false);
751
752 if dominated {
753 return false;
754 }
755
756 self.version_tracker
757 .insert(primary_key, event_type, slot, ordering_value);
758 true
759 }
760
761 pub fn is_duplicate_instruction(
769 &self,
770 primary_key: &Value,
771 event_type: &str,
772 slot: u64,
773 txn_index: u64,
774 ) -> bool {
775 let is_duplicate = self
777 .instruction_dedup_cache
778 .get(primary_key, event_type)
779 .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
780 .unwrap_or(false);
781
782 if is_duplicate {
783 return true;
784 }
785
786 self.instruction_dedup_cache
788 .insert(primary_key, event_type, slot, txn_index);
789 false
790 }
791}
792
793impl VmContext {
794 pub fn new() -> Self {
795 let mut vm = VmContext {
796 registers: vec![Value::Null; 256],
797 states: HashMap::new(),
798 instructions_executed: 0,
799 cache_hits: 0,
800 path_cache: HashMap::new(),
801 pda_cache_hits: 0,
802 pda_cache_misses: 0,
803 pending_queue_size: 0,
804 current_context: None,
805 warnings: Vec::new(),
806 last_pda_lookup_miss: None,
807 last_pda_registered: None,
808 last_lookup_index_keys: Vec::new(),
809 };
810 vm.states.insert(
811 0,
812 StateTable {
813 data: DashMap::new(),
814 access_times: DashMap::new(),
815 lookup_indexes: HashMap::new(),
816 temporal_indexes: HashMap::new(),
817 pda_reverse_lookups: HashMap::new(),
818 pending_updates: DashMap::new(),
819 pending_instruction_events: DashMap::new(),
820 version_tracker: VersionTracker::new(),
821 instruction_dedup_cache: VersionTracker::with_capacity(
822 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
823 ),
824 config: StateTableConfig::default(),
825 entity_name: "default".to_string(),
826 },
827 );
828 vm
829 }
830
831 pub fn new_with_config(state_config: StateTableConfig) -> Self {
832 let mut vm = VmContext {
833 registers: vec![Value::Null; 256],
834 states: HashMap::new(),
835 instructions_executed: 0,
836 cache_hits: 0,
837 path_cache: HashMap::new(),
838 pda_cache_hits: 0,
839 pda_cache_misses: 0,
840 pending_queue_size: 0,
841 current_context: None,
842 warnings: Vec::new(),
843 last_pda_lookup_miss: None,
844 last_pda_registered: None,
845 last_lookup_index_keys: Vec::new(),
846 };
847 vm.states.insert(
848 0,
849 StateTable {
850 data: DashMap::new(),
851 access_times: DashMap::new(),
852 lookup_indexes: HashMap::new(),
853 temporal_indexes: HashMap::new(),
854 pda_reverse_lookups: HashMap::new(),
855 pending_updates: DashMap::new(),
856 pending_instruction_events: DashMap::new(),
857 version_tracker: VersionTracker::new(),
858 instruction_dedup_cache: VersionTracker::with_capacity(
859 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
860 ),
861 config: state_config,
862 entity_name: "default".to_string(),
863 },
864 );
865 vm
866 }
867
868 pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
871 self.states.get_mut(&state_id)
872 }
873
874 pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
876 &mut self.registers
877 }
878
879 pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
881 &self.path_cache
882 }
883
884 pub fn current_context(&self) -> Option<&UpdateContext> {
886 self.current_context.as_ref()
887 }
888
889 fn add_warning(&mut self, msg: String) {
890 self.warnings.push(msg);
891 }
892
893 pub fn take_warnings(&mut self) -> Vec<String> {
894 std::mem::take(&mut self.warnings)
895 }
896
897 pub fn has_warnings(&self) -> bool {
898 !self.warnings.is_empty()
899 }
900
901 pub fn update_state_from_register(
902 &mut self,
903 state_id: u32,
904 key: Value,
905 register: Register,
906 ) -> Result<()> {
907 let state = self.states.get(&state_id).ok_or("State table not found")?;
908 let value = self.registers[register].clone();
909 state.insert_with_eviction(key, value);
910 Ok(())
911 }
912
913 fn reset_registers(&mut self) {
914 for reg in &mut self.registers {
915 *reg = Value::Null;
916 }
917 }
918
919 pub fn extract_partial_state(
921 &self,
922 state_reg: Register,
923 dirty_fields: &HashSet<String>,
924 ) -> Result<Value> {
925 let full_state = &self.registers[state_reg];
926
927 if dirty_fields.is_empty() {
928 return Ok(json!({}));
929 }
930
931 let mut partial = serde_json::Map::new();
932
933 for path in dirty_fields {
934 let segments: Vec<&str> = path.split('.').collect();
935
936 let mut current = full_state;
937 let mut found = true;
938
939 for segment in &segments {
940 match current.get(segment) {
941 Some(v) => current = v,
942 None => {
943 found = false;
944 break;
945 }
946 }
947 }
948
949 if !found {
950 continue;
951 }
952
953 let mut target = &mut partial;
954 for (i, segment) in segments.iter().enumerate() {
955 if i == segments.len() - 1 {
956 target.insert(segment.to_string(), current.clone());
957 } else {
958 target
959 .entry(segment.to_string())
960 .or_insert_with(|| json!({}));
961 target = target
962 .get_mut(*segment)
963 .and_then(|v| v.as_object_mut())
964 .ok_or("Failed to build nested structure")?;
965 }
966 }
967 }
968
969 Ok(Value::Object(partial))
970 }
971
972 pub fn extract_partial_state_with_tracker(
976 &self,
977 state_reg: Register,
978 tracker: &DirtyTracker,
979 ) -> Result<Value> {
980 let full_state = &self.registers[state_reg];
981
982 if tracker.is_empty() {
983 return Ok(json!({}));
984 }
985
986 let mut partial = serde_json::Map::new();
987
988 for (path, change) in tracker.iter() {
989 let segments: Vec<&str> = path.split('.').collect();
990
991 let value_to_insert = match change {
992 FieldChange::Replaced => {
993 let mut current = full_state;
994 let mut found = true;
995
996 for segment in &segments {
997 match current.get(*segment) {
998 Some(v) => current = v,
999 None => {
1000 found = false;
1001 break;
1002 }
1003 }
1004 }
1005
1006 if !found {
1007 continue;
1008 }
1009 current.clone()
1010 }
1011 FieldChange::Appended(values) => Value::Array(values.clone()),
1012 };
1013
1014 let mut target = &mut partial;
1015 for (i, segment) in segments.iter().enumerate() {
1016 if i == segments.len() - 1 {
1017 target.insert(segment.to_string(), value_to_insert.clone());
1018 } else {
1019 target
1020 .entry(segment.to_string())
1021 .or_insert_with(|| json!({}));
1022 target = target
1023 .get_mut(*segment)
1024 .and_then(|v| v.as_object_mut())
1025 .ok_or("Failed to build nested structure")?;
1026 }
1027 }
1028 }
1029
1030 Ok(Value::Object(partial))
1031 }
1032
1033 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1034 if let Some(compiled) = self.path_cache.get(path) {
1035 self.cache_hits += 1;
1036 #[cfg(feature = "otel")]
1037 crate::vm_metrics::record_path_cache_hit();
1038 return compiled.clone();
1039 }
1040 #[cfg(feature = "otel")]
1041 crate::vm_metrics::record_path_cache_miss();
1042 let compiled = CompiledPath::new(path);
1043 self.path_cache.insert(path.to_string(), compiled.clone());
1044 compiled
1045 }
1046
1047 #[cfg_attr(feature = "otel", instrument(
1049 name = "vm.process_event",
1050 skip(self, bytecode, event_value, log),
1051 level = "info",
1052 fields(
1053 event_type = %event_type,
1054 slot = context.as_ref().and_then(|c| c.slot),
1055 )
1056 ))]
1057 pub fn process_event(
1058 &mut self,
1059 bytecode: &MultiEntityBytecode,
1060 event_value: Value,
1061 event_type: &str,
1062 context: Option<&UpdateContext>,
1063 mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1064 ) -> Result<Vec<Mutation>> {
1065 self.current_context = context.cloned();
1066
1067 let mut event_value = event_value;
1068 if let Some(ctx) = context {
1069 if let Some(obj) = event_value.as_object_mut() {
1070 obj.insert("__update_context".to_string(), ctx.to_value());
1071 }
1072 }
1073
1074 let mut all_mutations = Vec::new();
1075
1076 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1077 for entity_name in entity_names {
1078 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1079 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1080 if let Some(ref mut log) = log {
1081 log.set("entity", entity_name.clone());
1082 log.inc("handlers", 1);
1083 }
1084
1085 let opcodes_before = self.instructions_executed;
1086 let cache_before = self.cache_hits;
1087 let pda_hits_before = self.pda_cache_hits;
1088 let pda_misses_before = self.pda_cache_misses;
1089
1090 let mutations = self.execute_handler(
1091 handler,
1092 &event_value,
1093 event_type,
1094 entity_bytecode.state_id,
1095 entity_name,
1096 entity_bytecode.computed_fields_evaluator.as_ref(),
1097 )?;
1098
1099 if let Some(ref mut log) = log {
1100 log.inc(
1101 "opcodes",
1102 (self.instructions_executed - opcodes_before) as i64,
1103 );
1104 log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1105 log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1106 log.inc(
1107 "pda_misses",
1108 (self.pda_cache_misses - pda_misses_before) as i64,
1109 );
1110 }
1111
1112 if mutations.is_empty() {
1113 if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1114 if event_type.ends_with("IxState") {
1115 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1116 let signature = context
1117 .and_then(|c| c.signature.clone())
1118 .unwrap_or_default();
1119 let _ = self.queue_instruction_event(
1120 entity_bytecode.state_id,
1121 QueuedInstructionEvent {
1122 pda_address: missed_pda,
1123 event_type: event_type.to_string(),
1124 event_data: event_value.clone(),
1125 slot,
1126 signature,
1127 },
1128 );
1129 }
1130 }
1131 }
1132
1133 all_mutations.extend(mutations);
1134
1135 if let Some(registered_pda) = self.take_last_pda_registered() {
1136 let pending_events = self.flush_pending_instruction_events(
1137 entity_bytecode.state_id,
1138 ®istered_pda,
1139 );
1140 for pending in pending_events {
1141 if let Some(pending_handler) =
1142 entity_bytecode.handlers.get(&pending.event_type)
1143 {
1144 if let Ok(reprocessed_mutations) = self.execute_handler(
1145 pending_handler,
1146 &pending.event_data,
1147 &pending.event_type,
1148 entity_bytecode.state_id,
1149 entity_name,
1150 entity_bytecode.computed_fields_evaluator.as_ref(),
1151 ) {
1152 all_mutations.extend(reprocessed_mutations);
1153 }
1154 }
1155 }
1156 }
1157
1158 let lookup_keys = self.take_last_lookup_index_keys();
1159 for lookup_key in lookup_keys {
1160 if let Ok(pending_updates) =
1161 self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1162 {
1163 for pending in pending_updates {
1164 if let Some(pending_handler) =
1165 entity_bytecode.handlers.get(&pending.account_type)
1166 {
1167 self.current_context = Some(UpdateContext::new_account(
1168 pending.slot,
1169 pending.signature.clone(),
1170 pending.write_version,
1171 ));
1172 if let Ok(reprocessed) = self.execute_handler(
1173 pending_handler,
1174 &pending.account_data,
1175 &pending.account_type,
1176 entity_bytecode.state_id,
1177 entity_name,
1178 entity_bytecode.computed_fields_evaluator.as_ref(),
1179 ) {
1180 all_mutations.extend(reprocessed);
1181 }
1182 }
1183 }
1184 }
1185 }
1186 } else if let Some(ref mut log) = log {
1187 log.set("skip_reason", "no_handler");
1188 }
1189 } else if let Some(ref mut log) = log {
1190 log.set("skip_reason", "entity_not_found");
1191 }
1192 }
1193 } else if let Some(ref mut log) = log {
1194 log.set("skip_reason", "no_event_routing");
1195 }
1196
1197 if let Some(log) = log {
1198 log.set("mutations", all_mutations.len() as i64);
1199 if let Some(first) = all_mutations.first() {
1200 if let Some(key_str) = first.key.as_str() {
1201 log.set("primary_key", key_str);
1202 } else if let Some(key_num) = first.key.as_u64() {
1203 log.set("primary_key", key_num as i64);
1204 }
1205 }
1206 if let Some(state) = self.states.get(&0) {
1207 log.set("state_table_size", state.data.len() as i64);
1208 }
1209
1210 let warnings = self.take_warnings();
1211 if !warnings.is_empty() {
1212 log.set("warnings", warnings.len() as i64);
1213 log.set(
1214 "warning_messages",
1215 Value::Array(warnings.into_iter().map(Value::String).collect()),
1216 );
1217 log.set_level(crate::canonical_log::LogLevel::Warn);
1218 }
1219 } else {
1220 self.warnings.clear();
1221 }
1222
1223 Ok(all_mutations)
1224 }
1225
1226 pub fn process_any(
1227 &mut self,
1228 bytecode: &MultiEntityBytecode,
1229 any: prost_types::Any,
1230 ) -> Result<Vec<Mutation>> {
1231 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1232 self.process_event(bytecode, event_value, &event_type, None, None)
1233 }
1234
1235 #[cfg_attr(feature = "otel", instrument(
1236 name = "vm.execute_handler",
1237 skip(self, handler, event_value, entity_evaluator),
1238 level = "debug",
1239 fields(
1240 event_type = %event_type,
1241 handler_opcodes = handler.len(),
1242 )
1243 ))]
1244 #[allow(clippy::type_complexity)]
1245 fn execute_handler(
1246 &mut self,
1247 handler: &[OpCode],
1248 event_value: &Value,
1249 event_type: &str,
1250 override_state_id: u32,
1251 entity_name: &str,
1252 entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1253 ) -> Result<Vec<Mutation>> {
1254 self.reset_registers();
1255 self.last_pda_lookup_miss = None;
1256
1257 let mut pc: usize = 0;
1258 let mut output = Vec::new();
1259 let mut dirty_tracker = DirtyTracker::new();
1260
1261 while pc < handler.len() {
1262 match &handler[pc] {
1263 OpCode::LoadEventField {
1264 path,
1265 dest,
1266 default,
1267 } => {
1268 let value = self.load_field(event_value, path, default.as_ref())?;
1269 self.registers[*dest] = value;
1270 pc += 1;
1271 }
1272 OpCode::LoadConstant { value, dest } => {
1273 self.registers[*dest] = value.clone();
1274 pc += 1;
1275 }
1276 OpCode::CopyRegister { source, dest } => {
1277 self.registers[*dest] = self.registers[*source].clone();
1278 pc += 1;
1279 }
1280 OpCode::CopyRegisterIfNull { source, dest } => {
1281 if self.registers[*dest].is_null() {
1282 self.registers[*dest] = self.registers[*source].clone();
1283 }
1284 pc += 1;
1285 }
1286 OpCode::GetEventType { dest } => {
1287 self.registers[*dest] = json!(event_type);
1288 pc += 1;
1289 }
1290 OpCode::CreateObject { dest } => {
1291 self.registers[*dest] = json!({});
1292 pc += 1;
1293 }
1294 OpCode::SetField {
1295 object,
1296 path,
1297 value,
1298 } => {
1299 self.set_field_auto_vivify(*object, path, *value)?;
1300 dirty_tracker.mark_replaced(path);
1301 pc += 1;
1302 }
1303 OpCode::SetFields { object, fields } => {
1304 for (path, value_reg) in fields {
1305 self.set_field_auto_vivify(*object, path, *value_reg)?;
1306 dirty_tracker.mark_replaced(path);
1307 }
1308 pc += 1;
1309 }
1310 OpCode::GetField { object, path, dest } => {
1311 let value = self.get_field(*object, path)?;
1312 self.registers[*dest] = value;
1313 pc += 1;
1314 }
1315 OpCode::ReadOrInitState {
1316 state_id: _,
1317 key,
1318 default,
1319 dest,
1320 } => {
1321 let actual_state_id = override_state_id;
1322 let entity_name_owned = entity_name.to_string();
1323 self.states
1324 .entry(actual_state_id)
1325 .or_insert_with(|| StateTable {
1326 data: DashMap::new(),
1327 access_times: DashMap::new(),
1328 lookup_indexes: HashMap::new(),
1329 temporal_indexes: HashMap::new(),
1330 pda_reverse_lookups: HashMap::new(),
1331 pending_updates: DashMap::new(),
1332 pending_instruction_events: DashMap::new(),
1333 version_tracker: VersionTracker::new(),
1334 instruction_dedup_cache: VersionTracker::with_capacity(
1335 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1336 ),
1337 config: StateTableConfig::default(),
1338 entity_name: entity_name_owned,
1339 });
1340 let key_value = self.registers[*key].clone();
1341 let warn_null_key = key_value.is_null()
1342 && event_type.ends_with("State")
1343 && !event_type.ends_with("IxState");
1344
1345 if warn_null_key {
1346 self.add_warning(format!(
1347 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1348 key, event_type
1349 ));
1350 }
1351
1352 let state = self
1353 .states
1354 .get(&actual_state_id)
1355 .ok_or("State table not found")?;
1356
1357 if !key_value.is_null() {
1358 if let Some(ctx) = &self.current_context {
1359 if ctx.is_account_update() {
1361 if let (Some(slot), Some(write_version)) =
1362 (ctx.slot, ctx.write_version)
1363 {
1364 if !state.is_fresh_update(
1365 &key_value,
1366 event_type,
1367 slot,
1368 write_version,
1369 ) {
1370 self.add_warning(format!(
1371 "Stale account update skipped: slot={}, write_version={}",
1372 slot, write_version
1373 ));
1374 return Ok(Vec::new());
1375 }
1376 }
1377 }
1378 else if ctx.is_instruction_update() {
1380 if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
1381 if state.is_duplicate_instruction(
1382 &key_value, event_type, slot, txn_index,
1383 ) {
1384 self.add_warning(format!(
1385 "Duplicate instruction skipped: slot={}, txn_index={}",
1386 slot, txn_index
1387 ));
1388 return Ok(Vec::new());
1389 }
1390 }
1391 }
1392 }
1393 }
1394 let value = state
1395 .get_and_touch(&key_value)
1396 .unwrap_or_else(|| default.clone());
1397
1398 self.registers[*dest] = value;
1399 pc += 1;
1400 }
1401 OpCode::UpdateState {
1402 state_id: _,
1403 key,
1404 value,
1405 } => {
1406 let actual_state_id = override_state_id;
1407 let state = self
1408 .states
1409 .get(&actual_state_id)
1410 .ok_or("State table not found")?;
1411 let key_value = self.registers[*key].clone();
1412 let value_data = self.registers[*value].clone();
1413
1414 state.insert_with_eviction(key_value, value_data);
1415 pc += 1;
1416 }
1417 OpCode::AppendToArray {
1418 object,
1419 path,
1420 value,
1421 } => {
1422 let appended_value = self.registers[*value].clone();
1423 let max_len = self
1424 .states
1425 .get(&override_state_id)
1426 .map(|s| s.max_array_length())
1427 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1428 self.append_to_array(*object, path, *value, max_len)?;
1429 dirty_tracker.mark_appended(path, appended_value);
1430 pc += 1;
1431 }
1432 OpCode::GetCurrentTimestamp { dest } => {
1433 let timestamp = std::time::SystemTime::now()
1434 .duration_since(std::time::UNIX_EPOCH)
1435 .unwrap()
1436 .as_secs() as i64;
1437 self.registers[*dest] = json!(timestamp);
1438 pc += 1;
1439 }
1440 OpCode::CreateEvent { dest, event_value } => {
1441 let timestamp = std::time::SystemTime::now()
1442 .duration_since(std::time::UNIX_EPOCH)
1443 .unwrap()
1444 .as_secs() as i64;
1445
1446 let mut event_data = self.registers[*event_value].clone();
1448 if let Some(obj) = event_data.as_object_mut() {
1449 obj.remove("__update_context");
1450 }
1451
1452 let mut event = serde_json::Map::new();
1454 event.insert("timestamp".to_string(), json!(timestamp));
1455 event.insert("data".to_string(), event_data);
1456
1457 if let Some(ref ctx) = self.current_context {
1459 if let Some(slot) = ctx.slot {
1460 event.insert("slot".to_string(), json!(slot));
1461 }
1462 if let Some(ref signature) = ctx.signature {
1463 event.insert("signature".to_string(), json!(signature));
1464 }
1465 }
1466
1467 self.registers[*dest] = Value::Object(event);
1468 pc += 1;
1469 }
1470 OpCode::CreateCapture {
1471 dest,
1472 capture_value,
1473 } => {
1474 let timestamp = std::time::SystemTime::now()
1475 .duration_since(std::time::UNIX_EPOCH)
1476 .unwrap()
1477 .as_secs() as i64;
1478
1479 let capture_data = self.registers[*capture_value].clone();
1481
1482 let account_address = event_value
1484 .get("__account_address")
1485 .and_then(|v| v.as_str())
1486 .unwrap_or("")
1487 .to_string();
1488
1489 let mut capture = serde_json::Map::new();
1491 capture.insert("timestamp".to_string(), json!(timestamp));
1492 capture.insert("account_address".to_string(), json!(account_address));
1493 capture.insert("data".to_string(), capture_data);
1494
1495 if let Some(ref ctx) = self.current_context {
1497 if let Some(slot) = ctx.slot {
1498 capture.insert("slot".to_string(), json!(slot));
1499 }
1500 if let Some(ref signature) = ctx.signature {
1501 capture.insert("signature".to_string(), json!(signature));
1502 }
1503 }
1504
1505 self.registers[*dest] = Value::Object(capture);
1506 pc += 1;
1507 }
1508 OpCode::Transform {
1509 source,
1510 dest,
1511 transformation,
1512 } => {
1513 if source == dest {
1514 self.transform_in_place(*source, transformation)?;
1515 } else {
1516 let source_value = &self.registers[*source];
1517 let value = self.apply_transformation(source_value, transformation)?;
1518 self.registers[*dest] = value;
1519 }
1520 pc += 1;
1521 }
1522 OpCode::EmitMutation {
1523 entity_name,
1524 key,
1525 state,
1526 } => {
1527 let primary_key = self.registers[*key].clone();
1528
1529 if primary_key.is_null() || dirty_tracker.is_empty() {
1530 let reason = if dirty_tracker.is_empty() {
1531 "no_fields_modified"
1532 } else {
1533 "null_primary_key"
1534 };
1535 self.add_warning(format!(
1536 "Skipping mutation for entity '{}': {} (dirty_fields={})",
1537 entity_name,
1538 reason,
1539 dirty_tracker.len()
1540 ));
1541 } else {
1542 let patch =
1543 self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1544
1545 let append = dirty_tracker.appended_paths();
1546 let mutation = Mutation {
1547 export: entity_name.clone(),
1548 key: primary_key,
1549 patch,
1550 append,
1551 };
1552 output.push(mutation);
1553 }
1554 pc += 1;
1555 }
1556 OpCode::SetFieldIfNull {
1557 object,
1558 path,
1559 value,
1560 } => {
1561 let was_set = self.set_field_if_null(*object, path, *value)?;
1562 if was_set {
1563 dirty_tracker.mark_replaced(path);
1564 }
1565 pc += 1;
1566 }
1567 OpCode::SetFieldMax {
1568 object,
1569 path,
1570 value,
1571 } => {
1572 let was_updated = self.set_field_max(*object, path, *value)?;
1573 if was_updated {
1574 dirty_tracker.mark_replaced(path);
1575 }
1576 pc += 1;
1577 }
1578 OpCode::UpdateTemporalIndex {
1579 state_id: _,
1580 index_name,
1581 lookup_value,
1582 primary_key,
1583 timestamp,
1584 } => {
1585 let actual_state_id = override_state_id;
1586 let state = self
1587 .states
1588 .get_mut(&actual_state_id)
1589 .ok_or("State table not found")?;
1590 let index = state
1591 .temporal_indexes
1592 .entry(index_name.clone())
1593 .or_insert_with(TemporalIndex::new);
1594
1595 let lookup_val = self.registers[*lookup_value].clone();
1596 let pk_val = self.registers[*primary_key].clone();
1597 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1598 val
1599 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1600 val as i64
1601 } else {
1602 return Err(format!(
1603 "Timestamp must be a number (i64 or u64), got: {:?}",
1604 self.registers[*timestamp]
1605 )
1606 .into());
1607 };
1608
1609 index.insert(lookup_val, pk_val, ts_val);
1610 pc += 1;
1611 }
1612 OpCode::LookupTemporalIndex {
1613 state_id: _,
1614 index_name,
1615 lookup_value,
1616 timestamp,
1617 dest,
1618 } => {
1619 let actual_state_id = override_state_id;
1620 let state = self
1621 .states
1622 .get(&actual_state_id)
1623 .ok_or("State table not found")?;
1624 let lookup_val = &self.registers[*lookup_value];
1625
1626 let result = if self.registers[*timestamp].is_null() {
1627 if let Some(index) = state.temporal_indexes.get(index_name) {
1628 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1629 } else {
1630 Value::Null
1631 }
1632 } else {
1633 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1634 val
1635 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1636 val as i64
1637 } else {
1638 return Err(format!(
1639 "Timestamp must be a number (i64 or u64), got: {:?}",
1640 self.registers[*timestamp]
1641 )
1642 .into());
1643 };
1644
1645 if let Some(index) = state.temporal_indexes.get(index_name) {
1646 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1647 } else {
1648 Value::Null
1649 }
1650 };
1651
1652 self.registers[*dest] = result;
1653 pc += 1;
1654 }
1655 OpCode::UpdateLookupIndex {
1656 state_id: _,
1657 index_name,
1658 lookup_value,
1659 primary_key,
1660 } => {
1661 let actual_state_id = override_state_id;
1662 let state = self
1663 .states
1664 .get_mut(&actual_state_id)
1665 .ok_or("State table not found")?;
1666 let index = state
1667 .lookup_indexes
1668 .entry(index_name.clone())
1669 .or_insert_with(LookupIndex::new);
1670
1671 let lookup_val = self.registers[*lookup_value].clone();
1672 let pk_val = self.registers[*primary_key].clone();
1673
1674 index.insert(lookup_val.clone(), pk_val);
1675
1676 if let Some(key_str) = lookup_val.as_str() {
1678 self.last_lookup_index_keys.push(key_str.to_string());
1679 }
1680
1681 pc += 1;
1682 }
1683 OpCode::LookupIndex {
1684 state_id: _,
1685 index_name,
1686 lookup_value,
1687 dest,
1688 } => {
1689 let actual_state_id = override_state_id;
1690 let lookup_val = self.registers[*lookup_value].clone();
1691
1692 let result = {
1693 let state = self
1694 .states
1695 .get(&actual_state_id)
1696 .ok_or("State table not found")?;
1697
1698 if let Some(index) = state.lookup_indexes.get(index_name) {
1699 let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1700 #[cfg(feature = "otel")]
1701 if found.is_null() {
1702 crate::vm_metrics::record_lookup_index_miss(index_name);
1703 } else {
1704 crate::vm_metrics::record_lookup_index_hit(index_name);
1705 }
1706 found
1707 } else {
1708 Value::Null
1709 }
1710 };
1711
1712 let final_result = if result.is_null() {
1713 if let Some(pda_str) = lookup_val.as_str() {
1714 let state = self
1715 .states
1716 .get_mut(&actual_state_id)
1717 .ok_or("State table not found")?;
1718
1719 if let Some(pda_lookup) =
1720 state.pda_reverse_lookups.get_mut("default_pda_lookup")
1721 {
1722 if let Some(resolved) = pda_lookup.lookup(pda_str) {
1723 Value::String(resolved)
1724 } else {
1725 self.last_pda_lookup_miss = Some(pda_str.to_string());
1726 Value::Null
1727 }
1728 } else {
1729 self.last_pda_lookup_miss = Some(pda_str.to_string());
1730 Value::Null
1731 }
1732 } else {
1733 Value::Null
1734 }
1735 } else {
1736 result
1737 };
1738
1739 self.registers[*dest] = final_result;
1740 pc += 1;
1741 }
1742 OpCode::SetFieldSum {
1743 object,
1744 path,
1745 value,
1746 } => {
1747 let was_updated = self.set_field_sum(*object, path, *value)?;
1748 if was_updated {
1749 dirty_tracker.mark_replaced(path);
1750 }
1751 pc += 1;
1752 }
1753 OpCode::SetFieldIncrement { object, path } => {
1754 let was_updated = self.set_field_increment(*object, path)?;
1755 if was_updated {
1756 dirty_tracker.mark_replaced(path);
1757 }
1758 pc += 1;
1759 }
1760 OpCode::SetFieldMin {
1761 object,
1762 path,
1763 value,
1764 } => {
1765 let was_updated = self.set_field_min(*object, path, *value)?;
1766 if was_updated {
1767 dirty_tracker.mark_replaced(path);
1768 }
1769 pc += 1;
1770 }
1771 OpCode::AddToUniqueSet {
1772 state_id: _,
1773 set_name,
1774 value,
1775 count_object,
1776 count_path,
1777 } => {
1778 let value_to_add = self.registers[*value].clone();
1779
1780 let set_field_path = format!("__unique_set:{}", set_name);
1783
1784 let mut set: HashSet<Value> =
1786 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1787 if !existing.is_null() {
1788 serde_json::from_value(existing).unwrap_or_default()
1789 } else {
1790 HashSet::new()
1791 }
1792 } else {
1793 HashSet::new()
1794 };
1795
1796 let was_new = set.insert(value_to_add);
1798
1799 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1801 self.registers[100] = serde_json::to_value(set_as_vec)?;
1802 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1803
1804 if was_new {
1806 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1807 self.set_field_auto_vivify(*count_object, count_path, 100)?;
1808 dirty_tracker.mark_replaced(count_path);
1809 }
1810
1811 pc += 1;
1812 }
1813 OpCode::ConditionalSetField {
1814 object,
1815 path,
1816 value,
1817 condition_field,
1818 condition_op,
1819 condition_value,
1820 } => {
1821 let field_value = self.load_field(event_value, condition_field, None)?;
1822 let condition_met =
1823 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1824
1825 if condition_met {
1826 self.set_field_auto_vivify(*object, path, *value)?;
1827 dirty_tracker.mark_replaced(path);
1828 }
1829 pc += 1;
1830 }
1831 OpCode::ConditionalIncrement {
1832 object,
1833 path,
1834 condition_field,
1835 condition_op,
1836 condition_value,
1837 } => {
1838 let field_value = self.load_field(event_value, condition_field, None)?;
1839 let condition_met =
1840 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1841
1842 if condition_met {
1843 let was_updated = self.set_field_increment(*object, path)?;
1844 if was_updated {
1845 dirty_tracker.mark_replaced(path);
1846 }
1847 }
1848 pc += 1;
1849 }
1850 OpCode::EvaluateComputedFields {
1851 state,
1852 computed_paths,
1853 } => {
1854 if let Some(evaluator) = entity_evaluator {
1855 let old_values: Vec<_> = computed_paths
1856 .iter()
1857 .map(|path| Self::get_value_at_path(&self.registers[*state], path))
1858 .collect();
1859
1860 let state_value = &mut self.registers[*state];
1861 let eval_result = evaluator(state_value);
1862
1863 if eval_result.is_ok() {
1864 for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
1865 let new_value =
1866 Self::get_value_at_path(&self.registers[*state], path);
1867
1868 if new_value != *old_value {
1869 dirty_tracker.mark_replaced(path);
1870 }
1871 }
1872 }
1873 }
1874 pc += 1;
1875 }
1876 OpCode::UpdatePdaReverseLookup {
1877 state_id: _,
1878 lookup_name,
1879 pda_address,
1880 primary_key,
1881 } => {
1882 let actual_state_id = override_state_id;
1883 let state = self
1884 .states
1885 .get_mut(&actual_state_id)
1886 .ok_or("State table not found")?;
1887
1888 let pda_val = self.registers[*pda_address].clone();
1889 let pk_val = self.registers[*primary_key].clone();
1890
1891 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1892 let pda_lookup = state
1893 .pda_reverse_lookups
1894 .entry(lookup_name.clone())
1895 .or_insert_with(|| {
1896 PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
1897 });
1898
1899 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1900 self.last_pda_registered = Some(pda_str.to_string());
1901 } else if !pk_val.is_null() {
1902 if let Some(pk_num) = pk_val.as_u64() {
1903 if let Some(pda_str) = pda_val.as_str() {
1904 let pda_lookup = state
1905 .pda_reverse_lookups
1906 .entry(lookup_name.clone())
1907 .or_insert_with(|| {
1908 PdaReverseLookup::new(
1909 DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
1910 )
1911 });
1912
1913 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1914 self.last_pda_registered = Some(pda_str.to_string());
1915 }
1916 }
1917 }
1918
1919 pc += 1;
1920 }
1921 }
1922
1923 self.instructions_executed += 1;
1924 }
1925
1926 Ok(output)
1927 }
1928
1929 fn load_field(
1930 &self,
1931 event_value: &Value,
1932 path: &FieldPath,
1933 default: Option<&Value>,
1934 ) -> Result<Value> {
1935 if path.segments.is_empty() {
1936 if let Some(obj) = event_value.as_object() {
1937 let filtered: serde_json::Map<String, Value> = obj
1938 .iter()
1939 .filter(|(k, _)| !k.starts_with("__"))
1940 .map(|(k, v)| (k.clone(), v.clone()))
1941 .collect();
1942 return Ok(Value::Object(filtered));
1943 }
1944 return Ok(event_value.clone());
1945 }
1946
1947 let mut current = event_value;
1948 for segment in path.segments.iter() {
1949 current = match current.get(segment) {
1950 Some(v) => v,
1951 None => return Ok(default.cloned().unwrap_or(Value::Null)),
1952 };
1953 }
1954
1955 Ok(current.clone())
1956 }
1957
1958 fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
1959 let mut current = value;
1960 for segment in path.split('.') {
1961 current = current.get(segment)?;
1962 }
1963 Some(current.clone())
1964 }
1965
1966 fn set_field_auto_vivify(
1967 &mut self,
1968 object_reg: Register,
1969 path: &str,
1970 value_reg: Register,
1971 ) -> Result<()> {
1972 let compiled = self.get_compiled_path(path);
1973 let segments = compiled.segments();
1974 let value = self.registers[value_reg].clone();
1975
1976 if !self.registers[object_reg].is_object() {
1977 self.registers[object_reg] = json!({});
1978 }
1979
1980 let obj = self.registers[object_reg]
1981 .as_object_mut()
1982 .ok_or("Not an object")?;
1983
1984 let mut current = obj;
1985 for (i, segment) in segments.iter().enumerate() {
1986 if i == segments.len() - 1 {
1987 current.insert(segment.to_string(), value);
1988 return Ok(());
1989 } else {
1990 current
1991 .entry(segment.to_string())
1992 .or_insert_with(|| json!({}));
1993 current = current
1994 .get_mut(segment)
1995 .and_then(|v| v.as_object_mut())
1996 .ok_or("Path collision: expected object")?;
1997 }
1998 }
1999
2000 Ok(())
2001 }
2002
2003 fn set_field_if_null(
2004 &mut self,
2005 object_reg: Register,
2006 path: &str,
2007 value_reg: Register,
2008 ) -> Result<bool> {
2009 let compiled = self.get_compiled_path(path);
2010 let segments = compiled.segments();
2011 let value = self.registers[value_reg].clone();
2012
2013 if value.is_null() {
2017 return Ok(false);
2018 }
2019
2020 if !self.registers[object_reg].is_object() {
2021 self.registers[object_reg] = json!({});
2022 }
2023
2024 let obj = self.registers[object_reg]
2025 .as_object_mut()
2026 .ok_or("Not an object")?;
2027
2028 let mut current = obj;
2029 for (i, segment) in segments.iter().enumerate() {
2030 if i == segments.len() - 1 {
2031 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
2032 current.insert(segment.to_string(), value);
2033 return Ok(true);
2034 }
2035 return Ok(false);
2036 } else {
2037 current
2038 .entry(segment.to_string())
2039 .or_insert_with(|| json!({}));
2040 current = current
2041 .get_mut(segment)
2042 .and_then(|v| v.as_object_mut())
2043 .ok_or("Path collision: expected object")?;
2044 }
2045 }
2046
2047 Ok(false)
2048 }
2049
2050 fn set_field_max(
2051 &mut self,
2052 object_reg: Register,
2053 path: &str,
2054 value_reg: Register,
2055 ) -> Result<bool> {
2056 let compiled = self.get_compiled_path(path);
2057 let segments = compiled.segments();
2058 let new_value = self.registers[value_reg].clone();
2059
2060 if !self.registers[object_reg].is_object() {
2061 self.registers[object_reg] = json!({});
2062 }
2063
2064 let obj = self.registers[object_reg]
2065 .as_object_mut()
2066 .ok_or("Not an object")?;
2067
2068 let mut current = obj;
2069 for (i, segment) in segments.iter().enumerate() {
2070 if i == segments.len() - 1 {
2071 let should_update = if let Some(current_value) = current.get(segment) {
2072 if current_value.is_null() {
2073 true
2074 } else {
2075 match (current_value.as_i64(), new_value.as_i64()) {
2076 (Some(current_val), Some(new_val)) => new_val > current_val,
2077 (Some(current_val), None) if new_value.as_u64().is_some() => {
2078 new_value.as_u64().unwrap() as i64 > current_val
2079 }
2080 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2081 new_val > current_value.as_u64().unwrap() as i64
2082 }
2083 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2084 (Some(current_val), Some(new_val)) => new_val > current_val,
2085 _ => match (current_value.as_f64(), new_value.as_f64()) {
2086 (Some(current_val), Some(new_val)) => new_val > current_val,
2087 _ => false,
2088 },
2089 },
2090 _ => false,
2091 }
2092 }
2093 } else {
2094 true
2095 };
2096
2097 if should_update {
2098 current.insert(segment.to_string(), new_value);
2099 return Ok(true);
2100 }
2101 return Ok(false);
2102 } else {
2103 current
2104 .entry(segment.to_string())
2105 .or_insert_with(|| json!({}));
2106 current = current
2107 .get_mut(segment)
2108 .and_then(|v| v.as_object_mut())
2109 .ok_or("Path collision: expected object")?;
2110 }
2111 }
2112
2113 Ok(false)
2114 }
2115
2116 fn set_field_sum(
2117 &mut self,
2118 object_reg: Register,
2119 path: &str,
2120 value_reg: Register,
2121 ) -> Result<bool> {
2122 let compiled = self.get_compiled_path(path);
2123 let segments = compiled.segments();
2124 let new_value = &self.registers[value_reg];
2125
2126 let new_val_num = new_value
2128 .as_i64()
2129 .or_else(|| new_value.as_u64().map(|n| n as i64))
2130 .ok_or("Sum requires numeric value")?;
2131
2132 if !self.registers[object_reg].is_object() {
2133 self.registers[object_reg] = json!({});
2134 }
2135
2136 let obj = self.registers[object_reg]
2137 .as_object_mut()
2138 .ok_or("Not an object")?;
2139
2140 let mut current = obj;
2141 for (i, segment) in segments.iter().enumerate() {
2142 if i == segments.len() - 1 {
2143 let current_val = current
2144 .get(segment)
2145 .and_then(|v| {
2146 if v.is_null() {
2147 None
2148 } else {
2149 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2150 }
2151 })
2152 .unwrap_or(0);
2153
2154 let sum = current_val + new_val_num;
2155 current.insert(segment.to_string(), json!(sum));
2156 return Ok(true);
2157 } else {
2158 current
2159 .entry(segment.to_string())
2160 .or_insert_with(|| json!({}));
2161 current = current
2162 .get_mut(segment)
2163 .and_then(|v| v.as_object_mut())
2164 .ok_or("Path collision: expected object")?;
2165 }
2166 }
2167
2168 Ok(false)
2169 }
2170
2171 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2172 let compiled = self.get_compiled_path(path);
2173 let segments = compiled.segments();
2174
2175 if !self.registers[object_reg].is_object() {
2176 self.registers[object_reg] = json!({});
2177 }
2178
2179 let obj = self.registers[object_reg]
2180 .as_object_mut()
2181 .ok_or("Not an object")?;
2182
2183 let mut current = obj;
2184 for (i, segment) in segments.iter().enumerate() {
2185 if i == segments.len() - 1 {
2186 let current_val = current
2188 .get(segment)
2189 .and_then(|v| {
2190 if v.is_null() {
2191 None
2192 } else {
2193 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2194 }
2195 })
2196 .unwrap_or(0);
2197
2198 let incremented = current_val + 1;
2199 current.insert(segment.to_string(), json!(incremented));
2200 return Ok(true);
2201 } else {
2202 current
2203 .entry(segment.to_string())
2204 .or_insert_with(|| json!({}));
2205 current = current
2206 .get_mut(segment)
2207 .and_then(|v| v.as_object_mut())
2208 .ok_or("Path collision: expected object")?;
2209 }
2210 }
2211
2212 Ok(false)
2213 }
2214
2215 fn set_field_min(
2216 &mut self,
2217 object_reg: Register,
2218 path: &str,
2219 value_reg: Register,
2220 ) -> Result<bool> {
2221 let compiled = self.get_compiled_path(path);
2222 let segments = compiled.segments();
2223 let new_value = self.registers[value_reg].clone();
2224
2225 if !self.registers[object_reg].is_object() {
2226 self.registers[object_reg] = json!({});
2227 }
2228
2229 let obj = self.registers[object_reg]
2230 .as_object_mut()
2231 .ok_or("Not an object")?;
2232
2233 let mut current = obj;
2234 for (i, segment) in segments.iter().enumerate() {
2235 if i == segments.len() - 1 {
2236 let should_update = if let Some(current_value) = current.get(segment) {
2237 if current_value.is_null() {
2238 true
2239 } else {
2240 match (current_value.as_i64(), new_value.as_i64()) {
2241 (Some(current_val), Some(new_val)) => new_val < current_val,
2242 (Some(current_val), None) if new_value.as_u64().is_some() => {
2243 (new_value.as_u64().unwrap() as i64) < current_val
2244 }
2245 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2246 new_val < current_value.as_u64().unwrap() as i64
2247 }
2248 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2249 (Some(current_val), Some(new_val)) => new_val < current_val,
2250 _ => match (current_value.as_f64(), new_value.as_f64()) {
2251 (Some(current_val), Some(new_val)) => new_val < current_val,
2252 _ => false,
2253 },
2254 },
2255 _ => false,
2256 }
2257 }
2258 } else {
2259 true
2260 };
2261
2262 if should_update {
2263 current.insert(segment.to_string(), new_value);
2264 return Ok(true);
2265 }
2266 return Ok(false);
2267 } else {
2268 current
2269 .entry(segment.to_string())
2270 .or_insert_with(|| json!({}));
2271 current = current
2272 .get_mut(segment)
2273 .and_then(|v| v.as_object_mut())
2274 .ok_or("Path collision: expected object")?;
2275 }
2276 }
2277
2278 Ok(false)
2279 }
2280
2281 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2282 let compiled = self.get_compiled_path(path);
2283 let segments = compiled.segments();
2284 let mut current = &self.registers[object_reg];
2285
2286 for segment in segments {
2287 current = current
2288 .get(segment)
2289 .ok_or_else(|| format!("Field not found: {}", segment))?;
2290 }
2291
2292 Ok(current.clone())
2293 }
2294
2295 fn append_to_array(
2296 &mut self,
2297 object_reg: Register,
2298 path: &str,
2299 value_reg: Register,
2300 max_length: usize,
2301 ) -> Result<()> {
2302 let compiled = self.get_compiled_path(path);
2303 let segments = compiled.segments();
2304 let value = self.registers[value_reg].clone();
2305
2306 if !self.registers[object_reg].is_object() {
2307 self.registers[object_reg] = json!({});
2308 }
2309
2310 let obj = self.registers[object_reg]
2311 .as_object_mut()
2312 .ok_or("Not an object")?;
2313
2314 let mut current = obj;
2315 for (i, segment) in segments.iter().enumerate() {
2316 if i == segments.len() - 1 {
2317 current
2318 .entry(segment.to_string())
2319 .or_insert_with(|| json!([]));
2320 let arr = current
2321 .get_mut(segment)
2322 .and_then(|v| v.as_array_mut())
2323 .ok_or("Path is not an array")?;
2324 arr.push(value.clone());
2325
2326 if arr.len() > max_length {
2327 let excess = arr.len() - max_length;
2328 arr.drain(0..excess);
2329 }
2330 } else {
2331 current
2332 .entry(segment.to_string())
2333 .or_insert_with(|| json!({}));
2334 current = current
2335 .get_mut(segment)
2336 .and_then(|v| v.as_object_mut())
2337 .ok_or("Path collision: expected object")?;
2338 }
2339 }
2340
2341 Ok(())
2342 }
2343
2344 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2345 let value = &self.registers[reg];
2346 let transformed = self.apply_transformation(value, transformation)?;
2347 self.registers[reg] = transformed;
2348 Ok(())
2349 }
2350
2351 fn apply_transformation(
2352 &self,
2353 value: &Value,
2354 transformation: &Transformation,
2355 ) -> Result<Value> {
2356 match transformation {
2357 Transformation::HexEncode => {
2358 if let Some(arr) = value.as_array() {
2359 let bytes: Vec<u8> = arr
2360 .iter()
2361 .filter_map(|v| v.as_u64().map(|n| n as u8))
2362 .collect();
2363 let hex = hex::encode(&bytes);
2364 Ok(json!(hex))
2365 } else {
2366 Err("HexEncode requires an array of numbers".into())
2367 }
2368 }
2369 Transformation::HexDecode => {
2370 if let Some(s) = value.as_str() {
2371 let s = s.strip_prefix("0x").unwrap_or(s);
2372 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2373 Ok(json!(bytes))
2374 } else {
2375 Err("HexDecode requires a string".into())
2376 }
2377 }
2378 Transformation::Base58Encode => {
2379 if let Some(arr) = value.as_array() {
2380 let bytes: Vec<u8> = arr
2381 .iter()
2382 .filter_map(|v| v.as_u64().map(|n| n as u8))
2383 .collect();
2384 let encoded = bs58::encode(&bytes).into_string();
2385 Ok(json!(encoded))
2386 } else if value.is_string() {
2387 Ok(value.clone())
2388 } else {
2389 Err("Base58Encode requires an array of numbers".into())
2390 }
2391 }
2392 Transformation::Base58Decode => {
2393 if let Some(s) = value.as_str() {
2394 let bytes = bs58::decode(s)
2395 .into_vec()
2396 .map_err(|e| format!("Base58 decode error: {}", e))?;
2397 Ok(json!(bytes))
2398 } else {
2399 Err("Base58Decode requires a string".into())
2400 }
2401 }
2402 Transformation::ToString => Ok(json!(value.to_string())),
2403 Transformation::ToNumber => {
2404 if let Some(s) = value.as_str() {
2405 let n = s
2406 .parse::<i64>()
2407 .map_err(|e| format!("Parse error: {}", e))?;
2408 Ok(json!(n))
2409 } else {
2410 Ok(value.clone())
2411 }
2412 }
2413 }
2414 }
2415
2416 fn evaluate_comparison(
2417 &self,
2418 field_value: &Value,
2419 op: &ComparisonOp,
2420 condition_value: &Value,
2421 ) -> Result<bool> {
2422 use ComparisonOp::*;
2423
2424 match op {
2425 Equal => Ok(field_value == condition_value),
2426 NotEqual => Ok(field_value != condition_value),
2427 GreaterThan => {
2428 match (field_value.as_i64(), condition_value.as_i64()) {
2430 (Some(a), Some(b)) => Ok(a > b),
2431 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2432 (Some(a), Some(b)) => Ok(a > b),
2433 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2434 (Some(a), Some(b)) => Ok(a > b),
2435 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2436 },
2437 },
2438 }
2439 }
2440 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2441 (Some(a), Some(b)) => Ok(a >= b),
2442 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2443 (Some(a), Some(b)) => Ok(a >= b),
2444 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2445 (Some(a), Some(b)) => Ok(a >= b),
2446 _ => {
2447 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2448 }
2449 },
2450 },
2451 },
2452 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2453 (Some(a), Some(b)) => Ok(a < b),
2454 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2455 (Some(a), Some(b)) => Ok(a < b),
2456 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2457 (Some(a), Some(b)) => Ok(a < b),
2458 _ => Err("Cannot compare non-numeric values with LessThan".into()),
2459 },
2460 },
2461 },
2462 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2463 (Some(a), Some(b)) => Ok(a <= b),
2464 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2465 (Some(a), Some(b)) => Ok(a <= b),
2466 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2467 (Some(a), Some(b)) => Ok(a <= b),
2468 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2469 },
2470 },
2471 },
2472 }
2473 }
2474
2475 #[cfg_attr(feature = "otel", instrument(
2484 name = "vm.update_pda_lookup",
2485 skip(self),
2486 fields(
2487 pda = %pda_address,
2488 seed = %seed_value,
2489 )
2490 ))]
2491 pub fn update_pda_reverse_lookup(
2492 &mut self,
2493 state_id: u32,
2494 lookup_name: &str,
2495 pda_address: String,
2496 seed_value: String,
2497 ) -> Result<Vec<PendingAccountUpdate>> {
2498 let state = self
2499 .states
2500 .get_mut(&state_id)
2501 .ok_or("State table not found")?;
2502
2503 let lookup = state
2504 .pda_reverse_lookups
2505 .entry(lookup_name.to_string())
2506 .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2507
2508 let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2509
2510 if let Some(ref evicted) = evicted_pda {
2511 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2512 let count = evicted_updates.len();
2513 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2514 }
2515 }
2516
2517 self.flush_pending_updates(state_id, &pda_address)
2519 }
2520
2521 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2526 let state = match self.states.get_mut(&state_id) {
2527 Some(s) => s,
2528 None => return 0,
2529 };
2530
2531 let now = std::time::SystemTime::now()
2532 .duration_since(std::time::UNIX_EPOCH)
2533 .unwrap()
2534 .as_secs() as i64;
2535
2536 let mut removed_count = 0;
2537
2538 state.pending_updates.retain(|_pda_address, updates| {
2540 let original_len = updates.len();
2541
2542 updates.retain(|update| {
2543 let age = now - update.queued_at;
2544 age <= PENDING_UPDATE_TTL_SECONDS
2545 });
2546
2547 removed_count += original_len - updates.len();
2548
2549 !updates.is_empty()
2551 });
2552
2553 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2555
2556 if removed_count > 0 {
2557 #[cfg(feature = "otel")]
2558 crate::vm_metrics::record_pending_updates_expired(
2559 removed_count as u64,
2560 &state.entity_name,
2561 );
2562 }
2563
2564 removed_count
2565 }
2566
2567 #[cfg_attr(feature = "otel", instrument(
2599 name = "vm.queue_account_update",
2600 skip(self, update),
2601 fields(
2602 pda = %update.pda_address,
2603 account_type = %update.account_type,
2604 slot = update.slot,
2605 )
2606 ))]
2607 pub fn queue_account_update(
2608 &mut self,
2609 state_id: u32,
2610 update: QueuedAccountUpdate,
2611 ) -> Result<()> {
2612 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2613 self.cleanup_expired_pending_updates(state_id);
2614 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2615 self.drop_oldest_pending_update(state_id)?;
2616 }
2617 }
2618
2619 let state = self
2620 .states
2621 .get_mut(&state_id)
2622 .ok_or("State table not found")?;
2623
2624 let pending = PendingAccountUpdate {
2625 account_type: update.account_type,
2626 pda_address: update.pda_address.clone(),
2627 account_data: update.account_data,
2628 slot: update.slot,
2629 write_version: update.write_version,
2630 signature: update.signature,
2631 queued_at: std::time::SystemTime::now()
2632 .duration_since(std::time::UNIX_EPOCH)
2633 .unwrap()
2634 .as_secs() as i64,
2635 };
2636
2637 let pda_address = pending.pda_address.clone();
2638 let slot = pending.slot;
2639
2640 let mut updates = state
2641 .pending_updates
2642 .entry(pda_address.clone())
2643 .or_insert_with(Vec::new);
2644
2645 let original_len = updates.len();
2646 updates.retain(|existing| existing.slot > slot);
2647 let removed_by_dedup = original_len - updates.len();
2648
2649 if removed_by_dedup > 0 {
2650 self.pending_queue_size = self
2651 .pending_queue_size
2652 .saturating_sub(removed_by_dedup as u64);
2653 }
2654
2655 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2656 updates.remove(0);
2657 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2658 }
2659
2660 updates.push(pending);
2661 #[cfg(feature = "otel")]
2662 crate::vm_metrics::record_pending_update_queued(&state.entity_name);
2663
2664 Ok(())
2665 }
2666
2667 pub fn queue_instruction_event(
2668 &mut self,
2669 state_id: u32,
2670 event: QueuedInstructionEvent,
2671 ) -> Result<()> {
2672 let state = self
2673 .states
2674 .get_mut(&state_id)
2675 .ok_or("State table not found")?;
2676
2677 let pda_address = event.pda_address.clone();
2678
2679 let pending = PendingInstructionEvent {
2680 event_type: event.event_type,
2681 pda_address: event.pda_address,
2682 event_data: event.event_data,
2683 slot: event.slot,
2684 signature: event.signature,
2685 queued_at: std::time::SystemTime::now()
2686 .duration_since(std::time::UNIX_EPOCH)
2687 .unwrap()
2688 .as_secs() as i64,
2689 };
2690
2691 let mut events = state
2692 .pending_instruction_events
2693 .entry(pda_address)
2694 .or_insert_with(Vec::new);
2695
2696 if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
2697 events.remove(0);
2698 }
2699
2700 events.push(pending);
2701
2702 Ok(())
2703 }
2704
2705 pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
2706 self.last_pda_lookup_miss.take()
2707 }
2708
2709 pub fn take_last_pda_registered(&mut self) -> Option<String> {
2710 self.last_pda_registered.take()
2711 }
2712
2713 pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
2714 std::mem::take(&mut self.last_lookup_index_keys)
2715 }
2716
2717 pub fn flush_pending_instruction_events(
2718 &mut self,
2719 state_id: u32,
2720 pda_address: &str,
2721 ) -> Vec<PendingInstructionEvent> {
2722 let state = match self.states.get_mut(&state_id) {
2723 Some(s) => s,
2724 None => return Vec::new(),
2725 };
2726
2727 if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
2728 events
2729 } else {
2730 Vec::new()
2731 }
2732 }
2733
2734 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2736 let state = self.states.get(&state_id)?;
2737
2738 let now = std::time::SystemTime::now()
2739 .duration_since(std::time::UNIX_EPOCH)
2740 .unwrap()
2741 .as_secs() as i64;
2742
2743 let mut total_updates = 0;
2744 let mut oldest_timestamp = now;
2745 let mut largest_pda_queue = 0;
2746 let mut estimated_memory = 0;
2747
2748 for entry in state.pending_updates.iter() {
2749 let (_, updates) = entry.pair();
2750 total_updates += updates.len();
2751 largest_pda_queue = largest_pda_queue.max(updates.len());
2752
2753 for update in updates.iter() {
2754 oldest_timestamp = oldest_timestamp.min(update.queued_at);
2755 estimated_memory += update.account_type.len() +
2757 update.pda_address.len() +
2758 update.signature.len() +
2759 16 + estimate_json_size(&update.account_data);
2761 }
2762 }
2763
2764 Some(PendingQueueStats {
2765 total_updates,
2766 unique_pdas: state.pending_updates.len(),
2767 oldest_age_seconds: now - oldest_timestamp,
2768 largest_pda_queue_size: largest_pda_queue,
2769 estimated_memory_bytes: estimated_memory,
2770 })
2771 }
2772
2773 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2774 let mut stats = VmMemoryStats {
2775 path_cache_size: self.path_cache.len(),
2776 ..Default::default()
2777 };
2778
2779 if let Some(state) = self.states.get(&state_id) {
2780 stats.state_table_entity_count = state.data.len();
2781 stats.state_table_max_entries = state.config.max_entries;
2782 stats.state_table_at_capacity = state.is_at_capacity();
2783
2784 stats.lookup_index_count = state.lookup_indexes.len();
2785 stats.lookup_index_total_entries =
2786 state.lookup_indexes.values().map(|idx| idx.len()).sum();
2787
2788 stats.temporal_index_count = state.temporal_indexes.len();
2789 stats.temporal_index_total_entries = state
2790 .temporal_indexes
2791 .values()
2792 .map(|idx| idx.total_entries())
2793 .sum();
2794
2795 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2796 stats.pda_reverse_lookup_total_entries = state
2797 .pda_reverse_lookups
2798 .values()
2799 .map(|lookup| lookup.len())
2800 .sum();
2801
2802 stats.version_tracker_entries = state.version_tracker.len();
2803
2804 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2805 }
2806
2807 stats
2808 }
2809
2810 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2811 let pending_removed = self.cleanup_expired_pending_updates(state_id);
2812 let temporal_removed = self.cleanup_temporal_indexes(state_id);
2813
2814 #[cfg(feature = "otel")]
2815 if let Some(state) = self.states.get(&state_id) {
2816 crate::vm_metrics::record_cleanup(
2817 pending_removed,
2818 temporal_removed,
2819 &state.entity_name,
2820 );
2821 }
2822
2823 CleanupResult {
2824 pending_updates_removed: pending_removed,
2825 temporal_entries_removed: temporal_removed,
2826 }
2827 }
2828
2829 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2830 let state = match self.states.get_mut(&state_id) {
2831 Some(s) => s,
2832 None => return 0,
2833 };
2834
2835 let now = std::time::SystemTime::now()
2836 .duration_since(std::time::UNIX_EPOCH)
2837 .unwrap()
2838 .as_secs() as i64;
2839
2840 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2841 let mut total_removed = 0;
2842
2843 for (_, index) in state.temporal_indexes.iter_mut() {
2844 total_removed += index.cleanup_expired(cutoff);
2845 }
2846
2847 total_removed
2848 }
2849
2850 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2851 let state = self.states.get(&state_id)?;
2852
2853 if state.is_at_capacity() {
2854 Some(CapacityWarning {
2855 current_entries: state.data.len(),
2856 max_entries: state.config.max_entries,
2857 entries_over_limit: state.entries_over_limit(),
2858 })
2859 } else {
2860 None
2861 }
2862 }
2863
2864 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2866 let state = self
2867 .states
2868 .get_mut(&state_id)
2869 .ok_or("State table not found")?;
2870
2871 let mut oldest_pda: Option<String> = None;
2872 let mut oldest_timestamp = i64::MAX;
2873
2874 for entry in state.pending_updates.iter() {
2876 let (pda, updates) = entry.pair();
2877 if let Some(update) = updates.first() {
2878 if update.queued_at < oldest_timestamp {
2879 oldest_timestamp = update.queued_at;
2880 oldest_pda = Some(pda.clone());
2881 }
2882 }
2883 }
2884
2885 if let Some(pda) = oldest_pda {
2887 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2888 if !updates.is_empty() {
2889 updates.remove(0);
2890 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2891
2892 if updates.is_empty() {
2894 drop(updates);
2895 state.pending_updates.remove(&pda);
2896 }
2897 }
2898 }
2899 }
2900
2901 Ok(())
2902 }
2903
2904 fn flush_pending_updates(
2909 &mut self,
2910 state_id: u32,
2911 pda_address: &str,
2912 ) -> Result<Vec<PendingAccountUpdate>> {
2913 let state = self
2914 .states
2915 .get_mut(&state_id)
2916 .ok_or("State table not found")?;
2917
2918 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2919 let count = pending_updates.len();
2920 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2921 #[cfg(feature = "otel")]
2922 crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
2923 Ok(pending_updates)
2924 } else {
2925 Ok(Vec::new())
2926 }
2927 }
2928
2929 pub fn try_pda_reverse_lookup(
2931 &mut self,
2932 state_id: u32,
2933 lookup_name: &str,
2934 pda_address: &str,
2935 ) -> Option<String> {
2936 let state = self.states.get_mut(&state_id)?;
2937
2938 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2939 if let Some(value) = lookup.lookup(pda_address) {
2940 self.pda_cache_hits += 1;
2941 return Some(value);
2942 }
2943 }
2944
2945 self.pda_cache_misses += 1;
2946 None
2947 }
2948
2949 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2956 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2957 }
2958
2959 fn evaluate_computed_expr_with_env(
2961 &self,
2962 expr: &ComputedExpr,
2963 state: &Value,
2964 env: &std::collections::HashMap<String, Value>,
2965 ) -> Result<Value> {
2966 match expr {
2967 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2968
2969 ComputedExpr::Var { name } => env
2970 .get(name)
2971 .cloned()
2972 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2973
2974 ComputedExpr::Let { name, value, body } => {
2975 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2976 let mut new_env = env.clone();
2977 new_env.insert(name.clone(), val);
2978 self.evaluate_computed_expr_with_env(body, state, &new_env)
2979 }
2980
2981 ComputedExpr::If {
2982 condition,
2983 then_branch,
2984 else_branch,
2985 } => {
2986 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2987 if self.value_to_bool(&cond_val) {
2988 self.evaluate_computed_expr_with_env(then_branch, state, env)
2989 } else {
2990 self.evaluate_computed_expr_with_env(else_branch, state, env)
2991 }
2992 }
2993
2994 ComputedExpr::None => Ok(Value::Null),
2995
2996 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2997
2998 ComputedExpr::Slice { expr, start, end } => {
2999 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3000 match val {
3001 Value::Array(arr) => {
3002 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
3003 Ok(Value::Array(slice))
3004 }
3005 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
3006 }
3007 }
3008
3009 ComputedExpr::Index { expr, index } => {
3010 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3011 match val {
3012 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
3013 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
3014 }
3015 }
3016
3017 ComputedExpr::U64FromLeBytes { bytes } => {
3018 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3019 let byte_vec = self.value_to_bytes(&val)?;
3020 if byte_vec.len() < 8 {
3021 return Err(format!(
3022 "u64::from_le_bytes requires 8 bytes, got {}",
3023 byte_vec.len()
3024 )
3025 .into());
3026 }
3027 let arr: [u8; 8] = byte_vec[..8]
3028 .try_into()
3029 .map_err(|_| "Failed to convert to [u8; 8]")?;
3030 Ok(json!(u64::from_le_bytes(arr)))
3031 }
3032
3033 ComputedExpr::U64FromBeBytes { bytes } => {
3034 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3035 let byte_vec = self.value_to_bytes(&val)?;
3036 if byte_vec.len() < 8 {
3037 return Err(format!(
3038 "u64::from_be_bytes requires 8 bytes, got {}",
3039 byte_vec.len()
3040 )
3041 .into());
3042 }
3043 let arr: [u8; 8] = byte_vec[..8]
3044 .try_into()
3045 .map_err(|_| "Failed to convert to [u8; 8]")?;
3046 Ok(json!(u64::from_be_bytes(arr)))
3047 }
3048
3049 ComputedExpr::ByteArray { bytes } => {
3050 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3051 }
3052
3053 ComputedExpr::Closure { param, body } => {
3054 Ok(json!({
3057 "__closure": {
3058 "param": param,
3059 "body": serde_json::to_value(body).unwrap_or(Value::Null)
3060 }
3061 }))
3062 }
3063
3064 ComputedExpr::Unary { op, expr } => {
3065 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3066 self.apply_unary_op(op, &val)
3067 }
3068
3069 ComputedExpr::JsonToBytes { expr } => {
3070 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3071 let bytes = self.value_to_bytes(&val)?;
3073 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3074 }
3075
3076 ComputedExpr::UnwrapOr { expr, default } => {
3077 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3078 if val.is_null() {
3079 Ok(default.clone())
3080 } else {
3081 Ok(val)
3082 }
3083 }
3084
3085 ComputedExpr::Binary { op, left, right } => {
3086 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
3087 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
3088 self.apply_binary_op(op, &l, &r)
3089 }
3090
3091 ComputedExpr::Cast { expr, to_type } => {
3092 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3093 self.apply_cast(&val, to_type)
3094 }
3095
3096 ComputedExpr::MethodCall { expr, method, args } => {
3097 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3098 if method == "map" && args.len() == 1 {
3100 if let ComputedExpr::Closure { param, body } = &args[0] {
3101 if val.is_null() {
3103 return Ok(Value::Null);
3104 }
3105 let mut closure_env = env.clone();
3107 closure_env.insert(param.clone(), val);
3108 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
3109 }
3110 }
3111 let evaluated_args: Vec<Value> = args
3112 .iter()
3113 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
3114 .collect::<Result<Vec<_>>>()?;
3115 self.apply_method_call(&val, method, &evaluated_args)
3116 }
3117
3118 ComputedExpr::Literal { value } => Ok(value.clone()),
3119
3120 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
3121 }
3122 }
3123
3124 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
3126 match val {
3127 Value::Array(arr) => arr
3128 .iter()
3129 .map(|v| {
3130 v.as_u64()
3131 .map(|n| n as u8)
3132 .ok_or_else(|| "Array element not a valid byte".into())
3133 })
3134 .collect(),
3135 Value::String(s) => {
3136 if s.starts_with("0x") || s.starts_with("0X") {
3138 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
3139 } else {
3140 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
3141 }
3142 }
3143 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
3144 }
3145 }
3146
3147 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
3149 use crate::ast::UnaryOp;
3150 match op {
3151 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
3152 UnaryOp::ReverseBits => match val.as_u64() {
3153 Some(n) => Ok(json!(n.reverse_bits())),
3154 None => match val.as_i64() {
3155 Some(n) => Ok(json!((n as u64).reverse_bits())),
3156 None => Err("reverse_bits requires an integer".into()),
3157 },
3158 },
3159 }
3160 }
3161
3162 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
3164 let segments: Vec<&str> = path.split('.').collect();
3165 let mut current = state;
3166
3167 for segment in segments {
3168 match current.get(segment) {
3169 Some(v) => current = v,
3170 None => return Ok(Value::Null),
3171 }
3172 }
3173
3174 Ok(current.clone())
3175 }
3176
3177 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
3179 match op {
3180 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
3182 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
3183 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
3184 BinaryOp::Div => {
3185 if let Some(r) = right.as_i64() {
3187 if r == 0 {
3188 return Err("Division by zero".into());
3189 }
3190 }
3191 if let Some(r) = right.as_f64() {
3192 if r == 0.0 {
3193 return Err("Division by zero".into());
3194 }
3195 }
3196 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
3197 }
3198 BinaryOp::Mod => {
3199 match (left.as_i64(), right.as_i64()) {
3201 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3202 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
3203 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3204 _ => Err("Modulo requires non-zero integer operands".into()),
3205 },
3206 _ => Err("Modulo by zero".into()),
3207 }
3208 }
3209
3210 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
3212 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
3213 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
3214 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
3215 BinaryOp::Eq => Ok(json!(left == right)),
3216 BinaryOp::Ne => Ok(json!(left != right)),
3217
3218 BinaryOp::And => {
3220 let l_bool = self.value_to_bool(left);
3221 let r_bool = self.value_to_bool(right);
3222 Ok(json!(l_bool && r_bool))
3223 }
3224 BinaryOp::Or => {
3225 let l_bool = self.value_to_bool(left);
3226 let r_bool = self.value_to_bool(right);
3227 Ok(json!(l_bool || r_bool))
3228 }
3229
3230 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
3232 (Some(a), Some(b)) => Ok(json!(a ^ b)),
3233 _ => match (left.as_i64(), right.as_i64()) {
3234 (Some(a), Some(b)) => Ok(json!(a ^ b)),
3235 _ => Err("XOR requires integer operands".into()),
3236 },
3237 },
3238 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
3239 (Some(a), Some(b)) => Ok(json!(a & b)),
3240 _ => match (left.as_i64(), right.as_i64()) {
3241 (Some(a), Some(b)) => Ok(json!(a & b)),
3242 _ => Err("BitAnd requires integer operands".into()),
3243 },
3244 },
3245 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
3246 (Some(a), Some(b)) => Ok(json!(a | b)),
3247 _ => match (left.as_i64(), right.as_i64()) {
3248 (Some(a), Some(b)) => Ok(json!(a | b)),
3249 _ => Err("BitOr requires integer operands".into()),
3250 },
3251 },
3252 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
3253 (Some(a), Some(b)) => Ok(json!(a << b)),
3254 _ => match (left.as_i64(), right.as_i64()) {
3255 (Some(a), Some(b)) => Ok(json!(a << b)),
3256 _ => Err("Shl requires integer operands".into()),
3257 },
3258 },
3259 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
3260 (Some(a), Some(b)) => Ok(json!(a >> b)),
3261 _ => match (left.as_i64(), right.as_i64()) {
3262 (Some(a), Some(b)) => Ok(json!(a >> b)),
3263 _ => Err("Shr requires integer operands".into()),
3264 },
3265 },
3266 }
3267 }
3268
3269 fn numeric_op<F1, F2>(
3271 &self,
3272 left: &Value,
3273 right: &Value,
3274 int_op: F1,
3275 float_op: F2,
3276 ) -> Result<Value>
3277 where
3278 F1: Fn(i64, i64) -> i64,
3279 F2: Fn(f64, f64) -> f64,
3280 {
3281 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3283 return Ok(json!(int_op(a, b)));
3284 }
3285
3286 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3288 return Ok(json!(int_op(a as i64, b as i64)));
3290 }
3291
3292 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3294 return Ok(json!(float_op(a, b)));
3295 }
3296
3297 if left.is_null() || right.is_null() {
3299 return Ok(Value::Null);
3300 }
3301
3302 Err(format!(
3303 "Cannot perform numeric operation on {:?} and {:?}",
3304 left, right
3305 )
3306 .into())
3307 }
3308
3309 fn comparison_op<F1, F2>(
3311 &self,
3312 left: &Value,
3313 right: &Value,
3314 int_cmp: F1,
3315 float_cmp: F2,
3316 ) -> Result<Value>
3317 where
3318 F1: Fn(i64, i64) -> bool,
3319 F2: Fn(f64, f64) -> bool,
3320 {
3321 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3323 return Ok(json!(int_cmp(a, b)));
3324 }
3325
3326 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3328 return Ok(json!(int_cmp(a as i64, b as i64)));
3329 }
3330
3331 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3333 return Ok(json!(float_cmp(a, b)));
3334 }
3335
3336 if left.is_null() || right.is_null() {
3338 return Ok(json!(false));
3339 }
3340
3341 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3342 }
3343
3344 fn value_to_bool(&self, value: &Value) -> bool {
3346 match value {
3347 Value::Null => false,
3348 Value::Bool(b) => *b,
3349 Value::Number(n) => {
3350 if let Some(i) = n.as_i64() {
3351 i != 0
3352 } else if let Some(f) = n.as_f64() {
3353 f != 0.0
3354 } else {
3355 true
3356 }
3357 }
3358 Value::String(s) => !s.is_empty(),
3359 Value::Array(arr) => !arr.is_empty(),
3360 Value::Object(obj) => !obj.is_empty(),
3361 }
3362 }
3363
3364 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3366 match to_type {
3367 "i8" | "i16" | "i32" | "i64" | "isize" => {
3368 if let Some(n) = value.as_i64() {
3369 Ok(json!(n))
3370 } else if let Some(n) = value.as_u64() {
3371 Ok(json!(n as i64))
3372 } else if let Some(n) = value.as_f64() {
3373 Ok(json!(n as i64))
3374 } else if let Some(s) = value.as_str() {
3375 s.parse::<i64>()
3376 .map(|n| json!(n))
3377 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3378 } else {
3379 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3380 }
3381 }
3382 "u8" | "u16" | "u32" | "u64" | "usize" => {
3383 if let Some(n) = value.as_u64() {
3384 Ok(json!(n))
3385 } else if let Some(n) = value.as_i64() {
3386 Ok(json!(n as u64))
3387 } else if let Some(n) = value.as_f64() {
3388 Ok(json!(n as u64))
3389 } else if let Some(s) = value.as_str() {
3390 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3391 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3392 })
3393 } else {
3394 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3395 }
3396 }
3397 "f32" | "f64" => {
3398 if let Some(n) = value.as_f64() {
3399 Ok(json!(n))
3400 } else if let Some(n) = value.as_i64() {
3401 Ok(json!(n as f64))
3402 } else if let Some(n) = value.as_u64() {
3403 Ok(json!(n as f64))
3404 } else if let Some(s) = value.as_str() {
3405 s.parse::<f64>()
3406 .map(|n| json!(n))
3407 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3408 } else {
3409 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3410 }
3411 }
3412 "String" | "string" => Ok(json!(value.to_string())),
3413 "bool" => Ok(json!(self.value_to_bool(value))),
3414 _ => {
3415 Ok(value.clone())
3417 }
3418 }
3419 }
3420
3421 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3423 match method {
3424 "unwrap_or" => {
3425 if value.is_null() && !args.is_empty() {
3426 Ok(args[0].clone())
3427 } else {
3428 Ok(value.clone())
3429 }
3430 }
3431 "unwrap_or_default" => {
3432 if value.is_null() {
3433 Ok(json!(0))
3435 } else {
3436 Ok(value.clone())
3437 }
3438 }
3439 "is_some" => Ok(json!(!value.is_null())),
3440 "is_none" => Ok(json!(value.is_null())),
3441 "abs" => {
3442 if let Some(n) = value.as_i64() {
3443 Ok(json!(n.abs()))
3444 } else if let Some(n) = value.as_f64() {
3445 Ok(json!(n.abs()))
3446 } else {
3447 Err(format!("Cannot call abs() on {:?}", value).into())
3448 }
3449 }
3450 "len" => {
3451 if let Some(s) = value.as_str() {
3452 Ok(json!(s.len()))
3453 } else if let Some(arr) = value.as_array() {
3454 Ok(json!(arr.len()))
3455 } else if let Some(obj) = value.as_object() {
3456 Ok(json!(obj.len()))
3457 } else {
3458 Err(format!("Cannot call len() on {:?}", value).into())
3459 }
3460 }
3461 "to_string" => Ok(json!(value.to_string())),
3462 "min" => {
3463 if args.is_empty() {
3464 return Err("min() requires an argument".into());
3465 }
3466 let other = &args[0];
3467 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3468 Ok(json!(a.min(b)))
3469 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3470 Ok(json!(a.min(b)))
3471 } else {
3472 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3473 }
3474 }
3475 "max" => {
3476 if args.is_empty() {
3477 return Err("max() requires an argument".into());
3478 }
3479 let other = &args[0];
3480 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3481 Ok(json!(a.max(b)))
3482 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3483 Ok(json!(a.max(b)))
3484 } else {
3485 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3486 }
3487 }
3488 "saturating_add" => {
3489 if args.is_empty() {
3490 return Err("saturating_add() requires an argument".into());
3491 }
3492 let other = &args[0];
3493 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3494 Ok(json!(a.saturating_add(b)))
3495 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3496 Ok(json!(a.saturating_add(b)))
3497 } else {
3498 Err(format!(
3499 "Cannot call saturating_add() on {:?} and {:?}",
3500 value, other
3501 )
3502 .into())
3503 }
3504 }
3505 "saturating_sub" => {
3506 if args.is_empty() {
3507 return Err("saturating_sub() requires an argument".into());
3508 }
3509 let other = &args[0];
3510 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3511 Ok(json!(a.saturating_sub(b)))
3512 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3513 Ok(json!(a.saturating_sub(b)))
3514 } else {
3515 Err(format!(
3516 "Cannot call saturating_sub() on {:?} and {:?}",
3517 value, other
3518 )
3519 .into())
3520 }
3521 }
3522 _ => Err(format!("Unknown method call: {}()", method).into()),
3523 }
3524 }
3525
3526 pub fn evaluate_computed_fields_from_ast(
3529 &self,
3530 state: &mut Value,
3531 computed_field_specs: &[ComputedFieldSpec],
3532 ) -> Result<Vec<String>> {
3533 let mut updated_paths = Vec::new();
3534
3535 for spec in computed_field_specs {
3536 if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3537 self.set_field_in_state(state, &spec.target_path, result)?;
3538 updated_paths.push(spec.target_path.clone());
3539 }
3540 }
3541
3542 Ok(updated_paths)
3543 }
3544
3545 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3547 let segments: Vec<&str> = path.split('.').collect();
3548
3549 if segments.is_empty() {
3550 return Err("Empty path".into());
3551 }
3552
3553 let mut current = state;
3555 for (i, segment) in segments.iter().enumerate() {
3556 if i == segments.len() - 1 {
3557 if let Some(obj) = current.as_object_mut() {
3559 obj.insert(segment.to_string(), value);
3560 return Ok(());
3561 } else {
3562 return Err(format!("Cannot set field '{}' on non-object", segment).into());
3563 }
3564 } else {
3565 if !current.is_object() {
3567 *current = json!({});
3568 }
3569 let obj = current.as_object_mut().unwrap();
3570 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3571 }
3572 }
3573
3574 Ok(())
3575 }
3576
3577 pub fn create_evaluator_from_specs(
3580 specs: Vec<ComputedFieldSpec>,
3581 ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3582 move |state: &mut Value| {
3583 let vm = VmContext::new();
3586 vm.evaluate_computed_fields_from_ast(state, &specs)?;
3587 Ok(())
3588 }
3589 }
3590}
3591
3592impl Default for VmContext {
3593 fn default() -> Self {
3594 Self::new()
3595 }
3596}
3597
3598impl crate::resolvers::ReverseLookupUpdater for VmContext {
3600 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3601 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3603 .unwrap_or_else(|e| {
3604 tracing::error!("Failed to update PDA reverse lookup: {}", e);
3605 Vec::new()
3606 })
3607 }
3608
3609 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3610 self.flush_pending_updates(0, pda_address)
3612 .unwrap_or_else(|e| {
3613 tracing::error!("Failed to flush pending updates: {}", e);
3614 Vec::new()
3615 })
3616 }
3617}
3618
3619#[cfg(test)]
3620mod tests {
3621 use super::*;
3622 use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
3623
3624 #[test]
3625 fn test_computed_field_preserves_integer_type() {
3626 let vm = VmContext::new();
3627
3628 let mut state = serde_json::json!({
3629 "trading": {
3630 "total_buy_volume": 20000000000_i64,
3631 "total_sell_volume": 17951316474_i64
3632 }
3633 });
3634
3635 let spec = ComputedFieldSpec {
3636 target_path: "trading.total_volume".to_string(),
3637 result_type: "Option<u64>".to_string(),
3638 expression: ComputedExpr::Binary {
3639 op: BinaryOp::Add,
3640 left: Box::new(ComputedExpr::UnwrapOr {
3641 expr: Box::new(ComputedExpr::FieldRef {
3642 path: "trading.total_buy_volume".to_string(),
3643 }),
3644 default: serde_json::json!(0),
3645 }),
3646 right: Box::new(ComputedExpr::UnwrapOr {
3647 expr: Box::new(ComputedExpr::FieldRef {
3648 path: "trading.total_sell_volume".to_string(),
3649 }),
3650 default: serde_json::json!(0),
3651 }),
3652 },
3653 };
3654
3655 vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
3656 .unwrap();
3657
3658 let total_volume = state
3659 .get("trading")
3660 .and_then(|t| t.get("total_volume"))
3661 .expect("total_volume should exist");
3662
3663 let serialized = serde_json::to_string(total_volume).unwrap();
3664 assert!(
3665 !serialized.contains('.'),
3666 "Integer should not have decimal point: {}",
3667 serialized
3668 );
3669 assert_eq!(
3670 total_volume.as_i64(),
3671 Some(37951316474),
3672 "Value should be correct sum"
3673 );
3674 }
3675
3676 #[test]
3677 fn test_set_field_sum_preserves_integer_type() {
3678 let mut vm = VmContext::new();
3679 vm.registers[0] = serde_json::json!({});
3680 vm.registers[1] = serde_json::json!(20000000000_i64);
3681 vm.registers[2] = serde_json::json!(17951316474_i64);
3682
3683 vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
3684 vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
3685
3686 let state = &vm.registers[0];
3687 let buy_vol = state
3688 .get("trading")
3689 .and_then(|t| t.get("total_buy_volume"))
3690 .unwrap();
3691 let sell_vol = state
3692 .get("trading")
3693 .and_then(|t| t.get("total_sell_volume"))
3694 .unwrap();
3695
3696 let buy_serialized = serde_json::to_string(buy_vol).unwrap();
3697 let sell_serialized = serde_json::to_string(sell_vol).unwrap();
3698
3699 assert!(
3700 !buy_serialized.contains('.'),
3701 "Buy volume should not have decimal: {}",
3702 buy_serialized
3703 );
3704 assert!(
3705 !sell_serialized.contains('.'),
3706 "Sell volume should not have decimal: {}",
3707 sell_serialized
3708 );
3709 }
3710}