1use anyhow::{Context, Result};
5use tokio_postgres::{Client, Row};
6
/// Smallest backwards jump (previous xmin minus current xmin) that is *not* yet
/// treated as a wraparound; only a strictly larger drop is reported.
// NOTE(review): 2 billion is presumably chosen to sit just below half of the
// 32-bit transaction-id space (2^31) — confirm against the sync design notes.
const WRAPAROUND_THRESHOLD: u32 = 2_000_000_000;
11
/// Verdict produced when a previously stored xmin is compared with the current one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WraparoundCheck {
    /// The stored xmin is not far enough ahead of the current one to indicate a wraparound.
    Normal,
    /// The stored xmin exceeds the current one by more than the wraparound threshold.
    WraparoundDetected,
}
20
21pub fn detect_wraparound(old_xmin: u32, current_xmin: u32) -> WraparoundCheck {
36 if old_xmin > current_xmin && (old_xmin - current_xmin) > WRAPAROUND_THRESHOLD {
38 tracing::warn!(
39 "xmin wraparound detected: old_xmin={}, current_xmin={}, delta={}",
40 old_xmin,
41 current_xmin,
42 old_xmin - current_xmin
43 );
44 WraparoundCheck::WraparoundDetected
45 } else {
46 WraparoundCheck::Normal
47 }
48}
49
/// Returns `true` when `s` is a well-formed PostgreSQL `tid` literal, `(block,offset)`.
///
/// The result gates whether the string may be spliced into a SQL statement, so the
/// check is deliberately strict:
///
/// * the value (ignoring surrounding whitespace) must be enclosed in one pair of
///   parentheses and contain exactly one comma;
/// * each component (ignoring surrounding whitespace) must consist solely of ASCII
///   digits — no sign, since Rust's integer parser would otherwise accept `+1`;
/// * the block number must fit in 32 bits and the offset in 16 bits, matching the
///   range of a PostgreSQL tuple identifier.
fn is_valid_ctid(s: &str) -> bool {
    // Parses one unsigned decimal component; `None` for anything but plain digits
    // or for a value that overflows `T`.
    fn component<T: std::str::FromStr>(raw: &str) -> Option<T> {
        let digits = raw.trim();
        if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        digits.parse().ok()
    }

    let inner = match s
        .trim()
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    {
        Some(inner) => inner,
        None => return false,
    };

    // A second comma ends up in `offset` and is rejected by the digits-only check.
    let (block, offset) = match inner.split_once(',') {
        Some(pair) => pair,
        None => return false,
    };

    component::<u32>(block).is_some() && component::<u16>(offset).is_some()
}
70
71pub struct XminReader<'a> {
80 client: &'a Client,
81}
82
83impl<'a> XminReader<'a> {
84 pub fn new(client: &'a Client) -> Self {
86 Self { client }
87 }
88
89 pub fn client(&self) -> &Client {
91 self.client
92 }
93
94 pub async fn get_current_xmin(&self) -> Result<u32> {
98 let row = self
99 .client
100 .query_one("SELECT txid_current()::text::bigint", &[])
101 .await
102 .context("Failed to get current transaction ID")?;
103
104 let txid: i64 = row.get(0);
105 Ok((txid & 0xFFFFFFFF) as u32)
108 }
109
110 pub async fn read_changes(
123 &self,
124 schema: &str,
125 table: &str,
126 columns: &[String],
127 since_xmin: u32,
128 ) -> Result<(Vec<Row>, u32)> {
129 let column_list = if columns.is_empty() {
130 "*".to_string()
131 } else {
132 columns
133 .iter()
134 .map(|c| format!("\"{}\"", c))
135 .collect::<Vec<_>>()
136 .join(", ")
137 };
138
139 let query = format!(
142 "SELECT {}, xmin::text::bigint as _xmin FROM \"{}\".\"{}\" WHERE xmin::text::bigint > $1 ORDER BY xmin::text::bigint",
143 column_list, schema, table
144 );
145
146 let rows = self
147 .client
148 .query(&query, &[&(since_xmin as i64)])
149 .await
150 .with_context(|| format!("Failed to read changes from {}.{}", schema, table))?;
151
152 let max_xmin = rows
154 .iter()
155 .map(|row| {
156 let xmin: i64 = row.get("_xmin");
157 (xmin & 0xFFFFFFFF) as u32
158 })
159 .max()
160 .unwrap_or(since_xmin);
161
162 Ok((rows, max_xmin))
163 }
164
165 pub async fn read_changes_batched(
179 &self,
180 schema: &str,
181 table: &str,
182 columns: &[String],
183 since_xmin: u32,
184 batch_size: usize,
185 ) -> Result<BatchReader> {
186 Ok(BatchReader {
187 schema: schema.to_string(),
188 table: table.to_string(),
189 columns: columns.to_vec(),
190 current_xmin: since_xmin,
191 last_ctid: None,
192 batch_size,
193 exhausted: false,
194 })
195 }
196
197 pub async fn fetch_batch(
203 &self,
204 batch_reader: &mut BatchReader,
205 ) -> Result<Option<(Vec<Row>, u32)>> {
206 if batch_reader.exhausted {
207 return Ok(None);
208 }
209
210 let column_list = if batch_reader.columns.is_empty() {
211 "*".to_string()
212 } else {
213 batch_reader
214 .columns
215 .iter()
216 .map(|c| format!("\"{}\"", c))
217 .collect::<Vec<_>>()
218 .join(", ")
219 };
220
221 let (query, rows) = if let Some(ref last_ctid) = batch_reader.last_ctid {
224 if !is_valid_ctid(last_ctid) {
227 anyhow::bail!("Invalid ctid format: {}", last_ctid);
228 }
229
230 let query = format!(
233 "SELECT {}, xmin::text::bigint as _xmin, ctid::text as _ctid \
234 FROM \"{}\".\"{}\" \
235 WHERE (xmin::text::bigint, ctid) > ($1, '{}'::tid) \
236 ORDER BY xmin::text::bigint, ctid \
237 LIMIT $2",
238 column_list, batch_reader.schema, batch_reader.table, last_ctid
239 );
240
241 let rows = self
242 .client
243 .query(
244 &query,
245 &[
246 &(batch_reader.current_xmin as i64),
247 &(batch_reader.batch_size as i64),
248 ],
249 )
250 .await
251 .with_context(|| {
252 format!(
253 "Failed to read batch from {}.{}",
254 batch_reader.schema, batch_reader.table
255 )
256 })?;
257 (query, rows)
258 } else {
259 let query = format!(
261 "SELECT {}, xmin::text::bigint as _xmin, ctid::text as _ctid \
262 FROM \"{}\".\"{}\" \
263 WHERE xmin::text::bigint > $1 \
264 ORDER BY xmin::text::bigint, ctid \
265 LIMIT $2",
266 column_list, batch_reader.schema, batch_reader.table
267 );
268
269 let rows = self
270 .client
271 .query(
272 &query,
273 &[
274 &(batch_reader.current_xmin as i64),
275 &(batch_reader.batch_size as i64),
276 ],
277 )
278 .await
279 .with_context(|| {
280 format!(
281 "Failed to read batch from {}.{}",
282 batch_reader.schema, batch_reader.table
283 )
284 })?;
285 (query, rows)
286 };
287
288 let _ = query;
290
291 if rows.is_empty() {
292 batch_reader.exhausted = true;
293 return Ok(None);
294 }
295
296 let last_row = rows.last().unwrap();
298 let last_xmin: i64 = last_row.get("_xmin");
299 let last_ctid: String = last_row.get("_ctid");
300
301 let max_xmin = (last_xmin & 0xFFFFFFFF) as u32;
302
303 if rows.len() < batch_reader.batch_size {
305 batch_reader.exhausted = true;
306 }
307
308 batch_reader.current_xmin = max_xmin;
309 batch_reader.last_ctid = Some(last_ctid);
310
311 Ok(Some((rows, max_xmin)))
312 }
313
314 pub async fn estimate_changes(
318 &self,
319 schema: &str,
320 table: &str,
321 since_xmin: u32,
322 ) -> Result<i64> {
323 let query = format!(
324 "SELECT COUNT(*) FROM \"{}\".\"{}\" WHERE xmin::text::bigint > $1",
325 schema, table
326 );
327
328 let row = self
329 .client
330 .query_one(&query, &[&(since_xmin as i64)])
331 .await
332 .with_context(|| format!("Failed to count changes in {}.{}", schema, table))?;
333
334 let count: i64 = row.get(0);
335 Ok(count)
336 }
337
338 pub async fn list_tables(&self, schema: &str) -> Result<Vec<String>> {
340 let rows = self
341 .client
342 .query(
343 "SELECT tablename FROM pg_tables WHERE schemaname = $1 ORDER BY tablename",
344 &[&schema],
345 )
346 .await
347 .with_context(|| format!("Failed to list tables in schema {}", schema))?;
348
349 Ok(rows.iter().map(|row| row.get(0)).collect())
350 }
351
352 pub async fn get_columns(&self, schema: &str, table: &str) -> Result<Vec<ColumnInfo>> {
354 let rows = self
355 .client
356 .query(
357 "SELECT column_name, data_type, is_nullable, column_default
358 FROM information_schema.columns
359 WHERE table_schema = $1 AND table_name = $2
360 ORDER BY ordinal_position",
361 &[&schema, &table],
362 )
363 .await
364 .with_context(|| format!("Failed to get columns for {}.{}", schema, table))?;
365
366 Ok(rows
367 .iter()
368 .map(|row| ColumnInfo {
369 name: row.get(0),
370 data_type: row.get(1),
371 is_nullable: row.get::<_, String>(2) == "YES",
372 has_default: row.get::<_, Option<String>>(3).is_some(),
373 })
374 .collect())
375 }
376
377 pub async fn get_primary_key(&self, schema: &str, table: &str) -> Result<Vec<String>> {
379 let rows = self
380 .client
381 .query(
382 "SELECT a.attname
383 FROM pg_index i
384 JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
385 JOIN pg_class c ON c.oid = i.indrelid
386 JOIN pg_namespace n ON n.oid = c.relnamespace
387 WHERE i.indisprimary
388 AND n.nspname = $1
389 AND c.relname = $2
390 ORDER BY array_position(i.indkey, a.attnum)",
391 &[&schema, &table],
392 )
393 .await
394 .with_context(|| format!("Failed to get primary key for {}.{}", schema, table))?;
395
396 Ok(rows.iter().map(|row| row.get(0)).collect())
397 }
398
399 pub async fn read_all_rows(
414 &self,
415 schema: &str,
416 table: &str,
417 columns: &[String],
418 ) -> Result<(Vec<Row>, u32)> {
419 tracing::info!(
420 "Performing full table read for {}.{} (wraparound recovery)",
421 schema,
422 table
423 );
424
425 let column_list = if columns.is_empty() {
426 "*".to_string()
427 } else {
428 columns
429 .iter()
430 .map(|c| format!("\"{}\"", c))
431 .collect::<Vec<_>>()
432 .join(", ")
433 };
434
435 let query = format!(
438 "SELECT {}, xmin::text::bigint as _xmin FROM \"{}\".\"{}\" ORDER BY xmin::text::bigint",
439 column_list, schema, table
440 );
441
442 let rows = self
443 .client
444 .query(&query, &[])
445 .await
446 .with_context(|| format!("Failed to read all rows from {}.{}", schema, table))?;
447
448 let max_xmin = rows
450 .iter()
451 .map(|row| {
452 let xmin: i64 = row.get("_xmin");
453 (xmin & 0xFFFFFFFF) as u32
454 })
455 .max()
456 .unwrap_or(0);
457
458 tracing::info!(
459 "Full table read complete: {} rows, max_xmin={}",
460 rows.len(),
461 max_xmin
462 );
463
464 Ok((rows, max_xmin))
465 }
466
467 pub async fn read_changes_with_wraparound_check(
484 &self,
485 schema: &str,
486 table: &str,
487 columns: &[String],
488 since_xmin: u32,
489 ) -> Result<(Vec<Row>, u32, bool)> {
490 let current_xmin = self.get_current_xmin().await?;
492
493 if detect_wraparound(since_xmin, current_xmin) == WraparoundCheck::WraparoundDetected {
495 let (rows, max_xmin) = self.read_all_rows(schema, table, columns).await?;
497 Ok((rows, max_xmin, true))
498 } else {
499 let (rows, max_xmin) = self
501 .read_changes(schema, table, columns, since_xmin)
502 .await?;
503 Ok((rows, max_xmin, false))
504 }
505 }
506}
507
/// Cursor state carried between successive batched reads of one table.
pub struct BatchReader {
    /// Schema of the table being read.
    pub schema: String,
    /// Name of the table being read.
    pub table: String,
    /// Columns to select; an empty list selects every column.
    pub columns: Vec<String>,
    /// xmin the next batch resumes from: the starting xmin at first, then the xmin
    /// of the last row returned.
    pub current_xmin: u32,
    /// Text form of the ctid of the last row returned; `None` before the first batch.
    pub last_ctid: Option<String>,
    /// Maximum number of rows requested per batch.
    pub batch_size: usize,
    /// Set once no further rows remain; later fetches return nothing.
    pub exhausted: bool,
}
523
/// Summary of one table column.
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// Column name.
    pub name: String,
    /// Data type name of the column.
    pub data_type: String,
    /// Whether the column accepts NULL.
    pub is_nullable: bool,
    /// Whether the column has a default expression.
    pub has_default: bool,
}
532
#[cfg(test)]
mod tests {
    use super::*;

