pgwire_replication/protocol/
replication.rs1use bytes::{Buf, Bytes};
2
3use crate::error::{PgWireError, Result};
4use crate::lsn::Lsn;
5
6#[derive(Debug, Clone, PartialEq, Eq)]
13pub enum ReplicationCopyData {
14 XLogData {
16 wal_start: Lsn,
18 wal_end: Lsn,
20 server_time_micros: i64,
22 data: Bytes,
24 },
25 KeepAlive {
27 wal_end: Lsn,
29 server_time_micros: i64,
31 reply_requested: bool,
33 },
34}
35
36impl ReplicationCopyData {
37 #[inline]
39 pub fn is_xlog_data(&self) -> bool {
40 matches!(self, ReplicationCopyData::XLogData { .. })
41 }
42
43 #[inline]
45 pub fn is_keepalive(&self) -> bool {
46 matches!(self, ReplicationCopyData::KeepAlive { .. })
47 }
48
49 #[inline]
51 pub fn requires_reply(&self) -> bool {
52 matches!(
53 self,
54 ReplicationCopyData::KeepAlive {
55 reply_requested: true,
56 ..
57 }
58 )
59 }
60}
61
62pub fn parse_copy_data(payload: Bytes) -> Result<ReplicationCopyData> {
67 if payload.is_empty() {
68 return Err(PgWireError::Protocol("empty CopyData payload".into()));
69 }
70
71 let mut b = payload;
72 let kind = b.get_u8();
73
74 match kind {
75 b'w' => {
76 if b.remaining() < 24 {
78 return Err(PgWireError::Protocol(format!(
79 "XLogData payload too short: {} bytes (need at least 24)",
80 b.remaining()
81 )));
82 }
83 let wal_start = Lsn(b.get_i64() as u64);
84 let wal_end = Lsn(b.get_i64() as u64);
85 let server_time_micros = b.get_i64();
86 let data = b.copy_to_bytes(b.remaining());
87
88 Ok(ReplicationCopyData::XLogData {
89 wal_start,
90 wal_end,
91 server_time_micros,
92 data,
93 })
94 }
95 b'k' => {
96 if b.remaining() < 17 {
98 return Err(PgWireError::Protocol(format!(
99 "KeepAlive payload too short: {} bytes (need 17)",
100 b.remaining()
101 )));
102 }
103 let wal_end = Lsn(b.get_i64() as u64);
104 let server_time_micros = b.get_i64();
105 let reply_requested = b.get_u8() != 0;
106
107 Ok(ReplicationCopyData::KeepAlive {
108 wal_end,
109 server_time_micros,
110 reply_requested,
111 })
112 }
113 _ => Err(PgWireError::Protocol(format!(
114 "unknown CopyData kind: 0x{kind:02x} ('{}')",
115 kind as char
116 ))),
117 }
118}
119
120pub fn encode_standby_status_update(
133 applied: Lsn,
134 client_time_micros: i64,
135 reply_requested: bool,
136) -> Vec<u8> {
137 let mut out = Vec::with_capacity(34);
139 out.push(b'r');
140
141 out.extend_from_slice(&(applied.0 as i64).to_be_bytes());
143 out.extend_from_slice(&(applied.0 as i64).to_be_bytes());
145 out.extend_from_slice(&(applied.0 as i64).to_be_bytes());
147 out.extend_from_slice(&client_time_micros.to_be_bytes());
149 out.push(if reply_requested { 1 } else { 0 });
151
152 out
153}
154
/// Microseconds between the Unix epoch (1970-01-01) and the PostgreSQL epoch
/// (2000-01-01): 10,957 days, i.e. 946,684,800 seconds.
pub const PG_EPOCH_MICROS: i64 = 946_684_800 * 1_000_000;
157
158#[inline]
160pub fn unix_to_pg_timestamp(unix_micros: i64) -> i64 {
161 unix_micros - PG_EPOCH_MICROS
162}
163
164#[inline]
166pub fn pg_to_unix_timestamp(pg_micros: i64) -> i64 {
167 pg_micros + PG_EPOCH_MICROS
168}
169
#[cfg(test)]
mod tests {
    use super::*;

