1use std::cell::Cell;
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use futures::TryFutureExt;
11use tokio::io::AsyncWriteExt;
12use tokio::sync::Mutex;
13
14use crate::pg_enums::{OperationSystem, PgAcquisitionStatus};
15use crate::pg_errors::{PgEmbedError, PgEmbedErrorType};
16use crate::pg_fetch::PgFetchSettings;
17use crate::pg_types::{PgCommandSync, PgResult};
18use crate::pg_unpack;
19
20lazy_static! {
21 static ref ACQUIRED_PG_BINS: Arc<Mutex<HashMap<PathBuf, PgAcquisitionStatus>>> =
28 Arc::new(Mutex::new(HashMap::with_capacity(5)));
29}
30
/// Name of the directory, directly below the user's cache directory, under
/// which all cached postgres binaries are stored (and which `purge` removes).
const PG_EMBED_CACHE_DIR_NAME: &str = "pg-embed";
/// File name appended to a database directory to locate its version file.
const PG_VERSION_FILE_NAME: &str = "PG_VERSION";
33
34pub struct PgAccess {
38 pub cache_dir: PathBuf,
40 pub database_dir: PathBuf,
42 pub pg_ctl_exe: PathBuf,
44 pub init_db_exe: PathBuf,
46 pub pw_file_path: PathBuf,
48 pub zip_file_path: PathBuf,
50 pg_version_file: PathBuf,
53 fetch_settings: PgFetchSettings,
55}
56
57impl PgAccess {
58 pub async fn new(
64 fetch_settings: &PgFetchSettings,
65 database_dir: &PathBuf,
66 ) -> Result<Self, PgEmbedError> {
67 let cache_dir = Self::create_cache_dir_structure(&fetch_settings).await?;
69 Self::create_db_dir_structure(database_dir).await?;
70 let mut pg_ctl = cache_dir.clone();
72 pg_ctl.push("bin/pg_ctl");
73 let mut init_db = cache_dir.clone();
75 init_db.push("bin/initdb");
76 let mut zip_file_path = cache_dir.clone();
78 let platform = fetch_settings.platform();
79 let file_name = format!("{}-{}.zip", platform, &fetch_settings.version.0);
80 zip_file_path.push(file_name);
81 let mut pw_file = database_dir.clone();
83 pw_file.set_extension("pwfile");
84 let mut pg_version_file = database_dir.clone();
86 pg_version_file.push(PG_VERSION_FILE_NAME);
87
88 Ok(PgAccess {
89 cache_dir,
90 database_dir: database_dir.clone(),
91 pg_ctl_exe: pg_ctl,
92 init_db_exe: init_db,
93 pw_file_path: pw_file,
94 zip_file_path,
95 pg_version_file,
96 fetch_settings: fetch_settings.clone(),
97 })
98 }
99
100 async fn create_cache_dir_structure(fetch_settings: &PgFetchSettings) -> PgResult<PathBuf> {
106 let cache_dir = dirs::cache_dir().ok_or_else(|| PgEmbedError {
107 error_type: PgEmbedErrorType::InvalidPgUrl,
108 source: None,
109 message: None,
110 })?;
111 let os_string = match fetch_settings.operating_system {
112 OperationSystem::Darwin | OperationSystem::Windows | OperationSystem::Linux => {
113 fetch_settings.operating_system.to_string()
114 }
115 OperationSystem::AlpineLinux => {
116 format!("arch_{}", fetch_settings.operating_system.to_string())
117 }
118 };
119 let pg_path = format!(
120 "{}/{}/{}/{}",
121 PG_EMBED_CACHE_DIR_NAME,
122 os_string,
123 fetch_settings.architecture.to_string(),
124 fetch_settings.version.0
125 );
126 let mut cache_pg_embed = cache_dir.clone();
127 cache_pg_embed.push(pg_path);
128 tokio::fs::create_dir_all(&cache_pg_embed)
129 .map_err(|e| PgEmbedError {
130 error_type: PgEmbedErrorType::DirCreationError,
131 source: Some(Box::new(e)),
132 message: None,
133 })
134 .await?;
135 Ok(cache_pg_embed)
136 }
137
138 async fn create_db_dir_structure(db_dir: &PathBuf) -> PgResult<()> {
139 tokio::fs::create_dir_all(db_dir)
140 .map_err(|e| PgEmbedError {
141 error_type: PgEmbedErrorType::DirCreationError,
142 source: Some(Box::new(e)),
143 message: None,
144 })
145 .await
146 }
147
148 pub async fn maybe_acquire_postgres(&self) -> PgResult<()> {
152 let mut lock = ACQUIRED_PG_BINS.lock().await;
153
154 if self.pg_executables_cached().await? {
155 return Ok(());
156 }
157
158 lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::InProgress);
159 let pg_bin_data = self.fetch_settings.fetch_postgres().await?;
160 self.write_pg_zip(&pg_bin_data).await?;
161 log::debug!(
162 "Unpacking postgres binaries {} {}",
163 self.zip_file_path.display(),
164 self.cache_dir.display()
165 );
166 pg_unpack::unpack_postgres(&self.zip_file_path, &self.cache_dir).await?;
167
168 lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::Finished);
169 Ok(())
170 }
171
172 pub async fn pg_executables_cached(&self) -> PgResult<bool> {
176 Self::path_exists(self.init_db_exe.as_path()).await
177 }
178
179 pub async fn db_files_exist(&self) -> PgResult<bool> {
183 Ok(self.pg_executables_cached().await?
184 && Self::path_exists(self.pg_version_file.as_path()).await?)
185 }
186
187 pub async fn pg_version_file_exists(db_dir: &PathBuf) -> PgResult<bool> {
191 let mut pg_version_file = db_dir.clone();
192 pg_version_file.push(PG_VERSION_FILE_NAME);
193 let file_exists = if let Ok(_) = tokio::fs::File::open(pg_version_file.as_path()).await {
194 true
195 } else {
196 false
197 };
198 Ok(file_exists)
199 }
200
201 async fn path_exists(file: &Path) -> PgResult<bool> {
205 if let Ok(_) = tokio::fs::File::open(file).await {
206 Ok(true)
207 } else {
208 Ok(false)
209 }
210 }
211
212 pub async fn acquisition_status(&self) -> PgAcquisitionStatus {
216 let lock = ACQUIRED_PG_BINS.lock().await;
217 let acquisition_status = lock.get(&self.cache_dir);
218 match acquisition_status {
219 None => PgAcquisitionStatus::Undefined,
220 Some(status) => *status,
221 }
222 }
223
224 async fn write_pg_zip(&self, bytes: &[u8]) -> PgResult<()> {
228 let mut file: tokio::fs::File = tokio::fs::File::create(&self.zip_file_path.as_path())
229 .map_err(|e| PgEmbedError {
230 error_type: PgEmbedErrorType::WriteFileError,
231 source: Some(Box::new(e)),
232 message: None,
233 })
234 .await?;
235 file.write_all(&bytes)
236 .map_err(|e| PgEmbedError {
237 error_type: PgEmbedErrorType::WriteFileError,
238 source: Some(Box::new(e)),
239 message: None,
240 })
241 .await?;
242 file.sync_data()
243 .map_err(|e| PgEmbedError {
244 error_type: PgEmbedErrorType::WriteFileError,
245 source: Some(Box::new(e)),
246 message: None,
247 })
248 .await?;
249 Ok(())
250 }
251
252 pub fn clean(&self) -> PgResult<()> {
258 std::fs::remove_dir_all(self.database_dir.as_path()).map_err(|e| PgEmbedError {
260 error_type: PgEmbedErrorType::PgCleanUpFailure,
261 source: Some(Box::new(e)),
262 message: None,
263 })?;
264 std::fs::remove_file(self.pw_file_path.as_path()).map_err(|e| PgEmbedError {
265 error_type: PgEmbedErrorType::PgCleanUpFailure,
266 source: Some(Box::new(e)),
267 message: None,
268 })?;
269 Ok(())
270 }
271
272 pub async fn purge() -> PgResult<()> {
278 let mut cache_dir = dirs::cache_dir().ok_or_else(|| PgEmbedError {
279 error_type: PgEmbedErrorType::ReadFileError,
280 source: None,
281 message: Some(String::from("cache dir error")),
282 })?;
283 cache_dir.push(PG_EMBED_CACHE_DIR_NAME);
284 let _ = tokio::fs::remove_dir_all(cache_dir.as_path())
285 .map_err(|e| PgEmbedError {
286 error_type: PgEmbedErrorType::PgPurgeFailure,
287 source: Some(Box::new(e)),
288 message: None,
289 })
290 .await;
291 Ok(())
292 }
293
294 pub async fn clean_up(database_dir: PathBuf, pw_file: PathBuf) -> PgResult<()> {
298 tokio::fs::remove_dir_all(database_dir.as_path())
299 .await
300 .map_err(|e| PgEmbedError {
301 error_type: PgEmbedErrorType::PgCleanUpFailure,
302 source: Some(Box::new(e)),
303 message: None,
304 })?;
305
306 tokio::fs::remove_file(pw_file.as_path())
307 .await
308 .map_err(|e| PgEmbedError {
309 error_type: PgEmbedErrorType::PgCleanUpFailure,
310 source: Some(Box::new(e)),
311 message: None,
312 })
313 }
314
315 pub async fn create_password_file(&self, password: &[u8]) -> PgResult<()> {
321 let mut file: tokio::fs::File = tokio::fs::File::create(self.pw_file_path.as_path())
322 .map_err(|e| PgEmbedError {
323 error_type: PgEmbedErrorType::WriteFileError,
324 source: Some(Box::new(e)),
325 message: None,
326 })
327 .await?;
328 let _ = file
329 .write(password)
330 .map_err(|e| PgEmbedError {
331 error_type: PgEmbedErrorType::WriteFileError,
332 source: Some(Box::new(e)),
333 message: None,
334 })
335 .await?;
336 Ok(())
337 }
338
339 pub fn stop_db_command_sync(&self, database_dir: &PathBuf) -> PgCommandSync {
343 let pg_ctl_executable = self.pg_ctl_exe.to_str().unwrap();
344 let mut command = Box::new(Cell::new(std::process::Command::new(pg_ctl_executable)));
345 command
346 .get_mut()
347 .args(&["stop", "-w", "-D", database_dir.to_str().unwrap()]);
348 command
349 }
350}