use super::expression::{Expression, PropertyKey};
use super::ids::ImportId;
use super::tables::Value;
use dashmap::DashMap;
use std::collections::VecDeque;
/// Holds operations that were queued against promises before those promises resolved.
///
/// Operations are grouped by the [`ImportId`] of the promise they wait on and
/// are kept in the order in which they were added.
pub struct PipelineManager {
// Pending operations per promise id; an entry is removed as a whole when
// `resolve_promise` is called for that id.
pipelines: DashMap<ImportId, VecDeque<PipelineOperation>>,
}
impl PipelineManager {
    /// Creates a manager with no queued operations.
    pub fn new() -> Self {
        Self {
            pipelines: DashMap::new(),
        }
    }

    /// Queues `operation` to be executed once the promise `promise_id` resolves.
    ///
    /// Operations queued for the same promise are kept in insertion order.
    ///
    /// Returns the operation's own `result_id`, i.e. the id under which its
    /// outcome will later be reported by [`PipelineManager::resolve_promise`].
    pub fn add_pipeline_operation(
        &self,
        promise_id: ImportId,
        operation: PipelineOperation,
    ) -> ImportId {
        // Read the id before `operation` is moved into the queue.
        let result_id = operation.result_id;
        self.pipelines
            .entry(promise_id)
            .or_default()
            .push_back(operation);
        result_id
    }

    /// Executes every operation queued for `promise_id` against `value`.
    ///
    /// The queue for `promise_id` is removed from the manager before anything
    /// runs, so a second call with the same id yields an empty list unless new
    /// operations were queued in between.
    ///
    /// Returns one `(result_id, outcome)` pair per queued operation, in the
    /// order the operations were added. A failing operation does not stop the
    /// remaining ones; its error is stored in its own pair.
    ///
    /// Operations queued against one of the returned `result_id`s are not
    /// executed by this call.
    ///
    /// # Errors
    ///
    /// The outer `Result` is currently always `Ok`; per-operation failures are
    /// reported inside the returned pairs.
    pub async fn resolve_promise(
        &self,
        promise_id: ImportId,
        value: Value,
    ) -> Result<Vec<(ImportId, Result<Value, PipelineError>)>, PipelineError> {
        let mut results = Vec::new();
        // `remove` hands back the owned queue, so no map entry is borrowed
        // while the operations are awaited below.
        if let Some((_, operations)) = self.pipelines.remove(&promise_id) {
            for operation in operations {
                let outcome = self.execute_pipeline_operation(&value, &operation).await;
                results.push((operation.result_id, outcome));
            }
        }
        Ok(results)
    }

    /// Dispatches a single queued operation on `value` according to its type.
    async fn execute_pipeline_operation(
        &self,
        value: &Value,
        operation: &PipelineOperation,
    ) -> Result<Value, PipelineError> {
        match &operation.operation_type {
            PipelineOperationType::PropertyAccess { path } => {
                self.access_property_path(value, path).await
            }
            PipelineOperationType::MethodCall { method, args } => {
                self.call_method(value, method, args).await
            }
        }
    }

    /// Follows `path` through nested objects and arrays, starting at
    /// `current_value`, and returns a clone of the value that is reached.
    ///
    /// An empty `path` returns a clone of `current_value` itself.
    ///
    /// # Errors
    ///
    /// * [`PipelineError::PropertyNotFound`] - an object has no entry for a string key.
    /// * [`PipelineError::IndexOutOfBounds`] - an array has no element at a numeric key.
    /// * [`PipelineError::InvalidPropertyType`] - a numeric key was applied to an
    ///   object, or a string key to an array.
    /// * [`PipelineError::CannotAccessProperty`] - a key was applied to a value
    ///   that is neither an object nor an array.
    async fn access_property_path(
        &self,
        mut current_value: &Value,
        path: &[PropertyKey],
    ) -> Result<Value, PipelineError> {
        // Every step re-borrows from the input tree, so intermediate values
        // are never copied; only the final target is cloned.
        for key in path {
            current_value = match (current_value, key) {
                (Value::Object(fields), PropertyKey::String(name)) => match fields.get(name) {
                    Some(boxed) => boxed.as_ref(),
                    None => return Err(PipelineError::PropertyNotFound(name.clone())),
                },
                (Value::Array(items), PropertyKey::Number(index)) => {
                    let idx = *index;
                    match items.get(idx) {
                        Some(element) => element,
                        None => return Err(PipelineError::IndexOutOfBounds(idx)),
                    }
                }
                // Right kind of container, wrong kind of key.
                (Value::Object(_), PropertyKey::Number(_))
                | (Value::Array(_), PropertyKey::String(_)) => {
                    return Err(PipelineError::InvalidPropertyType);
                }
                // Any other value cannot be indexed at all.
                _ => return Err(PipelineError::CannotAccessProperty),
            };
        }
        Ok(current_value.clone())
    }

    /// Placeholder for pipelined method calls.
    ///
    /// # Errors
    ///
    /// Always returns [`PipelineError::MethodCallNotImplemented`]; the
    /// arguments are ignored.
    async fn call_method(
        &self,
        _value: &Value,
        _method: &str,
        _args: &Expression,
    ) -> Result<Value, PipelineError> {
        Err(PipelineError::MethodCallNotImplemented)
    }
}
impl Default for PipelineManager {
fn default() -> Self {
Self::new()
}
}
/// A single operation queued against a promise that has not resolved yet.
#[derive(Debug, Clone)]
pub struct PipelineOperation {
/// What to do with the promise's value once it is available.
pub operation_type: PipelineOperationType,
/// Id under which the outcome of this operation is reported.
pub result_id: ImportId,
}
/// The kinds of operation that can be queued on a promise.
#[derive(Debug, Clone)]
pub enum PipelineOperationType {
/// Walk `path` into the resolved value, one key per step, and yield the value reached.
PropertyAccess { path: Vec<PropertyKey> },
/// Call `method` with `args` on the resolved value.
/// The manager's `call_method` currently rejects every such call with
/// `PipelineError::MethodCallNotImplemented`.
MethodCall { method: String, args: Expression },
}
/// Failures that can occur while executing a pipelined operation.
#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
/// An object had no entry for the requested string key (the key is carried along).
#[error("Property not found: {0}")]
PropertyNotFound(String),
/// An array had no element at the requested index (the index is carried along).
#[error("Index out of bounds: {0}")]
IndexOutOfBounds(usize),
/// The key kind did not fit the container: a numeric key on an object or a string key on an array.
#[error("Invalid property type for access")]
InvalidPropertyType,
/// A key was applied to a value that is neither an object nor an array.
#[error("Cannot access property on this value type")]
CannotAccessProperty,
/// A pipelined method call was requested; method calls are not implemented yet.
#[error("Method call not implemented")]
MethodCallNotImplemented,
/// Generic execution failure.
// NOTE(review): nothing in the visible part of this file constructs this variant - confirm its users.
#[error("Pipeline execution failed")]
ExecutionFailed,
}