1use crate::ast::{
2 BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
15#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19 pub slot: Option<u64>,
21 pub signature: Option<String>,
23 pub timestamp: Option<i64>,
26 pub write_version: Option<u64>,
29 pub txn_index: Option<u64>,
32 pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37 pub fn new(slot: u64, signature: String) -> Self {
39 Self {
40 slot: Some(slot),
41 signature: Some(signature),
42 timestamp: None,
43 write_version: None,
44 txn_index: None,
45 metadata: HashMap::new(),
46 }
47 }
48
49 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51 Self {
52 slot: Some(slot),
53 signature: Some(signature),
54 timestamp: Some(timestamp),
55 write_version: None,
56 txn_index: None,
57 metadata: HashMap::new(),
58 }
59 }
60
61 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63 Self {
64 slot: Some(slot),
65 signature: Some(signature),
66 timestamp: None,
67 write_version: Some(write_version),
68 txn_index: None,
69 metadata: HashMap::new(),
70 }
71 }
72
73 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75 Self {
76 slot: Some(slot),
77 signature: Some(signature),
78 timestamp: None,
79 write_version: None,
80 txn_index: Some(txn_index),
81 metadata: HashMap::new(),
82 }
83 }
84
85 pub fn timestamp(&self) -> i64 {
87 self.timestamp.unwrap_or_else(|| {
88 std::time::SystemTime::now()
89 .duration_since(std::time::UNIX_EPOCH)
90 .unwrap()
91 .as_secs() as i64
92 })
93 }
94
95 pub fn empty() -> Self {
97 Self::default()
98 }
99
100 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
102 self.metadata.insert(key, value);
103 self
104 }
105
106 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
108 self.metadata.get(key)
109 }
110
111 pub fn to_value(&self) -> Value {
113 let mut obj = serde_json::Map::new();
114 if let Some(slot) = self.slot {
115 obj.insert("slot".to_string(), json!(slot));
116 }
117 if let Some(ref sig) = self.signature {
118 obj.insert("signature".to_string(), json!(sig));
119 }
120 obj.insert("timestamp".to_string(), json!(self.timestamp()));
122 for (key, value) in &self.metadata {
123 obj.insert(key.clone(), value.clone());
124 }
125 Value::Object(obj)
126 }
127}
128
/// Index of a slot in the VM register file.
pub type Register = usize;
/// Result alias used throughout this module; errors are boxed trait objects.
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Value held in a register: an arbitrary JSON value.
pub type RegisterValue = Value;
133
/// Hook that is handed a mutable entity state.
///
/// NOTE(review): presumably implementations (re)compute derived fields on
/// `state` in place — confirm against the implementors.
pub trait ComputedFieldsEvaluator {
    /// Processes `state` in place; an `Err` signals that evaluation failed.
    fn evaluate(&self, state: &mut Value) -> Result<()>;
}
139
// Limits for the pending-update queue.
// NOTE(review): not referenced in the part of the file reviewed here; meaning inferred from the names.
const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
// TTLs are in seconds. `TemporalIndex::insert` drops entries older than
// `TEMPORAL_HISTORY_TTL_SECONDS` (relative to the inserted timestamp) and keeps
// at most `MAX_TEMPORAL_ENTRIES_PER_KEY` entries per key.
const PENDING_UPDATE_TTL_SECONDS: i64 = 300; const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;

// Defaults used by `StateTableConfig::default()`.
const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;

// Default LRU capacity of `LookupIndex::new()`.
const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;

// Default LRU capacity of `VersionTracker::new()`.
const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;

// Default LRU capacity (number of keys) of `TemporalIndex::new()`.
const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;

// NOTE(review): presumably the default capacity for `PdaReverseLookup`; its
// constructor takes an explicit capacity, so confirm at the call site.
const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
160
161fn estimate_json_size(value: &Value) -> usize {
163 match value {
164 Value::Null => 4,
165 Value::Bool(_) => 5,
166 Value::Number(_) => 8,
167 Value::String(s) => s.len() + 2,
168 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
169 Value::Object(obj) => {
170 2 + obj
171 .iter()
172 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
173 .sum::<usize>()
174 }
175 }
176}
177
/// A dotted field path that has been split into its segments once.
///
/// The segments are stored behind an `Arc`, so cloning a `CompiledPath` shares
/// the same allocation instead of copying every segment string.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    pub segments: std::sync::Arc<[String]>,
}

impl CompiledPath {
    /// Splits `path` on `.`. An empty path yields a single empty segment.
    pub fn new(path: &str) -> Self {
        Self {
            segments: path.split('.').map(String::from).collect(),
        }
    }

    /// Borrowed view of the path segments.
    fn segments(&self) -> &[String] {
        &self.segments[..]
    }
}
195
/// How a field was modified, as recorded by `DirtyTracker`.
#[derive(Debug, Clone)]
pub enum FieldChange {
    /// The field's value was overwritten.
    Replaced,
    /// Values were appended to the field; holds the appended values in the
    /// order they were recorded.
    Appended(Vec<Value>),
}
205
206#[derive(Debug, Clone, Default)]
209pub struct DirtyTracker {
210 changes: HashMap<String, FieldChange>,
211}
212
213impl DirtyTracker {
214 pub fn new() -> Self {
216 Self {
217 changes: HashMap::new(),
218 }
219 }
220
221 pub fn mark_replaced(&mut self, path: &str) {
223 self.changes.insert(path.to_string(), FieldChange::Replaced);
225 }
226
227 pub fn mark_appended(&mut self, path: &str, value: Value) {
229 match self.changes.get_mut(path) {
230 Some(FieldChange::Appended(values)) => {
231 values.push(value);
233 }
234 Some(FieldChange::Replaced) => {
235 }
238 None => {
239 self.changes
241 .insert(path.to_string(), FieldChange::Appended(vec![value]));
242 }
243 }
244 }
245
246 pub fn is_empty(&self) -> bool {
248 self.changes.is_empty()
249 }
250
251 pub fn len(&self) -> usize {
253 self.changes.len()
254 }
255
256 pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
258 self.changes.iter()
259 }
260
261 pub fn dirty_paths(&self) -> HashSet<String> {
263 self.changes.keys().cloned().collect()
264 }
265
266 pub fn into_changes(self) -> HashMap<String, FieldChange> {
268 self.changes
269 }
270
271 pub fn changes(&self) -> &HashMap<String, FieldChange> {
273 &self.changes
274 }
275
276 pub fn appended_paths(&self) -> Vec<String> {
278 self.changes
279 .iter()
280 .filter_map(|(path, change)| match change {
281 FieldChange::Appended(_) => Some(path.clone()),
282 FieldChange::Replaced => None,
283 })
284 .collect()
285 }
286}
287
/// Execution context of the VM: registers, per-entity state tables, caches and
/// counters.
pub struct VmContext {
    /// Register file; the constructors allocate 256 registers initialised to `Null`.
    registers: Vec<RegisterValue>,
    /// State tables keyed by state id; the constructors create table `0`.
    states: HashMap<u32, StateTable>,
    /// Counter of executed instructions.
    /// NOTE(review): not updated in the part of the file reviewed here.
    pub instructions_executed: u64,
    /// Number of compiled-path cache hits (incremented by `get_compiled_path`).
    pub cache_hits: u64,
    /// Cache of parsed field paths, keyed by the raw path string.
    path_cache: HashMap<String, CompiledPath>,
    /// NOTE(review): presumably PDA reverse-lookup hit/miss counters — confirm
    /// where they are incremented.
    pub pda_cache_hits: u64,
    pub pda_cache_misses: u64,
    /// NOTE(review): presumably the current size of the pending-update queue — confirm.
    pub pending_queue_size: u64,
    /// Context of the event currently being processed; set by `process_event`.
    current_context: Option<UpdateContext>,
    /// Warnings accumulated during processing; drained by `take_warnings`.
    warnings: Vec<String>,
}
300
301#[derive(Debug)]
302pub struct LookupIndex {
303 index: std::sync::Mutex<LruCache<String, Value>>,
304}
305
306impl LookupIndex {
307 pub fn new() -> Self {
308 Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
309 }
310
311 pub fn with_capacity(capacity: usize) -> Self {
312 LookupIndex {
313 index: std::sync::Mutex::new(LruCache::new(
314 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
315 )),
316 }
317 }
318
319 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
320 let key = value_to_cache_key(lookup_value);
321 self.index.lock().unwrap().get(&key).cloned()
322 }
323
324 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
325 let key = value_to_cache_key(&lookup_value);
326 self.index.lock().unwrap().put(key, primary_key);
327 }
328
329 pub fn len(&self) -> usize {
330 self.index.lock().unwrap().len()
331 }
332
333 pub fn is_empty(&self) -> bool {
334 self.index.lock().unwrap().is_empty()
335 }
336}
337
338impl Default for LookupIndex {
339 fn default() -> Self {
340 Self::new()
341 }
342}
343
344fn value_to_cache_key(value: &Value) -> String {
345 match value {
346 Value::String(s) => s.clone(),
347 Value::Number(n) => n.to_string(),
348 Value::Bool(b) => b.to_string(),
349 Value::Null => "null".to_string(),
350 _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
351 }
352}
353
354#[derive(Debug)]
355pub struct TemporalIndex {
356 index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
357}
358
359impl Default for TemporalIndex {
360 fn default() -> Self {
361 Self::new()
362 }
363}
364
365impl TemporalIndex {
366 pub fn new() -> Self {
367 Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
368 }
369
370 pub fn with_capacity(capacity: usize) -> Self {
371 TemporalIndex {
372 index: std::sync::Mutex::new(LruCache::new(
373 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
374 )),
375 }
376 }
377
378 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
379 let key = value_to_cache_key(lookup_value);
380 let mut cache = self.index.lock().unwrap();
381 if let Some(entries) = cache.get(&key) {
382 for i in (0..entries.len()).rev() {
383 if entries[i].1 <= timestamp {
384 return Some(entries[i].0.clone());
385 }
386 }
387 }
388 None
389 }
390
391 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
392 let key = value_to_cache_key(lookup_value);
393 let mut cache = self.index.lock().unwrap();
394 if let Some(entries) = cache.get(&key) {
395 if let Some(last) = entries.last() {
396 return Some(last.0.clone());
397 }
398 }
399 None
400 }
401
402 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
403 let key = value_to_cache_key(&lookup_value);
404 let mut cache = self.index.lock().unwrap();
405
406 let entries = cache.get_or_insert_mut(key, Vec::new);
407 entries.push((primary_key, timestamp));
408 entries.sort_by_key(|(_, ts)| *ts);
409
410 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
411 entries.retain(|(_, ts)| *ts >= cutoff);
412
413 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
414 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
415 entries.drain(0..excess);
416 }
417 }
418
419 pub fn len(&self) -> usize {
420 self.index.lock().unwrap().len()
421 }
422
423 pub fn is_empty(&self) -> bool {
424 self.index.lock().unwrap().is_empty()
425 }
426
427 pub fn total_entries(&self) -> usize {
428 self.index
429 .lock()
430 .unwrap()
431 .iter()
432 .map(|(_, entries)| entries.len())
433 .sum()
434 }
435
436 pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
437 let mut cache = self.index.lock().unwrap();
438 let mut total_removed = 0;
439
440 for (_, entries) in cache.iter_mut() {
441 let original_len = entries.len();
442 entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
443 total_removed += original_len - entries.len();
444 }
445
446 total_removed
447 }
448}
449
450#[derive(Debug)]
451pub struct PdaReverseLookup {
452 index: LruCache<String, String>,
454}
455
456impl PdaReverseLookup {
457 pub fn new(capacity: usize) -> Self {
458 PdaReverseLookup {
459 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
460 }
461 }
462
463 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
464 self.index.get(pda_address).cloned()
465 }
466
467 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
468 let evicted = if self.index.len() >= self.index.cap().get() {
469 self.index.peek_lru().map(|(k, _)| k.clone())
470 } else {
471 None
472 };
473
474 self.index.put(pda_address, seed_value);
475 evicted
476 }
477
478 pub fn len(&self) -> usize {
479 self.index.len()
480 }
481
482 pub fn is_empty(&self) -> bool {
483 self.index.is_empty()
484 }
485}
486
/// An account update together with the PDA address it belongs to.
///
/// NOTE(review): presumably the input shape for queueing an update until its
/// PDA can be resolved — confirm against the queueing code.
#[derive(Debug, Clone)]
pub struct QueuedAccountUpdate {
    pub pda_address: String,
    pub account_type: String,
    /// Account payload as JSON.
    pub account_data: Value,
    pub slot: u64,
    pub write_version: u64,
    pub signature: String,
}
497
/// An account update held in a state table's `pending_updates` map.
#[derive(Debug, Clone)]
pub struct PendingAccountUpdate {
    pub account_type: String,
    pub pda_address: String,
    /// Account payload as JSON.
    pub account_data: Value,
    pub slot: u64,
    pub write_version: u64,
    pub signature: String,
    /// NOTE(review): presumably the Unix time in seconds at which the update
    /// was queued — confirm where it is set.
    pub queued_at: i64,
}
509
/// Snapshot of pending-update queue statistics.
///
/// NOTE(review): field meanings are taken from their names; the code that
/// fills them is outside the part of the file reviewed here.
#[derive(Debug, Clone)]
pub struct PendingQueueStats {
    pub total_updates: usize,
    pub unique_pdas: usize,
    pub oldest_age_seconds: i64,
    pub largest_pda_queue_size: usize,
    pub estimated_memory_bytes: usize,
}
518
/// Snapshot of the VM's memory-related counters.
///
/// NOTE(review): field meanings are taken from their names; the code that
/// fills them is outside the part of the file reviewed here.
#[derive(Debug, Clone, Default)]
pub struct VmMemoryStats {
    pub state_table_entity_count: usize,
    pub state_table_max_entries: usize,
    pub state_table_at_capacity: bool,
    pub lookup_index_count: usize,
    pub lookup_index_total_entries: usize,
    pub temporal_index_count: usize,
    pub temporal_index_total_entries: usize,
    pub pda_reverse_lookup_count: usize,
    pub pda_reverse_lookup_total_entries: usize,
    pub version_tracker_entries: usize,
    /// `None` when no pending-queue statistics are available.
    pub pending_queue_stats: Option<PendingQueueStats>,
    pub path_cache_size: usize,
}
534
/// Counts of items removed by a cleanup pass.
///
/// NOTE(review): the cleanup routine that fills this is outside the part of
/// the file reviewed here.
#[derive(Debug, Clone, Default)]
pub struct CleanupResult {
    pub pending_updates_removed: usize,
    pub temporal_entries_removed: usize,
}
540
/// Describes a table whose entry count is measured against its limit.
///
/// NOTE(review): the code that constructs this is outside the part of the
/// file reviewed here.
#[derive(Debug, Clone)]
pub struct CapacityWarning {
    pub current_entries: usize,
    pub max_entries: usize,
    pub entries_over_limit: usize,
}
547
/// Size limits applied to a `StateTable`.
#[derive(Debug, Clone)]
pub struct StateTableConfig {
    /// Entry count at which inserting a new key triggers LRU eviction.
    pub max_entries: usize,
    /// Limit exposed through `StateTable::max_array_length`.
    pub max_array_length: usize,
}

