1use crate::ast::{
2 BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
/// Provenance metadata attached to an event while it is being processed.
///
/// Every field is optional: `Default` leaves each `Option` as `None` and the
/// metadata map empty.
#[derive(Debug, Clone, Default)]
pub struct UpdateContext {
    /// Slot value; populated by every constructor except `empty`.
    pub slot: Option<u64>,
    /// Signature string; populated by every constructor except `empty`.
    pub signature: Option<String>,
    /// Explicit timestamp. When `None`, `timestamp()` falls back to the
    /// current wall-clock time in seconds since the Unix epoch.
    pub timestamp: Option<i64>,
    /// Populated only by `new_account`; when present without `txn_index`,
    /// `is_account_update` returns `true`.
    pub write_version: Option<u64>,
    /// Populated only by `new_instruction`; when present without
    /// `write_version`, `is_instruction_update` returns `true`.
    pub txn_index: Option<u64>,
    /// Free-form key/value pairs; copied into the object built by `to_value`.
    pub metadata: HashMap<String, Value>,
}
34
35impl UpdateContext {
36 pub fn new(slot: u64, signature: String) -> Self {
38 Self {
39 slot: Some(slot),
40 signature: Some(signature),
41 timestamp: None,
42 write_version: None,
43 txn_index: None,
44 metadata: HashMap::new(),
45 }
46 }
47
48 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
50 Self {
51 slot: Some(slot),
52 signature: Some(signature),
53 timestamp: Some(timestamp),
54 write_version: None,
55 txn_index: None,
56 metadata: HashMap::new(),
57 }
58 }
59
60 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
62 Self {
63 slot: Some(slot),
64 signature: Some(signature),
65 timestamp: None,
66 write_version: Some(write_version),
67 txn_index: None,
68 metadata: HashMap::new(),
69 }
70 }
71
72 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
74 Self {
75 slot: Some(slot),
76 signature: Some(signature),
77 timestamp: None,
78 write_version: None,
79 txn_index: Some(txn_index),
80 metadata: HashMap::new(),
81 }
82 }
83
84 pub fn timestamp(&self) -> i64 {
86 self.timestamp.unwrap_or_else(|| {
87 std::time::SystemTime::now()
88 .duration_since(std::time::UNIX_EPOCH)
89 .unwrap()
90 .as_secs() as i64
91 })
92 }
93
94 pub fn empty() -> Self {
96 Self::default()
97 }
98
99 pub fn is_account_update(&self) -> bool {
102 self.write_version.is_some() && self.txn_index.is_none()
103 }
104
105 pub fn is_instruction_update(&self) -> bool {
107 self.txn_index.is_some() && self.write_version.is_none()
108 }
109
110 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
111 self.metadata.insert(key, value);
112 self
113 }
114
115 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
117 self.metadata.get(key)
118 }
119
120 pub fn to_value(&self) -> Value {
122 let mut obj = serde_json::Map::new();
123 if let Some(slot) = self.slot {
124 obj.insert("slot".to_string(), json!(slot));
125 }
126 if let Some(ref sig) = self.signature {
127 obj.insert("signature".to_string(), json!(sig));
128 }
129 obj.insert("timestamp".to_string(), json!(self.timestamp()));
131 for (key, value) in &self.metadata {
132 obj.insert(key.clone(), value.clone());
133 }
134 Value::Object(obj)
135 }
136}
137
/// Index of a VM register.
pub type Register = usize;
/// Result type used throughout this module; errors are boxed trait objects.
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Contents of a VM register: an arbitrary JSON value.
pub type RegisterValue = Value;
142
/// Hook that recomputes derived fields on an entity state.
pub trait ComputedFieldsEvaluator {
    /// Evaluates computed fields against `state`, which may be mutated in place.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation produces.
    fn evaluate(&self, state: &mut Value) -> Result<()>;
}
148
// Limits for the pending-update queues.
// NOTE(review): the code enforcing these three limits is outside this excerpt —
// confirm exact semantics at the use sites.
const MAX_PENDING_UPDATES_TOTAL: usize = 2_500;
const MAX_PENDING_UPDATES_PER_PDA: usize = 50;
const PENDING_UPDATE_TTL_SECONDS: i64 = 300;
/// `TemporalIndex::insert` drops entries whose timestamp is older than the
/// inserted timestamp minus this many seconds.
const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300;
/// `TemporalIndex::insert` keeps at most this many entries per key, dropping
/// the oldest first.
const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 250;

/// Default for `StateTableConfig::max_entries`.
const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 2_500;
/// Default for `StateTableConfig::max_array_length`.
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;

/// Capacity used by `LookupIndex::new`.
const DEFAULT_MAX_LOOKUP_INDEX_ENTRIES: usize = 2_500;

/// Capacity used by `VersionTracker::new`.
const DEFAULT_MAX_VERSION_TRACKER_ENTRIES: usize = 2_500;

/// Capacity of the instruction de-duplication tracker built by the
/// `VmContext` constructors.
const DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES: usize = 500;

/// Capacity used by `TemporalIndex::new`.
const DEFAULT_MAX_TEMPORAL_INDEX_KEYS: usize = 2_500;

