Skip to main content

hyperlane_plugin/postgresql/
fn.rs

1use super::*;
2
3#[instrument_trace]
4pub async fn connection_postgresql_db<I>(
5    instance_name: I,
6    schema: Option<DatabaseSchema>,
7) -> Result<DatabaseConnection, String>
8where
9    I: AsRef<str>,
10{
11    let instance_name_str: &str = instance_name.as_ref();
12    let env: &'static EnvConfig = get_global_env_config();
13    let instance: &PostgreSqlInstanceConfig = env
14        .get_postgresql_instance(instance_name_str)
15        .ok_or_else(|| format!("PostgreSQL instance '{instance_name_str}' not found"))?;
16    match perform_postgresql_auto_creation(instance, schema.clone()).await {
17        Ok(result) => {
18            if result.has_changes() {
19                database::AutoCreationLogger::log_auto_creation_complete(
20                    database::PluginType::PostgreSQL,
21                    &result,
22                )
23                .await;
24            }
25        }
26        Err(error) => {
27            database::AutoCreationLogger::log_auto_creation_error(
28                &error,
29                "Auto-creation process",
30                database::PluginType::PostgreSQL,
31                Some(instance.get_database().as_str()),
32            )
33            .await;
34            if !error.should_continue() {
35                return Err(error.to_string());
36            }
37        }
38    }
39    let db_url: String = instance.get_connection_url();
40    let timeout_duration: Duration = get_connection_timeout_duration();
41    let timeout_seconds: u64 = timeout_duration.as_secs();
42    let connection_result: Result<DatabaseConnection, DbErr> =
43        match timeout(timeout_duration, Database::connect(&db_url)).await {
44            Ok(result) => result,
45            Err(_) => Err(DbErr::Custom(format!(
46                "PostgreSQL connection timeout after {timeout_seconds} seconds"
47            ))),
48        };
49    connection_result.map_err(|error: DbErr| {
50        let error_msg: String = error.to_string();
51        let database_name: String = instance.get_database().clone();
52        let error_msg_clone: String = error_msg.clone();
53        tokio::spawn(async move {
54            database::AutoCreationLogger::log_connection_verification(
55                database::PluginType::PostgreSQL,
56                &database_name,
57                false,
58                Some(&error_msg_clone),
59            )
60            .await;
61        });
62        error_msg
63    })
64}
65
66#[instrument_trace]
67pub async fn get_postgresql_connection<I>(
68    instance_name: I,
69    schema: Option<DatabaseSchema>,
70) -> Result<DatabaseConnection, String>
71where
72    I: AsRef<str>,
73{
74    let instance_name_str: &str = instance_name.as_ref();
75    let duration: Duration = get_retry_duration();
76    {
77        if let Some(cache) = POSTGRESQL_CONNECTIONS.read().await.get(instance_name_str) {
78            match cache.try_get_result() {
79                Ok(conn) => return Ok(conn.clone()),
80                Err(error) => {
81                    if !cache.is_expired(duration) {
82                        return Err(error.clone());
83                    }
84                }
85            }
86        }
87    }
88    let mut connections: RwLockWriteGuard<
89        '_,
90        HashMap<String, ConnectionCache<DatabaseConnection>>,
91    > = POSTGRESQL_CONNECTIONS.write().await;
92    if let Some(cache) = connections.get(instance_name_str) {
93        match cache.try_get_result() {
94            Ok(conn) => return Ok(conn.clone()),
95            Err(error) => {
96                if !cache.is_expired(duration) {
97                    return Err(error.clone());
98                }
99            }
100        }
101    }
102    connections.remove(instance_name_str);
103    drop(connections);
104    let new_connection: Result<DatabaseConnection, String> =
105        connection_postgresql_db(instance_name_str, schema).await;
106    let mut connections: RwLockWriteGuard<
107        '_,
108        HashMap<String, ConnectionCache<DatabaseConnection>>,
109    > = POSTGRESQL_CONNECTIONS.write().await;
110    connections.insert(
111        instance_name_str.to_string(),
112        ConnectionCache::new(new_connection.clone()),
113    );
114    new_connection
115}
116
117#[instrument_trace]
118pub async fn perform_postgresql_auto_creation(
119    instance: &PostgreSqlInstanceConfig,
120    schema: Option<DatabaseSchema>,
121) -> Result<AutoCreationResult, AutoCreationError> {
122    let start_time: Instant = Instant::now();
123    let mut result: AutoCreationResult = AutoCreationResult::default();
124    AutoCreationLogger::log_auto_creation_start(
125        database::PluginType::PostgreSQL,
126        instance.get_database(),
127    )
128    .await;
129    let auto_creator: PostgreSqlAutoCreation = match schema {
130        Some(s) => PostgreSqlAutoCreation::with_schema(instance.clone(), s),
131        None => PostgreSqlAutoCreation::new(instance.clone()),
132    };
133    match auto_creator.create_database_if_not_exists().await {
134        Ok(created) => {
135            result.set_database_created(created);
136        }
137        Err(error) => {
138            AutoCreationLogger::log_auto_creation_error(
139                &error,
140                "Database creation",
141                database::PluginType::PostgreSQL,
142                Some(instance.get_database().as_str()),
143            )
144            .await;
145            if !error.should_continue() {
146                result.set_duration(start_time.elapsed());
147                return Err(error);
148            }
149            result.get_mut_errors().push(error.to_string());
150        }
151    }
152    match auto_creator.create_tables_if_not_exist().await {
153        Ok(tables) => {
154            result.set_tables_created(tables);
155        }
156        Err(error) => {
157            AutoCreationLogger::log_auto_creation_error(
158                &error,
159                "Table creation",
160                database::PluginType::PostgreSQL,
161                Some(instance.get_database().as_str()),
162            )
163            .await;
164            result.get_mut_errors().push(error.to_string());
165        }
166    }
167    if let Err(error) = auto_creator.verify_connection().await {
168        AutoCreationLogger::log_auto_creation_error(
169            &error,
170            "Connection verification",
171            database::PluginType::PostgreSQL,
172            Some(instance.get_database().as_str()),
173        )
174        .await;
175        if !error.should_continue() {
176            result.set_duration(start_time.elapsed());
177            return Err(error);
178        }
179        result.get_mut_errors().push(error.to_string());
180    }
181    result.set_duration(start_time.elapsed());
182    AutoCreationLogger::log_auto_creation_complete(database::PluginType::PostgreSQL, &result).await;
183    Ok(result)
184}