database_replicator/mongodb/
converter.rs

1// ABOUTME: MongoDB BSON to JSONB type conversion for PostgreSQL storage
2// ABOUTME: Handles all BSON types with lossless conversion and special type encoding
3
4use anyhow::{Context, Result};
5use bson::{Bson, Document};
6use mongodb::Database;
7use serde_json::Value as JsonValue;
8
9/// Convert a BSON value to JSON
10///
11/// Maps BSON types to JSON types:
12/// - Int32/Int64 → number
13/// - Double → number
14/// - String → string
15/// - Bool → boolean
16/// - Array → array
17/// - Document → object
18/// - ObjectId → object with $oid field
19/// - DateTime → object with $date field
20/// - Binary → object with $binary field (base64)
21/// - Null/Undefined → null
22///
23/// # Arguments
24///
25/// * `value` - BSON value from MongoDB
26///
27/// # Returns
28///
29/// JSON value suitable for JSONB storage
30///
31/// # Examples
32///
33/// ```no_run
34/// # use database_replicator::mongodb::converter::bson_to_json;
35/// # use bson::Bson;
36/// let bson_int = Bson::Int32(42);
37/// let json = bson_to_json(&bson_int).unwrap();
38/// assert_eq!(json, serde_json::json!(42));
39/// ```
40pub fn bson_to_json(value: &Bson) -> Result<JsonValue> {
41    match value {
42        Bson::Double(f) => {
43            // Handle non-finite numbers
44            if f.is_finite() {
45                serde_json::Number::from_f64(*f)
46                    .map(JsonValue::Number)
47                    .ok_or_else(|| anyhow::anyhow!("Failed to convert double {} to JSON number", f))
48            } else {
49                // Store non-finite as strings
50                Ok(JsonValue::String(f.to_string()))
51            }
52        }
53        Bson::String(s) => Ok(JsonValue::String(s.clone())),
54        Bson::Array(arr) => {
55            let json_arr: Result<Vec<JsonValue>> = arr.iter().map(bson_to_json).collect();
56            Ok(JsonValue::Array(json_arr?))
57        }
58        Bson::Document(doc) => {
59            let json_obj: Result<serde_json::Map<String, JsonValue>> = doc
60                .iter()
61                .map(|(k, v)| bson_to_json(v).map(|json_v| (k.clone(), json_v)))
62                .collect();
63            Ok(JsonValue::Object(json_obj?))
64        }
65        Bson::Boolean(b) => Ok(JsonValue::Bool(*b)),
66        Bson::Null => Ok(JsonValue::Null),
67        Bson::Int32(i) => Ok(JsonValue::Number((*i).into())),
68        Bson::Int64(i) => Ok(JsonValue::Number((*i).into())),
69        Bson::ObjectId(oid) => {
70            // Store ObjectId as object with $oid field for type preservation
71            Ok(serde_json::json!({
72                "_type": "objectid",
73                "$oid": oid.to_hex()
74            }))
75        }
76        Bson::DateTime(dt) => {
77            // Store DateTime as object with $date field
78            // Using milliseconds since epoch for precision
79            Ok(serde_json::json!({
80                "_type": "datetime",
81                "$date": dt.timestamp_millis()
82            }))
83        }
84        Bson::Binary(bin) => {
85            // Encode binary as base64 in object
86            let encoded =
87                base64::Engine::encode(&base64::engine::general_purpose::STANDARD, &bin.bytes);
88            Ok(serde_json::json!({
89                "_type": "binary",
90                "subtype": u8::from(bin.subtype),
91                "data": encoded
92            }))
93        }
94        Bson::RegularExpression(regex) => {
95            // Store regex as object with pattern and options
96            Ok(serde_json::json!({
97                "_type": "regex",
98                "pattern": regex.pattern,
99                "options": regex.options
100            }))
101        }
102        Bson::Timestamp(ts) => {
103            // Store timestamp as object
104            Ok(serde_json::json!({
105                "_type": "timestamp",
106                "t": ts.time,
107                "i": ts.increment
108            }))
109        }
110        Bson::Decimal128(dec) => {
111            // Store Decimal128 as string to preserve precision
112            Ok(JsonValue::String(dec.to_string()))
113        }
114        Bson::Undefined => {
115            // Treat undefined as null
116            Ok(JsonValue::Null)
117        }
118        Bson::MaxKey => {
119            // Store MaxKey as special object
120            Ok(serde_json::json!({
121                "_type": "maxkey"
122            }))
123        }
124        Bson::MinKey => {
125            // Store MinKey as special object
126            Ok(serde_json::json!({
127                "_type": "minkey"
128            }))
129        }
130        _ => {
131            // For any unsupported types, convert to string representation
132            Ok(JsonValue::String(format!("{:?}", value)))
133        }
134    }
135}
136
137/// Convert a MongoDB document to JSON object
138///
139/// Converts all fields in the document to JSON, preserving all types.
140///
141/// # Arguments
142///
143/// * `document` - BSON document from MongoDB
144///
145/// # Returns
146///
147/// JSON object ready for JSONB storage
148///
149/// # Examples
150///
151/// ```no_run
152/// # use database_replicator::mongodb::converter::document_to_json;
153/// # use bson::{doc, Bson};
154/// let doc = doc! {
155///     "name": "Alice",
156///     "age": 30,
157///     "active": true
158/// };
159/// let json = document_to_json(&doc).unwrap();
160/// assert_eq!(json["name"], "Alice");
161/// assert_eq!(json["age"], 30);
162/// ```
163pub fn document_to_json(document: &Document) -> Result<JsonValue> {
164    let mut json_obj = serde_json::Map::new();
165
166    for (key, value) in document.iter() {
167        let json_value = bson_to_json(value)
168            .with_context(|| format!("Failed to convert field '{}' to JSON", key))?;
169        json_obj.insert(key.clone(), json_value);
170    }
171
172    Ok(JsonValue::Object(json_obj))
173}
174
175/// Convert an entire MongoDB collection to JSONB format
176///
177/// Reads all documents from a collection and converts them to JSONB.
178/// Returns a vector of (id, json_data) tuples ready for insertion.
179///
180/// # ID Generation Strategy
181///
182/// - Uses MongoDB's _id field as the ID (converted to string)
183/// - ObjectId is converted to hex string
184/// - Other ID types are converted to string representation
185///
186/// # Arguments
187///
188/// * `database` - MongoDB database reference
189/// * `collection_name` - Collection name (must be validated)
190///
191/// # Returns
192///
193/// Vector of (id_string, json_data) tuples for batch insert
194///
195/// # Security
196///
197/// Collection name should be validated before calling this function.
198///
199/// # Examples
200///
201/// ```no_run
202/// # use database_replicator::mongodb::{connect_mongodb, converter::convert_collection_to_jsonb};
203/// # use database_replicator::jsonb::validate_table_name;
204/// # async fn example() -> anyhow::Result<()> {
205/// let client = connect_mongodb("mongodb://localhost:27017/mydb").await?;
206/// let db = client.database("mydb");
207/// let collection = "users";
208/// validate_table_name(collection)?;
209/// let rows = convert_collection_to_jsonb(&db, collection).await?;
210/// println!("Converted {} documents to JSONB", rows.len());
211/// # Ok(())
212/// # }
213/// ```
214pub async fn convert_collection_to_jsonb(
215    database: &Database,
216    collection_name: &str,
217) -> Result<Vec<(String, JsonValue)>> {
218    // Validate collection name
219    crate::jsonb::validate_table_name(collection_name)
220        .context("Invalid collection name for JSONB conversion")?;
221
222    tracing::info!(
223        "Converting MongoDB collection '{}' to JSONB",
224        collection_name
225    );
226
227    // Read all documents using our reader
228    let documents = crate::mongodb::reader::read_collection_data(database, collection_name)
229        .await
230        .with_context(|| format!("Failed to read data from collection '{}'", collection_name))?;
231
232    let mut result = Vec::with_capacity(documents.len());
233
234    for (doc_num, document) in documents.into_iter().enumerate() {
235        // Extract or generate ID
236        let id = if let Some(id_value) = document.get("_id") {
237            // Use _id field from document
238            match id_value {
239                Bson::ObjectId(oid) => oid.to_hex(),
240                Bson::String(s) => s.clone(),
241                Bson::Int32(i) => i.to_string(),
242                Bson::Int64(i) => i.to_string(),
243                _ => {
244                    tracing::warn!(
245                        "Document {} in collection '{}' has unsupported _id type, using doc number",
246                        doc_num + 1,
247                        collection_name
248                    );
249                    (doc_num + 1).to_string()
250                }
251            }
252        } else {
253            // No _id field, use document number
254            tracing::warn!(
255                "Document {} in collection '{}' has no _id field, using doc number",
256                doc_num + 1,
257                collection_name
258            );
259            (doc_num + 1).to_string()
260        };
261
262        // Convert document to JSON
263        let json_data = document_to_json(&document).with_context(|| {
264            format!(
265                "Failed to convert document {} in collection '{}' to JSON",
266                doc_num + 1,
267                collection_name
268            )
269        })?;
270
271        result.push((id, json_data));
272    }
273
274    tracing::info!(
275        "Converted {} documents from collection '{}' to JSONB",
276        result.len(),
277        collection_name
278    );
279
280    Ok(result)
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use bson::{doc, oid::ObjectId, Bson};

    #[test]
    fn test_convert_int32() {
        let bson = Bson::Int32(42);
        let json = bson_to_json(&bson).unwrap();
        assert_eq!(json, serde_json::json!(42));
    }

    #[test]
    fn test_convert_int64() {
        let bson = Bson::Int64(42i64);
        let json = bson_to_json(&bson).unwrap();
        assert_eq!(json, serde_json::json!(42));
    }

    #[test]
    fn test_convert_double() {
        let bson = Bson::Double(42.75);
        let json = bson_to_json(&bson).unwrap();
        assert_eq!(json, serde_json::json!(42.75));
    }

    #[test]
    fn test_convert_string() {
        let bson = Bson::String("Hello, World!".to_string());
        let json = bson_to_json(&bson).unwrap();
        assert_eq!(json, serde_json::json!("Hello, World!"));
    }

    #[test]
    fn test_convert_bool() {
        let bson_true = Bson::Boolean(true);
        let json_true = bson_to_json(&bson_true).unwrap();
        assert_eq!(json_true, serde_json::json!(true));

        let bson_false = Bson::Boolean(false);
        let json_false = bson_to_json(&bson_false).unwrap();
        assert_eq!(json_false, serde_json::json!(false));
    }

    #[test]
    fn test_convert_null() {
        let bson = Bson::Null;
        let json = bson_to_json(&bson).unwrap();
        assert_eq!(json, JsonValue::Null);
    }

    #[test]
    fn test_convert_undefined_is_null() {
        let json = bson_to_json(&Bson::Undefined).unwrap();
        assert_eq!(json, JsonValue::Null);
    }

    #[test]
    fn test_convert_array() {
        let bson = Bson::Array(vec![Bson::Int32(1), Bson::Int32(2), Bson::Int32(3)]);
        let json = bson_to_json(&bson).unwrap();
        assert_eq!(json, serde_json::json!([1, 2, 3]));
    }

    #[test]
    fn test_convert_document() {
        let doc = doc! {
            "name": "Alice",
            "age": 30,
            "active": true
        };
        let json = document_to_json(&doc).unwrap();
        assert_eq!(json["name"], "Alice");
        assert_eq!(json["age"], 30);
        assert_eq!(json["active"], true);
    }

    #[test]
    fn test_convert_objectid() {
        let oid = ObjectId::new();
        let bson = Bson::ObjectId(oid);
        let json = bson_to_json(&bson).unwrap();

        // Should be wrapped in object with _type and $oid
        assert!(json.is_object());
        assert_eq!(json["_type"], "objectid");
        assert_eq!(json["$oid"], oid.to_hex());
    }

    #[test]
    fn test_convert_datetime() {
        let bson = Bson::DateTime(bson::DateTime::from_millis(1_700_000_000_000));
        let json = bson_to_json(&bson).unwrap();

        assert_eq!(json["_type"], "datetime");
        assert_eq!(json["$date"], 1_700_000_000_000i64);
    }

    #[test]
    fn test_convert_binary() {
        let bson = Bson::Binary(bson::Binary {
            subtype: bson::spec::BinarySubtype::Generic,
            bytes: b"hi".to_vec(),
        });
        let json = bson_to_json(&bson).unwrap();

        // Binary is encoded as {"_type", "subtype", "data"} with standard base64
        assert_eq!(json["_type"], "binary");
        assert_eq!(json["subtype"], 0);
        assert_eq!(json["data"], "aGk=");
    }

    #[test]
    fn test_convert_timestamp() {
        let bson = Bson::Timestamp(bson::Timestamp {
            time: 1,
            increment: 2,
        });
        let json = bson_to_json(&bson).unwrap();

        assert_eq!(json["_type"], "timestamp");
        assert_eq!(json["t"], 1);
        assert_eq!(json["i"], 2);
    }

    #[test]
    fn test_convert_min_max_key() {
        assert_eq!(
            bson_to_json(&Bson::MinKey).unwrap(),
            serde_json::json!({ "_type": "minkey" })
        );
        assert_eq!(
            bson_to_json(&Bson::MaxKey).unwrap(),
            serde_json::json!({ "_type": "maxkey" })
        );
    }

    #[test]
    fn test_convert_non_finite_double() {
        let nan_bson = Bson::Double(f64::NAN);
        let json = bson_to_json(&nan_bson).unwrap();
        assert!(json.is_string());

        let inf_bson = Bson::Double(f64::INFINITY);
        let json = bson_to_json(&inf_bson).unwrap();
        assert!(json.is_string());

        let neg_inf_bson = Bson::Double(f64::NEG_INFINITY);
        let json = bson_to_json(&neg_inf_bson).unwrap();
        assert_eq!(json, serde_json::json!("-inf"));
    }

    #[test]
    fn test_convert_nested_document() {
        let doc = doc! {
            "user": {
                "name": "Alice",
                "email": "alice@example.com"
            },
            "tags": ["admin", "user"]
        };
        let json = document_to_json(&doc).unwrap();

        assert_eq!(json["user"]["name"], "Alice");
        assert_eq!(json["user"]["email"], "alice@example.com");
        assert_eq!(json["tags"][0], "admin");
        assert_eq!(json["tags"][1], "user");
    }
}