pg_embed/
pg_access.rs

1//!
2//! Cache postgresql files, access to executables, clean up files
3//!
4
5use std::cell::Cell;
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use futures::TryFutureExt;
11use tokio::io::AsyncWriteExt;
12use tokio::sync::Mutex;
13use tokio::time::{interval, Duration};
14
15use crate::pg_enums::{OperationSystem, PgAcquisitionStatus};
16use crate::pg_errors::{PgEmbedError, PgEmbedErrorType};
17use crate::pg_fetch::PgFetchSettings;
18use crate::pg_types::{PgCommandSync, PgResult};
19
20lazy_static! {
21    ///
22    /// Stores the paths to the cache directories while acquiring the related postgres binaries
23    ///
24    /// Used to prevent simultaneous downloads and unpacking of the same binaries
25    /// while executing multiple PgEmbed instances concurrently.
26    ///
27    static ref ACQUIRED_PG_BINS: Arc<Mutex<HashMap<PathBuf, PgAcquisitionStatus>>> =
28    Arc::new(Mutex::new(HashMap::with_capacity(5)));
29}
30
31const PG_EMBED_CACHE_DIR_NAME: &'static str = "pg-embed";
32const PG_VERSION_FILE_NAME: &'static str = "PG_VERSION";
33
34///
35/// Access to pg_ctl, initdb, database directory and cache directory
36///
37pub struct PgAccess {
38    /// Cache directory path
39    pub cache_dir: PathBuf,
40    /// Database directory path
41    pub database_dir: PathBuf,
42    /// Postgresql pg_ctl executable path
43    pub pg_ctl_exe: PathBuf,
44    /// Postgresql initdb executable path
45    pub init_db_exe: PathBuf,
46    /// Password file path
47    pub pw_file_path: PathBuf,
48    /// Postgresql binaries zip file path
49    pub zip_file_path: PathBuf,
50    /// Postgresql database version file
51    /// used for internal checks
52    pg_version_file: PathBuf,
53}
54
55impl PgAccess {
56    ///
57    /// Create a new instance
58    ///
59    /// Directory structure for cached postgresql binaries will be created
60    ///
61    pub async fn new(
62        fetch_settings: &PgFetchSettings,
63        database_dir: &PathBuf,
64    ) -> Result<Self, PgEmbedError> {
65        // cache directory
66        let cache_dir = Self::create_cache_dir_structure(&fetch_settings).await?;
67        Self::create_db_dir_structure(database_dir).await?;
68        // pg_ctl executable
69        let mut pg_ctl = cache_dir.clone();
70        pg_ctl.push("bin/pg_ctl");
71        // initdb executable
72        let mut init_db = cache_dir.clone();
73        init_db.push("bin/initdb");
74        // postgres zip file
75        let mut zip_file_path = cache_dir.clone();
76        let platform = fetch_settings.platform();
77        let file_name = format!("{}-{}.zip", platform, &fetch_settings.version.0);
78        zip_file_path.push(file_name);
79        // password file
80        let mut pw_file = database_dir.clone();
81        pw_file.set_extension("pwfile");
82        // postgres version file
83        let mut pg_version_file = database_dir.clone();
84        pg_version_file.push(PG_VERSION_FILE_NAME);
85
86        Ok(PgAccess {
87            cache_dir,
88            database_dir: database_dir.clone(),
89            pg_ctl_exe: pg_ctl,
90            init_db_exe: init_db,
91            pw_file_path: pw_file,
92            zip_file_path,
93            pg_version_file,
94        })
95    }
96
97    ///
98    /// Create directory structure for cached postgresql executables
99    ///
100    /// Returns PathBuf(cache_directory) on success, an error otherwise
101    ///
102    async fn create_cache_dir_structure(fetch_settings: &PgFetchSettings) -> PgResult<PathBuf> {
103        let cache_dir = dirs::cache_dir().ok_or_else(|| PgEmbedError {
104            error_type: PgEmbedErrorType::InvalidPgUrl,
105            source: None,
106            message: None,
107        })?;
108        let os_string = match fetch_settings.operating_system {
109            OperationSystem::Darwin | OperationSystem::Windows | OperationSystem::Linux => {
110                fetch_settings.operating_system.to_string()
111            }
112            OperationSystem::AlpineLinux => {
113                format!("arch_{}", fetch_settings.operating_system.to_string())
114            }
115        };
116        let pg_path = format!(
117            "{}/{}/{}/{}",
118            PG_EMBED_CACHE_DIR_NAME,
119            os_string,
120            fetch_settings.architecture.to_string(),
121            fetch_settings.version.0
122        );
123        let mut cache_pg_embed = cache_dir.clone();
124        cache_pg_embed.push(pg_path);
125        tokio::fs::create_dir_all(&cache_pg_embed)
126            .map_err(|e| PgEmbedError {
127                error_type: PgEmbedErrorType::DirCreationError,
128                source: Some(Box::new(e)),
129                message: None,
130            })
131            .await?;
132        Ok(cache_pg_embed)
133    }
134
135    async fn create_db_dir_structure(db_dir: &PathBuf) -> PgResult<()> {
136        tokio::fs::create_dir_all(db_dir)
137            .map_err(|e| PgEmbedError {
138                error_type: PgEmbedErrorType::DirCreationError,
139                source: Some(Box::new(e)),
140                message: None,
141            })
142            .await
143    }
144
145    ///
146    /// Check if postgresql executables are already cached
147    ///
148    pub async fn pg_executables_cached(&self) -> PgResult<bool> {
149        Self::path_exists(self.init_db_exe.as_path()).await
150    }
151
152    ///
153    /// Check if database files exist
154    ///
155    pub async fn db_files_exist(&self) -> PgResult<bool> {
156        Self::path_exists(self.pg_version_file.as_path()).await
157    }
158
159    ///
160    /// Check if database version file exists
161    ///
162    pub async fn pg_version_file_exists(db_dir: &PathBuf) -> PgResult<bool> {
163        let mut pg_version_file = db_dir.clone();
164        pg_version_file.push(PG_VERSION_FILE_NAME);
165        let file_exists = if let Ok(_) = tokio::fs::File::open(pg_version_file.as_path()).await {
166            true
167        } else {
168            false
169        };
170        Ok(file_exists)
171    }
172
173    ///
174    /// Check if file path exists
175    ///
176    async fn path_exists(file: &Path) -> PgResult<bool> {
177        if let Ok(_) = tokio::fs::File::open(file).await {
178            Ok(true)
179        } else {
180            Ok(false)
181        }
182    }
183
184    ///
185    /// Mark postgresql binaries acquisition in progress
186    ///
187    /// Used while acquiring postgresql binaries, so that no two instances
188    /// of PgEmbed try to acquire the same resources
189    ///
190    pub async fn mark_acquisition_in_progress(&self) -> PgResult<()> {
191        let mut lock = ACQUIRED_PG_BINS.lock().await;
192        lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::InProgress);
193        Ok(())
194    }
195
196    ///
197    /// Mark postgresql binaries acquisition finished
198    ///
199    /// Used when acquiring postgresql has finished, so that other instances
200    /// of PgEmbed don't try to reacquire resources
201    ///
202    pub async fn mark_acquisition_finished(&self) -> PgResult<()> {
203        let mut lock = ACQUIRED_PG_BINS.lock().await;
204        lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::Finished);
205        Ok(())
206    }
207
208    ///
209    /// Check postgresql acquisition status
210    ///
211    pub async fn acquisition_status(&self) -> PgAcquisitionStatus {
212        let lock = ACQUIRED_PG_BINS.lock().await;
213        let acquisition_status = lock.get(&self.cache_dir);
214        match acquisition_status {
215            None => PgAcquisitionStatus::Undefined,
216            Some(status) => *status,
217        }
218    }
219
220    ///
221    /// Determine if postgresql binaries acquisition is needed
222    ///
223    pub async fn acquisition_needed(&self) -> PgResult<bool> {
224        if !self.pg_executables_cached().await? {
225            match self.acquisition_status().await {
226                PgAcquisitionStatus::InProgress => {
227                    let mut interval = interval(Duration::from_millis(100));
228                    while self.acquisition_status().await == PgAcquisitionStatus::InProgress {
229                        interval.tick().await;
230                    }
231                    Ok(false)
232                }
233                PgAcquisitionStatus::Finished => Ok(false),
234                PgAcquisitionStatus::Undefined => Ok(true),
235            }
236        } else {
237            Ok(false)
238        }
239    }
240
241    ///
242    /// Write pg binaries zip to postgresql cache directory
243    ///
244    pub async fn write_pg_zip(&self, bytes: &[u8]) -> PgResult<()> {
245        let mut file: tokio::fs::File = tokio::fs::File::create(&self.zip_file_path.as_path())
246            .map_err(|e| PgEmbedError {
247                error_type: PgEmbedErrorType::WriteFileError,
248                source: Some(Box::new(e)),
249                message: None,
250            })
251            .await?;
252        file.write_all(&bytes)
253            .map_err(|e| PgEmbedError {
254                error_type: PgEmbedErrorType::WriteFileError,
255                source: Some(Box::new(e)),
256                message: None,
257            })
258            .await?;
259        Ok(())
260    }
261
262    ///
263    /// Clean up created files and directories.
264    ///
265    /// Remove created directories containing the database and the password file.
266    ///
267    pub fn clean(&self) -> PgResult<()> {
268        // not using tokio::fs async methods because clean() is called on drop
269        std::fs::remove_dir_all(self.database_dir.as_path()).map_err(|e| PgEmbedError {
270            error_type: PgEmbedErrorType::PgCleanUpFailure,
271            source: Some(Box::new(e)),
272            message: None,
273        })?;
274        std::fs::remove_file(self.pw_file_path.as_path()).map_err(|e| PgEmbedError {
275            error_type: PgEmbedErrorType::PgCleanUpFailure,
276            source: Some(Box::new(e)),
277            message: None,
278        })?;
279        Ok(())
280    }
281
282    ///
283    /// Purge postgresql executables
284    ///
285    /// Remove all cached postgresql executables
286    ///
287    pub async fn purge() -> PgResult<()> {
288        let mut cache_dir = dirs::cache_dir().ok_or_else(|| PgEmbedError {
289            error_type: PgEmbedErrorType::ReadFileError,
290            source: None,
291            message: Some(String::from("cache dir error")),
292        })?;
293        cache_dir.push(PG_EMBED_CACHE_DIR_NAME);
294        let _ = tokio::fs::remove_dir_all(cache_dir.as_path())
295            .map_err(|e| PgEmbedError {
296                error_type: PgEmbedErrorType::PgPurgeFailure,
297                source: Some(Box::new(e)),
298                message: None,
299            })
300            .await;
301        Ok(())
302    }
303
304    ///
305    /// Clean up database directory and password file
306    ///
307    pub async fn clean_up(database_dir: PathBuf, pw_file: PathBuf) -> PgResult<()> {
308        tokio::fs::remove_dir_all(database_dir.as_path())
309            .await
310            .map_err(|e| PgEmbedError {
311                error_type: PgEmbedErrorType::PgCleanUpFailure,
312                source: Some(Box::new(e)),
313                message: None,
314            })?;
315
316        tokio::fs::remove_file(pw_file.as_path())
317            .await
318            .map_err(|e| PgEmbedError {
319                error_type: PgEmbedErrorType::PgCleanUpFailure,
320                source: Some(Box::new(e)),
321                message: None,
322            })
323    }
324
325    ///
326    /// Create a database password file
327    ///
328    /// Returns `Ok(())` on success, otherwise returns an error.
329    ///
330    pub async fn create_password_file(&self, password: &[u8]) -> PgResult<()> {
331        let mut file: tokio::fs::File = tokio::fs::File::create(self.pw_file_path.as_path())
332            .map_err(|e| PgEmbedError {
333                error_type: PgEmbedErrorType::WriteFileError,
334                source: Some(Box::new(e)),
335                message: None,
336            })
337            .await?;
338        let _ = file
339            .write(password)
340            .map_err(|e| PgEmbedError {
341                error_type: PgEmbedErrorType::WriteFileError,
342                source: Some(Box::new(e)),
343                message: None,
344            })
345            .await?;
346        Ok(())
347    }
348
349    ///
350    /// Create synchronous pg_ctl stop command
351    ///
352    pub fn stop_db_command_sync(&self, database_dir: &PathBuf) -> PgCommandSync {
353        let pg_ctl_executable = self.pg_ctl_exe.to_str().unwrap();
354        let mut command = Box::new(Cell::new(std::process::Command::new(pg_ctl_executable)));
355        command
356            .get_mut()
357            .args(&["stop", "-w", "-D", database_dir.to_str().unwrap()]);
358        command
359    }
360}