    /// A hand-built `BatchReader` exposes exactly the values it was given.
    #[test]
    fn test_batch_reader_initial_state() {
        let columns = vec![String::from("id"), String::from("name")];
        let reader = BatchReader {
            schema: String::from("public"),
            table: String::from("users"),
            columns,
            current_xmin: 0,
            last_ctid: None,
            batch_size: 1000,
            exhausted: false,
        };

        assert_eq!(reader.schema, "public");
        assert_eq!(reader.table, "users");
        assert_eq!(reader.current_xmin, 0);
        assert!(reader.last_ctid.is_none());
        assert!(!reader.exhausted);
    }

    /// `ColumnInfo` fields round-trip through construction.
    #[test]
    fn test_column_info() {
        let col = ColumnInfo {
            name: String::from("id"),
            data_type: String::from("integer"),
            is_nullable: false,
            has_default: true,
        };

        assert_eq!(col.name, "id");
        assert!(!col.is_nullable);
        assert!(col.has_default);
    }

    /// Forward movement and small backwards steps are not wraparounds.
    #[test]
    fn test_wraparound_detection_normal() {
        let cases: [(u32, u32); 3] = [(100, 200), (1000, 900), (0, 100)];
        for &(old_xmin, current_xmin) in cases.iter() {
            assert_eq!(
                detect_wraparound(old_xmin, current_xmin),
                WraparoundCheck::Normal
            );
        }
    }

