1use crate::ast::{
2 BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
15#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19 pub slot: Option<u64>,
21 pub signature: Option<String>,
23 pub timestamp: Option<i64>,
26 pub write_version: Option<u64>,
29 pub txn_index: Option<u64>,
32 pub metadata: HashMap<String, Value>,
34}
35
36impl UpdateContext {
37 pub fn new(slot: u64, signature: String) -> Self {
39 Self {
40 slot: Some(slot),
41 signature: Some(signature),
42 timestamp: None,
43 write_version: None,
44 txn_index: None,
45 metadata: HashMap::new(),
46 }
47 }
48
49 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51 Self {
52 slot: Some(slot),
53 signature: Some(signature),
54 timestamp: Some(timestamp),
55 write_version: None,
56 txn_index: None,
57 metadata: HashMap::new(),
58 }
59 }
60
61 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63 Self {
64 slot: Some(slot),
65 signature: Some(signature),
66 timestamp: None,
67 write_version: Some(write_version),
68 txn_index: None,
69 metadata: HashMap::new(),
70 }
71 }
72
73 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75 Self {
76 slot: Some(slot),
77 signature: Some(signature),
78 timestamp: None,
79 write_version: None,
80 txn_index: Some(txn_index),
81 metadata: HashMap::new(),
82 }
83 }
84
85 pub fn timestamp(&self) -> i64 {
87 self.timestamp.unwrap_or_else(|| {
88 std::time::SystemTime::now()
89 .duration_since(std::time::UNIX_EPOCH)
90 .unwrap()
91 .as_secs() as i64
92 })
93 }
94
95 pub fn empty() -> Self {
97 Self::default()
98 }
99
100 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
102 self.metadata.insert(key, value);
103 self
104 }
105
106 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
108 self.metadata.get(key)
109 }
110
111 pub fn to_value(&self) -> Value {
113 let mut obj = serde_json::Map::new();
114 if let Some(slot) = self.slot {
115 obj.insert("slot".to_string(), json!(slot));
116 }
117 if let Some(ref sig) = self.signature {
118 obj.insert("signature".to_string(), json!(sig));
119 }
120 obj.insert("timestamp".to_string(), json!(self.timestamp()));
122 for (key, value) in &self.metadata {
123 obj.insert(key.clone(), value.clone());
124 }
125 Value::Object(obj)
126 }
127}
128
129pub type Register = usize;
130pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
131
132pub type RegisterValue = Value;
133
134pub trait ComputedFieldsEvaluator {
137 fn evaluate(&self, state: &mut Value) -> Result<()>;
138}
139
140const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
142const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
143const PENDING_UPDATE_TTL_SECONDS: i64 = 300; const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300; const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;
148
149const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
151const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
152
153const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;
154
155const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;
156
157const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;
158
159const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
160
161fn estimate_json_size(value: &Value) -> usize {
163 match value {
164 Value::Null => 4,
165 Value::Bool(_) => 5,
166 Value::Number(_) => 8,
167 Value::String(s) => s.len() + 2,
168 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
169 Value::Object(obj) => {
170 2 + obj
171 .iter()
172 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
173 .sum::<usize>()
174 }
175 }
176}
177
178#[derive(Debug, Clone)]
179pub struct CompiledPath {
180 pub segments: std::sync::Arc<[String]>,
181}
182
183impl CompiledPath {
184 pub fn new(path: &str) -> Self {
185 let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
186 CompiledPath {
187 segments: segments.into(),
188 }
189 }
190
191 fn segments(&self) -> &[String] {
192 &self.segments
193 }
194}
195
196#[derive(Debug, Clone)]
199pub enum FieldChange {
200 Replaced,
202 Appended(Vec<Value>),
204}
205
206#[derive(Debug, Clone, Default)]
209pub struct DirtyTracker {
210 changes: HashMap<String, FieldChange>,
211}
212
213impl DirtyTracker {
214 pub fn new() -> Self {
216 Self {
217 changes: HashMap::new(),
218 }
219 }
220
221 pub fn mark_replaced(&mut self, path: &str) {
223 self.changes.insert(path.to_string(), FieldChange::Replaced);
225 }
226
227 pub fn mark_appended(&mut self, path: &str, value: Value) {
229 match self.changes.get_mut(path) {
230 Some(FieldChange::Appended(values)) => {
231 values.push(value);
233 }
234 Some(FieldChange::Replaced) => {
235 }
238 None => {
239 self.changes
241 .insert(path.to_string(), FieldChange::Appended(vec![value]));
242 }
243 }
244 }
245
246 pub fn is_empty(&self) -> bool {
248 self.changes.is_empty()
249 }
250
251 pub fn len(&self) -> usize {
253 self.changes.len()
254 }
255
256 pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
258 self.changes.iter()
259 }
260
261 pub fn dirty_paths(&self) -> HashSet<String> {
263 self.changes.keys().cloned().collect()
264 }
265
266 pub fn into_changes(self) -> HashMap<String, FieldChange> {
268 self.changes
269 }
270
271 pub fn changes(&self) -> &HashMap<String, FieldChange> {
273 &self.changes
274 }
275
276 pub fn appended_paths(&self) -> Vec<String> {
278 self.changes
279 .iter()
280 .filter_map(|(path, change)| match change {
281 FieldChange::Appended(_) => Some(path.clone()),
282 FieldChange::Replaced => None,
283 })
284 .collect()
285 }
286}
287
288pub struct VmContext {
289 registers: Vec<RegisterValue>,
290 states: HashMap<u32, StateTable>,
291 pub instructions_executed: u64,
292 pub cache_hits: u64,
293 path_cache: HashMap<String, CompiledPath>,
294 pub pda_cache_hits: u64,
295 pub pda_cache_misses: u64,
296 pub pending_queue_size: u64,
297 current_context: Option<UpdateContext>,
298 warnings: Vec<String>,
299 last_pda_lookup_miss: Option<String>,
300 last_pda_registered: Option<String>,
301}
302
303#[derive(Debug)]
304pub struct LookupIndex {
305 index: std::sync::Mutex<LruCache<String, Value>>,
306}
307
308impl LookupIndex {
309 pub fn new() -> Self {
310 Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
311 }
312
313 pub fn with_capacity(capacity: usize) -> Self {
314 LookupIndex {
315 index: std::sync::Mutex::new(LruCache::new(
316 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
317 )),
318 }
319 }
320
321 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
322 let key = value_to_cache_key(lookup_value);
323 self.index.lock().unwrap().get(&key).cloned()
324 }
325
326 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
327 let key = value_to_cache_key(&lookup_value);
328 self.index.lock().unwrap().put(key, primary_key);
329 }
330
331 pub fn len(&self) -> usize {
332 self.index.lock().unwrap().len()
333 }
334
335 pub fn is_empty(&self) -> bool {
336 self.index.lock().unwrap().is_empty()
337 }
338}
339
340impl Default for LookupIndex {
341 fn default() -> Self {
342 Self::new()
343 }
344}
345
346fn value_to_cache_key(value: &Value) -> String {
347 match value {
348 Value::String(s) => s.clone(),
349 Value::Number(n) => n.to_string(),
350 Value::Bool(b) => b.to_string(),
351 Value::Null => "null".to_string(),
352 _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
353 }
354}
355
356#[derive(Debug)]
357pub struct TemporalIndex {
358 index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
359}
360
361impl Default for TemporalIndex {
362 fn default() -> Self {
363 Self::new()
364 }
365}
366
367impl TemporalIndex {
368 pub fn new() -> Self {
369 Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
370 }
371
372 pub fn with_capacity(capacity: usize) -> Self {
373 TemporalIndex {
374 index: std::sync::Mutex::new(LruCache::new(
375 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
376 )),
377 }
378 }
379
380 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
381 let key = value_to_cache_key(lookup_value);
382 let mut cache = self.index.lock().unwrap();
383 if let Some(entries) = cache.get(&key) {
384 for i in (0..entries.len()).rev() {
385 if entries[i].1 <= timestamp {
386 return Some(entries[i].0.clone());
387 }
388 }
389 }
390 None
391 }
392
393 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
394 let key = value_to_cache_key(lookup_value);
395 let mut cache = self.index.lock().unwrap();
396 if let Some(entries) = cache.get(&key) {
397 if let Some(last) = entries.last() {
398 return Some(last.0.clone());
399 }
400 }
401 None
402 }
403
404 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
405 let key = value_to_cache_key(&lookup_value);
406 let mut cache = self.index.lock().unwrap();
407
408 let entries = cache.get_or_insert_mut(key, Vec::new);
409 entries.push((primary_key, timestamp));
410 entries.sort_by_key(|(_, ts)| *ts);
411
412 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
413 entries.retain(|(_, ts)| *ts >= cutoff);
414
415 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
416 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
417 entries.drain(0..excess);
418 }
419 }
420
421 pub fn len(&self) -> usize {
422 self.index.lock().unwrap().len()
423 }
424
425 pub fn is_empty(&self) -> bool {
426 self.index.lock().unwrap().is_empty()
427 }
428
429 pub fn total_entries(&self) -> usize {
430 self.index
431 .lock()
432 .unwrap()
433 .iter()
434 .map(|(_, entries)| entries.len())
435 .sum()
436 }
437
438 pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
439 let mut cache = self.index.lock().unwrap();
440 let mut total_removed = 0;
441
442 for (_, entries) in cache.iter_mut() {
443 let original_len = entries.len();
444 entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
445 total_removed += original_len - entries.len();
446 }
447
448 total_removed
449 }
450}
451
452#[derive(Debug)]
453pub struct PdaReverseLookup {
454 index: LruCache<String, String>,
456}
457
458impl PdaReverseLookup {
459 pub fn new(capacity: usize) -> Self {
460 PdaReverseLookup {
461 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
462 }
463 }
464
465 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
466 self.index.get(pda_address).cloned()
467 }
468
469 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
470 let evicted = if self.index.len() >= self.index.cap().get() {
471 self.index.peek_lru().map(|(k, _)| k.clone())
472 } else {
473 None
474 };
475
476 self.index.put(pda_address, seed_value);
477 evicted
478 }
479
480 pub fn len(&self) -> usize {
481 self.index.len()
482 }
483
484 pub fn is_empty(&self) -> bool {
485 self.index.is_empty()
486 }
487}
488
489#[derive(Debug, Clone)]
491pub struct QueuedAccountUpdate {
492 pub pda_address: String,
493 pub account_type: String,
494 pub account_data: Value,
495 pub slot: u64,
496 pub write_version: u64,
497 pub signature: String,
498}
499
500#[derive(Debug, Clone)]
502pub struct PendingAccountUpdate {
503 pub account_type: String,
504 pub pda_address: String,
505 pub account_data: Value,
506 pub slot: u64,
507 pub write_version: u64,
508 pub signature: String,
509 pub queued_at: i64,
510}
511
512#[derive(Debug, Clone)]
514pub struct QueuedInstructionEvent {
515 pub pda_address: String,
516 pub event_type: String,
517 pub event_data: Value,
518 pub slot: u64,
519 pub signature: String,
520}
521
522#[derive(Debug, Clone)]
524pub struct PendingInstructionEvent {
525 pub event_type: String,
526 pub pda_address: String,
527 pub event_data: Value,
528 pub slot: u64,
529 pub signature: String,
530 pub queued_at: i64,
531}
532
533#[derive(Debug, Clone)]
534pub struct PendingQueueStats {
535 pub total_updates: usize,
536 pub unique_pdas: usize,
537 pub oldest_age_seconds: i64,
538 pub largest_pda_queue_size: usize,
539 pub estimated_memory_bytes: usize,
540}
541
542#[derive(Debug, Clone, Default)]
543pub struct VmMemoryStats {
544 pub state_table_entity_count: usize,
545 pub state_table_max_entries: usize,
546 pub state_table_at_capacity: bool,
547 pub lookup_index_count: usize,
548 pub lookup_index_total_entries: usize,
549 pub temporal_index_count: usize,
550 pub temporal_index_total_entries: usize,
551 pub pda_reverse_lookup_count: usize,
552 pub pda_reverse_lookup_total_entries: usize,
553 pub version_tracker_entries: usize,
554 pub pending_queue_stats: Option<PendingQueueStats>,
555 pub path_cache_size: usize,
556}
557
558#[derive(Debug, Clone, Default)]
559pub struct CleanupResult {
560 pub pending_updates_removed: usize,
561 pub temporal_entries_removed: usize,
562}
563
564#[derive(Debug, Clone)]
565pub struct CapacityWarning {
566 pub current_entries: usize,
567 pub max_entries: usize,
568 pub entries_over_limit: usize,
569}
570
571#[derive(Debug, Clone)]
572pub struct StateTableConfig {
573 pub max_entries: usize,
574 pub max_array_length: usize,
575}
576
577impl Default for StateTableConfig {
578 fn default() -> Self {
579 Self {
580 max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
581 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
582 }
583 }
584}
585
586#[derive(Debug)]
587pub struct VersionTracker {
588 cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
589}
590
591impl VersionTracker {
592 pub fn new() -> Self {
593 Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
594 }
595
596 pub fn with_capacity(capacity: usize) -> Self {
597 VersionTracker {
598 cache: std::sync::Mutex::new(LruCache::new(
599 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
600 )),
601 }
602 }
603
604 fn make_key(primary_key: &Value, event_type: &str) -> String {
605 format!("{}:{}", primary_key, event_type)
606 }
607
608 pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
609 let key = Self::make_key(primary_key, event_type);
610 self.cache.lock().unwrap().get(&key).copied()
611 }
612
613 pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
614 let key = Self::make_key(primary_key, event_type);
615 self.cache.lock().unwrap().put(key, (slot, ordering_value));
616 }
617
618 pub fn len(&self) -> usize {
619 self.cache.lock().unwrap().len()
620 }
621
622 pub fn is_empty(&self) -> bool {
623 self.cache.lock().unwrap().is_empty()
624 }
625}
626
627impl Default for VersionTracker {
628 fn default() -> Self {
629 Self::new()
630 }
631}
632
633#[derive(Debug)]
634pub struct StateTable {
635 pub data: DashMap<Value, Value>,
636 access_times: DashMap<Value, i64>,
637 pub lookup_indexes: HashMap<String, LookupIndex>,
638 pub temporal_indexes: HashMap<String, TemporalIndex>,
639 pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
640 pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
641 pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
642 version_tracker: VersionTracker,
643 config: StateTableConfig,
644 #[cfg_attr(not(feature = "otel"), allow(dead_code))]
645 entity_name: String,
646}
647
648impl StateTable {
649 pub fn is_at_capacity(&self) -> bool {
650 self.data.len() >= self.config.max_entries
651 }
652
653 pub fn entries_over_limit(&self) -> usize {
654 self.data.len().saturating_sub(self.config.max_entries)
655 }
656
657 pub fn max_array_length(&self) -> usize {
658 self.config.max_array_length
659 }
660
661 fn touch(&self, key: &Value) {
662 let now = std::time::SystemTime::now()
663 .duration_since(std::time::UNIX_EPOCH)
664 .unwrap()
665 .as_secs() as i64;
666 self.access_times.insert(key.clone(), now);
667 }
668
669 fn evict_lru(&self, count: usize) -> usize {
670 if count == 0 || self.data.is_empty() {
671 return 0;
672 }
673
674 let mut entries: Vec<(Value, i64)> = self
675 .access_times
676 .iter()
677 .map(|entry| (entry.key().clone(), *entry.value()))
678 .collect();
679
680 entries.sort_by_key(|(_, ts)| *ts);
681
682 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
683
684 let mut evicted = 0;
685 for key in to_evict {
686 self.data.remove(&key);
687 self.access_times.remove(&key);
688 evicted += 1;
689 }
690
691 #[cfg(feature = "otel")]
692 if evicted > 0 {
693 crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
694 }
695
696 evicted
697 }
698
699 pub fn insert_with_eviction(&self, key: Value, value: Value) {
700 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
701 #[cfg(feature = "otel")]
702 crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
703 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
704 self.evict_lru(to_evict.max(1));
705 }
706 self.data.insert(key.clone(), value);
707 self.touch(&key);
708 }
709
710 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
711 let result = self.data.get(key).map(|v| v.clone());
712 if result.is_some() {
713 self.touch(key);
714 }
715 result
716 }
717
718 pub fn is_fresh_update(
725 &self,
726 primary_key: &Value,
727 event_type: &str,
728 slot: u64,
729 ordering_value: u64,
730 ) -> bool {
731 let dominated = self
732 .version_tracker
733 .get(primary_key, event_type)
734 .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
735 .unwrap_or(false);
736
737 if dominated {
738 return false;
739 }
740
741 self.version_tracker
742 .insert(primary_key, event_type, slot, ordering_value);
743 true
744 }
745}
746
747impl VmContext {
748 pub fn new() -> Self {
749 let mut vm = VmContext {
750 registers: vec![Value::Null; 256],
751 states: HashMap::new(),
752 instructions_executed: 0,
753 cache_hits: 0,
754 path_cache: HashMap::new(),
755 pda_cache_hits: 0,
756 pda_cache_misses: 0,
757 pending_queue_size: 0,
758 current_context: None,
759 warnings: Vec::new(),
760 last_pda_lookup_miss: None,
761 last_pda_registered: None,
762 };
763 vm.states.insert(
764 0,
765 StateTable {
766 data: DashMap::new(),
767 access_times: DashMap::new(),
768 lookup_indexes: HashMap::new(),
769 temporal_indexes: HashMap::new(),
770 pda_reverse_lookups: HashMap::new(),
771 pending_updates: DashMap::new(),
772 pending_instruction_events: DashMap::new(),
773 version_tracker: VersionTracker::new(),
774 config: StateTableConfig::default(),
775 entity_name: "default".to_string(),
776 },
777 );
778 vm
779 }
780
781 pub fn new_with_config(state_config: StateTableConfig) -> Self {
782 let mut vm = VmContext {
783 registers: vec![Value::Null; 256],
784 states: HashMap::new(),
785 instructions_executed: 0,
786 cache_hits: 0,
787 path_cache: HashMap::new(),
788 pda_cache_hits: 0,
789 pda_cache_misses: 0,
790 pending_queue_size: 0,
791 current_context: None,
792 warnings: Vec::new(),
793 last_pda_lookup_miss: None,
794 last_pda_registered: None,
795 };
796 vm.states.insert(
797 0,
798 StateTable {
799 data: DashMap::new(),
800 access_times: DashMap::new(),
801 lookup_indexes: HashMap::new(),
802 temporal_indexes: HashMap::new(),
803 pda_reverse_lookups: HashMap::new(),
804 pending_updates: DashMap::new(),
805 pending_instruction_events: DashMap::new(),
806 version_tracker: VersionTracker::new(),
807 config: state_config,
808 entity_name: "default".to_string(),
809 },
810 );
811 vm
812 }
813
814 pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
817 self.states.get_mut(&state_id)
818 }
819
820 pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
822 &mut self.registers
823 }
824
825 pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
827 &self.path_cache
828 }
829
830 pub fn current_context(&self) -> Option<&UpdateContext> {
832 self.current_context.as_ref()
833 }
834
835 fn add_warning(&mut self, msg: String) {
836 self.warnings.push(msg);
837 }
838
839 pub fn take_warnings(&mut self) -> Vec<String> {
840 std::mem::take(&mut self.warnings)
841 }
842
843 pub fn has_warnings(&self) -> bool {
844 !self.warnings.is_empty()
845 }
846
847 pub fn update_state_from_register(
848 &mut self,
849 state_id: u32,
850 key: Value,
851 register: Register,
852 ) -> Result<()> {
853 let state = self.states.get(&state_id).ok_or("State table not found")?;
854 let value = self.registers[register].clone();
855 state.insert_with_eviction(key, value);
856 Ok(())
857 }
858
859 fn reset_registers(&mut self) {
860 for reg in &mut self.registers {
861 *reg = Value::Null;
862 }
863 }
864
865 pub fn extract_partial_state(
867 &self,
868 state_reg: Register,
869 dirty_fields: &HashSet<String>,
870 ) -> Result<Value> {
871 let full_state = &self.registers[state_reg];
872
873 if dirty_fields.is_empty() {
874 return Ok(json!({}));
875 }
876
877 let mut partial = serde_json::Map::new();
878
879 for path in dirty_fields {
880 let segments: Vec<&str> = path.split('.').collect();
881
882 let mut current = full_state;
883 let mut found = true;
884
885 for segment in &segments {
886 match current.get(segment) {
887 Some(v) => current = v,
888 None => {
889 found = false;
890 break;
891 }
892 }
893 }
894
895 if !found {
896 continue;
897 }
898
899 let mut target = &mut partial;
900 for (i, segment) in segments.iter().enumerate() {
901 if i == segments.len() - 1 {
902 target.insert(segment.to_string(), current.clone());
903 } else {
904 target
905 .entry(segment.to_string())
906 .or_insert_with(|| json!({}));
907 target = target
908 .get_mut(*segment)
909 .and_then(|v| v.as_object_mut())
910 .ok_or("Failed to build nested structure")?;
911 }
912 }
913 }
914
915 Ok(Value::Object(partial))
916 }
917
918 pub fn extract_partial_state_with_tracker(
922 &self,
923 state_reg: Register,
924 tracker: &DirtyTracker,
925 ) -> Result<Value> {
926 let full_state = &self.registers[state_reg];
927
928 if tracker.is_empty() {
929 return Ok(json!({}));
930 }
931
932 let mut partial = serde_json::Map::new();
933
934 for (path, change) in tracker.iter() {
935 let segments: Vec<&str> = path.split('.').collect();
936
937 let value_to_insert = match change {
938 FieldChange::Replaced => {
939 let mut current = full_state;
940 let mut found = true;
941
942 for segment in &segments {
943 match current.get(*segment) {
944 Some(v) => current = v,
945 None => {
946 found = false;
947 break;
948 }
949 }
950 }
951
952 if !found {
953 continue;
954 }
955 current.clone()
956 }
957 FieldChange::Appended(values) => Value::Array(values.clone()),
958 };
959
960 let mut target = &mut partial;
961 for (i, segment) in segments.iter().enumerate() {
962 if i == segments.len() - 1 {
963 target.insert(segment.to_string(), value_to_insert.clone());
964 } else {
965 target
966 .entry(segment.to_string())
967 .or_insert_with(|| json!({}));
968 target = target
969 .get_mut(*segment)
970 .and_then(|v| v.as_object_mut())
971 .ok_or("Failed to build nested structure")?;
972 }
973 }
974 }
975
976 Ok(Value::Object(partial))
977 }
978
979 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
980 if let Some(compiled) = self.path_cache.get(path) {
981 self.cache_hits += 1;
982 #[cfg(feature = "otel")]
983 crate::vm_metrics::record_path_cache_hit();
984 return compiled.clone();
985 }
986 #[cfg(feature = "otel")]
987 crate::vm_metrics::record_path_cache_miss();
988 let compiled = CompiledPath::new(path);
989 self.path_cache.insert(path.to_string(), compiled.clone());
990 compiled
991 }
992
993 #[cfg_attr(feature = "otel", instrument(
995 name = "vm.process_event",
996 skip(self, bytecode, event_value, log),
997 level = "info",
998 fields(
999 event_type = %event_type,
1000 slot = context.as_ref().and_then(|c| c.slot),
1001 )
1002 ))]
1003 pub fn process_event(
1004 &mut self,
1005 bytecode: &MultiEntityBytecode,
1006 event_value: Value,
1007 event_type: &str,
1008 context: Option<&UpdateContext>,
1009 mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1010 ) -> Result<Vec<Mutation>> {
1011 self.current_context = context.cloned();
1012
1013 let mut event_value = event_value;
1014 if let Some(ctx) = context {
1015 if let Some(obj) = event_value.as_object_mut() {
1016 obj.insert("__update_context".to_string(), ctx.to_value());
1017 }
1018 }
1019
1020 let mut all_mutations = Vec::new();
1021
1022 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1023 for entity_name in entity_names {
1024 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1025 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1026 if let Some(ref mut log) = log {
1027 log.set("entity", entity_name.clone());
1028 log.inc("handlers", 1);
1029 }
1030
1031 let opcodes_before = self.instructions_executed;
1032 let cache_before = self.cache_hits;
1033 let pda_hits_before = self.pda_cache_hits;
1034 let pda_misses_before = self.pda_cache_misses;
1035
1036 let mutations = self.execute_handler(
1037 handler,
1038 &event_value,
1039 event_type,
1040 entity_bytecode.state_id,
1041 entity_name,
1042 entity_bytecode.computed_fields_evaluator.as_ref(),
1043 )?;
1044
1045 if let Some(ref mut log) = log {
1046 log.inc(
1047 "opcodes",
1048 (self.instructions_executed - opcodes_before) as i64,
1049 );
1050 log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1051 log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1052 log.inc(
1053 "pda_misses",
1054 (self.pda_cache_misses - pda_misses_before) as i64,
1055 );
1056 }
1057
1058 if mutations.is_empty() {
1059 if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1060 if event_type.ends_with("IxState") {
1061 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1062 let signature = context
1063 .and_then(|c| c.signature.clone())
1064 .unwrap_or_default();
1065 let _ = self.queue_instruction_event(
1066 entity_bytecode.state_id,
1067 QueuedInstructionEvent {
1068 pda_address: missed_pda,
1069 event_type: event_type.to_string(),
1070 event_data: event_value.clone(),
1071 slot,
1072 signature,
1073 },
1074 );
1075 }
1076 }
1077 }
1078
1079 all_mutations.extend(mutations);
1080
1081 if let Some(registered_pda) = self.take_last_pda_registered() {
1082 let pending_events = self.flush_pending_instruction_events(
1083 entity_bytecode.state_id,
1084 ®istered_pda,
1085 );
1086 for pending in pending_events {
1087 if let Some(pending_handler) =
1088 entity_bytecode.handlers.get(&pending.event_type)
1089 {
1090 if let Ok(reprocessed_mutations) = self.execute_handler(
1091 pending_handler,
1092 &pending.event_data,
1093 &pending.event_type,
1094 entity_bytecode.state_id,
1095 entity_name,
1096 entity_bytecode.computed_fields_evaluator.as_ref(),
1097 ) {
1098 all_mutations.extend(reprocessed_mutations);
1099 }
1100 }
1101 }
1102 }
1103 } else if let Some(ref mut log) = log {
1104 log.set("skip_reason", "no_handler");
1105 }
1106 } else if let Some(ref mut log) = log {
1107 log.set("skip_reason", "entity_not_found");
1108 }
1109 }
1110 } else if let Some(ref mut log) = log {
1111 log.set("skip_reason", "no_event_routing");
1112 }
1113
1114 if let Some(log) = log {
1115 log.set("mutations", all_mutations.len() as i64);
1116 if let Some(first) = all_mutations.first() {
1117 if let Some(key_str) = first.key.as_str() {
1118 log.set("primary_key", key_str);
1119 } else if let Some(key_num) = first.key.as_u64() {
1120 log.set("primary_key", key_num as i64);
1121 }
1122 }
1123 if let Some(state) = self.states.get(&0) {
1124 log.set("state_table_size", state.data.len() as i64);
1125 }
1126
1127 let warnings = self.take_warnings();
1128 if !warnings.is_empty() {
1129 log.set("warnings", warnings.len() as i64);
1130 log.set(
1131 "warning_messages",
1132 Value::Array(warnings.into_iter().map(Value::String).collect()),
1133 );
1134 log.set_level(crate::canonical_log::LogLevel::Warn);
1135 }
1136 } else {
1137 self.warnings.clear();
1138 }
1139
1140 Ok(all_mutations)
1141 }
1142
1143 pub fn process_any(
1144 &mut self,
1145 bytecode: &MultiEntityBytecode,
1146 any: prost_types::Any,
1147 ) -> Result<Vec<Mutation>> {
1148 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1149 self.process_event(bytecode, event_value, &event_type, None, None)
1150 }
1151
1152 #[cfg_attr(feature = "otel", instrument(
1153 name = "vm.execute_handler",
1154 skip(self, handler, event_value, entity_evaluator),
1155 level = "debug",
1156 fields(
1157 event_type = %event_type,
1158 handler_opcodes = handler.len(),
1159 )
1160 ))]
1161 #[allow(clippy::type_complexity)]
1162 fn execute_handler(
1163 &mut self,
1164 handler: &[OpCode],
1165 event_value: &Value,
1166 event_type: &str,
1167 override_state_id: u32,
1168 entity_name: &str,
1169 entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1170 ) -> Result<Vec<Mutation>> {
1171 self.reset_registers();
1172 self.last_pda_lookup_miss = None;
1173
1174 let mut pc: usize = 0;
1175 let mut output = Vec::new();
1176 let mut dirty_tracker = DirtyTracker::new();
1177
1178 while pc < handler.len() {
1179 match &handler[pc] {
1180 OpCode::LoadEventField {
1181 path,
1182 dest,
1183 default,
1184 } => {
1185 let value = self.load_field(event_value, path, default.as_ref())?;
1186 self.registers[*dest] = value;
1187 pc += 1;
1188 }
1189 OpCode::LoadConstant { value, dest } => {
1190 self.registers[*dest] = value.clone();
1191 pc += 1;
1192 }
1193 OpCode::CopyRegister { source, dest } => {
1194 self.registers[*dest] = self.registers[*source].clone();
1195 pc += 1;
1196 }
1197 OpCode::CopyRegisterIfNull { source, dest } => {
1198 if self.registers[*dest].is_null() {
1199 self.registers[*dest] = self.registers[*source].clone();
1200 }
1201 pc += 1;
1202 }
1203 OpCode::GetEventType { dest } => {
1204 self.registers[*dest] = json!(event_type);
1205 pc += 1;
1206 }
1207 OpCode::CreateObject { dest } => {
1208 self.registers[*dest] = json!({});
1209 pc += 1;
1210 }
1211 OpCode::SetField {
1212 object,
1213 path,
1214 value,
1215 } => {
1216 self.set_field_auto_vivify(*object, path, *value)?;
1217 dirty_tracker.mark_replaced(path);
1218 pc += 1;
1219 }
1220 OpCode::SetFields { object, fields } => {
1221 for (path, value_reg) in fields {
1222 self.set_field_auto_vivify(*object, path, *value_reg)?;
1223 dirty_tracker.mark_replaced(path);
1224 }
1225 pc += 1;
1226 }
1227 OpCode::GetField { object, path, dest } => {
1228 let value = self.get_field(*object, path)?;
1229 self.registers[*dest] = value;
1230 pc += 1;
1231 }
1232 OpCode::ReadOrInitState {
1233 state_id: _,
1234 key,
1235 default,
1236 dest,
1237 } => {
1238 let actual_state_id = override_state_id;
1239 let entity_name_owned = entity_name.to_string();
1240 self.states
1241 .entry(actual_state_id)
1242 .or_insert_with(|| StateTable {
1243 data: DashMap::new(),
1244 access_times: DashMap::new(),
1245 lookup_indexes: HashMap::new(),
1246 temporal_indexes: HashMap::new(),
1247 pda_reverse_lookups: HashMap::new(),
1248 pending_updates: DashMap::new(),
1249 pending_instruction_events: DashMap::new(),
1250 version_tracker: VersionTracker::new(),
1251 config: StateTableConfig::default(),
1252 entity_name: entity_name_owned,
1253 });
1254 let key_value = self.registers[*key].clone();
1255 let warn_null_key = key_value.is_null()
1256 && event_type.ends_with("State")
1257 && !event_type.ends_with("IxState");
1258
1259 if warn_null_key {
1260 self.add_warning(format!(
1261 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1262 key, event_type
1263 ));
1264 }
1265
1266 let state = self
1267 .states
1268 .get(&actual_state_id)
1269 .ok_or("State table not found")?;
1270
1271 if !key_value.is_null() {
1272 if let Some(ctx) = &self.current_context {
1273 let ordering_value = ctx.write_version.or(ctx.txn_index);
1274 if let (Some(slot), Some(version)) = (ctx.slot, ordering_value) {
1275 if !state.is_fresh_update(&key_value, event_type, slot, version) {
1276 self.add_warning(format!(
1277 "Stale update skipped: slot={}, version={}",
1278 slot, version
1279 ));
1280 return Ok(Vec::new());
1281 }
1282 }
1283 }
1284 }
1285 let value = state
1286 .get_and_touch(&key_value)
1287 .unwrap_or_else(|| default.clone());
1288
1289 self.registers[*dest] = value;
1290 pc += 1;
1291 }
1292 OpCode::UpdateState {
1293 state_id: _,
1294 key,
1295 value,
1296 } => {
1297 let actual_state_id = override_state_id;
1298 let state = self
1299 .states
1300 .get(&actual_state_id)
1301 .ok_or("State table not found")?;
1302 let key_value = self.registers[*key].clone();
1303 let value_data = self.registers[*value].clone();
1304
1305 state.insert_with_eviction(key_value, value_data);
1306 pc += 1;
1307 }
1308 OpCode::AppendToArray {
1309 object,
1310 path,
1311 value,
1312 } => {
1313 let appended_value = self.registers[*value].clone();
1314 let max_len = self
1315 .states
1316 .get(&override_state_id)
1317 .map(|s| s.max_array_length())
1318 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1319 self.append_to_array(*object, path, *value, max_len)?;
1320 dirty_tracker.mark_appended(path, appended_value);
1321 pc += 1;
1322 }
1323 OpCode::GetCurrentTimestamp { dest } => {
1324 let timestamp = std::time::SystemTime::now()
1325 .duration_since(std::time::UNIX_EPOCH)
1326 .unwrap()
1327 .as_secs() as i64;
1328 self.registers[*dest] = json!(timestamp);
1329 pc += 1;
1330 }
1331 OpCode::CreateEvent { dest, event_value } => {
1332 let timestamp = std::time::SystemTime::now()
1333 .duration_since(std::time::UNIX_EPOCH)
1334 .unwrap()
1335 .as_secs() as i64;
1336
1337 let mut event_data = self.registers[*event_value].clone();
1339 if let Some(obj) = event_data.as_object_mut() {
1340 obj.remove("__update_context");
1341 }
1342
1343 let mut event = serde_json::Map::new();
1345 event.insert("timestamp".to_string(), json!(timestamp));
1346 event.insert("data".to_string(), event_data);
1347
1348 if let Some(ref ctx) = self.current_context {
1350 if let Some(slot) = ctx.slot {
1351 event.insert("slot".to_string(), json!(slot));
1352 }
1353 if let Some(ref signature) = ctx.signature {
1354 event.insert("signature".to_string(), json!(signature));
1355 }
1356 }
1357
1358 self.registers[*dest] = Value::Object(event);
1359 pc += 1;
1360 }
1361 OpCode::CreateCapture {
1362 dest,
1363 capture_value,
1364 } => {
1365 let timestamp = std::time::SystemTime::now()
1366 .duration_since(std::time::UNIX_EPOCH)
1367 .unwrap()
1368 .as_secs() as i64;
1369
1370 let capture_data = self.registers[*capture_value].clone();
1372
1373 let account_address = event_value
1375 .get("__account_address")
1376 .and_then(|v| v.as_str())
1377 .unwrap_or("")
1378 .to_string();
1379
1380 let mut capture = serde_json::Map::new();
1382 capture.insert("timestamp".to_string(), json!(timestamp));
1383 capture.insert("account_address".to_string(), json!(account_address));
1384 capture.insert("data".to_string(), capture_data);
1385
1386 if let Some(ref ctx) = self.current_context {
1388 if let Some(slot) = ctx.slot {
1389 capture.insert("slot".to_string(), json!(slot));
1390 }
1391 if let Some(ref signature) = ctx.signature {
1392 capture.insert("signature".to_string(), json!(signature));
1393 }
1394 }
1395
1396 self.registers[*dest] = Value::Object(capture);
1397 pc += 1;
1398 }
1399 OpCode::Transform {
1400 source,
1401 dest,
1402 transformation,
1403 } => {
1404 if source == dest {
1405 self.transform_in_place(*source, transformation)?;
1406 } else {
1407 let source_value = &self.registers[*source];
1408 let value = self.apply_transformation(source_value, transformation)?;
1409 self.registers[*dest] = value;
1410 }
1411 pc += 1;
1412 }
1413 OpCode::EmitMutation {
1414 entity_name,
1415 key,
1416 state,
1417 } => {
1418 let debug_computed = std::env::var("HYPERSTACK_DEBUG_COMPUTED").is_ok();
1419 let primary_key = self.registers[*key].clone();
1420
1421 if primary_key.is_null() || dirty_tracker.is_empty() {
1422 let reason = if dirty_tracker.is_empty() {
1423 "no_fields_modified"
1424 } else {
1425 "null_primary_key"
1426 };
1427 self.add_warning(format!(
1428 "Skipping mutation for entity '{}': {} (dirty_fields={})",
1429 entity_name,
1430 reason,
1431 dirty_tracker.len()
1432 ));
1433 } else {
1434 let patch =
1435 self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1436
1437 if debug_computed {
1438 if let Some(results) = patch.get("results") {
1439 if results.get("rng").is_some()
1440 || results.get("winning_square").is_some()
1441 || results.get("did_hit_motherlode").is_some()
1442 {
1443 tracing::warn!(
1444 "[VM_EMIT_MUTATION] entity={} key={} patch.results: rng={:?} winning_square={:?} did_hit_motherlode={:?}",
1445 entity_name,
1446 primary_key,
1447 results.get("rng"),
1448 results.get("winning_square"),
1449 results.get("did_hit_motherlode")
1450 );
1451 }
1452 }
1453 }
1454
1455 let append = dirty_tracker.appended_paths();
1456 let mutation = Mutation {
1457 export: entity_name.clone(),
1458 key: primary_key,
1459 patch,
1460 append,
1461 };
1462 output.push(mutation);
1463 }
1464 pc += 1;
1465 }
1466 OpCode::SetFieldIfNull {
1467 object,
1468 path,
1469 value,
1470 } => {
1471 let was_set = self.set_field_if_null(*object, path, *value)?;
1472 if was_set {
1473 dirty_tracker.mark_replaced(path);
1474 }
1475 pc += 1;
1476 }
1477 OpCode::SetFieldMax {
1478 object,
1479 path,
1480 value,
1481 } => {
1482 let was_updated = self.set_field_max(*object, path, *value)?;
1483 if was_updated {
1484 dirty_tracker.mark_replaced(path);
1485 }
1486 pc += 1;
1487 }
1488 OpCode::UpdateTemporalIndex {
1489 state_id: _,
1490 index_name,
1491 lookup_value,
1492 primary_key,
1493 timestamp,
1494 } => {
1495 let actual_state_id = override_state_id;
1496 let state = self
1497 .states
1498 .get_mut(&actual_state_id)
1499 .ok_or("State table not found")?;
1500 let index = state
1501 .temporal_indexes
1502 .entry(index_name.clone())
1503 .or_insert_with(TemporalIndex::new);
1504
1505 let lookup_val = self.registers[*lookup_value].clone();
1506 let pk_val = self.registers[*primary_key].clone();
1507 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1508 val
1509 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1510 val as i64
1511 } else {
1512 return Err(format!(
1513 "Timestamp must be a number (i64 or u64), got: {:?}",
1514 self.registers[*timestamp]
1515 )
1516 .into());
1517 };
1518
1519 index.insert(lookup_val, pk_val, ts_val);
1520 pc += 1;
1521 }
1522 OpCode::LookupTemporalIndex {
1523 state_id: _,
1524 index_name,
1525 lookup_value,
1526 timestamp,
1527 dest,
1528 } => {
1529 let actual_state_id = override_state_id;
1530 let state = self
1531 .states
1532 .get(&actual_state_id)
1533 .ok_or("State table not found")?;
1534 let lookup_val = &self.registers[*lookup_value];
1535
1536 let result = if self.registers[*timestamp].is_null() {
1537 if let Some(index) = state.temporal_indexes.get(index_name) {
1538 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1539 } else {
1540 Value::Null
1541 }
1542 } else {
1543 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1544 val
1545 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1546 val as i64
1547 } else {
1548 return Err(format!(
1549 "Timestamp must be a number (i64 or u64), got: {:?}",
1550 self.registers[*timestamp]
1551 )
1552 .into());
1553 };
1554
1555 if let Some(index) = state.temporal_indexes.get(index_name) {
1556 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1557 } else {
1558 Value::Null
1559 }
1560 };
1561
1562 self.registers[*dest] = result;
1563 pc += 1;
1564 }
1565 OpCode::UpdateLookupIndex {
1566 state_id: _,
1567 index_name,
1568 lookup_value,
1569 primary_key,
1570 } => {
1571 let actual_state_id = override_state_id;
1572 let state = self
1573 .states
1574 .get_mut(&actual_state_id)
1575 .ok_or("State table not found")?;
1576 let index = state
1577 .lookup_indexes
1578 .entry(index_name.clone())
1579 .or_insert_with(LookupIndex::new);
1580
1581 let lookup_val = self.registers[*lookup_value].clone();
1582 let pk_val = self.registers[*primary_key].clone();
1583
1584 index.insert(lookup_val, pk_val);
1585 pc += 1;
1586 }
1587 OpCode::LookupIndex {
1588 state_id: _,
1589 index_name,
1590 lookup_value,
1591 dest,
1592 } => {
1593 let actual_state_id = override_state_id;
1594 let lookup_val = self.registers[*lookup_value].clone();
1595
1596 let result = {
1597 let state = self
1598 .states
1599 .get(&actual_state_id)
1600 .ok_or("State table not found")?;
1601
1602 if let Some(index) = state.lookup_indexes.get(index_name) {
1603 let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1604 #[cfg(feature = "otel")]
1605 if found.is_null() {
1606 crate::vm_metrics::record_lookup_index_miss(index_name);
1607 } else {
1608 crate::vm_metrics::record_lookup_index_hit(index_name);
1609 }
1610 found
1611 } else {
1612 Value::Null
1613 }
1614 };
1615
1616 let final_result = if result.is_null() {
1617 if let Some(pda_str) = lookup_val.as_str() {
1618 let state = self
1619 .states
1620 .get_mut(&actual_state_id)
1621 .ok_or("State table not found")?;
1622
1623 if let Some(pda_lookup) =
1624 state.pda_reverse_lookups.get_mut("default_pda_lookup")
1625 {
1626 if let Some(resolved) = pda_lookup.lookup(pda_str) {
1627 Value::String(resolved)
1628 } else {
1629 self.last_pda_lookup_miss = Some(pda_str.to_string());
1630 Value::Null
1631 }
1632 } else {
1633 self.last_pda_lookup_miss = Some(pda_str.to_string());
1634 Value::Null
1635 }
1636 } else {
1637 Value::Null
1638 }
1639 } else {
1640 result
1641 };
1642
1643 self.registers[*dest] = final_result;
1644 pc += 1;
1645 }
1646 OpCode::SetFieldSum {
1647 object,
1648 path,
1649 value,
1650 } => {
1651 let was_updated = self.set_field_sum(*object, path, *value)?;
1652 if was_updated {
1653 dirty_tracker.mark_replaced(path);
1654 }
1655 pc += 1;
1656 }
1657 OpCode::SetFieldIncrement { object, path } => {
1658 let was_updated = self.set_field_increment(*object, path)?;
1659 if was_updated {
1660 dirty_tracker.mark_replaced(path);
1661 }
1662 pc += 1;
1663 }
1664 OpCode::SetFieldMin {
1665 object,
1666 path,
1667 value,
1668 } => {
1669 let was_updated = self.set_field_min(*object, path, *value)?;
1670 if was_updated {
1671 dirty_tracker.mark_replaced(path);
1672 }
1673 pc += 1;
1674 }
1675 OpCode::AddToUniqueSet {
1676 state_id: _,
1677 set_name,
1678 value,
1679 count_object,
1680 count_path,
1681 } => {
1682 let value_to_add = self.registers[*value].clone();
1683
1684 let set_field_path = format!("__unique_set:{}", set_name);
1687
1688 let mut set: HashSet<Value> =
1690 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1691 if !existing.is_null() {
1692 serde_json::from_value(existing).unwrap_or_default()
1693 } else {
1694 HashSet::new()
1695 }
1696 } else {
1697 HashSet::new()
1698 };
1699
1700 let was_new = set.insert(value_to_add);
1702
1703 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1705 self.registers[100] = serde_json::to_value(set_as_vec)?;
1706 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1707
1708 if was_new {
1710 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1711 self.set_field_auto_vivify(*count_object, count_path, 100)?;
1712 dirty_tracker.mark_replaced(count_path);
1713 }
1714
1715 pc += 1;
1716 }
1717 OpCode::ConditionalSetField {
1718 object,
1719 path,
1720 value,
1721 condition_field,
1722 condition_op,
1723 condition_value,
1724 } => {
1725 let field_value = self.load_field(event_value, condition_field, None)?;
1726 let condition_met =
1727 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1728
1729 if condition_met {
1730 self.set_field_auto_vivify(*object, path, *value)?;
1731 dirty_tracker.mark_replaced(path);
1732 }
1733 pc += 1;
1734 }
1735 OpCode::ConditionalIncrement {
1736 object,
1737 path,
1738 condition_field,
1739 condition_op,
1740 condition_value,
1741 } => {
1742 let field_value = self.load_field(event_value, condition_field, None)?;
1743 let condition_met =
1744 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1745
1746 if condition_met {
1747 let was_updated = self.set_field_increment(*object, path)?;
1748 if was_updated {
1749 dirty_tracker.mark_replaced(path);
1750 }
1751 }
1752 pc += 1;
1753 }
1754 OpCode::EvaluateComputedFields {
1755 state,
1756 computed_paths,
1757 } => {
1758 let debug_computed = std::env::var("HYPERSTACK_DEBUG_COMPUTED").is_ok();
1759
1760 if let Some(evaluator) = entity_evaluator {
1761 let old_values: Vec<_> = computed_paths
1762 .iter()
1763 .map(|path| Self::get_value_at_path(&self.registers[*state], path))
1764 .collect();
1765
1766 if debug_computed {
1767 tracing::warn!(
1768 "[VM_EVAL_COMPUTED] entity={} BEFORE evaluator: {:?}",
1769 entity_name,
1770 computed_paths
1771 .iter()
1772 .zip(old_values.iter())
1773 .map(|(p, v)| format!("{}={:?}", p, v))
1774 .collect::<Vec<_>>()
1775 );
1776 }
1777
1778 let state_value = &mut self.registers[*state];
1779 let eval_result = evaluator(state_value);
1780
1781 if debug_computed {
1782 if let Err(ref e) = eval_result {
1783 tracing::error!(
1784 "[VM_EVAL_COMPUTED] entity={} evaluator FAILED: {:?}",
1785 entity_name,
1786 e
1787 );
1788 }
1789 }
1790
1791 if eval_result.is_ok() {
1792 for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
1793 let new_value =
1794 Self::get_value_at_path(&self.registers[*state], path);
1795
1796 if debug_computed {
1797 tracing::warn!(
1798 "[VM_EVAL_COMPUTED] entity={} path={} old={:?} new={:?} changed={}",
1799 entity_name,
1800 path,
1801 old_value,
1802 new_value,
1803 new_value != *old_value
1804 );
1805 }
1806
1807 if new_value != *old_value {
1808 dirty_tracker.mark_replaced(path);
1809 }
1810 }
1811 }
1812 } else if debug_computed {
1813 tracing::warn!(
1814 "[VM_EVAL_COMPUTED] entity={} NO EVALUATOR - skipping computed fields",
1815 entity_name
1816 );
1817 }
1818 pc += 1;
1819 }
1820 OpCode::UpdatePdaReverseLookup {
1821 state_id: _,
1822 lookup_name,
1823 pda_address,
1824 primary_key,
1825 } => {
1826 let actual_state_id = override_state_id;
1827 let state = self
1828 .states
1829 .get_mut(&actual_state_id)
1830 .ok_or("State table not found")?;
1831
1832 let pda_val = self.registers[*pda_address].clone();
1833 let pk_val = self.registers[*primary_key].clone();
1834
1835 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1836 let pda_lookup = state
1837 .pda_reverse_lookups
1838 .entry(lookup_name.clone())
1839 .or_insert_with(|| {
1840 PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
1841 });
1842
1843 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1844 self.last_pda_registered = Some(pda_str.to_string());
1845 } else if !pk_val.is_null() {
1846 if let Some(pk_num) = pk_val.as_u64() {
1847 if let Some(pda_str) = pda_val.as_str() {
1848 let pda_lookup = state
1849 .pda_reverse_lookups
1850 .entry(lookup_name.clone())
1851 .or_insert_with(|| {
1852 PdaReverseLookup::new(
1853 DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
1854 )
1855 });
1856
1857 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1858 self.last_pda_registered = Some(pda_str.to_string());
1859 }
1860 }
1861 }
1862
1863 pc += 1;
1864 }
1865 }
1866
1867 self.instructions_executed += 1;
1868 }
1869
1870 Ok(output)
1871 }
1872
1873 fn load_field(
1874 &self,
1875 event_value: &Value,
1876 path: &FieldPath,
1877 default: Option<&Value>,
1878 ) -> Result<Value> {
1879 if path.segments.is_empty() {
1880 if let Some(obj) = event_value.as_object() {
1881 let filtered: serde_json::Map<String, Value> = obj
1882 .iter()
1883 .filter(|(k, _)| !k.starts_with("__"))
1884 .map(|(k, v)| (k.clone(), v.clone()))
1885 .collect();
1886 return Ok(Value::Object(filtered));
1887 }
1888 return Ok(event_value.clone());
1889 }
1890
1891 let mut current = event_value;
1892 for segment in path.segments.iter() {
1893 current = match current.get(segment) {
1894 Some(v) => v,
1895 None => return Ok(default.cloned().unwrap_or(Value::Null)),
1896 };
1897 }
1898
1899 Ok(current.clone())
1900 }
1901
1902 fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
1903 let mut current = value;
1904 for segment in path.split('.') {
1905 current = current.get(segment)?;
1906 }
1907 Some(current.clone())
1908 }
1909
1910 fn set_field_auto_vivify(
1911 &mut self,
1912 object_reg: Register,
1913 path: &str,
1914 value_reg: Register,
1915 ) -> Result<()> {
1916 let compiled = self.get_compiled_path(path);
1917 let segments = compiled.segments();
1918 let value = self.registers[value_reg].clone();
1919
1920 if !self.registers[object_reg].is_object() {
1921 self.registers[object_reg] = json!({});
1922 }
1923
1924 let obj = self.registers[object_reg]
1925 .as_object_mut()
1926 .ok_or("Not an object")?;
1927
1928 let mut current = obj;
1929 for (i, segment) in segments.iter().enumerate() {
1930 if i == segments.len() - 1 {
1931 current.insert(segment.to_string(), value);
1932 return Ok(());
1933 } else {
1934 current
1935 .entry(segment.to_string())
1936 .or_insert_with(|| json!({}));
1937 current = current
1938 .get_mut(segment)
1939 .and_then(|v| v.as_object_mut())
1940 .ok_or("Path collision: expected object")?;
1941 }
1942 }
1943
1944 Ok(())
1945 }
1946
1947 fn set_field_if_null(
1948 &mut self,
1949 object_reg: Register,
1950 path: &str,
1951 value_reg: Register,
1952 ) -> Result<bool> {
1953 let compiled = self.get_compiled_path(path);
1954 let segments = compiled.segments();
1955 let value = self.registers[value_reg].clone();
1956
1957 if !self.registers[object_reg].is_object() {
1958 self.registers[object_reg] = json!({});
1959 }
1960
1961 let obj = self.registers[object_reg]
1962 .as_object_mut()
1963 .ok_or("Not an object")?;
1964
1965 let mut current = obj;
1966 for (i, segment) in segments.iter().enumerate() {
1967 if i == segments.len() - 1 {
1968 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1969 current.insert(segment.to_string(), value);
1970 return Ok(true);
1971 }
1972 return Ok(false);
1973 } else {
1974 current
1975 .entry(segment.to_string())
1976 .or_insert_with(|| json!({}));
1977 current = current
1978 .get_mut(segment)
1979 .and_then(|v| v.as_object_mut())
1980 .ok_or("Path collision: expected object")?;
1981 }
1982 }
1983
1984 Ok(false)
1985 }
1986
1987 fn set_field_max(
1988 &mut self,
1989 object_reg: Register,
1990 path: &str,
1991 value_reg: Register,
1992 ) -> Result<bool> {
1993 let compiled = self.get_compiled_path(path);
1994 let segments = compiled.segments();
1995 let new_value = self.registers[value_reg].clone();
1996
1997 if !self.registers[object_reg].is_object() {
1998 self.registers[object_reg] = json!({});
1999 }
2000
2001 let obj = self.registers[object_reg]
2002 .as_object_mut()
2003 .ok_or("Not an object")?;
2004
2005 let mut current = obj;
2006 for (i, segment) in segments.iter().enumerate() {
2007 if i == segments.len() - 1 {
2008 let should_update = if let Some(current_value) = current.get(segment) {
2009 if current_value.is_null() {
2010 true
2011 } else {
2012 match (current_value.as_i64(), new_value.as_i64()) {
2013 (Some(current_val), Some(new_val)) => new_val > current_val,
2014 (Some(current_val), None) if new_value.as_u64().is_some() => {
2015 new_value.as_u64().unwrap() as i64 > current_val
2016 }
2017 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2018 new_val > current_value.as_u64().unwrap() as i64
2019 }
2020 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2021 (Some(current_val), Some(new_val)) => new_val > current_val,
2022 _ => match (current_value.as_f64(), new_value.as_f64()) {
2023 (Some(current_val), Some(new_val)) => new_val > current_val,
2024 _ => false,
2025 },
2026 },
2027 _ => false,
2028 }
2029 }
2030 } else {
2031 true
2032 };
2033
2034 if should_update {
2035 current.insert(segment.to_string(), new_value);
2036 return Ok(true);
2037 }
2038 return Ok(false);
2039 } else {
2040 current
2041 .entry(segment.to_string())
2042 .or_insert_with(|| json!({}));
2043 current = current
2044 .get_mut(segment)
2045 .and_then(|v| v.as_object_mut())
2046 .ok_or("Path collision: expected object")?;
2047 }
2048 }
2049
2050 Ok(false)
2051 }
2052
2053 fn set_field_sum(
2054 &mut self,
2055 object_reg: Register,
2056 path: &str,
2057 value_reg: Register,
2058 ) -> Result<bool> {
2059 let compiled = self.get_compiled_path(path);
2060 let segments = compiled.segments();
2061 let new_value = &self.registers[value_reg];
2062
2063 let new_val_num = new_value
2065 .as_i64()
2066 .or_else(|| new_value.as_u64().map(|n| n as i64))
2067 .ok_or("Sum requires numeric value")?;
2068
2069 if !self.registers[object_reg].is_object() {
2070 self.registers[object_reg] = json!({});
2071 }
2072
2073 let obj = self.registers[object_reg]
2074 .as_object_mut()
2075 .ok_or("Not an object")?;
2076
2077 let mut current = obj;
2078 for (i, segment) in segments.iter().enumerate() {
2079 if i == segments.len() - 1 {
2080 let current_val = current
2081 .get(segment)
2082 .and_then(|v| {
2083 if v.is_null() {
2084 None
2085 } else {
2086 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2087 }
2088 })
2089 .unwrap_or(0);
2090
2091 let sum = current_val + new_val_num;
2092 current.insert(segment.to_string(), json!(sum));
2093 return Ok(true);
2094 } else {
2095 current
2096 .entry(segment.to_string())
2097 .or_insert_with(|| json!({}));
2098 current = current
2099 .get_mut(segment)
2100 .and_then(|v| v.as_object_mut())
2101 .ok_or("Path collision: expected object")?;
2102 }
2103 }
2104
2105 Ok(false)
2106 }
2107
2108 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2109 let compiled = self.get_compiled_path(path);
2110 let segments = compiled.segments();
2111
2112 if !self.registers[object_reg].is_object() {
2113 self.registers[object_reg] = json!({});
2114 }
2115
2116 let obj = self.registers[object_reg]
2117 .as_object_mut()
2118 .ok_or("Not an object")?;
2119
2120 let mut current = obj;
2121 for (i, segment) in segments.iter().enumerate() {
2122 if i == segments.len() - 1 {
2123 let current_val = current
2125 .get(segment)
2126 .and_then(|v| {
2127 if v.is_null() {
2128 None
2129 } else {
2130 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2131 }
2132 })
2133 .unwrap_or(0);
2134
2135 let incremented = current_val + 1;
2136 current.insert(segment.to_string(), json!(incremented));
2137 return Ok(true);
2138 } else {
2139 current
2140 .entry(segment.to_string())
2141 .or_insert_with(|| json!({}));
2142 current = current
2143 .get_mut(segment)
2144 .and_then(|v| v.as_object_mut())
2145 .ok_or("Path collision: expected object")?;
2146 }
2147 }
2148
2149 Ok(false)
2150 }
2151
2152 fn set_field_min(
2153 &mut self,
2154 object_reg: Register,
2155 path: &str,
2156 value_reg: Register,
2157 ) -> Result<bool> {
2158 let compiled = self.get_compiled_path(path);
2159 let segments = compiled.segments();
2160 let new_value = self.registers[value_reg].clone();
2161
2162 if !self.registers[object_reg].is_object() {
2163 self.registers[object_reg] = json!({});
2164 }
2165
2166 let obj = self.registers[object_reg]
2167 .as_object_mut()
2168 .ok_or("Not an object")?;
2169
2170 let mut current = obj;
2171 for (i, segment) in segments.iter().enumerate() {
2172 if i == segments.len() - 1 {
2173 let should_update = if let Some(current_value) = current.get(segment) {
2174 if current_value.is_null() {
2175 true
2176 } else {
2177 match (current_value.as_i64(), new_value.as_i64()) {
2178 (Some(current_val), Some(new_val)) => new_val < current_val,
2179 (Some(current_val), None) if new_value.as_u64().is_some() => {
2180 (new_value.as_u64().unwrap() as i64) < current_val
2181 }
2182 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2183 new_val < current_value.as_u64().unwrap() as i64
2184 }
2185 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2186 (Some(current_val), Some(new_val)) => new_val < current_val,
2187 _ => match (current_value.as_f64(), new_value.as_f64()) {
2188 (Some(current_val), Some(new_val)) => new_val < current_val,
2189 _ => false,
2190 },
2191 },
2192 _ => false,
2193 }
2194 }
2195 } else {
2196 true
2197 };
2198
2199 if should_update {
2200 current.insert(segment.to_string(), new_value);
2201 return Ok(true);
2202 }
2203 return Ok(false);
2204 } else {
2205 current
2206 .entry(segment.to_string())
2207 .or_insert_with(|| json!({}));
2208 current = current
2209 .get_mut(segment)
2210 .and_then(|v| v.as_object_mut())
2211 .ok_or("Path collision: expected object")?;
2212 }
2213 }
2214
2215 Ok(false)
2216 }
2217
2218 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2219 let compiled = self.get_compiled_path(path);
2220 let segments = compiled.segments();
2221 let mut current = &self.registers[object_reg];
2222
2223 for segment in segments {
2224 current = current
2225 .get(segment)
2226 .ok_or_else(|| format!("Field not found: {}", segment))?;
2227 }
2228
2229 Ok(current.clone())
2230 }
2231
2232 fn append_to_array(
2233 &mut self,
2234 object_reg: Register,
2235 path: &str,
2236 value_reg: Register,
2237 max_length: usize,
2238 ) -> Result<()> {
2239 let compiled = self.get_compiled_path(path);
2240 let segments = compiled.segments();
2241 let value = self.registers[value_reg].clone();
2242
2243 if !self.registers[object_reg].is_object() {
2244 self.registers[object_reg] = json!({});
2245 }
2246
2247 let obj = self.registers[object_reg]
2248 .as_object_mut()
2249 .ok_or("Not an object")?;
2250
2251 let mut current = obj;
2252 for (i, segment) in segments.iter().enumerate() {
2253 if i == segments.len() - 1 {
2254 current
2255 .entry(segment.to_string())
2256 .or_insert_with(|| json!([]));
2257 let arr = current
2258 .get_mut(segment)
2259 .and_then(|v| v.as_array_mut())
2260 .ok_or("Path is not an array")?;
2261 arr.push(value.clone());
2262
2263 if arr.len() > max_length {
2264 let excess = arr.len() - max_length;
2265 arr.drain(0..excess);
2266 }
2267 } else {
2268 current
2269 .entry(segment.to_string())
2270 .or_insert_with(|| json!({}));
2271 current = current
2272 .get_mut(segment)
2273 .and_then(|v| v.as_object_mut())
2274 .ok_or("Path collision: expected object")?;
2275 }
2276 }
2277
2278 Ok(())
2279 }
2280
2281 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2282 let value = &self.registers[reg];
2283 let transformed = self.apply_transformation(value, transformation)?;
2284 self.registers[reg] = transformed;
2285 Ok(())
2286 }
2287
2288 fn apply_transformation(
2289 &self,
2290 value: &Value,
2291 transformation: &Transformation,
2292 ) -> Result<Value> {
2293 match transformation {
2294 Transformation::HexEncode => {
2295 if let Some(arr) = value.as_array() {
2296 let bytes: Vec<u8> = arr
2297 .iter()
2298 .filter_map(|v| v.as_u64().map(|n| n as u8))
2299 .collect();
2300 let hex = hex::encode(&bytes);
2301 Ok(json!(hex))
2302 } else {
2303 Err("HexEncode requires an array of numbers".into())
2304 }
2305 }
2306 Transformation::HexDecode => {
2307 if let Some(s) = value.as_str() {
2308 let s = s.strip_prefix("0x").unwrap_or(s);
2309 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2310 Ok(json!(bytes))
2311 } else {
2312 Err("HexDecode requires a string".into())
2313 }
2314 }
2315 Transformation::Base58Encode => {
2316 if let Some(arr) = value.as_array() {
2317 let bytes: Vec<u8> = arr
2318 .iter()
2319 .filter_map(|v| v.as_u64().map(|n| n as u8))
2320 .collect();
2321 let encoded = bs58::encode(&bytes).into_string();
2322 Ok(json!(encoded))
2323 } else if value.is_string() {
2324 Ok(value.clone())
2325 } else {
2326 Err("Base58Encode requires an array of numbers".into())
2327 }
2328 }
2329 Transformation::Base58Decode => {
2330 if let Some(s) = value.as_str() {
2331 let bytes = bs58::decode(s)
2332 .into_vec()
2333 .map_err(|e| format!("Base58 decode error: {}", e))?;
2334 Ok(json!(bytes))
2335 } else {
2336 Err("Base58Decode requires a string".into())
2337 }
2338 }
2339 Transformation::ToString => Ok(json!(value.to_string())),
2340 Transformation::ToNumber => {
2341 if let Some(s) = value.as_str() {
2342 let n = s
2343 .parse::<i64>()
2344 .map_err(|e| format!("Parse error: {}", e))?;
2345 Ok(json!(n))
2346 } else {
2347 Ok(value.clone())
2348 }
2349 }
2350 }
2351 }
2352
2353 fn evaluate_comparison(
2354 &self,
2355 field_value: &Value,
2356 op: &ComparisonOp,
2357 condition_value: &Value,
2358 ) -> Result<bool> {
2359 use ComparisonOp::*;
2360
2361 match op {
2362 Equal => Ok(field_value == condition_value),
2363 NotEqual => Ok(field_value != condition_value),
2364 GreaterThan => {
2365 match (field_value.as_i64(), condition_value.as_i64()) {
2367 (Some(a), Some(b)) => Ok(a > b),
2368 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2369 (Some(a), Some(b)) => Ok(a > b),
2370 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2371 (Some(a), Some(b)) => Ok(a > b),
2372 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2373 },
2374 },
2375 }
2376 }
2377 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2378 (Some(a), Some(b)) => Ok(a >= b),
2379 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2380 (Some(a), Some(b)) => Ok(a >= b),
2381 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2382 (Some(a), Some(b)) => Ok(a >= b),
2383 _ => {
2384 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2385 }
2386 },
2387 },
2388 },
2389 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2390 (Some(a), Some(b)) => Ok(a < b),
2391 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2392 (Some(a), Some(b)) => Ok(a < b),
2393 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2394 (Some(a), Some(b)) => Ok(a < b),
2395 _ => Err("Cannot compare non-numeric values with LessThan".into()),
2396 },
2397 },
2398 },
2399 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2400 (Some(a), Some(b)) => Ok(a <= b),
2401 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2402 (Some(a), Some(b)) => Ok(a <= b),
2403 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2404 (Some(a), Some(b)) => Ok(a <= b),
2405 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2406 },
2407 },
2408 },
2409 }
2410 }
2411
2412 #[cfg_attr(feature = "otel", instrument(
2421 name = "vm.update_pda_lookup",
2422 skip(self),
2423 fields(
2424 pda = %pda_address,
2425 seed = %seed_value,
2426 )
2427 ))]
2428 pub fn update_pda_reverse_lookup(
2429 &mut self,
2430 state_id: u32,
2431 lookup_name: &str,
2432 pda_address: String,
2433 seed_value: String,
2434 ) -> Result<Vec<PendingAccountUpdate>> {
2435 let state = self
2436 .states
2437 .get_mut(&state_id)
2438 .ok_or("State table not found")?;
2439
2440 let lookup = state
2441 .pda_reverse_lookups
2442 .entry(lookup_name.to_string())
2443 .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2444
2445 let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2446
2447 if let Some(ref evicted) = evicted_pda {
2448 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2449 let count = evicted_updates.len();
2450 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2451 }
2452 }
2453
2454 self.flush_pending_updates(state_id, &pda_address)
2456 }
2457
2458 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2463 let state = match self.states.get_mut(&state_id) {
2464 Some(s) => s,
2465 None => return 0,
2466 };
2467
2468 let now = std::time::SystemTime::now()
2469 .duration_since(std::time::UNIX_EPOCH)
2470 .unwrap()
2471 .as_secs() as i64;
2472
2473 let mut removed_count = 0;
2474
2475 state.pending_updates.retain(|_pda_address, updates| {
2477 let original_len = updates.len();
2478
2479 updates.retain(|update| {
2480 let age = now - update.queued_at;
2481 age <= PENDING_UPDATE_TTL_SECONDS
2482 });
2483
2484 removed_count += original_len - updates.len();
2485
2486 !updates.is_empty()
2488 });
2489
2490 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2492
2493 if removed_count > 0 {
2494 #[cfg(feature = "otel")]
2495 crate::vm_metrics::record_pending_updates_expired(
2496 removed_count as u64,
2497 &state.entity_name,
2498 );
2499 }
2500
2501 removed_count
2502 }
2503
2504 #[cfg_attr(feature = "otel", instrument(
2536 name = "vm.queue_account_update",
2537 skip(self, update),
2538 fields(
2539 pda = %update.pda_address,
2540 account_type = %update.account_type,
2541 slot = update.slot,
2542 )
2543 ))]
2544 pub fn queue_account_update(
2545 &mut self,
2546 state_id: u32,
2547 update: QueuedAccountUpdate,
2548 ) -> Result<()> {
2549 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2550 self.cleanup_expired_pending_updates(state_id);
2551 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2552 self.drop_oldest_pending_update(state_id)?;
2553 }
2554 }
2555
2556 let state = self
2557 .states
2558 .get_mut(&state_id)
2559 .ok_or("State table not found")?;
2560
2561 let pending = PendingAccountUpdate {
2562 account_type: update.account_type,
2563 pda_address: update.pda_address.clone(),
2564 account_data: update.account_data,
2565 slot: update.slot,
2566 write_version: update.write_version,
2567 signature: update.signature,
2568 queued_at: std::time::SystemTime::now()
2569 .duration_since(std::time::UNIX_EPOCH)
2570 .unwrap()
2571 .as_secs() as i64,
2572 };
2573
2574 let pda_address = pending.pda_address.clone();
2575 let slot = pending.slot;
2576
2577 let mut updates = state
2578 .pending_updates
2579 .entry(pda_address.clone())
2580 .or_insert_with(Vec::new);
2581
2582 let original_len = updates.len();
2583 updates.retain(|existing| existing.slot > slot);
2584 let removed_by_dedup = original_len - updates.len();
2585
2586 if removed_by_dedup > 0 {
2587 self.pending_queue_size = self
2588 .pending_queue_size
2589 .saturating_sub(removed_by_dedup as u64);
2590 }
2591
2592 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2593 updates.remove(0);
2594 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2595 }
2596
2597 updates.push(pending);
2598 #[cfg(feature = "otel")]
2599 crate::vm_metrics::record_pending_update_queued(&state.entity_name);
2600
2601 Ok(())
2602 }
2603
2604 pub fn queue_instruction_event(
2605 &mut self,
2606 state_id: u32,
2607 event: QueuedInstructionEvent,
2608 ) -> Result<()> {
2609 let state = self
2610 .states
2611 .get_mut(&state_id)
2612 .ok_or("State table not found")?;
2613
2614 let pda_address = event.pda_address.clone();
2615
2616 let pending = PendingInstructionEvent {
2617 event_type: event.event_type,
2618 pda_address: event.pda_address,
2619 event_data: event.event_data,
2620 slot: event.slot,
2621 signature: event.signature,
2622 queued_at: std::time::SystemTime::now()
2623 .duration_since(std::time::UNIX_EPOCH)
2624 .unwrap()
2625 .as_secs() as i64,
2626 };
2627
2628 let mut events = state
2629 .pending_instruction_events
2630 .entry(pda_address)
2631 .or_insert_with(Vec::new);
2632
2633 if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
2634 events.remove(0);
2635 }
2636
2637 events.push(pending);
2638
2639 Ok(())
2640 }
2641
2642 pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
2643 self.last_pda_lookup_miss.take()
2644 }
2645
2646 pub fn take_last_pda_registered(&mut self) -> Option<String> {
2647 self.last_pda_registered.take()
2648 }
2649
2650 pub fn flush_pending_instruction_events(
2651 &mut self,
2652 state_id: u32,
2653 pda_address: &str,
2654 ) -> Vec<PendingInstructionEvent> {
2655 let state = match self.states.get_mut(&state_id) {
2656 Some(s) => s,
2657 None => return Vec::new(),
2658 };
2659
2660 if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
2661 events
2662 } else {
2663 Vec::new()
2664 }
2665 }
2666
2667 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2669 let state = self.states.get(&state_id)?;
2670
2671 let now = std::time::SystemTime::now()
2672 .duration_since(std::time::UNIX_EPOCH)
2673 .unwrap()
2674 .as_secs() as i64;
2675
2676 let mut total_updates = 0;
2677 let mut oldest_timestamp = now;
2678 let mut largest_pda_queue = 0;
2679 let mut estimated_memory = 0;
2680
2681 for entry in state.pending_updates.iter() {
2682 let (_, updates) = entry.pair();
2683 total_updates += updates.len();
2684 largest_pda_queue = largest_pda_queue.max(updates.len());
2685
2686 for update in updates.iter() {
2687 oldest_timestamp = oldest_timestamp.min(update.queued_at);
2688 estimated_memory += update.account_type.len() +
2690 update.pda_address.len() +
2691 update.signature.len() +
2692 16 + estimate_json_size(&update.account_data);
2694 }
2695 }
2696
2697 Some(PendingQueueStats {
2698 total_updates,
2699 unique_pdas: state.pending_updates.len(),
2700 oldest_age_seconds: now - oldest_timestamp,
2701 largest_pda_queue_size: largest_pda_queue,
2702 estimated_memory_bytes: estimated_memory,
2703 })
2704 }
2705
2706 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2707 let mut stats = VmMemoryStats {
2708 path_cache_size: self.path_cache.len(),
2709 ..Default::default()
2710 };
2711
2712 if let Some(state) = self.states.get(&state_id) {
2713 stats.state_table_entity_count = state.data.len();
2714 stats.state_table_max_entries = state.config.max_entries;
2715 stats.state_table_at_capacity = state.is_at_capacity();
2716
2717 stats.lookup_index_count = state.lookup_indexes.len();
2718 stats.lookup_index_total_entries =
2719 state.lookup_indexes.values().map(|idx| idx.len()).sum();
2720
2721 stats.temporal_index_count = state.temporal_indexes.len();
2722 stats.temporal_index_total_entries = state
2723 .temporal_indexes
2724 .values()
2725 .map(|idx| idx.total_entries())
2726 .sum();
2727
2728 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2729 stats.pda_reverse_lookup_total_entries = state
2730 .pda_reverse_lookups
2731 .values()
2732 .map(|lookup| lookup.len())
2733 .sum();
2734
2735 stats.version_tracker_entries = state.version_tracker.len();
2736
2737 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2738 }
2739
2740 stats
2741 }
2742
2743 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2744 let pending_removed = self.cleanup_expired_pending_updates(state_id);
2745 let temporal_removed = self.cleanup_temporal_indexes(state_id);
2746
2747 #[cfg(feature = "otel")]
2748 if let Some(state) = self.states.get(&state_id) {
2749 crate::vm_metrics::record_cleanup(
2750 pending_removed,
2751 temporal_removed,
2752 &state.entity_name,
2753 );
2754 }
2755
2756 CleanupResult {
2757 pending_updates_removed: pending_removed,
2758 temporal_entries_removed: temporal_removed,
2759 }
2760 }
2761
2762 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2763 let state = match self.states.get_mut(&state_id) {
2764 Some(s) => s,
2765 None => return 0,
2766 };
2767
2768 let now = std::time::SystemTime::now()
2769 .duration_since(std::time::UNIX_EPOCH)
2770 .unwrap()
2771 .as_secs() as i64;
2772
2773 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2774 let mut total_removed = 0;
2775
2776 for (_, index) in state.temporal_indexes.iter_mut() {
2777 total_removed += index.cleanup_expired(cutoff);
2778 }
2779
2780 total_removed
2781 }
2782
2783 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2784 let state = self.states.get(&state_id)?;
2785
2786 if state.is_at_capacity() {
2787 Some(CapacityWarning {
2788 current_entries: state.data.len(),
2789 max_entries: state.config.max_entries,
2790 entries_over_limit: state.entries_over_limit(),
2791 })
2792 } else {
2793 None
2794 }
2795 }
2796
2797 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2799 let state = self
2800 .states
2801 .get_mut(&state_id)
2802 .ok_or("State table not found")?;
2803
2804 let mut oldest_pda: Option<String> = None;
2805 let mut oldest_timestamp = i64::MAX;
2806
2807 for entry in state.pending_updates.iter() {
2809 let (pda, updates) = entry.pair();
2810 if let Some(update) = updates.first() {
2811 if update.queued_at < oldest_timestamp {
2812 oldest_timestamp = update.queued_at;
2813 oldest_pda = Some(pda.clone());
2814 }
2815 }
2816 }
2817
2818 if let Some(pda) = oldest_pda {
2820 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2821 if !updates.is_empty() {
2822 updates.remove(0);
2823 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2824
2825 if updates.is_empty() {
2827 drop(updates);
2828 state.pending_updates.remove(&pda);
2829 }
2830 }
2831 }
2832 }
2833
2834 Ok(())
2835 }
2836
2837 fn flush_pending_updates(
2842 &mut self,
2843 state_id: u32,
2844 pda_address: &str,
2845 ) -> Result<Vec<PendingAccountUpdate>> {
2846 let state = self
2847 .states
2848 .get_mut(&state_id)
2849 .ok_or("State table not found")?;
2850
2851 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2852 let count = pending_updates.len();
2853 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2854 #[cfg(feature = "otel")]
2855 crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
2856 Ok(pending_updates)
2857 } else {
2858 Ok(Vec::new())
2859 }
2860 }
2861
2862 pub fn try_pda_reverse_lookup(
2864 &mut self,
2865 state_id: u32,
2866 lookup_name: &str,
2867 pda_address: &str,
2868 ) -> Option<String> {
2869 let state = self.states.get_mut(&state_id)?;
2870
2871 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2872 if let Some(value) = lookup.lookup(pda_address) {
2873 self.pda_cache_hits += 1;
2874 return Some(value);
2875 }
2876 }
2877
2878 self.pda_cache_misses += 1;
2879 None
2880 }
2881
2882 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2889 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2890 }
2891
2892 fn evaluate_computed_expr_with_env(
2894 &self,
2895 expr: &ComputedExpr,
2896 state: &Value,
2897 env: &std::collections::HashMap<String, Value>,
2898 ) -> Result<Value> {
2899 match expr {
2900 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2901
2902 ComputedExpr::Var { name } => env
2903 .get(name)
2904 .cloned()
2905 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2906
2907 ComputedExpr::Let { name, value, body } => {
2908 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2909 let mut new_env = env.clone();
2910 new_env.insert(name.clone(), val);
2911 self.evaluate_computed_expr_with_env(body, state, &new_env)
2912 }
2913
2914 ComputedExpr::If {
2915 condition,
2916 then_branch,
2917 else_branch,
2918 } => {
2919 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2920 if self.value_to_bool(&cond_val) {
2921 self.evaluate_computed_expr_with_env(then_branch, state, env)
2922 } else {
2923 self.evaluate_computed_expr_with_env(else_branch, state, env)
2924 }
2925 }
2926
2927 ComputedExpr::None => Ok(Value::Null),
2928
2929 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2930
2931 ComputedExpr::Slice { expr, start, end } => {
2932 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2933 match val {
2934 Value::Array(arr) => {
2935 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2936 Ok(Value::Array(slice))
2937 }
2938 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2939 }
2940 }
2941
2942 ComputedExpr::Index { expr, index } => {
2943 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2944 match val {
2945 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2946 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2947 }
2948 }
2949
2950 ComputedExpr::U64FromLeBytes { bytes } => {
2951 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2952 let byte_vec = self.value_to_bytes(&val)?;
2953 if byte_vec.len() < 8 {
2954 return Err(format!(
2955 "u64::from_le_bytes requires 8 bytes, got {}",
2956 byte_vec.len()
2957 )
2958 .into());
2959 }
2960 let arr: [u8; 8] = byte_vec[..8]
2961 .try_into()
2962 .map_err(|_| "Failed to convert to [u8; 8]")?;
2963 Ok(json!(u64::from_le_bytes(arr)))
2964 }
2965
2966 ComputedExpr::U64FromBeBytes { bytes } => {
2967 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2968 let byte_vec = self.value_to_bytes(&val)?;
2969 if byte_vec.len() < 8 {
2970 return Err(format!(
2971 "u64::from_be_bytes requires 8 bytes, got {}",
2972 byte_vec.len()
2973 )
2974 .into());
2975 }
2976 let arr: [u8; 8] = byte_vec[..8]
2977 .try_into()
2978 .map_err(|_| "Failed to convert to [u8; 8]")?;
2979 Ok(json!(u64::from_be_bytes(arr)))
2980 }
2981
2982 ComputedExpr::ByteArray { bytes } => {
2983 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2984 }
2985
2986 ComputedExpr::Closure { param, body } => {
2987 Ok(json!({
2990 "__closure": {
2991 "param": param,
2992 "body": serde_json::to_value(body).unwrap_or(Value::Null)
2993 }
2994 }))
2995 }
2996
2997 ComputedExpr::Unary { op, expr } => {
2998 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2999 self.apply_unary_op(op, &val)
3000 }
3001
3002 ComputedExpr::JsonToBytes { expr } => {
3003 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3004 let bytes = self.value_to_bytes(&val)?;
3006 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3007 }
3008
3009 ComputedExpr::UnwrapOr { expr, default } => {
3010 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3011 if val.is_null() {
3012 Ok(default.clone())
3013 } else {
3014 Ok(val)
3015 }
3016 }
3017
3018 ComputedExpr::Binary { op, left, right } => {
3019 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
3020 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
3021 self.apply_binary_op(op, &l, &r)
3022 }
3023
3024 ComputedExpr::Cast { expr, to_type } => {
3025 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3026 self.apply_cast(&val, to_type)
3027 }
3028
3029 ComputedExpr::MethodCall { expr, method, args } => {
3030 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3031 if method == "map" && args.len() == 1 {
3033 if let ComputedExpr::Closure { param, body } = &args[0] {
3034 if val.is_null() {
3036 return Ok(Value::Null);
3037 }
3038 let mut closure_env = env.clone();
3040 closure_env.insert(param.clone(), val);
3041 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
3042 }
3043 }
3044 let evaluated_args: Vec<Value> = args
3045 .iter()
3046 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
3047 .collect::<Result<Vec<_>>>()?;
3048 self.apply_method_call(&val, method, &evaluated_args)
3049 }
3050
3051 ComputedExpr::Literal { value } => Ok(value.clone()),
3052
3053 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
3054 }
3055 }
3056
3057 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
3059 match val {
3060 Value::Array(arr) => arr
3061 .iter()
3062 .map(|v| {
3063 v.as_u64()
3064 .map(|n| n as u8)
3065 .ok_or_else(|| "Array element not a valid byte".into())
3066 })
3067 .collect(),
3068 Value::String(s) => {
3069 if s.starts_with("0x") || s.starts_with("0X") {
3071 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
3072 } else {
3073 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
3074 }
3075 }
3076 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
3077 }
3078 }
3079
3080 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
3082 use crate::ast::UnaryOp;
3083 match op {
3084 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
3085 UnaryOp::ReverseBits => match val.as_u64() {
3086 Some(n) => Ok(json!(n.reverse_bits())),
3087 None => match val.as_i64() {
3088 Some(n) => Ok(json!((n as u64).reverse_bits())),
3089 None => Err("reverse_bits requires an integer".into()),
3090 },
3091 },
3092 }
3093 }
3094
3095 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
3097 let segments: Vec<&str> = path.split('.').collect();
3098 let mut current = state;
3099
3100 for segment in segments {
3101 match current.get(segment) {
3102 Some(v) => current = v,
3103 None => return Ok(Value::Null),
3104 }
3105 }
3106
3107 Ok(current.clone())
3108 }
3109
3110 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
3112 match op {
3113 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
3115 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
3116 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
3117 BinaryOp::Div => {
3118 if let Some(r) = right.as_i64() {
3120 if r == 0 {
3121 return Err("Division by zero".into());
3122 }
3123 }
3124 if let Some(r) = right.as_f64() {
3125 if r == 0.0 {
3126 return Err("Division by zero".into());
3127 }
3128 }
3129 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
3130 }
3131 BinaryOp::Mod => {
3132 match (left.as_i64(), right.as_i64()) {
3134 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3135 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
3136 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3137 _ => Err("Modulo requires non-zero integer operands".into()),
3138 },
3139 _ => Err("Modulo by zero".into()),
3140 }
3141 }
3142
3143 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
3145 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
3146 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
3147 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
3148 BinaryOp::Eq => Ok(json!(left == right)),
3149 BinaryOp::Ne => Ok(json!(left != right)),
3150
3151 BinaryOp::And => {
3153 let l_bool = self.value_to_bool(left);
3154 let r_bool = self.value_to_bool(right);
3155 Ok(json!(l_bool && r_bool))
3156 }
3157 BinaryOp::Or => {
3158 let l_bool = self.value_to_bool(left);
3159 let r_bool = self.value_to_bool(right);
3160 Ok(json!(l_bool || r_bool))
3161 }
3162
3163 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
3165 (Some(a), Some(b)) => Ok(json!(a ^ b)),
3166 _ => match (left.as_i64(), right.as_i64()) {
3167 (Some(a), Some(b)) => Ok(json!(a ^ b)),
3168 _ => Err("XOR requires integer operands".into()),
3169 },
3170 },
3171 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
3172 (Some(a), Some(b)) => Ok(json!(a & b)),
3173 _ => match (left.as_i64(), right.as_i64()) {
3174 (Some(a), Some(b)) => Ok(json!(a & b)),
3175 _ => Err("BitAnd requires integer operands".into()),
3176 },
3177 },
3178 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
3179 (Some(a), Some(b)) => Ok(json!(a | b)),
3180 _ => match (left.as_i64(), right.as_i64()) {
3181 (Some(a), Some(b)) => Ok(json!(a | b)),
3182 _ => Err("BitOr requires integer operands".into()),
3183 },
3184 },
3185 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
3186 (Some(a), Some(b)) => Ok(json!(a << b)),
3187 _ => match (left.as_i64(), right.as_i64()) {
3188 (Some(a), Some(b)) => Ok(json!(a << b)),
3189 _ => Err("Shl requires integer operands".into()),
3190 },
3191 },
3192 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
3193 (Some(a), Some(b)) => Ok(json!(a >> b)),
3194 _ => match (left.as_i64(), right.as_i64()) {
3195 (Some(a), Some(b)) => Ok(json!(a >> b)),
3196 _ => Err("Shr requires integer operands".into()),
3197 },
3198 },
3199 }
3200 }
3201
3202 fn numeric_op<F1, F2>(
3204 &self,
3205 left: &Value,
3206 right: &Value,
3207 int_op: F1,
3208 float_op: F2,
3209 ) -> Result<Value>
3210 where
3211 F1: Fn(i64, i64) -> i64,
3212 F2: Fn(f64, f64) -> f64,
3213 {
3214 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3216 return Ok(json!(int_op(a, b)));
3217 }
3218
3219 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3221 return Ok(json!(int_op(a as i64, b as i64)));
3223 }
3224
3225 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3227 return Ok(json!(float_op(a, b)));
3228 }
3229
3230 if left.is_null() || right.is_null() {
3232 return Ok(Value::Null);
3233 }
3234
3235 Err(format!(
3236 "Cannot perform numeric operation on {:?} and {:?}",
3237 left, right
3238 )
3239 .into())
3240 }
3241
3242 fn comparison_op<F1, F2>(
3244 &self,
3245 left: &Value,
3246 right: &Value,
3247 int_cmp: F1,
3248 float_cmp: F2,
3249 ) -> Result<Value>
3250 where
3251 F1: Fn(i64, i64) -> bool,
3252 F2: Fn(f64, f64) -> bool,
3253 {
3254 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3256 return Ok(json!(int_cmp(a, b)));
3257 }
3258
3259 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3261 return Ok(json!(int_cmp(a as i64, b as i64)));
3262 }
3263
3264 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3266 return Ok(json!(float_cmp(a, b)));
3267 }
3268
3269 if left.is_null() || right.is_null() {
3271 return Ok(json!(false));
3272 }
3273
3274 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3275 }
3276
3277 fn value_to_bool(&self, value: &Value) -> bool {
3279 match value {
3280 Value::Null => false,
3281 Value::Bool(b) => *b,
3282 Value::Number(n) => {
3283 if let Some(i) = n.as_i64() {
3284 i != 0
3285 } else if let Some(f) = n.as_f64() {
3286 f != 0.0
3287 } else {
3288 true
3289 }
3290 }
3291 Value::String(s) => !s.is_empty(),
3292 Value::Array(arr) => !arr.is_empty(),
3293 Value::Object(obj) => !obj.is_empty(),
3294 }
3295 }
3296
3297 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3299 match to_type {
3300 "i8" | "i16" | "i32" | "i64" | "isize" => {
3301 if let Some(n) = value.as_i64() {
3302 Ok(json!(n))
3303 } else if let Some(n) = value.as_u64() {
3304 Ok(json!(n as i64))
3305 } else if let Some(n) = value.as_f64() {
3306 Ok(json!(n as i64))
3307 } else if let Some(s) = value.as_str() {
3308 s.parse::<i64>()
3309 .map(|n| json!(n))
3310 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3311 } else {
3312 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3313 }
3314 }
3315 "u8" | "u16" | "u32" | "u64" | "usize" => {
3316 if let Some(n) = value.as_u64() {
3317 Ok(json!(n))
3318 } else if let Some(n) = value.as_i64() {
3319 Ok(json!(n as u64))
3320 } else if let Some(n) = value.as_f64() {
3321 Ok(json!(n as u64))
3322 } else if let Some(s) = value.as_str() {
3323 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3324 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3325 })
3326 } else {
3327 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3328 }
3329 }
3330 "f32" | "f64" => {
3331 if let Some(n) = value.as_f64() {
3332 Ok(json!(n))
3333 } else if let Some(n) = value.as_i64() {
3334 Ok(json!(n as f64))
3335 } else if let Some(n) = value.as_u64() {
3336 Ok(json!(n as f64))
3337 } else if let Some(s) = value.as_str() {
3338 s.parse::<f64>()
3339 .map(|n| json!(n))
3340 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3341 } else {
3342 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3343 }
3344 }
3345 "String" | "string" => Ok(json!(value.to_string())),
3346 "bool" => Ok(json!(self.value_to_bool(value))),
3347 _ => {
3348 Ok(value.clone())
3350 }
3351 }
3352 }
3353
3354 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3356 match method {
3357 "unwrap_or" => {
3358 if value.is_null() && !args.is_empty() {
3359 Ok(args[0].clone())
3360 } else {
3361 Ok(value.clone())
3362 }
3363 }
3364 "unwrap_or_default" => {
3365 if value.is_null() {
3366 Ok(json!(0))
3368 } else {
3369 Ok(value.clone())
3370 }
3371 }
3372 "is_some" => Ok(json!(!value.is_null())),
3373 "is_none" => Ok(json!(value.is_null())),
3374 "abs" => {
3375 if let Some(n) = value.as_i64() {
3376 Ok(json!(n.abs()))
3377 } else if let Some(n) = value.as_f64() {
3378 Ok(json!(n.abs()))
3379 } else {
3380 Err(format!("Cannot call abs() on {:?}", value).into())
3381 }
3382 }
3383 "len" => {
3384 if let Some(s) = value.as_str() {
3385 Ok(json!(s.len()))
3386 } else if let Some(arr) = value.as_array() {
3387 Ok(json!(arr.len()))
3388 } else if let Some(obj) = value.as_object() {
3389 Ok(json!(obj.len()))
3390 } else {
3391 Err(format!("Cannot call len() on {:?}", value).into())
3392 }
3393 }
3394 "to_string" => Ok(json!(value.to_string())),
3395 "min" => {
3396 if args.is_empty() {
3397 return Err("min() requires an argument".into());
3398 }
3399 let other = &args[0];
3400 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3401 Ok(json!(a.min(b)))
3402 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3403 Ok(json!(a.min(b)))
3404 } else {
3405 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3406 }
3407 }
3408 "max" => {
3409 if args.is_empty() {
3410 return Err("max() requires an argument".into());
3411 }
3412 let other = &args[0];
3413 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3414 Ok(json!(a.max(b)))
3415 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3416 Ok(json!(a.max(b)))
3417 } else {
3418 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3419 }
3420 }
3421 "saturating_add" => {
3422 if args.is_empty() {
3423 return Err("saturating_add() requires an argument".into());
3424 }
3425 let other = &args[0];
3426 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3427 Ok(json!(a.saturating_add(b)))
3428 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3429 Ok(json!(a.saturating_add(b)))
3430 } else {
3431 Err(format!(
3432 "Cannot call saturating_add() on {:?} and {:?}",
3433 value, other
3434 )
3435 .into())
3436 }
3437 }
3438 "saturating_sub" => {
3439 if args.is_empty() {
3440 return Err("saturating_sub() requires an argument".into());
3441 }
3442 let other = &args[0];
3443 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3444 Ok(json!(a.saturating_sub(b)))
3445 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3446 Ok(json!(a.saturating_sub(b)))
3447 } else {
3448 Err(format!(
3449 "Cannot call saturating_sub() on {:?} and {:?}",
3450 value, other
3451 )
3452 .into())
3453 }
3454 }
3455 _ => Err(format!("Unknown method call: {}()", method).into()),
3456 }
3457 }
3458
3459 pub fn evaluate_computed_fields_from_ast(
3462 &self,
3463 state: &mut Value,
3464 computed_field_specs: &[ComputedFieldSpec],
3465 ) -> Result<Vec<String>> {
3466 let mut updated_paths = Vec::new();
3467
3468 for spec in computed_field_specs {
3469 if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3470 self.set_field_in_state(state, &spec.target_path, result)?;
3471 updated_paths.push(spec.target_path.clone());
3472 }
3473 }
3474
3475 Ok(updated_paths)
3476 }
3477
3478 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3480 let segments: Vec<&str> = path.split('.').collect();
3481
3482 if segments.is_empty() {
3483 return Err("Empty path".into());
3484 }
3485
3486 let mut current = state;
3488 for (i, segment) in segments.iter().enumerate() {
3489 if i == segments.len() - 1 {
3490 if let Some(obj) = current.as_object_mut() {
3492 obj.insert(segment.to_string(), value);
3493 return Ok(());
3494 } else {
3495 return Err(format!("Cannot set field '{}' on non-object", segment).into());
3496 }
3497 } else {
3498 if !current.is_object() {
3500 *current = json!({});
3501 }
3502 let obj = current.as_object_mut().unwrap();
3503 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3504 }
3505 }
3506
3507 Ok(())
3508 }
3509
3510 pub fn create_evaluator_from_specs(
3513 specs: Vec<ComputedFieldSpec>,
3514 ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3515 move |state: &mut Value| {
3516 let vm = VmContext::new();
3519 vm.evaluate_computed_fields_from_ast(state, &specs)?;
3520 Ok(())
3521 }
3522 }
3523}
3524
3525impl Default for VmContext {
3526 fn default() -> Self {
3527 Self::new()
3528 }
3529}
3530
3531impl crate::resolvers::ReverseLookupUpdater for VmContext {
3533 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3534 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3536 .unwrap_or_else(|e| {
3537 tracing::error!("Failed to update PDA reverse lookup: {}", e);
3538 Vec::new()
3539 })
3540 }
3541
3542 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3543 self.flush_pending_updates(0, pda_address)
3545 .unwrap_or_else(|e| {
3546 tracing::error!("Failed to flush pending updates: {}", e);
3547 Vec::new()
3548 })
3549 }
3550}
3551
3552#[cfg(test)]
3553mod tests {
3554 use super::*;
3555 use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
3556
3557 #[test]
3558 fn test_computed_field_preserves_integer_type() {
3559 let vm = VmContext::new();
3560
3561 let mut state = serde_json::json!({
3562 "trading": {
3563 "total_buy_volume": 20000000000_i64,
3564 "total_sell_volume": 17951316474_i64
3565 }
3566 });
3567
3568 let spec = ComputedFieldSpec {
3569 target_path: "trading.total_volume".to_string(),
3570 result_type: "Option<u64>".to_string(),
3571 expression: ComputedExpr::Binary {
3572 op: BinaryOp::Add,
3573 left: Box::new(ComputedExpr::UnwrapOr {
3574 expr: Box::new(ComputedExpr::FieldRef {
3575 path: "trading.total_buy_volume".to_string(),
3576 }),
3577 default: serde_json::json!(0),
3578 }),
3579 right: Box::new(ComputedExpr::UnwrapOr {
3580 expr: Box::new(ComputedExpr::FieldRef {
3581 path: "trading.total_sell_volume".to_string(),
3582 }),
3583 default: serde_json::json!(0),
3584 }),
3585 },
3586 };
3587
3588 vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
3589 .unwrap();
3590
3591 let total_volume = state
3592 .get("trading")
3593 .and_then(|t| t.get("total_volume"))
3594 .expect("total_volume should exist");
3595
3596 let serialized = serde_json::to_string(total_volume).unwrap();
3597 assert!(
3598 !serialized.contains('.'),
3599 "Integer should not have decimal point: {}",
3600 serialized
3601 );
3602 assert_eq!(
3603 total_volume.as_i64(),
3604 Some(37951316474),
3605 "Value should be correct sum"
3606 );
3607 }
3608
3609 #[test]
3610 fn test_set_field_sum_preserves_integer_type() {
3611 let mut vm = VmContext::new();
3612 vm.registers[0] = serde_json::json!({});
3613 vm.registers[1] = serde_json::json!(20000000000_i64);
3614 vm.registers[2] = serde_json::json!(17951316474_i64);
3615
3616 vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
3617 vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();
3618
3619 let state = &vm.registers[0];
3620 let buy_vol = state
3621 .get("trading")
3622 .and_then(|t| t.get("total_buy_volume"))
3623 .unwrap();
3624 let sell_vol = state
3625 .get("trading")
3626 .and_then(|t| t.get("total_sell_volume"))
3627 .unwrap();
3628
3629 let buy_serialized = serde_json::to_string(buy_vol).unwrap();
3630 let sell_serialized = serde_json::to_string(sell_vol).unwrap();
3631
3632 assert!(
3633 !buy_serialized.contains('.'),
3634 "Buy volume should not have decimal: {}",
3635 buy_serialized
3636 );
3637 assert!(
3638 !sell_serialized.contains('.'),
3639 "Sell volume should not have decimal: {}",
3640 sell_serialized
3641 );
3642 }
3643}