use crate::nodes::comparison::common::{compare_equal, compare_less_than};
use crate::nodes::filter_node;
use crate::nodes::map_node;
use std::any::Any;
use std::sync::Arc;
pub fn array_length(v: &Arc<dyn Any + Send + Sync>) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array length input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
let length = arc_vec.len();
Ok(Arc::new(length) as Arc<dyn Any + Send + Sync>)
}
pub fn array_index(
v: &Arc<dyn Any + Send + Sync>,
index: &Arc<dyn Any + Send + Sync>,
) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array index input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
let get_usize = |val: &Arc<dyn Any + Send + Sync>, name: &str| -> Result<usize, String> {
if let Ok(arc_usize) = val.clone().downcast::<usize>() {
Ok(*arc_usize)
} else if let Ok(arc_i32) = val.clone().downcast::<i32>() {
if *arc_i32 < 0 {
return Err(format!("{} cannot be negative", name));
}
(*arc_i32)
.try_into()
.map_err(|_| format!("{} too large", name))
} else if let Ok(arc_i64) = val.clone().downcast::<i64>() {
if *arc_i64 < 0 {
return Err(format!("{} cannot be negative", name));
}
(*arc_i64)
.try_into()
.map_err(|_| format!("{} too large", name))
} else if let Ok(arc_u32) = val.clone().downcast::<u32>() {
(*arc_u32)
.try_into()
.map_err(|_| format!("{} too large", name))
} else if let Ok(arc_u64) = val.clone().downcast::<u64>() {
(*arc_u64)
.try_into()
.map_err(|_| format!("{} too large", name))
} else {
Err(format!(
"Unsupported type for {}: {} (must be numeric)",
name,
std::any::type_name_of_val(&**val)
))
}
};
let idx = get_usize(index, "index")?;
if idx >= arc_vec.len() {
return Err(format!(
"Index {} out of bounds for array of length {}",
idx,
arc_vec.len()
));
}
Ok(arc_vec[idx].clone())
}
pub fn array_slice(
v: &Arc<dyn Any + Send + Sync>,
start: &Arc<dyn Any + Send + Sync>,
end: &Arc<dyn Any + Send + Sync>,
) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array slice input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
let get_usize = |val: &Arc<dyn Any + Send + Sync>, name: &str| -> Result<usize, String> {
if let Ok(arc_usize) = val.clone().downcast::<usize>() {
Ok(*arc_usize)
} else if let Ok(arc_i32) = val.clone().downcast::<i32>() {
if *arc_i32 < 0 {
return Err(format!("{} cannot be negative", name));
}
(*arc_i32)
.try_into()
.map_err(|_| format!("{} too large", name))
} else if let Ok(arc_i64) = val.clone().downcast::<i64>() {
if *arc_i64 < 0 {
return Err(format!("{} cannot be negative", name));
}
(*arc_i64)
.try_into()
.map_err(|_| format!("{} too large", name))
} else if let Ok(arc_u32) = val.clone().downcast::<u32>() {
(*arc_u32)
.try_into()
.map_err(|_| format!("{} too large", name))
} else if let Ok(arc_u64) = val.clone().downcast::<u64>() {
(*arc_u64)
.try_into()
.map_err(|_| format!("{} too large", name))
} else {
Err(format!(
"Unsupported type for {}: {} (must be numeric)",
name,
std::any::type_name_of_val(&**val)
))
}
};
let start_idx = get_usize(start, "start")?;
let end_idx = get_usize(end, "end")?;
if start_idx > end_idx {
return Err(format!(
"Invalid slice range: start index {} is greater than end index {}",
start_idx, end_idx
));
}
let array_len = arc_vec.len();
let clamped_start = start_idx.min(array_len);
let clamped_end = end_idx.min(array_len);
let result: Vec<Arc<dyn Any + Send + Sync>> = arc_vec[clamped_start..clamped_end].to_vec();
Ok(Arc::new(result) as Arc<dyn Any + Send + Sync>)
}
pub fn array_contains(
v: &Arc<dyn Any + Send + Sync>,
value: &Arc<dyn Any + Send + Sync>,
) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array contains input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
for element in arc_vec.iter() {
match compare_equal(element, value) {
Ok(result_arc) => {
if let Ok(arc_bool) = result_arc.downcast::<bool>()
&& *arc_bool
{
return Ok(Arc::new(true) as Arc<dyn Any + Send + Sync>);
}
}
Err(_) => {
continue;
}
}
}
Ok(Arc::new(false) as Arc<dyn Any + Send + Sync>)
}
pub fn array_index_of(
v: &Arc<dyn Any + Send + Sync>,
value: &Arc<dyn Any + Send + Sync>,
) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array index_of input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
for (index, element) in arc_vec.iter().enumerate() {
match compare_equal(element, value) {
Ok(result_arc) => {
if let Ok(arc_bool) = result_arc.downcast::<bool>()
&& *arc_bool
{
return Ok(Arc::new(index as i32) as Arc<dyn Any + Send + Sync>);
}
}
Err(_) => {
continue;
}
}
}
Ok(Arc::new(-1i32) as Arc<dyn Any + Send + Sync>)
}
pub fn array_concat(
v1: &Arc<dyn Any + Send + Sync>,
v2: &Arc<dyn Any + Send + Sync>,
) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec1 = v1
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array concat first input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v1)
)
})?;
let arc_vec2 = v2
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array concat second input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v2)
)
})?;
let mut result: Vec<Arc<dyn Any + Send + Sync>> =
Vec::with_capacity(arc_vec1.len() + arc_vec2.len());
result.extend(arc_vec1.iter().cloned());
result.extend(arc_vec2.iter().cloned());
Ok(Arc::new(result) as Arc<dyn Any + Send + Sync>)
}
pub fn array_reverse(v: &Arc<dyn Any + Send + Sync>) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array reverse input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
let mut result: Vec<Arc<dyn Any + Send + Sync>> = arc_vec.iter().cloned().collect();
result.reverse();
Ok(Arc::new(result) as Arc<dyn Any + Send + Sync>)
}
pub fn array_sort(
v: &Arc<dyn Any + Send + Sync>,
ascending: bool,
) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array sort input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
let mut result: Vec<Arc<dyn Any + Send + Sync>> = arc_vec.iter().cloned().collect();
result.sort_by(|a, b| {
if let (Ok(arc_str1), Ok(arc_str2)) = (
a.clone().downcast::<String>(),
b.clone().downcast::<String>(),
) {
return arc_str1.as_str().cmp(arc_str2.as_str());
}
match compare_less_than(a, b) {
Ok(result_arc) => {
if let Ok(arc_bool) = result_arc.downcast::<bool>()
&& *arc_bool
{
return std::cmp::Ordering::Less;
}
if let Ok(result_arc2) = compare_less_than(b, a)
&& let Ok(arc_bool2) = result_arc2.downcast::<bool>()
&& *arc_bool2
{
return std::cmp::Ordering::Greater;
}
std::cmp::Ordering::Equal
}
Err(_) => {
std::cmp::Ordering::Equal
}
}
});
if !ascending {
result.reverse();
}
Ok(Arc::new(result) as Arc<dyn Any + Send + Sync>)
}
pub async fn array_filter(
v: &Arc<dyn Any + Send + Sync>,
predicate: &filter_node::FilterConfig,
) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array filter input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
let mut result: Vec<Arc<dyn Any + Send + Sync>> = Vec::new();
for element in arc_vec.iter() {
match predicate.apply(element.clone()).await {
Ok(true) => {
result.push(element.clone());
}
Ok(false) => {
}
Err(e) => {
return Err(format!("Predicate error: {}", e));
}
}
}
Ok(Arc::new(result) as Arc<dyn Any + Send + Sync>)
}
pub async fn array_map(
v: &Arc<dyn Any + Send + Sync>,
map_fn: &map_node::MapConfig,
) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array map input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
let mut result: Vec<Arc<dyn Any + Send + Sync>> = Vec::with_capacity(arc_vec.len());
for element in arc_vec.iter() {
match map_fn.apply(element.clone()).await {
Ok(transformed) => {
result.push(transformed);
}
Err(e) => {
return Err(format!("Map function error: {}", e));
}
}
}
Ok(Arc::new(result) as Arc<dyn Any + Send + Sync>)
}
pub fn array_join(
v: &Arc<dyn Any + Send + Sync>,
delimiter: &Arc<dyn Any + Send + Sync>,
) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array join input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
let arc_delimiter = delimiter.clone().downcast::<String>().map_err(|_| {
format!(
"Unsupported type for delimiter: {} (delimiter must be String)",
std::any::type_name_of_val(&**delimiter)
)
})?;
let mut parts = Vec::new();
for (idx, item) in arc_vec.iter().enumerate() {
let str_repr = if let Ok(arc_str) = item.clone().downcast::<String>() {
(*arc_str).clone()
} else if let Ok(arc_i32) = item.clone().downcast::<i32>() {
arc_i32.to_string()
} else if let Ok(arc_i64) = item.clone().downcast::<i64>() {
arc_i64.to_string()
} else if let Ok(arc_u32) = item.clone().downcast::<u32>() {
arc_u32.to_string()
} else if let Ok(arc_u64) = item.clone().downcast::<u64>() {
arc_u64.to_string()
} else if let Ok(arc_f32) = item.clone().downcast::<f32>() {
arc_f32.to_string()
} else if let Ok(arc_f64) = item.clone().downcast::<f64>() {
arc_f64.to_string()
} else if let Ok(arc_bool) = item.clone().downcast::<bool>() {
arc_bool.to_string()
} else {
return Err(format!(
"Unsupported type for array element at index {}: {} (element must be convertible to string)",
idx,
std::any::type_name_of_val(&**item)
));
};
parts.push(str_repr);
}
let result = parts.join(&*arc_delimiter);
Ok(Arc::new(result) as Arc<dyn Any + Send + Sync>)
}
pub fn array_split(
v: &Arc<dyn Any + Send + Sync>,
chunk_size: &Arc<dyn Any + Send + Sync>,
) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array split input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
let get_usize = |val: &Arc<dyn Any + Send + Sync>, name: &str| -> Result<usize, String> {
if let Ok(arc_i32) = val.clone().downcast::<i32>() {
let i = *arc_i32;
if i < 0 {
return Err(format!("{} must be non-negative, got {}", name, i));
}
Ok(i as usize)
} else if let Ok(arc_i64) = val.clone().downcast::<i64>() {
let i = *arc_i64;
if i < 0 {
return Err(format!("{} must be non-negative, got {}", name, i));
}
if i > usize::MAX as i64 {
return Err(format!("{} overflow: {} exceeds usize::MAX", name, i));
}
Ok(i as usize)
} else if let Ok(arc_u32) = val.clone().downcast::<u32>() {
Ok(*arc_u32 as usize)
} else if let Ok(arc_u64) = val.clone().downcast::<u64>() {
let u = *arc_u64;
if u > usize::MAX as u64 {
return Err(format!("{} overflow: {} exceeds usize::MAX", name, u));
}
Ok(u as usize)
} else {
Err(format!(
"Unsupported type for {}: {} (must be i32, i64, u32, or u64)",
name,
std::any::type_name_of_val(&**val)
))
}
};
let size = get_usize(chunk_size, "chunk_size")?;
if size == 0 {
return Err("chunk_size must be greater than 0".to_string());
}
let mut result: Vec<Arc<dyn Any + Send + Sync>> = Vec::new();
for chunk in arc_vec.chunks(size) {
let chunk_vec: Vec<Arc<dyn Any + Send + Sync>> = chunk.to_vec();
result.push(Arc::new(chunk_vec) as Arc<dyn Any + Send + Sync>);
}
Ok(Arc::new(result) as Arc<dyn Any + Send + Sync>)
}
pub fn array_flatten(v: &Arc<dyn Any + Send + Sync>) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array flatten input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
let mut result: Vec<Arc<dyn Any + Send + Sync>> = Vec::new();
for item in arc_vec.iter() {
if let Ok(nested_array) = item.clone().downcast::<Vec<Arc<dyn Any + Send + Sync>>>() {
result.extend(nested_array.iter().cloned());
} else {
result.push(item.clone());
}
}
Ok(Arc::new(result) as Arc<dyn Any + Send + Sync>)
}
pub fn array_unique(v: &Arc<dyn Any + Send + Sync>) -> Result<Arc<dyn Any + Send + Sync>, String> {
let arc_vec = v
.clone()
.downcast::<Vec<Arc<dyn Any + Send + Sync>>>()
.map_err(|_| {
format!(
"Unsupported type for array unique input: {} (input must be Vec<Arc<dyn Any + Send + Sync>>)",
std::any::type_name_of_val(&**v)
)
})?;
let mut result: Vec<Arc<dyn Any + Send + Sync>> = Vec::new();
for element in arc_vec.iter() {
let mut is_duplicate = false;
for existing in result.iter() {
match compare_equal(element, existing) {
Ok(result_arc) => {
if let Ok(arc_bool) = result_arc.downcast::<bool>()
&& *arc_bool
{
is_duplicate = true;
break;
}
}
Err(_) => {
continue;
}
}
}
if !is_duplicate {
result.push(element.clone());
}
}
Ok(Arc::new(result) as Arc<dyn Any + Send + Sync>)
}