1use crate::ast::{
2 BinaryOp, ComparisonOp, ComputedExpr, ComputedFieldSpec, FieldPath, Transformation,
3};
4use crate::compiler::{MultiEntityBytecode, OpCode};
5use crate::Mutation;
6use dashmap::DashMap;
7use lru::LruCache;
8use serde_json::{json, Value};
9use std::collections::{HashMap, HashSet};
10use std::num::NonZeroUsize;
11
12#[cfg(feature = "otel")]
13use tracing::instrument;
14
15#[derive(Debug, Clone, Default)]
18pub struct UpdateContext {
19 pub slot: Option<u64>,
21 pub signature: Option<String>,
23 pub timestamp: Option<i64>,
26 pub metadata: HashMap<String, Value>,
28}
29
30impl UpdateContext {
31 pub fn new(slot: u64, signature: String) -> Self {
33 Self {
34 slot: Some(slot),
35 signature: Some(signature),
36 timestamp: None,
37 metadata: HashMap::new(),
38 }
39 }
40
41 pub fn with_timestamp(slot: u64, signature: String, timestamp: i64) -> Self {
43 Self {
44 slot: Some(slot),
45 signature: Some(signature),
46 timestamp: Some(timestamp),
47 metadata: HashMap::new(),
48 }
49 }
50
51 pub fn timestamp(&self) -> i64 {
53 self.timestamp.unwrap_or_else(|| {
54 std::time::SystemTime::now()
55 .duration_since(std::time::UNIX_EPOCH)
56 .unwrap()
57 .as_secs() as i64
58 })
59 }
60
61 pub fn empty() -> Self {
63 Self::default()
64 }
65
66 pub fn with_metadata(mut self, key: String, value: Value) -> Self {
68 self.metadata.insert(key, value);
69 self
70 }
71
72 pub fn get_metadata(&self, key: &str) -> Option<&Value> {
74 self.metadata.get(key)
75 }
76
77 pub fn to_value(&self) -> Value {
79 let mut obj = serde_json::Map::new();
80 if let Some(slot) = self.slot {
81 obj.insert("slot".to_string(), json!(slot));
82 }
83 if let Some(ref sig) = self.signature {
84 obj.insert("signature".to_string(), json!(sig));
85 }
86 obj.insert("timestamp".to_string(), json!(self.timestamp()));
88 for (key, value) in &self.metadata {
89 obj.insert(key.clone(), value.clone());
90 }
91 Value::Object(obj)
92 }
93}
94
/// Index of a VM register.
pub type Register = usize;
/// Crate-local result type whose error is any boxed [`std::error::Error`].
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Contents of a VM register: an arbitrary JSON value.
pub type RegisterValue = Value;
99
/// Hook for evaluating computed fields on an entity state.
///
/// NOTE(review): no implementor or caller is visible in this part of the file;
/// presumably implementations write derived values into `state` in place.
pub trait ComputedFieldsEvaluator {
    /// Evaluates against `state`, which the implementation may mutate.
    fn evaluate(&self, state: &mut Value) -> Result<()>;
}
105
106const MAX_PENDING_UPDATES_TOTAL: usize = 10_000;
108const MAX_PENDING_UPDATES_PER_PDA: usize = 10;
109const PENDING_UPDATE_TTL_SECONDS: i64 = 300; fn estimate_json_size(value: &Value) -> usize {
113 match value {
114 Value::Null => 4,
115 Value::Bool(_) => 5,
116 Value::Number(_) => 8,
117 Value::String(s) => s.len() + 2,
118 Value::Array(arr) => 2 + arr.iter().map(|v| estimate_json_size(v) + 1).sum::<usize>(),
119 Value::Object(obj) => {
120 2 + obj
121 .iter()
122 .map(|(k, v)| k.len() + 3 + estimate_json_size(v) + 1)
123 .sum::<usize>()
124 }
125 }
126}
127
/// A dotted field path pre-split into its segments.
///
/// The segments live behind an `Arc`, so cloning a `CompiledPath` shares the
/// same allocation instead of copying the strings.
#[derive(Debug, Clone)]
pub struct CompiledPath {
    /// Path segments in order, one per `.`-separated component.
    pub segments: std::sync::Arc<[String]>,
}

impl CompiledPath {
    /// Splits `path` on `.` and stores the pieces.
    ///
    /// An empty `path` yields a single empty segment, and consecutive dots
    /// yield empty segments between them.
    pub fn new(path: &str) -> Self {
        Self {
            segments: path.split('.').map(String::from).collect(),
        }
    }

    /// Borrows the segments as a slice.
    fn segments(&self) -> &[String] {
        &self.segments[..]
    }
}
145
/// Execution state of the bytecode VM: registers, state tables, caches and
/// counters.
pub struct VmContext {
    /// Register file; `VmContext::new` allocates 256 slots initialised to null.
    registers: Vec<RegisterValue>,
    /// State tables keyed by state id; table `0` is created by `VmContext::new`.
    states: HashMap<u32, StateTable>,
    /// Number of opcodes executed so far (incremented once per opcode).
    pub instructions_executed: u64,
    /// Number of hits in `path_cache`.
    pub cache_hits: u64,
    /// Memoised [`CompiledPath`] values keyed by the original dotted path.
    path_cache: HashMap<String, CompiledPath>,
    /// NOTE(review): not updated in the visible part of the file; presumably
    /// counts successful PDA reverse lookups.
    pub pda_cache_hits: u64,
    /// NOTE(review): not updated in the visible part of the file; presumably
    /// counts failed PDA reverse lookups.
    pub pda_cache_misses: u64,
    /// NOTE(review): not updated in the visible part of the file; presumably
    /// the current number of queued pending updates.
    pub pending_queue_size: u64,
    /// Context of the event currently being processed, if one was supplied.
    current_context: Option<UpdateContext>,
}
158
159#[derive(Debug)]
160pub struct LookupIndex {
161 index: DashMap<Value, Value>,
162}
163
164impl Default for LookupIndex {
165 fn default() -> Self {
166 Self::new()
167 }
168}
169
170impl LookupIndex {
171 pub fn new() -> Self {
172 LookupIndex {
173 index: DashMap::new(),
174 }
175 }
176
177 pub fn lookup(&self, lookup_value: &Value) -> Option<Value> {
178 self.index.get(lookup_value).map(|v| v.clone())
179 }
180
181 pub fn insert(&self, lookup_value: Value, primary_key: Value) {
182 self.index.insert(lookup_value, primary_key);
183 }
184
185 pub fn len(&self) -> usize {
186 self.index.len()
187 }
188
189 pub fn is_empty(&self) -> bool {
190 self.index.is_empty()
191 }
192}
193
194#[derive(Debug)]
195pub struct TemporalIndex {
196 index: DashMap<Value, Vec<(Value, i64)>>,
197}
198
199impl Default for TemporalIndex {
200 fn default() -> Self {
201 Self::new()
202 }
203}
204
205impl TemporalIndex {
206 pub fn new() -> Self {
207 TemporalIndex {
208 index: DashMap::new(),
209 }
210 }
211
212 pub fn lookup(&self, lookup_value: &Value, timestamp: i64) -> Option<Value> {
213 if let Some(entries) = self.index.get(lookup_value) {
214 let entries_vec = entries.value();
215 for i in (0..entries_vec.len()).rev() {
216 if entries_vec[i].1 <= timestamp {
217 return Some(entries_vec[i].0.clone());
218 }
219 }
220 }
221 None
222 }
223
224 pub fn lookup_latest(&self, lookup_value: &Value) -> Option<Value> {
225 if let Some(entries) = self.index.get(lookup_value) {
226 let entries_vec = entries.value();
227 if let Some(last) = entries_vec.last() {
228 return Some(last.0.clone());
229 }
230 }
231 None
232 }
233
234 pub fn insert(&self, lookup_value: Value, primary_key: Value, timestamp: i64) {
235 let lookup_key = lookup_value.clone();
236 self.index
237 .entry(lookup_value)
238 .or_default()
239 .push((primary_key, timestamp));
240
241 if let Some(mut entries) = self.index.get_mut(&lookup_key) {
242 entries.sort_by_key(|(_, ts)| *ts);
243 }
244 }
245}
246
247#[derive(Debug)]
248pub struct PdaReverseLookup {
249 index: LruCache<String, String>,
251}
252
253impl PdaReverseLookup {
254 pub fn new(capacity: usize) -> Self {
255 PdaReverseLookup {
256 index: LruCache::new(NonZeroUsize::new(capacity).unwrap()),
257 }
258 }
259
260 pub fn lookup(&mut self, pda_address: &str) -> Option<String> {
261 self.index.get(pda_address).cloned()
262 }
263
264 pub fn insert(&mut self, pda_address: String, seed_value: String) -> Option<String> {
265 let evicted = if self.index.len() >= self.index.cap().get() {
267 self.index.peek_lru().map(|(k, _)| k.clone())
269 } else {
270 None
271 };
272
273 self.index.put(pda_address, seed_value);
274 evicted
275 }
276}
277
/// An account update that has been queued for later processing.
///
/// NOTE(review): the queueing and draining code is not visible in this part of
/// the file; the field descriptions below are inferred from names and types and
/// should be confirmed.
#[derive(Debug, Clone)]
pub struct PendingAccountUpdate {
    /// Account type name of the update.
    pub account_type: String,
    /// PDA address the update refers to.
    pub pda_address: String,
    /// Account payload as JSON.
    pub account_data: Value,
    /// Slot of the update.
    pub slot: u64,
    /// Signature of the update.
    pub signature: String,
    /// Time the update was queued; presumably Unix seconds — TODO confirm.
    pub queued_at: i64,
}
287
/// Summary statistics about the pending-update queue.
///
/// NOTE(review): the code that fills this in is not visible in this part of
/// the file; the field descriptions are inferred from the names.
#[derive(Debug, Clone)]
pub struct PendingQueueStats {
    /// Total number of queued updates.
    pub total_updates: usize,
    /// Number of distinct PDAs with at least one queued update.
    pub unique_pdas: usize,
    /// Age in seconds of the oldest queued update.
    pub oldest_age_seconds: i64,
    /// Length of the longest per-PDA queue.
    pub largest_pda_queue_size: usize,
    /// Estimated memory used by the queue, in bytes.
    pub estimated_memory_bytes: usize,
}
296
/// All storage belonging to one state id: entity data plus its auxiliary
/// indexes.
#[derive(Debug)]
pub struct StateTable {
    /// Entity state keyed by primary key.
    pub data: DashMap<Value, Value>,
    /// Named lookup indexes (lookup value -> primary key).
    pub lookup_indexes: HashMap<String, LookupIndex>,
    /// Named temporal indexes (lookup value -> timestamped primary keys).
    pub temporal_indexes: HashMap<String, TemporalIndex>,
    /// Named PDA reverse lookups (PDA address -> seed value).
    pub pda_reverse_lookups: HashMap<String, PdaReverseLookup>,
    /// Queued account updates; presumably keyed by PDA address — TODO confirm
    /// against the code that fills this map.
    pub pending_updates: DashMap<String, Vec<PendingAccountUpdate>>,
}
305
306impl VmContext {
307 pub fn new() -> Self {
308 let mut vm = VmContext {
309 registers: vec![Value::Null; 256],
310 states: HashMap::new(),
311 instructions_executed: 0,
312 cache_hits: 0,
313 path_cache: HashMap::new(),
314 pda_cache_hits: 0,
315 pda_cache_misses: 0,
316 pending_queue_size: 0,
317 current_context: None,
318 };
319 vm.states.insert(
320 0,
321 StateTable {
322 data: DashMap::new(),
323 lookup_indexes: HashMap::new(),
324 temporal_indexes: HashMap::new(),
325 pda_reverse_lookups: HashMap::new(),
326 pending_updates: DashMap::new(),
327 },
328 );
329 vm
330 }
331
    /// Returns mutable access to the state table registered under `state_id`,
    /// or `None` if no such table exists.
    pub fn get_state_table_mut(&mut self, state_id: u32) -> Option<&mut StateTable> {
        self.states.get_mut(&state_id)
    }
337
    /// Returns mutable access to the register file.
    pub fn registers_mut(&mut self) -> &mut Vec<RegisterValue> {
        &mut self.registers
    }
342
    /// Returns the cache of compiled paths, keyed by the original dotted path.
    pub fn path_cache(&self) -> &HashMap<String, CompiledPath> {
        &self.path_cache
    }
347
    /// Returns the context of the event currently being processed, if one was
    /// supplied to `process_event_with_context`.
    pub fn current_context(&self) -> Option<&UpdateContext> {
        self.current_context.as_ref()
    }
352
353 pub fn update_state_from_register(
356 &mut self,
357 state_id: u32,
358 key: Value,
359 register: Register,
360 ) -> Result<()> {
361 let state = self.states.get(&state_id).ok_or("State table not found")?;
362 let value = self.registers[register].clone();
363 state.data.insert(key, value);
364 Ok(())
365 }
366
367 fn reset_registers(&mut self) {
368 for reg in &mut self.registers {
369 *reg = Value::Null;
370 }
371 }
372
373 pub fn extract_partial_state(
375 &self,
376 state_reg: Register,
377 dirty_fields: &HashSet<String>,
378 ) -> Result<Value> {
379 let full_state = &self.registers[state_reg];
380
381 if dirty_fields.is_empty() {
382 return Ok(json!({}));
383 }
384
385 let mut partial = serde_json::Map::new();
386
387 for path in dirty_fields {
388 let segments: Vec<&str> = path.split('.').collect();
389
390 let mut current = full_state;
391 let mut found = true;
392
393 for segment in &segments {
394 match current.get(segment) {
395 Some(v) => current = v,
396 None => {
397 found = false;
398 break;
399 }
400 }
401 }
402
403 if !found {
404 continue;
405 }
406
407 let mut target = &mut partial;
408 for (i, segment) in segments.iter().enumerate() {
409 if i == segments.len() - 1 {
410 target.insert(segment.to_string(), current.clone());
411 } else {
412 target
413 .entry(segment.to_string())
414 .or_insert_with(|| json!({}));
415 target = target
416 .get_mut(*segment)
417 .and_then(|v| v.as_object_mut())
418 .ok_or("Failed to build nested structure")?;
419 }
420 }
421 }
422
423 Ok(Value::Object(partial))
424 }
425
426 fn get_compiled_path(&mut self, path: &str) -> CompiledPath {
427 if let Some(compiled) = self.path_cache.get(path) {
428 self.cache_hits += 1;
429 return compiled.clone();
430 }
431 let compiled = CompiledPath::new(path);
432 self.path_cache.insert(path.to_string(), compiled.clone());
433 compiled
434 }
435
436 #[cfg_attr(feature = "otel", instrument(
438 name = "vm.process_event",
439 skip(self, bytecode, event_value, context),
440 fields(
441 event_type = %event_type,
442 slot = context.as_ref().and_then(|c| c.slot),
443 )
444 ))]
445 pub fn process_event_with_context(
446 &mut self,
447 bytecode: &MultiEntityBytecode,
448 mut event_value: Value,
449 event_type: &str,
450 context: Option<&UpdateContext>,
451 ) -> Result<Vec<Mutation>> {
452 self.current_context = context.cloned();
454
455 if let Some(ctx) = context {
457 if let Some(obj) = event_value.as_object_mut() {
458 obj.insert("__update_context".to_string(), ctx.to_value());
459 }
460 }
461
462 let mut all_mutations = Vec::new();
463
464 if let Some(entity_names) = bytecode.event_routing.get(event_type) {
465 tracing::debug!(
466 "🔀 Event type '{}' routes to {} entity/entities",
467 event_type,
468 entity_names.len()
469 );
470
471 for entity_name in entity_names {
472 if let Some(entity_bytecode) = bytecode.entities.get(entity_name) {
473 if let Some(handler) = entity_bytecode.handlers.get(event_type) {
474 tracing::debug!(
475 " ▶️ Executing handler for entity '{}', event '{}'",
476 entity_name,
477 event_type
478 );
479 tracing::debug!(" Handler has {} opcodes", handler.len());
480
481 let mutations = self.execute_handler(
482 handler,
483 event_value.clone(),
484 event_type,
485 entity_bytecode.state_id,
486 entity_bytecode.computed_fields_evaluator.as_ref(),
487 )?;
488
489 tracing::debug!(" Handler produced {} mutation(s)", mutations.len());
490 all_mutations.extend(mutations);
491 } else {
492 tracing::debug!(
493 " ⊘ No handler found for entity '{}', event '{}'",
494 entity_name,
495 event_type
496 );
497 }
498 } else {
499 tracing::debug!(" ⊘ Entity '{}' not found in bytecode", entity_name);
500 }
501 }
502 } else {
503 tracing::debug!("⊘ No event routing found for event type '{}'", event_type);
504 }
505
506 Ok(all_mutations)
507 }
508
    /// Processes an event without an update context.
    ///
    /// Equivalent to calling `process_event_with_context` with `None`.
    pub fn process_event(
        &mut self,
        bytecode: &MultiEntityBytecode,
        event_value: Value,
        event_type: &str,
    ) -> Result<Vec<Mutation>> {
        self.process_event_with_context(bytecode, event_value, event_type, None)
    }