    /// Joins a tag byte and the bytes that follow it into a CopyData payload.
    fn frame(tag: u8, rest: &[u8]) -> Bytes {
        let mut raw = Vec::with_capacity(1 + rest.len());
        raw.push(tag);
        raw.extend_from_slice(rest);
        Bytes::from(raw)
    }

    /// Builds a `b'w'` payload: three big-endian 8-byte fields, then `wal_data`.
    fn xlog_frame(wal_start: u64, wal_end: u64, server_time: i64, wal_data: &[u8]) -> Bytes {
        let mut rest = Vec::new();
        rest.extend_from_slice(&wal_start.to_be_bytes());
        rest.extend_from_slice(&wal_end.to_be_bytes());
        rest.extend_from_slice(&server_time.to_be_bytes());
        rest.extend_from_slice(wal_data);
        frame(b'w', &rest)
    }

    /// Builds a `b'k'` payload: two big-endian 8-byte fields, then the flag byte.
    fn keepalive_frame(wal_end: u64, server_time: i64, reply_byte: u8) -> Bytes {
        let mut rest = Vec::new();
        rest.extend_from_slice(&wal_end.to_be_bytes());
        rest.extend_from_slice(&server_time.to_be_bytes());
        rest.push(reply_byte);
        frame(b'k', &rest)
    }

    // ---- XLogData parsing ----

    #[test]
    fn parse_xlogdata_minimal() {
        // Header only, no WAL data after it.
        let parsed = parse_copy_data(xlog_frame(1, 2, 3, b"")).unwrap();
        assert_eq!(
            parsed,
            ReplicationCopyData::XLogData {
                wal_start: Lsn(1),
                wal_end: Lsn(2),
                server_time_micros: 3,
                data: Bytes::new(),
            }
        );
    }

    #[test]
    fn parse_xlogdata_with_payload() {
        let wal_data = b"hello world pgoutput data";
        let parsed = parse_copy_data(xlog_frame(
            0x0123456789ABCDEF,
            0xFEDCBA9876543210,
            -12345,
            wal_data,
        ))
        .unwrap();

        assert_eq!(
            parsed,
            ReplicationCopyData::XLogData {
                wal_start: Lsn(0x0123456789ABCDEF),
                wal_end: Lsn(0xFEDCBA9876543210),
                server_time_micros: -12345,
                data: Bytes::from(wal_data.to_vec()),
            }
        );
    }

    #[test]
    fn parse_xlogdata_too_short() {
        // One byte short of the 24-byte header.
        let message = parse_copy_data(frame(b'w', &[0u8; 23]))
            .unwrap_err()
            .to_string();
        assert!(message.contains("XLogData"));
        assert!(message.contains("too short"));
    }

    // ---- KeepAlive parsing ----

    #[test]
    fn parse_keepalive_reply_requested() {
        let parsed = parse_copy_data(keepalive_frame(100, 200, 1)).unwrap();
        assert_eq!(
            parsed,
            ReplicationCopyData::KeepAlive {
                wal_end: Lsn(100),
                server_time_micros: 200,
                reply_requested: true,
            }
        );
    }

    #[test]
    fn parse_keepalive_no_reply() {
        let parsed = parse_copy_data(keepalive_frame(999, 888, 0)).unwrap();
        assert!(
            matches!(
                parsed,
                ReplicationCopyData::KeepAlive {
                    reply_requested: false,
                    ..
                }
            ),
            "expected KeepAlive"
        );
    }

    #[test]
    fn parse_keepalive_nonzero_reply_byte_is_true() {
        // Any non-zero flag byte counts as "reply requested".
        let parsed = parse_copy_data(keepalive_frame(0, 0, 42)).unwrap();
        assert!(matches!(
            parsed,
            ReplicationCopyData::KeepAlive {
                reply_requested: true,
                ..
            }
        ));
    }

