Skip to main content

ff_core/
keys.rs

1use crate::partition::Partition;
2use crate::types::{
3    AttemptIndex, BudgetId, EdgeId, ExecutionId, FlowId, LaneId, QuotaPolicyId, SignalId,
4    WaitpointId, WaitpointKey, WorkerInstanceId,
5};
6
7// ─── Execution Partition Keys ({fp:N} post-RFC-011) ───
8//
9// NOTE: docstrings below still reference `{p:N}` as a historical shorthand.
10// Post-RFC-011 all exec-scoped keys hash-tag to `{fp:N}` (the parent flow's
11// partition), enabling atomic multi-key FCALLs across exec_core + flow_core.
12// The `Partition` type's `hash_tag()` method produces the correct `{fp:N}`
13// prefix; the docstring example paths are indicative of structure, not the
14// literal runtime hash-tag.
15
/// Pre-computed key context for all keys on an execution's partition.
/// Created once per operation and reused for all key construction.
///
/// Derives `Debug` and `Clone` so a context can be logged or handed to a
/// helper without being rebuilt from the partition and execution ID.
#[derive(Debug, Clone)]
pub struct ExecKeyContext {
    /// The hash tag, e.g. `{p:42}`
    tag: String,
    /// The execution ID string
    eid: String,
}
24
25impl ExecKeyContext {
26    pub fn new(partition: &Partition, eid: &ExecutionId) -> Self {
27        Self {
28            tag: partition.hash_tag(),
29            eid: eid.to_string(),
30        }
31    }
32
33    // ── Execution Core (RFC-001) ──
34
35    /// `ff:exec:{p:N}:<eid>:core` — Authoritative execution record.
36    pub fn core(&self) -> String {
37        format!("ff:exec:{}:{}:core", self.tag, self.eid)
38    }
39
40    /// `ff:exec:{p:N}:<eid>:payload` — Opaque input payload.
41    pub fn payload(&self) -> String {
42        format!("ff:exec:{}:{}:payload", self.tag, self.eid)
43    }
44
45    /// `ff:exec:{p:N}:<eid>:result` — Opaque result payload.
46    pub fn result(&self) -> String {
47        format!("ff:exec:{}:{}:result", self.tag, self.eid)
48    }
49
50    /// `ff:exec:{p:N}:<eid>:policy` — JSON-encoded ExecutionPolicySnapshot.
51    pub fn policy(&self) -> String {
52        format!("ff:exec:{}:{}:policy", self.tag, self.eid)
53    }
54
55    /// `ff:exec:{p:N}:<eid>:tags` — User-supplied key-value labels.
56    pub fn tags(&self) -> String {
57        format!("ff:exec:{}:{}:tags", self.tag, self.eid)
58    }
59
60    // ── Lease (RFC-003) ──
61
62    /// `ff:exec:{p:N}:<eid>:lease:current` — Full lease object.
63    pub fn lease_current(&self) -> String {
64        format!("ff:exec:{}:{}:lease:current", self.tag, self.eid)
65    }
66
67    /// `ff:exec:{p:N}:<eid>:lease:history` — Append-only lease events.
68    pub fn lease_history(&self) -> String {
69        format!("ff:exec:{}:{}:lease:history", self.tag, self.eid)
70    }
71
72    /// `ff:exec:{p:N}:<eid>:claim_grant` — Ephemeral claim grant.
73    pub fn claim_grant(&self) -> String {
74        format!("ff:exec:{}:{}:claim_grant", self.tag, self.eid)
75    }
76
77    // ── Attempt (RFC-002) ──
78
79    /// `ff:exec:{p:N}:<eid>:attempts` — Attempt index ZSET.
80    pub fn attempts(&self) -> String {
81        format!("ff:exec:{}:{}:attempts", self.tag, self.eid)
82    }
83
84    /// `ff:attempt:{p:N}:<eid>:<attempt_index>` — Per-attempt detail.
85    pub fn attempt_hash(&self, index: AttemptIndex) -> String {
86        format!("ff:attempt:{}:{}:{}", self.tag, self.eid, index)
87    }
88
89    /// `ff:attempt:{p:N}:<eid>:<attempt_index>:usage` — Per-attempt usage counters.
90    pub fn attempt_usage(&self, index: AttemptIndex) -> String {
91        format!("ff:attempt:{}:{}:{}:usage", self.tag, self.eid, index)
92    }
93
94    /// `ff:attempt:{p:N}:<eid>:<attempt_index>:policy` — Frozen attempt policy snapshot.
95    pub fn attempt_policy(&self, index: AttemptIndex) -> String {
96        format!("ff:attempt:{}:{}:{}:policy", self.tag, self.eid, index)
97    }
98
99    // ── Stream (RFC-006) ──
100
101    /// `ff:stream:{p:N}:<eid>:<attempt_index>` — Attempt-scoped output stream.
102    pub fn stream(&self, index: AttemptIndex) -> String {
103        format!("ff:stream:{}:{}:{}", self.tag, self.eid, index)
104    }
105
106    /// `ff:stream:{p:N}:<eid>:<attempt_index>:meta` — Stream metadata.
107    pub fn stream_meta(&self, index: AttemptIndex) -> String {
108        format!("ff:stream:{}:{}:{}:meta", self.tag, self.eid, index)
109    }
110
111    // ── Suspension / Waitpoint (RFC-004) ──
112
113    /// `ff:exec:{p:N}:<eid>:suspension:current` — Current suspension episode.
114    pub fn suspension_current(&self) -> String {
115        format!("ff:exec:{}:{}:suspension:current", self.tag, self.eid)
116    }
117
118    /// `ff:exec:{p:N}:<eid>:waitpoints` — Set of all waitpoint IDs.
119    pub fn waitpoints(&self) -> String {
120        format!("ff:exec:{}:{}:waitpoints", self.tag, self.eid)
121    }
122
123    /// `ff:wp:{p:N}:<wp_id>` — Waitpoint record.
124    pub fn waitpoint(&self, wp_id: &WaitpointId) -> String {
125        format!("ff:wp:{}:{}", self.tag, wp_id)
126    }
127
128    /// `ff:wp:{p:N}:<wp_id>:signals` — Per-waitpoint signal history.
129    pub fn waitpoint_signals(&self, wp_id: &WaitpointId) -> String {
130        format!("ff:wp:{}:{}:signals", self.tag, wp_id)
131    }
132
133    /// `ff:wp:{p:N}:<wp_id>:condition` — Resume condition evaluation state.
134    pub fn waitpoint_condition(&self, wp_id: &WaitpointId) -> String {
135        format!("ff:wp:{}:{}:condition", self.tag, wp_id)
136    }
137
138    // ── Signal (RFC-005) ──
139
140    /// `ff:signal:{p:N}:<signal_id>` — Signal record.
141    pub fn signal(&self, sig_id: &SignalId) -> String {
142        format!("ff:signal:{}:{}", self.tag, sig_id)
143    }
144
145    /// `ff:signal:{p:N}:<signal_id>:payload` — Opaque signal payload.
146    pub fn signal_payload(&self, sig_id: &SignalId) -> String {
147        format!("ff:signal:{}:{}:payload", self.tag, sig_id)
148    }
149
150    /// `ff:exec:{p:N}:<eid>:signals` — Per-execution signal index.
151    pub fn exec_signals(&self) -> String {
152        format!("ff:exec:{}:{}:signals", self.tag, self.eid)
153    }
154
155    /// `ff:sigdedup:{p:N}:<wp_id>:<idem_key>` — Signal idempotency guard.
156    pub fn signal_dedup(&self, wp_id: &WaitpointId, idempotency_key: &str) -> String {
157        format!("ff:sigdedup:{}:{}:{}", self.tag, wp_id, idempotency_key)
158    }
159
160    // ── Flow Dependency — Execution-Local (RFC-007) ──
161
162    /// `ff:exec:{p:N}:<eid>:deps:meta` — Dependency summary.
163    pub fn deps_meta(&self) -> String {
164        format!("ff:exec:{}:{}:deps:meta", self.tag, self.eid)
165    }
166
167    /// `ff:exec:{p:N}:<eid>:dep:<edge_id>` — Per-edge local state.
168    pub fn dep_edge(&self, edge_id: &EdgeId) -> String {
169        format!("ff:exec:{}:{}:dep:{}", self.tag, self.eid, edge_id)
170    }
171
172    /// `ff:exec:{p:N}:<eid>:deps:unresolved` — Set of unresolved edge IDs.
173    pub fn deps_unresolved(&self) -> String {
174        format!("ff:exec:{}:{}:deps:unresolved", self.tag, self.eid)
175    }
176
177    /// `ff:exec:{p:N}:<eid>:deps:all_edges` — Set of ALL applied edge IDs
178    /// on this execution (unresolved + satisfied + impossible). Populated by
179    /// ff_apply_dependency_to_child, never pruned on resolve. Used by the
180    /// retention trimmer to enumerate dep edge hashes without SCAN.
181    pub fn deps_all_edges(&self) -> String {
182        format!("ff:exec:{}:{}:deps:all_edges", self.tag, self.eid)
183    }
184
185    // ── Accessor ──
186
187    /// Dummy key on this partition, used as a placeholder for unused KEYS
188    /// positions (e.g. empty idempotency key). Ensures all KEYS in an FCALL
189    /// share the same hash tag, preventing CrossSlot errors in cluster mode.
190    pub fn noop(&self) -> String {
191        format!("ff:noop:{}", self.tag)
192    }
193
194    pub fn hash_tag(&self) -> &str {
195        &self.tag
196    }
197
198    pub fn execution_id_str(&self) -> &str {
199        &self.eid
200    }
201}
202
203// ─── Partition-Local Index Keys ({p:N}) ───
204
/// Index keys scoped to a single execution partition.
///
/// Holds only the partition hash tag. Derives `Debug` and `Clone` so the
/// value can be logged or copied without going back to the partition.
#[derive(Debug, Clone)]
pub struct IndexKeys {
    /// Hash tag shared by every index key built from this value.
    tag: String,
}
209
210impl IndexKeys {
211    pub fn new(partition: &Partition) -> Self {
212        Self {
213            tag: partition.hash_tag(),
214        }
215    }
216
217    /// `ff:idx:{p:N}:lane:<lane_id>:eligible`
218    pub fn lane_eligible(&self, lane_id: &LaneId) -> String {
219        format!("ff:idx:{}:lane:{}:eligible", self.tag, lane_id)
220    }
221
222    /// `ff:idx:{p:N}:lane:<lane_id>:delayed`
223    pub fn lane_delayed(&self, lane_id: &LaneId) -> String {
224        format!("ff:idx:{}:lane:{}:delayed", self.tag, lane_id)
225    }
226
227    /// `ff:idx:{p:N}:lane:<lane_id>:active`
228    pub fn lane_active(&self, lane_id: &LaneId) -> String {
229        format!("ff:idx:{}:lane:{}:active", self.tag, lane_id)
230    }
231
232    /// `ff:idx:{p:N}:lane:<lane_id>:terminal`
233    pub fn lane_terminal(&self, lane_id: &LaneId) -> String {
234        format!("ff:idx:{}:lane:{}:terminal", self.tag, lane_id)
235    }
236
237    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:dependencies`
238    pub fn lane_blocked_dependencies(&self, lane_id: &LaneId) -> String {
239        format!("ff:idx:{}:lane:{}:blocked:dependencies", self.tag, lane_id)
240    }
241
242    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:budget`
243    pub fn lane_blocked_budget(&self, lane_id: &LaneId) -> String {
244        format!("ff:idx:{}:lane:{}:blocked:budget", self.tag, lane_id)
245    }
246
247    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:quota`
248    pub fn lane_blocked_quota(&self, lane_id: &LaneId) -> String {
249        format!("ff:idx:{}:lane:{}:blocked:quota", self.tag, lane_id)
250    }
251
252    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:route`
253    pub fn lane_blocked_route(&self, lane_id: &LaneId) -> String {
254        format!("ff:idx:{}:lane:{}:blocked:route", self.tag, lane_id)
255    }
256
257    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:operator`
258    pub fn lane_blocked_operator(&self, lane_id: &LaneId) -> String {
259        format!("ff:idx:{}:lane:{}:blocked:operator", self.tag, lane_id)
260    }
261
262    /// `ff:idx:{p:N}:lane:<lane_id>:suspended`
263    pub fn lane_suspended(&self, lane_id: &LaneId) -> String {
264        format!("ff:idx:{}:lane:{}:suspended", self.tag, lane_id)
265    }
266
267    /// `ff:sec:{p:N}:waitpoint_hmac` — HMAC signing secrets replicated
268    /// across every execution partition (RFC-004 §Waitpoint Security).
269    /// Hash fields:
270    ///   `current_kid`, `previous_kid`, `secret:<kid>` (hex), `previous_expires_at`.
271    /// Replication is required for cluster mode: every FCALL that mints or
272    /// validates a token must hash-tag-collocate this key with the rest of
273    /// its execution-partition KEYS. The secret value is identical across
274    /// partitions; rotation fans out HSETs across them.
275    pub fn waitpoint_hmac_secrets(&self) -> String {
276        format!("ff:sec:{}:waitpoint_hmac", self.tag)
277    }
278
279    /// `ff:idx:{p:N}:lease_expiry` — Cross-lane lease expiry scanner target.
280    pub fn lease_expiry(&self) -> String {
281        format!("ff:idx:{}:lease_expiry", self.tag)
282    }
283
284    /// `ff:idx:{p:N}:worker:<worker_instance_id>:leases`
285    pub fn worker_leases(&self, wid: &WorkerInstanceId) -> String {
286        format!("ff:idx:{}:worker:{}:leases", self.tag, wid)
287    }
288
289    /// `ff:idx:{p:N}:suspension_timeout`
290    pub fn suspension_timeout(&self) -> String {
291        format!("ff:idx:{}:suspension_timeout", self.tag)
292    }
293
294    /// `ff:idx:{p:N}:pending_waitpoint_expiry`
295    pub fn pending_waitpoint_expiry(&self) -> String {
296        format!("ff:idx:{}:pending_waitpoint_expiry", self.tag)
297    }
298
299    /// `ff:idx:{p:N}:attempt_timeout`
300    pub fn attempt_timeout(&self) -> String {
301        format!("ff:idx:{}:attempt_timeout", self.tag)
302    }
303
304    /// `ff:idx:{p:N}:execution_deadline`
305    pub fn execution_deadline(&self) -> String {
306        format!("ff:idx:{}:execution_deadline", self.tag)
307    }
308
309    /// `ff:idx:{p:N}:all_executions`
310    pub fn all_executions(&self) -> String {
311        format!("ff:idx:{}:all_executions", self.tag)
312    }
313}
314
315// ─── Flow Partition Keys ({fp:N}) ───
316
/// Key context for flow-structural partition keys.
///
/// Derives `Debug` and `Clone` so a context can be logged or handed to a
/// helper without being rebuilt from the partition and flow ID.
#[derive(Debug, Clone)]
pub struct FlowKeyContext {
    /// Hash tag shared by every key built from this context.
    tag: String,
    /// The flow ID in string form.
    fid: String,
}
322
323impl FlowKeyContext {
324    pub fn new(partition: &Partition, fid: &FlowId) -> Self {
325        Self {
326            tag: partition.hash_tag(),
327            fid: fid.to_string(),
328        }
329    }
330
331    /// `ff:flow:{fp:N}:<flow_id>:core`
332    pub fn core(&self) -> String {
333        format!("ff:flow:{}:{}:core", self.tag, self.fid)
334    }
335
336    /// `ff:flow:{fp:N}:<flow_id>:members`
337    pub fn members(&self) -> String {
338        format!("ff:flow:{}:{}:members", self.tag, self.fid)
339    }
340
341    /// `ff:flow:{fp:N}:<flow_id>:tags` — User-supplied key-value labels.
342    /// Symmetric with [`ExecKeyContext::tags`]. Populated by
343    /// `ff_set_flow_tags`, which also lazy-migrates any pre-58.4
344    /// reserved-namespace fields stashed inline on `flow_core`.
345    pub fn tags(&self) -> String {
346        format!("ff:flow:{}:{}:tags", self.tag, self.fid)
347    }
348
349    /// `ff:flow:{fp:N}:<flow_id>:member:<eid>`
350    pub fn member(&self, eid: &ExecutionId) -> String {
351        format!("ff:flow:{}:{}:member:{}", self.tag, self.fid, eid)
352    }
353
354    /// `ff:flow:{fp:N}:<flow_id>:edge:<edge_id>`
355    pub fn edge(&self, edge_id: &EdgeId) -> String {
356        format!("ff:flow:{}:{}:edge:{}", self.tag, self.fid, edge_id)
357    }
358
359    /// `ff:flow:{fp:N}:<flow_id>:out:<upstream_eid>`
360    pub fn outgoing(&self, upstream_eid: &ExecutionId) -> String {
361        format!("ff:flow:{}:{}:out:{}", self.tag, self.fid, upstream_eid)
362    }
363
364    /// `ff:flow:{fp:N}:<flow_id>:in:<downstream_eid>`
365    pub fn incoming(&self, downstream_eid: &ExecutionId) -> String {
366        format!("ff:flow:{}:{}:in:{}", self.tag, self.fid, downstream_eid)
367    }
368
369    /// `ff:flow:{fp:N}:<flow_id>:events`
370    pub fn events(&self) -> String {
371        format!("ff:flow:{}:{}:events", self.tag, self.fid)
372    }
373
374    /// `ff:flow:{fp:N}:<flow_id>:summary`
375    pub fn summary(&self) -> String {
376        format!("ff:flow:{}:{}:summary", self.tag, self.fid)
377    }
378
379    /// `ff:flow:{fp:N}:<flow_id>:grant:<mutation_id>`
380    pub fn grant(&self, mutation_id: &str) -> String {
381        format!("ff:flow:{}:{}:grant:{}", self.tag, self.fid, mutation_id)
382    }
383
384    /// `ff:flow:{fp:N}:<flow_id>:pending_cancels` — SET of execution IDs
385    /// whose cancel is still owed after a `cancel_all` cancel_flow. The
386    /// live async dispatch SREMs entries as it succeeds; the cancel
387    /// reconciler scanner drains the remainder on its interval so a
388    /// process crash mid-dispatch or a member whose cancel hit a
389    /// permanent error can't leave a flow member un-cancelled.
390    pub fn pending_cancels(&self) -> String {
391        format!("ff:flow:{}:{}:pending_cancels", self.tag, self.fid)
392    }
393
394    pub fn hash_tag(&self) -> &str {
395        &self.tag
396    }
397}
398
/// Flow-partition index keys.
///
/// Holds only the partition hash tag. Derives `Debug` and `Clone` so the
/// value can be logged or copied without going back to the partition.
#[derive(Debug, Clone)]
pub struct FlowIndexKeys {
    /// Hash tag shared by every index key built from this value.
    tag: String,
}
403
404impl FlowIndexKeys {
405    pub fn new(partition: &Partition) -> Self {
406        Self {
407            tag: partition.hash_tag(),
408        }
409    }
410
411    /// `ff:idx:{fp:N}:flow_index` — SET of flow IDs on this partition.
412    /// Used by the flow projector for cluster-safe discovery (replaces SCAN).
413    pub fn flow_index(&self) -> String {
414        format!("ff:idx:{}:flow_index", self.tag)
415    }
416
417    /// `ff:idx:{fp:N}:cancel_backlog` — ZSET of flow IDs whose async
418    /// cancel dispatch is still owed members. Score = grace-window expiry
419    /// (unix ms). The cancel reconciler scanner ZRANGEBYSCOREs entries
420    /// whose score <= now, drains their `pending_cancels` set, and ZREMs
421    /// when empty. Live dispatch runs without waiting on this score, so
422    /// the grace window just keeps the reconciler from fighting the
423    /// happy path.
424    pub fn cancel_backlog(&self) -> String {
425        format!("ff:idx:{}:cancel_backlog", self.tag)
426    }
427}
428
429// ─── Budget Partition Keys ({b:M}) ───
430
/// Key context for budget partition keys.
///
/// Derives `Debug` and `Clone` so a context can be logged or handed to a
/// helper without being rebuilt from the partition and budget ID.
#[derive(Debug, Clone)]
pub struct BudgetKeyContext {
    /// Hash tag shared by every key built from this context.
    tag: String,
    /// The budget ID in string form.
    bid: String,
}
436
437impl BudgetKeyContext {
438    pub fn new(partition: &Partition, bid: &BudgetId) -> Self {
439        Self {
440            tag: partition.hash_tag(),
441            bid: bid.to_string(),
442        }
443    }
444
445    /// `ff:budget:{b:M}:<budget_id>` — Budget definition.
446    pub fn definition(&self) -> String {
447        format!("ff:budget:{}:{}", self.tag, self.bid)
448    }
449
450    /// `ff:budget:{b:M}:<budget_id>:limits` — Hard/soft limits per dimension.
451    pub fn limits(&self) -> String {
452        format!("ff:budget:{}:{}:limits", self.tag, self.bid)
453    }
454
455    /// `ff:budget:{b:M}:<budget_id>:usage` — Usage counters.
456    pub fn usage(&self) -> String {
457        format!("ff:budget:{}:{}:usage", self.tag, self.bid)
458    }
459
460    /// `ff:budget:{b:M}:<budget_id>:executions` — Reverse index.
461    pub fn executions(&self) -> String {
462        format!("ff:budget:{}:{}:executions", self.tag, self.bid)
463    }
464
465    pub fn hash_tag(&self) -> &str {
466        &self.tag
467    }
468}
469
/// Budget attachment key (not budget-ID scoped):
/// `ff:budget_attach:<tag>:<scope_type>:<scope_id>`.
pub fn budget_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
    format!("ff:budget_attach:{tag}:{scope_type}:{scope_id}")
}
474
/// Budget reset schedule index: `ff:idx:<tag>:budget_resets`.
pub fn budget_resets_key(tag: &str) -> String {
    format!("ff:idx:{tag}:budget_resets")
}
479
/// Budget policies index — SET of budget IDs on this partition
/// (`ff:idx:<tag>:budget_policies`). The budget reconciler uses it for
/// cluster-safe discovery (replaces SCAN).
pub fn budget_policies_index(tag: &str) -> String {
    format!("ff:idx:{tag}:budget_policies")
}
485
486// ─── Quota Partition Keys ({q:K}) ───
487
/// Key context for quota partition keys.
///
/// Derives `Debug` and `Clone` so a context can be logged or handed to a
/// helper without being rebuilt from the partition and quota policy ID.
#[derive(Debug, Clone)]
pub struct QuotaKeyContext {
    /// Hash tag shared by every key built from this context.
    tag: String,
    /// The quota policy ID in string form.
    qid: String,
}
493
494impl QuotaKeyContext {
495    pub fn new(partition: &Partition, qid: &QuotaPolicyId) -> Self {
496        Self {
497            tag: partition.hash_tag(),
498            qid: qid.to_string(),
499        }
500    }
501
502    /// `ff:quota:{q:K}:<quota_policy_id>` — Quota policy definition.
503    pub fn definition(&self) -> String {
504        format!("ff:quota:{}:{}", self.tag, self.qid)
505    }
506
507    /// `ff:quota:{q:K}:<quota_policy_id>:window:<dimension>`
508    pub fn window(&self, dimension: &str) -> String {
509        format!("ff:quota:{}:{}:window:{}", self.tag, self.qid, dimension)
510    }
511
512    /// `ff:quota:{q:K}:<quota_policy_id>:concurrency`
513    pub fn concurrency(&self) -> String {
514        format!("ff:quota:{}:{}:concurrency", self.tag, self.qid)
515    }
516
517    /// `ff:quota:{q:K}:<quota_policy_id>:admitted:<execution_id>`
518    pub fn admitted(&self, eid: &ExecutionId) -> String {
519        format!("ff:quota:{}:{}:admitted:{}", self.tag, self.qid, eid)
520    }
521
522    /// `ff:quota:{q:K}:<quota_policy_id>:admitted_set` — SET of admitted execution IDs.
523    /// Used by the quota reconciler instead of SCAN (cluster-safe).
524    pub fn admitted_set(&self) -> String {
525        format!("ff:quota:{}:{}:admitted_set", self.tag, self.qid)
526    }
527
528    pub fn hash_tag(&self) -> &str {
529        &self.tag
530    }
531}
532
/// Quota policy index — SET of policy IDs on this partition
/// (`ff:idx:<tag>:quota_policies`). The quota reconciler uses it for
/// cluster-safe discovery (replaces SCAN).
pub fn quota_policies_index(tag: &str) -> String {
    format!("ff:idx:{tag}:quota_policies")
}
538
/// Quota attachment key: `ff:quota_attach:<tag>:<scope_type>:<scope_id>`.
pub fn quota_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
    format!("ff:quota_attach:{tag}:{scope_type}:{scope_id}")
}
543
544// ─── Global Keys (no hash tag) ───
545
546/// Lane configuration key.
547pub fn lane_config_key(lane_id: &LaneId) -> String {
548    format!("ff:lane:{}:config", lane_id)
549}
550
551/// Lane counts key.
552pub fn lane_counts_key(lane_id: &LaneId) -> String {
553    format!("ff:lane:{}:counts", lane_id)
554}
555
556/// Worker registration key.
557pub fn worker_key(wid: &WorkerInstanceId) -> String {
558    format!("ff:worker:{}", wid)
559}
560
561/// Non-authoritative capability advertisement STRING for a worker
562/// (sorted CSV). Written by `ff-sdk::FlowFabricWorker::connect`, read by
563/// the engine's unblock scanner to decide whether a `blocked_by_route`
564/// execution has a matching worker. Cluster mode: the key lands on
565/// whatever slot CRC16 hashes to — enumeration goes through
566/// `workers_index_key()` rather than a keyspace SCAN, which would only
567/// hit one shard in cluster mode.
568pub fn worker_caps_key(wid: &WorkerInstanceId) -> String {
569    format!("ff:worker:{}:caps", wid)
570}
571
/// Global worker index — SET of connected worker_instance_ids. It lives
/// on a single slot in cluster mode (no hash tag; CRC16 of the literal
/// key). SADD on connect, SREM on empty-caps restart; SMEMBERS gives the
/// enumerable universe the unblock scanner walks. It is kept separate
/// from the `ff:worker:{id}` registration keys so index membership stays
/// cheap to read and independent of per-worker hash details.
pub fn workers_index_key() -> String {
    String::from("ff:idx:workers")
}
581
/// Worker capability index: `ff:idx:workers:cap:<key>:<value>`.
pub fn workers_capability_key(key: &str, value: &str) -> String {
    format!("ff:idx:workers:cap:{key}:{value}")
}
586
/// Lane registry key: `ff:idx:lanes`.
pub fn lanes_index_key() -> String {
    String::from("ff:idx:lanes")
}
591
/// Partition configuration key — `ff:config:partitions` (§8.3).
/// Validated on startup; created on first boot.
pub fn global_config_partitions() -> String {
    String::from("ff:config:partitions")
}
597
598// ─── Cross-Partition Secondary Indexes ───
599
/// `ff:ns:<namespace>:executions`
pub fn namespace_executions_key(namespace: &str) -> String {
    format!("ff:ns:{namespace}:executions")
}
604
/// `ff:idem:{p:N}:<namespace>:<idempotency_key>`
///
/// The execution partition hash tag is part of the key so that it hashes
/// to the same Valkey cluster slot as all other KEYS in the
/// ff_create_execution FCALL.
pub fn idempotency_key(tag: &str, namespace: &str, idem_key: &str) -> String {
    format!("ff:idem:{tag}:{namespace}:{idem_key}")
}
612
/// Placeholder key that shares a hash tag with other KEYS in the same
/// FCALL: `ff:noop:<tag>`.
///
/// Used when an optional key (e.g. idempotency key) is absent. The Lua
/// function never reads/writes this key, but Valkey cluster requires all
/// KEYS in a single FCALL to hash to the same slot.
pub fn noop_key(tag: &str) -> String {
    format!("ff:noop:{tag}")
}
621
/// `ff:tag:<namespace>:<key>:<value>`
pub fn tag_index_key(namespace: &str, key: &str, value: &str) -> String {
    format!("ff:tag:{namespace}:{key}:{value}")
}
626
627/// Waitpoint key resolution — `ff:wpkey:{p:N}:<waitpoint_key>`
628pub fn waitpoint_key_resolution(tag: &str, wp_key: &WaitpointKey) -> String {
629    format!("ff:wpkey:{}:{}", tag, wp_key)
630}
631
/// Shared prefix for the usage-dedup keyspace. Must match the
/// `ff:usagededup:` literal referenced in `lua/**.lua` (notably
/// `lua/budget.lua:99`, the `ff_report_usage_and_check` function).
/// Grep `ff:usagededup:` to find all producers, consumers, and test
/// fixtures in one search.
pub const USAGE_DEDUP_KEY_PREFIX: &str = "ff:usagededup:";

/// Build a usage-dedup key: `ff:usagededup:<hash_tag>:<dedup_id>`.
///
/// `hash_tag` must ALREADY include the Valkey hash-tag braces
/// (e.g. `"{bp:7}"`) — typically obtained from
/// [`BudgetKeyContext::hash_tag`]. Do not double-wrap: the tag is
/// concatenated as given, with no braces added here.
pub fn usage_dedup_key(hash_tag: &str, dedup_id: &str) -> String {
    [USAGE_DEDUP_KEY_PREFIX, hash_tag, ":", dedup_id].concat()
}
647
/// Unit tests pinning the textual shape of the generated keys.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::partition::{execution_partition, flow_partition, PartitionConfig};

    /// The core key carries the `ff:exec:` prefix, an `{fp:…}` hash tag,
    /// the execution's UUID and the `:core` suffix.
    #[test]
    fn exec_key_context_core_format() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::parse("{fp:0}:550e8400-e29b-41d4-a716-446655440000").unwrap();
        let partition = execution_partition(&eid, &config);
        let ctx = ExecKeyContext::new(&partition, &eid);

        let core_key = ctx.core();
        // Post-RFC-011: exec keys co-locate on flow partitions ({fp:*}).
        assert!(core_key.starts_with("ff:exec:{fp:"));
        assert!(core_key.ends_with(":core"));
        assert!(core_key.contains("550e8400-e29b-41d4-a716-446655440000"));
    }

    /// Every key builder that needs no extra ID argument (plus the
    /// attempt/stream builders, which only need an `AttemptIndex`) must
    /// embed the context's hash tag.
    #[test]
    fn exec_key_all_keys_share_hash_tag() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
        let partition = execution_partition(&eid, &config);
        let ctx = ExecKeyContext::new(&partition, &eid);
        let tag = ctx.hash_tag();

        // A fresh `AttemptIndex` is built per call so the test does not
        // depend on the type being `Copy`.
        let keys = [
            ctx.core(),
            ctx.payload(),
            ctx.result(),
            ctx.policy(),
            ctx.tags(),
            ctx.lease_current(),
            ctx.lease_history(),
            ctx.claim_grant(),
            ctx.attempts(),
            ctx.attempt_hash(AttemptIndex::new(0)),
            ctx.attempt_usage(AttemptIndex::new(0)),
            ctx.attempt_policy(AttemptIndex::new(0)),
            ctx.stream(AttemptIndex::new(0)),
            ctx.stream_meta(AttemptIndex::new(0)),
            ctx.suspension_current(),
            ctx.waitpoints(),
            ctx.exec_signals(),
            ctx.deps_meta(),
            ctx.deps_unresolved(),
            ctx.deps_all_edges(),
            ctx.noop(),
        ];

        // All keys must contain the same hash tag
        for key in &keys {
            assert!(key.contains(tag), "key {key} is missing hash tag {tag}");
        }
    }

    /// The attempt index must be the final key segment. A plain
    /// `contains(":3")` would also match a `{fp:3…}` hash tag or an ID
    /// starting with `3`, so the suffix is checked instead.
    #[test]
    fn attempt_key_includes_index() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
        let partition = execution_partition(&eid, &config);
        let ctx = ExecKeyContext::new(&partition, &eid);

        let key = ctx.attempt_hash(AttemptIndex::new(3));
        assert!(key.ends_with(":3"), "attempt key should end with index");

        let usage = ctx.attempt_usage(AttemptIndex::new(3));
        assert!(
            usage.ends_with(":3:usage"),
            "usage key should carry index before suffix"
        );
    }

    /// Stream keys use the `ff:stream:` prefix and end with the attempt index.
    #[test]
    fn stream_key_format() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
        let partition = execution_partition(&eid, &config);
        let ctx = ExecKeyContext::new(&partition, &eid);

        let key = ctx.stream(AttemptIndex::new(0));
        assert!(key.starts_with("ff:stream:{fp:"));
        assert!(key.ends_with(":0"));
    }

    /// Lane-scoped and partition-wide index keys contain the expected segments.
    #[test]
    fn index_keys_format() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
        let partition = execution_partition(&eid, &config);
        let idx = IndexKeys::new(&partition);
        let lane = LaneId::new("default");

        assert!(idx.lane_eligible(&lane).contains(":lane:default:eligible"));
        assert!(idx.lane_delayed(&lane).contains(":lane:default:delayed"));
        assert!(idx.lease_expiry().contains(":lease_expiry"));
        assert!(idx.all_executions().contains(":all_executions"));
    }

    /// Flow keys use the `ff:flow:` prefix with an `{fp:…}` hash tag.
    #[test]
    fn flow_key_context_format() {
        let config = PartitionConfig::default();
        let fid = FlowId::new();
        let partition = flow_partition(&fid, &config);
        let ctx = FlowKeyContext::new(&partition, &fid);

        assert!(ctx.core().starts_with("ff:flow:{fp:"));
        assert!(ctx.core().ends_with(":core"));
        assert!(ctx.members().ends_with(":members"));
    }

    /// Global keys are plain strings with no hash-tag braces.
    #[test]
    fn global_keys_no_hash_tag() {
        let lane = LaneId::new("default");
        let key = lane_config_key(&lane);
        assert_eq!(key, "ff:lane:default:config");
        assert!(!key.contains('{'));
    }

    /// The usage-dedup helper concatenates prefix, tag and ID verbatim.
    #[test]
    fn usage_dedup_key_format() {
        // hash_tag already includes braces — helper must not double-wrap.
        let key = usage_dedup_key("{bp:7}", "dedup-123");
        assert_eq!(key, "ff:usagededup:{bp:7}:dedup-123");
        assert!(key.starts_with(USAGE_DEDUP_KEY_PREFIX));
        // Exactly one `{…}` hash-tag region.
        assert_eq!(key.matches('{').count(), 1);
        assert_eq!(key.matches('}').count(), 1);
    }
}