1pub mod downloader;
2pub mod extractor;
3pub mod process;
4
5use anyhow::Result;
6use std::path::PathBuf;
7use directories::ProjectDirs;
8
9use crate::downloader::{get_download_url, download_file_with_callback, get_os};
10use crate::extractor::extract;
11use crate::process::MongoProcess;
12
13pub use crate::downloader::DownloadProgress;
14
15pub enum InitStatus {
16 CheckingDB,
17 ValidatingInstallation,
18 Downloading,
19 DownloadProgress(DownloadProgress),
20 SettingUpUser,
21 VerifyingCredentials,
22 DBInitialized,
23}
24
/// Configuration for downloading, unpacking and launching a local MongoDB
/// server.
pub struct MongoEmbedded {
    /// MongoDB version string; also the name of the per-version directory
    /// under `extract_path`.
    pub version: String,
    /// Directory in which the downloaded server archive is stored.
    pub download_path: PathBuf,
    /// Parent directory of the per-version extraction directories.
    pub extract_path: PathBuf,
    /// Database directory handed to the server process on start.
    pub db_path: PathBuf,
    /// TCP port handed to the server process and used in the connection URI.
    pub port: u16,
    /// Address the server binds to. A value containing `/` or ending in
    /// `.sock` is treated as a Unix domain socket path when the connection
    /// URI is built.
    pub bind_ip: String,
    /// User name for authentication; only used when `password` is set too.
    pub username: Option<String>,
    /// Password for authentication; only used when `username` is set too.
    pub password: Option<String>,
}
35
36
37impl MongoEmbedded {
38 pub fn new(version: &str) -> Result<Self> {
39 let proj_dirs = ProjectDirs::from("com", "mongo", "embedded")
40 .ok_or_else(|| anyhow::anyhow!("Could not determine project directories"))?;
41
42 let cache_dir = proj_dirs.cache_dir();
43 let data_dir = proj_dirs.data_dir();
44
45 Ok(Self {
46 version: version.to_string(),
47 download_path: cache_dir.join("downloads"),
48 extract_path: cache_dir.join("extracted"),
49 db_path: data_dir.join("db"),
50 port: 27017,
51 bind_ip: "127.0.0.1".to_string(),
52 username: None,
53 password: None,
54 })
55 }
56
57 pub fn set_port(mut self, port: u16) -> Self {
58 self.port = port;
59 self
60 }
61
62 pub fn set_bind_ip(mut self, bind_ip: &str) -> Self {
63 self.bind_ip = bind_ip.to_string();
64 self
65 }
66
67 pub fn set_db_path(mut self, path: PathBuf) -> Self {
68 self.db_path = path;
69 self
70 }
71
72 pub fn set_credentials(mut self, username: &str, password: &str) -> Self {
73 self.username = Some(username.to_string());
74 self.password = Some(password.to_string());
75 self
76 }
77
78 pub fn is_installed(&self) -> bool {
79 let extract_target = self.extract_path.join(self.version.as_str());
80 extract_target.exists()
81 }
82
83 pub async fn start(&self) -> Result<MongoProcess> {
84 self.start_with_progress(|_| {}).await
85 }
86
87 pub async fn start_with_progress<F>(&self, mut callback: F) -> Result<MongoProcess>
88 where
89 F: FnMut(InitStatus),
90 {
91 callback(InitStatus::CheckingDB);
92 let mongo_url = get_download_url(&self.version)?;
93 let download_target = self.download_path.join(&mongo_url.filename);
94
95 callback(InitStatus::ValidatingInstallation);
96 if !download_target.exists() {
97 if !self.download_path.exists() {
98 std::fs::create_dir_all(&self.download_path)?;
99 }
100 callback(InitStatus::Downloading);
101 download_file_with_callback(&mongo_url.url, &download_target, |progress| {
102 callback(InitStatus::DownloadProgress(progress));
103 }).await?;
104 }
105
106 let extract_target = self.extract_path.join(self.version.as_str());
107 if !extract_target.exists() {
108 extract(&download_target, &extract_target)?;
109 }
110
111 let os = get_os()?;
112
113 let uri = if self.bind_ip.contains('/') || self.bind_ip.ends_with(".sock") {
115 let encoded = self.bind_ip.replace("/", "%2F");
120 format!("mongodb://{}/?directConnection=true", encoded)
121 } else {
122 format!("mongodb://{}:{}/?directConnection=true", self.bind_ip, self.port)
123 };
124
125 let auth_enabled = self.username.is_some() && self.password.is_some();
127 let mut process = MongoProcess::start(&extract_target, self.port, &self.db_path, &os, &self.bind_ip, auth_enabled, uri.clone())?;
128
129 let mut client_options = mongodb::options::ClientOptions::parse(&uri).await?;
132 client_options.connect_timeout = Some(std::time::Duration::from_secs(2));
133 client_options.server_selection_timeout = Some(std::time::Duration::from_secs(2));
134
135 let mut connected = false;
137 let start = std::time::Instant::now();
138 println!("DEBUG: Waiting for MongoDB to start at {}", uri);
139 while start.elapsed() < std::time::Duration::from_secs(30) {
140 let client = mongodb::Client::with_options(client_options.clone())?;
141 match client.list_database_names(None, None).await {
142 Ok(_) => {
143 connected = true;
144 break;
145 }
146 Err(e) => {
147 println!("DEBUG: Connection attempt failed: {:?}", e);
148 match *e.kind {
151 mongodb::error::ErrorKind::Command(ref cmd_err) => {
152 if cmd_err.code == 51 || cmd_err.code == 13 || cmd_err.code == 18 { connected = true;
154 break;
155 }
156 },
157 _ => {}
158 }
159 }
160 }
161
162 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
163 }
164
165 if !connected {
166 println!("DEBUG: Timed out waiting for start.");
167 process.kill()?;
168 return Err(anyhow::anyhow!("Timed out waiting for MongoDB to start"));
169 }
170
171 if let (Some(username), Some(password)) = (&self.username, &self.password) {
172 callback(InitStatus::SettingUpUser);
173 let client = mongodb::Client::with_options(client_options.clone())?;
174
175 use mongodb::bson::doc;
177 let db = client.database("admin");
178 let run_cmd = db.run_command(doc! {
179 "createUser": username,
180 "pwd": password,
181 "roles": [
182 { "role": "root", "db": "admin" }
183 ]
184 }, None).await;
185
186 match run_cmd {
187 Ok(_) => {
188 },
190 Err(e) => {
191 let kind = &*e.kind;
193 let needs_verify;
194 if let mongodb::error::ErrorKind::Command(cmd_err) = kind {
195 if cmd_err.code == 51 { needs_verify = true;
197 } else if cmd_err.code == 13 { needs_verify = true;
199 } else {
200 needs_verify = true;
202 }
203 } else {
204 needs_verify = true; }
206
207 if needs_verify {
208 callback(InitStatus::VerifyingCredentials);
209 let mut auth_opts = client_options.clone();
211 auth_opts.credential = Some(mongodb::options::Credential::builder()
212 .username(username.clone())
213 .password(password.clone())
214 .source("admin".to_string())
215 .build());
216
217 let auth_client = mongodb::Client::with_options(auth_opts)?;
218 if let Err(auth_err) = auth_client.database("admin").run_command(doc! { "ping": 1 }, None).await {
220 process.kill()?;
221 return Err(anyhow::anyhow!("Authentication failed or invalid credentials provided: {}", auth_err));
222 }
223 }
224 }
225 }
226
227 let final_uri;
229 if self.bind_ip.contains('/') || self.bind_ip.ends_with(".sock") {
230 let encoded = self.bind_ip.replace("/", "%2F");
231 final_uri = format!("mongodb://{}:{}@{}", username, password, encoded);
233 } else {
234 final_uri = format!("mongodb://{}:{}@{}:{}/", username, password, self.bind_ip, self.port);
235 }
236 process.connection_string = final_uri;
237 }
238
239 callback(InitStatus::DBInitialized);
240 Ok(process)
241 }
242}
243