Skip to main content

vox_types/
retry_support.rs

1use bytes::Bytes;
2
3use crate::{Metadata, MetadataEntry, MetadataFlags, MetadataValue};
4
5pub const RETRY_SUPPORT_METADATA_KEY: &str = "vox-retry-support";
6pub const OPERATION_ID_METADATA_KEY: &str = "vox-operation-id";
7pub const CHANNEL_RETRY_MODE_METADATA_KEY: &str = "vox-channel-retry-mode";
8pub const RETRY_SUPPORT_VERSION: u64 = 1;
9
10#[derive(Clone, Copy, PartialEq, Eq, Debug)]
11pub enum ChannelRetryMode {
12    None = 0,
13    NonIdem = 1,
14    Idem = 2,
15}
16
17/// A unique operation identifier for exactly-once delivery.
18///
19/// Operation IDs are assigned by the client and carried in request metadata.
20/// They survive across disconnects — the operation store uses them to
21/// deduplicate and replay sealed responses after session resumption.
22#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
23pub struct OperationId(pub u64);
24
25impl std::fmt::Display for OperationId {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        write!(f, "{}", self.0)
28    }
29}
30
31/// Postcard-encoded bytes for a response payload (without schemas).
32///
33/// This is the format stored in the operation store. Schemas are stored
34/// separately, deduplicated by SchemaHash. Backed by `Bytes` so the buffer
35/// can be shared (cheap arc-clone) between the wire-send path and the
36/// retry store without copying.
37#[derive(Clone, Debug)]
38pub struct PostcardPayload(pub Bytes);
39
40impl PostcardPayload {
41    pub fn as_bytes(&self) -> &[u8] {
42        &self.0
43    }
44}
45
46impl From<Vec<u8>> for PostcardPayload {
47    fn from(v: Vec<u8>) -> Self {
48        Self(Bytes::from(v))
49    }
50}
51
52impl From<Bytes> for PostcardPayload {
53    fn from(b: Bytes) -> Self {
54        Self(b)
55    }
56}
57
58pub fn append_retry_support_metadata(metadata: &mut Metadata<'_>) {
59    if metadata_supports_retry(metadata) {
60        return;
61    }
62    metadata.push(MetadataEntry {
63        key: RETRY_SUPPORT_METADATA_KEY.into(),
64        value: MetadataValue::U64(RETRY_SUPPORT_VERSION),
65        flags: MetadataFlags::NONE,
66    });
67}
68
69pub fn metadata_supports_retry(metadata: &[MetadataEntry<'_>]) -> bool {
70    metadata.iter().any(|entry| {
71        entry.key == RETRY_SUPPORT_METADATA_KEY
72            && matches!(&entry.value, MetadataValue::U64(v) if *v == RETRY_SUPPORT_VERSION)
73    })
74}
75
76pub fn metadata_operation_id(metadata: &[MetadataEntry<'_>]) -> Option<OperationId> {
77    metadata.iter().find_map(|entry| {
78        if entry.key != OPERATION_ID_METADATA_KEY {
79            return None;
80        }
81        match &entry.value {
82            MetadataValue::U64(value) => Some(OperationId(*value)),
83            _ => None,
84        }
85    })
86}
87
88pub fn ensure_operation_id(metadata: &mut Metadata<'_>, operation_id: OperationId) {
89    if metadata_operation_id(metadata).is_some() {
90        return;
91    }
92    metadata.push(MetadataEntry {
93        key: OPERATION_ID_METADATA_KEY.into(),
94        value: MetadataValue::U64(operation_id.0),
95        flags: MetadataFlags::NONE,
96    });
97}
98
99pub fn metadata_channel_retry_mode(metadata: &[MetadataEntry<'_>]) -> ChannelRetryMode {
100    metadata
101        .iter()
102        .find_map(|entry| {
103            if entry.key != CHANNEL_RETRY_MODE_METADATA_KEY {
104                return None;
105            }
106            match &entry.value {
107                MetadataValue::U64(1) => Some(ChannelRetryMode::NonIdem),
108                MetadataValue::U64(2) => Some(ChannelRetryMode::Idem),
109                _ => Some(ChannelRetryMode::None),
110            }
111        })
112        .unwrap_or(ChannelRetryMode::None)
113}
114
115pub fn ensure_channel_retry_mode(metadata: &mut Metadata<'_>, mode: ChannelRetryMode) {
116    if matches!(mode, ChannelRetryMode::None) {
117        return;
118    }
119    if metadata
120        .iter()
121        .any(|entry| entry.key == CHANNEL_RETRY_MODE_METADATA_KEY)
122    {
123        return;
124    }
125    metadata.push(MetadataEntry {
126        key: CHANNEL_RETRY_MODE_METADATA_KEY.into(),
127        value: MetadataValue::U64(mode as u64),
128        flags: MetadataFlags::NONE,
129    });
130}