Skip to main content

hyperlane_plugin/database/
impl.rs

1use super::*;
2
3impl fmt::Display for PluginType {
4    #[instrument_trace]
5    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
6        match self {
7            Self::MySQL => write!(f, "MySQL"),
8            Self::PostgreSQL => write!(f, "PostgreSQL"),
9            Self::Redis => write!(f, "Redis"),
10        }
11    }
12}
13
14impl FromStr for PluginType {
15    type Err = ();
16
17    #[instrument_trace]
18    fn from_str(s: &str) -> Result<Self, Self::Err> {
19        match s {
20            "MySQL" => Ok(Self::MySQL),
21            "PostgreSQL" => Ok(Self::PostgreSQL),
22            "Redis" => Ok(Self::Redis),
23            _ => Err(()),
24        }
25    }
26}
27
28impl std::fmt::Display for AutoCreationError {
29    #[instrument_trace]
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        match self {
32            Self::InsufficientPermissions(msg) => {
33                write!(f, "Insufficient permissions {msg}")
34            }
35            Self::ConnectionFailed(msg) => write!(f, "Connection failed {msg}"),
36            Self::SchemaError(msg) => write!(f, "Schema error {msg}"),
37            Self::Timeout(msg) => write!(f, "Timeout {msg}"),
38            Self::DatabaseError(msg) => write!(f, "Database error {msg}"),
39        }
40    }
41}
42
43impl std::error::Error for AutoCreationError {}
44
45impl AutoCreationError {
46    #[instrument_trace]
47    pub fn should_continue(&self) -> bool {
48        match self {
49            Self::InsufficientPermissions(_) => true,
50            Self::ConnectionFailed(_) => false,
51            Self::SchemaError(_) => true,
52            Self::Timeout(_) => true,
53            Self::DatabaseError(_) => true,
54        }
55    }
56
57    #[instrument_trace]
58    pub fn user_message(&self) -> &str {
59        match self {
60            Self::InsufficientPermissions(msg) => msg,
61            Self::ConnectionFailed(msg) => msg,
62            Self::SchemaError(msg) => msg,
63            Self::Timeout(msg) => msg,
64            Self::DatabaseError(msg) => msg,
65        }
66    }
67}
68
69impl TableSchema {
70    #[instrument_trace]
71    pub fn with_dependency(mut self, dependency: String) -> Self {
72        self.get_mut_dependencies().push(dependency);
73        self
74    }
75}
76
77impl DatabasePlugin {
78    #[instrument_trace]
79    pub fn get_connection_timeout_duration() -> Duration {
80        let timeout_seconds: u64 = std::env::var(ENV_KEY_DB_CONNECTION_TIMEOUT_MILLIS)
81            .ok()
82            .and_then(|value: String| value.parse::<u64>().ok())
83            .unwrap_or(DEFAULT_DB_CONNECTION_TIMEOUT_MILLIS);
84        Duration::from_millis(timeout_seconds)
85    }
86
87    #[instrument_trace]
88    pub fn get_retry_duration() -> Duration {
89        let millis: u64 = std::env::var(ENV_KEY_DB_RETRY_INTERVAL_MILLIS)
90            .ok()
91            .and_then(|value: String| value.parse::<u64>().ok())
92            .unwrap_or(DEFAULT_DB_RETRY_INTERVAL_MILLIS);
93        Duration::from_millis(millis)
94    }
95
96    #[instrument_trace]
97    pub async fn initialize_auto_creation() -> Result<(), String> {
98        Self::initialize_auto_creation_with_schema(None, None, None).await
99    }
100
101    #[instrument_trace]
102    pub async fn initialize_auto_creation_with_schema(
103        mysql_schema: Option<DatabaseSchema>,
104        postgresql_schema: Option<DatabaseSchema>,
105        _redis_schema: Option<()>,
106    ) -> Result<(), String> {
107        if let Err(error) = AutoCreationConfig::validate() {
108            return Err(format!(
109                "Auto-creation configuration validation failed {error}"
110            ));
111        }
112        let env: &'static EnvConfig = EnvPlugin::get_or_init();
113        let mut initialization_results: Vec<String> = Vec::new();
114        for instance in env.get_mysql_instances() {
115            match MySqlPlugin::perform_auto_creation(instance, mysql_schema.clone()).await {
116                Ok(result) => {
117                    initialization_results.push(format!(
118                        "MySQL ({})  {}",
119                        instance.get_name(),
120                        if result.has_changes() {
121                            "initialized with changes"
122                        } else {
123                            "verified"
124                        }
125                    ));
126                }
127                Err(error) => {
128                    if !error.should_continue() {
129                        return Err(format!(
130                            "MySQL ({}) auto-creation failed {error}",
131                            instance.get_name()
132                        ));
133                    }
134                    initialization_results.push(format!(
135                        "MySQL ({}) : failed but continuing ({error})",
136                        instance.get_name()
137                    ));
138                }
139            }
140        }
141        for instance in env.get_postgresql_instances() {
142            match PostgreSqlPlugin::perform_auto_creation(instance, postgresql_schema.clone()).await
143            {
144                Ok(result) => {
145                    initialization_results.push(format!(
146                        "PostgreSQL ({})  {}",
147                        instance.get_name(),
148                        if result.has_changes() {
149                            "initialized with changes"
150                        } else {
151                            "verified"
152                        }
153                    ));
154                }
155                Err(error) => {
156                    if !error.should_continue() {
157                        return Err(format!(
158                            "PostgreSQL ({}) auto-creation failed {error}",
159                            instance.get_name()
160                        ));
161                    }
162                    initialization_results.push(format!(
163                        "PostgreSQL ({}) : failed but continuing ({error})",
164                        instance.get_name()
165                    ));
166                }
167            }
168        }
169        for instance in env.get_redis_instances() {
170            match RedisPlugin::perform_auto_creation(instance, None).await {
171                Ok(result) => {
172                    initialization_results.push(format!(
173                        "Redis ({})  {}",
174                        instance.get_name(),
175                        if result.has_changes() {
176                            "initialized with changes"
177                        } else {
178                            "verified"
179                        }
180                    ));
181                }
182                Err(error) => {
183                    if !error.should_continue() {
184                        return Err(format!(
185                            "Redis ({}) auto-creation failed {error}",
186                            instance.get_name()
187                        ));
188                    }
189                    initialization_results.push(format!(
190                        "Redis ({}) : failed but continuing ({error})",
191                        instance.get_name()
192                    ));
193                }
194            }
195        }
196        if initialization_results.is_empty() {
197            info!("[AUTO-CREATION] No plugins enabled for auto-creation");
198        } else {
199            let results_summary: String = initialization_results.join(", ");
200            info!("[AUTO-CREATION] Initialization complete {results_summary}");
201        }
202        Ok(())
203    }
204}
205
206impl<T: Clone> ConnectionCache<T> {
207    #[instrument_trace]
208    pub fn new(result: Result<T, String>) -> Self {
209        Self {
210            result,
211            last_attempt: Instant::now(),
212        }
213    }
214
215    #[instrument_trace]
216    pub fn is_expired(&self, duration: Duration) -> bool {
217        self.get_last_attempt().elapsed() >= duration
218    }
219
220    #[instrument_trace]
221    pub fn should_retry(&self, duration: Duration) -> bool {
222        self.try_get_result().is_err() && self.is_expired(duration)
223    }
224}
225
226impl AutoCreationResult {
227    #[instrument_trace]
228    pub fn has_changes(&self) -> bool {
229        self.get_database_created() || !self.get_tables_created().is_empty()
230    }
231
232    #[instrument_trace]
233    pub fn has_errors(&self) -> bool {
234        !self.get_errors().is_empty()
235    }
236}
237
238impl DatabaseSchema {
239    #[instrument_trace]
240    pub fn add_table(mut self, table: TableSchema) -> Self {
241        self.get_mut_tables().push(table);
242        self
243    }
244
245    #[instrument_trace]
246    pub fn add_index(mut self, index: String) -> Self {
247        self.get_mut_indexes().push(index);
248        self
249    }
250
251    #[instrument_trace]
252    pub fn add_constraint(mut self, constraint: String) -> Self {
253        self.get_mut_constraints().push(constraint);
254        self
255    }
256
257    #[instrument_trace]
258    pub fn add_init_data(mut self, init_data: String) -> Self {
259        self.get_mut_init_data().push(init_data);
260        self
261    }
262
263    #[instrument_trace]
264    pub fn ordered_tables(&self) -> Vec<&TableSchema> {
265        let mut ordered: Vec<&TableSchema> = Vec::new();
266        let mut remaining: Vec<&TableSchema> = self.get_tables().iter().collect();
267        while !remaining.is_empty() {
268            let mut added_any: bool = false;
269            remaining.retain(|table: &&TableSchema| {
270                let dependencies_satisfied: bool =
271                    table.get_dependencies().iter().all(|dep: &String| {
272                        ordered.iter().any(|ordered_table: &&TableSchema| {
273                            ordered_table.get_name().as_str() == dep.as_str()
274                        })
275                    });
276                if dependencies_satisfied {
277                    ordered.push(table);
278                    added_any = true;
279                    false
280                } else {
281                    true
282                }
283            });
284            if !added_any && !remaining.is_empty() {
285                for table in remaining {
286                    ordered.push(table);
287                }
288                break;
289            }
290        }
291        ordered
292    }
293}
294
295impl AutoCreationConfig {
296    #[instrument_trace]
297    pub fn validate() -> Result<(), String> {
298        let env: &'static EnvConfig = EnvPlugin::get_or_init();
299        if env.get_mysql_instances().is_empty() {
300            return Err("At least one MySQL instance is required".to_string());
301        }
302        if env.get_postgresql_instances().is_empty() {
303            return Err("At least one PostgreSQL instance is required".to_string());
304        }
305        if env.get_redis_instances().is_empty() {
306            return Err("At least one Redis instance is required".to_string());
307        }
308        Ok(())
309    }
310
311    #[instrument_trace]
312    pub fn for_plugin(plugin_name: &str) -> PluginAutoCreationConfig {
313        PluginAutoCreationConfig {
314            plugin_name: plugin_name.to_string(),
315        }
316    }
317}
318
319impl PluginAutoCreationConfig {
320    #[instrument_trace]
321    pub fn is_plugin_enabled(&self) -> bool {
322        PluginType::from_str(self.get_plugin_name()).is_ok()
323    }
324
325    #[instrument_trace]
326    pub fn get_database_name(&self) -> String {
327        let env: &'static EnvConfig = EnvPlugin::get_or_init();
328        if let Ok(plugin_type) = PluginType::from_str(self.get_plugin_name()) {
329            match plugin_type {
330                PluginType::MySQL => {
331                    if let Some(instance) = env.get_default_mysql_instance() {
332                        instance.get_database().clone()
333                    } else {
334                        "unknown".to_string()
335                    }
336                }
337                PluginType::PostgreSQL => {
338                    if let Some(instance) = env.get_default_postgresql_instance() {
339                        instance.get_database().clone()
340                    } else {
341                        "unknown".to_string()
342                    }
343                }
344                PluginType::Redis => "default".to_string(),
345            }
346        } else {
347            "unknown".to_string()
348        }
349    }
350
351    #[instrument_trace]
352    pub fn get_connection_info(&self) -> String {
353        let env: &'static EnvConfig = EnvPlugin::get_or_init();
354        if let Ok(plugin_type) = PluginType::from_str(self.get_plugin_name()) {
355            match plugin_type {
356                PluginType::MySQL => {
357                    if let Some(instance) = env.get_default_mysql_instance() {
358                        format!(
359                            "{}:{}:{}",
360                            instance.get_host(),
361                            instance.get_port(),
362                            instance.get_database()
363                        )
364                    } else {
365                        "unknown".to_string()
366                    }
367                }
368                PluginType::PostgreSQL => {
369                    if let Some(instance) = env.get_default_postgresql_instance() {
370                        format!(
371                            "{}:{}:{}",
372                            instance.get_host(),
373                            instance.get_port(),
374                            instance.get_database()
375                        )
376                    } else {
377                        "unknown".to_string()
378                    }
379                }
380                PluginType::Redis => {
381                    if let Some(instance) = env.get_default_redis_instance() {
382                        format!("{}:{}", instance.get_host(), instance.get_port())
383                    } else {
384                        "unknown".to_string()
385                    }
386                }
387            }
388        } else {
389            "unknown".to_string()
390        }
391    }
392}
393
394impl AutoCreationLogger {
395    #[instrument_trace]
396    pub async fn log_auto_creation_start(plugin_type: PluginType, database_name: &str) {
397        info!(
398            "[AUTO-CREATION] Starting auto-creation for {plugin_type} database '{database_name}'"
399        );
400    }
401
402    #[instrument_trace]
403    pub async fn log_auto_creation_complete(plugin_type: PluginType, result: &AutoCreationResult) {
404        if result.has_errors() {
405            info!(
406                "[AUTO-CREATION] Auto-creation completed for {plugin_type} with warnings {}",
407                result.get_errors().join(", ")
408            );
409        } else {
410            info!("[AUTO-CREATION] Auto-creation completed successfully for {plugin_type}");
411        }
412    }
413
414    #[instrument_trace]
415    pub async fn log_auto_creation_error(
416        error: &AutoCreationError,
417        operation: &str,
418        plugin_type: PluginType,
419        database_name: Option<&str>,
420    ) {
421        error!(
422            "[AUTO-CREATION] {operation} failed for {plugin_type} database '{}' {error}",
423            database_name.unwrap_or("unknown")
424        );
425    }
426
427    #[instrument_trace]
428    pub async fn log_connection_verification(
429        plugin_type: PluginType,
430        database_name: &str,
431        success: bool,
432        error: Option<&str>,
433    ) {
434        if success {
435            info!(
436                "[AUTO-CREATION] Connection verification successful for {plugin_type} database '{database_name}'"
437            );
438        } else {
439            error!(
440                "[AUTO-CREATION] Connection verification failed for {plugin_type} database '{database_name}' {}",
441                error.unwrap_or("Unknown error")
442            );
443        };
444    }
445
446    #[instrument_trace]
447    pub async fn log_database_created(database_name: &str, plugin_type: PluginType) {
448        info!(
449            "[AUTO-CREATION] Successfully created database '{database_name}' for {plugin_type} plugin"
450        );
451    }
452
453    #[instrument_trace]
454    pub async fn log_database_exists(database_name: &str, plugin_type: PluginType) {
455        info!("[AUTO-CREATION] Database '{database_name}' already exists for {plugin_type} plugin");
456    }
457
458    #[instrument_trace]
459    pub async fn log_table_created(table_name: &str, database_name: &str, plugin_type: PluginType) {
460        info!(
461            "[AUTO-CREATION] Successfully created table '{table_name}' in database '{database_name}' for {plugin_type} plugin"
462        );
463    }
464
465    #[instrument_trace]
466    pub async fn log_table_exists(table_name: &str, database_name: &str, plugin_type: PluginType) {
467        info!(
468            "[AUTO-CREATION] Table '{table_name}' already exists in database '{database_name}' for {plugin_type} plugin"
469        );
470    }
471
472    #[instrument_trace]
473    pub async fn log_tables_created(
474        tables: &[String],
475        database_name: &str,
476        plugin_type: PluginType,
477    ) {
478        if tables.is_empty() {
479            info!(
480                "[AUTO-CREATION] No new tables created in database '{database_name}' for {plugin_type} plugin"
481            );
482        } else {
483            info!(
484                "[AUTO-CREATION] Created tables [{}] in database '{database_name}' for {plugin_type} plugin",
485                tables.join(", ")
486            );
487        }
488    }
489}