pg2any_lib 0.9.0

PostgreSQL to Any database library with Change Data Capture (CDC) and logical replication support
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
//! Thread-safe LSN tracking for CDC replication
//!
//! This module provides LSN tracking for CDC replication with file persistence:
//!
//! 1. **SharedLsnFeedback**: Re-exported from `pg_walstream` - Thread-safe tracking
//!    of LSN positions for replication feedback to PostgreSQL
//!
//! 2. **LsnTracker**: Thread-safe tracker for the last committed LSN with file persistence
//!    for graceful shutdown and restart recovery
//!
//! ## Simplified LSN Tracking
//!
//! This implementation uses a simplified approach with a single flush_lsn value:
//!
//! - `flush_lsn`: Last WAL location successfully committed to destination database
//!   - Updated by consumer when transactions are successfully committed
//!   - Serves as the start_lsn for recovery on restart
//!   - Represents actual application of changes to the destination
//!
//! ## pg2any LSN Tracking Implementation
//!
//! ### File-Based Workflow
//! - **Consumer**: Updates `flush_lsn` when transaction is successfully executed and committed
//!   - This happens in `client.rs` after successful `execute_sql_batch()` and `delete_pending_transaction()`
//!   - Represents that changes have been applied to the destination database
//!   - This LSN is persisted and used as start_lsn on restart
//!
//! ## Monitoring Replication Progress
//!
//! You can monitor the replication progress using:
//! ```sql
//! SELECT
//!     application_name,
//!     state,
//!     sent_lsn,
//!     write_lsn,
//!     flush_lsn,
//!     replay_lsn
//! FROM pg_stat_replication
//! WHERE application_name = 'pg2any';
//! ```
//!
//! ## Persistence Strategy
//!
//! The LsnTracker persists immediately after critical state changes:
//! - After each batch execution in the consumer (explicit calls to `persist_async()`)
//! - On shutdown for final state preservation
//! - On error conditions for durability guarantees
//! - Tracks a "dirty" flag to skip unnecessary writes when state hasn't changed
//! - All persistence is explicit and immediate for data safety

use crate::types::Lsn;
use chrono::{DateTime, Utc};
use pg_walstream::format_lsn;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use tracing::{debug, info, warn};

// Re-export SharedLsnFeedback from pg_walstream to avoid duplication
pub use pg_walstream::SharedLsnFeedback;

/// Metadata format version for the LSN persistence file
///
/// Written into [`CdcMetadata::version`] by `CdcMetadata::default()`. As far as this
/// file shows, the value is only recorded: a loaded file's version is not compared
/// against it.
const METADATA_VERSION: &str = "1.0";

/// CDC replication metadata stored in the persistence file
///
/// This structure stores the state needed for recovery and is (de)serialized with
/// serde as the JSON document written by `LsnTracker`:
/// - LSN tracking: a single `flush_lsn` (last position committed to the destination)
/// - Consumer execution state for resuming from correct position
/// - Timestamp information for monitoring and debugging
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcMetadata {
    /// Metadata format version (for future compatibility)
    pub version: String,

    /// Timestamp when metadata was last updated
    ///
    /// Refreshed by the update methods on this type whenever they change a value.
    pub last_updated: DateTime<Utc>,

    /// LSN tracking for PostgreSQL replication protocol
    pub lsn_tracking: LsnTracking,

    /// Consumer execution state
    pub consumer_state: ConsumerState,
}

/// LSN position for CDC replication tracking
///
/// Simplified to track only the last committed LSN (flush_lsn) which serves as start_lsn for recovery.
/// This LSN is updated when transactions are successfully committed to the destination database.
///
/// The value only ever moves forward: `CdcMetadata::update_flush_lsn` ignores any LSN
/// that is not greater than the one already stored.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LsnTracking {
    /// Last WAL location successfully committed to destination database
    /// This serves as the start_lsn for recovery on restart
    ///
    /// Stored as the raw 64-bit LSN. `0` means "no LSN recorded yet"; `LsnTracker`
    /// maps it to `None` in `get_lsn()` and `new_with_load()`.
    pub flush_lsn: u64,
}

/// Consumer execution state for recovery
///
/// All three fields are written together by `CdcMetadata::update_consumer_state`.
/// The two `Option` fields are `None` in freshly created (default) metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerState {
    /// Last successfully processed transaction ID
    pub last_processed_tx_id: Option<u32>,

    /// Timestamp of last processed transaction
    pub last_processed_timestamp: Option<DateTime<Utc>>,

    /// Number of pending transaction files at last update
    pub pending_file_count: usize,
}

