pgwire_replication/protocol/
replication.rs

1use bytes::{Buf, Bytes};
2
3use crate::error::{PgWireError, Result};
4use crate::lsn::Lsn;
5
6/// Replication protocol CopyData message types.
7///
8/// During logical replication streaming, PostgreSQL sends data wrapped in CopyData
9/// messages. This enum represents the two primary message types:
10/// - `XLogData`: Contains actual WAL data (transaction changes)
11/// - `KeepAlive`: Server heartbeat, optionally requesting client response
12#[derive(Debug, Clone, PartialEq, Eq)]
13pub enum ReplicationCopyData {
14    /// WAL data message containing transaction changes.
15    XLogData {
16        /// WAL position where this data starts
17        wal_start: Lsn,
18        /// Current WAL end position on server (may be 0 for in-transaction messages)
19        wal_end: Lsn,
20        /// Server timestamp in microseconds since 2000-01-01
21        server_time_micros: i64,
22        /// The actual pgoutput/wal2json/etc. payload
23        data: Bytes,
24    },
25    /// Server heartbeat message.
26    KeepAlive {
27        /// Current WAL end position on server
28        wal_end: Lsn,
29        /// Server timestamp in microseconds since 2000-01-01
30        server_time_micros: i64,
31        /// If true, server expects StandbyStatusUpdate reply
32        reply_requested: bool,
33    },
34}
35
36impl ReplicationCopyData {
37    /// Returns true if this is an XLogData message
38    #[inline]
39    pub fn is_xlog_data(&self) -> bool {
40        matches!(self, ReplicationCopyData::XLogData { .. })
41    }
42
43    /// Returns true if this is a KeepAlive message
44    #[inline]
45    pub fn is_keepalive(&self) -> bool {
46        matches!(self, ReplicationCopyData::KeepAlive { .. })
47    }
48
49    /// Returns true if this is a KeepAlive that requests a reply
50    #[inline]
51    pub fn requires_reply(&self) -> bool {
52        matches!(
53            self,
54            ReplicationCopyData::KeepAlive {
55                reply_requested: true,
56                ..
57            }
58        )
59    }
60}
61
62/// Parse a CopyData payload into a replication message.
63///
64/// The payload should be the CopyData content (after stripping the 'd' tag and length).
65/// Returns either `XLogData` or `KeepAlive` depending on the first byte.
66pub fn parse_copy_data(payload: Bytes) -> Result<ReplicationCopyData> {
67    if payload.is_empty() {
68        return Err(PgWireError::Protocol("empty CopyData payload".into()));
69    }
70
71    let mut b = payload;
72    let kind = b.get_u8();
73
74    match kind {
75        b'w' => {
76            // XLogData: wal_start(8) + wal_end(8) + server_time(8) + data(variable)
77            if b.remaining() < 24 {
78                return Err(PgWireError::Protocol(format!(
79                    "XLogData payload too short: {} bytes (need at least 24)",
80                    b.remaining()
81                )));
82            }
83            let wal_start = Lsn(b.get_i64() as u64);
84            let wal_end = Lsn(b.get_i64() as u64);
85            let server_time_micros = b.get_i64();
86            let data = b.copy_to_bytes(b.remaining());
87
88            Ok(ReplicationCopyData::XLogData {
89                wal_start,
90                wal_end,
91                server_time_micros,
92                data,
93            })
94        }
95        b'k' => {
96            // KeepAlive: wal_end(8) + server_time(8) + reply_requested(1)
97            if b.remaining() < 17 {
98                return Err(PgWireError::Protocol(format!(
99                    "KeepAlive payload too short: {} bytes (need 17)",
100                    b.remaining()
101                )));
102            }
103            let wal_end = Lsn(b.get_i64() as u64);
104            let server_time_micros = b.get_i64();
105            let reply_requested = b.get_u8() != 0;
106
107            Ok(ReplicationCopyData::KeepAlive {
108                wal_end,
109                server_time_micros,
110                reply_requested,
111            })
112        }
113        _ => Err(PgWireError::Protocol(format!(
114            "unknown CopyData kind: 0x{kind:02x} ('{}')",
115            kind as char
116        ))),
117    }
118}
119
120/// Encode a StandbyStatusUpdate message.
121///
122/// This message reports the client's replay position to the server.
123/// All three LSN fields (write, flush, apply) are set to the same value.
124///
125/// # Arguments
126/// * `applied` - The LSN up to which the client has processed data
127/// * `client_time_micros` - Client timestamp in microseconds since 2000-01-01
128/// * `reply_requested` - If true, server should send a reply (usually false)
129///
130/// # Returns
131/// Raw bytes suitable for sending via CopyData
132pub fn encode_standby_status_update(
133    applied: Lsn,
134    client_time_micros: i64,
135    reply_requested: bool,
136) -> Vec<u8> {
137    // Format: 'r' + write(8) + flush(8) + apply(8) + client_time(8) + reply(1) = 34 bytes
138    let mut out = Vec::with_capacity(34);
139    out.push(b'r');
140
141    // Write position - last WAL position written to disk
142    out.extend_from_slice(&(applied.0 as i64).to_be_bytes());
143    // Flush position - last WAL position flushed to disk
144    out.extend_from_slice(&(applied.0 as i64).to_be_bytes());
145    // Apply position - last WAL position applied to standby
146    out.extend_from_slice(&(applied.0 as i64).to_be_bytes());
147    // Client system clock
148    out.extend_from_slice(&client_time_micros.to_be_bytes());
149    // Reply requested
150    out.push(if reply_requested { 1 } else { 0 });
151
152    out
153}
154
/// PostgreSQL epoch (2000-01-01) in microseconds since Unix epoch.
pub const PG_EPOCH_MICROS: i64 = 946_684_800_000_000;

