1use crate::policy::ExecutionPolicy;
7use crate::state::{AttemptType, PublicState, StateVector};
8use crate::types::{
9 AttemptId, AttemptIndex, CancelSource, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch,
10 LeaseFence, LeaseId, Namespace, SignalId, SuspensionId, TimestampMs, WaitpointId,
11 WaitpointToken, WorkerId, WorkerInstanceId,
12};
13use serde::{Deserialize, Serialize};
14use std::collections::{BTreeMap, BTreeSet, HashMap};
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
19pub struct CreateExecutionArgs {
20 pub execution_id: ExecutionId,
21 pub namespace: Namespace,
22 pub lane_id: LaneId,
23 pub execution_kind: String,
24 pub input_payload: Vec<u8>,
25 #[serde(default)]
26 pub payload_encoding: Option<String>,
27 pub priority: i32,
28 pub creator_identity: String,
29 #[serde(default)]
30 pub idempotency_key: Option<String>,
31 #[serde(default)]
32 pub tags: HashMap<String, String>,
33 #[serde(default)]
35 pub policy: Option<ExecutionPolicy>,
36 #[serde(default)]
38 pub delay_until: Option<TimestampMs>,
39 #[serde(default)]
41 pub execution_deadline_at: Option<TimestampMs>,
42 pub partition_id: u16,
44 pub now: TimestampMs,
45}
46
47#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
48pub enum CreateExecutionResult {
49 Created {
51 execution_id: ExecutionId,
52 public_state: PublicState,
53 },
54 Duplicate { execution_id: ExecutionId },
56}
57
58#[derive(Clone, Debug, Serialize, Deserialize)]
61pub struct IssueClaimGrantArgs {
62 pub execution_id: ExecutionId,
63 pub lane_id: LaneId,
64 pub worker_id: WorkerId,
65 pub worker_instance_id: WorkerInstanceId,
66 #[serde(default)]
67 pub capability_hash: Option<String>,
68 #[serde(default)]
69 pub route_snapshot_json: Option<String>,
70 #[serde(default)]
71 pub admission_summary: Option<String>,
72 #[serde(default)]
77 pub worker_capabilities: BTreeSet<String>,
78 pub grant_ttl_ms: u64,
79 pub now: TimestampMs,
82}
83
84#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
85pub enum IssueClaimGrantResult {
86 Granted { execution_id: ExecutionId },
88}
89
90#[derive(Clone, Debug, PartialEq, Eq)]
108pub struct ClaimGrant {
109 pub execution_id: ExecutionId,
111 pub partition_key: crate::partition::PartitionKey,
118 pub grant_key: String,
121 pub expires_at_ms: u64,
123}
124
125impl ClaimGrant {
126 pub fn partition(
137 &self,
138 ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
139 self.partition_key.parse()
140 }
141}
142
143#[derive(Clone, Debug, PartialEq, Eq)]
184pub struct ReclaimGrant {
185 pub execution_id: ExecutionId,
187 pub partition_key: crate::partition::PartitionKey,
193 pub grant_key: String,
196 pub expires_at_ms: u64,
199 pub lane_id: LaneId,
203}
204
205impl ReclaimGrant {
206 pub fn partition(
210 &self,
211 ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
212 self.partition_key.parse()
213 }
214}
215
216#[derive(Clone, Debug, Serialize, Deserialize)]
219pub struct ClaimExecutionArgs {
220 pub execution_id: ExecutionId,
221 pub worker_id: WorkerId,
222 pub worker_instance_id: WorkerInstanceId,
223 pub lane_id: LaneId,
224 pub lease_id: LeaseId,
225 pub lease_ttl_ms: u64,
226 pub attempt_id: AttemptId,
227 pub expected_attempt_index: AttemptIndex,
230 #[serde(default)]
232 pub attempt_policy_json: String,
233 #[serde(default)]
235 pub attempt_timeout_ms: Option<u64>,
236 #[serde(default)]
238 pub execution_deadline_at: Option<i64>,
239 pub now: TimestampMs,
240}
241
242#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
243pub struct ClaimedExecution {
244 pub execution_id: ExecutionId,
245 pub lease_id: LeaseId,
246 pub lease_epoch: LeaseEpoch,
247 pub attempt_index: AttemptIndex,
248 pub attempt_id: AttemptId,
249 pub attempt_type: AttemptType,
250 pub lease_expires_at: TimestampMs,
251}
252
253#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
254pub enum ClaimExecutionResult {
255 Claimed(ClaimedExecution),
257}
258
259#[derive(Clone, Debug, Serialize, Deserialize)]
262pub struct CompleteExecutionArgs {
263 pub execution_id: ExecutionId,
264 #[serde(default)]
269 pub fence: Option<LeaseFence>,
270 pub attempt_index: AttemptIndex,
271 #[serde(default)]
272 pub result_payload: Option<Vec<u8>>,
273 #[serde(default)]
274 pub result_encoding: Option<String>,
275 #[serde(default)]
277 pub source: CancelSource,
278 pub now: TimestampMs,
279}
280
281#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
282pub enum CompleteExecutionResult {
283 Completed {
285 execution_id: ExecutionId,
286 public_state: PublicState,
287 },
288}
289
290#[derive(Clone, Debug, Serialize, Deserialize)]
293pub struct RenewLeaseArgs {
294 pub execution_id: ExecutionId,
295 pub attempt_index: AttemptIndex,
296 pub fence: Option<LeaseFence>,
299 pub lease_ttl_ms: u64,
301 #[serde(default = "default_lease_history_grace_ms")]
303 pub lease_history_grace_ms: u64,
304}
305
/// Serde default for `RenewLeaseArgs::lease_history_grace_ms`: 60 000.
fn default_lease_history_grace_ms() -> u64 {
    60_000
}
309
310#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
311pub enum RenewLeaseResult {
312 Renewed { expires_at: TimestampMs },
314}
315
316#[derive(Clone, Debug, Serialize, Deserialize)]
319pub struct MarkLeaseExpiredArgs {
320 pub execution_id: ExecutionId,
321}
322
323#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
324pub enum MarkLeaseExpiredResult {
325 MarkedExpired,
327 AlreadySatisfied { reason: String },
329}
330
331#[derive(Clone, Debug, Serialize, Deserialize)]
334pub struct CancelExecutionArgs {
335 pub execution_id: ExecutionId,
336 pub reason: String,
337 #[serde(default)]
338 pub source: CancelSource,
339 #[serde(default)]
341 pub lease_id: Option<LeaseId>,
342 #[serde(default)]
343 pub lease_epoch: Option<LeaseEpoch>,
344 #[serde(default)]
346 pub attempt_id: Option<AttemptId>,
347 pub now: TimestampMs,
348}
349
350#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
351pub enum CancelExecutionResult {
352 Cancelled {
354 execution_id: ExecutionId,
355 public_state: PublicState,
356 },
357}
358
359#[derive(Clone, Debug, Serialize, Deserialize)]
362pub struct RevokeLeaseArgs {
363 pub execution_id: ExecutionId,
364 #[serde(default)]
366 pub expected_lease_id: Option<String>,
367 pub worker_instance_id: WorkerInstanceId,
369 pub reason: String,
370}
371
372#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
373pub enum RevokeLeaseResult {
374 Revoked {
376 lease_id: String,
377 lease_epoch: String,
378 },
379 AlreadySatisfied { reason: String },
381}
382
383#[derive(Clone, Debug, Serialize, Deserialize)]
386pub struct DelayExecutionArgs {
387 pub execution_id: ExecutionId,
388 #[serde(default)]
391 pub fence: Option<LeaseFence>,
392 pub attempt_index: AttemptIndex,
393 pub delay_until: TimestampMs,
394 #[serde(default)]
395 pub source: CancelSource,
396 pub now: TimestampMs,
397}
398
399#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
400pub enum DelayExecutionResult {
401 Delayed {
403 execution_id: ExecutionId,
404 public_state: PublicState,
405 },
406}
407
408#[derive(Clone, Debug, Serialize, Deserialize)]
411pub struct MoveToWaitingChildrenArgs {
412 pub execution_id: ExecutionId,
413 #[serde(default)]
416 pub fence: Option<LeaseFence>,
417 pub attempt_index: AttemptIndex,
418 #[serde(default)]
419 pub source: CancelSource,
420 pub now: TimestampMs,
421}
422
423#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
424pub enum MoveToWaitingChildrenResult {
425 Moved {
427 execution_id: ExecutionId,
428 public_state: PublicState,
429 },
430}
431
432#[derive(Clone, Debug, Serialize, Deserialize)]
435pub struct ChangePriorityArgs {
436 pub execution_id: ExecutionId,
437 pub new_priority: i32,
438 pub lane_id: LaneId,
439 pub now: TimestampMs,
440}
441
442#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
443pub enum ChangePriorityResult {
444 Changed { execution_id: ExecutionId },
446}
447
448#[derive(Clone, Debug, Serialize, Deserialize)]
451pub struct UpdateProgressArgs {
452 pub execution_id: ExecutionId,
453 pub lease_id: LeaseId,
454 pub lease_epoch: LeaseEpoch,
455 pub attempt_id: AttemptId,
456 #[serde(default)]
457 pub progress_pct: Option<u8>,
458 #[serde(default)]
459 pub progress_message: Option<String>,
460 pub now: TimestampMs,
461}
462
463#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
464pub enum UpdateProgressResult {
465 Updated,
467}
468
469#[derive(Clone, Debug, Serialize, Deserialize)]
476pub struct FailExecutionArgs {
477 pub execution_id: ExecutionId,
478 #[serde(default)]
481 pub fence: Option<LeaseFence>,
482 pub attempt_index: AttemptIndex,
483 pub failure_reason: String,
484 pub failure_category: String,
485 #[serde(default)]
487 pub retry_policy_json: String,
488 #[serde(default)]
490 pub next_attempt_policy_json: String,
491 #[serde(default)]
492 pub source: CancelSource,
493}
494
495#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
497pub enum FailExecutionResult {
498 RetryScheduled {
500 delay_until: TimestampMs,
501 next_attempt_index: AttemptIndex,
502 },
503 TerminalFailed,
505}
506
507#[derive(Clone, Debug, Serialize, Deserialize)]
510pub struct IssueReclaimGrantArgs {
511 pub execution_id: ExecutionId,
512 pub worker_id: WorkerId,
513 pub worker_instance_id: WorkerInstanceId,
514 pub lane_id: LaneId,
515 #[serde(default)]
516 pub capability_hash: Option<String>,
517 pub grant_ttl_ms: u64,
518 #[serde(default)]
519 pub route_snapshot_json: Option<String>,
520 #[serde(default)]
521 pub admission_summary: Option<String>,
522 pub now: TimestampMs,
527}
528
529#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
530pub enum IssueReclaimGrantResult {
531 Granted { expires_at_ms: TimestampMs },
533}
534
535#[derive(Clone, Debug, Serialize, Deserialize)]
538pub struct ReclaimExecutionArgs {
539 pub execution_id: ExecutionId,
540 pub worker_id: WorkerId,
541 pub worker_instance_id: WorkerInstanceId,
542 pub lane_id: LaneId,
543 #[serde(default)]
544 pub capability_hash: Option<String>,
545 pub lease_id: LeaseId,
546 pub lease_ttl_ms: u64,
547 pub attempt_id: AttemptId,
548 #[serde(default)]
550 pub attempt_policy_json: String,
551 #[serde(default = "default_max_reclaim_count")]
553 pub max_reclaim_count: u32,
554 pub old_worker_instance_id: WorkerInstanceId,
556 pub current_attempt_index: AttemptIndex,
558}
559
/// Serde default for `ReclaimExecutionArgs::max_reclaim_count`: 100.
fn default_max_reclaim_count() -> u32 {
    100
}
563
564#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
565pub enum ReclaimExecutionResult {
566 Reclaimed {
568 new_attempt_index: AttemptIndex,
569 new_attempt_id: AttemptId,
570 new_lease_id: LeaseId,
571 new_lease_epoch: LeaseEpoch,
572 lease_expires_at: TimestampMs,
573 },
574 MaxReclaimsExceeded,
576}
577
578#[derive(Clone, Debug, Serialize, Deserialize)]
581pub struct ExpireExecutionArgs {
582 pub execution_id: ExecutionId,
583 pub expire_reason: String,
585}
586
587#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
588pub enum ExpireExecutionResult {
589 Expired { execution_id: ExecutionId },
591 AlreadyTerminal,
593}
594
595#[derive(Clone, Debug, Serialize, Deserialize)]
602pub struct SuspendExecutionArgs {
603 pub execution_id: ExecutionId,
604 pub fence: Option<LeaseFence>,
607 pub attempt_index: AttemptIndex,
608 pub suspension_id: SuspensionId,
609 pub waitpoint_id: WaitpointId,
610 pub waitpoint_key: String,
611 pub reason_code: String,
612 pub requested_by: String,
613 pub resume_condition_json: String,
614 pub resume_policy_json: String,
615 #[serde(default)]
616 pub continuation_metadata_pointer: Option<String>,
617 #[serde(default)]
618 pub timeout_at: Option<TimestampMs>,
619 #[serde(default)]
621 pub use_pending_waitpoint: bool,
622 #[serde(default = "default_timeout_behavior")]
624 pub timeout_behavior: String,
625}
626
/// Serde default for `SuspendExecutionArgs::timeout_behavior`: `"fail"`.
fn default_timeout_behavior() -> String {
    "fail".to_owned()
}
630
631#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
632pub enum SuspendExecutionResult {
633 Suspended {
635 suspension_id: SuspensionId,
636 waitpoint_id: WaitpointId,
637 waitpoint_key: String,
638 waitpoint_token: WaitpointToken,
642 },
643 AlreadySatisfied {
646 suspension_id: SuspensionId,
647 waitpoint_id: WaitpointId,
648 waitpoint_key: String,
649 waitpoint_token: WaitpointToken,
650 },
651}
652
653#[derive(Clone, Debug, Serialize, Deserialize)]
656pub struct ResumeExecutionArgs {
657 pub execution_id: ExecutionId,
658 #[serde(default = "default_trigger_type")]
660 pub trigger_type: String,
661 #[serde(default)]
663 pub resume_delay_ms: u64,
664}
665
/// Serde default for `ResumeExecutionArgs::trigger_type`: `"signal"`.
fn default_trigger_type() -> String {
    "signal".to_owned()
}
669
670#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
671pub enum ResumeExecutionResult {
672 Resumed { public_state: PublicState },
674}
675
676#[derive(Clone, Debug, Serialize, Deserialize)]
679pub struct CreatePendingWaitpointArgs {
680 pub execution_id: ExecutionId,
681 pub lease_id: LeaseId,
682 pub lease_epoch: LeaseEpoch,
683 pub attempt_index: AttemptIndex,
684 pub attempt_id: AttemptId,
685 pub waitpoint_id: WaitpointId,
686 pub waitpoint_key: String,
687 pub expires_in_ms: u64,
689}
690
691#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
692pub enum CreatePendingWaitpointResult {
693 Created {
695 waitpoint_id: WaitpointId,
696 waitpoint_key: String,
697 waitpoint_token: WaitpointToken,
701 },
702}
703
704#[derive(Clone, Debug, Serialize, Deserialize)]
707pub struct CloseWaitpointArgs {
708 pub waitpoint_id: WaitpointId,
709 pub reason: String,
710}
711
712#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
713pub enum CloseWaitpointResult {
714 Closed,
716}
717
718#[derive(Clone, Debug, Serialize, Deserialize)]
721pub struct DeliverSignalArgs {
722 pub execution_id: ExecutionId,
723 pub waitpoint_id: WaitpointId,
724 pub signal_id: SignalId,
725 pub signal_name: String,
726 pub signal_category: String,
727 pub source_type: String,
728 pub source_identity: String,
729 #[serde(default)]
730 pub payload: Option<Vec<u8>>,
731 #[serde(default)]
732 pub payload_encoding: Option<String>,
733 #[serde(default)]
734 pub correlation_id: Option<String>,
735 #[serde(default)]
736 pub idempotency_key: Option<String>,
737 pub target_scope: String,
738 #[serde(default)]
739 pub created_at: Option<TimestampMs>,
740 #[serde(default)]
742 pub dedup_ttl_ms: Option<u64>,
743 #[serde(default)]
745 pub resume_delay_ms: Option<u64>,
746 #[serde(default)]
748 pub max_signals_per_execution: Option<u64>,
749 #[serde(default)]
751 pub signal_maxlen: Option<u64>,
752 pub waitpoint_token: WaitpointToken,
763 pub now: TimestampMs,
764}
765
766#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
767pub enum DeliverSignalResult {
768 Accepted { signal_id: SignalId, effect: String },
770 Duplicate { existing_signal_id: SignalId },
772}
773
774#[derive(Clone, Debug, Serialize, Deserialize)]
777pub struct BufferSignalArgs {
778 pub execution_id: ExecutionId,
779 pub waitpoint_id: WaitpointId,
780 pub signal_id: SignalId,
781 pub signal_name: String,
782 pub signal_category: String,
783 pub source_type: String,
784 pub source_identity: String,
785 #[serde(default)]
786 pub payload: Option<Vec<u8>>,
787 #[serde(default)]
788 pub payload_encoding: Option<String>,
789 #[serde(default)]
790 pub idempotency_key: Option<String>,
791 pub target_scope: String,
792 pub waitpoint_token: WaitpointToken,
795 pub now: TimestampMs,
796}
797
798#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
799pub enum BufferSignalResult {
800 Buffered { signal_id: SignalId },
802 Duplicate { existing_signal_id: SignalId },
804}
805
806#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
822pub struct PendingWaitpointInfo {
823 pub waitpoint_id: WaitpointId,
824 pub waitpoint_key: String,
825 pub state: String,
828 pub waitpoint_token: WaitpointToken,
831 #[serde(default)]
841 pub required_signal_names: Vec<String>,
842 pub created_at: TimestampMs,
844 #[serde(default, skip_serializing_if = "Option::is_none")]
847 pub activated_at: Option<TimestampMs>,
848 #[serde(default, skip_serializing_if = "Option::is_none")]
850 pub expires_at: Option<TimestampMs>,
851}
852
853#[derive(Clone, Debug, Serialize, Deserialize)]
856pub struct ExpireSuspensionArgs {
857 pub execution_id: ExecutionId,
858}
859
860#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
861pub enum ExpireSuspensionResult {
862 Expired { behavior_applied: String },
864 AlreadySatisfied { reason: String },
866}
867
868#[derive(Clone, Debug, Serialize, Deserialize)]
871pub struct ClaimResumedExecutionArgs {
872 pub execution_id: ExecutionId,
873 pub worker_id: WorkerId,
874 pub worker_instance_id: WorkerInstanceId,
875 pub lane_id: LaneId,
876 pub lease_id: LeaseId,
877 pub lease_ttl_ms: u64,
878 pub current_attempt_index: AttemptIndex,
880 #[serde(default)]
882 pub remaining_attempt_timeout_ms: Option<u64>,
883 pub now: TimestampMs,
884}
885
886#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
887pub struct ClaimedResumedExecution {
888 pub execution_id: ExecutionId,
889 pub lease_id: LeaseId,
890 pub lease_epoch: LeaseEpoch,
891 pub attempt_index: AttemptIndex,
892 pub attempt_id: AttemptId,
893 pub lease_expires_at: TimestampMs,
894}
895
896#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
897pub enum ClaimResumedExecutionResult {
898 Claimed(ClaimedResumedExecution),
900}
901
902#[derive(Clone, Debug, Serialize, Deserialize)]
909pub struct AppendFrameArgs {
910 pub execution_id: ExecutionId,
911 pub attempt_index: AttemptIndex,
912 pub lease_id: LeaseId,
913 pub lease_epoch: LeaseEpoch,
914 pub attempt_id: AttemptId,
915 pub frame_type: String,
916 pub timestamp: TimestampMs,
917 pub payload: Vec<u8>,
918 #[serde(default)]
919 pub encoding: Option<String>,
920 #[serde(default)]
922 pub metadata_json: Option<String>,
923 #[serde(default)]
924 pub correlation_id: Option<String>,
925 #[serde(default)]
926 pub source: Option<String>,
927 #[serde(default)]
929 pub retention_maxlen: Option<u32>,
930 #[serde(default)]
932 pub max_payload_bytes: Option<u32>,
933}
934
935#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
936pub enum AppendFrameResult {
937 Appended {
939 entry_id: String,
941 frame_count: u64,
943 },
944}
945
/// Position in a stream: one of two open-ended markers or a concrete id.
///
/// The textual (`Display` / serde) spellings are `"start"`, `"end"` and the
/// raw id; the wire spellings returned by `to_wire` are `"-"`, `"+"` and the
/// raw id.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum StreamCursor {
    /// Open-ended start marker.
    Start,
    /// Open-ended end marker.
    End,
    /// Concrete entry id; the parser accepts `<ms>` or `<ms>-<seq>`.
    At(String),
}
1002
1003impl StreamCursor {
1004 pub fn from_beginning() -> Self {
1008 Self::At("0-0".to_owned())
1009 }
1010
1011 pub fn start() -> Self {
1015 Self::Start
1016 }
1017
1018 pub fn end() -> Self {
1020 Self::End
1021 }
1022
1023 pub fn beginning() -> Self {
1027 Self::from_beginning()
1028 }
1029
1030 #[doc(hidden)]
1041 pub fn to_wire(&self) -> &str {
1042 match self {
1043 Self::Start => "-",
1044 Self::End => "+",
1045 Self::At(s) => s.as_str(),
1046 }
1047 }
1048
1049 #[doc(hidden)]
1055 pub fn into_wire_string(self) -> String {
1056 match self {
1057 Self::Start => "-".to_owned(),
1058 Self::End => "+".to_owned(),
1059 Self::At(s) => s,
1060 }
1061 }
1062
1063 pub fn is_concrete(&self) -> bool {
1072 matches!(self, Self::At(_))
1073 }
1074}
1075
1076impl std::fmt::Display for StreamCursor {
1077 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1078 match self {
1079 Self::Start => f.write_str("start"),
1080 Self::End => f.write_str("end"),
1081 Self::At(s) => f.write_str(s),
1082 }
1083 }
1084}
1085
/// Reasons a string is rejected when parsed as a `StreamCursor`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamCursorParseError {
    /// The input was the empty string.
    Empty,
    /// The input was a bare `"-"` or `"+"`; carries the offending input.
    BareMarkerRejected(String),
    /// The input matched no accepted form; carries the offending input.
    Malformed(String),
}
1099
1100impl std::fmt::Display for StreamCursorParseError {
1101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1102 match self {
1103 Self::Empty => f.write_str("stream cursor must not be empty"),
1104 Self::BareMarkerRejected(s) => write!(
1105 f,
1106 "bare marker '{s}' is not a valid stream cursor; use 'start' or 'end'"
1107 ),
1108 Self::Malformed(s) => write!(
1109 f,
1110 "invalid stream cursor '{s}' (expected 'start', 'end', '<ms>', or '<ms>-<seq>')"
1111 ),
1112 }
1113 }
1114}
1115
1116impl std::error::Error for StreamCursorParseError {}
1117
/// Private classification of a cursor string, produced by
/// `classify_stream_cursor` and mapped to a `StreamCursor` or a
/// `StreamCursorParseError` by the `FromStr` / `TryFrom<String>` impls.
enum StreamCursorClass {
    Start,
    End,
    Concrete,
    BareMarker,
    Empty,
    Malformed,
}
1133
1134fn classify_stream_cursor(s: &str) -> StreamCursorClass {
1135 if s.is_empty() {
1136 return StreamCursorClass::Empty;
1137 }
1138 if s == "-" || s == "+" {
1139 return StreamCursorClass::BareMarker;
1140 }
1141 if s == "start" {
1142 return StreamCursorClass::Start;
1143 }
1144 if s == "end" {
1145 return StreamCursorClass::End;
1146 }
1147 if !s.is_ascii() {
1148 return StreamCursorClass::Malformed;
1149 }
1150 let (ms_part, seq_part) = match s.split_once('-') {
1151 Some((ms, seq)) => (ms, Some(seq)),
1152 None => (s, None),
1153 };
1154 let ms_ok = !ms_part.is_empty() && ms_part.bytes().all(|b| b.is_ascii_digit());
1155 let seq_ok = seq_part
1156 .map(|p| !p.is_empty() && p.bytes().all(|b| b.is_ascii_digit()))
1157 .unwrap_or(true);
1158 if ms_ok && seq_ok {
1159 StreamCursorClass::Concrete
1160 } else {
1161 StreamCursorClass::Malformed
1162 }
1163}
1164
1165impl std::str::FromStr for StreamCursor {
1166 type Err = StreamCursorParseError;
1167
1168 fn from_str(s: &str) -> Result<Self, Self::Err> {
1169 match classify_stream_cursor(s) {
1170 StreamCursorClass::Start => Ok(Self::Start),
1171 StreamCursorClass::End => Ok(Self::End),
1172 StreamCursorClass::Concrete => Ok(Self::At(s.to_owned())),
1173 StreamCursorClass::BareMarker => {
1174 Err(StreamCursorParseError::BareMarkerRejected(s.to_owned()))
1175 }
1176 StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1177 StreamCursorClass::Malformed => {
1178 Err(StreamCursorParseError::Malformed(s.to_owned()))
1179 }
1180 }
1181 }
1182}
1183
1184impl TryFrom<String> for StreamCursor {
1185 type Error = StreamCursorParseError;
1186
1187 fn try_from(s: String) -> Result<Self, Self::Error> {
1188 match classify_stream_cursor(&s) {
1194 StreamCursorClass::Start => Ok(Self::Start),
1195 StreamCursorClass::End => Ok(Self::End),
1196 StreamCursorClass::Concrete => Ok(Self::At(s)),
1197 StreamCursorClass::BareMarker => {
1198 Err(StreamCursorParseError::BareMarkerRejected(s))
1199 }
1200 StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
1201 StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s)),
1202 }
1203 }
1204}
1205
1206impl From<StreamCursor> for String {
1207 fn from(c: StreamCursor) -> Self {
1208 c.to_string()
1209 }
1210}
1211
1212impl Serialize for StreamCursor {
1213 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1214 serializer.collect_str(self)
1215 }
1216}
1217
1218impl<'de> Deserialize<'de> for StreamCursor {
1219 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
1220 let s = String::deserialize(deserializer)?;
1221 Self::try_from(s).map_err(serde::de::Error::custom)
1222 }
1223}
1224
/// Upper bound of 10 000, named as a hard cap on stream reads.
/// NOTE(review): the enforcement site is not visible in this chunk.
pub const STREAM_READ_HARD_CAP: u64 = 10_000;
1234
1235#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1241pub struct StreamFrame {
1242 pub id: String,
1244 pub fields: std::collections::BTreeMap<String, String>,
1246}
1247
1248#[derive(Clone, Debug, Serialize, Deserialize)]
1250pub struct ReadFramesArgs {
1251 pub execution_id: ExecutionId,
1252 pub attempt_index: AttemptIndex,
1253 pub from_id: String,
1255 pub to_id: String,
1257 pub count_limit: u64,
1261}
1262
1263#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1266pub struct StreamFrames {
1267 pub frames: Vec<StreamFrame>,
1269 #[serde(default, skip_serializing_if = "Option::is_none")]
1272 pub closed_at: Option<TimestampMs>,
1273 #[serde(default, skip_serializing_if = "Option::is_none")]
1277 pub closed_reason: Option<String>,
1278}
1279
1280impl StreamFrames {
1281 pub fn empty_open() -> Self {
1284 Self {
1285 frames: Vec::new(),
1286 closed_at: None,
1287 closed_reason: None,
1288 }
1289 }
1290
1291 pub fn is_closed(&self) -> bool {
1294 self.closed_at.is_some()
1295 }
1296}
1297
1298#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1299pub enum ReadFramesResult {
1300 Frames(StreamFrames),
1302}
1303
1304#[derive(Clone, Debug, Serialize, Deserialize)]
1311pub struct CreateBudgetArgs {
1312 pub budget_id: crate::types::BudgetId,
1313 pub scope_type: String,
1314 pub scope_id: String,
1315 pub enforcement_mode: String,
1316 pub on_hard_limit: String,
1317 pub on_soft_limit: String,
1318 pub reset_interval_ms: u64,
1319 pub dimensions: Vec<String>,
1321 pub hard_limits: Vec<u64>,
1323 pub soft_limits: Vec<u64>,
1325 pub now: TimestampMs,
1326}
1327
1328#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1329pub enum CreateBudgetResult {
1330 Created { budget_id: crate::types::BudgetId },
1332 AlreadySatisfied { budget_id: crate::types::BudgetId },
1334}
1335
1336#[derive(Clone, Debug, Serialize, Deserialize)]
1339pub struct CreateQuotaPolicyArgs {
1340 pub quota_policy_id: crate::types::QuotaPolicyId,
1341 pub window_seconds: u64,
1342 pub max_requests_per_window: u64,
1343 pub max_concurrent: u64,
1344 pub now: TimestampMs,
1345}
1346
1347#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1348pub enum CreateQuotaPolicyResult {
1349 Created {
1351 quota_policy_id: crate::types::QuotaPolicyId,
1352 },
1353 AlreadySatisfied {
1355 quota_policy_id: crate::types::QuotaPolicyId,
1356 },
1357}
1358
1359#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1363pub struct BudgetStatus {
1364 pub budget_id: String,
1365 pub scope_type: String,
1366 pub scope_id: String,
1367 pub enforcement_mode: String,
1368 pub usage: HashMap<String, u64>,
1370 pub hard_limits: HashMap<String, u64>,
1372 pub soft_limits: HashMap<String, u64>,
1374 pub breach_count: u64,
1375 pub soft_breach_count: u64,
1376 pub last_breach_at: Option<String>,
1377 pub last_breach_dim: Option<String>,
1378 pub next_reset_at: Option<String>,
1379 pub created_at: Option<String>,
1380}
1381
1382#[derive(Clone, Debug, Serialize, Deserialize)]
1385pub struct ReportUsageArgs {
1386 pub dimensions: Vec<String>,
1388 pub deltas: Vec<u64>,
1390 pub now: TimestampMs,
1391 #[serde(default)]
1397 pub dedup_key: Option<String>,
1398}
1399
1400#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1401pub enum ReportUsageResult {
1402 Ok,
1404 SoftBreach {
1406 dimension: String,
1407 current_usage: u64,
1408 soft_limit: u64,
1409 },
1410 HardBreach {
1412 dimension: String,
1413 current_usage: u64,
1414 hard_limit: u64,
1415 },
1416 AlreadyApplied,
1418}
1419
1420#[derive(Clone, Debug, Serialize, Deserialize)]
1423pub struct ResetBudgetArgs {
1424 pub budget_id: crate::types::BudgetId,
1425 pub now: TimestampMs,
1426}
1427
1428#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1429pub enum ResetBudgetResult {
1430 Reset { next_reset_at: TimestampMs },
1432}
1433
1434#[derive(Clone, Debug, Serialize, Deserialize)]
1437pub struct CheckAdmissionArgs {
1438 pub execution_id: ExecutionId,
1439 pub now: TimestampMs,
1440 pub window_seconds: u64,
1441 pub rate_limit: u64,
1442 pub concurrency_cap: u64,
1443 #[serde(default)]
1444 pub jitter_ms: Option<u64>,
1445}
1446
1447#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1448pub enum CheckAdmissionResult {
1449 Admitted,
1451 AlreadyAdmitted,
1453 RateExceeded { retry_after_ms: u64 },
1455 ConcurrencyExceeded,
1457}
1458
1459#[derive(Clone, Debug, Serialize, Deserialize)]
1462pub struct ReleaseAdmissionArgs {
1463 pub execution_id: ExecutionId,
1464}
1465
1466#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1467pub enum ReleaseAdmissionResult {
1468 Released,
1469}
1470
1471#[derive(Clone, Debug, Serialize, Deserialize)]
1474pub struct BlockExecutionArgs {
1475 pub execution_id: ExecutionId,
1476 pub blocking_reason: String,
1477 #[serde(default)]
1478 pub blocking_detail: Option<String>,
1479 pub now: TimestampMs,
1480}
1481
1482#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1483pub enum BlockExecutionResult {
1484 Blocked,
1486}
1487
1488#[derive(Clone, Debug, Serialize, Deserialize)]
1491pub struct UnblockExecutionArgs {
1492 pub execution_id: ExecutionId,
1493 pub now: TimestampMs,
1494 #[serde(default)]
1496 pub expected_blocking_reason: Option<String>,
1497}
1498
1499#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1500pub enum UnblockExecutionResult {
1501 Unblocked,
1503}
1504
1505#[derive(Clone, Debug, Serialize, Deserialize)]
1512pub struct CreateFlowArgs {
1513 pub flow_id: crate::types::FlowId,
1514 pub flow_kind: String,
1515 pub namespace: Namespace,
1516 pub now: TimestampMs,
1517}
1518
1519#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1520pub enum CreateFlowResult {
1521 Created { flow_id: crate::types::FlowId },
1523 AlreadySatisfied { flow_id: crate::types::FlowId },
1525}
1526
1527#[derive(Clone, Debug, Serialize, Deserialize)]
1530pub struct AddExecutionToFlowArgs {
1531 pub flow_id: crate::types::FlowId,
1532 pub execution_id: ExecutionId,
1533 pub now: TimestampMs,
1534}
1535
1536#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1537pub enum AddExecutionToFlowResult {
1538 Added {
1540 execution_id: ExecutionId,
1541 new_node_count: u32,
1542 },
1543 AlreadyMember {
1545 execution_id: ExecutionId,
1546 node_count: u32,
1547 },
1548}
1549
1550#[derive(Clone, Debug, Serialize, Deserialize)]
1553pub struct CancelFlowArgs {
1554 pub flow_id: crate::types::FlowId,
1555 pub reason: String,
1556 pub cancellation_policy: String,
1557 pub now: TimestampMs,
1558}
1559
1560#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1561pub enum CancelFlowResult {
1562 Cancelled {
1574 cancellation_policy: String,
1575 member_execution_ids: Vec<String>,
1576 },
1577 CancellationScheduled {
1582 cancellation_policy: String,
1583 member_count: u32,
1584 member_execution_ids: Vec<String>,
1585 },
1586 PartiallyCancelled {
1600 cancellation_policy: String,
1601 member_execution_ids: Vec<String>,
1604 failed_member_execution_ids: Vec<String>,
1608 },
1609}
1610
1611#[derive(Clone, Debug, Serialize, Deserialize)]
1614pub struct StageDependencyEdgeArgs {
1615 pub flow_id: crate::types::FlowId,
1616 pub edge_id: crate::types::EdgeId,
1617 pub upstream_execution_id: ExecutionId,
1618 pub downstream_execution_id: ExecutionId,
1619 #[serde(default = "default_dependency_kind")]
1620 pub dependency_kind: String,
1621 #[serde(default)]
1622 pub data_passing_ref: Option<String>,
1623 pub expected_graph_revision: u64,
1624 pub now: TimestampMs,
1625}
1626
/// Serde default for the `dependency_kind` fields: `"success_only"`.
fn default_dependency_kind() -> String {
    "success_only".to_owned()
}
1630
1631#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1632pub enum StageDependencyEdgeResult {
1633 Staged {
1635 edge_id: crate::types::EdgeId,
1636 new_graph_revision: u64,
1637 },
1638}
1639
1640#[derive(Clone, Debug, Serialize, Deserialize)]
1643pub struct ApplyDependencyToChildArgs {
1644 pub flow_id: crate::types::FlowId,
1645 pub edge_id: crate::types::EdgeId,
1646 pub downstream_execution_id: ExecutionId,
1648 pub upstream_execution_id: ExecutionId,
1649 pub graph_revision: u64,
1650 #[serde(default = "default_dependency_kind")]
1651 pub dependency_kind: String,
1652 #[serde(default)]
1653 pub data_passing_ref: Option<String>,
1654 pub now: TimestampMs,
1655}
1656
1657#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1658pub enum ApplyDependencyToChildResult {
1659 Applied { unsatisfied_count: u32 },
1661 AlreadyApplied,
1663}
1664
1665#[derive(Clone, Debug, Serialize, Deserialize)]
1668pub struct ResolveDependencyArgs {
1669 pub edge_id: crate::types::EdgeId,
1670 pub upstream_outcome: String,
1672 pub now: TimestampMs,
1673}
1674
1675#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1676pub enum ResolveDependencyResult {
1677 Satisfied,
1679 Impossible,
1681 AlreadyResolved,
1683}
1684
/// Arguments for promoting a blocked execution to eligible.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromoteBlockedToEligibleArgs {
    /// Execution to promote.
    pub execution_id: ExecutionId,
    /// Caller-supplied timestamp for the operation.
    pub now: TimestampMs,
}
1692
/// Result of a promote-blocked-to-eligible request.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromoteBlockedToEligibleResult {
    /// The execution was promoted; this is the only success shape.
    Promoted,
}
1697
/// Arguments for evaluating an execution's flow eligibility.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EvaluateFlowEligibilityArgs {
    /// Execution whose eligibility is evaluated.
    pub execution_id: ExecutionId,
}
1704
/// Result of a flow-eligibility evaluation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EvaluateFlowEligibilityResult {
    /// Evaluation completed; `status` is a free-form string.
    /// NOTE(review): the set of possible status values is not visible here.
    Status { status: String },
}
1710
/// Arguments for replaying an execution.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReplayExecutionArgs {
    /// Execution to replay.
    pub execution_id: ExecutionId,
    /// Caller-supplied timestamp for the operation.
    pub now: TimestampMs,
}
1718
/// Result of a replay-execution request.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplayExecutionResult {
    /// The execution was replayed; `public_state` is the state reported with
    /// the result.
    Replayed { public_state: PublicState },
}
1724
/// Serializable description of a single execution.
///
/// Identifier-like and timestamp fields are carried as plain `String`s here.
/// NOTE(review): the timestamp string format is not visible in this file
/// section.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionInfo {
    /// Identifier of the execution.
    pub execution_id: ExecutionId,
    /// Namespace, as a string.
    pub namespace: String,
    /// Lane identifier, as a string.
    pub lane_id: String,
    /// Priority of the execution.
    pub priority: i32,
    /// Kind of execution.
    pub execution_kind: String,
    /// State vector of the execution.
    pub state_vector: StateVector,
    /// Public state of the execution.
    pub public_state: PublicState,
    /// Creation time, as a string.
    pub created_at: String,
    /// Start time, as a string. Omitted from serialized output when `None`
    /// and read back as `None` when missing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at: Option<String>,
    /// Completion time, as a string. Omitted from serialized output when
    /// `None` and read back as `None` when missing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<String>,
    /// Index of the current attempt.
    pub current_attempt_index: u32,
    /// Flow the execution belongs to, if any.
    pub flow_id: Option<String>,
    /// Free-form blocking detail.
    pub blocking_detail: String,
}
1753
/// Arguments for setting tags on an execution.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetExecutionTagsArgs {
    /// Execution whose tags are set.
    pub execution_id: ExecutionId,
    /// Tag key/value pairs; a `BTreeMap`, so iteration is ordered by key.
    pub tags: BTreeMap<String, String>,
}
1766
/// Result of a set-execution-tags request.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetExecutionTagsResult {
    /// The request succeeded. `count` is presumably the number of tags
    /// written — confirm against the producer.
    Ok { count: u32 },
}
1773
/// Arguments for setting tags on a flow.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetFlowTagsArgs {
    /// Flow whose tags are set.
    pub flow_id: FlowId,
    /// Tag key/value pairs; a `BTreeMap`, so iteration is ordered by key.
    pub tags: BTreeMap<String, String>,
}
1783
/// Result of a set-flow-tags request.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetFlowTagsResult {
    /// The request succeeded. `count` is presumably the number of tags
    /// written — confirm against the producer.
    Ok { count: u32 },
}
1790
/// Typed, read-only view of an execution.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal and must go through [`ExecutionSnapshot::new`].
/// It does not derive `Serialize`/`Deserialize`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ExecutionSnapshot {
    /// Identifier of the execution.
    pub execution_id: ExecutionId,
    /// Flow the execution belongs to, if any.
    pub flow_id: Option<FlowId>,
    /// Lane of the execution.
    pub lane_id: LaneId,
    /// Namespace of the execution.
    pub namespace: Namespace,
    /// Public state of the execution.
    pub public_state: PublicState,
    /// Blocking reason, if one is recorded.
    pub blocking_reason: Option<String>,
    /// Blocking detail, if one is recorded.
    pub blocking_detail: Option<String>,
    /// Summary of the current attempt, if any.
    pub current_attempt: Option<AttemptSummary>,
    /// Summary of the current lease, if any.
    pub current_lease: Option<LeaseSummary>,
    /// Current waitpoint, if any.
    pub current_waitpoint: Option<WaitpointId>,
    /// Creation timestamp.
    pub created_at: TimestampMs,
    /// Timestamp of the last mutation.
    pub last_mutation_at: TimestampMs,
    /// Total number of attempts.
    pub total_attempt_count: u32,
    /// Tags on the execution; a `BTreeMap`, so iteration is ordered by key.
    pub tags: BTreeMap<String, String>,
}
1836
1837impl ExecutionSnapshot {
1838 #[allow(clippy::too_many_arguments)]
1843 pub fn new(
1844 execution_id: ExecutionId,
1845 flow_id: Option<FlowId>,
1846 lane_id: LaneId,
1847 namespace: Namespace,
1848 public_state: PublicState,
1849 blocking_reason: Option<String>,
1850 blocking_detail: Option<String>,
1851 current_attempt: Option<AttemptSummary>,
1852 current_lease: Option<LeaseSummary>,
1853 current_waitpoint: Option<WaitpointId>,
1854 created_at: TimestampMs,
1855 last_mutation_at: TimestampMs,
1856 total_attempt_count: u32,
1857 tags: BTreeMap<String, String>,
1858 ) -> Self {
1859 Self {
1860 execution_id,
1861 flow_id,
1862 lane_id,
1863 namespace,
1864 public_state,
1865 blocking_reason,
1866 blocking_detail,
1867 current_attempt,
1868 current_lease,
1869 current_waitpoint,
1870 created_at,
1871 last_mutation_at,
1872 total_attempt_count,
1873 tags,
1874 }
1875 }
1876}
1877
/// Summary of an attempt: its identifier and its index.
///
/// `#[non_exhaustive]`: code outside this crate constructs it through
/// [`AttemptSummary::new`] rather than a struct literal.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AttemptSummary {
    /// Identifier of the attempt.
    pub attempt_id: AttemptId,
    /// Index of the attempt.
    pub attempt_index: AttemptIndex,
}
1887
1888impl AttemptSummary {
1889 pub fn new(attempt_id: AttemptId, attempt_index: AttemptIndex) -> Self {
1893 Self {
1894 attempt_id,
1895 attempt_index,
1896 }
1897 }
1898}
1899
/// Summary of a lease: its epoch, the holding worker instance and its expiry.
///
/// `#[non_exhaustive]`: code outside this crate constructs it through
/// [`LeaseSummary::new`] rather than a struct literal.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct LeaseSummary {
    /// Epoch of the lease.
    pub lease_epoch: LeaseEpoch,
    /// Worker instance associated with the lease.
    pub worker_instance_id: WorkerInstanceId,
    /// Timestamp at which the lease expires.
    pub expires_at: TimestampMs,
}
1910
1911impl LeaseSummary {
1912 pub fn new(
1915 lease_epoch: LeaseEpoch,
1916 worker_instance_id: WorkerInstanceId,
1917 expires_at: TimestampMs,
1918 ) -> Self {
1919 Self {
1920 lease_epoch,
1921 worker_instance_id,
1922 expires_at,
1923 }
1924 }
1925}
1926
/// Typed, read-only view of a flow.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal and must go through [`FlowSnapshot::new`].
/// It does not derive `Serialize`/`Deserialize`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct FlowSnapshot {
    /// Identifier of the flow.
    pub flow_id: FlowId,
    /// Kind of flow, as a free-form string.
    pub flow_kind: String,
    /// Namespace of the flow.
    pub namespace: Namespace,
    /// Public flow state, as a string.
    /// NOTE(review): the set of possible values is not visible here.
    pub public_flow_state: String,
    /// Revision of the flow's graph.
    pub graph_revision: u64,
    /// Number of nodes in the flow.
    pub node_count: u32,
    /// Number of edges in the flow.
    pub edge_count: u32,
    /// Creation timestamp.
    pub created_at: TimestampMs,
    /// Timestamp of the last mutation.
    pub last_mutation_at: TimestampMs,
    /// Cancellation timestamp, if recorded — presumably set only for
    /// cancelled flows; confirm against the producer.
    pub cancelled_at: Option<TimestampMs>,
    /// Cancellation reason, if recorded.
    pub cancel_reason: Option<String>,
    /// Cancellation policy, if recorded.
    pub cancellation_policy: Option<String>,
    /// Tags on the flow; a `BTreeMap`, so iteration is ordered by key.
    pub tags: BTreeMap<String, String>,
}
2002
2003impl FlowSnapshot {
2004 #[allow(clippy::too_many_arguments)]
2008 pub fn new(
2009 flow_id: FlowId,
2010 flow_kind: String,
2011 namespace: Namespace,
2012 public_flow_state: String,
2013 graph_revision: u64,
2014 node_count: u32,
2015 edge_count: u32,
2016 created_at: TimestampMs,
2017 last_mutation_at: TimestampMs,
2018 cancelled_at: Option<TimestampMs>,
2019 cancel_reason: Option<String>,
2020 cancellation_policy: Option<String>,
2021 tags: BTreeMap<String, String>,
2022 ) -> Self {
2023 Self {
2024 flow_id,
2025 flow_kind,
2026 namespace,
2027 public_flow_state,
2028 graph_revision,
2029 node_count,
2030 edge_count,
2031 created_at,
2032 last_mutation_at,
2033 cancelled_at,
2034 cancel_reason,
2035 cancellation_policy,
2036 tags,
2037 }
2038 }
2039}
2040
/// Typed, read-only view of a dependency edge.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal and must go through [`EdgeSnapshot::new`].
/// It does not derive `Serialize`/`Deserialize`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct EdgeSnapshot {
    /// Identifier of the edge.
    pub edge_id: EdgeId,
    /// Flow the edge belongs to.
    pub flow_id: FlowId,
    /// Execution on the upstream end of the edge.
    pub upstream_execution_id: ExecutionId,
    /// Execution on the downstream end of the edge.
    pub downstream_execution_id: ExecutionId,
    /// Kind of dependency, as a string.
    pub dependency_kind: String,
    /// Satisfaction condition, as a string.
    /// NOTE(review): the set of possible values is not visible here.
    pub satisfaction_condition: String,
    /// Optional data-passing reference.
    pub data_passing_ref: Option<String>,
    /// State of the edge, as a string.
    /// NOTE(review): the set of possible values is not visible here.
    pub edge_state: String,
    /// Creation timestamp.
    pub created_at: TimestampMs,
    /// Creator of the edge, as a string.
    pub created_by: String,
}
2089
2090impl EdgeSnapshot {
2091 #[allow(clippy::too_many_arguments)]
2095 pub fn new(
2096 edge_id: EdgeId,
2097 flow_id: FlowId,
2098 upstream_execution_id: ExecutionId,
2099 downstream_execution_id: ExecutionId,
2100 dependency_kind: String,
2101 satisfaction_condition: String,
2102 data_passing_ref: Option<String>,
2103 edge_state: String,
2104 created_at: TimestampMs,
2105 created_by: String,
2106 ) -> Self {
2107 Self {
2108 edge_id,
2109 flow_id,
2110 upstream_execution_id,
2111 downstream_execution_id,
2112 dependency_kind,
2113 satisfaction_condition,
2114 data_passing_ref,
2115 edge_state,
2116 created_at,
2117 created_by,
2118 }
2119 }
2120}
2121
/// Serializable pairing of an execution's state vector with its current
/// attempt index.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateSummary {
    /// State vector of the execution.
    pub state_vector: StateVector,
    /// Index of the current attempt.
    pub current_attempt_index: AttemptIndex,
}
2128
// Unit tests: serde round-trips for contract types, plus the parsing,
// formatting and wire-mapping behaviour of `StreamCursor`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::FlowId;

    // `CreateExecutionArgs` survives a JSON round trip; only `execution_id`
    // is compared because the struct does not derive `PartialEq`.
    #[test]
    fn create_execution_args_serde() {
        let config = crate::partition::PartitionConfig::default();
        let args = CreateExecutionArgs {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
            namespace: Namespace::new("test"),
            lane_id: LaneId::new("default"),
            execution_kind: "llm_call".to_owned(),
            input_payload: b"hello".to_vec(),
            payload_encoding: Some("json".to_owned()),
            priority: 0,
            creator_identity: "test-user".to_owned(),
            idempotency_key: None,
            tags: HashMap::new(),
            policy: None,
            delay_until: None,
            execution_deadline_at: None,
            partition_id: 42,
            now: TimestampMs::now(),
        };
        let json = serde_json::to_string(&args).unwrap();
        let parsed: CreateExecutionArgs = serde_json::from_str(&json).unwrap();
        assert_eq!(args.execution_id, parsed.execution_id);
    }

    // `ClaimExecutionResult::Claimed` round-trips through JSON to an equal
    // value.
    #[test]
    fn claim_result_serde() {
        let config = crate::partition::PartitionConfig::default();
        let result = ClaimExecutionResult::Claimed(ClaimedExecution {
            execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(1),
            attempt_index: AttemptIndex::new(0),
            attempt_id: AttemptId::new(),
            attempt_type: AttemptType::Initial,
            lease_expires_at: TimestampMs::from_millis(1000),
        });
        let json = serde_json::to_string(&result).unwrap();
        let parsed: ClaimExecutionResult = serde_json::from_str(&json).unwrap();
        assert_eq!(result, parsed);
    }

    // `Display` renders `Start`/`End` as "start"/"end" and `At` as its inner
    // id string.
    #[test]
    fn stream_cursor_display_matches_wire_tokens() {
        assert_eq!(StreamCursor::Start.to_string(), "start");
        assert_eq!(StreamCursor::End.to_string(), "end");
        assert_eq!(StreamCursor::At("123".into()).to_string(), "123");
        assert_eq!(StreamCursor::At("123-4".into()).to_string(), "123-4");
    }

    // `to_wire` maps `Start` to "-" and `End` to "+", and passes `At` ids
    // through unchanged.
    #[test]
    fn stream_cursor_to_wire_maps_to_valkey_markers() {
        assert_eq!(StreamCursor::Start.to_wire(), "-");
        assert_eq!(StreamCursor::End.to_wire(), "+");
        assert_eq!(StreamCursor::At("0-0".into()).to_wire(), "0-0");
        assert_eq!(StreamCursor::At("17-3".into()).to_wire(), "17-3");
    }

    // `FromStr` accepts "start", "end", a bare number, and `ms-seq` ids.
    #[test]
    fn stream_cursor_from_str_accepts_wire_tokens() {
        use std::str::FromStr;
        assert_eq!(StreamCursor::from_str("start").unwrap(), StreamCursor::Start);
        assert_eq!(StreamCursor::from_str("end").unwrap(), StreamCursor::End);
        assert_eq!(
            StreamCursor::from_str("123").unwrap(),
            StreamCursor::At("123".into())
        );
        assert_eq!(
            StreamCursor::from_str("0-0").unwrap(),
            StreamCursor::At("0-0".into())
        );
        assert_eq!(
            StreamCursor::from_str("1713100800150-0").unwrap(),
            StreamCursor::At("1713100800150-0".into())
        );
    }

    // The raw "-" and "+" markers are rejected with `BareMarkerRejected`
    // carrying the offending input.
    #[test]
    fn stream_cursor_from_str_rejects_bare_markers() {
        use std::str::FromStr;
        assert!(matches!(
            StreamCursor::from_str("-"),
            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "-"
        ));
        assert!(matches!(
            StreamCursor::from_str("+"),
            Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "+"
        ));
    }

    // The empty string is rejected with `Empty`.
    #[test]
    fn stream_cursor_from_str_rejects_empty() {
        use std::str::FromStr;
        assert_eq!(
            StreamCursor::from_str(""),
            Err(StreamCursorParseError::Empty)
        );
    }

    // Non-numeric text, dangling or extra dashes, other separators, and
    // differently-cased keywords are all rejected as `Malformed`.
    #[test]
    fn stream_cursor_from_str_rejects_malformed() {
        use std::str::FromStr;
        for bad in ["abc", "-1", "1-", "-1-2", "1-2-3", "1.2", "1 2", "Start", "END"] {
            assert!(
                matches!(
                    StreamCursor::from_str(bad),
                    Err(StreamCursorParseError::Malformed(_))
                ),
                "must reject {bad:?}",
            );
        }
    }

    // An en dash (U+2013) in place of the ASCII hyphen is rejected as
    // `Malformed`.
    #[test]
    fn stream_cursor_from_str_rejects_non_ascii() {
        use std::str::FromStr;
        assert!(matches!(
            StreamCursor::from_str("1\u{2013}2"),
            Err(StreamCursorParseError::Malformed(_))
        ));
    }

    // Every variant round-trips through JSON to an equal value.
    #[test]
    fn stream_cursor_serde_round_trip() {
        for c in [
            StreamCursor::Start,
            StreamCursor::End,
            StreamCursor::At("0-0".into()),
            StreamCursor::At("1713100800150-0".into()),
        ] {
            let json = serde_json::to_string(&c).unwrap();
            let back: StreamCursor = serde_json::from_str(&json).unwrap();
            assert_eq!(back, c);
        }
    }

    // Serialization yields a plain JSON string, not a tagged enum object.
    #[test]
    fn stream_cursor_serializes_as_bare_string() {
        assert_eq!(serde_json::to_string(&StreamCursor::Start).unwrap(), r#""start""#);
        assert_eq!(serde_json::to_string(&StreamCursor::End).unwrap(), r#""end""#);
        assert_eq!(
            serde_json::to_string(&StreamCursor::At("123-0".into())).unwrap(),
            r#""123-0""#
        );
    }

    // Deserialization also refuses the raw "-" and "+" markers.
    #[test]
    fn stream_cursor_deserialize_rejects_bare_markers() {
        assert!(serde_json::from_str::<StreamCursor>(r#""-""#).is_err());
        assert!(serde_json::from_str::<StreamCursor>(r#""+""#).is_err());
    }

    // `from_beginning()` is the concrete cursor `At("0-0")`.
    #[test]
    fn stream_cursor_from_beginning_is_zero_zero() {
        assert_eq!(
            StreamCursor::from_beginning(),
            StreamCursor::At("0-0".into())
        );
    }

    // `is_concrete` is true only for `At` cursors.
    #[test]
    fn stream_cursor_is_concrete_classifies_variants() {
        assert!(!StreamCursor::Start.is_concrete());
        assert!(!StreamCursor::End.is_concrete());
        assert!(StreamCursor::At("0-0".into()).is_concrete());
        assert!(StreamCursor::At("123-0".into()).is_concrete());
        assert!(StreamCursor::from_beginning().is_concrete());
    }

    // `into_wire_string` yields the same wire forms as `to_wire` ("-", "+",
    // or the inner id) when called on an owned cursor.
    #[test]
    fn stream_cursor_into_wire_string_moves_without_cloning() {
        assert_eq!(StreamCursor::Start.into_wire_string(), "-");
        assert_eq!(StreamCursor::End.into_wire_string(), "+");
        assert_eq!(
            StreamCursor::At("17-3".into()).into_wire_string(),
            "17-3"
        );
    }
}
2315
/// Compact, serializable summary of one execution, used as the element type
/// of `ListExecutionsResult::executions`.
///
/// Apart from `execution_id`, the fields are plain strings or integers.
/// NOTE(review): the string formats (state names, timestamp encoding) are
/// not visible in this file section.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionSummary {
    /// Identifier of the execution.
    pub execution_id: ExecutionId,
    /// Namespace, as a string.
    pub namespace: String,
    /// Lane identifier, as a string.
    pub lane_id: String,
    /// Kind of execution.
    pub execution_kind: String,
    /// Public state, as a string.
    pub public_state: String,
    /// Priority of the execution.
    pub priority: i32,
    /// Creation time, as a string.
    pub created_at: String,
}
2329
/// Result of a list-executions request.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ListExecutionsResult {
    /// The returned execution summaries.
    pub executions: Vec<ExecutionSummary>,
    /// Number of executions returned — presumably equal to
    /// `executions.len()`; confirm against the producer.
    pub total_returned: usize,
}
2336
/// Arguments for rotating the waitpoint HMAC secret.
///
/// `Debug` is implemented by hand instead of derived so that
/// `new_secret_hex` — key material — is never written to logs, panic
/// messages or traces. All other fields are printed as usual.
#[derive(Clone)]
pub struct RotateWaitpointHmacSecretArgs {
    /// Key id (`kid`) of the new secret.
    pub new_kid: String,
    /// The new secret, hex-encoded. Redacted in `Debug` output.
    pub new_secret_hex: String,
    /// Grace period in milliseconds — presumably how long the previous
    /// secret remains accepted for verification; confirm against the
    /// consumer.
    pub grace_ms: u64,
}

impl std::fmt::Debug for RotateWaitpointHmacSecretArgs {
    /// Formats the arguments with the same shape as a derived `Debug`, but
    /// with the secret replaced by the literal `<redacted>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretArgs")
            .field("new_kid", &self.new_kid)
            // `format_args!` prints the placeholder without string quotes,
            // making it obvious that this is not the real value.
            .field("new_secret_hex", &format_args!("<redacted>"))
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}
2356
/// Outcome of a waitpoint HMAC secret rotation.
///
/// NOTE(review): the variant and field meanings below are read from their
/// names; confirm against the producer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RotateWaitpointHmacSecretOutcome {
    /// A rotation took place.
    Rotated {
        /// Key id that was current before the rotation, if there was one.
        previous_kid: Option<String>,
        /// Key id that is current after the rotation.
        new_kid: String,
        /// Count reported by the rotation — presumably the number of
        /// expired key ids garbage-collected.
        gc_count: u32,
    },
    /// Nothing changed; `kid` is the key id the request referred to.
    Noop { kid: String },
}
2372
/// Arguments for listing waitpoint HMAC key ids.
///
/// The struct currently has no fields; it is declared with braces rather
/// than as a unit struct.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ListWaitpointHmacKidsArgs {}
2377
/// Listing of waitpoint HMAC key ids.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WaitpointHmacKids {
    /// The current key id, if one is set.
    pub current_kid: Option<String>,
    /// Key ids listed for verification, each with its expiry — presumably
    /// previous keys still inside their grace window; confirm against the
    /// producer.
    pub verifying: Vec<VerifyingKid>,
}
2388
/// A key id paired with its expiry, as listed in
/// `WaitpointHmacKids::verifying`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct VerifyingKid {
    /// The key id.
    pub kid: String,
    /// Expiry in milliseconds, stored as a raw `i64` — presumably Unix epoch
    /// milliseconds; confirm against the producer.
    pub expires_at_ms: i64,
}