impl Default for CdcMetadata {
    /// Build empty metadata: the current format version, `last_updated` set to now,
    /// a zero `flush_lsn`, and a consumer state in which nothing has been processed.
    fn default() -> Self {
        let lsn_tracking = LsnTracking { flush_lsn: 0 };
        let consumer_state = ConsumerState {
            last_processed_tx_id: None,
            last_processed_timestamp: None,
            pending_file_count: 0,
        };

        Self {
            version: String::from(METADATA_VERSION),
            last_updated: Utc::now(),
            lsn_tracking,
            consumer_state,
        }
    }
}

impl CdcMetadata {
    /// Create new metadata with current timestamp
    pub fn new() -> Self {
        Self::default()
    }

    /// Update flush LSN (transaction committed to destination)
    fn update_flush_lsn(&mut self, lsn: u64) {
        if lsn > self.lsn_tracking.flush_lsn {
            self.lsn_tracking.flush_lsn = lsn;
            self.last_updated = Utc::now();
        }
    }

    /// Update consumer state after successful transaction execution
    pub fn update_consumer_state(
        &mut self,
        tx_id: u32,
        timestamp: DateTime<Utc>,
        pending_count: usize,
    ) {
        self.consumer_state.last_processed_tx_id = Some(tx_id);
        self.consumer_state.last_processed_timestamp = Some(timestamp);
        self.consumer_state.pending_file_count = pending_count;
        self.last_updated = Utc::now();
    }

    /// Get the flush LSN (last committed to destination)
    pub fn get_lsn(&self) -> u64 {
        self.lsn_tracking.flush_lsn
    }
}

/// Thread-safe tracker for the last committed LSN with file persistence
///
/// This tracker maintains simplified CDC replication metadata including:
/// - flush_lsn for tracking last committed position
/// - Consumer execution state for recovery
/// - Timestamp information for monitoring
///
/// ## Persistence Strategy
///
/// This tracker persists immediately when explicitly requested:
/// - After each batch execution in the consumer (via `persist_async()`)
/// - On shutdown for final state preservation
/// - On error conditions for durability guarantees
/// - Tracks a "dirty" flag (see `is_dirty()`) recording whether the in-memory state
///   has changed since it was last persisted
///
/// ## Key Design Principle
///
/// The LSN tracked here represents what was actually **committed to the destination**
/// (e.g., MySQL), NOT what was received from the PostgreSQL replication stream.
/// This distinction is critical for graceful shutdown during large transactions:
///
/// - If shutdown occurs mid-transaction, only the LSN of the last *committed* batch is saved
/// - On restart, replication resumes from the last committed point, preventing data loss
/// - Incomplete transactions are replayed from their last committed batch position
///
/// ## Metadata Format
///
/// The persistence file is the serde JSON serialization of [`CdcMetadata`].
/// `flush_lsn` is a `u64`, so it is written as a plain JSON number (the raw LSN
/// value), not as the `X/Y` text form used in log messages:
/// ```json
/// {
///   "version": "1.0",
///   "last_updated": "2025-12-28T10:30:45Z",
///   "lsn_tracking": {
///     "flush_lsn": 48234496
///   },
///   "consumer_state": {
///     "last_processed_tx_id": 12345,
///     "last_processed_timestamp": "2025-12-28T10:30:40Z",
///     "pending_file_count": 3
///   }
/// }
/// ```
#[derive(Debug)]
pub struct LsnTracker {
    /// The comprehensive CDC metadata
    metadata: Arc<Mutex<CdcMetadata>>,
    /// Flush LSN recorded at the last successful persist, or at load time when the
    /// tracker was initialised from an existing file
    last_persisted_lsn: AtomicU64,
    /// Flag indicating if metadata needs to be persisted
    dirty: AtomicBool,
    /// Path to the metadata persistence file (always ends with `.metadata`)
    lsn_file_path: String,
}

