1use crate::ast::{
2 BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
/// Context describing the update that is currently being processed by the VM.
///
/// Every field is optional; [`UpdateContext::empty`] produces a context with nothing set.
#[derive(Debug, Clone, Default)]
pub struct UpdateContext {
    /// Slot attached to the update, when known.
    pub slot: Option<u64>,
    /// Signature attached to the update, when known.
    pub signature: Option<String>,
    /// Explicit timestamp. When `None`, [`UpdateContext::timestamp`] falls back to the
    /// current system time expressed in seconds since the Unix epoch.
    pub timestamp: Option<i64>,
    /// Write version; populated by [`UpdateContext::new_account`].
    pub write_version: Option<u64>,
    /// Transaction index; populated by [`UpdateContext::new_instruction`].
    pub txn_index: Option<u64>,
    /// Free-form key/value pairs; `to_value` copies them into the serialized object.
    pub metadata: HashMap<String, Value>,
}
35
36impl UpdateContext {
37 pub fn new(slot: u64, signature: String) -> Self {
39 Self {
40 slot: Some(slot),
41 signature: Some(signature),
42 timestamp: None,
43 write_version: None,
44 txn_index: None,
45 metadata: HashMap::new(),
46 }
47 }
48
49 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51 Self {
52 slot: Some(slot),
53 signature: Some(signature),
54 timestamp: Some(timestamp),
55 write_version: None,
56 txn_index: None,
57 metadata: HashMap::new(),
58 }
59 }
60
61 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63 Self {
64 slot: Some(slot),
65 signature: Some(signature),
66 timestamp: None,
67 write_version: Some(write_version),
68 txn_index: None,
69 metadata: HashMap::new(),
70 }
71 }
72
73 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75 Self {
76 slot: Some(slot),
77 signature: Some(signature),
78 timestamp: None,
79 write_version: None,
80 txn_index: Some(txn_index),
81 metadata: HashMap::new(),
82 }
83 }
84
85 pub fn timestamp(&self) -> i64 {
87 self.timestamp.unwrap_or_else(|| {
88 std::time::SystemTime::now()
89 .duration_since(std::time::UNIX_EPOCH)
90 .unwrap()
91 .as_secs() as i64
92 })
93 }
94
95 pub fn empty() -> Self {
97 Self::default()
98 }
99
100 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
102 self.metadata.insert(key, value);
103 self
104 }
105
106 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
108 self.metadata.get(key)
109 }
110
111 pub fn to_value(&self) -> Value {
113 let mut obj = serde_json::Map::new();
114 if let Some(slot) = self.slot {
115 obj.insert("slot".to_string(), json!(slot));
116 }
117 if let Some(ref sig) = self.signature {
118 obj.insert("signature".to_string(), json!(sig));
119 }
120 obj.insert("timestamp".to_string(), json!(self.timestamp()));
122 for (key, value) in &self.metadata {
123 obj.insert(key.clone(), value.clone());
124 }
125 Value::Object(obj)
126 }
127}
128
/// Index of a VM register (used to index the VM's register vector).
pub type Register = usize;
/// Result type used throughout this module; errors are boxed trait objects.
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Value held in a VM register: an arbitrary JSON value.
pub type RegisterValue = Value;
133
/// Hook that mutates an entity state value in place.
///
/// NOTE(review): the name suggests implementations recompute derived ("computed")
/// fields; the implementations are outside this view — confirm.
pub trait ComputedFieldsEvaluator {
    /// Mutates `state` in place, returning an error on failure.
    fn evaluate(&self, state: &mut Value) -> Result<()>;
}
139
// NOTE(review): the three pending-update limits below are not referenced in the part of
// the file visible here; presumably they bound the pending-update queues — confirm.
const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
const PENDING_UPDATE_TTL_SECONDS: i64 = 300;
/// Window, in timestamp units, that `TemporalIndex::insert` keeps behind the timestamp
/// being inserted; older entries for the same key are dropped.
const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300;
/// Maximum number of entries `TemporalIndex::insert` keeps per key (oldest dropped first).
const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;

/// Default `StateTableConfig::max_entries`.
const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
/// Default `StateTableConfig::max_array_length`.
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;

/// Capacity used by `LookupIndex::new`.
const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;

/// Capacity used by `VersionTracker::new`.
const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;

/// Capacity (number of keys) used by `TemporalIndex::new`.
const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;

// NOTE(review): not referenced in the visible part of the file; presumably the default
// capacity passed to `PdaReverseLookup::new` — confirm.
const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
160
161fn estimate_json_size(value: &Value) -> usize {
163 match value {
164 Value::Null => 4,
165 Value::Bool(_) => 5,
166 Value::Number(_) => 8,
167 Value::String(s) => s.len() + 2,
168 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
169 Value::Object(obj) => {
170 2 + obj
171 .iter()
172 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
173 .sum::<usize>()
174 }
175 }
176}
177
/// A dotted field path pre-split into its segments.
///
/// The segments live behind an `Arc`, so cloning a `CompiledPath` does not copy them.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    /// Path segments, in order, as produced by splitting on `'.'`.
    pub segments: std::sync::Arc<[String]>,
}

impl CompiledPath {
    /// Splits `path` on `'.'` and stores the resulting segments.
    ///
    /// An empty `path` yields a single empty segment.
    pub fn new(path: &str) -> Self {
        Self {
            segments: path.split('.').map(str::to_owned).collect(),
        }
    }

    /// Borrows the segments as a slice.
    fn segments(&self) -> &[String] {
        self.segments.as_ref()
    }
}
195
/// Kind of change recorded for a field path by [`DirtyTracker`].
#[derive(Debug, Clone)]
pub enum FieldChange {
    /// The value at the path was overwritten.
    Replaced,
    /// Values were appended to the array at the path; holds the appended values in the
    /// order they were recorded.
    Appended(Vec<Value>),
}
205
/// Records which field paths changed, and how, keyed by dotted path.
#[derive(Debug, Clone, Default)]
pub struct DirtyTracker {
    /// Recorded change per field path.
    changes: HashMap<String, FieldChange>,
}
212
213impl DirtyTracker {
214 pub fn new() -> Self {
216 Self {
217 changes: HashMap::new(),
218 }
219 }
220
221 pub fn mark_replaced(&mut self, path: &str) {
223 self.changes.insert(path.to_string(), FieldChange::Replaced);
225 }
226
227 pub fn mark_appended(&mut self, path: &str, value: Value) {
229 match self.changes.get_mut(path) {
230 Some(FieldChange::Appended(values)) => {
231 values.push(value);
233 }
234 Some(FieldChange::Replaced) => {
235 }
238 None => {
239 self.changes
241 .insert(path.to_string(), FieldChange::Appended(vec![value]));
242 }
243 }
244 }
245
246 pub fn is_empty(&self) -> bool {
248 self.changes.is_empty()
249 }
250
251 pub fn len(&self) -> usize {
253 self.changes.len()
254 }
255
256 pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
258 self.changes.iter()
259 }
260
261 pub fn dirty_paths(&self) -> HashSet<String> {
263 self.changes.keys().cloned().collect()
264 }
265
266 pub fn into_changes(self) -> HashMap<String, FieldChange> {
268 self.changes
269 }
270
271 pub fn changes(&self) -> &HashMap<String, FieldChange> {
273 &self.changes
274 }
275
276 pub fn appended_paths(&self) -> Vec<String> {
278 self.changes
279 .iter()
280 .filter_map(|(path, change)| match change {
281 FieldChange::Appended(_) => Some(path.clone()),
282 FieldChange::Replaced => None,
283 })
284 .collect()
285 }
286}
287
/// Execution context of the VM: registers, per-entity state tables and counters.
pub struct VmContext {
    /// Register file; the constructors allocate 256 registers initialised to `Null`.
    registers: Vec<RegisterValue>,
    /// State tables keyed by state id; the constructors pre-create table `0`.
    states: HashMap<u32, StateTable>,
    /// Counter of executed instructions (read by `process_event` for logging).
    pub instructions_executed: u64,
    /// Number of path-cache hits (incremented by `get_compiled_path`).
    pub cache_hits: u64,
    /// Cache of compiled paths keyed by the dotted path string.
    path_cache: HashMap<String, CompiledPath>,
    /// PDA cache hit counter (updated outside the visible part of the file).
    pub pda_cache_hits: u64,
    /// PDA cache miss counter (updated outside the visible part of the file).
    pub pda_cache_misses: u64,
    /// NOTE(review): presumably the current size of the pending queues — confirm.
    pub pending_queue_size: u64,
    /// Context of the event currently being processed; set by `process_event`.
    current_context: Option<UpdateContext>,
    /// Warnings accumulated while processing; drained by `take_warnings`.
    warnings: Vec<String>,
    /// Cleared at the start of `execute_handler`.
    /// NOTE(review): presumably the PDA address of the last failed reverse lookup — confirm.
    last_pda_lookup_miss: Option<String>,
    /// NOTE(review): presumably the PDA address most recently registered — confirm.
    last_pda_registered: Option<String>,
}
302
/// Bounded LRU map from a lookup value (converted to a string key) to a primary key.
#[derive(Debug)]
pub struct LookupIndex {
    /// LRU cache guarded by a mutex so it can be used through `&self`.
    index: std::sync::Mutex<LruCache<String, Value>>,
}
307
308impl LookupIndex {
309 pub fn new() -> Self {
310 Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
311 }
312
313 pub fn with_capacity(capacity: usize) -> Self {
314 LookupIndex {
315 index: std::sync::Mutex::new(LruCache::new(
316 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
317 )),
318 }
319 }
320
321 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
322 let key = value_to_cache_key(lookup_value);
323 self.index.lock().unwrap().get(&key).cloned()
324 }
325
326 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
327 let key = value_to_cache_key(&lookup_value);
328 self.index.lock().unwrap().put(key, primary_key);
329 }
330
331 pub fn len(&self) -> usize {
332 self.index.lock().unwrap().len()
333 }
334
335 pub fn is_empty(&self) -> bool {
336 self.index.lock().unwrap().is_empty()
337 }
338}
339
340impl Default for LookupIndex {
341 fn default() -> Self {
342 Self::new()
343 }
344}
345
346fn value_to_cache_key(value: &Value) -> String {
347 match value {
348 Value::String(s) => s.clone(),
349 Value::Number(n) => n.to_string(),
350 Value::Bool(b) => b.to_string(),
351 Value::Null => "null".to_string(),
352 _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
353 }
354}
355
/// Bounded LRU map from a lookup value to a timestamped history of primary keys.
#[derive(Debug)]
pub struct TemporalIndex {
    /// Per key, `(primary_key, timestamp)` pairs; `insert` keeps each list sorted by
    /// timestamp in ascending order.
    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
}
360
361impl Default for TemporalIndex {
362 fn default() -> Self {
363 Self::new()
364 }
365}
366
367impl TemporalIndex {
368 pub fn new() -> Self {
369 Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
370 }
371
372 pub fn with_capacity(capacity: usize) -> Self {
373 TemporalIndex {
374 index: std::sync::Mutex::new(LruCache::new(
375 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
376 )),
377 }
378 }
379
380 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
381 let key = value_to_cache_key(lookup_value);
382 let mut cache = self.index.lock().unwrap();
383 if let Some(entries) = cache.get(&key) {
384 for i in (0..entries.len()).rev() {
385 if entries[i].1 <= timestamp {
386 return Some(entries[i].0.clone());
387 }
388 }
389 }
390 None
391 }
392
393 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
394 let key = value_to_cache_key(lookup_value);
395 let mut cache = self.index.lock().unwrap();
396 if let Some(entries) = cache.get(&key) {
397 if let Some(last) = entries.last() {
398 return Some(last.0.clone());
399 }
400 }
401 None
402 }
403
404 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
405 let key = value_to_cache_key(&lookup_value);
406 let mut cache = self.index.lock().unwrap();
407
408 let entries = cache.get_or_insert_mut(key, Vec::new);
409 entries.push((primary_key, timestamp));
410 entries.sort_by_key(|(_, ts)| *ts);
411
412 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
413 entries.retain(|(_, ts)| *ts >= cutoff);
414
415 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
416 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
417 entries.drain(0..excess);
418 }
419 }
420
421 pub fn len(&self) -> usize {
422 self.index.lock().unwrap().len()
423 }
424
425 pub fn is_empty(&self) -> bool {
426 self.index.lock().unwrap().is_empty()
427 }
428
429 pub fn total_entries(&self) -> usize {
430 self.index
431 .lock()
432 .unwrap()
433 .iter()
434 .map(|(_, entries)| entries.len())
435 .sum()
436 }
437
438 pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
439 let mut cache = self.index.lock().unwrap();
440 let mut total_removed = 0;
441
442 for (_, entries) in cache.iter_mut() {
443 let original_len = entries.len();
444 entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
445 total_removed += original_len - entries.len();
446 }
447
448 total_removed
449 }
450}
451
/// Bounded LRU map from a PDA address to the seed value it was registered with.
#[derive(Debug)]
pub struct PdaReverseLookup {
    /// PDA address -> seed value.
    index: LruCache<String, String>,
}
457
458impl PdaReverseLookup {
459 pub fn new(capacity: usize) -> Self {
460 PdaReverseLookup {
461 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
462 }
463 }
464
465 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
466 self.index.get(pda_address).cloned()
467 }
468
469 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
470 let evicted = if self.index.len() >= self.index.cap().get() {
471 self.index.peek_lru().map(|(k, _)| k.clone())
472 } else {
473 None
474 };
475
476 self.index.put(pda_address, seed_value);
477 evicted
478 }
479
480 pub fn len(&self) -> usize {
481 self.index.len()
482 }
483
484 pub fn is_empty(&self) -> bool {
485 self.index.is_empty()
486 }
487}
488
/// An account update submitted for queuing under a PDA address.
///
/// NOTE(review): the code that consumes this type is outside the visible part of the
/// file; presumably it is converted into a [`PendingAccountUpdate`] — confirm.
#[derive(Debug, Clone)]
pub struct QueuedAccountUpdate {
    /// PDA address the update belongs to.
    pub pda_address: String,
    /// Account type name.
    pub account_type: String,
    /// Account payload as JSON.
    pub account_data: Value,
    /// Slot of the update.
    pub slot: u64,
    /// Write version of the update.
    pub write_version: u64,
    /// Signature associated with the update.
    pub signature: String,
}
499
/// An account update held in `StateTable::pending_updates`.
#[derive(Debug, Clone)]
pub struct PendingAccountUpdate {
    /// Account type name.
    pub account_type: String,
    /// PDA address the update belongs to.
    pub pda_address: String,
    /// Account payload as JSON.
    pub account_data: Value,
    /// Slot of the update.
    pub slot: u64,
    /// Write version of the update.
    pub write_version: u64,
    /// Signature associated with the update.
    pub signature: String,
    /// Time the update was queued.
    /// NOTE(review): presumably seconds since the Unix epoch — confirm.
    pub queued_at: i64,
}
511
/// An instruction event submitted for queuing under a PDA address.
///
/// `process_event` builds one of these when a handler produced no mutations because a
/// PDA lookup missed, and passes it to `queue_instruction_event`.
#[derive(Debug, Clone)]
pub struct QueuedInstructionEvent {
    /// PDA address whose lookup missed.
    pub pda_address: String,
    /// Event type name.
    pub event_type: String,
    /// Event payload as JSON.
    pub event_data: Value,
    /// Slot of the event (`0` when the context had none).
    pub slot: u64,
    /// Signature of the event (empty when the context had none).
    pub signature: String,
}
521
/// An instruction event held in `StateTable::pending_instruction_events`.
#[derive(Debug, Clone)]
pub struct PendingInstructionEvent {
    /// Event type name; used to find the handler when the event is replayed.
    pub event_type: String,
    /// PDA address the event is waiting on.
    pub pda_address: String,
    /// Event payload as JSON.
    pub event_data: Value,
    /// Slot of the event.
    pub slot: u64,
    /// Signature of the event.
    pub signature: String,
    /// Time the event was queued.
    /// NOTE(review): presumably seconds since the Unix epoch — confirm.
    pub queued_at: i64,
}
532
/// Summary statistics about the pending-update queues.
///
/// NOTE(review): the code that fills this in is outside the visible part of the file;
/// field meanings below are taken from the field names — confirm.
#[derive(Debug, Clone)]
pub struct PendingQueueStats {
    /// Total number of queued updates.
    pub total_updates: usize,
    /// Number of distinct PDA addresses with queued updates.
    pub unique_pdas: usize,
    /// Age of the oldest queued update, in seconds.
    pub oldest_age_seconds: i64,
    /// Size of the largest per-PDA queue.
    pub largest_pda_queue_size: usize,
    /// Estimated memory used by the queued updates, in bytes.
    pub estimated_memory_bytes: usize,
}
541
/// Snapshot of the sizes of the VM's in-memory structures.
///
/// NOTE(review): the code that fills this in is outside the visible part of the file;
/// field meanings below are taken from the field names — confirm.
#[derive(Debug, Clone, Default)]
pub struct VmMemoryStats {
    /// Number of entities in the state table.
    pub state_table_entity_count: usize,
    /// Configured maximum number of state-table entries.
    pub state_table_max_entries: usize,
    /// Whether the state table has reached its configured maximum.
    pub state_table_at_capacity: bool,
    /// Number of lookup indexes.
    pub lookup_index_count: usize,
    /// Total entries across all lookup indexes.
    pub lookup_index_total_entries: usize,
    /// Number of temporal indexes.
    pub temporal_index_count: usize,
    /// Total entries across all temporal indexes.
    pub temporal_index_total_entries: usize,
    /// Number of PDA reverse lookups.
    pub pda_reverse_lookup_count: usize,
    /// Total entries across all PDA reverse lookups.
    pub pda_reverse_lookup_total_entries: usize,
    /// Number of entries in the version tracker.
    pub version_tracker_entries: usize,
    /// Pending-queue statistics, when available.
    pub pending_queue_stats: Option<PendingQueueStats>,
    /// Number of compiled paths held in the path cache.
    pub path_cache_size: usize,
}
557
/// Counts of items removed by a cleanup pass.
///
/// NOTE(review): the cleanup routine itself is outside the visible part of the file.
#[derive(Debug, Clone, Default)]
pub struct CleanupResult {
    /// Number of pending updates removed.
    pub pending_updates_removed: usize,
    /// Number of temporal-index entries removed.
    pub temporal_entries_removed: usize,
}
563
/// Describes how far a table is over its configured entry limit.
///
/// NOTE(review): the code that produces this is outside the visible part of the file.
#[derive(Debug, Clone)]
pub struct CapacityWarning {
    /// Number of entries currently stored.
    pub current_entries: usize,
    /// Configured maximum number of entries.
    pub max_entries: usize,
    /// Number of entries above the maximum.
    pub entries_over_limit: usize,
}
570
/// Size limits applied to a [`StateTable`].
#[derive(Debug, Clone)]
pub struct StateTableConfig {
    /// Maximum number of entries; `StateTable::insert_with_eviction` evicts
    /// least-recently-touched entries once this is reached.
    pub max_entries: usize,
    /// Maximum array length, exposed through `StateTable::max_array_length` and passed
    /// to the VM's array-append operation.
    pub max_array_length: usize,
}

