1use crate::partition::Partition;
2use crate::types::{
3 AttemptIndex, BudgetId, EdgeId, ExecutionId, FlowId, LaneId, QuotaPolicyId, SignalId,
4 WaitpointId, WaitpointKey, WorkerInstanceId,
5};
6
/// Key builder bound to one execution.
///
/// Stores the partition hash tag and the execution id as already-rendered
/// strings; every key method formats these two values into a key.
#[derive(Debug, Clone)]
pub struct ExecKeyContext {
    /// Hash tag obtained from the partition when the context is built.
    tag: String,
    /// String form of the execution id.
    eid: String,
}
24
25impl ExecKeyContext {
26 pub fn new(partition: &Partition, eid: &ExecutionId) -> Self {
27 Self {
28 tag: partition.hash_tag(),
29 eid: eid.to_string(),
30 }
31 }
32
33 pub fn core(&self) -> String {
37 format!("ff:exec:{}:{}:core", self.tag, self.eid)
38 }
39
40 pub fn payload(&self) -> String {
42 format!("ff:exec:{}:{}:payload", self.tag, self.eid)
43 }
44
45 pub fn result(&self) -> String {
47 format!("ff:exec:{}:{}:result", self.tag, self.eid)
48 }
49
50 pub fn policy(&self) -> String {
52 format!("ff:exec:{}:{}:policy", self.tag, self.eid)
53 }
54
55 pub fn tags(&self) -> String {
57 format!("ff:exec:{}:{}:tags", self.tag, self.eid)
58 }
59
60 pub fn lease_current(&self) -> String {
64 format!("ff:exec:{}:{}:lease:current", self.tag, self.eid)
65 }
66
67 pub fn lease_history(&self) -> String {
69 format!("ff:exec:{}:{}:lease:history", self.tag, self.eid)
70 }
71
72 pub fn claim_grant(&self) -> String {
74 format!("ff:exec:{}:{}:claim_grant", self.tag, self.eid)
75 }
76
77 pub fn attempts(&self) -> String {
81 format!("ff:exec:{}:{}:attempts", self.tag, self.eid)
82 }
83
84 pub fn attempt_hash(&self, index: AttemptIndex) -> String {
86 format!("ff:attempt:{}:{}:{}", self.tag, self.eid, index)
87 }
88
89 pub fn attempt_usage(&self, index: AttemptIndex) -> String {
91 format!("ff:attempt:{}:{}:{}:usage", self.tag, self.eid, index)
92 }
93
94 pub fn attempt_policy(&self, index: AttemptIndex) -> String {
96 format!("ff:attempt:{}:{}:{}:policy", self.tag, self.eid, index)
97 }
98
99 pub fn stream(&self, index: AttemptIndex) -> String {
103 format!("ff:stream:{}:{}:{}", self.tag, self.eid, index)
104 }
105
106 pub fn stream_meta(&self, index: AttemptIndex) -> String {
108 format!("ff:stream:{}:{}:{}:meta", self.tag, self.eid, index)
109 }
110
111 pub fn suspension_current(&self) -> String {
115 format!("ff:exec:{}:{}:suspension:current", self.tag, self.eid)
116 }
117
118 pub fn waitpoints(&self) -> String {
120 format!("ff:exec:{}:{}:waitpoints", self.tag, self.eid)
121 }
122
123 pub fn waitpoint(&self, wp_id: &WaitpointId) -> String {
125 format!("ff:wp:{}:{}", self.tag, wp_id)
126 }
127
128 pub fn waitpoint_signals(&self, wp_id: &WaitpointId) -> String {
130 format!("ff:wp:{}:{}:signals", self.tag, wp_id)
131 }
132
133 pub fn waitpoint_condition(&self, wp_id: &WaitpointId) -> String {
135 format!("ff:wp:{}:{}:condition", self.tag, wp_id)
136 }
137
138 pub fn signal(&self, sig_id: &SignalId) -> String {
142 format!("ff:signal:{}:{}", self.tag, sig_id)
143 }
144
145 pub fn signal_payload(&self, sig_id: &SignalId) -> String {
147 format!("ff:signal:{}:{}:payload", self.tag, sig_id)
148 }
149
150 pub fn exec_signals(&self) -> String {
152 format!("ff:exec:{}:{}:signals", self.tag, self.eid)
153 }
154
155 pub fn signal_dedup(&self, wp_id: &WaitpointId, idempotency_key: &str) -> String {
157 format!("ff:sigdedup:{}:{}:{}", self.tag, wp_id, idempotency_key)
158 }
159
160 pub fn deps_meta(&self) -> String {
164 format!("ff:exec:{}:{}:deps:meta", self.tag, self.eid)
165 }
166
167 pub fn dep_edge(&self, edge_id: &EdgeId) -> String {
169 format!("ff:exec:{}:{}:dep:{}", self.tag, self.eid, edge_id)
170 }
171
172 pub fn deps_unresolved(&self) -> String {
174 format!("ff:exec:{}:{}:deps:unresolved", self.tag, self.eid)
175 }
176
177 pub fn deps_all_edges(&self) -> String {
182 format!("ff:exec:{}:{}:deps:all_edges", self.tag, self.eid)
183 }
184
185 pub fn noop(&self) -> String {
191 format!("ff:noop:{}", self.tag)
192 }
193
194 pub fn hash_tag(&self) -> &str {
195 &self.tag
196 }
197
198 pub fn execution_id_str(&self) -> &str {
199 &self.eid
200 }
201}
202
/// Key builder for keys that are scoped by a partition hash tag only
/// (some methods additionally take a lane or worker id).
#[derive(Debug, Clone)]
pub struct IndexKeys {
    /// Hash tag obtained from the partition when the builder is created.
    tag: String,
}
209
210impl IndexKeys {
211 pub fn new(partition: &Partition) -> Self {
212 Self {
213 tag: partition.hash_tag(),
214 }
215 }
216
217 pub fn lane_eligible(&self, lane_id: &LaneId) -> String {
219 format!("ff:idx:{}:lane:{}:eligible", self.tag, lane_id)
220 }
221
222 pub fn lane_delayed(&self, lane_id: &LaneId) -> String {
224 format!("ff:idx:{}:lane:{}:delayed", self.tag, lane_id)
225 }
226
227 pub fn lane_active(&self, lane_id: &LaneId) -> String {
229 format!("ff:idx:{}:lane:{}:active", self.tag, lane_id)
230 }
231
232 pub fn lane_terminal(&self, lane_id: &LaneId) -> String {
234 format!("ff:idx:{}:lane:{}:terminal", self.tag, lane_id)
235 }
236
237 pub fn lane_blocked_dependencies(&self, lane_id: &LaneId) -> String {
239 format!("ff:idx:{}:lane:{}:blocked:dependencies", self.tag, lane_id)
240 }
241
242 pub fn lane_blocked_budget(&self, lane_id: &LaneId) -> String {
244 format!("ff:idx:{}:lane:{}:blocked:budget", self.tag, lane_id)
245 }
246
247 pub fn lane_blocked_quota(&self, lane_id: &LaneId) -> String {
249 format!("ff:idx:{}:lane:{}:blocked:quota", self.tag, lane_id)
250 }
251
252 pub fn lane_blocked_route(&self, lane_id: &LaneId) -> String {
254 format!("ff:idx:{}:lane:{}:blocked:route", self.tag, lane_id)
255 }
256
257 pub fn lane_blocked_operator(&self, lane_id: &LaneId) -> String {
259 format!("ff:idx:{}:lane:{}:blocked:operator", self.tag, lane_id)
260 }
261
262 pub fn lane_suspended(&self, lane_id: &LaneId) -> String {
264 format!("ff:idx:{}:lane:{}:suspended", self.tag, lane_id)
265 }
266
267 pub fn waitpoint_hmac_secrets(&self) -> String {
276 format!("ff:sec:{}:waitpoint_hmac", self.tag)
277 }
278
279 pub fn lease_expiry(&self) -> String {
281 format!("ff:idx:{}:lease_expiry", self.tag)
282 }
283
284 pub fn worker_leases(&self, wid: &WorkerInstanceId) -> String {
286 format!("ff:idx:{}:worker:{}:leases", self.tag, wid)
287 }
288
289 pub fn suspension_timeout(&self) -> String {
291 format!("ff:idx:{}:suspension_timeout", self.tag)
292 }
293
294 pub fn pending_waitpoint_expiry(&self) -> String {
296 format!("ff:idx:{}:pending_waitpoint_expiry", self.tag)
297 }
298
299 pub fn attempt_timeout(&self) -> String {
301 format!("ff:idx:{}:attempt_timeout", self.tag)
302 }
303
304 pub fn execution_deadline(&self) -> String {
306 format!("ff:idx:{}:execution_deadline", self.tag)
307 }
308
309 pub fn all_executions(&self) -> String {
311 format!("ff:idx:{}:all_executions", self.tag)
312 }
313}
314
/// Key builder bound to one flow.
///
/// Stores the partition hash tag and the flow id as already-rendered strings.
#[derive(Debug, Clone)]
pub struct FlowKeyContext {
    /// Hash tag obtained from the partition when the context is built.
    tag: String,
    /// String form of the flow id.
    fid: String,
}
322
323impl FlowKeyContext {
324 pub fn new(partition: &Partition, fid: &FlowId) -> Self {
325 Self {
326 tag: partition.hash_tag(),
327 fid: fid.to_string(),
328 }
329 }
330
331 pub fn core(&self) -> String {
333 format!("ff:flow:{}:{}:core", self.tag, self.fid)
334 }
335
336 pub fn members(&self) -> String {
338 format!("ff:flow:{}:{}:members", self.tag, self.fid)
339 }
340
341 pub fn tags(&self) -> String {
346 format!("ff:flow:{}:{}:tags", self.tag, self.fid)
347 }
348
349 pub fn member(&self, eid: &ExecutionId) -> String {
351 format!("ff:flow:{}:{}:member:{}", self.tag, self.fid, eid)
352 }
353
354 pub fn edge(&self, edge_id: &EdgeId) -> String {
356 format!("ff:flow:{}:{}:edge:{}", self.tag, self.fid, edge_id)
357 }
358
359 pub fn outgoing(&self, upstream_eid: &ExecutionId) -> String {
361 format!("ff:flow:{}:{}:out:{}", self.tag, self.fid, upstream_eid)
362 }
363
364 pub fn incoming(&self, downstream_eid: &ExecutionId) -> String {
366 format!("ff:flow:{}:{}:in:{}", self.tag, self.fid, downstream_eid)
367 }
368
369 pub fn events(&self) -> String {
371 format!("ff:flow:{}:{}:events", self.tag, self.fid)
372 }
373
374 pub fn summary(&self) -> String {
376 format!("ff:flow:{}:{}:summary", self.tag, self.fid)
377 }
378
379 pub fn grant(&self, mutation_id: &str) -> String {
381 format!("ff:flow:{}:{}:grant:{}", self.tag, self.fid, mutation_id)
382 }
383
384 pub fn pending_cancels(&self) -> String {
391 format!("ff:flow:{}:{}:pending_cancels", self.tag, self.fid)
392 }
393
394 pub fn hash_tag(&self) -> &str {
395 &self.tag
396 }
397}
398
/// Key builder for flow-related `ff:idx:` keys scoped by a partition hash tag.
#[derive(Debug, Clone)]
pub struct FlowIndexKeys {
    /// Hash tag obtained from the partition when the builder is created.
    tag: String,
}
403
404impl FlowIndexKeys {
405 pub fn new(partition: &Partition) -> Self {
406 Self {
407 tag: partition.hash_tag(),
408 }
409 }
410
411 pub fn flow_index(&self) -> String {
414 format!("ff:idx:{}:flow_index", self.tag)
415 }
416
417 pub fn cancel_backlog(&self) -> String {
425 format!("ff:idx:{}:cancel_backlog", self.tag)
426 }
427}
428
/// Key builder bound to one budget.
///
/// Stores the partition hash tag and the budget id as already-rendered strings.
#[derive(Debug, Clone)]
pub struct BudgetKeyContext {
    /// Hash tag obtained from the partition when the context is built.
    tag: String,
    /// String form of the budget id.
    bid: String,
}
436
437impl BudgetKeyContext {
438 pub fn new(partition: &Partition, bid: &BudgetId) -> Self {
439 Self {
440 tag: partition.hash_tag(),
441 bid: bid.to_string(),
442 }
443 }
444
445 pub fn definition(&self) -> String {
447 format!("ff:budget:{}:{}", self.tag, self.bid)
448 }
449
450 pub fn limits(&self) -> String {
452 format!("ff:budget:{}:{}:limits", self.tag, self.bid)
453 }
454
455 pub fn usage(&self) -> String {
457 format!("ff:budget:{}:{}:usage", self.tag, self.bid)
458 }
459
460 pub fn executions(&self) -> String {
462 format!("ff:budget:{}:{}:executions", self.tag, self.bid)
463 }
464
465 pub fn hash_tag(&self) -> &str {
466 &self.tag
467 }
468}
469
/// Returns `ff:budget_attach:<tag>:<scope_type>:<scope_id>`.
pub fn budget_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
    // Every segment is separated by a single colon, so a join is equivalent
    // to formatting the segments in order.
    ["ff:budget_attach", tag, scope_type, scope_id].join(":")
}
474
/// Returns `ff:idx:<tag>:budget_resets`.
pub fn budget_resets_key(tag: &str) -> String {
    format!("ff:idx:{tag}:budget_resets")
}
479
/// Returns `ff:idx:<tag>:budget_policies`.
pub fn budget_policies_index(tag: &str) -> String {
    format!("ff:idx:{tag}:budget_policies")
}
485
/// Key builder bound to one quota policy.
///
/// Stores the partition hash tag and the quota policy id as already-rendered
/// strings.
#[derive(Debug, Clone)]
pub struct QuotaKeyContext {
    /// Hash tag obtained from the partition when the context is built.
    tag: String,
    /// String form of the quota policy id.
    qid: String,
}
493
494impl QuotaKeyContext {
495 pub fn new(partition: &Partition, qid: &QuotaPolicyId) -> Self {
496 Self {
497 tag: partition.hash_tag(),
498 qid: qid.to_string(),
499 }
500 }
501
502 pub fn definition(&self) -> String {
504 format!("ff:quota:{}:{}", self.tag, self.qid)
505 }
506
507 pub fn window(&self, dimension: &str) -> String {
509 format!("ff:quota:{}:{}:window:{}", self.tag, self.qid, dimension)
510 }
511
512 pub fn concurrency(&self) -> String {
514 format!("ff:quota:{}:{}:concurrency", self.tag, self.qid)
515 }
516
517 pub fn admitted(&self, eid: &ExecutionId) -> String {
519 format!("ff:quota:{}:{}:admitted:{}", self.tag, self.qid, eid)
520 }
521
522 pub fn admitted_set(&self) -> String {
525 format!("ff:quota:{}:{}:admitted_set", self.tag, self.qid)
526 }
527
528 pub fn hash_tag(&self) -> &str {
529 &self.tag
530 }
531}
532
/// Returns `ff:idx:<tag>:quota_policies`.
pub fn quota_policies_index(tag: &str) -> String {
    format!("ff:idx:{tag}:quota_policies")
}
538
/// Returns `ff:quota_attach:<tag>:<scope_type>:<scope_id>`.
pub fn quota_attach_key(tag: &str, scope_type: &str, scope_id: &str) -> String {
    // All segments are colon-separated, so joining them yields the same text
    // as formatting them in order.
    ["ff:quota_attach", tag, scope_type, scope_id].join(":")
}
543
544pub fn lane_config_key(lane_id: &LaneId) -> String {
548 format!("ff:lane:{}:config", lane_id)
549}
550
551pub fn lane_counts_key(lane_id: &LaneId) -> String {
553 format!("ff:lane:{}:counts", lane_id)
554}
555
556pub fn worker_key(wid: &WorkerInstanceId) -> String {
558 format!("ff:worker:{}", wid)
559}
560
561pub fn worker_caps_key(wid: &WorkerInstanceId) -> String {
569 format!("ff:worker:{}:caps", wid)
570}
571
/// Returns the fixed key `ff:idx:workers` as an owned string.
pub fn workers_index_key() -> String {
    String::from("ff:idx:workers")
}
581
/// Returns `ff:idx:workers:cap:<key>:<value>`.
pub fn workers_capability_key(key: &str, value: &str) -> String {
    format!("ff:idx:workers:cap:{key}:{value}")
}
586
/// Returns the fixed key `ff:idx:lanes` as an owned string.
pub fn lanes_index_key() -> String {
    String::from("ff:idx:lanes")
}
591
/// Returns the fixed key `ff:config:partitions` as an owned string.
pub fn global_config_partitions() -> String {
    String::from("ff:config:partitions")
}
597
/// Returns `ff:ns:<namespace>:executions`.
pub fn namespace_executions_key(namespace: &str) -> String {
    format!("ff:ns:{namespace}:executions")
}
604
/// Returns `ff:idem:<tag>:<namespace>:<idem_key>`.
pub fn idempotency_key(tag: &str, namespace: &str, idem_key: &str) -> String {
    // Segments are colon-separated throughout, so a join reproduces the
    // formatted layout exactly.
    ["ff:idem", tag, namespace, idem_key].join(":")
}
612
/// Returns `ff:noop:<tag>`.
pub fn noop_key(tag: &str) -> String {
    format!("ff:noop:{tag}")
}
621
/// Returns `ff:tag:<namespace>:<key>:<value>`.
pub fn tag_index_key(namespace: &str, key: &str, value: &str) -> String {
    // Colon-separated segments; joining is equivalent to formatting in order.
    ["ff:tag", namespace, key, value].join(":")
}
626
627pub fn waitpoint_key_resolution(tag: &str, wp_key: &WaitpointKey) -> String {
629 format!("ff:wpkey:{}:{}", tag, wp_key)
630}
631
/// Literal prefix that every key produced by [`usage_dedup_key`] starts with.
pub const USAGE_DEDUP_KEY_PREFIX: &str = "ff:usagededup:";

