// hyperlane_plugin/postgresql/fn.rs
use super::*;

3#[instrument_trace]
4pub async fn connection_postgresql_db<I>(
5 instance_name: I,
6 schema: Option<DatabaseSchema>,
7) -> Result<DatabaseConnection, String>
8where
9 I: AsRef<str>,
10{
11 let instance_name_str: &str = instance_name.as_ref();
12 let env: &'static EnvConfig = get_global_env_config();
13 let instance: &PostgreSqlInstanceConfig = env
14 .get_postgresql_instance(instance_name_str)
15 .ok_or_else(|| format!("PostgreSQL instance '{instance_name_str}' not found"))?;
16 match perform_postgresql_auto_creation(instance, schema.clone()).await {
17 Ok(result) => {
18 if result.has_changes() {
19 database::AutoCreationLogger::log_auto_creation_complete(
20 database::PluginType::PostgreSQL,
21 &result,
22 )
23 .await;
24 }
25 }
26 Err(error) => {
27 database::AutoCreationLogger::log_auto_creation_error(
28 &error,
29 "Auto-creation process",
30 database::PluginType::PostgreSQL,
31 Some(instance.get_database().as_str()),
32 )
33 .await;
34 if !error.should_continue() {
35 return Err(error.to_string());
36 }
37 }
38 }
39 let db_url: String = instance.get_connection_url();
40 let timeout_duration: Duration = get_connection_timeout_duration();
41 let timeout_seconds: u64 = timeout_duration.as_secs();
42 let connection_result: Result<DatabaseConnection, DbErr> =
43 match timeout(timeout_duration, Database::connect(&db_url)).await {
44 Ok(result) => result,
45 Err(_) => Err(DbErr::Custom(format!(
46 "PostgreSQL connection timeout after {timeout_seconds} seconds"
47 ))),
48 };
49 connection_result.map_err(|error: DbErr| {
50 let error_msg: String = error.to_string();
51 let database_name: String = instance.get_database().clone();
52 let error_msg_clone: String = error_msg.clone();
53 tokio::spawn(async move {
54 database::AutoCreationLogger::log_connection_verification(
55 database::PluginType::PostgreSQL,
56 &database_name,
57 false,
58 Some(&error_msg_clone),
59 )
60 .await;
61 });
62 error_msg
63 })
64}
65
66#[instrument_trace]
67pub async fn get_postgresql_connection<I>(
68 instance_name: I,
69 schema: Option<DatabaseSchema>,
70) -> Result<DatabaseConnection, String>
71where
72 I: AsRef<str>,
73{
74 let instance_name_str: &str = instance_name.as_ref();
75 let duration: Duration = get_retry_duration();
76 {
77 if let Some(cache) = POSTGRESQL_CONNECTIONS.read().await.get(instance_name_str) {
78 match cache.try_get_result() {
79 Ok(conn) => return Ok(conn.clone()),
80 Err(error) => {
81 if !cache.is_expired(duration) {
82 return Err(error.clone());
83 }
84 }
85 }
86 }
87 }
88 let mut connections: RwLockWriteGuard<
89 '_,
90 HashMap<String, ConnectionCache<DatabaseConnection>>,
91 > = POSTGRESQL_CONNECTIONS.write().await;
92 if let Some(cache) = connections.get(instance_name_str) {
93 match cache.try_get_result() {
94 Ok(conn) => return Ok(conn.clone()),
95 Err(error) => {
96 if !cache.is_expired(duration) {
97 return Err(error.clone());
98 }
99 }
100 }
101 }
102 connections.remove(instance_name_str);
103 drop(connections);
104 let new_connection: Result<DatabaseConnection, String> =
105 connection_postgresql_db(instance_name_str, schema).await;
106 let mut connections: RwLockWriteGuard<
107 '_,
108 HashMap<String, ConnectionCache<DatabaseConnection>>,
109 > = POSTGRESQL_CONNECTIONS.write().await;
110 connections.insert(
111 instance_name_str.to_string(),
112 ConnectionCache::new(new_connection.clone()),
113 );
114 new_connection
115}
116
117#[instrument_trace]
118pub async fn perform_postgresql_auto_creation(
119 instance: &PostgreSqlInstanceConfig,
120 schema: Option<DatabaseSchema>,
121) -> Result<AutoCreationResult, AutoCreationError> {
122 let start_time: Instant = Instant::now();
123 let mut result: AutoCreationResult = AutoCreationResult::default();
124 AutoCreationLogger::log_auto_creation_start(
125 database::PluginType::PostgreSQL,
126 instance.get_database(),
127 )
128 .await;
129 let auto_creator: PostgreSqlAutoCreation = match schema {
130 Some(s) => PostgreSqlAutoCreation::with_schema(instance.clone(), s),
131 None => PostgreSqlAutoCreation::new(instance.clone()),
132 };
133 match auto_creator.create_database_if_not_exists().await {
134 Ok(created) => {
135 result.set_database_created(created);
136 }
137 Err(error) => {
138 AutoCreationLogger::log_auto_creation_error(
139 &error,
140 "Database creation",
141 database::PluginType::PostgreSQL,
142 Some(instance.get_database().as_str()),
143 )
144 .await;
145 if !error.should_continue() {
146 result.set_duration(start_time.elapsed());
147 return Err(error);
148 }
149 result.get_mut_errors().push(error.to_string());
150 }
151 }
152 match auto_creator.create_tables_if_not_exist().await {
153 Ok(tables) => {
154 result.set_tables_created(tables);
155 }
156 Err(error) => {
157 AutoCreationLogger::log_auto_creation_error(
158 &error,
159 "Table creation",
160 database::PluginType::PostgreSQL,
161 Some(instance.get_database().as_str()),
162 )
163 .await;
164 result.get_mut_errors().push(error.to_string());
165 }
166 }
167 if let Err(error) = auto_creator.verify_connection().await {
168 AutoCreationLogger::log_auto_creation_error(
169 &error,
170 "Connection verification",
171 database::PluginType::PostgreSQL,
172 Some(instance.get_database().as_str()),
173 )
174 .await;
175 if !error.should_continue() {
176 result.set_duration(start_time.elapsed());
177 return Err(error);
178 }
179 result.get_mut_errors().push(error.to_string());
180 }
181 result.set_duration(start_time.elapsed());
182 AutoCreationLogger::log_auto_creation_complete(database::PluginType::PostgreSQL, &result).await;
183 Ok(result)
184}