// NOTE(review): presumably the default capacity passed to `PdaReverseLookup::new`;
// the use site is outside this excerpt.
const DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES: usize = 2_500;
173
174fn estimate_json_size(value: &Value) -> usize {
176 match value {
177 Value::Null => 4,
178 Value::Bool(_) => 5,
179 Value::Number(_) => 8,
180 Value::String(s) => s.len() + 2,
181 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
182 Value::Object(obj) => {
183 2 + obj
184 .iter()
185 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
186 .sum::<usize>()
187 }
188 }
189}
190
/// A dotted field path pre-split into its segments.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    /// Path segments in order; reference-counted, so cloning a
    /// `CompiledPath` does not copy the strings.
    pub segments: std::sync::Arc<[String]>,
}
195
196impl CompiledPath {
197 pub fn new(path: &str) -> Self {
198 let segments: Vec<String> = path.split('.').map(|s| s.to_string()).collect();
199 CompiledPath {
200 segments: segments.into(),
201 }
202 }
203
204 fn segments(&self) -> &[String] {
205 &self.segments
206 }
207}
208
/// How a tracked field changed.
#[derive(Debug, Clone)]
pub enum FieldChange {
    /// The field's value was replaced wholesale.
    Replaced,
    /// Values were appended to the field; carries the appended values in order.
    Appended(Vec<Value>),
}
218
/// Records which field paths changed and how.
#[derive(Debug, Clone, Default)]
pub struct DirtyTracker {
    /// Change kind per field path.
    changes: HashMap<String, FieldChange>,
}
225
226impl DirtyTracker {
227 pub fn new() -> Self {
229 Self {
230 changes: HashMap::new(),
231 }
232 }
233
234 pub fn mark_replaced(&mut self, path: &str) {
236 self.changes.insert(path.to_string(), FieldChange::Replaced);
238 }
239
240 pub fn mark_appended(&mut self, path: &str, value: Value) {
242 match self.changes.get_mut(path) {
243 Some(FieldChange::Appended(values)) => {
244 values.push(value);
246 }
247 Some(FieldChange::Replaced) => {
248 }
251 None => {
252 self.changes
254 .insert(path.to_string(), FieldChange::Appended(vec![value]));
255 }
256 }
257 }
258
259 pub fn is_empty(&self) -> bool {
261 self.changes.is_empty()
262 }
263
264 pub fn len(&self) -> usize {
266 self.changes.len()
267 }
268
269 pub fn iter(&self) -> impl Iterator<Item = (&String, &FieldChange)> {
271 self.changes.iter()
272 }
273
274 pub fn dirty_paths(&self) -> HashSet<String> {
276 self.changes.keys().cloned().collect()
277 }
278
279 pub fn into_changes(self) -> HashMap<String, FieldChange> {
281 self.changes
282 }
283
284 pub fn changes(&self) -> &HashMap<String, FieldChange> {
286 &self.changes
287 }
288
289 pub fn appended_paths(&self) -> Vec<String> {
291 self.changes
292 .iter()
293 .filter_map(|(path, change)| match change {
294 FieldChange::Appended(_) => Some(path.clone()),
295 FieldChange::Replaced => None,
296 })
297 .collect()
298 }
299}
300
/// Execution context of the VM: registers, per-entity state tables, counters
/// and scratch state carried between handler invocations.
pub struct VmContext {
    /// Register file; the constructors allocate 256 slots initialised to `Value::Null`.
    registers: Vec<RegisterValue>,
    /// State tables keyed by state id; the constructors insert table `0`.
    states: HashMap<u32, StateTable>,
    /// Running count of executed instructions.
    // NOTE(review): the increment site is outside this excerpt.
    pub instructions_executed: u64,
    /// Number of path-cache hits; incremented by `get_compiled_path`.
    pub cache_hits: u64,
    /// Compiled paths keyed by their raw dotted string.
    path_cache: HashMap<String, CompiledPath>,
    /// PDA cache hit counter (updated outside this excerpt).
    pub pda_cache_hits: u64,
    /// PDA cache miss counter (updated outside this excerpt).
    pub pda_cache_misses: u64,
    /// Pending-queue size gauge (updated outside this excerpt).
    pub pending_queue_size: u64,
    /// Context of the event currently being processed; set by `process_event`.
    current_context: Option<UpdateContext>,
    /// Warnings accumulated via `add_warning` and drained by `take_warnings`.
    warnings: Vec<String>,
    /// Scratch slots consumed by `process_event` through the matching
    /// `take_last_*` helpers after each handler run.
    last_pda_lookup_miss: Option<String>,
    last_lookup_index_miss: Option<String>,
    last_pda_registered: Option<String>,
    last_lookup_index_keys: Vec<String>,
}
317
/// Bounded LRU map from a lookup value (stringified with
/// `value_to_cache_key`) to a primary key.
#[derive(Debug)]
pub struct LookupIndex {
    /// Mutex-guarded cache, so lookups and inserts work through `&self`.
    index: std::sync::Mutex<LruCache<String, Value>>,
}
322
323impl LookupIndex {
324 pub fn new() -> Self {
325 Self::with_capacity(DEFAULT_MAX_LOOKUP_INDEX_ENTRIES)
326 }
327
328 pub fn with_capacity(capacity: usize) -> Self {
329 LookupIndex {
330 index: std::sync::Mutex::new(LruCache::new(
331 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
332 )),
333 }
334 }
335
336 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
337 let key = value_to_cache_key(lookup_value);
338 self.index.lock().unwrap().get(&key).cloned()
339 }
340
341 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
342 let key = value_to_cache_key(&lookup_value);
343 self.index.lock().unwrap().put(key, primary_key);
344 }
345
346 pub fn len(&self) -> usize {
347 self.index.lock().unwrap().len()
348 }
349
350 pub fn is_empty(&self) -> bool {
351 self.index.lock().unwrap().is_empty()
352 }
353}
354
355impl Default for LookupIndex {
356 fn default() -> Self {
357 Self::new()
358 }
359}
360
361fn value_to_cache_key(value: &Value) -> String {
362 match value {
363 Value::String(s) => s.clone(),
364 Value::Number(n) => n.to_string(),
365 Value::Bool(b) => b.to_string(),
366 Value::Null => "null".to_string(),
367 _ => serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string()),
368 }
369}
370
/// Bounded LRU map from a lookup value to a timestamp-ordered history of
/// primary keys.
#[derive(Debug)]
pub struct TemporalIndex {
    /// Per-key history of `(primary_key, timestamp)` pairs, kept sorted by
    /// timestamp by `insert`.
    index: std::sync::Mutex<LruCache<String, Vec<(Value, i64)>>>,
}
375
376impl Default for TemporalIndex {
377 fn default() -> Self {
378 Self::new()
379 }
380}
381
382impl TemporalIndex {
383 pub fn new() -> Self {
384 Self::with_capacity(DEFAULT_MAX_TEMPORAL_INDEX_KEYS)
385 }
386
387 pub fn with_capacity(capacity: usize) -> Self {
388 TemporalIndex {
389 index: std::sync::Mutex::new(LruCache::new(
390 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
391 )),
392 }
393 }
394
395 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
396 let key = value_to_cache_key(lookup_value);
397 let mut cache = self.index.lock().unwrap();
398 if let Some(entries) = cache.get(&key) {
399 for i in (0..entries.len()).rev() {
400 if entries[i].1 <= timestamp {
401 return Some(entries[i].0.clone());
402 }
403 }
404 }
405 None
406 }
407
408 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
409 let key = value_to_cache_key(lookup_value);
410 let mut cache = self.index.lock().unwrap();
411 if let Some(entries) = cache.get(&key) {
412 if let Some(last) = entries.last() {
413 return Some(last.0.clone());
414 }
415 }
416 None
417 }
418
419 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
420 let key = value_to_cache_key(&lookup_value);
421 let mut cache = self.index.lock().unwrap();
422
423 let entries = cache.get_or_insert_mut(key, Vec::new);
424 entries.push((primary_key, timestamp));
425 entries.sort_by_key(|(_, ts)| *ts);
426
427 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
428 entries.retain(|(_, ts)| *ts >= cutoff);
429
430 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
431 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
432 entries.drain(0..excess);
433 }
434 }
435
436 pub fn len(&self) -> usize {
437 self.index.lock().unwrap().len()
438 }
439
440 pub fn is_empty(&self) -> bool {
441 self.index.lock().unwrap().is_empty()
442 }
443
444 pub fn total_entries(&self) -> usize {
445 self.index
446 .lock()
447 .unwrap()
448 .iter()
449 .map(|(_, entries)| entries.len())
450 .sum()
451 }
452
453 pub fn cleanup_expired(&self, cutoff_timestamp: i64) -> usize {
454 let mut cache = self.index.lock().unwrap();
455 let mut total_removed = 0;
456
457 for (_, entries) in cache.iter_mut() {
458 let original_len = entries.len();
459 entries.retain(|(_, ts)| *ts >= cutoff_timestamp);
460 total_removed += original_len - entries.len();
461 }
462
463 total_removed
464 }
465}
466
/// Bounded LRU map from a PDA address to the seed value it was registered with.
#[derive(Debug)]
pub struct PdaReverseLookup {
    /// PDA address → seed value.
    index: LruCache<String, String>,
}
472
473impl PdaReverseLookup {
474 pub fn new(capacity: usize) -> Self {
475 PdaReverseLookup {
476 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
477 }
478 }
479
480 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
481 self.index.get(pda_address).cloned()
482 }
483
484 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
485 let evicted = if self.index.len() >= self.index.cap().get() {
486 self.index.peek_lru().map(|(k, _)| k.clone())
487 } else {
488 None
489 };
490
491 self.index.put(pda_address, seed_value);
492 evicted
493 }
494
495 pub fn len(&self) -> usize {
496 self.index.len()
497 }
498
499 pub fn is_empty(&self) -> bool {
500 self.index.is_empty()
501 }
502
503 pub fn contains(&self, pda_address: &str) -> bool {
504 self.index.peek(pda_address).is_some()
505 }
506}
507
/// Request to queue an account update whose lookup key could not be resolved yet.
///
/// `process_event` builds one when a non-`IxState` handler produced no
/// mutations and reported a lookup-index miss.
#[derive(Debug, Clone)]
pub struct QueuedAccountUpdate {
    /// Key the update is queued under (the missed lookup key).
    pub pda_address: String,
    /// Event type of the account update.
    pub account_type: String,
    /// Event payload to replay later.
    pub account_data: Value,
    /// Slot from the update context (0 when absent).
    pub slot: u64,
    /// Write version from the update context.
    pub write_version: u64,
    /// Signature from the update context (empty when absent).
    pub signature: String,
}
518
/// An account update stored in `StateTable::pending_updates` awaiting replay.
///
/// When flushed, `process_event` re-runs the handler registered for
/// `account_type` with `account_data`, under a context rebuilt from `slot`,
/// `signature` and `write_version`.
#[derive(Debug, Clone)]
pub struct PendingAccountUpdate {
    pub account_type: String,
    pub pda_address: String,
    pub account_data: Value,
    pub slot: u64,
    pub write_version: u64,
    pub signature: String,
    // NOTE(review): presumably the Unix time (seconds) at which the update was
    // queued — the writer is outside this excerpt.
    pub queued_at: i64,
}
530
/// Request to queue an instruction event whose PDA could not be resolved yet.
///
/// `process_event` builds one when an `IxState` handler produced no mutations
/// and reported a PDA lookup miss.
#[derive(Debug, Clone)]
pub struct QueuedInstructionEvent {
    /// The PDA address that missed.
    pub pda_address: String,
    /// Event type of the instruction.
    pub event_type: String,
    /// Event payload to replay later.
    pub event_data: Value,
    /// Slot from the update context (0 when absent).
    pub slot: u64,
    /// Signature from the update context (empty when absent).
    pub signature: String,
}
540
/// An instruction event stored in `StateTable::pending_instruction_events`
/// awaiting replay.
///
/// When flushed, `process_event` re-runs the handler registered for
/// `event_type` with `event_data`.
#[derive(Debug, Clone)]
pub struct PendingInstructionEvent {
    pub event_type: String,
    pub pda_address: String,
    pub event_data: Value,
    pub slot: u64,
    pub signature: String,
    // NOTE(review): presumably the Unix time (seconds) at which the event was
    // queued — the writer is outside this excerpt.
    pub queued_at: i64,
}
551
/// A field write postponed until a specific instruction is seen.
///
/// Stored in `StateTable::deferred_when_ops`; `process_event` removes the
/// entries keyed by `(signature, event_type)` and applies them via
/// `apply_deferred_when_op`.
// NOTE(review): per-field meanings below are inferred from names; the code
// that creates and applies these operations is outside this excerpt.
#[derive(Debug, Clone)]
pub struct DeferredWhenOperation {
    pub entity_name: String,
    pub primary_key: Value,
    pub field_path: String,
    pub field_value: Value,
    pub when_instruction: String,
    pub signature: String,
    pub slot: u64,
    pub deferred_at: i64,
    pub emit: bool,
}
564
/// Snapshot of pending-queue statistics.
// NOTE(review): the code that fills this in is outside this excerpt; field
// meanings are taken from their names.
#[derive(Debug, Clone)]
pub struct PendingQueueStats {
    pub total_updates: usize,
    pub unique_pdas: usize,
    pub oldest_age_seconds: i64,
    pub largest_pda_queue_size: usize,
    pub estimated_memory_bytes: usize,
}
573
/// Snapshot of VM memory-related counters; `Default` zeroes everything.
// NOTE(review): the code that fills this in is outside this excerpt; field
// meanings are taken from their names.
#[derive(Debug, Clone, Default)]
pub struct VmMemoryStats {
    pub state_table_entity_count: usize,
    pub state_table_max_entries: usize,
    pub state_table_at_capacity: bool,
    pub lookup_index_count: usize,
    pub lookup_index_total_entries: usize,
    pub temporal_index_count: usize,
    pub temporal_index_total_entries: usize,
    pub pda_reverse_lookup_count: usize,
    pub pda_reverse_lookup_total_entries: usize,
    pub version_tracker_entries: usize,
    pub pending_queue_stats: Option<PendingQueueStats>,
    pub path_cache_size: usize,
}
589
/// Counts of items removed by a cleanup pass; `Default` is all zeros.
// NOTE(review): the producer is outside this excerpt.
#[derive(Debug, Clone, Default)]
pub struct CleanupResult {
    pub pending_updates_removed: usize,
    pub temporal_entries_removed: usize,
}
595
/// Describes a table that has reached or exceeded its configured size.
// NOTE(review): the producer is outside this excerpt; field meanings are
// taken from their names.
#[derive(Debug, Clone)]
pub struct CapacityWarning {
    pub current_entries: usize,
    pub max_entries: usize,
    pub entries_over_limit: usize,
}
602
/// Size limits applied to a `StateTable`.
#[derive(Debug, Clone)]
pub struct StateTableConfig {
    /// Entry count at which `StateTable::insert_with_eviction` starts
    /// evicting least-recently-touched entries before adding a new key.
    pub max_entries: usize,
    /// Value returned by `StateTable::max_array_length`.
    pub max_array_length: usize,
}
608
609impl Default for StateTableConfig {
610 fn default() -> Self {
611 Self {
612 max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
613 max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
614 }
615 }
616}
617
/// Bounded LRU map from `"<primary_key>:<event_type>"` to a `(slot, ordering
/// value)` pair.
#[derive(Debug)]
pub struct VersionTracker {
    /// Mutex-guarded cache, so reads and writes work through `&self`.
    cache: std::sync::Mutex<LruCache<String, (u64, u64)>>,
}
622
623impl VersionTracker {
624 pub fn new() -> Self {
625 Self::with_capacity(DEFAULT_MAX_VERSION_TRACKER_ENTRIES)
626 }
627
628 pub fn with_capacity(capacity: usize) -> Self {
629 VersionTracker {
630 cache: std::sync::Mutex::new(LruCache::new(
631 NonZeroUsize::new(capacity).expect("capacity must be > 0"),
632 )),
633 }
634 }
635
636 fn make_key(primary_key: &Value, event_type: &str) -> String {
637 format!("{}:{}", primary_key, event_type)
638 }
639
640 pub fn get(&self, primary_key: &Value, event_type: &str) -> Option<(u64, u64)> {
641 let key = Self::make_key(primary_key, event_type);
642 self.cache.lock().unwrap().get(&key).copied()
643 }
644
645 pub fn insert(&self, primary_key: &Value, event_type: &str, slot: u64, ordering_value: u64) {
646 let key = Self::make_key(primary_key, event_type);
647 self.cache.lock().unwrap().put(key, (slot, ordering_value));
648 }
649
650 pub fn len(&self) -> usize {
651 self.cache.lock().unwrap().len()
652 }
653
654 pub fn is_empty(&self) -> bool {
655 self.cache.lock().unwrap().is_empty()
656 }
657}
658
659impl Default for VersionTracker {
660 fn default() -> Self {
661 Self::new()
662 }
663}
664
/// Per-entity state storage plus the auxiliary indexes and queues that go with it.
#[derive(Debug)]
pub struct StateTable {
    /// Entity states keyed by primary key.
    pub data: DashMap<Value, Value>,
    /// Unix time (seconds) each key was last touched; drives `evict_lru`.
    access_times: DashMap<Value, i64>,
    /// Lookup indexes by name.
    pub lookup_indexes: HashMap<String, LookupIndex>,
    /// Temporal indexes by name.
    pub temporal_indexes: HashMap<String, TemporalIndex>,
    /// PDA reverse lookups by name.
    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
    /// Queued account updates awaiting replay.
    // NOTE(review): presumably keyed by PDA address — the writer is outside this excerpt.
    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
    /// Queued instruction events awaiting replay.
    // NOTE(review): presumably keyed by PDA address — the writer is outside this excerpt.
    pub pending_instruction_events: DashMap<String, Vec<PendingInstructionEvent>>,
    /// Last `(slot, ordering value)` accepted per key/event; used by `is_fresh_update`.
    version_tracker: VersionTracker,
    /// Last `(slot, txn_index)` seen per key/event; used by `is_duplicate_instruction`.
    instruction_dedup_cache: VersionTracker,
    /// Size limits for this table.
    config: StateTableConfig,
    /// Entity name, passed to the metrics calls when the `otel` feature is enabled.
    #[cfg_attr(not(feature = "otel"), allow(dead_code))]
    entity_name: String,
    /// Instruction event types recently seen, grouped by transaction signature.
    pub recent_tx_instructions:
        std::sync::Mutex<lru::LruCache<String, std::collections::HashSet<String>>>,
    /// Deferred operations keyed by `(signature, event_type)`.
    pub deferred_when_ops: DashMap<(String, String), Vec<DeferredWhenOperation>>,
}
683
684impl StateTable {
685 pub fn is_at_capacity(&self) -> bool {
686 self.data.len() >= self.config.max_entries
687 }
688
689 pub fn entries_over_limit(&self) -> usize {
690 self.data.len().saturating_sub(self.config.max_entries)
691 }
692
693 pub fn max_array_length(&self) -> usize {
694 self.config.max_array_length
695 }
696
697 fn touch(&self, key: &Value) {
698 let now = std::time::SystemTime::now()
699 .duration_since(std::time::UNIX_EPOCH)
700 .unwrap()
701 .as_secs() as i64;
702 self.access_times.insert(key.clone(), now);
703 }
704
705 fn evict_lru(&self, count: usize) -> usize {
706 if count == 0 || self.data.is_empty() {
707 return 0;
708 }
709
710 let mut entries: Vec<(Value, i64)> = self
711 .access_times
712 .iter()
713 .map(|entry| (entry.key().clone(), *entry.value()))
714 .collect();
715
716 entries.sort_by_key(|(_, ts)| *ts);
717
718 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
719
720 let mut evicted = 0;
721 for key in to_evict {
722 self.data.remove(&key);
723 self.access_times.remove(&key);
724 evicted += 1;
725 }
726
727 #[cfg(feature = "otel")]
728 if evicted > 0 {
729 crate::vm_metrics::record_state_table_eviction(evicted as u64, &self.entity_name);
730 }
731
732 evicted
733 }
734
735 pub fn insert_with_eviction(&self, key: Value, value: Value) {
736 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
737 #[cfg(feature = "otel")]
738 crate::vm_metrics::record_state_table_at_capacity(&self.entity_name);
739 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
740 self.evict_lru(to_evict.max(1));
741 }
742 self.data.insert(key.clone(), value);
743 self.touch(&key);
744 }
745
746 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
747 let result = self.data.get(key).map(|v| v.clone());
748 if result.is_some() {
749 self.touch(key);
750 }
751 result
752 }
753
754 pub fn is_fresh_update(
761 &self,
762 primary_key: &Value,
763 event_type: &str,
764 slot: u64,
765 ordering_value: u64,
766 ) -> bool {
767 let dominated = self
768 .version_tracker
769 .get(primary_key, event_type)
770 .map(|(last_slot, last_version)| (slot, ordering_value) <= (last_slot, last_version))
771 .unwrap_or(false);
772
773 if dominated {
774 return false;
775 }
776
777 self.version_tracker
778 .insert(primary_key, event_type, slot, ordering_value);
779 true
780 }
781
782 pub fn is_duplicate_instruction(
790 &self,
791 primary_key: &Value,
792 event_type: &str,
793 slot: u64,
794 txn_index: u64,
795 ) -> bool {
796 let is_duplicate = self
798 .instruction_dedup_cache
799 .get(primary_key, event_type)
800 .map(|(last_slot, last_txn_index)| slot == last_slot && txn_index == last_txn_index)
801 .unwrap_or(false);
802
803 if is_duplicate {
804 return true;
805 }
806
807 self.instruction_dedup_cache
809 .insert(primary_key, event_type, slot, txn_index);
810 false
811 }
812}
813
814impl VmContext {
815 pub fn new() -> Self {
816 let mut vm = VmContext {
817 registers: vec![Value::Null; 256],
818 states: HashMap::new(),
819 instructions_executed: 0,
820 cache_hits: 0,
821 path_cache: HashMap::new(),
822 pda_cache_hits: 0,
823 pda_cache_misses: 0,
824 pending_queue_size: 0,
825 current_context: None,
826 warnings: Vec::new(),
827 last_pda_lookup_miss: None,
828 last_lookup_index_miss: None,
829 last_pda_registered: None,
830 last_lookup_index_keys: Vec::new(),
831 };
832 vm.states.insert(
833 0,
834 StateTable {
835 data: DashMap::new(),
836 access_times: DashMap::new(),
837 lookup_indexes: HashMap::new(),
838 temporal_indexes: HashMap::new(),
839 pda_reverse_lookups: HashMap::new(),
840 pending_updates: DashMap::new(),
841 pending_instruction_events: DashMap::new(),
842 version_tracker: VersionTracker::new(),
843 instruction_dedup_cache: VersionTracker::with_capacity(
844 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
845 ),
846 config: StateTableConfig::default(),
847 entity_name: "default".to_string(),
848 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
849 NonZeroUsize::new(1000).unwrap(),
850 )),
851 deferred_when_ops: DashMap::new(),
852 },
853 );
854 vm
855 }
856
857 pub fn new_with_config(state_config: StateTableConfig) -> Self {
858 let mut vm = VmContext {
859 registers: vec![Value::Null; 256],
860 states: HashMap::new(),
861 instructions_executed: 0,
862 cache_hits: 0,
863 path_cache: HashMap::new(),
864 pda_cache_hits: 0,
865 pda_cache_misses: 0,
866 pending_queue_size: 0,
867 current_context: None,
868 warnings: Vec::new(),
869 last_pda_lookup_miss: None,
870 last_lookup_index_miss: None,
871 last_pda_registered: None,
872 last_lookup_index_keys: Vec::new(),
873 };
874 vm.states.insert(
875 0,
876 StateTable {
877 data: DashMap::new(),
878 access_times: DashMap::new(),
879 lookup_indexes: HashMap::new(),
880 temporal_indexes: HashMap::new(),
881 pda_reverse_lookups: HashMap::new(),
882 pending_updates: DashMap::new(),
883 pending_instruction_events: DashMap::new(),
884 version_tracker: VersionTracker::new(),
885 instruction_dedup_cache: VersionTracker::with_capacity(
886 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
887 ),
888 config: state_config,
889 entity_name: "default".to_string(),
890 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
891 NonZeroUsize::new(1000).unwrap(),
892 )),
893 deferred_when_ops: DashMap::new(),
894 },
895 );
896 vm
897 }
898
    /// Returns a mutable reference to the state table with id `state_id`,
    /// or `None` if no such table exists.
    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
        self.states.get_mut(&state_id)
    }
904
    /// Gives mutable access to the register file.
    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
        &mut self.registers
    }
909
    /// Borrows the compiled-path cache, keyed by raw dotted path.
    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
        &self.path_cache
    }
914
    /// Returns the update context of the event being processed, if one is set.
    pub fn current_context(&self) -> Option<&UpdateContext> {
        self.current_context.as_ref()
    }
919
    /// Appends `msg` to the accumulated warnings.
    fn add_warning(&mut self, msg: String) {
        self.warnings.push(msg);
    }
923
    /// Returns all accumulated warnings and leaves the internal list empty.
    pub fn take_warnings(&mut self) -> Vec<String> {
        std::mem::take(&mut self.warnings)
    }
927
    /// `true` when at least one warning has been accumulated.
    pub fn has_warnings(&self) -> bool {
        !self.warnings.is_empty()
    }
