1use rand::RngExt;
2
3use crate::LOCALHOST_HOST_ADDR;
4use crate::LOCALHOST_IP;
5use crate::UNSPECIFIED_IP;
6use crate::certificate;
7use crate::definition;
8
9pub const PGDATA: &str = "/var/lib/pg-ephemeral";
10
11static HOST_HAS_TTY: std::sync::LazyLock<bool> = std::sync::LazyLock::new(|| {
26 use std::io::IsTerminal;
27 std::io::stdin().is_terminal() && std::io::stdout().is_terminal()
28});
29
30pub(crate) trait TtyIfTerminal: Sized {
42 fn tty_if_terminal(self) -> Self;
43}
44
45impl TtyIfTerminal for ociman::ExecCommand<'_> {
46 fn tty_if_terminal(self) -> Self {
47 if *HOST_HAS_TTY { self.tty() } else { self }
48 }
49}
50
51impl TtyIfTerminal for ociman::Definition {
52 fn tty_if_terminal(self) -> Self {
53 if *HOST_HAS_TTY { self.tty() } else { self }
54 }
55}
56
57#[derive(Debug, thiserror::Error)]
58pub enum Error {
59 #[error("PostgreSQL did not become available within {timeout:?}")]
60 ConnectionTimeout {
61 timeout: std::time::Duration,
62 #[source]
63 source: Option<sqlx::Error>,
64 },
65 #[error("Failed to execute command in container")]
66 ContainerExec(#[from] cmd_proc::CommandError),
67 #[error("Failed to apply pg-ephemeral metadata labels")]
68 ApplyLabels(#[from] crate::label::ApplyError),
69 #[error("Failed to decode pg-ephemeral metadata labels")]
70 DecodeMetadataLabels(#[from] crate::label::ReadError),
71 #[error("Failed to inspect cache image {reference}")]
72 InspectImage {
73 reference: ociman::Reference,
74 #[source]
75 source: ociman::label::ImageError,
76 },
77 #[error("Failed to serialize pg-ephemeral metadata as JSON")]
78 SerializeMetadata(#[source] serde_json::Error),
79 #[error("Failed to materialize CA certificate")]
80 WriteCaCert(#[from] crate::certificate::WriteCaPemError),
81 #[error("Failed to start container")]
82 RunDetached(#[from] ociman::RunDetachedError),
83 #[error("Failed to read host TCP port from container")]
84 ReadHostTcpPort(#[from] ociman::ReadHostTcpPortError),
85 #[error(transparent)]
86 SeedApply(#[from] crate::definition::SeedApplyError),
87 #[error(transparent)]
88 SeedLoad(#[from] crate::seed::LoadError),
89 #[error("Failed to terminate backend connections")]
90 TerminateConnections(#[source] sqlx::Error),
91 #[error("Failed to checkpoint")]
92 Checkpoint(#[source] sqlx::Error),
93 #[error("Failed to stop container")]
94 ContainerStop(#[source] cmd_proc::CommandError),
95 #[error("Failed to remove container")]
96 ContainerRemove(#[source] cmd_proc::CommandError),
97 #[error(transparent)]
98 EnvVariableValue(#[from] cmd_proc::EnvVariableValueError),
99 #[error(
100 "Parameter `{name}` conflicts with ssl_config; pg-ephemeral controls this parameter when ssl_config is set"
101 )]
102 ParameterConflictsWithSslConfig { name: pg_client::parameter::Name },
103}
104
105#[derive(Debug, thiserror::Error)]
106pub enum AttachSessionError {
107 #[error("Failed to read host TCP port from session container")]
108 ReadHostTcpPort(#[from] ociman::ReadHostTcpPortError),
109 #[error("Failed to read session container labels")]
110 ReadLabels(#[from] ociman::label::ContainerError),
111 #[error("Failed to decode pg-ephemeral metadata from session labels")]
112 DecodeMetadata(#[from] crate::label::ReadError),
113 #[error("Failed to materialize pg-ephemeral client config from session labels")]
114 PrepareConfig(#[from] crate::label::PrepareConfigError),
115}
116
117const PG_CONTAINER_PORT: u16 = 5432;
121
122const ENV_POSTGRES_PASSWORD: cmd_proc::EnvVariableName =
123 cmd_proc::EnvVariableName::from_static_or_panic("POSTGRES_PASSWORD");
124const ENV_POSTGRES_USER: cmd_proc::EnvVariableName =
125 cmd_proc::EnvVariableName::from_static_or_panic("POSTGRES_USER");
126const ENV_PGDATA: cmd_proc::EnvVariableName =
127 cmd_proc::EnvVariableName::from_static_or_panic("PGDATA");
128const ENV_PG_EPHEMERAL_SSL_DIR: cmd_proc::EnvVariableName =
129 cmd_proc::EnvVariableName::from_static_or_panic("PG_EPHEMERAL_SSL_DIR");
130const ENV_PG_EPHEMERAL_CA_CERT_PEM: cmd_proc::EnvVariableName =
131 cmd_proc::EnvVariableName::from_static_or_panic("PG_EPHEMERAL_CA_CERT_PEM");
132const ENV_PG_EPHEMERAL_SERVER_CERT_PEM: cmd_proc::EnvVariableName =
133 cmd_proc::EnvVariableName::from_static_or_panic("PG_EPHEMERAL_SERVER_CERT_PEM");
134const ENV_PG_EPHEMERAL_SERVER_KEY_PEM: cmd_proc::EnvVariableName =
135 cmd_proc::EnvVariableName::from_static_or_panic("PG_EPHEMERAL_SERVER_KEY_PEM");
136
137const SSL_DIR: &str = "/var/lib/postgresql";
138
139static PARAM_SSL: pg_client::parameter::Name =
143 pg_client::parameter::Name::from_static_or_panic("ssl");
144static PARAM_SSL_CERT_FILE: pg_client::parameter::Name =
145 pg_client::parameter::Name::from_static_or_panic("ssl_cert_file");
146static PARAM_SSL_KEY_FILE: pg_client::parameter::Name =
147 pg_client::parameter::Name::from_static_or_panic("ssl_key_file");
148static PARAM_SSL_CA_FILE: pg_client::parameter::Name =
149 pg_client::parameter::Name::from_static_or_panic("ssl_ca_file");
150
151static VALUE_ON: pg_client::parameter::Value =
155 pg_client::parameter::Value::from_static_or_panic("on");
156static VALUE_SSL_CERT_PATH: pg_client::parameter::Value =
157 pg_client::parameter::Value::from_static_or_panic("/var/lib/postgresql/server.crt");
158static VALUE_SSL_KEY_PATH: pg_client::parameter::Value =
159 pg_client::parameter::Value::from_static_or_panic("/var/lib/postgresql/server.key");
160static VALUE_SSL_CA_PATH: pg_client::parameter::Value =
161 pg_client::parameter::Value::from_static_or_panic("/var/lib/postgresql/root.crt");
162
163const SSL_FILE_STAGING_SCRIPT: &str = r#"
170printf '%s' "$PG_EPHEMERAL_CA_CERT_PEM" > ${PG_EPHEMERAL_SSL_DIR}/root.crt
171printf '%s' "$PG_EPHEMERAL_SERVER_CERT_PEM" > ${PG_EPHEMERAL_SSL_DIR}/server.crt
172printf '%s' "$PG_EPHEMERAL_SERVER_KEY_PEM" > ${PG_EPHEMERAL_SSL_DIR}/server.key
173chown postgres ${PG_EPHEMERAL_SSL_DIR}/root.crt
174chown postgres ${PG_EPHEMERAL_SSL_DIR}/server.crt
175chown postgres ${PG_EPHEMERAL_SSL_DIR}/server.key
176chmod 600 ${PG_EPHEMERAL_SSL_DIR}/root.crt
177chmod 600 ${PG_EPHEMERAL_SSL_DIR}/server.crt
178chmod 600 ${PG_EPHEMERAL_SSL_DIR}/server.key
179exec docker-entrypoint.sh "$@"
180"#;
181
182#[derive(Debug)]
183pub struct Container {
184 host_port: pg_client::config::Port,
185 pub(crate) client_config: pg_client::Config,
186 container: ociman::Container,
187 backend: ociman::Backend,
188}
189
190impl Container {
191 pub(crate) async fn run_definition(
192 definition: &crate::definition::Definition,
193 seeds: &[crate::label::SeedEntry],
194 ) -> Result<Self, Error> {
195 if definition.ssl_config.is_some() {
196 for reserved in [
197 &PARAM_SSL,
198 &PARAM_SSL_CERT_FILE,
199 &PARAM_SSL_KEY_FILE,
200 &PARAM_SSL_CA_FILE,
201 ] {
202 if definition.parameters.contains_key(reserved) {
203 return Err(Error::ParameterConflictsWithSslConfig {
204 name: reserved.clone(),
205 });
206 }
207 }
208 }
209
210 let password = generate_password();
211
212 let host_ip = if definition.cross_container_access {
213 UNSPECIFIED_IP
214 } else {
215 LOCALHOST_IP
216 };
217
218 let mut ociman_definition = definition
219 .to_ociman_definition()
220 .environment_variable(
221 ENV_POSTGRES_PASSWORD,
222 password.as_ref().parse::<cmd_proc::EnvVariableValue>()?,
223 )
224 .environment_variable(
225 ENV_POSTGRES_USER,
226 definition
227 .superuser
228 .as_ref()
229 .parse::<cmd_proc::EnvVariableValue>()?,
230 )
231 .environment_variable(ENV_PGDATA, "/var/lib/pg-ephemeral")
232 .publish(ociman::Publish::tcp(PG_CONTAINER_PORT).host_ip(host_ip));
233
234 if definition.remove {
235 ociman_definition = ociman_definition.remove();
236 }
237
238 let mut effective_parameters: std::collections::BTreeMap<
239 &pg_client::parameter::Name,
240 &pg_client::parameter::Value,
241 > = definition.parameters.iter().collect();
242
243 let ssl_bundle = if let Some(ssl_config) = &definition.ssl_config {
244 let definition::SslConfig::Generated { hostname } = ssl_config;
245 let bundle = certificate::Bundle::generate(hostname.as_str())
246 .expect("Failed to generate SSL certificate bundle");
247
248 ociman_definition = ociman_definition
249 .entrypoint("sh")
250 .argument("-e")
251 .argument("-c")
252 .argument(SSL_FILE_STAGING_SCRIPT)
253 .argument("--")
254 .argument("postgres")
255 .environment_variable(ENV_PG_EPHEMERAL_SSL_DIR, SSL_DIR)
256 .environment_variable(
257 ENV_PG_EPHEMERAL_CA_CERT_PEM,
258 bundle.ca_cert_pem.parse::<cmd_proc::EnvVariableValue>()?,
259 )
260 .environment_variable(
261 ENV_PG_EPHEMERAL_SERVER_CERT_PEM,
262 bundle
263 .server_cert_pem
264 .parse::<cmd_proc::EnvVariableValue>()?,
265 )
266 .environment_variable(
267 ENV_PG_EPHEMERAL_SERVER_KEY_PEM,
268 bundle
269 .server_key_pem
270 .parse::<cmd_proc::EnvVariableValue>()?,
271 );
272
273 effective_parameters.insert(&PARAM_SSL, &VALUE_ON);
274 effective_parameters.insert(&PARAM_SSL_CERT_FILE, &VALUE_SSL_CERT_PATH);
275 effective_parameters.insert(&PARAM_SSL_KEY_FILE, &VALUE_SSL_KEY_PATH);
276 effective_parameters.insert(&PARAM_SSL_CA_FILE, &VALUE_SSL_CA_PATH);
277
278 Some(bundle)
279 } else {
280 None
281 };
282
283 for (name, value) in &effective_parameters {
293 ociman_definition = ociman_definition
294 .argument("-c")
295 .argument(format!("{name}={value}"));
296 }
297
298 ociman_definition = crate::label::apply(
299 ociman_definition,
300 definition,
301 &password,
302 ssl_bundle.as_ref(),
303 seeds,
304 )?;
305
306 let container = ociman_definition.run_detached().await?;
307 let port: pg_client::config::Port = container
308 .read_host_tcp_port(PG_CONTAINER_PORT)
309 .await?
310 .into();
311
312 let (host, host_addr, ssl_mode, ssl_root_cert) =
313 if let Some(ssl_config) = &definition.ssl_config {
314 let definition::SslConfig::Generated { hostname } = ssl_config;
315
316 let ca_cert_path = crate::certificate::write_ca_pem_to_temp(
317 ssl_bundle.as_ref().unwrap().ca_cert_pem.as_bytes(),
318 )?;
319
320 (
321 pg_client::config::Host::HostName(hostname.clone()),
322 Some(LOCALHOST_HOST_ADDR),
323 pg_client::config::SslMode::VerifyFull,
324 Some(pg_client::config::SslRootCert::File(ca_cert_path)),
325 )
326 } else {
327 (
328 pg_client::config::Host::IpAddr(LOCALHOST_IP),
329 None,
330 pg_client::config::SslMode::Disable,
331 None,
332 )
333 };
334
335 let client_config = pg_client::Config {
336 endpoint: pg_client::config::Endpoint::Network {
337 host,
338 channel_binding: None,
339 host_addr,
340 port: Some(port),
341 },
342 session: pg_client::config::Session {
343 application_name: definition.application_name.clone(),
344 database: definition.database.clone(),
345 password: Some(password.clone()),
346 user: definition.superuser.clone(),
347 },
348 ssl_mode,
349 ssl_root_cert,
350 sqlx: Default::default(),
351 };
352
353 Ok(Container {
354 host_port: port,
355 container,
356 backend: definition.backend.clone(),
357 client_config,
358 })
359 }
360
361 pub async fn attach_session(
369 session: crate::session::Session,
370 backend: ociman::Backend,
371 ) -> Result<Self, AttachSessionError> {
372 let container = session.into_ociman_container();
373 let port: pg_client::config::Port = container
374 .read_host_tcp_port(PG_CONTAINER_PORT)
375 .await?
376 .into();
377 let labels = container.labels().await?;
378 let metadata = crate::label::read_container(&labels)?;
379
380 let (host, host_addr) = match &metadata.ssl {
381 Some(ssl) => (
382 pg_client::config::Host::HostName(ssl.hostname.clone()),
383 Some(LOCALHOST_HOST_ADDR),
384 ),
385 None => (pg_client::config::Host::IpAddr(LOCALHOST_IP), None),
386 };
387
388 let client_config = metadata.prepare_config(host, host_addr, port)?;
389
390 Ok(Container {
391 host_port: port,
392 container,
393 backend,
394 client_config,
395 })
396 }
397
398 pub async fn wait_available(&self, timeout: std::time::Duration) -> Result<(), Error> {
399 let config = self.client_config.to_sqlx_connect_options().unwrap();
400
401 let start = std::time::Instant::now();
402 let max_duration = timeout;
403 let sleep_duration = std::time::Duration::from_millis(100);
404
405 let mut last_error: Option<sqlx::Error> = None;
406
407 while start.elapsed() <= max_duration {
408 log::trace!("connection attempt");
409 match sqlx::ConnectOptions::connect(&config).await {
410 Ok(connection) => {
411 sqlx::Connection::close(connection)
412 .await
413 .expect("connection close failed");
414
415 log::debug!(
416 "pg is available on endpoint: {:#?}",
417 self.client_config.endpoint
418 );
419
420 return Ok(());
421 }
422 Err(error) => {
423 log::trace!("{error:#?}, retry in 100ms");
424 last_error = Some(error);
425 }
426 }
427 tokio::time::sleep(sleep_duration).await;
428 }
429
430 Err(Error::ConnectionTimeout {
431 timeout: max_duration,
432 source: last_error,
433 })
434 }
435
436 pub async fn exec_schema_dump(
437 &self,
438 pg_schema_dump: &pg_client::PgSchemaDump,
439 ) -> Result<String, Error> {
440 let output = self
441 .container
442 .exec("pg_dump")
443 .arguments(pg_schema_dump.arguments())
444 .environment_variables(self.container_client_config().pg_env()?)
445 .to_cmd_proc_command()
446 .stdout_capture()
447 .bytes()
448 .await
449 .unwrap();
450 Ok(crate::convert_schema(&output))
451 }
452
453 #[must_use]
454 pub fn client_config(&self) -> &pg_client::Config {
455 &self.client_config
456 }
457
458 pub async fn labels(
459 &self,
460 ) -> Result<ociman::label::ContainerLabels, ociman::label::ContainerError> {
461 self.container.labels().await
462 }
463
464 pub async fn with_connection<T, F: AsyncFnMut(&mut sqlx::postgres::PgConnection) -> T>(
465 &self,
466 mut action: F,
467 ) -> T {
468 self.client_config
469 .with_sqlx_connection(async |connection| action(connection).await)
470 .await
471 .unwrap()
472 }
473
474 pub async fn apply_sql(&self, sql: &str) -> Result<(), sqlx::Error> {
475 self.with_connection(async |connection| {
476 log::debug!("Executing: {sql}");
477 sqlx::raw_sql(sqlx::AssertSqlSafe(sql))
478 .execute(connection)
479 .await
480 .map(|_| ())
481 })
482 .await
483 }
484
485 pub async fn apply_csv(
489 &self,
490 table: &pg_client::QualifiedTable,
491 delimiter: char,
492 content: &str,
493 ) -> Result<(), sqlx::Error> {
494 self.with_connection(async |connection| {
495 let header_line = content.lines().next().unwrap_or_default();
496
497 let columns: Vec<&str> = header_line.split(delimiter).map(str::trim).collect();
498
499 let row = sqlx::query(
500 r#"SELECT 'COPY ' || format('%I.%I', $1, $2)
501 || '(' || (SELECT string_agg(format('%I', "column"), ', ') FROM unnest($3::text[]) AS "column") || ')'
502 || ' FROM STDIN WITH (FORMAT csv, HEADER MATCH, DELIMITER ' || quote_literal($4) || ')'
503 AS statement"#,
504 )
505 .bind(table.schema.as_ref())
506 .bind(table.table.as_ref())
507 .bind(&columns)
508 .bind(delimiter.to_string())
509 .fetch_one(&mut *connection)
510 .await?;
511 let statement: String = sqlx::Row::get(&row, "statement");
512
513 log::debug!("Executing: {statement}");
514 let mut copy = connection.copy_in_raw(&statement).await?;
515 copy.send(content.as_bytes()).await?;
516 copy.finish().await?;
517 Ok(())
518 })
519 .await
520 }
521
522 pub(crate) async fn exec_container_script(
523 &self,
524 script: &str,
525 ) -> Result<(), cmd_proc::CommandError> {
526 self.container
527 .exec("sh")
528 .arguments(["-e", "-c", script])
529 .to_cmd_proc_command()
530 .status()
531 .await
532 }
533
534 pub(crate) async fn exec_container_shell(&self) -> Result<(), Error> {
535 self.container
536 .exec("sh")
537 .environment_variables(self.container_client_config().pg_env()?)
538 .tty_if_terminal()
539 .interactive()
540 .status()
541 .await
542 .unwrap();
543 Ok(())
544 }
545
546 pub(crate) async fn exec_pgbench(&self, arguments: &[String]) -> Result<(), Error> {
547 let mut env = self.container_client_config().pg_env()?;
548 env.insert(
549 cmd_proc::EnvVariableName::from_static_or_panic("PGHOST"),
550 cmd_proc::EnvVariableValue::from_static_or_panic("/var/run/postgresql"),
551 );
552 self.container
553 .exec("pgbench")
554 .environment_variables(env)
555 .arguments(arguments.iter().cloned())
556 .status()
557 .await
558 .unwrap();
559 Ok(())
560 }
561
562 pub(crate) async fn exec_psql(&self) -> Result<(), Error> {
563 self.container
564 .exec("psql")
565 .environment_variables(self.container_client_config().pg_env()?)
566 .tty_if_terminal()
567 .interactive()
568 .status()
569 .await
570 .unwrap();
571 Ok(())
572 }
573
574 pub async fn exec_run_env(&self, command: &str, arguments: &[String]) -> Result<(), Error> {
575 let config = self.container_unix_socket_config();
576 self.container
577 .exec(command)
578 .arguments(arguments.iter().cloned())
579 .environment_variables(config.pg_env()?)
580 .environment_variable(
581 crate::ENV_DATABASE_URL,
582 config
583 .to_url_string()
584 .parse::<cmd_proc::EnvVariableValue>()?,
585 )
586 .tty_if_terminal()
587 .interactive()
588 .status()
589 .await?;
590 Ok(())
591 }
592
593 fn transparent_user(&self) -> (rustix::process::Uid, rustix::process::Gid) {
607 if self.backend.is_rootless() {
608 (rustix::process::Uid::ROOT, rustix::process::Gid::ROOT)
609 } else {
610 (rustix::process::getuid(), rustix::process::getgid())
611 }
612 }
613
614 #[allow(
623 clippy::result_large_err,
624 reason = "container::Error aggregates diagnostic-rich variants; this private helper is called once per CLI invocation, not on a hot path where the 128-byte threshold matters"
625 )]
626 fn exec_transparent(
627 &self,
628 executable: &str,
629 workdir: &crate::definition::TransparentWorkdir,
630 ) -> Result<ociman::ExecCommand<'_>, Error> {
631 let (uid, gid) = self.transparent_user();
632 Ok(self
633 .container
634 .exec(executable)
635 .environment_variables(self.container_unix_socket_config().pg_env()?)
636 .workdir(workdir.as_str())
637 .user(uid, gid))
638 }
639
640 async fn exec_transparent_interactive(
645 &self,
646 executable: &str,
647 workdir: &crate::definition::TransparentWorkdir,
648 arguments: &[String],
649 ) -> Result<(), Error> {
650 self.exec_transparent(executable, workdir)?
651 .arguments(arguments.iter().cloned())
652 .tty_if_terminal()
653 .interactive()
654 .status()
655 .await?;
656 Ok(())
657 }
658
659 pub async fn exec_transparent_psql(
660 &self,
661 workdir: &crate::definition::TransparentWorkdir,
662 ) -> Result<(), Error> {
663 self.exec_transparent_interactive("psql", workdir, &[])
664 .await
665 }
666
667 pub async fn exec_transparent_run_env(
668 &self,
669 workdir: &crate::definition::TransparentWorkdir,
670 command: &str,
671 arguments: &[String],
672 ) -> Result<(), Error> {
673 let database_url = self
674 .container_unix_socket_config()
675 .to_url_string()
676 .parse::<cmd_proc::EnvVariableValue>()?;
677 self.exec_transparent(command, workdir)?
678 .arguments(arguments.iter().cloned())
679 .environment_variable(crate::ENV_DATABASE_URL, database_url)
680 .tty_if_terminal()
681 .interactive()
682 .status()
683 .await?;
684 Ok(())
685 }
686
687 pub async fn exec_transparent_schema_dump(
688 &self,
689 workdir: &crate::definition::TransparentWorkdir,
690 pg_schema_dump: &pg_client::PgSchemaDump,
691 ) -> Result<String, Error> {
692 let output = self
693 .exec_transparent("pg_dump", workdir)?
694 .arguments(pg_schema_dump.arguments())
695 .to_cmd_proc_command()
696 .stdout_capture()
697 .bytes()
698 .await?;
699 Ok(crate::convert_schema(&output))
700 }
701
702 pub async fn exec_transparent_shell(
703 &self,
704 workdir: &crate::definition::TransparentWorkdir,
705 ) -> Result<(), Error> {
706 self.exec_transparent_interactive("sh", workdir, &[]).await
707 }
708
709 pub async fn exec_transparent_pgbench(
710 &self,
711 workdir: &crate::definition::TransparentWorkdir,
712 arguments: &[String],
713 ) -> Result<(), Error> {
714 self.exec_transparent_interactive("pgbench", workdir, arguments)
715 .await
716 }
717
718 fn container_client_config(&self) -> pg_client::Config {
719 let mut config = self.client_config.clone();
720 if let pg_client::config::Endpoint::Network {
721 ref host,
722 ref channel_binding,
723 ref host_addr,
724 ..
725 } = config.endpoint
726 {
727 config.endpoint = pg_client::config::Endpoint::Network {
728 host: host.clone(),
729 channel_binding: *channel_binding,
730 host_addr: host_addr.clone(),
731 port: Some(pg_client::config::Port::new(PG_CONTAINER_PORT)),
732 };
733 }
734 config
735 }
736
737 fn container_unix_socket_config(&self) -> pg_client::Config {
738 let mut config = self.client_config.clone();
739 config.endpoint = pg_client::config::Endpoint::SocketPath(std::path::PathBuf::from(
740 "/var/run/postgresql",
741 ));
742 config.ssl_mode = pg_client::config::SslMode::Disable;
743 config.ssl_root_cert = None;
744 config
745 }
746
747 pub async fn cross_container_client_config(&self) -> pg_client::Config {
748 let ip_address = self
751 .backend
752 .resolve_container_host()
753 .await
754 .expect("Failed to resolve container host from container");
755
756 let channel_binding = match &self.client_config.endpoint {
757 pg_client::config::Endpoint::Network {
758 channel_binding, ..
759 } => *channel_binding,
760 pg_client::config::Endpoint::SocketPath(_) => None,
761 };
762
763 let endpoint = pg_client::config::Endpoint::Network {
764 host: pg_client::config::Host::IpAddr(ip_address),
765 channel_binding,
766 host_addr: None,
767 port: Some(self.host_port),
768 };
769
770 self.client_config.clone().endpoint(endpoint)
771 }
772
773 pub fn pg_env(
774 &self,
775 ) -> Result<
776 std::collections::BTreeMap<cmd_proc::EnvVariableName, cmd_proc::EnvVariableValue>,
777 cmd_proc::EnvVariableValueError,
778 > {
779 self.client_config.pg_env()
780 }
781
782 #[must_use]
783 pub fn database_url(&self) -> String {
784 self.client_config.to_url_string()
785 }
786
787 pub async fn stop(&mut self) -> Result<(), Error> {
788 self.container.stop().await.map_err(Error::ContainerStop)
789 }
790
791 async fn terminate_connections(&self) -> Result<(), Error> {
792 self.apply_sql(
793 "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND backend_type = 'client backend'",
794 )
795 .await
796 .map_err(Error::TerminateConnections)
797 }
798
799 async fn checkpoint(&self) -> Result<(), Error> {
800 self.apply_sql("CHECKPOINT")
801 .await
802 .map_err(Error::Checkpoint)
803 }
804
805 pub(crate) async fn stop_commit_remove(
808 &mut self,
809 reference: &ociman::Reference,
810 ) -> Result<(), Error> {
811 self.terminate_connections().await?;
812 self.checkpoint().await?;
813 self.container.stop().await.map_err(Error::ContainerStop)?;
814 self.container.commit(reference, false).await.unwrap();
815 self.container
816 .remove()
817 .await
818 .map_err(Error::ContainerRemove)?;
819 Ok(())
820 }
821
822 async fn wait_for_container_socket(&self, timeout: std::time::Duration) -> Result<(), Error> {
823 let start = std::time::Instant::now();
824 let max_duration = timeout;
825 let sleep_duration = std::time::Duration::from_millis(100);
826
827 while start.elapsed() <= max_duration {
828 if self
829 .container
830 .exec("pg_isready")
831 .argument("--host")
832 .argument("localhost")
833 .to_cmd_proc_command()
834 .stdout_capture()
835 .bytes()
836 .await
837 .is_ok()
838 {
839 return Ok(());
840 }
841 tokio::time::sleep(sleep_duration).await;
842 }
843
844 Err(Error::ConnectionTimeout {
845 timeout: max_duration,
846 source: None,
847 })
848 }
849
850 pub async fn set_superuser_password(
855 &self,
856 password: &pg_client::config::Password,
857 timeout: std::time::Duration,
858 ) -> Result<(), Error> {
859 self.wait_for_container_socket(timeout).await?;
860
861 self.container
862 .exec("psql")
863 .argument("--host")
864 .argument("/var/run/postgresql")
865 .argument("--username")
866 .argument(self.client_config.session.user.as_ref())
867 .argument("--dbname")
868 .argument("postgres")
869 .argument("--variable")
870 .argument(format!(
871 "target_user={}",
872 self.client_config.session.user.as_ref()
873 ))
874 .argument("--variable")
875 .argument(format!("new_password={}", password.as_ref()))
876 .stdin("ALTER USER :target_user WITH PASSWORD :'new_password'")
877 .to_cmd_proc_command()
878 .stdout_capture()
879 .bytes()
880 .await?;
881
882 Ok(())
883 }
884}
885
886fn generate_password() -> pg_client::config::Password {
887 let value: String = rand::rng()
888 .sample_iter(rand::distr::Alphanumeric)
889 .take(32)
890 .map(char::from)
891 .collect();
892
893 <pg_client::config::Password as std::str::FromStr>::from_str(&value).unwrap()
894}