518
    /// Decodes a protobuf `Any` message through the bytecode's proto router
    /// into an event value and event type, then processes it like any other
    /// event (without an update context).
    ///
    /// # Errors
    ///
    /// Returns an error if decoding fails or if event processing fails.
    pub fn process_any(
        &mut self,
        bytecode: &MultiEntityBytecode,
        any: prost_types::Any,
    ) -> Result<Vec<Mutation>> {
        let (event_value, event_type) = bytecode.proto_router.decode(any)?;
        self.process_event(bytecode, event_value, &event_type)
    }
527
528 #[cfg_attr(feature = "otel", instrument(
529 name = "vm.execute_handler",
530 skip(self, handler, event_value, entity_evaluator),
531 level = "debug",
532 fields(
533 event_type = %event_type,
534 handler_opcodes = handler.len(),
535 )
536 ))]
537 #[allow(clippy::type_complexity)]
538 fn execute_handler(
539 &mut self,
540 handler: &[OpCode],
541 event_value: Value,
542 event_type: &str,
543 override_state_id: u32,
544 entity_evaluator: Option<&Box<dyn Fn(&mut Value) -> Result<()> + Send + Sync>>,
545 ) -> Result<Vec<Mutation>> {
546 self.reset_registers();
547
548 tracing::trace!(
549 "Executing handler: event_type={}, handler_opcodes={}, event_value={:?}",
550 event_type,
551 handler.len(),
552 event_value
553 );
554
555 let mut pc: usize = 0;
556 let mut output = Vec::new();
557 let mut dirty_fields: HashSet<String> = HashSet::new();
558
559 while pc < handler.len() {
560 tracing::debug!(
561 "Executing opcode {}/{}: {:?}",
562 pc + 1,
563 handler.len(),
564 &handler[pc]
565 );
566 match &handler[pc] {
567 OpCode::LoadEventField {
568 path,
569 dest,
570 default,
571 } => {
572 let value = self.load_field(&event_value, path, default.as_ref())?;
573 tracing::trace!(
574 "LoadEventField: path={:?}, dest={}, value={:?}",
575 path.segments,
576 dest,
577 value
578 );
579 if value.is_null() && path.segments.len() >= 2 && path.segments[0] == "accounts"
581 {
582 tracing::warn!(
583 "⚠️ LoadEventField returned NULL for accounts path: {:?} -> dest={}",
584 path.segments,
585 dest
586 );
587 }
588 self.registers[*dest] = value;
589 pc += 1;
590 }
591 OpCode::LoadConstant { value, dest } => {
592 self.registers[*dest] = value.clone();
593 pc += 1;
594 }
595 OpCode::CopyRegister { source, dest } => {
596 let value = self.registers[*source].clone();
597 tracing::trace!(
598 "CopyRegister: source={}, dest={}, value={:?}",
599 source,
600 dest,
601 if value.is_null() {
602 "NULL".to_string()
603 } else {
604 format!("{}", value)
605 }
606 );
607 self.registers[*dest] = value;
608 pc += 1;
609 }
610 OpCode::CopyRegisterIfNull { source, dest } => {
611 if self.registers[*dest].is_null() {
612 self.registers[*dest] = self.registers[*source].clone();
613 tracing::trace!(
614 "CopyRegisterIfNull: copied from reg {} to reg {} (value={:?})",
615 source,
616 dest,
617 self.registers[*source]
618 );
619 } else {
620 tracing::trace!(
621 "CopyRegisterIfNull: dest reg {} not null, skipping copy",
622 dest
623 );
624 }
625 pc += 1;
626 }
627 OpCode::GetEventType { dest } => {
628 self.registers[*dest] = json!(event_type);
629 pc += 1;
630 }
631 OpCode::CreateObject { dest } => {
632 self.registers[*dest] = json!({});
633 pc += 1;
634 }
635 OpCode::SetField {
636 object,
637 path,
638 value,
639 } => {
640 let val = self.registers[*value].clone();
641 tracing::trace!("SetField: path={}, value={:?}", path, val);
642
643 self.set_field_auto_vivify(*object, path, *value)?;
644 dirty_fields.insert(path.to_string());
645 pc += 1;
646 }
647 OpCode::SetFields { object, fields } => {
648 for (path, value_reg) in fields {
649 self.set_field_auto_vivify(*object, path, *value_reg)?;
650 dirty_fields.insert(path.to_string());
651 }
652 pc += 1;
653 }
654 OpCode::GetField { object, path, dest } => {
655 let value = self.get_field(*object, path)?;
656 self.registers[*dest] = value;
657 pc += 1;
658 }
659 OpCode::ReadOrInitState {
660 state_id: _,
661 key,
662 default,
663 dest,
664 } => {
665 let actual_state_id = override_state_id;
666 self.states
667 .entry(actual_state_id)
668 .or_insert_with(|| StateTable {
669 data: DashMap::new(),
670 lookup_indexes: HashMap::new(),
671 temporal_indexes: HashMap::new(),
672 pda_reverse_lookups: HashMap::new(),
673 pending_updates: DashMap::new(),
674 });
675 let state = self
676 .states
677 .get(&actual_state_id)
678 .ok_or("State table not found")?;
679 let key_value = &self.registers[*key];
680 if key_value.is_null() {
681 if event_type.ends_with("State") && !event_type.ends_with("IxState") {
684 tracing::warn!(
685 "ReadOrInitState: key register {} is NULL for account state, event_type={}",
686 key,
687 event_type
688 );
689 } else {
690 tracing::debug!(
691 "ReadOrInitState: key register {} is NULL (expected for PDA registration hook), event_type={}",
692 key,
693 event_type
694 );
695 }
696 }
697 let value = state
698 .data
699 .get(key_value)
700 .map(|v| v.clone())
701 .unwrap_or_else(|| default.clone());
702
703 self.registers[*dest] = value;
704 pc += 1;
705 }
706 OpCode::UpdateState {
707 state_id: _,
708 key,
709 value,
710 } => {
711 let actual_state_id = override_state_id;
712 let state = self
713 .states
714 .get(&actual_state_id)
715 .ok_or("State table not found")?;
716 let key_value = self.registers[*key].clone();
717 let value_data = self.registers[*value].clone();
718
719 state.data.insert(key_value, value_data);
720 pc += 1;
721 }
722 OpCode::AppendToArray {
723 object,
724 path,
725 value,
726 } => {
727 tracing::trace!(
728 "AppendToArray: path={}, value={:?}",
729 path,
730 self.registers[*value]
731 );
732 self.append_to_array(*object, path, *value)?;
733 dirty_fields.insert(path.to_string());
734 pc += 1;
735 }
736 OpCode::GetCurrentTimestamp { dest } => {
737 let timestamp = std::time::SystemTime::now()
738 .duration_since(std::time::UNIX_EPOCH)
739 .unwrap()
740 .as_secs() as i64;
741 self.registers[*dest] = json!(timestamp);
742 pc += 1;
743 }
744 OpCode::CreateEvent { dest, event_value } => {
745 tracing::trace!(
746 "CreateEvent: event_value_reg={}, event_value={:?}",
747 event_value,
748 self.registers[*event_value]
749 );
750 let timestamp = std::time::SystemTime::now()
751 .duration_since(std::time::UNIX_EPOCH)
752 .unwrap()
753 .as_secs() as i64;
754
755 let mut event_data = self.registers[*event_value].clone();
757 if let Some(obj) = event_data.as_object_mut() {
758 obj.remove("__update_context");
759 }
760
761 let mut event = serde_json::Map::new();
763 event.insert("timestamp".to_string(), json!(timestamp));
764 event.insert("data".to_string(), event_data);
765
766 if let Some(ref ctx) = self.current_context {
768 if let Some(slot) = ctx.slot {
769 event.insert("slot".to_string(), json!(slot));
770 }
771 if let Some(ref signature) = ctx.signature {
772 event.insert("signature".to_string(), json!(signature));
773 }
774 }
775
776 self.registers[*dest] = Value::Object(event);
777 pc += 1;
778 }
779 OpCode::CreateCapture {
780 dest,
781 capture_value,
782 } => {
783 let timestamp = std::time::SystemTime::now()
784 .duration_since(std::time::UNIX_EPOCH)
785 .unwrap()
786 .as_secs() as i64;
787
788 let capture_data = self.registers[*capture_value].clone();
790
791 let account_address = event_value
793 .get("__account_address")
794 .and_then(|v| v.as_str())
795 .unwrap_or("")
796 .to_string();
797
798 let mut capture = serde_json::Map::new();
800 capture.insert("timestamp".to_string(), json!(timestamp));
801 capture.insert("account_address".to_string(), json!(account_address));
802 capture.insert("data".to_string(), capture_data);
803
804 if let Some(ref ctx) = self.current_context {
806 if let Some(slot) = ctx.slot {
807 capture.insert("slot".to_string(), json!(slot));
808 }
809 if let Some(ref signature) = ctx.signature {
810 capture.insert("signature".to_string(), json!(signature));
811 }
812 }
813
814 self.registers[*dest] = Value::Object(capture);
815 pc += 1;
816 }
817 OpCode::Transform {
818 source,
819 dest,
820 transformation,
821 } => {
822 if source == dest {
823 let result = self.transform_in_place(*source, transformation);
824 if let Err(ref e) = result {
825 tracing::error!(
826 "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
827 transformation, pc, e, source, &self.registers[*source]
828 );
829 }
830 result?;
831 } else {
832 let source_value = &self.registers[*source];
833 let result = self.apply_transformation(source_value, transformation);
834 if let Err(ref e) = result {
835 tracing::error!(
836 "Transform {:?} failed at pc={}: {} (source_reg={}, source_value={:?})",
837 transformation, pc, e, source, source_value
838 );
839 }
840 let value = result?;
841 self.registers[*dest] = value;
842 }
843 pc += 1;
844 }
845 OpCode::EmitMutation {
846 entity_name,
847 key,
848 state,
849 } => {
850 let primary_key = self.registers[*key].clone();
851
852 if primary_key.is_null() || dirty_fields.is_empty() {
853 tracing::warn!(
855 " ⊘ [VM] Skipping mutation for entity '{}': primary_key={}, dirty_fields.len()={}",
856 entity_name,
857 if primary_key.is_null() { "NULL" } else { "present" },
858 dirty_fields.len()
859 );
860 if dirty_fields.is_empty() {
861 tracing::warn!(" Reason: No fields were modified by this handler");
862 }
863 if primary_key.is_null() {
864 tracing::warn!(" Reason: Primary key could not be resolved (check __resolved_primary_key or key_resolution)");
865 tracing::warn!(" Debug: reg[15]={:?}, reg[17]={:?}, reg[19]={:?}, reg[20]={:?}",
867 self.registers.get(15).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
868 self.registers.get(17).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
869 self.registers.get(19).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) }),
870 self.registers.get(20).map(|v| if v.is_null() { "NULL".to_string() } else { format!("{:?}", v) })
871 );
872 }
873 } else {
874 let patch = self.extract_partial_state(*state, &dirty_fields)?;
875 tracing::debug!(
876 " Patch structure: {}",
877 serde_json::to_string_pretty(&patch).unwrap_or_default()
878 );
879 let mutation = Mutation {
880 export: entity_name.clone(),
881 key: primary_key,
882 patch,
883 };
884 output.push(mutation);
885 }
886 pc += 1;
887 }
888 OpCode::SetFieldIfNull {
889 object,
890 path,
891 value,
892 } => {
893 let val = self.registers[*value].clone();
894 let was_set = self.set_field_if_null(*object, path, *value)?;
895 tracing::trace!(
896 "SetFieldIfNull: path={}, value={:?}, was_set={}",
897 path,
898 val,
899 was_set
900 );
901 if was_set {
902 dirty_fields.insert(path.to_string());
903 }
904 pc += 1;
905 }
906 OpCode::SetFieldMax {
907 object,
908 path,
909 value,
910 } => {
911 let was_updated = self.set_field_max(*object, path, *value)?;
912 if was_updated {
913 dirty_fields.insert(path.to_string());
914 }
915 pc += 1;
916 }
917 OpCode::UpdateTemporalIndex {
918 state_id: _,
919 index_name,
920 lookup_value,
921 primary_key,
922 timestamp,
923 } => {
924 let actual_state_id = override_state_id;
925 let state = self
926 .states
927 .get_mut(&actual_state_id)
928 .ok_or("State table not found")?;
929 let index = state
930 .temporal_indexes
931 .entry(index_name.clone())
932 .or_insert_with(TemporalIndex::new);
933
934 let lookup_val = self.registers[*lookup_value].clone();
935 let pk_val = self.registers[*primary_key].clone();
936 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
937 val
938 } else if let Some(val) = self.registers[*timestamp].as_u64() {
939 val as i64
940 } else {
941 return Err(format!(
942 "Timestamp must be a number (i64 or u64), got: {:?}",
943 self.registers[*timestamp]
944 )
945 .into());
946 };
947
948 index.insert(lookup_val, pk_val, ts_val);
949 pc += 1;
950 }
951 OpCode::LookupTemporalIndex {
952 state_id: _,
953 index_name,
954 lookup_value,
955 timestamp,
956 dest,
957 } => {
958 let actual_state_id = override_state_id;
959 let state = self
960 .states
961 .get(&actual_state_id)
962 .ok_or("State table not found")?;
963 let lookup_val = &self.registers[*lookup_value];
964
965 let result = if self.registers[*timestamp].is_null() {
966 if let Some(index) = state.temporal_indexes.get(index_name) {
967 index.lookup_latest(lookup_val).unwrap_or(Value::Null)
968 } else {
969 Value::Null
970 }
971 } else {
972 let ts_val = if let Some(val) = self.registers[*timestamp].as_i64() {
973 val
974 } else if let Some(val) = self.registers[*timestamp].as_u64() {
975 val as i64
976 } else {
977 return Err(format!(
978 "Timestamp must be a number (i64 or u64), got: {:?}",
979 self.registers[*timestamp]
980 )
981 .into());
982 };
983
984 if let Some(index) = state.temporal_indexes.get(index_name) {
985 index.lookup(lookup_val, ts_val).unwrap_or(Value::Null)
986 } else {
987 Value::Null
988 }
989 };
990
991 self.registers[*dest] = result;
992 pc += 1;
993 }
994 OpCode::UpdateLookupIndex {
995 state_id: _,
996 index_name,
997 lookup_value,
998 primary_key,
999 } => {
1000 let actual_state_id = override_state_id;
1001 let state = self
1002 .states
1003 .get_mut(&actual_state_id)
1004 .ok_or("State table not found")?;
1005 let index = state
1006 .lookup_indexes
1007 .entry(index_name.clone())
1008 .or_insert_with(LookupIndex::new);
1009
1010 let lookup_val = self.registers[*lookup_value].clone();
1011 let pk_val = self.registers[*primary_key].clone();
1012
1013 tracing::debug!(
1014 "📝 UpdateLookupIndex: {} -> {} (index: {})",
1015 lookup_val,
1016 pk_val,
1017 index_name
1018 );
1019
1020 index.insert(lookup_val, pk_val);
1021 pc += 1;
1022 }
1023 OpCode::LookupIndex {
1024 state_id: _,
1025 index_name,
1026 lookup_value,
1027 dest,
1028 } => {
1029 let actual_state_id = override_state_id;
1030 let lookup_val = self.registers[*lookup_value].clone();
1031
1032 let result = {
1033 let state = self
1034 .states
1035 .get(&actual_state_id)
1036 .ok_or("State table not found")?;
1037
1038 if let Some(index) = state.lookup_indexes.get(index_name) {
1039 let found = index.lookup(&lookup_val).unwrap_or(Value::Null);
1040 tracing::debug!(
1041 "🔍 LookupIndex: {} -> {} (index: {}, entries: {})",
1042 lookup_val,
1043 found,
1044 index_name,
1045 index.len()
1046 );
1047 found
1048 } else {
1049 Value::Null
1050 }
1051 };
1052
1053 let final_result = if result.is_null() {
1056 tracing::debug!(
1057 "🔍 LookupIndex failed for {}, attempting PDA reverse lookup fallback",
1058 lookup_val
1059 );
1060 if let Some(pda_str) = lookup_val.as_str() {
1061 let state = self
1062 .states
1063 .get_mut(&actual_state_id)
1064 .ok_or("State table not found")?;
1065
1066 if let Some(pda_lookup) =
1067 state.pda_reverse_lookups.get_mut("default_pda_lookup")
1068 {
1069 if let Some(resolved) = pda_lookup.lookup(pda_str) {
1070 tracing::info!(
1071 "🔍 LookupIndex (PDA fallback): {} -> {} (via default_pda_lookup)",
1072 &pda_str[..pda_str.len().min(8)], resolved
1073 );
1074 Value::String(resolved)
1075 } else {
1076 tracing::debug!(
1077 "🔍 LookupIndex (PDA fallback): {} -> NOT FOUND in PDA reverse lookup",
1078 &pda_str[..pda_str.len().min(8)]
1079 );
1080 Value::Null
1081 }
1082 } else {
1083 tracing::debug!(
1084 "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist, no PDA lookup table)",
1085 lookup_val, index_name
1086 );
1087 Value::Null
1088 }
1089 } else {
1090 tracing::debug!(
1091 "🔍 LookupIndex: {} -> NOT FOUND (index: {} does not exist)",
1092 lookup_val,
1093 index_name
1094 );
1095 Value::Null
1096 }
1097 } else {
1098 result
1099 };
1100
1101 self.registers[*dest] = final_result;
1102 pc += 1;
1103 }
1104 OpCode::SetFieldSum {
1105 object,
1106 path,
1107 value,
1108 } => {
1109 let was_updated = self.set_field_sum(*object, path, *value)?;
1110 if was_updated {
1111 dirty_fields.insert(path.to_string());
1112 }
1113 pc += 1;
1114 }
1115 OpCode::SetFieldIncrement { object, path } => {
1116 let was_updated = self.set_field_increment(*object, path)?;
1117 if was_updated {
1118 dirty_fields.insert(path.to_string());
1119 }
1120 pc += 1;
1121 }
1122 OpCode::SetFieldMin {
1123 object,
1124 path,
1125 value,
1126 } => {
1127 let was_updated = self.set_field_min(*object, path, *value)?;
1128 if was_updated {
1129 dirty_fields.insert(path.to_string());
1130 }
1131 pc += 1;
1132 }
1133 OpCode::AddToUniqueSet {
1134 state_id: _,
1135 set_name,
1136 value,
1137 count_object,
1138 count_path,
1139 } => {
1140 let value_to_add = self.registers[*value].clone();
1141
1142 let set_field_path = format!("__unique_set:{}", set_name);
1145
1146 let mut set: HashSet<Value> =
1148 if let Ok(existing) = self.get_field(*count_object, &set_field_path) {
1149 if !existing.is_null() {
1150 serde_json::from_value(existing).unwrap_or_default()
1151 } else {
1152 HashSet::new()
1153 }
1154 } else {
1155 HashSet::new()
1156 };
1157
1158 let was_new = set.insert(value_to_add);
1160
1161 let set_as_vec: Vec<Value> = set.iter().cloned().collect();
1163 self.registers[100] = serde_json::to_value(set_as_vec)?;
1164 self.set_field_auto_vivify(*count_object, &set_field_path, 100)?;
1165
1166 if was_new {
1168 self.registers[100] = Value::Number(serde_json::Number::from(set.len()));
1169 self.set_field_auto_vivify(*count_object, count_path, 100)?;
1170 dirty_fields.insert(count_path.to_string());
1171 }
1172
1173 pc += 1;
1174 }
1175 OpCode::ConditionalSetField {
1176 object,
1177 path,
1178 value,
1179 condition_field,
1180 condition_op,
1181 condition_value,
1182 } => {
1183 let field_value = self.load_field(&event_value, condition_field, None)?;
1185
1186 let condition_met =
1188 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1189
1190 if condition_met {
1191 let val = self.registers[*value].clone();
1192 tracing::trace!(
1193 "ConditionalSetField: condition met, setting {}={:?}",
1194 path,
1195 val
1196 );
1197 self.set_field_auto_vivify(*object, path, *value)?;
1198 dirty_fields.insert(path.to_string());
1199 } else {
1200 tracing::trace!(
1201 "ConditionalSetField: condition not met, skipping {}",
1202 path
1203 );
1204 }
1205 pc += 1;
1206 }
1207 OpCode::ConditionalIncrement {
1208 object,
1209 path,
1210 condition_field,
1211 condition_op,
1212 condition_value,
1213 } => {
1214 let field_value = self.load_field(&event_value, condition_field, None)?;
1216
1217 let condition_met =
1219 self.evaluate_comparison(&field_value, condition_op, condition_value)?;
1220
1221 if condition_met {
1222 tracing::trace!(
1223 "ConditionalIncrement: condition met, incrementing {}",
1224 path
1225 );
1226 let was_updated = self.set_field_increment(*object, path)?;
1227 if was_updated {
1228 dirty_fields.insert(path.to_string());
1229 }
1230 } else {
1231 tracing::trace!(
1232 "ConditionalIncrement: condition not met, skipping {}",
1233 path
1234 );
1235 }
1236 pc += 1;
1237 }
1238 OpCode::EvaluateComputedFields {
1239 state,
1240 computed_paths,
1241 } => {
1242 if let Some(evaluator) = entity_evaluator {
1244 let state_value = &mut self.registers[*state];
1245 match evaluator(state_value) {
1246 Ok(()) => {
1247 for path in computed_paths {
1249 dirty_fields.insert(path.clone());
1250 }
1251 }
1252 Err(_e) => {
1253 }
1255 }
1256 }
1257 pc += 1;
1258 }
1259 OpCode::UpdatePdaReverseLookup {
1260 state_id: _,
1261 lookup_name,
1262 pda_address,
1263 primary_key,
1264 } => {
1265 let actual_state_id = override_state_id;
1266 let state = self
1267 .states
1268 .get_mut(&actual_state_id)
1269 .ok_or("State table not found")?;
1270
1271 let pda_val = self.registers[*pda_address].clone();
1272 let pk_val = self.registers[*primary_key].clone();
1273
1274 if let (Some(pda_str), Some(pk_str)) = (pda_val.as_str(), pk_val.as_str()) {
1276 let pda_lookup = state
1278 .pda_reverse_lookups
1279 .entry(lookup_name.clone())
1280 .or_insert_with(|| PdaReverseLookup::new(10000));
1281
1282 pda_lookup.insert(pda_str.to_string(), pk_str.to_string());
1283
1284 tracing::debug!(
1285 "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {})",
1286 pda_str,
1287 pk_str,
1288 lookup_name
1289 );
1290 } else if !pk_val.is_null() {
1291 if let Some(pk_num) = pk_val.as_u64() {
1293 if let Some(pda_str) = pda_val.as_str() {
1294 let pda_lookup = state
1295 .pda_reverse_lookups
1296 .entry(lookup_name.clone())
1297 .or_insert_with(|| PdaReverseLookup::new(10000));
1298
1299 pda_lookup.insert(pda_str.to_string(), pk_num.to_string());
1300
1301 tracing::debug!(
1302 "📝 UpdatePdaReverseLookup: {} -> {} (lookup: {}, pk was u64)",
1303 pda_str,
1304 pk_num,
1305 lookup_name
1306 );
1307 }
1308 }
1309 }
1310
1311 pc += 1;
1312 }
1313 }
1314
1315 self.instructions_executed += 1;
1316 }
1317
1318 tracing::info!(
1319 "🏁 Handler completed: produced {} mutation(s)",
1320 output.len()
1321 );
1322 Ok(output)
1323 }
1324
1325 fn load_field(
1326 &self,
1327 event_value: &Value,
1328 path: &FieldPath,
1329 default: Option<&Value>,
1330 ) -> Result<Value> {
1331 if path.segments.iter().any(|s| s.is_empty()) {
1333 tracing::warn!(
1334 "load_field: path contains empty segment! path={:?}",
1335 path.segments
1336 );
1337 }
1338
1339 if path.segments.is_empty() {
1340 if let Some(obj) = event_value.as_object() {
1344 let filtered: serde_json::Map<String, Value> = obj
1345 .iter()
1346 .filter(|(k, _)| !k.starts_with("__"))
1347 .map(|(k, v)| (k.clone(), v.clone()))
1348 .collect();
1349 return Ok(Value::Object(filtered));
1350 }
1351 return Ok(event_value.clone());
1352 }
1353
1354 let mut current = event_value;
1355 for (i, segment) in path.segments.iter().enumerate() {
1356 current = match current.get(segment) {
1357 Some(v) => v,
1358 None => {
1359 if path.segments.len() == 2 && path.segments[0] == "accounts" {
1361 tracing::warn!(
1362 "load_field: could not find segment '{}' at index {} in path {:?}",
1363 segment,
1364 i,
1365 path.segments,
1366 );
1367 tracing::warn!(
1368 " event has top-level keys: {:?}",
1369 event_value
1370 .as_object()
1371 .map(|o| o.keys().collect::<Vec<_>>())
1372 );
1373 if let Some(accounts) = event_value.get("accounts") {
1375 tracing::warn!(
1376 " 'accounts' exists with keys: {:?}",
1377 accounts.as_object().map(|o| o.keys().collect::<Vec<_>>())
1378 );
1379 } else {
1380 tracing::warn!(" 'accounts' key does NOT exist in event_value");
1381 }
1382 }
1383 return Ok(default.cloned().unwrap_or(Value::Null));
1384 }
1385 };
1386 }
1387
1388 Ok(current.clone())
1389 }
1390
1391 fn set_field_auto_vivify(
1392 &mut self,
1393 object_reg: Register,
1394 path: &str,
1395 value_reg: Register,
1396 ) -> Result<()> {
1397 let compiled = self.get_compiled_path(path);
1398 let segments = compiled.segments();
1399 let value = self.registers[value_reg].clone();
1400
1401 if !self.registers[object_reg].is_object() {
1402 self.registers[object_reg] = json!({});
1403 }
1404
1405 let obj = self.registers[object_reg]
1406 .as_object_mut()
1407 .ok_or("Not an object")?;
1408
1409 let mut current = obj;
1410 for (i, segment) in segments.iter().enumerate() {
1411 if i == segments.len() - 1 {
1412 current.insert(segment.to_string(), value.clone());
1413 } else {
1414 current
1415 .entry(segment.to_string())
1416 .or_insert_with(|| json!({}));
1417 current = current
1418 .get_mut(segment)
1419 .and_then(|v| v.as_object_mut())
1420 .ok_or("Path collision: expected object")?;
1421 }
1422 }
1423
1424 Ok(())
1425 }
1426
1427 fn set_field_if_null(
1428 &mut self,
1429 object_reg: Register,
1430 path: &str,
1431 value_reg: Register,
1432 ) -> Result<bool> {
1433 let compiled = self.get_compiled_path(path);
1434 let segments = compiled.segments();
1435 let value = self.registers[value_reg].clone();
1436
1437 if !self.registers[object_reg].is_object() {
1438 self.registers[object_reg] = json!({});
1439 }
1440
1441 let obj = self.registers[object_reg]
1442 .as_object_mut()
1443 .ok_or("Not an object")?;
1444
1445 let mut current = obj;
1446 let mut was_set = false;
1447 for (i, segment) in segments.iter().enumerate() {
1448 if i == segments.len() - 1 {
1449 if !current.contains_key(segment) || current.get(segment).unwrap().is_null() {
1450 current.insert(segment.to_string(), value.clone());
1451 was_set = true;
1452 }
1453 } else {
1454 current
1455 .entry(segment.to_string())
1456 .or_insert_with(|| json!({}));
1457 current = current
1458 .get_mut(segment)
1459 .and_then(|v| v.as_object_mut())
1460 .ok_or("Path collision: expected object")?;
1461 }
1462 }
1463
1464 Ok(was_set)
1465 }
1466
1467 fn set_field_max(
1468 &mut self,
1469 object_reg: Register,
1470 path: &str,
1471 value_reg: Register,
1472 ) -> Result<bool> {
1473 let compiled = self.get_compiled_path(path);
1474 let segments = compiled.segments();
1475 let new_value = self.registers[value_reg].clone();
1476
1477 if !self.registers[object_reg].is_object() {
1478 self.registers[object_reg] = json!({});
1479 }
1480
1481 let obj = self.registers[object_reg]
1482 .as_object_mut()
1483 .ok_or("Not an object")?;
1484
1485 let mut current = obj;
1486 let mut was_updated = false;
1487 for (i, segment) in segments.iter().enumerate() {
1488 if i == segments.len() - 1 {
1489 let should_update = if let Some(current_value) = current.get(segment) {
1490 if current_value.is_null() {
1491 true
1492 } else {
1493 match (current_value.as_i64(), new_value.as_i64()) {
1494 (Some(current_val), Some(new_val)) => new_val > current_val,
1495 (Some(current_val), None) if new_value.as_u64().is_some() => {
1496 new_value.as_u64().unwrap() as i64 > current_val
1497 }
1498 (None, Some(new_val)) if current_value.as_u64().is_some() => {
1499 new_val > current_value.as_u64().unwrap() as i64
1500 }
1501 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1502 (Some(current_val), Some(new_val)) => new_val > current_val,
1503 _ => match (current_value.as_f64(), new_value.as_f64()) {
1504 (Some(current_val), Some(new_val)) => new_val > current_val,
1505 _ => false,
1506 },
1507 },
1508 _ => false,
1509 }
1510 }
1511 } else {
1512 true
1513 };
1514
1515 if should_update {
1516 current.insert(segment.to_string(), new_value.clone());
1517 was_updated = true;
1518 }
1519 } else {
1520 current
1521 .entry(segment.to_string())
1522 .or_insert_with(|| json!({}));
1523 current = current
1524 .get_mut(segment)
1525 .and_then(|v| v.as_object_mut())
1526 .ok_or("Path collision: expected object")?;
1527 }
1528 }
1529
1530 Ok(was_updated)
1531 }
1532
1533 fn set_field_sum(
1534 &mut self,
1535 object_reg: Register,
1536 path: &str,
1537 value_reg: Register,
1538 ) -> Result<bool> {
1539 let compiled = self.get_compiled_path(path);
1540 let segments = compiled.segments();
1541 let new_value = self.registers[value_reg].clone();
1542
1543 if !self.registers[object_reg].is_object() {
1544 self.registers[object_reg] = json!({});
1545 }
1546
1547 let obj = self.registers[object_reg]
1548 .as_object_mut()
1549 .ok_or("Not an object")?;
1550
1551 let mut current = obj;
1552 for (i, segment) in segments.iter().enumerate() {
1553 if i == segments.len() - 1 {
1554 let current_val = current
1556 .get(segment)
1557 .and_then(|v| {
1558 if v.is_null() {
1559 None
1560 } else {
1561 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1562 }
1563 })
1564 .unwrap_or(0);
1565
1566 let new_val_num = new_value
1568 .as_i64()
1569 .or_else(|| new_value.as_u64().map(|n| n as i64))
1570 .ok_or("Sum requires numeric value")?;
1571
1572 let sum = current_val + new_val_num;
1573 current.insert(segment.to_string(), json!(sum));
1574 return Ok(true);
1575 } else {
1576 current
1577 .entry(segment.to_string())
1578 .or_insert_with(|| json!({}));
1579 current = current
1580 .get_mut(segment)
1581 .and_then(|v| v.as_object_mut())
1582 .ok_or("Path collision: expected object")?;
1583 }
1584 }
1585
1586 Ok(false)
1587 }
1588
1589 fn set_field_increment(&mut self, object_reg: Register, path: &str) -> Result<bool> {
1590 let compiled = self.get_compiled_path(path);
1591 let segments = compiled.segments();
1592
1593 if !self.registers[object_reg].is_object() {
1594 self.registers[object_reg] = json!({});
1595 }
1596
1597 let obj = self.registers[object_reg]
1598 .as_object_mut()
1599 .ok_or("Not an object")?;
1600
1601 let mut current = obj;
1602 for (i, segment) in segments.iter().enumerate() {
1603 if i == segments.len() - 1 {
1604 let current_val = current
1606 .get(segment)
1607 .and_then(|v| {
1608 if v.is_null() {
1609 None
1610 } else {
1611 v.as_i64().or_else(|| v.as_u64().map(|n| n as i64))
1612 }
1613 })
1614 .unwrap_or(0);
1615
1616 let incremented = current_val + 1;
1617 current.insert(segment.to_string(), json!(incremented));
1618 return Ok(true);
1619 } else {
1620 current
1621 .entry(segment.to_string())
1622 .or_insert_with(|| json!({}));
1623 current = current
1624 .get_mut(segment)
1625 .and_then(|v| v.as_object_mut())
1626 .ok_or("Path collision: expected object")?;
1627 }
1628 }
1629
1630 Ok(false)
1631 }
1632
1633 fn set_field_min(
1634 &mut self,
1635 object_reg: Register,
1636 path: &str,
1637 value_reg: Register,
1638 ) -> Result<bool> {
1639 let compiled = self.get_compiled_path(path);
1640 let segments = compiled.segments();
1641 let new_value = self.registers[value_reg].clone();
1642
1643 if !self.registers[object_reg].is_object() {
1644 self.registers[object_reg] = json!({});
1645 }
1646
1647 let obj = self.registers[object_reg]
1648 .as_object_mut()
1649 .ok_or("Not an object")?;
1650
1651 let mut current = obj;
1652 let mut was_updated = false;
1653 for (i, segment) in segments.iter().enumerate() {
1654 if i == segments.len() - 1 {
1655 let should_update = if let Some(current_value) = current.get(segment) {
1656 if current_value.is_null() {
1657 true
1658 } else {
1659 match (current_value.as_i64(), new_value.as_i64()) {
1660 (Some(current_val), Some(new_val)) => new_val < current_val,
1661 (Some(current_val), None) if new_value.as_u64().is_some() => {
1662 (new_value.as_u64().unwrap() as i64) < current_val
1663 }
1664 (None, Some(new_val)) if current_value.as_u64().is_some() => {
1665 new_val < current_value.as_u64().unwrap() as i64
1666 }
1667 (None, None) => match (current_value.as_u64(), new_value.as_u64()) {
1668 (Some(current_val), Some(new_val)) => new_val < current_val,
1669 _ => match (current_value.as_f64(), new_value.as_f64()) {
1670 (Some(current_val), Some(new_val)) => new_val < current_val,
1671 _ => false,
1672 },
1673 },
1674 _ => false,
1675 }
1676 }
1677 } else {
1678 true
1679 };
1680
1681 if should_update {
1682 current.insert(segment.to_string(), new_value.clone());
1683 was_updated = true;
1684 }
1685 } else {
1686 current
1687 .entry(segment.to_string())
1688 .or_insert_with(|| json!({}));
1689 current = current
1690 .get_mut(segment)
1691 .and_then(|v| v.as_object_mut())
1692 .ok_or("Path collision: expected object")?;
1693 }
1694 }
1695
1696 Ok(was_updated)
1697 }
1698
1699 fn get_field(&mut self, object_reg: Register, path: &str) -> Result<Value> {
1700 let compiled = self.get_compiled_path(path);
1701 let segments = compiled.segments();
1702 let mut current = &self.registers[object_reg];
1703
1704 for segment in segments {
1705 current = current
1706 .get(segment)
1707 .ok_or_else(|| format!("Field not found: {}", segment))?;
1708 }
1709
1710 Ok(current.clone())
1711 }
1712
1713 fn append_to_array(
1714 &mut self,
1715 object_reg: Register,
1716 path: &str,
1717 value_reg: Register,
1718 ) -> Result<()> {
1719 let compiled = self.get_compiled_path(path);
1720 let segments = compiled.segments();
1721 let value = self.registers[value_reg].clone();
1722
1723 if !self.registers[object_reg].is_object() {
1724 self.registers[object_reg] = json!({});
1725 }
1726
1727 let obj = self.registers[object_reg]
1728 .as_object_mut()
1729 .ok_or("Not an object")?;
1730
1731 let mut current = obj;
1732 for (i, segment) in segments.iter().enumerate() {
1733 if i == segments.len() - 1 {
1734 current
1735 .entry(segment.to_string())
1736 .or_insert_with(|| json!([]));
1737 let arr = current
1738 .get_mut(segment)
1739 .and_then(|v| v.as_array_mut())
1740 .ok_or("Path is not an array")?;
1741 arr.push(value.clone());
1742 } else {
1743 current
1744 .entry(segment.to_string())
1745 .or_insert_with(|| json!({}));
1746 current = current
1747 .get_mut(segment)
1748 .and_then(|v| v.as_object_mut())
1749 .ok_or("Path collision: expected object")?;
1750 }
1751 }
1752
1753 Ok(())
1754 }
1755
1756 fn transform_in_place(&mut self, reg: Register, transformation: &Transformation) -> Result<()> {
1757 let value = &self.registers[reg];
1758 let transformed = self.apply_transformation(value, transformation)?;
1759 self.registers[reg] = transformed;
1760 Ok(())
1761 }
1762
1763 fn apply_transformation(
1764 &self,
1765 value: &Value,
1766 transformation: &Transformation,
1767 ) -> Result<Value> {
1768 match transformation {
1769 Transformation::HexEncode => {
1770 if let Some(arr) = value.as_array() {
1771 let bytes: Vec<u8> = arr
1772 .iter()
1773 .filter_map(|v| v.as_u64().map(|n| n as u8))
1774 .collect();
1775 let hex = hex::encode(&bytes);
1776 Ok(json!(hex))
1777 } else {
1778 Err("HexEncode requires an array of numbers".into())
1779 }
1780 }
1781 Transformation::HexDecode => {
1782 if let Some(s) = value.as_str() {
1783 let s = s.strip_prefix("0x").unwrap_or(s);
1784 let bytes = hex::decode(s).map_err(|e| format!("Hex decode error: {}", e))?;
1785 Ok(json!(bytes))
1786 } else {
1787 Err("HexDecode requires a string".into())
1788 }
1789 }
1790 Transformation::Base58Encode => {
1791 if let Some(arr) = value.as_array() {
1792 let bytes: Vec<u8> = arr
1793 .iter()
1794 .filter_map(|v| v.as_u64().map(|n| n as u8))
1795 .collect();
1796 let encoded = bs58::encode(&bytes).into_string();
1797 Ok(json!(encoded))
1798 } else if value.is_string() {
1799 tracing::debug!(
1801 "Base58Encode: value is already a string, passing through: {:?}",
1802 value
1803 );
1804 Ok(value.clone())
1805 } else {
1806 tracing::error!(
1807 "Base58Encode failed: value type is {:?}, value: {:?}",
1808 if value.is_null() {
1809 "null"
1810 } else if value.is_boolean() {
1811 "boolean"
1812 } else if value.is_number() {
1813 "number"
1814 } else if value.is_object() {
1815 "object"
1816 } else {
1817 "unknown"
1818 },
1819 value
1820 );
1821 Err("Base58Encode requires an array of numbers".into())
1822 }
1823 }
1824 Transformation::Base58Decode => {
1825 if let Some(s) = value.as_str() {
1826 let bytes = bs58::decode(s)
1827 .into_vec()
1828 .map_err(|e| format!("Base58 decode error: {}", e))?;
1829 Ok(json!(bytes))
1830 } else {
1831 Err("Base58Decode requires a string".into())
1832 }
1833 }
1834 Transformation::ToString => Ok(json!(value.to_string())),
1835 Transformation::ToNumber => {
1836 if let Some(s) = value.as_str() {
1837 let n = s
1838 .parse::<i64>()
1839 .map_err(|e| format!("Parse error: {}", e))?;
1840 Ok(json!(n))
1841 } else {
1842 Ok(value.clone())
1843 }
1844 }
1845 }
1846 }
1847
1848 fn evaluate_comparison(
1849 &self,
1850 field_value: &Value,
1851 op: &ComparisonOp,
1852 condition_value: &Value,
1853 ) -> Result<bool> {
1854 use ComparisonOp::*;
1855
1856 match op {
1857 Equal => Ok(field_value == condition_value),
1858 NotEqual => Ok(field_value != condition_value),
1859 GreaterThan => {
1860 match (field_value.as_i64(), condition_value.as_i64()) {
1862 (Some(a), Some(b)) => Ok(a > b),
1863 _ => match (field_value.as_u64(), condition_value.as_u64()) {
1864 (Some(a), Some(b)) => Ok(a > b),
1865 _ => match (field_value.as_f64(), condition_value.as_f64()) {
1866 (Some(a), Some(b)) => Ok(a > b),
1867 _ => Err("Cannot compare non-numeric values with GreaterThan".into()),
1868 },
1869 },
1870 }
1871 }
1872 GreaterThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
1873 (Some(a), Some(b)) => Ok(a >= b),
1874 _ => match (field_value.as_u64(), condition_value.as_u64()) {
1875 (Some(a), Some(b)) => Ok(a >= b),
1876 _ => match (field_value.as_f64(), condition_value.as_f64()) {
1877 (Some(a), Some(b)) => Ok(a >= b),
1878 _ => {
1879 Err("Cannot compare non-numeric values with GreaterThanOrEqual".into())
1880 }
1881 },
1882 },
1883 },
1884 LessThan => match (field_value.as_i64(), condition_value.as_i64()) {
1885 (Some(a), Some(b)) => Ok(a < b),
1886 _ => match (field_value.as_u64(), condition_value.as_u64()) {
1887 (Some(a), Some(b)) => Ok(a < b),
1888 _ => match (field_value.as_f64(), condition_value.as_f64()) {
1889 (Some(a), Some(b)) => Ok(a < b),
1890 _ => Err("Cannot compare non-numeric values with LessThan".into()),
1891 },
1892 },
1893 },
1894 LessThanOrEqual => match (field_value.as_i64(), condition_value.as_i64()) {
1895 (Some(a), Some(b)) => Ok(a <= b),
1896 _ => match (field_value.as_u64(), condition_value.as_u64()) {
1897 (Some(a), Some(b)) => Ok(a <= b),
1898 _ => match (field_value.as_f64(), condition_value.as_f64()) {
1899 (Some(a), Some(b)) => Ok(a <= b),
1900 _ => Err("Cannot compare non-numeric values with LessThanOrEqual".into()),
1901 },
1902 },
1903 },
1904 }
1905 }
1906
1907 #[cfg_attr(feature = "otel", instrument(
1921 name = "vm.update_pda_lookup",
1922 skip(self),
1923 fields(
1924 pda = %pda_address,
1925 seed = %seed_value,
1926 )
1927 ))]
1928 pub fn update_pda_reverse_lookup(
1929 &mut self,
1930 state_id: u32,
1931 lookup_name: &str,
1932 pda_address: String,
1933 seed_value: String,
1934 ) -> Result<Vec<PendingAccountUpdate>> {
1935 let state = self
1936 .states
1937 .get_mut(&state_id)
1938 .ok_or("State table not found")?;
1939
1940 let lookup = state
1941 .pda_reverse_lookups
1942 .entry(lookup_name.to_string())
1943 .or_insert_with(|| PdaReverseLookup::new(10000));
1944
1945 tracing::info!(
1946 "📝 Registering PDA reverse lookup: {} -> {} (lookup: {})",
1947 pda_address,
1948 seed_value,
1949 lookup_name
1950 );
1951
1952 let evicted_pda = lookup.insert(pda_address.clone(), seed_value);
1954
1955 if let Some(ref evicted) = evicted_pda {
1957 if let Some((_, evicted_updates)) = state.pending_updates.remove(evicted) {
1958 let count = evicted_updates.len();
1959 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
1960 tracing::info!(
1961 "Cleaned up {} pending updates for evicted PDA {} from LRU cache",
1962 count,
1963 evicted
1964 );
1965 }
1966 }
1967
1968 self.flush_pending_updates(state_id, &pda_address)
1970 }
1971
1972 pub fn cleanup_expired_pending_updates(&mut self, state_id: u32) -> usize {
1977 let state = match self.states.get_mut(&state_id) {
1978 Some(s) => s,
1979 None => return 0,
1980 };
1981
1982 let now = std::time::SystemTime::now()
1983 .duration_since(std::time::UNIX_EPOCH)
1984 .unwrap()
1985 .as_secs() as i64;
1986
1987 let mut removed_count = 0;
1988
1989 state.pending_updates.retain(|_pda_address, updates| {
1991 let original_len = updates.len();
1992
1993 updates.retain(|update| {
1994 let age = now - update.queued_at;
1995 age <= PENDING_UPDATE_TTL_SECONDS
1996 });
1997
1998 removed_count += original_len - updates.len();
1999
2000 !updates.is_empty()
2002 });
2003
2004 self.pending_queue_size = self.pending_queue_size.saturating_sub(removed_count as u64);
2006
2007 if removed_count > 0 {
2008 tracing::info!(
2009 "Cleaned up {} expired pending updates (TTL: {}s)",
2010 removed_count,
2011 PENDING_UPDATE_TTL_SECONDS
2012 );
2013 }
2014
2015 removed_count
2016 }
2017
2018 #[cfg_attr(feature = "otel", instrument(
2053 name = "vm.queue_account_update",
2054 skip(self, account_data),
2055 fields(
2056 pda = %pda_address,
2057 account_type = %account_type,
2058 slot = slot,
2059 )
2060 ))]
2061 pub fn queue_account_update(
2062 &mut self,
2063 state_id: u32,
2064 pda_address: String,
2065 account_type: String,
2066 account_data: Value,
2067 slot: u64,
2068 signature: String,
2069 ) -> Result<()> {
2070 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2072 tracing::warn!(
2073 "Pending queue size limit reached ({}), cleaning up expired updates",
2074 MAX_PENDING_UPDATES_TOTAL
2075 );
2076 let removed = self.cleanup_expired_pending_updates(state_id);
2077
2078 if self.pending_queue_size >= MAX_PENDING_UPDATES_TOTAL as u64 {
2080 tracing::warn!(
2081 "Still at limit after cleanup (removed {}), will drop oldest update",
2082 removed
2083 );
2084 self.drop_oldest_pending_update(state_id)?;
2085 }
2086 }
2087
2088 let state = self
2089 .states
2090 .get_mut(&state_id)
2091 .ok_or("State table not found")?;
2092
2093 tracing::debug!(
2094 "📋 Queued account update for PDA {} (type: {}, slot: {})",
2095 pda_address,
2096 account_type,
2097 slot
2098 );
2099 let pending = PendingAccountUpdate {
2100 account_type,
2101 pda_address: pda_address.clone(),
2102 account_data,
2103 slot,
2104 signature,
2105 queued_at: std::time::SystemTime::now()
2106 .duration_since(std::time::UNIX_EPOCH)
2107 .unwrap()
2108 .as_secs() as i64,
2109 };
2110
2111 {
2113 let mut updates = state
2114 .pending_updates
2115 .entry(pda_address.clone())
2116 .or_insert_with(Vec::new);
2117
2118 let original_len = updates.len();
2121 updates.retain(|existing| {
2122 existing.slot > slot
2124 });
2125 let removed_by_dedup = original_len - updates.len();
2126
2127 if removed_by_dedup > 0 {
2128 self.pending_queue_size = self
2129 .pending_queue_size
2130 .saturating_sub(removed_by_dedup as u64);
2131 tracing::debug!(
2132 "Deduplicated {} older update(s) for PDA {} (new slot: {})",
2133 removed_by_dedup,
2134 pda_address,
2135 slot
2136 );
2137 }
2138
2139 if updates.len() >= MAX_PENDING_UPDATES_PER_PDA {
2141 tracing::warn!(
2142 "Per-PDA limit reached for {} ({} updates), dropping oldest",
2143 pda_address,
2144 updates.len()
2145 );
2146 updates.remove(0);
2147 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2148 }
2149
2150 updates.push(pending);
2151 }
2152
2153 Ok(())
2154 }
2155
2156 pub fn get_pending_queue_stats(&self, state_id: u32) -> Option<PendingQueueStats> {
2158 let state = self.states.get(&state_id)?;
2159
2160 let now = std::time::SystemTime::now()
2161 .duration_since(std::time::UNIX_EPOCH)
2162 .unwrap()
2163 .as_secs() as i64;
2164
2165 let mut total_updates = 0;
2166 let mut oldest_timestamp = now;
2167 let mut largest_pda_queue = 0;
2168 let mut estimated_memory = 0;
2169
2170 for entry in state.pending_updates.iter() {
2171 let (_, updates) = entry.pair();
2172 total_updates += updates.len();
2173 largest_pda_queue = largest_pda_queue.max(updates.len());
2174
2175 for update in updates.iter() {
2176 oldest_timestamp = oldest_timestamp.min(update.queued_at);
2177 estimated_memory += update.account_type.len() +
2179 update.pda_address.len() +
2180 update.signature.len() +
2181 16 + estimate_json_size(&update.account_data);
2183 }
2184 }
2185
2186 Some(PendingQueueStats {
2187 total_updates,
2188 unique_pdas: state.pending_updates.len(),
2189 oldest_age_seconds: now - oldest_timestamp,
2190 largest_pda_queue_size: largest_pda_queue,
2191 estimated_memory_bytes: estimated_memory,
2192 })
2193 }
2194
2195 fn drop_oldest_pending_update(&mut self, state_id: u32) -> Result<()> {
2197 let state = self
2198 .states
2199 .get_mut(&state_id)
2200 .ok_or("State table not found")?;
2201
2202 let mut oldest_pda: Option<String> = None;
2203 let mut oldest_timestamp = i64::MAX;
2204
2205 for entry in state.pending_updates.iter() {
2207 let (pda, updates) = entry.pair();
2208 if let Some(update) = updates.first() {
2209 if update.queued_at < oldest_timestamp {
2210 oldest_timestamp = update.queued_at;
2211 oldest_pda = Some(pda.clone());
2212 }
2213 }
2214 }
2215
2216 if let Some(pda) = oldest_pda {
2218 if let Some(mut updates) = state.pending_updates.get_mut(&pda) {
2219 if !updates.is_empty() {
2220 updates.remove(0);
2221 self.pending_queue_size = self.pending_queue_size.saturating_sub(1);
2222
2223 if updates.is_empty() {
2225 drop(updates);
2226 state.pending_updates.remove(&pda);
2227 }
2228 }
2229 }
2230 }
2231
2232 Ok(())
2233 }
2234
2235 fn flush_pending_updates(
2240 &mut self,
2241 state_id: u32,
2242 pda_address: &str,
2243 ) -> Result<Vec<PendingAccountUpdate>> {
2244 let state = self
2245 .states
2246 .get_mut(&state_id)
2247 .ok_or("State table not found")?;
2248
2249 if let Some((_, pending_updates)) = state.pending_updates.remove(pda_address) {
2250 let count = pending_updates.len();
2251 self.pending_queue_size = self.pending_queue_size.saturating_sub(count as u64);
2252 tracing::info!(
2253 "🔄 Flushed {} pending account update(s) for PDA {}",
2254 count,
2255 pda_address
2256 );
2257 Ok(pending_updates)
2258 } else {
2259 Ok(Vec::new())
2260 }
2261 }
2262
2263 pub fn try_pda_reverse_lookup(
2265 &mut self,
2266 state_id: u32,
2267 lookup_name: &str,
2268 pda_address: &str,
2269 ) -> Option<String> {
2270 let state = self.states.get_mut(&state_id)?;
2271
2272 if let Some(lookup) = state.pda_reverse_lookups.get_mut(lookup_name) {
2273 if let Some(value) = lookup.lookup(pda_address) {
2274 self.pda_cache_hits += 1;
2275 tracing::debug!(
2276 "✓ PDA reverse lookup cache hit: {} -> {}",
2277 pda_address,
2278 value
2279 );
2280 return Some(value);
2281 }
2282 }
2283
2284 self.pda_cache_misses += 1;
2285 tracing::debug!("✗ PDA reverse lookup cache miss: {}", pda_address);
2286 None
2287 }
2288
2289 pub fn evaluate_computed_expr(&self, expr: &ComputedExpr, state: &Value) -> Result<Value> {
2296 self.evaluate_computed_expr_with_env(expr, state, &std::collections::HashMap::new())
2297 }
2298
2299 fn evaluate_computed_expr_with_env(
2301 &self,
2302 expr: &ComputedExpr,
2303 state: &Value,
2304 env: &std::collections::HashMap<String, Value>,
2305 ) -> Result<Value> {
2306 match expr {
2307 ComputedExpr::FieldRef { path } => self.get_field_from_state(state, path),
2308
2309 ComputedExpr::Var { name } => env
2310 .get(name)
2311 .cloned()
2312 .ok_or_else(|| format!("Undefined variable: {}", name).into()),
2313
2314 ComputedExpr::Let { name, value, body } => {
2315 let val = self.evaluate_computed_expr_with_env(value, state, env)?;
2316 let mut new_env = env.clone();
2317 new_env.insert(name.clone(), val);
2318 self.evaluate_computed_expr_with_env(body, state, &new_env)
2319 }
2320
2321 ComputedExpr::If {
2322 condition,
2323 then_branch,
2324 else_branch,
2325 } => {
2326 let cond_val = self.evaluate_computed_expr_with_env(condition, state, env)?;
2327 if self.value_to_bool(&cond_val) {
2328 self.evaluate_computed_expr_with_env(then_branch, state, env)
2329 } else {
2330 self.evaluate_computed_expr_with_env(else_branch, state, env)
2331 }
2332 }
2333
2334 ComputedExpr::None => Ok(Value::Null),
2335
2336 ComputedExpr::Some { value } => self.evaluate_computed_expr_with_env(value, state, env),
2337
2338 ComputedExpr::Slice { expr, start, end } => {
2339 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2340 match val {
2341 Value::Array(arr) => {
2342 let slice: Vec<Value> = arr.get(*start..*end).unwrap_or(&[]).to_vec();
2343 Ok(Value::Array(slice))
2344 }
2345 _ => Err(format!("Cannot slice non-array value: {:?}", val).into()),
2346 }
2347 }
2348
2349 ComputedExpr::Index { expr, index } => {
2350 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2351 match val {
2352 Value::Array(arr) => Ok(arr.get(*index).cloned().unwrap_or(Value::Null)),
2353 _ => Err(format!("Cannot index non-array value: {:?}", val).into()),
2354 }
2355 }
2356
2357 ComputedExpr::U64FromLeBytes { bytes } => {
2358 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2359 let byte_vec = self.value_to_bytes(&val)?;
2360 if byte_vec.len() < 8 {
2361 return Err(format!(
2362 "u64::from_le_bytes requires 8 bytes, got {}",
2363 byte_vec.len()
2364 )
2365 .into());
2366 }
2367 let arr: [u8; 8] = byte_vec[..8]
2368 .try_into()
2369 .map_err(|_| "Failed to convert to [u8; 8]")?;
2370 Ok(json!(u64::from_le_bytes(arr)))
2371 }
2372
2373 ComputedExpr::U64FromBeBytes { bytes } => {
2374 let val = self.evaluate_computed_expr_with_env(bytes, state, env)?;
2375 let byte_vec = self.value_to_bytes(&val)?;
2376 if byte_vec.len() < 8 {
2377 return Err(format!(
2378 "u64::from_be_bytes requires 8 bytes, got {}",
2379 byte_vec.len()
2380 )
2381 .into());
2382 }
2383 let arr: [u8; 8] = byte_vec[..8]
2384 .try_into()
2385 .map_err(|_| "Failed to convert to [u8; 8]")?;
2386 Ok(json!(u64::from_be_bytes(arr)))
2387 }
2388
2389 ComputedExpr::ByteArray { bytes } => {
2390 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2391 }
2392
2393 ComputedExpr::Closure { param, body } => {
2394 Ok(json!({
2397 "__closure": {
2398 "param": param,
2399 "body": serde_json::to_value(body).unwrap_or(Value::Null)
2400 }
2401 }))
2402 }
2403
2404 ComputedExpr::Unary { op, expr } => {
2405 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2406 self.apply_unary_op(op, &val)
2407 }
2408
2409 ComputedExpr::JsonToBytes { expr } => {
2410 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2411 let bytes = self.value_to_bytes(&val)?;
2413 Ok(Value::Array(bytes.iter().map(|b| json!(*b)).collect()))
2414 }
2415
2416 ComputedExpr::UnwrapOr { expr, default } => {
2417 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2418 if val.is_null() {
2419 Ok(default.clone())
2420 } else {
2421 Ok(val)
2422 }
2423 }
2424
2425 ComputedExpr::Binary { op, left, right } => {
2426 let l = self.evaluate_computed_expr_with_env(left, state, env)?;
2427 let r = self.evaluate_computed_expr_with_env(right, state, env)?;
2428 self.apply_binary_op(op, &l, &r)
2429 }
2430
2431 ComputedExpr::Cast { expr, to_type } => {
2432 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2433 self.apply_cast(&val, to_type)
2434 }
2435
2436 ComputedExpr::MethodCall { expr, method, args } => {
2437 let val = self.evaluate_computed_expr_with_env(expr, state, env)?;
2438 if method == "map" && args.len() == 1 {
2440 if let ComputedExpr::Closure { param, body } = &args[0] {
2441 if val.is_null() {
2443 return Ok(Value::Null);
2444 }
2445 let mut closure_env = env.clone();
2447 closure_env.insert(param.clone(), val);
2448 return self.evaluate_computed_expr_with_env(body, state, &closure_env);
2449 }
2450 }
2451 let evaluated_args: Vec<Value> = args
2452 .iter()
2453 .map(|a| self.evaluate_computed_expr_with_env(a, state, env))
2454 .collect::<Result<Vec<_>>>()?;
2455 self.apply_method_call(&val, method, &evaluated_args)
2456 }
2457
2458 ComputedExpr::Literal { value } => Ok(value.clone()),
2459
2460 ComputedExpr::Paren { expr } => self.evaluate_computed_expr_with_env(expr, state, env),
2461 }
2462 }
2463
2464 fn value_to_bytes(&self, val: &Value) -> Result<Vec<u8>> {
2466 match val {
2467 Value::Array(arr) => arr
2468 .iter()
2469 .map(|v| {
2470 v.as_u64()
2471 .map(|n| n as u8)
2472 .ok_or_else(|| "Array element not a valid byte".into())
2473 })
2474 .collect(),
2475 Value::String(s) => {
2476 if s.starts_with("0x") || s.starts_with("0X") {
2478 hex::decode(&s[2..]).map_err(|e| format!("Invalid hex string: {}", e).into())
2479 } else {
2480 hex::decode(s).map_err(|e| format!("Invalid hex string: {}", e).into())
2481 }
2482 }
2483 _ => Err(format!("Cannot convert {:?} to bytes", val).into()),
2484 }
2485 }
2486
2487 fn apply_unary_op(&self, op: &crate::ast::UnaryOp, val: &Value) -> Result<Value> {
2489 use crate::ast::UnaryOp;
2490 match op {
2491 UnaryOp::Not => Ok(json!(!self.value_to_bool(val))),
2492 UnaryOp::ReverseBits => match val.as_u64() {
2493 Some(n) => Ok(json!(n.reverse_bits())),
2494 None => match val.as_i64() {
2495 Some(n) => Ok(json!((n as u64).reverse_bits())),
2496 None => Err("reverse_bits requires an integer".into()),
2497 },
2498 },
2499 }
2500 }
2501
2502 fn get_field_from_state(&self, state: &Value, path: &str) -> Result<Value> {
2504 let segments: Vec<&str> = path.split('.').collect();
2505 let mut current = state;
2506
2507 for segment in segments {
2508 match current.get(segment) {
2509 Some(v) => current = v,
2510 None => return Ok(Value::Null),
2511 }
2512 }
2513
2514 Ok(current.clone())
2515 }
2516
2517 fn apply_binary_op(&self, op: &BinaryOp, left: &Value, right: &Value) -> Result<Value> {
2519 match op {
2520 BinaryOp::Add => self.numeric_op(left, right, |a, b| a + b, |a, b| a + b),
2522 BinaryOp::Sub => self.numeric_op(left, right, |a, b| a - b, |a, b| a - b),
2523 BinaryOp::Mul => self.numeric_op(left, right, |a, b| a * b, |a, b| a * b),
2524 BinaryOp::Div => {
2525 if let Some(r) = right.as_i64() {
2527 if r == 0 {
2528 return Err("Division by zero".into());
2529 }
2530 }
2531 if let Some(r) = right.as_f64() {
2532 if r == 0.0 {
2533 return Err("Division by zero".into());
2534 }
2535 }
2536 self.numeric_op(left, right, |a, b| a / b, |a, b| a / b)
2537 }
2538 BinaryOp::Mod => {
2539 match (left.as_i64(), right.as_i64()) {
2541 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2542 (None, _) | (_, None) => match (left.as_u64(), right.as_u64()) {
2543 (Some(a), Some(b)) if b != 0 => Ok(json!(a % b)),
2544 _ => Err("Modulo requires non-zero integer operands".into()),
2545 },
2546 _ => Err("Modulo by zero".into()),
2547 }
2548 }
2549
2550 BinaryOp::Gt => self.comparison_op(left, right, |a, b| a > b, |a, b| a > b),
2552 BinaryOp::Lt => self.comparison_op(left, right, |a, b| a < b, |a, b| a < b),
2553 BinaryOp::Gte => self.comparison_op(left, right, |a, b| a >= b, |a, b| a >= b),
2554 BinaryOp::Lte => self.comparison_op(left, right, |a, b| a <= b, |a, b| a <= b),
2555 BinaryOp::Eq => Ok(json!(left == right)),
2556 BinaryOp::Ne => Ok(json!(left != right)),
2557
2558 BinaryOp::And => {
2560 let l_bool = self.value_to_bool(left);
2561 let r_bool = self.value_to_bool(right);
2562 Ok(json!(l_bool && r_bool))
2563 }
2564 BinaryOp::Or => {
2565 let l_bool = self.value_to_bool(left);
2566 let r_bool = self.value_to_bool(right);
2567 Ok(json!(l_bool || r_bool))
2568 }
2569
2570 BinaryOp::Xor => match (left.as_u64(), right.as_u64()) {
2572 (Some(a), Some(b)) => Ok(json!(a ^ b)),
2573 _ => match (left.as_i64(), right.as_i64()) {
2574 (Some(a), Some(b)) => Ok(json!(a ^ b)),
2575 _ => Err("XOR requires integer operands".into()),
2576 },
2577 },
2578 BinaryOp::BitAnd => match (left.as_u64(), right.as_u64()) {
2579 (Some(a), Some(b)) => Ok(json!(a & b)),
2580 _ => match (left.as_i64(), right.as_i64()) {
2581 (Some(a), Some(b)) => Ok(json!(a & b)),
2582 _ => Err("BitAnd requires integer operands".into()),
2583 },
2584 },
2585 BinaryOp::BitOr => match (left.as_u64(), right.as_u64()) {
2586 (Some(a), Some(b)) => Ok(json!(a | b)),
2587 _ => match (left.as_i64(), right.as_i64()) {
2588 (Some(a), Some(b)) => Ok(json!(a | b)),
2589 _ => Err("BitOr requires integer operands".into()),
2590 },
2591 },
2592 BinaryOp::Shl => match (left.as_u64(), right.as_u64()) {
2593 (Some(a), Some(b)) => Ok(json!(a << b)),
2594 _ => match (left.as_i64(), right.as_i64()) {
2595 (Some(a), Some(b)) => Ok(json!(a << b)),
2596 _ => Err("Shl requires integer operands".into()),
2597 },
2598 },
2599 BinaryOp::Shr => match (left.as_u64(), right.as_u64()) {
2600 (Some(a), Some(b)) => Ok(json!(a >> b)),
2601 _ => match (left.as_i64(), right.as_i64()) {
2602 (Some(a), Some(b)) => Ok(json!(a >> b)),
2603 _ => Err("Shr requires integer operands".into()),
2604 },
2605 },
2606 }
2607 }
2608
2609 fn numeric_op<F1, F2>(
2611 &self,
2612 left: &Value,
2613 right: &Value,
2614 int_op: F1,
2615 float_op: F2,
2616 ) -> Result<Value>
2617 where
2618 F1: Fn(i64, i64) -> i64,
2619 F2: Fn(f64, f64) -> f64,
2620 {
2621 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
2623 return Ok(json!(int_op(a, b)));
2624 }
2625
2626 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
2628 return Ok(json!(int_op(a as i64, b as i64)));
2630 }
2631
2632 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
2634 return Ok(json!(float_op(a, b)));
2635 }
2636
2637 if left.is_null() || right.is_null() {
2639 return Ok(Value::Null);
2640 }
2641
2642 Err(format!(
2643 "Cannot perform numeric operation on {:?} and {:?}",
2644 left, right
2645 )
2646 .into())
2647 }
2648
2649 fn comparison_op<F1, F2>(
2651 &self,
2652 left: &Value,
2653 right: &Value,
2654 int_cmp: F1,
2655 float_cmp: F2,
2656 ) -> Result<Value>
2657 where
2658 F1: Fn(i64, i64) -> bool,
2659 F2: Fn(f64, f64) -> bool,
2660 {
2661 if let (Some(a), Some(b)) = (left.as_i64(), right.as_i64()) {
2663 return Ok(json!(int_cmp(a, b)));
2664 }
2665
2666 if let (Some(a), Some(b)) = (left.as_u64(), right.as_u64()) {
2668 return Ok(json!(int_cmp(a as i64, b as i64)));
2669 }
2670
2671 if let (Some(a), Some(b)) = (left.as_f64(), right.as_f64()) {
2673 return Ok(json!(float_cmp(a, b)));
2674 }
2675
2676 if left.is_null() || right.is_null() {
2678 return Ok(json!(false));
2679 }
2680
2681 Err(format!("Cannot compare {:?} and {:?}", left, right).into())
2682 }
2683
2684 fn value_to_bool(&self, value: &Value) -> bool {
2686 match value {
2687 Value::Null => false,
2688 Value::Bool(b) => *b,
2689 Value::Number(n) => {
2690 if let Some(i) = n.as_i64() {
2691 i != 0
2692 } else if let Some(f) = n.as_f64() {
2693 f != 0.0
2694 } else {
2695 true
2696 }
2697 }
2698 Value::String(s) => !s.is_empty(),
2699 Value::Array(arr) => !arr.is_empty(),
2700 Value::Object(obj) => !obj.is_empty(),
2701 }
2702 }
2703
2704 fn apply_cast(&self, value: &Value, to_type: &str) -> Result<Value> {
2706 match to_type {
2707 "i8" | "i16" | "i32" | "i64" | "isize" => {
2708 if let Some(n) = value.as_i64() {
2709 Ok(json!(n))
2710 } else if let Some(n) = value.as_u64() {
2711 Ok(json!(n as i64))
2712 } else if let Some(n) = value.as_f64() {
2713 Ok(json!(n as i64))
2714 } else if let Some(s) = value.as_str() {
2715 s.parse::<i64>()
2716 .map(|n| json!(n))
2717 .map_err(|e| format!("Cannot parse '{}' as integer: {}", s, e).into())
2718 } else {
2719 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2720 }
2721 }
2722 "u8" | "u16" | "u32" | "u64" | "usize" => {
2723 if let Some(n) = value.as_u64() {
2724 Ok(json!(n))
2725 } else if let Some(n) = value.as_i64() {
2726 Ok(json!(n as u64))
2727 } else if let Some(n) = value.as_f64() {
2728 Ok(json!(n as u64))
2729 } else if let Some(s) = value.as_str() {
2730 s.parse::<u64>().map(|n| json!(n)).map_err(|e| {
2731 format!("Cannot parse '{}' as unsigned integer: {}", s, e).into()
2732 })
2733 } else {
2734 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2735 }
2736 }
2737 "f32" | "f64" => {
2738 if let Some(n) = value.as_f64() {
2739 Ok(json!(n))
2740 } else if let Some(n) = value.as_i64() {
2741 Ok(json!(n as f64))
2742 } else if let Some(n) = value.as_u64() {
2743 Ok(json!(n as f64))
2744 } else if let Some(s) = value.as_str() {
2745 s.parse::<f64>()
2746 .map(|n| json!(n))
2747 .map_err(|e| format!("Cannot parse '{}' as float: {}", s, e).into())
2748 } else {
2749 Err(format!("Cannot cast {:?} to {}", value, to_type).into())
2750 }
2751 }
2752 "String" | "string" => Ok(json!(value.to_string())),
2753 "bool" => Ok(json!(self.value_to_bool(value))),
2754 _ => {
2755 Ok(value.clone())
2757 }
2758 }
2759 }
2760
2761 fn apply_method_call(&self, value: &Value, method: &str, args: &[Value]) -> Result<Value> {
2763 match method {
2764 "unwrap_or" => {
2765 if value.is_null() && !args.is_empty() {
2766 Ok(args[0].clone())
2767 } else {
2768 Ok(value.clone())
2769 }
2770 }
2771 "unwrap_or_default" => {
2772 if value.is_null() {
2773 Ok(json!(0))
2775 } else {
2776 Ok(value.clone())
2777 }
2778 }
2779 "is_some" => Ok(json!(!value.is_null())),
2780 "is_none" => Ok(json!(value.is_null())),
2781 "abs" => {
2782 if let Some(n) = value.as_i64() {
2783 Ok(json!(n.abs()))
2784 } else if let Some(n) = value.as_f64() {
2785 Ok(json!(n.abs()))
2786 } else {
2787 Err(format!("Cannot call abs() on {:?}", value).into())
2788 }
2789 }
2790 "len" => {
2791 if let Some(s) = value.as_str() {
2792 Ok(json!(s.len()))
2793 } else if let Some(arr) = value.as_array() {
2794 Ok(json!(arr.len()))
2795 } else if let Some(obj) = value.as_object() {
2796 Ok(json!(obj.len()))
2797 } else {
2798 Err(format!("Cannot call len() on {:?}", value).into())
2799 }
2800 }
2801 "to_string" => Ok(json!(value.to_string())),
2802 "min" => {
2803 if args.is_empty() {
2804 return Err("min() requires an argument".into());
2805 }
2806 let other = &args[0];
2807 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2808 Ok(json!(a.min(b)))
2809 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
2810 Ok(json!(a.min(b)))
2811 } else {
2812 Err(format!("Cannot call min() on {:?} and {:?}", value, other).into())
2813 }
2814 }
2815 "max" => {
2816 if args.is_empty() {
2817 return Err("max() requires an argument".into());
2818 }
2819 let other = &args[0];
2820 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2821 Ok(json!(a.max(b)))
2822 } else if let (Some(a), Some(b)) = (value.as_f64(), other.as_f64()) {
2823 Ok(json!(a.max(b)))
2824 } else {
2825 Err(format!("Cannot call max() on {:?} and {:?}", value, other).into())
2826 }
2827 }
2828 "saturating_add" => {
2829 if args.is_empty() {
2830 return Err("saturating_add() requires an argument".into());
2831 }
2832 let other = &args[0];
2833 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2834 Ok(json!(a.saturating_add(b)))
2835 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
2836 Ok(json!(a.saturating_add(b)))
2837 } else {
2838 Err(format!(
2839 "Cannot call saturating_add() on {:?} and {:?}",
2840 value, other
2841 )
2842 .into())
2843 }
2844 }
2845 "saturating_sub" => {
2846 if args.is_empty() {
2847 return Err("saturating_sub() requires an argument".into());
2848 }
2849 let other = &args[0];
2850 if let (Some(a), Some(b)) = (value.as_i64(), other.as_i64()) {
2851 Ok(json!(a.saturating_sub(b)))
2852 } else if let (Some(a), Some(b)) = (value.as_u64(), other.as_u64()) {
2853 Ok(json!(a.saturating_sub(b)))
2854 } else {
2855 Err(format!(
2856 "Cannot call saturating_sub() on {:?} and {:?}",
2857 value, other
2858 )
2859 .into())
2860 }
2861 }
2862 _ => {
2863 tracing::warn!("Unknown method call: {}() on {:?}", method, value);
2865 Ok(value.clone())
2866 }
2867 }
2868 }
2869
2870 pub fn evaluate_computed_fields_from_ast(
2873 &self,
2874 state: &mut Value,
2875 computed_field_specs: &[ComputedFieldSpec],
2876 ) -> Result<Vec<String>> {
2877 let mut updated_paths = Vec::new();
2878
2879 for spec in computed_field_specs {
2880 match self.evaluate_computed_expr(&spec.expression, state) {
2881 Ok(result) => {
2882 self.set_field_in_state(state, &spec.target_path, result)?;
2884 updated_paths.push(spec.target_path.clone());
2885 }
2886 Err(e) => {
2887 tracing::warn!(
2889 "Failed to evaluate computed field '{}': {}",
2890 spec.target_path,
2891 e
2892 );
2893 }
2894 }
2895 }
2896
2897 Ok(updated_paths)
2898 }
2899
2900 fn set_field_in_state(&self, state: &mut Value, path: &str, value: Value) -> Result<()> {
2902 let segments: Vec<&str> = path.split('.').collect();
2903
2904 if segments.is_empty() {
2905 return Err("Empty path".into());
2906 }
2907
2908 let mut current = state;
2910 for (i, segment) in segments.iter().enumerate() {
2911 if i == segments.len() - 1 {
2912 if let Some(obj) = current.as_object_mut() {
2914 obj.insert(segment.to_string(), value);
2915 return Ok(());
2916 } else {
2917 return Err(format!("Cannot set field '{}' on non-object", segment).into());
2918 }
2919 } else {
2920 if !current.is_object() {
2922 *current = json!({});
2923 }
2924 let obj = current.as_object_mut().unwrap();
2925 current = obj.entry(segment.to_string()).or_insert_with(|| json!({}));
2926 }
2927 }
2928
2929 Ok(())
2930 }
2931
2932 pub fn create_evaluator_from_specs(
2935 specs: Vec<ComputedFieldSpec>,
2936 ) -> impl Fn(&mut Value) -> Result<()> + Send + Sync + 'static {
2937 move |state: &mut Value| {
2938 let vm = VmContext::new();
2941 vm.evaluate_computed_fields_from_ast(state, &specs)?;
2942 Ok(())
2943 }
2944 }
2945}
2946
2947impl Default for VmContext {
2948 fn default() -> Self {
2949 Self::new()
2950 }
2951}
2952
2953impl crate::resolvers::ReverseLookupUpdater for VmContext {
2955 fn update(&mut self, pda_address: String, seed_value: String) -> Vec<PendingAccountUpdate> {
2956 self.update_pda_reverse_lookup(0, "default_pda_lookup", pda_address, seed_value)
2958 .unwrap_or_else(|e| {
2959 tracing::error!("Failed to update PDA reverse lookup: {}", e);
2960 Vec::new()
2961 })
2962 }
2963
2964 fn flush_pending(&mut self, pda_address: &str) -> Vec<PendingAccountUpdate> {
2965 self.flush_pending_updates(0, pda_address)
2967 .unwrap_or_else(|e| {
2968 tracing::error!("Failed to flush pending updates: {}", e);
2969 Vec::new()
2970 })
2971 }
2972}
2973
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{BinaryOp, ComputedExpr, ComputedFieldSpec};

    /// Builds the expression `<path>.unwrap_or(0)`.
    fn field_or_zero(path: &str) -> ComputedExpr {
        ComputedExpr::UnwrapOr {
            expr: Box::new(ComputedExpr::FieldRef {
                path: path.to_string(),
            }),
            default: serde_json::json!(0),
        }
    }

    /// A computed sum of two integer fields must be stored as a JSON integer
    /// (no decimal point once serialized) and hold the exact sum.
    #[test]
    fn test_computed_field_preserves_integer_type() {
        let mut state = serde_json::json!({
            "trading": {
                "total_buy_volume": 20000000000_i64,
                "total_sell_volume": 17951316474_i64
            }
        });

        let sum_of_volumes = ComputedExpr::Binary {
            op: BinaryOp::Add,
            left: Box::new(field_or_zero("trading.total_buy_volume")),
            right: Box::new(field_or_zero("trading.total_sell_volume")),
        };
        let spec = ComputedFieldSpec {
            target_path: "trading.total_volume".to_string(),
            result_type: "Option<u64>".to_string(),
            expression: sum_of_volumes,
        };

        VmContext::new()
            .evaluate_computed_fields_from_ast(&mut state, &[spec])
            .unwrap();

        let total_volume = state
            .pointer("/trading/total_volume")
            .expect("total_volume should exist");

        let serialized = serde_json::to_string(total_volume).unwrap();
        assert!(
            !serialized.contains('.'),
            "Integer should not have decimal point: {}",
            serialized
        );
        assert_eq!(
            total_volume.as_i64(),
            Some(37951316474),
            "Value should be correct sum"
        );
    }

    /// Values written through `set_field_sum` must serialize without a
    /// decimal point.
    #[test]
    fn test_set_field_sum_preserves_integer_type() {
        let mut vm = VmContext::new();
        vm.registers[0] = serde_json::json!({});
        vm.registers[1] = serde_json::json!(20000000000_i64);
        vm.registers[2] = serde_json::json!(17951316474_i64);

        vm.set_field_sum(0, "trading.total_buy_volume", 1).unwrap();
        vm.set_field_sum(0, "trading.total_sell_volume", 2).unwrap();

        let state = &vm.registers[0];
        let buy_vol = state.pointer("/trading/total_buy_volume").unwrap();
        let sell_vol = state.pointer("/trading/total_sell_volume").unwrap();

        let buy_serialized = serde_json::to_string(buy_vol).unwrap();
        let sell_serialized = serde_json::to_string(sell_vol).unwrap();

        assert!(
            !buy_serialized.contains('.'),
            "Buy volume should not have decimal: {}",
            buy_serialized
        );
        assert!(
            !sell_serialized.contains('.'),
            "Sell volume should not have decimal: {}",
            sell_serialized
        );
    }
}