/// Convert Unix timestamp (micros) to PostgreSQL timestamp (micros since 2000-01-01).
///
/// The subtraction saturates: inputs so small that the result would not fit
/// in an `i64` yield `i64::MIN` rather than panicking (debug builds) or
/// wrapping around (release builds). Saturated results do not round-trip
/// through [`pg_to_unix_timestamp`].
#[inline]
pub fn unix_to_pg_timestamp(unix_micros: i64) -> i64 {
    unix_micros.saturating_sub(PG_EPOCH_MICROS)
}

/// Convert PostgreSQL timestamp to Unix timestamp (micros).
///
/// The addition saturates: inputs so large that the result would not fit in
/// an `i64` yield `i64::MAX` rather than panicking (debug builds) or wrapping
/// around (release builds). This matters because the input is typically a
/// timestamp read straight off the wire.
#[inline]
pub fn pg_to_unix_timestamp(pg_micros: i64) -> i64 {
    pg_micros.saturating_add(PG_EPOCH_MICROS)
}
169
#[cfg(test)]
mod tests {
    use super::*;

    // ==================== Shared helpers ====================

    /// Assemble an XLogData payload: tag, three big-endian header fields, body.
    fn xlog_frame(wal_start: u64, wal_end: u64, server_time: i64, body: &[u8]) -> Bytes {
        let mut raw = vec![b'w'];
        raw.extend_from_slice(&wal_start.to_be_bytes());
        raw.extend_from_slice(&wal_end.to_be_bytes());
        raw.extend_from_slice(&server_time.to_be_bytes());
        raw.extend_from_slice(body);
        Bytes::from(raw)
    }

    /// Assemble a KeepAlive payload: tag, two big-endian fields, reply byte.
    fn keepalive_frame(wal_end: u64, server_time: i64, reply_byte: u8) -> Bytes {
        let mut raw = vec![b'k'];
        raw.extend_from_slice(&wal_end.to_be_bytes());
        raw.extend_from_slice(&server_time.to_be_bytes());
        raw.push(reply_byte);
        Bytes::from(raw)
    }