/// Returns `ff:usagededup:<hash_tag>:<dedup_id>`.
///
/// The prefix already ends with a colon, so only one more separator is
/// inserted, between the hash tag and the dedup id.
pub fn usage_dedup_key(hash_tag: &str, dedup_id: &str) -> String {
    [USAGE_DEDUP_KEY_PREFIX, hash_tag, ":", dedup_id].concat()
}
647
#[cfg(test)]
mod tests {
    use super::*;
    use crate::partition::{execution_partition, flow_partition, PartitionConfig};

    /// The core key starts with the exec prefix and hash tag, ends with
    /// `:core`, and embeds the id text that was parsed.
    #[test]
    fn exec_key_context_core_format() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::parse("{fp:0}:550e8400-e29b-41d4-a716-446655440000").unwrap();
        let partition = execution_partition(&eid, &config);
        let ctx = ExecKeyContext::new(&partition, &eid);

        let core_key = ctx.core();
        assert!(core_key.starts_with("ff:exec:{fp:"));
        assert!(core_key.ends_with(":core"));
        assert!(core_key.contains("550e8400-e29b-41d4-a716-446655440000"));
    }

    /// Every execution-scoped key checked here contains the context's hash tag.
    #[test]
    fn exec_key_all_keys_share_hash_tag() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
        let partition = execution_partition(&eid, &config);
        let ctx = ExecKeyContext::new(&partition, &eid);
        let tag = ctx.hash_tag();

        assert!(ctx.core().contains(tag));
        assert!(ctx.payload().contains(tag));
        assert!(ctx.result().contains(tag));
        assert!(ctx.policy().contains(tag));
        assert!(ctx.lease_current().contains(tag));
        assert!(ctx.lease_history().contains(tag));
        assert!(ctx.attempts().contains(tag));
        assert!(ctx.suspension_current().contains(tag));
        assert!(ctx.exec_signals().contains(tag));
    }

    /// The attempt key ends with the attempt index.
    #[test]
    fn attempt_key_includes_index() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
        let partition = execution_partition(&eid, &config);
        let ctx = ExecKeyContext::new(&partition, &eid);

        let key = ctx.attempt_hash(AttemptIndex::new(3));
        // `contains(":3")` would also match a hash tag such as `{fp:3}` or an
        // id that begins with `3`, so anchor the check to the final segment.
        assert!(key.ends_with(":3"), "attempt key should end with the index");
    }

    /// The stream key starts with the stream prefix and ends with the index.
    #[test]
    fn stream_key_format() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
        let partition = execution_partition(&eid, &config);
        let ctx = ExecKeyContext::new(&partition, &eid);

        let key = ctx.stream(AttemptIndex::new(0));
        assert!(key.starts_with("ff:stream:{fp:"));
        assert!(key.ends_with(":0"));
    }

    /// Index keys embed the lane id and the expected suffixes.
    #[test]
    fn index_keys_format() {
        let config = PartitionConfig::default();
        let eid = ExecutionId::for_flow(&FlowId::new(), &config);
        let partition = execution_partition(&eid, &config);
        let idx = IndexKeys::new(&partition);
        let lane = LaneId::new("default");

        assert!(idx.lane_eligible(&lane).contains(":lane:default:eligible"));
        assert!(idx.lane_delayed(&lane).contains(":lane:default:delayed"));
        assert!(idx.lease_expiry().contains(":lease_expiry"));
        assert!(idx.all_executions().contains(":all_executions"));
    }

    /// Flow keys start with the flow prefix and carry the expected suffixes.
    #[test]
    fn flow_key_context_format() {
        let config = PartitionConfig::default();
        let fid = FlowId::new();
        let partition = flow_partition(&fid, &config);
        let ctx = FlowKeyContext::new(&partition, &fid);

        assert!(ctx.core().starts_with("ff:flow:{fp:"));
        assert!(ctx.core().ends_with(":core"));
        assert!(ctx.members().ends_with(":members"));
    }

    /// The lane config key has no `{` and therefore no hash tag.
    #[test]
    fn global_keys_no_hash_tag() {
        let lane = LaneId::new("default");
        let key = lane_config_key(&lane);
        assert_eq!(key, "ff:lane:default:config");
        assert!(!key.contains('{'));
    }

    /// The usage-dedup key is prefix + hash tag + `:` + dedup id, with exactly
    /// one pair of braces.
    #[test]
    fn usage_dedup_key_format() {
        let key = usage_dedup_key("{bp:7}", "dedup-123");
        assert_eq!(key, "ff:usagededup:{bp:7}:dedup-123");
        assert!(key.starts_with(USAGE_DEDUP_KEY_PREFIX));
        assert_eq!(key.matches('{').count(), 1);
        assert_eq!(key.matches('}').count(), 1);
    }
}