replication_engine/
cursor.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Cursor persistence for stream positions.
5//!
6//! Stores the last-read stream ID for each peer in SQLite.
7//! This survives Redis restarts (we're not relying on AOF!) and daemon restarts.
8//!
9//! # Debounced Writes
10//!
11//! To reduce SQLite write pressure, cursors are debounced:
12//! - `set()` updates the in-memory cache immediately and marks the cursor dirty
13//! - `flush_dirty()` persists all dirty cursors to disk in a batch
14//! - The coordinator calls `flush_dirty()` periodically (every few seconds)
15//! - On shutdown, `flush_dirty()` is called to ensure no data loss
16//!
17//! This means a crash between `set()` and `flush_dirty()` could lose up to
18//! one flush interval of cursor progress. On restart, we'd re-read some
19//! events that were already applied (idempotent, safe).
20//!
21//! # SQLite Busy Handling
22//!
23//! SQLite can return SQLITE_BUSY/SQLITE_LOCKED when the database is
24//! contended. We handle this with:
25//! - Automatic retry with exponential backoff
26//! - Configurable max retries (default 5)
27//! - Cache-first writes (cache is updated immediately, disk write retried)
28//!
29//! ## Why SQLite?
30//!
31//! - Redis may be ephemeral (no AOF/RDB persistence)
32//! - We need to survive both Redis and calling daemon restarts
33//! - Cursors are small and low-write (updated every few seconds)
34//! - SQLite WAL mode gives us durability with good performance
35//!
36//! ## Cursor Semantics
37//!
38//! The cursor stores the **last successfully applied** stream ID.
39//! On restart, we resume from `cursor + 1` (exclusive read).
40//!
41//! ```text
42//! read event 1234 → apply to sync-engine → persist cursor 1234
43//!                   (crash here = re-read 1234, idempotent)
44//! ```
45
46use crate::error::{ReplicationError, Result};
47use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
48use std::collections::{HashMap, HashSet};
49use std::path::Path;
50use std::str::FromStr;
51use std::sync::Arc;
52use std::time::Duration;
53use tokio::sync::RwLock;
54use tracing::{debug, info, warn};
55
/// Configuration for SQLite busy retry behavior
/// Total attempts per operation (initial try + retries) before giving up.
const SQLITE_RETRY_MAX_ATTEMPTS: u32 = 5;
/// Delay before the first retry; doubled after each busy failure.
const SQLITE_RETRY_BASE_DELAY_MS: u64 = 10;
/// Upper bound on the exponential backoff delay.
const SQLITE_RETRY_MAX_DELAY_MS: u64 = 500;
60
61/// Check if an error is a retryable SQLite busy/locked error
62fn is_sqlite_busy_error(e: &sqlx::Error) -> bool {
63    match e {
64        sqlx::Error::Database(db_err) => {
65            // SQLite error codes: SQLITE_BUSY = 5, SQLITE_LOCKED = 6
66            if let Some(code) = db_err.code() {
67                return code == "5" || code == "6";
68            }
69            // Fallback to message matching
70            let msg = db_err.message().to_lowercase();
71            msg.contains("database is locked") || msg.contains("database is busy")
72        }
73        _ => false,
74    }
75}
76
77/// Execute a database operation with retry on SQLITE_BUSY/SQLITE_LOCKED
78async fn execute_with_retry<F, Fut, T>(operation_name: &str, mut f: F) -> std::result::Result<T, sqlx::Error>
79where
80    F: FnMut() -> Fut,
81    Fut: std::future::Future<Output = std::result::Result<T, sqlx::Error>>,
82{
83    let mut attempts = 0;
84    let mut delay_ms = SQLITE_RETRY_BASE_DELAY_MS;
85
86    loop {
87        attempts += 1;
88        match f().await {
89            Ok(result) => {
90                if attempts > 1 {
91                    debug!(
92                        operation = operation_name,
93                        attempts,
94                        "SQLite operation succeeded after retry"
95                    );
96                }
97                return Ok(result);
98            }
99            Err(e) if is_sqlite_busy_error(&e) && attempts < SQLITE_RETRY_MAX_ATTEMPTS => {
100                warn!(
101                    operation = operation_name,
102                    attempts,
103                    max_attempts = SQLITE_RETRY_MAX_ATTEMPTS,
104                    delay_ms,
105                    "SQLite busy, retrying"
106                );
107                crate::metrics::cursor_retries_total(operation_name);
108                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
109                // Exponential backoff with cap
110                delay_ms = (delay_ms * 2).min(SQLITE_RETRY_MAX_DELAY_MS);
111            }
112            Err(e) => {
113                if is_sqlite_busy_error(&e) {
114                    warn!(
115                        operation = operation_name,
116                        attempts,
117                        "SQLite busy, max retries exceeded"
118                    );
119                }
120                return Err(e);
121            }
122        }
123    }
124}
125
/// Stream cursor entry
///
/// Mirrors one row of the `cursors` table.
/// NOTE(review): not constructed anywhere in this module — presumably
/// part of the public API for callers inspecting cursors; confirm before
/// removing.
#[derive(Debug, Clone)]
pub struct CursorEntry {
    /// Peer node ID
    pub peer_id: String,
    /// Last successfully applied stream ID (e.g., "1234567890123-0")
    pub stream_id: String,
    /// Timestamp of last update (milliseconds since Unix epoch, as written
    /// by `flush_dirty`)
    pub updated_at: i64,
}
136
/// Persistent cursor storage backed by SQLite.
///
/// Supports debounced writes: updates go to cache immediately,
/// and are flushed to disk periodically via `flush_dirty()`.
/// The cache is the source of truth between flushes; disk may lag
/// by up to one flush interval.
pub struct CursorStore {
    /// SQLite connection pool
    pool: SqlitePool,
    /// In-memory cache for fast reads (peer_id -> last applied stream ID).
    /// All `get*` calls are served from here, never from SQLite.
    cache: Arc<RwLock<HashMap<String, String>>>,
    /// Peer IDs with dirty (not yet persisted) cursors
    dirty: Arc<RwLock<HashSet<String>>>,
    /// Path to database file (lossy UTF-8 form; diagnostics only)
    path: String,
}
151
152impl CursorStore {
153    /// Create a new cursor store at the given path.
154    ///
155    /// Creates the database and tables if they don't exist.
156    pub async fn new(path: impl AsRef<Path>) -> Result<Self> {
157        let path_str = path.as_ref().to_string_lossy().to_string();
158        info!(path = %path_str, "Initializing cursor store");
159
160        let options = SqliteConnectOptions::from_str(&format!("sqlite://{}?mode=rwc", path_str))
161            .map_err(|e| ReplicationError::Config(format!("Invalid SQLite path: {}", e)))?
162            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
163            .synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
164            .create_if_missing(true);
165
166        let pool = SqlitePoolOptions::new()
167            .max_connections(2) // Low concurrency needed
168            .connect_with(options)
169            .await?;
170
171        // Create table
172        sqlx::query(
173            r#"
174            CREATE TABLE IF NOT EXISTS cursors (
175                peer_id TEXT PRIMARY KEY,
176                stream_id TEXT NOT NULL,
177                updated_at INTEGER NOT NULL
178            )
179            "#,
180        )
181        .execute(&pool)
182        .await?;
183
184        // Load existing cursors into cache
185        let rows: Vec<(String, String)> =
186            sqlx::query_as("SELECT peer_id, stream_id FROM cursors")
187                .fetch_all(&pool)
188                .await?;
189
190        let mut cache = HashMap::new();
191        for (peer_id, stream_id) in rows {
192            debug!(peer_id = %peer_id, stream_id = %stream_id, "Loaded cursor from disk");
193            cache.insert(peer_id, stream_id);
194        }
195
196        if !cache.is_empty() {
197            info!(count = cache.len(), "Restored cursors from previous run");
198        }
199
200        Ok(Self {
201            pool,
202            cache: Arc::new(RwLock::new(cache)),
203            dirty: Arc::new(RwLock::new(HashSet::new())),
204            path: path_str,
205        })
206    }
207
208    /// Get the cursor for a peer (from cache).
209    ///
210    /// Returns `None` if no cursor exists (first sync with this peer).
211    pub async fn get(&self, peer_id: &str) -> Option<String> {
212        self.cache.read().await.get(peer_id).cloned()
213    }
214
215    /// Get the cursor, or return "0" for first-time sync.
216    ///
217    /// Redis XREAD interprets "0" as "from the beginning".
218    pub async fn get_or_start(&self, peer_id: &str) -> String {
219        self.get(peer_id).await.unwrap_or_else(|| "0".to_string())
220    }
221
222    /// Update the cursor for a peer (debounced).
223    ///
224    /// Updates cache immediately, marks cursor as dirty.
225    /// Call `flush_dirty()` periodically to persist to disk.
226    ///
227    /// This is the preferred method for hot path updates where
228    /// we want to minimize SQLite writes.
229    pub async fn set(&self, peer_id: &str, stream_id: &str) {
230        // Update cache
231        {
232            let mut cache = self.cache.write().await;
233            cache.insert(peer_id.to_string(), stream_id.to_string());
234        }
235
236        // Mark as dirty
237        {
238            let mut dirty = self.dirty.write().await;
239            dirty.insert(peer_id.to_string());
240        }
241
242        debug!(peer_id = %peer_id, stream_id = %stream_id, "Cursor updated (pending flush)");
243    }
244
    /// Flush all dirty cursors to disk.
    ///
    /// Call this periodically (e.g., every 5 seconds) and on shutdown.
    /// Returns the number of cursors flushed.
    ///
    /// Partial success is possible: cursors that persisted stay persisted
    /// even when an `Err` is returned; failed peers are re-marked dirty
    /// and retried on the next flush.
    pub async fn flush_dirty(&self) -> Result<usize> {
        // Swap out dirty set atomically
        // (drain BEFORE reading the cache: a `set()` racing in after the
        // drain re-marks the peer dirty, so its newest value is simply
        // picked up by the next flush — nothing is lost)
        let dirty_peers: Vec<String> = {
            let mut dirty = self.dirty.write().await;
            let peers: Vec<String> = dirty.drain().collect();
            peers
        };

        if dirty_peers.is_empty() {
            return Ok(0);
        }

        // One timestamp for the whole batch.
        let now = chrono::Utc::now().timestamp_millis();
        // Read guard is held across the awaits below; that is safe with
        // tokio's async RwLock (writers queue, no executor blocking).
        let cache = self.cache.read().await;
        let pool = &self.pool;

        let mut flushed = 0;
        let mut errors = 0;

        for peer_id in &dirty_peers {
            // A dirty peer missing from the cache was deleted after being
            // marked dirty; silently drop it.
            if let Some(stream_id) = cache.get(peer_id) {
                // Owned copies so the retry closure's future doesn't borrow
                // from the loop bindings.
                let peer_id_owned = peer_id.clone();
                let stream_id_owned = stream_id.clone();

                let result = execute_with_retry("cursor_flush", || async {
                    sqlx::query(
                        r#"
                        INSERT INTO cursors (peer_id, stream_id, updated_at)
                        VALUES (?, ?, ?)
                        ON CONFLICT(peer_id) DO UPDATE SET
                            stream_id = excluded.stream_id,
                            updated_at = excluded.updated_at
                        "#,
                    )
                    .bind(&peer_id_owned)
                    .bind(&stream_id_owned)
                    .bind(now)
                    .execute(pool)
                    .await
                })
                .await;

                match result {
                    Ok(_) => {
                        flushed += 1;
                    }
                    Err(e) => {
                        errors += 1;
                        warn!(peer_id = %peer_id, error = %e, "Failed to flush cursor");
                        // Re-mark as dirty so we retry next flush
                        self.dirty.write().await.insert(peer_id.clone());
                    }
                }
            }
        }

        if flushed > 0 {
            debug!(flushed, errors, "Flushed dirty cursors");
            crate::metrics::record_cursor_flush(flushed, errors);
        }

        if errors > 0 {
            return Err(ReplicationError::Internal(format!(
                "Failed to flush {} cursors",
                errors
            )));
        }

        Ok(flushed)
    }
319
320    /// Check if there are any dirty (unflushed) cursors.
321    pub async fn has_dirty(&self) -> bool {
322        !self.dirty.read().await.is_empty()
323    }
324
325    /// Get count of dirty cursors pending flush.
326    pub async fn dirty_count(&self) -> usize {
327        self.dirty.read().await.len()
328    }
329
330    /// Delete cursor for a peer (e.g., when peer is removed from mesh).
331    /// Retries on SQLITE_BUSY/SQLITE_LOCKED with exponential backoff.
332    pub async fn delete(&self, peer_id: &str) -> Result<()> {
333        {
334            let mut cache = self.cache.write().await;
335            cache.remove(peer_id);
336        }
337
338        let pool = &self.pool;
339        let peer_id_owned = peer_id.to_string();
340
341        execute_with_retry("cursor_delete", || async {
342            sqlx::query("DELETE FROM cursors WHERE peer_id = ?")
343                .bind(&peer_id_owned)
344                .execute(pool)
345                .await
346        })
347        .await?;
348
349        info!(peer_id = %peer_id, "Deleted cursor");
350        Ok(())
351    }
352
353    /// Get all cursors (for metrics/debugging).
354    pub async fn get_all(&self) -> HashMap<String, String> {
355        self.cache.read().await.clone()
356    }
357
    /// Get database path (for diagnostics).
    ///
    /// This is the lossy-UTF-8 form of the path given to `new()`.
    pub fn path(&self) -> &str {
        &self.path
    }
362
363    /// Force flush WAL to main database (for clean shutdown).
364    /// Retries on SQLITE_BUSY/SQLITE_LOCKED with exponential backoff.
365    pub async fn checkpoint(&self) -> Result<()> {
366        let pool = &self.pool;
367
368        execute_with_retry("cursor_checkpoint", || async {
369            sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
370                .execute(pool)
371                .await
372        })
373        .await?;
374
375        debug!("WAL checkpoint complete");
376        Ok(())
377    }
378
379    /// Close the connection pool gracefully.
380    ///
381    /// Flushes any dirty cursors and checkpoints WAL before closing.
382    pub async fn close(&self) {
383        // Flush any pending cursor updates
384        if self.has_dirty().await {
385            match self.flush_dirty().await {
386                Ok(count) => {
387                    if count > 0 {
388                        info!(count, "Flushed dirty cursors on close");
389                    }
390                }
391                Err(e) => {
392                    warn!(error = %e, "Failed to flush dirty cursors on close");
393                }
394            }
395        }
396
397        // Checkpoint WAL for clean shutdown
398        if let Err(e) = self.checkpoint().await {
399            warn!(error = %e, "Failed to checkpoint WAL on close");
400        }
401        self.pool.close().await;
402        info!("Cursor store closed");
403    }
404}
405
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Each test gets its own tempdir-backed database file, so tests are
    // independent and safe to run in parallel.

    #[tokio::test]
    async fn test_cursor_store_basic() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_cursors.db");

        let store = CursorStore::new(&db_path).await.unwrap();

        // Initially no cursor
        assert!(store.get("peer1").await.is_none());
        assert_eq!(store.get_or_start("peer1").await, "0");

        // Set cursor (debounced - updates cache only)
        store.set("peer1", "1234567890123-0").await;
        assert_eq!(store.get("peer1").await, Some("1234567890123-0".to_string()));
        assert!(store.has_dirty().await);

        // Update cursor
        store.set("peer1", "1234567890124-0").await;
        assert_eq!(store.get("peer1").await, Some("1234567890124-0".to_string()));

        // Flush to disk
        let flushed = store.flush_dirty().await.unwrap();
        assert_eq!(flushed, 1);
        assert!(!store.has_dirty().await);

        store.close().await;
    }

    #[tokio::test]
    async fn test_cursor_store_persistence() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_persist.db");

        // Create and set cursor
        {
            let store = CursorStore::new(&db_path).await.unwrap();
            store.set("peer1", "9999-0").await;
            store.flush_dirty().await.unwrap(); // Must flush before close!
            store.close().await;
        }

        // Reopen and verify (exercises the cache-warming path in new())
        {
            let store = CursorStore::new(&db_path).await.unwrap();
            assert_eq!(store.get("peer1").await, Some("9999-0".to_string()));
            store.close().await;
        }
    }

    #[tokio::test]
    async fn test_cursor_store_delete() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_delete.db");

        let store = CursorStore::new(&db_path).await.unwrap();
        store.set("peer1", "1234-0").await;
        store.set("peer2", "5678-0").await;
        store.flush_dirty().await.unwrap();

        store.delete("peer1").await.unwrap();

        // Only the deleted peer disappears; others are untouched.
        assert!(store.get("peer1").await.is_none());
        assert_eq!(store.get("peer2").await, Some("5678-0".to_string()));

        store.close().await;
    }

    #[tokio::test]
    async fn test_cursor_debounce_multiple_updates() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_debounce.db");

        let store = CursorStore::new(&db_path).await.unwrap();

        // Multiple rapid updates to same peer
        store.set("peer1", "100-0").await;
        store.set("peer1", "200-0").await;
        store.set("peer1", "300-0").await;

        // Should only have one dirty entry
        assert_eq!(store.dirty_count().await, 1);

        // Cache should have latest value
        assert_eq!(store.get("peer1").await, Some("300-0".to_string()));

        // Flush should only write once
        let flushed = store.flush_dirty().await.unwrap();
        assert_eq!(flushed, 1);

        store.close().await;
    }

    #[tokio::test]
    async fn test_execute_with_retry_succeeds_immediately() {
        let mut attempt_count = 0;

        let result: std::result::Result<i32, sqlx::Error> =
            execute_with_retry("test_op", || {
                attempt_count += 1;
                async { Ok(42) }
            })
            .await;

        assert_eq!(result.unwrap(), 42);
        // Success on the first call must not trigger any retries.
        assert_eq!(attempt_count, 1);
    }

    #[tokio::test]
    async fn test_execute_with_retry_fails_on_non_busy_error() {
        let mut attempt_count = 0;

        let result: std::result::Result<i32, sqlx::Error> =
            execute_with_retry("test_op", || {
                attempt_count += 1;
                async { Err(sqlx::Error::RowNotFound) }
            })
            .await;

        assert!(result.is_err());
        // Non-busy errors should not retry
        assert_eq!(attempt_count, 1);
    }

    #[tokio::test]
    async fn test_cursor_store_get_all() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_get_all.db");

        let store = CursorStore::new(&db_path).await.unwrap();

        store.set("peer1", "100-0").await;
        store.set("peer2", "200-0").await;
        store.set("peer3", "300-0").await;

        let all = store.get_all().await;
        assert_eq!(all.len(), 3);
        assert_eq!(all.get("peer1"), Some(&"100-0".to_string()));
        assert_eq!(all.get("peer2"), Some(&"200-0".to_string()));
        assert_eq!(all.get("peer3"), Some(&"300-0".to_string()));

        store.close().await;
    }

    #[tokio::test]
    async fn test_cursor_store_path() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_path.db");

        let store = CursorStore::new(&db_path).await.unwrap();
        assert!(store.path().contains("test_path.db"));

        store.close().await;
    }

    #[tokio::test]
    async fn test_cursor_store_checkpoint() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_checkpoint.db");

        let store = CursorStore::new(&db_path).await.unwrap();
        store.set("peer1", "100-0").await;
        store.flush_dirty().await.unwrap();

        // Checkpoint should succeed
        let result = store.checkpoint().await;
        assert!(result.is_ok());

        store.close().await;
    }

    #[tokio::test]
    async fn test_cursor_store_dirty_count() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_dirty_count.db");

        let store = CursorStore::new(&db_path).await.unwrap();

        assert_eq!(store.dirty_count().await, 0);
        assert!(!store.has_dirty().await);

        store.set("peer1", "100-0").await;
        assert_eq!(store.dirty_count().await, 1);
        assert!(store.has_dirty().await);

        store.set("peer2", "200-0").await;
        assert_eq!(store.dirty_count().await, 2);

        // Same peer update doesn't increase count
        store.set("peer1", "150-0").await;
        assert_eq!(store.dirty_count().await, 2);

        store.flush_dirty().await.unwrap();
        assert_eq!(store.dirty_count().await, 0);
        assert!(!store.has_dirty().await);

        store.close().await;
    }

    #[tokio::test]
    async fn test_cursor_store_close_flushes_dirty() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_close_flush.db");

        // Set cursor but don't manually flush
        {
            let store = CursorStore::new(&db_path).await.unwrap();
            store.set("peer1", "999-0").await;
            // close() should flush automatically
            store.close().await;
        }

        // Verify it persisted
        {
            let store = CursorStore::new(&db_path).await.unwrap();
            assert_eq!(store.get("peer1").await, Some("999-0".to_string()));
            store.close().await;
        }
    }

    #[tokio::test]
    async fn test_cursor_store_get_or_start() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_get_or_start.db");

        let store = CursorStore::new(&db_path).await.unwrap();

        // Non-existent peer should return "0"
        assert_eq!(store.get_or_start("new_peer").await, "0");

        // After setting, should return the value
        store.set("new_peer", "123-0").await;
        assert_eq!(store.get_or_start("new_peer").await, "123-0");

        store.close().await;
    }

    #[tokio::test]
    async fn test_cursor_store_multiple_peers() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_multi_peer.db");

        let store = CursorStore::new(&db_path).await.unwrap();

        // Set cursors for many peers
        for i in 0..10 {
            store.set(&format!("peer{}", i), &format!("{}-0", i * 100)).await;
        }

        assert_eq!(store.dirty_count().await, 10);

        let flushed = store.flush_dirty().await.unwrap();
        assert_eq!(flushed, 10);

        // Verify all are correct
        for i in 0..10 {
            let expected = format!("{}-0", i * 100);
            assert_eq!(store.get(&format!("peer{}", i)).await, Some(expected));
        }

        store.close().await;
    }

    #[tokio::test]
    async fn test_cursor_store_delete_nonexistent() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test_delete_nonexistent.db");

        let store = CursorStore::new(&db_path).await.unwrap();

        // Deleting a non-existent peer should not error
        let result = store.delete("nonexistent").await;
        assert!(result.is_ok());

        store.close().await;
    }

    #[test]
    fn test_is_sqlite_busy_error_row_not_found() {
        let error = sqlx::Error::RowNotFound;
        assert!(!is_sqlite_busy_error(&error));
    }

    #[test]
    fn test_is_sqlite_busy_error_pool_timed_out() {
        let error = sqlx::Error::PoolTimedOut;
        assert!(!is_sqlite_busy_error(&error));
    }
}