1use anyhow::{Context, Result};
5use rusqlite::Connection;
6use std::collections::HashMap;
7
8pub fn list_tables(conn: &Connection) -> Result<Vec<String>> {
37 tracing::debug!("Listing tables from SQLite database");
38
39 let mut stmt = conn
40 .prepare(
41 "SELECT name FROM sqlite_master \
42 WHERE type='table' \
43 AND name NOT LIKE 'sqlite_%' \
44 ORDER BY name",
45 )
46 .context("Failed to prepare statement to list tables")?;
47
48 let tables = stmt
49 .query_map([], |row| row.get::<_, String>(0))
50 .context("Failed to query table list")?
51 .collect::<Result<Vec<String>, _>>()
52 .context("Failed to collect table names")?;
53
54 tracing::info!("Found {} user tables in SQLite database", tables.len());
55
56 Ok(tables)
57}
58
59pub fn get_table_row_count(conn: &Connection, table: &str) -> Result<usize> {
92 crate::jsonb::validate_table_name(table).context("Invalid table name for row count query")?;
94
95 tracing::debug!("Getting row count for table '{}'", table);
96
97 let query = format!("SELECT COUNT(*) FROM {}", crate::utils::quote_ident(table));
99
100 let count: i64 = conn
101 .query_row(&query, [], |row| row.get(0))
102 .with_context(|| format!("Failed to count rows in table '{}'", table))?;
103
104 Ok(count as usize)
105}
106
107#[derive(Debug)]
112pub struct BatchedTableReader {
113 pub table: String,
115 pub columns: Vec<String>,
117 pub last_rowid: i64,
119 pub batch_size: usize,
121 pub exhausted: bool,
123}
124
125impl BatchedTableReader {
126 pub fn new(conn: &Connection, table: &str, batch_size: usize) -> Result<Self> {
134 crate::jsonb::validate_table_name(table)
136 .context("Invalid table name for batched reading")?;
137
138 let query = format!("SELECT * FROM {} LIMIT 0", crate::utils::quote_ident(table));
140 let stmt = conn
141 .prepare(&query)
142 .with_context(|| format!("Failed to prepare statement for table '{}'", table))?;
143
144 let columns: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
145
146 Ok(Self {
147 table: table.to_string(),
148 columns,
149 last_rowid: 0,
150 batch_size,
151 exhausted: false,
152 })
153 }
154}
155
156pub fn read_table_batch(
171 conn: &Connection,
172 reader: &mut BatchedTableReader,
173) -> Result<Option<Vec<HashMap<String, rusqlite::types::Value>>>> {
174 if reader.exhausted {
175 return Ok(None);
176 }
177
178 let query = format!(
181 "SELECT rowid, * FROM {} WHERE rowid > ? ORDER BY rowid LIMIT ?",
182 crate::utils::quote_ident(&reader.table)
183 );
184
185 let mut stmt = conn
186 .prepare(&query)
187 .with_context(|| format!("Failed to prepare batch query for table '{}'", reader.table))?;
188
189 let column_names = &reader.columns;
190 let last_rowid = reader.last_rowid;
191 let batch_size = reader.batch_size as i64;
192
193 let rows: Vec<HashMap<String, rusqlite::types::Value>> = stmt
194 .query_map([last_rowid, batch_size], |row| {
195 let mut row_map = HashMap::new();
196
197 for (idx, col_name) in column_names.iter().enumerate() {
199 let value: rusqlite::types::Value = row.get(idx + 1)?;
200 row_map.insert(col_name.clone(), value);
201 }
202
203 let rowid: i64 = row.get(0)?;
205 row_map.insert("_rowid".to_string(), rusqlite::types::Value::Integer(rowid));
206
207 Ok(row_map)
208 })
209 .with_context(|| format!("Failed to query batch from table '{}'", reader.table))?
210 .collect::<Result<Vec<_>, _>>()
211 .with_context(|| format!("Failed to collect batch from table '{}'", reader.table))?;
212
213 if rows.is_empty() {
214 reader.exhausted = true;
215 return Ok(None);
216 }
217
218 if let Some(last_row) = rows.last() {
220 if let Some(rusqlite::types::Value::Integer(rowid)) = last_row.get("_rowid") {
221 reader.last_rowid = *rowid;
222 }
223 }
224
225 if rows.len() < reader.batch_size {
227 reader.exhausted = true;
228 }
229
230 tracing::debug!(
231 "Read batch of {} rows from '{}' (last_rowid={})",
232 rows.len(),
233 reader.table,
234 reader.last_rowid
235 );
236
237 Ok(Some(rows))
238}
239
240pub fn read_table_data(
278 conn: &Connection,
279 table: &str,
280) -> Result<Vec<HashMap<String, rusqlite::types::Value>>> {
281 crate::jsonb::validate_table_name(table).context("Invalid table name for data reading")?;
283
284 tracing::info!("Reading all data from table '{}'", table);
285
286 let query = format!("SELECT * FROM {}", crate::utils::quote_ident(table));
288
289 let mut stmt = conn
290 .prepare(&query)
291 .with_context(|| format!("Failed to prepare statement for table '{}'", table))?;
292
293 let column_names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
295
296 tracing::debug!(
297 "Table '{}' has {} columns: {:?}",
298 table,
299 column_names.len(),
300 column_names
301 );
302
303 let rows = stmt
305 .query_map([], |row| {
306 let mut row_map = HashMap::new();
307
308 for (idx, col_name) in column_names.iter().enumerate() {
309 let value: rusqlite::types::Value = row.get(idx)?;
312 row_map.insert(col_name.clone(), value);
313 }
314
315 Ok(row_map)
316 })
317 .with_context(|| format!("Failed to query rows from table '{}'", table))?
318 .collect::<Result<Vec<_>, _>>()
319 .with_context(|| format!("Failed to collect rows from table '{}'", table))?;
320
321 tracing::info!("Read {} rows from table '{}'", rows.len(), table);
322
323 Ok(rows)
324}
325
326#[cfg(test)]
327mod tests {
328 use super::*;
329
330 fn create_test_db() -> (tempfile::TempDir, std::path::PathBuf) {
331 let temp_dir = tempfile::tempdir().unwrap();
332 let db_path = temp_dir.path().join("test.db");
333
334 let conn = Connection::open(&db_path).unwrap();
335
336 conn.execute(
338 "CREATE TABLE users (
339 id INTEGER PRIMARY KEY,
340 name TEXT NOT NULL,
341 email TEXT,
342 age INTEGER
343 )",
344 [],
345 )
346 .unwrap();
347
348 conn.execute(
349 "CREATE TABLE posts (
350 id INTEGER PRIMARY KEY,
351 user_id INTEGER,
352 title TEXT NOT NULL,
353 content TEXT
354 )",
355 [],
356 )
357 .unwrap();
358
359 conn.execute(
361 "INSERT INTO users (id, name, email, age) VALUES (1, 'Alice', 'alice@example.com', 30)",
362 [],
363 )
364 .unwrap();
365 conn.execute(
366 "INSERT INTO users (id, name, email, age) VALUES (2, 'Bob', 'bob@example.com', 25)",
367 [],
368 )
369 .unwrap();
370 conn.execute(
371 "INSERT INTO users (id, name, email) VALUES (3, 'Charlie', 'charlie@example.com')",
372 [],
373 )
374 .unwrap();
375
376 conn.execute(
377 "INSERT INTO posts (id, user_id, title, content) VALUES (1, 1, 'First Post', 'Hello')",
378 [],
379 )
380 .unwrap();
381
382 (temp_dir, db_path)
383 }
384
385 #[test]
386 fn test_list_tables() {
387 let (_temp_dir, db_path) = create_test_db();
388 let conn = Connection::open(db_path).unwrap();
389
390 let tables = list_tables(&conn).unwrap();
391
392 assert_eq!(tables.len(), 2);
393 assert!(tables.contains(&"users".to_string()));
394 assert!(tables.contains(&"posts".to_string()));
395 assert_eq!(tables, vec!["posts", "users"]); }
397
398 #[test]
399 fn test_list_tables_excludes_system_tables() {
400 let (_temp_dir, db_path) = create_test_db();
401 let conn = Connection::open(&db_path).unwrap();
402
403 conn.execute(
405 "CREATE TABLE test_autoincrement (id INTEGER PRIMARY KEY AUTOINCREMENT)",
406 [],
407 )
408 .unwrap();
409
410 let tables = list_tables(&conn).unwrap();
411
412 assert!(!tables.iter().any(|t| t.starts_with("sqlite_")));
414 }
415
416 #[test]
417 fn test_get_table_row_count() {
418 let (_temp_dir, db_path) = create_test_db();
419 let conn = Connection::open(db_path).unwrap();
420
421 let users_count = get_table_row_count(&conn, "users").unwrap();
422 assert_eq!(users_count, 3);
423
424 let posts_count = get_table_row_count(&conn, "posts").unwrap();
425 assert_eq!(posts_count, 1);
426 }
427
428 #[test]
429 fn test_get_table_row_count_invalid_table() {
430 let (_temp_dir, db_path) = create_test_db();
431 let conn = Connection::open(db_path).unwrap();
432
433 let result = get_table_row_count(&conn, "users; DROP TABLE users;");
435 assert!(result.is_err());
436 assert!(result
437 .unwrap_err()
438 .to_string()
439 .contains("Invalid table name"));
440 }
441
442 #[test]
443 fn test_read_table_data() {
444 let (_temp_dir, db_path) = create_test_db();
445 let conn = Connection::open(db_path).unwrap();
446
447 let rows = read_table_data(&conn, "users").unwrap();
448
449 assert_eq!(rows.len(), 3);
450
451 let first_row = &rows[0];
453 assert!(first_row.contains_key("id"));
454 assert!(first_row.contains_key("name"));
455 assert!(first_row.contains_key("email"));
456 assert!(first_row.contains_key("age"));
457
458 match &first_row["id"] {
460 rusqlite::types::Value::Integer(_) => (),
461 _ => panic!("id should be INTEGER"),
462 }
463
464 match &first_row["name"] {
465 rusqlite::types::Value::Text(_) => (),
466 _ => panic!("name should be TEXT"),
467 }
468 }
469
470 #[test]
471 fn test_read_table_data_handles_null() {
472 let (_temp_dir, db_path) = create_test_db();
473 let conn = Connection::open(db_path).unwrap();
474
475 let rows = read_table_data(&conn, "users").unwrap();
476
477 let charlie = rows.iter().find(|r| match &r["name"] {
479 rusqlite::types::Value::Text(s) => s == "Charlie",
480 _ => false,
481 });
482
483 assert!(charlie.is_some());
484 let charlie = charlie.unwrap();
485
486 match &charlie["age"] {
488 rusqlite::types::Value::Null => (),
489 _ => panic!("Charlie's age should be NULL"),
490 }
491 }
492
493 #[test]
494 fn test_read_table_data_invalid_table() {
495 let (_temp_dir, db_path) = create_test_db();
496 let conn = Connection::open(db_path).unwrap();
497
498 let result = read_table_data(&conn, "users'; DROP TABLE users; --");
500 assert!(result.is_err());
501 assert!(result
502 .unwrap_err()
503 .to_string()
504 .contains("Invalid table name"));
505 }
506
507 #[test]
508 fn test_batched_table_reader_creation() {
509 let (_temp_dir, db_path) = create_test_db();
510 let conn = Connection::open(db_path).unwrap();
511
512 let reader = BatchedTableReader::new(&conn, "users", 100).unwrap();
513
514 assert_eq!(reader.table, "users");
515 assert_eq!(reader.batch_size, 100);
516 assert_eq!(reader.last_rowid, 0);
517 assert!(!reader.exhausted);
518 assert_eq!(reader.columns.len(), 4); }
520
521 #[test]
522 fn test_batched_table_reader_invalid_table() {
523 let (_temp_dir, db_path) = create_test_db();
524 let conn = Connection::open(db_path).unwrap();
525
526 let result = BatchedTableReader::new(&conn, "users; DROP TABLE users;", 100);
527 assert!(result.is_err());
528 assert!(result
529 .unwrap_err()
530 .to_string()
531 .contains("Invalid table name"));
532 }
533
534 #[test]
535 fn test_read_table_batch_single_batch() {
536 let (_temp_dir, db_path) = create_test_db();
537 let conn = Connection::open(db_path).unwrap();
538
539 let mut reader = BatchedTableReader::new(&conn, "users", 100).unwrap();
541
542 let batch1 = read_table_batch(&conn, &mut reader).unwrap();
544 assert!(batch1.is_some());
545 let rows = batch1.unwrap();
546 assert_eq!(rows.len(), 3);
547
548 let batch2 = read_table_batch(&conn, &mut reader).unwrap();
550 assert!(batch2.is_none());
551 assert!(reader.exhausted);
552 }
553
554 #[test]
555 fn test_read_table_batch_multiple_batches() {
556 let (_temp_dir, db_path) = create_test_db();
557 let conn = Connection::open(db_path).unwrap();
558
559 let mut reader = BatchedTableReader::new(&conn, "users", 1).unwrap();
561
562 let mut all_rows = Vec::new();
564 while let Some(batch) = read_table_batch(&conn, &mut reader).unwrap() {
565 assert_eq!(batch.len(), 1); all_rows.extend(batch);
567 }
568
569 assert_eq!(all_rows.len(), 3);
570 assert!(reader.exhausted);
571 }
572
573 #[test]
574 fn test_read_table_batch_preserves_data() {
575 let (_temp_dir, db_path) = create_test_db();
576 let conn = Connection::open(db_path).unwrap();
577
578 let mut reader = BatchedTableReader::new(&conn, "users", 100).unwrap();
579 let batch = read_table_batch(&conn, &mut reader).unwrap().unwrap();
580
581 let first_row = &batch[0];
583 assert!(first_row.contains_key("id"));
584 assert!(first_row.contains_key("name"));
585 assert!(first_row.contains_key("email"));
586 assert!(first_row.contains_key("age"));
587
588 match &first_row["name"] {
590 rusqlite::types::Value::Text(s) => assert_eq!(s, "Alice"),
591 _ => panic!("name should be TEXT"),
592 }
593 }
594
595 #[test]
596 fn test_read_table_batch_empty_table() {
597 let temp_dir = tempfile::tempdir().unwrap();
598 let db_path = temp_dir.path().join("empty.db");
599 let conn = Connection::open(&db_path).unwrap();
600
601 conn.execute(
602 "CREATE TABLE empty_table (id INTEGER PRIMARY KEY, name TEXT)",
603 [],
604 )
605 .unwrap();
606
607 let mut reader = BatchedTableReader::new(&conn, "empty_table", 100).unwrap();
608
609 let batch = read_table_batch(&conn, &mut reader).unwrap();
611 assert!(batch.is_none());
612 assert!(reader.exhausted);
613 }
614
615 #[test]
616 fn test_read_table_batch_large_table() {
617 let temp_dir = tempfile::tempdir().unwrap();
618 let db_path = temp_dir.path().join("large.db");
619 let conn = Connection::open(&db_path).unwrap();
620
621 conn.execute(
622 "CREATE TABLE large_table (id INTEGER PRIMARY KEY, value TEXT)",
623 [],
624 )
625 .unwrap();
626
627 for i in 1..=250 {
629 conn.execute(
630 "INSERT INTO large_table (id, value) VALUES (?, ?)",
631 rusqlite::params![i, format!("value_{}", i)],
632 )
633 .unwrap();
634 }
635
636 let mut reader = BatchedTableReader::new(&conn, "large_table", 100).unwrap();
638
639 let mut batch_count = 0;
640 let mut total_rows = 0;
641
642 while let Some(batch) = read_table_batch(&conn, &mut reader).unwrap() {
643 batch_count += 1;
644 total_rows += batch.len();
645
646 assert!(batch.len() <= 100);
648 }
649
650 assert_eq!(total_rows, 250);
651 assert_eq!(batch_count, 3); }
653}