1use crate::ast::{
2 BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
/// Provenance information attached to a single update processed by the VM.
///
/// Every field is optional; `UpdateContext::empty()` yields a context with
/// nothing set.
#[derive(Debug, Clone, Default)]
pub struct UpdateContext {
    /// Slot the update belongs to, if known.
    pub slot: Option<u64>,
    /// Signature associated with the update, if known.
    pub signature: Option<String>,
    /// Explicit timestamp; when `None`, `timestamp()` falls back to the
    /// current system time in whole seconds since the Unix epoch.
    pub timestamp: Option<i64>,
    /// Set by `new_account`. The handler's staleness check prefers this over
    /// `txn_index` as the ordering value within a slot.
    pub write_version: Option<u64>,
    /// Set by `new_instruction`. Used as the ordering value when
    /// `write_version` is absent.
    pub txn_index: Option<u64>,
    /// Free-form extra entries; `to_value()` copies them into the top level
    /// of the produced JSON object.
    pub metadata: HashMap<String, Value>,
}
35
36impl UpdateContext {
37 pub fn new(slot: u64, signature: String) -> Self {
39 Self {
40 slot: Some(slot),
41 signature: Some(signature),
42 timestamp: None,
43 write_version: None,
44 txn_index: None,
45 metadata: HashMap::new(),
46 }
47 }
48
49 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
51 Self {
52 slot: Some(slot),
53 signature: Some(signature),
54 timestamp: Some(timestamp),
55 write_version: None,
56 txn_index: None,
57 metadata: HashMap::new(),
58 }
59 }
60
61 pub fn new_account(slot: u64, signature: String, write_version: u64) -> Self {
63 Self {
64 slot: Some(slot),
65 signature: Some(signature),
66 timestamp: None,
67 write_version: Some(write_version),
68 txn_index: None,
69 metadata: HashMap::new(),
70 }
71 }
72
73 pub fn new_instruction(slot: u64, signature: String, txn_index: u64) -> Self {
75 Self {
76 slot: Some(slot),
77 signature: Some(signature),
78 timestamp: None,
79 write_version: None,
80 txn_index: Some(txn_index),
81 metadata: HashMap::new(),
82 }
83 }
84
85 pub fn timestamp(&self) -> i64 {
87 self.timestamp.unwrap_or_else(|| {
88 std::time::SystemTime::now()
89 .duration_since(std::time::UNIX_EPOCH)
90 .unwrap()
91 .as_secs() as i64
92 })
93 }
94
95 pub fn empty() -> Self {
97 Self::default()
98 }
99
100 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
102 self.metadata.insert(key, value);
103 self
104 }
105
106 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
108 self.metadata.get(key)
109 }
110
111 pub fn to_value(&self) -> Value {
113 let mut obj = serde_json::Map::new();
114 if let Some(slot) = self.slot {
115 obj.insert("slot".to_string(), json!(slot));
116 }
117 if let Some(ref sig) = self.signature {
118 obj.insert("signature".to_string(), json!(sig));
119 }
120 obj.insert("timestamp".to_string(), json!(self.timestamp()));
122 for (key, value) in &self.metadata {
123 obj.insert(key.clone(), value.clone());
124 }
125 Value::Object(obj)
126 }
127}
128
/// Index of a slot in the VM register file.
pub type Register = usize;
/// Result alias used throughout this module; errors are boxed trait objects.
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Value stored in a register: a plain JSON value.
pub type RegisterValue = Value;
133
/// Hook that operates on an entity state value in place.
///
/// NOTE(review): judging by the name, implementations derive computed fields
/// from the other fields of `state` — confirm against the implementors.
pub trait ComputedFieldsEvaluator {
    /// Evaluates against `state`, mutating it in place.
    ///
    /// Returns an error if evaluation fails.
    fn evaluate(&self, state: &mut Value) -> Result<()>;
}
139
// NOTE(review): the code that enforces the three pending-update limits below
// is not visible in this part of the file; descriptions follow the names.

/// Cap on pending updates across all PDAs (presumably; see note above).
const MAX_PENDING_UPDATES_TOTAL: usize = 10_000;
/// Cap on pending updates queued for a single PDA (presumably; see note above).
const MAX_PENDING_UPDATES_PER_PDA: usize = 10;
/// Age in seconds after which a pending update expires (presumably; see note above).
const PENDING_UPDATE_TTL_SECONDS: i64 = 300;
/// Temporal-index entries older than this many seconds, relative to the
/// timestamp being inserted, are dropped by `TemporalIndex::insert`.
const TEMPORAL_HISTORY_TTL_SECONDS: i64 = 300;
/// Maximum number of entries `TemporalIndex::insert` keeps per lookup value;
/// the oldest entries are dropped first.
const MAX_TEMPORAL_ENTRIES_PER_KEY: usize = 1000;
/// Default for `StateTableConfig::max_entries`.
const DEFAULT_MAX_STATE_TABLE_ENTRIES: usize = 100_000;
/// Default for `StateTableConfig::max_array_length`.
const DEFAULT_MAX_ARRAY_LENGTH: usize = 100;
152
153fn estimate_json_size(value: &Value) -> usize {
155 match value {
156 Value::Null => 4,
157 Value::Bool(_) => 5,
158 Value::Number(_) => 8,
159 Value::String(s) => s.len() + 2,
160 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
161 Value::Object(obj) => {
162 2 + obj
163 .iter()
164 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
165 .sum::<usize>()
166 }
167 }
168}
169
/// A dotted field path pre-split into its segments.
///
/// Segments live behind an `Arc`, so cloning a `CompiledPath` only bumps a
/// reference count instead of copying the strings.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    /// Path components in order, e.g. `"a.b"` becomes `["a", "b"]`.
    pub segments: std::sync::Arc<[String]>,
}

impl CompiledPath {
    /// Splits `path` on `.` into owned segments.
    ///
    /// Empty segments are kept: `""` yields one empty segment and `"a..b"`
    /// yields `["a", "", "b"]`.
    pub fn new(path: &str) -> Self {
        Self {
            segments: path.split('.').map(String::from).collect(),
        }
    }

    /// Borrows the segments as a slice.
    fn segments(&self) -> &[String] {
        self.segments.as_ref()
    }
}
187
/// Execution state of the bytecode VM: registers, state tables, caches and
/// counters.
pub struct VmContext {
    /// Register file; the constructors allocate 256 slots initialised to `Null`.
    registers: Vec<RegisterValue>,
    /// State tables keyed by state id; the constructors pre-register table `0`.
    states: HashMap<u32, StateTable>,
    /// NOTE(review): not updated anywhere in the visible part of the file —
    /// presumably a count of executed opcodes; confirm.
    pub instructions_executed: u64,
    /// Number of path-cache hits recorded by `get_compiled_path`.
    pub cache_hits: u64,
    /// Compiled paths keyed by their dotted source string.
    path_cache: HashMap<String, CompiledPath>,
    /// NOTE(review): updated outside the visible part of the file.
    pub pda_cache_hits: u64,
    /// NOTE(review): updated outside the visible part of the file.
    pub pda_cache_misses: u64,
    /// NOTE(review): updated outside the visible part of the file.
    pub pending_queue_size: u64,
    /// Context of the event being processed; overwritten on every call to
    /// `process_event_with_context`.
    current_context: Option<UpdateContext>,
}
200
201#[derive(Debug)]
202pub struct LookupIndex {
203 index: DashMap<Value, Value>,
204}
205
206impl Default for LookupIndex {
207 fn default() -> Self {
208 Self::new()
209 }
210}
211
212impl LookupIndex {
213 pub fn new() -> Self {
214 LookupIndex {
215 index: DashMap::new(),
216 }
217 }
218
219 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
220 self.index.get(lookup_value).map(|v| v.clone())
221 }
222
223 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
224 self.index.insert(lookup_value, primary_key);
225 }
226
227 pub fn len(&self) -> usize {
228 self.index.len()
229 }
230
231 pub fn is_empty(&self) -> bool {
232 self.index.is_empty()
233 }
234}
235
236#[derive(Debug)]
237pub struct TemporalIndex {
238 index: DashMap<Value, Vec<(Value, i64)>>,
239}
240
241impl Default for TemporalIndex {
242 fn default() -> Self {
243 Self::new()
244 }
245}
246
247impl TemporalIndex {
248 pub fn new() -> Self {
249 TemporalIndex {
250 index: DashMap::new(),
251 }
252 }
253
254 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
255 if let Some(entries) = self.index.get(lookup_value) {
256 let entries_vec = entries.value();
257 for i in (0..entries_vec.len()).rev() {
258 if entries_vec[i].1 <= timestamp {
259 return Some(entries_vec[i].0.clone());
260 }
261 }
262 }
263 None
264 }
265
266 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
267 if let Some(entries) = self.index.get(lookup_value) {
268 let entries_vec = entries.value();
269 if let Some(last) = entries_vec.last() {
270 return Some(last.0.clone());
271 }
272 }
273 None
274 }
275
276 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
277 let lookup_key = lookup_value.clone();
278 self.index
279 .entry(lookup_value)
280 .or_default()
281 .push((primary_key, timestamp));
282
283 if let Some(mut entries) = self.index.get_mut(&lookup_key) {
284 entries.sort_by_key(|(_, ts)| *ts);
285
286 let cutoff = timestamp - TEMPORAL_HISTORY_TTL_SECONDS;
287 entries.retain(|(_, ts)| *ts >= cutoff);
288
289 if entries.len() > MAX_TEMPORAL_ENTRIES_PER_KEY {
290 let excess = entries.len() - MAX_TEMPORAL_ENTRIES_PER_KEY;
291 entries.drain(0..excess);
292 }
293 }
294 }
295
296 pub fn len(&self) -> usize {
297 self.index.len()
298 }
299
300 pub fn is_empty(&self) -> bool {
301 self.index.is_empty()
302 }
303
304 pub fn total_entries(&self) -> usize {
305 self.index.iter().map(|entry| entry.value().len()).sum()
306 }
307}
308
309#[derive(Debug)]
310pub struct PdaReverseLookup {
311 index: LruCache<String, String>,
313}
314
315impl PdaReverseLookup {
316 pub fn new(capacity: usize) -> Self {
317 PdaReverseLookup {
318 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
319 }
320 }
321
322 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
323 self.index.get(pda_address).cloned()
324 }
325
326 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
327 let evicted = if self.index.len() >= self.index.cap().get() {
328 self.index.peek_lru().map(|(k, _)| k.clone())
329 } else {
330 None
331 };
332
333 self.index.put(pda_address, seed_value);
334 evicted
335 }
336
337 pub fn len(&self) -> usize {
338 self.index.len()
339 }
340
341 pub fn is_empty(&self) -> bool {
342 self.index.is_empty()
343 }
344}
345
/// An account update identified by PDA address, carrying its decoded data
/// and ordering information.
///
/// NOTE(review): producers and consumers of this type are outside the visible
/// part of the file; field descriptions follow the field names.
#[derive(Debug, Clone)]
pub struct QueuedAccountUpdate {
    /// Address of the PDA the update is for.
    pub pda_address: String,
    /// Name of the account type.
    pub account_type: String,
    /// Account contents as JSON.
    pub account_data: Value,
    /// Slot of the update.
    pub slot: u64,
    /// Write version of the update.
    pub write_version: u64,
    /// Signature associated with the update.
    pub signature: String,
}
356
/// An account update held in `StateTable::pending_updates`.
///
/// Carries the same fields as `QueuedAccountUpdate` plus the time it was queued.
#[derive(Debug, Clone)]
pub struct PendingAccountUpdate {
    /// Name of the account type.
    pub account_type: String,
    /// Address of the PDA the update is for.
    pub pda_address: String,
    /// Account contents as JSON.
    pub account_data: Value,
    /// Slot of the update.
    pub slot: u64,
    /// Write version of the update.
    pub write_version: u64,
    /// Signature associated with the update.
    pub signature: String,
    /// When the update was queued.
    /// NOTE(review): presumably Unix seconds, to match
    /// `PENDING_UPDATE_TTL_SECONDS` — confirm at the enqueue site.
    pub queued_at: i64,
}
368
/// Summary figures describing a pending-update queue.
///
/// NOTE(review): the code that fills this in is outside the visible part of
/// the file; field descriptions follow the field names.
#[derive(Debug, Clone)]
pub struct PendingQueueStats {
    /// Total number of queued updates.
    pub total_updates: usize,
    /// Number of distinct PDAs with at least one queued update.
    pub unique_pdas: usize,
    /// Age in seconds of the oldest queued update.
    pub oldest_age_seconds: i64,
    /// Length of the longest per-PDA queue.
    pub largest_pda_queue_size: usize,
    /// Estimated memory held by the queue, in bytes.
    pub estimated_memory_bytes: usize,
}
377
/// Snapshot of how much data the VM is holding.
///
/// NOTE(review): the code that fills this in is outside the visible part of
/// the file; field descriptions follow the field names.
#[derive(Debug, Clone, Default)]
pub struct VmMemoryStats {
    /// Number of entities stored in the state table.
    pub state_table_entity_count: usize,
    /// Configured maximum number of state table entries.
    pub state_table_max_entries: usize,
    /// Whether the state table has reached its configured maximum.
    pub state_table_at_capacity: bool,
    /// Number of lookup indexes.
    pub lookup_index_count: usize,
    /// Entries summed over all lookup indexes.
    pub lookup_index_total_entries: usize,
    /// Number of temporal indexes.
    pub temporal_index_count: usize,
    /// Entries summed over all temporal indexes.
    pub temporal_index_total_entries: usize,
    /// Number of PDA reverse lookups.
    pub pda_reverse_lookup_count: usize,
    /// Entries summed over all PDA reverse lookups.
    pub pda_reverse_lookup_total_entries: usize,
    /// Pending-queue statistics, when available.
    pub pending_queue_stats: Option<PendingQueueStats>,
    /// Number of compiled paths in the path cache.
    pub path_cache_size: usize,
}
392
/// Counts of items removed by a cleanup pass.
///
/// NOTE(review): the cleanup code itself is outside the visible part of the file.
#[derive(Debug, Clone, Default)]
pub struct CleanupResult {
    /// Number of pending updates removed.
    pub pending_updates_removed: usize,
    /// Number of temporal index entries removed.
    pub temporal_entries_removed: usize,
}
398
/// Describes a state table that has grown past its configured size.
///
/// NOTE(review): the code that produces this is outside the visible part of
/// the file.
#[derive(Debug, Clone)]
pub struct CapacityWarning {
    /// Number of entries currently stored.
    pub current_entries: usize,
    /// Configured maximum number of entries.
    pub max_entries: usize,
    /// How many entries exceed the maximum.
    pub entries_over_limit: usize,
}
405
/// Size limits applied to a `StateTable`.
#[derive(Debug, Clone)]
pub struct StateTableConfig {
    /// Entry count at which `StateTable::insert_with_eviction` starts evicting
    /// least recently used entries before inserting a new key.
    pub max_entries: usize,
    /// Limit handed to array appends as their maximum length.
    pub max_array_length: usize,
}

