1use std::cell::Cell;
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use futures::TryFutureExt;
11use tokio::io::AsyncWriteExt;
12use tokio::sync::Mutex;
13use tokio::time::{interval, Duration};
14
15use crate::pg_enums::{OperationSystem, PgAcquisitionStatus};
16use crate::pg_errors::{PgEmbedError, PgEmbedErrorType};
17use crate::pg_fetch::PgFetchSettings;
18use crate::pg_types::{PgCommandSync, PgResult};
19
lazy_static! {
    // Registry shared by every `PgAccess` in the process: maps a cache directory to the
    // acquisition status of the PostgreSQL binaries stored there. It is guarded by the
    // async `tokio::sync::Mutex`, so callers `.lock().await` it. The map is pre-sized
    // for 5 entries.
    static ref ACQUIRED_PG_BINS: Arc<Mutex<HashMap<PathBuf, PgAcquisitionStatus>>> =
        Arc::new(Mutex::new(HashMap::with_capacity(5)));
}
30
/// Name of the pg-embed directory placed beneath the user cache directory.
const PG_EMBED_CACHE_DIR_NAME: &str = "pg-embed";
/// Name of the version marker file looked up inside a database directory.
const PG_VERSION_FILE_NAME: &str = "PG_VERSION";
33
/// Paths used to access the cached PostgreSQL binaries and a database directory.
///
/// All paths are derived once in [`PgAccess::new`] from the cache directory and the
/// database directory.
pub struct PgAccess {
    /// Cache directory holding the PostgreSQL binaries for the configured
    /// operating system, architecture and version.
    pub cache_dir: PathBuf,
    /// Database directory as passed to [`PgAccess::new`].
    pub database_dir: PathBuf,
    /// `bin/pg_ctl` inside `cache_dir`.
    pub pg_ctl_exe: PathBuf,
    /// `bin/initdb` inside `cache_dir`.
    pub init_db_exe: PathBuf,
    /// `database_dir` with its extension set to `pwfile`.
    pub pw_file_path: PathBuf,
    /// `<platform>-<version>.zip` inside `cache_dir`.
    pub zip_file_path: PathBuf,
    /// `PG_VERSION` inside `database_dir`; its presence is used by `db_files_exist`.
    pg_version_file: PathBuf,
}
54
55impl PgAccess {
56 pub async fn new(
62 fetch_settings: &PgFetchSettings,
63 database_dir: &PathBuf,
64 ) -> Result<Self, PgEmbedError> {
65 let cache_dir = Self::create_cache_dir_structure(&fetch_settings).await?;
67 Self::create_db_dir_structure(database_dir).await?;
68 let mut pg_ctl = cache_dir.clone();
70 pg_ctl.push("bin/pg_ctl");
71 let mut init_db = cache_dir.clone();
73 init_db.push("bin/initdb");
74 let mut zip_file_path = cache_dir.clone();
76 let platform = fetch_settings.platform();
77 let file_name = format!("{}-{}.zip", platform, &fetch_settings.version.0);
78 zip_file_path.push(file_name);
79 let mut pw_file = database_dir.clone();
81 pw_file.set_extension("pwfile");
82 let mut pg_version_file = database_dir.clone();
84 pg_version_file.push(PG_VERSION_FILE_NAME);
85
86 Ok(PgAccess {
87 cache_dir,
88 database_dir: database_dir.clone(),
89 pg_ctl_exe: pg_ctl,
90 init_db_exe: init_db,
91 pw_file_path: pw_file,
92 zip_file_path,
93 pg_version_file,
94 })
95 }
96
97 async fn create_cache_dir_structure(fetch_settings: &PgFetchSettings) -> PgResult<PathBuf> {
103 let cache_dir = dirs::cache_dir().ok_or_else(|| PgEmbedError {
104 error_type: PgEmbedErrorType::InvalidPgUrl,
105 source: None,
106 message: None,
107 })?;
108 let os_string = match fetch_settings.operating_system {
109 OperationSystem::Darwin | OperationSystem::Windows | OperationSystem::Linux => {
110 fetch_settings.operating_system.to_string()
111 }
112 OperationSystem::AlpineLinux => {
113 format!("arch_{}", fetch_settings.operating_system.to_string())
114 }
115 };
116 let pg_path = format!(
117 "{}/{}/{}/{}",
118 PG_EMBED_CACHE_DIR_NAME,
119 os_string,
120 fetch_settings.architecture.to_string(),
121 fetch_settings.version.0
122 );
123 let mut cache_pg_embed = cache_dir.clone();
124 cache_pg_embed.push(pg_path);
125 tokio::fs::create_dir_all(&cache_pg_embed)
126 .map_err(|e| PgEmbedError {
127 error_type: PgEmbedErrorType::DirCreationError,
128 source: Some(Box::new(e)),
129 message: None,
130 })
131 .await?;
132 Ok(cache_pg_embed)
133 }
134
135 async fn create_db_dir_structure(db_dir: &PathBuf) -> PgResult<()> {
136 tokio::fs::create_dir_all(db_dir)
137 .map_err(|e| PgEmbedError {
138 error_type: PgEmbedErrorType::DirCreationError,
139 source: Some(Box::new(e)),
140 message: None,
141 })
142 .await
143 }
144
145 pub async fn pg_executables_cached(&self) -> PgResult<bool> {
149 Self::path_exists(self.init_db_exe.as_path()).await
150 }
151
152 pub async fn db_files_exist(&self) -> PgResult<bool> {
156 Self::path_exists(self.pg_version_file.as_path()).await
157 }
158
159 pub async fn pg_version_file_exists(db_dir: &PathBuf) -> PgResult<bool> {
163 let mut pg_version_file = db_dir.clone();
164 pg_version_file.push(PG_VERSION_FILE_NAME);
165 let file_exists = if let Ok(_) = tokio::fs::File::open(pg_version_file.as_path()).await {
166 true
167 } else {
168 false
169 };
170 Ok(file_exists)
171 }
172
173 async fn path_exists(file: &Path) -> PgResult<bool> {
177 if let Ok(_) = tokio::fs::File::open(file).await {
178 Ok(true)
179 } else {
180 Ok(false)
181 }
182 }
183
184 pub async fn mark_acquisition_in_progress(&self) -> PgResult<()> {
191 let mut lock = ACQUIRED_PG_BINS.lock().await;
192 lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::InProgress);
193 Ok(())
194 }
195
196 pub async fn mark_acquisition_finished(&self) -> PgResult<()> {
203 let mut lock = ACQUIRED_PG_BINS.lock().await;
204 lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::Finished);
205 Ok(())
206 }
207
208 pub async fn acquisition_status(&self) -> PgAcquisitionStatus {
212 let lock = ACQUIRED_PG_BINS.lock().await;
213 let acquisition_status = lock.get(&self.cache_dir);
214 match acquisition_status {
215 None => PgAcquisitionStatus::Undefined,
216 Some(status) => *status,
217 }
218 }
219
220 pub async fn acquisition_needed(&self) -> PgResult<bool> {
224 if !self.pg_executables_cached().await? {
225 match self.acquisition_status().await {
226 PgAcquisitionStatus::InProgress => {
227 let mut interval = interval(Duration::from_millis(100));
228 while self.acquisition_status().await == PgAcquisitionStatus::InProgress {
229 interval.tick().await;
230 }
231 Ok(false)
232 }
233 PgAcquisitionStatus::Finished => Ok(false),
234 PgAcquisitionStatus::Undefined => Ok(true),
235 }
236 } else {
237 Ok(false)
238 }
239 }
240
241 pub async fn write_pg_zip(&self, bytes: &[u8]) -> PgResult<()> {
245 let mut file: tokio::fs::File = tokio::fs::File::create(&self.zip_file_path.as_path())
246 .map_err(|e| PgEmbedError {
247 error_type: PgEmbedErrorType::WriteFileError,
248 source: Some(Box::new(e)),
249 message: None,
250 })
251 .await?;
252 file.write_all(&bytes)
253 .map_err(|e| PgEmbedError {
254 error_type: PgEmbedErrorType::WriteFileError,
255 source: Some(Box::new(e)),
256 message: None,
257 })
258 .await?;
259 Ok(())
260 }
261
262 pub fn clean(&self) -> PgResult<()> {
268 std::fs::remove_dir_all(self.database_dir.as_path()).map_err(|e| PgEmbedError {
270 error_type: PgEmbedErrorType::PgCleanUpFailure,
271 source: Some(Box::new(e)),
272 message: None,
273 })?;
274 std::fs::remove_file(self.pw_file_path.as_path()).map_err(|e| PgEmbedError {
275 error_type: PgEmbedErrorType::PgCleanUpFailure,
276 source: Some(Box::new(e)),
277 message: None,
278 })?;
279 Ok(())
280 }
281
282 pub async fn purge() -> PgResult<()> {
288 let mut cache_dir = dirs::cache_dir().ok_or_else(|| PgEmbedError {
289 error_type: PgEmbedErrorType::ReadFileError,
290 source: None,
291 message: Some(String::from("cache dir error")),
292 })?;
293 cache_dir.push(PG_EMBED_CACHE_DIR_NAME);
294 let _ = tokio::fs::remove_dir_all(cache_dir.as_path())
295 .map_err(|e| PgEmbedError {
296 error_type: PgEmbedErrorType::PgPurgeFailure,
297 source: Some(Box::new(e)),
298 message: None,
299 })
300 .await;
301 Ok(())
302 }
303
304 pub async fn clean_up(database_dir: PathBuf, pw_file: PathBuf) -> PgResult<()> {
308 tokio::fs::remove_dir_all(database_dir.as_path())
309 .await
310 .map_err(|e| PgEmbedError {
311 error_type: PgEmbedErrorType::PgCleanUpFailure,
312 source: Some(Box::new(e)),
313 message: None,
314 })?;
315
316 tokio::fs::remove_file(pw_file.as_path())
317 .await
318 .map_err(|e| PgEmbedError {
319 error_type: PgEmbedErrorType::PgCleanUpFailure,
320 source: Some(Box::new(e)),
321 message: None,
322 })
323 }
324
325 pub async fn create_password_file(&self, password: &[u8]) -> PgResult<()> {
331 let mut file: tokio::fs::File = tokio::fs::File::create(self.pw_file_path.as_path())
332 .map_err(|e| PgEmbedError {
333 error_type: PgEmbedErrorType::WriteFileError,
334 source: Some(Box::new(e)),
335 message: None,
336 })
337 .await?;
338 let _ = file
339 .write(password)
340 .map_err(|e| PgEmbedError {
341 error_type: PgEmbedErrorType::WriteFileError,
342 source: Some(Box::new(e)),
343 message: None,
344 })
345 .await?;
346 Ok(())
347 }
348
349 pub fn stop_db_command_sync(&self, database_dir: &PathBuf) -> PgCommandSync {
353 let pg_ctl_executable = self.pg_ctl_exe.to_str().unwrap();
354 let mut command = Box::new(Cell::new(std::process::Command::new(pg_ctl_executable)));
355 command
356 .get_mut()
357 .args(&["stop", "-w", "-D", database_dir.to_str().unwrap()]);
358 command
359 }
360}