Skip to main content

ff_core/
keys.rs

1use crate::partition::Partition;
2use crate::types::{
3    AttemptIndex, BudgetId, EdgeId, ExecutionId, FlowId, LaneId, QuotaPolicyId, SignalId,
4    WaitpointId, WaitpointKey, WorkerInstanceId,
5};
6
7// ─── Execution Partition Keys ({fp:N} post-RFC-011) ───
8//
9// NOTE: docstrings below still reference `{p:N}` as a historical shorthand.
10// Post-RFC-011 all exec-scoped keys hash-tag to `{fp:N}` (the parent flow's
11// partition), enabling atomic multi-key FCALLs across exec_core + flow_core.
12// The `Partition` type's `hash_tag()` method produces the correct `{fp:N}`
13// prefix; the docstring example paths are indicative of structure, not the
14// literal runtime hash-tag.
15
/// Pre-computed key context for every key that lives on an execution's
/// partition.
///
/// Build it once per operation and reuse it for all key construction.
#[derive(Debug, Clone)]
pub struct ExecKeyContext {
    /// Partition hash tag, braces included. Doc comments in this file write
    /// it as `{p:N}` (historical shorthand); post-RFC-011 the runtime value
    /// is `{fp:N}`-shaped, e.g. `{fp:42}`.
    tag: String,
    /// String form of the execution ID.
    eid: String,
}
24
25impl ExecKeyContext {
26    pub fn new(partition: &Partition, eid: &ExecutionId) -> Self {
27        Self {
28            tag: partition.hash_tag(),
29            eid: eid.to_string(),
30        }
31    }
32
33    // ── Execution Core (RFC-001) ──
34
35    /// `ff:exec:{p:N}:<eid>:core` — Authoritative execution record.
36    pub fn core(&self) -> String {
37        format!("ff:exec:{}:{}:core", self.tag, self.eid)
38    }
39
40    /// `ff:exec:{p:N}:<eid>:payload` — Opaque input payload.
41    pub fn payload(&self) -> String {
42        format!("ff:exec:{}:{}:payload", self.tag, self.eid)
43    }
44
45    /// `ff:exec:{p:N}:<eid>:result` — Opaque result payload.
46    pub fn result(&self) -> String {
47        format!("ff:exec:{}:{}:result", self.tag, self.eid)
48    }
49
50    /// `ff:exec:{p:N}:<eid>:policy` — JSON-encoded ExecutionPolicySnapshot.
51    pub fn policy(&self) -> String {
52        format!("ff:exec:{}:{}:policy", self.tag, self.eid)
53    }
54
55    /// `ff:exec:{p:N}:<eid>:tags` — User-supplied key-value labels.
56    pub fn tags(&self) -> String {
57        format!("ff:exec:{}:{}:tags", self.tag, self.eid)
58    }
59
60    // ── Lease (RFC-003) ──
61
62    /// `ff:exec:{p:N}:<eid>:lease:current` — Full lease object.
63    pub fn lease_current(&self) -> String {
64        format!("ff:exec:{}:{}:lease:current", self.tag, self.eid)
65    }
66
67    /// `ff:exec:{p:N}:<eid>:lease:history` — Append-only lease events.
68    pub fn lease_history(&self) -> String {
69        format!("ff:exec:{}:{}:lease:history", self.tag, self.eid)
70    }
71
72    /// `ff:exec:{p:N}:<eid>:claim_grant` — Ephemeral claim grant.
73    pub fn claim_grant(&self) -> String {
74        format!("ff:exec:{}:{}:claim_grant", self.tag, self.eid)
75    }
76
77    // ── Attempt (RFC-002) ──
78
79    /// `ff:exec:{p:N}:<eid>:attempts` — Attempt index ZSET.
80    pub fn attempts(&self) -> String {
81        format!("ff:exec:{}:{}:attempts", self.tag, self.eid)
82    }
83
84    /// `ff:attempt:{p:N}:<eid>:<attempt_index>` — Per-attempt detail.
85    pub fn attempt_hash(&self, index: AttemptIndex) -> String {
86        format!("ff:attempt:{}:{}:{}", self.tag, self.eid, index)
87    }
88
89    /// `ff:attempt:{p:N}:<eid>:<attempt_index>:usage` — Per-attempt usage counters.
90    pub fn attempt_usage(&self, index: AttemptIndex) -> String {
91        format!("ff:attempt:{}:{}:{}:usage", self.tag, self.eid, index)
92    }
93
94    /// `ff:attempt:{p:N}:<eid>:<attempt_index>:policy` — Frozen attempt policy snapshot.
95    pub fn attempt_policy(&self, index: AttemptIndex) -> String {
96        format!("ff:attempt:{}:{}:{}:policy", self.tag, self.eid, index)
97    }
98
99    // ── Stream (RFC-006) ──
100
101    /// `ff:stream:{p:N}:<eid>:<attempt_index>` — Attempt-scoped output stream.
102    pub fn stream(&self, index: AttemptIndex) -> String {
103        format!("ff:stream:{}:{}:{}", self.tag, self.eid, index)
104    }
105
106    /// `ff:stream:{p:N}:<eid>:<attempt_index>:meta` — Stream metadata.
107    pub fn stream_meta(&self, index: AttemptIndex) -> String {
108        format!("ff:stream:{}:{}:{}:meta", self.tag, self.eid, index)
109    }
110
111    // ── Suspension / Waitpoint (RFC-004) ──
112
113    /// `ff:exec:{p:N}:<eid>:suspension:current` — Current suspension episode.
114    pub fn suspension_current(&self) -> String {
115        format!("ff:exec:{}:{}:suspension:current", self.tag, self.eid)
116    }
117
118    /// `ff:exec:{p:N}:<eid>:waitpoints` — Set of all waitpoint IDs.
119    pub fn waitpoints(&self) -> String {
120        format!("ff:exec:{}:{}:waitpoints", self.tag, self.eid)
121    }
122
123    /// `ff:wp:{p:N}:<wp_id>` — Waitpoint record.
124    pub fn waitpoint(&self, wp_id: &WaitpointId) -> String {
125        format!("ff:wp:{}:{}", self.tag, wp_id)
126    }
127
128    /// `ff:wp:{p:N}:<wp_id>:signals` — Per-waitpoint signal history.
129    pub fn waitpoint_signals(&self, wp_id: &WaitpointId) -> String {
130        format!("ff:wp:{}:{}:signals", self.tag, wp_id)
131    }
132
133    /// `ff:wp:{p:N}:<wp_id>:condition` — Resume condition evaluation state.
134    pub fn waitpoint_condition(&self, wp_id: &WaitpointId) -> String {
135        format!("ff:wp:{}:{}:condition", self.tag, wp_id)
136    }
137
138    // ── Signal (RFC-005) ──
139
140    /// `ff:signal:{p:N}:<signal_id>` — Signal record.
141    pub fn signal(&self, sig_id: &SignalId) -> String {
142        format!("ff:signal:{}:{}", self.tag, sig_id)
143    }
144
145    /// `ff:signal:{p:N}:<signal_id>:payload` — Opaque signal payload.
146    pub fn signal_payload(&self, sig_id: &SignalId) -> String {
147        format!("ff:signal:{}:{}:payload", self.tag, sig_id)
148    }
149
150    /// `ff:exec:{p:N}:<eid>:signals` — Per-execution signal index.
151    pub fn exec_signals(&self) -> String {
152        format!("ff:exec:{}:{}:signals", self.tag, self.eid)
153    }
154
155    /// `ff:sigdedup:{p:N}:<wp_id>:<idem_key>` — Signal idempotency guard.
156    pub fn signal_dedup(&self, wp_id: &WaitpointId, idempotency_key: &str) -> String {
157        format!("ff:sigdedup:{}:{}:{}", self.tag, wp_id, idempotency_key)
158    }
159
160    // ── Flow Dependency — Execution-Local (RFC-007) ──
161
162    /// `ff:exec:{p:N}:<eid>:deps:meta` — Dependency summary.
163    pub fn deps_meta(&self) -> String {
164        format!("ff:exec:{}:{}:deps:meta", self.tag, self.eid)
165    }
166
167    /// `ff:exec:{p:N}:<eid>:dep:<edge_id>` — Per-edge local state.
168    pub fn dep_edge(&self, edge_id: &EdgeId) -> String {
169        format!("ff:exec:{}:{}:dep:{}", self.tag, self.eid, edge_id)
170    }
171
172    /// `ff:exec:{p:N}:<eid>:deps:unresolved` — Set of unresolved edge IDs.
173    pub fn deps_unresolved(&self) -> String {
174        format!("ff:exec:{}:{}:deps:unresolved", self.tag, self.eid)
175    }
176
177    /// `ff:exec:{p:N}:<eid>:deps:all_edges` — Set of ALL applied edge IDs
178    /// on this execution (unresolved + satisfied + impossible). Populated by
179    /// ff_apply_dependency_to_child, never pruned on resolve. Used by the
180    /// retention trimmer to enumerate dep edge hashes without SCAN.
181    pub fn deps_all_edges(&self) -> String {
182        format!("ff:exec:{}:{}:deps:all_edges", self.tag, self.eid)
183    }
184
185    // ── Accessor ──
186
187    /// Dummy key on this partition, used as a placeholder for unused KEYS
188    /// positions (e.g. empty idempotency key). Ensures all KEYS in an FCALL
189    /// share the same hash tag, preventing CrossSlot errors in cluster mode.
190    pub fn noop(&self) -> String {
191        format!("ff:noop:{}", self.tag)
192    }
193
194    pub fn hash_tag(&self) -> &str {
195        &self.tag
196    }
197
198    pub fn execution_id_str(&self) -> &str {
199        &self.eid
200    }
201}
202
203// ─── Partition-Local Index Keys ({p:N}) ───
204
/// Index keys scoped to a single execution partition.
#[derive(Debug, Clone)]
pub struct IndexKeys {
    /// Partition hash tag (braces included) shared by every index key.
    tag: String,
}
209
210impl IndexKeys {
211    pub fn new(partition: &Partition) -> Self {
212        Self {
213            tag: partition.hash_tag(),
214        }
215    }
216
217    /// `ff:idx:{p:N}:lane:<lane_id>:eligible`
218    pub fn lane_eligible(&self, lane_id: &LaneId) -> String {
219        format!("ff:idx:{}:lane:{}:eligible", self.tag, lane_id)
220    }
221
222    /// `ff:idx:{p:N}:lane:<lane_id>:delayed`
223    pub fn lane_delayed(&self, lane_id: &LaneId) -> String {
224        format!("ff:idx:{}:lane:{}:delayed", self.tag, lane_id)
225    }
226
227    /// `ff:idx:{p:N}:lane:<lane_id>:active`
228    pub fn lane_active(&self, lane_id: &LaneId) -> String {
229        format!("ff:idx:{}:lane:{}:active", self.tag, lane_id)
230    }
231
232    /// `ff:idx:{p:N}:lane:<lane_id>:terminal`
233    pub fn lane_terminal(&self, lane_id: &LaneId) -> String {
234        format!("ff:idx:{}:lane:{}:terminal", self.tag, lane_id)
235    }
236
237    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:dependencies`
238    pub fn lane_blocked_dependencies(&self, lane_id: &LaneId) -> String {
239        format!("ff:idx:{}:lane:{}:blocked:dependencies", self.tag, lane_id)
240    }
241
242    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:budget`
243    pub fn lane_blocked_budget(&self, lane_id: &LaneId) -> String {
244        format!("ff:idx:{}:lane:{}:blocked:budget", self.tag, lane_id)
245    }
246
247    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:quota`
248    pub fn lane_blocked_quota(&self, lane_id: &LaneId) -> String {
249        format!("ff:idx:{}:lane:{}:blocked:quota", self.tag, lane_id)
250    }
251
252    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:route`
253    pub fn lane_blocked_route(&self, lane_id: &LaneId) -> String {
254        format!("ff:idx:{}:lane:{}:blocked:route", self.tag, lane_id)
255    }
256
257    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:operator`
258    pub fn lane_blocked_operator(&self, lane_id: &LaneId) -> String {
259        format!("ff:idx:{}:lane:{}:blocked:operator", self.tag, lane_id)
260    }
261
262    /// `ff:idx:{p:N}:lane:<lane_id>:suspended`
263    pub fn lane_suspended(&self, lane_id: &LaneId) -> String {
264        format!("ff:idx:{}:lane:{}:suspended", self.tag, lane_id)
265    }
266
267    /// `ff:sec:{p:N}:waitpoint_hmac` — HMAC signing secrets replicated
268    /// across every execution partition (RFC-004 §Waitpoint Security).
269    /// Hash fields:
270    ///   `current_kid`, `previous_kid`, `secret:<kid>` (hex), `previous_expires_at`.
271    /// Replication is required for cluster mode: every FCALL that mints or
272    /// validates a token must hash-tag-collocate this key with the rest of
273    /// its execution-partition KEYS. The secret value is identical across
274    /// partitions; rotation fans out HSETs across them.
275    pub fn waitpoint_hmac_secrets(&self) -> String {
276        format!("ff:sec:{}:waitpoint_hmac", self.tag)
277    }
278
279    /// `ff:idx:{p:N}:lease_expiry` — Cross-lane lease expiry scanner target.
280    pub fn lease_expiry(&self) -> String {
281        format!("ff:idx:{}:lease_expiry", self.tag)
282    }
283
284    /// `ff:idx:{p:N}:worker:<worker_instance_id>:leases`
285    pub fn worker_leases(&self, wid: &WorkerInstanceId) -> String {
286        format!("ff:idx:{}:worker:{}:leases", self.tag, wid)
287    }
288
289    /// `ff:idx:{p:N}:suspension_timeout`
290    pub fn suspension_timeout(&self) -> String {
291        format!("ff:idx:{}:suspension_timeout", self.tag)
292    }
293
294    /// `ff:idx:{p:N}:pending_waitpoint_expiry`
295    pub fn pending_waitpoint_expiry(&self) -> String {
296        format!("ff:idx:{}:pending_waitpoint_expiry", self.tag)
297    }
298
299    /// `ff:idx:{p:N}:attempt_timeout`
300    pub fn attempt_timeout(&self) -> String {
301        format!("ff:idx:{}:attempt_timeout", self.tag)
302    }
303
304    /// `ff:idx:{p:N}:execution_deadline`
305    pub fn execution_deadline(&self) -> String {
306        format!("ff:idx:{}:execution_deadline", self.tag)
307    }
308
309    /// `ff:idx:{p:N}:all_executions`
310    pub fn all_executions(&self) -> String {
311        format!("ff:idx:{}:all_executions", self.tag)
312    }
313}
314
315// ─── Flow Partition Keys ({fp:N}) ───
316
/// Key context for flow-structural partition keys.
#[derive(Debug, Clone)]
pub struct FlowKeyContext {
    /// Partition hash tag, braces included.
    tag: String,
    /// String form of the flow ID.
    fid: String,
}
322
323impl FlowKeyContext {
324    pub fn new(partition: &Partition, fid: &FlowId) -> Self {
325        Self {
326            tag: partition.hash_tag(),
327            fid: fid.to_string(),
328        }
329    }
330
331    /// `ff:flow:{fp:N}:<flow_id>:core`
332    pub fn core(&self) -> String {
333        format!("ff:flow:{}:{}:core", self.tag, self.fid)
334    }
335
336    /// `ff:flow:{fp:N}:<flow_id>:members`
337    pub fn members(&self) -> String {
338        format!("ff:flow:{}:{}:members", self.tag, self.fid)
339    }
340
341    /// `ff:flow:{fp:N}:<flow_id>:member:<eid>`
342    pub fn member(&self, eid: &ExecutionId) -> String {
343        format!("ff:flow:{}:{}:member:{}", self.tag, self.fid, eid)
344    }
345
346    /// `ff:flow:{fp:N}:<flow_id>:edge:<edge_id>`
347    pub fn edge(&self, edge_id: &EdgeId) -> String {
348        format!("ff:flow:{}:{}:edge:{}", self.tag, self.fid, edge_id)
349    }
350
351    /// `ff:flow:{fp:N}:<flow_id>:out:<upstream_eid>`
352    pub fn outgoing(&self, upstream_eid: &ExecutionId) -> String {
353        format!("ff:flow:{}:{}:out:{}", self.tag, self.fid, upstream_eid)
354    }
355
356    /// `ff:flow:{fp:N}:<flow_id>:in:<downstream_eid>`
357    pub fn incoming(&self, downstream_eid: &ExecutionId) -> String {
358        format!("ff:flow:{}:{}:in:{}", self.tag, self.fid, downstream_eid)
359    }
360
361    /// `ff:flow:{fp:N}:<flow_id>:events`
362    pub fn events(&self) -> String {
363        format!("ff:flow:{}:{}:events", self.tag, self.fid)
364    }
365
366    /// `ff:flow:{fp:N}:<flow_id>:summary`
367    pub fn summary(&self) -> String {
368        format!("ff:flow:{}:{}:summary", self.tag, self.fid)
369    }
370
371    /// `ff:flow:{fp:N}:<flow_id>:grant:<mutation_id>`
372    pub fn grant(&self, mutation_id: &str) -> String {
373        format!("ff:flow:{}:{}:grant:{}", self.tag, self.fid, mutation_id)
374    }
375
376    pub fn hash_tag(&self) -> &str {
377        &self.tag
378    }
379}
380
/// Flow-partition index keys.
#[derive(Debug, Clone)]
pub struct FlowIndexKeys {
    /// Partition hash tag, braces included.
    tag: String,
}
385
386impl FlowIndexKeys {
387    pub fn new(partition: &Partition) -> Self {
388        Self {
389            tag: partition.hash_tag(),
390        }
391    }
392
393    /// `ff:idx:{fp:N}:flow_index` — SET of flow IDs on this partition.
394    /// Used by the flow projector for cluster-safe discovery (replaces SCAN).
395    pub fn flow_index(&self) -> String {
396        format!("ff:idx:{}:flow_index", self.tag)
397    }
398}
399
400// ─── Budget Partition Keys ({b:M}) ───
401
/// Key context for budget partition keys.
#[derive(Debug, Clone)]
pub struct BudgetKeyContext {
    /// Partition hash tag, braces included.
    tag: String,
    /// String form of the budget ID.
    bid: String,
}
407
408impl BudgetKeyContext {
409    pub fn new(partition: &Partition, bid: &BudgetId) -> Self {
410        Self {
411            tag: partition.hash_tag(),
412            bid: bid.to_string(),
413        }
414    }
415
416    /// `ff:budget:{b:M}:<budget_id>` — Budget definition.
417    pub fn definition(&self) -> String {
418        format!("ff:budget:{}:{}", self.tag, self.bid)
419    }
420
421    /// `ff:budget:{b:M}:<budget_id>:limits` — Hard/soft limits per dimension.
422    pub fn limits(&self) -> String {
423        format!("ff:budget:{}:{}:limits", self.tag, self.bid)
424    }
425
426    /// `ff:budget:{b:M}:<budget_id>:usage` — Usage counters.
427    pub fn usage(&self) -> String {
428        format!("ff:budget:{}:{}:usage", self.tag, self.bid)
429    }
430
431    /// `ff:budget:{b:M}:<budget_id>:executions` — Reverse index.
432    pub fn executions(&self) -> String {
433        format!("ff:budget:{}:{}:executions", self.tag, self.bid)
434    }
435
436    pub fn hash_tag(&self) -> &str {
437        &self.tag
438    }
439}
440
/// Budget attachment key: `ff:budget_attach:<tag>:<scope_type>:<scope_id>`.
///
/// Scoped by attachment target rather than by budget ID.
pub fn budget_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
    format!("ff:budget_attach:{tag}:{scope_type}:{scope_id}")
}
445
/// Budget reset schedule index: `ff:idx:<tag>:budget_resets`.
pub fn budget_resets_key(tag: &str) -> String {
    format!("ff:idx:{tag}:budget_resets")
}
450
/// Budget policies index (`ff:idx:<tag>:budget_policies`) — SET of the
/// budget IDs on this partition. The budget reconciler uses it for
/// cluster-safe discovery (replaces SCAN).
pub fn budget_policies_index(tag: &str) -> String {
    format!("ff:idx:{tag}:budget_policies")
}
456
457// ─── Quota Partition Keys ({q:K}) ───
458
/// Key context for quota partition keys.
#[derive(Debug, Clone)]
pub struct QuotaKeyContext {
    /// Partition hash tag, braces included.
    tag: String,
    /// String form of the quota policy ID.
    qid: String,
}
464
465impl QuotaKeyContext {
466    pub fn new(partition: &Partition, qid: &QuotaPolicyId) -> Self {
467        Self {
468            tag: partition.hash_tag(),
469            qid: qid.to_string(),
470        }
471    }
472
473    /// `ff:quota:{q:K}:<quota_policy_id>` — Quota policy definition.
474    pub fn definition(&self) -> String {
475        format!("ff:quota:{}:{}", self.tag, self.qid)
476    }
477
478    /// `ff:quota:{q:K}:<quota_policy_id>:window:<dimension>`
479    pub fn window(&self, dimension: &str) -> String {
480        format!("ff:quota:{}:{}:window:{}", self.tag, self.qid, dimension)
481    }
482
483    /// `ff:quota:{q:K}:<quota_policy_id>:concurrency`
484    pub fn concurrency(&self) -> String {
485        format!("ff:quota:{}:{}:concurrency", self.tag, self.qid)
486    }
487
488    /// `ff:quota:{q:K}:<quota_policy_id>:admitted:<execution_id>`
489    pub fn admitted(&self, eid: &ExecutionId) -> String {
490        format!("ff:quota:{}:{}:admitted:{}", self.tag, self.qid, eid)
491    }
492
493    /// `ff:quota:{q:K}:<quota_policy_id>:admitted_set` — SET of admitted execution IDs.
494    /// Used by the quota reconciler instead of SCAN (cluster-safe).
495    pub fn admitted_set(&self) -> String {
496        format!("ff:quota:{}:{}:admitted_set", self.tag, self.qid)
497    }
498
499    pub fn hash_tag(&self) -> &str {
500        &self.tag
501    }
502}
503
/// Quota policy index (`ff:idx:<tag>:quota_policies`) — SET of the policy
/// IDs on this partition. The quota reconciler uses it for cluster-safe
/// discovery (replaces SCAN).
pub fn quota_policies_index(tag: &str) -> String {
    format!("ff:idx:{tag}:quota_policies")
}
509
/// Quota attachment key: `ff:quota_attach:<tag>:<scope_type>:<scope_id>`.
pub fn quota_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
    format!("ff:quota_attach:{tag}:{scope_type}:{scope_id}")
}
514
515// ─── Global Keys (no hash tag) ───
516
517/// Lane configuration key.
518pub fn lane_config_key(lane_id: &LaneId) -> String {
519    format!("ff:lane:{}:config", lane_id)
520}
521
522/// Lane counts key.
523pub fn lane_counts_key(lane_id: &LaneId) -> String {
524    format!("ff:lane:{}:counts", lane_id)
525}
526
527/// Worker registration key.
528pub fn worker_key(wid: &WorkerInstanceId) -> String {
529    format!("ff:worker:{}", wid)
530}
531
532/// Non-authoritative capability advertisement STRING for a worker
533/// (sorted CSV). Written by `ff-sdk::FlowFabricWorker::connect`, read by
534/// the engine's unblock scanner to decide whether a `blocked_by_route`
535/// execution has a matching worker. Cluster mode: the key lands on
536/// whatever slot CRC16 hashes to — enumeration goes through
537/// `workers_index_key()` rather than a keyspace SCAN, which would only
538/// hit one shard in cluster mode.
539pub fn worker_caps_key(wid: &WorkerInstanceId) -> String {
540    format!("ff:worker:{}:caps", wid)
541}
542
/// Global worker index — SET of connected worker_instance_ids.
///
/// The key has no hash tag, so in cluster mode it occupies a single slot
/// (CRC16 of the literal key). Members are SADDed on connect and SREMed on
/// an empty-caps restart; SMEMBERS yields the enumerable universe the
/// unblock scanner walks. It is kept apart from the `ff:worker:{id}`
/// registration keys so that index membership stays cheap to read and
/// independent of per-worker hash details.
pub fn workers_index_key() -> String {
    String::from("ff:idx:workers")
}
552
/// Worker capability index key: `ff:idx:workers:cap:<key>:<value>`.
pub fn workers_capability_key(key: &str, value: &str) -> String {
    format!("ff:idx:workers:cap:{key}:{value}")
}
557
/// Lane registry key: `ff:idx:lanes`.
pub fn lanes_index_key() -> String {
    String::from("ff:idx:lanes")
}
562
/// Partition configuration key — `ff:config:partitions` (§8.3).
/// The configuration is validated on startup and created on first boot.
pub fn global_config_partitions() -> String {
    String::from("ff:config:partitions")
}
568
569// ─── Cross-Partition Secondary Indexes ───
570
/// `ff:ns:<namespace>:executions`
pub fn namespace_executions_key(namespace: &str) -> String {
    format!("ff:ns:{namespace}:executions")
}
575
/// `ff:idem:{p:N}:<namespace>:<idempotency_key>`
///
/// The execution partition hash tag is part of the key so that it hashes
/// to the same Valkey cluster slot as every other KEYS entry in the
/// ff_create_execution FCALL.
pub fn idempotency_key(tag: &str, namespace: &str, idem_key: &str) -> String {
    format!("ff:idem:{tag}:{namespace}:{idem_key}")
}
583
/// Placeholder key (`ff:noop:<tag>`) sharing a hash tag with the other
/// KEYS of the same FCALL.
///
/// It stands in for an optional key that is absent (e.g. the idempotency
/// key). The Lua function never reads or writes it, but Valkey cluster
/// requires every KEYS entry of a single FCALL to hash to the same slot.
pub fn noop_key(tag: &str) -> String {
    format!("ff:noop:{tag}")
}
592
/// `ff:tag:<namespace>:<key>:<value>`
pub fn tag_index_key(namespace: &str, key: &str, value: &str) -> String {
    format!("ff:tag:{namespace}:{key}:{value}")
}
597
598/// Waitpoint key resolution — `ff:wpkey:{p:N}:<waitpoint_key>`
599pub fn waitpoint_key_resolution(tag: &str, wp_key: &WaitpointKey) -> String {
600    format!("ff:wpkey:{}:{}", tag, wp_key)
601}
602
/// Common prefix of the usage-dedup keyspace. It has to stay identical to
/// the `ff:usagededup:` literal referenced in `lua/**.lua` (notably
/// `lua/budget.lua:99`, the `ff_report_usage_and_check` function).
/// Grepping for `ff:usagededup:` finds all producers, consumers, and test
/// fixtures in one search.
pub const USAGE_DEDUP_KEY_PREFIX: &str = "ff:usagededup:";

/// Builds a usage-dedup key of the form
/// `ff:usagededup:<hash_tag>:<dedup_id>`.
///
/// `hash_tag` must ALREADY carry the Valkey hash-tag braces
/// (e.g. `"{bp:7}"`) — it typically comes from
/// [`BudgetKeyContext::hash_tag`]. Do not wrap it a second time.
pub fn usage_dedup_key(hash_tag: &str, dedup_id: &str) -> String {
    // Plain concatenation: prefix, tag, separator, id.
    [USAGE_DEDUP_KEY_PREFIX, hash_tag, ":", dedup_id].concat()
}
618
#[cfg(test)]
mod tests {
    use super::*;
    use crate::partition::{execution_partition, flow_partition, PartitionConfig};

    /// Builds an `ExecKeyContext` for a freshly minted execution in a new flow.
    fn fresh_exec_ctx() -> ExecKeyContext {
        let config = PartitionConfig::default();
        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
        let partition = execution_partition(&eid, &config);
        ExecKeyContext::new(&partition, &eid)
    }

    #[test]
    fn exec_key_context_core_format() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::parse("{fp:0}:550e8400-e29b-41d4-a716-446655440000").unwrap();
        let partition = execution_partition(&eid, &config);
        let ctx = ExecKeyContext::new(&partition, &eid);

        let core_key = ctx.core();
        // Post-RFC-011: exec keys co-locate on flow partitions ({fp:*}).
        assert!(core_key.starts_with("ff:exec:{fp:"));
        assert!(core_key.ends_with(":core"));
        assert!(core_key.contains("550e8400-e29b-41d4-a716-446655440000"));
    }

    #[test]
    fn exec_key_all_keys_share_hash_tag() {
        let ctx = fresh_exec_ctx();
        let tag = ctx.hash_tag();

        // Every builder that needs no extra project ID must embed the tag;
        // a key without it would land on a different cluster slot.
        let keys = [
            ctx.core(),
            ctx.payload(),
            ctx.result(),
            ctx.policy(),
            ctx.tags(),
            ctx.lease_current(),
            ctx.lease_history(),
            ctx.claim_grant(),
            ctx.attempts(),
            ctx.attempt_hash(AttemptIndex::new(0)),
            ctx.attempt_usage(AttemptIndex::new(0)),
            ctx.attempt_policy(AttemptIndex::new(0)),
            ctx.stream(AttemptIndex::new(0)),
            ctx.stream_meta(AttemptIndex::new(0)),
            ctx.suspension_current(),
            ctx.waitpoints(),
            ctx.exec_signals(),
            ctx.deps_meta(),
            ctx.deps_unresolved(),
            ctx.deps_all_edges(),
            ctx.noop(),
        ];
        for key in &keys {
            assert!(key.contains(tag), "{key} does not carry hash tag {tag}");
        }
    }

    #[test]
    fn noop_method_matches_free_function() {
        let ctx = fresh_exec_ctx();
        // Both builders must produce the same placeholder key for a tag.
        assert_eq!(ctx.noop(), noop_key(ctx.hash_tag()));
    }

    #[test]
    fn attempt_key_includes_index() {
        let ctx = fresh_exec_ctx();

        let key = ctx.attempt_hash(AttemptIndex::new(3));
        assert!(key.starts_with("ff:attempt:{fp:"));
        // `contains(":3")` could also match the hash tag (e.g. `{fp:3}`) or
        // an ID segment starting with `3`; the index is the final segment.
        assert!(key.ends_with(":3"), "attempt key should end with the index");
    }

    #[test]
    fn stream_key_format() {
        let ctx = fresh_exec_ctx();

        let key = ctx.stream(AttemptIndex::new(0));
        assert!(key.starts_with("ff:stream:{fp:"));
        assert!(key.ends_with(":0"));
    }

    #[test]
    fn index_keys_format() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
        let partition = execution_partition(&eid, &config);
        let idx = IndexKeys::new(&partition);
        let lane = LaneId::new("default");

        assert!(idx.lane_eligible(&lane).contains(":lane:default:eligible"));
        assert!(idx.lane_delayed(&lane).contains(":lane:default:delayed"));
        assert!(idx.lease_expiry().contains(":lease_expiry"));
        assert!(idx.all_executions().contains(":all_executions"));
    }

    #[test]
    fn flow_key_context_format() {
        let config = PartitionConfig::default();
        let fid = FlowId::new();
        let partition = flow_partition(&fid, &config);
        let ctx = FlowKeyContext::new(&partition, &fid);

        assert!(ctx.core().starts_with("ff:flow:{fp:"));
        assert!(ctx.core().ends_with(":core"));
        assert!(ctx.members().ends_with(":members"));
    }

    #[test]
    fn global_keys_no_hash_tag() {
        let lane = LaneId::new("default");
        let key = lane_config_key(&lane);
        assert_eq!(key, "ff:lane:default:config");
        assert!(!key.contains('{'));
    }

    #[test]
    fn usage_dedup_key_format() {
        // hash_tag already includes braces — helper must not double-wrap.
        let key = usage_dedup_key("{bp:7}", "dedup-123");
        assert_eq!(key, "ff:usagededup:{bp:7}:dedup-123");
        assert!(key.starts_with(USAGE_DEDUP_KEY_PREFIX));
        // Exactly one `{…}` hash-tag region.
        assert_eq!(key.matches('{').count(), 1);
        assert_eq!(key.matches('}').count(), 1);
    }
}
725}