impl Default for StateTableConfig {
    /// Uses `DEFAULT_MAX_STATE_TABLE_ENTRIES` and `DEFAULT_MAX_ARRAY_LENGTH`.
    fn default() -> Self {
        Self {
            max_entries: DEFAULT_MAX_STATE_TABLE_ENTRIES,
            max_array_length: DEFAULT_MAX_ARRAY_LENGTH,
        }
    }
}
420
/// Key of `StateTable::version_tracker`: one entry per primary key and event type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct VersionKey {
    /// String form of the primary key (built with `to_string()` on the JSON value).
    primary_key: String,
    /// Event type the version applies to.
    event_type: String,
}
426
/// Per-entity storage: the entity values themselves plus the indexes and
/// bookkeeping kept alongside them.
#[derive(Debug)]
pub struct StateTable {
    /// Stored values keyed by primary key.
    pub data: DashMap<Value, Value>,
    /// Last access time per key in Unix seconds; drives LRU eviction.
    access_times: DashMap<Value, i64>,
    /// Lookup indexes keyed by index name.
    pub lookup_indexes: HashMap<String, LookupIndex>,
    /// Temporal indexes keyed by index name.
    pub temporal_indexes: HashMap<String, TemporalIndex>,
    /// PDA reverse lookups keyed by lookup name.
    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
    /// Queued account updates.
    /// NOTE(review): presumably keyed by PDA address — confirm at the enqueue site.
    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
    /// Last accepted `(slot, ordering value)` per primary key and event type;
    /// see `is_fresh_update`.
    version_tracker: DashMap<VersionKey, (u64, u64)>,
    /// Size limits for this table.
    config: StateTableConfig,
}
439
440impl StateTable {
441 pub fn is_at_capacity(&self) -> bool {
442 self.data.len() >= self.config.max_entries
443 }
444
445 pub fn entries_over_limit(&self) -> usize {
446 self.data.len().saturating_sub(self.config.max_entries)
447 }
448
449 pub fn max_array_length(&self) -> usize {
450 self.config.max_array_length
451 }
452
453 fn touch(&self, key: &Value) {
454 let now = std::time::SystemTime::now()
455 .duration_since(std::time::UNIX_EPOCH)
456 .unwrap()
457 .as_secs() as i64;
458 self.access_times.insert(key.clone(), now);
459 }
460
461 fn evict_lru(&self, count: usize) -> usize {
462 if count == 0 || self.data.is_empty() {
463 return 0;
464 }
465
466 let mut entries: Vec<(Value, i64)> = self
467 .access_times
468 .iter()
469 .map(|entry| (entry.key().clone(), *entry.value()))
470 .collect();
471
472 entries.sort_by_key(|(_, ts)| *ts);
473
474 let to_evict: Vec<Value> = entries.iter().take(count).map(|(k, _)| k.clone()).collect();
475
476 let mut evicted = 0;
477 for key in to_evict {
478 self.data.remove(&key);
479 self.access_times.remove(&key);
480 evicted += 1;
481 }
482
483 if evicted > 0 {
484 tracing::info!("Evicted {} LRU entries from state table", evicted);
485 }
486
487 evicted
488 }
489
490 pub fn insert_with_eviction(&self, key: Value, value: Value) {
491 if self.data.len() >= self.config.max_entries && !self.data.contains_key(&key) {
492 let to_evict = (self.data.len() + 1).saturating_sub(self.config.max_entries);
493 self.evict_lru(to_evict.max(1));
494 }
495 self.data.insert(key.clone(), value);
496 self.touch(&key);
497 }
498
499 pub fn get_and_touch(&self, key: &Value) -> Option<Value> {
500 let result = self.data.get(key).map(|v| v.clone());
501 if result.is_some() {
502 self.touch(key);
503 }
504 result
505 }
506
507 pub fn is_fresh_update(
514 &self,
515 primary_key: &Value,
516 event_type: &str,
517 slot: u64,
518 ordering_value: u64,
519 ) -> bool {
520 let key = VersionKey {
521 primary_key: primary_key.to_string(),
522 event_type: event_type.to_string(),
523 };
524
525 let dominated = self
526 .version_tracker
527 .get(&key)
528 .map(|entry| {
529 let (last_slot, last_version) = *entry;
530 (slot, ordering_value) <= (last_slot, last_version)
531 })
532 .unwrap_or(false);
533
534 if dominated {
535 return false;
536 }
537
538 self.version_tracker.insert(key, (slot, ordering_value));
539 true
540 }
541}
542
543impl VmContext {
544 pub fn new() -> Self {
545 let mut vm = VmContext {
546 registers: vec![Value::Null; 256],
547 states: HashMap::new(),
548 instructions_executed: 0,
549 cache_hits: 0,
550 path_cache: HashMap::new(),
551 pda_cache_hits: 0,
552 pda_cache_misses: 0,
553 pending_queue_size: 0,
554 current_context: None,
555 };
556 vm.states.insert(
557 0,
558 StateTable {
559 data: DashMap::new(),
560 access_times: DashMap::new(),
561 lookup_indexes: HashMap::new(),
562 temporal_indexes: HashMap::new(),
563 pda_reverse_lookups: HashMap::new(),
564 pending_updates: DashMap::new(),
565 version_tracker: DashMap::new(),
566 config: StateTableConfig::default(),
567 },
568 );
569 vm
570 }
571
572 pub fn new_with_config(state_config: StateTableConfig) -> Self {
573 let mut vm = VmContext {
574 registers: vec![Value::Null; 256],
575 states: HashMap::new(),
576 instructions_executed: 0,
577 cache_hits: 0,
578 path_cache: HashMap::new(),
579 pda_cache_hits: 0,
580 pda_cache_misses: 0,
581 pending_queue_size: 0,
582 current_context: None,
583 };
584 vm.states.insert(
585 0,
586 StateTable {
587 data: DashMap::new(),
588 access_times: DashMap::new(),
589 lookup_indexes: HashMap::new(),
590 temporal_indexes: HashMap::new(),
591 pda_reverse_lookups: HashMap::new(),
592 pending_updates: DashMap::new(),
593 version_tracker: DashMap::new(),
594 config: state_config,
595 },
596 );
597 vm
598 }
599
/// Returns a mutable reference to the state table registered under
/// `state_id`, or `None` if no such table exists.
pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
    self.states.get_mut(&state_id)
}
605
/// Gives mutable access to the whole register file.
///
/// Because the `Vec` itself is exposed, callers can change its length as
/// well as its contents.
pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
    &mut self.registers
}
610
/// Read-only view of the compiled-path cache, keyed by dotted path string.
pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
    &self.path_cache
}
615
/// Returns the update context of the event being processed, if one was
/// supplied to `process_event_with_context`.
pub fn current_context(&self) -> Option<&UpdateContext> {
    self.current_context.as_ref()
}
620
621 pub fn update_state_from_register(
622 &mut self,
623 state_id: u32,
624 key: Value,
625 register: Register,
626 ) -> Result<()> {
627 let state = self.states.get(&state_id).ok_or("State table not found")?;
628 let value = self.registers[register].clone();
629 state.insert_with_eviction(key, value);
630 Ok(())
631 }
632
633 fn reset_registers(&mut self) {
634 for reg in &mut self.registers {
635 *reg = Value::Null;
636 }
637 }
638
639 pub fn extract_partial_state(
641 &self,
642 state_reg: Register,
643 dirty_fields: &HashSet<String>,
644 ) -> Result<Value> {
645 let full_state = &self.registers[state_reg];
646
647 if dirty_fields.is_empty() {
648 return Ok(json!({}));
649 }
650
651 let mut partial = serde_json::Map::new();
652
653 for path in dirty_fields {
654 let segments: Vec<&str> = path.split('.').collect();
655
656 let mut current = full_state;
657 let mut found = true;
658
659 for segment in &segments {
660 match current.get(segment) {
661 Some(v) => current = v,
662 None => {
663 found = false;
664 break;
665 }
666 }
667 }
668
669 if !found {
670 continue;
671 }
672
673 let mut target = &mut partial;
674 for (i, segment) in segments.iter().enumerate() {
675 if i == segments.len() - 1 {
676 target.insert(segment.to_string(), current.clone());
677 } else {
678 target
679 .entry(segment.to_string())
680 .or_insert_with(|| json!({}));
681 target = target
682 .get_mut(*segment)
683 .and_then(|v| v.as_object_mut())
684 .ok_or("Failed to build nested structure")?;
685 }
686 }
687 }
688
689 Ok(Value::Object(partial))
690 }
691
692 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
693 if let Some(compiled) = self.path_cache.get(path) {
694 self.cache_hits += 1;
695 return compiled.clone();
696 }
697 let compiled = CompiledPath::new(path);
698 self.path_cache.insert(path.to_string(), compiled.clone());
699 compiled
700 }
701
702 #[cfg_attr(feature = "otel", instrument(
704 name = "vm.process_event",
705 skip(self, bytecode, event_value, context),
706 fields(
707 event_type = %event_type,
708 slot = context.as_ref().and_then(|c| c.slot),
709 )
710 ))]
711 pub fn process_event_with_context(
712 &mut self,
713 bytecode: &MultiEntityBytecode,
714 mut event_value: Value,
715 event_type: &str,
716 context: Option<&UpdateContext>,
717 ) -> Result<Vec<Mutation>> {
718 self.current_context = context.cloned();
720
721 if let Some(ctx) = context {
723 if let Some(obj) = event_value.as_object_mut() {
724 obj.insert("__update_context".to_string(), ctx.to_value());
725 }
726 }
727
728 let mut all_mutations = Vec::new();
729
730 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
731 tracing::debug!(
732 "🔀 Event type '{}' routes to {} entity/entities",
733 event_type,
734 entity_names.len()
735 );
736
737 for entity_name in entity_names {
738 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
739 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
740 tracing::debug!(
741 " ▶️ Executing handler for entity '{}', event '{}'",
742 entity_name,
743 event_type
744 );
745 tracing::debug!(" Handler has {} opcodes", handler.len());
746
747 let mutations = self.execute_handler(
748 handler,
749 event_value.clone(),
750 event_type,
751 entity_bytecode.state_id,
752 entity_bytecode.computed_fields_evaluator.as_ref(),
753 )?;
754
755 tracing::debug!(" Handler produced {} mutation(s)", mutations.len());
756 all_mutations.extend(mutations);
757 } else {
758 tracing::debug!(
759 " ⊘ No handler found for entity '{}', event '{}'",
760 entity_name,
761 event_type
762 );
763 }
764 } else {
765 tracing::debug!(" ⊘ Entity '{}' not found in bytecode", entity_name);
766 }
767 }
768 } else {
769 tracing::debug!("⊘ No event routing found for event type '{}'", event_type);
770 }
771
772 Ok(all_mutations)
773 }
774
/// Processes an event without an update context.
///
/// Equivalent to calling `process_event_with_context` with `context` set to
/// `None`, which also clears the VM's current context.
pub fn process_event(
    &mut self,
    bytecode: &MultiEntityBytecode,
    event_value: Value,
    event_type: &str,
) -> Result<Vec<Mutation>> {
    self.process_event_with_context(bytecode, event_value, event_type, None)
}
784
785 pub fn process_any(
786 &mut self,
787 bytecode: &MultiEntityBytecode,
788 any: prost_types::Any,
789 ) -> Result<Vec<Mutation>> {
790 let (event_value, event_type) = bytecode.proto_router.decode(any)?;
791 self.process_event(bytecode, event_value, &event_type)
792 }
793
794 #[cfg_attr(feature = "otel", instrument(
795 name = "vm.execute_handler",
796 skip(self, handler, event_value, entity_evaluator),
797 level = "debug",
798 fields(
799 event_type = %event_type,
800 handler_opcodes = handler.len(),
801 )
802 ))]
803 #[allow(clippy::type_complexity)]
804 fn execute_handler(
805 &mut self,
806 handler: &[OpCode],
807 event_value: Value,
808 event_type: &str,
809 override_state_id: u32,
810 entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
811 ) -> Result<Vec<Mutation>> {
812 self.reset_registers();
813
814 tracing::trace!(
815 "Executing handler: event_type={}, handler_opcodes={}, event_value={:?}",
816 event_type,
817 handler.len(),
818 event_value
819 );
820
821 let mut pc: usize = 0;
822 let mut output = Vec::new();
823 let mut dirty_fields: HashSet<String> = HashSet::new();
824
825 while pc < handler.len() {
826 tracing::debug!(
827 "Executing opcode {}/{}: {:?}",
828 pc + 1,
829 handler.len(),
830 &handler[pc]
831 );
832 match &handler[pc] {
833 OpCode::LoadEventField {
834 path,
835 dest,
836 default,
837 } => {
838 let value = self.load_field(&event_value, path, default.as_ref())?;
839 tracing::trace!(
840 "LoadEventField: path={:?}, dest={}, value={:?}",
841 path.segments,
842 dest,
843 value
844 );
845 if value.is_null() && path.segments.len() >= 2 && path.segments[0] == "accounts"
847 {
848 tracing::warn!(
849 "⚠️ LoadEventField returned NULL for accounts path: {:?} -> dest={}",
850 path.segments,
851 dest
852 );
853 }
854 self.registers[*dest] = value;
855 pc += 1;
856 }
857 OpCode::LoadConstant { value, dest } => {
858 self.registers[*dest] = value.clone();
859 pc += 1;
860 }
861 OpCode::CopyRegister { source, dest } => {
862 let value = self.registers[*source].clone();
863 tracing::trace!(
864 "CopyRegister: source={}, dest={}, value={:?}",
865 source,
866 dest,
867 if value.is_null() {
868 "NULL".to_string()
869 } else {
870 format!("{}", value)
871 }
872 );
873 self.registers[*dest] = value;
874 pc += 1;
875 }
876 OpCode::CopyRegisterIfNull { source, dest } => {
877 if self.registers[*dest].is_null() {
878 self.registers[*dest] = self.registers[*source].clone();
879 tracing::trace!(
880 "CopyRegisterIfNull: copied from reg {} to reg {} (value={:?})",
881 source,
882 dest,
883 self.registers[*source]
884 );
885 } else {
886 tracing::trace!(
887 "CopyRegisterIfNull: dest reg {} not null, skipping copy",
888 dest
889 );
890 }
891 pc += 1;
892 }
893 OpCode::GetEventType { dest } => {
894 self.registers[*dest] = json!(event_type);
895 pc += 1;
896 }
897 OpCode::CreateObject { dest } => {
898 self.registers[*dest] = json!({});
899 pc += 1;
900 }
901 OpCode::SetField {
902 object,
903 path,
904 value,
905 } => {
906 let val = self.registers[*value].clone();
907 tracing::trace!("SetField: path={}, value={:?}", path, val);
908
909 self.set_field_auto_vivify(*object, path, *value)?;
910 dirty_fields.insert(path.to_string());
911 pc += 1;
912 }
913 OpCode::SetFields { object, fields } => {
914 for (path, value_reg) in fields {
915 self.set_field_auto_vivify(*object, path, *value_reg)?;
916 dirty_fields.insert(path.to_string());
917 }
918 pc += 1;
919 }
920 OpCode::GetField { object, path, dest } => {
921 let value = self.get_field(*object, path)?;
922 self.registers[*dest] = value;
923 pc += 1;
924 }
925 OpCode::ReadOrInitState {
926 state_id: _,
927 key,
928 default,
929 dest,
930 } => {
931 let actual_state_id = override_state_id;
932 self.states
933 .entry(actual_state_id)
934 .or_insert_with(|| StateTable {
935 data: DashMap::new(),
936 access_times: DashMap::new(),
937 lookup_indexes: HashMap::new(),
938 temporal_indexes: HashMap::new(),
939 pda_reverse_lookups: HashMap::new(),
940 pending_updates: DashMap::new(),
941 version_tracker: DashMap::new(),
942 config: StateTableConfig::default(),
943 });
944 let state = self
945 .states
946 .get(&actual_state_id)
947 .ok_or("State table not found")?;
948 let key_value = &self.registers[*key];
949 if key_value.is_null() {
950 if event_type.ends_with("State") && !event_type.ends_with("IxState") {
953 tracing::warn!(
954 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
955 key,
956 event_type
957 );
958 } else {
959 tracing::debug!(
960 "ReadOrInitState: key register {} is NULL (expected for PDA registration hook), event_type={}",
961 key,
962 event_type
963 );
964 }
965 } else if let Some(ctx) = &self.current_context {
966 let ordering_value = ctx.write_version.or(ctx.txn_index);
968 if let (Some(slot), Some(version)) = (ctx.slot, ordering_value) {
969 if !state.is_fresh_update(key_value, event_type, slot, version) {
970 tracing::debug!(
971 "Skipping stale update for key={}, event_type={}, slot={}, version={}",
972 key_value,
973 event_type,
974 slot,
975 version
976 );
977 return Ok(Vec::new());
978 }
979 }
980 }
981 let value = state
982 .get_and_touch(key_value)
983 .unwrap_or_else(|| default.clone());
984
985 self.registers[*dest] = value;
986 pc += 1;
987 }
988 OpCode::UpdateState {
989 state_id: _,
990 key,
991 value,
992 } => {
993 let actual_state_id = override_state_id;
994 let state = self
995 .states
996 .get(&actual_state_id)
997 .ok_or("State table not found")?;
998 let key_value = self.registers[*key].clone();
999 let value_data = self.registers[*value].clone();
1000
1001 state.insert_with_eviction(key_value, value_data);
1002 pc += 1;
1003 }
1004 OpCode::AppendToArray {
1005 object,
1006 path,
1007 value,
1008 } => {
1009 tracing::trace!(
1010 "AppendToArray: path={}, value={:?}",
1011 path,
1012 self.registers[*value]
1013 );
1014 let max_len = self
1015 .states
1016 .get(&override_state_id)
1017 .map(|s| s.max_array_length())
1018 .unwrap_or(DEFAULT_MAX_ARRAY_LENGTH);
1019 self.append_to_array(*object, path, *value, max_len)?;
1020 dirty_fields.insert(path.to_string());
1021 pc += 1;
1022 }
1023 OpCode::GetCurrentTimestamp { dest } => {
1024 let timestamp = std::time::SystemTime::now()
1025 .duration_since(std::time::UNIX_EPOCH)
1026 .unwrap()
1027 .as_secs() as i64;
1028 self.registers[*dest] = json!(timestamp);
1029 pc += 1;
1030 }
1031 OpCode::CreateEvent { dest, event_value } => {
1032 tracing::trace!(
1033 "CreateEvent: event_value_reg={}, event_value={:?}",
1034 event_value,
1035 self.registers[*event_value]
1036 );
1037 let timestamp = std::time::SystemTime::now()
1038 .duration_since(std::time::UNIX_EPOCH)
1039 .unwrap()
1040 .as_secs() as i64;
1041
1042 let mut event_data = self.registers[*event_value].clone();
1044 if let Some(obj) = event_data.as_object_mut() {
1045 obj.remove("__update_context");
1046 }
1047
1048 let mut event = serde_json::Map::new();
1050 event.insert("timestamp".to_string(), json!(timestamp));
1051 event.insert("data".to_string(), event_data);
1052
1053 if let Some(ref ctx) = self.current_context {
1055 if let Some(slot) = ctx.slot {
1056 event.insert("slot".to_string(), json!(slot));
1057 }
1058 if let Some(ref signature) = ctx.signature {
1059 event.insert("signature".to_string(), json!(signature));
1060 }
1061 }
1062
1063 self.registers[*dest] = Value::Object(event);
1064 pc += 1;
1065 }
1066 OpCode::CreateCapture {
1067 dest,
1068 capture_value,
1069 } => {
1070 let timestamp = std::time::SystemTime::now()
1071 .duration_since(std::time::UNIX_EPOCH)
1072 .unwrap()
1073 .as_secs() as i64;
1074
1075 let capture_data = self.registers[*capture_value].clone();
1077
1078 let account_address = event_value
1080 .get("__account_address")
1081 .and_then(|v| v.as_str())
1082 .unwrap_or("")
1083 .to_string();
1084
1085 let mut capture = serde_json::Map::new();
1087 capture.insert("timestamp".to_string(), json!(timestamp));
1088 capture.insert("account_address".to_string(), json!(account_address));
1089 capture.insert("data".to_string(), capture_data);
1090
1091 if let Some(ref ctx) = self.current_context {
1093 if let Some(slot) = ctx.slot {
1094 capture.insert("slot".to_string(), json!(slot));
1095 }
1096 if let Some(ref signature) = ctx.signature {
1097 capture.insert("signature".to_string(), json!(signature));
1098 }
1099 }
1100
1101 self.registers[*dest] = Value::Object(capture);
1102 pc += 1;
1103 }
1104 OpCode::Transform {
1105 source,
1106 dest,
1107 transformation,
1108 } => {
1109 if source == dest {
1110 let result = self.transform_in_place(*source, transformation);
1111 if let Err(ref e) = result {
1112 tracing::error!(
1113 "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
1114 transformation, pc, e, source, &self.registers[*source]
1115 );
1116 }
1117 result?;
1118 } else {
1119 let source_value = &self.registers[*source];
1120 let result = self.apply_transformation(source_value, transformation);
1121 if let Err(ref e) = result {
1122 tracing::error!(
1123 "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
1124 transformation, pc, e, source, source_value
1125 );
1126 }
1127 let value = result?;
1128 self.registers[*dest] = value;
1129 }
1130 pc += 1;
1131 }
1132 OpCode::EmitMutation {
1133 entity_name,
1134 key,
1135 state,
1136 } => {
1137 let primary_key = self.registers[*key].clone();
1138
1139 if primary_key.is_null() || dirty_fields.is_empty() {
1140 tracing::warn!(
1142 " ⊘ [VM] Skipping mutation for entity '{}': primary_key={}, dirty_fields.len()={}",
1143 entity_name,
1144 if primary_key.is_null() { "NULL" } else { "present" },
1145 dirty_fields.len()
1146 );
1147 if dirty_fields.is_empty() {
1148 tracing::warn!(" Reason: No fields were modified by this handler");
1149 }
1150 if primary_key.is_null() {
1151 tracing::warn!(" Reason: Primary key could not be resolved (check __resolved_primary_key or key_resolution)");
1152 tracing::warn!(" Debug: reg[15]={:?}, reg[17]={:?}, reg[19]={:?}, reg[20]={:?}",
1154 self.registers.get(15).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
1155 self.registers.get(17).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
1156 self.registers.get(19).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
1157 self.registers.get(20).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) })
1158 );
1159 }
1160 } else {
1161 let patch = self.extract_partial_state(*state, &dirty_fields)?;
1162 tracing::debug!(
1163 " Patch structure: {}",
1164 serde_json::to_string_pretty(&patch).unwrap_or_default()
1165 );
1166 let mutation = Mutation {
1167 export: entity_name.clone(),
1168 key: primary_key,
1169 patch,
1170 };
1171 output.push(mutation);
1172 }
1173 pc += 1;
1174 }
1175 OpCode::SetFieldIfNull {
1176 object,
1177 path,
1178 value,
1179 } => {
1180 let val = self.registers[*value].clone();
1181 let was_set = self.set_field_if_null(*object, path, *value)?;
1182 tracing::trace!(
1183 "SetFieldIfNull: path={}, value={:?}, was_set={}",
1184 path,
1185 val,
1186 was_set
1187 );
1188 if was_set {
1189 dirty_fields.insert(path.to_string());
1190 }
1191 pc += 1;
1192 }
1193 OpCode::SetFieldMax {
1194 object,
1195 path,
1196 value,
1197 } => {
1198 let was_updated = self.set_field_max(*object, path, *value)?;
1199 if was_updated {
1200 dirty_fields.insert(path.to_string());
1201 }
1202 pc += 1;
1203 }
1204 OpCode::UpdateTemporalIndex {
1205 state_id: _,
1206 index_name,
1207 lookup_value,
1208 primary_key,
1209 timestamp,
1210 } => {
1211 let actual_state_id = override_state_id;
1212 let state = self
1213 .states
1214 .get_mut(&actual_state_id)
1215 .ok_or("State table not found")?;
1216 let index = state
1217 .temporal_indexes
1218 .entry(index_name.clone())
1219 .or_insert_with(TemporalIndex::new);
1220
1221 let lookup_val = self.registers[*lookup_value].clone();
1222 let pk_val = self.registers[*primary_key].clone();
1223 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1224 val
1225 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1226 val as i64
1227 } else {
1228 return Err(format!(
1229 "Timestamp must be a number (i64 or u64), got: {:?}",
1230 self.registers[*timestamp]
1231 )
1232 .into());
1233 };
1234
1235 index.insert(lookup_val, pk_val, ts_val);
1236 pc += 1;
1237 }
1238 OpCode::LookupTemporalIndex {
1239 state_id: _,
1240 index_name,
1241 lookup_value,
1242 timestamp,
1243 dest,
1244 } => {
1245 let actual_state_id = override_state_id;
1246 let state = self
1247 .states
1248 .get(&actual_state_id)
1249 .ok_or("State table not found")?;
1250 let lookup_val = &self.registers[*lookup_value];
1251
1252 let result = if self.registers[*timestamp].is_null() {
1253 if let Some(index) = state.temporal_indexes.get(index_name) {
1254 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
1255 } else {
1256 Value::Null
1257 }
1258 } else {
1259 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
1260 val
1261 } else if let Some(val) = self.registers[*timestamp].as_u64() {
1262 val as i64
1263 } else {
1264 return Err(format!(
1265 "Timestamp must be a number (i64 or u64), got: {:?}",
1266 self.registers[*timestamp]
1267 )
1268 .into());
1269 };
1270
1271 if let Some(index) = state.temporal_indexes.get(index_name) {
1272 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
1273 } else {
1274 Value::Null
1275 }
1276 };
1277
1278 self.registers[*dest] = result;
1279 pc += 1;
1280 }
1281 OpCode::UpdateLookupIndex {
1282 state_id: _,
1283 index_name,
1284 lookup_value,
1285 primary_key,
1286 } => {
1287 let actual_state_id = override_state_id;
1288 let state = self
1289 .states
1290 .get_mut(&actual_state_id)
1291 .ok_or("State table not found")?;
1292 let index = state
1293 .lookup_indexes
1294 .entry(index_name.clone())
1295 .or_insert_with(LookupIndex::new);
1296
1297 let lookup_val = self.registers[*lookup_value].clone();
1298 let pk_val = self.registers[*primary_key].clone();
1299
1300 tracing::debug!(
1301 "📝 UpdateLookupIndex: {} -> {} (index: {})",
1302 lookup_val,
1303 pk_val,
1304 index_name
1305 );
1306
1307 index.insert(lookup_val, pk_val);
1308 pc += 1;
1309 }
1310 OpCode::LookupIndex {
1311 state_id: _,
1312 index_name,
1313 lookup_value,
1314 dest,
1315 } => {
1316 let actual_state_id = override_state_id;
1317 let lookup_val = self.registers[*lookup_value].clone();
1318
1319 let result = {
1320 let state = self
1321 .states
1322 .get(&actual_state_id)
1323 .ok_or("State table not found")?;
1324
1325 if let Some(index) = state.lookup_indexes.get(index_name) {
1326 let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1327 tracing::debug!(
1328 "🔍 LookupIndex: {} -> {} (index: {}, entries: {})",
1329 lookup_val,
1330 found,
1331 index_name,
1332 index.len()
1333 );
1334 found
1335 } else {
1336 Value::Null
1337 }
1338 };
1339
1340 let final_result = if result.is_null() {
1343 tracing::debug!(
1344 "🔍 LookupIndex failed for {}, attempting PDA reverse lookup fallback",
1345 lookup_val
1346 );
1347 if let Some(pda_str) = lookup_val.as_str() {
1348 let state = self
1349 .states
1350 .get_mut(&actual_state_id)
1351 .ok_or("State table not found")?;
1352
1353 if let Some(pda_lookup) =
1354 state.pda_reverse_lookups.get_mut("default_pda_lookup")
1355 {
1356 if let Some(resolved) = pda_lookup.lookup(pda_str) {
1357 tracing::info!(
1358 "🔍 LookupIndex (PDA fallback): {} -> {} (via default_pda_lookup)",
1359 &pda_str[..pda_str.len().min(8)], resolved
1360 );
1361 Value::String(resolved)
1362 } else {
1363 tracing::debug!(
1364 "🔍 LookupIndex (PDA fallback): {} -> NOT FOUND in PDA reverse lookup",
1365 &pda_str[..pda_str.len().min(8)]
1366 );
1367 Value::Null
1368 }
1369 } else {
1370 tracing::debug!(
1371 "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist, no PDA lookup table)",
1372 lookup_val, index_name
1373 );
1374 Value::Null
1375 }
1376 } else {
1377 tracing::debug!(
1378 "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist)",
1379 lookup_val,
1380 index_name
1381 );
1382 Value::Null
1383 }
1384 } else {
1385 result
1386 };
1387
1388 self.registers[*dest] = final_result;
1389 pc += 1;
1390 }
1391 OpCode::SetFieldSum {
1392 object,
1393 path,
1394 value,
1395 } => {
1396 let was_updated = self.set_field_sum(*object, path, *value)?;
1397 if was_updated {
1398 dirty_fields.insert(path.to_string());
1399 }
1400 pc += 1;
1401 }
1402 OpCode::SetFieldIncrement { object, path } => {
1403 let was_updated = self.set_field_increment(*object, path)?;
1404 if was_updated {
1405 dirty_fields.insert(path.to_string());
1406 }
1407 pc += 1;
1408 }
1409 OpCode::SetFieldMin {
1410 object,
1411 path,
1412 value,
1413 } => {
1414 let was_updated = self.set_field_min(*object, path, *value)?;
1415 if was_updated {
1416 dirty_fields.insert(path.to_string());
1417 }
1418 pc += 1;
1419 }
1420 OpCode::AddToUniqueSet {
1421 state_id: _,
1422 set_name,
1423 value,
1424 count_object,
1425 count_path,
1426 } => {
1427 let value_to_add = self.registers[*value].clone();
1428
1429 let set_field_path = format!("__unique_set:{}", set_name);
1432
1433 let mut set: HashSet<Value> =
1435 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1436 if !existing.is_null() {
1437 serde_json::from_value(existing).unwrap_or_default()
1438 } else {
1439 HashSet::new()
1440 }
1441 } else {
1442 HashSet::new()
1443 };
1444
1445 let was_new = set.insert(value_to_add);
1447
1448 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1450 self.registers[100] = serde_json::to_value(set_as_vec)?;
1451 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1452
1453 if was_new {
1455 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1456 self.set_field_auto_vivify(*count_object, count_path, 100)?;
1457 dirty_fields.insert(count_path.to_string());
1458 }
1459
1460 pc += 1;
1461 }
1462 OpCode::ConditionalSetField {
1463 object,
1464 path,
1465 value,
1466 condition_field,
1467 condition_op,
1468 condition_value,
1469 } => {
1470 let field_value = self.load_field(&event_value, condition_field, None)?;
1472
1473 let condition_met =
1475 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1476
1477 if condition_met {
1478 let val = self.registers[*value].clone();
1479 tracing::trace!(
1480 "ConditionalSetField: condition met, setting {}={:?}",
1481 path,
1482 val
1483 );
1484 self.set_field_auto_vivify(*object, path, *value)?;
1485 dirty_fields.insert(path.to_string());
1486 } else {
1487 tracing::trace!(
1488 "ConditionalSetField: condition not met, skipping {}",
1489 path
1490 );
1491 }
1492 pc += 1;
1493 }
1494 OpCode::ConditionalIncrement {
1495 object,
1496 path,
1497 condition_field,
1498 condition_op,
1499 condition_value,
1500 } => {
1501 let field_value = self.load_field(&event_value, condition_field, None)?;
1503
1504 let condition_met =
1506 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1507
1508 if condition_met {
1509 tracing::trace!(
1510 "ConditionalIncrement: condition met, incrementing {}",
1511 path
1512 );
1513 let was_updated = self.set_field_increment(*object, path)?;
1514 if was_updated {
1515 dirty_fields.insert(path.to_string());
1516 }
1517 } else {
1518 tracing::trace!(
1519 "ConditionalIncrement: condition not met, skipping {}",
1520 path
1521 );
1522 }
1523 pc += 1;
1524 }
1525 OpCode::EvaluateComputedFields {
1526 state,
1527 computed_paths,
1528 } => {
1529 if let Some(evaluator) = entity_evaluator {
1531 let state_value = &mut self.registers[*state];
1532 match evaluator(state_value) {
1533 Ok(()) => {
1534 for path in computed_paths {
1536 dirty_fields.insert(path.clone());
1537 }
1538 }
1539 Err(_e) => {
1540 }
1542 }
1543 }
1544 pc += 1;
1545 }
1546 OpCode::UpdatePdaReverseLookup {
1547 state_id: _,
1548 lookup_name,
1549 pda_address,
1550 primary_key,
1551 } => {
1552 let actual_state_id = override_state_id;
1553 let state = self
1554 .states
1555 .get_mut(&actual_state_id)
1556 .ok_or("State table not found")?;
1557
1558 let pda_val = self.registers[*pda_address].clone();
1559 let pk_val = self.registers[*primary_key].clone();
1560
1561 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1563 let pda_lookup = state
1565 .pda_reverse_lookups
1566 .entry(lookup_name.clone())
1567 .or_insert_with(|| PdaReverseLookup::new(10000));
1568
1569 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1570
1571 tracing::debug!(
1572 "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {})",
1573 pda_str,
1574 pk_str,
1575 lookup_name
1576 );
1577 } else if !pk_val.is_null() {
1578 if let Some(pk_num) = pk_val.as_u64() {
1580 if let Some(pda_str) = pda_val.as_str() {
1581 let pda_lookup = state
1582 .pda_reverse_lookups
1583 .entry(lookup_name.clone())
1584 .or_insert_with(|| PdaReverseLookup::new(10000));
1585
1586 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1587
1588 tracing::debug!(
1589 "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {}, pk was u64)",
1590 pda_str,
1591 pk_num,
1592 lookup_name
1593 );
1594 }
1595 }
1596 }
1597
1598 pc += 1;
1599 }
1600 }
1601
1602 self.instructions_executed += 1;
1603 }
1604
1605 tracing::info!(
1606 "🏁 Handler completed: produced {} mutation(s)",
1607 output.len()
1608 );
1609 Ok(output)
1610 }
1611
1612 fn load_field(
1613 &self,
1614 event_value: &Value,
1615 path: &FieldPath,
1616 default: Option<&Value>,
1617 ) -> Result<Value> {
1618 if path.segments.iter().any(|s| s.is_empty()) {
1620 tracing::warn!(
1621 "load_field: path contains empty segment! path={:?}",
1622 path.segments
1623 );
1624 }
1625
1626 if path.segments.is_empty() {
1627 if let Some(obj) = event_value.as_object() {
1631 let filtered: serde_json::Map<String, Value> = obj
1632 .iter()
1633 .filter(|(k, _)| !k.starts_with("__"))
1634 .map(|(k, v)| (k.clone(), v.clone()))
1635 .collect();
1636 return Ok(Value::Object(filtered));
1637 }
1638 return Ok(event_value.clone());
1639 }
1640
1641 let mut current = event_value;
1642 for (i, segment) in path.segments.iter().enumerate() {
1643 current = match current.get(segment) {
1644 Some(v) => v,
1645 None => {
1646 if path.segments.len() == 2 && path.segments[0] == "accounts" {
1648 tracing::warn!(
1649 "load_field: could not find segment '{}' at index {} in path {:?}",
1650 segment,
1651 i,
1652 path.segments,
1653 );
1654 tracing::warn!(
1655 " event has top-level keys: {:?}",
1656 event_value
1657 .as_object()
1658 .map(|o| o.keys().collect::<Vec<_>>())
1659 );
1660 if let Some(accounts) = event_value.get("accounts") {
1662 tracing::warn!(
1663 " 'accounts' exists with keys: {:?}",
1664 accounts.as_object().map(|o| o.keys().collect::<Vec<_>>())
1665 );
1666 } else {
1667 tracing::warn!(" 'accounts' key does NOT exist in event_value");
1668 }
1669 }
1670 return Ok(default.cloned().unwrap_or(Value::Null));
1671 }
1672 };
1673 }
1674
1675 Ok(current.clone())
1676 }
1677
1678 fn set_field_auto_vivify(
1679 &mut self,
1680 object_reg: Register,
1681 path: &str,
1682 value_reg: Register,
1683 ) -> Result<()> {
1684 let compiled = self.get_compiled_path(path);
1685 let segments = compiled.segments();
1686 let value = self.registers[value_reg].clone();
1687
1688 if !self.registers[object_reg].is_object() {
1689 self.registers[object_reg] = json!({});
1690 }
1691
1692 let obj = self.registers[object_reg]
1693 .as_object_mut()
1694 .ok_or("Not an object")?;
1695
1696 let mut current = obj;
1697 for (i, segment) in segments.iter().enumerate() {
1698 if i == segments.len() - 1 {
1699 current.insert(segment.to_string(), value.clone());
1700 } else {
1701 current
1702 .entry(segment.to_string())
1703 .or_insert_with(|| json!({}));
1704 current = current
1705 .get_mut(segment)
1706 .and_then(|v| v.as_object_mut())
1707 .ok_or("Path collision: expected object")?;
1708 }
1709 }
1710
1711 Ok(())
1712 }
1713
1714 fn set_field_if_null(
1715 &mut self,
1716 object_reg: Register,
1717 path: &str,
1718 value_reg: Register,
1719 ) -> Result<bool> {
1720 let compiled = self.get_compiled_path(path);
1721 let segments = compiled.segments();
1722 let value = self.registers[value_reg].clone();
1723
1724 if !self.registers[object_reg].is_object() {
1725 self.registers[object_reg] = json!({});
1726 }
1727
1728 let obj = self.registers[object_reg]
1729 .as_object_mut()
1730 .ok_or("Not an object")?;
1731
1732 let mut current = obj;
1733 let mut was_set = false;
1734 for (i, segment) in segments.iter().enumerate() {
1735 if i == segments.len() - 1 {
1736 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1737 current.insert(segment.to_string(), value.clone());
1738 was_set = true;
1739 }
1740 } else {
1741 current
1742 .entry(segment.to_string())
1743 .or_insert_with(|| json!({}));
1744 current = current
1745 .get_mut(segment)
1746 .and_then(|v| v.as_object_mut())
1747 .ok_or("Path collision: expected object")?;
1748 }
1749 }
1750
1751 Ok(was_set)
1752 }
1753
1754 fn set_field_max(
1755 &mut self,
1756 object_reg: Register,
1757 path: &str,
1758 value_reg: Register,
1759 ) -> Result<bool> {
1760 let compiled = self.get_compiled_path(path);
1761 let segments = compiled.segments();
1762 let new_value = self.registers[value_reg].clone();
1763
1764 if !self.registers[object_reg].is_object() {
1765 self.registers[object_reg] = json!({});
1766 }
1767
1768 let obj = self.registers[object_reg]
1769 .as_object_mut()
1770 .ok_or("Not an object")?;
1771
1772 let mut current = obj;
1773 let mut was_updated = false;
1774 for (i, segment) in segments.iter().enumerate() {
1775 if i == segments.len() - 1 {
1776 let should_update = if let Some(current_value) = current.get(segment) {
1777 if current_value.is_null() {
1778 true
1779 } else {
1780 match (current_value.as_i64(), new_value.as_i64()) {
1781 (Some(current_val), Some(new_val)) => new_val > current_val,
1782 (Some(current_val), None) if new_value.as_u64().is_some() => {
1783 new_value.as_u64().unwrap() as i64 > current_val
1784 }
1785 (None, Some(new_val)) if current_value.as_u64().is_some() => {
1786 new_val > current_value.as_u64().unwrap() as i64
1787 }
1788 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1789 (Some(current_val), Some(new_val)) => new_val > current_val,
1790 _ => match (current_value.as_f64(), new_value.as_f64()) {
1791 (Some(current_val), Some(new_val)) => new_val > current_val,
1792 _ => false,
1793 },
1794 },
1795 _ => false,
1796 }
1797 }
1798 } else {
1799 true
1800 };
1801
1802 if should_update {
1803 current.insert(segment.to_string(), new_value.clone());
1804 was_updated = true;
1805 }
1806 } else {
1807 current
1808 .entry(segment.to_string())
1809 .or_insert_with(|| json!({}));
1810 current = current
1811 .get_mut(segment)
1812 .and_then(|v| v.as_object_mut())
1813 .ok_or("Path collision: expected object")?;
1814 }
1815 }
1816
1817 Ok(was_updated)
1818 }
1819
1820 fn set_field_sum(
1821 &mut self,
1822 object_reg: Register,
1823 path: &str,
1824 value_reg: Register,
1825 ) -> Result<bool> {
1826 let compiled = self.get_compiled_path(path);
1827 let segments = compiled.segments();
1828 let new_value = self.registers[value_reg].clone();
1829
1830 if !self.registers[object_reg].is_object() {
1831 self.registers[object_reg] = json!({});
1832 }
1833
1834 let obj = self.registers[object_reg]
1835 .as_object_mut()
1836 .ok_or("Not an object")?;
1837
1838 let mut current = obj;
1839 for (i, segment) in segments.iter().enumerate() {
1840 if i == segments.len() - 1 {
1841 let current_val = current
1843 .get(segment)
1844 .and_then(|v| {
1845 if v.is_null() {
1846 None
1847 } else {
1848 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1849 }
1850 })
1851 .unwrap_or(0);
1852
1853 let new_val_num = new_value
1855 .as_i64()
1856 .or_else(|| new_value.as_u64().map(|n| n as i64))
1857 .ok_or("Sum requires numeric value")?;
1858
1859 let sum = current_val + new_val_num;
1860 current.insert(segment.to_string(), json!(sum));
1861 return Ok(true);
1862 } else {
1863 current
1864 .entry(segment.to_string())
1865 .or_insert_with(|| json!({}));
1866 current = current
1867 .get_mut(segment)
1868 .and_then(|v| v.as_object_mut())
1869 .ok_or("Path collision: expected object")?;
1870 }
1871 }
1872
1873 Ok(false)
1874 }
1875
1876 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
1877 let compiled = self.get_compiled_path(path);
1878 let segments = compiled.segments();
1879
1880 if !self.registers[object_reg].is_object() {
1881 self.registers[object_reg] = json!({});
1882 }
1883
1884 let obj = self.registers[object_reg]
1885 .as_object_mut()
1886 .ok_or("Not an object")?;
1887
1888 let mut current = obj;
1889 for (i, segment) in segments.iter().enumerate() {
1890 if i == segments.len() - 1 {
1891 let current_val = current
1893 .get(segment)
1894 .and_then(|v| {
1895 if v.is_null() {
1896 None
1897 } else {
1898 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1899 }
1900 })
1901 .unwrap_or(0);
1902
1903 let incremented = current_val + 1;
1904 current.insert(segment.to_string(), json!(incremented));
1905 return Ok(true);
1906 } else {
1907 current
1908 .entry(segment.to_string())
1909 .or_insert_with(|| json!({}));
1910 current = current
1911 .get_mut(segment)
1912 .and_then(|v| v.as_object_mut())
1913 .ok_or("Path collision: expected object")?;
1914 }
1915 }
1916
1917 Ok(false)
1918 }
1919
1920 fn set_field_min(
1921 &mut self,
1922 object_reg: Register,
1923 path: &str,
1924 value_reg: Register,
1925 ) -> Result<bool> {
1926 let compiled = self.get_compiled_path(path);
1927 let segments = compiled.segments();
1928 let new_value = self.registers[value_reg].clone();
1929
1930 if !self.registers[object_reg].is_object() {
1931 self.registers[object_reg] = json!({});
1932 }
1933
1934 let obj = self.registers[object_reg]
1935 .as_object_mut()
1936 .ok_or("Not an object")?;
1937
1938 let mut current = obj;
1939 let mut was_updated = false;
1940 for (i, segment) in segments.iter().enumerate() {
1941 if i == segments.len() - 1 {
1942 let should_update = if let Some(current_value) = current.get(segment) {
1943 if current_value.is_null() {
1944 true
1945 } else {
1946 match (current_value.as_i64(), new_value.as_i64()) {
1947 (Some(current_val), Some(new_val)) => new_val < current_val,
1948 (Some(current_val), None) if new_value.as_u64().is_some() => {
1949 (new_value.as_u64().unwrap() as i64) < current_val
1950 }
1951 (None, Some(new_val)) if current_value.as_u64().is_some() => {
1952 new_val < current_value.as_u64().unwrap() as i64
1953 }
1954 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1955 (Some(current_val), Some(new_val)) => new_val < current_val,
1956 _ => match (current_value.as_f64(), new_value.as_f64()) {
1957 (Some(current_val), Some(new_val)) => new_val < current_val,
1958 _ => false,
1959 },
1960 },
1961 _ => false,
1962 }
1963 }
1964 } else {
1965 true
1966 };
1967
1968 if should_update {
1969 current.insert(segment.to_string(), new_value.clone());
1970 was_updated = true;
1971 }
1972 } else {
1973 current
1974 .entry(segment.to_string())
1975 .or_insert_with(|| json!({}));
1976 current = current
1977 .get_mut(segment)
1978 .and_then(|v| v.as_object_mut())
1979 .ok_or("Path collision: expected object")?;
1980 }
1981 }
1982
1983 Ok(was_updated)
1984 }
1985
1986 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
1987 let compiled = self.get_compiled_path(path);
1988 let segments = compiled.segments();
1989 let mut current = &self.registers[object_reg];
1990
1991 for segment in segments {
1992 current = current
1993 .get(segment)
1994 .ok_or_else(|| format!("Field not found: {}", segment))?;
1995 }
1996
1997 Ok(current.clone())
1998 }
1999
2000 fn append_to_array(
2001 &mut self,
2002 object_reg: Register,
2003 path: &str,
2004 value_reg: Register,
2005 max_length: usize,
2006 ) -> Result<()> {
2007 let compiled = self.get_compiled_path(path);
2008 let segments = compiled.segments();
2009 let value = self.registers[value_reg].clone();
2010
2011 if !self.registers[object_reg].is_object() {
2012 self.registers[object_reg] = json!({});
2013 }
2014
2015 let obj = self.registers[object_reg]
2016 .as_object_mut()
2017 .ok_or("Not an object")?;
2018
2019 let mut current = obj;
2020 for (i, segment) in segments.iter().enumerate() {
2021 if i == segments.len() - 1 {
2022 current
2023 .entry(segment.to_string())
2024 .or_insert_with(|| json!([]));
2025 let arr = current
2026 .get_mut(segment)
2027 .and_then(|v| v.as_array_mut())
2028 .ok_or("Path is not an array")?;
2029 arr.push(value.clone());
2030
2031 if arr.len() > max_length {
2032 let excess = arr.len() - max_length;
2033 arr.drain(0..excess);
2034 }
2035 } else {
2036 current
2037 .entry(segment.to_string())
2038 .or_insert_with(|| json!({}));
2039 current = current
2040 .get_mut(segment)
2041 .and_then(|v| v.as_object_mut())
2042 .ok_or("Path collision: expected object")?;
2043 }
2044 }
2045
2046 Ok(())
2047 }
2048
2049 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
2050 let value = &self.registers[reg];
2051 let transformed = self.apply_transformation(value, transformation)?;
2052 self.registers[reg] = transformed;
2053 Ok(())
2054 }
2055
2056 fn apply_transformation(
2057 &self,
2058 value: &Value,
2059 transformation: &Transformation,
2060 ) -> Result<Value> {
2061 match transformation {
2062 Transformation::HexEncode => {
2063 if let Some(arr) = value.as_array() {
2064 let bytes: Vec<u8> = arr
2065 .iter()
2066 .filter_map(|v| v.as_u64().map(|n| n as u8))
2067 .collect();
2068 let hex = hex::encode(&bytes);
2069 Ok(json!(hex))
2070 } else {
2071 Err("HexEncode requires an array of numbers".into())
2072 }
2073 }
2074 Transformation::HexDecode => {
2075 if let Some(s) = value.as_str() {
2076 let s = s.strip_prefix("0x").unwrap_or(s);
2077 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
2078 Ok(json!(bytes))
2079 } else {
2080 Err("HexDecode requires a string".into())
2081 }
2082 }
2083 Transformation::Base58Encode => {
2084 if let Some(arr) = value.as_array() {
2085 let bytes: Vec<u8> = arr
2086 .iter()
2087 .filter_map(|v| v.as_u64().map(|n| n as u8))
2088 .collect();
2089 let encoded = bs58::encode(&bytes).into_string();
2090 Ok(json!(encoded))
2091 } else if value.is_string() {
2092 tracing::debug!(
2094 "Base58Encode: value is already a string, passing through: {:?}",
2095 value
2096 );
2097 Ok(value.clone())
2098 } else {
2099 tracing::error!(
2100 "Base58Encode failed: value type is {:?}, value: {:?}",
2101 if value.is_null() {
2102 "null"
2103 } else if value.is_boolean() {
2104 "boolean"
2105 } else if value.is_number() {
2106 "number"
2107 } else if value.is_object() {
2108 "object"
2109 } else {
2110 "unknown"
2111 },
2112 value
2113 );
2114 Err("Base58Encode requires an array of numbers".into())
2115 }
2116 }
2117 Transformation::Base58Decode => {
2118 if let Some(s) = value.as_str() {
2119 let bytes = bs58::decode(s)
2120 .into_vec()
2121 .map_err(|e| format!("Base58 decode error: {}", e))?;
2122 Ok(json!(bytes))
2123 } else {
2124 Err("Base58Decode requires a string".into())
2125 }
2126 }
2127 Transformation::ToString => Ok(json!(value.to_string())),
2128 Transformation::ToNumber => {
2129 if let Some(s) = value.as_str() {
2130 let n = s
2131 .parse::<i64>()
2132 .map_err(|e| format!("Parse error: {}", e))?;
2133 Ok(json!(n))
2134 } else {
2135 Ok(value.clone())
2136 }
2137 }
2138 }
2139 }
2140
2141 fn evaluate_comparison(
2142 &self,
2143 field_value: &Value,
2144 op: &ComparisonOp,
2145 condition_value: &Value,
2146 ) -> Result<bool> {
2147 use ComparisonOp::*;
2148
2149 match op {
2150 Equal => Ok(field_value == condition_value),
2151 NotEqual => Ok(field_value != condition_value),
2152 GreaterThan => {
2153 match (field_value.as_i64(), condition_value.as_i64()) {
2155 (Some(a), Some(b)) => Ok(a > b),
2156 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2157 (Some(a), Some(b)) => Ok(a > b),
2158 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2159 (Some(a), Some(b)) => Ok(a > b),
2160 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
2161 },
2162 },
2163 }
2164 }
2165 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2166 (Some(a), Some(b)) => Ok(a >= b),
2167 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2168 (Some(a), Some(b)) => Ok(a >= b),
2169 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2170 (Some(a), Some(b)) => Ok(a >= b),
2171 _ => {
2172 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
2173 }
2174 },
2175 },
2176 },
2177 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
2178 (Some(a), Some(b)) => Ok(a < b),
2179 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2180 (Some(a), Some(b)) => Ok(a < b),
2181 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2182 (Some(a), Some(b)) => Ok(a < b),
2183 _ => Err("Cannot compare non-numeric values with LessThan".into()),
2184 },
2185 },
2186 },
2187 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
2188 (Some(a), Some(b)) => Ok(a <= b),
2189 _ => match (field_value.as_u64(), condition_value.as_u64()) {
2190 (Some(a), Some(b)) => Ok(a <= b),
2191 _ => match (field_value.as_f64(), condition_value.as_f64()) {
2192 (Some(a), Some(b)) => Ok(a <= b),
2193 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
2194 },
2195 },
2196 },
2197 }
2198 }
2199
2200 #[cfg_attr(feature = "otel", instrument(
2214 name = "vm.update_pda_lookup",
2215 skip(self),
2216 fields(
2217 pda = %pda_address,
2218 seed = %seed_value,
2219 )
2220 ))]
2221 pub fn update_pda_reverse_lookup(
2222 &mut self,
2223 state_id: u32,
2224 lookup_name: &str,
2225 pda_address: String,
2226 seed_value: String,
2227 ) -> Result<Vec<PendingAccountUpdate>> {
2228 let state = self
2229 .states
2230 .get_mut(&state_id)
2231 .ok_or("State table not found")?;
2232
2233 let lookup = state
2234 .pda_reverse_lookups
2235 .entry(lookup_name.to_string())
2236 .or_insert_with(|| PdaReverseLookup::new(10000));
2237
2238 tracing::info!(
2239 "📝 Registering PDA reverse lookup: {} -> {} (lookup: {})",
2240 pda_address,
2241 seed_value,
2242 lookup_name
2243 );
2244
2245 let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
2247
2248 if let Some(ref evicted) = evicted_pda {
2250 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
2251 let count = evicted_updates.len();
2252 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2253 tracing::info!(
2254 "Cleaned up {} pending updates for evicted PDA {} from LRU cache",
2255 count,
2256 evicted
2257 );
2258 }
2259 }
2260
2261 self.flush_pending_updates(state_id, &pda_address)
2263 }
2264
2265 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
2270 let state = match self.states.get_mut(&state_id) {
2271 Some(s) => s,
2272 None => return 0,
2273 };
2274
2275 let now = std::time::SystemTime::now()
2276 .duration_since(std::time::UNIX_EPOCH)
2277 .unwrap()
2278 .as_secs() as i64;
2279
2280 let mut removed_count = 0;
2281
2282 state.pending_updates.retain(|_pda_address, updates| {
2284 let original_len = updates.len();
2285
2286 updates.retain(|update| {
2287 let age = now - update.queued_at;
2288 age <= PENDING_UPDATE_TTL_SECONDS
2289 });
2290
2291 removed_count += original_len - updates.len();
2292
2293 !updates.is_empty()
2295 });
2296
2297 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2299
2300 if removed_count > 0 {
2301 tracing::info!(
2302 "Cleaned up {} expired pending updates (TTL: {}s)",
2303 removed_count,
2304 PENDING_UPDATE_TTL_SECONDS
2305 );
2306 }
2307
2308 removed_count
2309 }
2310
2311 #[cfg_attr(feature = "otel", instrument(
2346 name = "vm.queue_account_update",
2347 skip(self, update),
2348 fields(
2349 pda = %update.pda_address,
2350 account_type = %update.account_type,
2351 slot = update.slot,
2352 )
2353 ))]
2354 pub fn queue_account_update(
2355 &mut self,
2356 state_id: u32,
2357 update: QueuedAccountUpdate,
2358 ) -> Result<()> {
2359 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2360 tracing::warn!(
2361 "Pending queue size limit reached ({}), cleaning up expired updates",
2362 MAX_PENDING_UPDATES_TOTAL
2363 );
2364 let removed = self.cleanup_expired_pending_updates(state_id);
2365
2366 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2367 tracing::warn!(
2368 "Still at limit after cleanup (removed {}), will drop oldest update",
2369 removed
2370 );
2371 self.drop_oldest_pending_update(state_id)?;
2372 }
2373 }
2374
2375 let state = self
2376 .states
2377 .get_mut(&state_id)
2378 .ok_or("State table not found")?;
2379
2380 tracing::debug!(
2381 "📋 Queued account update for PDA {} (type: {}, slot: {})",
2382 update.pda_address,
2383 update.account_type,
2384 update.slot
2385 );
2386 let pending = PendingAccountUpdate {
2387 account_type: update.account_type,
2388 pda_address: update.pda_address.clone(),
2389 account_data: update.account_data,
2390 slot: update.slot,
2391 write_version: update.write_version,
2392 signature: update.signature,
2393 queued_at: std::time::SystemTime::now()
2394 .duration_since(std::time::UNIX_EPOCH)
2395 .unwrap()
2396 .as_secs() as i64,
2397 };
2398
2399 let pda_address = pending.pda_address.clone();
2400 let slot = pending.slot;
2401
2402 let mut updates = state
2403 .pending_updates
2404 .entry(pda_address.clone())
2405 .or_insert_with(Vec::new);
2406
2407 let original_len = updates.len();
2408 updates.retain(|existing| existing.slot > slot);
2409 let removed_by_dedup = original_len - updates.len();
2410
2411 if removed_by_dedup > 0 {
2412 self.pending_queue_size = self
2413 .pending_queue_size
2414 .saturating_sub(removed_by_dedup as u64);
2415 tracing::debug!(
2416 "Deduplicated {} older update(s) for PDA {} (new slot: {})",
2417 removed_by_dedup,
2418 pda_address,
2419 slot
2420 );
2421 }
2422
2423 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2424 tracing::warn!(
2425 "Per-PDA limit reached for {} ({} updates), dropping oldest",
2426 pda_address,
2427 updates.len()
2428 );
2429 updates.remove(0);
2430 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2431 }
2432
2433 updates.push(pending);
2434
2435 Ok(())
2436 }
2437
2438 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2440 let state = self.states.get(&state_id)?;
2441
2442 let now = std::time::SystemTime::now()
2443 .duration_since(std::time::UNIX_EPOCH)
2444 .unwrap()
2445 .as_secs() as i64;
2446
2447 let mut total_updates = 0;
2448 let mut oldest_timestamp = now;
2449 let mut largest_pda_queue = 0;
2450 let mut estimated_memory = 0;
2451
2452 for entry in state.pending_updates.iter() {
2453 let (_, updates) = entry.pair();
2454 total_updates += updates.len();
2455 largest_pda_queue = largest_pda_queue.max(updates.len());
2456
2457 for update in updates.iter() {
2458 oldest_timestamp = oldest_timestamp.min(update.queued_at);
2459 estimated_memory += update.account_type.len() +
2461 update.pda_address.len() +
2462 update.signature.len() +
2463 16 + estimate_json_size(&update.account_data);
2465 }
2466 }
2467
2468 Some(PendingQueueStats {
2469 total_updates,
2470 unique_pdas: state.pending_updates.len(),
2471 oldest_age_seconds: now - oldest_timestamp,
2472 largest_pda_queue_size: largest_pda_queue,
2473 estimated_memory_bytes: estimated_memory,
2474 })
2475 }
2476
2477 pub fn get_memory_stats(&self, state_id: u32) -> VmMemoryStats {
2478 let mut stats = VmMemoryStats {
2479 path_cache_size: self.path_cache.len(),
2480 ..Default::default()
2481 };
2482
2483 if let Some(state) = self.states.get(&state_id) {
2484 stats.state_table_entity_count = state.data.len();
2485 stats.state_table_max_entries = state.config.max_entries;
2486 stats.state_table_at_capacity = state.is_at_capacity();
2487
2488 stats.lookup_index_count = state.lookup_indexes.len();
2489 stats.lookup_index_total_entries =
2490 state.lookup_indexes.values().map(|idx| idx.len()).sum();
2491
2492 stats.temporal_index_count = state.temporal_indexes.len();
2493 stats.temporal_index_total_entries = state
2494 .temporal_indexes
2495 .values()
2496 .map(|idx| idx.total_entries())
2497 .sum();
2498
2499 stats.pda_reverse_lookup_count = state.pda_reverse_lookups.len();
2500 stats.pda_reverse_lookup_total_entries = state
2501 .pda_reverse_lookups
2502 .values()
2503 .map(|lookup| lookup.len())
2504 .sum();
2505
2506 stats.pending_queue_stats = self.get_pending_queue_stats(state_id);
2507 }
2508
2509 stats
2510 }
2511
2512 pub fn cleanup_all_expired(&mut self, state_id: u32) -> CleanupResult {
2513 let pending_removed = self.cleanup_expired_pending_updates(state_id);
2514 let temporal_removed = self.cleanup_temporal_indexes(state_id);
2515
2516 CleanupResult {
2517 pending_updates_removed: pending_removed,
2518 temporal_entries_removed: temporal_removed,
2519 }
2520 }
2521
2522 fn cleanup_temporal_indexes(&mut self, state_id: u32) -> usize {
2523 let state = match self.states.get_mut(&state_id) {
2524 Some(s) => s,
2525 None => return 0,
2526 };
2527
2528 let now = std::time::SystemTime::now()
2529 .duration_since(std::time::UNIX_EPOCH)
2530 .unwrap()
2531 .as_secs() as i64;
2532
2533 let cutoff = now - TEMPORAL_HISTORY_TTL_SECONDS;
2534 let mut total_removed = 0;
2535
2536 for (_, index) in state.temporal_indexes.iter_mut() {
2537 for mut entry in index.index.iter_mut() {
2538 let original_len = entry.value().len();
2539 entry.value_mut().retain(|(_, ts)| *ts >= cutoff);
2540 total_removed += original_len - entry.value().len();
2541 }
2542 index.index.retain(|_, entries| !entries.is_empty());
2543 }
2544
2545 if total_removed > 0 {
2546 tracing::info!(
2547 "Cleaned up {} expired temporal index entries (TTL: {}s)",
2548 total_removed,
2549 TEMPORAL_HISTORY_TTL_SECONDS
2550 );
2551 }
2552
2553 total_removed
2554 }
2555
2556 pub fn check_state_table_capacity(&self, state_id: u32) -> Option<CapacityWarning> {
2557 let state = self.states.get(&state_id)?;
2558
2559 if state.is_at_capacity() {
2560 Some(CapacityWarning {
2561 current_entries: state.data.len(),
2562 max_entries: state.config.max_entries,
2563 entries_over_limit: state.entries_over_limit(),
2564 })
2565 } else {
2566 None
2567 }
2568 }
2569
2570 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2572 let state = self
2573 .states
2574 .get_mut(&state_id)
2575 .ok_or("State table not found")?;
2576
2577 let mut oldest_pda: Option<String> = None;
2578 let mut oldest_timestamp = i64::MAX;
2579
2580 for entry in state.pending_updates.iter() {
2582 let (pda, updates) = entry.pair();
2583 if let Some(update) = updates.first() {
2584 if update.queued_at < oldest_timestamp {
2585 oldest_timestamp = update.queued_at;
2586 oldest_pda = Some(pda.clone());
2587 }
2588 }
2589 }
2590
2591 if let Some(pda) = oldest_pda {
2593 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2594 if !updates.is_empty() {
2595 updates.remove(0);
2596 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2597
2598 if updates.is_empty() {
2600 drop(updates);
2601 state.pending_updates.remove(&pda);
2602 }
2603 }
2604 }
2605 }
2606
2607 Ok(())
2608 }
2609
2610 fn flush_pending_updates(
2615 &mut self,
2616 state_id: u32,
2617 pda_address: &str,
2618 ) -> Result<Vec<PendingAccountUpdate>> {
2619 let state = self
2620 .states
2621 .get_mut(&state_id)
2622 .ok_or("State table not found")?;
2623
2624 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2625 let count = pending_updates.len();
2626 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2627 tracing::info!(
2628 "🔄 Flushed {} pending account update(s) for PDA {}",
2629 count,
2630 pda_address
2631 );
2632 Ok(pending_updates)
2633 } else {
2634 Ok(Vec::new())
2635 }
2636 }
2637
2638 pub fn try_pda_reverse_lookup(
2640 &mut self,
2641 state_id: u32,
2642 lookup_name: &str,
2643 pda_address: &str,
2644 ) -> Option<String> {
2645 let state = self.states.get_mut(&state_id)?;
2646
2647 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2648 if let Some(value) = lookup.lookup(pda_address) {
2649 self.pda_cache_hits += 1;
2650 tracing::debug!(
2651 "✓ PDA reverse lookup cache hit: {} -> {}",
2652 pda_address,
2653 value
2654 );
2655 return Some(value);
2656 }
2657 }
2658
2659 self.pda_cache_misses += 1;
2660 tracing::debug!("✗ PDA reverse lookup cache miss: {}", pda_address);
2661 None
2662 }
2663
2664 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2671 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2672 }
2673
2674 fn evaluate_computed_expr_with_env(
2676 &self,
2677 expr: &ComputedExpr,
2678 state: &Value,
2679 env: &std::collections::HashMap<String, Value>,
2680 ) -> Result<Value> {
2681 match expr {
2682 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2683
2684 ComputedExpr::Var { name } => env
2685 .get(name)
2686 .cloned()
2687 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2688
2689 ComputedExpr::Let { name, value, body } => {
2690 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2691 let mut new_env = env.clone();
2692 new_env.insert(name.clone(), val);
2693 self.evaluate_computed_expr_with_env(body, state, &new_env)
2694 }
2695
2696 ComputedExpr::If {
2697 condition,
2698 then_branch,
2699 else_branch,
2700 } => {
2701 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2702 if self.value_to_bool(&cond_val) {
2703 self.evaluate_computed_expr_with_env(then_branch, state, env)
2704 } else {
2705 self.evaluate_computed_expr_with_env(else_branch, state, env)
2706 }
2707 }
2708
2709 ComputedExpr::None => Ok(Value::Null),
2710
2711 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2712
2713 ComputedExpr::Slice { expr, start, end } => {
2714 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2715 match val {
2716 Value::Array(arr) => {
2717 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2718 Ok(Value::Array(slice))
2719 }
2720 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2721 }
2722 }
2723
2724 ComputedExpr::Index { expr, index } => {
2725 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2726 match val {
2727 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2728 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2729 }
2730 }
2731
2732 ComputedExpr::U64FromLeBytes { bytes } => {
2733 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2734 let byte_vec = self.value_to_bytes(&val)?;
2735 if byte_vec.len() < 8 {
2736 return Err(format!(
2737 "u64::from_le_bytes requires 8 bytes, got {}",
2738 byte_vec.len()
2739 )
2740 .into());
2741 }
2742 let arr: [u8; 8] = byte_vec[..8]
2743 .try_into()
2744 .map_err(|_| "Failed to convert to [u8; 8]")?;
2745 Ok(json!(u64::from_le_bytes(arr)))
2746 }
2747
2748 ComputedExpr::U64FromBeBytes { bytes } => {
2749 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2750 let byte_vec = self.value_to_bytes(&val)?;
2751 if byte_vec.len() < 8 {
2752 return Err(format!(
2753 "u64::from_be_bytes requires 8 bytes, got {}",
2754 byte_vec.len()
2755 )
2756 .into());
2757 }
2758 let arr: [u8; 8] = byte_vec[..8]
2759 .try_into()
2760 .map_err(|_| "Failed to convert to [u8; 8]")?;
2761 Ok(json!(u64::from_be_bytes(arr)))
2762 }
2763
2764 ComputedExpr::ByteArray { bytes } => {
2765 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2766 }
2767
2768 ComputedExpr::Closure { param, body } => {
2769 Ok(json!({
2772 "__closure": {
2773 "param": param,
2774 "body": serde_json::to_value(body).unwrap_or(Value::Null)
2775 }
2776 }))
2777 }
2778
2779 ComputedExpr::Unary { op, expr } => {
2780 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2781 self.apply_unary_op(op, &val)
2782 }
2783
2784 ComputedExpr::JsonToBytes { expr } => {
2785 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2786 let bytes = self.value_to_bytes(&val)?;
2788 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2789 }
2790
2791 ComputedExpr::UnwrapOr { expr, default } => {
2792 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2793 if val.is_null() {
2794 Ok(default.clone())
2795 } else {
2796 Ok(val)
2797 }
2798 }
2799
2800 ComputedExpr::Binary { op, left, right } => {
2801 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
2802 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
2803 self.apply_binary_op(op, &l, &r)
2804 }
2805
2806 ComputedExpr::Cast { expr, to_type } => {
2807 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2808 self.apply_cast(&val, to_type)
2809 }
2810
2811 ComputedExpr::MethodCall { expr, method, args } => {
2812 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2813 if method == "map" && args.len() == 1 {
2815 if let ComputedExpr::Closure { param, body } = &args[0] {
2816 if val.is_null() {
2818 return Ok(Value::Null);
2819 }
2820 let mut closure_env = env.clone();
2822 closure_env.insert(param.clone(), val);
2823 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
2824 }
2825 }
2826 let evaluated_args: Vec<Value> = args
2827 .iter()
2828 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
2829 .collect::<Result<Vec<_>>>()?;
2830 self.apply_method_call(&val, method, &evaluated_args)
2831 }
2832
2833 ComputedExpr::Literal { value } => Ok(value.clone()),
2834
2835 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
2836 }
2837 }
2838
2839 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
2841 match val {
2842 Value::Array(arr) => arr
2843 .iter()
2844 .map(|v| {
2845 v.as_u64()
2846 .map(|n| n as u8)
2847 .ok_or_else(|| "Array element not a valid byte".into())
2848 })
2849 .collect(),
2850 Value::String(s) => {
2851 if s.starts_with("0x") || s.starts_with("0X") {
2853 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
2854 } else {
2855 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
2856 }
2857 }
2858 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
2859 }
2860 }
2861
2862 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
2864 use crate::ast::UnaryOp;
2865 match op {
2866 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
2867 UnaryOp::ReverseBits => match val.as_u64() {
2868 Some(n) => Ok(json!(n.reverse_bits())),
2869 None => match val.as_i64() {
2870 Some(n) => Ok(json!((n as u64).reverse_bits())),
2871 None => Err("reverse_bits requires an integer".into()),
2872 },
2873 },
2874 }
2875 }
2876
2877 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
2879 let segments: Vec<&str> = path.split('.').collect();
2880 let mut current = state;
2881
2882 for segment in segments {
2883 match current.get(segment) {
2884 Some(v) => current = v,
2885 None => return Ok(Value::Null),
2886 }
2887 }
2888
2889 Ok(current.clone())
2890 }
2891
2892 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
2894 match op {
2895 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
2897 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
2898 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
2899 BinaryOp::Div => {
2900 if let Some(r) = right.as_i64() {
2902 if r == 0 {
2903 return Err("Division by zero".into());
2904 }
2905 }
2906 if let Some(r) = right.as_f64() {
2907 if r == 0.0 {
2908 return Err("Division by zero".into());
2909 }
2910 }
2911 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
2912 }
2913 BinaryOp::Mod => {
2914 match (left.as_i64(), right.as_i64()) {
2916 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2917 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
2918 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2919 _ => Err("Modulo requires non-zero integer operands".into()),
2920 },
2921 _ => Err("Modulo by zero".into()),
2922 }
2923 }
2924
2925 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
2927 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
2928 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
2929 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
2930 BinaryOp::Eq => Ok(json!(left == right)),
2931 BinaryOp::Ne => Ok(json!(left != right)),
2932
2933 BinaryOp::And => {
2935 let l_bool = self.value_to_bool(left);
2936 let r_bool = self.value_to_bool(right);
2937 Ok(json!(l_bool && r_bool))
2938 }
2939 BinaryOp::Or => {
2940 let l_bool = self.value_to_bool(left);
2941 let r_bool = self.value_to_bool(right);
2942 Ok(json!(l_bool || r_bool))
2943 }
2944
2945 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
2947 (Some(a), Some(b)) => Ok(json!(a ^ b)),
2948 _ => match (left.as_i64(), right.as_i64()) {
2949 (Some(a), Some(b)) => Ok(json!(a ^ b)),
2950 _ => Err("XOR requires integer operands".into()),
2951 },
2952 },
2953 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
2954 (Some(a), Some(b)) => Ok(json!(a & b)),
2955 _ => match (left.as_i64(), right.as_i64()) {
2956 (Some(a), Some(b)) => Ok(json!(a & b)),
2957 _ => Err("BitAnd requires integer operands".into()),
2958 },
2959 },
2960 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
2961 (Some(a), Some(b)) => Ok(json!(a | b)),
2962 _ => match (left.as_i64(), right.as_i64()) {
2963 (Some(a), Some(b)) => Ok(json!(a | b)),
2964 _ => Err("BitOr requires integer operands".into()),
2965 },
2966 },
2967 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
2968 (Some(a), Some(b)) => Ok(json!(a << b)),
2969 _ => match (left.as_i64(), right.as_i64()) {
2970 (Some(a), Some(b)) => Ok(json!(a << b)),
2971 _ => Err("Shl requires integer operands".into()),
2972 },
2973 },
2974 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
2975 (Some(a), Some(b)) => Ok(json!(a >> b)),
2976 _ => match (left.as_i64(), right.as_i64()) {
2977 (Some(a), Some(b)) => Ok(json!(a >> b)),
2978 _ => Err("Shr requires integer operands".into()),
2979 },
2980 },
2981 }
2982 }
2983
2984 fn numeric_op<F1, F2>(
2986 &self,
2987 left: &Value,
2988 right: &Value,
2989 int_op: F1,
2990 float_op: F2,
2991 ) -> Result<Value>
2992 where
2993 F1: Fn(i64, i64) -> i64,
2994 F2: Fn(f64, f64) -> f64,
2995 {
2996 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
2998 return Ok(json!(int_op(a, b)));
2999 }
3000
3001 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3003 return Ok(json!(int_op(a as i64, b as i64)));
3005 }
3006
3007 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3009 return Ok(json!(float_op(a, b)));
3010 }
3011
3012 if left.is_null() || right.is_null() {
3014 return Ok(Value::Null);
3015 }
3016
3017 Err(format!(
3018 "Cannot perform numeric operation on {:?} and {:?}",
3019 left, right
3020 )
3021 .into())
3022 }
3023
3024 fn comparison_op<F1, F2>(
3026 &self,
3027 left: &Value,
3028 right: &Value,
3029 int_cmp: F1,
3030 float_cmp: F2,
3031 ) -> Result<Value>
3032 where
3033 F1: Fn(i64, i64) -> bool,
3034 F2: Fn(f64, f64) -> bool,
3035 {
3036 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
3038 return Ok(json!(int_cmp(a, b)));
3039 }
3040
3041 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
3043 return Ok(json!(int_cmp(a as i64, b as i64)));
3044 }
3045
3046 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
3048 return Ok(json!(float_cmp(a, b)));
3049 }
3050
3051 if left.is_null() || right.is_null() {
3053 return Ok(json!(false));
3054 }
3055
3056 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
3057 }
3058
3059 fn value_to_bool(&self, value: &Value) -> bool {
3061 match value {
3062 Value::Null => false,
3063 Value::Bool(b) => *b,
3064 Value::Number(n) => {
3065 if let Some(i) = n.as_i64() {
3066 i != 0
3067 } else if let Some(f) = n.as_f64() {
3068 f != 0.0
3069 } else {
3070 true
3071 }
3072 }
3073 Value::String(s) => !s.is_empty(),
3074 Value::Array(arr) => !arr.is_empty(),
3075 Value::Object(obj) => !obj.is_empty(),
3076 }
3077 }
3078
3079 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
3081 match to_type {
3082 "i8" | "i16" | "i32" | "i64" | "isize" => {
3083 if let Some(n) = value.as_i64() {
3084 Ok(json!(n))
3085 } else if let Some(n) = value.as_u64() {
3086 Ok(json!(n as i64))
3087 } else if let Some(n) = value.as_f64() {
3088 Ok(json!(n as i64))
3089 } else if let Some(s) = value.as_str() {
3090 s.parse::<i64>()
3091 .map(|n| json!(n))
3092 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
3093 } else {
3094 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3095 }
3096 }
3097 "u8" | "u16" | "u32" | "u64" | "usize" => {
3098 if let Some(n) = value.as_u64() {
3099 Ok(json!(n))
3100 } else if let Some(n) = value.as_i64() {
3101 Ok(json!(n as u64))
3102 } else if let Some(n) = value.as_f64() {
3103 Ok(json!(n as u64))
3104 } else if let Some(s) = value.as_str() {
3105 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
3106 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
3107 })
3108 } else {
3109 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3110 }
3111 }
3112 "f32" | "f64" => {
3113 if let Some(n) = value.as_f64() {
3114 Ok(json!(n))
3115 } else if let Some(n) = value.as_i64() {
3116 Ok(json!(n as f64))
3117 } else if let Some(n) = value.as_u64() {
3118 Ok(json!(n as f64))
3119 } else if let Some(s) = value.as_str() {
3120 s.parse::<f64>()
3121 .map(|n| json!(n))
3122 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
3123 } else {
3124 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
3125 }
3126 }
3127 "String" | "string" => Ok(json!(value.to_string())),
3128 "bool" => Ok(json!(self.value_to_bool(value))),
3129 _ => {
3130 Ok(value.clone())
3132 }
3133 }
3134 }
3135
3136 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
3138 match method {
3139 "unwrap_or" => {
3140 if value.is_null() && !args.is_empty() {
3141 Ok(args[0].clone())
3142 } else {
3143 Ok(value.clone())
3144 }
3145 }
3146 "unwrap_or_default" => {
3147 if value.is_null() {
3148 Ok(json!(0))
3150 } else {
3151 Ok(value.clone())
3152 }
3153 }
3154 "is_some" => Ok(json!(!value.is_null())),
3155 "is_none" => Ok(json!(value.is_null())),
3156 "abs" => {
3157 if let Some(n) = value.as_i64() {
3158 Ok(json!(n.abs()))
3159 } else if let Some(n) = value.as_f64() {
3160 Ok(json!(n.abs()))
3161 } else {
3162 Err(format!("Cannot call abs() on {:?}", value).into())
3163 }
3164 }
3165 "len" => {
3166 if let Some(s) = value.as_str() {
3167 Ok(json!(s.len()))
3168 } else if let Some(arr) = value.as_array() {
3169 Ok(json!(arr.len()))
3170 } else if let Some(obj) = value.as_object() {
3171 Ok(json!(obj.len()))
3172 } else {
3173 Err(format!("Cannot call len() on {:?}", value).into())
3174 }
3175 }
3176 "to_string" => Ok(json!(value.to_string())),
3177 "min" => {
3178 if args.is_empty() {
3179 return Err("min() requires an argument".into());
3180 }
3181 let other = &args[0];
3182 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3183 Ok(json!(a.min(b)))
3184 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3185 Ok(json!(a.min(b)))
3186 } else {
3187 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
3188 }
3189 }
3190 "max" => {
3191 if args.is_empty() {
3192 return Err("max() requires an argument".into());
3193 }
3194 let other = &args[0];
3195 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3196 Ok(json!(a.max(b)))
3197 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
3198 Ok(json!(a.max(b)))
3199 } else {
3200 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
3201 }
3202 }
3203 "saturating_add" => {
3204 if args.is_empty() {
3205 return Err("saturating_add() requires an argument".into());
3206 }
3207 let other = &args[0];
3208 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3209 Ok(json!(a.saturating_add(b)))
3210 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3211 Ok(json!(a.saturating_add(b)))
3212 } else {
3213 Err(format!(
3214 "Cannot call saturating_add() on {:?} and {:?}",
3215 value, other
3216 )
3217 .into())
3218 }
3219 }
3220 "saturating_sub" => {
3221 if args.is_empty() {
3222 return Err("saturating_sub() requires an argument".into());
3223 }
3224 let other = &args[0];
3225 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
3226 Ok(json!(a.saturating_sub(b)))
3227 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
3228 Ok(json!(a.saturating_sub(b)))
3229 } else {
3230 Err(format!(
3231 "Cannot call saturating_sub() on {:?} and {:?}",
3232 value, other
3233 )
3234 .into())
3235 }
3236 }
3237 _ => {
3238 tracing::warn!("Unknown method call: {}() on {:?}", method, value);
3240 Ok(value.clone())
3241 }
3242 }
3243 }
3244
3245 pub fn evaluate_computed_fields_from_ast(
3248 &self,
3249 state: &mut Value,
3250 computed_field_specs: &[ComputedFieldSpec],
3251 ) -> Result<Vec<String>> {
3252 let mut updated_paths = Vec::new();
3253
3254 for spec in computed_field_specs {
3255 match self.evaluate_computed_expr(&spec.expression, state) {
3256 Ok(result) => {
3257 self.set_field_in_state(state, &spec.target_path, result)?;
3259 updated_paths.push(spec.target_path.clone());
3260 }
3261 Err(e) => {
3262 tracing::warn!(
3264 "Failed to evaluate computed field '{}': {}",
3265 spec.target_path,
3266 e
3267 );
3268 }
3269 }
3270 }
3271
3272 Ok(updated_paths)
3273 }
3274
3275 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
3277 let segments: Vec<&str> = path.split('.').collect();
3278
3279 if segments.is_empty() {
3280 return Err("Empty path".into());
3281 }
3282
3283 let mut current = state;
3285 for (i, segment) in segments.iter().enumerate() {
3286 if i == segments.len() - 1 {
3287 if let Some(obj) = current.as_object_mut() {
3289 obj.insert(segment.to_string(), value);
3290 return Ok(());
3291 } else {
3292 return Err(format!("Cannot set field '{}' on non-object", segment).into());
3293 }
3294 } else {
3295 if !current.is_object() {
3297 *current = json!({});
3298 }
3299 let obj = current.as_object_mut().unwrap();
3300 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
3301 }
3302 }
3303
3304 Ok(())
3305 }
3306
3307 pub fn create_evaluator_from_specs(
3310 specs: Vec<ComputedFieldSpec>,
3311 ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
3312 move |state: &mut Value| {
3313 let vm = VmContext::new();
3316 vm.evaluate_computed_fields_from_ast(state, &specs)?;
3317 Ok(())
3318 }
3319 }
3320}
3321
3322impl Default for VmContext {
3323 fn default() -> Self {
3324 Self::new()
3325 }
3326}
3327
3328impl crate::resolvers::ReverseLookupUpdater for VmContext {
3330 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
3331 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
3333 .unwrap_or_else(|e| {
3334 tracing::error!("Failed to update PDA reverse lookup: {}", e);
3335 Vec::new()
3336 })
3337 }
3338
3339 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
3340 self.flush_pending_updates(0, pda_address)
3342 .unwrap_or_else(|e| {
3343 tracing::error!("Failed to flush pending updates: {}", e);
3344 Vec::new()
3345 })
3346 }
3347}
3348
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};

    /// Builds `<path>.unwrap_or(0)` as a boxed expression.
    fn field_or_zero(path: &str) -> Box<ComputedExpr> {
        Box::new(ComputedExpr::UnwrapOr {
            expr: Box::new(ComputedExpr::FieldRef {
                path: path.to_string(),
            }),
            default: serde_json::json!(0),
        })
    }

    /// Asserts that `value` serializes without a decimal point.
    fn assert_no_decimal_point(value: &Value, message: &str) {
        let serialized = serde_json::to_string(value).unwrap();
        assert!(!serialized.contains('.'), "{}: {}", message, serialized);
    }

    /// Summing two large integer fields through a computed field must yield an
    /// integer, not a float.
    #[test]
    fn test_computed_field_preserves_integer_type() {
        let vm = VmContext::new();

        let mut state = serde_json::json!({
            "trading": {
                "total_buy_volume": 20000000000_i64,
                "total_sell_volume": 17951316474_i64
            }
        });

        let spec = ComputedFieldSpec {
            target_path: "trading.total_volume".to_string(),
            result_type: "Option<u64>".to_string(),
            expression: ComputedExpr::Binary {
                op: BinaryOp::Add,
                left: field_or_zero("trading.total_buy_volume"),
                right: field_or_zero("trading.total_sell_volume"),
            },
        };

        vm.evaluate_computed_fields_from_ast(&mut state, &[spec])
            .unwrap();

        let total_volume = state
            .pointer("/trading/total_volume")
            .expect("total_volume should exist");

        assert_no_decimal_point(total_volume, "Integer should not have decimal point");
        assert_eq!(
            total_volume.as_i64(),
            Some(37951316474),
            "Value should be correct sum"
        );
    }

    /// Values written through `set_field_sum` must stay integers as well.
    #[test]
    fn test_set_field_sum_preserves_integer_type() {
        let mut vm = VmContext::new();
        vm.registers[0] = serde_json::json!({});
        vm.registers[1] = serde_json::json!(20000000000_i64);
        vm.registers[2] = serde_json::json!(17951316474_i64);

        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();

        let state = &vm.registers[0];
        let buy_vol = state.pointer("/trading/total_buy_volume").unwrap();
        let sell_vol = state.pointer("/trading/total_sell_volume").unwrap();

        assert_no_decimal_point(buy_vol, "Buy volume should not have decimal");
        assert_no_decimal_point(sell_vol, "Sell volume should not have decimal");
    }
}