database_replicator/sqlite/
reader.rs

1// ABOUTME: SQLite database introspection and data reading
2// ABOUTME: Functions to list tables, count rows, and read table data
3
4use anyhow::{Context, Result};
5use rusqlite::Connection;
6use std::collections::HashMap;
7
8/// List all user tables in a SQLite database
9///
10/// Queries sqlite_master system table for user-created tables.
11/// Excludes:
12/// - sqlite_* system tables (sqlite_sequence, sqlite_stat1, etc.)
13/// - Internal tables
14///
15/// # Arguments
16///
17/// * `conn` - SQLite database connection
18///
19/// # Returns
20///
21/// Sorted vector of table names
22///
23/// # Examples
24///
25/// ```no_run
26/// # use database_replicator::sqlite::{open_sqlite, reader::list_tables};
27/// # fn example() -> anyhow::Result<()> {
28/// let conn = open_sqlite("database.db")?;
29/// let tables = list_tables(&conn)?;
30/// for table in tables {
31///     println!("Table: {}", table);
32/// }
33/// # Ok(())
34/// # }
35/// ```
36pub fn list_tables(conn: &Connection) -> Result<Vec<String>> {
37    tracing::debug!("Listing tables from SQLite database");
38
39    let mut stmt = conn
40        .prepare(
41            "SELECT name FROM sqlite_master \
42             WHERE type='table' \
43             AND name NOT LIKE 'sqlite_%' \
44             ORDER BY name",
45        )
46        .context("Failed to prepare statement to list tables")?;
47
48    let tables = stmt
49        .query_map([], |row| row.get::<_, String>(0))
50        .context("Failed to query table list")?
51        .collect::<Result<Vec<String>, _>>()
52        .context("Failed to collect table names")?;
53
54    tracing::info!("Found {} user tables in SQLite database", tables.len());
55
56    Ok(tables)
57}
58
59/// Get row count for a specific table
60///
61/// Executes COUNT(*) query on the specified table.
62///
63/// # Arguments
64///
65/// * `conn` - SQLite database connection
66/// * `table` - Table name (should be validated with validate_table_name)
67///
68/// # Returns
69///
70/// Number of rows in the table
71///
72/// # Security
73///
74/// IMPORTANT: Table name should be validated before calling this function
75/// to prevent SQL injection. Use crate::jsonb::validate_table_name().
76///
77/// # Examples
78///
79/// ```no_run
80/// # use database_replicator::sqlite::{open_sqlite, reader::get_table_row_count};
81/// # use database_replicator::jsonb::validate_table_name;
82/// # fn example() -> anyhow::Result<()> {
83/// let conn = open_sqlite("database.db")?;
84/// let table = "users";
85/// validate_table_name(table)?;
86/// let count = get_table_row_count(&conn, table)?;
87/// println!("Table {} has {} rows", table, count);
88/// # Ok(())
89/// # }
90/// ```
91pub fn get_table_row_count(conn: &Connection, table: &str) -> Result<usize> {
92    // Validate table name to prevent SQL injection
93    crate::jsonb::validate_table_name(table).context("Invalid table name for row count query")?;
94
95    tracing::debug!("Getting row count for table '{}'", table);
96
97    // Note: table name is validated above, so it's safe to use in SQL
98    let query = format!("SELECT COUNT(*) FROM {}", crate::utils::quote_ident(table));
99
100    let count: i64 = conn
101        .query_row(&query, [], |row| row.get(0))
102        .with_context(|| format!("Failed to count rows in table '{}'", table))?;
103
104    Ok(count as usize)
105}
106
107/// Batch reader state for iterating over large SQLite tables.
108///
109/// Uses rowid-based pagination to efficiently read tables in chunks
110/// without loading all data into memory.
111#[derive(Debug)]
112pub struct BatchedTableReader {
113    /// Table name being read
114    pub table: String,
115    /// Column names in the table
116    pub columns: Vec<String>,
117    /// Last rowid seen (for pagination)
118    pub last_rowid: i64,
119    /// Maximum rows per batch
120    pub batch_size: usize,
121    /// Whether all rows have been read
122    pub exhausted: bool,
123}
124
125impl BatchedTableReader {
126    /// Create a new batched reader for a table.
127    ///
128    /// # Arguments
129    ///
130    /// * `conn` - SQLite database connection
131    /// * `table` - Table name (must be validated)
132    /// * `batch_size` - Maximum rows per batch
133    pub fn new(conn: &Connection, table: &str, batch_size: usize) -> Result<Self> {
134        // Validate table name
135        crate::jsonb::validate_table_name(table)
136            .context("Invalid table name for batched reading")?;
137
138        // Get column names
139        let query = format!("SELECT * FROM {} LIMIT 0", crate::utils::quote_ident(table));
140        let stmt = conn
141            .prepare(&query)
142            .with_context(|| format!("Failed to prepare statement for table '{}'", table))?;
143
144        let columns: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
145
146        Ok(Self {
147            table: table.to_string(),
148            columns,
149            last_rowid: 0,
150            batch_size,
151            exhausted: false,
152        })
153    }
154}
155
156/// Read the next batch of rows from a table.
157///
158/// Uses rowid-based pagination for efficient batched reading.
159/// SQLite's rowid is always present (even if not explicitly defined)
160/// and provides stable ordering for pagination.
161///
162/// # Arguments
163///
164/// * `conn` - SQLite database connection
165/// * `reader` - Mutable batch reader state
166///
167/// # Returns
168///
169/// Some(rows) if there are more rows, None if exhausted.
170pub fn read_table_batch(
171    conn: &Connection,
172    reader: &mut BatchedTableReader,
173) -> Result<Option<Vec<HashMap<String, rusqlite::types::Value>>>> {
174    if reader.exhausted {
175        return Ok(None);
176    }
177
178    // Query using rowid for stable pagination
179    // rowid is always available in SQLite (alias for INTEGER PRIMARY KEY if defined)
180    let query = format!(
181        "SELECT rowid, * FROM {} WHERE rowid > ? ORDER BY rowid LIMIT ?",
182        crate::utils::quote_ident(&reader.table)
183    );
184
185    let mut stmt = conn
186        .prepare(&query)
187        .with_context(|| format!("Failed to prepare batch query for table '{}'", reader.table))?;
188
189    let column_names = &reader.columns;
190    let last_rowid = reader.last_rowid;
191    let batch_size = reader.batch_size as i64;
192
193    let rows: Vec<HashMap<String, rusqlite::types::Value>> = stmt
194        .query_map([last_rowid, batch_size], |row| {
195            let mut row_map = HashMap::new();
196
197            // First column is rowid (index 0), actual columns start at index 1
198            for (idx, col_name) in column_names.iter().enumerate() {
199                let value: rusqlite::types::Value = row.get(idx + 1)?;
200                row_map.insert(col_name.clone(), value);
201            }
202
203            // Also store rowid for pagination tracking
204            let rowid: i64 = row.get(0)?;
205            row_map.insert("_rowid".to_string(), rusqlite::types::Value::Integer(rowid));
206
207            Ok(row_map)
208        })
209        .with_context(|| format!("Failed to query batch from table '{}'", reader.table))?
210        .collect::<Result<Vec<_>, _>>()
211        .with_context(|| format!("Failed to collect batch from table '{}'", reader.table))?;
212
213    if rows.is_empty() {
214        reader.exhausted = true;
215        return Ok(None);
216    }
217
218    // Update last_rowid from the last row for next iteration
219    if let Some(last_row) = rows.last() {
220        if let Some(rusqlite::types::Value::Integer(rowid)) = last_row.get("_rowid") {
221            reader.last_rowid = *rowid;
222        }
223    }
224
225    // Mark as exhausted if we got fewer rows than batch_size
226    if rows.len() < reader.batch_size {
227        reader.exhausted = true;
228    }
229
230    tracing::debug!(
231        "Read batch of {} rows from '{}' (last_rowid={})",
232        rows.len(),
233        reader.table,
234        reader.last_rowid
235    );
236
237    Ok(Some(rows))
238}
239
240/// Read all data from a SQLite table
241///
242/// Returns all rows as a vector of HashMaps, where each HashMap maps
243/// column names to their values.
244///
245/// # Arguments
246///
247/// * `conn` - SQLite database connection
248/// * `table` - Table name (should be validated)
249///
250/// # Returns
251///
252/// Vector of rows, each row is a HashMap<column_name, value>
253///
254/// # Security
255///
256/// Table name should be validated before calling this function.
257///
258/// # Performance
259///
260/// Loads all rows into memory. For large tables, use `BatchedTableReader`
261/// and `read_table_batch()` instead.
262///
263/// # Examples
264///
265/// ```no_run
266/// # use database_replicator::sqlite::{open_sqlite, reader::read_table_data};
267/// # use database_replicator::jsonb::validate_table_name;
268/// # fn example() -> anyhow::Result<()> {
269/// let conn = open_sqlite("database.db")?;
270/// let table = "users";
271/// validate_table_name(table)?;
272/// let rows = read_table_data(&conn, table)?;
273/// println!("Read {} rows from {}", rows.len(), table);
274/// # Ok(())
275/// # }
276/// ```
277pub fn read_table_data(
278    conn: &Connection,
279    table: &str,
280) -> Result<Vec<HashMap<String, rusqlite::types::Value>>> {
281    // Validate table name to prevent SQL injection
282    crate::jsonb::validate_table_name(table).context("Invalid table name for data reading")?;
283
284    tracing::info!("Reading all data from table '{}'", table);
285
286    // Note: table name is validated above
287    let query = format!("SELECT * FROM {}", crate::utils::quote_ident(table));
288
289    let mut stmt = conn
290        .prepare(&query)
291        .with_context(|| format!("Failed to prepare statement for table '{}'", table))?;
292
293    // Get column names
294    let column_names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
295
296    tracing::debug!(
297        "Table '{}' has {} columns: {:?}",
298        table,
299        column_names.len(),
300        column_names
301    );
302
303    // Read all rows
304    let rows = stmt
305        .query_map([], |row| {
306            let mut row_map = HashMap::new();
307
308            for (idx, col_name) in column_names.iter().enumerate() {
309                // Get value from row
310                // rusqlite::types::Value represents all SQLite types
311                let value: rusqlite::types::Value = row.get(idx)?;
312                row_map.insert(col_name.clone(), value);
313            }
314
315            Ok(row_map)
316        })
317        .with_context(|| format!("Failed to query rows from table '{}'", table))?
318        .collect::<Result<Vec<_>, _>>()
319        .with_context(|| format!("Failed to collect rows from table '{}'", table))?;
320
321    tracing::info!("Read {} rows from table '{}'", rows.len(), table);
322
323    Ok(rows)
324}
325
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::types::Value;

    /// Creates an on-disk fixture database with a `users` table (3 rows, one
    /// of them with a NULL age) and a `posts` table (1 row).
    ///
    /// The returned `TempDir` must be kept alive for as long as the database
    /// file is in use.
    fn create_test_db() -> (tempfile::TempDir, std::path::PathBuf) {
        let temp_dir = tempfile::tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");

        let conn = Connection::open(&db_path).unwrap();

        // Schema first, then the fixture rows, executed in order.
        for sql in [
            "CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT,
                age INTEGER
            )",
            "CREATE TABLE posts (
                id INTEGER PRIMARY KEY,
                user_id INTEGER,
                title TEXT NOT NULL,
                content TEXT
            )",
            "INSERT INTO users (id, name, email, age) VALUES (1, 'Alice', 'alice@example.com', 30)",
            "INSERT INTO users (id, name, email, age) VALUES (2, 'Bob', 'bob@example.com', 25)",
            "INSERT INTO users (id, name, email) VALUES (3, 'Charlie', 'charlie@example.com')",
            "INSERT INTO posts (id, user_id, title, content) VALUES (1, 1, 'First Post', 'Hello')",
        ] {
            conn.execute(sql, []).unwrap();
        }

        (temp_dir, db_path)
    }

    /// Builds the fixture database and opens a fresh connection to it.
    fn open_fixture() -> (tempfile::TempDir, Connection) {
        let (temp_dir, db_path) = create_test_db();
        let conn = Connection::open(db_path).unwrap();
        (temp_dir, conn)
    }

    /// Opens a new database file containing exactly one table built by `ddl`.
    fn open_with_table(file_name: &str, ddl: &str) -> (tempfile::TempDir, Connection) {
        let temp_dir = tempfile::tempdir().unwrap();
        let conn = Connection::open(temp_dir.path().join(file_name)).unwrap();
        conn.execute(ddl, []).unwrap();
        (temp_dir, conn)
    }

    #[test]
    fn test_list_tables() {
        let (_temp_dir, conn) = open_fixture();

        let tables = list_tables(&conn).unwrap();

        assert_eq!(tables.len(), 2);
        for expected in ["users", "posts"] {
            assert!(tables.iter().any(|t| t.as_str() == expected));
        }
        // Names come back sorted
        assert_eq!(tables, vec!["posts", "users"]);
    }

    #[test]
    fn test_list_tables_excludes_system_tables() {
        let (_temp_dir, conn) = open_fixture();

        // AUTOINCREMENT makes SQLite create its sqlite_sequence table
        conn.execute(
            "CREATE TABLE test_autoincrement (id INTEGER PRIMARY KEY AUTOINCREMENT)",
            [],
        )
        .unwrap();

        let tables = list_tables(&conn).unwrap();

        // No reserved-prefix name may leak into the listing
        assert!(tables.iter().all(|t| !t.starts_with("sqlite_")));
    }

    #[test]
    fn test_get_table_row_count() {
        let (_temp_dir, conn) = open_fixture();

        for (table, expected) in [("users", 3), ("posts", 1)] {
            assert_eq!(get_table_row_count(&conn, table).unwrap(), expected);
        }
    }

    #[test]
    fn test_get_table_row_count_invalid_table() {
        let (_temp_dir, conn) = open_fixture();

        // SQL injection attempt
        let err = get_table_row_count(&conn, "users; DROP TABLE users;").unwrap_err();
        assert!(err.to_string().contains("Invalid table name"));
    }

    #[test]
    fn test_read_table_data() {
        let (_temp_dir, conn) = open_fixture();

        let rows = read_table_data(&conn, "users").unwrap();
        assert_eq!(rows.len(), 3);

        // Every declared column must be present on the first row
        let first_row = &rows[0];
        for column in ["id", "name", "email", "age"] {
            assert!(first_row.contains_key(column));
        }

        // Storage classes are carried through unchanged
        assert!(
            matches!(&first_row["id"], Value::Integer(_)),
            "id should be INTEGER"
        );
        assert!(
            matches!(&first_row["name"], Value::Text(_)),
            "name should be TEXT"
        );
    }

    #[test]
    fn test_read_table_data_handles_null() {
        let (_temp_dir, conn) = open_fixture();

        let rows = read_table_data(&conn, "users").unwrap();

        // Charlie was inserted without an age
        let charlie = rows
            .iter()
            .find(|r| matches!(&r["name"], Value::Text(s) if s == "Charlie"));
        assert!(charlie.is_some());
        let charlie = charlie.unwrap();

        assert!(
            matches!(&charlie["age"], Value::Null),
            "Charlie's age should be NULL"
        );
    }

    #[test]
    fn test_read_table_data_invalid_table() {
        let (_temp_dir, conn) = open_fixture();

        // SQL injection attempt
        let err = read_table_data(&conn, "users'; DROP TABLE users; --").unwrap_err();
        assert!(err.to_string().contains("Invalid table name"));
    }

    #[test]
    fn test_batched_table_reader_creation() {
        let (_temp_dir, conn) = open_fixture();

        let reader = BatchedTableReader::new(&conn, "users", 100).unwrap();

        assert_eq!(reader.table, "users");
        assert_eq!(reader.batch_size, 100);
        assert_eq!(reader.last_rowid, 0);
        assert!(!reader.exhausted);
        assert_eq!(reader.columns.len(), 4); // id, name, email, age
    }

    #[test]
    fn test_batched_table_reader_invalid_table() {
        let (_temp_dir, conn) = open_fixture();

        let err = BatchedTableReader::new(&conn, "users; DROP TABLE users;", 100).unwrap_err();
        assert!(err.to_string().contains("Invalid table name"));
    }

    #[test]
    fn test_read_table_batch_single_batch() {
        let (_temp_dir, conn) = open_fixture();

        // Batch size exceeds the row count, so one batch holds everything
        let mut reader = BatchedTableReader::new(&conn, "users", 100).unwrap();

        let first = read_table_batch(&conn, &mut reader).unwrap();
        assert!(first.is_some());
        assert_eq!(first.unwrap().len(), 3);

        // The follow-up call reports exhaustion
        let second = read_table_batch(&conn, &mut reader).unwrap();
        assert!(second.is_none());
        assert!(reader.exhausted);
    }

    #[test]
    fn test_read_table_batch_multiple_batches() {
        let (_temp_dir, conn) = open_fixture();

        // One row per batch forces several round trips
        let mut reader = BatchedTableReader::new(&conn, "users", 1).unwrap();

        let batches: Vec<_> =
            std::iter::from_fn(|| read_table_batch(&conn, &mut reader).unwrap()).collect();

        // Each batch should have 1 row
        for batch in &batches {
            assert_eq!(batch.len(), 1);
        }

        let all_rows: Vec<_> = batches.into_iter().flatten().collect();
        assert_eq!(all_rows.len(), 3);
        assert!(reader.exhausted);
    }

    #[test]
    fn test_read_table_batch_preserves_data() {
        let (_temp_dir, conn) = open_fixture();

        let mut reader = BatchedTableReader::new(&conn, "users", 100).unwrap();
        let batch = read_table_batch(&conn, &mut reader).unwrap().unwrap();

        // Rows are ordered by rowid, so index 0 is the row with id=1
        let first_row = &batch[0];
        for column in ["id", "name", "email", "age"] {
            assert!(first_row.contains_key(column));
        }

        // First row should be Alice
        match &first_row["name"] {
            Value::Text(s) => assert_eq!(s, "Alice"),
            _ => panic!("name should be TEXT"),
        }
    }

    #[test]
    fn test_read_table_batch_empty_table() {
        let (_temp_dir, conn) = open_with_table(
            "empty.db",
            "CREATE TABLE empty_table (id INTEGER PRIMARY KEY, name TEXT)",
        );

        let mut reader = BatchedTableReader::new(&conn, "empty_table", 100).unwrap();

        // An empty table yields None on the very first call
        let batch = read_table_batch(&conn, &mut reader).unwrap();
        assert!(batch.is_none());
        assert!(reader.exhausted);
    }

    #[test]
    fn test_read_table_batch_large_table() {
        let (_temp_dir, conn) = open_with_table(
            "large.db",
            "CREATE TABLE large_table (id INTEGER PRIMARY KEY, value TEXT)",
        );

        // Insert 250 rows
        for i in 1..=250 {
            conn.execute(
                "INSERT INTO large_table (id, value) VALUES (?, ?)",
                rusqlite::params![i, format!("value_{}", i)],
            )
            .unwrap();
        }

        // Read with batch size of 100 and record the size of every batch
        let mut reader = BatchedTableReader::new(&conn, "large_table", 100).unwrap();
        let batch_sizes: Vec<usize> = std::iter::from_fn(|| {
            read_table_batch(&conn, &mut reader)
                .unwrap()
                .map(|batch| batch.len())
        })
        .collect();

        // Each batch should be at most 100 rows
        assert!(batch_sizes.iter().all(|&size| size <= 100));

        let total_rows: usize = batch_sizes.iter().sum();
        assert_eq!(total_rows, 250);
        assert_eq!(batch_sizes.len(), 3); // 100 + 100 + 50
    }
}