    /// Large backwards jumps are reported as wraparounds.
    #[test]
    fn test_wraparound_detection_wraparound() {
        let cases: [(u32, u32); 3] = [
            (3_500_000_000, 100),
            (4_000_000_000, 1_000_000),
            (2_500_000_000, 400_000_000),
        ];
        for &(old_xmin, current_xmin) in cases.iter() {
            assert_eq!(
                detect_wraparound(old_xmin, current_xmin),
                WraparoundCheck::WraparoundDetected
            );
        }
    }

    /// Boundary behaviour: a gap equal to the threshold is still normal, one more is not.
    #[test]
    fn test_wraparound_detection_edge_cases() {
        let cases: [(u32, u32, WraparoundCheck); 4] = [
            (0, 1_000_000, WraparoundCheck::Normal),
            (1000, 1000, WraparoundCheck::Normal),
            (2_000_000_001, 1, WraparoundCheck::Normal),
            (2_000_000_002, 1, WraparoundCheck::WraparoundDetected),
        ];
        for &(old_xmin, current_xmin, expected) in cases.iter() {
            assert_eq!(detect_wraparound(old_xmin, current_xmin), expected);
        }
    }

    /// Well-formed tid literals pass; anything else is rejected.
    #[test]
    fn test_is_valid_ctid() {
        let accepted = ["(0,1)", "(123,45)", "(0,100)", "(999999,65535)", " (0,1) "];
        for literal in accepted.iter() {
            assert!(is_valid_ctid(literal));
        }

        let rejected = [
            "0,1", "(0,1", "0,1)", "(0)", "(0,1,2)", "(a,1)", "(0,b)", "", "()", "(-1,1)",
        ];
        for literal in rejected.iter() {
            assert!(!is_valid_ctid(literal));
        }
    }
}