capnweb_core/protocol/
pipeline.rs1use super::expression::{Expression, PropertyKey};
5use super::ids::ImportId;
6use super::tables::Value;
7use dashmap::DashMap;
8use std::collections::VecDeque;
9
10pub struct PipelineManager {
12 pipelines: DashMap<ImportId, VecDeque<PipelineOperation>>,
14}
15
16impl PipelineManager {
17 pub fn new() -> Self {
19 Self {
20 pipelines: DashMap::new(),
21 }
22 }
23
24 pub fn add_pipeline_operation(
26 &self,
27 promise_id: ImportId,
28 operation: PipelineOperation,
29 ) -> ImportId {
30 let result_id = operation.result_id;
31
32 self.pipelines
33 .entry(promise_id)
34 .or_default()
35 .push_back(operation);
36
37 result_id
38 }
39
40 pub async fn resolve_promise(
42 &self,
43 promise_id: ImportId,
44 value: Value,
45 ) -> Result<Vec<(ImportId, Result<Value, PipelineError>)>, PipelineError> {
46 let mut results = Vec::new();
47
48 if let Some((_, operations)) = self.pipelines.remove(&promise_id) {
49 for operation in operations {
50 let result = self.execute_pipeline_operation(&value, &operation).await;
51 results.push((operation.result_id, result));
52 }
53 }
54
55 Ok(results)
56 }
57
58 async fn execute_pipeline_operation(
60 &self,
61 value: &Value,
62 operation: &PipelineOperation,
63 ) -> Result<Value, PipelineError> {
64 match operation.operation_type {
65 PipelineOperationType::PropertyAccess { ref path } => {
66 self.access_property_path(value, path).await
67 }
68 PipelineOperationType::MethodCall {
69 ref method,
70 ref args,
71 } => self.call_method(value, method, args).await,
72 }
73 }
74
75 async fn access_property_path(
77 &self,
78 mut current_value: &Value,
79 path: &[PropertyKey],
80 ) -> Result<Value, PipelineError> {
81 #[allow(unused_assignments)] let mut owned_value = None;
83
84 for key in path {
85 current_value = match current_value {
86 Value::Object(obj) => match key {
87 PropertyKey::String(key_str) => {
88 if let Some(boxed_val) = obj.get(key_str) {
89 boxed_val.as_ref()
90 } else {
91 return Err(PipelineError::PropertyNotFound(key_str.clone()));
92 }
93 }
94 PropertyKey::Number(_) => {
95 return Err(PipelineError::InvalidPropertyType);
96 }
97 },
98 Value::Array(arr) => match key {
99 PropertyKey::Number(index) => {
100 let idx = *index;
101 if let Some(val) = arr.get(idx) {
102 owned_value = Some(val.clone());
103 owned_value.as_ref().expect("owned_value was just assigned")
105 } else {
106 return Err(PipelineError::IndexOutOfBounds(idx));
107 }
108 }
109 PropertyKey::String(_) => {
110 return Err(PipelineError::InvalidPropertyType);
111 }
112 },
113 _ => {
114 return Err(PipelineError::CannotAccessProperty);
115 }
116 };
117 }
118
119 Ok(current_value.clone())
120 }
121
122 async fn call_method(
124 &self,
125 _value: &Value,
126 _method: &str,
127 _args: &Expression,
128 ) -> Result<Value, PipelineError> {
129 Err(PipelineError::MethodCallNotImplemented)
132 }
133}
134
135impl Default for PipelineManager {
136 fn default() -> Self {
137 Self::new()
138 }
139}
140
141#[derive(Debug, Clone)]
143pub struct PipelineOperation {
144 pub operation_type: PipelineOperationType,
145 pub result_id: ImportId,
146}
147
148#[derive(Debug, Clone)]
150pub enum PipelineOperationType {
151 PropertyAccess { path: Vec<PropertyKey> },
153 MethodCall { method: String, args: Expression },
155}
156
157#[derive(Debug, thiserror::Error)]
158pub enum PipelineError {
159 #[error("Property not found: {0}")]
160 PropertyNotFound(String),
161
162 #[error("Index out of bounds: {0}")]
163 IndexOutOfBounds(usize),
164
165 #[error("Invalid property type for access")]
166 InvalidPropertyType,
167
168 #[error("Cannot access property on this value type")]
169 CannotAccessProperty,
170
171 #[error("Method call not implemented")]
172 MethodCallNotImplemented,
173
174 #[error("Pipeline execution failed")]
175 ExecutionFailed,
176}