pg_embed_alternative/
postgres.rs

1//!
2//! Postgresql server
3//!
4//! Start, stop, initialize the postgresql server.
5//! Create database clusters and databases.
6//!
7use std::io::BufRead;
8use std::path::PathBuf;
9use std::process::Stdio;
10use std::sync::Arc;
11use std::time::Duration;
12
13use futures::TryFutureExt;
14use log::{error, info};
15use tokio::sync::Mutex;
16
17#[cfg(feature = "rt_tokio_migrate")]
18use sqlx_tokio::migrate::{MigrateDatabase, Migrator};
19#[cfg(feature = "rt_tokio_migrate")]
20use sqlx_tokio::postgres::PgPoolOptions;
21#[cfg(feature = "rt_tokio_migrate")]
22use sqlx_tokio::Postgres;
23
24use crate::command_executor::AsyncCommand;
25use crate::pg_access::PgAccess;
26use crate::pg_commands::PgCommand;
27use crate::pg_enums::{PgAuthMethod, PgServerStatus};
28use crate::pg_errors::{PgEmbedError, PgEmbedErrorType};
29use crate::pg_fetch;
30use crate::pg_types::PgResult;
31
32///
33/// Database settings
34///
35pub struct PgSettings {
36    /// postgresql database directory
37    pub database_dir: PathBuf,
38    /// postgresql port
39    pub port: u16,
40    /// postgresql user name
41    pub user: String,
42    /// postgresql password
43    pub password: String,
44    /// authentication
45    pub auth_method: PgAuthMethod,
46    /// persist database
47    pub persistent: bool,
48    /// duration to wait before terminating process execution
49    /// pg_ctl start/stop and initdb timeout
50    pub timeout: Option<Duration>,
51    /// migrations folder
52    /// sql script files to execute on migrate
53    pub migration_dir: Option<PathBuf>,
54}
55
56///
57/// Embedded postgresql database
58///
59/// If the PgEmbed instance is dropped / goes out of scope and postgresql is still
60/// running, the postgresql process will be killed and depending on the [PgSettings::persistent] setting,
61/// file and directories will be cleaned up.
62///
63pub struct PgEmbed {
64    /// Postgresql settings
65    pub pg_settings: PgSettings,
66    /// Download settings
67    pub fetch_settings: pg_fetch::PgFetchSettings,
68    /// Database uri `postgres://{username}:{password}@localhost:{port}`
69    pub db_uri: String,
70    /// Postgres server status
71    pub server_status: Arc<Mutex<PgServerStatus>>,
72    pub shutting_down: bool,
73    /// Postgres files access
74    pub pg_access: PgAccess,
75}
76
77impl Drop for PgEmbed {
78    fn drop(&mut self) {
79        if !self.shutting_down {
80            let _ = self.stop_db_sync();
81        }
82        if !&self.pg_settings.persistent {
83            let _ = &self.pg_access.clean();
84        }
85    }
86}
87
88impl PgEmbed {
89    ///
90    /// Create a new PgEmbed instance
91    ///
92    pub async fn new(
93        pg_settings: PgSettings,
94        fetch_settings: pg_fetch::PgFetchSettings,
95    ) -> PgResult<Self> {
96        let password: &str = &pg_settings.password;
97        let db_uri = format!(
98            "postgres://{}:{}@localhost:{}",
99            &pg_settings.user,
100            &password,
101            pg_settings.port.to_string()
102        );
103        let pg_access = PgAccess::new(&fetch_settings, &pg_settings.database_dir).await?;
104        Ok(PgEmbed {
105            pg_settings,
106            fetch_settings,
107            db_uri,
108            server_status: Arc::new(Mutex::new(PgServerStatus::Uninitialized)),
109            shutting_down: false,
110            pg_access,
111        })
112    }
113
114    ///
115    /// Setup postgresql for execution
116    ///
117    /// Download, unpack, create password file and database
118    ///
119    pub async fn setup(&mut self) -> PgResult<()> {
120        self.pg_access.maybe_acquire_postgres().await?;
121        self.pg_access
122            .create_password_file(self.pg_settings.password.as_bytes())
123            .await?;
124        if self.pg_access.db_files_exist().await? {
125            let mut server_status = self.server_status.lock().await;
126            *server_status = PgServerStatus::Initialized;
127        } else {
128            let _r = &self.init_db().await?;
129        }
130        Ok(())
131    }
132
133    ///
134    /// Initialize postgresql database
135    ///
136    /// Returns `Ok(())` on success, otherwise returns an error.
137    ///
138    pub async fn init_db(&mut self) -> PgResult<()> {
139        {
140            let mut server_status = self.server_status.lock().await;
141            *server_status = PgServerStatus::Initializing;
142        }
143
144        let mut executor = PgCommand::init_db_executor(
145            &self.pg_access.init_db_exe,
146            &self.pg_access.database_dir,
147            &self.pg_access.pw_file_path,
148            &self.pg_settings.user,
149            &self.pg_settings.auth_method,
150        )?;
151        let exit_status = executor.execute(self.pg_settings.timeout).await?;
152        let mut server_status = self.server_status.lock().await;
153        *server_status = exit_status;
154        Ok(())
155    }
156
157    ///
158    /// Start postgresql database
159    ///
160    /// Returns `Ok(())` on success, otherwise returns an error.
161    ///
162    pub async fn start_db(&mut self) -> PgResult<()> {
163        {
164            let mut server_status = self.server_status.lock().await;
165            *server_status = PgServerStatus::Starting;
166        }
167        self.shutting_down = false;
168        let mut executor = PgCommand::start_db_executor(
169            &self.pg_access.pg_ctl_exe,
170            &self.pg_access.database_dir,
171            &self.pg_settings.port,
172        )?;
173        let exit_status = executor.execute(self.pg_settings.timeout).await?;
174        let mut server_status = self.server_status.lock().await;
175        *server_status = exit_status;
176        Ok(())
177    }
178
179    ///
180    /// Stop postgresql database
181    ///
182    /// Returns `Ok(())` on success, otherwise returns an error.
183    ///
184    pub async fn stop_db(&mut self) -> PgResult<()> {
185        {
186            let mut server_status = self.server_status.lock().await;
187            *server_status = PgServerStatus::Stopping;
188        }
189        self.shutting_down = true;
190        let mut executor =
191            PgCommand::stop_db_executor(&self.pg_access.pg_ctl_exe, &self.pg_access.database_dir)?;
192        let exit_status = executor.execute(self.pg_settings.timeout).await?;
193        let mut server_status = self.server_status.lock().await;
194        *server_status = exit_status;
195        Ok(())
196    }
197
198    ///
199    /// Stop postgresql database synchronous
200    ///
201    /// Returns `Ok(())` on success, otherwise returns an error.
202    ///
203    pub fn stop_db_sync(&mut self) -> PgResult<()> {
204        self.shutting_down = true;
205        let mut stop_db_command = self
206            .pg_access
207            .stop_db_command_sync(&self.pg_settings.database_dir);
208        let process = stop_db_command
209            .get_mut()
210            .stdout(Stdio::piped())
211            .stderr(Stdio::piped())
212            .spawn()
213            .map_err(|e| PgEmbedError {
214                error_type: PgEmbedErrorType::PgError,
215                source: Some(Box::new(e)),
216                message: None,
217            })?;
218
219        self.handle_process_io_sync(process)
220    }
221
222    ///
223    /// Handle process logging synchronous
224    ///
225    pub fn handle_process_io_sync(&self, mut process: std::process::Child) -> PgResult<()> {
226        let reader_out = std::io::BufReader::new(process.stdout.take().unwrap()).lines();
227        let reader_err = std::io::BufReader::new(process.stderr.take().unwrap()).lines();
228        reader_out.for_each(|line| info!("{}", line.unwrap()));
229        reader_err.for_each(|line| error!("{}", line.unwrap()));
230        Ok(())
231    }
232
233    ///
234    /// Create a database
235    ///
236    #[cfg(any(
237        feature = "rt_tokio_migrate",
238        feature = "rt_async_std_migrate",
239        feature = "rt_actix_migrate"
240    ))]
241    pub async fn create_database(&self, db_name: &str) -> PgResult<()> {
242        Postgres::create_database(&self.full_db_uri(db_name))
243            .map_err(|e| PgEmbedError {
244                error_type: PgEmbedErrorType::PgTaskJoinError,
245                source: Some(Box::new(e)),
246                message: None,
247            })
248            .await?;
249        Ok(())
250    }
251
252    ///
253    /// Drop a database
254    ///
255    #[cfg(any(
256        feature = "rt_tokio_migrate",
257        feature = "rt_async_std_migrate",
258        feature = "rt_actix_migrate"
259    ))]
260    pub async fn drop_database(&self, db_name: &str) -> PgResult<()> {
261        Postgres::drop_database(&self.full_db_uri(db_name))
262            .map_err(|e| PgEmbedError {
263                error_type: PgEmbedErrorType::PgTaskJoinError,
264                source: Some(Box::new(e)),
265                message: None,
266            })
267            .await?;
268        Ok(())
269    }
270
271    ///
272    /// Check database existence
273    ///
274    #[cfg(any(
275        feature = "rt_tokio_migrate",
276        feature = "rt_async_std_migrate",
277        feature = "rt_actix_migrate"
278    ))]
279    pub async fn database_exists(&self, db_name: &str) -> PgResult<bool> {
280        let result = Postgres::database_exists(&self.full_db_uri(db_name))
281            .map_err(|e| PgEmbedError {
282                error_type: PgEmbedErrorType::PgTaskJoinError,
283                source: Some(Box::new(e)),
284                message: None,
285            })
286            .await?;
287        Ok(result)
288    }
289
290    ///
291    /// The full database uri
292    ///
293    /// (*postgres://{username}:{password}@localhost:{port}/{db_name}*)
294    ///
295    pub fn full_db_uri(&self, db_name: &str) -> String {
296        format!("{}/{}", &self.db_uri, db_name)
297    }
298
299    ///
300    /// Run migrations
301    ///
302    #[cfg(any(
303        feature = "rt_tokio_migrate",
304        feature = "rt_async_std_migrate",
305        feature = "rt_actix_migrate"
306    ))]
307    pub async fn migrate(&self, db_name: &str) -> PgResult<()> {
308        if let Some(migration_dir) = &self.pg_settings.migration_dir {
309            let m = Migrator::new(migration_dir.as_path())
310                .map_err(|e| PgEmbedError {
311                    error_type: PgEmbedErrorType::MigrationError,
312                    source: Some(Box::new(e)),
313                    message: None,
314                })
315                .await?;
316            let pool = PgPoolOptions::new()
317                .connect(&self.full_db_uri(db_name))
318                .map_err(|e| PgEmbedError {
319                    error_type: PgEmbedErrorType::SqlQueryError,
320                    source: Some(Box::new(e)),
321                    message: None,
322                })
323                .await?;
324            m.run(&pool)
325                .map_err(|e| PgEmbedError {
326                    error_type: PgEmbedErrorType::MigrationError,
327                    source: Some(Box::new(e)),
328                    message: None,
329                })
330                .await?;
331        }
332        Ok(())
333    }
334}