use crate::{Actor, ActorBehavior, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use std::collections::HashMap;
#[cfg(not(target_arch = "wasm32"))]
fn spawn_task<F: std::future::Future<Output = ()> + Send + 'static>(f: F) {
tokio::spawn(f);
}
#[cfg(target_arch = "wasm32")]
fn spawn_task<F: std::future::Future<Output = ()> + 'static>(f: F) {
wasm_bindgen_futures::spawn_local(f);
}
#[cfg(not(target_arch = "wasm32"))]
async fn sleep_ms(ms: u64) {
tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
}
#[cfg(target_arch = "wasm32")]
async fn sleep_ms(ms: u64) {
let promise = js_sys::Promise::new(&mut |resolve, _| {
let _ = web_sys::window()
.unwrap()
.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, ms as i32);
});
let _ = wasm_bindgen_futures::JsFuture::from(promise).await;
}
#[actor(
IntervalTriggerActor,
inports::<1>(start),
outports::<50>(trigger, done),
state(MemoryState)
)]
pub async fn interval_trigger_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
let config = ctx.get_config_hashmap();
let interval_ms = config
.get("interval")
.and_then(|v| v.as_u64())
.unwrap_or(60000);
let unit = config
.get("intervalUnit")
.and_then(|v| v.as_str())
.unwrap_or("milliseconds");
let interval = match unit {
"seconds" => interval_ms * 1000,
"minutes" => interval_ms * 60_000,
"hours" => interval_ms * 3_600_000,
"days" => interval_ms * 86_400_000,
_ => interval_ms,
};
let start_immediately = config
.get("startImmediately")
.and_then(|v| v.as_bool())
.unwrap_or(true);
let max_executions = config
.get("maxExecutions")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let payload_template = config
.get("payload")
.and_then(|v| v.as_str())
.unwrap_or(r#"{"timestamp": "${timestamp}"}"#)
.to_string();
let already_spawned = ctx
.get_pool("_trigger")
.into_iter()
.any(|(k, _)| k == "spawned");
if already_spawned {
return Ok(HashMap::new());
}
ctx.pool_upsert("_trigger", "spawned", serde_json::json!(true));
let outport_tx = ctx.get_outports().0;
spawn_task(async move {
let mut execution_count: u64 = 0;
if start_immediately {
execution_count += 1;
let payload = build_trigger_payload(&payload_template, execution_count);
let mut out = HashMap::new();
out.insert("trigger".to_string(), payload);
if outport_tx.send(out).is_err() {
return;
}
}
loop {
sleep_ms(interval).await;
execution_count += 1;
if max_executions > 0 && execution_count > max_executions {
let mut out = HashMap::new();
out.insert("done".to_string(), Message::Flow);
let _ = outport_tx.send(out);
break;
}
let payload = build_trigger_payload(&payload_template, execution_count);
let mut out = HashMap::new();
out.insert("trigger".to_string(), payload);
if outport_tx.send(out).is_err() {
break;
}
}
});
Ok(HashMap::new())
}
#[actor(
CronTriggerActor,
inports::<1>(),
outports::<50>(trigger),
state(MemoryState)
)]
pub async fn cron_trigger_actor(ctx: ActorContext) -> Result<HashMap<String, Message>, Error> {
let config = ctx.get_config_hashmap();
let common = config
.get("commonSchedules")
.and_then(|v| v.as_str())
.unwrap_or("Custom");
let cron_expr = match common {
"Every minute" => "* * * * *",
"Every 5 minutes" => "*/5 * * * *",
"Every 15 minutes" => "*/15 * * * *",
"Every 30 minutes" => "*/30 * * * *",
"Every hour" => "0 * * * *",
"Every day at midnight" => "0 0 * * *",
"Every Monday at 9 AM" => "0 9 * * 1",
"First day of month" => "0 0 1 * *",
_ => config
.get("cronExpression")
.and_then(|v| v.as_str())
.unwrap_or("0 * * * *"),
};
let max_executions = config
.get("maxExecutions")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let payload_template = config
.get("payload")
.and_then(|v| v.as_str())
.unwrap_or(r#"{"timestamp": "${timestamp}", "schedule": "${schedule}"}"#)
.to_string();
let interval_ms = parse_cron_to_interval(cron_expr);
let cron_expr_owned = cron_expr.to_string();
let outport_tx = ctx.get_outports().0;
spawn_task(async move {
let mut execution_count: u64 = 0;
loop {
if interval_ms > 0 {
sleep_ms(interval_ms).await;
}
execution_count += 1;
if max_executions > 0 && execution_count > max_executions {
break;
}
let now = chrono::Utc::now();
let payload_str = payload_template
.replace("${timestamp}", &now.to_rfc3339())
.replace("${schedule}", &cron_expr_owned);
let payload = match serde_json::from_str::<serde_json::Value>(&payload_str) {
Ok(val) => Message::object(EncodableValue::from(val)),
Err(_) => Message::String(payload_str.into()),
};
let mut out = HashMap::new();
out.insert("trigger".to_string(), payload);
if outport_tx.send(out).is_err() {
break;
}
}
});
Ok(HashMap::new())
}
fn build_trigger_payload(template: &str, execution_count: u64) -> Message {
let now = chrono::Utc::now();
let resolved = template
.replace("${timestamp}", &now.to_rfc3339())
.replace("${executionCount}", &execution_count.to_string());
match serde_json::from_str::<serde_json::Value>(&resolved) {
Ok(val) => Message::object(EncodableValue::from(val)),
Err(_) => Message::String(resolved.into()),
}
}
fn parse_cron_to_interval(expr: &str) -> u64 {
let parts: Vec<&str> = expr.split_whitespace().collect();
if parts.len() < 5 {
return 60_000; }
match parts[0] {
"*" => 60_000, "*/5" => 300_000, "*/15" => 900_000, "*/30" => 1_800_000, "0" if parts[1] == "*" => 3_600_000, "0" if parts[1] == "0" => 86_400_000, _ => 60_000,
}
}