impl Default for StateTableConfig {
    /// Uses `DEFAULT_MAX_STATE_TABLE_ENTRIES` and `DEFAULT_MAX_ARRAY_LENGTH`.
    fn default() -> Self {
        Self {
            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
        }
    }
}
585
/// Bounded LRU map remembering the last `(slot, ordering value)` seen per
/// `(primary key, event type)` pair.
#[derive(Debug)]
pub struct VersionTracker {
    /// Keyed by `"<primary key>:<event type>"`; the value is `(slot, ordering value)`.
    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
}
590
591impl VersionTracker {
592 pub fn new() -> Self {
593 Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
594 }
595
596 pub fn with_capacity(capacity: usize) -> Self {
597 VersionTracker {
598 cache: std::sync::Mutex::new(LruCache::new(
599 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
600 )),
601 }
602 }
603
604 fn make_key(primary_key: &Value, event_type: &str) -> String {
605 format!("{}:{}", primary_key, event_type)
606 }
607
608 pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
609 let key = Self::make_key(primary_key, event_type);
610 self.cache.lock().unwrap().get(&key).copied()
611 }
612
613 pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
614 let key = Self::make_key(primary_key, event_type);
615 self.cache.lock().unwrap().put(key, (slot, ordering_value));
616 }
617
618 pub fn len(&self) -> usize {
619 self.cache.lock().unwrap().len()
620 }
621
622 pub fn is_empty(&self) -> bool {
623 self.cache.lock().unwrap().is_empty()
624 }
625}
626
627impl Default for VersionTracker {
628 fn default() -> Self {
629 Self::new()
630 }
631}
632
/// Per-entity state storage plus its auxiliary indexes and pending queues.
#[derive(Debug)]
pub struct StateTable {
    /// Entity state keyed by primary key.
    pub data: DashMap<Value, Value>,
    /// Last touch time (seconds since the Unix epoch) per key; drives `evict_lru`.
    access_times: DashMap<Value, i64>,
    /// Lookup indexes by name.
    pub lookup_indexes: HashMap<String, LookupIndex>,
    /// Temporal indexes by name.
    pub temporal_indexes: HashMap<String, TemporalIndex>,
    /// PDA reverse lookups by name.
    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
    /// Queued account updates.
    /// NOTE(review): presumably keyed by PDA address — confirm.
    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
    /// Queued instruction events.
    /// NOTE(review): presumably keyed by PDA address — confirm.
    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
    /// Last seen version per `(primary key, event type)`; used by `is_fresh_update`.
    version_tracker: VersionTracker,
    /// Size limits for this table.
    config: StateTableConfig,
    /// Entity name; only read when the `otel` feature is enabled (metrics labels).
    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
    entity_name: String,
}
647
648impl StateTable {
649 pub fn is_at_capacity(&self) -> bool {
650 self.data.len() >= self.config.max_entries
651 }
652
653 pub fn entries_over_limit(&self) -> usize {
654 self.data.len().saturating_sub(self.config.max_entries)
655 }
656
657 pub fn max_array_length(&self) -> usize {
658 self.config.max_array_length
659 }
660
661 fn touch(&self, key: &Value) {
662 let now = std::time::SystemTime::now()
663 .duration_since(std::time::UNIX_EPOCH)
664 .unwrap()
665 .as_secs() as i64;
666 self.access_times.insert(key.clone(), now);
667 }
668
669 fn evict_lru(&self, count: usize) -> usize {
670 if count == 0 || self.data.is_empty() {
671 return 0;
672 }
673
674 let mut entries: Vec<(Value, i64)> = self
675 .access_times
676 .iter()
677 .map(|entry| (entry.key().clone(), *entry.value()))
678 .collect();
679
680 entries.sort_by_key(|(_, ts)| *ts);
681
682 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
683
684 let mut evicted = 0;
685 for key in to_evict {
686 self.data.remove(&key);
687 self.access_times.remove(&key);
688 evicted += 1;
689 }
690
691 #[cfg(feature = "otel")]
692 if evicted > 0 {
693 crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
694 }
695
696 evicted
697 }
698
699 pub fn insert_with_eviction(&self, key: Value, value: Value) {
700 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
701 #[cfg(feature = "otel")]
702 crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
703 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
704 self.evict_lru(to_evict.max(1));
705 }
706 self.data.insert(key.clone(), value);
707 self.touch(&key);
708 }
709
710 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
711 let result = self.data.get(key).map(|v| v.clone());
712 if result.is_some() {
713 self.touch(key);
714 }
715 result
716 }
717
718 pub fn is_fresh_update(
725 &self,
726 primary_key: &Value,
727 event_type: &str,
728 slot: u64,
729 ordering_value: u64,
730 ) -> bool {
731 let dominated = self
732 .version_tracker
733 .get(primary_key, event_type)
734 .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
735 .unwrap_or(false);
736
737 if dominated {
738 return false;
739 }
740
741 self.version_tracker
742 .insert(primary_key, event_type, slot, ordering_value);
743 true
744 }
745}
746
747impl VmContext {
748 pub fn new() -> Self {
749 let mut vm = VmContext {
750 registers: vec![Value::Null; 256],
751 states: HashMap::new(),
752 instructions_executed: 0,
753 cache_hits: 0,
754 path_cache: HashMap::new(),
755 pda_cache_hits: 0,
756 pda_cache_misses: 0,
757 pending_queue_size: 0,
758 current_context: None,
759 warnings: Vec::new(),
760 last_pda_lookup_miss: None,
761 last_pda_registered: None,
762 };
763 vm.states.insert(
764 0,
765 StateTable {
766 data: DashMap::new(),
767 access_times: DashMap::new(),
768 lookup_indexes: HashMap::new(),
769 temporal_indexes: HashMap::new(),
770 pda_reverse_lookups: HashMap::new(),
771 pending_updates: DashMap::new(),
772 pending_instruction_events: DashMap::new(),
773 version_tracker: VersionTracker::new(),
774 config: StateTableConfig::default(),
775 entity_name: "default".to_string(),
776 },
777 );
778 vm
779 }
780
781 pub fn new_with_config(state_config: StateTableConfig) -> Self {
782 let mut vm = VmContext {
783 registers: vec![Value::Null; 256],
784 states: HashMap::new(),
785 instructions_executed: 0,
786 cache_hits: 0,
787 path_cache: HashMap::new(),
788 pda_cache_hits: 0,
789 pda_cache_misses: 0,
790 pending_queue_size: 0,
791 current_context: None,
792 warnings: Vec::new(),
793 last_pda_lookup_miss: None,
794 last_pda_registered: None,
795 };
796 vm.states.insert(
797 0,
798 StateTable {
799 data: DashMap::new(),
800 access_times: DashMap::new(),
801 lookup_indexes: HashMap::new(),
802 temporal_indexes: HashMap::new(),
803 pda_reverse_lookups: HashMap::new(),
804 pending_updates: DashMap::new(),
805 pending_instruction_events: DashMap::new(),
806 version_tracker: VersionTracker::new(),
807 config: state_config,
808 entity_name: "default".to_string(),
809 },
810 );
811 vm
812 }
813
/// Returns a mutable reference to the state table with id `state_id`, if it exists.
pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
    self.states.get_mut(&state_id)
}
819
/// Returns mutable access to the register file.
pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
    &mut self.registers
}
824
/// Returns the cache of compiled paths, keyed by dotted path string.
pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
    &self.path_cache
}
829
/// Returns the update context of the event being processed, if one was supplied.
pub fn current_context(&self) -> Option<&UpdateContext> {
    self.current_context.as_ref()
}
834
/// Appends a warning message to the list drained by `take_warnings`.
fn add_warning(&mut self, msg: String) {
    self.warnings.push(msg);
}
838
/// Returns all accumulated warnings, leaving the internal list empty.
pub fn take_warnings(&mut self) -> Vec<String> {
    std::mem::take(&mut self.warnings)
}
842
/// Returns `true` when at least one warning has been accumulated.
pub fn has_warnings(&self) -> bool {
    !self.warnings.is_empty()
}
846
847 pub fn update_state_from_register(
848 &mut self,
849 state_id: u32,
850 key: Value,
851 register: Register,
852 ) -> Result<()> {
853 let state = self.states.get(&state_id).ok_or("State table not found")?;
854 let value = self.registers[register].clone();
855 state.insert_with_eviction(key, value);
856 Ok(())
857 }
858
859 fn reset_registers(&mut self) {
860 for reg in &mut self.registers {
861 *reg = Value::Null;
862 }
863 }
864
865 pub fn extract_partial_state(
867 &self,
868 state_reg: Register,
869 dirty_fields: &HashSet<String>,
870 ) -> Result<Value> {
871 let full_state = &self.registers[state_reg];
872
873 if dirty_fields.is_empty() {
874 return Ok(json!({}));
875 }
876
877 let mut partial = serde_json::Map::new();
878
879 for path in dirty_fields {
880 let segments: Vec<&str> = path.split('.').collect();
881
882 let mut current = full_state;
883 let mut found = true;
884
885 for segment in &segments {
886 match current.get(segment) {
887 Some(v) => current = v,
888 None => {
889 found = false;
890 break;
891 }
892 }
893 }
894
895 if !found {
896 continue;
897 }
898
899 let mut target = &mut partial;
900 for (i, segment) in segments.iter().enumerate() {
901 if i == segments.len() - 1 {
902 target.insert(segment.to_string(), current.clone());
903 } else {
904 target
905 .entry(segment.to_string())
906 .or_insert_with(|| json!({}));
907 target = target
908 .get_mut(*segment)
909 .and_then(|v| v.as_object_mut())
910 .ok_or("Failed to build nested structure")?;
911 }
912 }
913 }
914
915 Ok(Value::Object(partial))
916 }
917
918 pub fn extract_partial_state_with_tracker(
922 &self,
923 state_reg: Register,
924 tracker: &DirtyTracker,
925 ) -> Result<Value> {
926 let full_state = &self.registers[state_reg];
927
928 if tracker.is_empty() {
929 return Ok(json!({}));
930 }
931
932 let mut partial = serde_json::Map::new();
933
934 for (path, change) in tracker.iter() {
935 let segments: Vec<&str> = path.split('.').collect();
936
937 let value_to_insert = match change {
938 FieldChange::Replaced => {
939 let mut current = full_state;
940 let mut found = true;
941
942 for segment in &segments {
943 match current.get(*segment) {
944 Some(v) => current = v,
945 None => {
946 found = false;
947 break;
948 }
949 }
950 }
951
952 if !found {
953 continue;
954 }
955 current.clone()
956 }
957 FieldChange::Appended(values) => Value::Array(values.clone()),
958 };
959
960 let mut target = &mut partial;
961 for (i, segment) in segments.iter().enumerate() {
962 if i == segments.len() - 1 {
963 target.insert(segment.to_string(), value_to_insert.clone());
964 } else {
965 target
966 .entry(segment.to_string())
967 .or_insert_with(|| json!({}));
968 target = target
969 .get_mut(*segment)
970 .and_then(|v| v.as_object_mut())
971 .ok_or("Failed to build nested structure")?;
972 }
973 }
974 }
975
976 Ok(Value::Object(partial))
977 }
978
979 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
980 if let Some(compiled) = self.path_cache.get(path) {
981 self.cache_hits += 1;
982 #[cfg(feature = "otel")]
983 crate::vm_metrics::record_path_cache_hit();
984 return compiled.clone();
985 }
986 #[cfg(feature = "otel")]
987 crate::vm_metrics::record_path_cache_miss();
988 let compiled = CompiledPath::new(path);
989 self.path_cache.insert(path.to_string(), compiled.clone());
990 compiled
991 }
992
993 #[cfg_attr(feature = "otel", instrument(
995 name = "vm.process_event",
996 skip(self, bytecode, event_value, log),
997 level = "info",
998 fields(
999 event_type = %event_type,
1000 slot = context.as_ref().and_then(|c| c.slot),
1001 )
1002 ))]
1003 pub fn process_event(
1004 &mut self,
1005 bytecode: &MultiEntityBytecode,
1006 event_value: Value,
1007 event_type: &str,
1008 context: Option<&UpdateContext>,
1009 mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1010 ) -> Result<Vec<Mutation>> {
1011 self.current_context = context.cloned();
1012
1013 let mut event_value = event_value;
1014 if let Some(ctx) = context {
1015 if let Some(obj) = event_value.as_object_mut() {
1016 obj.insert("__update_context".to_string(), ctx.to_value());
1017 }
1018 }
1019
1020 let mut all_mutations = Vec::new();
1021
1022 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1023 for entity_name in entity_names {
1024 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1025 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1026 if let Some(ref mut log) = log {
1027 log.set("entity", entity_name.clone());
1028 log.inc("handlers", 1);
1029 }
1030
1031 let opcodes_before = self.instructions_executed;
1032 let cache_before = self.cache_hits;
1033 let pda_hits_before = self.pda_cache_hits;
1034 let pda_misses_before = self.pda_cache_misses;
1035
1036 let mutations = self.execute_handler(
1037 handler,
1038 &event_value,
1039 event_type,
1040 entity_bytecode.state_id,
1041 entity_name,
1042 entity_bytecode.computed_fields_evaluator.as_ref(),
1043 )?;
1044
1045 if let Some(ref mut log) = log {
1046 log.inc(
1047 "opcodes",
1048 (self.instructions_executed - opcodes_before) as i64,
1049 );
1050 log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1051 log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1052 log.inc(
1053 "pda_misses",
1054 (self.pda_cache_misses - pda_misses_before) as i64,
1055 );
1056 }
1057
1058 if mutations.is_empty() {
1059 if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1060 if event_type.ends_with("IxState") {
1061 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1062 let signature = context
1063 .and_then(|c| c.signature.clone())
1064 .unwrap_or_default();
1065 let _ = self.queue_instruction_event(
1066 entity_bytecode.state_id,
1067 QueuedInstructionEvent {
1068 pda_address: missed_pda,
1069 event_type: event_type.to_string(),
1070 event_data: event_value.clone(),
1071 slot,
1072 signature,
1073 },
1074 );
1075 }
1076 }
1077 }
1078
1079 all_mutations.extend(mutations);
1080
1081 if let Some(registered_pda) = self.take_last_pda_registered() {
1082 let pending_events = self.flush_pending_instruction_events(
1083 entity_bytecode.state_id,
1084 ®istered_pda,
1085 );
1086 for pending in pending_events {
1087 if let Some(pending_handler) =
1088 entity_bytecode.handlers.get(&pending.event_type)
1089 {
1090 if let Ok(reprocessed_mutations) = self.execute_handler(
1091 pending_handler,
1092 &pending.event_data,
1093 &pending.event_type,
1094 entity_bytecode.state_id,
1095 entity_name,
1096 entity_bytecode.computed_fields_evaluator.as_ref(),
1097 ) {
1098 all_mutations.extend(reprocessed_mutations);
1099 }
1100 }
1101 }
1102 }
1103 } else if let Some(ref mut log) = log {
1104 log.set("skip_reason", "no_handler");
1105 }
1106 } else if let Some(ref mut log) = log {
1107 log.set("skip_reason", "entity_not_found");
1108 }
1109 }
1110 } else if let Some(ref mut log) = log {
1111 log.set("skip_reason", "no_event_routing");
1112 }
1113
1114 if let Some(log) = log {
1115 log.set("mutations", all_mutations.len() as i64);
1116 if let Some(first) = all_mutations.first() {
1117 if let Some(key_str) = first.key.as_str() {
1118 log.set("primary_key", key_str);
1119 } else if let Some(key_num) = first.key.as_u64() {
1120 log.set("primary_key", key_num as i64);
1121 }
1122 }
1123 if let Some(state) = self.states.get(&0) {
1124 log.set("state_table_size", state.data.len() as i64);
1125 }
1126
1127 let warnings = self.take_warnings();
1128 if !warnings.is_empty() {
1129 log.set("warnings", warnings.len() as i64);
1130 log.set(
1131 "warning_messages",
1132 Value::Array(warnings.into_iter().map(Value::String).collect()),
1133 );
1134 log.set_level(crate::canonical_log::LogLevel::Warn);
1135 }
1136 } else {
1137 self.warnings.clear();
1138 }
1139
1140 Ok(all_mutations)
1141 }
1142
1143 pub fn process_any(
1144 &mut self,
1145 bytecode: &MultiEntityBytecode,
1146 any: prost_types::Any,
1147 ) -> Result<Vec<Mutation>> {
1148 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1149 self.process_event(bytecode, event_value, &event_type, None, None)
1150 }
1151
1152 #[cfg_attr(feature = "otel", instrument(
1153 name = "vm.execute_handler",
1154 skip(self, handler, event_value, entity_evaluator),
1155 level = "debug",
1156 fields(
1157 event_type = %event_type,
1158 handler_opcodes = handler.len(),
1159 )
1160 ))]
1161 #[allow(clippy::type_complexity)]
1162 fn execute_handler(
1163 &mut self,
1164 handler: &[OpCode],
1165 event_value: &Value,
1166 event_type: &str,
1167 override_state_id: u32,
1168 entity_name: &str,
1169 entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1170 ) -> Result<Vec<Mutation>> {
1171 self.reset_registers();
1172 self.last_pda_lookup_miss = None;
1173
1174 let mut pc: usize = 0;
1175 let mut output = Vec::new();
1176 let mut dirty_tracker = DirtyTracker::new();
1177
1178 while pc < handler.len() {
1179 match &handler[pc] {
1180 OpCode::LoadEventField {
1181 path,
1182 dest,
1183 default,
1184 } => {
1185 let value = self.load_field(event_value, path, default.as_ref())?;
1186 self.registers[*dest] = value;
1187 pc += 1;
1188 }
1189 OpCode::LoadConstant { value, dest } => {
1190 self.registers[*dest] = value.clone();
1191 pc += 1;
1192 }
1193 OpCode::CopyRegister { source, dest } => {
1194 self.registers[*dest] = self.registers[*source].clone();
1195 pc += 1;
1196 }
1197 OpCode::CopyRegisterIfNull { source, dest } => {
1198 if self.registers[*dest].is_null() {
1199 self.registers[*dest] = self.registers[*source].clone();
1200 }
1201 pc += 1;
1202 }
1203 OpCode::GetEventType { dest } => {
1204 self.registers[*dest] = json!(event_type);
1205 pc += 1;
1206 }
1207 OpCode::CreateObject { dest } => {
1208 self.registers[*dest] = json!({});
1209 pc += 1;
1210 }
1211 OpCode::SetField {
1212 object,
1213 path,
1214 value,
1215 } => {
1216 self.set_field_auto_vivify(*object, path, *value)?;
1217 dirty_tracker.mark_replaced(path);
1218 pc += 1;
1219 }
1220 OpCode::SetFields { object, fields } => {
1221 for (path, value_reg) in fields {
1222 self.set_field_auto_vivify(*object, path, *value_reg)?;
1223 dirty_tracker.mark_replaced(path);
1224 }
1225 pc += 1;
1226 }
1227 OpCode::GetField { object, path, dest } => {
1228 let value = self.get_field(*object, path)?;
1229 self.registers[*dest] = value;
1230 pc += 1;
1231 }
1232 OpCode::ReadOrInitState {
1233 state_id: _,
1234 key,
1235 default,
1236 dest,
1237 } => {
1238 let actual_state_id = override_state_id;
1239 let entity_name_owned = entity_name.to_string();
1240 self.states
1241 .entry(actual_state_id)
1242 .or_insert_with(|| StateTable {
1243 data: DashMap::new(),
1244 access_times: DashMap::new(),
1245 lookup_indexes: HashMap::new(),
1246 temporal_indexes: HashMap::new(),
1247 pda_reverse_lookups: HashMap::new(),
1248 pending_updates: DashMap::new(),
1249 pending_instruction_events: DashMap::new(),
1250 version_tracker: VersionTracker::new(),
1251 config: StateTableConfig::default(),
1252 entity_name: entity_name_owned,
1253 });
1254 let key_value = self.registers[*key].clone();
1255 let warn_null_key = key_value.is_null()
1256 && event_type.ends_with("State")
1257 && !event_type.ends_with("IxState");
1258
1259 if warn_null_key {
1260 self.add_warning(format!(
1261 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1262 key, event_type
1263 ));
1264 }
1265
1266 let state = self
1267 .states
1268 .get(&actual_state_id)
1269 .ok_or("State table not found")?;
1270
1271 if !key_value.is_null() {
1272 if let Some(ctx) = &self.current_context {
1273 let ordering_value = ctx.write_version.or(ctx.txn_index);
1274 if let (Some(slot), Some(version)) = (ctx.slot, ordering_value) {
1275 if !state.is_fresh_update(&key_value, event_type, slot, version) {
1276 self.add_warning(format!(
1277 "Stale update skipped: slot={}, version={}",
1278 slot, version
1279 ));
1280 return Ok(Vec::new());
1281 }
1282 }
1283 }
1284 }
1285 let value = state
1286 .get_and_touch(&key_value)
1287 .unwrap_or_else(|| default.clone());
1288
1289 self.registers[*dest] = value;
1290 pc += 1;
1291 }
1292 OpCode::UpdateState {
1293 state_id: _,
1294 key,
1295 value,
1296 } => {
1297 let actual_state_id = override_state_id;
1298 let state = self
1299 .states
1300 .get(&actual_state_id)
1301 .ok_or("State table not found")?;
1302 let key_value = self.registers[*key].clone();
1303 let value_data = self.registers[*value].clone();
1304
1305 state.insert_with_eviction(key_value, value_data);
1306 pc += 1;
1307 }
1308 OpCode::AppendToArray {
1309 object,
1310 path,
1311 value,
1312 } => {
1313 let appended_value = self.registers[*value].clone();
1314 let max_len = self
1315 .states
1316 .get(&override_state_id)
1317 .map(|s| s.max_array_length())
1318 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1319 self.append_to_array(*object, path, *value, max_len)?;
1320 dirty_tracker.mark_appended(path, appended_value);
1321 pc += 1;
1322 }
1323 OpCode::GetCurrentTimestamp { dest } => {
1324 let timestamp = std::time::SystemTime::now()
1325 .duration_since(std::time::UNIX_EPOCH)
1326 .unwrap()
1327 .as_secs() as i64;
1328 self.registers[*dest] = json!(timestamp);
1329 pc += 1;
1330 }
1331 OpCode::CreateEvent { dest, event_value } => {
1332 let timestamp = std::time::SystemTime::now()
1333 .duration_since(std::time::UNIX_EPOCH)
1334 .unwrap()
1335 .as_secs() as i64;
1336
1337 let mut event_data = self.registers[*event_value].clone();
1339 if let Some(obj) = event_data.as_object_mut() {
1340 obj.remove("__update_context");
1341 }
1342
1343 let mut event = serde_json::Map::new();
1345 event.insert("timestamp".to_string(), json!(timestamp));
1346 event.insert("data".to_string(), event_data);
1347
1348 if let Some(ref ctx) = self.current_context {
1350 if let Some(slot) = ctx.slot {
1351 event.insert("slot".to_string(), json!(slot));
1352 }
1353 if let Some(ref signature) = ctx.signature {
1354 event.insert("signature".to_string(), json!(signature));
1355 }
1356 }
1357
1358 self.registers[*dest] = Value::Object(event);
1359 pc += 1;
1360 }
1361 OpCode::CreateCapture {
1362 dest,
1363 capture_value,
1364 } => {
1365 let timestamp = std::time::SystemTime::now()
1366 .duration_since(std::time::UNIX_EPOCH)
1367 .unwrap()
1368 .as_secs() as i64;
1369
1370 let capture_data = self.registers[*capture_value].clone();
1372
1373 let account_address = event_value
1375 .get("__account_address")
1376 .and_then(|v| v.as_str())
1377 .unwrap_or("")
1378 .to_string();
1379
1380 let mut capture = serde_json::Map::new();
1382 capture.insert("timestamp".to_string(), json!(timestamp));
1383 capture.insert("account_address".to_string(), json!(account_address));
1384 capture.insert("data".to_string(), capture_data);
1385
1386 if let Some(ref ctx) = self.current_context {
1388 if let Some(slot) = ctx.slot {
1389 capture.insert("slot".to_string(), json!(slot));
1390 }
1391 if let Some(ref signature) = ctx.signature {
1392 capture.insert("signature".to_string(), json!(signature));
1393 }
1394 }
1395
1396 self.registers[*dest] = Value::Object(capture);
1397 pc += 1;
1398 }
1399 OpCode::Transform {
1400 source,
1401 dest,
1402 transformation,
1403 } => {
1404 if source == dest {
1405 self.transform_in_place(*source, transformation)?;
1406 } else {
1407 let source_value = &self.registers[*source];
1408 let value = self.apply_transformation(source_value, transformation)?;
1409 self.registers[*dest] = value;
1410 }
1411 pc += 1;
1412 }
1413 OpCode::EmitMutation {
1414 entity_name,
1415 key,
1416 state,
1417 } => {
1418 let primary_key = self.registers[*key].clone();
1419
1420 if primary_key.is_null() || dirty_tracker.is_empty() {
1421 let reason = if dirty_tracker.is_empty() {
1422 "no_fields_modified"
1423 } else {
1424 "null_primary_key"
1425 };
1426 self.add_warning(format!(
1427 "Skipping mutation for entity '{}': {} (dirty_fields={})",
1428 entity_name,
1429 reason,
1430 dirty_tracker.len()
1431 ));
1432 } else {
1433 let patch =
1434 self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1435 let append = dirty_tracker.appended_paths();
1436 let mutation = Mutation {
1437 export: entity_name.clone(),
1438 key: primary_key,
1439 patch,
1440 append,
1441 };
1442 output.push(mutation);
1443 }
1444 pc += 1;
1445 }
1446 OpCode::SetFieldIfNull {
1447 object,
1448 path,
1449 value,
1450 } => {
1451 let was_set = self.set_field_if_null(*object, path, *value)?;
1452 if was_set {
1453 dirty_tracker.mark_replaced(path);
1454 }
1455 pc += 1;
1456 }
1457 OpCode::SetFieldMax {
1458 object,
1459 path,
1460 value,
1461 } => {
1462 let was_updated = self.set_field_max(*object, path, *value)?;
1463 if was_updated {
1464 dirty_tracker.mark_replaced(path);
1465 }
1466 pc += 1;
1467 }
1468 OpCode::UpdateTemporalIndex {
1469 state_id: _,
1470 index_name,
1471 lookup_value,
1472 primary_key,
1473 timestamp,
1474 } => {
1475 let actual_state_id = override_state_id;
1476 let state = self
1477 .states
1478 .get_mut(&actual_state_id)
1479 .ok_or("State table not found")?;
1480 let index = state
1481 .temporal_indexes
1482 .entry(index_name.clone())
1483 .or_insert_with(TemporalIndex::new);
1484
1485 let lookup_val = self.registers[*lookup_value].clone();
1486 let pk_val = self.registers[*primary_key].clone();
1487 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1488 val
1489 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1490 val as i64
1491 } else {
1492 return Err(format!(
1493 "Timestamp must be a number (i64 or u64), got: {:?}",
1494 self.registers[*timestamp]
1495 )
1496 .into());
1497 };
1498
1499 index.insert(lookup_val, pk_val, ts_val);
1500 pc += 1;
1501 }
1502 OpCode::LookupTemporalIndex {
1503 state_id: _,
1504 index_name,
1505 lookup_value,
1506 timestamp,
1507 dest,
1508 } => {
1509 let actual_state_id = override_state_id;
1510 let state = self
1511 .states
1512 .get(&actual_state_id)
1513 .ok_or("State table not found")?;
1514 let lookup_val = &self.registers[*lookup_value];
1515
1516 let result = if self.registers[*timestamp].is_null() {
1517 if let Some(index) = state.temporal_indexes.get(index_name) {
1518 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1519 } else {
1520 Value::Null
1521 }
1522 } else {
1523 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1524 val
1525 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1526 val as i64
1527 } else {
1528 return Err(format!(
1529 "Timestamp must be a number (i64 or u64), got: {:?}",
1530 self.registers[*timestamp]
1531 )
1532 .into());
1533 };
1534
1535 if let Some(index) = state.temporal_indexes.get(index_name) {
1536 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1537 } else {
1538 Value::Null
1539 }
1540 };
1541
1542 self.registers[*dest] = result;
1543 pc += 1;
1544 }
1545 OpCode::UpdateLookupIndex {
1546 state_id: _,
1547 index_name,
1548 lookup_value,
1549 primary_key,
1550 } => {
1551 let actual_state_id = override_state_id;
1552 let state = self
1553 .states
1554 .get_mut(&actual_state_id)
1555 .ok_or("State table not found")?;
1556 let index = state
1557 .lookup_indexes
1558 .entry(index_name.clone())
1559 .or_insert_with(LookupIndex::new);
1560
1561 let lookup_val = self.registers[*lookup_value].clone();
1562 let pk_val = self.registers[*primary_key].clone();
1563
1564 index.insert(lookup_val, pk_val);
1565 pc += 1;
1566 }
1567 OpCode::LookupIndex {
1568 state_id: _,
1569 index_name,
1570 lookup_value,
1571 dest,
1572 } => {
1573 let actual_state_id = override_state_id;
1574 let lookup_val = self.registers[*lookup_value].clone();
1575
1576 let result = {
1577 let state = self
1578 .states
1579 .get(&actual_state_id)
1580 .ok_or("State table not found")?;
1581
1582 if let Some(index) = state.lookup_indexes.get(index_name) {
1583 let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1584 #[cfg(feature = "otel")]
1585 if found.is_null() {
1586 crate::vm_metrics::record_lookup_index_miss(index_name);
1587 } else {
1588 crate::vm_metrics::record_lookup_index_hit(index_name);
1589 }
1590 found
1591 } else {
1592 Value::Null
1593 }
1594 };
1595
1596 let final_result = if result.is_null() {
1597 if let Some(pda_str) = lookup_val.as_str() {
1598 let state = self
1599 .states
1600 .get_mut(&actual_state_id)
1601 .ok_or("State table not found")?;
1602
1603 if let Some(pda_lookup) =
1604 state.pda_reverse_lookups.get_mut("default_pda_lookup")
1605 {
1606 if let Some(resolved) = pda_lookup.lookup(pda_str) {
1607 Value::String(resolved)
1608 } else {
1609 self.last_pda_lookup_miss = Some(pda_str.to_string());
1610 Value::Null
1611 }
1612 } else {
1613 self.last_pda_lookup_miss = Some(pda_str.to_string());
1614 Value::Null
1615 }
1616 } else {
1617 Value::Null
1618 }
1619 } else {
1620 result
1621 };
1622
1623 self.registers[*dest] = final_result;
1624 pc += 1;
1625 }
1626 OpCode::SetFieldSum {
1627 object,
1628 path,
1629 value,
1630 } => {
1631 let was_updated = self.set_field_sum(*object, path, *value)?;
1632 if was_updated {
1633 dirty_tracker.mark_replaced(path);
1634 }
1635 pc += 1;
1636 }
1637 OpCode::SetFieldIncrement { object, path } => {
1638 let was_updated = self.set_field_increment(*object, path)?;
1639 if was_updated {
1640 dirty_tracker.mark_replaced(path);
1641 }
1642 pc += 1;
1643 }
1644 OpCode::SetFieldMin {
1645 object,
1646 path,
1647 value,
1648 } => {
1649 let was_updated = self.set_field_min(*object, path, *value)?;
1650 if was_updated {
1651 dirty_tracker.mark_replaced(path);
1652 }
1653 pc += 1;
1654 }
1655 OpCode::AddToUniqueSet {
1656 state_id: _,
1657 set_name,
1658 value,
1659 count_object,
1660 count_path,
1661 } => {
1662 let value_to_add = self.registers[*value].clone();
1663
1664 let set_field_path = format!("__unique_set:{}", set_name);
1667
1668 let mut set: HashSet<Value> =
1670 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1671 if !existing.is_null() {
1672 serde_json::from_value(existing).unwrap_or_default()
1673 } else {
1674 HashSet::new()
1675 }
1676 } else {
1677 HashSet::new()
1678 };
1679
1680 let was_new = set.insert(value_to_add);
1682
1683 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1685 self.registers[100] = serde_json::to_value(set_as_vec)?;
1686 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1687
1688 if was_new {
1690 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1691 self.set_field_auto_vivify(*count_object, count_path, 100)?;
1692 dirty_tracker.mark_replaced(count_path);
1693 }
1694
1695 pc += 1;
1696 }
1697 OpCode::ConditionalSetField {
1698 object,
1699 path,
1700 value,
1701 condition_field,
1702 condition_op,
1703 condition_value,
1704 } => {
1705 let field_value = self.load_field(event_value, condition_field, None)?;
1706 let condition_met =
1707 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1708
1709 if condition_met {
1710 self.set_field_auto_vivify(*object, path, *value)?;
1711 dirty_tracker.mark_replaced(path);
1712 }
1713 pc += 1;
1714 }
1715 OpCode::ConditionalIncrement {
1716 object,
1717 path,
1718 condition_field,
1719 condition_op,
1720 condition_value,
1721 } => {
1722 let field_value = self.load_field(event_value, condition_field, None)?;
1723 let condition_met =
1724 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1725
1726 if condition_met {
1727 let was_updated = self.set_field_increment(*object, path)?;
1728 if was_updated {
1729 dirty_tracker.mark_replaced(path);
1730 }
1731 }
1732 pc += 1;
1733 }
1734 OpCode::EvaluateComputedFields {
1735 state,
1736 computed_paths,
1737 } => {
1738 if let Some(evaluator) = entity_evaluator {
1739 let old_values: Vec<_> = computed_paths
1740 .iter()
1741 .map(|path| Self::get_value_at_path(&self.registers[*state], path))
1742 .collect();
1743
1744 let state_value = &mut self.registers[*state];
1745 if evaluator(state_value).is_ok() {
1746 for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
1747 let new_value =
1748 Self::get_value_at_path(&self.registers[*state], path);
1749 if new_value != *old_value {
1750 dirty_tracker.mark_replaced(path);
1751 }
1752 }
1753 }
1754 }
1755 pc += 1;
1756 }
1757 OpCode::UpdatePdaReverseLookup {
1758 state_id: _,
1759 lookup_name,
1760 pda_address,
1761 primary_key,
1762 } => {
1763 let actual_state_id = override_state_id;
1764 let state = self
1765 .states
1766 .get_mut(&actual_state_id)
1767 .ok_or("State table not found")?;
1768
1769 let pda_val = self.registers[*pda_address].clone();
1770 let pk_val = self.registers[*primary_key].clone();
1771
1772 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1773 let pda_lookup = state
1774 .pda_reverse_lookups
1775 .entry(lookup_name.clone())
1776 .or_insert_with(|| {
1777 PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
1778 });
1779
1780 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1781 self.last_pda_registered = Some(pda_str.to_string());
1782 } else if !pk_val.is_null() {
1783 if let Some(pk_num) = pk_val.as_u64() {
1784 if let Some(pda_str) = pda_val.as_str() {
1785 let pda_lookup = state
1786 .pda_reverse_lookups
1787 .entry(lookup_name.clone())
1788 .or_insert_with(|| {
1789 PdaReverseLookup::new(
1790 DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
1791 )
1792 });
1793
1794 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1795 self.last_pda_registered = Some(pda_str.to_string());
1796 }
1797 }
1798 }
1799
1800 pc += 1;
1801 }
1802 }
1803
1804 self.instructions_executed += 1;
1805 }
1806
1807 Ok(output)
1808 }
1809
1810 fn load_field(
1811 &self,
1812 event_value: &Value,
1813 path: &FieldPath,
1814 default: Option<&Value>,
1815 ) -> Result<Value> {
1816 if path.segments.is_empty() {
1817 if let Some(obj) = event_value.as_object() {
1818 let filtered: serde_json::Map<String, Value> = obj
1819 .iter()
1820 .filter(|(k, _)| !k.starts_with("__"))
1821 .map(|(k, v)| (k.clone(), v.clone()))
1822 .collect();
1823 return Ok(Value::Object(filtered));
1824 }
1825 return Ok(event_value.clone());
1826 }
1827
1828 let mut current = event_value;
1829 for segment in path.segments.iter() {
1830 current = match current.get(segment) {
1831 Some(v) => v,
1832 None => return Ok(default.cloned().unwrap_or(Value::Null)),
1833 };
1834 }
1835
1836 Ok(current.clone())
1837 }
1838
1839 fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
1840 let mut current = value;
1841 for segment in path.split('.') {
1842 current = current.get(segment)?;
1843 }
1844 Some(current.clone())
1845 }
1846
1847 fn set_field_auto_vivify(
1848 &mut self,
1849 object_reg: Register,
1850 path: &str,
1851 value_reg: Register,
1852 ) -> Result<()> {
1853 let compiled = self.get_compiled_path(path);
1854 let segments = compiled.segments();
1855 let value = self.registers[value_reg].clone();
1856
1857 if !self.registers[object_reg].is_object() {
1858 self.registers[object_reg] = json!({});
1859 }
1860
1861 let obj = self.registers[object_reg]
1862 .as_object_mut()
1863 .ok_or("Not an object")?;
1864
1865 let mut current = obj;
1866 for (i, segment) in segments.iter().enumerate() {
1867 if i == segments.len() - 1 {
1868 current.insert(segment.to_string(), value);
1869 return Ok(());
1870 } else {
1871 current
1872 .entry(segment.to_string())
1873 .or_insert_with(|| json!({}));
1874 current = current
1875 .get_mut(segment)
1876 .and_then(|v| v.as_object_mut())
1877 .ok_or("Path collision: expected object")?;
1878 }
1879 }
1880
1881 Ok(())
1882 }
1883
1884 fn set_field_if_null(
1885 &mut self,
1886 object_reg: Register,
1887 path: &str,
1888 value_reg: Register,
1889 ) -> Result<bool> {
1890 let compiled = self.get_compiled_path(path);
1891 let segments = compiled.segments();
1892 let value = self.registers[value_reg].clone();
1893
1894 if !self.registers[object_reg].is_object() {
1895 self.registers[object_reg] = json!({});
1896 }
1897
1898 let obj = self.registers[object_reg]
1899 .as_object_mut()
1900 .ok_or("Not an object")?;
1901
1902 let mut current = obj;
1903 for (i, segment) in segments.iter().enumerate() {
1904 if i == segments.len() - 1 {
1905 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1906 current.insert(segment.to_string(), value);
1907 return Ok(true);
1908 }
1909 return Ok(false);
1910 } else {
1911 current
1912 .entry(segment.to_string())
1913 .or_insert_with(|| json!({}));
1914 current = current
1915 .get_mut(segment)
1916 .and_then(|v| v.as_object_mut())
1917 .ok_or("Path collision: expected object")?;
1918 }
1919 }
1920
1921 Ok(false)
1922 }
1923
1924 fn set_field_max(
1925 &mut self,
1926 object_reg: Register,
1927 path: &str,
1928 value_reg: Register,
1929 ) -> Result<bool> {
1930 let compiled = self.get_compiled_path(path);
1931 let segments = compiled.segments();
1932 let new_value = self.registers[value_reg].clone();
1933
1934 if !self.registers[object_reg].is_object() {
1935 self.registers[object_reg] = json!({});
1936 }
1937
1938 let obj = self.registers[object_reg]
1939 .as_object_mut()
1940 .ok_or("Not an object")?;
1941
1942 let mut current = obj;
1943 for (i, segment) in segments.iter().enumerate() {
1944 if i == segments.len() - 1 {
1945 let should_update = if let Some(current_value) = current.get(segment) {
1946 if current_value.is_null() {
1947 true
1948 } else {
1949 match (current_value.as_i64(), new_value.as_i64()) {
1950 (Some(current_val), Some(new_val)) => new_val > current_val,
1951 (Some(current_val), None) if new_value.as_u64().is_some() => {
1952 new_value.as_u64().unwrap() as i64 > current_val
1953 }
1954 (None, Some(new_val)) if current_value.as_u64().is_some() => {
1955 new_val > current_value.as_u64().unwrap() as i64
1956 }
1957 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1958 (Some(current_val), Some(new_val)) => new_val > current_val,
1959 _ => match (current_value.as_f64(), new_value.as_f64()) {
1960 (Some(current_val), Some(new_val)) => new_val > current_val,
1961 _ => false,
1962 },
1963 },
1964 _ => false,
1965 }
1966 }
1967 } else {
1968 true
1969 };
1970
1971 if should_update {
1972 current.insert(segment.to_string(), new_value);
1973 return Ok(true);
1974 }
1975 return Ok(false);
1976 } else {
1977 current
1978 .entry(segment.to_string())
1979 .or_insert_with(|| json!({}));
1980 current = current
1981 .get_mut(segment)
1982 .and_then(|v| v.as_object_mut())
1983 .ok_or("Path collision: expected object")?;
1984 }
1985 }
1986
1987 Ok(false)
1988 }
1989
1990 fn set_field_sum(
1991 &mut self,
1992 object_reg: Register,
1993 path: &str,
1994 value_reg: Register,
1995 ) -> Result<bool> {
1996 let compiled = self.get_compiled_path(path);
1997 let segments = compiled.segments();
1998 let new_value = &self.registers[value_reg];
1999
2000 let new_val_num = new_value
2002 .as_i64()
2003 .or_else(|| new_value.as_u64().map(|n| n as i64))
2004 .ok_or("Sum requires numeric value")?;
2005
2006 if !self.registers[object_reg].is_object() {
2007 self.registers[object_reg] = json!({});
2008 }
2009
2010 let obj = self.registers[object_reg]
2011 .as_object_mut()
2012 .ok_or("Not an object")?;
2013
2014 let mut current = obj;
2015 for (i, segment) in segments.iter().enumerate() {
2016 if i == segments.len() - 1 {
2017 let current_val = current
2018 .get(segment)
2019 .and_then(|v| {
2020 if v.is_null() {
2021 None
2022 } else {
2023 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2024 }
2025 })
2026 .unwrap_or(0);
2027
2028 let sum = current_val + new_val_num;
2029 current.insert(segment.to_string(), json!(sum));
2030 return Ok(true);
2031 } else {
2032 current
2033 .entry(segment.to_string())
2034 .or_insert_with(|| json!({}));
2035 current = current
2036 .get_mut(segment)
2037 .and_then(|v| v.as_object_mut())
2038 .ok_or("Path collision: expected object")?;
2039 }
2040 }
2041
2042 Ok(false)
2043 }
2044
2045 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2046 let compiled = self.get_compiled_path(path);
2047 let segments = compiled.segments();
2048
2049 if !self.registers[object_reg].is_object() {
2050 self.registers[object_reg] = json!({});
2051 }
2052
2053 let obj = self.registers[object_reg]
2054 .as_object_mut()
2055 .ok_or("Not an object")?;
2056
2057 let mut current = obj;
2058 for (i, segment) in segments.iter().enumerate() {
2059 if i == segments.len() - 1 {
2060 let current_val = current
2062 .get(segment)
2063 .and_then(|v| {
2064 if v.is_null() {
2065 None
2066 } else {
2067 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2068 }
2069 })
2070 .unwrap_or(0);
2071
2072 let incremented = current_val + 1;
2073 current.insert(segment.to_string(), json!(incremented));
2074 return Ok(true);
2075 } else {
2076 current
2077 .entry(segment.to_string())
2078 .or_insert_with(|| json!({}));
2079 current = current
2080 .get_mut(segment)
2081 .and_then(|v| v.as_object_mut())
2082 .ok_or("Path collision: expected object")?;
2083 }
2084 }
2085
2086 Ok(false)
2087 }
2088
2089 fn set_field_min(
2090 &mut self,
2091 object_reg: Register,
2092 path: &str,
2093 value_reg: Register,
2094 ) -> Result<bool> {
2095 let compiled = self.get_compiled_path(path);
2096 let segments = compiled.segments();
2097 let new_value = self.registers[value_reg].clone();
2098
2099 if !self.registers[object_reg].is_object() {
2100 self.registers[object_reg] = json!({});
2101 }
2102
2103 let obj = self.registers[object_reg]
2104 .as_object_mut()
2105 .ok_or("Not an object")?;
2106
2107 let mut current = obj;
2108 for (i, segment) in segments.iter().enumerate() {
2109 if i == segments.len() - 1 {
2110 let should_update = if let Some(current_value) = current.get(segment) {
2111 if current_value.is_null() {
2112 true
2113 } else {
2114 match (current_value.as_i64(), new_value.as_i64()) {
2115 (Some(current_val), Some(new_val)) => new_val < current_val,
2116 (Some(current_val), None) if new_value.as_u64().is_some() => {
2117 (new_value.as_u64().unwrap() as i64) < current_val
2118 }
2119 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2120 new_val < current_value.as_u64().unwrap() as i64
2121 }
2122 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2123 (Some(current_val), Some(new_val)) => new_val < current_val,
2124 _ => match (current_value.as_f64(), new_value.as_f64()) {
2125 (Some(current_val), Some(new_val)) => new_val < current_val,
2126 _ => false,
2127 },
2128 },
2129 _ => false,
2130 }
2131 }
2132 } else {
2133 true
2134 };
2135
2136 if should_update {
2137 current.insert(segment.to_string(), new_value);
2138 return Ok(true);
2139 }
2140 return Ok(false);
2141 } else {
2142 current
2143 .entry(segment.to_string())
2144 .or_insert_with(|| json!({}));
2145 current = current
2146 .get_mut(segment)
2147 .and_then(|v| v.as_object_mut())
2148 .ok_or("Path collision: expected object")?;
2149 }
2150 }
2151
2152 Ok(false)
2153 }
2154
2155 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2156 let compiled = self.get_compiled_path(path);
2157 let segments = compiled.segments();
2158 let mut current = &self.registers[object_reg];
2159
2160 for segment in segments {
2161 current = current
2162 .get(segment)
2163 .ok_or_else(|| format!("Field not found: {}", segment))?;
2164 }
2165
2166 Ok(current.clone())
2167 }
2168
2169 fn append_to_array(
2170 &mut self,
2171 object_reg: Register,
2172 path: &str,
2173 value_reg: Register,
2174 max_length: usize,
2175 ) -> Result<()> {
2176 let compiled = self.get_compiled_path(path);
2177 let segments = compiled.segments();
2178 let value = self.registers[value_reg].clone();
2179
2180 if !self.registers[object_reg].is_object() {
2181 self.registers[object_reg] = json!({});
2182 }
2183
2184 let obj = self.registers[object_reg]
2185 .as_object_mut()
2186 .ok_or("Not an object")?;
2187
2188 let mut current = obj;
2189 for (i, segment) in segments.iter().enumerate() {
2190 if i == segments.len() - 1 {
2191 current
2192 .entry(segment.to_string())
2193 .or_insert_with(|| json!([]));
2194 let arr = current
2195 .get_mut(segment)
2196 .and_then(|v| v.as_array_mut())
2197 .ok_or("Path is not an array")?;
2198 arr.push(value.clone());
2199
2200 if arr.len() > max_length {
2201 let excess = arr.len() - max_length;
2202 arr.drain(0..excess);
2203 }
2204 } else {
2205 current
2206 .entry(segment.to_string())
2207 .or_insert_with(|| json!({}));
2208 current = current
2209 .get_mut(segment)
2210 .and_then(|v| v.as_object_mut())
2211 .ok_or("Path collision: expected object")?;
2212 }
2213 }
2214
2215 Ok(())
2216 }
2217
2218 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2219 let value = &self.registers[reg];
2220 let transformed = self.apply_transformation(value, transformation)?;
2221 self.registers[reg] = transformed;
2222 Ok(())
2223 }
2224
2225 fn apply_transformation(
2226 &self,
2227 value: &Value,
2228 transformation: &Transformation,
2229 ) -> Result<Value> {
2230 match transformation {
2231 Transformation::HexEncode => {
2232 if let Some(arr) = value.as_array() {
2233 let bytes: Vec<u8> = arr
2234 .iter()
2235 .filter_map(|v| v.as_u64().map(|n| n as u8))
2236 .collect();
2237 let hex = hex::encode(&bytes);
2238 Ok(json!(hex))
2239 } else {
2240 Err("HexEncode requires an array of numbers".into())
2241 }
2242 }
2243 Transformation::HexDecode => {
2244 if let Some(s) = value.as_str() {
2245 let s = s.strip_prefix("0x").unwrap_or(s);
2246 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2247 Ok(json!(bytes))
2248 } else {
2249 Err("HexDecode requires a string".into())
2250 }
2251 }
2252 Transformation::Base58Encode => {
2253 if let Some(arr) = value.as_array() {
2254 let bytes: Vec<u8> = arr
2255 .iter()
2256 .filter_map(|v| v.as_u64().map(|n| n as u8))
2257 .collect();
2258 let encoded = bs58::encode(&bytes).into_string();
2259 Ok(json!(encoded))
2260 } else if value.is_string() {
2261 Ok(value.clone())
2262 } else {
2263 Err("Base58Encode requires an array of numbers".into())
2264 }
2265 }
2266 Transformation::Base58Decode => {
2267 if let Some(s) = value.as_str() {
2268 let bytes = bs58::decode(s)
2269 .into_vec()
2270 .map_err(|e| format!("Base58 decode error: {}", e))?;
2271 Ok(json!(bytes))
2272 } else {
2273 Err("Base58Decode requires a string".into())
2274 }
2275 }
2276 Transformation::ToString => Ok(json!(value.to_string())),
2277 Transformation::ToNumber => {
2278 if let Some(s) = value.as_str() {
2279 let n = s
2280 .parse::<i64>()
2281 .map_err(|e| format!("Parse error: {}", e))?;
2282 Ok(json!(n))
2283 } else {
2284 Ok(value.clone())
2285 }
2286 }
2287 }
2288 }
2289
2290 fn evaluate_comparison(
2291 &self,
2292 field_value: &Value,
2293 op: &ComparisonOp,
2294 condition_value: &Value,
2295 ) -> Result<bool> {
2296 use ComparisonOp::*;
2297
2298 match op {
2299 Equal => Ok(field_value == condition_value),
2300 NotEqual => Ok(field_value != condition_value),
2301 GreaterThan => {
2302 match (field_value.as_i64(), condition_value.as_i64()) {
2304 (Some(a), Some(b)) => Ok(a > b),
2305 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2306 (Some(a), Some(b)) => Ok(a > b),
2307 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2308 (Some(a), Some(b)) => Ok(a > b),
2309 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2310 },
2311 },
2312 }
2313 }
2314 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2315 (Some(a), Some(b)) => Ok(a >= b),
2316 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2317 (Some(a), Some(b)) => Ok(a >= b),
2318 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2319 (Some(a), Some(b)) => Ok(a >= b),
2320 _ => {
2321 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2322 }
2323 },
2324 },
2325 },
2326 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2327 (Some(a), Some(b)) => Ok(a < b),
2328 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2329 (Some(a), Some(b)) => Ok(a < b),
2330 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2331 (Some(a), Some(b)) => Ok(a < b),
2332 _ => Err("Cannot compare non-numeric values with LessThan".into()),
2333 },
2334 },
2335 },
2336 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2337 (Some(a), Some(b)) => Ok(a <= b),
2338 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2339 (Some(a), Some(b)) => Ok(a <= b),
2340 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2341 (Some(a), Some(b)) => Ok(a <= b),
2342 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2343 },
2344 },
2345 },
2346 }
2347 }
2348
2349 #[cfg_attr(feature = "otel", instrument(
2358 name = "vm.update_pda_lookup",
2359 skip(self),
2360 fields(
2361 pda = %pda_address,
2362 seed = %seed_value,
2363 )
2364 ))]
2365 pub fn update_pda_reverse_lookup(
2366 &mut self,
2367 state_id: u32,
2368 lookup_name: &str,
2369 pda_address: String,
2370 seed_value: String,
2371 ) -> Result<Vec<PendingAccountUpdate>> {
2372 let state = self
2373 .states
2374 .get_mut(&state_id)
2375 .ok_or("State table not found")?;
2376
2377 let lookup = state
2378 .pda_reverse_lookups
2379 .entry(lookup_name.to_string())
2380 .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2381
2382 let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2383
2384 if let Some(ref evicted) = evicted_pda {
2385 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2386 let count = evicted_updates.len();
2387 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2388 }
2389 }
2390
2391 self.flush_pending_updates(state_id, &pda_address)
2393 }
2394
2395 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2400 let state = match self.states.get_mut(&state_id) {
2401 Some(s) => s,
2402 None => return 0,
2403 };
2404
2405 let now = std::time::SystemTime::now()
2406 .duration_since(std::time::UNIX_EPOCH)
2407 .unwrap()
2408 .as_secs() as i64;
2409
2410 let mut removed_count = 0;
2411
2412 state.pending_updates.retain(|_pda_address, updates| {
2414 let original_len = updates.len();
2415
2416 updates.retain(|update| {
2417 let age = now - update.queued_at;
2418 age <= PENDING_UPDATE_TTL_SECONDS
2419 });
2420
2421 removed_count += original_len - updates.len();
2422
2423 !updates.is_empty()
2425 });
2426
2427 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2429
2430 if removed_count > 0 {
2431 #[cfg(feature = "otel")]
2432 crate::vm_metrics::record_pending_updates_expired(
2433 removed_count as u64,
2434 &state.entity_name,
2435 );
2436 }
2437
2438 removed_count
2439 }
2440
2441 #[cfg_attr(feature = "otel", instrument(
2473 name = "vm.queue_account_update",
2474 skip(self, update),
2475 fields(
2476 pda = %update.pda_address,
2477 account_type = %update.account_type,
2478 slot = update.slot,
2479 )
2480 ))]
2481 pub fn queue_account_update(
2482 &mut self,
2483 state_id: u32,
2484 update: QueuedAccountUpdate,
2485 ) -> Result<()> {
2486 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2487 self.cleanup_expired_pending_updates(state_id);
2488 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2489 self.drop_oldest_pending_update(state_id)?;
2490 }
2491 }
2492
2493 let state = self
2494 .states
2495 .get_mut(&state_id)
2496 .ok_or("State table not found")?;
2497
2498 let pending = PendingAccountUpdate {
2499 account_type: update.account_type,
2500 pda_address: update.pda_address.clone(),
2501 account_data: update.account_data,
2502 slot: update.slot,
2503 write_version: update.write_version,
2504 signature: update.signature,
2505 queued_at: std::time::SystemTime::now()
2506 .duration_since(std::time::UNIX_EPOCH)
2507 .unwrap()
2508 .as_secs() as i64,
2509 };
2510
2511 let pda_address = pending.pda_address.clone();
2512 let slot = pending.slot;
2513
2514 let mut updates = state
2515 .pending_updates
2516 .entry(pda_address.clone())
2517 .or_insert_with(Vec::new);
2518
2519 let original_len = updates.len();
2520 updates.retain(|existing| existing.slot > slot);
2521 let removed_by_dedup = original_len - updates.len();
2522
2523 if removed_by_dedup > 0 {
2524 self.pending_queue_size = self
2525 .pending_queue_size
2526 .saturating_sub(removed_by_dedup as u64);
2527 }
2528
2529 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2530 updates.remove(0);
2531 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2532 }
2533
2534 updates.push(pending);
2535 #[cfg(feature = "otel")]
2536 crate::vm_metrics::record_pending_update_queued(&state.entity_name);
2537
2538 Ok(())
2539 }
2540
2541 pub fn queue_instruction_event(
2542 &mut self,
2543 state_id: u32,
2544 event: QueuedInstructionEvent,
2545 ) -> Result<()> {
2546 let state = self
2547 .states
2548 .get_mut(&state_id)
2549 .ok_or("State table not found")?;
2550
2551 let pda_address = event.pda_address.clone();
2552
2553 let pending = PendingInstructionEvent {
2554 event_type: event.event_type,
2555 pda_address: event.pda_address,
2556 event_data: event.event_data,
2557 slot: event.slot,
2558 signature: event.signature,
2559 queued_at: std::time::SystemTime::now()
2560 .duration_since(std::time::UNIX_EPOCH)
2561 .unwrap()
2562 .as_secs() as i64,
2563 };
2564
2565 let mut events = state
2566 .pending_instruction_events
2567 .entry(pda_address)
2568 .or_insert_with(Vec::new);
2569
2570 if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
2571 events.remove(0);
2572 }
2573
2574 events.push(pending);
2575
2576 Ok(())
2577 }
2578
2579 pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
2580 self.last_pda_lookup_miss.take()
2581 }
2582
2583 pub fn take_last_pda_registered(&mut self) -> Option<String> {
2584 self.last_pda_registered.take()
2585 }
2586
2587 pub fn flush_pending_instruction_events(
2588 &mut self,
2589 state_id: u32,
2590 pda_address: &str,
2591 ) -> Vec<PendingInstructionEvent> {
2592 let state = match self.states.get_mut(&state_id) {
2593 Some(s) => s,
2594 None => return Vec::new(),
2595 };
2596
2597 if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
2598 events
2599 } else {
2600 Vec::new()
2601 }
2602 }
2603
2604 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2606 let state = self.states.get(&state_id)?;
2607
2608 let now = std::time::SystemTime::now()
2609 .duration_since(std::time::UNIX_EPOCH)
2610 .unwrap()
2611 .as_secs() as i64;
2612
2613 let mut total_updates = 0;
2614 let mut oldest_timestamp = now;
2615 let mut largest_pda_queue = 0;
2616 let mut estimated_memory = 0;
2617
2618 for entry in state.pending_updates.iter() {
2619 let (_, updates) = entry.pair();
2620 total_updates += updates.len();
2621 largest_pda_queue = largest_pda_queue.max(updates.len());
2622
2623 for update in updates.iter() {
2624 oldest_timestamp = oldest_timestamp.min(update.queued_at);
2625 estimated_memory += update.account_type.len() +
2627 update.pda_address.len() +
2628 update.signature.len() +
2629 16 + estimate_json_size(&update.account_data);
2631 }
2632 }
2633
2634 Some(PendingQueueStats {
2635 total_updates,
2636 unique_pdas: state.pending_updates.len(),
2637 oldest_age_seconds: now - oldest_timestamp,
2638 largest_pda_queue_size: largest_pda_queue,
2639 estimated_memory_bytes: estimated_memory,
2640 })
2641 }
2642
2643 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2644 let mut stats = VmMemoryStats {
2645 path_cache_size: self.path_cache.len(),
2646 ..Default::default()
2647 };
2648
2649 if let Some(state) = self.states.get(&state_id) {
2650 stats.state_table_entity_count = state.data.len();
2651 stats.state_table_max_entries = state.config.max_entries;
2652 stats.state_table_at_capacity = state.is_at_capacity();
2653
2654 stats.lookup_index_count = state.lookup_indexes.len();
2655 stats.lookup_index_total_entries =
2656 state.lookup_indexes.values().map(|idx| idx.len()).sum();
2657
2658 stats.temporal_index_count = state.temporal_indexes.len();
2659 stats.temporal_index_total_entries = state
2660 .temporal_indexes
2661 .values()
2662 .map(|idx| idx.total_entries())
2663 .sum();
2664
2665 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2666 stats.pda_reverse_lookup_total_entries = state
2667 .pda_reverse_lookups
2668 .values()
2669 .map(|lookup| lookup.len())
2670 .sum();
2671
2672 stats.version_tracker_entries = state.version_tracker.len();
2673
2674 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2675 }
2676
2677 stats
2678 }
2679
2680 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2681 let pending_removed = self.cleanup_expired_pending_updates(state_id);
2682 let temporal_removed = self.cleanup_temporal_indexes(state_id);
2683
2684 #[cfg(feature = "otel")]
2685 if let Some(state) = self.states.get(&state_id) {
2686 crate::vm_metrics::record_cleanup(
2687 pending_removed,
2688 temporal_removed,
2689 &state.entity_name,
2690 );
2691 }
2692
2693 CleanupResult {
2694 pending_updates_removed: pending_removed,
2695 temporal_entries_removed: temporal_removed,
2696 }
2697 }
2698
2699 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2700 let state = match self.states.get_mut(&state_id) {
2701 Some(s) => s,
2702 None => return 0,
2703 };
2704
2705 let now = std::time::SystemTime::now()
2706 .duration_since(std::time::UNIX_EPOCH)
2707 .unwrap()
2708 .as_secs() as i64;
2709
2710 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2711 let mut total_removed = 0;
2712
2713 for (_, index) in state.temporal_indexes.iter_mut() {
2714 total_removed += index.cleanup_expired(cutoff);
2715 }
2716
2717 total_removed
2718 }
2719
2720 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2721 let state = self.states.get(&state_id)?;
2722
2723 if state.is_at_capacity() {
2724 Some(CapacityWarning {
2725 current_entries: state.data.len(),
2726 max_entries: state.config.max_entries,
2727 entries_over_limit: state.entries_over_limit(),
2728 })
2729 } else {
2730 None
2731 }
2732 }
2733
2734 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2736 let state = self
2737 .states
2738 .get_mut(&state_id)
2739 .ok_or("State table not found")?;
2740
2741 let mut oldest_pda: Option<String> = None;
2742 let mut oldest_timestamp = i64::MAX;
2743
2744 for entry in state.pending_updates.iter() {
2746 let (pda, updates) = entry.pair();
2747 if let Some(update) = updates.first() {
2748 if update.queued_at < oldest_timestamp {
2749 oldest_timestamp = update.queued_at;
2750 oldest_pda = Some(pda.clone());
2751 }
2752 }
2753 }
2754
2755 if let Some(pda) = oldest_pda {
2757 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2758 if !updates.is_empty() {
2759 updates.remove(0);
2760 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2761
2762 if updates.is_empty() {
2764 drop(updates);
2765 state.pending_updates.remove(&pda);
2766 }
2767 }
2768 }
2769 }
2770
2771 Ok(())
2772 }
2773
2774 fn flush_pending_updates(
2779 &mut self,
2780 state_id: u32,
2781 pda_address: &str,
2782 ) -> Result<Vec<PendingAccountUpdate>> {
2783 let state = self
2784 .states
2785 .get_mut(&state_id)
2786 .ok_or("State table not found")?;
2787
2788 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2789 let count = pending_updates.len();
2790 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2791 #[cfg(feature = "otel")]
2792 crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
2793 Ok(pending_updates)
2794 } else {
2795 Ok(Vec::new())
2796 }
2797 }
2798
2799 pub fn try_pda_reverse_lookup(
2801 &mut self,
2802 state_id: u32,
2803 lookup_name: &str,
2804 pda_address: &str,
2805 ) -> Option<String> {
2806 let state = self.states.get_mut(&state_id)?;
2807
2808 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2809 if let Some(value) = lookup.lookup(pda_address) {
2810 self.pda_cache_hits += 1;
2811 return Some(value);
2812 }
2813 }
2814
2815 self.pda_cache_misses += 1;
2816 None
2817 }
2818
2819 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2826 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2827 }
2828
2829 fn evaluate_computed_expr_with_env(
2831 &self,
2832 expr: &ComputedExpr,
2833 state: &Value,
2834 env: &std::collections::HashMap<String, Value>,
2835 ) -> Result<Value> {
2836 match expr {
2837 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2838
2839 ComputedExpr::Var { name } => env
2840 .get(name)
2841 .cloned()
2842 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2843
2844 ComputedExpr::Let { name, value, body } => {
2845 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2846 let mut new_env = env.clone();
2847 new_env.insert(name.clone(), val);
2848 self.evaluate_computed_expr_with_env(body, state, &new_env)
2849 }
2850
2851 ComputedExpr::If {
2852 condition,
2853 then_branch,
2854 else_branch,
2855 } => {
2856 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2857 if self.value_to_bool(&cond_val) {
2858 self.evaluate_computed_expr_with_env(then_branch, state, env)
2859 } else {
2860 self.evaluate_computed_expr_with_env(else_branch, state, env)
2861 }
2862 }
2863
2864 ComputedExpr::None => Ok(Value::Null),
2865
2866 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2867
2868 ComputedExpr::Slice { expr, start, end } => {
2869 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2870 match val {
2871 Value::Array(arr) => {
2872 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2873 Ok(Value::Array(slice))
2874 }
2875 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2876 }
2877 }
2878
2879 ComputedExpr::Index { expr, index } => {
2880 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2881 match val {
2882 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2883 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2884 }
2885 }
2886
2887 ComputedExpr::U64FromLeBytes { bytes } => {
2888 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2889 let byte_vec = self.value_to_bytes(&val)?;
2890 if byte_vec.len() < 8 {
2891 return Err(format!(
2892 "u64::from_le_bytes requires 8 bytes, got {}",
2893 byte_vec.len()
2894 )
2895 .into());
2896 }
2897 let arr: [u8; 8] = byte_vec[..8]
2898 .try_into()
2899 .map_err(|_| "Failed to convert to [u8; 8]")?;
2900 Ok(json!(u64::from_le_bytes(arr)))
2901 }
2902
2903 ComputedExpr::U64FromBeBytes { bytes } => {
2904 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2905 let byte_vec = self.value_to_bytes(&val)?;
2906 if byte_vec.len() < 8 {
2907 return Err(format!(
2908 "u64::from_be_bytes requires 8 bytes, got {}",
2909 byte_vec.len()
2910 )
2911 .into());
2912 }
2913 let arr: [u8; 8] = byte_vec[..8]
2914 .try_into()
2915 .map_err(|_| "Failed to convert to [u8; 8]")?;
2916 Ok(json!(u64::from_be_bytes(arr)))
2917 }
2918
2919 ComputedExpr::ByteArray { bytes } => {
2920 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2921 }
2922
2923 ComputedExpr::Closure { param, body } => {
2924 Ok(json!({
2927 "__closure": {
2928 "param": param,
2929 "body": serde_json::to_value(body).unwrap_or(Value::Null)
2930 }
2931 }))
2932 }
2933
2934 ComputedExpr::Unary { op, expr } => {
2935 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2936 self.apply_unary_op(op, &val)
2937 }
2938
2939 ComputedExpr::JsonToBytes { expr } => {
2940 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2941 let bytes = self.value_to_bytes(&val)?;
2943 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2944 }
2945
2946 ComputedExpr::UnwrapOr { expr, default } => {
2947 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2948 if val.is_null() {
2949 Ok(default.clone())
2950 } else {
2951 Ok(val)
2952 }
2953 }
2954
2955 ComputedExpr::Binary { op, left, right } => {
2956 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
2957 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
2958 self.apply_binary_op(op, &l, &r)
2959 }
2960
2961 ComputedExpr::Cast { expr, to_type } => {
2962 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2963 self.apply_cast(&val, to_type)
2964 }
2965
2966 ComputedExpr::MethodCall { expr, method, args } => {
2967 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2968 if method == "map" && args.len() == 1 {
2970 if let ComputedExpr::Closure { param, body } = &args[0] {
2971 if val.is_null() {
2973 return Ok(Value::Null);
2974 }
2975 let mut closure_env = env.clone();
2977 closure_env.insert(param.clone(), val);
2978 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
2979 }
2980 }
2981 let evaluated_args: Vec<Value> = args
2982 .iter()
2983 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
2984 .collect::<Result<Vec<_>>>()?;
2985 self.apply_method_call(&val, method, &evaluated_args)
2986 }
2987
2988 ComputedExpr::Literal { value } => Ok(value.clone()),
2989
2990 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
2991 }
2992 }
2993
2994 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
2996 match val {
2997 Value::Array(arr) => arr
2998 .iter()
2999 .map(|v| {
3000 v.as_u64()
3001 .map(|n| n as u8)
3002 .ok_or_else(|| "Array element not a valid byte".into())
3003 })
3004 .collect(),
3005 Value::String(s) => {
3006 if s.starts_with("0x") || s.starts_with("0X") {
3008 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
3009 } else {
3010 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
3011 }
3012 }
3013 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
3014 }
3015 }
3016
3017 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
3019 use crate::ast::UnaryOp;
3020 match op {
3021 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
3022 UnaryOp::ReverseBits => match val.as_u64() {
3023 Some(n) => Ok(json!(n.reverse_bits())),
3024 None => match val.as_i64() {
3025 Some(n) => Ok(json!((n as u64).reverse_bits())),
3026 None => Err("reverse_bits requires an integer".into()),
3027 },
3028 },
3029 }
3030 }
3031
3032 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
3034 let segments: Vec<&str> = path.split('.').collect();
3035 let mut current = state;
3036
3037 for segment in segments {
3038 match current.get(segment) {
3039 Some(v) => current = v,
3040 None => return Ok(Value::Null),
3041 }
3042 }
3043
3044 Ok(current.clone())
3045 }
3046
3047 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
3049 match op {
3050 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
3052 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
3053 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
3054 BinaryOp::Div => {
3055 if let Some(r) = right.as_i64() {
3057 if r == 0 {
3058 return Err("Division by zero".into());
3059 }
3060 }
3061 if let Some(r) = right.as_f64() {
3062 if r == 0.0 {
3063 return Err("Division by zero".into());
3064 }
3065 }
3066 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
3067 }
3068 BinaryOp::Mod => {
3069 match (left.as_i64(), right.as_i64()) {
3071 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3072 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
3073 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3074 _ => Err("Modulo requires non-zero integer operands".into()),
3075 },
3076 _ => Err("Modulo by zero".into()),
3077 }
3078 }
3079
3080 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
3082 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
3083 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
3084 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
3085 BinaryOp::Eq => Ok(json!(left == right)),
3086 BinaryOp::Ne => Ok(json!(left != right)),
3087
3088 BinaryOp::And => {
3090 let l_bool = self.value_to_bool(left);
3091 let r_bool = self.value_to_bool(right);
3092 Ok(json!(l_bool && r_bool))
3093 }
3094 BinaryOp::Or => {
3095 let l_bool = self.value_to_bool(left);
3096 let r_bool = self.value_to_bool(right);
3097 Ok(json!(l_bool || r_bool))
3098 }
3099
3100 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
3102 (Some(a), Some(b)) => Ok(json!(a ^ b)),
3103 _ => match (left.as_i64(), right.as_i64()) {
3104 (Some(a), Some(b)) => Ok(json!(a ^ b)),
3105 _ => Err("XOR requires integer operands".into()),
3106 },
3107 },
3108 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
3109 (Some(a), Some(b)) => Ok(json!(a & b)),
3110 _ => match (left.as_i64(), right.as_i64()) {
3111 (Some(a), Some(b)) => Ok(json!(a & b)),
3112 _ => Err("BitAnd requires integer operands".into()),
3113 },
3114 },
3115 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
3116 (Some(a), Some(b)) => Ok(json!(a | b)),
3117 _ => match (left.as_i64(), right.as_i64()) {
3118 (Some(a), Some(b)) => Ok(json!(a | b)),
3119 _ => Err("BitOr requires integer operands".into()),
3120 },
3121 },
3122 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
3123 (Some(a), Some(b)) => Ok(json!(a << b)),
3124 _ => match (left.as_i64(), right.as_i64()) {
3125 (Some(a), Some(b)) => Ok(json!(a << b)),
3126 _ => Err("Shl requires integer operands".into()),
3127 },
3128 },
3129 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
3130 (Some(a), Some(b)) => Ok(json!(a >> b)),
3131 _ => match (left.as_i64(), right.as_i64()) {
3132 (Some(a), Some(b)) => Ok(json!(a >> b)),
3133 _ => Err("Shr requires integer operands".into()),
3134 },
3135 },
3136 }
3137 }
3138
3139 fn numeric_op<F1, F2>(
3141 &self,
3142 left: &Value,
3143 right: &Value,
3144 int_op: F1,
3145 float_op: F2,
3146 ) -> Result<Value>
3147 where
3148 F1: Fn(i64, i64) -> i64,
3149 F2: Fn(f64, f64) -> f64,
3150 {
3151 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3153 return Ok(json!(int_op(a, b)));
3154 }
3155
3156 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3158 return Ok(json!(int_op(a as i64, b as i64)));
3160 }
3161
3162 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3164 return Ok(json!(float_op(a, b)));
3165 }
3166
3167 if left.is_null() || right.is_null() {
3169 return Ok(Value::Null);
3170 }
3171
3172 Err(format!(
3173 "Cannot perform numeric operation on {:?} and {:?}",
3174 left, right
3175 )
3176 .into())
3177 }
3178
3179 fn comparison_op<F1, F2>(
3181 &self,
3182 left: &Value,
3183 right: &Value,
3184 int_cmp: F1,
3185 float_cmp: F2,
3186 ) -> Result<Value>
3187 where
3188 F1: Fn(i64, i64) -> bool,
3189 F2: Fn(f64, f64) -> bool,
3190 {
3191 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3193 return Ok(json!(int_cmp(a, b)));
3194 }
3195
3196 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3198 return Ok(json!(int_cmp(a as i64, b as i64)));
3199 }
3200
3201 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3203 return Ok(json!(float_cmp(a, b)));
3204 }
3205
3206 if left.is_null() || right.is_null() {
3208 return Ok(json!(false));
3209 }
3210
3211 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3212 }
3213
3214 fn value_to_bool(&self, value: &Value) -> bool {
3216 match value {
3217 Value::Null => false,
3218 Value::Bool(b) => *b,
3219 Value::Number(n) => {
3220 if let Some(i) = n.as_i64() {
3221 i != 0
3222 } else if let Some(f) = n.as_f64() {
3223 f != 0.0
3224 } else {
3225 true
3226 }
3227 }
3228 Value::String(s) => !s.is_empty(),
3229 Value::Array(arr) => !arr.is_empty(),
3230 Value::Object(obj) => !obj.is_empty(),
3231 }
3232 }
3233
3234 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3236 match to_type {
3237 "i8" | "i16" | "i32" | "i64" | "isize" => {
3238 if let Some(n) = value.as_i64() {
3239 Ok(json!(n))
3240 } else if let Some(n) = value.as_u64() {
3241 Ok(json!(n as i64))
3242 } else if let Some(n) = value.as_f64() {
3243 Ok(json!(n as i64))
3244 } else if let Some(s) = value.as_str() {
3245 s.parse::<i64>()
3246 .map(|n| json!(n))
3247 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3248 } else {
3249 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3250 }
3251 }
3252 "u8" | "u16" | "u32" | "u64" | "usize" => {
3253 if let Some(n) = value.as_u64() {
3254 Ok(json!(n))
3255 } else if let Some(n) = value.as_i64() {
3256 Ok(json!(n as u64))
3257 } else if let Some(n) = value.as_f64() {
3258 Ok(json!(n as u64))
3259 } else if let Some(s) = value.as_str() {
3260 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3261 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3262 })
3263 } else {
3264 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3265 }
3266 }
3267 "f32" | "f64" => {
3268 if let Some(n) = value.as_f64() {
3269 Ok(json!(n))
3270 } else if let Some(n) = value.as_i64() {
3271 Ok(json!(n as f64))
3272 } else if let Some(n) = value.as_u64() {
3273 Ok(json!(n as f64))
3274 } else if let Some(s) = value.as_str() {
3275 s.parse::<f64>()
3276 .map(|n| json!(n))
3277 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3278 } else {
3279 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3280 }
3281 }
3282 "String" | "string" => Ok(json!(value.to_string())),
3283 "bool" => Ok(json!(self.value_to_bool(value))),
3284 _ => {
3285 Ok(value.clone())
3287 }
3288 }
3289 }
3290
3291 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3293 match method {
3294 "unwrap_or" => {
3295 if value.is_null() && !args.is_empty() {
3296 Ok(args[0].clone())
3297 } else {
3298 Ok(value.clone())
3299 }
3300 }
3301 "unwrap_or_default" => {
3302 if value.is_null() {
3303 Ok(json!(0))
3305 } else {
3306 Ok(value.clone())
3307 }
3308 }
3309 "is_some" => Ok(json!(!value.is_null())),
3310 "is_none" => Ok(json!(value.is_null())),
3311 "abs" => {
3312 if let Some(n) = value.as_i64() {
3313 Ok(json!(n.abs()))
3314 } else if let Some(n) = value.as_f64() {
3315 Ok(json!(n.abs()))
3316 } else {
3317 Err(format!("Cannot call abs() on {:?}", value).into())
3318 }
3319 }
3320 "len" => {
3321 if let Some(s) = value.as_str() {
3322 Ok(json!(s.len()))
3323 } else if let Some(arr) = value.as_array() {
3324 Ok(json!(arr.len()))
3325 } else if let Some(obj) = value.as_object() {
3326 Ok(json!(obj.len()))
3327 } else {
3328 Err(format!("Cannot call len() on {:?}", value).into())
3329 }
3330 }
3331 "to_string" => Ok(json!(value.to_string())),
3332 "min" => {
3333 if args.is_empty() {
3334 return Err("min() requires an argument".into());
3335 }
3336 let other = &args[0];
3337 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3338 Ok(json!(a.min(b)))
3339 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3340 Ok(json!(a.min(b)))
3341 } else {
3342 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3343 }
3344 }
3345 "max" => {
3346 if args.is_empty() {
3347 return Err("max() requires an argument".into());
3348 }
3349 let other = &args[0];
3350 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3351 Ok(json!(a.max(b)))
3352 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3353 Ok(json!(a.max(b)))
3354 } else {
3355 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3356 }
3357 }
3358 "saturating_add" => {
3359 if args.is_empty() {
3360 return Err("saturating_add() requires an argument".into());
3361 }
3362 let other = &args[0];
3363 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3364 Ok(json!(a.saturating_add(b)))
3365 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3366 Ok(json!(a.saturating_add(b)))
3367 } else {
3368 Err(format!(
3369 "Cannot call saturating_add() on {:?} and {:?}",
3370 value, other
3371 )
3372 .into())
3373 }
3374 }
3375 "saturating_sub" => {
3376 if args.is_empty() {
3377 return Err("saturating_sub() requires an argument".into());
3378 }
3379 let other = &args[0];
3380 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3381 Ok(json!(a.saturating_sub(b)))
3382 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3383 Ok(json!(a.saturating_sub(b)))
3384 } else {
3385 Err(format!(
3386 "Cannot call saturating_sub() on {:?} and {:?}",
3387 value, other
3388 )
3389 .into())
3390 }
3391 }
3392 _ => Err(format!("Unknown method call: {}()", method).into()),
3393 }
3394 }
3395
3396 pub fn evaluate_computed_fields_from_ast(
3399 &self,
3400 state: &mut Value,
3401 computed_field_specs: &[ComputedFieldSpec],
3402 ) -> Result<Vec<String>> {
3403 let mut updated_paths = Vec::new();
3404
3405 for spec in computed_field_specs {
3406 if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3407 self.set_field_in_state(state, &spec.target_path, result)?;
3408 updated_paths.push(spec.target_path.clone());
3409 }
3410 }
3411
3412 Ok(updated_paths)
3413 }
3414
3415 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3417 let segments: Vec<&str> = path.split('.').collect();
3418
3419 if segments.is_empty() {
3420 return Err("Empty path".into());
3421 }
3422
3423 let mut current = state;
3425 for (i, segment) in segments.iter().enumerate() {
3426 if i == segments.len() - 1 {
3427 if let Some(obj) = current.as_object_mut() {
3429 obj.insert(segment.to_string(), value);
3430 return Ok(());
3431 } else {
3432 return Err(format!("Cannot set field '{}' on non-object", segment).into());
3433 }
3434 } else {
3435 if !current.is_object() {
3437 *current = json!({});
3438 }
3439 let obj = current.as_object_mut().unwrap();
3440 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3441 }
3442 }
3443
3444 Ok(())
3445 }
3446
3447 pub fn create_evaluator_from_specs(
3450 specs: Vec<ComputedFieldSpec>,
3451 ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3452 move |state: &mut Value| {
3453 let vm = VmContext::new();
3456 vm.evaluate_computed_fields_from_ast(state, &specs)?;
3457 Ok(())
3458 }
3459 }
3460}
3461
3462impl Default for VmContext {
3463 fn default() -> Self {
3464 Self::new()
3465 }
3466}
3467
3468impl crate::resolvers::ReverseLookupUpdater for VmContext {
3470 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3471 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3473 .unwrap_or_else(|e| {
3474 tracing::error!("Failed to update PDA reverse lookup: {}", e);
3475 Vec::new()
3476 })
3477 }
3478
3479 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3480 self.flush_pending_updates(0, pda_address)
3482 .unwrap_or_else(|e| {
3483 tracing::error!("Failed to flush pending updates: {}", e);
3484 Vec::new()
3485 })
3486 }
3487}
3488
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};

    /// Builds the expression `<path>.unwrap_or(0)`.
    fn field_or_zero(path: &str) -> Box<ComputedExpr> {
        Box::new(ComputedExpr::UnwrapOr {
            expr: Box::new(ComputedExpr::FieldRef {
                path: path.to_string(),
            }),
            default: serde_json::json!(0),
        })
    }

    /// Reads `trading.<field>` from a state value.
    fn trading_field<'a>(
        state: &'a serde_json::Value,
        field: &str,
    ) -> Option<&'a serde_json::Value> {
        state.get("trading").and_then(|t| t.get(field))
    }

    /// Adding two integer fields must produce an integer, not a float.
    #[test]
    fn test_computed_field_preserves_integer_type() {
        let vm = VmContext::new();

        let mut state = serde_json::json!({
            "trading": {
                "total_buy_volume": 20000000000_i64,
                "total_sell_volume": 17951316474_i64
            }
        });

        let spec = ComputedFieldSpec {
            target_path: "trading.total_volume".to_string(),
            result_type: "Option<u64>".to_string(),
            expression: ComputedExpr::Binary {
                op: BinaryOp::Add,
                left: field_or_zero("trading.total_buy_volume"),
                right: field_or_zero("trading.total_sell_volume"),
            },
        };

        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
            .unwrap();

        let total_volume =
            trading_field(&state, "total_volume").expect("total_volume should exist");

        let serialized = serde_json::to_string(total_volume).unwrap();
        assert!(
            !serialized.contains('.'),
            "Integer should not have decimal point: {}",
            serialized
        );
        assert_eq!(
            total_volume.as_i64(),
            Some(37951316474),
            "Value should be correct sum"
        );
    }

    /// `set_field_sum` must keep integer inputs as integers when serialized.
    #[test]
    fn test_set_field_sum_preserves_integer_type() {
        let mut vm = VmContext::new();
        vm.registers[0] = serde_json::json!({});
        vm.registers[1] = serde_json::json!(20000000000_i64);
        vm.registers[2] = serde_json::json!(17951316474_i64);

        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();

        let state = &vm.registers[0];
        let buy_vol = trading_field(state, "total_buy_volume").unwrap();
        let sell_vol = trading_field(state, "total_sell_volume").unwrap();

        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
        let sell_serialized = serde_json::to_string(sell_vol).unwrap();

        assert!(
            !buy_serialized.contains('.'),
            "Buy volume should not have decimal: {}",
            buy_serialized
        );
        assert!(
            !sell_serialized.contains('.'),
            "Sell volume should not have decimal: {}",
            sell_serialized
        );
    }
}