1use super::tables::{ExportTable, ImportTable, Value};
5use crate::il::{ArrayOp, CallOp, ObjectOp, Op, Plan, Source};
6use crate::CapId;
7use crate::{RpcError, RpcTarget};
8use serde_json::Number;
9use std::collections::HashMap;
10use std::sync::Arc;
11
12#[derive(Debug)]
14pub struct ExecutionContext {
15 results: Vec<Option<Value>>,
17 parameters: Value,
19 captures: Vec<Arc<dyn RpcTarget>>,
21 #[allow(dead_code)]
23 variables: HashMap<String, Value>,
24}
25
26impl ExecutionContext {
27 pub fn new(parameters: Value, captures: Vec<Arc<dyn RpcTarget>>) -> Self {
29 Self {
30 results: Vec::new(),
31 parameters,
32 captures,
33 variables: HashMap::new(),
34 }
35 }
36
37 fn convert_serde_json_value_to_tables_value(value: serde_json::Value) -> Value {
39 match value {
40 serde_json::Value::Null => Value::Null,
41 serde_json::Value::Bool(b) => Value::Bool(b),
42 serde_json::Value::Number(n) => Value::Number(n),
43 serde_json::Value::String(s) => Value::String(s),
44 serde_json::Value::Array(arr) => Value::Array(
45 arr.into_iter()
46 .map(Self::convert_serde_json_value_to_tables_value)
47 .collect(),
48 ),
49 serde_json::Value::Object(obj) => {
50 let mut map = HashMap::new();
51 for (k, v) in obj {
52 map.insert(
53 k,
54 Box::new(Self::convert_serde_json_value_to_tables_value(v)),
55 );
56 }
57 Value::Object(map)
58 }
59 }
60 }
61
62 pub async fn get_source_value(&self, source: &Source) -> Result<Value, PlanExecutionError> {
64 match source {
65 Source::Capture { capture } => {
66 if capture.index as usize >= self.captures.len() {
67 return Err(PlanExecutionError::InvalidCaptureIndex(capture.index));
68 }
69 Ok(Value::Object({
71 let mut obj = HashMap::new();
72 obj.insert(
73 "$cap".to_string(),
74 Box::new(Value::Number(Number::from(capture.index))),
75 );
76 obj
77 }))
78 }
79 Source::Result { result } => {
80 if result.index as usize >= self.results.len() {
81 return Err(PlanExecutionError::InvalidResultIndex(result.index));
82 }
83 match &self.results[result.index as usize] {
84 Some(value) => Ok(value.clone()),
85 None => Err(PlanExecutionError::ResultNotSet(result.index)),
86 }
87 }
88 Source::Param { param } => self.get_nested_parameter(¶m.path),
89 Source::ByValue { by_value } => Ok(Self::convert_serde_json_value_to_tables_value(
90 by_value.value.clone(),
91 )),
92 }
93 }
94
95 fn get_nested_parameter(&self, path: &[String]) -> Result<Value, PlanExecutionError> {
97 let mut current = &self.parameters;
98
99 for segment in path {
100 match current {
101 Value::Object(obj) => {
102 current = obj
103 .get(segment)
104 .ok_or_else(|| PlanExecutionError::ParameterNotFound(segment.clone()))?
105 .as_ref();
106 }
107 _ => return Err(PlanExecutionError::ParameterNotObject(segment.clone())),
108 }
109 }
110
111 Ok(current.clone())
112 }
113
114 pub fn set_result(&mut self, index: u32, value: Value) {
116 while self.results.len() <= index as usize {
118 self.results.push(None);
119 }
120 self.results[index as usize] = Some(value);
121 }
122
123 pub fn get_capability(&self, index: u32) -> Result<Arc<dyn RpcTarget>, PlanExecutionError> {
125 if index as usize >= self.captures.len() {
126 return Err(PlanExecutionError::InvalidCaptureIndex(index));
127 }
128 Ok(self.captures[index as usize].clone())
129 }
130}
131
132#[derive(Debug)]
134pub struct PlanRunner {
135 #[allow(dead_code)]
137 imports: Arc<ImportTable>,
138 #[allow(dead_code)]
140 exports: Arc<ExportTable>,
141 timeout_ms: u64,
143 max_operations: usize,
145}
146
147impl PlanRunner {
148 pub fn new(imports: Arc<ImportTable>, exports: Arc<ExportTable>) -> Self {
150 Self {
151 imports,
152 exports,
153 timeout_ms: 30000, max_operations: 1000, }
156 }
157
158 pub fn with_settings(
160 imports: Arc<ImportTable>,
161 exports: Arc<ExportTable>,
162 timeout_ms: u64,
163 max_operations: usize,
164 ) -> Self {
165 Self {
166 imports,
167 exports,
168 timeout_ms,
169 max_operations,
170 }
171 }
172
173 pub async fn execute_plan(
175 &self,
176 plan: &Plan,
177 parameters: Value,
178 captures: Vec<Arc<dyn RpcTarget>>,
179 ) -> Result<Value, PlanExecutionError> {
180 plan.validate()
182 .map_err(PlanExecutionError::ValidationError)?;
183
184 if plan.ops.len() > self.max_operations {
185 return Err(PlanExecutionError::TooManyOperations(plan.ops.len()));
186 }
187
188 tracing::debug!(
189 ops_count = plan.ops.len(),
190 captures_count = captures.len(),
191 "Executing IL plan"
192 );
193
194 let mut context = ExecutionContext::new(parameters, captures);
195
196 for (i, op) in plan.ops.iter().enumerate() {
198 tracing::trace!(operation_index = i, "Executing operation");
199
200 let result = tokio::time::timeout(
201 std::time::Duration::from_millis(self.timeout_ms),
202 self.execute_operation(op, &mut context),
203 )
204 .await
205 .map_err(|_| PlanExecutionError::ExecutionTimeout)?;
206
207 match result {
208 Ok(value) => {
209 context.set_result(op.get_result_index(), value);
210 }
211 Err(e) => {
212 tracing::error!(
213 operation_index = i,
214 error = %e,
215 "Operation execution failed"
216 );
217 return Err(e);
218 }
219 }
220 }
221
222 let final_result = context.get_source_value(&plan.result).await?;
224
225 tracing::debug!("Plan execution completed successfully");
226 Ok(final_result)
227 }
228
229 async fn execute_operation(
231 &self,
232 op: &Op,
233 context: &mut ExecutionContext,
234 ) -> Result<Value, PlanExecutionError> {
235 match op {
236 Op::Call { call } => self.execute_call_op(call, context).await,
237 Op::Object { object } => self.execute_object_op(object, context).await,
238 Op::Array { array } => self.execute_array_op(array, context).await,
239 }
240 }
241
242 async fn execute_call_op(
244 &self,
245 call: &CallOp,
246 context: &mut ExecutionContext,
247 ) -> Result<Value, PlanExecutionError> {
248 let target = self.resolve_target(&call.target, context).await?;
250
251 let mut args = Vec::new();
253 for arg_source in &call.args {
254 let arg_value = context.get_source_value(arg_source).await?;
255 args.push(arg_value);
256 }
257
258 tracing::trace!(
259 member = %call.member,
260 args_count = args.len(),
261 "Executing RPC call"
262 );
263
264 let result = target
266 .call(&call.member, args)
267 .await
268 .map_err(PlanExecutionError::RpcCallFailed)?;
269
270 tracing::trace!(member = %call.member, "RPC call completed");
271 Ok(result)
272 }
273
274 async fn execute_object_op(
276 &self,
277 object: &ObjectOp,
278 context: &mut ExecutionContext,
279 ) -> Result<Value, PlanExecutionError> {
280 let mut obj = HashMap::new();
281
282 for (key, source) in &object.fields {
283 let value = context.get_source_value(source).await?;
284 obj.insert(key.clone(), Box::new(value));
285 }
286
287 tracing::trace!(fields_count = obj.len(), "Created object");
288 Ok(Value::Object(obj))
289 }
290
291 async fn execute_array_op(
293 &self,
294 array: &ArrayOp,
295 context: &mut ExecutionContext,
296 ) -> Result<Value, PlanExecutionError> {
297 let mut items = Vec::new();
298
299 for source in &array.items {
300 let value = context.get_source_value(source).await?;
301 items.push(value);
302 }
303
304 tracing::trace!(items_count = items.len(), "Created array");
305 Ok(Value::Array(items))
306 }
307
308 async fn resolve_target(
310 &self,
311 source: &Source,
312 context: &ExecutionContext,
313 ) -> Result<Arc<dyn RpcTarget>, PlanExecutionError> {
314 match source {
315 Source::Capture { capture } => context.get_capability(capture.index),
316 Source::Result { result: _ } => {
317 let value = context.get_source_value(source).await?;
319 if let Value::Object(obj) = value {
320 if let Some(cap_ref) = obj.get("$cap") {
321 if let Value::Number(n) = cap_ref.as_ref() {
322 if let Some(cap_index) = n.as_u64() {
323 return context.get_capability(cap_index as u32);
324 }
325 }
326 }
327 }
328 Err(PlanExecutionError::InvalidTarget(
329 "Result is not a capability".to_string(),
330 ))
331 }
332 _ => Err(PlanExecutionError::InvalidTarget(
333 "Source cannot be used as a target".to_string(),
334 )),
335 }
336 }
337}
338
339#[derive(Debug, thiserror::Error)]
341pub enum PlanExecutionError {
342 #[error("Validation error: {0}")]
343 ValidationError(String),
344
345 #[error("Invalid capture index: {0}")]
346 InvalidCaptureIndex(u32),
347
348 #[error("Invalid result index: {0}")]
349 InvalidResultIndex(u32),
350
351 #[error("Result not set: {0}")]
352 ResultNotSet(u32),
353
354 #[error("Parameter not found: {0}")]
355 ParameterNotFound(String),
356
357 #[error("Parameter is not an object: {0}")]
358 ParameterNotObject(String),
359
360 #[error("RPC call failed: {0}")]
361 RpcCallFailed(RpcError),
362
363 #[error("Invalid target: {0}")]
364 InvalidTarget(String),
365
366 #[error("Execution timeout")]
367 ExecutionTimeout,
368
369 #[error("Too many operations: {0}")]
370 TooManyOperations(usize),
371
372 #[error("Plan execution error: {0}")]
373 ExecutionError(String),
374}
375
376#[derive(Debug)]
378pub struct PlanBuilder {
379 captures: Vec<CapId>,
380 ops: Vec<Op>,
381 next_result_index: u32,
382}
383
384impl PlanBuilder {
385 pub fn new() -> Self {
387 Self {
388 captures: Vec::new(),
389 ops: Vec::new(),
390 next_result_index: 0,
391 }
392 }
393
394 pub fn add_capture(&mut self, cap_id: CapId) -> u32 {
396 let index = self.captures.len() as u32;
397 self.captures.push(cap_id);
398 index
399 }
400
401 pub fn add_call(&mut self, target: Source, method: String, args: Vec<Source>) -> u32 {
403 let result_index = self.next_result_index;
404 self.next_result_index += 1;
405
406 let op = Op::call(target, method, args, result_index);
407 self.ops.push(op);
408
409 result_index
410 }
411
412 pub fn add_object(&mut self, fields: HashMap<String, Source>) -> u32 {
414 let result_index = self.next_result_index;
415 self.next_result_index += 1;
416
417 let op = Op::object(fields.into_iter().collect(), result_index);
418 self.ops.push(op);
419
420 result_index
421 }
422
423 pub fn add_array(&mut self, items: Vec<Source>) -> u32 {
425 let result_index = self.next_result_index;
426 self.next_result_index += 1;
427
428 let op = Op::array(items, result_index);
429 self.ops.push(op);
430
431 result_index
432 }
433
434 pub fn build(self, result_source: Source) -> Plan {
436 Plan::new(self.captures, self.ops, result_source)
437 }
438}
439
440impl Default for PlanBuilder {
441 fn default() -> Self {
442 Self::new()
443 }
444}
445
446pub struct PlanOptimizer;
448
449impl PlanOptimizer {
450 pub fn optimize(plan: Plan) -> Plan {
452 plan
459 }
460
461 pub fn analyze_complexity(plan: &Plan) -> PlanComplexity {
463 let mut call_count = 0;
464 let mut object_count = 0;
465 let mut array_count = 0;
466 let mut max_depth = 0;
467 let mut total_args = 0;
468
469 for op in &plan.ops {
470 match op {
471 Op::Call { call } => {
472 call_count += 1;
473 total_args += call.args.len();
474 }
475 Op::Object { object } => {
476 object_count += 1;
477 max_depth = max_depth.max(object.fields.len());
478 }
479 Op::Array { array } => {
480 array_count += 1;
481 max_depth = max_depth.max(array.items.len());
482 }
483 }
484 }
485
486 PlanComplexity {
487 total_operations: plan.ops.len(),
488 call_operations: call_count,
489 object_operations: object_count,
490 array_operations: array_count,
491 max_depth,
492 total_arguments: total_args,
493 captures_count: plan.captures.len(),
494 }
495 }
496}
497
/// Summary counts describing the size of a plan.
#[derive(Debug, Clone)]
pub struct PlanComplexity {
    /// Number of operations of any kind.
    pub total_operations: usize,
    /// Number of call operations.
    pub call_operations: usize,
    /// Number of object-construction operations.
    pub object_operations: usize,
    /// Number of array-construction operations.
    pub array_operations: usize,
    /// Largest field count (objects) or item count (arrays) of a single
    /// operation, as filled in by `PlanOptimizer::analyze_complexity`.
    /// NOTE(review): despite the name this measures width, not nesting.
    pub max_depth: usize,
    /// Sum of argument counts over all call operations.
    pub total_arguments: usize,
    /// Number of captures declared by the plan.
    pub captures_count: usize,
}
509
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MockRpcTarget;
    use serde_json::json;
    use std::collections::BTreeMap;

    /// Converts a JSON literal into the table [`Value`] type for test fixtures.
    fn json_to_value(json: serde_json::Value) -> Value {
        match json {
            serde_json::Value::Null => Value::Null,
            serde_json::Value::Bool(b) => Value::Bool(b),
            serde_json::Value::Number(n) => Value::Number(n),
            serde_json::Value::String(s) => Value::String(s),
            serde_json::Value::Array(arr) => {
                Value::Array(arr.into_iter().map(json_to_value).collect())
            }
            serde_json::Value::Object(obj) => Value::Object(
                obj.into_iter()
                    .map(|(k, v)| (k, Box::new(json_to_value(v))))
                    .collect(),
            ),
        }
    }

    /// A single call against a captured mock target succeeds.
    #[tokio::test]
    async fn test_plan_runner_simple_call() {
        let imports = Arc::new(ImportTable::with_default_allocator());
        let exports = Arc::new(ExportTable::with_default_allocator());
        let runner = PlanRunner::new(imports, exports);

        let plan = Plan::new(
            vec![CapId::new(1)],
            vec![Op::call(
                Source::capture(0),
                "test_method".to_string(),
                vec![Source::by_value(json!("arg1"))],
                0,
            )],
            Source::result(0),
        );

        let parameters = json_to_value(json!({}));
        let captures: Vec<Arc<dyn RpcTarget>> = vec![Arc::new(MockRpcTarget::new())];
        let result = runner.execute_plan(&plan, parameters, captures).await;

        assert!(result.is_ok());
    }

    /// The builder hands out sequential indices and produces a valid plan.
    #[tokio::test]
    async fn test_plan_builder() {
        let mut builder = PlanBuilder::new();

        let cap_index = builder.add_capture(CapId::new(1));
        let call_result =
            builder.add_call(Source::capture(cap_index), "getData".to_string(), vec![]);

        let mut fields = HashMap::new();
        fields.insert("data".to_string(), Source::result(call_result));
        fields.insert("extra".to_string(), Source::by_value(json!("info")));
        let obj_result = builder.add_object(fields);

        let plan = builder.build(Source::result(obj_result));

        assert!(plan.validate().is_ok());
        assert_eq!(plan.captures.len(), 1);
        assert_eq!(plan.ops.len(), 2);
    }

    /// Nested parameter lookup returns the addressed leaf and reports the
    /// offending segment when the path cannot be followed.
    #[tokio::test]
    async fn test_execution_context_parameters() {
        let params = json_to_value(json!({
            "user": {
                "name": "Alice",
                "id": 123
            },
            "settings": {
                "theme": "dark"
            }
        }));

        let context = ExecutionContext::new(params, vec![]);

        match context.get_nested_parameter(&["user".to_string(), "name".to_string()]) {
            Ok(Value::String(s)) => assert_eq!(s, "Alice"),
            other => panic!("Expected string value for name, got {:?}", other),
        }

        match context.get_nested_parameter(&["settings".to_string(), "theme".to_string()]) {
            Ok(Value::String(s)) => assert_eq!(s, "dark"),
            other => panic!("Expected string value for theme, got {:?}", other),
        }

        // A key that is absent from the object is reported by name.
        let missing = context.get_nested_parameter(&["user".to_string(), "missing".to_string()]);
        assert!(matches!(
            missing,
            Err(PlanExecutionError::ParameterNotFound(ref segment)) if segment == "missing"
        ));

        // Descending past a leaf reports the segment that could not be applied.
        let too_deep = context.get_nested_parameter(&[
            "user".to_string(),
            "name".to_string(),
            "first".to_string(),
        ]);
        assert!(matches!(
            too_deep,
            Err(PlanExecutionError::ParameterNotObject(ref segment)) if segment == "first"
        ));
    }

    /// Complexity analysis counts each kind of operation and their sizes.
    #[tokio::test]
    async fn test_plan_complexity_analysis() {
        let plan = Plan::new(
            vec![CapId::new(1), CapId::new(2)],
            vec![
                Op::call(
                    Source::capture(0),
                    "method1".to_string(),
                    vec![
                        Source::by_value(json!("arg1")),
                        Source::by_value(json!("arg2")),
                    ],
                    0,
                ),
                Op::object(
                    BTreeMap::from([
                        ("field1".to_string(), Source::result(0)),
                        ("field2".to_string(), Source::capture(1)),
                    ]),
                    1,
                ),
                Op::array(vec![Source::result(1), Source::by_value(json!(42))], 2),
            ],
            Source::result(2),
        );

        let complexity = PlanOptimizer::analyze_complexity(&plan);

        assert_eq!(complexity.total_operations, 3);
        assert_eq!(complexity.call_operations, 1);
        assert_eq!(complexity.object_operations, 1);
        assert_eq!(complexity.array_operations, 1);
        // The object has two fields and the array two items, so the widest is 2.
        assert_eq!(complexity.max_depth, 2);
        assert_eq!(complexity.total_arguments, 2);
        assert_eq!(complexity.captures_count, 2);
    }

    /// Runs a plan under a 10 ms per-operation timeout.
    ///
    /// NOTE(review): nothing is asserted about the outcome. Whether
    /// `MockRpcTarget` delays `slow_method` beyond the timeout is not visible
    /// from this file, so neither success nor `ExecutionTimeout` can be
    /// pinned here; the test only checks that execution returns.
    #[tokio::test]
    async fn test_execution_timeout() {
        let imports = Arc::new(ImportTable::with_default_allocator());
        let exports = Arc::new(ExportTable::with_default_allocator());
        let runner = PlanRunner::with_settings(imports, exports, 10, 1000);

        let plan = Plan::new(
            vec![CapId::new(1)],
            vec![Op::call(
                Source::capture(0),
                "slow_method".to_string(),
                vec![],
                0,
            )],
            Source::result(0),
        );

        let parameters = json_to_value(json!({}));
        let captures: Vec<Arc<dyn RpcTarget>> = vec![Arc::new(MockRpcTarget::new())];
        let _result = runner.execute_plan(&plan, parameters, captures).await;
    }
}