mongodb_command_cli/
copier.rs1use anyhow::{anyhow, Result};
2use futures::stream::TryStreamExt;
3use mongodb::bson::{doc, oid::ObjectId, Bson, Document};
4
/// Arguments for [`copy_docs`].
///
/// Exactly one of `id` and `filter_str` must be set; `copy_docs` rejects any
/// other combination.
#[derive(Debug, Clone)]
pub struct Param {
    /// Database name, used on both the source and the destination.
    pub database: String,
    /// Collection name, used on both the source and the destination.
    pub collection: String,
    /// Hex `ObjectId` of a single document to copy (matched against `_id`).
    pub id: Option<String>,
    /// Filter in `key=value+key=value` form, selecting the documents to copy.
    pub filter_str: Option<String>,
    /// Source: a MongoDB connection string, or the name of an environment
    /// variable that holds one.
    pub from: String,
    /// Destination: a MongoDB connection string, or the name of an environment
    /// variable that holds one.
    pub to: String,
    /// When `Some(true)`, a failed insert is reported and copying continues
    /// with the next document; otherwise copying stops at the first failed
    /// insert.
    pub ignore_error: Option<bool>,
}
14
15pub async fn copy_docs(param: Param) -> Result<()> {
16 if param.id.is_some() && param.filter_str.is_some() {
17 return Err(anyhow!("You can only use one of id and filter_str"));
18 }
19
20 if param.id.is_none() && param.filter_str.is_none() {
21 return Err(anyhow!("You must use one of id and filter_str"));
22 }
23
24 let mut filter = doc! {};
25 if param.filter_str.is_some() {
26 match str_to_filter(param.filter_str.unwrap().as_str()) {
27 Ok(d) => filter.clone_from(&d),
28 _ => (),
29 };
30 } else {
31 filter.insert("_id", ObjectId::parse_str(param.id.unwrap())?);
32 }
33
34 if param.from == param.to {
35 return Err(anyhow!("from_uri, to_uri cannot be the same"));
36 }
37
38 println!(
39 "Copy Documents [ns={}.{}, filter={}] from \"{}\" to \"{}\"",
40 param.database, param.collection, filter, param.from, param.to
41 );
42
43 let from = mongodb::Client::with_uri_str(parse_uri(param.from)?)
44 .await?
45 .database(param.database.as_str())
46 .collection::<Document>(param.collection.as_str());
47 let mut cursor = from.find(filter, None).await?;
48
49 let to = mongodb::Client::with_uri_str(parse_uri(param.to)?)
50 .await?
51 .database(param.database.as_str())
52 .collection::<Document>(param.collection.as_str());
53
54 while let Some(mut d) = cursor.try_next().await? {
55 d.remove("_id").unwrap();
56 let r = to.insert_one(d, None).await;
57 match r {
58 Ok(x) => println!("inserted: {}", x.inserted_id),
59 Err(e) => {
60 println!("{}", e);
61 if param.ignore_error.is_some() && param.ignore_error.unwrap() {
62 continue;
63 }
64 break;
65 }
66 }
67 }
68
69 Ok(())
70}
71
72fn str_to_filter(s: &str) -> Result<Document> {
73 let x: Vec<(String, Bson)> = s
74 .split("+")
75 .map(|s| s.split("=").collect::<Vec<_>>())
76 .filter(|s| s.len() == 2)
77 .map(|v| (v[0].to_owned(), v[1].into()))
78 .collect();
79
80 Ok(Document::from_iter(x))
81}
82
83fn parse_uri(s: String) -> Result<String> {
84 if s.starts_with("mongodb://") {
85 Ok(s)
86 } else {
87 match std::env::var(s.clone()) {
88 Ok(v) => Ok(v),
89 Err(e) => Err(anyhow!("couldn't interpret {s}: {e}")),
90 }
91 }
92}
93
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed filter string becomes a document of string values.
    #[test]
    fn test_str_to_filter() {
        let filter = str_to_filter("name=jojo+age=10").expect("well-formed filter string");
        assert_eq!(filter, doc! {"name": "jojo", "age": "10"});
    }

    /// Literal URIs pass through; other inputs are looked up in the environment.
    #[test]
    fn test_parse_uri() {
        const URI: &str = "mongodb://root:123456@127.0.0.1:27017/admin";
        // Dedicated variable names, so the test neither depends on nor
        // clobbers variables that may already exist in the environment.
        const SET_VAR: &str = "COPIER_TEST_PARSE_URI_SET";
        const UNSET_VAR: &str = "COPIER_TEST_PARSE_URI_UNSET";

        assert_eq!(parse_uri(URI.to_owned()).unwrap(), URI);

        std::env::remove_var(UNSET_VAR);
        assert!(parse_uri(UNSET_VAR.to_owned()).is_err());

        std::env::set_var(SET_VAR, URI);
        let resolved = parse_uri(SET_VAR.to_owned());
        std::env::remove_var(SET_VAR);
        assert_eq!(resolved.unwrap(), URI);
    }

    /// End-to-end copy between two servers.
    ///
    /// NOTE(review): this connects to the two URIs below, so it presumably
    /// needs both MongoDB instances running with these credentials; consider
    /// marking it `#[ignore]` if they are not always available.
    #[tokio::test]
    async fn test_copy_docs() {
        let param = Param {
            database: "db1".to_owned(),
            collection: "c1".to_owned(),
            id: None,
            filter_str: Some("name=jojo".to_owned()),
            from: "mongodb://root:123456@127.0.0.1:27017/admin".to_owned(),
            to: "mongodb://user01:123456@127.0.0.1:27018/db1".to_owned(),
            ignore_error: Some(true),
        };
        if let Err(e) = copy_docs(param).await {
            panic!("ERROR: {}", e);
        }
    }
}