capnweb_core/protocol/
pipeline.rs

1// Promise pipelining support for Cap'n Web protocol
2// Implements the official pipelining feature per specification
3
4use super::expression::{Expression, PropertyKey};
5use super::ids::ImportId;
6use super::tables::Value;
7use dashmap::DashMap;
8use std::collections::VecDeque;
9
10/// Pipeline manager for promise pipelining as per Cap'n Web spec
11pub struct PipelineManager {
12    /// Pending pipeline operations for unresolved promises
13    pipelines: DashMap<ImportId, VecDeque<PipelineOperation>>,
14}
15
16impl PipelineManager {
17    /// Create a new pipeline manager
18    pub fn new() -> Self {
19        Self {
20            pipelines: DashMap::new(),
21        }
22    }
23
24    /// Register a pipelined operation on a promise (spec-compliant)
25    pub fn add_pipeline_operation(
26        &self,
27        promise_id: ImportId,
28        operation: PipelineOperation,
29    ) -> ImportId {
30        let result_id = operation.result_id;
31
32        self.pipelines
33            .entry(promise_id)
34            .or_default()
35            .push_back(operation);
36
37        result_id
38    }
39
40    /// Execute all pipelined operations when a promise resolves (spec-compliant)
41    pub async fn resolve_promise(
42        &self,
43        promise_id: ImportId,
44        value: Value,
45    ) -> Result<Vec<(ImportId, Result<Value, PipelineError>)>, PipelineError> {
46        let mut results = Vec::new();
47
48        if let Some((_, operations)) = self.pipelines.remove(&promise_id) {
49            for operation in operations {
50                let result = self.execute_pipeline_operation(&value, &operation).await;
51                results.push((operation.result_id, result));
52            }
53        }
54
55        Ok(results)
56    }
57
58    /// Execute a single pipeline operation (property access or method call)
59    async fn execute_pipeline_operation(
60        &self,
61        value: &Value,
62        operation: &PipelineOperation,
63    ) -> Result<Value, PipelineError> {
64        match operation.operation_type {
65            PipelineOperationType::PropertyAccess { ref path } => {
66                self.access_property_path(value, path).await
67            }
68            PipelineOperationType::MethodCall {
69                ref method,
70                ref args,
71            } => self.call_method(value, method, args).await,
72        }
73    }
74
75    /// Access a property path on a value (supports chained property access)
76    async fn access_property_path(
77        &self,
78        mut current_value: &Value,
79        path: &[PropertyKey],
80    ) -> Result<Value, PipelineError> {
81        #[allow(unused_assignments)] // Used to extend lifetime of cloned values
82        let mut owned_value = None;
83
84        for key in path {
85            current_value = match current_value {
86                Value::Object(obj) => match key {
87                    PropertyKey::String(key_str) => {
88                        if let Some(boxed_val) = obj.get(key_str) {
89                            boxed_val.as_ref()
90                        } else {
91                            return Err(PipelineError::PropertyNotFound(key_str.clone()));
92                        }
93                    }
94                    PropertyKey::Number(_) => {
95                        return Err(PipelineError::InvalidPropertyType);
96                    }
97                },
98                Value::Array(arr) => match key {
99                    PropertyKey::Number(index) => {
100                        let idx = *index;
101                        if let Some(val) = arr.get(idx) {
102                            owned_value = Some(val.clone());
103                            // Safe: we just assigned Some(val.clone()) above
104                            owned_value.as_ref().expect("owned_value was just assigned")
105                        } else {
106                            return Err(PipelineError::IndexOutOfBounds(idx));
107                        }
108                    }
109                    PropertyKey::String(_) => {
110                        return Err(PipelineError::InvalidPropertyType);
111                    }
112                },
113                _ => {
114                    return Err(PipelineError::CannotAccessProperty);
115                }
116            };
117        }
118
119        Ok(current_value.clone())
120    }
121
122    /// Call a method on a value (basic implementation)
123    async fn call_method(
124        &self,
125        _value: &Value,
126        _method: &str,
127        _args: &Expression,
128    ) -> Result<Value, PipelineError> {
129        // Method calls on values require RPC target resolution
130        // This would need integration with the capability system
131        Err(PipelineError::MethodCallNotImplemented)
132    }
133}
134
135impl Default for PipelineManager {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141/// A pipeline operation as per Cap'n Web specification
142#[derive(Debug, Clone)]
143pub struct PipelineOperation {
144    pub operation_type: PipelineOperationType,
145    pub result_id: ImportId,
146}
147
148/// Types of pipeline operations supported by Cap'n Web
149#[derive(Debug, Clone)]
150pub enum PipelineOperationType {
151    /// Property access with path (e.g., obj.foo.bar)
152    PropertyAccess { path: Vec<PropertyKey> },
153    /// Method call with arguments
154    MethodCall { method: String, args: Expression },
155}
156
157#[derive(Debug, thiserror::Error)]
158pub enum PipelineError {
159    #[error("Property not found: {0}")]
160    PropertyNotFound(String),
161
162    #[error("Index out of bounds: {0}")]
163    IndexOutOfBounds(usize),
164
165    #[error("Invalid property type for access")]
166    InvalidPropertyType,
167
168    #[error("Cannot access property on this value type")]
169    CannotAccessProperty,
170
171    #[error("Method call not implemented")]
172    MethodCallNotImplemented,
173
174    #[error("Pipeline execution failed")]
175    ExecutionFailed,
176}