    /// A payload made of `tag` followed by `zeros` zero bytes.
    fn tagged_zeros(tag: u8, zeros: usize) -> Bytes {
        let mut raw = vec![0u8; zeros + 1];
        raw[0] = tag;
        Bytes::from(raw)
    }

    /// Unpack an XLogData message into its fields, failing the test otherwise.
    fn expect_xlog(msg: ReplicationCopyData) -> (Lsn, Lsn, i64, Bytes) {
        match msg {
            ReplicationCopyData::XLogData {
                wal_start,
                wal_end,
                server_time_micros,
                data,
            } => (wal_start, wal_end, server_time_micros, data),
            _ => panic!("expected XLogData"),
        }
    }

    /// Unpack a KeepAlive message into its fields, failing the test otherwise.
    fn expect_keepalive(msg: ReplicationCopyData) -> (Lsn, i64, bool) {
        match msg {
            ReplicationCopyData::KeepAlive {
                wal_end,
                server_time_micros,
                reply_requested,
            } => (wal_end, server_time_micros, reply_requested),
            _ => panic!("expected KeepAlive"),
        }
    }

    // ==================== XLogData tests ====================

    #[test]
    fn parse_xlogdata_minimal() {
        // Header only, no data payload.
        let parsed = parse_copy_data(xlog_frame(1, 2, 3, &[])).unwrap();
        let (start, end, time, body) = expect_xlog(parsed);

        assert_eq!(start.0, 1);
        assert_eq!(end.0, 2);
        assert_eq!(time, 3);
        assert!(body.is_empty());
    }

    #[test]
    fn parse_xlogdata_with_payload() {
        let frame = xlog_frame(
            0x0123456789ABCDEF,
            0xFEDCBA9876543210,
            -12345,
            b"hello world pgoutput data",
        );

        let (start, end, time, body) = expect_xlog(parse_copy_data(frame).unwrap());

        assert_eq!(start.0, 0x0123456789ABCDEF);
        assert_eq!(end.0, 0xFEDCBA9876543210);
        assert_eq!(time, -12345);
        assert_eq!(&body[..], b"hello world pgoutput data");
    }

    #[test]
    fn parse_xlogdata_too_short() {
        // Tag plus only 23 bytes; the header needs 24.
        let text = parse_copy_data(tagged_zeros(b'w', 23))
            .unwrap_err()
            .to_string();

        assert!(text.contains("XLogData"));
        assert!(text.contains("too short"));
    }

    // ==================== KeepAlive tests ====================

    #[test]
    fn parse_keepalive_reply_requested() {
        let parsed = parse_copy_data(keepalive_frame(100, 200, 1)).unwrap();
        let (end, time, reply) = expect_keepalive(parsed);

        assert_eq!(end.0, 100);
        assert_eq!(time, 200);
        assert!(reply);
    }

    #[test]
    fn parse_keepalive_no_reply() {
        let parsed = parse_copy_data(keepalive_frame(999, 888, 0)).unwrap();
        let (_, _, reply) = expect_keepalive(parsed);

        assert!(!reply);
    }

    #[test]
    fn parse_keepalive_nonzero_reply_byte_is_true() {
        // Any non-zero byte should be treated as true
        let parsed = parse_copy_data(keepalive_frame(0, 0, 42)).unwrap();
        let (_, _, reply) = expect_keepalive(parsed);

        assert!(reply);
    }

    #[test]
    fn parse_keepalive_too_short() {
        // Tag plus only 16 bytes; the message needs 17.
        let text = parse_copy_data(tagged_zeros(b'k', 16))
            .unwrap_err()
            .to_string();

        assert!(text.contains("KeepAlive"));
        assert!(text.contains("too short"));
    }

    // ==================== Error cases ====================

    #[test]
    fn parse_empty_payload() {
        let text = parse_copy_data(Bytes::new()).unwrap_err().to_string();
        assert!(text.contains("empty"));
    }