impl LsnTracker {
    /// Create a new LSN tracker with the specified file path
    ///
    /// The path is resolved in order of precedence: the `lsn_file_path` argument, the
    /// `CDC_LAST_LSN_FILE` environment variable, then `./pg2any_last_lsn.metadata`.
    /// A `.metadata` suffix is appended when missing, and the parent directory is
    /// created on a best-effort basis (a failure is logged, not returned).
    ///
    /// The tracker starts with default (zero-LSN) metadata and a clear dirty flag;
    /// the file itself is neither read nor written here.
    pub async fn new(lsn_file_path: Option<&str>) -> Arc<Self> {
        let mut path: String = lsn_file_path
            .map(String::from)
            .or_else(|| std::env::var("CDC_LAST_LSN_FILE").ok())
            .unwrap_or_else(|| "./pg2any_last_lsn.metadata".to_string());

        // Ensure path has .metadata extension
        if !path.ends_with(".metadata") {
            path.push_str(".metadata");
        }

        // Create parent directory if it doesn't exist
        if let Some(parent) = std::path::Path::new(&path).parent() {
            if !parent.as_os_str().is_empty() && tokio::fs::metadata(parent).await.is_err() {
                if let Err(e) = tokio::fs::create_dir_all(parent).await {
                    warn!("Failed to create directory for LSN file {}: {}", path, e);
                } else {
                    info!("Created directory for LSN metadata: {:?}", parent);
                }
            }
        }

        Arc::new(Self {
            metadata: Arc::new(Mutex::new(CdcMetadata::default())),
            last_persisted_lsn: AtomicU64::new(0),
            dirty: AtomicBool::new(false),
            lsn_file_path: path,
        })
    }

    /// Create a new LSN tracker and load the initial value from file
    ///
    /// # Arguments
    /// * `lsn_file_path` - Optional path to the LSN metadata file
    ///
    /// # Returns
    /// The tracker, plus the loaded flush LSN. The LSN is `None` when no metadata could
    /// be loaded or when the loaded `flush_lsn` is `0`.
    pub async fn new_with_load(lsn_file_path: Option<&str>) -> (Arc<Self>, Option<Lsn>) {
        let tracker = Self::new(lsn_file_path).await;

        let Some(metadata) = tracker.load_from_file().await else {
            return (tracker, None);
        };

        // Read the LSN first so the metadata can be moved in without cloning it.
        let flush_lsn = metadata.get_lsn();
        *tracker.lock_metadata() = metadata;
        tracker
            .last_persisted_lsn
            .store(flush_lsn, Ordering::Release);

        let loaded_lsn = (flush_lsn > 0).then_some(Lsn(flush_lsn));
        (tracker, loaded_lsn)
    }

    /// Shutdown the tracker gracefully
    /// This ensures the final LSN is persisted before exit.
    ///
    /// A persist failure is logged as a warning rather than returned.
    pub async fn shutdown_async(&self) {
        info!("Shutting down LSN tracker");

        // Force an immediate persist to ensure final state is written
        if let Err(e) = self.persist_async().await {
            warn!("Failed to persist on shutdown: {}", e);
        } else {
            info!("Final state persisted on shutdown");
        }

        info!("LSN tracker stopped gracefully");
    }

    /// Synchronous shutdown for use in Drop or non-async contexts
    ///
    /// Performs blocking file I/O on the calling thread. A persist failure is logged
    /// as a warning rather than returned.
    pub fn shutdown_sync(&self) {
        info!("Initiating sync shutdown of LSN tracker");

        // Perform synchronous final persist
        if let Err(e) = self.persist_sync() {
            warn!("Failed to persist LSN on sync shutdown: {}", e);
        }
    }

    /// Load metadata from the persistence file
    ///
    /// Returns `Some(CdcMetadata)` if valid JSON metadata was found and parsed, `None` otherwise.
    /// Logs warnings for read and parsing errors but doesn't fail. A missing or empty
    /// file is an expected first-run condition and is logged at info level only.
    ///
    /// This only reads the file; it does not modify the tracker's in-memory state.
    pub async fn load_from_file(&self) -> Option<CdcMetadata> {
        let contents = match tokio::fs::read_to_string(&self.lsn_file_path).await {
            Ok(contents) => contents,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                info!(
                    "No persisted metadata file found at {}, starting from latest",
                    self.lsn_file_path
                );
                return None;
            }
            Err(e) => {
                // The file exists (or may exist) but could not be read, e.g. because of
                // permissions. Surface this instead of reporting "not found".
                warn!(
                    "Failed to read metadata file {}: {}, starting from latest",
                    self.lsn_file_path, e
                );
                return None;
            }
        };

