use crate::{Actor, ActorBehavior, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext, MemoryState};
use reflow_actor_macro::actor;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
/// Applies a per-element transformation to the array received on `input`.
///
/// Configuration keys:
/// * `operation` — one of:
///   * `"to_string"`: strings pass through unchanged; every other value is
///     replaced by its JSON text.
///   * `"to_number"`: numbers pass through as floats; strings that parse as a
///     finite float are converted; everything else becomes `0.0`.
///   * `"extract_field"`: looks up `field` on the element, yielding `null`
///     when the lookup fails.
///   * anything else (default `"identity"`): the element is kept as is.
/// * `field` — key used by `"extract_field"`; defaults to the empty string.
///
/// Emits the mapped array on `output`. When `input` is absent or is not a
/// `Message::Array`, a `Message::Error` is emitted on `error` instead.
#[actor(MapActor, inports::<10>(input), outports::<1>(output, error), state(MemoryState))]
pub async fn map_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
    let payload = ctx.get_payload();
    let config = ctx.get_config_hashmap();
    let operation = config
        .get("operation")
        .and_then(|v| v.as_str())
        .unwrap_or("identity");
    let field = config.get("field").and_then(|v| v.as_str()).unwrap_or("");

    // Borrow the shared array; every element is converted individually below,
    // so cloning the whole vector up front would only duplicate work.
    let items = match payload.get("input") {
        Some(Message::Array(arr)) => arr,
        _ => return Ok(error_output("Expected Array on input port")),
    };

    let mapped: Vec<EncodableValue> = items
        .iter()
        .map(|item| {
            let val: serde_json::Value = item.clone().into();
            let result = match operation {
                // `Value::to_string` yields the JSON encoding, which wraps
                // strings in quotes; keep strings as they are instead.
                "to_string" => match val {
                    serde_json::Value::String(text) => serde_json::Value::String(text),
                    other => serde_json::Value::String(other.to_string()),
                },
                "to_number" => {
                    let number = val
                        .as_f64()
                        .or_else(|| {
                            val.as_str()
                                .and_then(|text| text.trim().parse::<f64>().ok())
                                // `json!` turns non-finite floats into null.
                                .filter(|parsed| parsed.is_finite())
                        })
                        .unwrap_or(0.0);
                    json!(number)
                }
                "extract_field" => val.get(field).cloned().unwrap_or(serde_json::Value::Null),
                _ => val,
            };
            EncodableValue::from(result)
        })
        .collect();

    let mut out = HashMap::new();
    out.insert("output".to_string(), Message::Array(Arc::new(mapped)));
    Ok(out)
}
/// Partitions the array received on `input` into kept and rejected elements.
///
/// Configuration keys:
/// * `condition` — one of:
///   * `"not_null"`: keep when the tested value is not null.
///   * `"equals"`: keep when the tested value equals `value`; never matches
///     when `value` is not configured.
///   * `"greater_than"` / `"less_than"`: numeric comparison against `value`;
///     non-numeric operands on either side are treated as `0.0`.
///   * `"contains"`: substring test; non-string operands are treated as `""`.
///   * anything else (default `"truthy"`): null is false, booleans are
///     themselves, numbers are true when non-zero, strings when non-empty,
///     and all other values are true.
/// * `field` — when non-empty and the element is an object, the test is
///   applied to that member, and a missing member is treated as null.
///   Non-object elements are always tested as a whole.
/// * `value` — right-hand operand for the comparing conditions.
///
/// Emits kept elements on `output` and the remaining ones on `rejected`.
/// When `input` is absent or is not a `Message::Array`, a `Message::Error`
/// is emitted on `error` instead.
#[actor(FilterActor, inports::<10>(input), outports::<1>(output, rejected, error), state(MemoryState))]
pub async fn filter_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
    let payload = ctx.get_payload();
    let config = ctx.get_config_hashmap();
    let condition = config
        .get("condition")
        .and_then(|v| v.as_str())
        .unwrap_or("truthy");
    let field = config.get("field").and_then(|v| v.as_str()).unwrap_or("");

    // The operand is the same for every element, so decode it once here.
    let compare_value = config.get("value");
    let threshold = compare_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
    let needle = compare_value.and_then(|v| v.as_str()).unwrap_or("");

    let items = match payload.get("input") {
        Some(Message::Array(arr)) => arr,
        _ => return Ok(error_output("Expected Array on input port")),
    };

    // Stand-in for a member that is missing from an object element.
    let null = serde_json::Value::Null;

    let mut passed = Vec::new();
    let mut rejected = Vec::new();
    for item in items.iter() {
        let val: serde_json::Value = item.clone().into();
        let test_val = if field.is_empty() {
            &val
        } else {
            match &val {
                // A missing member must not fall back to the whole object,
                // otherwise `not_null` / `truthy` would accept every object.
                serde_json::Value::Object(members) => members.get(field).unwrap_or(&null),
                _ => &val,
            }
        };
        let keep = match condition {
            "not_null" => !test_val.is_null(),
            "equals" => compare_value.map_or(false, |cv| test_val == cv),
            "greater_than" => test_val.as_f64().unwrap_or(0.0) > threshold,
            "less_than" => test_val.as_f64().unwrap_or(0.0) < threshold,
            "contains" => test_val.as_str().unwrap_or("").contains(needle),
            _ => match test_val {
                serde_json::Value::Null => false,
                serde_json::Value::Bool(b) => *b,
                serde_json::Value::Number(n) => n.as_f64().unwrap_or(0.0) != 0.0,
                serde_json::Value::String(s) => !s.is_empty(),
                _ => true,
            },
        };
        if keep {
            passed.push(item.clone());
        } else {
            rejected.push(item.clone());
        }
    }

    let mut out = HashMap::new();
    out.insert("output".to_string(), Message::Array(Arc::new(passed)));
    out.insert("rejected".to_string(), Message::Array(Arc::new(rejected)));
    Ok(out)
}
/// Folds the array received on `input` into a single message on `output`.
///
/// The `operation` config key selects the reduction (default `"sum"`):
/// * `"sum"` / `"product"`: over the numeric elements, as `Message::Float`.
/// * `"min"` / `"max"`: smallest / largest numeric element as
///   `Message::Float`, or `0.0` when the array holds no numeric element.
/// * `"count"`: number of elements as `Message::Integer`.
/// * `"concat"`: concatenation of the string elements as `Message::String`.
/// * `"first"` / `"last"`: the corresponding element wrapped by
///   `Message::object`.
/// * anything else: `Message::Float(0.0)`.
///
/// For an empty array the result keeps the type the operation normally
/// produces: `count` gives `Integer(0)`, `concat` an empty string,
/// `first` / `last` a null object, and every other operation `Float(0.0)`.
///
/// When `input` is absent or is not a `Message::Array`, a `Message::Error`
/// is emitted on `error` instead.
#[actor(ReduceActor, inports::<10>(input), outports::<1>(output, error), state(MemoryState))]
pub async fn reduce_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
    let payload = ctx.get_payload();
    let config = ctx.get_config_hashmap();
    let operation = config
        .get("operation")
        .and_then(|v| v.as_str())
        .unwrap_or("sum");
    let items = match payload.get("input") {
        Some(Message::Array(arr)) => arr,
        _ => return Ok(error_output("Expected Array on input port")),
    };

    if items.is_empty() {
        // Keep the output type stable per operation so consumers do not see
        // a float where they normally receive an integer, string or object.
        let empty = match operation {
            "count" => Message::Integer(0),
            "concat" => Message::String(String::new().into()),
            "first" | "last" => Message::object(EncodableValue::from(serde_json::Value::Null)),
            _ => Message::Float(0.0),
        };
        return Ok([("output".to_string(), empty)].into());
    }

    let values: Vec<serde_json::Value> = items.iter().map(|i| i.clone().into()).collect();
    // Numeric view of the input; non-numeric elements are skipped.
    let numbers = || values.iter().filter_map(|v| v.as_f64());

    let result = match operation {
        "sum" => Message::Float(numbers().sum::<f64>()),
        "product" => Message::Float(numbers().product::<f64>()),
        // `reduce` yields `None` when there is nothing numeric, so no
        // sentinel seed (f64::MAX / f64::MIN) can leak into the output.
        "min" => Message::Float(numbers().reduce(f64::min).unwrap_or(0.0)),
        "max" => Message::Float(numbers().reduce(f64::max).unwrap_or(0.0)),
        "count" => Message::Integer(values.len() as i64),
        "concat" => {
            let joined: String = values.iter().filter_map(|v| v.as_str()).collect();
            Message::String(joined.into())
        }
        "first" => {
            let v = values.first().cloned().unwrap_or(serde_json::Value::Null);
            Message::object(EncodableValue::from(v))
        }
        "last" => {
            let v = values.last().cloned().unwrap_or(serde_json::Value::Null);
            Message::object(EncodableValue::from(v))
        }
        _ => Message::Float(0.0),
    };
    Ok([("output".to_string(), result)].into())
}
#[actor(MergeActor, inports::<10>(a, b, c, d), outports::<1>(output), state(MemoryState))]
pub async fn merge_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
let payload = ctx.get_payload();
let config = ctx.get_config_hashmap();
let mode = config
.get("mode")
.and_then(|v| v.as_str())
.unwrap_or("object");
match mode {
"array" => {
let mut arr = Vec::new();
for port in ["a", "b", "c", "d"] {
if let Some(msg) = payload.get(port) {
let val = serde_json::to_value(msg).unwrap_or(serde_json::Value::Null);
arr.push(EncodableValue::from(val));
}
}
Ok([("output".to_string(), Message::Array(Arc::new(arr)))].into())
}
_ => {
let mut obj = serde_json::Map::new();
for port in ["a", "b", "c", "d"] {
if let Some(msg) = payload.get(port) {
let val = serde_json::to_value(msg).unwrap_or(serde_json::Value::Null);
obj.insert(port.to_string(), val);
}
}
Ok([(
"output".to_string(),
Message::object(EncodableValue::from(serde_json::Value::Object(obj))),
)]
.into())
}
}
}
#[actor(SplitActor, inports::<10>(input), outports::<1>(head, tail, count), state(MemoryState))]
pub async fn split_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
let payload = ctx.get_payload();
let items = match payload.get("input") {
Some(Message::Array(arr)) => arr.as_ref().clone(),
_ => return Ok(error_output("Expected Array on input port")),
};
let count = items.len();
let mut out = HashMap::new();
if let Some(first) = items.first() {
let val: serde_json::Value = first.clone().into();
out.insert(
"head".to_string(),
Message::object(EncodableValue::from(val)),
);
}
let tail: Vec<EncodableValue> = items.into_iter().skip(1).collect();
out.insert("tail".to_string(), Message::Array(Arc::new(tail)));
out.insert("count".to_string(), Message::Integer(count as i64));
Ok(out)
}
#[actor(DelayActor, inports::<10>(input), outports::<1>(output), state(MemoryState))]
pub async fn delay_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
let payload = ctx.get_payload();
let config = ctx.get_config_hashmap();
let delay_ms = config
.get("delayMs")
.and_then(|v| v.as_u64())
.unwrap_or(1000);
let input = payload.get("input").cloned().unwrap_or(Message::Flow);
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
Ok([("output".to_string(), input)].into())
}
#[actor(GateActor, inports::<10>(input, control), outports::<1>(output, blocked), state(MemoryState), await_all_inports)]
pub async fn gate_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
let payload = ctx.get_payload();
let config = ctx.get_config_hashmap();
let invert = config
.get("invert")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let open = match payload.get("control") {
Some(Message::Boolean(b)) => *b,
Some(Message::Integer(i)) => *i != 0,
Some(Message::Float(f)) => *f != 0.0,
_ => true,
};
let pass = if invert { !open } else { open };
let input = payload.get("input").cloned().unwrap_or(Message::Flow);
let mut out = HashMap::new();
if pass {
out.insert("output".to_string(), input);
} else {
out.insert("blocked".to_string(), input);
}
Ok(out)
}
/// Accumulates incoming messages and releases them as one batch.
///
/// Each message on `input` is serialized (null on serialization failure) and
/// appended to the `"buffer"` entry of the actor's `MemoryState`. The entry
/// is created as an empty array if it does not exist yet. Once the buffer
/// holds at least `count` entries (config key, default 10), it is drained
/// and emitted as an array on `output`, with its length on `count`.
/// Otherwise nothing is emitted.
///
/// # Errors
/// Returns an error when the actor state is not a `MemoryState`.
#[actor(CollectActor, inports::<100>(input), outports::<1>(output, count), state(MemoryState))]
pub async fn collect_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
    let payload = ctx.get_payload();
    let config = ctx.get_config_hashmap();
    let state = ctx.get_state();
    let target = config
        .get("count")
        .and_then(serde_json::Value::as_u64)
        .unwrap_or(10) as usize;

    let mut guard = state.lock();
    let memory = match guard.as_mut_any().downcast_mut::<MemoryState>() {
        Some(memory) => memory,
        None => return Err(anyhow::anyhow!("Invalid state")),
    };

    // The buffer is created before looking at the input, so it exists even
    // on a run that carries no message.
    if memory.value("buffer").is_none() {
        memory.insert("buffer", serde_json::Value::Array(Vec::new()));
    }

    let incoming = match payload.get("input") {
        Some(msg) => msg,
        None => return Ok(HashMap::new()),
    };
    let encoded = serde_json::to_value(incoming).unwrap_or(serde_json::Value::Null);

    // A buffer entry that is not an array is left untouched.
    let buffer = match memory.value_mut("buffer") {
        Some(serde_json::Value::Array(entries)) => entries,
        _ => return Ok(HashMap::new()),
    };
    buffer.push(encoded);
    if buffer.len() < target {
        return Ok(HashMap::new());
    }

    let batch: Vec<EncodableValue> = buffer.drain(..).map(EncodableValue::from).collect();
    let emitted = batch.len() as i64;
    Ok([
        ("output".to_string(), Message::Array(Arc::new(batch))),
        ("count".to_string(), Message::Integer(emitted)),
    ]
    .into())
}
/// Forwards the message from `input` to `output` unchanged; a missing input
/// is forwarded as `Message::Flow`.
#[actor(PassthroughActor, inports::<10>(input), outports::<1>(output), state(MemoryState))]
pub async fn passthrough_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
    let payload = ctx.get_payload();
    let forwarded = match payload.get("input") {
        Some(msg) => msg.clone(),
        None => Message::Flow,
    };

    let mut out = HashMap::new();
    out.insert("output".to_string(), forwarded);
    Ok(out)
}
fn error_output(msg: &str) -> HashMap<String, Message> {
let mut out = HashMap::new();
out.insert("error".to_string(), Message::Error(msg.to_string().into()));
out
}