/// Newtype around the raw bytes of a replay identifier.
///
/// The bytes are stored exactly as supplied; this type attaches no
/// interpretation to them and only offers byte-level access, equality and
/// hashing (so IDs can be used as map or set keys).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ReplayId(pub(crate) Vec<u8>);

impl ReplayId {
    /// Wraps an owned byte buffer as a replay ID without copying or
    /// validating it.
    #[must_use]
    // NOTE(review): the lint is silenced instead of making this `const`;
    // presumably to avoid promising constness in the public API — confirm.
    #[allow(clippy::missing_const_for_fn)]
    pub fn from_bytes(bytes: Vec<u8>) -> Self {
        Self(bytes)
    }

    /// Borrows the underlying bytes.
    #[must_use]
    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }

    /// Returns `true` when the ID holds no bytes.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}

/// Standard-trait counterpart of [`ReplayId::from_bytes`], enabling
/// `ReplayId::from(bytes)` and `bytes.into()`.
impl From<Vec<u8>> for ReplayId {
    fn from(bytes: Vec<u8>) -> Self {
        Self::from_bytes(bytes)
    }
}

/// Standard-trait counterpart of [`ReplayId::as_bytes`], so a `ReplayId` can
/// be passed to APIs that accept `impl AsRef<[u8]>`.
impl AsRef<[u8]> for ReplayId {
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}
27
28#[derive(Debug, Clone)]
30pub struct EventMessage<T> {
31 pub payload: T,
33 pub replay_id: ReplayId,
35 pub schema_id: String,
37 pub event_id: String,
39}
40
41#[derive(Debug, Clone)]
43pub enum PubSubEvent<T> {
44 Event(EventMessage<T>),
46 Reconnected {
48 replay_id: ReplayId,
50 attempt: u32,
52 },
53 KeepAlive,
55}
56
57#[derive(Debug, Clone)]
59pub struct PublishResult {
60 pub replay_id: Option<ReplayId>,
62 pub error: Option<String>,
64}
65
66impl PublishResult {
67 #[must_use]
69 pub const fn is_success(&self) -> bool {
70 self.error.is_none()
71 }
72}
73
74#[derive(Debug, Clone)]
76pub struct PublishResponse {
77 pub topic_name: String,
79 pub results: Vec<PublishResult>,
81}
82
83impl PublishResponse {
84 #[must_use]
86 pub fn all_succeeded(&self) -> bool {
87 self.results.iter().all(PublishResult::is_success)
88 }
89
90 #[must_use]
92 pub fn failure_count(&self) -> usize {
93 self.results.iter().filter(|r| !r.is_success()).count()
94 }
95}
96
#[cfg(test)]
mod tests {
    use super::*;

    /// Topic name shared by the `PublishResponse` fixtures.
    const TOPIC: &str = "/event/MyEvent__e";

    /// Builds a result with a one-byte replay ID and no error.
    fn accepted(tag: u8) -> PublishResult {
        PublishResult {
            replay_id: Some(ReplayId::from_bytes(vec![tag])),
            error: None,
        }
    }

    /// Builds a result with no replay ID and the given error text.
    fn rejected(reason: &str) -> PublishResult {
        PublishResult {
            replay_id: None,
            error: Some(reason.to_string()),
        }
    }

    /// Wraps the given results in a response for [`TOPIC`].
    fn response_with(results: Vec<PublishResult>) -> PublishResponse {
        PublishResponse {
            topic_name: TOPIC.to_string(),
            results,
        }
    }

    #[test]
    fn test_replay_id_from_bytes_roundtrip() {
        let expected = [1u8, 2, 3, 4];
        let id = ReplayId::from_bytes(expected.to_vec());
        assert_eq!(id.as_bytes(), expected.as_slice());
    }

    #[test]
    fn test_replay_id_empty() {
        assert!(ReplayId::from_bytes(vec![]).is_empty());
    }

    #[test]
    fn test_replay_id_not_empty() {
        assert!(!ReplayId::from_bytes(vec![1, 2, 3]).is_empty());
    }

    #[test]
    fn test_publish_result_success() {
        assert!(accepted(1).is_success());
    }

    #[test]
    fn test_publish_result_failure() {
        assert!(!rejected("INVALID_TYPE").is_success());
    }

    #[test]
    fn test_publish_response_all_succeeded() {
        let resp = response_with(vec![accepted(1), accepted(2)]);
        assert!(resp.all_succeeded());
        assert_eq!(resp.failure_count(), 0);
    }

    #[test]
    fn test_publish_response_partial_failure() {
        let resp = response_with(vec![accepted(1), rejected("ERR")]);
        assert!(!resp.all_succeeded());
        assert_eq!(resp.failure_count(), 1);
    }
}