pg_embed_alternative/
postgres.rs1use std::io::BufRead;
8use std::path::PathBuf;
9use std::process::Stdio;
10use std::sync::Arc;
11use std::time::Duration;
12
13use futures::TryFutureExt;
14use log::{error, info};
15use tokio::sync::Mutex;
16
17#[cfg(feature = "rt_tokio_migrate")]
18use sqlx_tokio::migrate::{MigrateDatabase, Migrator};
19#[cfg(feature = "rt_tokio_migrate")]
20use sqlx_tokio::postgres::PgPoolOptions;
21#[cfg(feature = "rt_tokio_migrate")]
22use sqlx_tokio::Postgres;
23
24use crate::command_executor::AsyncCommand;
25use crate::pg_access::PgAccess;
26use crate::pg_commands::PgCommand;
27use crate::pg_enums::{PgAuthMethod, PgServerStatus};
28use crate::pg_errors::{PgEmbedError, PgEmbedErrorType};
29use crate::pg_fetch;
30use crate::pg_types::PgResult;
31
32pub struct PgSettings {
36 pub database_dir: PathBuf,
38 pub port: u16,
40 pub user: String,
42 pub password: String,
44 pub auth_method: PgAuthMethod,
46 pub persistent: bool,
48 pub timeout: Option<Duration>,
51 pub migration_dir: Option<PathBuf>,
54}
55
56pub struct PgEmbed {
64 pub pg_settings: PgSettings,
66 pub fetch_settings: pg_fetch::PgFetchSettings,
68 pub db_uri: String,
70 pub server_status: Arc<Mutex<PgServerStatus>>,
72 pub shutting_down: bool,
73 pub pg_access: PgAccess,
75}
76
77impl Drop for PgEmbed {
78 fn drop(&mut self) {
79 if !self.shutting_down {
80 let _ = self.stop_db_sync();
81 }
82 if !&self.pg_settings.persistent {
83 let _ = &self.pg_access.clean();
84 }
85 }
86}
87
88impl PgEmbed {
89 pub async fn new(
93 pg_settings: PgSettings,
94 fetch_settings: pg_fetch::PgFetchSettings,
95 ) -> PgResult<Self> {
96 let password: &str = &pg_settings.password;
97 let db_uri = format!(
98 "postgres://{}:{}@localhost:{}",
99 &pg_settings.user,
100 &password,
101 pg_settings.port.to_string()
102 );
103 let pg_access = PgAccess::new(&fetch_settings, &pg_settings.database_dir).await?;
104 Ok(PgEmbed {
105 pg_settings,
106 fetch_settings,
107 db_uri,
108 server_status: Arc::new(Mutex::new(PgServerStatus::Uninitialized)),
109 shutting_down: false,
110 pg_access,
111 })
112 }
113
114 pub async fn setup(&mut self) -> PgResult<()> {
120 self.pg_access.maybe_acquire_postgres().await?;
121 self.pg_access
122 .create_password_file(self.pg_settings.password.as_bytes())
123 .await?;
124 if self.pg_access.db_files_exist().await? {
125 let mut server_status = self.server_status.lock().await;
126 *server_status = PgServerStatus::Initialized;
127 } else {
128 let _r = &self.init_db().await?;
129 }
130 Ok(())
131 }
132
133 pub async fn init_db(&mut self) -> PgResult<()> {
139 {
140 let mut server_status = self.server_status.lock().await;
141 *server_status = PgServerStatus::Initializing;
142 }
143
144 let mut executor = PgCommand::init_db_executor(
145 &self.pg_access.init_db_exe,
146 &self.pg_access.database_dir,
147 &self.pg_access.pw_file_path,
148 &self.pg_settings.user,
149 &self.pg_settings.auth_method,
150 )?;
151 let exit_status = executor.execute(self.pg_settings.timeout).await?;
152 let mut server_status = self.server_status.lock().await;
153 *server_status = exit_status;
154 Ok(())
155 }
156
157 pub async fn start_db(&mut self) -> PgResult<()> {
163 {
164 let mut server_status = self.server_status.lock().await;
165 *server_status = PgServerStatus::Starting;
166 }
167 self.shutting_down = false;
168 let mut executor = PgCommand::start_db_executor(
169 &self.pg_access.pg_ctl_exe,
170 &self.pg_access.database_dir,
171 &self.pg_settings.port,
172 )?;
173 let exit_status = executor.execute(self.pg_settings.timeout).await?;
174 let mut server_status = self.server_status.lock().await;
175 *server_status = exit_status;
176 Ok(())
177 }
178
179 pub async fn stop_db(&mut self) -> PgResult<()> {
185 {
186 let mut server_status = self.server_status.lock().await;
187 *server_status = PgServerStatus::Stopping;
188 }
189 self.shutting_down = true;
190 let mut executor =
191 PgCommand::stop_db_executor(&self.pg_access.pg_ctl_exe, &self.pg_access.database_dir)?;
192 let exit_status = executor.execute(self.pg_settings.timeout).await?;
193 let mut server_status = self.server_status.lock().await;
194 *server_status = exit_status;
195 Ok(())
196 }
197
198 pub fn stop_db_sync(&mut self) -> PgResult<()> {
204 self.shutting_down = true;
205 let mut stop_db_command = self
206 .pg_access
207 .stop_db_command_sync(&self.pg_settings.database_dir);
208 let process = stop_db_command
209 .get_mut()
210 .stdout(Stdio::piped())
211 .stderr(Stdio::piped())
212 .spawn()
213 .map_err(|e| PgEmbedError {
214 error_type: PgEmbedErrorType::PgError,
215 source: Some(Box::new(e)),
216 message: None,
217 })?;
218
219 self.handle_process_io_sync(process)
220 }
221
222 pub fn handle_process_io_sync(&self, mut process: std::process::Child) -> PgResult<()> {
226 let reader_out = std::io::BufReader::new(process.stdout.take().unwrap()).lines();
227 let reader_err = std::io::BufReader::new(process.stderr.take().unwrap()).lines();
228 reader_out.for_each(|line| info!("{}", line.unwrap()));
229 reader_err.for_each(|line| error!("{}", line.unwrap()));
230 Ok(())
231 }
232
233 #[cfg(any(
237 feature = "rt_tokio_migrate",
238 feature = "rt_async_std_migrate",
239 feature = "rt_actix_migrate"
240 ))]
241 pub async fn create_database(&self, db_name: &str) -> PgResult<()> {
242 Postgres::create_database(&self.full_db_uri(db_name))
243 .map_err(|e| PgEmbedError {
244 error_type: PgEmbedErrorType::PgTaskJoinError,
245 source: Some(Box::new(e)),
246 message: None,
247 })
248 .await?;
249 Ok(())
250 }
251
252 #[cfg(any(
256 feature = "rt_tokio_migrate",
257 feature = "rt_async_std_migrate",
258 feature = "rt_actix_migrate"
259 ))]
260 pub async fn drop_database(&self, db_name: &str) -> PgResult<()> {
261 Postgres::drop_database(&self.full_db_uri(db_name))
262 .map_err(|e| PgEmbedError {
263 error_type: PgEmbedErrorType::PgTaskJoinError,
264 source: Some(Box::new(e)),
265 message: None,
266 })
267 .await?;
268 Ok(())
269 }
270
271 #[cfg(any(
275 feature = "rt_tokio_migrate",
276 feature = "rt_async_std_migrate",
277 feature = "rt_actix_migrate"
278 ))]
279 pub async fn database_exists(&self, db_name: &str) -> PgResult<bool> {
280 let result = Postgres::database_exists(&self.full_db_uri(db_name))
281 .map_err(|e| PgEmbedError {
282 error_type: PgEmbedErrorType::PgTaskJoinError,
283 source: Some(Box::new(e)),
284 message: None,
285 })
286 .await?;
287 Ok(result)
288 }
289
290 pub fn full_db_uri(&self, db_name: &str) -> String {
296 format!("{}/{}", &self.db_uri, db_name)
297 }
298
299 #[cfg(any(
303 feature = "rt_tokio_migrate",
304 feature = "rt_async_std_migrate",
305 feature = "rt_actix_migrate"
306 ))]
307 pub async fn migrate(&self, db_name: &str) -> PgResult<()> {
308 if let Some(migration_dir) = &self.pg_settings.migration_dir {
309 let m = Migrator::new(migration_dir.as_path())
310 .map_err(|e| PgEmbedError {
311 error_type: PgEmbedErrorType::MigrationError,
312 source: Some(Box::new(e)),
313 message: None,
314 })
315 .await?;
316 let pool = PgPoolOptions::new()
317 .connect(&self.full_db_uri(db_name))
318 .map_err(|e| PgEmbedError {
319 error_type: PgEmbedErrorType::SqlQueryError,
320 source: Some(Box::new(e)),
321 message: None,
322 })
323 .await?;
324 m.run(&pool)
325 .map_err(|e| PgEmbedError {
326 error_type: PgEmbedErrorType::MigrationError,
327 source: Some(Box::new(e)),
328 message: None,
329 })
330 .await?;
331 }
332 Ok(())
333 }
334}