database_replicator/mongodb/
converter.rs1use anyhow::{Context, Result};
5use bson::{Bson, Document};
6use mongodb::Database;
7use serde_json::Value as JsonValue;
8
9pub fn bson_to_json(value: &Bson) -> Result<JsonValue> {
41 match value {
42 Bson::Double(f) => {
43 if f.is_finite() {
45 serde_json::Number::from_f64(*f)
46 .map(JsonValue::Number)
47 .ok_or_else(|| anyhow::anyhow!("Failed to convert double {} to JSON number", f))
48 } else {
49 Ok(JsonValue::String(f.to_string()))
51 }
52 }
53 Bson::String(s) => Ok(JsonValue::String(s.clone())),
54 Bson::Array(arr) => {
55 let json_arr: Result<Vec<JsonValue>> = arr.iter().map(bson_to_json).collect();
56 Ok(JsonValue::Array(json_arr?))
57 }
58 Bson::Document(doc) => {
59 let json_obj: Result<serde_json::Map<String, JsonValue>> = doc
60 .iter()
61 .map(|(k, v)| bson_to_json(v).map(|json_v| (k.clone(), json_v)))
62 .collect();
63 Ok(JsonValue::Object(json_obj?))
64 }
65 Bson::Boolean(b) => Ok(JsonValue::Bool(*b)),
66 Bson::Null => Ok(JsonValue::Null),
67 Bson::Int32(i) => Ok(JsonValue::Number((*i).into())),
68 Bson::Int64(i) => Ok(JsonValue::Number((*i).into())),
69 Bson::ObjectId(oid) => {
70 Ok(serde_json::json!({
72 "_type": "objectid",
73 "$oid": oid.to_hex()
74 }))
75 }
76 Bson::DateTime(dt) => {
77 Ok(serde_json::json!({
80 "_type": "datetime",
81 "$date": dt.timestamp_millis()
82 }))
83 }
84 Bson::Binary(bin) => {
85 let encoded =
87 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, &bin.bytes);
88 Ok(serde_json::json!({
89 "_type": "binary",
90 "subtype": u8::from(bin.subtype),
91 "data": encoded
92 }))
93 }
94 Bson::RegularExpression(regex) => {
95 Ok(serde_json::json!({
97 "_type": "regex",
98 "pattern": regex.pattern,
99 "options": regex.options
100 }))
101 }
102 Bson::Timestamp(ts) => {
103 Ok(serde_json::json!({
105 "_type": "timestamp",
106 "t": ts.time,
107 "i": ts.increment
108 }))
109 }
110 Bson::Decimal128(dec) => {
111 Ok(JsonValue::String(dec.to_string()))
113 }
114 Bson::Undefined => {
115 Ok(JsonValue::Null)
117 }
118 Bson::MaxKey => {
119 Ok(serde_json::json!({
121 "_type": "maxkey"
122 }))
123 }
124 Bson::MinKey => {
125 Ok(serde_json::json!({
127 "_type": "minkey"
128 }))
129 }
130 _ => {
131 Ok(JsonValue::String(format!("{:?}", value)))
133 }
134 }
135}
136
137pub fn document_to_json(document: &Document) -> Result<JsonValue> {
164 let mut json_obj = serde_json::Map::new();
165
166 for (key, value) in document.iter() {
167 let json_value = bson_to_json(value)
168 .with_context(|| format!("Failed to convert field '{}' to JSON", key))?;
169 json_obj.insert(key.clone(), json_value);
170 }
171
172 Ok(JsonValue::Object(json_obj))
173}
174
175pub async fn convert_collection_to_jsonb(
215 database: &Database,
216 collection_name: &str,
217) -> Result<Vec<(String, JsonValue)>> {
218 crate::jsonb::validate_table_name(collection_name)
220 .context("Invalid collection name for JSONB conversion")?;
221
222 tracing::info!(
223 "Converting MongoDB collection '{}' to JSONB",
224 collection_name
225 );
226
227 let documents = crate::mongodb::reader::read_collection_data(database, collection_name)
229 .await
230 .with_context(|| format!("Failed to read data from collection '{}'", collection_name))?;
231
232 let mut result = Vec::with_capacity(documents.len());
233
234 for (doc_num, document) in documents.into_iter().enumerate() {
235 let id = if let Some(id_value) = document.get("_id") {
237 match id_value {
239 Bson::ObjectId(oid) => oid.to_hex(),
240 Bson::String(s) => s.clone(),
241 Bson::Int32(i) => i.to_string(),
242 Bson::Int64(i) => i.to_string(),
243 _ => {
244 tracing::warn!(
245 "Document {} in collection '{}' has unsupported _id type, using doc number",
246 doc_num + 1,
247 collection_name
248 );
249 (doc_num + 1).to_string()
250 }
251 }
252 } else {
253 tracing::warn!(
255 "Document {} in collection '{}' has no _id field, using doc number",
256 doc_num + 1,
257 collection_name
258 );
259 (doc_num + 1).to_string()
260 };
261
262 let json_data = document_to_json(&document).with_context(|| {
264 format!(
265 "Failed to convert document {} in collection '{}' to JSON",
266 doc_num + 1,
267 collection_name
268 )
269 })?;
270
271 result.push((id, json_data));
272 }
273
274 tracing::info!(
275 "Converted {} documents from collection '{}' to JSONB",
276 result.len(),
277 collection_name
278 );
279
280 Ok(result)
281}
282
#[cfg(test)]
mod tests {
    //! Unit tests for the BSON -> JSON conversion helpers.

