Skip to main content

force_pubsub/
types.rs

1//! Domain types for Salesforce Pub/Sub API events and responses.
2
3/// Opaque replay cursor. Consumers store and pass back; never interpret the bytes.
4#[derive(Debug, Clone, PartialEq, Eq)]
5pub struct ReplayId(pub(crate) Vec<u8>);
6
7impl ReplayId {
8    /// Construct from raw bytes (e.g., from a FetchResponse).
9    #[must_use]
10    #[allow(clippy::missing_const_for_fn)]
11    pub fn from_bytes(bytes: Vec<u8>) -> Self {
12        Self(bytes)
13    }
14
15    /// Return the raw bytes.
16    #[must_use]
17    pub fn as_bytes(&self) -> &[u8] {
18        &self.0
19    }
20
21    /// True if this replay ID has no bytes (represents "no replay").
22    #[must_use]
23    pub const fn is_empty(&self) -> bool {
24        self.0.is_empty()
25    }
26}
27
28/// A decoded event received from the subscribe stream.
29#[derive(Debug, Clone)]
30pub struct EventMessage<T> {
31    /// Decoded event payload.
32    pub payload: T,
33    /// Replay cursor for this event — store to resume from here.
34    pub replay_id: ReplayId,
35    /// Avro schema ID used to decode this event.
36    pub schema_id: String,
37    /// Unique event identifier.
38    pub event_id: String,
39}
40
41/// Items yielded by the subscribe stream.
42#[derive(Debug, Clone)]
43pub enum PubSubEvent<T> {
44    /// A decoded event.
45    Event(EventMessage<T>),
46    /// Stream was reconnected after a drop.
47    Reconnected {
48        /// The replay ID we resumed from.
49        replay_id: ReplayId,
50        /// Which reconnection attempt this was (1-indexed).
51        attempt: u32,
52    },
53    /// Salesforce heartbeat — no events this batch.
54    KeepAlive,
55}
56
57/// Per-event result from a publish operation.
58#[derive(Debug, Clone)]
59pub struct PublishResult {
60    /// Replay ID assigned by Salesforce (if successful).
61    pub replay_id: Option<ReplayId>,
62    /// Error message if this individual event failed.
63    pub error: Option<String>,
64}
65
66impl PublishResult {
67    /// Returns true if this event was published successfully.
68    #[must_use]
69    pub const fn is_success(&self) -> bool {
70        self.error.is_none()
71    }
72}
73
74/// Response from a publish operation.
75#[derive(Debug, Clone)]
76pub struct PublishResponse {
77    /// The topic name events were published to.
78    pub topic_name: String,
79    /// Per-event results (same order as the input events).
80    pub results: Vec<PublishResult>,
81}
82
83impl PublishResponse {
84    /// Returns true if all events were published successfully.
85    #[must_use]
86    pub fn all_succeeded(&self) -> bool {
87        self.results.iter().all(PublishResult::is_success)
88    }
89
90    /// Returns the number of failed events.
91    #[must_use]
92    pub fn failure_count(&self) -> usize {
93        self.results.iter().filter(|r| !r.is_success()).count()
94    }
95}
96
97#[cfg(test)]
98mod tests {
99    use super::*;
100
101    #[test]
102    fn test_replay_id_from_bytes_roundtrip() {
103        let bytes = vec![1u8, 2, 3, 4];
104        let id = ReplayId::from_bytes(bytes.clone());
105        assert_eq!(id.as_bytes(), bytes.as_slice());
106    }
107
108    #[test]
109    fn test_replay_id_empty() {
110        let id = ReplayId::from_bytes(vec![]);
111        assert!(id.is_empty());
112    }
113
114    #[test]
115    fn test_replay_id_not_empty() {
116        let id = ReplayId::from_bytes(vec![1, 2, 3]);
117        assert!(!id.is_empty());
118    }
119
120    #[test]
121    fn test_publish_result_success() {
122        let r = PublishResult {
123            replay_id: Some(ReplayId::from_bytes(vec![1])),
124            error: None,
125        };
126        assert!(r.is_success());
127    }
128
129    #[test]
130    fn test_publish_result_failure() {
131        let r = PublishResult {
132            replay_id: None,
133            error: Some("INVALID_TYPE".to_string()),
134        };
135        assert!(!r.is_success());
136    }
137
138    #[test]
139    fn test_publish_response_all_succeeded() {
140        let resp = PublishResponse {
141            topic_name: "/event/MyEvent__e".to_string(),
142            results: vec![
143                PublishResult {
144                    replay_id: Some(ReplayId::from_bytes(vec![1])),
145                    error: None,
146                },
147                PublishResult {
148                    replay_id: Some(ReplayId::from_bytes(vec![2])),
149                    error: None,
150                },
151            ],
152        };
153        assert!(resp.all_succeeded());
154        assert_eq!(resp.failure_count(), 0);
155    }
156
157    #[test]
158    fn test_publish_response_partial_failure() {
159        let resp = PublishResponse {
160            topic_name: "/event/MyEvent__e".to_string(),
161            results: vec![
162                PublishResult {
163                    replay_id: Some(ReplayId::from_bytes(vec![1])),
164                    error: None,
165                },
166                PublishResult {
167                    replay_id: None,
168                    error: Some("ERR".to_string()),
169                },
170            ],
171        };
172        assert!(!resp.all_succeeded());
173        assert_eq!(resp.failure_count(), 1);
174    }
175}