capnweb_core/protocol/
il_runner.rs

1// Advanced IL Plan Runner - Execution engine for Cap'n Web IL plans
2// Executes complex instruction sequences with capability composition
3
4use super::tables::{ExportTable, ImportTable, Value};
5use crate::il::{ArrayOp, CallOp, ObjectOp, Op, Plan, Source};
6use crate::CapId;
7use crate::{RpcError, RpcTarget};
8use serde_json::Number;
9use std::collections::HashMap;
10use std::sync::Arc;
11
12/// Plan execution context containing runtime state
13#[derive(Debug)]
14pub struct ExecutionContext {
15    /// Intermediate results from operations
16    results: Vec<Option<Value>>,
17    /// Parameter values passed to the plan
18    parameters: Value,
19    /// Captured capabilities
20    captures: Vec<Arc<dyn RpcTarget>>,
21    /// Variable state during execution
22    #[allow(dead_code)]
23    variables: HashMap<String, Value>,
24}
25
26impl ExecutionContext {
27    /// Create a new execution context
28    pub fn new(parameters: Value, captures: Vec<Arc<dyn RpcTarget>>) -> Self {
29        Self {
30            results: Vec::new(),
31            parameters,
32            captures,
33            variables: HashMap::new(),
34        }
35    }
36
37    /// Convert serde_json::Value to tables::Value
38    fn convert_serde_json_value_to_tables_value(value: serde_json::Value) -> Value {
39        match value {
40            serde_json::Value::Null => Value::Null,
41            serde_json::Value::Bool(b) => Value::Bool(b),
42            serde_json::Value::Number(n) => Value::Number(n),
43            serde_json::Value::String(s) => Value::String(s),
44            serde_json::Value::Array(arr) => Value::Array(
45                arr.into_iter()
46                    .map(Self::convert_serde_json_value_to_tables_value)
47                    .collect(),
48            ),
49            serde_json::Value::Object(obj) => {
50                let mut map = HashMap::new();
51                for (k, v) in obj {
52                    map.insert(
53                        k,
54                        Box::new(Self::convert_serde_json_value_to_tables_value(v)),
55                    );
56                }
57                Value::Object(map)
58            }
59        }
60    }
61
62    /// Get a source value from the context
63    pub async fn get_source_value(&self, source: &Source) -> Result<Value, PlanExecutionError> {
64        match source {
65            Source::Capture { capture } => {
66                if capture.index as usize >= self.captures.len() {
67                    return Err(PlanExecutionError::InvalidCaptureIndex(capture.index));
68                }
69                // Return a capability reference
70                Ok(Value::Object({
71                    let mut obj = HashMap::new();
72                    obj.insert(
73                        "$cap".to_string(),
74                        Box::new(Value::Number(Number::from(capture.index))),
75                    );
76                    obj
77                }))
78            }
79            Source::Result { result } => {
80                if result.index as usize >= self.results.len() {
81                    return Err(PlanExecutionError::InvalidResultIndex(result.index));
82                }
83                match &self.results[result.index as usize] {
84                    Some(value) => Ok(value.clone()),
85                    None => Err(PlanExecutionError::ResultNotSet(result.index)),
86                }
87            }
88            Source::Param { param } => self.get_nested_parameter(&param.path),
89            Source::ByValue { by_value } => Ok(Self::convert_serde_json_value_to_tables_value(
90                by_value.value.clone(),
91            )),
92        }
93    }
94
95    /// Get a nested parameter value by path
96    fn get_nested_parameter(&self, path: &[String]) -> Result<Value, PlanExecutionError> {
97        let mut current = &self.parameters;
98
99        for segment in path {
100            match current {
101                Value::Object(obj) => {
102                    current = obj
103                        .get(segment)
104                        .ok_or_else(|| PlanExecutionError::ParameterNotFound(segment.clone()))?
105                        .as_ref();
106                }
107                _ => return Err(PlanExecutionError::ParameterNotObject(segment.clone())),
108            }
109        }
110
111        Ok(current.clone())
112    }
113
114    /// Set a result value
115    pub fn set_result(&mut self, index: u32, value: Value) {
116        // Extend the results vector if necessary
117        while self.results.len() <= index as usize {
118            self.results.push(None);
119        }
120        self.results[index as usize] = Some(value);
121    }
122
123    /// Get a capability by index
124    pub fn get_capability(&self, index: u32) -> Result<Arc<dyn RpcTarget>, PlanExecutionError> {
125        if index as usize >= self.captures.len() {
126            return Err(PlanExecutionError::InvalidCaptureIndex(index));
127        }
128        Ok(self.captures[index as usize].clone())
129    }
130}
131
132/// IL Plan execution engine
133#[derive(Debug)]
134pub struct PlanRunner {
135    /// Import table for capability resolution
136    #[allow(dead_code)]
137    imports: Arc<ImportTable>,
138    /// Export table for capability storage
139    #[allow(dead_code)]
140    exports: Arc<ExportTable>,
141    /// Execution timeout in milliseconds
142    timeout_ms: u64,
143    /// Maximum operations per plan (prevents infinite loops)
144    max_operations: usize,
145}
146
147impl PlanRunner {
148    /// Create a new plan runner
149    pub fn new(imports: Arc<ImportTable>, exports: Arc<ExportTable>) -> Self {
150        Self {
151            imports,
152            exports,
153            timeout_ms: 30000,    // 30 second default timeout
154            max_operations: 1000, // Maximum 1000 operations per plan
155        }
156    }
157
158    /// Create a plan runner with custom settings
159    pub fn with_settings(
160        imports: Arc<ImportTable>,
161        exports: Arc<ExportTable>,
162        timeout_ms: u64,
163        max_operations: usize,
164    ) -> Self {
165        Self {
166            imports,
167            exports,
168            timeout_ms,
169            max_operations,
170        }
171    }
172
173    /// Execute a plan with the given parameters and captures
174    pub async fn execute_plan(
175        &self,
176        plan: &Plan,
177        parameters: Value,
178        captures: Vec<Arc<dyn RpcTarget>>,
179    ) -> Result<Value, PlanExecutionError> {
180        // Validate the plan first
181        plan.validate()
182            .map_err(PlanExecutionError::ValidationError)?;
183
184        if plan.ops.len() > self.max_operations {
185            return Err(PlanExecutionError::TooManyOperations(plan.ops.len()));
186        }
187
188        tracing::debug!(
189            ops_count = plan.ops.len(),
190            captures_count = captures.len(),
191            "Executing IL plan"
192        );
193
194        let mut context = ExecutionContext::new(parameters, captures);
195
196        // Execute operations in sequence
197        for (i, op) in plan.ops.iter().enumerate() {
198            tracing::trace!(operation_index = i, "Executing operation");
199
200            let result = tokio::time::timeout(
201                std::time::Duration::from_millis(self.timeout_ms),
202                self.execute_operation(op, &mut context),
203            )
204            .await
205            .map_err(|_| PlanExecutionError::ExecutionTimeout)?;
206
207            match result {
208                Ok(value) => {
209                    context.set_result(op.get_result_index(), value);
210                }
211                Err(e) => {
212                    tracing::error!(
213                        operation_index = i,
214                        error = %e,
215                        "Operation execution failed"
216                    );
217                    return Err(e);
218                }
219            }
220        }
221
222        // Return the final result
223        let final_result = context.get_source_value(&plan.result).await?;
224
225        tracing::debug!("Plan execution completed successfully");
226        Ok(final_result)
227    }
228
229    /// Execute a single operation
230    async fn execute_operation(
231        &self,
232        op: &Op,
233        context: &mut ExecutionContext,
234    ) -> Result<Value, PlanExecutionError> {
235        match op {
236            Op::Call { call } => self.execute_call_op(call, context).await,
237            Op::Object { object } => self.execute_object_op(object, context).await,
238            Op::Array { array } => self.execute_array_op(array, context).await,
239        }
240    }
241
242    /// Execute a call operation
243    async fn execute_call_op(
244        &self,
245        call: &CallOp,
246        context: &mut ExecutionContext,
247    ) -> Result<Value, PlanExecutionError> {
248        // Resolve the target capability
249        let target = self.resolve_target(&call.target, context).await?;
250
251        // Resolve arguments
252        let mut args = Vec::new();
253        for arg_source in &call.args {
254            let arg_value = context.get_source_value(arg_source).await?;
255            args.push(arg_value);
256        }
257
258        tracing::trace!(
259            member = %call.member,
260            args_count = args.len(),
261            "Executing RPC call"
262        );
263
264        // Execute the RPC call
265        let result = target
266            .call(&call.member, args)
267            .await
268            .map_err(PlanExecutionError::RpcCallFailed)?;
269
270        tracing::trace!(member = %call.member, "RPC call completed");
271        Ok(result)
272    }
273
274    /// Execute an object construction operation
275    async fn execute_object_op(
276        &self,
277        object: &ObjectOp,
278        context: &mut ExecutionContext,
279    ) -> Result<Value, PlanExecutionError> {
280        let mut obj = HashMap::new();
281
282        for (key, source) in &object.fields {
283            let value = context.get_source_value(source).await?;
284            obj.insert(key.clone(), Box::new(value));
285        }
286
287        tracing::trace!(fields_count = obj.len(), "Created object");
288        Ok(Value::Object(obj))
289    }
290
291    /// Execute an array construction operation
292    async fn execute_array_op(
293        &self,
294        array: &ArrayOp,
295        context: &mut ExecutionContext,
296    ) -> Result<Value, PlanExecutionError> {
297        let mut items = Vec::new();
298
299        for source in &array.items {
300            let value = context.get_source_value(source).await?;
301            items.push(value);
302        }
303
304        tracing::trace!(items_count = items.len(), "Created array");
305        Ok(Value::Array(items))
306    }
307
308    /// Resolve a target source to an RPC target
309    async fn resolve_target(
310        &self,
311        source: &Source,
312        context: &ExecutionContext,
313    ) -> Result<Arc<dyn RpcTarget>, PlanExecutionError> {
314        match source {
315            Source::Capture { capture } => context.get_capability(capture.index),
316            Source::Result { result: _ } => {
317                // Check if the result is a capability reference
318                let value = context.get_source_value(source).await?;
319                if let Value::Object(obj) = value {
320                    if let Some(cap_ref) = obj.get("$cap") {
321                        if let Value::Number(n) = cap_ref.as_ref() {
322                            if let Some(cap_index) = n.as_u64() {
323                                return context.get_capability(cap_index as u32);
324                            }
325                        }
326                    }
327                }
328                Err(PlanExecutionError::InvalidTarget(
329                    "Result is not a capability".to_string(),
330                ))
331            }
332            _ => Err(PlanExecutionError::InvalidTarget(
333                "Source cannot be used as a target".to_string(),
334            )),
335        }
336    }
337}
338
339/// Errors that can occur during plan execution
340#[derive(Debug, thiserror::Error)]
341pub enum PlanExecutionError {
342    #[error("Validation error: {0}")]
343    ValidationError(String),
344
345    #[error("Invalid capture index: {0}")]
346    InvalidCaptureIndex(u32),
347
348    #[error("Invalid result index: {0}")]
349    InvalidResultIndex(u32),
350
351    #[error("Result not set: {0}")]
352    ResultNotSet(u32),
353
354    #[error("Parameter not found: {0}")]
355    ParameterNotFound(String),
356
357    #[error("Parameter is not an object: {0}")]
358    ParameterNotObject(String),
359
360    #[error("RPC call failed: {0}")]
361    RpcCallFailed(RpcError),
362
363    #[error("Invalid target: {0}")]
364    InvalidTarget(String),
365
366    #[error("Execution timeout")]
367    ExecutionTimeout,
368
369    #[error("Too many operations: {0}")]
370    TooManyOperations(usize),
371
372    #[error("Plan execution error: {0}")]
373    ExecutionError(String),
374}
375
376/// Advanced plan builder for creating complex IL plans
377#[derive(Debug)]
378pub struct PlanBuilder {
379    captures: Vec<CapId>,
380    ops: Vec<Op>,
381    next_result_index: u32,
382}
383
384impl PlanBuilder {
385    /// Create a new plan builder
386    pub fn new() -> Self {
387        Self {
388            captures: Vec::new(),
389            ops: Vec::new(),
390            next_result_index: 0,
391        }
392    }
393
394    /// Add a capture to the plan
395    pub fn add_capture(&mut self, cap_id: CapId) -> u32 {
396        let index = self.captures.len() as u32;
397        self.captures.push(cap_id);
398        index
399    }
400
401    /// Add a call operation
402    pub fn add_call(&mut self, target: Source, method: String, args: Vec<Source>) -> u32 {
403        let result_index = self.next_result_index;
404        self.next_result_index += 1;
405
406        let op = Op::call(target, method, args, result_index);
407        self.ops.push(op);
408
409        result_index
410    }
411
412    /// Add an object construction operation
413    pub fn add_object(&mut self, fields: HashMap<String, Source>) -> u32 {
414        let result_index = self.next_result_index;
415        self.next_result_index += 1;
416
417        let op = Op::object(fields.into_iter().collect(), result_index);
418        self.ops.push(op);
419
420        result_index
421    }
422
423    /// Add an array construction operation
424    pub fn add_array(&mut self, items: Vec<Source>) -> u32 {
425        let result_index = self.next_result_index;
426        self.next_result_index += 1;
427
428        let op = Op::array(items, result_index);
429        self.ops.push(op);
430
431        result_index
432    }
433
434    /// Build the final plan
435    pub fn build(self, result_source: Source) -> Plan {
436        Plan::new(self.captures, self.ops, result_source)
437    }
438}
439
440impl Default for PlanBuilder {
441    fn default() -> Self {
442        Self::new()
443    }
444}
445
/// Plan optimization utilities.
///
/// This is a stateless marker type; its functionality lives in associated
/// functions. The derives keep it consistent with the other public types in
/// this module, all of which implement `Debug`.
#[derive(Debug, Clone, Copy, Default)]
pub struct PlanOptimizer;
448
449impl PlanOptimizer {
450    /// Optimize a plan by removing unused operations
451    pub fn optimize(plan: Plan) -> Plan {
452        // For now, just return the plan as-is
453        // Future optimizations could include:
454        // - Dead code elimination
455        // - Constant folding
456        // - Operation reordering
457        // - Parallel execution planning
458        plan
459    }
460
461    /// Analyze plan complexity
462    pub fn analyze_complexity(plan: &Plan) -> PlanComplexity {
463        let mut call_count = 0;
464        let mut object_count = 0;
465        let mut array_count = 0;
466        let mut max_depth = 0;
467        let mut total_args = 0;
468
469        for op in &plan.ops {
470            match op {
471                Op::Call { call } => {
472                    call_count += 1;
473                    total_args += call.args.len();
474                }
475                Op::Object { object } => {
476                    object_count += 1;
477                    max_depth = max_depth.max(object.fields.len());
478                }
479                Op::Array { array } => {
480                    array_count += 1;
481                    max_depth = max_depth.max(array.items.len());
482                }
483            }
484        }
485
486        PlanComplexity {
487            total_operations: plan.ops.len(),
488            call_operations: call_count,
489            object_operations: object_count,
490            array_operations: array_count,
491            max_depth,
492            total_arguments: total_args,
493            captures_count: plan.captures.len(),
494        }
495    }
496}
497
/// Plan complexity metrics.
///
/// `PartialEq`/`Eq` allow whole metric sets to be compared at once and
/// `Default` provides the all-zero value.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PlanComplexity {
    /// Number of operations in the plan.
    pub total_operations: usize,
    /// Number of call operations.
    pub call_operations: usize,
    /// Number of object construction operations.
    pub object_operations: usize,
    /// Number of array construction operations.
    pub array_operations: usize,
    /// Largest field count of any object operation or item count of any
    /// array operation. Despite the name this measures width, not nesting.
    pub max_depth: usize,
    /// Sum of the argument counts of all call operations.
    pub total_arguments: usize,
    /// Number of captures declared by the plan.
    pub captures_count: usize,
}
509
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MockRpcTarget;
    use serde_json::json;
    use std::collections::BTreeMap;

    /// Converts a JSON literal into a table `Value` using the same conversion
    /// the runner applies to by-value sources, so the tests cannot drift from
    /// production behaviour.
    fn json_to_value(json: serde_json::Value) -> Value {
        ExecutionContext::convert_serde_json_value_to_tables_value(json)
    }

    /// A single call on a captured mock target executes successfully.
    #[tokio::test]
    async fn test_plan_runner_simple_call() {
        let imports = Arc::new(ImportTable::with_default_allocator());
        let exports = Arc::new(ExportTable::with_default_allocator());
        let runner = PlanRunner::new(imports, exports);

        let plan = Plan::new(
            vec![CapId::new(1)],
            vec![Op::call(
                Source::capture(0),
                "test_method".to_string(),
                vec![Source::by_value(json!("arg1"))],
                0,
            )],
            Source::result(0),
        );

        let parameters = json_to_value(json!({}));
        let captures: Vec<Arc<dyn RpcTarget>> = vec![Arc::new(MockRpcTarget::new())];
        let result = runner.execute_plan(&plan, parameters, captures).await;

        assert!(
            result.is_ok(),
            "plan execution failed: {:?}",
            result.err()
        );
    }

    /// The builder hands out consecutive indices and produces a valid plan.
    #[test]
    fn test_plan_builder() {
        let mut builder = PlanBuilder::new();

        let cap_index = builder.add_capture(CapId::new(1));
        let call_result =
            builder.add_call(Source::capture(cap_index), "getData".to_string(), vec![]);

        let mut fields = HashMap::new();
        fields.insert("data".to_string(), Source::result(call_result));
        fields.insert("extra".to_string(), Source::by_value(json!("info")));
        let obj_result = builder.add_object(fields);

        let plan = builder.build(Source::result(obj_result));

        assert!(plan.validate().is_ok());
        assert_eq!(plan.captures.len(), 1);
        assert_eq!(plan.ops.len(), 2);
    }

    /// Nested parameter lookup succeeds on valid paths and reports the
    /// offending segment on invalid ones.
    #[test]
    fn test_execution_context_parameters() {
        let params = json_to_value(json!({
            "user": {
                "name": "Alice",
                "id": 123
            },
            "settings": {
                "theme": "dark"
            }
        }));

        let context = ExecutionContext::new(params, vec![]);

        // Test nested parameter access
        let name = context.get_nested_parameter(&["user".to_string(), "name".to_string()]);
        match name.expect("Should get name") {
            Value::String(s) => assert_eq!(s, "Alice"),
            _ => panic!("Expected string value for name"),
        }

        let theme = context.get_nested_parameter(&["settings".to_string(), "theme".to_string()]);
        match theme.expect("Should get theme") {
            Value::String(s) => assert_eq!(s, "dark"),
            _ => panic!("Expected string value for theme"),
        }

        // A key that does not exist in the object being traversed.
        let missing = context.get_nested_parameter(&["user".to_string(), "missing".to_string()]);
        assert!(matches!(
            missing,
            Err(PlanExecutionError::ParameterNotFound(ref segment)) if segment == "missing"
        ));

        // Descending into a string value is not possible.
        let too_deep = context.get_nested_parameter(&[
            "user".to_string(),
            "name".to_string(),
            "first".to_string(),
        ]);
        assert!(matches!(
            too_deep,
            Err(PlanExecutionError::ParameterNotObject(ref segment)) if segment == "first"
        ));
    }

    /// Complexity analysis counts each operation kind, arguments and captures.
    #[test]
    fn test_plan_complexity_analysis() {
        let plan = Plan::new(
            vec![CapId::new(1), CapId::new(2)],
            vec![
                Op::call(
                    Source::capture(0),
                    "method1".to_string(),
                    vec![
                        Source::by_value(json!("arg1")),
                        Source::by_value(json!("arg2")),
                    ],
                    0,
                ),
                Op::object(
                    BTreeMap::from([
                        ("field1".to_string(), Source::result(0)),
                        ("field2".to_string(), Source::capture(1)),
                    ]),
                    1,
                ),
                Op::array(vec![Source::result(1), Source::by_value(json!(42))], 2),
            ],
            Source::result(2),
        );

        let complexity = PlanOptimizer::analyze_complexity(&plan);

        assert_eq!(complexity.total_operations, 3);
        assert_eq!(complexity.call_operations, 1);
        assert_eq!(complexity.object_operations, 1);
        assert_eq!(complexity.array_operations, 1);
        assert_eq!(complexity.total_arguments, 2);
        assert_eq!(complexity.captures_count, 2);
        // Both the object and the array hold two entries.
        assert_eq!(complexity.max_depth, 2);
    }

    /// Exercises the runner with a very short timeout.
    ///
    /// The mock target is not known to be slow, so no outcome is asserted;
    /// the test only checks that execution with a 10ms timeout completes.
    #[tokio::test]
    async fn test_execution_timeout() {
        let imports = Arc::new(ImportTable::with_default_allocator());
        let exports = Arc::new(ExportTable::with_default_allocator());
        let runner = PlanRunner::with_settings(imports, exports, 10, 1000); // 10ms timeout

        let plan = Plan::new(
            vec![CapId::new(1)],
            vec![Op::call(
                Source::capture(0),
                "slow_method".to_string(),
                vec![],
                0,
            )],
            Source::result(0),
        );

        let parameters = json_to_value(json!({}));
        let captures: Vec<Arc<dyn RpcTarget>> = vec![Arc::new(MockRpcTarget::new())];
        let _result = runner.execute_plan(&plan, parameters, captures).await;
    }
}