vox_types/
retry_support.rs1use bytes::Bytes;
2
3use crate::{Metadata, MetadataEntry, MetadataFlags, MetadataValue};
4
/// Metadata key under which retry support is advertised; the entry written by
/// `append_retry_support_metadata` holds a `U64` equal to
/// [`RETRY_SUPPORT_VERSION`].
pub const RETRY_SUPPORT_METADATA_KEY: &str = "vox-retry-support";
/// Metadata key whose `U64` value carries the raw value of an `OperationId`.
pub const OPERATION_ID_METADATA_KEY: &str = "vox-operation-id";
/// Metadata key whose `U64` value carries a `ChannelRetryMode` discriminant.
pub const CHANNEL_RETRY_MODE_METADATA_KEY: &str = "vox-channel-retry-mode";
/// Version value stored under [`RETRY_SUPPORT_METADATA_KEY`];
/// `metadata_supports_retry` only accepts an entry whose value equals it.
pub const RETRY_SUPPORT_VERSION: u64 = 1;
9
/// Retry mode stored in metadata under `CHANNEL_RETRY_MODE_METADATA_KEY`.
///
/// The explicit discriminants are the encoded values: `ensure_channel_retry_mode`
/// writes `mode as u64`, and `metadata_channel_retry_mode` maps `1` and `2` back
/// to the corresponding variants.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum ChannelRetryMode {
    /// Encoded as `0`. Also the result of `metadata_channel_retry_mode` when the
    /// entry is absent or holds an unrecognized value; `ensure_channel_retry_mode`
    /// never writes an entry for this variant.
    None = 0,
    /// Encoded as `1`.
    // NOTE(review): presumably "non-idempotent" — confirm against the protocol docs.
    NonIdem = 1,
    /// Encoded as `2`.
    // NOTE(review): presumably "idempotent" — confirm against the protocol docs.
    Idem = 2,
}
16
/// Newtype around the raw `u64` identifier of an operation.
///
/// The raw value is what gets stored in metadata as a `U64` entry under the
/// operation-id key.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct OperationId(pub u64);

/// Formats the identifier exactly like its inner `u64`.
impl std::fmt::Display for OperationId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate to the integer's own `Display` rather than `write!(f, "{}", ..)`:
        // `write!` starts a fresh format spec and silently drops the caller's
        // width / fill / alignment / sign flags (e.g. `{:>8}` in log columns).
        std::fmt::Display::fmt(&self.0, f)
    }
}
30
31#[derive(Clone, Debug)]
38pub struct PostcardPayload(pub Bytes);
39
40impl PostcardPayload {
41 pub fn as_bytes(&self) -> &[u8] {
42 &self.0
43 }
44}
45
46impl From<Vec<u8>> for PostcardPayload {
47 fn from(v: Vec<u8>) -> Self {
48 Self(Bytes::from(v))
49 }
50}
51
52impl From<Bytes> for PostcardPayload {
53 fn from(b: Bytes) -> Self {
54 Self(b)
55 }
56}
57
58pub fn append_retry_support_metadata(metadata: &mut Metadata<'_>) {
59 if metadata_supports_retry(metadata) {
60 return;
61 }
62 metadata.push(MetadataEntry {
63 key: RETRY_SUPPORT_METADATA_KEY.into(),
64 value: MetadataValue::U64(RETRY_SUPPORT_VERSION),
65 flags: MetadataFlags::NONE,
66 });
67}
68
69pub fn metadata_supports_retry(metadata: &[MetadataEntry<'_>]) -> bool {
70 metadata.iter().any(|entry| {
71 entry.key == RETRY_SUPPORT_METADATA_KEY
72 && matches!(&entry.value, MetadataValue::U64(v) if *v == RETRY_SUPPORT_VERSION)
73 })
74}
75
76pub fn metadata_operation_id(metadata: &[MetadataEntry<'_>]) -> Option<OperationId> {
77 metadata.iter().find_map(|entry| {
78 if entry.key != OPERATION_ID_METADATA_KEY {
79 return None;
80 }
81 match &entry.value {
82 MetadataValue::U64(value) => Some(OperationId(*value)),
83 _ => None,
84 }
85 })
86}
87
88pub fn ensure_operation_id(metadata: &mut Metadata<'_>, operation_id: OperationId) {
89 if metadata_operation_id(metadata).is_some() {
90 return;
91 }
92 metadata.push(MetadataEntry {
93 key: OPERATION_ID_METADATA_KEY.into(),
94 value: MetadataValue::U64(operation_id.0),
95 flags: MetadataFlags::NONE,
96 });
97}
98
99pub fn metadata_channel_retry_mode(metadata: &[MetadataEntry<'_>]) -> ChannelRetryMode {
100 metadata
101 .iter()
102 .find_map(|entry| {
103 if entry.key != CHANNEL_RETRY_MODE_METADATA_KEY {
104 return None;
105 }
106 match &entry.value {
107 MetadataValue::U64(1) => Some(ChannelRetryMode::NonIdem),
108 MetadataValue::U64(2) => Some(ChannelRetryMode::Idem),
109 _ => Some(ChannelRetryMode::None),
110 }
111 })
112 .unwrap_or(ChannelRetryMode::None)
113}
114
115pub fn ensure_channel_retry_mode(metadata: &mut Metadata<'_>, mode: ChannelRetryMode) {
116 if matches!(mode, ChannelRetryMode::None) {
117 return;
118 }
119 if metadata
120 .iter()
121 .any(|entry| entry.key == CHANNEL_RETRY_MODE_METADATA_KEY)
122 {
123 return;
124 }
125 metadata.push(MetadataEntry {
126 key: CHANNEL_RETRY_MODE_METADATA_KEY.into(),
127 value: MetadataValue::U64(mode as u64),
128 flags: MetadataFlags::NONE,
129 });
130}