database_replicator/mongodb/reader.rs
1// ABOUTME: MongoDB data reading functions for collection introspection and document retrieval
2// ABOUTME: Provides read-only access to MongoDB collections with security validation
3
4use anyhow::{Context, Result};
5use bson::Document;
6use mongodb::{Client, Database};
7
8/// List all collection names in a MongoDB database
9///
10/// Retrieves names of all collections in the specified database.
11/// System collections (starting with "system.") are excluded.
12///
13/// # Arguments
14///
15/// * `client` - MongoDB client connection
16/// * `db_name` - Database name to list collections from
17///
18/// # Returns
19///
20/// Vector of collection names
21///
22/// # Security
23///
24/// Only lists user collections, system collections are excluded
25///
26/// # Examples
27///
28/// ```no_run
29/// # use database_replicator::mongodb::{connect_mongodb, reader::list_collections};
30/// # async fn example() -> anyhow::Result<()> {
31/// let client = connect_mongodb("mongodb://localhost:27017/mydb").await?;
32/// let collections = list_collections(&client, "mydb").await?;
33/// println!("Found {} collections", collections.len());
34/// # Ok(())
35/// # }
36/// ```
37pub async fn list_collections(client: &Client, db_name: &str) -> Result<Vec<String>> {
38 tracing::info!("Listing collections in database '{}'", db_name);
39
40 let database = client.database(db_name);
41
42 let collection_names = database
43 .list_collection_names()
44 .await
45 .with_context(|| format!("Failed to list collections in database '{}'", db_name))?;
46
47 // Filter out system collections
48 let user_collections: Vec<String> = collection_names
49 .into_iter()
50 .filter(|name| !name.starts_with("system."))
51 .collect();
52
53 tracing::debug!(
54 "Found {} user collections in '{}'",
55 user_collections.len(),
56 db_name
57 );
58
59 Ok(user_collections)
60}
61
62/// Get document count for a MongoDB collection
63///
64/// Returns the total number of documents in the collection.
65///
66/// # Arguments
67///
68/// * `database` - MongoDB database reference
69/// * `collection_name` - Collection name (must be validated)
70///
71/// # Returns
72///
73/// Number of documents in the collection
74///
75/// # Security
76///
77/// Collection name should be validated before calling this function.
78///
79/// # Examples
80///
81/// ```no_run
82/// # use database_replicator::mongodb::{connect_mongodb, reader::get_collection_count};
83/// # use database_replicator::jsonb::validate_table_name;
84/// # async fn example() -> anyhow::Result<()> {
85/// let client = connect_mongodb("mongodb://localhost:27017/mydb").await?;
86/// let db = client.database("mydb");
87/// let collection = "users";
88/// validate_table_name(collection)?;
89/// let count = get_collection_count(&db, collection).await?;
90/// println!("Collection '{}' has {} documents", collection, count);
91/// # Ok(())
92/// # }
93/// ```
94pub async fn get_collection_count(database: &Database, collection_name: &str) -> Result<usize> {
95 // Validate collection name to prevent injection
96 crate::jsonb::validate_table_name(collection_name)
97 .context("Invalid collection name for count query")?;
98
99 tracing::debug!(
100 "Getting document count for collection '{}'",
101 collection_name
102 );
103
104 let collection = database.collection::<Document>(collection_name);
105
106 let count = collection
107 .estimated_document_count()
108 .await
109 .with_context(|| {
110 format!(
111 "Failed to count documents in collection '{}'",
112 collection_name
113 )
114 })?;
115
116 Ok(count as usize)
117}
118
119/// Read all documents from a MongoDB collection
120///
121/// Reads all documents from the collection and returns them as BSON documents.
122/// For large collections, this may consume significant memory.
123///
124/// # Arguments
125///
126/// * `database` - MongoDB database reference
127/// * `collection_name` - Collection name (must be validated)
128///
129/// # Returns
130///
131/// Vector of BSON documents from the collection
132///
133/// # Security
134///
135/// - Collection name is validated before querying
136/// - Read-only operation, no modifications possible
137///
138/// # Examples
139///
140/// ```no_run
141/// # use database_replicator::mongodb::{connect_mongodb, reader::read_collection_data};
142/// # use database_replicator::jsonb::validate_table_name;
143/// # async fn example() -> anyhow::Result<()> {
144/// let client = connect_mongodb("mongodb://localhost:27017/mydb").await?;
145/// let db = client.database("mydb");
146/// let collection = "users";
147/// validate_table_name(collection)?;
148/// let documents = read_collection_data(&db, collection).await?;
149/// println!("Read {} documents", documents.len());
150/// # Ok(())
151/// # }
152/// ```
153pub async fn read_collection_data(
154 database: &Database,
155 collection_name: &str,
156) -> Result<Vec<Document>> {
157 // Validate collection name to prevent injection
158 crate::jsonb::validate_table_name(collection_name)
159 .context("Invalid collection name for data reading")?;
160
161 tracing::info!(
162 "Reading all documents from collection '{}'",
163 collection_name
164 );
165
166 let collection = database.collection::<Document>(collection_name);
167
168 let mut cursor = collection
169 .find(bson::Document::new())
170 .await
171 .with_context(|| format!("Failed to query collection '{}'", collection_name))?;
172
173 let mut documents = Vec::new();
174
175 use futures::stream::StreamExt;
176 while let Some(result) = cursor.next().await {
177 let document = result.with_context(|| {
178 format!(
179 "Failed to read document from collection '{}'",
180 collection_name
181 )
182 })?;
183 documents.push(document);
184 }
185
186 tracing::info!(
187 "Read {} documents from collection '{}'",
188 documents.len(),
189 collection_name
190 );
191
192 Ok(documents)
193}
194
#[cfg(test)]
mod tests {
    /// Ordinary identifier-style collection names are accepted by the validator.
    #[test]
    fn test_validate_collection_names() {
        for name in ["users", "user_events", "UserData", "_private"] {
            assert!(
                crate::jsonb::validate_table_name(name).is_ok(),
                "Valid collection name should pass: {}",
                name
            );
        }
    }

    /// Injection-style strings and bare SQL keywords are refused by the validator.
    #[test]
    fn test_reject_invalid_collection_names() {
        for name in [
            "users; DROP DATABASE;",
            "users--",
            "select",
            "insert",
            "drop",
        ] {
            assert!(
                crate::jsonb::validate_table_name(name).is_err(),
                "Invalid collection name should be rejected: {}",
                name
            );
        }
    }
}