        let s = contents.trim();
        if s.is_empty() {
            info!(
                "LSN metadata file {} is empty, starting from latest",
                self.lsn_file_path
            );
            return None;
        }

        // Parse as JSON metadata format
        match serde_json::from_str::<CdcMetadata>(s) {
            Ok(metadata) => {
                info!(
                    "Loaded CDC metadata from {} (flush_lsn: {})",
                    self.lsn_file_path,
                    format_lsn(metadata.lsn_tracking.flush_lsn)
                );
                Some(metadata)
            }
            Err(e) => {
                warn!(
                    "Failed to parse metadata from {}: {}. File must contain valid JSON.",
                    self.lsn_file_path, e
                );
                None
            }
        }
    }

    /// Update and mark LSN for persistence
    /// This should be called after each successful transaction commit to the destination.
    /// It updates the in-memory LSN and marks it as dirty.
    /// Note: This does NOT automatically persist to disk - caller must explicitly call `persist_async()`.
    ///
    /// An `lsn` of `0`, or one that is not greater than the current flush LSN, is ignored.
    #[inline]
    pub fn commit_lsn(&self, lsn: u64) {
        if lsn == 0 {
            return;
        }

        let mut metadata = self.lock_metadata();
        if lsn > metadata.lsn_tracking.flush_lsn {
            metadata.update_flush_lsn(lsn);
            // Set the flag while still holding the lock: a concurrent persist takes its
            // snapshot under the same lock, so it either sees this change in the
            // snapshot or finds the flag set afterwards.
            self.dirty.store(true, Ordering::Release);
        }
    }

    /// Update consumer state after successful transaction execution
    ///
    /// Always marks the tracker dirty; like `commit_lsn`, it does not write to disk.
    pub fn update_consumer_state(
        &self,
        tx_id: u32,
        timestamp: DateTime<Utc>,
        pending_count: usize,
    ) {
        {
            let mut metadata = self.lock_metadata();
            metadata.update_consumer_state(tx_id, timestamp, pending_count);
            // Flag is set under the lock for the same reason as in `commit_lsn`.
            self.dirty.store(true, Ordering::Release);
        }
        debug!(
            "Updated consumer state: tx_id={}, pending_count={}",
            tx_id, pending_count
        );
    }

    /// Get the current last committed LSN (flush_lsn)
    ///
    /// Returns `0` when no LSN has been committed or loaded.
    #[inline]
    pub fn get(&self) -> u64 {
        self.lock_metadata().get_lsn()
    }

    /// Get the current last committed LSN as `Option<Lsn>`
    ///
    /// Returns `None` when the flush LSN is `0`.
    pub fn get_lsn(&self) -> Option<Lsn> {
        let v = self.get();
        (v != 0).then_some(Lsn(v))
    }

    /// Get a copy of the current metadata
    pub fn get_metadata(&self) -> CdcMetadata {
        self.lock_metadata().clone()
    }

    /// Persist the current metadata to file asynchronously
    /// This method writes the metadata atomically by first writing to a temp file
    /// and then renaming to ensure crash safety. Returns early if the metadata
    /// is already persisted.
    ///
    /// # Errors
    /// Returns the serialization or file-system error. In that case the tracker is
    /// marked dirty again so a later call retries the write.
    ///
    /// NOTE(review): all persist calls share the single temp path `<file>.tmp`, so
    /// callers are presumably expected not to run persists concurrently — confirm.
    pub async fn persist_async(&self) -> std::io::Result<()> {
        self.persist_internal().await
    }

    /// Internal persist implementation
    async fn persist_internal(&self) -> std::io::Result<()> {
        let Some((flush_lsn, json_content)) = self.take_dirty_snapshot()? else {
            // Nothing changed since the last successful persist.
            return Ok(());
        };

        // Write to temp file first for atomic write
        let temp_path = self.temp_file_path();
        let written = match tokio::fs::write(&temp_path, &json_content).await {
            Ok(()) => tokio::fs::rename(&temp_path, &self.lsn_file_path).await,
            Err(e) => Err(e),
        };
        self.finish_persist(flush_lsn, written)?;

        debug!(
            "Persisted CDC metadata to {} (flush_lsn: {})",
            self.lsn_file_path,
            format_lsn(flush_lsn)
        );
        Ok(())
    }

    /// Blocking counterpart of `persist_async` for Drop or other non-async contexts
    ///
    /// Uses `std::fs` directly on the calling thread (it does not dispatch to a
    /// blocking pool itself). Same skip-when-clean and retry-on-failure semantics as
    /// `persist_async`.
    fn persist_sync(&self) -> std::io::Result<()> {
        let Some((flush_lsn, json_content)) = self.take_dirty_snapshot()? else {
            // Nothing changed since the last successful persist.
            return Ok(());
        };

        // Write to temp file first for atomic write
        let temp_path = self.temp_file_path();
        debug!("Writing LSN metadata to temp file (sync): {}", temp_path);

        let written = std::fs::write(&temp_path, &json_content).and_then(|()| {
            debug!("Renaming (sync) {} -> {}", temp_path, self.lsn_file_path);
            std::fs::rename(&temp_path, &self.lsn_file_path)
        });
        self.finish_persist(flush_lsn, written)?;

        debug!(
            "Persisted CDC metadata to {} (sync) - flush_lsn: {}",
            self.lsn_file_path,
            format_lsn(flush_lsn)
        );
        Ok(())
    }

    /// Get the file path for LSN persistence
    pub fn file_path(&self) -> &str {
        &self.lsn_file_path
    }

    /// Check if there are pending changes to persist
    pub fn is_dirty(&self) -> bool {
        self.dirty.load(Ordering::Acquire)
    }

    /// Lock the metadata, recovering the guard if the mutex was poisoned.
    ///
    /// Every critical section in this impl only reads, clones or assigns plain fields,
    /// so the data is still usable after another thread panicked while holding the
    /// lock. Recovering avoids a second panic on the shutdown path.
    fn lock_metadata(&self) -> std::sync::MutexGuard<'_, CdcMetadata> {
        self.metadata
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Path of the temporary file that is written before being renamed into place.
    fn temp_file_path(&self) -> String {
        format!("{}.tmp", self.lsn_file_path)
    }

    /// Claim the dirty flag and serialize a snapshot of the metadata.
    ///
    /// Returns `Ok(None)` when the tracker is clean, otherwise the snapshot's flush LSN
    /// together with its pretty-printed JSON. The flag is cleared *before* the snapshot
    /// is taken and under the metadata lock, so an update that arrives while the file
    /// is being written re-sets the flag and is picked up by the next persist. If
    /// serialization fails the flag is restored.
    fn take_dirty_snapshot(&self) -> std::io::Result<Option<(u64, String)>> {
        let snapshot = {
            let guard = self.lock_metadata();
            if !self.dirty.swap(false, Ordering::AcqRel) {
                return Ok(None);
            }
            guard.clone()
        };

        let flush_lsn = snapshot.get_lsn();

        // Serialize metadata to JSON
        match serde_json::to_string_pretty(&snapshot) {
            Ok(json_content) => Ok(Some((flush_lsn, json_content))),
            Err(e) => {
                self.dirty.store(true, Ordering::Release);
                Err(std::io::Error::other(e))
            }
        }
    }

    /// Record the outcome of a write started from `take_dirty_snapshot`.
    ///
    /// On success the persisted LSN is updated; on failure the dirty flag is restored
    /// so the state is written by a later persist. The outcome is passed through.
    fn finish_persist(&self, flush_lsn: u64, outcome: std::io::Result<()>) -> std::io::Result<()> {
        match outcome {
            Ok(()) => {
                self.last_persisted_lsn.store(flush_lsn, Ordering::Release);
                Ok(())
            }
            Err(e) => {
                self.dirty.store(true, Ordering::Release);
                Err(e)
            }
        }
    }
}

