use crate::partition::Partition;
use crate::types::{
AttemptIndex, BudgetId, EdgeId, ExecutionId, FlowId, LaneId, QuotaPolicyId, SignalId,
WaitpointId, WaitpointKey, WorkerInstanceId,
};
pub struct ExecKeyContext {
tag: String,
eid: String,
}
impl ExecKeyContext {
pub fn new(partition: &Partition, eid: &ExecutionId) -> Self {
Self {
tag: partition.hash_tag(),
eid: eid.to_string(),
}
}
pub fn core(&self) -> String {
format!("ff:exec:{}:{}:core", self.tag, self.eid)
}
pub fn payload(&self) -> String {
format!("ff:exec:{}:{}:payload", self.tag, self.eid)
}
pub fn result(&self) -> String {
format!("ff:exec:{}:{}:result", self.tag, self.eid)
}
pub fn policy(&self) -> String {
format!("ff:exec:{}:{}:policy", self.tag, self.eid)
}
pub fn tags(&self) -> String {
format!("ff:exec:{}:{}:tags", self.tag, self.eid)
}
pub fn lease_current(&self) -> String {
format!("ff:exec:{}:{}:lease:current", self.tag, self.eid)
}
pub fn lease_history(&self) -> String {
format!("ff:exec:{}:{}:lease:history", self.tag, self.eid)
}
pub fn claim_grant(&self) -> String {
format!("ff:exec:{}:{}:claim_grant", self.tag, self.eid)
}
pub fn attempts(&self) -> String {
format!("ff:exec:{}:{}:attempts", self.tag, self.eid)
}
pub fn attempt_hash(&self, index: AttemptIndex) -> String {
format!("ff:attempt:{}:{}:{}", self.tag, self.eid, index)
}
pub fn attempt_usage(&self, index: AttemptIndex) -> String {
format!("ff:attempt:{}:{}:{}:usage", self.tag, self.eid, index)
}
pub fn attempt_policy(&self, index: AttemptIndex) -> String {
format!("ff:attempt:{}:{}:{}:policy", self.tag, self.eid, index)
}
pub fn stream(&self, index: AttemptIndex) -> String {
format!("ff:stream:{}:{}:{}", self.tag, self.eid, index)
}
pub fn stream_meta(&self, index: AttemptIndex) -> String {
format!("ff:stream:{}:{}:{}:meta", self.tag, self.eid, index)
}
pub fn suspension_current(&self) -> String {
format!("ff:exec:{}:{}:suspension:current", self.tag, self.eid)
}
pub fn waitpoints(&self) -> String {
format!("ff:exec:{}:{}:waitpoints", self.tag, self.eid)
}
pub fn waitpoint(&self, wp_id: &WaitpointId) -> String {
format!("ff:wp:{}:{}", self.tag, wp_id)
}
pub fn waitpoint_signals(&self, wp_id: &WaitpointId) -> String {
format!("ff:wp:{}:{}:signals", self.tag, wp_id)
}
pub fn waitpoint_condition(&self, wp_id: &WaitpointId) -> String {
format!("ff:wp:{}:{}:condition", self.tag, wp_id)
}
pub fn signal(&self, sig_id: &SignalId) -> String {
format!("ff:signal:{}:{}", self.tag, sig_id)
}
pub fn signal_payload(&self, sig_id: &SignalId) -> String {
format!("ff:signal:{}:{}:payload", self.tag, sig_id)
}
pub fn exec_signals(&self) -> String {
format!("ff:exec:{}:{}:signals", self.tag, self.eid)
}
pub fn signal_dedup(&self, wp_id: &WaitpointId, idempotency_key: &str) -> String {
format!("ff:sigdedup:{}:{}:{}", self.tag, wp_id, idempotency_key)
}
pub fn deps_meta(&self) -> String {
format!("ff:exec:{}:{}:deps:meta", self.tag, self.eid)
}
pub fn dep_edge(&self, edge_id: &EdgeId) -> String {
format!("ff:exec:{}:{}:dep:{}", self.tag, self.eid, edge_id)
}
pub fn deps_unresolved(&self) -> String {
format!("ff:exec:{}:{}:deps:unresolved", self.tag, self.eid)
}
pub fn deps_all_edges(&self) -> String {
format!("ff:exec:{}:{}:deps:all_edges", self.tag, self.eid)
}
pub fn noop(&self) -> String {
format!("ff:noop:{}", self.tag)
}
pub fn hash_tag(&self) -> &str {
&self.tag
}
pub fn execution_id_str(&self) -> &str {
&self.eid
}
}
pub struct IndexKeys {
tag: String,
}
impl IndexKeys {
pub fn new(partition: &Partition) -> Self {
Self {
tag: partition.hash_tag(),
}
}
pub fn lane_eligible(&self, lane_id: &LaneId) -> String {
format!("ff:idx:{}:lane:{}:eligible", self.tag, lane_id)
}
pub fn lane_delayed(&self, lane_id: &LaneId) -> String {
format!("ff:idx:{}:lane:{}:delayed", self.tag, lane_id)
}
pub fn lane_active(&self, lane_id: &LaneId) -> String {
format!("ff:idx:{}:lane:{}:active", self.tag, lane_id)
}
pub fn lane_terminal(&self, lane_id: &LaneId) -> String {
format!("ff:idx:{}:lane:{}:terminal", self.tag, lane_id)
}
pub fn lane_blocked_dependencies(&self, lane_id: &LaneId) -> String {
format!("ff:idx:{}:lane:{}:blocked:dependencies", self.tag, lane_id)
}
pub fn lane_blocked_budget(&self, lane_id: &LaneId) -> String {
format!("ff:idx:{}:lane:{}:blocked:budget", self.tag, lane_id)
}
pub fn lane_blocked_quota(&self, lane_id: &LaneId) -> String {
format!("ff:idx:{}:lane:{}:blocked:quota", self.tag, lane_id)
}
pub fn lane_blocked_route(&self, lane_id: &LaneId) -> String {
format!("ff:idx:{}:lane:{}:blocked:route", self.tag, lane_id)
}
pub fn lane_blocked_operator(&self, lane_id: &LaneId) -> String {
format!("ff:idx:{}:lane:{}:blocked:operator", self.tag, lane_id)
}
pub fn lane_suspended(&self, lane_id: &LaneId) -> String {
format!("ff:idx:{}:lane:{}:suspended", self.tag, lane_id)
}
pub fn waitpoint_hmac_secrets(&self) -> String {
format!("ff:sec:{}:waitpoint_hmac", self.tag)
}
pub fn lease_expiry(&self) -> String {
format!("ff:idx:{}:lease_expiry", self.tag)
}
pub fn worker_leases(&self, wid: &WorkerInstanceId) -> String {
format!("ff:idx:{}:worker:{}:leases", self.tag, wid)
}
pub fn suspension_timeout(&self) -> String {
format!("ff:idx:{}:suspension_timeout", self.tag)
}
pub fn pending_waitpoint_expiry(&self) -> String {
format!("ff:idx:{}:pending_waitpoint_expiry", self.tag)
}
pub fn attempt_timeout(&self) -> String {
format!("ff:idx:{}:attempt_timeout", self.tag)
}
pub fn execution_deadline(&self) -> String {
format!("ff:idx:{}:execution_deadline", self.tag)
}
pub fn all_executions(&self) -> String {
format!("ff:idx:{}:all_executions", self.tag)
}
}
pub struct FlowKeyContext {
tag: String,
fid: String,
}
impl FlowKeyContext {
pub fn new(partition: &Partition, fid: &FlowId) -> Self {
Self {
tag: partition.hash_tag(),
fid: fid.to_string(),
}
}
pub fn core(&self) -> String {
format!("ff:flow:{}:{}:core", self.tag, self.fid)
}
pub fn members(&self) -> String {
format!("ff:flow:{}:{}:members", self.tag, self.fid)
}
pub fn tags(&self) -> String {
format!("ff:flow:{}:{}:tags", self.tag, self.fid)
}
pub fn member(&self, eid: &ExecutionId) -> String {
format!("ff:flow:{}:{}:member:{}", self.tag, self.fid, eid)
}
pub fn edge(&self, edge_id: &EdgeId) -> String {
format!("ff:flow:{}:{}:edge:{}", self.tag, self.fid, edge_id)
}
pub fn outgoing(&self, upstream_eid: &ExecutionId) -> String {
format!("ff:flow:{}:{}:out:{}", self.tag, self.fid, upstream_eid)
}
pub fn incoming(&self, downstream_eid: &ExecutionId) -> String {
format!("ff:flow:{}:{}:in:{}", self.tag, self.fid, downstream_eid)
}
pub fn events(&self) -> String {
format!("ff:flow:{}:{}:events", self.tag, self.fid)
}
pub fn summary(&self) -> String {
format!("ff:flow:{}:{}:summary", self.tag, self.fid)
}
pub fn grant(&self, mutation_id: &str) -> String {
format!("ff:flow:{}:{}:grant:{}", self.tag, self.fid, mutation_id)
}
pub fn pending_cancels(&self) -> String {
format!("ff:flow:{}:{}:pending_cancels", self.tag, self.fid)
}
pub fn hash_tag(&self) -> &str {
&self.tag
}
}
pub struct FlowIndexKeys {
tag: String,
}
impl FlowIndexKeys {
pub fn new(partition: &Partition) -> Self {
Self {
tag: partition.hash_tag(),
}
}
pub fn flow_index(&self) -> String {
format!("ff:idx:{}:flow_index", self.tag)
}
pub fn cancel_backlog(&self) -> String {
format!("ff:idx:{}:cancel_backlog", self.tag)
}
}
pub struct BudgetKeyContext {
tag: String,
bid: String,
}
impl BudgetKeyContext {
pub fn new(partition: &Partition, bid: &BudgetId) -> Self {
Self {
tag: partition.hash_tag(),
bid: bid.to_string(),
}
}
pub fn definition(&self) -> String {
format!("ff:budget:{}:{}", self.tag, self.bid)
}
pub fn limits(&self) -> String {
format!("ff:budget:{}:{}:limits", self.tag, self.bid)
}
pub fn usage(&self) -> String {
format!("ff:budget:{}:{}:usage", self.tag, self.bid)
}
pub fn executions(&self) -> String {
format!("ff:budget:{}:{}:executions", self.tag, self.bid)
}
pub fn hash_tag(&self) -> &str {
&self.tag
}
}
pub fn budget_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
format!("ff:budget_attach:{}:{}:{}", tag, scope_type, scope_id)
}
pub fn budget_resets_key(tag: &str) -> String {
format!("ff:idx:{}:budget_resets", tag)
}
pub fn budget_policies_index(tag: &str) -> String {
format!("ff:idx:{}:budget_policies", tag)
}
pub struct QuotaKeyContext {
tag: String,
qid: String,
}
impl QuotaKeyContext {
pub fn new(partition: &Partition, qid: &QuotaPolicyId) -> Self {
Self {
tag: partition.hash_tag(),
qid: qid.to_string(),
}
}
pub fn definition(&self) -> String {
format!("ff:quota:{}:{}", self.tag, self.qid)
}
pub fn window(&self, dimension: &str) -> String {
format!("ff:quota:{}:{}:window:{}", self.tag, self.qid, dimension)
}
pub fn concurrency(&self) -> String {
format!("ff:quota:{}:{}:concurrency", self.tag, self.qid)
}
pub fn admitted(&self, eid: &ExecutionId) -> String {
format!("ff:quota:{}:{}:admitted:{}", self.tag, self.qid, eid)
}
pub fn admitted_set(&self) -> String {
format!("ff:quota:{}:{}:admitted_set", self.tag, self.qid)
}
pub fn hash_tag(&self) -> &str {
&self.tag
}
}
pub fn quota_policies_index(tag: &str) -> String {
format!("ff:idx:{}:quota_policies", tag)
}
pub fn quota_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
format!("ff:quota_attach:{}:{}:{}", tag, scope_type, scope_id)
}
pub fn lane_config_key(lane_id: &LaneId) -> String {
format!("ff:lane:{}:config", lane_id)
}
pub fn lane_counts_key(lane_id: &LaneId) -> String {
format!("ff:lane:{}:counts", lane_id)
}
pub fn worker_key(wid: &WorkerInstanceId) -> String {
format!("ff:worker:{}", wid)
}
pub fn worker_caps_key(wid: &WorkerInstanceId) -> String {
format!("ff:worker:{}:caps", wid)
}
pub fn workers_index_key() -> String {
"ff:idx:workers".to_owned()
}
pub fn workers_capability_key(key: &str, value: &str) -> String {
format!("ff:idx:workers:cap:{}:{}", key, value)
}
pub fn lanes_index_key() -> String {
"ff:idx:lanes".to_owned()
}
pub fn global_config_partitions() -> String {
"ff:config:partitions".to_owned()
}
pub fn namespace_executions_key(namespace: &str) -> String {
format!("ff:ns:{}:executions", namespace)
}
pub fn idempotency_key(tag: &str, namespace: &str, idem_key: &str) -> String {
format!("ff:idem:{}:{}:{}", tag, namespace, idem_key)
}
pub fn noop_key(tag: &str) -> String {
format!("ff:noop:{}", tag)
}
pub fn tag_index_key(namespace: &str, key: &str, value: &str) -> String {
format!("ff:tag:{}:{}:{}", namespace, key, value)
}
pub fn waitpoint_key_resolution(tag: &str, wp_key: &WaitpointKey) -> String {
format!("ff:wpkey:{}:{}", tag, wp_key)
}
pub const USAGE_DEDUP_KEY_PREFIX: &str = "ff:usagededup:";
pub fn usage_dedup_key(hash_tag: &str, dedup_id: &str) -> String {
format!("{USAGE_DEDUP_KEY_PREFIX}{hash_tag}:{dedup_id}")
}
#[cfg(test)]
mod tests {
use super::*;
use crate::partition::{execution_partition, flow_partition, PartitionConfig};
#[test]
fn exec_key_context_core_format() {
let config = PartitionConfig::default();
let eid = ExecutionId::parse("{fp:0}:550e8400-e29b-41d4-a716-446655440000").unwrap();
let partition = execution_partition(&eid, &config);
let ctx = ExecKeyContext::new(&partition, &eid);
let core_key = ctx.core();
assert!(core_key.starts_with("ff:exec:{fp:"));
assert!(core_key.ends_with(":core"));
assert!(core_key.contains("550e8400-e29b-41d4-a716-446655440000"));
}
#[test]
fn exec_key_all_keys_share_hash_tag() {
let config = PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let partition = execution_partition(&eid, &config);
let ctx = ExecKeyContext::new(&partition, &eid);
let tag = ctx.hash_tag();
assert!(ctx.core().contains(tag));
assert!(ctx.payload().contains(tag));
assert!(ctx.result().contains(tag));
assert!(ctx.policy().contains(tag));
assert!(ctx.lease_current().contains(tag));
assert!(ctx.lease_history().contains(tag));
assert!(ctx.attempts().contains(tag));
assert!(ctx.suspension_current().contains(tag));
assert!(ctx.exec_signals().contains(tag));
}
#[test]
fn attempt_key_includes_index() {
let config = PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let partition = execution_partition(&eid, &config);
let ctx = ExecKeyContext::new(&partition, &eid);
let key = ctx.attempt_hash(AttemptIndex::new(3));
assert!(key.contains(":3"), "attempt key should contain index");
}
#[test]
fn stream_key_format() {
let config = PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let partition = execution_partition(&eid, &config);
let ctx = ExecKeyContext::new(&partition, &eid);
let key = ctx.stream(AttemptIndex::new(0));
assert!(key.starts_with("ff:stream:{fp:"));
assert!(key.ends_with(":0"));
}
#[test]
fn index_keys_format() {
let config = PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let partition = execution_partition(&eid, &config);
let idx = IndexKeys::new(&partition);
let lane = LaneId::new("default");
assert!(idx.lane_eligible(&lane).contains(":lane:default:eligible"));
assert!(idx.lane_delayed(&lane).contains(":lane:default:delayed"));
assert!(idx.lease_expiry().contains(":lease_expiry"));
assert!(idx.all_executions().contains(":all_executions"));
}
#[test]
fn flow_key_context_format() {
let config = PartitionConfig::default();
let fid = FlowId::new();
let partition = flow_partition(&fid, &config);
let ctx = FlowKeyContext::new(&partition, &fid);
assert!(ctx.core().starts_with("ff:flow:{fp:"));
assert!(ctx.core().ends_with(":core"));
assert!(ctx.members().ends_with(":members"));
}
#[test]
fn global_keys_no_hash_tag() {
let lane = LaneId::new("default");
let key = lane_config_key(&lane);
assert_eq!(key, "ff:lane:default:config");
assert!(!key.contains('{'));
}
#[test]
fn usage_dedup_key_format() {
let key = usage_dedup_key("{bp:7}", "dedup-123");
assert_eq!(key, "ff:usagededup:{bp:7}:dedup-123");
assert!(key.starts_with(USAGE_DEDUP_KEY_PREFIX));
assert_eq!(key.matches('{').count(), 1);
assert_eq!(key.matches('}').count(), 1);
}
}