use crate::fold_db_core::infrastructure::message_bus::atom_events::MutationContext;
use crate::schema::types::field::Field;
use crate::schema::types::field::FieldValue;
use crate::schema::types::field::HashRangeFilter;
use crate::schema::types::key_value::KeyValue;
use crate::schema::types::Transform;
use crate::schema::types::{Schema, SchemaError};
use std::collections::HashMap;
use std::sync::Arc;
/// Field-less marker type whose associated functions fetch the input values
/// declared by a transform's schema.
pub struct InputFetcher;
impl InputFetcher {
    /// Resolves every input declared by the transform's schema and returns the
    /// resolved values keyed by `"<schema>.<field>"`.
    ///
    /// The transform's own schema is loaded through `db_ops`, and each entry of
    /// its `get_inputs()` list is handled in one of two ways:
    ///
    /// * `"schema.field"` (split at the first `.`): only that field of the
    ///   named schema is resolved.
    /// * `"schema"` (no `.`): every entry of that schema's `runtime_fields` is
    ///   resolved.
    ///
    /// `mutation_context` is forwarded unchanged to each field resolution.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::InvalidData`] when the transform schema, an input
    /// schema, or a named input field cannot be found. Errors produced by
    /// `db_ops.get_schema` or by resolving a field are propagated.
    pub async fn fetch_input_values_with_context(
        transform: &Transform,
        db_ops: &Arc<crate::db_operations::DbOperations>,
        mutation_context: &Option<MutationContext>,
    ) -> Result<HashMap<String, HashMap<KeyValue, FieldValue>>, SchemaError> {
        let mut input_values = HashMap::new();

        let transform_schema = db_ops
            .get_schema(transform.get_schema_name())
            .await?
            .ok_or_else(|| {
                SchemaError::InvalidData(format!(
                    "Transform schema '{}' not found",
                    transform.get_schema_name()
                ))
            })?;

        let inputs_to_process = transform_schema.get_inputs();
        for input_field in inputs_to_process {
            match input_field.split_once('.') {
                // "schema.field": resolve just the one named field.
                Some((input_schema, input_field_name)) => {
                    // The looked-up schema is only used for this one call, so it
                    // is borrowed mutably in place instead of being cloned.
                    let mut schema =
                        db_ops.get_schema(input_schema).await?.ok_or_else(|| {
                            SchemaError::InvalidData(format!(
                                "Schema '{}' not found",
                                input_schema
                            ))
                        })?;
                    let value = Self::fetch_input_for_field_with_context(
                        db_ops,
                        &mut schema,
                        input_field_name,
                        mutation_context,
                    )
                    .await?;
                    input_values.insert(format!("{}.{}", input_schema, input_field_name), value);
                }
                // Bare schema name: resolve every runtime field of that schema.
                None => {
                    let input_schema = input_field.as_str();
                    let mut schema =
                        db_ops.get_schema(input_schema).await?.ok_or_else(|| {
                            SchemaError::InvalidData(format!(
                                "Schema '{}' not found",
                                input_schema
                            ))
                        })?;
                    // Snapshot the field names first so the schema can be
                    // borrowed mutably for each resolution; this replaces a
                    // full schema clone per field with one cheap name copy.
                    let field_names: Vec<String> =
                        schema.runtime_fields.keys().cloned().collect();
                    for field_name in field_names {
                        let value = Self::fetch_input_for_field_with_context(
                            db_ops,
                            &mut schema,
                            &field_name,
                            mutation_context,
                        )
                        .await?;
                        input_values.insert(format!("{}.{}", input_schema, field_name), value);
                    }
                }
            }
        }

        Ok(input_values)
    }

    /// Resolves a single field of `schema` by calling the field's
    /// `resolve_value` with `db_ops` and an optional [`HashRangeFilter`].
    ///
    /// The filter is derived from `mutation_context`'s `key_value`, if any:
    ///
    /// * hash and range present -> `HashRangeFilter::HashRangeKey`
    /// * only hash present      -> `HashRangeFilter::HashKey`
    /// * only range present     -> `HashRangeFilter::RangePrefix`
    /// * neither present, or no context / no `key_value` -> no filter
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::InvalidData`] (listing the available field names)
    /// when `field_name` is not a key of `schema.runtime_fields`, and
    /// propagates any error returned by `resolve_value`.
    async fn fetch_input_for_field_with_context(
        db_ops: &Arc<crate::db_operations::DbOperations>,
        schema: &mut Schema,
        field_name: &str,
        mutation_context: &Option<MutationContext>,
    ) -> Result<HashMap<KeyValue, FieldValue>, SchemaError> {
        // Single lookup: a missing field takes the error arm directly, so no
        // separate `contains_key` check or `unwrap()` is needed.
        let field = match schema.runtime_fields.get_mut(field_name) {
            Some(field) => field,
            None => {
                let available_fields: Vec<&String> = schema.runtime_fields.keys().collect();
                return Err(SchemaError::InvalidData(format!(
                    "Field '{}' not found in schema '{}'. Available fields: {:?}",
                    field_name, schema.name, available_fields
                )));
            }
        };

        // Borrow the key value and clone only the hash/range parts that end up
        // inside the filter, rather than cloning the whole `KeyValue` first.
        let filter = mutation_context
            .as_ref()
            .and_then(|ctx| ctx.key_value.as_ref())
            .and_then(|kv| match (kv.hash.clone(), kv.range.clone()) {
                (Some(hash), Some(range)) => Some(HashRangeFilter::HashRangeKey { hash, range }),
                (Some(hash), None) => Some(HashRangeFilter::HashKey(hash)),
                (None, Some(prefix)) => Some(HashRangeFilter::RangePrefix(prefix)),
                (None, None) => None,
            });

        let value = field.resolve_value(db_ops, filter).await?;
        Ok(value)
    }
}