    #[test]
    fn parse_keepalive_too_short() {
        // One byte short of the 17-byte body.
        let message = parse_copy_data(frame(b'k', &[0u8; 16]))
            .unwrap_err()
            .to_string();
        assert!(message.contains("KeepAlive"));
        assert!(message.contains("too short"));
    }

    // ---- Malformed input ----

    #[test]
    fn parse_empty_payload() {
        let message = parse_copy_data(Bytes::new()).unwrap_err().to_string();
        assert!(message.contains("empty"));
    }

    #[test]
    fn parse_unknown_kind() {
        let message = parse_copy_data(frame(b'X', &[0, 0, 0]))
            .unwrap_err()
            .to_string();
        assert!(message.contains("unknown CopyData kind"));
        // 0x58 is the hex value of b'X'.
        assert!(message.contains("0x58"));
    }

    // ---- Helper predicates ----

    #[test]
    fn xlogdata_helper_methods() {
        let xlog = ReplicationCopyData::XLogData {
            wal_start: Lsn(0),
            wal_end: Lsn(0),
            server_time_micros: 0,
            data: Bytes::new(),
        };
        assert!(xlog.is_xlog_data());
        assert!(!xlog.is_keepalive());
        assert!(!xlog.requires_reply());
    }

    #[test]
    fn keepalive_helper_methods() {
        let keepalive = |reply_requested| ReplicationCopyData::KeepAlive {
            wal_end: Lsn(0),
            server_time_micros: 0,
            reply_requested,
        };

        let with_reply = keepalive(true);
        assert!(!with_reply.is_xlog_data());
        assert!(with_reply.is_keepalive());
        assert!(with_reply.requires_reply());

        let without_reply = keepalive(false);
        assert!(without_reply.is_keepalive());
        assert!(!without_reply.requires_reply());
    }

    // ---- Standby status update encoding ----

    #[test]
    fn encode_status_update_structure() {
        let encoded = encode_standby_status_update(Lsn(0x123456789ABCDEF0), 987654321, false);

        assert_eq!(encoded.len(), 34);
        assert_eq!(encoded[0], b'r');

        // The same LSN fills all three 8-byte position fields.
        let lsn_bytes = 0x123456789ABCDEF0u64.to_be_bytes();
        for field in encoded[1..25].chunks_exact(8) {
            assert_eq!(field, &lsn_bytes[..]);
        }

        assert_eq!(&encoded[25..33], &987654321i64.to_be_bytes()[..]);
        assert_eq!(encoded[33], 0);
    }

    #[test]
    fn encode_status_update_reply_requested() {
        let encoded = encode_standby_status_update(Lsn(42), 0, true);
        assert_eq!(encoded[33], 1);
    }

    #[test]
    fn encode_status_update_zero_lsn() {
        let encoded = encode_standby_status_update(Lsn(0), 0, false);
        for field in encoded[1..25].chunks_exact(8) {
            assert_eq!(field, &[0u8; 8][..]);
        }
    }

    // ---- Timestamp conversion ----

    #[test]
    fn timestamp_conversion_roundtrip() {
        let unix_micros = 1_704_067_200_000_000_i64;
        let round_tripped = pg_to_unix_timestamp(unix_to_pg_timestamp(unix_micros));
        assert_eq!(round_tripped, unix_micros);
    }

    #[test]
    fn pg_epoch_is_correct() {
        // 10,957 days separate 1970-01-01 from 2000-01-01.
        let micros_per_day = 24i64 * 60 * 60 * 1_000_000;
        assert_eq!(PG_EPOCH_MICROS, 10957i64 * micros_per_day);
    }

    #[test]
    fn unix_to_pg_at_epoch() {
        assert_eq!(unix_to_pg_timestamp(PG_EPOCH_MICROS), 0);
    }

    #[test]
    fn pg_to_unix_at_zero() {
        assert_eq!(pg_to_unix_timestamp(0), PG_EPOCH_MICROS);
    }
}