use crate::{Actor, ActorBehavior, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
/// Builds a `{ "id": ..., "data": ... }` signal object and emits it on the
/// `signal` outport whenever the payload contains a `trigger` entry.
///
/// Configuration keys read from the actor config:
/// - `event`: string used as the signal `id`; falls back to `"signal"` when
///   the key is absent or not a string.
/// - `data`: JSON value used as the base signal data; falls back to an empty
///   JSON object when the key is absent.
///
/// When the trigger message is a `Message::Object` whose JSON form is an
/// object, and the base data is also a JSON object, the trigger's fields are
/// copied over the base data (trigger values win on key collisions). In every
/// other combination the base data is emitted unchanged.
///
/// Returns an empty map when the payload has no `trigger` entry.
#[actor(
    SignalActor,
    inports::<10>(trigger),
    outports::<50>(signal),
    state(MemoryState)
)]
pub async fn signal_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
    let payload = ctx.get_payload();
    let config = ctx.get_config_hashmap();

    // Nothing to do unless something arrived on the trigger port.
    let trigger = match payload.get("trigger") {
        Some(message) => message,
        None => return Ok(HashMap::new()),
    };

    let event_name = match config.get("event").and_then(|raw| raw.as_str()) {
        Some(name) => name,
        None => "signal",
    };

    let mut merged = match config.get("data") {
        Some(configured) => configured.clone(),
        None => Value::Object(Default::default()),
    };

    // Only object-shaped triggers can contribute fields, and only when the
    // configured base data is itself an object.
    if let Message::Object(encoded) = trigger {
        let incoming: Value = encoded.as_ref().clone().into();
        if let (Some(target), Some(fields)) = (merged.as_object_mut(), incoming.as_object()) {
            target.extend(
                fields
                    .iter()
                    .map(|(key, value)| (key.clone(), value.clone())),
            );
        }
    }

    let envelope = json!({ "id": event_name, "data": merged });

    let mut outputs = HashMap::new();
    outputs.insert(
        "signal".to_string(),
        Message::object(EncodableValue::from(envelope)),
    );
    Ok(outputs)
}
/// Receives messages on the `signal` inport, filters them by signal id, and
/// forwards matching ones.
///
/// Configuration keys read from the actor config:
/// - `event`: the signal id to accept; falls back to `"*"` (accept every id)
///   when the key is absent or not a string.
///
/// Accepted message shapes on `signal`:
/// - `Message::Object`: the id is taken from the JSON `id` field (empty
///   string when missing or not a string) and the data from the JSON `data`
///   field (`null` when missing).
/// - `Message::String`: the string itself is the id and the data is `null`.
///
/// Any other message variant, or a payload without a `signal` entry, yields
/// an empty output map, as does an id that does not match the filter.
///
/// On a match, `Message::Flow` is always emitted on `trigger`; the signal
/// data is additionally emitted on `data` when it is not JSON `null`.
#[actor(
    SubscriberActor,
    inports::<100>(signal),
    outports::<50>(data, trigger),
    state(MemoryState)
)]
pub async fn subscriber_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
    let payload = ctx.get_payload();
    let config = ctx.get_config_hashmap();

    let filter = match config.get("event").and_then(|raw| raw.as_str()) {
        Some(name) => name,
        None => "*",
    };

    // Decode the incoming message into an (id, data) pair, bailing out with
    // no output for a missing or unsupported message.
    let (received_id, received_data) = match payload.get("signal") {
        Some(Message::Object(encoded)) => {
            let envelope: Value = encoded.as_ref().clone().into();
            let id = match envelope.get("id").and_then(|raw| raw.as_str()) {
                Some(text) => text.to_string(),
                None => String::new(),
            };
            let body = match envelope.get("data") {
                Some(inner) => inner.clone(),
                None => Value::Null,
            };
            (id, body)
        }
        Some(Message::String(text)) => (text.to_string(), Value::Null),
        _ => return Ok(HashMap::new()),
    };

    // "*" is the wildcard; otherwise the id must match exactly.
    let accepted = filter == "*" || received_id == filter;
    if !accepted {
        return Ok(HashMap::new());
    }

    let mut outputs = HashMap::from([("trigger".to_string(), Message::Flow)]);
    match received_data {
        // A null payload carries no data worth forwarding.
        Value::Null => {}
        body => {
            outputs.insert(
                "data".to_string(),
                Message::object(EncodableValue::from(body)),
            );
        }
    }
    Ok(outputs)
}