1use crate::partition::Partition;
2use crate::types::{
3 AttemptIndex, BudgetId, EdgeId, ExecutionId, FlowId, LaneId, Namespace, QuotaPolicyId,
4 SignalId, WaitpointId, WaitpointKey, WorkerInstanceId,
5};
6
7pub struct ExecKeyContext {
19 tag: String,
21 eid: String,
23}
24
25impl ExecKeyContext {
26 pub fn new(partition: &Partition, eid: &ExecutionId) -> Self {
27 Self {
28 tag: partition.hash_tag(),
29 eid: eid.to_string(),
30 }
31 }
32
33 pub fn core(&self) -> String {
37 format!("ff:exec:{}:{}:core", self.tag, self.eid)
38 }
39
40 pub fn payload(&self) -> String {
42 format!("ff:exec:{}:{}:payload", self.tag, self.eid)
43 }
44
45 pub fn result(&self) -> String {
47 format!("ff:exec:{}:{}:result", self.tag, self.eid)
48 }
49
50 pub fn policy(&self) -> String {
52 format!("ff:exec:{}:{}:policy", self.tag, self.eid)
53 }
54
55 pub fn tags(&self) -> String {
57 format!("ff:exec:{}:{}:tags", self.tag, self.eid)
58 }
59
60 pub fn lease_current(&self) -> String {
64 format!("ff:exec:{}:{}:lease:current", self.tag, self.eid)
65 }
66
67 pub fn lease_history(&self) -> String {
69 format!("ff:exec:{}:{}:lease:history", self.tag, self.eid)
70 }
71
72 pub fn claim_grant(&self) -> String {
74 format!("ff:exec:{}:{}:claim_grant", self.tag, self.eid)
75 }
76
77 pub fn attempts(&self) -> String {
81 format!("ff:exec:{}:{}:attempts", self.tag, self.eid)
82 }
83
84 pub fn attempt_hash(&self, index: AttemptIndex) -> String {
86 format!("ff:attempt:{}:{}:{}", self.tag, self.eid, index)
87 }
88
89 pub fn attempt_usage(&self, index: AttemptIndex) -> String {
91 format!("ff:attempt:{}:{}:{}:usage", self.tag, self.eid, index)
92 }
93
94 pub fn attempt_policy(&self, index: AttemptIndex) -> String {
96 format!("ff:attempt:{}:{}:{}:policy", self.tag, self.eid, index)
97 }
98
99 pub fn stream(&self, index: AttemptIndex) -> String {
103 format!("ff:stream:{}:{}:{}", self.tag, self.eid, index)
104 }
105
106 pub fn stream_meta(&self, index: AttemptIndex) -> String {
108 format!("ff:stream:{}:{}:{}:meta", self.tag, self.eid, index)
109 }
110
111 pub fn stream_summary(&self, index: AttemptIndex) -> String {
118 format!("ff:attempt:{}:{}:{}:summary", self.tag, self.eid, index)
119 }
120
121 pub fn suspension_current(&self) -> String {
125 format!("ff:exec:{}:{}:suspension:current", self.tag, self.eid)
126 }
127
128 pub fn suspension_satisfied_set(&self) -> String {
134 format!(
135 "ff:exec:{}:{}:suspension:current:satisfied_set",
136 self.tag, self.eid
137 )
138 }
139
140 pub fn suspension_member_map(&self) -> String {
145 format!(
146 "ff:exec:{}:{}:suspension:current:member_map",
147 self.tag, self.eid
148 )
149 }
150
151 pub fn waitpoints(&self) -> String {
153 format!("ff:exec:{}:{}:waitpoints", self.tag, self.eid)
154 }
155
156 pub fn waitpoint(&self, wp_id: &WaitpointId) -> String {
158 format!("ff:wp:{}:{}", self.tag, wp_id)
159 }
160
161 pub fn waitpoint_signals(&self, wp_id: &WaitpointId) -> String {
163 format!("ff:wp:{}:{}:signals", self.tag, wp_id)
164 }
165
166 pub fn waitpoint_condition(&self, wp_id: &WaitpointId) -> String {
168 format!("ff:wp:{}:{}:condition", self.tag, wp_id)
169 }
170
171 pub fn signal(&self, sig_id: &SignalId) -> String {
175 format!("ff:signal:{}:{}", self.tag, sig_id)
176 }
177
178 pub fn signal_payload(&self, sig_id: &SignalId) -> String {
180 format!("ff:signal:{}:{}:payload", self.tag, sig_id)
181 }
182
183 pub fn exec_signals(&self) -> String {
185 format!("ff:exec:{}:{}:signals", self.tag, self.eid)
186 }
187
188 pub fn signal_dedup(&self, wp_id: &WaitpointId, idempotency_key: &str) -> String {
190 format!("ff:sigdedup:{}:{}:{}", self.tag, wp_id, idempotency_key)
191 }
192
193 pub fn suspend_dedup(&self, idempotency_key: &str) -> String {
200 format!(
201 "ff:dedup:suspend:{}:{}:{}",
202 self.tag, self.eid, idempotency_key
203 )
204 }
205
206 pub fn deps_meta(&self) -> String {
210 format!("ff:exec:{}:{}:deps:meta", self.tag, self.eid)
211 }
212
213 pub fn dep_edge(&self, edge_id: &EdgeId) -> String {
215 format!("ff:exec:{}:{}:dep:{}", self.tag, self.eid, edge_id)
216 }
217
218 pub fn deps_unresolved(&self) -> String {
220 format!("ff:exec:{}:{}:deps:unresolved", self.tag, self.eid)
221 }
222
223 pub fn deps_all_edges(&self) -> String {
228 format!("ff:exec:{}:{}:deps:all_edges", self.tag, self.eid)
229 }
230
231 pub fn noop(&self) -> String {
237 format!("ff:noop:{}", self.tag)
238 }
239
240 pub fn hash_tag(&self) -> &str {
241 &self.tag
242 }
243
244 pub fn execution_id_str(&self) -> &str {
245 &self.eid
246 }
247}
248
249pub struct IndexKeys {
253 tag: String,
254}
255
256impl IndexKeys {
257 pub fn new(partition: &Partition) -> Self {
258 Self {
259 tag: partition.hash_tag(),
260 }
261 }
262
263 pub fn lane_eligible(&self, lane_id: &LaneId) -> String {
265 format!("ff:idx:{}:lane:{}:eligible", self.tag, lane_id)
266 }
267
268 pub fn lane_delayed(&self, lane_id: &LaneId) -> String {
270 format!("ff:idx:{}:lane:{}:delayed", self.tag, lane_id)
271 }
272
273 pub fn lane_active(&self, lane_id: &LaneId) -> String {
275 format!("ff:idx:{}:lane:{}:active", self.tag, lane_id)
276 }
277
278 pub fn lane_terminal(&self, lane_id: &LaneId) -> String {
280 format!("ff:idx:{}:lane:{}:terminal", self.tag, lane_id)
281 }
282
283 pub fn lane_blocked_dependencies(&self, lane_id: &LaneId) -> String {
285 format!("ff:idx:{}:lane:{}:blocked:dependencies", self.tag, lane_id)
286 }
287
288 pub fn lane_blocked_budget(&self, lane_id: &LaneId) -> String {
290 format!("ff:idx:{}:lane:{}:blocked:budget", self.tag, lane_id)
291 }
292
293 pub fn lane_blocked_quota(&self, lane_id: &LaneId) -> String {
295 format!("ff:idx:{}:lane:{}:blocked:quota", self.tag, lane_id)
296 }
297
298 pub fn lane_blocked_route(&self, lane_id: &LaneId) -> String {
300 format!("ff:idx:{}:lane:{}:blocked:route", self.tag, lane_id)
301 }
302
303 pub fn lane_blocked_operator(&self, lane_id: &LaneId) -> String {
305 format!("ff:idx:{}:lane:{}:blocked:operator", self.tag, lane_id)
306 }
307
308 pub fn lane_suspended(&self, lane_id: &LaneId) -> String {
310 format!("ff:idx:{}:lane:{}:suspended", self.tag, lane_id)
311 }
312
313 pub fn waitpoint_hmac_secrets(&self) -> String {
322 format!("ff:sec:{}:waitpoint_hmac", self.tag)
323 }
324
325 pub fn lease_expiry(&self) -> String {
327 format!("ff:idx:{}:lease_expiry", self.tag)
328 }
329
330 pub fn worker_leases(&self, wid: &WorkerInstanceId) -> String {
332 format!("ff:idx:{}:worker:{}:leases", self.tag, wid)
333 }
334
335 pub fn suspension_timeout(&self) -> String {
337 format!("ff:idx:{}:suspension_timeout", self.tag)
338 }
339
340 pub fn pending_waitpoint_expiry(&self) -> String {
342 format!("ff:idx:{}:pending_waitpoint_expiry", self.tag)
343 }
344
345 pub fn attempt_timeout(&self) -> String {
347 format!("ff:idx:{}:attempt_timeout", self.tag)
348 }
349
350 pub fn execution_deadline(&self) -> String {
352 format!("ff:idx:{}:execution_deadline", self.tag)
353 }
354
355 pub fn all_executions(&self) -> String {
357 format!("ff:idx:{}:all_executions", self.tag)
358 }
359
360 pub fn partition_signal_delivery(&self) -> String {
365 format!("ff:part:{}:signal_delivery", self.tag)
366 }
367}
368
369pub struct FlowKeyContext {
373 tag: String,
374 fid: String,
375}
376
377impl FlowKeyContext {
378 pub fn new(partition: &Partition, fid: &FlowId) -> Self {
379 Self {
380 tag: partition.hash_tag(),
381 fid: fid.to_string(),
382 }
383 }
384
385 pub fn core(&self) -> String {
387 format!("ff:flow:{}:{}:core", self.tag, self.fid)
388 }
389
390 pub fn members(&self) -> String {
392 format!("ff:flow:{}:{}:members", self.tag, self.fid)
393 }
394
395 pub fn tags(&self) -> String {
400 format!("ff:flow:{}:{}:tags", self.tag, self.fid)
401 }
402
403 pub fn member(&self, eid: &ExecutionId) -> String {
405 format!("ff:flow:{}:{}:member:{}", self.tag, self.fid, eid)
406 }
407
408 pub fn edge(&self, edge_id: &EdgeId) -> String {
410 format!("ff:flow:{}:{}:edge:{}", self.tag, self.fid, edge_id)
411 }
412
413 pub fn outgoing(&self, upstream_eid: &ExecutionId) -> String {
415 format!("ff:flow:{}:{}:out:{}", self.tag, self.fid, upstream_eid)
416 }
417
418 pub fn incoming(&self, downstream_eid: &ExecutionId) -> String {
420 format!("ff:flow:{}:{}:in:{}", self.tag, self.fid, downstream_eid)
421 }
422
423 pub fn edgegroup(&self, downstream_eid: &ExecutionId) -> String {
429 format!(
430 "ff:flow:{}:{}:edgegroup:{}",
431 self.tag, self.fid, downstream_eid
432 )
433 }
434
435 pub fn events(&self) -> String {
437 format!("ff:flow:{}:{}:events", self.tag, self.fid)
438 }
439
440 pub fn summary(&self) -> String {
442 format!("ff:flow:{}:{}:summary", self.tag, self.fid)
443 }
444
445 pub fn grant(&self, mutation_id: &str) -> String {
447 format!("ff:flow:{}:{}:grant:{}", self.tag, self.fid, mutation_id)
448 }
449
450 pub fn pending_cancels(&self) -> String {
457 format!("ff:flow:{}:{}:pending_cancels", self.tag, self.fid)
458 }
459
460 pub fn hash_tag(&self) -> &str {
461 &self.tag
462 }
463
464 pub fn flow_id(&self) -> &str {
466 &self.fid
467 }
468}
469
470pub struct FlowIndexKeys {
472 tag: String,
473}
474
475impl FlowIndexKeys {
476 pub fn new(partition: &Partition) -> Self {
477 Self {
478 tag: partition.hash_tag(),
479 }
480 }
481
482 pub fn flow_index(&self) -> String {
485 format!("ff:idx:{}:flow_index", self.tag)
486 }
487
488 pub fn cancel_backlog(&self) -> String {
496 format!("ff:idx:{}:cancel_backlog", self.tag)
497 }
498
499 pub fn pending_cancel_groups(&self) -> String {
509 format!("ff:idx:{}:pending_cancel_groups", self.tag)
510 }
511}
512
513pub struct BudgetKeyContext {
517 tag: String,
518 bid: String,
519}
520
521impl BudgetKeyContext {
522 pub fn new(partition: &Partition, bid: &BudgetId) -> Self {
523 Self {
524 tag: partition.hash_tag(),
525 bid: bid.to_string(),
526 }
527 }
528
529 pub fn definition(&self) -> String {
531 format!("ff:budget:{}:{}", self.tag, self.bid)
532 }
533
534 pub fn limits(&self) -> String {
536 format!("ff:budget:{}:{}:limits", self.tag, self.bid)
537 }
538
539 pub fn usage(&self) -> String {
541 format!("ff:budget:{}:{}:usage", self.tag, self.bid)
542 }
543
544 pub fn executions(&self) -> String {
546 format!("ff:budget:{}:{}:executions", self.tag, self.bid)
547 }
548
549 pub fn by_exec(&self, execution_id: &str) -> String {
553 format!("ff:budget:{}:{}:by_exec:{}", self.tag, self.bid, execution_id)
554 }
555
556 pub fn by_exec_index(&self) -> String {
560 format!("ff:budget:{}:{}:by_exec:index", self.tag, self.bid)
561 }
562
563 pub fn hash_tag(&self) -> &str {
564 &self.tag
565 }
566}
567
568pub fn budget_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
570 format!("ff:budget_attach:{}:{}:{}", tag, scope_type, scope_id)
571}
572
573pub fn budget_resets_key(tag: &str) -> String {
575 format!("ff:idx:{}:budget_resets", tag)
576}
577
578pub fn budget_policies_index(tag: &str) -> String {
581 format!("ff:idx:{}:budget_policies", tag)
582}
583
584pub struct QuotaKeyContext {
588 tag: String,
589 qid: String,
590}
591
592impl QuotaKeyContext {
593 pub fn new(partition: &Partition, qid: &QuotaPolicyId) -> Self {
594 Self {
595 tag: partition.hash_tag(),
596 qid: qid.to_string(),
597 }
598 }
599
600 pub fn definition(&self) -> String {
602 format!("ff:quota:{}:{}", self.tag, self.qid)
603 }
604
605 pub fn window(&self, dimension: &str) -> String {
607 format!("ff:quota:{}:{}:window:{}", self.tag, self.qid, dimension)
608 }
609
610 pub fn concurrency(&self) -> String {
612 format!("ff:quota:{}:{}:concurrency", self.tag, self.qid)
613 }
614
615 pub fn admitted(&self, eid: &ExecutionId) -> String {
617 format!("ff:quota:{}:{}:admitted:{}", self.tag, self.qid, eid)
618 }
619
620 pub fn admitted_set(&self) -> String {
623 format!("ff:quota:{}:{}:admitted_set", self.tag, self.qid)
624 }
625
626 pub fn hash_tag(&self) -> &str {
627 &self.tag
628 }
629}
630
631pub fn quota_policies_index(tag: &str) -> String {
634 format!("ff:idx:{}:quota_policies", tag)
635}
636
637pub fn quota_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
639 format!("ff:quota_attach:{}:{}:{}", tag, scope_type, scope_id)
640}
641
642pub fn lane_config_key(lane_id: &LaneId) -> String {
646 format!("ff:lane:{}:config", lane_id)
647}
648
649pub fn lane_counts_key(lane_id: &LaneId) -> String {
651 format!("ff:lane:{}:counts", lane_id)
652}
653
654pub fn worker_alive_key_ns(ns: &Namespace, wid: &WorkerInstanceId) -> String {
671 format!("ff:worker:{}:{}:alive", ns, wid)
672}
673
674pub fn worker_caps_key_ns(ns: &Namespace, wid: &WorkerInstanceId) -> String {
678 format!("ff:worker:{}:{}:caps", ns, wid)
679}
680
681pub fn workers_index_key_ns(ns: &Namespace) -> String {
685 format!("ff:idx:{}:workers", ns)
686}
687
688pub fn workers_capability_key(key: &str, value: &str) -> String {
690 format!("ff:idx:workers:cap:{}:{}", key, value)
691}
692
693pub fn lanes_index_key() -> String {
695 "ff:idx:lanes".to_owned()
696}
697
698pub fn global_config_partitions() -> String {
701 "ff:config:partitions".to_owned()
702}
703
704pub fn namespace_executions_key(namespace: &str) -> String {
708 format!("ff:ns:{}:executions", namespace)
709}
710
711pub fn idempotency_key(tag: &str, namespace: &str, idem_key: &str) -> String {
716 format!("ff:idem:{}:{}:{}", tag, namespace, idem_key)
717}
718
719pub fn noop_key(tag: &str) -> String {
725 format!("ff:noop:{}", tag)
726}
727
728pub fn tag_index_key(namespace: &str, key: &str, value: &str) -> String {
730 format!("ff:tag:{}:{}:{}", namespace, key, value)
731}
732
733pub fn waitpoint_key_resolution(tag: &str, wp_key: &WaitpointKey) -> String {
735 format!("ff:wpkey:{}:{}", tag, wp_key)
736}
737
738pub const USAGE_DEDUP_KEY_PREFIX: &str = "ff:usagededup:";
744
745pub fn usage_dedup_key(hash_tag: &str, dedup_id: &str) -> String {
751 format!("{USAGE_DEDUP_KEY_PREFIX}{hash_tag}:{dedup_id}")
752}
753
754#[cfg(test)]
755mod tests {
756 use super::*;
757 use crate::partition::{execution_partition, flow_partition, PartitionConfig};
758
759 #[test]
760 fn exec_key_context_core_format() {
761 let config = PartitionConfig::default();
762 let eid = ExecutionId::parse("{fp:0}:550e8400-e29b-41d4-a716-446655440000").unwrap();
763 let partition = execution_partition(&eid, &config);
764 let ctx = ExecKeyContext::new(&partition, &eid);
765
766 let core_key = ctx.core();
767 assert!(core_key.starts_with("ff:exec:{fp:"));
769 assert!(core_key.ends_with(":core"));
770 assert!(core_key.contains("550e8400-e29b-41d4-a716-446655440000"));
771 }
772
773 #[test]
774 fn exec_key_all_keys_share_hash_tag() {
775 let config = PartitionConfig::default();
776 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
777 let partition = execution_partition(&eid, &config);
778 let ctx = ExecKeyContext::new(&partition, &eid);
779 let tag = ctx.hash_tag();
780
781 assert!(ctx.core().contains(tag));
783 assert!(ctx.payload().contains(tag));
784 assert!(ctx.result().contains(tag));
785 assert!(ctx.policy().contains(tag));
786 assert!(ctx.lease_current().contains(tag));
787 assert!(ctx.lease_history().contains(tag));
788 assert!(ctx.attempts().contains(tag));
789 assert!(ctx.suspension_current().contains(tag));
790 assert!(ctx.exec_signals().contains(tag));
791 }
792
793 #[test]
794 fn attempt_key_includes_index() {
795 let config = PartitionConfig::default();
796 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
797 let partition = execution_partition(&eid, &config);
798 let ctx = ExecKeyContext::new(&partition, &eid);
799
800 let key = ctx.attempt_hash(AttemptIndex::new(3));
801 assert!(key.contains(":3"), "attempt key should contain index");
802 }
803
804 #[test]
805 fn stream_key_format() {
806 let config = PartitionConfig::default();
807 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
808 let partition = execution_partition(&eid, &config);
809 let ctx = ExecKeyContext::new(&partition, &eid);
810
811 let key = ctx.stream(AttemptIndex::new(0));
812 assert!(key.starts_with("ff:stream:{fp:"));
813 assert!(key.ends_with(":0"));
814 }
815
816 #[test]
817 fn index_keys_format() {
818 let config = PartitionConfig::default();
819 let eid = ExecutionId::for_flow(&FlowId::new(), &config);
820 let partition = execution_partition(&eid, &config);
821 let idx = IndexKeys::new(&partition);
822 let lane = LaneId::new("default");
823
824 assert!(idx.lane_eligible(&lane).contains(":lane:default:eligible"));
825 assert!(idx.lane_delayed(&lane).contains(":lane:default:delayed"));
826 assert!(idx.lease_expiry().contains(":lease_expiry"));
827 assert!(idx.all_executions().contains(":all_executions"));
828 }
829
830 #[test]
831 fn flow_key_context_format() {
832 let config = PartitionConfig::default();
833 let fid = FlowId::new();
834 let partition = flow_partition(&fid, &config);
835 let ctx = FlowKeyContext::new(&partition, &fid);
836
837 assert!(ctx.core().starts_with("ff:flow:{fp:"));
838 assert!(ctx.core().ends_with(":core"));
839 assert!(ctx.members().ends_with(":members"));
840 }
841
842 #[test]
843 fn global_keys_no_hash_tag() {
844 let lane = LaneId::new("default");
845 let key = lane_config_key(&lane);
846 assert_eq!(key, "ff:lane:default:config");
847 assert!(!key.contains('{'));
848 }
849
850 #[test]
851 fn usage_dedup_key_format() {
852 let key = usage_dedup_key("{bp:7}", "dedup-123");
854 assert_eq!(key, "ff:usagededup:{bp:7}:dedup-123");
855 assert!(key.starts_with(USAGE_DEDUP_KEY_PREFIX));
856 assert_eq!(key.matches('{').count(), 1);
858 assert_eq!(key.matches('}').count(), 1);
859 }
860}