use ff_core::contracts::*;
use crate::error::ScriptError;
use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::types::*;
use crate::result::{FcallResult, FromFcallResult};
use super::execution::ExecOpKeys;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClaimedResumedExecutionPartial {
pub lease_id: LeaseId,
pub lease_epoch: LeaseEpoch,
pub attempt_index: AttemptIndex,
pub attempt_id: AttemptId,
pub lease_expires_at: TimestampMs,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClaimResumedExecutionResultPartial {
Claimed(ClaimedResumedExecutionPartial),
}
impl ClaimResumedExecutionResultPartial {
pub fn complete(self, execution_id: ExecutionId) -> ClaimResumedExecutionResult {
match self {
Self::Claimed(p) => ClaimResumedExecutionResult::Claimed(ClaimedResumedExecution {
execution_id,
lease_id: p.lease_id,
lease_epoch: p.lease_epoch,
attempt_index: p.attempt_index,
attempt_id: p.attempt_id,
lease_expires_at: p.lease_expires_at,
}),
}
}
}
pub struct SignalOpKeys<'a> {
pub ctx: &'a ExecKeyContext,
pub idx: &'a IndexKeys,
pub lane_id: &'a LaneId,
}
ff_function! {
pub ff_deliver_signal(args: DeliverSignalArgs) -> DeliverSignalResult {
keys(k: &SignalOpKeys<'_>) {
k.ctx.core(), k.ctx.waitpoint_condition(&args.waitpoint_id), k.ctx.waitpoint_signals(&args.waitpoint_id), k.ctx.exec_signals(), k.ctx.signal(&args.signal_id), k.ctx.signal_payload(&args.signal_id), args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
k.ctx.signal_dedup(&args.waitpoint_id, ik)
}).unwrap_or_else(|| k.ctx.noop()), k.ctx.waitpoint(&args.waitpoint_id), k.ctx.suspension_current(), k.idx.lane_eligible(k.lane_id), k.idx.lane_suspended(k.lane_id), k.idx.lane_delayed(k.lane_id), k.idx.suspension_timeout(), k.idx.waitpoint_hmac_secrets(), }
argv {
args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
.map(|p| String::from_utf8_lossy(p).into_owned())
.unwrap_or_default(), args.payload_encoding.clone().unwrap_or_else(|| "json".into()), args.idempotency_key.clone().unwrap_or_default(), args.correlation_id.clone().unwrap_or_default(), args.target_scope.clone(), args.created_at.map(|t| t.to_string()).unwrap_or_default(), args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), args.resume_delay_ms.unwrap_or(0).to_string(), args.signal_maxlen.unwrap_or(1000).to_string(), args.max_signals_per_execution.unwrap_or(10_000).to_string(), args.waitpoint_token.as_str().to_owned(), }
}
}
impl FromFcallResult for DeliverSignalResult {
fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
let r = FcallResult::parse(raw)?;
if r.status == "DUPLICATE" {
let sid_str = r.field_str(0);
let sid = SignalId::parse(&sid_str)
.map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
return Ok(DeliverSignalResult::Duplicate {
existing_signal_id: sid,
});
}
let r = r.into_success()?;
let sid_str = r.field_str(0);
let effect = r.field_str(1);
let sid = SignalId::parse(&sid_str)
.map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
Ok(DeliverSignalResult::Accepted {
signal_id: sid,
effect,
})
}
}
ff_function! {
pub ff_buffer_signal_for_pending_waitpoint(args: BufferSignalArgs) -> BufferSignalResult {
keys(k: &SignalOpKeys<'_>) {
k.ctx.core(), k.ctx.waitpoint_condition(&args.waitpoint_id), k.ctx.waitpoint_signals(&args.waitpoint_id), k.ctx.exec_signals(), k.ctx.signal(&args.signal_id), k.ctx.signal_payload(&args.signal_id), args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
k.ctx.signal_dedup(&args.waitpoint_id, ik)
}).unwrap_or_else(|| k.ctx.noop()), k.ctx.waitpoint(&args.waitpoint_id), k.idx.waitpoint_hmac_secrets(), }
argv {
args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
.map(|p| String::from_utf8_lossy(p).into_owned())
.unwrap_or_default(), args.payload_encoding.clone().unwrap_or_else(|| "json".into()), args.idempotency_key.clone().unwrap_or_default(), String::new(), args.target_scope.clone(), String::new(), String::new(), String::new(), String::new(), String::new(), args.waitpoint_token.as_str().to_owned(), }
}
}
impl FromFcallResult for BufferSignalResult {
fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
let r = FcallResult::parse(raw)?;
if r.status == "DUPLICATE" {
let sid_str = r.field_str(0);
let sid = SignalId::parse(&sid_str)
.map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
return Ok(BufferSignalResult::Duplicate {
existing_signal_id: sid,
});
}
let r = r.into_success()?;
let sid_str = r.field_str(0);
let sid = SignalId::parse(&sid_str)
.map_err(|e| ScriptError::Parse(format!("bad signal_id: {e}")))?;
Ok(BufferSignalResult::Buffered { signal_id: sid })
}
}
ff_function! {
pub ff_claim_resumed_execution(args: ClaimResumedExecutionArgs) -> ClaimResumedExecutionResultPartial {
keys(k: &ExecOpKeys<'_>) {
k.ctx.core(), k.ctx.claim_grant(), k.idx.lane_eligible(k.lane_id), k.idx.lease_expiry(), k.idx.worker_leases(k.worker_instance_id), k.ctx.attempt_hash(args.current_attempt_index), k.ctx.lease_current(), k.ctx.lease_history(), k.idx.lane_active(k.lane_id), k.idx.attempt_timeout(), k.idx.execution_deadline(), }
argv {
args.execution_id.to_string(), args.worker_id.to_string(), args.worker_instance_id.to_string(), args.lane_id.to_string(), String::new(), args.lease_id.to_string(), args.lease_ttl_ms.to_string(), args.remaining_attempt_timeout_ms
.map(|t| t.to_string())
.unwrap_or_default(), }
}
}
impl FromFcallResult for ClaimResumedExecutionResultPartial {
fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
let r = FcallResult::parse(raw)?.into_success()?;
let lease_id = LeaseId::parse(&r.field_str(0))
.map_err(|e| ScriptError::Parse(format!("bad lease_id: {e}")))?;
let epoch = r.field_str(1).parse::<u64>()
.map_err(|e| ScriptError::Parse(format!("bad epoch: {e}")))?;
let expires_at = r.field_str(2).parse::<i64>()
.map_err(|e| ScriptError::Parse(format!("bad expires_at: {e}")))?;
let attempt_id = AttemptId::parse(&r.field_str(3))
.map_err(|e| ScriptError::Parse(format!("bad attempt_id: {e}")))?;
let attempt_index = r.field_str(4).parse::<u32>()
.map_err(|e| ScriptError::Parse(format!("bad attempt_index: {e}")))?;
Ok(Self::Claimed(ClaimedResumedExecutionPartial {
lease_id,
lease_epoch: LeaseEpoch::new(epoch),
attempt_index: AttemptIndex::new(attempt_index),
attempt_id,
lease_expires_at: TimestampMs::from_millis(expires_at),
}))
}
}
#[cfg(test)]
mod partial_tests {
use super::*;
use ff_core::partition::PartitionConfig;
#[test]
fn claim_resumed_partial_complete_attaches_execution_id() {
let partial = ClaimResumedExecutionResultPartial::Claimed(ClaimedResumedExecutionPartial {
lease_id: LeaseId::new(),
lease_epoch: LeaseEpoch::new(2),
attempt_index: AttemptIndex::new(1),
attempt_id: AttemptId::new(),
lease_expires_at: TimestampMs::from_millis(2000),
});
let eid = ExecutionId::for_flow(&FlowId::new(), &PartitionConfig::default());
let full = partial.complete(eid.clone());
match full {
ClaimResumedExecutionResult::Claimed(c) => assert_eq!(c.execution_id, eid),
}
}
}