1use crate::ast::{
2 BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
15#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19 pub slot: Option<u64>,
21 pub signature: Option<String>,
23 pub timestamp: Option<i64>,
26 pub write_version: Option<u64>,
29 pub txn_index: Option<u64>,
32 pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37 pub fn new(slot: u64, signature: String) -> Self {
39 Self {
40 slot: Some(slot),
41 signature: Some(signature),
42 timestamp: None,
43 write_version: None,
44 txn_index: None,
45 metadata: HashMap::new(),
46 }
47 }
48
49 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51 Self {
52 slot: Some(slot),
53 signature: Some(signature),
54 timestamp: Some(timestamp),
55 write_version: None,
56 txn_index: None,
57 metadata: HashMap::new(),
58 }
59 }
60
61 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63 Self {
64 slot: Some(slot),
65 signature: Some(signature),
66 timestamp: None,
67 write_version: Some(write_version),
68 txn_index: None,
69 metadata: HashMap::new(),
70 }
71 }
72
73 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75 Self {
76 slot: Some(slot),
77 signature: Some(signature),
78 timestamp: None,
79 write_version: None,
80 txn_index: Some(txn_index),
81 metadata: HashMap::new(),
82 }
83 }
84
85 pub fn timestamp(&self) -> i64 {
87 self.timestamp.unwrap_or_else(|| {
88 std::time::SystemTime::now()
89 .duration_since(std::time::UNIX_EPOCH)
90 .unwrap()
91 .as_secs() as i64
92 })
93 }
94
95 pub fn empty() -> Self {
97 Self::default()
98 }
99
100 pub fn is_account_update(&self) -> bool {
103 self.write_version.is_some() && self.txn_index.is_none()
104 }
105
106 pub fn is_instruction_update(&self) -> bool {
108 self.txn_index.is_some() && self.write_version.is_none()
109 }
110
111 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
112 self.metadata.insert(key, value);
113 self
114 }
115
116 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
118 self.metadata.get(key)
119 }
120
121 pub fn to_value(&self) -> Value {
123 let mut obj = serde_json::Map::new();
124 if let Some(slot) = self.slot {
125 obj.insert("slot".to_string(), json!(slot));
126 }
127 if let Some(ref sig) = self.signature {
128 obj.insert("signature".to_string(), json!(sig));
129 }
130 obj.insert("timestamp".to_string(), json!(self.timestamp()));
132 for (key, value) in &self.metadata {
133 obj.insert(key.clone(), value.clone());
134 }
135 Value::Object(obj)
136 }
137}
138
139pub type Register = usize;
140pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
141
142pub type RegisterValue = Value;
143
144pub trait ComputedFieldsEvaluator {
147 fn evaluate(&self, state: &mut Value) -> Result<()>;
148}
149
150const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
152const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
153const PENDING_UPDATE_TTL_SECONDS: i64 = 300; const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
158
159const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
161const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
162
163const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
164
165const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
166
167const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;
170
171const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
172
173const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
174
175fn estimate_json_size(value: &Value) -> usize {
177 match value {
178 Value::Null => 4,
179 Value::Bool(_) => 5,
180 Value::Number(_) => 8,
181 Value::String(s) => s.len() + 2,
182 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
183 Value::Object(obj) => {
184 2 + obj
185 .iter()
186 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
187 .sum::<usize>()
188 }
189 }
190}
191
192#[derive(Debug, Clone)]
193pub struct CompiledPath {
194 pub segments: std::sync::Arc<[String]>,
195}
196
197impl CompiledPath {
198 pub fn new(path: &str) -> Self {
199 let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
200 CompiledPath {
201 segments: segments.into(),
202 }
203 }
204
205 fn segments(&self) -> &[String] {
206 &self.segments
207 }
208}
209
210#[derive(Debug, Clone)]
213pub enum FieldChange {
214 Replaced,
216 Appended(Vec<Value>),
218}
219
220#[derive(Debug, Clone, Default)]
223pub struct DirtyTracker {
224 changes: HashMap<String, FieldChange>,
225}
226
227impl DirtyTracker {
228 pub fn new() -> Self {
230 Self {
231 changes: HashMap::new(),
232 }
233 }
234
235 pub fn mark_replaced(&mut self, path: &str) {
237 self.changes.insert(path.to_string(), FieldChange::Replaced);
239 }
240
241 pub fn mark_appended(&mut self, path: &str, value: Value) {
243 match self.changes.get_mut(path) {
244 Some(FieldChange::Appended(values)) => {
245 values.push(value);
247 }
248 Some(FieldChange::Replaced) => {
249 }
252 None => {
253 self.changes
255 .insert(path.to_string(), FieldChange::Appended(vec![value]));
256 }
257 }
258 }
259
260 pub fn is_empty(&self) -> bool {
262 self.changes.is_empty()
263 }
264
265 pub fn len(&self) -> usize {
267 self.changes.len()
268 }
269
270 pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
272 self.changes.iter()
273 }
274
275 pub fn dirty_paths(&self) -> HashSet<String> {
277 self.changes.keys().cloned().collect()
278 }
279
280 pub fn into_changes(self) -> HashMap<String, FieldChange> {
282 self.changes
283 }
284
285 pub fn changes(&self) -> &HashMap<String, FieldChange> {
287 &self.changes
288 }
289
290 pub fn appended_paths(&self) -> Vec<String> {
292 self.changes
293 .iter()
294 .filter_map(|(path, change)| match change {
295 FieldChange::Appended(_) => Some(path.clone()),
296 FieldChange::Replaced => None,
297 })
298 .collect()
299 }
300}
301
302pub struct VmContext {
303 registers: Vec<RegisterValue>,
304 states: HashMap<u32, StateTable>,
305 pub instructions_executed: u64,
306 pub cache_hits: u64,
307 path_cache: HashMap<String, CompiledPath>,
308 pub pda_cache_hits: u64,
309 pub pda_cache_misses: u64,
310 pub pending_queue_size: u64,
311 current_context: Option<UpdateContext>,
312 warnings: Vec<String>,
313 last_pda_lookup_miss: Option<String>,
314 last_pda_registered: Option<String>,
315}
316
317#[derive(Debug)]
318pub struct LookupIndex {
319 index: std::sync::Mutex<LruCache<String, Value>>,
320}
321
322impl LookupIndex {
323 pub fn new() -> Self {
324 Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
325 }
326
327 pub fn with_capacity(capacity: usize) -> Self {
328 LookupIndex {
329 index: std::sync::Mutex::new(LruCache::new(
330 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
331 )),
332 }
333 }
334
335 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
336 let key = value_to_cache_key(lookup_value);
337 self.index.lock().unwrap().get(&key).cloned()
338 }
339
340 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
341 let key = value_to_cache_key(&lookup_value);
342 self.index.lock().unwrap().put(key, primary_key);
343 }
344
345 pub fn len(&self) -> usize {
346 self.index.lock().unwrap().len()
347 }
348
349 pub fn is_empty(&self) -> bool {
350 self.index.lock().unwrap().is_empty()
351 }
352}
353
354impl Default for LookupIndex {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
360fn value_to_cache_key(value: &Value) -> String {
361 match value {
362 Value::String(s) => s.clone(),
363 Value::Number(n) => n.to_string(),
364 Value::Bool(b) => b.to_string(),
365 Value::Null => "null".to_string(),
366 _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
367 }
368}
369
370#[derive(Debug)]
371pub struct TemporalIndex {
372 index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
373}
374
375impl Default for TemporalIndex {
376 fn default() -> Self {
377 Self::new()
378 }
379}
380
381impl TemporalIndex {
382 pub fn new() -> Self {
383 Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
384 }
385
386 pub fn with_capacity(capacity: usize) -> Self {
387 TemporalIndex {
388 index: std::sync::Mutex::new(LruCache::new(
389 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
390 )),
391 }
392 }
393
394 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
395 let key = value_to_cache_key(lookup_value);
396 let mut cache = self.index.lock().unwrap();
397 if let Some(entries) = cache.get(&key) {
398 for i in (0..entries.len()).rev() {
399 if entries[i].1 <= timestamp {
400 return Some(entries[i].0.clone());
401 }
402 }
403 }
404 None
405 }
406
407 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
408 let key = value_to_cache_key(lookup_value);
409 let mut cache = self.index.lock().unwrap();
410 if let Some(entries) = cache.get(&key) {
411 if let Some(last) = entries.last() {
412 return Some(last.0.clone());
413 }
414 }
415 None
416 }
417
418 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
419 let key = value_to_cache_key(&lookup_value);
420 let mut cache = self.index.lock().unwrap();
421
422 let entries = cache.get_or_insert_mut(key, Vec::new);
423 entries.push((primary_key, timestamp));
424 entries.sort_by_key(|(_, ts)| *ts);
425
426 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
427 entries.retain(|(_, ts)| *ts >= cutoff);
428
429 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
430 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
431 entries.drain(0..excess);
432 }
433 }
434
435 pub fn len(&self) -> usize {
436 self.index.lock().unwrap().len()
437 }
438
439 pub fn is_empty(&self) -> bool {
440 self.index.lock().unwrap().is_empty()
441 }
442
443 pub fn total_entries(&self) -> usize {
444 self.index
445 .lock()
446 .unwrap()
447 .iter()
448 .map(|(_, entries)| entries.len())
449 .sum()
450 }
451
452 pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
453 let mut cache = self.index.lock().unwrap();
454 let mut total_removed = 0;
455
456 for (_, entries) in cache.iter_mut() {
457 let original_len = entries.len();
458 entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
459 total_removed += original_len - entries.len();
460 }
461
462 total_removed
463 }
464}
465
466#[derive(Debug)]
467pub struct PdaReverseLookup {
468 index: LruCache<String, String>,
470}
471
472impl PdaReverseLookup {
473 pub fn new(capacity: usize) -> Self {
474 PdaReverseLookup {
475 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
476 }
477 }
478
479 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
480 self.index.get(pda_address).cloned()
481 }
482
483 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
484 let evicted = if self.index.len() >= self.index.cap().get() {
485 self.index.peek_lru().map(|(k, _)| k.clone())
486 } else {
487 None
488 };
489
490 self.index.put(pda_address, seed_value);
491 evicted
492 }
493
494 pub fn len(&self) -> usize {
495 self.index.len()
496 }
497
498 pub fn is_empty(&self) -> bool {
499 self.index.is_empty()
500 }
501}
502
503#[derive(Debug, Clone)]
505pub struct QueuedAccountUpdate {
506 pub pda_address: String,
507 pub account_type: String,
508 pub account_data: Value,
509 pub slot: u64,
510 pub write_version: u64,
511 pub signature: String,
512}
513
514#[derive(Debug, Clone)]
516pub struct PendingAccountUpdate {
517 pub account_type: String,
518 pub pda_address: String,
519 pub account_data: Value,
520 pub slot: u64,
521 pub write_version: u64,
522 pub signature: String,
523 pub queued_at: i64,
524}
525
526#[derive(Debug, Clone)]
528pub struct QueuedInstructionEvent {
529 pub pda_address: String,
530 pub event_type: String,
531 pub event_data: Value,
532 pub slot: u64,
533 pub signature: String,
534}
535
536#[derive(Debug, Clone)]
538pub struct PendingInstructionEvent {
539 pub event_type: String,
540 pub pda_address: String,
541 pub event_data: Value,
542 pub slot: u64,
543 pub signature: String,
544 pub queued_at: i64,
545}
546
547#[derive(Debug, Clone)]
548pub struct PendingQueueStats {
549 pub total_updates: usize,
550 pub unique_pdas: usize,
551 pub oldest_age_seconds: i64,
552 pub largest_pda_queue_size: usize,
553 pub estimated_memory_bytes: usize,
554}
555
556#[derive(Debug, Clone, Default)]
557pub struct VmMemoryStats {
558 pub state_table_entity_count: usize,
559 pub state_table_max_entries: usize,
560 pub state_table_at_capacity: bool,
561 pub lookup_index_count: usize,
562 pub lookup_index_total_entries: usize,
563 pub temporal_index_count: usize,
564 pub temporal_index_total_entries: usize,
565 pub pda_reverse_lookup_count: usize,
566 pub pda_reverse_lookup_total_entries: usize,
567 pub version_tracker_entries: usize,
568 pub pending_queue_stats: Option<PendingQueueStats>,
569 pub path_cache_size: usize,
570}
571
572#[derive(Debug, Clone, Default)]
573pub struct CleanupResult {
574 pub pending_updates_removed: usize,
575 pub temporal_entries_removed: usize,
576}
577
578#[derive(Debug, Clone)]
579pub struct CapacityWarning {
580 pub current_entries: usize,
581 pub max_entries: usize,
582 pub entries_over_limit: usize,
583}
584
585#[derive(Debug, Clone)]
586pub struct StateTableConfig {
587 pub max_entries: usize,
588 pub max_array_length: usize,
589}
590
591impl Default for StateTableConfig {
592 fn default() -> Self {
593 Self {
594 max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
595 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
596 }
597 }
598}
599
600#[derive(Debug)]
601pub struct VersionTracker {
602 cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
603}
604
605impl VersionTracker {
606 pub fn new() -> Self {
607 Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
608 }
609
610 pub fn with_capacity(capacity: usize) -> Self {
611 VersionTracker {
612 cache: std::sync::Mutex::new(LruCache::new(
613 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
614 )),
615 }
616 }
617
618 fn make_key(primary_key: &Value, event_type: &str) -> String {
619 format!("{}:{}", primary_key, event_type)
620 }
621
622 pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
623 let key = Self::make_key(primary_key, event_type);
624 self.cache.lock().unwrap().get(&key).copied()
625 }
626
627 pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
628 let key = Self::make_key(primary_key, event_type);
629 self.cache.lock().unwrap().put(key, (slot, ordering_value));
630 }
631
632 pub fn len(&self) -> usize {
633 self.cache.lock().unwrap().len()
634 }
635
636 pub fn is_empty(&self) -> bool {
637 self.cache.lock().unwrap().is_empty()
638 }
639}
640
641impl Default for VersionTracker {
642 fn default() -> Self {
643 Self::new()
644 }
645}
646
647#[derive(Debug)]
648pub struct StateTable {
649 pub data: DashMap<Value, Value>,
650 access_times: DashMap<Value, i64>,
651 pub lookup_indexes: HashMap<String, LookupIndex>,
652 pub temporal_indexes: HashMap<String, TemporalIndex>,
653 pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
654 pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
655 pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
656 version_tracker: VersionTracker,
657 instruction_dedup_cache: VersionTracker,
658 config: StateTableConfig,
659 #[cfg_attr(not(feature = "otel"), allow(dead_code))]
660 entity_name: String,
661}
662
663impl StateTable {
664 pub fn is_at_capacity(&self) -> bool {
665 self.data.len() >= self.config.max_entries
666 }
667
668 pub fn entries_over_limit(&self) -> usize {
669 self.data.len().saturating_sub(self.config.max_entries)
670 }
671
672 pub fn max_array_length(&self) -> usize {
673 self.config.max_array_length
674 }
675
676 fn touch(&self, key: &Value) {
677 let now = std::time::SystemTime::now()
678 .duration_since(std::time::UNIX_EPOCH)
679 .unwrap()
680 .as_secs() as i64;
681 self.access_times.insert(key.clone(), now);
682 }
683
684 fn evict_lru(&self, count: usize) -> usize {
685 if count == 0 || self.data.is_empty() {
686 return 0;
687 }
688
689 let mut entries: Vec<(Value, i64)> = self
690 .access_times
691 .iter()
692 .map(|entry| (entry.key().clone(), *entry.value()))
693 .collect();
694
695 entries.sort_by_key(|(_, ts)| *ts);
696
697 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
698
699 let mut evicted = 0;
700 for key in to_evict {
701 self.data.remove(&key);
702 self.access_times.remove(&key);
703 evicted += 1;
704 }
705
706 #[cfg(feature = "otel")]
707 if evicted > 0 {
708 crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
709 }
710
711 evicted
712 }
713
714 pub fn insert_with_eviction(&self, key: Value, value: Value) {
715 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
716 #[cfg(feature = "otel")]
717 crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
718 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
719 self.evict_lru(to_evict.max(1));
720 }
721 self.data.insert(key.clone(), value);
722 self.touch(&key);
723 }
724
725 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
726 let result = self.data.get(key).map(|v| v.clone());
727 if result.is_some() {
728 self.touch(key);
729 }
730 result
731 }
732
733 pub fn is_fresh_update(
740 &self,
741 primary_key: &Value,
742 event_type: &str,
743 slot: u64,
744 ordering_value: u64,
745 ) -> bool {
746 let dominated = self
747 .version_tracker
748 .get(primary_key, event_type)
749 .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
750 .unwrap_or(false);
751
752 if dominated {
753 return false;
754 }
755
756 self.version_tracker
757 .insert(primary_key, event_type, slot, ordering_value);
758 true
759 }
760
761 pub fn is_duplicate_instruction(
769 &self,
770 primary_key: &Value,
771 event_type: &str,
772 slot: u64,
773 txn_index: u64,
774 ) -> bool {
775 let is_duplicate = self
777 .instruction_dedup_cache
778 .get(primary_key, event_type)
779 .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
780 .unwrap_or(false);
781
782 if is_duplicate {
783 return true;
784 }
785
786 self.instruction_dedup_cache
788 .insert(primary_key, event_type, slot, txn_index);
789 false
790 }
791}
792
793impl VmContext {
794 pub fn new() -> Self {
795 let mut vm = VmContext {
796 registers: vec![Value::Null; 256],
797 states: HashMap::new(),
798 instructions_executed: 0,
799 cache_hits: 0,
800 path_cache: HashMap::new(),
801 pda_cache_hits: 0,
802 pda_cache_misses: 0,
803 pending_queue_size: 0,
804 current_context: None,
805 warnings: Vec::new(),
806 last_pda_lookup_miss: None,
807 last_pda_registered: None,
808 };
809 vm.states.insert(
810 0,
811 StateTable {
812 data: DashMap::new(),
813 access_times: DashMap::new(),
814 lookup_indexes: HashMap::new(),
815 temporal_indexes: HashMap::new(),
816 pda_reverse_lookups: HashMap::new(),
817 pending_updates: DashMap::new(),
818 pending_instruction_events: DashMap::new(),
819 version_tracker: VersionTracker::new(),
820 instruction_dedup_cache: VersionTracker::with_capacity(
821 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
822 ),
823 config: StateTableConfig::default(),
824 entity_name: "default".to_string(),
825 },
826 );
827 vm
828 }
829
830 pub fn new_with_config(state_config: StateTableConfig) -> Self {
831 let mut vm = VmContext {
832 registers: vec![Value::Null; 256],
833 states: HashMap::new(),
834 instructions_executed: 0,
835 cache_hits: 0,
836 path_cache: HashMap::new(),
837 pda_cache_hits: 0,
838 pda_cache_misses: 0,
839 pending_queue_size: 0,
840 current_context: None,
841 warnings: Vec::new(),
842 last_pda_lookup_miss: None,
843 last_pda_registered: None,
844 };
845 vm.states.insert(
846 0,
847 StateTable {
848 data: DashMap::new(),
849 access_times: DashMap::new(),
850 lookup_indexes: HashMap::new(),
851 temporal_indexes: HashMap::new(),
852 pda_reverse_lookups: HashMap::new(),
853 pending_updates: DashMap::new(),
854 pending_instruction_events: DashMap::new(),
855 version_tracker: VersionTracker::new(),
856 instruction_dedup_cache: VersionTracker::with_capacity(
857 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
858 ),
859 config: state_config,
860 entity_name: "default".to_string(),
861 },
862 );
863 vm
864 }
865
866 pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
869 self.states.get_mut(&state_id)
870 }
871
872 pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
874 &mut self.registers
875 }
876
877 pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
879 &self.path_cache
880 }
881
882 pub fn current_context(&self) -> Option<&UpdateContext> {
884 self.current_context.as_ref()
885 }
886
887 fn add_warning(&mut self, msg: String) {
888 self.warnings.push(msg);
889 }
890
891 pub fn take_warnings(&mut self) -> Vec<String> {
892 std::mem::take(&mut self.warnings)
893 }
894
895 pub fn has_warnings(&self) -> bool {
896 !self.warnings.is_empty()
897 }
898
899 pub fn update_state_from_register(
900 &mut self,
901 state_id: u32,
902 key: Value,
903 register: Register,
904 ) -> Result<()> {
905 let state = self.states.get(&state_id).ok_or("State table not found")?;
906 let value = self.registers[register].clone();
907 state.insert_with_eviction(key, value);
908 Ok(())
909 }
910
911 fn reset_registers(&mut self) {
912 for reg in &mut self.registers {
913 *reg = Value::Null;
914 }
915 }
916
917 pub fn extract_partial_state(
919 &self,
920 state_reg: Register,
921 dirty_fields: &HashSet<String>,
922 ) -> Result<Value> {
923 let full_state = &self.registers[state_reg];
924
925 if dirty_fields.is_empty() {
926 return Ok(json!({}));
927 }
928
929 let mut partial = serde_json::Map::new();
930
931 for path in dirty_fields {
932 let segments: Vec<&str> = path.split('.').collect();
933
934 let mut current = full_state;
935 let mut found = true;
936
937 for segment in &segments {
938 match current.get(segment) {
939 Some(v) => current = v,
940 None => {
941 found = false;
942 break;
943 }
944 }
945 }
946
947 if !found {
948 continue;
949 }
950
951 let mut target = &mut partial;
952 for (i, segment) in segments.iter().enumerate() {
953 if i == segments.len() - 1 {
954 target.insert(segment.to_string(), current.clone());
955 } else {
956 target
957 .entry(segment.to_string())
958 .or_insert_with(|| json!({}));
959 target = target
960 .get_mut(*segment)
961 .and_then(|v| v.as_object_mut())
962 .ok_or("Failed to build nested structure")?;
963 }
964 }
965 }
966
967 Ok(Value::Object(partial))
968 }
969
970 pub fn extract_partial_state_with_tracker(
974 &self,
975 state_reg: Register,
976 tracker: &DirtyTracker,
977 ) -> Result<Value> {
978 let full_state = &self.registers[state_reg];
979
980 if tracker.is_empty() {
981 return Ok(json!({}));
982 }
983
984 let mut partial = serde_json::Map::new();
985
986 for (path, change) in tracker.iter() {
987 let segments: Vec<&str> = path.split('.').collect();
988
989 let value_to_insert = match change {
990 FieldChange::Replaced => {
991 let mut current = full_state;
992 let mut found = true;
993
994 for segment in &segments {
995 match current.get(*segment) {
996 Some(v) => current = v,
997 None => {
998 found = false;
999 break;
1000 }
1001 }
1002 }
1003
1004 if !found {
1005 continue;
1006 }
1007 current.clone()
1008 }
1009 FieldChange::Appended(values) => Value::Array(values.clone()),
1010 };
1011
1012 let mut target = &mut partial;
1013 for (i, segment) in segments.iter().enumerate() {
1014 if i == segments.len() - 1 {
1015 target.insert(segment.to_string(), value_to_insert.clone());
1016 } else {
1017 target
1018 .entry(segment.to_string())
1019 .or_insert_with(|| json!({}));
1020 target = target
1021 .get_mut(*segment)
1022 .and_then(|v| v.as_object_mut())
1023 .ok_or("Failed to build nested structure")?;
1024 }
1025 }
1026 }
1027
1028 Ok(Value::Object(partial))
1029 }
1030
1031 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1032 if let Some(compiled) = self.path_cache.get(path) {
1033 self.cache_hits += 1;
1034 #[cfg(feature = "otel")]
1035 crate::vm_metrics::record_path_cache_hit();
1036 return compiled.clone();
1037 }
1038 #[cfg(feature = "otel")]
1039 crate::vm_metrics::record_path_cache_miss();
1040 let compiled = CompiledPath::new(path);
1041 self.path_cache.insert(path.to_string(), compiled.clone());
1042 compiled
1043 }
1044
1045 #[cfg_attr(feature = "otel", instrument(
1047 name = "vm.process_event",
1048 skip(self, bytecode, event_value, log),
1049 level = "info",
1050 fields(
1051 event_type = %event_type,
1052 slot = context.as_ref().and_then(|c| c.slot),
1053 )
1054 ))]
1055 pub fn process_event(
1056 &mut self,
1057 bytecode: &MultiEntityBytecode,
1058 event_value: Value,
1059 event_type: &str,
1060 context: Option<&UpdateContext>,
1061 mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1062 ) -> Result<Vec<Mutation>> {
1063 self.current_context = context.cloned();
1064
1065 let mut event_value = event_value;
1066 if let Some(ctx) = context {
1067 if let Some(obj) = event_value.as_object_mut() {
1068 obj.insert("__update_context".to_string(), ctx.to_value());
1069 }
1070 }
1071
1072 let mut all_mutations = Vec::new();
1073
1074 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1075 for entity_name in entity_names {
1076 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1077 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1078 if let Some(ref mut log) = log {
1079 log.set("entity", entity_name.clone());
1080 log.inc("handlers", 1);
1081 }
1082
1083 let opcodes_before = self.instructions_executed;
1084 let cache_before = self.cache_hits;
1085 let pda_hits_before = self.pda_cache_hits;
1086 let pda_misses_before = self.pda_cache_misses;
1087
1088 let mutations = self.execute_handler(
1089 handler,
1090 &event_value,
1091 event_type,
1092 entity_bytecode.state_id,
1093 entity_name,
1094 entity_bytecode.computed_fields_evaluator.as_ref(),
1095 )?;
1096
1097 if let Some(ref mut log) = log {
1098 log.inc(
1099 "opcodes",
1100 (self.instructions_executed - opcodes_before) as i64,
1101 );
1102 log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1103 log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1104 log.inc(
1105 "pda_misses",
1106 (self.pda_cache_misses - pda_misses_before) as i64,
1107 );
1108 }
1109
1110 if mutations.is_empty() {
1111 if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1112 if event_type.ends_with("IxState") {
1113 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1114 let signature = context
1115 .and_then(|c| c.signature.clone())
1116 .unwrap_or_default();
1117 let _ = self.queue_instruction_event(
1118 entity_bytecode.state_id,
1119 QueuedInstructionEvent {
1120 pda_address: missed_pda,
1121 event_type: event_type.to_string(),
1122 event_data: event_value.clone(),
1123 slot,
1124 signature,
1125 },
1126 );
1127 }
1128 }
1129 }
1130
1131 all_mutations.extend(mutations);
1132
1133 if let Some(registered_pda) = self.take_last_pda_registered() {
1134 let pending_events = self.flush_pending_instruction_events(
1135 entity_bytecode.state_id,
1136 ®istered_pda,
1137 );
1138 for pending in pending_events {
1139 if let Some(pending_handler) =
1140 entity_bytecode.handlers.get(&pending.event_type)
1141 {
1142 if let Ok(reprocessed_mutations) = self.execute_handler(
1143 pending_handler,
1144 &pending.event_data,
1145 &pending.event_type,
1146 entity_bytecode.state_id,
1147 entity_name,
1148 entity_bytecode.computed_fields_evaluator.as_ref(),
1149 ) {
1150 all_mutations.extend(reprocessed_mutations);
1151 }
1152 }
1153 }
1154 }
1155 } else if let Some(ref mut log) = log {
1156 log.set("skip_reason", "no_handler");
1157 }
1158 } else if let Some(ref mut log) = log {
1159 log.set("skip_reason", "entity_not_found");
1160 }
1161 }
1162 } else if let Some(ref mut log) = log {
1163 log.set("skip_reason", "no_event_routing");
1164 }
1165
1166 if let Some(log) = log {
1167 log.set("mutations", all_mutations.len() as i64);
1168 if let Some(first) = all_mutations.first() {
1169 if let Some(key_str) = first.key.as_str() {
1170 log.set("primary_key", key_str);
1171 } else if let Some(key_num) = first.key.as_u64() {
1172 log.set("primary_key", key_num as i64);
1173 }
1174 }
1175 if let Some(state) = self.states.get(&0) {
1176 log.set("state_table_size", state.data.len() as i64);
1177 }
1178
1179 let warnings = self.take_warnings();
1180 if !warnings.is_empty() {
1181 log.set("warnings", warnings.len() as i64);
1182 log.set(
1183 "warning_messages",
1184 Value::Array(warnings.into_iter().map(Value::String).collect()),
1185 );
1186 log.set_level(crate::canonical_log::LogLevel::Warn);
1187 }
1188 } else {
1189 self.warnings.clear();
1190 }
1191
1192 Ok(all_mutations)
1193 }
1194
1195 pub fn process_any(
1196 &mut self,
1197 bytecode: &MultiEntityBytecode,
1198 any: prost_types::Any,
1199 ) -> Result<Vec<Mutation>> {
1200 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1201 self.process_event(bytecode, event_value, &event_type, None, None)
1202 }
1203
1204 #[cfg_attr(feature = "otel", instrument(
1205 name = "vm.execute_handler",
1206 skip(self, handler, event_value, entity_evaluator),
1207 level = "debug",
1208 fields(
1209 event_type = %event_type,
1210 handler_opcodes = handler.len(),
1211 )
1212 ))]
1213 #[allow(clippy::type_complexity)]
1214 fn execute_handler(
1215 &mut self,
1216 handler: &[OpCode],
1217 event_value: &Value,
1218 event_type: &str,
1219 override_state_id: u32,
1220 entity_name: &str,
1221 entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1222 ) -> Result<Vec<Mutation>> {
1223 self.reset_registers();
1224 self.last_pda_lookup_miss = None;
1225
1226 let mut pc: usize = 0;
1227 let mut output = Vec::new();
1228 let mut dirty_tracker = DirtyTracker::new();
1229
1230 while pc < handler.len() {
1231 match &handler[pc] {
1232 OpCode::LoadEventField {
1233 path,
1234 dest,
1235 default,
1236 } => {
1237 let value = self.load_field(event_value, path, default.as_ref())?;
1238 self.registers[*dest] = value;
1239 pc += 1;
1240 }
1241 OpCode::LoadConstant { value, dest } => {
1242 self.registers[*dest] = value.clone();
1243 pc += 1;
1244 }
1245 OpCode::CopyRegister { source, dest } => {
1246 self.registers[*dest] = self.registers[*source].clone();
1247 pc += 1;
1248 }
1249 OpCode::CopyRegisterIfNull { source, dest } => {
1250 if self.registers[*dest].is_null() {
1251 self.registers[*dest] = self.registers[*source].clone();
1252 }
1253 pc += 1;
1254 }
1255 OpCode::GetEventType { dest } => {
1256 self.registers[*dest] = json!(event_type);
1257 pc += 1;
1258 }
1259 OpCode::CreateObject { dest } => {
1260 self.registers[*dest] = json!({});
1261 pc += 1;
1262 }
1263 OpCode::SetField {
1264 object,
1265 path,
1266 value,
1267 } => {
1268 self.set_field_auto_vivify(*object, path, *value)?;
1269 dirty_tracker.mark_replaced(path);
1270 pc += 1;
1271 }
1272 OpCode::SetFields { object, fields } => {
1273 for (path, value_reg) in fields {
1274 self.set_field_auto_vivify(*object, path, *value_reg)?;
1275 dirty_tracker.mark_replaced(path);
1276 }
1277 pc += 1;
1278 }
1279 OpCode::GetField { object, path, dest } => {
1280 let value = self.get_field(*object, path)?;
1281 self.registers[*dest] = value;
1282 pc += 1;
1283 }
1284 OpCode::ReadOrInitState {
1285 state_id: _,
1286 key,
1287 default,
1288 dest,
1289 } => {
1290 let actual_state_id = override_state_id;
1291 let entity_name_owned = entity_name.to_string();
1292 self.states
1293 .entry(actual_state_id)
1294 .or_insert_with(|| StateTable {
1295 data: DashMap::new(),
1296 access_times: DashMap::new(),
1297 lookup_indexes: HashMap::new(),
1298 temporal_indexes: HashMap::new(),
1299 pda_reverse_lookups: HashMap::new(),
1300 pending_updates: DashMap::new(),
1301 pending_instruction_events: DashMap::new(),
1302 version_tracker: VersionTracker::new(),
1303 instruction_dedup_cache: VersionTracker::with_capacity(
1304 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1305 ),
1306 config: StateTableConfig::default(),
1307 entity_name: entity_name_owned,
1308 });
1309 let key_value = self.registers[*key].clone();
1310 let warn_null_key = key_value.is_null()
1311 && event_type.ends_with("State")
1312 && !event_type.ends_with("IxState");
1313
1314 if warn_null_key {
1315 self.add_warning(format!(
1316 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1317 key, event_type
1318 ));
1319 }
1320
1321 let state = self
1322 .states
1323 .get(&actual_state_id)
1324 .ok_or("State table not found")?;
1325
1326 if !key_value.is_null() {
1327 if let Some(ctx) = &self.current_context {
1328 if ctx.is_account_update() {
1330 if let (Some(slot), Some(write_version)) =
1331 (ctx.slot, ctx.write_version)
1332 {
1333 if !state.is_fresh_update(
1334 &key_value,
1335 event_type,
1336 slot,
1337 write_version,
1338 ) {
1339 self.add_warning(format!(
1340 "Stale account update skipped: slot={}, write_version={}",
1341 slot, write_version
1342 ));
1343 return Ok(Vec::new());
1344 }
1345 }
1346 }
1347 else if ctx.is_instruction_update() {
1349 if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
1350 if state.is_duplicate_instruction(
1351 &key_value, event_type, slot, txn_index,
1352 ) {
1353 self.add_warning(format!(
1354 "Duplicate instruction skipped: slot={}, txn_index={}",
1355 slot, txn_index
1356 ));
1357 return Ok(Vec::new());
1358 }
1359 }
1360 }
1361 }
1362 }
1363 let value = state
1364 .get_and_touch(&key_value)
1365 .unwrap_or_else(|| default.clone());
1366
1367 self.registers[*dest] = value;
1368 pc += 1;
1369 }
1370 OpCode::UpdateState {
1371 state_id: _,
1372 key,
1373 value,
1374 } => {
1375 let actual_state_id = override_state_id;
1376 let state = self
1377 .states
1378 .get(&actual_state_id)
1379 .ok_or("State table not found")?;
1380 let key_value = self.registers[*key].clone();
1381 let value_data = self.registers[*value].clone();
1382
1383 state.insert_with_eviction(key_value, value_data);
1384 pc += 1;
1385 }
1386 OpCode::AppendToArray {
1387 object,
1388 path,
1389 value,
1390 } => {
1391 let appended_value = self.registers[*value].clone();
1392 let max_len = self
1393 .states
1394 .get(&override_state_id)
1395 .map(|s| s.max_array_length())
1396 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1397 self.append_to_array(*object, path, *value, max_len)?;
1398 dirty_tracker.mark_appended(path, appended_value);
1399 pc += 1;
1400 }
1401 OpCode::GetCurrentTimestamp { dest } => {
1402 let timestamp = std::time::SystemTime::now()
1403 .duration_since(std::time::UNIX_EPOCH)
1404 .unwrap()
1405 .as_secs() as i64;
1406 self.registers[*dest] = json!(timestamp);
1407 pc += 1;
1408 }
1409 OpCode::CreateEvent { dest, event_value } => {
1410 let timestamp = std::time::SystemTime::now()
1411 .duration_since(std::time::UNIX_EPOCH)
1412 .unwrap()
1413 .as_secs() as i64;
1414
1415 let mut event_data = self.registers[*event_value].clone();
1417 if let Some(obj) = event_data.as_object_mut() {
1418 obj.remove("__update_context");
1419 }
1420
1421 let mut event = serde_json::Map::new();
1423 event.insert("timestamp".to_string(), json!(timestamp));
1424 event.insert("data".to_string(), event_data);
1425
1426 if let Some(ref ctx) = self.current_context {
1428 if let Some(slot) = ctx.slot {
1429 event.insert("slot".to_string(), json!(slot));
1430 }
1431 if let Some(ref signature) = ctx.signature {
1432 event.insert("signature".to_string(), json!(signature));
1433 }
1434 }
1435
1436 self.registers[*dest] = Value::Object(event);
1437 pc += 1;
1438 }
1439 OpCode::CreateCapture {
1440 dest,
1441 capture_value,
1442 } => {
1443 let timestamp = std::time::SystemTime::now()
1444 .duration_since(std::time::UNIX_EPOCH)
1445 .unwrap()
1446 .as_secs() as i64;
1447
1448 let capture_data = self.registers[*capture_value].clone();
1450
1451 let account_address = event_value
1453 .get("__account_address")
1454 .and_then(|v| v.as_str())
1455 .unwrap_or("")
1456 .to_string();
1457
1458 let mut capture = serde_json::Map::new();
1460 capture.insert("timestamp".to_string(), json!(timestamp));
1461 capture.insert("account_address".to_string(), json!(account_address));
1462 capture.insert("data".to_string(), capture_data);
1463
1464 if let Some(ref ctx) = self.current_context {
1466 if let Some(slot) = ctx.slot {
1467 capture.insert("slot".to_string(), json!(slot));
1468 }
1469 if let Some(ref signature) = ctx.signature {
1470 capture.insert("signature".to_string(), json!(signature));
1471 }
1472 }
1473
1474 self.registers[*dest] = Value::Object(capture);
1475 pc += 1;
1476 }
1477 OpCode::Transform {
1478 source,
1479 dest,
1480 transformation,
1481 } => {
1482 if source == dest {
1483 self.transform_in_place(*source, transformation)?;
1484 } else {
1485 let source_value = &self.registers[*source];
1486 let value = self.apply_transformation(source_value, transformation)?;
1487 self.registers[*dest] = value;
1488 }
1489 pc += 1;
1490 }
1491 OpCode::EmitMutation {
1492 entity_name,
1493 key,
1494 state,
1495 } => {
1496 let primary_key = self.registers[*key].clone();
1497
1498 if primary_key.is_null() || dirty_tracker.is_empty() {
1499 let reason = if dirty_tracker.is_empty() {
1500 "no_fields_modified"
1501 } else {
1502 "null_primary_key"
1503 };
1504 self.add_warning(format!(
1505 "Skipping mutation for entity '{}': {} (dirty_fields={})",
1506 entity_name,
1507 reason,
1508 dirty_tracker.len()
1509 ));
1510 } else {
1511 let patch =
1512 self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1513
1514 let append = dirty_tracker.appended_paths();
1515 let mutation = Mutation {
1516 export: entity_name.clone(),
1517 key: primary_key,
1518 patch,
1519 append,
1520 };
1521 output.push(mutation);
1522 }
1523 pc += 1;
1524 }
1525 OpCode::SetFieldIfNull {
1526 object,
1527 path,
1528 value,
1529 } => {
1530 let was_set = self.set_field_if_null(*object, path, *value)?;
1531 if was_set {
1532 dirty_tracker.mark_replaced(path);
1533 }
1534 pc += 1;
1535 }
1536 OpCode::SetFieldMax {
1537 object,
1538 path,
1539 value,
1540 } => {
1541 let was_updated = self.set_field_max(*object, path, *value)?;
1542 if was_updated {
1543 dirty_tracker.mark_replaced(path);
1544 }
1545 pc += 1;
1546 }
1547 OpCode::UpdateTemporalIndex {
1548 state_id: _,
1549 index_name,
1550 lookup_value,
1551 primary_key,
1552 timestamp,
1553 } => {
1554 let actual_state_id = override_state_id;
1555 let state = self
1556 .states
1557 .get_mut(&actual_state_id)
1558 .ok_or("State table not found")?;
1559 let index = state
1560 .temporal_indexes
1561 .entry(index_name.clone())
1562 .or_insert_with(TemporalIndex::new);
1563
1564 let lookup_val = self.registers[*lookup_value].clone();
1565 let pk_val = self.registers[*primary_key].clone();
1566 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1567 val
1568 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1569 val as i64
1570 } else {
1571 return Err(format!(
1572 "Timestamp must be a number (i64 or u64), got: {:?}",
1573 self.registers[*timestamp]
1574 )
1575 .into());
1576 };
1577
1578 index.insert(lookup_val, pk_val, ts_val);
1579 pc += 1;
1580 }
1581 OpCode::LookupTemporalIndex {
1582 state_id: _,
1583 index_name,
1584 lookup_value,
1585 timestamp,
1586 dest,
1587 } => {
1588 let actual_state_id = override_state_id;
1589 let state = self
1590 .states
1591 .get(&actual_state_id)
1592 .ok_or("State table not found")?;
1593 let lookup_val = &self.registers[*lookup_value];
1594
1595 let result = if self.registers[*timestamp].is_null() {
1596 if let Some(index) = state.temporal_indexes.get(index_name) {
1597 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1598 } else {
1599 Value::Null
1600 }
1601 } else {
1602 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1603 val
1604 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1605 val as i64
1606 } else {
1607 return Err(format!(
1608 "Timestamp must be a number (i64 or u64), got: {:?}",
1609 self.registers[*timestamp]
1610 )
1611 .into());
1612 };
1613
1614 if let Some(index) = state.temporal_indexes.get(index_name) {
1615 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1616 } else {
1617 Value::Null
1618 }
1619 };
1620
1621 self.registers[*dest] = result;
1622 pc += 1;
1623 }
1624 OpCode::UpdateLookupIndex {
1625 state_id: _,
1626 index_name,
1627 lookup_value,
1628 primary_key,
1629 } => {
1630 let actual_state_id = override_state_id;
1631 let state = self
1632 .states
1633 .get_mut(&actual_state_id)
1634 .ok_or("State table not found")?;
1635 let index = state
1636 .lookup_indexes
1637 .entry(index_name.clone())
1638 .or_insert_with(LookupIndex::new);
1639
1640 let lookup_val = self.registers[*lookup_value].clone();
1641 let pk_val = self.registers[*primary_key].clone();
1642
1643 index.insert(lookup_val, pk_val);
1644 pc += 1;
1645 }
1646 OpCode::LookupIndex {
1647 state_id: _,
1648 index_name,
1649 lookup_value,
1650 dest,
1651 } => {
1652 let actual_state_id = override_state_id;
1653 let lookup_val = self.registers[*lookup_value].clone();
1654
1655 let result = {
1656 let state = self
1657 .states
1658 .get(&actual_state_id)
1659 .ok_or("State table not found")?;
1660
1661 if let Some(index) = state.lookup_indexes.get(index_name) {
1662 let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1663 #[cfg(feature = "otel")]
1664 if found.is_null() {
1665 crate::vm_metrics::record_lookup_index_miss(index_name);
1666 } else {
1667 crate::vm_metrics::record_lookup_index_hit(index_name);
1668 }
1669 found
1670 } else {
1671 Value::Null
1672 }
1673 };
1674
1675 let final_result = if result.is_null() {
1676 if let Some(pda_str) = lookup_val.as_str() {
1677 let state = self
1678 .states
1679 .get_mut(&actual_state_id)
1680 .ok_or("State table not found")?;
1681
1682 if let Some(pda_lookup) =
1683 state.pda_reverse_lookups.get_mut("default_pda_lookup")
1684 {
1685 if let Some(resolved) = pda_lookup.lookup(pda_str) {
1686 Value::String(resolved)
1687 } else {
1688 self.last_pda_lookup_miss = Some(pda_str.to_string());
1689 Value::Null
1690 }
1691 } else {
1692 self.last_pda_lookup_miss = Some(pda_str.to_string());
1693 Value::Null
1694 }
1695 } else {
1696 Value::Null
1697 }
1698 } else {
1699 result
1700 };
1701
1702 self.registers[*dest] = final_result;
1703 pc += 1;
1704 }
1705 OpCode::SetFieldSum {
1706 object,
1707 path,
1708 value,
1709 } => {
1710 let was_updated = self.set_field_sum(*object, path, *value)?;
1711 if was_updated {
1712 dirty_tracker.mark_replaced(path);
1713 }
1714 pc += 1;
1715 }
1716 OpCode::SetFieldIncrement { object, path } => {
1717 let was_updated = self.set_field_increment(*object, path)?;
1718 if was_updated {
1719 dirty_tracker.mark_replaced(path);
1720 }
1721 pc += 1;
1722 }
1723 OpCode::SetFieldMin {
1724 object,
1725 path,
1726 value,
1727 } => {
1728 let was_updated = self.set_field_min(*object, path, *value)?;
1729 if was_updated {
1730 dirty_tracker.mark_replaced(path);
1731 }
1732 pc += 1;
1733 }
1734 OpCode::AddToUniqueSet {
1735 state_id: _,
1736 set_name,
1737 value,
1738 count_object,
1739 count_path,
1740 } => {
1741 let value_to_add = self.registers[*value].clone();
1742
1743 let set_field_path = format!("__unique_set:{}", set_name);
1746
1747 let mut set: HashSet<Value> =
1749 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1750 if !existing.is_null() {
1751 serde_json::from_value(existing).unwrap_or_default()
1752 } else {
1753 HashSet::new()
1754 }
1755 } else {
1756 HashSet::new()
1757 };
1758
1759 let was_new = set.insert(value_to_add);
1761
1762 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1764 self.registers[100] = serde_json::to_value(set_as_vec)?;
1765 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1766
1767 if was_new {
1769 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1770 self.set_field_auto_vivify(*count_object, count_path, 100)?;
1771 dirty_tracker.mark_replaced(count_path);
1772 }
1773
1774 pc += 1;
1775 }
1776 OpCode::ConditionalSetField {
1777 object,
1778 path,
1779 value,
1780 condition_field,
1781 condition_op,
1782 condition_value,
1783 } => {
1784 let field_value = self.load_field(event_value, condition_field, None)?;
1785 let condition_met =
1786 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1787
1788 if condition_met {
1789 self.set_field_auto_vivify(*object, path, *value)?;
1790 dirty_tracker.mark_replaced(path);
1791 }
1792 pc += 1;
1793 }
1794 OpCode::ConditionalIncrement {
1795 object,
1796 path,
1797 condition_field,
1798 condition_op,
1799 condition_value,
1800 } => {
1801 let field_value = self.load_field(event_value, condition_field, None)?;
1802 let condition_met =
1803 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1804
1805 if condition_met {
1806 let was_updated = self.set_field_increment(*object, path)?;
1807 if was_updated {
1808 dirty_tracker.mark_replaced(path);
1809 }
1810 }
1811 pc += 1;
1812 }
1813 OpCode::EvaluateComputedFields {
1814 state,
1815 computed_paths,
1816 } => {
1817 if let Some(evaluator) = entity_evaluator {
1818 let old_values: Vec<_> = computed_paths
1819 .iter()
1820 .map(|path| Self::get_value_at_path(&self.registers[*state], path))
1821 .collect();
1822
1823 let state_value = &mut self.registers[*state];
1824 let eval_result = evaluator(state_value);
1825
1826 if eval_result.is_ok() {
1827 for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
1828 let new_value =
1829 Self::get_value_at_path(&self.registers[*state], path);
1830
1831 if new_value != *old_value {
1832 dirty_tracker.mark_replaced(path);
1833 }
1834 }
1835 }
1836 }
1837 pc += 1;
1838 }
1839 OpCode::UpdatePdaReverseLookup {
1840 state_id: _,
1841 lookup_name,
1842 pda_address,
1843 primary_key,
1844 } => {
1845 let actual_state_id = override_state_id;
1846 let state = self
1847 .states
1848 .get_mut(&actual_state_id)
1849 .ok_or("State table not found")?;
1850
1851 let pda_val = self.registers[*pda_address].clone();
1852 let pk_val = self.registers[*primary_key].clone();
1853
1854 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1855 let pda_lookup = state
1856 .pda_reverse_lookups
1857 .entry(lookup_name.clone())
1858 .or_insert_with(|| {
1859 PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
1860 });
1861
1862 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1863 self.last_pda_registered = Some(pda_str.to_string());
1864 } else if !pk_val.is_null() {
1865 if let Some(pk_num) = pk_val.as_u64() {
1866 if let Some(pda_str) = pda_val.as_str() {
1867 let pda_lookup = state
1868 .pda_reverse_lookups
1869 .entry(lookup_name.clone())
1870 .or_insert_with(|| {
1871 PdaReverseLookup::new(
1872 DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
1873 )
1874 });
1875
1876 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1877 self.last_pda_registered = Some(pda_str.to_string());
1878 }
1879 }
1880 }
1881
1882 pc += 1;
1883 }
1884 }
1885
1886 self.instructions_executed += 1;
1887 }
1888
1889 Ok(output)
1890 }
1891
1892 fn load_field(
1893 &self,
1894 event_value: &Value,
1895 path: &FieldPath,
1896 default: Option<&Value>,
1897 ) -> Result<Value> {
1898 if path.segments.is_empty() {
1899 if let Some(obj) = event_value.as_object() {
1900 let filtered: serde_json::Map<String, Value> = obj
1901 .iter()
1902 .filter(|(k, _)| !k.starts_with("__"))
1903 .map(|(k, v)| (k.clone(), v.clone()))
1904 .collect();
1905 return Ok(Value::Object(filtered));
1906 }
1907 return Ok(event_value.clone());
1908 }
1909
1910 let mut current = event_value;
1911 for segment in path.segments.iter() {
1912 current = match current.get(segment) {
1913 Some(v) => v,
1914 None => return Ok(default.cloned().unwrap_or(Value::Null)),
1915 };
1916 }
1917
1918 Ok(current.clone())
1919 }
1920
1921 fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
1922 let mut current = value;
1923 for segment in path.split('.') {
1924 current = current.get(segment)?;
1925 }
1926 Some(current.clone())
1927 }
1928
1929 fn set_field_auto_vivify(
1930 &mut self,
1931 object_reg: Register,
1932 path: &str,
1933 value_reg: Register,
1934 ) -> Result<()> {
1935 let compiled = self.get_compiled_path(path);
1936 let segments = compiled.segments();
1937 let value = self.registers[value_reg].clone();
1938
1939 if !self.registers[object_reg].is_object() {
1940 self.registers[object_reg] = json!({});
1941 }
1942
1943 let obj = self.registers[object_reg]
1944 .as_object_mut()
1945 .ok_or("Not an object")?;
1946
1947 let mut current = obj;
1948 for (i, segment) in segments.iter().enumerate() {
1949 if i == segments.len() - 1 {
1950 current.insert(segment.to_string(), value);
1951 return Ok(());
1952 } else {
1953 current
1954 .entry(segment.to_string())
1955 .or_insert_with(|| json!({}));
1956 current = current
1957 .get_mut(segment)
1958 .and_then(|v| v.as_object_mut())
1959 .ok_or("Path collision: expected object")?;
1960 }
1961 }
1962
1963 Ok(())
1964 }
1965
1966 fn set_field_if_null(
1967 &mut self,
1968 object_reg: Register,
1969 path: &str,
1970 value_reg: Register,
1971 ) -> Result<bool> {
1972 let compiled = self.get_compiled_path(path);
1973 let segments = compiled.segments();
1974 let value = self.registers[value_reg].clone();
1975
1976 if value.is_null() {
1980 return Ok(false);
1981 }
1982
1983 if !self.registers[object_reg].is_object() {
1984 self.registers[object_reg] = json!({});
1985 }
1986
1987 let obj = self.registers[object_reg]
1988 .as_object_mut()
1989 .ok_or("Not an object")?;
1990
1991 let mut current = obj;
1992 for (i, segment) in segments.iter().enumerate() {
1993 if i == segments.len() - 1 {
1994 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1995 current.insert(segment.to_string(), value);
1996 return Ok(true);
1997 }
1998 return Ok(false);
1999 } else {
2000 current
2001 .entry(segment.to_string())
2002 .or_insert_with(|| json!({}));
2003 current = current
2004 .get_mut(segment)
2005 .and_then(|v| v.as_object_mut())
2006 .ok_or("Path collision: expected object")?;
2007 }
2008 }
2009
2010 Ok(false)
2011 }
2012
2013 fn set_field_max(
2014 &mut self,
2015 object_reg: Register,
2016 path: &str,
2017 value_reg: Register,
2018 ) -> Result<bool> {
2019 let compiled = self.get_compiled_path(path);
2020 let segments = compiled.segments();
2021 let new_value = self.registers[value_reg].clone();
2022
2023 if !self.registers[object_reg].is_object() {
2024 self.registers[object_reg] = json!({});
2025 }
2026
2027 let obj = self.registers[object_reg]
2028 .as_object_mut()
2029 .ok_or("Not an object")?;
2030
2031 let mut current = obj;
2032 for (i, segment) in segments.iter().enumerate() {
2033 if i == segments.len() - 1 {
2034 let should_update = if let Some(current_value) = current.get(segment) {
2035 if current_value.is_null() {
2036 true
2037 } else {
2038 match (current_value.as_i64(), new_value.as_i64()) {
2039 (Some(current_val), Some(new_val)) => new_val > current_val,
2040 (Some(current_val), None) if new_value.as_u64().is_some() => {
2041 new_value.as_u64().unwrap() as i64 > current_val
2042 }
2043 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2044 new_val > current_value.as_u64().unwrap() as i64
2045 }
2046 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2047 (Some(current_val), Some(new_val)) => new_val > current_val,
2048 _ => match (current_value.as_f64(), new_value.as_f64()) {
2049 (Some(current_val), Some(new_val)) => new_val > current_val,
2050 _ => false,
2051 },
2052 },
2053 _ => false,
2054 }
2055 }
2056 } else {
2057 true
2058 };
2059
2060 if should_update {
2061 current.insert(segment.to_string(), new_value);
2062 return Ok(true);
2063 }
2064 return Ok(false);
2065 } else {
2066 current
2067 .entry(segment.to_string())
2068 .or_insert_with(|| json!({}));
2069 current = current
2070 .get_mut(segment)
2071 .and_then(|v| v.as_object_mut())
2072 .ok_or("Path collision: expected object")?;
2073 }
2074 }
2075
2076 Ok(false)
2077 }
2078
2079 fn set_field_sum(
2080 &mut self,
2081 object_reg: Register,
2082 path: &str,
2083 value_reg: Register,
2084 ) -> Result<bool> {
2085 let compiled = self.get_compiled_path(path);
2086 let segments = compiled.segments();
2087 let new_value = &self.registers[value_reg];
2088
2089 let new_val_num = new_value
2091 .as_i64()
2092 .or_else(|| new_value.as_u64().map(|n| n as i64))
2093 .ok_or("Sum requires numeric value")?;
2094
2095 if !self.registers[object_reg].is_object() {
2096 self.registers[object_reg] = json!({});
2097 }
2098
2099 let obj = self.registers[object_reg]
2100 .as_object_mut()
2101 .ok_or("Not an object")?;
2102
2103 let mut current = obj;
2104 for (i, segment) in segments.iter().enumerate() {
2105 if i == segments.len() - 1 {
2106 let current_val = current
2107 .get(segment)
2108 .and_then(|v| {
2109 if v.is_null() {
2110 None
2111 } else {
2112 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2113 }
2114 })
2115 .unwrap_or(0);
2116
2117 let sum = current_val + new_val_num;
2118 current.insert(segment.to_string(), json!(sum));
2119 return Ok(true);
2120 } else {
2121 current
2122 .entry(segment.to_string())
2123 .or_insert_with(|| json!({}));
2124 current = current
2125 .get_mut(segment)
2126 .and_then(|v| v.as_object_mut())
2127 .ok_or("Path collision: expected object")?;
2128 }
2129 }
2130
2131 Ok(false)
2132 }
2133
2134 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2135 let compiled = self.get_compiled_path(path);
2136 let segments = compiled.segments();
2137
2138 if !self.registers[object_reg].is_object() {
2139 self.registers[object_reg] = json!({});
2140 }
2141
2142 let obj = self.registers[object_reg]
2143 .as_object_mut()
2144 .ok_or("Not an object")?;
2145
2146 let mut current = obj;
2147 for (i, segment) in segments.iter().enumerate() {
2148 if i == segments.len() - 1 {
2149 let current_val = current
2151 .get(segment)
2152 .and_then(|v| {
2153 if v.is_null() {
2154 None
2155 } else {
2156 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2157 }
2158 })
2159 .unwrap_or(0);
2160
2161 let incremented = current_val + 1;
2162 current.insert(segment.to_string(), json!(incremented));
2163 return Ok(true);
2164 } else {
2165 current
2166 .entry(segment.to_string())
2167 .or_insert_with(|| json!({}));
2168 current = current
2169 .get_mut(segment)
2170 .and_then(|v| v.as_object_mut())
2171 .ok_or("Path collision: expected object")?;
2172 }
2173 }
2174
2175 Ok(false)
2176 }
2177
2178 fn set_field_min(
2179 &mut self,
2180 object_reg: Register,
2181 path: &str,
2182 value_reg: Register,
2183 ) -> Result<bool> {
2184 let compiled = self.get_compiled_path(path);
2185 let segments = compiled.segments();
2186 let new_value = self.registers[value_reg].clone();
2187
2188 if !self.registers[object_reg].is_object() {
2189 self.registers[object_reg] = json!({});
2190 }
2191
2192 let obj = self.registers[object_reg]
2193 .as_object_mut()
2194 .ok_or("Not an object")?;
2195
2196 let mut current = obj;
2197 for (i, segment) in segments.iter().enumerate() {
2198 if i == segments.len() - 1 {
2199 let should_update = if let Some(current_value) = current.get(segment) {
2200 if current_value.is_null() {
2201 true
2202 } else {
2203 match (current_value.as_i64(), new_value.as_i64()) {
2204 (Some(current_val), Some(new_val)) => new_val < current_val,
2205 (Some(current_val), None) if new_value.as_u64().is_some() => {
2206 (new_value.as_u64().unwrap() as i64) < current_val
2207 }
2208 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2209 new_val < current_value.as_u64().unwrap() as i64
2210 }
2211 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2212 (Some(current_val), Some(new_val)) => new_val < current_val,
2213 _ => match (current_value.as_f64(), new_value.as_f64()) {
2214 (Some(current_val), Some(new_val)) => new_val < current_val,
2215 _ => false,
2216 },
2217 },
2218 _ => false,
2219 }
2220 }
2221 } else {
2222 true
2223 };
2224
2225 if should_update {
2226 current.insert(segment.to_string(), new_value);
2227 return Ok(true);
2228 }
2229 return Ok(false);
2230 } else {
2231 current
2232 .entry(segment.to_string())
2233 .or_insert_with(|| json!({}));
2234 current = current
2235 .get_mut(segment)
2236 .and_then(|v| v.as_object_mut())
2237 .ok_or("Path collision: expected object")?;
2238 }
2239 }
2240
2241 Ok(false)
2242 }
2243
2244 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2245 let compiled = self.get_compiled_path(path);
2246 let segments = compiled.segments();
2247 let mut current = &self.registers[object_reg];
2248
2249 for segment in segments {
2250 current = current
2251 .get(segment)
2252 .ok_or_else(|| format!("Field not found: {}", segment))?;
2253 }
2254
2255 Ok(current.clone())
2256 }
2257
2258 fn append_to_array(
2259 &mut self,
2260 object_reg: Register,
2261 path: &str,
2262 value_reg: Register,
2263 max_length: usize,
2264 ) -> Result<()> {
2265 let compiled = self.get_compiled_path(path);
2266 let segments = compiled.segments();
2267 let value = self.registers[value_reg].clone();
2268
2269 if !self.registers[object_reg].is_object() {
2270 self.registers[object_reg] = json!({});
2271 }
2272
2273 let obj = self.registers[object_reg]
2274 .as_object_mut()
2275 .ok_or("Not an object")?;
2276
2277 let mut current = obj;
2278 for (i, segment) in segments.iter().enumerate() {
2279 if i == segments.len() - 1 {
2280 current
2281 .entry(segment.to_string())
2282 .or_insert_with(|| json!([]));
2283 let arr = current
2284 .get_mut(segment)
2285 .and_then(|v| v.as_array_mut())
2286 .ok_or("Path is not an array")?;
2287 arr.push(value.clone());
2288
2289 if arr.len() > max_length {
2290 let excess = arr.len() - max_length;
2291 arr.drain(0..excess);
2292 }
2293 } else {
2294 current
2295 .entry(segment.to_string())
2296 .or_insert_with(|| json!({}));
2297 current = current
2298 .get_mut(segment)
2299 .and_then(|v| v.as_object_mut())
2300 .ok_or("Path collision: expected object")?;
2301 }
2302 }
2303
2304 Ok(())
2305 }
2306
2307 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2308 let value = &self.registers[reg];
2309 let transformed = self.apply_transformation(value, transformation)?;
2310 self.registers[reg] = transformed;
2311 Ok(())
2312 }
2313
2314 fn apply_transformation(
2315 &self,
2316 value: &Value,
2317 transformation: &Transformation,
2318 ) -> Result<Value> {
2319 match transformation {
2320 Transformation::HexEncode => {
2321 if let Some(arr) = value.as_array() {
2322 let bytes: Vec<u8> = arr
2323 .iter()
2324 .filter_map(|v| v.as_u64().map(|n| n as u8))
2325 .collect();
2326 let hex = hex::encode(&bytes);
2327 Ok(json!(hex))
2328 } else {
2329 Err("HexEncode requires an array of numbers".into())
2330 }
2331 }
2332 Transformation::HexDecode => {
2333 if let Some(s) = value.as_str() {
2334 let s = s.strip_prefix("0x").unwrap_or(s);
2335 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2336 Ok(json!(bytes))
2337 } else {
2338 Err("HexDecode requires a string".into())
2339 }
2340 }
2341 Transformation::Base58Encode => {
2342 if let Some(arr) = value.as_array() {
2343 let bytes: Vec<u8> = arr
2344 .iter()
2345 .filter_map(|v| v.as_u64().map(|n| n as u8))
2346 .collect();
2347 let encoded = bs58::encode(&bytes).into_string();
2348 Ok(json!(encoded))
2349 } else if value.is_string() {
2350 Ok(value.clone())
2351 } else {
2352 Err("Base58Encode requires an array of numbers".into())
2353 }
2354 }
2355 Transformation::Base58Decode => {
2356 if let Some(s) = value.as_str() {
2357 let bytes = bs58::decode(s)
2358 .into_vec()
2359 .map_err(|e| format!("Base58 decode error: {}", e))?;
2360 Ok(json!(bytes))
2361 } else {
2362 Err("Base58Decode requires a string".into())
2363 }
2364 }
2365 Transformation::ToString => Ok(json!(value.to_string())),
2366 Transformation::ToNumber => {
2367 if let Some(s) = value.as_str() {
2368 let n = s
2369 .parse::<i64>()
2370 .map_err(|e| format!("Parse error: {}", e))?;
2371 Ok(json!(n))
2372 } else {
2373 Ok(value.clone())
2374 }
2375 }
2376 }
2377 }
2378
2379 fn evaluate_comparison(
2380 &self,
2381 field_value: &Value,
2382 op: &ComparisonOp,
2383 condition_value: &Value,
2384 ) -> Result<bool> {
2385 use ComparisonOp::*;
2386
2387 match op {
2388 Equal => Ok(field_value == condition_value),
2389 NotEqual => Ok(field_value != condition_value),
2390 GreaterThan => {
2391 match (field_value.as_i64(), condition_value.as_i64()) {
2393 (Some(a), Some(b)) => Ok(a > b),
2394 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2395 (Some(a), Some(b)) => Ok(a > b),
2396 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2397 (Some(a), Some(b)) => Ok(a > b),
2398 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2399 },
2400 },
2401 }
2402 }
2403 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2404 (Some(a), Some(b)) => Ok(a >= b),
2405 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2406 (Some(a), Some(b)) => Ok(a >= b),
2407 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2408 (Some(a), Some(b)) => Ok(a >= b),
2409 _ => {
2410 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2411 }
2412 },
2413 },
2414 },
2415 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2416 (Some(a), Some(b)) => Ok(a < b),
2417 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2418 (Some(a), Some(b)) => Ok(a < b),
2419 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2420 (Some(a), Some(b)) => Ok(a < b),
2421 _ => Err("Cannot compare non-numeric values with LessThan".into()),
2422 },
2423 },
2424 },
2425 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2426 (Some(a), Some(b)) => Ok(a <= b),
2427 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2428 (Some(a), Some(b)) => Ok(a <= b),
2429 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2430 (Some(a), Some(b)) => Ok(a <= b),
2431 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2432 },
2433 },
2434 },
2435 }
2436 }
2437
2438 #[cfg_attr(feature = "otel", instrument(
2447 name = "vm.update_pda_lookup",
2448 skip(self),
2449 fields(
2450 pda = %pda_address,
2451 seed = %seed_value,
2452 )
2453 ))]
2454 pub fn update_pda_reverse_lookup(
2455 &mut self,
2456 state_id: u32,
2457 lookup_name: &str,
2458 pda_address: String,
2459 seed_value: String,
2460 ) -> Result<Vec<PendingAccountUpdate>> {
2461 let state = self
2462 .states
2463 .get_mut(&state_id)
2464 .ok_or("State table not found")?;
2465
2466 let lookup = state
2467 .pda_reverse_lookups
2468 .entry(lookup_name.to_string())
2469 .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2470
2471 let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2472
2473 if let Some(ref evicted) = evicted_pda {
2474 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2475 let count = evicted_updates.len();
2476 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2477 }
2478 }
2479
2480 self.flush_pending_updates(state_id, &pda_address)
2482 }
2483
2484 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2489 let state = match self.states.get_mut(&state_id) {
2490 Some(s) => s,
2491 None => return 0,
2492 };
2493
2494 let now = std::time::SystemTime::now()
2495 .duration_since(std::time::UNIX_EPOCH)
2496 .unwrap()
2497 .as_secs() as i64;
2498
2499 let mut removed_count = 0;
2500
2501 state.pending_updates.retain(|_pda_address, updates| {
2503 let original_len = updates.len();
2504
2505 updates.retain(|update| {
2506 let age = now - update.queued_at;
2507 age <= PENDING_UPDATE_TTL_SECONDS
2508 });
2509
2510 removed_count += original_len - updates.len();
2511
2512 !updates.is_empty()
2514 });
2515
2516 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2518
2519 if removed_count > 0 {
2520 #[cfg(feature = "otel")]
2521 crate::vm_metrics::record_pending_updates_expired(
2522 removed_count as u64,
2523 &state.entity_name,
2524 );
2525 }
2526
2527 removed_count
2528 }
2529
2530 #[cfg_attr(feature = "otel", instrument(
2562 name = "vm.queue_account_update",
2563 skip(self, update),
2564 fields(
2565 pda = %update.pda_address,
2566 account_type = %update.account_type,
2567 slot = update.slot,
2568 )
2569 ))]
2570 pub fn queue_account_update(
2571 &mut self,
2572 state_id: u32,
2573 update: QueuedAccountUpdate,
2574 ) -> Result<()> {
2575 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2576 self.cleanup_expired_pending_updates(state_id);
2577 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2578 self.drop_oldest_pending_update(state_id)?;
2579 }
2580 }
2581
2582 let state = self
2583 .states
2584 .get_mut(&state_id)
2585 .ok_or("State table not found")?;
2586
2587 let pending = PendingAccountUpdate {
2588 account_type: update.account_type,
2589 pda_address: update.pda_address.clone(),
2590 account_data: update.account_data,
2591 slot: update.slot,
2592 write_version: update.write_version,
2593 signature: update.signature,
2594 queued_at: std::time::SystemTime::now()
2595 .duration_since(std::time::UNIX_EPOCH)
2596 .unwrap()
2597 .as_secs() as i64,
2598 };
2599
2600 let pda_address = pending.pda_address.clone();
2601 let slot = pending.slot;
2602
2603 let mut updates = state
2604 .pending_updates
2605 .entry(pda_address.clone())
2606 .or_insert_with(Vec::new);
2607
2608 let original_len = updates.len();
2609 updates.retain(|existing| existing.slot > slot);
2610 let removed_by_dedup = original_len - updates.len();
2611
2612 if removed_by_dedup > 0 {
2613 self.pending_queue_size = self
2614 .pending_queue_size
2615 .saturating_sub(removed_by_dedup as u64);
2616 }
2617
2618 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2619 updates.remove(0);
2620 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2621 }
2622
2623 updates.push(pending);
2624 #[cfg(feature = "otel")]
2625 crate::vm_metrics::record_pending_update_queued(&state.entity_name);
2626
2627 Ok(())
2628 }
2629
2630 pub fn queue_instruction_event(
2631 &mut self,
2632 state_id: u32,
2633 event: QueuedInstructionEvent,
2634 ) -> Result<()> {
2635 let state = self
2636 .states
2637 .get_mut(&state_id)
2638 .ok_or("State table not found")?;
2639
2640 let pda_address = event.pda_address.clone();
2641
2642 let pending = PendingInstructionEvent {
2643 event_type: event.event_type,
2644 pda_address: event.pda_address,
2645 event_data: event.event_data,
2646 slot: event.slot,
2647 signature: event.signature,
2648 queued_at: std::time::SystemTime::now()
2649 .duration_since(std::time::UNIX_EPOCH)
2650 .unwrap()
2651 .as_secs() as i64,
2652 };
2653
2654 let mut events = state
2655 .pending_instruction_events
2656 .entry(pda_address)
2657 .or_insert_with(Vec::new);
2658
2659 if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
2660 events.remove(0);
2661 }
2662
2663 events.push(pending);
2664
2665 Ok(())
2666 }
2667
2668 pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
2669 self.last_pda_lookup_miss.take()
2670 }
2671
2672 pub fn take_last_pda_registered(&mut self) -> Option<String> {
2673 self.last_pda_registered.take()
2674 }
2675
2676 pub fn flush_pending_instruction_events(
2677 &mut self,
2678 state_id: u32,
2679 pda_address: &str,
2680 ) -> Vec<PendingInstructionEvent> {
2681 let state = match self.states.get_mut(&state_id) {
2682 Some(s) => s,
2683 None => return Vec::new(),
2684 };
2685
2686 if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
2687 events
2688 } else {
2689 Vec::new()
2690 }
2691 }
2692
2693 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2695 let state = self.states.get(&state_id)?;
2696
2697 let now = std::time::SystemTime::now()
2698 .duration_since(std::time::UNIX_EPOCH)
2699 .unwrap()
2700 .as_secs() as i64;
2701
2702 let mut total_updates = 0;
2703 let mut oldest_timestamp = now;
2704 let mut largest_pda_queue = 0;
2705 let mut estimated_memory = 0;
2706
2707 for entry in state.pending_updates.iter() {
2708 let (_, updates) = entry.pair();
2709 total_updates += updates.len();
2710 largest_pda_queue = largest_pda_queue.max(updates.len());
2711
2712 for update in updates.iter() {
2713 oldest_timestamp = oldest_timestamp.min(update.queued_at);
2714 estimated_memory += update.account_type.len() +
2716 update.pda_address.len() +
2717 update.signature.len() +
2718 16 + estimate_json_size(&update.account_data);
2720 }
2721 }
2722
2723 Some(PendingQueueStats {
2724 total_updates,
2725 unique_pdas: state.pending_updates.len(),
2726 oldest_age_seconds: now - oldest_timestamp,
2727 largest_pda_queue_size: largest_pda_queue,
2728 estimated_memory_bytes: estimated_memory,
2729 })
2730 }
2731
2732 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2733 let mut stats = VmMemoryStats {
2734 path_cache_size: self.path_cache.len(),
2735 ..Default::default()
2736 };
2737
2738 if let Some(state) = self.states.get(&state_id) {
2739 stats.state_table_entity_count = state.data.len();
2740 stats.state_table_max_entries = state.config.max_entries;
2741 stats.state_table_at_capacity = state.is_at_capacity();
2742
2743 stats.lookup_index_count = state.lookup_indexes.len();
2744 stats.lookup_index_total_entries =
2745 state.lookup_indexes.values().map(|idx| idx.len()).sum();
2746
2747 stats.temporal_index_count = state.temporal_indexes.len();
2748 stats.temporal_index_total_entries = state
2749 .temporal_indexes
2750 .values()
2751 .map(|idx| idx.total_entries())
2752 .sum();
2753
2754 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2755 stats.pda_reverse_lookup_total_entries = state
2756 .pda_reverse_lookups
2757 .values()
2758 .map(|lookup| lookup.len())
2759 .sum();
2760
2761 stats.version_tracker_entries = state.version_tracker.len();
2762
2763 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2764 }
2765
2766 stats
2767 }
2768
2769 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2770 let pending_removed = self.cleanup_expired_pending_updates(state_id);
2771 let temporal_removed = self.cleanup_temporal_indexes(state_id);
2772
2773 #[cfg(feature = "otel")]
2774 if let Some(state) = self.states.get(&state_id) {
2775 crate::vm_metrics::record_cleanup(
2776 pending_removed,
2777 temporal_removed,
2778 &state.entity_name,
2779 );
2780 }
2781
2782 CleanupResult {
2783 pending_updates_removed: pending_removed,
2784 temporal_entries_removed: temporal_removed,
2785 }
2786 }
2787
2788 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2789 let state = match self.states.get_mut(&state_id) {
2790 Some(s) => s,
2791 None => return 0,
2792 };
2793
2794 let now = std::time::SystemTime::now()
2795 .duration_since(std::time::UNIX_EPOCH)
2796 .unwrap()
2797 .as_secs() as i64;
2798
2799 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2800 let mut total_removed = 0;
2801
2802 for (_, index) in state.temporal_indexes.iter_mut() {
2803 total_removed += index.cleanup_expired(cutoff);
2804 }
2805
2806 total_removed
2807 }
2808
2809 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2810 let state = self.states.get(&state_id)?;
2811
2812 if state.is_at_capacity() {
2813 Some(CapacityWarning {
2814 current_entries: state.data.len(),
2815 max_entries: state.config.max_entries,
2816 entries_over_limit: state.entries_over_limit(),
2817 })
2818 } else {
2819 None
2820 }
2821 }
2822
2823 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2825 let state = self
2826 .states
2827 .get_mut(&state_id)
2828 .ok_or("State table not found")?;
2829
2830 let mut oldest_pda: Option<String> = None;
2831 let mut oldest_timestamp = i64::MAX;
2832
2833 for entry in state.pending_updates.iter() {
2835 let (pda, updates) = entry.pair();
2836 if let Some(update) = updates.first() {
2837 if update.queued_at < oldest_timestamp {
2838 oldest_timestamp = update.queued_at;
2839 oldest_pda = Some(pda.clone());
2840 }
2841 }
2842 }
2843
2844 if let Some(pda) = oldest_pda {
2846 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2847 if !updates.is_empty() {
2848 updates.remove(0);
2849 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2850
2851 if updates.is_empty() {
2853 drop(updates);
2854 state.pending_updates.remove(&pda);
2855 }
2856 }
2857 }
2858 }
2859
2860 Ok(())
2861 }
2862
2863 fn flush_pending_updates(
2868 &mut self,
2869 state_id: u32,
2870 pda_address: &str,
2871 ) -> Result<Vec<PendingAccountUpdate>> {
2872 let state = self
2873 .states
2874 .get_mut(&state_id)
2875 .ok_or("State table not found")?;
2876
2877 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2878 let count = pending_updates.len();
2879 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2880 #[cfg(feature = "otel")]
2881 crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
2882 Ok(pending_updates)
2883 } else {
2884 Ok(Vec::new())
2885 }
2886 }
2887
2888 pub fn try_pda_reverse_lookup(
2890 &mut self,
2891 state_id: u32,
2892 lookup_name: &str,
2893 pda_address: &str,
2894 ) -> Option<String> {
2895 let state = self.states.get_mut(&state_id)?;
2896
2897 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2898 if let Some(value) = lookup.lookup(pda_address) {
2899 self.pda_cache_hits += 1;
2900 return Some(value);
2901 }
2902 }
2903
2904 self.pda_cache_misses += 1;
2905 None
2906 }
2907
2908 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2915 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2916 }
2917
2918 fn evaluate_computed_expr_with_env(
2920 &self,
2921 expr: &ComputedExpr,
2922 state: &Value,
2923 env: &std::collections::HashMap<String, Value>,
2924 ) -> Result<Value> {
2925 match expr {
2926 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2927
2928 ComputedExpr::Var { name } => env
2929 .get(name)
2930 .cloned()
2931 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2932
2933 ComputedExpr::Let { name, value, body } => {
2934 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2935 let mut new_env = env.clone();
2936 new_env.insert(name.clone(), val);
2937 self.evaluate_computed_expr_with_env(body, state, &new_env)
2938 }
2939
2940 ComputedExpr::If {
2941 condition,
2942 then_branch,
2943 else_branch,
2944 } => {
2945 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2946 if self.value_to_bool(&cond_val) {
2947 self.evaluate_computed_expr_with_env(then_branch, state, env)
2948 } else {
2949 self.evaluate_computed_expr_with_env(else_branch, state, env)
2950 }
2951 }
2952
2953 ComputedExpr::None => Ok(Value::Null),
2954
2955 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2956
2957 ComputedExpr::Slice { expr, start, end } => {
2958 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2959 match val {
2960 Value::Array(arr) => {
2961 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2962 Ok(Value::Array(slice))
2963 }
2964 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2965 }
2966 }
2967
2968 ComputedExpr::Index { expr, index } => {
2969 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2970 match val {
2971 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2972 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2973 }
2974 }
2975
2976 ComputedExpr::U64FromLeBytes { bytes } => {
2977 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2978 let byte_vec = self.value_to_bytes(&val)?;
2979 if byte_vec.len() < 8 {
2980 return Err(format!(
2981 "u64::from_le_bytes requires 8 bytes, got {}",
2982 byte_vec.len()
2983 )
2984 .into());
2985 }
2986 let arr: [u8; 8] = byte_vec[..8]
2987 .try_into()
2988 .map_err(|_| "Failed to convert to [u8; 8]")?;
2989 Ok(json!(u64::from_le_bytes(arr)))
2990 }
2991
2992 ComputedExpr::U64FromBeBytes { bytes } => {
2993 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2994 let byte_vec = self.value_to_bytes(&val)?;
2995 if byte_vec.len() < 8 {
2996 return Err(format!(
2997 "u64::from_be_bytes requires 8 bytes, got {}",
2998 byte_vec.len()
2999 )
3000 .into());
3001 }
3002 let arr: [u8; 8] = byte_vec[..8]
3003 .try_into()
3004 .map_err(|_| "Failed to convert to [u8; 8]")?;
3005 Ok(json!(u64::from_be_bytes(arr)))
3006 }
3007
3008 ComputedExpr::ByteArray { bytes } => {
3009 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3010 }
3011
3012 ComputedExpr::Closure { param, body } => {
3013 Ok(json!({
3016 "__closure": {
3017 "param": param,
3018 "body": serde_json::to_value(body).unwrap_or(Value::Null)
3019 }
3020 }))
3021 }
3022
3023 ComputedExpr::Unary { op, expr } => {
3024 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3025 self.apply_unary_op(op, &val)
3026 }
3027
3028 ComputedExpr::JsonToBytes { expr } => {
3029 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3030 let bytes = self.value_to_bytes(&val)?;
3032 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3033 }
3034
3035 ComputedExpr::UnwrapOr { expr, default } => {
3036 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3037 if val.is_null() {
3038 Ok(default.clone())
3039 } else {
3040 Ok(val)
3041 }
3042 }
3043
3044 ComputedExpr::Binary { op, left, right } => {
3045 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
3046 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
3047 self.apply_binary_op(op, &l, &r)
3048 }
3049
3050 ComputedExpr::Cast { expr, to_type } => {
3051 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3052 self.apply_cast(&val, to_type)
3053 }
3054
3055 ComputedExpr::MethodCall { expr, method, args } => {
3056 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3057 if method == "map" && args.len() == 1 {
3059 if let ComputedExpr::Closure { param, body } = &args[0] {
3060 if val.is_null() {
3062 return Ok(Value::Null);
3063 }
3064 let mut closure_env = env.clone();
3066 closure_env.insert(param.clone(), val);
3067 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
3068 }
3069 }
3070 let evaluated_args: Vec<Value> = args
3071 .iter()
3072 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
3073 .collect::<Result<Vec<_>>>()?;
3074 self.apply_method_call(&val, method, &evaluated_args)
3075 }
3076
3077 ComputedExpr::Literal { value } => Ok(value.clone()),
3078
3079 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
3080 }
3081 }
3082
3083 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
3085 match val {
3086 Value::Array(arr) => arr
3087 .iter()
3088 .map(|v| {
3089 v.as_u64()
3090 .map(|n| n as u8)
3091 .ok_or_else(|| "Array element not a valid byte".into())
3092 })
3093 .collect(),
3094 Value::String(s) => {
3095 if s.starts_with("0x") || s.starts_with("0X") {
3097 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
3098 } else {
3099 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
3100 }
3101 }
3102 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
3103 }
3104 }
3105
3106 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
3108 use crate::ast::UnaryOp;
3109 match op {
3110 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
3111 UnaryOp::ReverseBits => match val.as_u64() {
3112 Some(n) => Ok(json!(n.reverse_bits())),
3113 None => match val.as_i64() {
3114 Some(n) => Ok(json!((n as u64).reverse_bits())),
3115 None => Err("reverse_bits requires an integer".into()),
3116 },
3117 },
3118 }
3119 }
3120
3121 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
3123 let segments: Vec<&str> = path.split('.').collect();
3124 let mut current = state;
3125
3126 for segment in segments {
3127 match current.get(segment) {
3128 Some(v) => current = v,
3129 None => return Ok(Value::Null),
3130 }
3131 }
3132
3133 Ok(current.clone())
3134 }
3135
3136 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
3138 match op {
3139 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
3141 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
3142 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
3143 BinaryOp::Div => {
3144 if let Some(r) = right.as_i64() {
3146 if r == 0 {
3147 return Err("Division by zero".into());
3148 }
3149 }
3150 if let Some(r) = right.as_f64() {
3151 if r == 0.0 {
3152 return Err("Division by zero".into());
3153 }
3154 }
3155 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
3156 }
3157 BinaryOp::Mod => {
3158 match (left.as_i64(), right.as_i64()) {
3160 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3161 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
3162 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3163 _ => Err("Modulo requires non-zero integer operands".into()),
3164 },
3165 _ => Err("Modulo by zero".into()),
3166 }
3167 }
3168
3169 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
3171 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
3172 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
3173 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
3174 BinaryOp::Eq => Ok(json!(left == right)),
3175 BinaryOp::Ne => Ok(json!(left != right)),
3176
3177 BinaryOp::And => {
3179 let l_bool = self.value_to_bool(left);
3180 let r_bool = self.value_to_bool(right);
3181 Ok(json!(l_bool && r_bool))
3182 }
3183 BinaryOp::Or => {
3184 let l_bool = self.value_to_bool(left);
3185 let r_bool = self.value_to_bool(right);
3186 Ok(json!(l_bool || r_bool))
3187 }
3188
3189 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
3191 (Some(a), Some(b)) => Ok(json!(a ^ b)),
3192 _ => match (left.as_i64(), right.as_i64()) {
3193 (Some(a), Some(b)) => Ok(json!(a ^ b)),
3194 _ => Err("XOR requires integer operands".into()),
3195 },
3196 },
3197 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
3198 (Some(a), Some(b)) => Ok(json!(a & b)),
3199 _ => match (left.as_i64(), right.as_i64()) {
3200 (Some(a), Some(b)) => Ok(json!(a & b)),
3201 _ => Err("BitAnd requires integer operands".into()),
3202 },
3203 },
3204 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
3205 (Some(a), Some(b)) => Ok(json!(a | b)),
3206 _ => match (left.as_i64(), right.as_i64()) {
3207 (Some(a), Some(b)) => Ok(json!(a | b)),
3208 _ => Err("BitOr requires integer operands".into()),
3209 },
3210 },
3211 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
3212 (Some(a), Some(b)) => Ok(json!(a << b)),
3213 _ => match (left.as_i64(), right.as_i64()) {
3214 (Some(a), Some(b)) => Ok(json!(a << b)),
3215 _ => Err("Shl requires integer operands".into()),
3216 },
3217 },
3218 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
3219 (Some(a), Some(b)) => Ok(json!(a >> b)),
3220 _ => match (left.as_i64(), right.as_i64()) {
3221 (Some(a), Some(b)) => Ok(json!(a >> b)),
3222 _ => Err("Shr requires integer operands".into()),
3223 },
3224 },
3225 }
3226 }
3227
3228 fn numeric_op<F1, F2>(
3230 &self,
3231 left: &Value,
3232 right: &Value,
3233 int_op: F1,
3234 float_op: F2,
3235 ) -> Result<Value>
3236 where
3237 F1: Fn(i64, i64) -> i64,
3238 F2: Fn(f64, f64) -> f64,
3239 {
3240 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3242 return Ok(json!(int_op(a, b)));
3243 }
3244
3245 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3247 return Ok(json!(int_op(a as i64, b as i64)));
3249 }
3250
3251 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3253 return Ok(json!(float_op(a, b)));
3254 }
3255
3256 if left.is_null() || right.is_null() {
3258 return Ok(Value::Null);
3259 }
3260
3261 Err(format!(
3262 "Cannot perform numeric operation on {:?} and {:?}",
3263 left, right
3264 )
3265 .into())
3266 }
3267
3268 fn comparison_op<F1, F2>(
3270 &self,
3271 left: &Value,
3272 right: &Value,
3273 int_cmp: F1,
3274 float_cmp: F2,
3275 ) -> Result<Value>
3276 where
3277 F1: Fn(i64, i64) -> bool,
3278 F2: Fn(f64, f64) -> bool,
3279 {
3280 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3282 return Ok(json!(int_cmp(a, b)));
3283 }
3284
3285 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3287 return Ok(json!(int_cmp(a as i64, b as i64)));
3288 }
3289
3290 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3292 return Ok(json!(float_cmp(a, b)));
3293 }
3294
3295 if left.is_null() || right.is_null() {
3297 return Ok(json!(false));
3298 }
3299
3300 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3301 }
3302
3303 fn value_to_bool(&self, value: &Value) -> bool {
3305 match value {
3306 Value::Null => false,
3307 Value::Bool(b) => *b,
3308 Value::Number(n) => {
3309 if let Some(i) = n.as_i64() {
3310 i != 0
3311 } else if let Some(f) = n.as_f64() {
3312 f != 0.0
3313 } else {
3314 true
3315 }
3316 }
3317 Value::String(s) => !s.is_empty(),
3318 Value::Array(arr) => !arr.is_empty(),
3319 Value::Object(obj) => !obj.is_empty(),
3320 }
3321 }
3322
3323 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3325 match to_type {
3326 "i8" | "i16" | "i32" | "i64" | "isize" => {
3327 if let Some(n) = value.as_i64() {
3328 Ok(json!(n))
3329 } else if let Some(n) = value.as_u64() {
3330 Ok(json!(n as i64))
3331 } else if let Some(n) = value.as_f64() {
3332 Ok(json!(n as i64))
3333 } else if let Some(s) = value.as_str() {
3334 s.parse::<i64>()
3335 .map(|n| json!(n))
3336 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3337 } else {
3338 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3339 }
3340 }
3341 "u8" | "u16" | "u32" | "u64" | "usize" => {
3342 if let Some(n) = value.as_u64() {
3343 Ok(json!(n))
3344 } else if let Some(n) = value.as_i64() {
3345 Ok(json!(n as u64))
3346 } else if let Some(n) = value.as_f64() {
3347 Ok(json!(n as u64))
3348 } else if let Some(s) = value.as_str() {
3349 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3350 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3351 })
3352 } else {
3353 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3354 }
3355 }
3356 "f32" | "f64" => {
3357 if let Some(n) = value.as_f64() {
3358 Ok(json!(n))
3359 } else if let Some(n) = value.as_i64() {
3360 Ok(json!(n as f64))
3361 } else if let Some(n) = value.as_u64() {
3362 Ok(json!(n as f64))
3363 } else if let Some(s) = value.as_str() {
3364 s.parse::<f64>()
3365 .map(|n| json!(n))
3366 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3367 } else {
3368 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3369 }
3370 }
3371 "String" | "string" => Ok(json!(value.to_string())),
3372 "bool" => Ok(json!(self.value_to_bool(value))),
3373 _ => {
3374 Ok(value.clone())
3376 }
3377 }
3378 }
3379
3380 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3382 match method {
3383 "unwrap_or" => {
3384 if value.is_null() && !args.is_empty() {
3385 Ok(args[0].clone())
3386 } else {
3387 Ok(value.clone())
3388 }
3389 }
3390 "unwrap_or_default" => {
3391 if value.is_null() {
3392 Ok(json!(0))
3394 } else {
3395 Ok(value.clone())
3396 }
3397 }
3398 "is_some" => Ok(json!(!value.is_null())),
3399 "is_none" => Ok(json!(value.is_null())),
3400 "abs" => {
3401 if let Some(n) = value.as_i64() {
3402 Ok(json!(n.abs()))
3403 } else if let Some(n) = value.as_f64() {
3404 Ok(json!(n.abs()))
3405 } else {
3406 Err(format!("Cannot call abs() on {:?}", value).into())
3407 }
3408 }
3409 "len" => {
3410 if let Some(s) = value.as_str() {
3411 Ok(json!(s.len()))
3412 } else if let Some(arr) = value.as_array() {
3413 Ok(json!(arr.len()))
3414 } else if let Some(obj) = value.as_object() {
3415 Ok(json!(obj.len()))
3416 } else {
3417 Err(format!("Cannot call len() on {:?}", value).into())
3418 }
3419 }
3420 "to_string" => Ok(json!(value.to_string())),
3421 "min" => {
3422 if args.is_empty() {
3423 return Err("min() requires an argument".into());
3424 }
3425 let other = &args[0];
3426 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3427 Ok(json!(a.min(b)))
3428 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3429 Ok(json!(a.min(b)))
3430 } else {
3431 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3432 }
3433 }
3434 "max" => {
3435 if args.is_empty() {
3436 return Err("max() requires an argument".into());
3437 }
3438 let other = &args[0];
3439 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3440 Ok(json!(a.max(b)))
3441 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3442 Ok(json!(a.max(b)))
3443 } else {
3444 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3445 }
3446 }
3447 "saturating_add" => {
3448 if args.is_empty() {
3449 return Err("saturating_add() requires an argument".into());
3450 }
3451 let other = &args[0];
3452 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3453 Ok(json!(a.saturating_add(b)))
3454 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3455 Ok(json!(a.saturating_add(b)))
3456 } else {
3457 Err(format!(
3458 "Cannot call saturating_add() on {:?} and {:?}",
3459 value, other
3460 )
3461 .into())
3462 }
3463 }
3464 "saturating_sub" => {
3465 if args.is_empty() {
3466 return Err("saturating_sub() requires an argument".into());
3467 }
3468 let other = &args[0];
3469 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3470 Ok(json!(a.saturating_sub(b)))
3471 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3472 Ok(json!(a.saturating_sub(b)))
3473 } else {
3474 Err(format!(
3475 "Cannot call saturating_sub() on {:?} and {:?}",
3476 value, other
3477 )
3478 .into())
3479 }
3480 }
3481 _ => Err(format!("Unknown method call: {}()", method).into()),
3482 }
3483 }
3484
3485 pub fn evaluate_computed_fields_from_ast(
3488 &self,
3489 state: &mut Value,
3490 computed_field_specs: &[ComputedFieldSpec],
3491 ) -> Result<Vec<String>> {
3492 let mut updated_paths = Vec::new();
3493
3494 for spec in computed_field_specs {
3495 if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3496 self.set_field_in_state(state, &spec.target_path, result)?;
3497 updated_paths.push(spec.target_path.clone());
3498 }
3499 }
3500
3501 Ok(updated_paths)
3502 }
3503
3504 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3506 let segments: Vec<&str> = path.split('.').collect();
3507
3508 if segments.is_empty() {
3509 return Err("Empty path".into());
3510 }
3511
3512 let mut current = state;
3514 for (i, segment) in segments.iter().enumerate() {
3515 if i == segments.len() - 1 {
3516 if let Some(obj) = current.as_object_mut() {
3518 obj.insert(segment.to_string(), value);
3519 return Ok(());
3520 } else {
3521 return Err(format!("Cannot set field '{}' on non-object", segment).into());
3522 }
3523 } else {
3524 if !current.is_object() {
3526 *current = json!({});
3527 }
3528 let obj = current.as_object_mut().unwrap();
3529 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3530 }
3531 }
3532
3533 Ok(())
3534 }
3535
3536 pub fn create_evaluator_from_specs(
3539 specs: Vec<ComputedFieldSpec>,
3540 ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3541 move |state: &mut Value| {
3542 let vm = VmContext::new();
3545 vm.evaluate_computed_fields_from_ast(state, &specs)?;
3546 Ok(())
3547 }
3548 }
3549}
3550
3551impl Default for VmContext {
3552 fn default() -> Self {
3553 Self::new()
3554 }
3555}
3556
3557impl crate::resolvers::ReverseLookupUpdater for VmContext {
3559 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3560 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3562 .unwrap_or_else(|e| {
3563 tracing::error!("Failed to update PDA reverse lookup: {}", e);
3564 Vec::new()
3565 })
3566 }
3567
3568 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3569 self.flush_pending_updates(0, pda_address)
3571 .unwrap_or_else(|e| {
3572 tracing::error!("Failed to flush pending updates: {}", e);
3573 Vec::new()
3574 })
3575 }
3576}
3577
3578#[cfg(test)]
3579mod tests {
3580 use super::*;
3581 use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
3582
3583 #[test]
3584 fn test_computed_field_preserves_integer_type() {
3585 let vm = VmContext::new();
3586
3587 let mut state = serde_json::json!({
3588 "trading": {
3589 "total_buy_volume": 20000000000_i64,
3590 "total_sell_volume": 17951316474_i64
3591 }
3592 });
3593
3594 let spec = ComputedFieldSpec {
3595 target_path: "trading.total_volume".to_string(),
3596 result_type: "Option<u64>".to_string(),
3597 expression: ComputedExpr::Binary {
3598 op: BinaryOp::Add,
3599 left: Box::new(ComputedExpr::UnwrapOr {
3600 expr: Box::new(ComputedExpr::FieldRef {
3601 path: "trading.total_buy_volume".to_string(),
3602 }),
3603 default: serde_json::json!(0),
3604 }),
3605 right: Box::new(ComputedExpr::UnwrapOr {
3606 expr: Box::new(ComputedExpr::FieldRef {
3607 path: "trading.total_sell_volume".to_string(),
3608 }),
3609 default: serde_json::json!(0),
3610 }),
3611 },
3612 };
3613
3614 vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
3615 .unwrap();
3616
3617 let total_volume = state
3618 .get("trading")
3619 .and_then(|t| t.get("total_volume"))
3620 .expect("total_volume should exist");
3621
3622 let serialized = serde_json::to_string(total_volume).unwrap();
3623 assert!(
3624 !serialized.contains('.'),
3625 "Integer should not have decimal point: {}",
3626 serialized
3627 );
3628 assert_eq!(
3629 total_volume.as_i64(),
3630 Some(37951316474),
3631 "Value should be correct sum"
3632 );
3633 }
3634
3635 #[test]
3636 fn test_set_field_sum_preserves_integer_type() {
3637 let mut vm = VmContext::new();
3638 vm.registers[0] = serde_json::json!({});
3639 vm.registers[1] = serde_json::json!(20000000000_i64);
3640 vm.registers[2] = serde_json::json!(17951316474_i64);
3641
3642 vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
3643 vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
3644
3645 let state = &vm.registers[0];
3646 let buy_vol = state
3647 .get("trading")
3648 .and_then(|t| t.get("total_buy_volume"))
3649 .unwrap();
3650 let sell_vol = state
3651 .get("trading")
3652 .and_then(|t| t.get("total_sell_volume"))
3653 .unwrap();
3654
3655 let buy_serialized = serde_json::to_string(buy_vol).unwrap();
3656 let sell_serialized = serde_json::to_string(sell_vol).unwrap();
3657
3658 assert!(
3659 !buy_serialized.contains('.'),
3660 "Buy volume should not have decimal: {}",
3661 buy_serialized
3662 );
3663 assert!(
3664 !sell_serialized.contains('.'),
3665 "Sell volume should not have decimal: {}",
3666 sell_serialized
3667 );
3668 }
3669}