use crate::{Actor, ActorBehavior, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::Value;
use std::collections::HashMap;
// Emits the configured `data` value on the `output` port.
//
// Config keys read by this actor:
//   * `oneshot` (bool) - treated as `true` when absent or not a boolean.
//   * `data`           - the value to emit; when missing, nothing is emitted.
//
// In oneshot mode the actor records a `done` entry in the `_emit` pool right
// before emitting, and returns an empty output map whenever that entry is
// already present. The body never reads the `trigger` inport's payload.
#[actor(
DataEmitActor,
inports::<10>(trigger),
outports::<10>(output),
state(MemoryState)
)]
pub async fn data_emit_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
    let config = ctx.get_config_hashmap();

    // Anything other than an explicit `false` keeps the actor in oneshot mode.
    let is_oneshot = config.get("oneshot").and_then(|flag| flag.as_bool()) != Some(false);

    // The pool is only consulted in oneshot mode.
    if is_oneshot {
        let already_fired = ctx
            .get_pool("_emit")
            .into_iter()
            .any(|(key, _)| key == "done");
        if already_fired {
            return Ok(HashMap::new());
        }
    }

    // Without a payload there is nothing to send; the `done` marker is left
    // unset in that case.
    let payload = if let Some(value) = config.get("data") {
        value.clone()
    } else {
        return Ok(HashMap::new());
    };

    if is_oneshot {
        ctx.pool_upsert("_emit", "done", Value::Bool(true));
    }

    let mut outputs = HashMap::new();
    outputs.insert(
        "output".to_string(),
        Message::object(EncodableValue::from(payload)),
    );
    Ok(outputs)
}