use ferriskey::Client;
use ff_core::contracts::{HeartbeatWorkerOutcome, RegisterWorkerOutcome};
use ff_core::types::TimestampMs;
use crate::error::ScriptError;
use crate::result::{FcallResult, FromFcallResult};
impl FromFcallResult for RegisterWorkerOutcome {
fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
let r = FcallResult::parse(raw)?.into_success()?;
let tag = r.field_str(0);
match tag.as_str() {
"registered" => Ok(RegisterWorkerOutcome::Registered),
"refreshed" => Ok(RegisterWorkerOutcome::Refreshed),
other => Err(ScriptError::Parse {
fcall: "ff_register_worker".into(),
execution_id: None,
message: format!("unexpected outcome tag: {other:?}"),
}),
}
}
}
pub struct RegisterWorkerArgv<'a> {
pub instance_id: &'a str,
pub worker_id: &'a str,
pub lanes_csv: &'a str,
pub caps_csv: &'a str,
pub ttl_ms: u64,
pub now_ms: i64,
}
pub struct RegisterWorkerKeys<'a> {
pub alive_key: &'a str,
pub caps_key: &'a str,
pub index_key: &'a str,
}
pub async fn ff_register_worker(
conn: &Client,
keys: RegisterWorkerKeys<'_>,
argv: RegisterWorkerArgv<'_>,
) -> Result<RegisterWorkerOutcome, ScriptError> {
let key_refs: [&str; 3] = [keys.alive_key, keys.caps_key, keys.index_key];
let ttl = argv.ttl_ms.to_string();
let now = argv.now_ms.to_string();
let argv_refs: [&str; 6] = [
argv.instance_id,
argv.worker_id,
argv.lanes_csv,
argv.caps_csv,
ttl.as_str(),
now.as_str(),
];
let raw = conn
.fcall::<ferriskey::Value>("ff_register_worker", &key_refs, &argv_refs)
.await
.map_err(ScriptError::Valkey)?;
RegisterWorkerOutcome::from_fcall_result(&raw)
}
pub struct HeartbeatWorkerReply {
pub outcome_tag: HeartbeatReplyTag,
}
pub enum HeartbeatReplyTag {
NotRegistered,
Refreshed { ttl_ms: u64 },
}
impl FromFcallResult for HeartbeatWorkerReply {
fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
let r = FcallResult::parse(raw)?.into_success()?;
let tag = r.field_str(0);
match tag.as_str() {
"not_registered" => Ok(HeartbeatWorkerReply {
outcome_tag: HeartbeatReplyTag::NotRegistered,
}),
"refreshed" => {
let ttl_str = r.field_str(1);
let ttl_ms: u64 = ttl_str.parse().map_err(|e| ScriptError::Parse {
fcall: "ff_heartbeat_worker".into(),
execution_id: None,
message: format!("refreshed: ttl_ms not a u64 ({ttl_str:?}): {e}"),
})?;
Ok(HeartbeatWorkerReply {
outcome_tag: HeartbeatReplyTag::Refreshed { ttl_ms },
})
}
other => Err(ScriptError::Parse {
fcall: "ff_heartbeat_worker".into(),
execution_id: None,
message: format!("unexpected outcome tag: {other:?}"),
}),
}
}
}
pub struct HeartbeatWorkerKeys<'a> {
pub alive_key: &'a str,
pub caps_key: &'a str,
}
pub async fn ff_heartbeat_worker(
conn: &Client,
keys: HeartbeatWorkerKeys<'_>,
now_ms: i64,
) -> Result<HeartbeatWorkerOutcome, ScriptError> {
let key_refs: [&str; 2] = [keys.alive_key, keys.caps_key];
let now = now_ms.to_string();
let argv_refs: [&str; 1] = [now.as_str()];
let raw = conn
.fcall::<ferriskey::Value>("ff_heartbeat_worker", &key_refs, &argv_refs)
.await
.map_err(ScriptError::Valkey)?;
let reply = HeartbeatWorkerReply::from_fcall_result(&raw)?;
match reply.outcome_tag {
HeartbeatReplyTag::NotRegistered => Ok(HeartbeatWorkerOutcome::NotRegistered),
HeartbeatReplyTag::Refreshed { ttl_ms } => {
let next_expiry_ms = TimestampMs::from_millis(now_ms.saturating_add(ttl_ms as i64));
Ok(HeartbeatWorkerOutcome::Refreshed { next_expiry_ms })
}
}
}