Skip to main content

ff_core/
keys.rs

1use crate::partition::Partition;
2use crate::types::{
3    AttemptIndex, BudgetId, EdgeId, ExecutionId, FlowId, LaneId, QuotaPolicyId, SignalId,
4    WaitpointId, WaitpointKey, WorkerInstanceId,
5};
6
7// ─── Execution Partition Keys ({fp:N} post-RFC-011) ───
8//
9// NOTE: docstrings below still reference `{p:N}` as a historical shorthand.
10// Post-RFC-011 all exec-scoped keys hash-tag to `{fp:N}` (the parent flow's
11// partition), enabling atomic multi-key FCALLs across exec_core + flow_core.
12// The `Partition` type's `hash_tag()` method produces the correct `{fp:N}`
13// prefix; the docstring example paths are indicative of structure, not the
14// literal runtime hash-tag.
15
16/// Pre-computed key context for all keys on an execution's partition.
17/// Created once per operation and reused for all key construction.
18pub struct ExecKeyContext {
19    /// The hash tag, e.g. `{p:42}`
20    tag: String,
21    /// The execution ID string
22    eid: String,
23}
24
25impl ExecKeyContext {
26    pub fn new(partition: &Partition, eid: &ExecutionId) -> Self {
27        Self {
28            tag: partition.hash_tag(),
29            eid: eid.to_string(),
30        }
31    }
32
33    // ── Execution Core (RFC-001) ──
34
35    /// `ff:exec:{p:N}:<eid>:core` — Authoritative execution record.
36    pub fn core(&self) -> String {
37        format!("ff:exec:{}:{}:core", self.tag, self.eid)
38    }
39
40    /// `ff:exec:{p:N}:<eid>:payload` — Opaque input payload.
41    pub fn payload(&self) -> String {
42        format!("ff:exec:{}:{}:payload", self.tag, self.eid)
43    }
44
45    /// `ff:exec:{p:N}:<eid>:result` — Opaque result payload.
46    pub fn result(&self) -> String {
47        format!("ff:exec:{}:{}:result", self.tag, self.eid)
48    }
49
50    /// `ff:exec:{p:N}:<eid>:policy` — JSON-encoded ExecutionPolicySnapshot.
51    pub fn policy(&self) -> String {
52        format!("ff:exec:{}:{}:policy", self.tag, self.eid)
53    }
54
55    /// `ff:exec:{p:N}:<eid>:tags` — User-supplied key-value labels.
56    pub fn tags(&self) -> String {
57        format!("ff:exec:{}:{}:tags", self.tag, self.eid)
58    }
59
60    // ── Lease (RFC-003) ──
61
62    /// `ff:exec:{p:N}:<eid>:lease:current` — Full lease object.
63    pub fn lease_current(&self) -> String {
64        format!("ff:exec:{}:{}:lease:current", self.tag, self.eid)
65    }
66
67    /// `ff:exec:{p:N}:<eid>:lease:history` — Append-only lease events.
68    pub fn lease_history(&self) -> String {
69        format!("ff:exec:{}:{}:lease:history", self.tag, self.eid)
70    }
71
72    /// `ff:exec:{p:N}:<eid>:claim_grant` — Ephemeral claim grant.
73    pub fn claim_grant(&self) -> String {
74        format!("ff:exec:{}:{}:claim_grant", self.tag, self.eid)
75    }
76
77    // ── Attempt (RFC-002) ──
78
79    /// `ff:exec:{p:N}:<eid>:attempts` — Attempt index ZSET.
80    pub fn attempts(&self) -> String {
81        format!("ff:exec:{}:{}:attempts", self.tag, self.eid)
82    }
83
84    /// `ff:attempt:{p:N}:<eid>:<attempt_index>` — Per-attempt detail.
85    pub fn attempt_hash(&self, index: AttemptIndex) -> String {
86        format!("ff:attempt:{}:{}:{}", self.tag, self.eid, index)
87    }
88
89    /// `ff:attempt:{p:N}:<eid>:<attempt_index>:usage` — Per-attempt usage counters.
90    pub fn attempt_usage(&self, index: AttemptIndex) -> String {
91        format!("ff:attempt:{}:{}:{}:usage", self.tag, self.eid, index)
92    }
93
94    /// `ff:attempt:{p:N}:<eid>:<attempt_index>:policy` — Frozen attempt policy snapshot.
95    pub fn attempt_policy(&self, index: AttemptIndex) -> String {
96        format!("ff:attempt:{}:{}:{}:policy", self.tag, self.eid, index)
97    }
98
99    // ── Stream (RFC-006) ──
100
101    /// `ff:stream:{p:N}:<eid>:<attempt_index>` — Attempt-scoped output stream.
102    pub fn stream(&self, index: AttemptIndex) -> String {
103        format!("ff:stream:{}:{}:{}", self.tag, self.eid, index)
104    }
105
106    /// `ff:stream:{p:N}:<eid>:<attempt_index>:meta` — Stream metadata.
107    pub fn stream_meta(&self, index: AttemptIndex) -> String {
108        format!("ff:stream:{}:{}:{}:meta", self.tag, self.eid, index)
109    }
110
111    /// `ff:attempt:{p:N}:<eid>:<attempt_index>:summary` — Rolling summary
112    /// Hash for the `DurableSummary` stream mode (RFC-015 §3.1). Shares
113    /// the `{p:N}` hash-tag slot with [`Self::stream`] / [`Self::stream_meta`]
114    /// so the summary Hash, the stream key, and the stream-meta Hash are
115    /// all co-located for atomic multi-key FCALL application from the
116    /// single-shard Lua Function.
117    pub fn stream_summary(&self, index: AttemptIndex) -> String {
118        format!("ff:attempt:{}:{}:{}:summary", self.tag, self.eid, index)
119    }
120
121    // ── Suspension / Waitpoint (RFC-004) ──
122
123    /// `ff:exec:{p:N}:<eid>:suspension:current` — Current suspension episode.
124    pub fn suspension_current(&self) -> String {
125        format!("ff:exec:{}:{}:suspension:current", self.tag, self.eid)
126    }
127
128    /// `ff:exec:{p:N}:<eid>:suspension:current:satisfied_set` — RFC-014
129    /// §3.1. Durable SET of satisfier tokens accumulated during the
130    /// active suspension. Created at `suspend_execution` (for composite
131    /// conditions only) and deleted on the three terminating paths
132    /// (resume, cancel, expire).
133    pub fn suspension_satisfied_set(&self) -> String {
134        format!(
135            "ff:exec:{}:{}:suspension:current:satisfied_set",
136            self.tag, self.eid
137        )
138    }
139
140    /// `ff:exec:{p:N}:<eid>:suspension:current:member_map` — RFC-014
141    /// §3.1. Write-once HASH mapping `waitpoint_id → node_path`
142    /// (e.g. `"members[0]"`). Read by `deliver_signal` to locate which
143    /// composite node a signal affects.
144    pub fn suspension_member_map(&self) -> String {
145        format!(
146            "ff:exec:{}:{}:suspension:current:member_map",
147            self.tag, self.eid
148        )
149    }
150
151    /// `ff:exec:{p:N}:<eid>:waitpoints` — Set of all waitpoint IDs.
152    pub fn waitpoints(&self) -> String {
153        format!("ff:exec:{}:{}:waitpoints", self.tag, self.eid)
154    }
155
156    /// `ff:wp:{p:N}:<wp_id>` — Waitpoint record.
157    pub fn waitpoint(&self, wp_id: &WaitpointId) -> String {
158        format!("ff:wp:{}:{}", self.tag, wp_id)
159    }
160
161    /// `ff:wp:{p:N}:<wp_id>:signals` — Per-waitpoint signal history.
162    pub fn waitpoint_signals(&self, wp_id: &WaitpointId) -> String {
163        format!("ff:wp:{}:{}:signals", self.tag, wp_id)
164    }
165
166    /// `ff:wp:{p:N}:<wp_id>:condition` — Resume condition evaluation state.
167    pub fn waitpoint_condition(&self, wp_id: &WaitpointId) -> String {
168        format!("ff:wp:{}:{}:condition", self.tag, wp_id)
169    }
170
171    // ── Signal (RFC-005) ──
172
173    /// `ff:signal:{p:N}:<signal_id>` — Signal record.
174    pub fn signal(&self, sig_id: &SignalId) -> String {
175        format!("ff:signal:{}:{}", self.tag, sig_id)
176    }
177
178    /// `ff:signal:{p:N}:<signal_id>:payload` — Opaque signal payload.
179    pub fn signal_payload(&self, sig_id: &SignalId) -> String {
180        format!("ff:signal:{}:{}:payload", self.tag, sig_id)
181    }
182
183    /// `ff:exec:{p:N}:<eid>:signals` — Per-execution signal index.
184    pub fn exec_signals(&self) -> String {
185        format!("ff:exec:{}:{}:signals", self.tag, self.eid)
186    }
187
188    /// `ff:sigdedup:{p:N}:<wp_id>:<idem_key>` — Signal idempotency guard.
189    pub fn signal_dedup(&self, wp_id: &WaitpointId, idempotency_key: &str) -> String {
190        format!("ff:sigdedup:{}:{}:{}", self.tag, wp_id, idempotency_key)
191    }
192
193    /// `ff:dedup:suspend:{p:N}:<eid>:<idem_key>` — Suspend idempotency
194    /// guard (RFC-013 §2.2). Partition-scoped hash that stores a
195    /// serialized [`crate::contracts::SuspendOutcome`] when a caller
196    /// supplies `SuspendArgs::idempotency_key`; a retry within the TTL
197    /// window replays the first outcome verbatim without state
198    /// mutation.
199    pub fn suspend_dedup(&self, idempotency_key: &str) -> String {
200        format!(
201            "ff:dedup:suspend:{}:{}:{}",
202            self.tag, self.eid, idempotency_key
203        )
204    }
205
206    // ── Flow Dependency — Execution-Local (RFC-007) ──
207
208    /// `ff:exec:{p:N}:<eid>:deps:meta` — Dependency summary.
209    pub fn deps_meta(&self) -> String {
210        format!("ff:exec:{}:{}:deps:meta", self.tag, self.eid)
211    }
212
213    /// `ff:exec:{p:N}:<eid>:dep:<edge_id>` — Per-edge local state.
214    pub fn dep_edge(&self, edge_id: &EdgeId) -> String {
215        format!("ff:exec:{}:{}:dep:{}", self.tag, self.eid, edge_id)
216    }
217
218    /// `ff:exec:{p:N}:<eid>:deps:unresolved` — Set of unresolved edge IDs.
219    pub fn deps_unresolved(&self) -> String {
220        format!("ff:exec:{}:{}:deps:unresolved", self.tag, self.eid)
221    }
222
223    /// `ff:exec:{p:N}:<eid>:deps:all_edges` — Set of ALL applied edge IDs
224    /// on this execution (unresolved + satisfied + impossible). Populated by
225    /// ff_apply_dependency_to_child, never pruned on resolve. Used by the
226    /// retention trimmer to enumerate dep edge hashes without SCAN.
227    pub fn deps_all_edges(&self) -> String {
228        format!("ff:exec:{}:{}:deps:all_edges", self.tag, self.eid)
229    }
230
231    // ── Accessor ──
232
233    /// Dummy key on this partition, used as a placeholder for unused KEYS
234    /// positions (e.g. empty idempotency key). Ensures all KEYS in an FCALL
235    /// share the same hash tag, preventing CrossSlot errors in cluster mode.
236    pub fn noop(&self) -> String {
237        format!("ff:noop:{}", self.tag)
238    }
239
240    pub fn hash_tag(&self) -> &str {
241        &self.tag
242    }
243
244    pub fn execution_id_str(&self) -> &str {
245        &self.eid
246    }
247}
248
249// ─── Partition-Local Index Keys ({p:N}) ───
250
251/// Index keys scoped to a single execution partition.
252pub struct IndexKeys {
253    tag: String,
254}
255
256impl IndexKeys {
257    pub fn new(partition: &Partition) -> Self {
258        Self {
259            tag: partition.hash_tag(),
260        }
261    }
262
263    /// `ff:idx:{p:N}:lane:<lane_id>:eligible`
264    pub fn lane_eligible(&self, lane_id: &LaneId) -> String {
265        format!("ff:idx:{}:lane:{}:eligible", self.tag, lane_id)
266    }
267
268    /// `ff:idx:{p:N}:lane:<lane_id>:delayed`
269    pub fn lane_delayed(&self, lane_id: &LaneId) -> String {
270        format!("ff:idx:{}:lane:{}:delayed", self.tag, lane_id)
271    }
272
273    /// `ff:idx:{p:N}:lane:<lane_id>:active`
274    pub fn lane_active(&self, lane_id: &LaneId) -> String {
275        format!("ff:idx:{}:lane:{}:active", self.tag, lane_id)
276    }
277
278    /// `ff:idx:{p:N}:lane:<lane_id>:terminal`
279    pub fn lane_terminal(&self, lane_id: &LaneId) -> String {
280        format!("ff:idx:{}:lane:{}:terminal", self.tag, lane_id)
281    }
282
283    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:dependencies`
284    pub fn lane_blocked_dependencies(&self, lane_id: &LaneId) -> String {
285        format!("ff:idx:{}:lane:{}:blocked:dependencies", self.tag, lane_id)
286    }
287
288    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:budget`
289    pub fn lane_blocked_budget(&self, lane_id: &LaneId) -> String {
290        format!("ff:idx:{}:lane:{}:blocked:budget", self.tag, lane_id)
291    }
292
293    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:quota`
294    pub fn lane_blocked_quota(&self, lane_id: &LaneId) -> String {
295        format!("ff:idx:{}:lane:{}:blocked:quota", self.tag, lane_id)
296    }
297
298    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:route`
299    pub fn lane_blocked_route(&self, lane_id: &LaneId) -> String {
300        format!("ff:idx:{}:lane:{}:blocked:route", self.tag, lane_id)
301    }
302
303    /// `ff:idx:{p:N}:lane:<lane_id>:blocked:operator`
304    pub fn lane_blocked_operator(&self, lane_id: &LaneId) -> String {
305        format!("ff:idx:{}:lane:{}:blocked:operator", self.tag, lane_id)
306    }
307
308    /// `ff:idx:{p:N}:lane:<lane_id>:suspended`
309    pub fn lane_suspended(&self, lane_id: &LaneId) -> String {
310        format!("ff:idx:{}:lane:{}:suspended", self.tag, lane_id)
311    }
312
313    /// `ff:sec:{p:N}:waitpoint_hmac` — HMAC signing secrets replicated
314    /// across every execution partition (RFC-004 §Waitpoint Security).
315    /// Hash fields:
316    ///   `current_kid`, `previous_kid`, `secret:<kid>` (hex), `previous_expires_at`.
317    /// Replication is required for cluster mode: every FCALL that mints or
318    /// validates a token must hash-tag-collocate this key with the rest of
319    /// its execution-partition KEYS. The secret value is identical across
320    /// partitions; rotation fans out HSETs across them.
321    pub fn waitpoint_hmac_secrets(&self) -> String {
322        format!("ff:sec:{}:waitpoint_hmac", self.tag)
323    }
324
325    /// `ff:idx:{p:N}:lease_expiry` — Cross-lane lease expiry scanner target.
326    pub fn lease_expiry(&self) -> String {
327        format!("ff:idx:{}:lease_expiry", self.tag)
328    }
329
330    /// `ff:idx:{p:N}:worker:<worker_instance_id>:leases`
331    pub fn worker_leases(&self, wid: &WorkerInstanceId) -> String {
332        format!("ff:idx:{}:worker:{}:leases", self.tag, wid)
333    }
334
335    /// `ff:idx:{p:N}:suspension_timeout`
336    pub fn suspension_timeout(&self) -> String {
337        format!("ff:idx:{}:suspension_timeout", self.tag)
338    }
339
340    /// `ff:idx:{p:N}:pending_waitpoint_expiry`
341    pub fn pending_waitpoint_expiry(&self) -> String {
342        format!("ff:idx:{}:pending_waitpoint_expiry", self.tag)
343    }
344
345    /// `ff:idx:{p:N}:attempt_timeout`
346    pub fn attempt_timeout(&self) -> String {
347        format!("ff:idx:{}:attempt_timeout", self.tag)
348    }
349
350    /// `ff:idx:{p:N}:execution_deadline`
351    pub fn execution_deadline(&self) -> String {
352        format!("ff:idx:{}:execution_deadline", self.tag)
353    }
354
355    /// `ff:idx:{p:N}:all_executions`
356    pub fn all_executions(&self) -> String {
357        format!("ff:idx:{}:all_executions", self.tag)
358    }
359
360    /// `ff:part:{fp:N}:signal_delivery` — partition-level aggregate
361    /// stream that `ff_deliver_signal` XADDs into on every successful
362    /// delivery. `subscribe_signal_delivery` XREAD BLOCKs this key.
363    /// RFC-019 Stage B / #310.
364    pub fn partition_signal_delivery(&self) -> String {
365        format!("ff:part:{}:signal_delivery", self.tag)
366    }
367}
368
369// ─── Flow Partition Keys ({fp:N}) ───
370
371/// Key context for flow-structural partition keys.
372pub struct FlowKeyContext {
373    tag: String,
374    fid: String,
375}
376
377impl FlowKeyContext {
378    pub fn new(partition: &Partition, fid: &FlowId) -> Self {
379        Self {
380            tag: partition.hash_tag(),
381            fid: fid.to_string(),
382        }
383    }
384
385    /// `ff:flow:{fp:N}:<flow_id>:core`
386    pub fn core(&self) -> String {
387        format!("ff:flow:{}:{}:core", self.tag, self.fid)
388    }
389
390    /// `ff:flow:{fp:N}:<flow_id>:members`
391    pub fn members(&self) -> String {
392        format!("ff:flow:{}:{}:members", self.tag, self.fid)
393    }
394
395    /// `ff:flow:{fp:N}:<flow_id>:tags` — User-supplied key-value labels.
396    /// Symmetric with [`ExecKeyContext::tags`]. Populated by
397    /// `ff_set_flow_tags`, which also lazy-migrates any pre-58.4
398    /// reserved-namespace fields stashed inline on `flow_core`.
399    pub fn tags(&self) -> String {
400        format!("ff:flow:{}:{}:tags", self.tag, self.fid)
401    }
402
403    /// `ff:flow:{fp:N}:<flow_id>:member:<eid>`
404    pub fn member(&self, eid: &ExecutionId) -> String {
405        format!("ff:flow:{}:{}:member:{}", self.tag, self.fid, eid)
406    }
407
408    /// `ff:flow:{fp:N}:<flow_id>:edge:<edge_id>`
409    pub fn edge(&self, edge_id: &EdgeId) -> String {
410        format!("ff:flow:{}:{}:edge:{}", self.tag, self.fid, edge_id)
411    }
412
413    /// `ff:flow:{fp:N}:<flow_id>:out:<upstream_eid>`
414    pub fn outgoing(&self, upstream_eid: &ExecutionId) -> String {
415        format!("ff:flow:{}:{}:out:{}", self.tag, self.fid, upstream_eid)
416    }
417
418    /// `ff:flow:{fp:N}:<flow_id>:in:<downstream_eid>`
419    pub fn incoming(&self, downstream_eid: &ExecutionId) -> String {
420        format!("ff:flow:{}:{}:in:{}", self.tag, self.fid, downstream_eid)
421    }
422
423    /// `ff:flow:{fp:N}:<flow_id>:edgegroup:<downstream_eid>` — RFC-016
424    /// Stage A edge-group hash. Fields written in Stage A:
425    /// `policy_variant` (`all_of`), `n`, `succeeded`, `group_state`.
426    /// Stage B+ adds `on_satisfied`, `failed`, `skipped`, `satisfied_at`,
427    /// `cancel_siblings_pending`.
428    pub fn edgegroup(&self, downstream_eid: &ExecutionId) -> String {
429        format!(
430            "ff:flow:{}:{}:edgegroup:{}",
431            self.tag, self.fid, downstream_eid
432        )
433    }
434
435    /// `ff:flow:{fp:N}:<flow_id>:events`
436    pub fn events(&self) -> String {
437        format!("ff:flow:{}:{}:events", self.tag, self.fid)
438    }
439
440    /// `ff:flow:{fp:N}:<flow_id>:summary`
441    pub fn summary(&self) -> String {
442        format!("ff:flow:{}:{}:summary", self.tag, self.fid)
443    }
444
445    /// `ff:flow:{fp:N}:<flow_id>:grant:<mutation_id>`
446    pub fn grant(&self, mutation_id: &str) -> String {
447        format!("ff:flow:{}:{}:grant:{}", self.tag, self.fid, mutation_id)
448    }
449
450    /// `ff:flow:{fp:N}:<flow_id>:pending_cancels` — SET of execution IDs
451    /// whose cancel is still owed after a `cancel_all` cancel_flow. The
452    /// live async dispatch SREMs entries as it succeeds; the cancel
453    /// reconciler scanner drains the remainder on its interval so a
454    /// process crash mid-dispatch or a member whose cancel hit a
455    /// permanent error can't leave a flow member un-cancelled.
456    pub fn pending_cancels(&self) -> String {
457        format!("ff:flow:{}:{}:pending_cancels", self.tag, self.fid)
458    }
459
460    pub fn hash_tag(&self) -> &str {
461        &self.tag
462    }
463
464    /// The owning flow id as a string (as supplied to `new`).
465    pub fn flow_id(&self) -> &str {
466        &self.fid
467    }
468}
469
470/// Flow-partition index keys.
471pub struct FlowIndexKeys {
472    tag: String,
473}
474
475impl FlowIndexKeys {
476    pub fn new(partition: &Partition) -> Self {
477        Self {
478            tag: partition.hash_tag(),
479        }
480    }
481
482    /// `ff:idx:{fp:N}:flow_index` — SET of flow IDs on this partition.
483    /// Used by the flow projector for cluster-safe discovery (replaces SCAN).
484    pub fn flow_index(&self) -> String {
485        format!("ff:idx:{}:flow_index", self.tag)
486    }
487
488    /// `ff:idx:{fp:N}:cancel_backlog` — ZSET of flow IDs whose async
489    /// cancel dispatch is still owed members. Score = grace-window expiry
490    /// (unix ms). The cancel reconciler scanner ZRANGEBYSCOREs entries
491    /// whose score <= now, drains their `pending_cancels` set, and ZREMs
492    /// when empty. Live dispatch runs without waiting on this score, so
493    /// the grace window just keeps the reconciler from fighting the
494    /// happy path.
495    pub fn cancel_backlog(&self) -> String {
496        format!("ff:idx:{}:cancel_backlog", self.tag)
497    }
498
499    /// `ff:idx:{fp:N}:pending_cancel_groups` — RFC-016 Stage C
500    /// per-flow-partition SET of `<flow_id>|<downstream_eid>` tuples
501    /// whose edgegroup has a non-empty sibling-cancel queue awaiting
502    /// dispatch. Populated atomically by `ff_resolve_dependency` when
503    /// the AnyOf/Quorum resolver flips to `satisfied|impossible` under
504    /// `OnSatisfied::CancelRemaining`; drained by the dispatcher via
505    /// `ff_drain_sibling_cancel_group` once per-sibling cancels have
506    /// been acked. The sibling-cancel dispatcher scanner iterates this
507    /// SET (cluster-safe) instead of scanning edgegroup hashes.
508    pub fn pending_cancel_groups(&self) -> String {
509        format!("ff:idx:{}:pending_cancel_groups", self.tag)
510    }
511}
512
513// ─── Budget Partition Keys ({b:M}) ───
514
515/// Key context for budget partition keys.
516pub struct BudgetKeyContext {
517    tag: String,
518    bid: String,
519}
520
521impl BudgetKeyContext {
522    pub fn new(partition: &Partition, bid: &BudgetId) -> Self {
523        Self {
524            tag: partition.hash_tag(),
525            bid: bid.to_string(),
526        }
527    }
528
529    /// `ff:budget:{b:M}:<budget_id>` — Budget definition.
530    pub fn definition(&self) -> String {
531        format!("ff:budget:{}:{}", self.tag, self.bid)
532    }
533
534    /// `ff:budget:{b:M}:<budget_id>:limits` — Hard/soft limits per dimension.
535    pub fn limits(&self) -> String {
536        format!("ff:budget:{}:{}:limits", self.tag, self.bid)
537    }
538
539    /// `ff:budget:{b:M}:<budget_id>:usage` — Usage counters.
540    pub fn usage(&self) -> String {
541        format!("ff:budget:{}:{}:usage", self.tag, self.bid)
542    }
543
544    /// `ff:budget:{b:M}:<budget_id>:executions` — Reverse index.
545    pub fn executions(&self) -> String {
546        format!("ff:budget:{}:{}:executions", self.tag, self.bid)
547    }
548
549    pub fn hash_tag(&self) -> &str {
550        &self.tag
551    }
552}
553
554/// Budget attachment key (not budget-ID scoped).
555pub fn budget_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
556    format!("ff:budget_attach:{}:{}:{}", tag, scope_type, scope_id)
557}
558
559/// Budget reset schedule index.
560pub fn budget_resets_key(tag: &str) -> String {
561    format!("ff:idx:{}:budget_resets", tag)
562}
563
564/// Budget policies index — SET of budget IDs on this partition.
565/// Used by the budget reconciler for cluster-safe discovery (replaces SCAN).
566pub fn budget_policies_index(tag: &str) -> String {
567    format!("ff:idx:{}:budget_policies", tag)
568}
569
570// ─── Quota Partition Keys ({q:K}) ───
571
572/// Key context for quota partition keys.
573pub struct QuotaKeyContext {
574    tag: String,
575    qid: String,
576}
577
578impl QuotaKeyContext {
579    pub fn new(partition: &Partition, qid: &QuotaPolicyId) -> Self {
580        Self {
581            tag: partition.hash_tag(),
582            qid: qid.to_string(),
583        }
584    }
585
586    /// `ff:quota:{q:K}:<quota_policy_id>` — Quota policy definition.
587    pub fn definition(&self) -> String {
588        format!("ff:quota:{}:{}", self.tag, self.qid)
589    }
590
591    /// `ff:quota:{q:K}:<quota_policy_id>:window:<dimension>`
592    pub fn window(&self, dimension: &str) -> String {
593        format!("ff:quota:{}:{}:window:{}", self.tag, self.qid, dimension)
594    }
595
596    /// `ff:quota:{q:K}:<quota_policy_id>:concurrency`
597    pub fn concurrency(&self) -> String {
598        format!("ff:quota:{}:{}:concurrency", self.tag, self.qid)
599    }
600
601    /// `ff:quota:{q:K}:<quota_policy_id>:admitted:<execution_id>`
602    pub fn admitted(&self, eid: &ExecutionId) -> String {
603        format!("ff:quota:{}:{}:admitted:{}", self.tag, self.qid, eid)
604    }
605
606    /// `ff:quota:{q:K}:<quota_policy_id>:admitted_set` — SET of admitted execution IDs.
607    /// Used by the quota reconciler instead of SCAN (cluster-safe).
608    pub fn admitted_set(&self) -> String {
609        format!("ff:quota:{}:{}:admitted_set", self.tag, self.qid)
610    }
611
612    pub fn hash_tag(&self) -> &str {
613        &self.tag
614    }
615}
616
617/// Quota policy index — SET of policy IDs on this partition.
618/// Used by the quota reconciler for cluster-safe discovery (replaces SCAN).
619pub fn quota_policies_index(tag: &str) -> String {
620    format!("ff:idx:{}:quota_policies", tag)
621}
622
623/// Quota attachment key.
624pub fn quota_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
625    format!("ff:quota_attach:{}:{}:{}", tag, scope_type, scope_id)
626}
627
628// ─── Global Keys (no hash tag) ───
629
630/// Lane configuration key.
631pub fn lane_config_key(lane_id: &LaneId) -> String {
632    format!("ff:lane:{}:config", lane_id)
633}
634
635/// Lane counts key.
636pub fn lane_counts_key(lane_id: &LaneId) -> String {
637    format!("ff:lane:{}:counts", lane_id)
638}
639
640/// Worker registration key.
641pub fn worker_key(wid: &WorkerInstanceId) -> String {
642    format!("ff:worker:{}", wid)
643}
644
645/// Non-authoritative capability advertisement STRING for a worker
646/// (sorted CSV). Written by `ff-sdk::FlowFabricWorker::connect`, read by
647/// the engine's unblock scanner to decide whether a `blocked_by_route`
648/// execution has a matching worker. Cluster mode: the key lands on
649/// whatever slot CRC16 hashes to — enumeration goes through
650/// `workers_index_key()` rather than a keyspace SCAN, which would only
651/// hit one shard in cluster mode.
652pub fn worker_caps_key(wid: &WorkerInstanceId) -> String {
653    format!("ff:worker:{}:caps", wid)
654}
655
656/// Global worker index — SET of connected worker_instance_ids. Single
657/// slot in cluster mode (no hash tag; CRC16 of the literal key). SADD on
658/// connect, SREM on empty-caps restart; SMEMBERS gives the enumerable
659/// universe the unblock scanner walks. Separate from `ff:worker:{id}`
660/// registration keys to keep the index membership cheap to read and
661/// independent of per-worker hash details.
662pub fn workers_index_key() -> String {
663    "ff:idx:workers".to_owned()
664}
665
666/// Worker capability index.
667pub fn workers_capability_key(key: &str, value: &str) -> String {
668    format!("ff:idx:workers:cap:{}:{}", key, value)
669}
670
671/// Lane registry.
672pub fn lanes_index_key() -> String {
673    "ff:idx:lanes".to_owned()
674}
675
676/// Partition configuration — `ff:config:partitions` (§8.3).
677/// Validated on startup; created on first boot.
678pub fn global_config_partitions() -> String {
679    "ff:config:partitions".to_owned()
680}
681
682// ─── Cross-Partition Secondary Indexes ───
683
684/// `ff:ns:<namespace>:executions`
685pub fn namespace_executions_key(namespace: &str) -> String {
686    format!("ff:ns:{}:executions", namespace)
687}
688
689/// `ff:idem:{p:N}:<namespace>:<idempotency_key>`
690///
691/// Includes the execution partition hash tag so the key hashes to the same
692/// Valkey cluster slot as all other KEYS in the ff_create_execution FCALL.
693pub fn idempotency_key(tag: &str, namespace: &str, idem_key: &str) -> String {
694    format!("ff:idem:{}:{}:{}", tag, namespace, idem_key)
695}
696
697/// Placeholder key that shares a hash tag with other KEYS in the same FCALL.
698///
699/// Used when an optional key (e.g. idempotency key) is absent. The Lua
700/// function never reads/writes this key, but Valkey cluster requires all
701/// KEYS in a single FCALL to hash to the same slot.
702pub fn noop_key(tag: &str) -> String {
703    format!("ff:noop:{}", tag)
704}
705
706/// `ff:tag:<namespace>:<key>:<value>`
707pub fn tag_index_key(namespace: &str, key: &str, value: &str) -> String {
708    format!("ff:tag:{}:{}:{}", namespace, key, value)
709}
710
711/// Waitpoint key resolution — `ff:wpkey:{p:N}:<waitpoint_key>`
712pub fn waitpoint_key_resolution(tag: &str, wp_key: &WaitpointKey) -> String {
713    format!("ff:wpkey:{}:{}", tag, wp_key)
714}
715
716/// Shared prefix for the usage-dedup keyspace. Must match the
717/// `ff:usagededup:` literal referenced in `lua/**.lua` (notably
718/// `lua/budget.lua:99`, the `ff_report_usage_and_check` function).
719/// Grep `ff:usagededup:` to find all producers, consumers, and test
720/// fixtures in one search.
721pub const USAGE_DEDUP_KEY_PREFIX: &str = "ff:usagededup:";
722
723/// Build a usage-dedup key: `ff:usagededup:<hash_tag>:<dedup_id>`.
724///
725/// `hash_tag` must ALREADY include the Valkey hash-tag braces
726/// (e.g. `"{bp:7}"`) — typically obtained from
727/// [`BudgetKeyContext::hash_tag`]. Do not double-wrap.
728pub fn usage_dedup_key(hash_tag: &str, dedup_id: &str) -> String {
729    format!("{USAGE_DEDUP_KEY_PREFIX}{hash_tag}:{dedup_id}")
730}
731
732#[cfg(test)]
733mod tests {
734    use super::*;
735    use crate::partition::{execution_partition, flow_partition, PartitionConfig};
736
737    #[test]
738    fn exec_key_context_core_format() {
739        let config = PartitionConfig::default();
740        let eid = ExecutionId::parse("{fp:0}:550e8400-e29b-41d4-a716-446655440000").unwrap();
741        let partition = execution_partition(&eid, &config);
742        let ctx = ExecKeyContext::new(&partition, &eid);
743
744        let core_key = ctx.core();
745        // Post-RFC-011: exec keys co-locate on flow partitions ({fp:*}).
746        assert!(core_key.starts_with("ff:exec:{fp:"));
747        assert!(core_key.ends_with(":core"));
748        assert!(core_key.contains("550e8400-e29b-41d4-a716-446655440000"));
749    }
750
751    #[test]
752    fn exec_key_all_keys_share_hash_tag() {
753        let config = PartitionConfig::default();
754        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
755        let partition = execution_partition(&eid, &config);
756        let ctx = ExecKeyContext::new(&partition, &eid);
757        let tag = ctx.hash_tag();
758
759        // All keys must contain the same hash tag
760        assert!(ctx.core().contains(tag));
761        assert!(ctx.payload().contains(tag));
762        assert!(ctx.result().contains(tag));
763        assert!(ctx.policy().contains(tag));
764        assert!(ctx.lease_current().contains(tag));
765        assert!(ctx.lease_history().contains(tag));
766        assert!(ctx.attempts().contains(tag));
767        assert!(ctx.suspension_current().contains(tag));
768        assert!(ctx.exec_signals().contains(tag));
769    }
770
771    #[test]
772    fn attempt_key_includes_index() {
773        let config = PartitionConfig::default();
774        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
775        let partition = execution_partition(&eid, &config);
776        let ctx = ExecKeyContext::new(&partition, &eid);
777
778        let key = ctx.attempt_hash(AttemptIndex::new(3));
779        assert!(key.contains(":3"), "attempt key should contain index");
780    }
781
782    #[test]
783    fn stream_key_format() {
784        let config = PartitionConfig::default();
785        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
786        let partition = execution_partition(&eid, &config);
787        let ctx = ExecKeyContext::new(&partition, &eid);
788
789        let key = ctx.stream(AttemptIndex::new(0));
790        assert!(key.starts_with("ff:stream:{fp:"));
791        assert!(key.ends_with(":0"));
792    }
793
794    #[test]
795    fn index_keys_format() {
796        let config = PartitionConfig::default();
797        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
798        let partition = execution_partition(&eid, &config);
799        let idx = IndexKeys::new(&partition);
800        let lane = LaneId::new("default");
801
802        assert!(idx.lane_eligible(&lane).contains(":lane:default:eligible"));
803        assert!(idx.lane_delayed(&lane).contains(":lane:default:delayed"));
804        assert!(idx.lease_expiry().contains(":lease_expiry"));
805        assert!(idx.all_executions().contains(":all_executions"));
806    }
807
808    #[test]
809    fn flow_key_context_format() {
810        let config = PartitionConfig::default();
811        let fid = FlowId::new();
812        let partition = flow_partition(&fid, &config);
813        let ctx = FlowKeyContext::new(&partition, &fid);
814
815        assert!(ctx.core().starts_with("ff:flow:{fp:"));
816        assert!(ctx.core().ends_with(":core"));
817        assert!(ctx.members().ends_with(":members"));
818    }
819
820    #[test]
821    fn global_keys_no_hash_tag() {
822        let lane = LaneId::new("default");
823        let key = lane_config_key(&lane);
824        assert_eq!(key, "ff:lane:default:config");
825        assert!(!key.contains('{'));
826    }
827
828    #[test]
829    fn usage_dedup_key_format() {
830        // hash_tag already includes braces — helper must not double-wrap.
831        let key = usage_dedup_key("{bp:7}", "dedup-123");
832        assert_eq!(key, "ff:usagededup:{bp:7}:dedup-123");
833        assert!(key.starts_with(USAGE_DEDUP_KEY_PREFIX));
834        // Exactly one `{…}` hash-tag region.
835        assert_eq!(key.matches('{').count(), 1);
836        assert_eq!(key.matches('}').count(), 1);
837    }
838}