1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use anyhow::{anyhow, Result};
use futures::stream::TryStreamExt;
use mongodb::bson::{doc, oid::ObjectId, Bson, Document};
/// Arguments for [`copy_docs`]: which documents to copy and between which deployments.
///
/// Exactly one of `id` and `filter_str` must be set; `copy_docs` rejects any
/// other combination.
#[derive(Debug, Clone, Default)]
pub struct Param {
    /// Database name, used for both the source and the destination.
    pub database: String,
    /// Collection name, used for both the source and the destination.
    pub collection: String,
    /// `ObjectId` string (parsed with `ObjectId::parse_str`) of the single
    /// document to copy.
    pub id: Option<String>,
    /// Equality filter written as `key=value` pairs joined with `+`,
    /// e.g. `name=jojo+age=10`.
    pub filter_str: Option<String>,
    /// Connection string of the deployment documents are read from.
    pub from_uri: String,
    /// Connection string of the deployment documents are written to.
    /// Must differ from `from_uri`.
    pub to_uri: String,
    /// When `Some(true)`, a failed insert is printed and the copy continues
    /// with the next document; otherwise the copy stops at the first failed
    /// insert.
    pub ignore_error: Option<bool>,
}
/// Copies the documents selected by `param` from the collection at
/// `param.from_uri` into the same database/collection at `param.to_uri`.
///
/// The selection is either a single `_id` (`param.id`) or an equality filter
/// parsed from `param.filter_str`. Every matching document has its `_id`
/// field removed before it is inserted, so the insert does not reuse the
/// source `_id`. A filter string that yields no conditions produces an empty
/// filter document, which is passed to `find` unchanged.
///
/// Progress and ignored insert failures are printed to stdout.
///
/// # Errors
///
/// Returns an error when:
/// - both or neither of `param.id` and `param.filter_str` are set;
/// - `param.id` is not a valid `ObjectId` string or the filter string cannot
///   be parsed;
/// - `param.from_uri` equals `param.to_uri`;
/// - creating either client, running the query, or advancing the cursor fails;
/// - an insert fails and `param.ignore_error` is not `Some(true)`.
pub async fn copy_docs(param: Param) -> Result<()> {
    // Exactly one selector is accepted; build the query filter from it.
    let filter = match (param.id, param.filter_str) {
        (Some(_), Some(_)) => {
            return Err(anyhow!("You can only use one of id and filter_str"));
        }
        (None, None) => {
            return Err(anyhow!("You must use one of id and filter_str"));
        }
        (None, Some(filter_str)) => str_to_filter(&filter_str)?,
        (Some(id), None) => {
            let oid = ObjectId::parse_str(id)?;
            doc! { "_id": oid }
        }
    };

    if param.from_uri == param.to_uri {
        return Err(anyhow!("from_uri, to_uri cannot be the same"));
    }

    println!(
        "Copy Documents [ns={}.{}, filter={}] from \"{}\" to \"{}\"",
        param.database, param.collection, filter, param.from_uri, param.to_uri
    );

    let from = mongodb::Client::with_uri_str(param.from_uri)
        .await?
        .database(param.database.as_str())
        .collection::<Document>(param.collection.as_str());
    let mut cursor = from.find(filter, None).await?;

    let to = mongodb::Client::with_uri_str(param.to_uri)
        .await?
        .database(param.database.as_str())
        .collection::<Document>(param.collection.as_str());

    let ignore_error = param.ignore_error.unwrap_or(false);
    while let Some(mut d) = cursor.try_next().await? {
        // Drop the source `_id` before inserting; a document without one is
        // not an error, so the removed value is simply discarded.
        d.remove("_id");
        match to.insert_one(d, None).await {
            Ok(x) => println!("inserted: {}", x.inserted_id),
            // Best-effort mode: report the failure and move on.
            Err(e) if ignore_error => println!("{}", e),
            // Strict mode: surface the failure instead of reporting success.
            Err(e) => return Err(e.into()),
        }
    }
    Ok(())
}
fn str_to_filter(s: &str) -> Result<Document> {
let x: Vec<(String, Bson)> = s
.split("+")
.map(|s| s.split("=").collect::<Vec<_>>())
.filter(|s| s.len() == 2)
.map(|v| (v[0].to_owned(), v[1].into()))
.collect();
Ok(Document::from_iter(x))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Two `+`-separated pairs become two entries; values stay strings
    /// (`"10"`, not the number 10).
    #[test]
    fn test_str_to_filter() {
        let x = str_to_filter("name=jojo+age=10");
        assert!(x.is_ok());
        assert_eq!(x.unwrap(), doc! {"name":"jojo", "age":"10"});
    }

    /// End-to-end copy between the two deployments named in the URIs below.
    /// NOTE(review): this presumably needs both MongoDB instances running
    /// locally with these credentials — confirm before relying on it in CI.
    #[tokio::test]
    async fn test_copy_docs() {
        let param = Param {
            database: "db1".to_owned(),
            collection: "c1".to_owned(),
            id: None,
            filter_str: Some("name=jojo".to_owned()),
            from_uri: "mongodb://root:123456@127.0.0.1:27017/admin".to_owned(),
            to_uri: "mongodb://user01:123456@127.0.0.1:27018/db1".to_owned(),
            ignore_error: Some(true),
        };
        // Fail with the error itself so the cause shows up in the test report.
        if let Err(e) = copy_docs(param).await {
            panic!("ERROR: {}", e);
        }
    }
}