1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use anyhow::{anyhow, Result};
use futures::stream::TryStreamExt;
use mongodb::bson::{doc, oid::ObjectId, Bson, Document};

pub struct Param {
    pub database: String,
    pub collection: String,
    pub id: Option<String>,
    pub filter_str: Option<String>,
    pub from_uri: String,
    pub to_uri: String,
    pub ignore_error: Option<bool>,
}

pub async fn copy_docs(param: Param) -> Result<()> {
    if param.id.is_some() && param.filter_str.is_some() {
        return Err(anyhow!("You can only use one of id and filter_str"));
    }

    if param.id.is_none() && param.filter_str.is_none() {
        return Err(anyhow!("You must use one of id and filter_str"));
    }

    let mut filter = doc! {};
    if param.filter_str.is_some() {
        match str_to_filter(param.filter_str.unwrap().as_str()) {
            Ok(d) => filter.clone_from(&d),
            _ => (),
        };
    } else {
        filter.insert("_id", ObjectId::parse_str(param.id.unwrap())?);
    }

    if param.from_uri == param.to_uri {
        return Err(anyhow!("from_uri, to_uri cannot be the same"));
    }

    println!(
        "Copy Documents [ns={}.{}, filter={}] from \"{}\" to \"{}\"",
        param.database, param.collection, filter, param.from_uri, param.to_uri
    );

    let from = mongodb::Client::with_uri_str(param.from_uri)
        .await?
        .database(param.database.as_str())
        .collection::<Document>(param.collection.as_str());
    let mut cursor = from.find(filter, None).await?;

    let to = mongodb::Client::with_uri_str(param.to_uri)
        .await?
        .database(param.database.as_str())
        .collection::<Document>(param.collection.as_str());

    while let Some(mut d) = cursor.try_next().await? {
        d.remove("_id").unwrap();
        let r = to.insert_one(d, None).await;
        match r {
            Ok(x) => println!("inserted: {}", x.inserted_id),
            Err(e) => {
                println!("{}", e);
                if param.ignore_error.is_some() && param.ignore_error.unwrap() {
                    continue;
                }
                break;
            }
        }
    }

    Ok(())
}

fn str_to_filter(s: &str) -> Result<Document> {
    let x: Vec<(String, Bson)> = s
        .split("+")
        .map(|s| s.split("=").collect::<Vec<_>>())
        .filter(|s| s.len() == 2)
        .map(|v| (v[0].to_owned(), v[1].into()))
        .collect();

    Ok(Document::from_iter(x))
}

#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn test_str_to_filter() {
        let x = str_to_filter("name=jojo+age=10");
        assert!(x.is_ok());
        assert_eq!(x.unwrap(), doc! {"name":"jojo", "age":"10"});
    }

    #[tokio::test]
    async fn test_copy_docs() {
        let param = Param {
            database: "db1".to_owned(),
            collection: "c1".to_owned(),
            // id: Some("634fd2f858ff6f5bc6e250c0".to_owned()),
            id: None,
            // filter_str: None,
            // filter_str: Some("_id=634fd2fc58ff6f5bc6e250c1".to_owned()),
            filter_str: Some("name=jojo".to_owned()),
            from_uri: "mongodb://root:123456@127.0.0.1:27017/admin".to_owned(),
            to_uri: "mongodb://user01:123456@127.0.0.1:27018/db1".to_owned(),
            ignore_error: Some(true),
        };
        let r = copy_docs(param).await;
        if r.is_err() {
            println!("ERROR: {}", &r.err().unwrap());
            assert!(false)
        }
    }
}