mongodb_command_cli/
copier.rs

1use anyhow::{anyhow, Result};
2use futures::stream::TryStreamExt;
3use mongodb::bson::{doc, oid::ObjectId, Bson, Document};
4
5pub struct Param {
6    pub database: String,
7    pub collection: String,
8    pub id: Option<String>,
9    pub filter_str: Option<String>,
10    pub from: String,
11    pub to: String,
12    pub ignore_error: Option<bool>,
13}
14
15pub async fn copy_docs(param: Param) -> Result<()> {
16    if param.id.is_some() && param.filter_str.is_some() {
17        return Err(anyhow!("You can only use one of id and filter_str"));
18    }
19
20    if param.id.is_none() && param.filter_str.is_none() {
21        return Err(anyhow!("You must use one of id and filter_str"));
22    }
23
24    let mut filter = doc! {};
25    if param.filter_str.is_some() {
26        match str_to_filter(param.filter_str.unwrap().as_str()) {
27            Ok(d) => filter.clone_from(&d),
28            _ => (),
29        };
30    } else {
31        filter.insert("_id", ObjectId::parse_str(param.id.unwrap())?);
32    }
33
34    if param.from == param.to {
35        return Err(anyhow!("from_uri, to_uri cannot be the same"));
36    }
37
38    println!(
39        "Copy Documents [ns={}.{}, filter={}] from \"{}\" to \"{}\"",
40        param.database, param.collection, filter, param.from, param.to
41    );
42
43    let from = mongodb::Client::with_uri_str(parse_uri(param.from)?)
44        .await?
45        .database(param.database.as_str())
46        .collection::<Document>(param.collection.as_str());
47    let mut cursor = from.find(filter, None).await?;
48
49    let to = mongodb::Client::with_uri_str(parse_uri(param.to)?)
50        .await?
51        .database(param.database.as_str())
52        .collection::<Document>(param.collection.as_str());
53
54    while let Some(mut d) = cursor.try_next().await? {
55        d.remove("_id").unwrap();
56        let r = to.insert_one(d, None).await;
57        match r {
58            Ok(x) => println!("inserted: {}", x.inserted_id),
59            Err(e) => {
60                println!("{}", e);
61                if param.ignore_error.is_some() && param.ignore_error.unwrap() {
62                    continue;
63                }
64                break;
65            }
66        }
67    }
68
69    Ok(())
70}
71
72fn str_to_filter(s: &str) -> Result<Document> {
73    let x: Vec<(String, Bson)> = s
74        .split("+")
75        .map(|s| s.split("=").collect::<Vec<_>>())
76        .filter(|s| s.len() == 2)
77        .map(|v| (v[0].to_owned(), v[1].into()))
78        .collect();
79
80    Ok(Document::from_iter(x))
81}
82
83fn parse_uri(s: String) -> Result<String> {
84    if s.starts_with("mongodb://") {
85        Ok(s)
86    } else {
87        match std::env::var(s.clone()) {
88            Ok(v) => Ok(v),
89            Err(e) => Err(anyhow!("couldn't interpret {s}: {e}")),
90        }
91    }
92}
93
94#[cfg(test)]
95mod tests {
96
97    use super::*;
98
99    #[test]
100    fn test_str_to_filter() {
101        let x = str_to_filter("name=jojo+age=10");
102        assert!(x.is_ok());
103        assert_eq!(x.unwrap(), doc! {"name":"jojo", "age":"10"});
104    }
105
106    #[test]
107    fn test_parse_uri() {
108        let r = parse_uri("mongodb://root:123456@127.0.0.1:27017/admin".to_owned());
109        assert_eq!(r.unwrap(), "mongodb://root:123456@127.0.0.1:27017/admin".to_owned());
110
111        let r = parse_uri("xxx".to_owned());
112        assert!(r.is_err());
113
114        std::env::set_var("a", "mongodb://root:123456@127.0.0.1:27017/admin");
115        let r = parse_uri("a".to_owned());
116        assert_eq!(r.unwrap(), "mongodb://root:123456@127.0.0.1:27017/admin".to_owned());
117    }
118
119    #[tokio::test]
120    async fn test_copy_docs() {
121        let param = Param {
122            database: "db1".to_owned(),
123            collection: "c1".to_owned(),
124            // id: Some("634fd2f858ff6f5bc6e250c0".to_owned()),
125            id: None,
126            // filter_str: None,
127            // filter_str: Some("_id=634fd2fc58ff6f5bc6e250c1".to_owned()),
128            filter_str: Some("name=jojo".to_owned()),
129            from: "mongodb://root:123456@127.0.0.1:27017/admin".to_owned(),
130            to: "mongodb://user01:123456@127.0.0.1:27018/db1".to_owned(),
131            ignore_error: Some(true),
132        };
133        let r = copy_docs(param).await;
134        if r.is_err() {
135            println!("ERROR: {}", &r.err().unwrap());
136            assert!(false)
137        }
138    }
139}