database_replicator/mongodb/reader.rs

1// ABOUTME: MongoDB data reading functions for collection introspection and document retrieval
2// ABOUTME: Provides read-only access to MongoDB collections with security validation
3
4use anyhow::{Context, Result};
5use bson::Document;
6use mongodb::{Client, Database};
7
8/// List all collection names in a MongoDB database
9///
10/// Retrieves names of all collections in the specified database.
11/// System collections (starting with "system.") are excluded.
12///
13/// # Arguments
14///
15/// * `client` - MongoDB client connection
16/// * `db_name` - Database name to list collections from
17///
18/// # Returns
19///
20/// Vector of collection names
21///
22/// # Security
23///
24/// Only lists user collections, system collections are excluded
25///
26/// # Examples
27///
28/// ```no_run
29/// # use database_replicator::mongodb::{connect_mongodb, reader::list_collections};
30/// # async fn example() -> anyhow::Result<()> {
31/// let client = connect_mongodb("mongodb://localhost:27017/mydb").await?;
32/// let collections = list_collections(&client, "mydb").await?;
33/// println!("Found {} collections", collections.len());
34/// # Ok(())
35/// # }
36/// ```
37pub async fn list_collections(client: &Client, db_name: &str) -> Result<Vec<String>> {
38    tracing::info!("Listing collections in database '{}'", db_name);
39
40    let database = client.database(db_name);
41
42    let collection_names = database
43        .list_collection_names()
44        .await
45        .with_context(|| format!("Failed to list collections in database '{}'", db_name))?;
46
47    // Filter out system collections
48    let user_collections: Vec<String> = collection_names
49        .into_iter()
50        .filter(|name| !name.starts_with("system."))
51        .collect();
52
53    tracing::debug!(
54        "Found {} user collections in '{}'",
55        user_collections.len(),
56        db_name
57    );
58
59    Ok(user_collections)
60}
61
62/// Get document count for a MongoDB collection
63///
64/// Returns the total number of documents in the collection.
65///
66/// # Arguments
67///
68/// * `database` - MongoDB database reference
69/// * `collection_name` - Collection name (must be validated)
70///
71/// # Returns
72///
73/// Number of documents in the collection
74///
75/// # Security
76///
77/// Collection name should be validated before calling this function.
78///
79/// # Examples
80///
81/// ```no_run
82/// # use database_replicator::mongodb::{connect_mongodb, reader::get_collection_count};
83/// # use database_replicator::jsonb::validate_table_name;
84/// # async fn example() -> anyhow::Result<()> {
85/// let client = connect_mongodb("mongodb://localhost:27017/mydb").await?;
86/// let db = client.database("mydb");
87/// let collection = "users";
88/// validate_table_name(collection)?;
89/// let count = get_collection_count(&db, collection).await?;
90/// println!("Collection '{}' has {} documents", collection, count);
91/// # Ok(())
92/// # }
93/// ```
94pub async fn get_collection_count(database: &Database, collection_name: &str) -> Result<usize> {
95    // Validate collection name to prevent injection
96    crate::jsonb::validate_table_name(collection_name)
97        .context("Invalid collection name for count query")?;
98
99    tracing::debug!(
100        "Getting document count for collection '{}'",
101        collection_name
102    );
103
104    let collection = database.collection::<Document>(collection_name);
105
106    let count = collection
107        .estimated_document_count()
108        .await
109        .with_context(|| {
110            format!(
111                "Failed to count documents in collection '{}'",
112                collection_name
113            )
114        })?;
115
116    Ok(count as usize)
117}
118
119/// Read all documents from a MongoDB collection
120///
121/// Reads all documents from the collection and returns them as BSON documents.
122/// For large collections, this may consume significant memory.
123///
124/// # Arguments
125///
126/// * `database` - MongoDB database reference
127/// * `collection_name` - Collection name (must be validated)
128///
129/// # Returns
130///
131/// Vector of BSON documents from the collection
132///
133/// # Security
134///
135/// - Collection name is validated before querying
136/// - Read-only operation, no modifications possible
137///
138/// # Examples
139///
140/// ```no_run
141/// # use database_replicator::mongodb::{connect_mongodb, reader::read_collection_data};
142/// # use database_replicator::jsonb::validate_table_name;
143/// # async fn example() -> anyhow::Result<()> {
144/// let client = connect_mongodb("mongodb://localhost:27017/mydb").await?;
145/// let db = client.database("mydb");
146/// let collection = "users";
147/// validate_table_name(collection)?;
148/// let documents = read_collection_data(&db, collection).await?;
149/// println!("Read {} documents", documents.len());
150/// # Ok(())
151/// # }
152/// ```
153pub async fn read_collection_data(
154    database: &Database,
155    collection_name: &str,
156) -> Result<Vec<Document>> {
157    // Validate collection name to prevent injection
158    crate::jsonb::validate_table_name(collection_name)
159        .context("Invalid collection name for data reading")?;
160
161    tracing::info!(
162        "Reading all documents from collection '{}'",
163        collection_name
164    );
165
166    let collection = database.collection::<Document>(collection_name);
167
168    let mut cursor = collection
169        .find(bson::Document::new())
170        .await
171        .with_context(|| format!("Failed to query collection '{}'", collection_name))?;
172
173    let mut documents = Vec::new();
174
175    use futures::stream::StreamExt;
176    while let Some(result) = cursor.next().await {
177        let document = result.with_context(|| {
178            format!(
179                "Failed to read document from collection '{}'",
180                collection_name
181            )
182        })?;
183        documents.push(document);
184    }
185
186    tracing::info!(
187        "Read {} documents from collection '{}'",
188        documents.len(),
189        collection_name
190    );
191
192    Ok(documents)
193}
194
#[cfg(test)]
mod tests {
    use crate::jsonb::validate_table_name;

    #[test]
    fn test_validate_collection_names() {
        // Plain identifiers (lowercase, snake_case, mixed case, leading
        // underscore) are expected to be accepted.
        for name in ["users", "user_events", "UserData", "_private"] {
            assert!(
                validate_table_name(name).is_ok(),
                "Valid collection name should pass: {}",
                name
            );
        }
    }

    #[test]
    fn test_reject_invalid_collection_names() {
        // Injection-shaped input and bare SQL keywords are expected to be
        // rejected.
        let rejected = [
            "users; DROP DATABASE;",
            "users--",
            "select",
            "insert",
            "drop",
        ];

        for name in rejected {
            assert!(
                validate_table_name(name).is_err(),
                "Invalid collection name should be rejected: {}",
                name
            );
        }
    }
}