931
932 pub fn update_state_from_register(
933 &mut self,
934 state_id: u32,
935 key: Value,
936 register: Register,
937 ) -> Result<()> {
938 let state = self.states.get(&state_id).ok_or("State table not found")?;
939 let value = self.registers[register].clone();
940 state.insert_with_eviction(key, value);
941 Ok(())
942 }
943
944 fn reset_registers(&mut self) {
945 for reg in &mut self.registers {
946 *reg = Value::Null;
947 }
948 }
949
950 pub fn extract_partial_state(
952 &self,
953 state_reg: Register,
954 dirty_fields: &HashSet<String>,
955 ) -> Result<Value> {
956 let full_state = &self.registers[state_reg];
957
958 if dirty_fields.is_empty() {
959 return Ok(json!({}));
960 }
961
962 let mut partial = serde_json::Map::new();
963
964 for path in dirty_fields {
965 let segments: Vec<&str> = path.split('.').collect();
966
967 let mut current = full_state;
968 let mut found = true;
969
970 for segment in &segments {
971 match current.get(segment) {
972 Some(v) => current = v,
973 None => {
974 found = false;
975 break;
976 }
977 }
978 }
979
980 if !found {
981 continue;
982 }
983
984 let mut target = &mut partial;
985 for (i, segment) in segments.iter().enumerate() {
986 if i == segments.len() - 1 {
987 target.insert(segment.to_string(), current.clone());
988 } else {
989 target
990 .entry(segment.to_string())
991 .or_insert_with(|| json!({}));
992 target = target
993 .get_mut(*segment)
994 .and_then(|v| v.as_object_mut())
995 .ok_or("Failed to build nested structure")?;
996 }
997 }
998 }
999
1000 Ok(Value::Object(partial))
1001 }
1002
1003 pub fn extract_partial_state_with_tracker(
1007 &self,
1008 state_reg: Register,
1009 tracker: &DirtyTracker,
1010 ) -> Result<Value> {
1011 let full_state = &self.registers[state_reg];
1012
1013 if tracker.is_empty() {
1014 return Ok(json!({}));
1015 }
1016
1017 let mut partial = serde_json::Map::new();
1018
1019 for (path, change) in tracker.iter() {
1020 let segments: Vec<&str> = path.split('.').collect();
1021
1022 let value_to_insert = match change {
1023 FieldChange::Replaced => {
1024 let mut current = full_state;
1025 let mut found = true;
1026
1027 for segment in &segments {
1028 match current.get(*segment) {
1029 Some(v) => current = v,
1030 None => {
1031 found = false;
1032 break;
1033 }
1034 }
1035 }
1036
1037 if !found {
1038 continue;
1039 }
1040 current.clone()
1041 }
1042 FieldChange::Appended(values) => Value::Array(values.clone()),
1043 };
1044
1045 let mut target = &mut partial;
1046 for (i, segment) in segments.iter().enumerate() {
1047 if i == segments.len() - 1 {
1048 target.insert(segment.to_string(), value_to_insert.clone());
1049 } else {
1050 target
1051 .entry(segment.to_string())
1052 .or_insert_with(|| json!({}));
1053 target = target
1054 .get_mut(*segment)
1055 .and_then(|v| v.as_object_mut())
1056 .ok_or("Failed to build nested structure")?;
1057 }
1058 }
1059 }
1060
1061 Ok(Value::Object(partial))
1062 }
1063
1064 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
1065 if let Some(compiled) = self.path_cache.get(path) {
1066 self.cache_hits += 1;
1067 #[cfg(feature = "otel")]
1068 crate::vm_metrics::record_path_cache_hit();
1069 return compiled.clone();
1070 }
1071 #[cfg(feature = "otel")]
1072 crate::vm_metrics::record_path_cache_miss();
1073 let compiled = CompiledPath::new(path);
1074 self.path_cache.insert(path.to_string(), compiled.clone());
1075 compiled
1076 }
1077
1078 #[cfg_attr(feature = "otel", instrument(
1080 name = "vm.process_event",
1081 skip(self, bytecode, event_value, log),
1082 level = "info",
1083 fields(
1084 event_type = %event_type,
1085 slot = context.as_ref().and_then(|c| c.slot),
1086 )
1087 ))]
1088 pub fn process_event(
1089 &mut self,
1090 bytecode: &MultiEntityBytecode,
1091 event_value: Value,
1092 event_type: &str,
1093 context: Option<&UpdateContext>,
1094 mut log: Option<&mut crate::canonical_log::CanonicalLog>,
1095 ) -> Result<Vec<Mutation>> {
1096 self.current_context = context.cloned();
1097
1098 let mut event_value = event_value;
1099 if let Some(ctx) = context {
1100 if let Some(obj) = event_value.as_object_mut() {
1101 obj.insert("__update_context".to_string(), ctx.to_value());
1102 }
1103 }
1104
1105 let mut all_mutations = Vec::new();
1106
1107 if event_type.ends_with("IxState") && bytecode.when_events.contains(event_type) {
1108 if let Some(ctx) = context {
1109 if let Some(signature) = ctx.signature.clone() {
1110 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1111 for state_id in state_ids {
1112 if let Some(state) = self.states.get(&state_id) {
1113 {
1114 let mut cache = state.recent_tx_instructions.lock().unwrap();
1115 let entry =
1116 cache.get_or_insert_mut(signature.clone(), HashSet::new);
1117 entry.insert(event_type.to_string());
1118 }
1119
1120 let key = (signature.clone(), event_type.to_string());
1121 if let Some((_, deferred_ops)) = state.deferred_when_ops.remove(&key) {
1122 for op in deferred_ops {
1123 match self.apply_deferred_when_op(state_id, &op) {
1124 Ok(mutations) => all_mutations.extend(mutations),
1125 Err(e) => tracing::warn!(
1126 "Failed to apply deferred when-op: {}",
1127 e
1128 ),
1129 }
1130 }
1131 }
1132 }
1133 }
1134 }
1135 }
1136 }
1137
1138 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
1139 for entity_name in entity_names {
1140 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
1141 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
1142 if let Some(ref mut log) = log {
1143 log.set("entity", entity_name.clone());
1144 log.inc("handlers", 1);
1145 }
1146
1147 let opcodes_before = self.instructions_executed;
1148 let cache_before = self.cache_hits;
1149 let pda_hits_before = self.pda_cache_hits;
1150 let pda_misses_before = self.pda_cache_misses;
1151
1152 let mutations = self.execute_handler(
1153 handler,
1154 &event_value,
1155 event_type,
1156 entity_bytecode.state_id,
1157 entity_name,
1158 entity_bytecode.computed_fields_evaluator.as_ref(),
1159 Some(&entity_bytecode.non_emitted_fields),
1160 )?;
1161
1162 if let Some(ref mut log) = log {
1163 log.inc(
1164 "opcodes",
1165 (self.instructions_executed - opcodes_before) as i64,
1166 );
1167 log.inc("cache_hits", (self.cache_hits - cache_before) as i64);
1168 log.inc("pda_hits", (self.pda_cache_hits - pda_hits_before) as i64);
1169 log.inc(
1170 "pda_misses",
1171 (self.pda_cache_misses - pda_misses_before) as i64,
1172 );
1173 }
1174
1175 if mutations.is_empty() {
1176 if let Some(missed_pda) = self.take_last_pda_lookup_miss() {
1177 if event_type.ends_with("IxState") {
1178 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1179 let signature = context
1180 .and_then(|c| c.signature.clone())
1181 .unwrap_or_default();
1182 let _ = self.queue_instruction_event(
1183 entity_bytecode.state_id,
1184 QueuedInstructionEvent {
1185 pda_address: missed_pda,
1186 event_type: event_type.to_string(),
1187 event_data: event_value.clone(),
1188 slot,
1189 signature,
1190 },
1191 );
1192 }
1193 }
1194
1195 if let Some(missed_lookup) = self.take_last_lookup_index_miss() {
1196 if !event_type.ends_with("IxState") {
1197 let slot = context.and_then(|c| c.slot).unwrap_or(0);
1198 let signature = context
1199 .and_then(|c| c.signature.clone())
1200 .unwrap_or_default();
1201 if let Some(write_version) =
1202 context.and_then(|c| c.write_version)
1203 {
1204 let _ = self.queue_account_update(
1205 entity_bytecode.state_id,
1206 QueuedAccountUpdate {
1207 pda_address: missed_lookup,
1208 account_type: event_type.to_string(),
1209 account_data: event_value.clone(),
1210 slot,
1211 write_version,
1212 signature,
1213 },
1214 );
1215 }
1216 }
1217 }
1218 }
1219
1220 all_mutations.extend(mutations);
1221
1222 if event_type.ends_with("IxState") {
1223 if let Some(ctx) = context {
1224 if let Some(ref signature) = ctx.signature {
1225 if let Some(state) = self.states.get(&entity_bytecode.state_id)
1226 {
1227 {
1228 let mut cache =
1229 state.recent_tx_instructions.lock().unwrap();
1230 let entry = cache
1231 .get_or_insert_mut(signature.clone(), HashSet::new);
1232 entry.insert(event_type.to_string());
1233 }
1234
1235 let key = (signature.clone(), event_type.to_string());
1236 if let Some((_, deferred_ops)) =
1237 state.deferred_when_ops.remove(&key)
1238 {
1239 for op in deferred_ops {
1240 match self.apply_deferred_when_op(
1241 entity_bytecode.state_id,
1242 &op,
1243 ) {
1244 Ok(mutations) => {
1245 all_mutations.extend(mutations)
1246 }
1247 Err(e) => {
1248 tracing::warn!(
1249 "Failed to apply deferred when-op: {}",
1250 e
1251 );
1252 }
1253 }
1254 }
1255 }
1256 }
1257 }
1258 }
1259 }
1260
1261 if let Some(registered_pda) = self.take_last_pda_registered() {
1262 let pending_events = self.flush_pending_instruction_events(
1263 entity_bytecode.state_id,
1264 ®istered_pda,
1265 );
1266 for pending in pending_events {
1267 if let Some(pending_handler) =
1268 entity_bytecode.handlers.get(&pending.event_type)
1269 {
1270 if let Ok(reprocessed_mutations) = self.execute_handler(
1271 pending_handler,
1272 &pending.event_data,
1273 &pending.event_type,
1274 entity_bytecode.state_id,
1275 entity_name,
1276 entity_bytecode.computed_fields_evaluator.as_ref(),
1277 Some(&entity_bytecode.non_emitted_fields),
1278 ) {
1279 all_mutations.extend(reprocessed_mutations);
1280 }
1281 }
1282 }
1283 }
1284
1285 let lookup_keys = self.take_last_lookup_index_keys();
1286 for lookup_key in lookup_keys {
1287 if let Ok(pending_updates) =
1288 self.flush_pending_updates(entity_bytecode.state_id, &lookup_key)
1289 {
1290 for pending in pending_updates {
1291 if let Some(pending_handler) =
1292 entity_bytecode.handlers.get(&pending.account_type)
1293 {
1294 self.current_context = Some(UpdateContext::new_account(
1295 pending.slot,
1296 pending.signature.clone(),
1297 pending.write_version,
1298 ));
1299 if let Ok(reprocessed) = self.execute_handler(
1300 pending_handler,
1301 &pending.account_data,
1302 &pending.account_type,
1303 entity_bytecode.state_id,
1304 entity_name,
1305 entity_bytecode.computed_fields_evaluator.as_ref(),
1306 Some(&entity_bytecode.non_emitted_fields),
1307 ) {
1308 all_mutations.extend(reprocessed);
1309 }
1310 }
1311 }
1312 }
1313 }
1314 } else if let Some(ref mut log) = log {
1315 log.set("skip_reason", "no_handler");
1316 }
1317 } else if let Some(ref mut log) = log {
1318 log.set("skip_reason", "entity_not_found");
1319 }
1320 }
1321 } else if let Some(ref mut log) = log {
1322 log.set("skip_reason", "no_event_routing");
1323 }
1324
1325 if let Some(log) = log {
1326 log.set("mutations", all_mutations.len() as i64);
1327 if let Some(first) = all_mutations.first() {
1328 if let Some(key_str) = first.key.as_str() {
1329 log.set("primary_key", key_str);
1330 } else if let Some(key_num) = first.key.as_u64() {
1331 log.set("primary_key", key_num as i64);
1332 }
1333 }
1334 if let Some(state) = self.states.get(&0) {
1335 log.set("state_table_size", state.data.len() as i64);
1336 }
1337
1338 let warnings = self.take_warnings();
1339 if !warnings.is_empty() {
1340 log.set("warnings", warnings.len() as i64);
1341 log.set(
1342 "warning_messages",
1343 Value::Array(warnings.into_iter().map(Value::String).collect()),
1344 );
1345 log.set_level(crate::canonical_log::LogLevel::Warn);
1346 }
1347 } else {
1348 self.warnings.clear();
1349 }
1350
1351 if self.instructions_executed.is_multiple_of(1000) {
1352 let state_ids: Vec<u32> = self.states.keys().cloned().collect();
1353 for state_id in state_ids {
1354 let expired = self.cleanup_expired_when_ops(state_id, 60);
1355 if expired > 0 {
1356 tracing::debug!(
1357 "Cleaned up {} expired deferred when-ops for state {}",
1358 expired,
1359 state_id
1360 );
1361 }
1362 }
1363 }
1364
1365 Ok(all_mutations)
1366 }
1367
1368 pub fn process_any(
1369 &mut self,
1370 bytecode: &MultiEntityBytecode,
1371 any: prost_types::Any,
1372 ) -> Result<Vec<Mutation>> {
1373 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
1374 self.process_event(bytecode, event_value, &event_type, None, None)
1375 }
1376
1377 #[cfg_attr(feature = "otel", instrument(
1378 name = "vm.execute_handler",
1379 skip(self, handler, event_value, entity_evaluator),
1380 level = "debug",
1381 fields(
1382 event_type = %event_type,
1383 handler_opcodes = handler.len(),
1384 )
1385 ))]
1386 #[allow(clippy::type_complexity, clippy::too_many_arguments)]
1387 fn execute_handler(
1388 &mut self,
1389 handler: &[OpCode],
1390 event_value: &Value,
1391 event_type: &str,
1392 override_state_id: u32,
1393 entity_name: &str,
1394 entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
1395 non_emitted_fields: Option<&HashSet<String>>,
1396 ) -> Result<Vec<Mutation>> {
1397 self.reset_registers();
1398 self.last_pda_lookup_miss = None;
1399
1400 let mut pc: usize = 0;
1401 let mut output = Vec::new();
1402 let mut dirty_tracker = DirtyTracker::new();
1403 let should_emit = |path: &str| {
1404 non_emitted_fields
1405 .map(|fields| !fields.contains(path))
1406 .unwrap_or(true)
1407 };
1408
1409 while pc < handler.len() {
1410 match &handler[pc] {
1411 OpCode::LoadEventField {
1412 path,
1413 dest,
1414 default,
1415 } => {
1416 let value = self.load_field(event_value, path, default.as_ref())?;
1417 self.registers[*dest] = value;
1418 pc += 1;
1419 }
1420 OpCode::LoadConstant { value, dest } => {
1421 self.registers[*dest] = value.clone();
1422 pc += 1;
1423 }
1424 OpCode::CopyRegister { source, dest } => {
1425 self.registers[*dest] = self.registers[*source].clone();
1426 pc += 1;
1427 }
1428 OpCode::CopyRegisterIfNull { source, dest } => {
1429 if self.registers[*dest].is_null() {
1430 self.registers[*dest] = self.registers[*source].clone();
1431 }
1432 pc += 1;
1433 }
1434 OpCode::GetEventType { dest } => {
1435 self.registers[*dest] = json!(event_type);
1436 pc += 1;
1437 }
1438 OpCode::CreateObject { dest } => {
1439 self.registers[*dest] = json!({});
1440 pc += 1;
1441 }
1442 OpCode::SetField {
1443 object,
1444 path,
1445 value,
1446 } => {
1447 self.set_field_auto_vivify(*object, path, *value)?;
1448 if should_emit(path) {
1449 dirty_tracker.mark_replaced(path);
1450 }
1451 pc += 1;
1452 }
1453 OpCode::SetFields { object, fields } => {
1454 for (path, value_reg) in fields {
1455 self.set_field_auto_vivify(*object, path, *value_reg)?;
1456 if should_emit(path) {
1457 dirty_tracker.mark_replaced(path);
1458 }
1459 }
1460 pc += 1;
1461 }
1462 OpCode::GetField { object, path, dest } => {
1463 let value = self.get_field(*object, path)?;
1464 self.registers[*dest] = value;
1465 pc += 1;
1466 }
1467 OpCode::ReadOrInitState {
1468 state_id: _,
1469 key,
1470 default,
1471 dest,
1472 } => {
1473 let actual_state_id = override_state_id;
1474 let entity_name_owned = entity_name.to_string();
1475 self.states
1476 .entry(actual_state_id)
1477 .or_insert_with(|| StateTable {
1478 data: DashMap::new(),
1479 access_times: DashMap::new(),
1480 lookup_indexes: HashMap::new(),
1481 temporal_indexes: HashMap::new(),
1482 pda_reverse_lookups: HashMap::new(),
1483 pending_updates: DashMap::new(),
1484 pending_instruction_events: DashMap::new(),
1485 version_tracker: VersionTracker::new(),
1486 instruction_dedup_cache: VersionTracker::with_capacity(
1487 DEFAULT_MAX_INSTRUCTION_DEDUP_ENTRIES,
1488 ),
1489 config: StateTableConfig::default(),
1490 entity_name: entity_name_owned,
1491 recent_tx_instructions: std::sync::Mutex::new(LruCache::new(
1492 NonZeroUsize::new(1000).unwrap(),
1493 )),
1494 deferred_when_ops: DashMap::new(),
1495 });
1496 let key_value = self.registers[*key].clone();
1497 let warn_null_key = key_value.is_null()
1498 && event_type.ends_with("State")
1499 && !event_type.ends_with("IxState");
1500
1501 if warn_null_key {
1502 self.add_warning(format!(
1503 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
1504 key, event_type
1505 ));
1506 }
1507
1508 let state = self
1509 .states
1510 .get(&actual_state_id)
1511 .ok_or("State table not found")?;
1512
1513 if !key_value.is_null() {
1514 if let Some(ctx) = &self.current_context {
1515 if ctx.is_account_update() {
1517 if let (Some(slot), Some(write_version)) =
1518 (ctx.slot, ctx.write_version)
1519 {
1520 if !state.is_fresh_update(
1521 &key_value,
1522 event_type,
1523 slot,
1524 write_version,
1525 ) {
1526 self.add_warning(format!(
1527 "Stale account update skipped: slot={}, write_version={}",
1528 slot, write_version
1529 ));
1530 return Ok(Vec::new());
1531 }
1532 }
1533 }
1534 else if ctx.is_instruction_update() {
1536 if let (Some(slot), Some(txn_index)) = (ctx.slot, ctx.txn_index) {
1537 if state.is_duplicate_instruction(
1538 &key_value, event_type, slot, txn_index,
1539 ) {
1540 self.add_warning(format!(
1541 "Duplicate instruction skipped: slot={}, txn_index={}",
1542 slot, txn_index
1543 ));
1544 return Ok(Vec::new());
1545 }
1546 }
1547 }
1548 }
1549 }
1550 let value = state
1551 .get_and_touch(&key_value)
1552 .unwrap_or_else(|| default.clone());
1553
1554 self.registers[*dest] = value;
1555 pc += 1;
1556 }
1557 OpCode::UpdateState {
1558 state_id: _,
1559 key,
1560 value,
1561 } => {
1562 let actual_state_id = override_state_id;
1563 let state = self
1564 .states
1565 .get(&actual_state_id)
1566 .ok_or("State table not found")?;
1567 let key_value = self.registers[*key].clone();
1568 let value_data = self.registers[*value].clone();
1569
1570 state.insert_with_eviction(key_value, value_data);
1571 pc += 1;
1572 }
1573 OpCode::AppendToArray {
1574 object,
1575 path,
1576 value,
1577 } => {
1578 let appended_value = self.registers[*value].clone();
1579 let max_len = self
1580 .states
1581 .get(&override_state_id)
1582 .map(|s| s.max_array_length())
1583 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1584 self.append_to_array(*object, path, *value, max_len)?;
1585 if should_emit(path) {
1586 dirty_tracker.mark_appended(path, appended_value);
1587 }
1588 pc += 1;
1589 }
1590 OpCode::GetCurrentTimestamp { dest } => {
1591 let timestamp = std::time::SystemTime::now()
1592 .duration_since(std::time::UNIX_EPOCH)
1593 .unwrap()
1594 .as_secs() as i64;
1595 self.registers[*dest] = json!(timestamp);
1596 pc += 1;
1597 }
1598 OpCode::CreateEvent { dest, event_value } => {
1599 let timestamp = std::time::SystemTime::now()
1600 .duration_since(std::time::UNIX_EPOCH)
1601 .unwrap()
1602 .as_secs() as i64;
1603
1604 let mut event_data = self.registers[*event_value].clone();
1606 if let Some(obj) = event_data.as_object_mut() {
1607 obj.remove("__update_context");
1608 }
1609
1610 let mut event = serde_json::Map::new();
1612 event.insert("timestamp".to_string(), json!(timestamp));
1613 event.insert("data".to_string(), event_data);
1614
1615 if let Some(ref ctx) = self.current_context {
1617 if let Some(slot) = ctx.slot {
1618 event.insert("slot".to_string(), json!(slot));
1619 }
1620 if let Some(ref signature) = ctx.signature {
1621 event.insert("signature".to_string(), json!(signature));
1622 }
1623 }
1624
1625 self.registers[*dest] = Value::Object(event);
1626 pc += 1;
1627 }
1628 OpCode::CreateCapture {
1629 dest,
1630 capture_value,
1631 } => {
1632 let timestamp = std::time::SystemTime::now()
1633 .duration_since(std::time::UNIX_EPOCH)
1634 .unwrap()
1635 .as_secs() as i64;
1636
1637 let capture_data = self.registers[*capture_value].clone();
1639
1640 let account_address = event_value
1642 .get("__account_address")
1643 .and_then(|v| v.as_str())
1644 .unwrap_or("")
1645 .to_string();
1646
1647 let mut capture = serde_json::Map::new();
1649 capture.insert("timestamp".to_string(), json!(timestamp));
1650 capture.insert("account_address".to_string(), json!(account_address));
1651 capture.insert("data".to_string(), capture_data);
1652
1653 if let Some(ref ctx) = self.current_context {
1655 if let Some(slot) = ctx.slot {
1656 capture.insert("slot".to_string(), json!(slot));
1657 }
1658 if let Some(ref signature) = ctx.signature {
1659 capture.insert("signature".to_string(), json!(signature));
1660 }
1661 }
1662
1663 self.registers[*dest] = Value::Object(capture);
1664 pc += 1;
1665 }
1666 OpCode::Transform {
1667 source,
1668 dest,
1669 transformation,
1670 } => {
1671 if source == dest {
1672 self.transform_in_place(*source, transformation)?;
1673 } else {
1674 let source_value = &self.registers[*source];
1675 let value = self.apply_transformation(source_value, transformation)?;
1676 self.registers[*dest] = value;
1677 }
1678 pc += 1;
1679 }
1680 OpCode::EmitMutation {
1681 entity_name,
1682 key,
1683 state,
1684 } => {
1685 let primary_key = self.registers[*key].clone();
1686
1687 if primary_key.is_null() || dirty_tracker.is_empty() {
1688 let reason = if dirty_tracker.is_empty() {
1689 "no_fields_modified"
1690 } else {
1691 "null_primary_key"
1692 };
1693 self.add_warning(format!(
1694 "Skipping mutation for entity '{}': {} (dirty_fields={})",
1695 entity_name,
1696 reason,
1697 dirty_tracker.len()
1698 ));
1699 } else {
1700 let patch =
1701 self.extract_partial_state_with_tracker(*state, &dirty_tracker)?;
1702
1703 let append = dirty_tracker.appended_paths();
1704 let mutation = Mutation {
1705 export: entity_name.clone(),
1706 key: primary_key,
1707 patch,
1708 append,
1709 };
1710 output.push(mutation);
1711 }
1712 pc += 1;
1713 }
1714 OpCode::SetFieldIfNull {
1715 object,
1716 path,
1717 value,
1718 } => {
1719 let was_set = self.set_field_if_null(*object, path, *value)?;
1720 if was_set && should_emit(path) {
1721 dirty_tracker.mark_replaced(path);
1722 }
1723 pc += 1;
1724 }
1725 OpCode::SetFieldMax {
1726 object,
1727 path,
1728 value,
1729 } => {
1730 let was_updated = self.set_field_max(*object, path, *value)?;
1731 if was_updated && should_emit(path) {
1732 dirty_tracker.mark_replaced(path);
1733 }
1734 pc += 1;
1735 }
1736 OpCode::UpdateTemporalIndex {
1737 state_id: _,
1738 index_name,
1739 lookup_value,
1740 primary_key,
1741 timestamp,
1742 } => {
1743 let actual_state_id = override_state_id;
1744 let state = self
1745 .states
1746 .get_mut(&actual_state_id)
1747 .ok_or("State table not found")?;
1748 let index = state
1749 .temporal_indexes
1750 .entry(index_name.clone())
1751 .or_insert_with(TemporalIndex::new);
1752
1753 let lookup_val = self.registers[*lookup_value].clone();
1754 let pk_val = self.registers[*primary_key].clone();
1755 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1756 val
1757 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1758 val as i64
1759 } else {
1760 return Err(format!(
1761 "Timestamp must be a number (i64 or u64), got: {:?}",
1762 self.registers[*timestamp]
1763 )
1764 .into());
1765 };
1766
1767 index.insert(lookup_val, pk_val, ts_val);
1768 pc += 1;
1769 }
1770 OpCode::LookupTemporalIndex {
1771 state_id: _,
1772 index_name,
1773 lookup_value,
1774 timestamp,
1775 dest,
1776 } => {
1777 let actual_state_id = override_state_id;
1778 let state = self
1779 .states
1780 .get(&actual_state_id)
1781 .ok_or("State table not found")?;
1782 let lookup_val = &self.registers[*lookup_value];
1783
1784 let result = if self.registers[*timestamp].is_null() {
1785 if let Some(index) = state.temporal_indexes.get(index_name) {
1786 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1787 } else {
1788 Value::Null
1789 }
1790 } else {
1791 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1792 val
1793 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1794 val as i64
1795 } else {
1796 return Err(format!(
1797 "Timestamp must be a number (i64 or u64), got: {:?}",
1798 self.registers[*timestamp]
1799 )
1800 .into());
1801 };
1802
1803 if let Some(index) = state.temporal_indexes.get(index_name) {
1804 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1805 } else {
1806 Value::Null
1807 }
1808 };
1809
1810 self.registers[*dest] = result;
1811 pc += 1;
1812 }
1813 OpCode::UpdateLookupIndex {
1814 state_id: _,
1815 index_name,
1816 lookup_value,
1817 primary_key,
1818 } => {
1819 let actual_state_id = override_state_id;
1820 let state = self
1821 .states
1822 .get_mut(&actual_state_id)
1823 .ok_or("State table not found")?;
1824 let index = state
1825 .lookup_indexes
1826 .entry(index_name.clone())
1827 .or_insert_with(LookupIndex::new);
1828
1829 let lookup_val = self.registers[*lookup_value].clone();
1830 let pk_val = self.registers[*primary_key].clone();
1831
1832 index.insert(lookup_val.clone(), pk_val);
1833
1834 if let Some(key_str) = lookup_val.as_str() {
1836 self.last_lookup_index_keys.push(key_str.to_string());
1837 }
1838
1839 pc += 1;
1840 }
1841 OpCode::LookupIndex {
1842 state_id: _,
1843 index_name,
1844 lookup_value,
1845 dest,
1846 } => {
1847 let actual_state_id = override_state_id;
1848 let mut current_value = self.registers[*lookup_value].clone();
1849
1850 const MAX_CHAIN_DEPTH: usize = 5;
1851 let mut iterations = 0;
1852
1853 let final_result = if self.states.contains_key(&actual_state_id) {
1854 loop {
1855 iterations += 1;
1856 if iterations > MAX_CHAIN_DEPTH {
1857 break current_value;
1858 }
1859
1860 let resolved = self
1861 .states
1862 .get(&actual_state_id)
1863 .and_then(|state| {
1864 if let Some(index) = state.lookup_indexes.get(index_name) {
1865 if let Some(found) = index.lookup(¤t_value) {
1866 return Some(found);
1867 }
1868 }
1869
1870 for (name, index) in state.lookup_indexes.iter() {
1871 if name == index_name {
1872 continue;
1873 }
1874 if let Some(found) = index.lookup(¤t_value) {
1875 return Some(found);
1876 }
1877 }
1878
1879 None
1880 })
1881 .unwrap_or(Value::Null);
1882
1883 let mut resolved_from_pda = false;
1884 let resolved = if resolved.is_null() {
1885 if let Some(pda_str) = current_value.as_str() {
1886 resolved_from_pda = true;
1887 self.states
1888 .get_mut(&actual_state_id)
1889 .and_then(|state_mut| {
1890 state_mut
1891 .pda_reverse_lookups
1892 .get_mut("default_pda_lookup")
1893 })
1894 .and_then(|pda_lookup| pda_lookup.lookup(pda_str))
1895 .map(Value::String)
1896 .unwrap_or(Value::Null)
1897 } else {
1898 Value::Null
1899 }
1900 } else {
1901 resolved
1902 };
1903
1904 if resolved.is_null() {
1905 if iterations == 1 {
1906 if let Some(pda_str) = current_value.as_str() {
1907 self.last_pda_lookup_miss = Some(pda_str.to_string());
1908 }
1909 }
1910 break Value::Null;
1911 }
1912
1913 let can_chain =
1914 self.can_resolve_further(&resolved, actual_state_id, index_name);
1915
1916 if !can_chain {
1917 if resolved_from_pda {
1918 if let Some(resolved_str) = resolved.as_str() {
1919 self.last_lookup_index_miss =
1920 Some(resolved_str.to_string());
1921 }
1922 break Value::Null;
1923 }
1924 break resolved;
1925 }
1926
1927 current_value = resolved;
1928 }
1929 } else {
1930 Value::Null
1931 };
1932
1933 self.registers[*dest] = final_result;
1934 pc += 1;
1935 }
1936 OpCode::SetFieldSum {
1937 object,
1938 path,
1939 value,
1940 } => {
1941 let was_updated = self.set_field_sum(*object, path, *value)?;
1942 if was_updated && should_emit(path) {
1943 dirty_tracker.mark_replaced(path);
1944 }
1945 pc += 1;
1946 }
1947 OpCode::SetFieldIncrement { object, path } => {
1948 let was_updated = self.set_field_increment(*object, path)?;
1949 if was_updated && should_emit(path) {
1950 dirty_tracker.mark_replaced(path);
1951 }
1952 pc += 1;
1953 }
1954 OpCode::SetFieldMin {
1955 object,
1956 path,
1957 value,
1958 } => {
1959 let was_updated = self.set_field_min(*object, path, *value)?;
1960 if was_updated && should_emit(path) {
1961 dirty_tracker.mark_replaced(path);
1962 }
1963 pc += 1;
1964 }
1965 OpCode::AddToUniqueSet {
1966 state_id: _,
1967 set_name,
1968 value,
1969 count_object,
1970 count_path,
1971 } => {
1972 let value_to_add = self.registers[*value].clone();
1973
1974 let set_field_path = format!("__unique_set:{}", set_name);
1977
1978 let mut set: HashSet<Value> =
1980 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1981 if !existing.is_null() {
1982 serde_json::from_value(existing).unwrap_or_default()
1983 } else {
1984 HashSet::new()
1985 }
1986 } else {
1987 HashSet::new()
1988 };
1989
1990 let was_new = set.insert(value_to_add);
1992
1993 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1995 self.registers[100] = serde_json::to_value(set_as_vec)?;
1996 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1997
1998 if was_new {
2000 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
2001 self.set_field_auto_vivify(*count_object, count_path, 100)?;
2002 if should_emit(count_path) {
2003 dirty_tracker.mark_replaced(count_path);
2004 }
2005 }
2006
2007 pc += 1;
2008 }
2009 OpCode::ConditionalSetField {
2010 object,
2011 path,
2012 value,
2013 condition_field,
2014 condition_op,
2015 condition_value,
2016 } => {
2017 let field_value = self.load_field(event_value, condition_field, None)?;
2018 let condition_met =
2019 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2020
2021 if condition_met {
2022 self.set_field_auto_vivify(*object, path, *value)?;
2023 if should_emit(path) {
2024 dirty_tracker.mark_replaced(path);
2025 }
2026 }
2027 pc += 1;
2028 }
2029 OpCode::SetFieldWhen {
2030 object,
2031 path,
2032 value,
2033 when_instruction,
2034 entity_name,
2035 key_reg,
2036 condition_field,
2037 condition_op,
2038 condition_value,
2039 } => {
2040 let actual_state_id = override_state_id;
2041 let condition_met = if let (Some(field), Some(op), Some(cond_value)) = (
2042 condition_field.as_ref(),
2043 condition_op.as_ref(),
2044 condition_value.as_ref(),
2045 ) {
2046 let field_value = self.load_field(event_value, field, None)?;
2047 self.evaluate_comparison(&field_value, op, cond_value)?
2048 } else {
2049 true
2050 };
2051
2052 if !condition_met {
2053 pc += 1;
2054 continue;
2055 }
2056
2057 let signature = self
2058 .current_context
2059 .as_ref()
2060 .and_then(|c| c.signature.clone())
2061 .unwrap_or_default();
2062
2063 let emit = should_emit(path);
2064
2065 let instruction_seen = if !signature.is_empty() {
2066 if let Some(state) = self.states.get(&actual_state_id) {
2067 let mut cache = state.recent_tx_instructions.lock().unwrap();
2068 cache
2069 .get(&signature)
2070 .map(|set| set.contains(when_instruction))
2071 .unwrap_or(false)
2072 } else {
2073 false
2074 }
2075 } else {
2076 false
2077 };
2078
2079 if instruction_seen {
2080 self.set_field_auto_vivify(*object, path, *value)?;
2081 if emit {
2082 dirty_tracker.mark_replaced(path);
2083 }
2084 } else if !signature.is_empty() {
2085 let deferred = DeferredWhenOperation {
2086 entity_name: entity_name.clone(),
2087 primary_key: self.registers[*key_reg].clone(),
2088 field_path: path.clone(),
2089 field_value: self.registers[*value].clone(),
2090 when_instruction: when_instruction.clone(),
2091 signature: signature.clone(),
2092 slot: self
2093 .current_context
2094 .as_ref()
2095 .and_then(|c| c.slot)
2096 .unwrap_or(0),
2097 deferred_at: std::time::SystemTime::now()
2098 .duration_since(std::time::UNIX_EPOCH)
2099 .unwrap()
2100 .as_secs() as i64,
2101 emit,
2102 };
2103
2104 if let Some(state) = self.states.get(&actual_state_id) {
2105 let key = (signature, when_instruction.clone());
2106 state
2107 .deferred_when_ops
2108 .entry(key)
2109 .or_insert_with(Vec::new)
2110 .push(deferred);
2111 }
2112 }
2113
2114 pc += 1;
2115 }
2116 OpCode::ConditionalIncrement {
2117 object,
2118 path,
2119 condition_field,
2120 condition_op,
2121 condition_value,
2122 } => {
2123 let field_value = self.load_field(event_value, condition_field, None)?;
2124 let condition_met =
2125 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
2126
2127 if condition_met {
2128 let was_updated = self.set_field_increment(*object, path)?;
2129 if was_updated && should_emit(path) {
2130 dirty_tracker.mark_replaced(path);
2131 }
2132 }
2133 pc += 1;
2134 }
2135 OpCode::EvaluateComputedFields {
2136 state,
2137 computed_paths,
2138 } => {
2139 if let Some(evaluator) = entity_evaluator {
2140 let old_values: Vec<_> = computed_paths
2141 .iter()
2142 .map(|path| Self::get_value_at_path(&self.registers[*state], path))
2143 .collect();
2144
2145 let state_value = &mut self.registers[*state];
2146 let eval_result = evaluator(state_value);
2147
2148 if eval_result.is_ok() {
2149 for (path, old_value) in computed_paths.iter().zip(old_values.iter()) {
2150 let new_value =
2151 Self::get_value_at_path(&self.registers[*state], path);
2152
2153 if new_value != *old_value && should_emit(path) {
2154 dirty_tracker.mark_replaced(path);
2155 }
2156 }
2157 }
2158 }
2159 pc += 1;
2160 }
2161 OpCode::UpdatePdaReverseLookup {
2162 state_id: _,
2163 lookup_name,
2164 pda_address,
2165 primary_key,
2166 } => {
2167 let actual_state_id = override_state_id;
2168 let state = self
2169 .states
2170 .get_mut(&actual_state_id)
2171 .ok_or("State table not found")?;
2172
2173 let pda_val = self.registers[*pda_address].clone();
2174 let pk_val = self.registers[*primary_key].clone();
2175
2176 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
2177 let pda_lookup = state
2178 .pda_reverse_lookups
2179 .entry(lookup_name.clone())
2180 .or_insert_with(|| {
2181 PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES)
2182 });
2183
2184 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
2185 self.last_pda_registered = Some(pda_str.to_string());
2186 } else if !pk_val.is_null() {
2187 if let Some(pk_num) = pk_val.as_u64() {
2188 if let Some(pda_str) = pda_val.as_str() {
2189 let pda_lookup = state
2190 .pda_reverse_lookups
2191 .entry(lookup_name.clone())
2192 .or_insert_with(|| {
2193 PdaReverseLookup::new(
2194 DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES,
2195 )
2196 });
2197
2198 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
2199 self.last_pda_registered = Some(pda_str.to_string());
2200 }
2201 }
2202 }
2203
2204 pc += 1;
2205 }
2206 }
2207
2208 self.instructions_executed += 1;
2209 }
2210
2211 Ok(output)
2212 }
2213
2214 fn load_field(
2215 &self,
2216 event_value: &Value,
2217 path: &FieldPath,
2218 default: Option<&Value>,
2219 ) -> Result<Value> {
2220 if path.segments.is_empty() {
2221 if let Some(obj) = event_value.as_object() {
2222 let filtered: serde_json::Map<String, Value> = obj
2223 .iter()
2224 .filter(|(k, _)| !k.starts_with("__"))
2225 .map(|(k, v)| (k.clone(), v.clone()))
2226 .collect();
2227 return Ok(Value::Object(filtered));
2228 }
2229 return Ok(event_value.clone());
2230 }
2231
2232 let mut current = event_value;
2233 for segment in path.segments.iter() {
2234 current = match current.get(segment) {
2235 Some(v) => v,
2236 None => return Ok(default.cloned().unwrap_or(Value::Null)),
2237 };
2238 }
2239
2240 Ok(current.clone())
2241 }
2242
2243 fn get_value_at_path(value: &Value, path: &str) -> Option<Value> {
2244 let mut current = value;
2245 for segment in path.split('.') {
2246 current = current.get(segment)?;
2247 }
2248 Some(current.clone())
2249 }
2250
2251 fn set_field_auto_vivify(
2252 &mut self,
2253 object_reg: Register,
2254 path: &str,
2255 value_reg: Register,
2256 ) -> Result<()> {
2257 let compiled = self.get_compiled_path(path);
2258 let segments = compiled.segments();
2259 let value = self.registers[value_reg].clone();
2260
2261 if !self.registers[object_reg].is_object() {
2262 self.registers[object_reg] = json!({});
2263 }
2264
2265 let obj = self.registers[object_reg]
2266 .as_object_mut()
2267 .ok_or("Not an object")?;
2268
2269 let mut current = obj;
2270 for (i, segment) in segments.iter().enumerate() {
2271 if i == segments.len() - 1 {
2272 current.insert(segment.to_string(), value);
2273 return Ok(());
2274 } else {
2275 current
2276 .entry(segment.to_string())
2277 .or_insert_with(|| json!({}));
2278 current = current
2279 .get_mut(segment)
2280 .and_then(|v| v.as_object_mut())
2281 .ok_or("Path collision: expected object")?;
2282 }
2283 }
2284
2285 Ok(())
2286 }
2287
2288 fn set_field_if_null(
2289 &mut self,
2290 object_reg: Register,
2291 path: &str,
2292 value_reg: Register,
2293 ) -> Result<bool> {
2294 let compiled = self.get_compiled_path(path);
2295 let segments = compiled.segments();
2296 let value = self.registers[value_reg].clone();
2297
2298 if value.is_null() {
2302 return Ok(false);
2303 }
2304
2305 if !self.registers[object_reg].is_object() {
2306 self.registers[object_reg] = json!({});
2307 }
2308
2309 let obj = self.registers[object_reg]
2310 .as_object_mut()
2311 .ok_or("Not an object")?;
2312
2313 let mut current = obj;
2314 for (i, segment) in segments.iter().enumerate() {
2315 if i == segments.len() - 1 {
2316 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
2317 current.insert(segment.to_string(), value);
2318 return Ok(true);
2319 }
2320 return Ok(false);
2321 } else {
2322 current
2323 .entry(segment.to_string())
2324 .or_insert_with(|| json!({}));
2325 current = current
2326 .get_mut(segment)
2327 .and_then(|v| v.as_object_mut())
2328 .ok_or("Path collision: expected object")?;
2329 }
2330 }
2331
2332 Ok(false)
2333 }
2334
2335 fn set_field_max(
2336 &mut self,
2337 object_reg: Register,
2338 path: &str,
2339 value_reg: Register,
2340 ) -> Result<bool> {
2341 let compiled = self.get_compiled_path(path);
2342 let segments = compiled.segments();
2343 let new_value = self.registers[value_reg].clone();
2344
2345 if !self.registers[object_reg].is_object() {
2346 self.registers[object_reg] = json!({});
2347 }
2348
2349 let obj = self.registers[object_reg]
2350 .as_object_mut()
2351 .ok_or("Not an object")?;
2352
2353 let mut current = obj;
2354 for (i, segment) in segments.iter().enumerate() {
2355 if i == segments.len() - 1 {
2356 let should_update = if let Some(current_value) = current.get(segment) {
2357 if current_value.is_null() {
2358 true
2359 } else {
2360 match (current_value.as_i64(), new_value.as_i64()) {
2361 (Some(current_val), Some(new_val)) => new_val > current_val,
2362 (Some(current_val), None) if new_value.as_u64().is_some() => {
2363 new_value.as_u64().unwrap() as i64 > current_val
2364 }
2365 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2366 new_val > current_value.as_u64().unwrap() as i64
2367 }
2368 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2369 (Some(current_val), Some(new_val)) => new_val > current_val,
2370 _ => match (current_value.as_f64(), new_value.as_f64()) {
2371 (Some(current_val), Some(new_val)) => new_val > current_val,
2372 _ => false,
2373 },
2374 },
2375 _ => false,
2376 }
2377 }
2378 } else {
2379 true
2380 };
2381
2382 if should_update {
2383 current.insert(segment.to_string(), new_value);
2384 return Ok(true);
2385 }
2386 return Ok(false);
2387 } else {
2388 current
2389 .entry(segment.to_string())
2390 .or_insert_with(|| json!({}));
2391 current = current
2392 .get_mut(segment)
2393 .and_then(|v| v.as_object_mut())
2394 .ok_or("Path collision: expected object")?;
2395 }
2396 }
2397
2398 Ok(false)
2399 }
2400
2401 fn set_field_sum(
2402 &mut self,
2403 object_reg: Register,
2404 path: &str,
2405 value_reg: Register,
2406 ) -> Result<bool> {
2407 let compiled = self.get_compiled_path(path);
2408 let segments = compiled.segments();
2409 let new_value = &self.registers[value_reg];
2410
2411 let new_val_num = new_value
2413 .as_i64()
2414 .or_else(|| new_value.as_u64().map(|n| n as i64))
2415 .ok_or("Sum requires numeric value")?;
2416
2417 if !self.registers[object_reg].is_object() {
2418 self.registers[object_reg] = json!({});
2419 }
2420
2421 let obj = self.registers[object_reg]
2422 .as_object_mut()
2423 .ok_or("Not an object")?;
2424
2425 let mut current = obj;
2426 for (i, segment) in segments.iter().enumerate() {
2427 if i == segments.len() - 1 {
2428 let current_val = current
2429 .get(segment)
2430 .and_then(|v| {
2431 if v.is_null() {
2432 None
2433 } else {
2434 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2435 }
2436 })
2437 .unwrap_or(0);
2438
2439 let sum = current_val + new_val_num;
2440 current.insert(segment.to_string(), json!(sum));
2441 return Ok(true);
2442 } else {
2443 current
2444 .entry(segment.to_string())
2445 .or_insert_with(|| json!({}));
2446 current = current
2447 .get_mut(segment)
2448 .and_then(|v| v.as_object_mut())
2449 .ok_or("Path collision: expected object")?;
2450 }
2451 }
2452
2453 Ok(false)
2454 }
2455
2456 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
2457 let compiled = self.get_compiled_path(path);
2458 let segments = compiled.segments();
2459
2460 if !self.registers[object_reg].is_object() {
2461 self.registers[object_reg] = json!({});
2462 }
2463
2464 let obj = self.registers[object_reg]
2465 .as_object_mut()
2466 .ok_or("Not an object")?;
2467
2468 let mut current = obj;
2469 for (i, segment) in segments.iter().enumerate() {
2470 if i == segments.len() - 1 {
2471 let current_val = current
2473 .get(segment)
2474 .and_then(|v| {
2475 if v.is_null() {
2476 None
2477 } else {
2478 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
2479 }
2480 })
2481 .unwrap_or(0);
2482
2483 let incremented = current_val + 1;
2484 current.insert(segment.to_string(), json!(incremented));
2485 return Ok(true);
2486 } else {
2487 current
2488 .entry(segment.to_string())
2489 .or_insert_with(|| json!({}));
2490 current = current
2491 .get_mut(segment)
2492 .and_then(|v| v.as_object_mut())
2493 .ok_or("Path collision: expected object")?;
2494 }
2495 }
2496
2497 Ok(false)
2498 }
2499
2500 fn set_field_min(
2501 &mut self,
2502 object_reg: Register,
2503 path: &str,
2504 value_reg: Register,
2505 ) -> Result<bool> {
2506 let compiled = self.get_compiled_path(path);
2507 let segments = compiled.segments();
2508 let new_value = self.registers[value_reg].clone();
2509
2510 if !self.registers[object_reg].is_object() {
2511 self.registers[object_reg] = json!({});
2512 }
2513
2514 let obj = self.registers[object_reg]
2515 .as_object_mut()
2516 .ok_or("Not an object")?;
2517
2518 let mut current = obj;
2519 for (i, segment) in segments.iter().enumerate() {
2520 if i == segments.len() - 1 {
2521 let should_update = if let Some(current_value) = current.get(segment) {
2522 if current_value.is_null() {
2523 true
2524 } else {
2525 match (current_value.as_i64(), new_value.as_i64()) {
2526 (Some(current_val), Some(new_val)) => new_val < current_val,
2527 (Some(current_val), None) if new_value.as_u64().is_some() => {
2528 (new_value.as_u64().unwrap() as i64) < current_val
2529 }
2530 (None, Some(new_val)) if current_value.as_u64().is_some() => {
2531 new_val < current_value.as_u64().unwrap() as i64
2532 }
2533 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
2534 (Some(current_val), Some(new_val)) => new_val < current_val,
2535 _ => match (current_value.as_f64(), new_value.as_f64()) {
2536 (Some(current_val), Some(new_val)) => new_val < current_val,
2537 _ => false,
2538 },
2539 },
2540 _ => false,
2541 }
2542 }
2543 } else {
2544 true
2545 };
2546
2547 if should_update {
2548 current.insert(segment.to_string(), new_value);
2549 return Ok(true);
2550 }
2551 return Ok(false);
2552 } else {
2553 current
2554 .entry(segment.to_string())
2555 .or_insert_with(|| json!({}));
2556 current = current
2557 .get_mut(segment)
2558 .and_then(|v| v.as_object_mut())
2559 .ok_or("Path collision: expected object")?;
2560 }
2561 }
2562
2563 Ok(false)
2564 }
2565
2566 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
2567 let compiled = self.get_compiled_path(path);
2568 let segments = compiled.segments();
2569 let mut current = &self.registers[object_reg];
2570
2571 for segment in segments {
2572 current = current
2573 .get(segment)
2574 .ok_or_else(|| format!("Field not found: {}", segment))?;
2575 }
2576
2577 Ok(current.clone())
2578 }
2579
2580 fn append_to_array(
2581 &mut self,
2582 object_reg: Register,
2583 path: &str,
2584 value_reg: Register,
2585 max_length: usize,
2586 ) -> Result<()> {
2587 let compiled = self.get_compiled_path(path);
2588 let segments = compiled.segments();
2589 let value = self.registers[value_reg].clone();
2590
2591 if !self.registers[object_reg].is_object() {
2592 self.registers[object_reg] = json!({});
2593 }
2594
2595 let obj = self.registers[object_reg]
2596 .as_object_mut()
2597 .ok_or("Not an object")?;
2598
2599 let mut current = obj;
2600 for (i, segment) in segments.iter().enumerate() {
2601 if i == segments.len() - 1 {
2602 current
2603 .entry(segment.to_string())
2604 .or_insert_with(|| json!([]));
2605 let arr = current
2606 .get_mut(segment)
2607 .and_then(|v| v.as_array_mut())
2608 .ok_or("Path is not an array")?;
2609 arr.push(value.clone());
2610
2611 if arr.len() > max_length {
2612 let excess = arr.len() - max_length;
2613 arr.drain(0..excess);
2614 }
2615 } else {
2616 current
2617 .entry(segment.to_string())
2618 .or_insert_with(|| json!({}));
2619 current = current
2620 .get_mut(segment)
2621 .and_then(|v| v.as_object_mut())
2622 .ok_or("Path collision: expected object")?;
2623 }
2624 }
2625
2626 Ok(())
2627 }
2628
2629 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2630 let value = &self.registers[reg];
2631 let transformed = self.apply_transformation(value, transformation)?;
2632 self.registers[reg] = transformed;
2633 Ok(())
2634 }
2635
2636 fn apply_transformation(
2637 &self,
2638 value: &Value,
2639 transformation: &Transformation,
2640 ) -> Result<Value> {
2641 match transformation {
2642 Transformation::HexEncode => {
2643 if let Some(arr) = value.as_array() {
2644 let bytes: Vec<u8> = arr
2645 .iter()
2646 .filter_map(|v| v.as_u64().map(|n| n as u8))
2647 .collect();
2648 let hex = hex::encode(&bytes);
2649 Ok(json!(hex))
2650 } else {
2651 Err("HexEncode requires an array of numbers".into())
2652 }
2653 }
2654 Transformation::HexDecode => {
2655 if let Some(s) = value.as_str() {
2656 let s = s.strip_prefix("0x").unwrap_or(s);
2657 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2658 Ok(json!(bytes))
2659 } else {
2660 Err("HexDecode requires a string".into())
2661 }
2662 }
2663 Transformation::Base58Encode => {
2664 if let Some(arr) = value.as_array() {
2665 let bytes: Vec<u8> = arr
2666 .iter()
2667 .filter_map(|v| v.as_u64().map(|n| n as u8))
2668 .collect();
2669 let encoded = bs58::encode(&bytes).into_string();
2670 Ok(json!(encoded))
2671 } else if value.is_string() {
2672 Ok(value.clone())
2673 } else {
2674 Err("Base58Encode requires an array of numbers".into())
2675 }
2676 }
2677 Transformation::Base58Decode => {
2678 if let Some(s) = value.as_str() {
2679 let bytes = bs58::decode(s)
2680 .into_vec()
2681 .map_err(|e| format!("Base58 decode error: {}", e))?;
2682 Ok(json!(bytes))
2683 } else {
2684 Err("Base58Decode requires a string".into())
2685 }
2686 }
2687 Transformation::ToString => Ok(json!(value.to_string())),
2688 Transformation::ToNumber => {
2689 if let Some(s) = value.as_str() {
2690 let n = s
2691 .parse::<i64>()
2692 .map_err(|e| format!("Parse error: {}", e))?;
2693 Ok(json!(n))
2694 } else {
2695 Ok(value.clone())
2696 }
2697 }
2698 }
2699 }
2700
2701 fn evaluate_comparison(
2702 &self,
2703 field_value: &Value,
2704 op: &ComparisonOp,
2705 condition_value: &Value,
2706 ) -> Result<bool> {
2707 use ComparisonOp::*;
2708
2709 match op {
2710 Equal => Ok(field_value == condition_value),
2711 NotEqual => Ok(field_value != condition_value),
2712 GreaterThan => {
2713 match (field_value.as_i64(), condition_value.as_i64()) {
2715 (Some(a), Some(b)) => Ok(a > b),
2716 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2717 (Some(a), Some(b)) => Ok(a > b),
2718 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2719 (Some(a), Some(b)) => Ok(a > b),
2720 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2721 },
2722 },
2723 }
2724 }
2725 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2726 (Some(a), Some(b)) => Ok(a >= b),
2727 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2728 (Some(a), Some(b)) => Ok(a >= b),
2729 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2730 (Some(a), Some(b)) => Ok(a >= b),
2731 _ => {
2732 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2733 }
2734 },
2735 },
2736 },
2737 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2738 (Some(a), Some(b)) => Ok(a < b),
2739 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2740 (Some(a), Some(b)) => Ok(a < b),
2741 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2742 (Some(a), Some(b)) => Ok(a < b),
2743 _ => Err("Cannot compare non-numeric values with LessThan".into()),
2744 },
2745 },
2746 },
2747 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2748 (Some(a), Some(b)) => Ok(a <= b),
2749 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2750 (Some(a), Some(b)) => Ok(a <= b),
2751 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2752 (Some(a), Some(b)) => Ok(a <= b),
2753 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2754 },
2755 },
2756 },
2757 }
2758 }
2759
2760 fn can_resolve_further(&self, value: &Value, state_id: u32, index_name: &str) -> bool {
2761 if let Some(state) = self.states.get(&state_id) {
2762 if let Some(index) = state.lookup_indexes.get(index_name) {
2763 if index.lookup(value).is_some() {
2764 return true;
2765 }
2766 }
2767
2768 for (name, index) in state.lookup_indexes.iter() {
2769 if name == index_name {
2770 continue;
2771 }
2772 if index.lookup(value).is_some() {
2773 return true;
2774 }
2775 }
2776
2777 if let Some(pda_str) = value.as_str() {
2778 if let Some(pda_lookup) = state.pda_reverse_lookups.get("default_pda_lookup") {
2779 if pda_lookup.contains(pda_str) {
2780 return true;
2781 }
2782 }
2783 }
2784 }
2785
2786 false
2787 }
2788
2789 fn apply_deferred_when_op(
2790 &mut self,
2791 state_id: u32,
2792 op: &DeferredWhenOperation,
2793 ) -> Result<Vec<Mutation>> {
2794 let state = self.states.get(&state_id).ok_or("State not found")?;
2795
2796 if op.primary_key.is_null() {
2797 return Ok(vec![]);
2798 }
2799
2800 let mut entity_state = state
2801 .get_and_touch(&op.primary_key)
2802 .unwrap_or_else(|| json!({}));
2803
2804 Self::set_nested_field_value(&mut entity_state, &op.field_path, op.field_value.clone())?;
2805
2806 state.insert_with_eviction(op.primary_key.clone(), entity_state);
2807
2808 if !op.emit {
2809 return Ok(vec![]);
2810 }
2811
2812 let mut patch = json!({});
2813 Self::set_nested_field_value(&mut patch, &op.field_path, op.field_value.clone())?;
2814
2815 Ok(vec![Mutation {
2816 export: op.entity_name.clone(),
2817 key: op.primary_key.clone(),
2818 patch,
2819 append: vec![],
2820 }])
2821 }
2822
2823 fn set_nested_field_value(obj: &mut Value, path: &str, value: Value) -> Result<()> {
2824 let parts: Vec<&str> = path.split('.').collect();
2825 let mut current = obj;
2826
2827 for (i, part) in parts.iter().enumerate() {
2828 if i == parts.len() - 1 {
2829 if let Some(map) = current.as_object_mut() {
2830 map.insert(part.to_string(), value);
2831 return Ok(());
2832 }
2833 return Err("Cannot set field on non-object".into());
2834 }
2835
2836 if current.get(*part).is_none() || !current.get(*part).unwrap().is_object() {
2837 if let Some(map) = current.as_object_mut() {
2838 map.insert(part.to_string(), json!({}));
2839 }
2840 }
2841
2842 current = current.get_mut(*part).ok_or("Path navigation failed")?;
2843 }
2844
2845 Ok(())
2846 }
2847
2848 pub fn cleanup_expired_when_ops(&mut self, state_id: u32, max_age_secs: i64) -> usize {
2849 let now = std::time::SystemTime::now()
2850 .duration_since(std::time::UNIX_EPOCH)
2851 .unwrap()
2852 .as_secs() as i64;
2853
2854 let state = match self.states.get(&state_id) {
2855 Some(s) => s,
2856 None => return 0,
2857 };
2858
2859 let mut removed = 0;
2860 state.deferred_when_ops.retain(|_, ops| {
2861 let before = ops.len();
2862 ops.retain(|op| now - op.deferred_at < max_age_secs);
2863 removed += before - ops.len();
2864 !ops.is_empty()
2865 });
2866
2867 removed
2868 }
2869
2870 #[cfg_attr(feature = "otel", instrument(
2879 name = "vm.update_pda_lookup",
2880 skip(self),
2881 fields(
2882 pda = %pda_address,
2883 seed = %seed_value,
2884 )
2885 ))]
2886 pub fn update_pda_reverse_lookup(
2887 &mut self,
2888 state_id: u32,
2889 lookup_name: &str,
2890 pda_address: String,
2891 seed_value: String,
2892 ) -> Result<Vec<PendingAccountUpdate>> {
2893 let state = self
2894 .states
2895 .get_mut(&state_id)
2896 .ok_or("State table not found")?;
2897
2898 let lookup = state
2899 .pda_reverse_lookups
2900 .entry(lookup_name.to_string())
2901 .or_insert_with(|| PdaReverseLookup::new(DEFAULT_MAX_PDA_REVERSE_LOOKUP_ENTRIES));
2902
2903 let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2904
2905 if let Some(ref evicted) = evicted_pda {
2906 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2907 let count = evicted_updates.len();
2908 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2909 }
2910 }
2911
2912 self.flush_pending_updates(state_id, &pda_address)
2914 }
2915
2916 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2921 let state = match self.states.get_mut(&state_id) {
2922 Some(s) => s,
2923 None => return 0,
2924 };
2925
2926 let now = std::time::SystemTime::now()
2927 .duration_since(std::time::UNIX_EPOCH)
2928 .unwrap()
2929 .as_secs() as i64;
2930
2931 let mut removed_count = 0;
2932
2933 state.pending_updates.retain(|_pda_address, updates| {
2935 let original_len = updates.len();
2936
2937 updates.retain(|update| {
2938 let age = now - update.queued_at;
2939 age <= PENDING_UPDATE_TTL_SECONDS
2940 });
2941
2942 removed_count += original_len - updates.len();
2943
2944 !updates.is_empty()
2946 });
2947
2948 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2950
2951 if removed_count > 0 {
2952 #[cfg(feature = "otel")]
2953 crate::vm_metrics::record_pending_updates_expired(
2954 removed_count as u64,
2955 &state.entity_name,
2956 );
2957 }
2958
2959 removed_count
2960 }
2961
2962 #[cfg_attr(feature = "otel", instrument(
2994 name = "vm.queue_account_update",
2995 skip(self, update),
2996 fields(
2997 pda = %update.pda_address,
2998 account_type = %update.account_type,
2999 slot = update.slot,
3000 )
3001 ))]
3002 pub fn queue_account_update(
3003 &mut self,
3004 state_id: u32,
3005 update: QueuedAccountUpdate,
3006 ) -> Result<()> {
3007 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3008 self.cleanup_expired_pending_updates(state_id);
3009 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
3010 self.drop_oldest_pending_update(state_id)?;
3011 }
3012 }
3013
3014 let state = self
3015 .states
3016 .get_mut(&state_id)
3017 .ok_or("State table not found")?;
3018
3019 let pending = PendingAccountUpdate {
3020 account_type: update.account_type,
3021 pda_address: update.pda_address.clone(),
3022 account_data: update.account_data,
3023 slot: update.slot,
3024 write_version: update.write_version,
3025 signature: update.signature,
3026 queued_at: std::time::SystemTime::now()
3027 .duration_since(std::time::UNIX_EPOCH)
3028 .unwrap()
3029 .as_secs() as i64,
3030 };
3031
3032 let pda_address = pending.pda_address.clone();
3033 let slot = pending.slot;
3034
3035 let mut updates = state
3036 .pending_updates
3037 .entry(pda_address.clone())
3038 .or_insert_with(Vec::new);
3039
3040 let original_len = updates.len();
3041 updates.retain(|existing| existing.slot > slot);
3042 let removed_by_dedup = original_len - updates.len();
3043
3044 if removed_by_dedup > 0 {
3045 self.pending_queue_size = self
3046 .pending_queue_size
3047 .saturating_sub(removed_by_dedup as u64);
3048 }
3049
3050 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
3051 updates.remove(0);
3052 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3053 }
3054
3055 updates.push(pending);
3056 #[cfg(feature = "otel")]
3057 crate::vm_metrics::record_pending_update_queued(&state.entity_name);
3058
3059 Ok(())
3060 }
3061
3062 pub fn queue_instruction_event(
3063 &mut self,
3064 state_id: u32,
3065 event: QueuedInstructionEvent,
3066 ) -> Result<()> {
3067 let state = self
3068 .states
3069 .get_mut(&state_id)
3070 .ok_or("State table not found")?;
3071
3072 let pda_address = event.pda_address.clone();
3073
3074 let pending = PendingInstructionEvent {
3075 event_type: event.event_type,
3076 pda_address: event.pda_address,
3077 event_data: event.event_data,
3078 slot: event.slot,
3079 signature: event.signature,
3080 queued_at: std::time::SystemTime::now()
3081 .duration_since(std::time::UNIX_EPOCH)
3082 .unwrap()
3083 .as_secs() as i64,
3084 };
3085
3086 let mut events = state
3087 .pending_instruction_events
3088 .entry(pda_address)
3089 .or_insert_with(Vec::new);
3090
3091 if events.len() >= MAX_PENDING_UPDATES_PER_PDA {
3092 events.remove(0);
3093 }
3094
3095 events.push(pending);
3096
3097 Ok(())
3098 }
3099
3100 pub fn take_last_pda_lookup_miss(&mut self) -> Option<String> {
3101 self.last_pda_lookup_miss.take()
3102 }
3103
3104 pub fn take_last_lookup_index_miss(&mut self) -> Option<String> {
3105 self.last_lookup_index_miss.take()
3106 }
3107
3108 pub fn take_last_pda_registered(&mut self) -> Option<String> {
3109 self.last_pda_registered.take()
3110 }
3111
3112 pub fn take_last_lookup_index_keys(&mut self) -> Vec<String> {
3113 std::mem::take(&mut self.last_lookup_index_keys)
3114 }
3115
3116 pub fn flush_pending_instruction_events(
3117 &mut self,
3118 state_id: u32,
3119 pda_address: &str,
3120 ) -> Vec<PendingInstructionEvent> {
3121 let state = match self.states.get_mut(&state_id) {
3122 Some(s) => s,
3123 None => return Vec::new(),
3124 };
3125
3126 if let Some((_, events)) = state.pending_instruction_events.remove(pda_address) {
3127 events
3128 } else {
3129 Vec::new()
3130 }
3131 }
3132
3133 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
3135 let state = self.states.get(&state_id)?;
3136
3137 let now = std::time::SystemTime::now()
3138 .duration_since(std::time::UNIX_EPOCH)
3139 .unwrap()
3140 .as_secs() as i64;
3141
3142 let mut total_updates = 0;
3143 let mut oldest_timestamp = now;
3144 let mut largest_pda_queue = 0;
3145 let mut estimated_memory = 0;
3146
3147 for entry in state.pending_updates.iter() {
3148 let (_, updates) = entry.pair();
3149 total_updates += updates.len();
3150 largest_pda_queue = largest_pda_queue.max(updates.len());
3151
3152 for update in updates.iter() {
3153 oldest_timestamp = oldest_timestamp.min(update.queued_at);
3154 estimated_memory += update.account_type.len() +
3156 update.pda_address.len() +
3157 update.signature.len() +
3158 16 + estimate_json_size(&update.account_data);
3160 }
3161 }
3162
3163 Some(PendingQueueStats {
3164 total_updates,
3165 unique_pdas: state.pending_updates.len(),
3166 oldest_age_seconds: now - oldest_timestamp,
3167 largest_pda_queue_size: largest_pda_queue,
3168 estimated_memory_bytes: estimated_memory,
3169 })
3170 }
3171
3172 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
3173 let mut stats = VmMemoryStats {
3174 path_cache_size: self.path_cache.len(),
3175 ..Default::default()
3176 };
3177
3178 if let Some(state) = self.states.get(&state_id) {
3179 stats.state_table_entity_count = state.data.len();
3180 stats.state_table_max_entries = state.config.max_entries;
3181 stats.state_table_at_capacity = state.is_at_capacity();
3182
3183 stats.lookup_index_count = state.lookup_indexes.len();
3184 stats.lookup_index_total_entries =
3185 state.lookup_indexes.values().map(|idx| idx.len()).sum();
3186
3187 stats.temporal_index_count = state.temporal_indexes.len();
3188 stats.temporal_index_total_entries = state
3189 .temporal_indexes
3190 .values()
3191 .map(|idx| idx.total_entries())
3192 .sum();
3193
3194 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
3195 stats.pda_reverse_lookup_total_entries = state
3196 .pda_reverse_lookups
3197 .values()
3198 .map(|lookup| lookup.len())
3199 .sum();
3200
3201 stats.version_tracker_entries = state.version_tracker.len();
3202
3203 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
3204 }
3205
3206 stats
3207 }
3208
3209 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
3210 let pending_removed = self.cleanup_expired_pending_updates(state_id);
3211 let temporal_removed = self.cleanup_temporal_indexes(state_id);
3212
3213 #[cfg(feature = "otel")]
3214 if let Some(state) = self.states.get(&state_id) {
3215 crate::vm_metrics::record_cleanup(
3216 pending_removed,
3217 temporal_removed,
3218 &state.entity_name,
3219 );
3220 }
3221
3222 CleanupResult {
3223 pending_updates_removed: pending_removed,
3224 temporal_entries_removed: temporal_removed,
3225 }
3226 }
3227
3228 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
3229 let state = match self.states.get_mut(&state_id) {
3230 Some(s) => s,
3231 None => return 0,
3232 };
3233
3234 let now = std::time::SystemTime::now()
3235 .duration_since(std::time::UNIX_EPOCH)
3236 .unwrap()
3237 .as_secs() as i64;
3238
3239 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
3240 let mut total_removed = 0;
3241
3242 for (_, index) in state.temporal_indexes.iter_mut() {
3243 total_removed += index.cleanup_expired(cutoff);
3244 }
3245
3246 total_removed
3247 }
3248
3249 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
3250 let state = self.states.get(&state_id)?;
3251
3252 if state.is_at_capacity() {
3253 Some(CapacityWarning {
3254 current_entries: state.data.len(),
3255 max_entries: state.config.max_entries,
3256 entries_over_limit: state.entries_over_limit(),
3257 })
3258 } else {
3259 None
3260 }
3261 }
3262
3263 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
3265 let state = self
3266 .states
3267 .get_mut(&state_id)
3268 .ok_or("State table not found")?;
3269
3270 let mut oldest_pda: Option<String> = None;
3271 let mut oldest_timestamp = i64::MAX;
3272
3273 for entry in state.pending_updates.iter() {
3275 let (pda, updates) = entry.pair();
3276 if let Some(update) = updates.first() {
3277 if update.queued_at < oldest_timestamp {
3278 oldest_timestamp = update.queued_at;
3279 oldest_pda = Some(pda.clone());
3280 }
3281 }
3282 }
3283
3284 if let Some(pda) = oldest_pda {
3286 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
3287 if !updates.is_empty() {
3288 updates.remove(0);
3289 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
3290
3291 if updates.is_empty() {
3293 drop(updates);
3294 state.pending_updates.remove(&pda);
3295 }
3296 }
3297 }
3298 }
3299
3300 Ok(())
3301 }
3302
3303 fn flush_pending_updates(
3308 &mut self,
3309 state_id: u32,
3310 pda_address: &str,
3311 ) -> Result<Vec<PendingAccountUpdate>> {
3312 let state = self
3313 .states
3314 .get_mut(&state_id)
3315 .ok_or("State table not found")?;
3316
3317 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
3318 let count = pending_updates.len();
3319 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
3320 #[cfg(feature = "otel")]
3321 crate::vm_metrics::record_pending_updates_flushed(count as u64, &state.entity_name);
3322 Ok(pending_updates)
3323 } else {
3324 Ok(Vec::new())
3325 }
3326 }
3327
3328 pub fn try_pda_reverse_lookup(
3330 &mut self,
3331 state_id: u32,
3332 lookup_name: &str,
3333 pda_address: &str,
3334 ) -> Option<String> {
3335 let state = self.states.get_mut(&state_id)?;
3336
3337 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
3338 if let Some(value) = lookup.lookup(pda_address) {
3339 self.pda_cache_hits += 1;
3340 return Some(value);
3341 }
3342 }
3343
3344 self.pda_cache_misses += 1;
3345 None
3346 }
3347
3348 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
3355 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
3356 }
3357
3358 fn evaluate_computed_expr_with_env(
3360 &self,
3361 expr: &ComputedExpr,
3362 state: &Value,
3363 env: &std::collections::HashMap<String, Value>,
3364 ) -> Result<Value> {
3365 match expr {
3366 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
3367
3368 ComputedExpr::Var { name } => env
3369 .get(name)
3370 .cloned()
3371 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
3372
3373 ComputedExpr::Let { name, value, body } => {
3374 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
3375 let mut new_env = env.clone();
3376 new_env.insert(name.clone(), val);
3377 self.evaluate_computed_expr_with_env(body, state, &new_env)
3378 }
3379
3380 ComputedExpr::If {
3381 condition,
3382 then_branch,
3383 else_branch,
3384 } => {
3385 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
3386 if self.value_to_bool(&cond_val) {
3387 self.evaluate_computed_expr_with_env(then_branch, state, env)
3388 } else {
3389 self.evaluate_computed_expr_with_env(else_branch, state, env)
3390 }
3391 }
3392
3393 ComputedExpr::None => Ok(Value::Null),
3394
3395 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
3396
3397 ComputedExpr::Slice { expr, start, end } => {
3398 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3399 match val {
3400 Value::Array(arr) => {
3401 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
3402 Ok(Value::Array(slice))
3403 }
3404 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
3405 }
3406 }
3407
3408 ComputedExpr::Index { expr, index } => {
3409 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3410 match val {
3411 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
3412 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
3413 }
3414 }
3415
3416 ComputedExpr::U64FromLeBytes { bytes } => {
3417 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3418 let byte_vec = self.value_to_bytes(&val)?;
3419 if byte_vec.len() < 8 {
3420 return Err(format!(
3421 "u64::from_le_bytes requires 8 bytes, got {}",
3422 byte_vec.len()
3423 )
3424 .into());
3425 }
3426 let arr: [u8; 8] = byte_vec[..8]
3427 .try_into()
3428 .map_err(|_| "Failed to convert to [u8; 8]")?;
3429 Ok(json!(u64::from_le_bytes(arr)))
3430 }
3431
3432 ComputedExpr::U64FromBeBytes { bytes } => {
3433 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
3434 let byte_vec = self.value_to_bytes(&val)?;
3435 if byte_vec.len() < 8 {
3436 return Err(format!(
3437 "u64::from_be_bytes requires 8 bytes, got {}",
3438 byte_vec.len()
3439 )
3440 .into());
3441 }
3442 let arr: [u8; 8] = byte_vec[..8]
3443 .try_into()
3444 .map_err(|_| "Failed to convert to [u8; 8]")?;
3445 Ok(json!(u64::from_be_bytes(arr)))
3446 }
3447
3448 ComputedExpr::ByteArray { bytes } => {
3449 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3450 }
3451
3452 ComputedExpr::Closure { param, body } => {
3453 Ok(json!({
3456 "__closure": {
3457 "param": param,
3458 "body": serde_json::to_value(body).unwrap_or(Value::Null)
3459 }
3460 }))
3461 }
3462
3463 ComputedExpr::Unary { op, expr } => {
3464 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3465 self.apply_unary_op(op, &val)
3466 }
3467
3468 ComputedExpr::JsonToBytes { expr } => {
3469 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3470 let bytes = self.value_to_bytes(&val)?;
3472 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
3473 }
3474
3475 ComputedExpr::UnwrapOr { expr, default } => {
3476 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3477 if val.is_null() {
3478 Ok(default.clone())
3479 } else {
3480 Ok(val)
3481 }
3482 }
3483
3484 ComputedExpr::Binary { op, left, right } => {
3485 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
3486 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
3487 self.apply_binary_op(op, &l, &r)
3488 }
3489
3490 ComputedExpr::Cast { expr, to_type } => {
3491 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3492 self.apply_cast(&val, to_type)
3493 }
3494
3495 ComputedExpr::MethodCall { expr, method, args } => {
3496 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
3497 if method == "map" && args.len() == 1 {
3499 if let ComputedExpr::Closure { param, body } = &args[0] {
3500 if val.is_null() {
3502 return Ok(Value::Null);
3503 }
3504 let mut closure_env = env.clone();
3506 closure_env.insert(param.clone(), val);
3507 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
3508 }
3509 }
3510 let evaluated_args: Vec<Value> = args
3511 .iter()
3512 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
3513 .collect::<Result<Vec<_>>>()?;
3514 self.apply_method_call(&val, method, &evaluated_args)
3515 }
3516
3517 ComputedExpr::Literal { value } => Ok(value.clone()),
3518
3519 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
3520 }
3521 }
3522
3523 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
3525 match val {
3526 Value::Array(arr) => arr
3527 .iter()
3528 .map(|v| {
3529 v.as_u64()
3530 .map(|n| n as u8)
3531 .ok_or_else(|| "Array element not a valid byte".into())
3532 })
3533 .collect(),
3534 Value::String(s) => {
3535 if s.starts_with("0x") || s.starts_with("0X") {
3537 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
3538 } else {
3539 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
3540 }
3541 }
3542 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
3543 }
3544 }
3545
3546 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
3548 use crate::ast::UnaryOp;
3549 match op {
3550 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
3551 UnaryOp::ReverseBits => match val.as_u64() {
3552 Some(n) => Ok(json!(n.reverse_bits())),
3553 None => match val.as_i64() {
3554 Some(n) => Ok(json!((n as u64).reverse_bits())),
3555 None => Err("reverse_bits requires an integer".into()),
3556 },
3557 },
3558 }
3559 }
3560
3561 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
3563 let segments: Vec<&str> = path.split('.').collect();
3564 let mut current = state;
3565
3566 for segment in segments {
3567 match current.get(segment) {
3568 Some(v) => current = v,
3569 None => return Ok(Value::Null),
3570 }
3571 }
3572
3573 Ok(current.clone())
3574 }
3575
3576 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
3578 match op {
3579 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
3581 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
3582 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
3583 BinaryOp::Div => {
3584 if let Some(r) = right.as_i64() {
3586 if r == 0 {
3587 return Err("Division by zero".into());
3588 }
3589 }
3590 if let Some(r) = right.as_f64() {
3591 if r == 0.0 {
3592 return Err("Division by zero".into());
3593 }
3594 }
3595 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
3596 }
3597 BinaryOp::Mod => {
3598 match (left.as_i64(), right.as_i64()) {
3600 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3601 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
3602 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
3603 _ => Err("Modulo requires non-zero integer operands".into()),
3604 },
3605 _ => Err("Modulo by zero".into()),
3606 }
3607 }
3608
3609 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
3611 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
3612 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
3613 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
3614 BinaryOp::Eq => Ok(json!(left == right)),
3615 BinaryOp::Ne => Ok(json!(left != right)),
3616
3617 BinaryOp::And => {
3619 let l_bool = self.value_to_bool(left);
3620 let r_bool = self.value_to_bool(right);
3621 Ok(json!(l_bool && r_bool))
3622 }
3623 BinaryOp::Or => {
3624 let l_bool = self.value_to_bool(left);
3625 let r_bool = self.value_to_bool(right);
3626 Ok(json!(l_bool || r_bool))
3627 }
3628
3629 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
3631 (Some(a), Some(b)) => Ok(json!(a ^ b)),
3632 _ => match (left.as_i64(), right.as_i64()) {
3633 (Some(a), Some(b)) => Ok(json!(a ^ b)),
3634 _ => Err("XOR requires integer operands".into()),
3635 },
3636 },
3637 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
3638 (Some(a), Some(b)) => Ok(json!(a & b)),
3639 _ => match (left.as_i64(), right.as_i64()) {
3640 (Some(a), Some(b)) => Ok(json!(a & b)),
3641 _ => Err("BitAnd requires integer operands".into()),
3642 },
3643 },
3644 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
3645 (Some(a), Some(b)) => Ok(json!(a | b)),
3646 _ => match (left.as_i64(), right.as_i64()) {
3647 (Some(a), Some(b)) => Ok(json!(a | b)),
3648 _ => Err("BitOr requires integer operands".into()),
3649 },
3650 },
3651 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
3652 (Some(a), Some(b)) => Ok(json!(a << b)),
3653 _ => match (left.as_i64(), right.as_i64()) {
3654 (Some(a), Some(b)) => Ok(json!(a << b)),
3655 _ => Err("Shl requires integer operands".into()),
3656 },
3657 },
3658 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
3659 (Some(a), Some(b)) => Ok(json!(a >> b)),
3660 _ => match (left.as_i64(), right.as_i64()) {
3661 (Some(a), Some(b)) => Ok(json!(a >> b)),
3662 _ => Err("Shr requires integer operands".into()),
3663 },
3664 },
3665 }
3666 }
3667
3668 fn numeric_op<F1, F2>(
3670 &self,
3671 left: &Value,
3672 right: &Value,
3673 int_op: F1,
3674 float_op: F2,
3675 ) -> Result<Value>
3676 where
3677 F1: Fn(i64, i64) -> i64,
3678 F2: Fn(f64, f64) -> f64,
3679 {
3680 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3682 return Ok(json!(int_op(a, b)));
3683 }
3684
3685 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3687 return Ok(json!(int_op(a as i64, b as i64)));
3689 }
3690
3691 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3693 return Ok(json!(float_op(a, b)));
3694 }
3695
3696 if left.is_null() || right.is_null() {
3698 return Ok(Value::Null);
3699 }
3700
3701 Err(format!(
3702 "Cannot perform numeric operation on {:?} and {:?}",
3703 left, right
3704 )
3705 .into())
3706 }
3707
3708 fn comparison_op<F1, F2>(
3710 &self,
3711 left: &Value,
3712 right: &Value,
3713 int_cmp: F1,
3714 float_cmp: F2,
3715 ) -> Result<Value>
3716 where
3717 F1: Fn(i64, i64) -> bool,
3718 F2: Fn(f64, f64) -> bool,
3719 {
3720 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3722 return Ok(json!(int_cmp(a, b)));
3723 }
3724
3725 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3727 return Ok(json!(int_cmp(a as i64, b as i64)));
3728 }
3729
3730 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3732 return Ok(json!(float_cmp(a, b)));
3733 }
3734
3735 if left.is_null() || right.is_null() {
3737 return Ok(json!(false));
3738 }
3739
3740 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3741 }
3742
3743 fn value_to_bool(&self, value: &Value) -> bool {
3745 match value {
3746 Value::Null => false,
3747 Value::Bool(b) => *b,
3748 Value::Number(n) => {
3749 if let Some(i) = n.as_i64() {
3750 i != 0
3751 } else if let Some(f) = n.as_f64() {
3752 f != 0.0
3753 } else {
3754 true
3755 }
3756 }
3757 Value::String(s) => !s.is_empty(),
3758 Value::Array(arr) => !arr.is_empty(),
3759 Value::Object(obj) => !obj.is_empty(),
3760 }
3761 }
3762
3763 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3765 match to_type {
3766 "i8" | "i16" | "i32" | "i64" | "isize" => {
3767 if let Some(n) = value.as_i64() {
3768 Ok(json!(n))
3769 } else if let Some(n) = value.as_u64() {
3770 Ok(json!(n as i64))
3771 } else if let Some(n) = value.as_f64() {
3772 Ok(json!(n as i64))
3773 } else if let Some(s) = value.as_str() {
3774 s.parse::<i64>()
3775 .map(|n| json!(n))
3776 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3777 } else {
3778 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3779 }
3780 }
3781 "u8" | "u16" | "u32" | "u64" | "usize" => {
3782 if let Some(n) = value.as_u64() {
3783 Ok(json!(n))
3784 } else if let Some(n) = value.as_i64() {
3785 Ok(json!(n as u64))
3786 } else if let Some(n) = value.as_f64() {
3787 Ok(json!(n as u64))
3788 } else if let Some(s) = value.as_str() {
3789 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3790 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3791 })
3792 } else {
3793 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3794 }
3795 }
3796 "f32" | "f64" => {
3797 if let Some(n) = value.as_f64() {
3798 Ok(json!(n))
3799 } else if let Some(n) = value.as_i64() {
3800 Ok(json!(n as f64))
3801 } else if let Some(n) = value.as_u64() {
3802 Ok(json!(n as f64))
3803 } else if let Some(s) = value.as_str() {
3804 s.parse::<f64>()
3805 .map(|n| json!(n))
3806 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3807 } else {
3808 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3809 }
3810 }
3811 "String" | "string" => Ok(json!(value.to_string())),
3812 "bool" => Ok(json!(self.value_to_bool(value))),
3813 _ => {
3814 Ok(value.clone())
3816 }
3817 }
3818 }
3819
3820 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3822 match method {
3823 "unwrap_or" => {
3824 if value.is_null() && !args.is_empty() {
3825 Ok(args[0].clone())
3826 } else {
3827 Ok(value.clone())
3828 }
3829 }
3830 "unwrap_or_default" => {
3831 if value.is_null() {
3832 Ok(json!(0))
3834 } else {
3835 Ok(value.clone())
3836 }
3837 }
3838 "is_some" => Ok(json!(!value.is_null())),
3839 "is_none" => Ok(json!(value.is_null())),
3840 "abs" => {
3841 if let Some(n) = value.as_i64() {
3842 Ok(json!(n.abs()))
3843 } else if let Some(n) = value.as_f64() {
3844 Ok(json!(n.abs()))
3845 } else {
3846 Err(format!("Cannot call abs() on {:?}", value).into())
3847 }
3848 }
3849 "len" => {
3850 if let Some(s) = value.as_str() {
3851 Ok(json!(s.len()))
3852 } else if let Some(arr) = value.as_array() {
3853 Ok(json!(arr.len()))
3854 } else if let Some(obj) = value.as_object() {
3855 Ok(json!(obj.len()))
3856 } else {
3857 Err(format!("Cannot call len() on {:?}", value).into())
3858 }
3859 }
3860 "to_string" => Ok(json!(value.to_string())),
3861 "min" => {
3862 if args.is_empty() {
3863 return Err("min() requires an argument".into());
3864 }
3865 let other = &args[0];
3866 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3867 Ok(json!(a.min(b)))
3868 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3869 Ok(json!(a.min(b)))
3870 } else {
3871 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3872 }
3873 }
3874 "max" => {
3875 if args.is_empty() {
3876 return Err("max() requires an argument".into());
3877 }
3878 let other = &args[0];
3879 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3880 Ok(json!(a.max(b)))
3881 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3882 Ok(json!(a.max(b)))
3883 } else {
3884 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3885 }
3886 }
3887 "saturating_add" => {
3888 if args.is_empty() {
3889 return Err("saturating_add() requires an argument".into());
3890 }
3891 let other = &args[0];
3892 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3893 Ok(json!(a.saturating_add(b)))
3894 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3895 Ok(json!(a.saturating_add(b)))
3896 } else {
3897 Err(format!(
3898 "Cannot call saturating_add() on {:?} and {:?}",
3899 value, other
3900 )
3901 .into())
3902 }
3903 }
3904 "saturating_sub" => {
3905 if args.is_empty() {
3906 return Err("saturating_sub() requires an argument".into());
3907 }
3908 let other = &args[0];
3909 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3910 Ok(json!(a.saturating_sub(b)))
3911 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3912 Ok(json!(a.saturating_sub(b)))
3913 } else {
3914 Err(format!(
3915 "Cannot call saturating_sub() on {:?} and {:?}",
3916 value, other
3917 )
3918 .into())
3919 }
3920 }
3921 _ => Err(format!("Unknown method call: {}()", method).into()),
3922 }
3923 }
3924
3925 pub fn evaluate_computed_fields_from_ast(
3928 &self,
3929 state: &mut Value,
3930 computed_field_specs: &[ComputedFieldSpec],
3931 ) -> Result<Vec<String>> {
3932 let mut updated_paths = Vec::new();
3933
3934 for spec in computed_field_specs {
3935 if let Ok(result) = self.evaluate_computed_expr(&spec.expression, state) {
3936 self.set_field_in_state(state, &spec.target_path, result)?;
3937 updated_paths.push(spec.target_path.clone());
3938 }
3939 }
3940
3941 Ok(updated_paths)
3942 }
3943
3944 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3946 let segments: Vec<&str> = path.split('.').collect();
3947
3948 if segments.is_empty() {
3949 return Err("Empty path".into());
3950 }
3951
3952 let mut current = state;
3954 for (i, segment) in segments.iter().enumerate() {
3955 if i == segments.len() - 1 {
3956 if let Some(obj) = current.as_object_mut() {
3958 obj.insert(segment.to_string(), value);
3959 return Ok(());
3960 } else {
3961 return Err(format!("Cannot set field '{}' on non-object", segment).into());
3962 }
3963 } else {
3964 if !current.is_object() {
3966 *current = json!({});
3967 }
3968 let obj = current.as_object_mut().unwrap();
3969 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3970 }
3971 }
3972
3973 Ok(())
3974 }
3975
3976 pub fn create_evaluator_from_specs(
3979 specs: Vec<ComputedFieldSpec>,
3980 ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3981 move |state: &mut Value| {
3982 let vm = VmContext::new();
3985 vm.evaluate_computed_fields_from_ast(state, &specs)?;
3986 Ok(())
3987 }
3988 }
3989}
3990
3991impl Default for VmContext {
3992 fn default() -> Self {
3993 Self::new()
3994 }
3995}
3996
3997impl crate::resolvers::ReverseLookupUpdater for VmContext {
3999 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
4000 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
4002 .unwrap_or_else(|e| {
4003 tracing::error!("Failed to update PDA reverse lookup: {}", e);
4004 Vec::new()
4005 })
4006 }
4007
4008 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
4009 self.flush_pending_updates(0, pda_address)
4011 .unwrap_or_else(|e| {
4012 tracing::error!("Failed to flush pending updates: {}", e);
4013 Vec::new()
4014 })
4015 }
4016}
4017
4018#[cfg(test)]
4019mod tests {
4020 use super::*;
4021 use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};
4022
/// Summing two integer fields through a computed expression must produce an
/// integer JSON number (no decimal point) with the exact sum.
#[test]
fn test_computed_field_preserves_integer_type() {
    // Builds the expression `<path>.unwrap_or(0)`.
    let field_or_zero = |path: &str| {
        Box::new(ComputedExpr::UnwrapOr {
            expr: Box::new(ComputedExpr::FieldRef {
                path: path.to_string(),
            }),
            default: json!(0),
        })
    };

    let specs = [ComputedFieldSpec {
        target_path: "trading.total_volume".to_string(),
        result_type: "Option<u64>".to_string(),
        expression: ComputedExpr::Binary {
            op: BinaryOp::Add,
            left: field_or_zero("trading.total_buy_volume"),
            right: field_or_zero("trading.total_sell_volume"),
        },
    }];

    let mut state = json!({
        "trading": {
            "total_buy_volume": 20000000000_i64,
            "total_sell_volume": 17951316474_i64
        }
    });

    let vm = VmContext::new();
    vm.evaluate_computed_fields_from_ast(&mut state, &specs)
        .unwrap();

    let total_volume = state
        .pointer("/trading/total_volume")
        .expect("total_volume should exist");

    let rendered = serde_json::to_string(total_volume).unwrap();
    assert!(
        !rendered.contains('.'),
        "Integer should not have decimal point: {}",
        rendered
    );
    assert_eq!(
        total_volume.as_i64(),
        Some(37951316474),
        "Value should be correct sum"
    );
}
4074
/// Values written by `set_field_sum` from integer registers must serialize
/// without a decimal point.
#[test]
fn test_set_field_sum_preserves_integer_type() {
    let mut vm = VmContext::new();
    // Register 0 holds the target object; registers 1 and 2 hold the values.
    vm.registers[0] = json!({});
    vm.registers[1] = json!(20000000000_i64);
    vm.registers[2] = json!(17951316474_i64);

    vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
    vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();

    let target = &vm.registers[0];
    let buy_vol = target.pointer("/trading/total_buy_volume").unwrap();
    let sell_vol = target.pointer("/trading/total_sell_volume").unwrap();

    let buy_rendered = serde_json::to_string(buy_vol).unwrap();
    let sell_rendered = serde_json::to_string(sell_vol).unwrap();

    assert!(
        !buy_rendered.contains('.'),
        "Buy volume should not have decimal: {}",
        buy_rendered
    );
    assert!(
        !sell_rendered.contains('.'),
        "Sell volume should not have decimal: {}",
        sell_rendered
    );
}
4109
/// With a PDA reverse-lookup entry `pda_123 -> addr_456` and an index entry
/// `addr_456 -> 789`, looking up `pda_123` in the index must yield `789`.
#[test]
fn test_lookup_index_chaining() {
    let mut vm = VmContext::new();

    {
        let entity_state = vm.states.get_mut(&0).unwrap();

        let pda_lookup = entity_state
            .pda_reverse_lookups
            .entry("default_pda_lookup".to_string())
            .or_insert_with(|| PdaReverseLookup::new(1000));
        pda_lookup.insert("pda_123".to_string(), "addr_456".to_string());

        let address_index = entity_state
            .lookup_indexes
            .entry("round_address_lookup_index".to_string())
            .or_insert_with(LookupIndex::new);
        address_index.insert(json!("addr_456"), json!(789));
    }

    // Load the PDA into register 0, then resolve it through the index into
    // register 1.
    let load_pda = OpCode::LoadConstant {
        value: json!("pda_123"),
        dest: 0,
    };
    let resolve = OpCode::LookupIndex {
        state_id: 0,
        index_name: "round_address_lookup_index".to_string(),
        lookup_value: 0,
        dest: 1,
    };
    let program = vec![load_pda, resolve];

    vm.execute_handler(&program, &json!({}), "test", 0, "TestEntity", None, None)
        .unwrap();

    assert_eq!(vm.registers[1], json!(789));
}
4146
/// A key that is present directly in the lookup index resolves to its value
/// without involving any PDA reverse lookup.
#[test]
fn test_lookup_index_no_chain() {
    let mut vm = VmContext::new();

    {
        let entity_state = vm.states.get_mut(&0).unwrap();
        let index = entity_state
            .lookup_indexes
            .entry("test_index".to_string())
            .or_insert_with(LookupIndex::new);
        index.insert(json!("key_abc"), json!(42));
    }

    // Load the key into register 0, then look it up into register 1.
    let load_key = OpCode::LoadConstant {
        value: json!("key_abc"),
        dest: 0,
    };
    let resolve = OpCode::LookupIndex {
        state_id: 0,
        index_name: "test_index".to_string(),
        lookup_value: 0,
        dest: 1,
    };
    let program = vec![load_key, resolve];

    vm.execute_handler(&program, &json!({}), "test", 0, "TestEntity", None, None)
        .unwrap();

    assert_eq!(vm.registers[1], json!(42));
}
4176
/// `ConditionalSetField` with a `NotEqual` condition against a 32-element
/// zero array must skip the write for an all-zero event value and perform it
/// for a non-zero one.
#[test]
fn test_conditional_set_field_with_zero_array() {
    let mut vm = VmContext::new();

    // 32 zeros, used both as an event payload and as the comparison value.
    let zero_32: Value = json!(vec![0; 32]);
    let event_zeros = json!({ "value": zero_32.clone() });
    // The values 1 through 32.
    let event_nonzero = json!({ "value": (1..=32).collect::<Vec<i32>>() });

    let program = vec![
        OpCode::CreateObject { dest: 2 },
        OpCode::LoadEventField {
            path: FieldPath::new(&["value"]),
            dest: 10,
            default: None,
        },
        OpCode::ConditionalSetField {
            object: 2,
            path: "captured_value".to_string(),
            value: 10,
            condition_field: FieldPath::new(&["value"]),
            condition_op: ComparisonOp::NotEqual,
            condition_value: zero_32,
        },
    ];

    vm.execute_handler(&program, &event_zeros, "test", 0, "Test", None, None)
        .unwrap();
    let captured_for_zeros = vm.registers[2].get("captured_value");
    assert!(
        captured_for_zeros.is_none(),
        "Field should not be set when value is all zeros"
    );

    vm.reset_registers();
    vm.execute_handler(&program, &event_nonzero, "test", 0, "Test", None, None)
        .unwrap();
    let captured_for_nonzero = vm.registers[2].get("captured_value");
    assert!(
        captured_for_nonzero.is_some(),
        "Field should be set when value is non-zero"
    );
}
4228
/// When the gating instruction is already recorded for the transaction
/// signature, `SetFieldWhen` must write the field immediately.
#[test]
fn test_when_instruction_arrives_first() {
    let mut vm = VmContext::new();
    let tx_sig = "test_sig_123".to_string();

    // Record that `RevealIxState` has been seen for this signature.
    let seen_instructions: HashSet<String> =
        std::iter::once("RevealIxState".to_string()).collect();
    vm.states
        .get(&0)
        .unwrap()
        .recent_tx_instructions
        .lock()
        .unwrap()
        .put(tx_sig.clone(), seen_instructions);

    vm.current_context = Some(UpdateContext::new(100, tx_sig.clone()));

    let gated_write = OpCode::SetFieldWhen {
        object: 2,
        path: "entropy_value".to_string(),
        value: 10,
        when_instruction: "RevealIxState".to_string(),
        entity_name: "TestEntity".to_string(),
        key_reg: 1,
        condition_field: None,
        condition_op: None,
        condition_value: None,
    };
    let program = vec![
        OpCode::CreateObject { dest: 2 },
        OpCode::LoadConstant {
            value: json!("primary_key_value"),
            dest: 1,
        },
        OpCode::LoadConstant {
            value: json!("the_revealed_value"),
            dest: 10,
        },
        gated_write,
    ];

    let event = json!({});
    vm.execute_handler(&program, &event, "VarState", 0, "TestEntity", None, None)
        .unwrap();

    assert_eq!(
        vm.registers[2].get("entropy_value").unwrap(),
        "the_revealed_value",
        "Field should be set when instruction was already seen"
    );
}
4285
/// When the gating instruction has not been seen yet, `SetFieldWhen` must
/// queue the write; applying the queued operation afterwards must set the
/// field on the stored entity.
#[test]
fn test_when_account_arrives_first() {
    let mut vm = VmContext::new();
    let tx_sig = "test_sig_456".to_string();

    vm.current_context = Some(UpdateContext::new(100, tx_sig.clone()));

    let gated_write = OpCode::SetFieldWhen {
        object: 2,
        path: "entropy_value".to_string(),
        value: 10,
        when_instruction: "RevealIxState".to_string(),
        entity_name: "TestEntity".to_string(),
        key_reg: 1,
        condition_field: None,
        condition_op: None,
        condition_value: None,
    };
    let program = vec![
        OpCode::CreateObject { dest: 2 },
        OpCode::LoadConstant {
            value: json!("pk_123"),
            dest: 1,
        },
        OpCode::LoadConstant {
            value: json!("deferred_value"),
            dest: 10,
        },
        gated_write,
    ];

    let event = json!({});
    vm.execute_handler(&program, &event, "VarState", 0, "TestEntity", None, None)
        .unwrap();

    // Phase 1: nothing written, operation queued under (signature, instruction).
    assert!(
        vm.registers[2].get("entropy_value").is_none(),
        "Field should not be set when instruction hasn't been seen"
    );

    let entity_state = vm.states.get(&0).unwrap();
    let pending_key = (tx_sig.clone(), "RevealIxState".to_string());
    assert!(
        entity_state.deferred_when_ops.contains_key(&pending_key),
        "Operation should be queued"
    );

    // Phase 2: the instruction shows up, then the queued operations are applied.
    let seen_instructions: HashSet<String> =
        std::iter::once("RevealIxState".to_string()).collect();
    entity_state
        .recent_tx_instructions
        .lock()
        .unwrap()
        .put(tx_sig.clone(), seen_instructions);

    let (_, queued_ops) = entity_state.deferred_when_ops.remove(&pending_key).unwrap();
    for queued in queued_ops {
        vm.apply_deferred_when_op(0, &queued).unwrap();
    }

    let entity_state = vm.states.get(&0).unwrap();
    let stored_entity = entity_state.data.get(&json!("pk_123")).unwrap();
    assert_eq!(
        stored_entity.get("entropy_value").unwrap(),
        "deferred_value",
        "Field should be set after instruction arrives"
    );
}
4360
/// A deferred operation stamped with `deferred_at: 0` must be removed by
/// `cleanup_expired_when_ops(0, 60)`, leaving the queue empty.
#[test]
fn test_when_cleanup_expired() {
    let mut vm = VmContext::new();

    let stale_op = DeferredWhenOperation {
        entity_name: "Test".to_string(),
        primary_key: json!("pk"),
        field_path: "field".to_string(),
        field_value: json!("value"),
        when_instruction: "SomeIxState".to_string(),
        signature: "old_sig".to_string(),
        slot: 0,
        deferred_at: 0,
        emit: true,
    };
    let pending_key = ("old_sig".to_string(), "SomeIxState".to_string());

    let entity_state = vm.states.get(&0).unwrap();
    entity_state
        .deferred_when_ops
        .insert(pending_key, vec![stale_op]);

    let removed = vm.cleanup_expired_when_ops(0, 60);

    assert_eq!(removed, 1, "Should have removed 1 expired op");
    let remaining = &vm.states.get(&0).unwrap().deferred_when_ops;
    assert!(
        remaining.is_empty(),
        "Deferred ops should be empty after cleanup"
    );
}
4390}