database_replicator/sqlite/
reader.rs1use anyhow::{Context, Result};
5use rusqlite::Connection;
6use std::collections::HashMap;
7
8pub fn list_tables(conn: &Connection) -> Result<Vec<String>> {
37 tracing::debug!("Listing tables from SQLite database");
38
39 let mut stmt = conn
40 .prepare(
41 "SELECT name FROM sqlite_master \
42 WHERE type='table' \
43 AND name NOT LIKE 'sqlite_%' \
44 ORDER BY name",
45 )
46 .context("Failed to prepare statement to list tables")?;
47
48 let tables = stmt
49 .query_map([], |row| row.get::<_, String>(0))
50 .context("Failed to query table list")?
51 .collect::<Result<Vec<String>, _>>()
52 .context("Failed to collect table names")?;
53
54 tracing::info!("Found {} user tables in SQLite database", tables.len());
55
56 Ok(tables)
57}
58
59pub fn get_table_row_count(conn: &Connection, table: &str) -> Result<usize> {
92 crate::jsonb::validate_table_name(table).context("Invalid table name for row count query")?;
94
95 tracing::debug!("Getting row count for table '{}'", table);
96
97 let query = format!("SELECT COUNT(*) FROM {}", crate::utils::quote_ident(table));
99
100 let count: i64 = conn
101 .query_row(&query, [], |row| row.get(0))
102 .with_context(|| format!("Failed to count rows in table '{}'", table))?;
103
104 Ok(count as usize)
105}
106
107pub fn read_table_data(
145 conn: &Connection,
146 table: &str,
147) -> Result<Vec<HashMap<String, rusqlite::types::Value>>> {
148 crate::jsonb::validate_table_name(table).context("Invalid table name for data reading")?;
150
151 tracing::info!("Reading all data from table '{}'", table);
152
153 let query = format!("SELECT * FROM {}", crate::utils::quote_ident(table));
155
156 let mut stmt = conn
157 .prepare(&query)
158 .with_context(|| format!("Failed to prepare statement for table '{}'", table))?;
159
160 let column_names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
162
163 tracing::debug!(
164 "Table '{}' has {} columns: {:?}",
165 table,
166 column_names.len(),
167 column_names
168 );
169
170 let rows = stmt
172 .query_map([], |row| {
173 let mut row_map = HashMap::new();
174
175 for (idx, col_name) in column_names.iter().enumerate() {
176 let value: rusqlite::types::Value = row.get(idx)?;
179 row_map.insert(col_name.clone(), value);
180 }
181
182 Ok(row_map)
183 })
184 .with_context(|| format!("Failed to query rows from table '{}'", table))?
185 .collect::<Result<Vec<_>, _>>()
186 .with_context(|| format!("Failed to collect rows from table '{}'", table))?;
187
188 tracing::info!("Read {} rows from table '{}'", rows.len(), table);
189
190 Ok(rows)
191}
192
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a temporary SQLite database file containing a `users` table
    /// (three rows, one of them without an `age`) and a `posts` table (one row).
    ///
    /// The returned `TempDir` must be kept alive for as long as the database
    /// path is in use.
    fn create_test_db() -> (tempfile::TempDir, std::path::PathBuf) {
        // Schema first, then fixture rows, executed in this order.
        const SETUP_STATEMENTS: [&str; 6] = [
            "CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT,
                age INTEGER
            )",
            "CREATE TABLE posts (
                id INTEGER PRIMARY KEY,
                user_id INTEGER,
                title TEXT NOT NULL,
                content TEXT
            )",
            "INSERT INTO users (id, name, email, age) VALUES (1, 'Alice', 'alice@example.com', 30)",
            "INSERT INTO users (id, name, email, age) VALUES (2, 'Bob', 'bob@example.com', 25)",
            "INSERT INTO users (id, name, email) VALUES (3, 'Charlie', 'charlie@example.com')",
            "INSERT INTO posts (id, user_id, title, content) VALUES (1, 1, 'First Post', 'Hello')",
        ];

        let temp_dir = tempfile::tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let conn = Connection::open(&db_path).unwrap();

        for sql in SETUP_STATEMENTS.iter() {
            conn.execute(sql, []).unwrap();
        }

        (temp_dir, db_path)
    }

    /// Both fixture tables are reported, in name order.
    #[test]
    fn test_list_tables() {
        let (_temp_dir, db_path) = create_test_db();
        let conn = Connection::open(db_path).unwrap();

        let tables = list_tables(&conn).unwrap();

        assert_eq!(tables.len(), 2);
        for expected in ["users", "posts"].iter() {
            assert!(tables.iter().any(|t| t.as_str() == *expected));
        }
        assert_eq!(tables, vec!["posts", "users"]);
    }

    /// Tables with the `sqlite_` prefix never appear in the listing.
    #[test]
    fn test_list_tables_excludes_system_tables() {
        let (_temp_dir, db_path) = create_test_db();
        let conn = Connection::open(&db_path).unwrap();

        // An AUTOINCREMENT column makes SQLite create its internal
        // `sqlite_sequence` table, giving the filter something to exclude.
        conn.execute(
            "CREATE TABLE test_autoincrement (id INTEGER PRIMARY KEY AUTOINCREMENT)",
            [],
        )
        .unwrap();

        let tables = list_tables(&conn).unwrap();

        assert!(tables.iter().all(|t| !t.starts_with("sqlite_")));
    }

    /// Row counts match the number of fixture rows in each table.
    #[test]
    fn test_get_table_row_count() {
        let (_temp_dir, db_path) = create_test_db();
        let conn = Connection::open(db_path).unwrap();

        let expected_counts: [(&str, usize); 2] = [("users", 3), ("posts", 1)];
        for (table, expected) in expected_counts.iter() {
            let actual = get_table_row_count(&conn, table).unwrap();
            assert_eq!(actual, *expected);
        }
    }

    /// A table name carrying an injection payload is rejected.
    #[test]
    fn test_get_table_row_count_invalid_table() {
        let (_temp_dir, db_path) = create_test_db();
        let conn = Connection::open(db_path).unwrap();

        // `unwrap_err` panics (failing the test) if the call unexpectedly succeeds.
        let message = get_table_row_count(&conn, "users; DROP TABLE users;")
            .unwrap_err()
            .to_string();

        assert!(message.contains("Invalid table name"));
    }

    /// Rows come back keyed by column name with the expected value variants.
    #[test]
    fn test_read_table_data() {
        let (_temp_dir, db_path) = create_test_db();
        let conn = Connection::open(db_path).unwrap();

        let rows = read_table_data(&conn, "users").unwrap();

        assert_eq!(rows.len(), 3);

        let first_row = &rows[0];
        for column in ["id", "name", "email", "age"].iter() {
            assert!(first_row.contains_key(*column));
        }

        assert!(
            matches!(&first_row["id"], rusqlite::types::Value::Integer(_)),
            "id should be INTEGER"
        );
        assert!(
            matches!(&first_row["name"], rusqlite::types::Value::Text(_)),
            "name should be TEXT"
        );
    }

    /// A column left out of the INSERT is read back as `Value::Null`.
    #[test]
    fn test_read_table_data_handles_null() {
        let (_temp_dir, db_path) = create_test_db();
        let conn = Connection::open(db_path).unwrap();

        let rows = read_table_data(&conn, "users").unwrap();

        let charlie = rows.iter().find(|r| {
            matches!(&r["name"], rusqlite::types::Value::Text(s) if s == "Charlie")
        });

        assert!(charlie.is_some());
        let charlie = charlie.unwrap();

        assert!(
            matches!(&charlie["age"], rusqlite::types::Value::Null),
            "Charlie's age should be NULL"
        );
    }

    /// A table name carrying an injection payload is rejected.
    #[test]
    fn test_read_table_data_invalid_table() {
        let (_temp_dir, db_path) = create_test_db();
        let conn = Connection::open(db_path).unwrap();

        // `unwrap_err` panics (failing the test) if the call unexpectedly succeeds.
        let message = read_table_data(&conn, "users'; DROP TABLE users; --")
            .unwrap_err()
            .to_string();

        assert!(message.contains("Invalid table name"));
    }
}