impl Default for StateTableConfig {
    /// Uses `DEFAULT_MAX_STATE_TABLE_ENTRIES` and `DEFAULT_MAX_ARRAY_LENGTH`.
    fn default() -> Self {
        Self {
            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
        }
    }
}
562
563#[derive(Debug)]
564pub struct VersionTracker {
565 cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
566}
567
568impl VersionTracker {
569 pub fn new() -> Self {
570 Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
571 }
572
573 pub fn with_capacity(capacity: usize) -> Self {
574 VersionTracker {
575 cache: std::sync::Mutex::new(LruCache::new(
576 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
577 )),
578 }
579 }
580
581 fn make_key(primary_key: &Value, event_type: &str) -> String {
582 format!("{}:{}", primary_key, event_type)
583 }
584
585 pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
586 let key = Self::make_key(primary_key, event_type);
587 self.cache.lock().unwrap().get(&key).copied()
588 }
589
590 pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
591 let key = Self::make_key(primary_key, event_type);
592 self.cache.lock().unwrap().put(key, (slot, ordering_value));
593 }
594
595 pub fn len(&self) -> usize {
596 self.cache.lock().unwrap().len()
597 }
598
599 pub fn is_empty(&self) -> bool {
600 self.cache.lock().unwrap().is_empty()
601 }
602}
603
604impl Default for VersionTracker {
605 fn default() -> Self {
606 Self::new()
607 }
608}
609
610#[derive(Debug)]
611pub struct StateTable {
612 pub data: DashMap<Value, Value>,
613 access_times: DashMap<Value, i64>,
614 pub lookup_indexes: HashMap<String, LookupIndex>,
615 pub temporal_indexes: HashMap<String, TemporalIndex>,
616 pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
617 pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
618 version_tracker: VersionTracker,
619 config: StateTableConfig,
620 #[cfg_attr(not(feature = "otel"), allow(dead_code))]
621 entity_name: String,
622}
623
624impl StateTable {
625 pub fn is_at_capacity(&self) -> bool {
626 self.data.len() >= self.config.max_entries
627 }
628
629 pub fn entries_over_limit(&self) -> usize {
630 self.data.len().saturating_sub(self.config.max_entries)
631 }
632
633 pub fn max_array_length(&self) -> usize {
634 self.config.max_array_length
635 }
636
637 fn touch(&self, key: &Value) {
638 let now = std::time::SystemTime::now()
639 .duration_since(std::time::UNIX_EPOCH)
640 .unwrap()
641 .as_secs() as i64;
642 self.access_times.insert(key.clone(), now);
643 }
644
645 fn evict_lru(&self, count: usize) -> usize {
646 if count == 0 || self.data.is_empty() {
647 return 0;
648 }
649
650 let mut entries: Vec<(Value, i64)> = self
651 .access_times
652 .iter()
653 .map(|entry| (entry.key().clone(), *entry.value()))
654 .collect();
655
656 entries.sort_by_key(|(_, ts)| *ts);
657
658 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
659
660 let mut evicted = 0;
661 for key in to_evict {
662 self.data.remove(&key);
663 self.access_times.remove(&key);
664 evicted += 1;
665 }
666
667 #[cfg(feature = "otel")]
668 if evicted > 0 {
669 crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
670 }
671
672 evicted
673 }
674
675 pub fn insert_with_eviction(&self, key: Value, value: Value) {
676 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
677 #[cfg(feature = "otel")]
678 crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
679 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
680 self.evict_lru(to_evict.max(1));
681 }
682 self.data.insert(key.clone(), value);
683 self.touch(&key);
684 }
685
686 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
687 let result = self.data.get(key).map(|v| v.clone());
688 if result.is_some() {
689 self.touch(key);
690 }
691 result
692 }
693
694 pub fn is_fresh_update(
701 &self,
702 primary_key: &Value,
703 event_type: &str,
704 slot: u64,
705 ordering_value: u64,
706 ) -> bool {
707 let dominated = self
708 .version_tracker
709 .get(primary_key, event_type)
710 .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
711 .unwrap_or(false);
712
713 if dominated {
714 return false;
715 }
716
717 self.version_tracker
718 .insert(primary_key, event_type, slot, ordering_value);
719 true
720 }
721}
722
723impl VmContext {
724 pub fn new() -> Self {
725 let mut vm = VmContext {
726 registers: vec![Value::Null; 256],
727 states: HashMap::new(),
728 instructions_executed: 0,
729 cache_hits: 0,
730 path_cache: HashMap::new(),
731 pda_cache_hits: 0,
732 pda_cache_misses: 0,
733 pending_queue_size: 0,
734 current_context: None,
735 warnings: Vec::new(),
736 };
737 vm.states.insert(
738 0,
739 StateTable {
740 data: DashMap::new(),
741 access_times: DashMap::new(),
742 lookup_indexes: HashMap::new(),
743 temporal_indexes: HashMap::new(),
744 pda_reverse_lookups: HashMap::new(),
745 pending_updates: DashMap::new(),
746 version_tracker: VersionTracker::new(),
747 config: StateTableConfig::default(),
748 entity_name: "default".to_string(),
749 },
750 );
751 vm
752 }
753
754 pub fn new_with_config(state_config: StateTableConfig) -> Self {
755 let mut vm = VmContext {
756 registers: vec![Value::Null; 256],
757 states: HashMap::new(),
758 instructions_executed: 0,
759 cache_hits: 0,
760 path_cache: HashMap::new(),
761 pda_cache_hits: 0,
762 pda_cache_misses: 0,
763 pending_queue_size: 0,
764 current_context: None,
765 warnings: Vec::new(),
766 };
767 vm.states.insert(
768 0,
769 StateTable {
770 data: DashMap::new(),
771 access_times: DashMap::new(),
772 lookup_indexes: HashMap::new(),
773 temporal_indexes: HashMap::new(),
774 pda_reverse_lookups: HashMap::new(),
775 pending_updates: DashMap::new(),
776 version_tracker: VersionTracker::new(),
777 config: state_config,
778 entity_name: "default".to_string(),
779 },
780 );
781 vm
782 }
783
/// Mutable access to the state table with id `state_id`, or `None` if no such
/// table exists.
pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
    self.states.get_mut(&state_id)
}
789
/// Mutable access to the whole register file.
pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
    &mut self.registers
}
794
/// Read-only view of the compiled-path cache, keyed by raw path string.
pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
    &self.path_cache
}
799
/// The update context of the event being processed, if one was supplied.
pub fn current_context(&self) -> Option<&UpdateContext> {
    self.current_context.as_ref()
}
804
/// Appends `msg` to the pending warnings.
fn add_warning(&mut self, msg: String) {
    self.warnings.push(msg);
}
808
/// Returns all pending warnings and leaves the internal list empty.
pub fn take_warnings(&mut self) -> Vec<String> {
    std::mem::take(&mut self.warnings)
}
812
/// `true` when at least one warning is pending.
pub fn has_warnings(&self) -> bool {
    !self.warnings.is_empty()
}
816
817 pub fn update_state_from_register(
818 &mut self,
819 state_id: u32,
820 key: Value,
821 register: Register,
822 ) -> Result<()> {
823 let state = self.states.get(&state_id).ok_or("State table not found")?;
824 let value = self.registers[register].clone();
825 state.insert_with_eviction(key, value);
826 Ok(())
827 }
828
829 fn reset_registers(&mut self) {
830 for reg in &mut self.registers {
831 *reg = Value::Null;
832 }
833 }
834
835 pub fn extract_partial_state(
837 &self,
838 state_reg: Register,
839 dirty_fields: &HashSet<String>,
840 ) -> Result<Value> {
841 let full_state = &self.registers[state_reg];
842
843 if dirty_fields.is_empty() {
844 return Ok(json!({}));
845 }
846
847 let mut partial = serde_json::Map::new();
848
849 for path in dirty_fields {
850 let segments: Vec<&str> = path.split('.').collect();
851
852 let mut current = full_state;
853 let mut found = true;
854
855 for segment in &segments {
856 match current.get(segment) {
857 Some(v) => current = v,
858 None => {
859 found = false;
860 break;
861 }
862 }
863 }
864
865 if !found {
866 continue;
867 }
868
869 let mut target = &mut partial;
870 for (i, segment) in segments.iter().enumerate() {
871 if i == segments.len() - 1 {
872 target.insert(segment.to_string(), current.clone());
873 } else {
874 target
875 .entry(segment.to_string())
876 .or_insert_with(|| json!({}));
877 target = target
878 .get_mut(*segment)
879 .and_then(|v| v.as_object_mut())
880 .ok_or("Failed to build nested structure")?;
881 }
882 }
883 }
884
885 Ok(Value::Object(partial))
886 }
887
888 pub fn extract_partial_state_with_tracker(
892 &self,
893 state_reg: Register,
894 tracker: &DirtyTracker,
895 ) -> Result<Value> {
896 let full_state = &self.registers[state_reg];
897
898 if tracker.is_empty() {
899 return Ok(json!({}));
900 }
901
902 let mut partial = serde_json::Map::new();
903
904 for (path, change) in tracker.iter() {
905 let segments: Vec<&str> = path.split('.').collect();
906
907 let value_to_insert = match change {
908 FieldChange::Replaced => {
909 let mut current = full_state;
910 let mut found = true;
911
912 for segment in &segments {
913 match current.get(*segment) {
914 Some(v) => current = v,
915 None => {
916 found = false;
917 break;
918 }
919 }
920 }
921
922 if !found {
923 continue;
924 }
925 current.clone()
926 }
927 FieldChange::Appended(values) => Value::Array(values.clone()),
928 };
929
930 let mut target = &mut partial;
931 for (i, segment) in segments.iter().enumerate() {
932 if i == segments.len() - 1 {
933 target.insert(segment.to_string(), value_to_insert.clone());
934 } else {
935 target
936 .entry(segment.to_string())
937 .or_insert_with(|| json!({}));
938 target = target
939 .get_mut(*segment)
940 .and_then(|v| v.as_object_mut())
941 .ok_or("Failed to build nested structure")?;
942 }
943 }
944 }
945
946 Ok(Value::Object(partial))
947 }
948
949 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
950 if let Some(compiled) = self.path_cache.get(path) {
951 self.cache_hits += 1;
952 #[cfg(feature = "otel")]
953 crate::vm_metrics::record_path_cache_hit();
954 return compiled.clone();
955 }
956 #[cfg(feature = "otel")]
957 crate::vm_metrics::record_path_cache_miss();
958 let compiled = CompiledPath::new(path);
959 self.path_cache.insert(path.to_string(), compiled.clone());
960 compiled
961 }
962
/// Routes one event to every entity handler registered for `event_type` and
/// returns the mutations they produce.
///
/// * `bytecode` - compiled program; `event_routing` maps an event type to
///   entity names, and each entity carries its handlers.
/// * `event_value` - the event payload. When it is a JSON object and a context
///   is supplied, the context is injected under the `__update_context` key.
/// * `context` - optional provenance, stored as the current context for the
///   duration of processing.
/// * `log` - optional canonical log that receives counters, skip reasons and
///   any warnings raised while processing.
///
/// # Errors
/// Propagates the first error returned by a handler; mutations collected so
/// far are discarded in that case.
#[cfg_attr(feature = "otel", instrument(
    name = "vm.process_event",
    skip(self, bytecode, event_value, log),
    level = "info",
    fields(
        event_type = %event_type,
        slot = context.as_ref().and_then(|c| c.slot),
    )
))]
pub fn process_event(
    &mut self,
    bytecode: &MultiEntityBytecode,
    event_value: Value,
    event_type: &str,
    context: Option<&UpdateContext>,
    mut log: Option<&mut crate::canonical_log::CanonicalLog>,
) -> Result<Vec<Mutation>> {
    self.current_context = context.cloned();

    // Make the context visible to handlers through the event payload itself.
    let mut event_value = event_value;
    if let Some(ctx) = context {
        if let Some(obj) = event_value.as_object_mut() {
            obj.insert("__update_context".to_string(), ctx.to_value());
        }
    }

    let mut all_mutations = Vec::new();

    if let Some(entity_names) = bytecode.event_routing.get(event_type) {
        for entity_name in entity_names {
            if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
                if let Some(handler) = entity_bytecode.handlers.get(event_type) {
                    if let Some(ref mut log) = log {
                        log.set("entity", entity_name.clone());
                        log.inc("handlers", 1);
                    }

                    // Snapshot the counters so the log records per-handler deltas.
                    let opcodes_before = self.instructions_executed;
                    let cache_before = self.cache_hits;
                    let pda_hits_before = self.pda_cache_hits;
                    let pda_misses_before = self.pda_cache_misses;

                    let mutations = self.execute_handler(
                        handler,
                        &event_value,
                        event_type,
                        entity_bytecode.state_id,
                        entity_name,
                        entity_bytecode.computed_fields_evaluator.as_ref(),
                    )?;

                    if let Some(ref mut log) = log {
                        log.inc(
                            "opcodes",
                            (self.instructions_executed - opcodes_before) as i64,
                        );
                        log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
                        log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
                        log.inc(
                            "pda_misses",
                            (self.pda_cache_misses - pda_misses_before) as i64,
                        );
                    }

                    all_mutations.extend(mutations);
                } else if let Some(ref mut log) = log {
                    log.set("skip_reason", "no_handler");
                }
            } else if let Some(ref mut log) = log {
                log.set("skip_reason", "entity_not_found");
            }
        }
    } else if let Some(ref mut log) = log {
        log.set("skip_reason", "no_event_routing");
    }

    if let Some(log) = log {
        log.set("mutations", all_mutations.len() as i64);
        // Only the first mutation's key is logged, and only if it is a string
        // or an unsigned integer.
        if let Some(first) = all_mutations.first() {
            if let Some(key_str) = first.key.as_str() {
                log.set("primary_key", key_str);
            } else if let Some(key_num) = first.key.as_u64() {
                log.set("primary_key", key_num as i64);
            }
        }
        // Reports the size of state table 0 only, regardless of which
        // entities handled the event.
        if let Some(state) = self.states.get(&0) {
            log.set("state_table_size", state.data.len() as i64);
        }

        let warnings = self.take_warnings();
        if !warnings.is_empty() {
            log.set("warnings", warnings.len() as i64);
            log.set(
                "warning_messages",
                Value::Array(warnings.into_iter().map(Value::String).collect()),
            );
            log.set_level(crate::canonical_log::LogLevel::Warn);
        }
    } else {
        // Without a log there is nowhere to report warnings; drop them so
        // they do not leak into the next event.
        self.warnings.clear();
    }

    Ok(all_mutations)
}
1068
1069 pub fn process_any(
1070 &mut self,
1071 bytecode: &MultiEntityBytecode,
1072 any: prost_types::Any,
1073 ) -> Result<Vec<Mutation>> {
1074 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1075 self.process_event(bytecode, event_value, &event_type, None, None)
1076 }
1077
1078 #[cfg_attr(feature = "otel", instrument(
1079 name = "vm.execute_handler",
1080 skip(self, handler, event_value, entity_evaluator),
1081 level = "debug",
1082 fields(
1083 event_type = %event_type,
1084 handler_opcodes = handler.len(),
1085 )
1086 ))]
1087 #[allow(clippy::type_complexity)]
1088 fn execute_handler(
1089 &mut self,
1090 handler: &[OpCode],
1091 event_value: &Value,
1092 event_type: &str,
1093 override_state_id: u32,
1094 entity_name: &str,
1095 entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1096 ) -> Result<Vec<Mutation>> {
1097 self.reset_registers();
1098
1099 let mut pc: usize = 0;
1100 let mut output = Vec::new();
1101 let mut dirty_tracker = DirtyTracker::new();
1102
1103 while pc < handler.len() {
1104 match &handler[pc] {
1105 OpCode::LoadEventField {
1106 path,
1107 dest,
1108 default,
1109 } => {
1110 let value = self.load_field(event_value, path, default.as_ref())?;
1111 self.registers[*dest] = value;
1112 pc += 1;
1113 }
1114 OpCode::LoadConstant { value, dest } => {
1115 self.registers[*dest] = value.clone();
1116 pc += 1;
1117 }
1118 OpCode::CopyRegister { source, dest } => {
1119 self.registers[*dest] = self.registers[*source].clone();
1120 pc += 1;
1121 }
1122 OpCode::CopyRegisterIfNull { source, dest } => {
1123 if self.registers[*dest].is_null() {
1124 self.registers[*dest] = self.registers[*source].clone();
1125 }
1126 pc += 1;
1127 }
1128 OpCode::GetEventType { dest } => {
1129 self.registers[*dest] = json!(event_type);
1130 pc += 1;
1131 }
1132 OpCode::CreateObject { dest } => {
1133 self.registers[*dest] = json!({});
1134 pc += 1;
1135 }
1136 OpCode::SetField {
1137 object,
1138 path,
1139 value,
1140 } => {
1141 self.set_field_auto_vivify(*object, path, *value)?;
1142 dirty_tracker.mark_replaced(path);
1143 pc += 1;
1144 }
1145 OpCode::SetFields { object, fields } => {
1146 for (path, value_reg) in fields {
1147 self.set_field_auto_vivify(*object, path, *value_reg)?;
1148 dirty_tracker.mark_replaced(path);
1149 }
1150 pc += 1;
1151 }
1152 OpCode::GetField { object, path, dest } => {
1153 let value = self.get_field(*object, path)?;
1154 self.registers[*dest] = value;
1155 pc += 1;
1156 }
1157 OpCode::ReadOrInitState {
1158 state_id: _,
1159 key,
1160 default,
1161 dest,
1162 } => {
1163 let actual_state_id = override_state_id;
1164 let entity_name_owned = entity_name.to_string();
1165 self.states
1166 .entry(actual_state_id)
1167 .or_insert_with(|| StateTable {
1168 data: DashMap::new(),
1169 access_times: DashMap::new(),
1170 lookup_indexes: HashMap::new(),
1171 temporal_indexes: HashMap::new(),
1172 pda_reverse_lookups: HashMap::new(),
1173 pending_updates: DashMap::new(),
1174 version_tracker: VersionTracker::new(),
1175 config: StateTableConfig::default(),
1176 entity_name: entity_name_owned,
1177 });
1178 let key_value = self.registers[*key].clone();
1179 let warn_null_key = key_value.is_null()
1180 && event_type.ends_with("State")
1181 && !event_type.ends_with("IxState");
1182
1183 if warn_null_key {
1184 self.add_warning(format!(
1185 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1186 key, event_type
1187 ));
1188 }
1189
1190 let state = self
1191 .states
1192 .get(&actual_state_id)
1193 .ok_or("State table not found")?;
1194
1195 if !key_value.is_null() {
1196 if let Some(ctx) = &self.current_context {
1197 let ordering_value = ctx.write_version.or(ctx.txn_index);
1198 if let (Some(slot), Some(version)) = (ctx.slot, ordering_value) {
1199 if !state.is_fresh_update(&key_value, event_type, slot, version) {
1200 self.add_warning(format!(
1201 "Stale update skipped: slot={}, version={}",
1202 slot, version
1203 ));
1204 return Ok(Vec::new());
1205 }
1206 }
1207 }
1208 }
1209 let value = state
1210 .get_and_touch(&key_value)
1211 .unwrap_or_else(|| default.clone());
1212
1213 self.registers[*dest] = value;
1214 pc += 1;
1215 }
1216 OpCode::UpdateState {
1217 state_id: _,
1218 key,
1219 value,
1220 } => {
1221 let actual_state_id = override_state_id;
1222 let state = self
1223 .states
1224 .get(&actual_state_id)
1225 .ok_or("State table not found")?;
1226 let key_value = self.registers[*key].clone();
1227 let value_data = self.registers[*value].clone();
1228
1229 state.insert_with_eviction(key_value, value_data);
1230 pc += 1;
1231 }
1232 OpCode::AppendToArray {
1233 object,
1234 path,
1235 value,
1236 } => {
1237 let appended_value = self.registers[*value].clone();
1238 let max_len = self
1239 .states
1240 .get(&override_state_id)
1241 .map(|s| s.max_array_length())
1242 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1243 self.append_to_array(*object, path, *value, max_len)?;
1244 dirty_tracker.mark_appended(path, appended_value);
1245 pc += 1;
1246 }
1247 OpCode::GetCurrentTimestamp { dest } => {
1248 let timestamp = std::time::SystemTime::now()
1249 .duration_since(std::time::UNIX_EPOCH)
1250 .unwrap()
1251 .as_secs() as i64;
1252 self.registers[*dest] = json!(timestamp);
1253 pc += 1;
1254 }
1255 OpCode::CreateEvent { dest, event_value } => {
1256 let timestamp = std::time::SystemTime::now()
1257 .duration_since(std::time::UNIX_EPOCH)
1258 .unwrap()
1259 .as_secs() as i64;
1260
1261 let mut event_data = self.registers[*event_value].clone();
1263 if let Some(obj) = event_data.as_object_mut() {
1264 obj.remove("__update_context");
1265 }
1266
1267 let mut event = serde_json::Map::new();
1269 event.insert("timestamp".to_string(), json!(timestamp));
1270 event.insert("data".to_string(), event_data);
1271
1272 if let Some(ref ctx) = self.current_context {
1274 if let Some(slot) = ctx.slot {
1275 event.insert("slot".to_string(), json!(slot));
1276 }
1277 if let Some(ref signature) = ctx.signature {
1278 event.insert("signature".to_string(), json!(signature));
1279 }
1280 }
1281
1282 self.registers[*dest] = Value::Object(event);
1283 pc += 1;
1284 }
1285 OpCode::CreateCapture {
1286 dest,
1287 capture_value,
1288 } => {
1289 let timestamp = std::time::SystemTime::now()
1290 .duration_since(std::time::UNIX_EPOCH)
1291 .unwrap()
1292 .as_secs() as i64;
1293
1294 let capture_data = self.registers[*capture_value].clone();
1296
1297 let account_address = event_value
1299 .get("__account_address")
1300 .and_then(|v| v.as_str())
1301 .unwrap_or("")
1302 .to_string();
1303
1304 let mut capture = serde_json::Map::new();
1306 capture.insert("timestamp".to_string(), json!(timestamp));
1307 capture.insert("account_address".to_string(), json!(account_address));
1308 capture.insert("data".to_string(), capture_data);
1309
1310 if let Some(ref ctx) = self.current_context {
1312 if let Some(slot) = ctx.slot {
1313 capture.insert("slot".to_string(), json!(slot));
1314 }
1315 if let Some(ref signature) = ctx.signature {
1316 capture.insert("signature".to_string(), json!(signature));
1317 }
1318 }
1319
1320 self.registers[*dest] = Value::Object(capture);
1321 pc += 1;
1322 }
1323 OpCode::Transform {
1324 source,
1325 dest,
1326 transformation,
1327 } => {
1328 if source == dest {
1329 self.transform_in_place(*source, transformation)?;
1330 } else {
1331 let source_value = &self.registers[*source];
1332 let value = self.apply_transformation(source_value, transformation)?;
1333 self.registers[*dest] = value;
1334 }
1335 pc += 1;
1336 }
1337 OpCode::EmitMutation {
1338 entity_name,
1339 key,
1340 state,
1341 } => {
1342 let primary_key = self.registers[*key].clone();
1343
1344 if primary_key.is_null() || dirty_tracker.is_empty() {
1345 let reason = if dirty_tracker.is_empty() {
1346 "no_fields_modified"
1347 } else {
1348 "null_primary_key"
1349 };
1350 self.add_warning(format!(
1351 "Skipping mutation for entity '{}': {} (dirty_fields={})",
1352 entity_name,
1353 reason,
1354 dirty_tracker.len()
1355 ));
1356 } else {
1357 let patch =
1358 self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1359 let append = dirty_tracker.appended_paths();
1360 let mutation = Mutation {
1361 export: entity_name.clone(),
1362 key: primary_key,
1363 patch,
1364 append,
1365 };
1366 output.push(mutation);
1367 }
1368 pc += 1;
1369 }
1370 OpCode::SetFieldIfNull {
1371 object,
1372 path,
1373 value,
1374 } => {
1375 let was_set = self.set_field_if_null(*object, path, *value)?;
1376 if was_set {
1377 dirty_tracker.mark_replaced(path);
1378 }
1379 pc += 1;
1380 }
1381 OpCode::SetFieldMax {
1382 object,
1383 path,
1384 value,
1385 } => {
1386 let was_updated = self.set_field_max(*object, path, *value)?;
1387 if was_updated {
1388 dirty_tracker.mark_replaced(path);
1389 }
1390 pc += 1;
1391 }
1392 OpCode::UpdateTemporalIndex {
1393 state_id: _,
1394 index_name,
1395 lookup_value,
1396 primary_key,
1397 timestamp,
1398 } => {
1399 let actual_state_id = override_state_id;
1400 let state = self
1401 .states
1402 .get_mut(&actual_state_id)
1403 .ok_or("State table not found")?;
1404 let index = state
1405 .temporal_indexes
1406 .entry(index_name.clone())
1407 .or_insert_with(TemporalIndex::new);
1408
1409 let lookup_val = self.registers[*lookup_value].clone();
1410 let pk_val = self.registers[*primary_key].clone();
1411 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1412 val
1413 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1414 val as i64
1415 } else {
1416 return Err(format!(
1417 "Timestamp must be a number (i64 or u64), got: {:?}",
1418 self.registers[*timestamp]
1419 )
1420 .into());
1421 };
1422
1423 index.insert(lookup_val, pk_val, ts_val);
1424 pc += 1;
1425 }
1426 OpCode::LookupTemporalIndex {
1427 state_id: _,
1428 index_name,
1429 lookup_value,
1430 timestamp,
1431 dest,
1432 } => {
1433 let actual_state_id = override_state_id;
1434 let state = self
1435 .states
1436 .get(&actual_state_id)
1437 .ok_or("State table not found")?;
1438 let lookup_val = &self.registers[*lookup_value];
1439
1440 let result = if self.registers[*timestamp].is_null() {
1441 if let Some(index) = state.temporal_indexes.get(index_name) {
1442 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1443 } else {
1444 Value::Null
1445 }
1446 } else {
1447 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1448 val
1449 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1450 val as i64
1451 } else {
1452 return Err(format!(
1453 "Timestamp must be a number (i64 or u64), got: {:?}",
1454 self.registers[*timestamp]
1455 )
1456 .into());
1457 };
1458
1459 if let Some(index) = state.temporal_indexes.get(index_name) {
1460 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1461 } else {
1462 Value::Null
1463 }
1464 };
1465
1466 self.registers[*dest] = result;
1467 pc += 1;
1468 }
1469 OpCode::UpdateLookupIndex {
1470 state_id: _,
1471 index_name,
1472 lookup_value,
1473 primary_key,
1474 } => {
1475 let actual_state_id = override_state_id;
1476 let state = self
1477 .states
1478 .get_mut(&actual_state_id)
1479 .ok_or("State table not found")?;
1480 let index = state
1481 .lookup_indexes
1482 .entry(index_name.clone())
1483 .or_insert_with(LookupIndex::new);
1484
1485 let lookup_val = self.registers[*lookup_value].clone();
1486 let pk_val = self.registers[*primary_key].clone();
1487
1488 index.insert(lookup_val, pk_val);
1489 pc += 1;
1490 }
1491 OpCode::LookupIndex {
1492 state_id: _,
1493 index_name,
1494 lookup_value,
1495 dest,
1496 } => {
1497 let actual_state_id = override_state_id;
1498 let lookup_val = self.registers[*lookup_value].clone();
1499
1500 let result = {
1501 let state = self
1502 .states
1503 .get(&actual_state_id)
1504 .ok_or("State table not found")?;
1505
1506 if let Some(index) = state.lookup_indexes.get(index_name) {
1507 let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1508 #[cfg(feature = "otel")]
1509 if found.is_null() {
1510 crate::vm_metrics::record_lookup_index_miss(index_name);
1511 } else {
1512 crate::vm_metrics::record_lookup_index_hit(index_name);
1513 }
1514 found
1515 } else {
1516 Value::Null
1517 }
1518 };
1519
1520 let final_result = if result.is_null() {
1521 if let Some(pda_str) = lookup_val.as_str() {
1522 let state = self
1523 .states
1524 .get_mut(&actual_state_id)
1525 .ok_or("State table not found")?;
1526
1527 if let Some(pda_lookup) =
1528 state.pda_reverse_lookups.get_mut("default_pda_lookup")
1529 {
1530 if let Some(resolved) = pda_lookup.lookup(pda_str) {
1531 Value::String(resolved)
1532 } else {
1533 Value::Null
1534 }
1535 } else {
1536 Value::Null
1537 }
1538 } else {
1539 Value::Null
1540 }
1541 } else {
1542 result
1543 };
1544
1545 self.registers[*dest] = final_result;
1546 pc += 1;
1547 }
1548 OpCode::SetFieldSum {
1549 object,
1550 path,
1551 value,
1552 } => {
1553 let was_updated = self.set_field_sum(*object, path, *value)?;
1554 if was_updated {
1555 dirty_tracker.mark_replaced(path);
1556 }
1557 pc += 1;
1558 }
1559 OpCode::SetFieldIncrement { object, path } => {
1560 let was_updated = self.set_field_increment(*object, path)?;
1561 if was_updated {
1562 dirty_tracker.mark_replaced(path);
1563 }
1564 pc += 1;
1565 }
1566 OpCode::SetFieldMin {
1567 object,
1568 path,
1569 value,
1570 } => {
1571 let was_updated = self.set_field_min(*object, path, *value)?;
1572 if was_updated {
1573 dirty_tracker.mark_replaced(path);
1574 }
1575 pc += 1;
1576 }
1577 OpCode::AddToUniqueSet {
1578 state_id: _,
1579 set_name,
1580 value,
1581 count_object,
1582 count_path,
1583 } => {
1584 let value_to_add = self.registers[*value].clone();
1585
1586 let set_field_path = format!("__unique_set:{}", set_name);
1589
1590 let mut set: HashSet<Value> =
1592 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1593 if !existing.is_null() {
1594 serde_json::from_value(existing).unwrap_or_default()
1595 } else {
1596 HashSet::new()
1597 }
1598 } else {
1599 HashSet::new()
1600 };
1601
1602 let was_new = set.insert(value_to_add);
1604
1605 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1607 self.registers[100] = serde_json::to_value(set_as_vec)?;
1608 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1609
1610 if was_new {
1612 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1613 self.set_field_auto_vivify(*count_object, count_path, 100)?;
1614 dirty_tracker.mark_replaced(count_path);
1615 }
1616
1617 pc += 1;
1618 }
1619 OpCode::ConditionalSetField {
1620 object,
1621 path,
1622 value,
1623 condition_field,
1624 condition_op,
1625 condition_value,
1626 } => {
1627 let field_value = self.load_field(event_value, condition_field, None)?;
1628 let condition_met =
1629 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1630
1631 if condition_met {
1632 self.set_field_auto_vivify(*object, path, *value)?;
1633 dirty_tracker.mark_replaced(path);
1634 }
1635 pc += 1;
1636 }
1637 OpCode::ConditionalIncrement {
1638 object,
1639 path,
1640 condition_field,
1641 condition_op,
1642 condition_value,
1643 } => {
1644 let field_value = self.load_field(event_value, condition_field, None)?;
1645 let condition_met =
1646 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1647
1648 if condition_met {
1649 let was_updated = self.set_field_increment(*object, path)?;
1650 if was_updated {
1651 dirty_tracker.mark_replaced(path);
1652 }
1653 }
1654 pc += 1;
1655 }
1656 OpCode::EvaluateComputedFields {
1657 state,
1658 computed_paths,
1659 } => {
1660 if let Some(evaluator) = entity_evaluator {
1661 let old_values: Vec<_> = computed_paths
1662 .iter()
1663 .map(|path| Self::get_value_at_path(&self.registers[*state], path))
1664 .collect();
1665
1666 let state_value = &mut self.registers[*state];
1667 if evaluator(state_value).is_ok() {
1668 for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
1669 let new_value =
1670 Self::get_value_at_path(&self.registers[*state], path);
1671 if new_value != *old_value {
1672 dirty_tracker.mark_replaced(path);
1673 }
1674 }
1675 }
1676 }
1677 pc += 1;
1678 }
1679 OpCode::UpdatePdaReverseLookup {
1680 state_id: _,
1681 lookup_name,
1682 pda_address,
1683 primary_key,
1684 } => {
1685 let actual_state_id = override_state_id;
1686 let state = self
1687 .states
1688 .get_mut(&actual_state_id)
1689 .ok_or("State table not found")?;
1690
1691 let pda_val = self.registers[*pda_address].clone();
1692 let pk_val = self.registers[*primary_key].clone();
1693
1694 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1695 let pda_lookup = state
1696 .pda_reverse_lookups
1697 .entry(lookup_name.clone())
1698 .or_insert_with(|| {
1699 PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
1700 });
1701
1702 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1703 } else if !pk_val.is_null() {
1704 if let Some(pk_num) = pk_val.as_u64() {
1705 if let Some(pda_str) = pda_val.as_str() {
1706 let pda_lookup = state
1707 .pda_reverse_lookups
1708 .entry(lookup_name.clone())
1709 .or_insert_with(|| {
1710 PdaReverseLookup::new(
1711 DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
1712 )
1713 });
1714
1715 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1716 }
1717 }
1718 }
1719
1720 pc += 1;
1721 }
1722 }
1723
1724 self.instructions_executed += 1;
1725 }
1726
1727 Ok(output)
1728 }
1729
1730 fn load_field(
1731 &self,
1732 event_value: &Value,
1733 path: &FieldPath,
1734 default: Option<&Value>,
1735 ) -> Result<Value> {
1736 if path.segments.is_empty() {
1737 if let Some(obj) = event_value.as_object() {
1738 let filtered: serde_json::Map<String, Value> = obj
1739 .iter()
1740 .filter(|(k, _)| !k.starts_with("__"))
1741 .map(|(k, v)| (k.clone(), v.clone()))
1742 .collect();
1743 return Ok(Value::Object(filtered));
1744 }
1745 return Ok(event_value.clone());
1746 }
1747
1748 let mut current = event_value;
1749 for segment in path.segments.iter() {
1750 current = match current.get(segment) {
1751 Some(v) => v,
1752 None => return Ok(default.cloned().unwrap_or(Value::Null)),
1753 };
1754 }
1755
1756 Ok(current.clone())
1757 }
1758
1759 fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
1760 let mut current = value;
1761 for segment in path.split('.') {
1762 current = current.get(segment)?;
1763 }
1764 Some(current.clone())
1765 }
1766
1767 fn set_field_auto_vivify(
1768 &mut self,
1769 object_reg: Register,
1770 path: &str,
1771 value_reg: Register,
1772 ) -> Result<()> {
1773 let compiled = self.get_compiled_path(path);
1774 let segments = compiled.segments();
1775 let value = self.registers[value_reg].clone();
1776
1777 if !self.registers[object_reg].is_object() {
1778 self.registers[object_reg] = json!({});
1779 }
1780
1781 let obj = self.registers[object_reg]
1782 .as_object_mut()
1783 .ok_or("Not an object")?;
1784
1785 let mut current = obj;
1786 for (i, segment) in segments.iter().enumerate() {
1787 if i == segments.len() - 1 {
1788 current.insert(segment.to_string(), value);
1789 return Ok(());
1790 } else {
1791 current
1792 .entry(segment.to_string())
1793 .or_insert_with(|| json!({}));
1794 current = current
1795 .get_mut(segment)
1796 .and_then(|v| v.as_object_mut())
1797 .ok_or("Path collision: expected object")?;
1798 }
1799 }
1800
1801 Ok(())
1802 }
1803
1804 fn set_field_if_null(
1805 &mut self,
1806 object_reg: Register,
1807 path: &str,
1808 value_reg: Register,
1809 ) -> Result<bool> {
1810 let compiled = self.get_compiled_path(path);
1811 let segments = compiled.segments();
1812 let value = self.registers[value_reg].clone();
1813
1814 if !self.registers[object_reg].is_object() {
1815 self.registers[object_reg] = json!({});
1816 }
1817
1818 let obj = self.registers[object_reg]
1819 .as_object_mut()
1820 .ok_or("Not an object")?;
1821
1822 let mut current = obj;
1823 for (i, segment) in segments.iter().enumerate() {
1824 if i == segments.len() - 1 {
1825 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1826 current.insert(segment.to_string(), value);
1827 return Ok(true);
1828 }
1829 return Ok(false);
1830 } else {
1831 current
1832 .entry(segment.to_string())
1833 .or_insert_with(|| json!({}));
1834 current = current
1835 .get_mut(segment)
1836 .and_then(|v| v.as_object_mut())
1837 .ok_or("Path collision: expected object")?;
1838 }
1839 }
1840
1841 Ok(false)
1842 }
1843
1844 fn set_field_max(
1845 &mut self,
1846 object_reg: Register,
1847 path: &str,
1848 value_reg: Register,
1849 ) -> Result<bool> {
1850 let compiled = self.get_compiled_path(path);
1851 let segments = compiled.segments();
1852 let new_value = self.registers[value_reg].clone();
1853
1854 if !self.registers[object_reg].is_object() {
1855 self.registers[object_reg] = json!({});
1856 }
1857
1858 let obj = self.registers[object_reg]
1859 .as_object_mut()
1860 .ok_or("Not an object")?;
1861
1862 let mut current = obj;
1863 for (i, segment) in segments.iter().enumerate() {
1864 if i == segments.len() - 1 {
1865 let should_update = if let Some(current_value) = current.get(segment) {
1866 if current_value.is_null() {
1867 true
1868 } else {
1869 match (current_value.as_i64(), new_value.as_i64()) {
1870 (Some(current_val), Some(new_val)) => new_val > current_val,
1871 (Some(current_val), None) if new_value.as_u64().is_some() => {
1872 new_value.as_u64().unwrap() as i64 > current_val
1873 }
1874 (None, Some(new_val)) if current_value.as_u64().is_some() => {
1875 new_val > current_value.as_u64().unwrap() as i64
1876 }
1877 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1878 (Some(current_val), Some(new_val)) => new_val > current_val,
1879 _ => match (current_value.as_f64(), new_value.as_f64()) {
1880 (Some(current_val), Some(new_val)) => new_val > current_val,
1881 _ => false,
1882 },
1883 },
1884 _ => false,
1885 }
1886 }
1887 } else {
1888 true
1889 };
1890
1891 if should_update {
1892 current.insert(segment.to_string(), new_value);
1893 return Ok(true);
1894 }
1895 return Ok(false);
1896 } else {
1897 current
1898 .entry(segment.to_string())
1899 .or_insert_with(|| json!({}));
1900 current = current
1901 .get_mut(segment)
1902 .and_then(|v| v.as_object_mut())
1903 .ok_or("Path collision: expected object")?;
1904 }
1905 }
1906
1907 Ok(false)
1908 }
1909
1910 fn set_field_sum(
1911 &mut self,
1912 object_reg: Register,
1913 path: &str,
1914 value_reg: Register,
1915 ) -> Result<bool> {
1916 let compiled = self.get_compiled_path(path);
1917 let segments = compiled.segments();
1918 let new_value = &self.registers[value_reg];
1919
1920 let new_val_num = new_value
1922 .as_i64()
1923 .or_else(|| new_value.as_u64().map(|n| n as i64))
1924 .ok_or("Sum requires numeric value")?;
1925
1926 if !self.registers[object_reg].is_object() {
1927 self.registers[object_reg] = json!({});
1928 }
1929
1930 let obj = self.registers[object_reg]
1931 .as_object_mut()
1932 .ok_or("Not an object")?;
1933
1934 let mut current = obj;
1935 for (i, segment) in segments.iter().enumerate() {
1936 if i == segments.len() - 1 {
1937 let current_val = current
1938 .get(segment)
1939 .and_then(|v| {
1940 if v.is_null() {
1941 None
1942 } else {
1943 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1944 }
1945 })
1946 .unwrap_or(0);
1947
1948 let sum = current_val + new_val_num;
1949 current.insert(segment.to_string(), json!(sum));
1950 return Ok(true);
1951 } else {
1952 current
1953 .entry(segment.to_string())
1954 .or_insert_with(|| json!({}));
1955 current = current
1956 .get_mut(segment)
1957 .and_then(|v| v.as_object_mut())
1958 .ok_or("Path collision: expected object")?;
1959 }
1960 }
1961
1962 Ok(false)
1963 }
1964
1965 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
1966 let compiled = self.get_compiled_path(path);
1967 let segments = compiled.segments();
1968
1969 if !self.registers[object_reg].is_object() {
1970 self.registers[object_reg] = json!({});
1971 }
1972
1973 let obj = self.registers[object_reg]
1974 .as_object_mut()
1975 .ok_or("Not an object")?;
1976
1977 let mut current = obj;
1978 for (i, segment) in segments.iter().enumerate() {
1979 if i == segments.len() - 1 {
1980 let current_val = current
1982 .get(segment)
1983 .and_then(|v| {
1984 if v.is_null() {
1985 None
1986 } else {
1987 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1988 }
1989 })
1990 .unwrap_or(0);
1991
1992 let incremented = current_val + 1;
1993 current.insert(segment.to_string(), json!(incremented));
1994 return Ok(true);
1995 } else {
1996 current
1997 .entry(segment.to_string())
1998 .or_insert_with(|| json!({}));
1999 current = current
2000 .get_mut(segment)
2001 .and_then(|v| v.as_object_mut())
2002 .ok_or("Path collision: expected object")?;
2003 }
2004 }
2005
2006 Ok(false)
2007 }
2008
2009 fn set_field_min(
2010 &mut self,
2011 object_reg: Register,
2012 path: &str,
2013 value_reg: Register,
2014 ) -> Result<bool> {
2015 let compiled = self.get_compiled_path(path);
2016 let segments = compiled.segments();
2017 let new_value = self.registers[value_reg].clone();
2018
2019 if !self.registers[object_reg].is_object() {
2020 self.registers[object_reg] = json!({});
2021 }
2022
2023 let obj = self.registers[object_reg]
2024 .as_object_mut()
2025 .ok_or("Not an object")?;
2026
2027 let mut current = obj;
2028 for (i, segment) in segments.iter().enumerate() {
2029 if i == segments.len() - 1 {
2030 let should_update = if let Some(current_value) = current.get(segment) {
2031 if current_value.is_null() {
2032 true
2033 } else {
2034 match (current_value.as_i64(), new_value.as_i64()) {
2035 (Some(current_val), Some(new_val)) => new_val < current_val,
2036 (Some(current_val), None) if new_value.as_u64().is_some() => {
2037 (new_value.as_u64().unwrap() as i64) < current_val
2038 }
2039 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2040 new_val < current_value.as_u64().unwrap() as i64
2041 }
2042 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2043 (Some(current_val), Some(new_val)) => new_val < current_val,
2044 _ => match (current_value.as_f64(), new_value.as_f64()) {
2045 (Some(current_val), Some(new_val)) => new_val < current_val,
2046 _ => false,
2047 },
2048 },
2049 _ => false,
2050 }
2051 }
2052 } else {
2053 true
2054 };
2055
2056 if should_update {
2057 current.insert(segment.to_string(), new_value);
2058 return Ok(true);
2059 }
2060 return Ok(false);
2061 } else {
2062 current
2063 .entry(segment.to_string())
2064 .or_insert_with(|| json!({}));
2065 current = current
2066 .get_mut(segment)
2067 .and_then(|v| v.as_object_mut())
2068 .ok_or("Path collision: expected object")?;
2069 }
2070 }
2071
2072 Ok(false)
2073 }
2074
2075 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2076 let compiled = self.get_compiled_path(path);
2077 let segments = compiled.segments();
2078 let mut current = &self.registers[object_reg];
2079
2080 for segment in segments {
2081 current = current
2082 .get(segment)
2083 .ok_or_else(|| format!("Field not found: {}", segment))?;
2084 }
2085
2086 Ok(current.clone())
2087 }
2088
2089 fn append_to_array(
2090 &mut self,
2091 object_reg: Register,
2092 path: &str,
2093 value_reg: Register,
2094 max_length: usize,
2095 ) -> Result<()> {
2096 let compiled = self.get_compiled_path(path);
2097 let segments = compiled.segments();
2098 let value = self.registers[value_reg].clone();
2099
2100 if !self.registers[object_reg].is_object() {
2101 self.registers[object_reg] = json!({});
2102 }
2103
2104 let obj = self.registers[object_reg]
2105 .as_object_mut()
2106 .ok_or("Not an object")?;
2107
2108 let mut current = obj;
2109 for (i, segment) in segments.iter().enumerate() {
2110 if i == segments.len() - 1 {
2111 current
2112 .entry(segment.to_string())
2113 .or_insert_with(|| json!([]));
2114 let arr = current
2115 .get_mut(segment)
2116 .and_then(|v| v.as_array_mut())
2117 .ok_or("Path is not an array")?;
2118 arr.push(value.clone());
2119
2120 if arr.len() > max_length {
2121 let excess = arr.len() - max_length;
2122 arr.drain(0..excess);
2123 }
2124 } else {
2125 current
2126 .entry(segment.to_string())
2127 .or_insert_with(|| json!({}));
2128 current = current
2129 .get_mut(segment)
2130 .and_then(|v| v.as_object_mut())
2131 .ok_or("Path collision: expected object")?;
2132 }
2133 }
2134
2135 Ok(())
2136 }
2137
2138 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2139 let value = &self.registers[reg];
2140 let transformed = self.apply_transformation(value, transformation)?;
2141 self.registers[reg] = transformed;
2142 Ok(())
2143 }
2144
2145 fn apply_transformation(
2146 &self,
2147 value: &Value,
2148 transformation: &Transformation,
2149 ) -> Result<Value> {
2150 match transformation {
2151 Transformation::HexEncode => {
2152 if let Some(arr) = value.as_array() {
2153 let bytes: Vec<u8> = arr
2154 .iter()
2155 .filter_map(|v| v.as_u64().map(|n| n as u8))
2156 .collect();
2157 let hex = hex::encode(&bytes);
2158 Ok(json!(hex))
2159 } else {
2160 Err("HexEncode requires an array of numbers".into())
2161 }
2162 }
2163 Transformation::HexDecode => {
2164 if let Some(s) = value.as_str() {
2165 let s = s.strip_prefix("0x").unwrap_or(s);
2166 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2167 Ok(json!(bytes))
2168 } else {
2169 Err("HexDecode requires a string".into())
2170 }
2171 }
2172 Transformation::Base58Encode => {
2173 if let Some(arr) = value.as_array() {
2174 let bytes: Vec<u8> = arr
2175 .iter()
2176 .filter_map(|v| v.as_u64().map(|n| n as u8))
2177 .collect();
2178 let encoded = bs58::encode(&bytes).into_string();
2179 Ok(json!(encoded))
2180 } else if value.is_string() {
2181 Ok(value.clone())
2182 } else {
2183 Err("Base58Encode requires an array of numbers".into())
2184 }
2185 }
2186 Transformation::Base58Decode => {
2187 if let Some(s) = value.as_str() {
2188 let bytes = bs58::decode(s)
2189 .into_vec()
2190 .map_err(|e| format!("Base58 decode error: {}", e))?;
2191 Ok(json!(bytes))
2192 } else {
2193 Err("Base58Decode requires a string".into())
2194 }
2195 }
2196 Transformation::ToString => Ok(json!(value.to_string())),
2197 Transformation::ToNumber => {
2198 if let Some(s) = value.as_str() {
2199 let n = s
2200 .parse::<i64>()
2201 .map_err(|e| format!("Parse error: {}", e))?;
2202 Ok(json!(n))
2203 } else {
2204 Ok(value.clone())
2205 }
2206 }
2207 }
2208 }
2209
2210 fn evaluate_comparison(
2211 &self,
2212 field_value: &Value,
2213 op: &ComparisonOp,
2214 condition_value: &Value,
2215 ) -> Result<bool> {
2216 use ComparisonOp::*;
2217
2218 match op {
2219 Equal => Ok(field_value == condition_value),
2220 NotEqual => Ok(field_value != condition_value),
2221 GreaterThan => {
2222 match (field_value.as_i64(), condition_value.as_i64()) {
2224 (Some(a), Some(b)) => Ok(a > b),
2225 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2226 (Some(a), Some(b)) => Ok(a > b),
2227 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2228 (Some(a), Some(b)) => Ok(a > b),
2229 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2230 },
2231 },
2232 }
2233 }
2234 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2235 (Some(a), Some(b)) => Ok(a >= b),
2236 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2237 (Some(a), Some(b)) => Ok(a >= b),
2238 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2239 (Some(a), Some(b)) => Ok(a >= b),
2240 _ => {
2241 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2242 }
2243 },
2244 },
2245 },
2246 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2247 (Some(a), Some(b)) => Ok(a < b),
2248 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2249 (Some(a), Some(b)) => Ok(a < b),
2250 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2251 (Some(a), Some(b)) => Ok(a < b),
2252 _ => Err("Cannot compare non-numeric values with LessThan".into()),
2253 },
2254 },
2255 },
2256 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2257 (Some(a), Some(b)) => Ok(a <= b),
2258 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2259 (Some(a), Some(b)) => Ok(a <= b),
2260 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2261 (Some(a), Some(b)) => Ok(a <= b),
2262 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2263 },
2264 },
2265 },
2266 }
2267 }
2268
2269 #[cfg_attr(feature = "otel", instrument(
2278 name = "vm.update_pda_lookup",
2279 skip(self),
2280 fields(
2281 pda = %pda_address,
2282 seed = %seed_value,
2283 )
2284 ))]
2285 pub fn update_pda_reverse_lookup(
2286 &mut self,
2287 state_id: u32,
2288 lookup_name: &str,
2289 pda_address: String,
2290 seed_value: String,
2291 ) -> Result<Vec<PendingAccountUpdate>> {
2292 let state = self
2293 .states
2294 .get_mut(&state_id)
2295 .ok_or("State table not found")?;
2296
2297 let lookup = state
2298 .pda_reverse_lookups
2299 .entry(lookup_name.to_string())
2300 .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2301
2302 let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2303
2304 if let Some(ref evicted) = evicted_pda {
2305 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2306 let count = evicted_updates.len();
2307 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2308 }
2309 }
2310
2311 self.flush_pending_updates(state_id, &pda_address)
2313 }
2314
2315 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2320 let state = match self.states.get_mut(&state_id) {
2321 Some(s) => s,
2322 None => return 0,
2323 };
2324
2325 let now = std::time::SystemTime::now()
2326 .duration_since(std::time::UNIX_EPOCH)
2327 .unwrap()
2328 .as_secs() as i64;
2329
2330 let mut removed_count = 0;
2331
2332 state.pending_updates.retain(|_pda_address, updates| {
2334 let original_len = updates.len();
2335
2336 updates.retain(|update| {
2337 let age = now - update.queued_at;
2338 age <= PENDING_UPDATE_TTL_SECONDS
2339 });
2340
2341 removed_count += original_len - updates.len();
2342
2343 !updates.is_empty()
2345 });
2346
2347 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2349
2350 if removed_count > 0 {
2351 #[cfg(feature = "otel")]
2352 crate::vm_metrics::record_pending_updates_expired(
2353 removed_count as u64,
2354 &state.entity_name,
2355 );
2356 }
2357
2358 removed_count
2359 }
2360
2361 #[cfg_attr(feature = "otel", instrument(
2393 name = "vm.queue_account_update",
2394 skip(self, update),
2395 fields(
2396 pda = %update.pda_address,
2397 account_type = %update.account_type,
2398 slot = update.slot,
2399 )
2400 ))]
2401 pub fn queue_account_update(
2402 &mut self,
2403 state_id: u32,
2404 update: QueuedAccountUpdate,
2405 ) -> Result<()> {
2406 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2407 self.cleanup_expired_pending_updates(state_id);
2408 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2409 self.drop_oldest_pending_update(state_id)?;
2410 }
2411 }
2412
2413 let state = self
2414 .states
2415 .get_mut(&state_id)
2416 .ok_or("State table not found")?;
2417
2418 let pending = PendingAccountUpdate {
2419 account_type: update.account_type,
2420 pda_address: update.pda_address.clone(),
2421 account_data: update.account_data,
2422 slot: update.slot,
2423 write_version: update.write_version,
2424 signature: update.signature,
2425 queued_at: std::time::SystemTime::now()
2426 .duration_since(std::time::UNIX_EPOCH)
2427 .unwrap()
2428 .as_secs() as i64,
2429 };
2430
2431 let pda_address = pending.pda_address.clone();
2432 let slot = pending.slot;
2433
2434 let mut updates = state
2435 .pending_updates
2436 .entry(pda_address.clone())
2437 .or_insert_with(Vec::new);
2438
2439 let original_len = updates.len();
2440 updates.retain(|existing| existing.slot > slot);
2441 let removed_by_dedup = original_len - updates.len();
2442
2443 if removed_by_dedup > 0 {
2444 self.pending_queue_size = self
2445 .pending_queue_size
2446 .saturating_sub(removed_by_dedup as u64);
2447 }
2448
2449 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2450 updates.remove(0);
2451 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2452 }
2453
2454 updates.push(pending);
2455 #[cfg(feature = "otel")]
2456 crate::vm_metrics::record_pending_update_queued(&state.entity_name);
2457
2458 Ok(())
2459 }
2460
2461 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2463 let state = self.states.get(&state_id)?;
2464
2465 let now = std::time::SystemTime::now()
2466 .duration_since(std::time::UNIX_EPOCH)
2467 .unwrap()
2468 .as_secs() as i64;
2469
2470 let mut total_updates = 0;
2471 let mut oldest_timestamp = now;
2472 let mut largest_pda_queue = 0;
2473 let mut estimated_memory = 0;
2474
2475 for entry in state.pending_updates.iter() {
2476 let (_, updates) = entry.pair();
2477 total_updates += updates.len();
2478 largest_pda_queue = largest_pda_queue.max(updates.len());
2479
2480 for update in updates.iter() {
2481 oldest_timestamp = oldest_timestamp.min(update.queued_at);
2482 estimated_memory += update.account_type.len() +
2484 update.pda_address.len() +
2485 update.signature.len() +
2486 16 + estimate_json_size(&update.account_data);
2488 }
2489 }
2490
2491 Some(PendingQueueStats {
2492 total_updates,
2493 unique_pdas: state.pending_updates.len(),
2494 oldest_age_seconds: now - oldest_timestamp,
2495 largest_pda_queue_size: largest_pda_queue,
2496 estimated_memory_bytes: estimated_memory,
2497 })
2498 }
2499
2500 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2501 let mut stats = VmMemoryStats {
2502 path_cache_size: self.path_cache.len(),
2503 ..Default::default()
2504 };
2505
2506 if let Some(state) = self.states.get(&state_id) {
2507 stats.state_table_entity_count = state.data.len();
2508 stats.state_table_max_entries = state.config.max_entries;
2509 stats.state_table_at_capacity = state.is_at_capacity();
2510
2511 stats.lookup_index_count = state.lookup_indexes.len();
2512 stats.lookup_index_total_entries =
2513 state.lookup_indexes.values().map(|idx| idx.len()).sum();
2514
2515 stats.temporal_index_count = state.temporal_indexes.len();
2516 stats.temporal_index_total_entries = state
2517 .temporal_indexes
2518 .values()
2519 .map(|idx| idx.total_entries())
2520 .sum();
2521
2522 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2523 stats.pda_reverse_lookup_total_entries = state
2524 .pda_reverse_lookups
2525 .values()
2526 .map(|lookup| lookup.len())
2527 .sum();
2528
2529 stats.version_tracker_entries = state.version_tracker.len();
2530
2531 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2532 }
2533
2534 stats
2535 }
2536
2537 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2538 let pending_removed = self.cleanup_expired_pending_updates(state_id);
2539 let temporal_removed = self.cleanup_temporal_indexes(state_id);
2540
2541 #[cfg(feature = "otel")]
2542 if let Some(state) = self.states.get(&state_id) {
2543 crate::vm_metrics::record_cleanup(
2544 pending_removed,
2545 temporal_removed,
2546 &state.entity_name,
2547 );
2548 }
2549
2550 CleanupResult {
2551 pending_updates_removed: pending_removed,
2552 temporal_entries_removed: temporal_removed,
2553 }
2554 }
2555
2556 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2557 let state = match self.states.get_mut(&state_id) {
2558 Some(s) => s,
2559 None => return 0,
2560 };
2561
2562 let now = std::time::SystemTime::now()
2563 .duration_since(std::time::UNIX_EPOCH)
2564 .unwrap()
2565 .as_secs() as i64;
2566
2567 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2568 let mut total_removed = 0;
2569
2570 for (_, index) in state.temporal_indexes.iter_mut() {
2571 total_removed += index.cleanup_expired(cutoff);
2572 }
2573
2574 total_removed
2575 }
2576
2577 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2578 let state = self.states.get(&state_id)?;
2579
2580 if state.is_at_capacity() {
2581 Some(CapacityWarning {
2582 current_entries: state.data.len(),
2583 max_entries: state.config.max_entries,
2584 entries_over_limit: state.entries_over_limit(),
2585 })
2586 } else {
2587 None
2588 }
2589 }
2590
2591 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2593 let state = self
2594 .states
2595 .get_mut(&state_id)
2596 .ok_or("State table not found")?;
2597
2598 let mut oldest_pda: Option<String> = None;
2599 let mut oldest_timestamp = i64::MAX;
2600
2601 for entry in state.pending_updates.iter() {
2603 let (pda, updates) = entry.pair();
2604 if let Some(update) = updates.first() {
2605 if update.queued_at < oldest_timestamp {
2606 oldest_timestamp = update.queued_at;
2607 oldest_pda = Some(pda.clone());
2608 }
2609 }
2610 }
2611
2612 if let Some(pda) = oldest_pda {
2614 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2615 if !updates.is_empty() {
2616 updates.remove(0);
2617 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2618
2619 if updates.is_empty() {
2621 drop(updates);
2622 state.pending_updates.remove(&pda);
2623 }
2624 }
2625 }
2626 }
2627
2628 Ok(())
2629 }
2630
2631 fn flush_pending_updates(
2636 &mut self,
2637 state_id: u32,
2638 pda_address: &str,
2639 ) -> Result<Vec<PendingAccountUpdate>> {
2640 let state = self
2641 .states
2642 .get_mut(&state_id)
2643 .ok_or("State table not found")?;
2644
2645 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2646 let count = pending_updates.len();
2647 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2648 #[cfg(feature = "otel")]
2649 crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
2650 Ok(pending_updates)
2651 } else {
2652 Ok(Vec::new())
2653 }
2654 }
2655
2656 pub fn try_pda_reverse_lookup(
2658 &mut self,
2659 state_id: u32,
2660 lookup_name: &str,
2661 pda_address: &str,
2662 ) -> Option<String> {
2663 let state = self.states.get_mut(&state_id)?;
2664
2665 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2666 if let Some(value) = lookup.lookup(pda_address) {
2667 self.pda_cache_hits += 1;
2668 return Some(value);
2669 }
2670 }
2671
2672 self.pda_cache_misses += 1;
2673 None
2674 }
2675
2676 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2683 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2684 }
2685
2686 fn evaluate_computed_expr_with_env(
2688 &self,
2689 expr: &ComputedExpr,
2690 state: &Value,
2691 env: &std::collections::HashMap<String, Value>,
2692 ) -> Result<Value> {
2693 match expr {
2694 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2695
2696 ComputedExpr::Var { name } => env
2697 .get(name)
2698 .cloned()
2699 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2700
2701 ComputedExpr::Let { name, value, body } => {
2702 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2703 let mut new_env = env.clone();
2704 new_env.insert(name.clone(), val);
2705 self.evaluate_computed_expr_with_env(body, state, &new_env)
2706 }
2707
2708 ComputedExpr::If {
2709 condition,
2710 then_branch,
2711 else_branch,
2712 } => {
2713 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2714 if self.value_to_bool(&cond_val) {
2715 self.evaluate_computed_expr_with_env(then_branch, state, env)
2716 } else {
2717 self.evaluate_computed_expr_with_env(else_branch, state, env)
2718 }
2719 }
2720
2721 ComputedExpr::None => Ok(Value::Null),
2722
2723 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2724
2725 ComputedExpr::Slice { expr, start, end } => {
2726 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2727 match val {
2728 Value::Array(arr) => {
2729 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2730 Ok(Value::Array(slice))
2731 }
2732 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2733 }
2734 }
2735
2736 ComputedExpr::Index { expr, index } => {
2737 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2738 match val {
2739 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2740 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2741 }
2742 }
2743
2744 ComputedExpr::U64FromLeBytes { bytes } => {
2745 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2746 let byte_vec = self.value_to_bytes(&val)?;
2747 if byte_vec.len() < 8 {
2748 return Err(format!(
2749 "u64::from_le_bytes requires 8 bytes, got {}",
2750 byte_vec.len()
2751 )
2752 .into());
2753 }
2754 let arr: [u8; 8] = byte_vec[..8]
2755 .try_into()
2756 .map_err(|_| "Failed to convert to [u8; 8]")?;
2757 Ok(json!(u64::from_le_bytes(arr)))
2758 }
2759
2760 ComputedExpr::U64FromBeBytes { bytes } => {
2761 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2762 let byte_vec = self.value_to_bytes(&val)?;
2763 if byte_vec.len() < 8 {
2764 return Err(format!(
2765 "u64::from_be_bytes requires 8 bytes, got {}",
2766 byte_vec.len()
2767 )
2768 .into());
2769 }
2770 let arr: [u8; 8] = byte_vec[..8]
2771 .try_into()
2772 .map_err(|_| "Failed to convert to [u8; 8]")?;
2773 Ok(json!(u64::from_be_bytes(arr)))
2774 }
2775
2776 ComputedExpr::ByteArray { bytes } => {
2777 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2778 }
2779
2780 ComputedExpr::Closure { param, body } => {
2781 Ok(json!({
2784 "__closure": {
2785 "param": param,
2786 "body": serde_json::to_value(body).unwrap_or(Value::Null)
2787 }
2788 }))
2789 }
2790
2791 ComputedExpr::Unary { op, expr } => {
2792 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2793 self.apply_unary_op(op, &val)
2794 }
2795
2796 ComputedExpr::JsonToBytes { expr } => {
2797 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2798 let bytes = self.value_to_bytes(&val)?;
2800 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2801 }
2802
2803 ComputedExpr::UnwrapOr { expr, default } => {
2804 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2805 if val.is_null() {
2806 Ok(default.clone())
2807 } else {
2808 Ok(val)
2809 }
2810 }
2811
2812 ComputedExpr::Binary { op, left, right } => {
2813 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
2814 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
2815 self.apply_binary_op(op, &l, &r)
2816 }
2817
2818 ComputedExpr::Cast { expr, to_type } => {
2819 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2820 self.apply_cast(&val, to_type)
2821 }
2822
2823 ComputedExpr::MethodCall { expr, method, args } => {
2824 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2825 if method == "map" && args.len() == 1 {
2827 if let ComputedExpr::Closure { param, body } = &args[0] {
2828 if val.is_null() {
2830 return Ok(Value::Null);
2831 }
2832 let mut closure_env = env.clone();
2834 closure_env.insert(param.clone(), val);
2835 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
2836 }
2837 }
2838 let evaluated_args: Vec<Value> = args
2839 .iter()
2840 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
2841 .collect::<Result<Vec<_>>>()?;
2842 self.apply_method_call(&val, method, &evaluated_args)
2843 }
2844
2845 ComputedExpr::Literal { value } => Ok(value.clone()),
2846
2847 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
2848 }
2849 }
2850
2851 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
2853 match val {
2854 Value::Array(arr) => arr
2855 .iter()
2856 .map(|v| {
2857 v.as_u64()
2858 .map(|n| n as u8)
2859 .ok_or_else(|| "Array element not a valid byte".into())
2860 })
2861 .collect(),
2862 Value::String(s) => {
2863 if s.starts_with("0x") || s.starts_with("0X") {
2865 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
2866 } else {
2867 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
2868 }
2869 }
2870 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
2871 }
2872 }
2873
2874 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
2876 use crate::ast::UnaryOp;
2877 match op {
2878 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
2879 UnaryOp::ReverseBits => match val.as_u64() {
2880 Some(n) => Ok(json!(n.reverse_bits())),
2881 None => match val.as_i64() {
2882 Some(n) => Ok(json!((n as u64).reverse_bits())),
2883 None => Err("reverse_bits requires an integer".into()),
2884 },
2885 },
2886 }
2887 }
2888
2889 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
2891 let segments: Vec<&str> = path.split('.').collect();
2892 let mut current = state;
2893
2894 for segment in segments {
2895 match current.get(segment) {
2896 Some(v) => current = v,
2897 None => return Ok(Value::Null),
2898 }
2899 }
2900
2901 Ok(current.clone())
2902 }
2903
2904 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
2906 match op {
2907 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
2909 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
2910 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
2911 BinaryOp::Div => {
2912 if let Some(r) = right.as_i64() {
2914 if r == 0 {
2915 return Err("Division by zero".into());
2916 }
2917 }
2918 if let Some(r) = right.as_f64() {
2919 if r == 0.0 {
2920 return Err("Division by zero".into());
2921 }
2922 }
2923 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
2924 }
2925 BinaryOp::Mod => {
2926 match (left.as_i64(), right.as_i64()) {
2928 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2929 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
2930 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2931 _ => Err("Modulo requires non-zero integer operands".into()),
2932 },
2933 _ => Err("Modulo by zero".into()),
2934 }
2935 }
2936
2937 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
2939 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
2940 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
2941 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
2942 BinaryOp::Eq => Ok(json!(left == right)),
2943 BinaryOp::Ne => Ok(json!(left != right)),
2944
2945 BinaryOp::And => {
2947 let l_bool = self.value_to_bool(left);
2948 let r_bool = self.value_to_bool(right);
2949 Ok(json!(l_bool && r_bool))
2950 }
2951 BinaryOp::Or => {
2952 let l_bool = self.value_to_bool(left);
2953 let r_bool = self.value_to_bool(right);
2954 Ok(json!(l_bool || r_bool))
2955 }
2956
2957 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
2959 (Some(a), Some(b)) => Ok(json!(a ^ b)),
2960 _ => match (left.as_i64(), right.as_i64()) {
2961 (Some(a), Some(b)) => Ok(json!(a ^ b)),
2962 _ => Err("XOR requires integer operands".into()),
2963 },
2964 },
2965 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
2966 (Some(a), Some(b)) => Ok(json!(a & b)),
2967 _ => match (left.as_i64(), right.as_i64()) {
2968 (Some(a), Some(b)) => Ok(json!(a & b)),
2969 _ => Err("BitAnd requires integer operands".into()),
2970 },
2971 },
2972 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
2973 (Some(a), Some(b)) => Ok(json!(a | b)),
2974 _ => match (left.as_i64(), right.as_i64()) {
2975 (Some(a), Some(b)) => Ok(json!(a | b)),
2976 _ => Err("BitOr requires integer operands".into()),
2977 },
2978 },
2979 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
2980 (Some(a), Some(b)) => Ok(json!(a << b)),
2981 _ => match (left.as_i64(), right.as_i64()) {
2982 (Some(a), Some(b)) => Ok(json!(a << b)),
2983 _ => Err("Shl requires integer operands".into()),
2984 },
2985 },
2986 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
2987 (Some(a), Some(b)) => Ok(json!(a >> b)),
2988 _ => match (left.as_i64(), right.as_i64()) {
2989 (Some(a), Some(b)) => Ok(json!(a >> b)),
2990 _ => Err("Shr requires integer operands".into()),
2991 },
2992 },
2993 }
2994 }
2995
2996 fn numeric_op<F1, F2>(
2998 &self,
2999 left: &Value,
3000 right: &Value,
3001 int_op: F1,
3002 float_op: F2,
3003 ) -> Result<Value>
3004 where
3005 F1: Fn(i64, i64) -> i64,
3006 F2: Fn(f64, f64) -> f64,
3007 {
3008 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3010 return Ok(json!(int_op(a, b)));
3011 }
3012
3013 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3015 return Ok(json!(int_op(a as i64, b as i64)));
3017 }
3018
3019 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3021 return Ok(json!(float_op(a, b)));
3022 }
3023
3024 if left.is_null() || right.is_null() {
3026 return Ok(Value::Null);
3027 }
3028
3029 Err(format!(
3030 "Cannot perform numeric operation on {:?} and {:?}",
3031 left, right
3032 )
3033 .into())
3034 }
3035
3036 fn comparison_op<F1, F2>(
3038 &self,
3039 left: &Value,
3040 right: &Value,
3041 int_cmp: F1,
3042 float_cmp: F2,
3043 ) -> Result<Value>
3044 where
3045 F1: Fn(i64, i64) -> bool,
3046 F2: Fn(f64, f64) -> bool,
3047 {
3048 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3050 return Ok(json!(int_cmp(a, b)));
3051 }
3052
3053 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3055 return Ok(json!(int_cmp(a as i64, b as i64)));
3056 }
3057
3058 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3060 return Ok(json!(float_cmp(a, b)));
3061 }
3062
3063 if left.is_null() || right.is_null() {
3065 return Ok(json!(false));
3066 }
3067
3068 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3069 }
3070
3071 fn value_to_bool(&self, value: &Value) -> bool {
3073 match value {
3074 Value::Null => false,
3075 Value::Bool(b) => *b,
3076 Value::Number(n) => {
3077 if let Some(i) = n.as_i64() {
3078 i != 0
3079 } else if let Some(f) = n.as_f64() {
3080 f != 0.0
3081 } else {
3082 true
3083 }
3084 }
3085 Value::String(s) => !s.is_empty(),
3086 Value::Array(arr) => !arr.is_empty(),
3087 Value::Object(obj) => !obj.is_empty(),
3088 }
3089 }
3090
3091 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3093 match to_type {
3094 "i8" | "i16" | "i32" | "i64" | "isize" => {
3095 if let Some(n) = value.as_i64() {
3096 Ok(json!(n))
3097 } else if let Some(n) = value.as_u64() {
3098 Ok(json!(n as i64))
3099 } else if let Some(n) = value.as_f64() {
3100 Ok(json!(n as i64))
3101 } else if let Some(s) = value.as_str() {
3102 s.parse::<i64>()
3103 .map(|n| json!(n))
3104 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3105 } else {
3106 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3107 }
3108 }
3109 "u8" | "u16" | "u32" | "u64" | "usize" => {
3110 if let Some(n) = value.as_u64() {
3111 Ok(json!(n))
3112 } else if let Some(n) = value.as_i64() {
3113 Ok(json!(n as u64))
3114 } else if let Some(n) = value.as_f64() {
3115 Ok(json!(n as u64))
3116 } else if let Some(s) = value.as_str() {
3117 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3118 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3119 })
3120 } else {
3121 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3122 }
3123 }
3124 "f32" | "f64" => {
3125 if let Some(n) = value.as_f64() {
3126 Ok(json!(n))
3127 } else if let Some(n) = value.as_i64() {
3128 Ok(json!(n as f64))
3129 } else if let Some(n) = value.as_u64() {
3130 Ok(json!(n as f64))
3131 } else if let Some(s) = value.as_str() {
3132 s.parse::<f64>()
3133 .map(|n| json!(n))
3134 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3135 } else {
3136 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3137 }
3138 }
3139 "String" | "string" => Ok(json!(value.to_string())),
3140 "bool" => Ok(json!(self.value_to_bool(value))),
3141 _ => {
3142 Ok(value.clone())
3144 }
3145 }
3146 }
3147
3148 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3150 match method {
3151 "unwrap_or" => {
3152 if value.is_null() && !args.is_empty() {
3153 Ok(args[0].clone())
3154 } else {
3155 Ok(value.clone())
3156 }
3157 }
3158 "unwrap_or_default" => {
3159 if value.is_null() {
3160 Ok(json!(0))
3162 } else {
3163 Ok(value.clone())
3164 }
3165 }
3166 "is_some" => Ok(json!(!value.is_null())),
3167 "is_none" => Ok(json!(value.is_null())),
3168 "abs" => {
3169 if let Some(n) = value.as_i64() {
3170 Ok(json!(n.abs()))
3171 } else if let Some(n) = value.as_f64() {
3172 Ok(json!(n.abs()))
3173 } else {
3174 Err(format!("Cannot call abs() on {:?}", value).into())
3175 }
3176 }
3177 "len" => {
3178 if let Some(s) = value.as_str() {
3179 Ok(json!(s.len()))
3180 } else if let Some(arr) = value.as_array() {
3181 Ok(json!(arr.len()))
3182 } else if let Some(obj) = value.as_object() {
3183 Ok(json!(obj.len()))
3184 } else {
3185 Err(format!("Cannot call len() on {:?}", value).into())
3186 }
3187 }
3188 "to_string" => Ok(json!(value.to_string())),
3189 "min" => {
3190 if args.is_empty() {
3191 return Err("min() requires an argument".into());
3192 }
3193 let other = &args[0];
3194 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3195 Ok(json!(a.min(b)))
3196 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3197 Ok(json!(a.min(b)))
3198 } else {
3199 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3200 }
3201 }
3202 "max" => {
3203 if args.is_empty() {
3204 return Err("max() requires an argument".into());
3205 }
3206 let other = &args[0];
3207 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3208 Ok(json!(a.max(b)))
3209 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3210 Ok(json!(a.max(b)))
3211 } else {
3212 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3213 }
3214 }
3215 "saturating_add" => {
3216 if args.is_empty() {
3217 return Err("saturating_add() requires an argument".into());
3218 }
3219 let other = &args[0];
3220 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3221 Ok(json!(a.saturating_add(b)))
3222 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3223 Ok(json!(a.saturating_add(b)))
3224 } else {
3225 Err(format!(
3226 "Cannot call saturating_add() on {:?} and {:?}",
3227 value, other
3228 )
3229 .into())
3230 }
3231 }
3232 "saturating_sub" => {
3233 if args.is_empty() {
3234 return Err("saturating_sub() requires an argument".into());
3235 }
3236 let other = &args[0];
3237 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3238 Ok(json!(a.saturating_sub(b)))
3239 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3240 Ok(json!(a.saturating_sub(b)))
3241 } else {
3242 Err(format!(
3243 "Cannot call saturating_sub() on {:?} and {:?}",
3244 value, other
3245 )
3246 .into())
3247 }
3248 }
3249 _ => Err(format!("Unknown method call: {}()", method).into()),
3250 }
3251 }
3252
3253 pub fn evaluate_computed_fields_from_ast(
3256 &self,
3257 state: &mut Value,
3258 computed_field_specs: &[ComputedFieldSpec],
3259 ) -> Result<Vec<String>> {
3260 let mut updated_paths = Vec::new();
3261
3262 for spec in computed_field_specs {
3263 if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3264 self.set_field_in_state(state, &spec.target_path, result)?;
3265 updated_paths.push(spec.target_path.clone());
3266 }
3267 }
3268
3269 Ok(updated_paths)
3270 }
3271
3272 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3274 let segments: Vec<&str> = path.split('.').collect();
3275
3276 if segments.is_empty() {
3277 return Err("Empty path".into());
3278 }
3279
3280 let mut current = state;
3282 for (i, segment) in segments.iter().enumerate() {
3283 if i == segments.len() - 1 {
3284 if let Some(obj) = current.as_object_mut() {
3286 obj.insert(segment.to_string(), value);
3287 return Ok(());
3288 } else {
3289 return Err(format!("Cannot set field '{}' on non-object", segment).into());
3290 }
3291 } else {
3292 if !current.is_object() {
3294 *current = json!({});
3295 }
3296 let obj = current.as_object_mut().unwrap();
3297 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3298 }
3299 }
3300
3301 Ok(())
3302 }
3303
3304 pub fn create_evaluator_from_specs(
3307 specs: Vec<ComputedFieldSpec>,
3308 ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3309 move |state: &mut Value| {
3310 let vm = VmContext::new();
3313 vm.evaluate_computed_fields_from_ast(state, &specs)?;
3314 Ok(())
3315 }
3316 }
3317}
3318
3319impl Default for VmContext {
3320 fn default() -> Self {
3321 Self::new()
3322 }
3323}
3324
3325impl crate::resolvers::ReverseLookupUpdater for VmContext {
3327 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3328 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3330 .unwrap_or_else(|e| {
3331 tracing::error!("Failed to update PDA reverse lookup: {}", e);
3332 Vec::new()
3333 })
3334 }
3335
3336 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3337 self.flush_pending_updates(0, pda_address)
3339 .unwrap_or_else(|e| {
3340 tracing::error!("Failed to flush pending updates: {}", e);
3341 Vec::new()
3342 })
3343 }
3344}
3345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};

    /// Builds the expression `<path>.unwrap_or(0)` for a state field.
    fn field_or_zero(path: &str) -> Box<ComputedExpr> {
        Box::new(ComputedExpr::UnwrapOr {
            expr: Box::new(ComputedExpr::FieldRef {
                path: path.to_string(),
            }),
            default: serde_json::json!(0),
        })
    }

    /// Adding two integer fields through a computed expression must yield
    /// an integer, not a float.
    #[test]
    fn test_computed_field_preserves_integer_type() {
        let vm = VmContext::new();

        let mut state = serde_json::json!({
            "trading": {
                "total_buy_volume": 20000000000_i64,
                "total_sell_volume": 17951316474_i64
            }
        });

        let sum_of_volumes = ComputedExpr::Binary {
            op: BinaryOp::Add,
            left: field_or_zero("trading.total_buy_volume"),
            right: field_or_zero("trading.total_sell_volume"),
        };
        let spec = ComputedFieldSpec {
            target_path: "trading.total_volume".to_string(),
            result_type: "Option<u64>".to_string(),
            expression: sum_of_volumes,
        };

        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
            .unwrap();

        let total_volume = state
            .get("trading")
            .and_then(|trading| trading.get("total_volume"))
            .expect("total_volume should exist");

        let serialized = serde_json::to_string(total_volume).unwrap();
        assert!(
            !serialized.contains('.'),
            "Integer should not have decimal point: {}",
            serialized
        );
        assert_eq!(
            total_volume.as_i64(),
            Some(37951316474),
            "Value should be correct sum"
        );
    }

    /// Values written through `set_field_sum` must serialize as integers.
    #[test]
    fn test_set_field_sum_preserves_integer_type() {
        let mut vm = VmContext::new();
        vm.registers[0] = serde_json::json!({});
        vm.registers[1] = serde_json::json!(20000000000_i64);
        vm.registers[2] = serde_json::json!(17951316474_i64);

        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();

        // Serializes one field of the `trading` object in register 0.
        let serialize_trading_field = |field: &str| {
            let stored = vm.registers[0]
                .get("trading")
                .and_then(|trading| trading.get(field))
                .unwrap();
            serde_json::to_string(stored).unwrap()
        };

        let buy_serialized = serialize_trading_field("total_buy_volume");
        let sell_serialized = serialize_trading_field("total_sell_volume");

        assert!(
            !buy_serialized.contains('.'),
            "Buy volume should not have decimal: {}",
            buy_serialized
        );
        assert!(
            !sell_serialized.contains('.'),
            "Sell volume should not have decimal: {}",
            sell_serialized
        );
    }
}