    #[test]
    fn parse_unknown_kind() {
        // 'X' is not a known kind tag.
        let text = parse_copy_data(Bytes::from(vec![b'X', 0, 0, 0]))
            .unwrap_err()
            .to_string();

        assert!(text.contains("unknown CopyData kind"));
        assert!(text.contains("0x58")); // 'X' in hex
    }

    // ==================== Helper method tests ====================

    #[test]
    fn xlogdata_helper_methods() {
        let xlog = ReplicationCopyData::XLogData {
            wal_start: Lsn(0),
            wal_end: Lsn(0),
            server_time_micros: 0,
            data: Bytes::new(),
        };

        assert!(xlog.is_xlog_data());
        assert!(!xlog.is_keepalive());
        assert!(!xlog.requires_reply());
    }

    #[test]
    fn keepalive_helper_methods() {
        let wants_reply = ReplicationCopyData::KeepAlive {
            wal_end: Lsn(0),
            server_time_micros: 0,
            reply_requested: true,
        };
        assert!(!wants_reply.is_xlog_data());
        assert!(wants_reply.is_keepalive());
        assert!(wants_reply.requires_reply());

        let silent = ReplicationCopyData::KeepAlive {
            wal_end: Lsn(0),
            server_time_micros: 0,
            reply_requested: false,
        };
        assert!(silent.is_keepalive());
        assert!(!silent.requires_reply());
    }

    // ==================== StandbyStatusUpdate tests ====================

    #[test]
    fn encode_status_update_structure() {
        let lsn = 0x123456789ABCDEF0u64;
        let frame = encode_standby_status_update(Lsn(lsn), 987654321, false);

        assert_eq!(frame.len(), 34); // 1 + 8*4 + 1
        assert_eq!(frame[0], b'r');

        // Write, flush and apply all carry the same LSN.
        let expected = lsn.to_be_bytes();
        assert_eq!(&frame[1..9], &expected[..]); // write
        assert_eq!(&frame[9..17], &expected[..]); // flush
        assert_eq!(&frame[17..25], &expected[..]); // apply

        // Client time
        assert_eq!(&frame[25..33], &987654321i64.to_be_bytes()[..]);

        // Reply requested = false
        assert_eq!(frame[33], 0);
    }

    #[test]
    fn encode_status_update_reply_requested() {
        let frame = encode_standby_status_update(Lsn(42), 0, true);
        assert_eq!(frame[33], 1);
    }

    #[test]
    fn encode_status_update_zero_lsn() {
        let frame = encode_standby_status_update(Lsn(0), 0, false);
        let zeros = [0u8; 8];

        assert_eq!(&frame[1..9], &zeros[..]);
        assert_eq!(&frame[9..17], &zeros[..]);
        assert_eq!(&frame[17..25], &zeros[..]);
    }

    // ==================== Timestamp conversion tests ====================

    #[test]
    fn timestamp_conversion_roundtrip() {
        let unix_micros = 1_704_067_200_000_000_i64; // 2024-01-01 00:00:00 UTC

        let round_tripped = pg_to_unix_timestamp(unix_to_pg_timestamp(unix_micros));

        assert_eq!(round_tripped, unix_micros);
    }

    #[test]
    fn pg_epoch_is_correct() {
        // 2000-01-01 00:00:00 UTC in Unix microseconds:
        // 10957 days separate 1970-01-01 from 2000-01-01.
        let micros_per_day = 24i64 * 60 * 60 * 1_000_000;
        assert_eq!(PG_EPOCH_MICROS, 10957 * micros_per_day);
    }

    #[test]
    fn unix_to_pg_at_epoch() {
        // At PG epoch, pg timestamp should be 0
        assert_eq!(unix_to_pg_timestamp(PG_EPOCH_MICROS), 0);
    }

    #[test]
    fn pg_to_unix_at_zero() {
        // PG time 0 = Unix PG_EPOCH_MICROS
        assert_eq!(pg_to_unix_timestamp(0), PG_EPOCH_MICROS);
    }
}