/// Convenience entry point: build an [`LsnTracker`] and load its persisted state.
///
/// Delegates to `LsnTracker::new_with_load` and returns its result unchanged: the
/// shared tracker plus the loaded LSN, which is `None` when no usable LSN was loaded.
pub async fn create_lsn_tracker_with_load(
    lsn_file_path: Option<&str>,
) -> (Arc<LsnTracker>, Option<Lsn>) {
    let (tracker, start_lsn) = LsnTracker::new_with_load(lsn_file_path).await;
    (tracker, start_lsn)
}

// Unit tests for `LsnTracker` and the re-exported `SharedLsnFeedback`.
//
// File-backed tests use fixed, per-test paths under `/tmp`. `LsnTracker::new` appends
// `.metadata` to each path, which is why the tests read and remove `<path>.metadata`.
#[cfg(test)]
mod lsn_tracker_tests {
    use super::*;

    // A fresh tracker starts at LSN 0 and gets the `.metadata` suffix appended.
    #[tokio::test]
    async fn test_lsn_tracker_new() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn")).await;
        assert_eq!(tracker.get(), 0);
        assert_eq!(tracker.file_path(), "/tmp/test_lsn.metadata");
    }

    // `get_lsn` wraps a non-zero committed value in `Some(Lsn(..))`.
    #[tokio::test]
    async fn test_lsn_tracker_get_lsn() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_get_lsn")).await;
        tracker.commit_lsn(100);
        let lsn = tracker.get_lsn();
        assert_eq!(lsn, Some(Lsn(100)));
    }

    // The committed LSN only moves forward.
    #[tokio::test]
    async fn test_lsn_tracker_commit_lsn() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_commit_lsn")).await;

        tracker.commit_lsn(100);
        assert_eq!(tracker.get(), 100);

        // Smaller value should be ignored
        tracker.commit_lsn(50);
        assert_eq!(tracker.get(), 100);

        // Greater value should update
        tracker.commit_lsn(200);
        assert_eq!(tracker.get(), 200);
    }

    #[tokio::test]
    async fn test_zero_lsn_ignored() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_zero")).await;

        tracker.commit_lsn(100);
        tracker.commit_lsn(0); // Should be ignored

        assert_eq!(tracker.get(), 100);
    }

    // Round-trip: commit, persist, then parse the written file as `CdcMetadata`.
    #[tokio::test]
    async fn test_persist_async() {
        let path = "/tmp/test_lsn_persist_async";
        let _ = std::fs::remove_file(format!("{}.metadata", path));

        let tracker = LsnTracker::new(Some(path)).await;
        tracker.commit_lsn(12345);
        tracker.persist_async().await.unwrap();

        // Read back the file
        let content = tokio::fs::read_to_string(format!("{}.metadata", path))
            .await
            .unwrap();
        let metadata: CdcMetadata = serde_json::from_str(&content).unwrap();
        assert_eq!(metadata.lsn_tracking.flush_lsn, 12345);

        // Clean up
        let _ = std::fs::remove_file(format!("{}.metadata", path));
    }

    // NOTE(review): this only asserts that a second persist with no intervening change
    // returns `Ok`; it does not observe whether the file was actually rewritten.
    #[tokio::test]
    async fn test_persist_skips_when_not_dirty() {
        let path = "/tmp/test_lsn_persist_skip";
        let _ = std::fs::remove_file(format!("{}.metadata", path));

        let tracker = LsnTracker::new(Some(path)).await;
        tracker.commit_lsn(12345);
        tracker.persist_async().await.unwrap();

        // Second persist should be skipped (not dirty)
        let result = tracker.persist_async().await;
        assert!(result.is_ok());

        // Clean up
        let _ = std::fs::remove_file(format!("{}.metadata", path));
    }

    #[tokio::test]
    async fn test_commit_lsn_marks_dirty() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_dirty")).await;

        assert!(!tracker.is_dirty());

        tracker.commit_lsn(100);
        assert!(tracker.is_dirty());
    }

    // `load_from_file` parses a hand-written metadata file without touching the
    // tracker's in-memory state.
    #[tokio::test]
    async fn test_load_from_file() {
        let path = "/tmp/test_lsn_load";
        let metadata_path = format!("{}.metadata", path);

        // Create a metadata file
        let metadata = CdcMetadata {
            version: "1.0".to_string(),
            last_updated: Utc::now(),
            lsn_tracking: LsnTracking { flush_lsn: 54321 },
            consumer_state: ConsumerState {
                last_processed_tx_id: Some(999),
                last_processed_timestamp: Some(Utc::now()),
                pending_file_count: 5,
            },
        };

        let json = serde_json::to_string_pretty(&metadata).unwrap();
        tokio::fs::write(&metadata_path, json).await.unwrap();

        // Load it
        let tracker = LsnTracker::new(Some(path)).await;
        let loaded = tracker.load_from_file().await;

        assert!(loaded.is_some());
        let loaded_metadata = loaded.unwrap();
        assert_eq!(loaded_metadata.lsn_tracking.flush_lsn, 54321);
        assert_eq!(
            loaded_metadata.consumer_state.last_processed_tx_id,
            Some(999)
        );

        // Clean up
        let _ = std::fs::remove_file(metadata_path);
    }

    // `new_with_load` both returns the loaded LSN and installs it in the tracker.
    #[tokio::test]
    async fn test_new_with_load() {
        let path = "/tmp/test_lsn_new_with_load";
        let metadata_path = format!("{}.metadata", path);

        // Create a metadata file
        let metadata = CdcMetadata {
            version: "1.0".to_string(),
            last_updated: Utc::now(),
            lsn_tracking: LsnTracking { flush_lsn: 67890 },
            consumer_state: ConsumerState {
                last_processed_tx_id: None,
                last_processed_timestamp: None,
                pending_file_count: 0,
            },
        };

        let json = serde_json::to_string_pretty(&metadata).unwrap();
        tokio::fs::write(&metadata_path, json).await.unwrap();

        // Load using new_with_load
        let (tracker, loaded_lsn) = LsnTracker::new_with_load(Some(path)).await;

        assert_eq!(loaded_lsn, Some(Lsn(67890)));
        assert_eq!(tracker.get(), 67890);

        // Clean up
        let _ = std::fs::remove_file(metadata_path);
    }

    #[tokio::test]
    async fn test_metadata_extension_not_present() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_no_ext")).await;
        assert!(tracker.file_path().ends_with(".metadata"));
    }

    #[tokio::test]
    async fn test_metadata_extension_already_present() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn.metadata")).await;
        // Should not double-add the extension
        assert_eq!(tracker.file_path().matches(".metadata").count(), 1);
    }

    // A commit made through a cloned `Arc` in a spawned task is visible to the original.
    #[tokio::test]
    async fn test_shared_across_threads() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_threads")).await;
        let tracker_clone = tracker.clone();

        let handle = tokio::spawn(async move {
            tracker_clone.commit_lsn(12345);
        });

        handle.await.unwrap();
        assert_eq!(tracker.get(), 12345);
    }

    #[tokio::test]
    async fn test_shutdown_without_background_task() {
        let path = "/tmp/test_lsn_shutdown_simple";
        let _ = std::fs::remove_file(format!("{}.metadata", path));

        let tracker = LsnTracker::new(Some(path)).await;
        tracker.commit_lsn(99999);

        // Shutdown should persist
        tracker.shutdown_async().await;

        // Verify file was written
        let content = tokio::fs::read_to_string(format!("{}.metadata", path))
            .await
            .unwrap();
        let metadata: CdcMetadata = serde_json::from_str(&content).unwrap();
        assert_eq!(metadata.lsn_tracking.flush_lsn, 99999);

        // Clean up
        let _ = std::fs::remove_file(format!("{}.metadata", path));
    }

    // Only checks that two consecutive shutdowns complete; nothing is asserted about
    // the file contents.
    #[tokio::test]
    async fn test_double_shutdown_is_safe() {
        let path = "/tmp/test_lsn_double_shutdown";
        let tracker = LsnTracker::new(Some(path)).await;
        tracker.shutdown_async().await;
        // Second shutdown should be safe
        tracker.shutdown_async().await;

        // Clean up
        let _ = std::fs::remove_file(format!("{}.metadata", path));
    }

    // Shared LSN Feedback tests
    // These exercise `SharedLsnFeedback`, which is re-exported from `pg_walstream`
    // rather than defined in this module.
    #[test]
    fn test_shared_lsn_feedback_new() {
        let feedback = SharedLsnFeedback::new_shared();
        assert_eq!(feedback.get_flushed_lsn(), 0);
        assert_eq!(feedback.get_applied_lsn(), 0);
    }

    #[test]
    fn test_update_flushed_lsn() {
        let feedback = SharedLsnFeedback::new_shared();
        feedback.update_flushed_lsn(100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
    }

    #[test]
    fn test_update_applied_lsn() {
        let feedback = SharedLsnFeedback::new_shared();
        feedback.update_applied_lsn(200);
        assert_eq!(feedback.get_applied_lsn(), 200);
    }

    #[test]
    fn test_get_feedback_lsn() {
        let feedback = SharedLsnFeedback::new_shared();
        feedback.update_flushed_lsn(150);
        feedback.update_applied_lsn(200);

        // get_feedback_lsn returns a tuple (flushed, applied)
        // Both update to the max value seen
        let (flushed, applied) = feedback.get_feedback_lsn();
        assert_eq!(flushed, 200);
        assert_eq!(applied, 200);
    }
}