pg_embed_alternative/
pg_access.rs

1//!
2//! Cache postgresql files, access to executables, clean up files
3//!
4
5use std::cell::Cell;
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use futures::TryFutureExt;
11use tokio::io::AsyncWriteExt;
12use tokio::sync::Mutex;
13
14use crate::pg_enums::{OperationSystem, PgAcquisitionStatus};
15use crate::pg_errors::{PgEmbedError, PgEmbedErrorType};
16use crate::pg_fetch::PgFetchSettings;
17use crate::pg_types::{PgCommandSync, PgResult};
18use crate::pg_unpack;
19
20lazy_static! {
21    ///
22    /// Stores the paths to the cache directories while acquiring the related postgres binaries
23    ///
24    /// Used to prevent simultaneous downloads and unpacking of the same binaries
25    /// while executing multiple PgEmbed instances concurrently.
26    ///
27    static ref ACQUIRED_PG_BINS: Arc<Mutex<HashMap<PathBuf, PgAcquisitionStatus>>> =
28    Arc::new(Mutex::new(HashMap::with_capacity(5)));
29}
30
31const PG_EMBED_CACHE_DIR_NAME: &'static str = "pg-embed";
32const PG_VERSION_FILE_NAME: &'static str = "PG_VERSION";
33
34///
35/// Access to pg_ctl, initdb, database directory and cache directory
36///
37pub struct PgAccess {
38    /// Cache directory path
39    pub cache_dir: PathBuf,
40    /// Database directory path
41    pub database_dir: PathBuf,
42    /// Postgresql pg_ctl executable path
43    pub pg_ctl_exe: PathBuf,
44    /// Postgresql initdb executable path
45    pub init_db_exe: PathBuf,
46    /// Password file path
47    pub pw_file_path: PathBuf,
48    /// Postgresql binaries zip file path
49    pub zip_file_path: PathBuf,
50    /// Postgresql database version file
51    /// used for internal checks
52    pg_version_file: PathBuf,
53    /// Fetch settings
54    fetch_settings: PgFetchSettings,
55}
56
57impl PgAccess {
58    ///
59    /// Create a new instance
60    ///
61    /// Directory structure for cached postgresql binaries will be created
62    ///
63    pub async fn new(
64        fetch_settings: &PgFetchSettings,
65        database_dir: &PathBuf,
66    ) -> Result<Self, PgEmbedError> {
67        // cache directory
68        let cache_dir = Self::create_cache_dir_structure(&fetch_settings).await?;
69        Self::create_db_dir_structure(database_dir).await?;
70        // pg_ctl executable
71        let mut pg_ctl = cache_dir.clone();
72        pg_ctl.push("bin/pg_ctl");
73        // initdb executable
74        let mut init_db = cache_dir.clone();
75        init_db.push("bin/initdb");
76        // postgres zip file
77        let mut zip_file_path = cache_dir.clone();
78        let platform = fetch_settings.platform();
79        let file_name = format!("{}-{}.zip", platform, &fetch_settings.version.0);
80        zip_file_path.push(file_name);
81        // password file
82        let mut pw_file = database_dir.clone();
83        pw_file.set_extension("pwfile");
84        // postgres version file
85        let mut pg_version_file = database_dir.clone();
86        pg_version_file.push(PG_VERSION_FILE_NAME);
87
88        Ok(PgAccess {
89            cache_dir,
90            database_dir: database_dir.clone(),
91            pg_ctl_exe: pg_ctl,
92            init_db_exe: init_db,
93            pw_file_path: pw_file,
94            zip_file_path,
95            pg_version_file,
96            fetch_settings: fetch_settings.clone(),
97        })
98    }
99
100    ///
101    /// Create directory structure for cached postgresql executables
102    ///
103    /// Returns PathBuf(cache_directory) on success, an error otherwise
104    ///
105    async fn create_cache_dir_structure(fetch_settings: &PgFetchSettings) -> PgResult<PathBuf> {
106        let cache_dir = dirs::cache_dir().ok_or_else(|| PgEmbedError {
107            error_type: PgEmbedErrorType::InvalidPgUrl,
108            source: None,
109            message: None,
110        })?;
111        let os_string = match fetch_settings.operating_system {
112            OperationSystem::Darwin | OperationSystem::Windows | OperationSystem::Linux => {
113                fetch_settings.operating_system.to_string()
114            }
115            OperationSystem::AlpineLinux => {
116                format!("arch_{}", fetch_settings.operating_system.to_string())
117            }
118        };
119        let pg_path = format!(
120            "{}/{}/{}/{}",
121            PG_EMBED_CACHE_DIR_NAME,
122            os_string,
123            fetch_settings.architecture.to_string(),
124            fetch_settings.version.0
125        );
126        let mut cache_pg_embed = cache_dir.clone();
127        cache_pg_embed.push(pg_path);
128        tokio::fs::create_dir_all(&cache_pg_embed)
129            .map_err(|e| PgEmbedError {
130                error_type: PgEmbedErrorType::DirCreationError,
131                source: Some(Box::new(e)),
132                message: None,
133            })
134            .await?;
135        Ok(cache_pg_embed)
136    }
137
138    async fn create_db_dir_structure(db_dir: &PathBuf) -> PgResult<()> {
139        tokio::fs::create_dir_all(db_dir)
140            .map_err(|e| PgEmbedError {
141                error_type: PgEmbedErrorType::DirCreationError,
142                source: Some(Box::new(e)),
143                message: None,
144            })
145            .await
146    }
147
148    ///
149    /// Download and unpack postgres binaries
150    ///
151    pub async fn maybe_acquire_postgres(&self) -> PgResult<()> {
152        let mut lock = ACQUIRED_PG_BINS.lock().await;
153
154        if self.pg_executables_cached().await? {
155            return Ok(());
156        }
157
158        lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::InProgress);
159        let pg_bin_data = self.fetch_settings.fetch_postgres().await?;
160        self.write_pg_zip(&pg_bin_data).await?;
161        log::debug!(
162            "Unpacking postgres binaries {} {}",
163            self.zip_file_path.display(),
164            self.cache_dir.display()
165        );
166        pg_unpack::unpack_postgres(&self.zip_file_path, &self.cache_dir).await?;
167
168        lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::Finished);
169        Ok(())
170    }
171
172    ///
173    /// Check if postgresql executables are already cached
174    ///
175    pub async fn pg_executables_cached(&self) -> PgResult<bool> {
176        Self::path_exists(self.init_db_exe.as_path()).await
177    }
178
179    ///
180    /// Check if database files exist
181    ///
182    pub async fn db_files_exist(&self) -> PgResult<bool> {
183        Ok(self.pg_executables_cached().await?
184            && Self::path_exists(self.pg_version_file.as_path()).await?)
185    }
186
187    ///
188    /// Check if database version file exists
189    ///
190    pub async fn pg_version_file_exists(db_dir: &PathBuf) -> PgResult<bool> {
191        let mut pg_version_file = db_dir.clone();
192        pg_version_file.push(PG_VERSION_FILE_NAME);
193        let file_exists = if let Ok(_) = tokio::fs::File::open(pg_version_file.as_path()).await {
194            true
195        } else {
196            false
197        };
198        Ok(file_exists)
199    }
200
201    ///
202    /// Check if file path exists
203    ///
204    async fn path_exists(file: &Path) -> PgResult<bool> {
205        if let Ok(_) = tokio::fs::File::open(file).await {
206            Ok(true)
207        } else {
208            Ok(false)
209        }
210    }
211
212    ///
213    /// Check postgresql acquisition status
214    ///
215    pub async fn acquisition_status(&self) -> PgAcquisitionStatus {
216        let lock = ACQUIRED_PG_BINS.lock().await;
217        let acquisition_status = lock.get(&self.cache_dir);
218        match acquisition_status {
219            None => PgAcquisitionStatus::Undefined,
220            Some(status) => *status,
221        }
222    }
223
224    ///
225    /// Write pg binaries zip to postgresql cache directory
226    ///
227    async fn write_pg_zip(&self, bytes: &[u8]) -> PgResult<()> {
228        let mut file: tokio::fs::File = tokio::fs::File::create(&self.zip_file_path.as_path())
229            .map_err(|e| PgEmbedError {
230                error_type: PgEmbedErrorType::WriteFileError,
231                source: Some(Box::new(e)),
232                message: None,
233            })
234            .await?;
235        file.write_all(&bytes)
236            .map_err(|e| PgEmbedError {
237                error_type: PgEmbedErrorType::WriteFileError,
238                source: Some(Box::new(e)),
239                message: None,
240            })
241            .await?;
242        file.sync_data()
243            .map_err(|e| PgEmbedError {
244                error_type: PgEmbedErrorType::WriteFileError,
245                source: Some(Box::new(e)),
246                message: None,
247            })
248            .await?;
249        Ok(())
250    }
251
252    ///
253    /// Clean up created files and directories.
254    ///
255    /// Remove created directories containing the database and the password file.
256    ///
257    pub fn clean(&self) -> PgResult<()> {
258        // not using tokio::fs async methods because clean() is called on drop
259        std::fs::remove_dir_all(self.database_dir.as_path()).map_err(|e| PgEmbedError {
260            error_type: PgEmbedErrorType::PgCleanUpFailure,
261            source: Some(Box::new(e)),
262            message: None,
263        })?;
264        std::fs::remove_file(self.pw_file_path.as_path()).map_err(|e| PgEmbedError {
265            error_type: PgEmbedErrorType::PgCleanUpFailure,
266            source: Some(Box::new(e)),
267            message: None,
268        })?;
269        Ok(())
270    }
271
272    ///
273    /// Purge postgresql executables
274    ///
275    /// Remove all cached postgresql executables
276    ///
277    pub async fn purge() -> PgResult<()> {
278        let mut cache_dir = dirs::cache_dir().ok_or_else(|| PgEmbedError {
279            error_type: PgEmbedErrorType::ReadFileError,
280            source: None,
281            message: Some(String::from("cache dir error")),
282        })?;
283        cache_dir.push(PG_EMBED_CACHE_DIR_NAME);
284        let _ = tokio::fs::remove_dir_all(cache_dir.as_path())
285            .map_err(|e| PgEmbedError {
286                error_type: PgEmbedErrorType::PgPurgeFailure,
287                source: Some(Box::new(e)),
288                message: None,
289            })
290            .await;
291        Ok(())
292    }
293
294    ///
295    /// Clean up database directory and password file
296    ///
297    pub async fn clean_up(database_dir: PathBuf, pw_file: PathBuf) -> PgResult<()> {
298        tokio::fs::remove_dir_all(database_dir.as_path())
299            .await
300            .map_err(|e| PgEmbedError {
301                error_type: PgEmbedErrorType::PgCleanUpFailure,
302                source: Some(Box::new(e)),
303                message: None,
304            })?;
305
306        tokio::fs::remove_file(pw_file.as_path())
307            .await
308            .map_err(|e| PgEmbedError {
309                error_type: PgEmbedErrorType::PgCleanUpFailure,
310                source: Some(Box::new(e)),
311                message: None,
312            })
313    }
314
315    ///
316    /// Create a database password file
317    ///
318    /// Returns `Ok(())` on success, otherwise returns an error.
319    ///
320    pub async fn create_password_file(&self, password: &[u8]) -> PgResult<()> {
321        let mut file: tokio::fs::File = tokio::fs::File::create(self.pw_file_path.as_path())
322            .map_err(|e| PgEmbedError {
323                error_type: PgEmbedErrorType::WriteFileError,
324                source: Some(Box::new(e)),
325                message: None,
326            })
327            .await?;
328        let _ = file
329            .write(password)
330            .map_err(|e| PgEmbedError {
331                error_type: PgEmbedErrorType::WriteFileError,
332                source: Some(Box::new(e)),
333                message: None,
334            })
335            .await?;
336        Ok(())
337    }
338
339    ///
340    /// Create synchronous pg_ctl stop command
341    ///
342    pub fn stop_db_command_sync(&self, database_dir: &PathBuf) -> PgCommandSync {
343        let pg_ctl_executable = self.pg_ctl_exe.to_str().unwrap();
344        let mut command = Box::new(Cell::new(std::process::Command::new(pg_ctl_executable)));
345        command
346            .get_mut()
347            .args(&["stop", "-w", "-D", database_dir.to_str().unwrap()]);
348        command
349    }
350}