    use super::*;
    use bson::{doc, oid::ObjectId, Bson};

    /// A 32-bit integer maps to a plain JSON number.
    #[test]
    fn test_convert_int32() {
        let converted = bson_to_json(&Bson::Int32(42)).unwrap();
        assert_eq!(converted, serde_json::json!(42));
    }

    /// A 64-bit integer maps to a plain JSON number.
    #[test]
    fn test_convert_int64() {
        let converted = bson_to_json(&Bson::Int64(42i64)).unwrap();
        assert_eq!(converted, serde_json::json!(42));
    }

    /// A finite double maps to a JSON number.
    #[test]
    fn test_convert_double() {
        let converted = bson_to_json(&Bson::Double(42.75)).unwrap();
        assert_eq!(converted, serde_json::json!(42.75));
    }

    /// A BSON string maps to a JSON string with the same content.
    #[test]
    fn test_convert_string() {
        let input = Bson::String("Hello, World!".to_string());
        assert_eq!(
            bson_to_json(&input).unwrap(),
            serde_json::json!("Hello, World!")
        );
    }

    /// Both boolean values survive the conversion unchanged.
    #[test]
    fn test_convert_bool() {
        for flag in [true, false] {
            let converted = bson_to_json(&Bson::Boolean(flag)).unwrap();
            assert_eq!(converted, serde_json::json!(flag));
        }
    }

    /// BSON null maps to JSON null.
    #[test]
    fn test_convert_null() {
        assert_eq!(bson_to_json(&Bson::Null).unwrap(), JsonValue::Null);
    }

    /// Arrays are converted element by element, preserving order.
    #[test]
    fn test_convert_array() {
        let input = Bson::Array(vec![Bson::Int32(1), Bson::Int32(2), Bson::Int32(3)]);
        assert_eq!(bson_to_json(&input).unwrap(), serde_json::json!([1, 2, 3]));
    }

    /// A flat document becomes a JSON object with the same fields.
    #[test]
    fn test_convert_document() {
        let input = doc! {
            "name": "Alice",
            "age": 30,
            "active": true
        };
        let converted = document_to_json(&input).unwrap();

        assert_eq!(converted["name"], "Alice");
        assert_eq!(converted["age"], 30);
        assert_eq!(converted["active"], true);
    }

    /// An ObjectId becomes a tagged object carrying its hex form.
    #[test]
    fn test_convert_objectid() {
        let object_id = ObjectId::new();
        let converted = bson_to_json(&Bson::ObjectId(object_id)).unwrap();

        assert!(converted.is_object());
        assert_eq!(converted["_type"], "objectid");
        assert_eq!(converted["$oid"], object_id.to_hex());
    }

    /// NaN and infinity cannot be JSON numbers, so they are emitted as strings.
    #[test]
    fn test_convert_non_finite_double() {
        for special in [f64::NAN, f64::INFINITY] {
            let converted = bson_to_json(&Bson::Double(special)).unwrap();
            assert!(converted.is_string());
        }
    }

    /// Nested documents and arrays are converted recursively.
    #[test]
    fn test_convert_nested_document() {
        let input = doc! {
            "user": {
                "name": "Alice",
                "email": "alice@example.com"
            },
            "tags": ["admin", "user"]
        };
        let converted = document_to_json(&input).unwrap();

        let user = &converted["user"];
        assert_eq!(user["name"], "Alice");
        assert_eq!(user["email"], "alice@example.com");

        let tags = &converted["tags"];
        assert_eq!(tags[0], "admin");
        assert_eq!(tags[1], "user");
    }
}