// hyperlane_plugin/database/impl.rs
use super::*;
2
3impl fmt::Display for PluginType {
4 #[instrument_trace]
5 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
6 match self {
7 Self::MySQL => write!(f, "MySQL"),
8 Self::PostgreSQL => write!(f, "PostgreSQL"),
9 Self::Redis => write!(f, "Redis"),
10 }
11 }
12}
13
14impl FromStr for PluginType {
15 type Err = ();
16
17 #[instrument_trace]
18 fn from_str(s: &str) -> Result<Self, Self::Err> {
19 match s {
20 "MySQL" => Ok(Self::MySQL),
21 "PostgreSQL" => Ok(Self::PostgreSQL),
22 "Redis" => Ok(Self::Redis),
23 _ => Err(()),
24 }
25 }
26}
27
28impl std::fmt::Display for AutoCreationError {
29 #[instrument_trace]
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 match self {
32 Self::InsufficientPermissions(msg) => {
33 write!(f, "Insufficient permissions {msg}")
34 }
35 Self::ConnectionFailed(msg) => write!(f, "Connection failed {msg}"),
36 Self::SchemaError(msg) => write!(f, "Schema error {msg}"),
37 Self::Timeout(msg) => write!(f, "Timeout {msg}"),
38 Self::DatabaseError(msg) => write!(f, "Database error {msg}"),
39 }
40 }
41}
42
43impl std::error::Error for AutoCreationError {}
44
45impl AutoCreationError {
46 #[instrument_trace]
47 pub fn should_continue(&self) -> bool {
48 match self {
49 Self::InsufficientPermissions(_) => true,
50 Self::ConnectionFailed(_) => false,
51 Self::SchemaError(_) => true,
52 Self::Timeout(_) => true,
53 Self::DatabaseError(_) => true,
54 }
55 }
56
57 #[instrument_trace]
58 pub fn user_message(&self) -> &str {
59 match self {
60 Self::InsufficientPermissions(msg) => msg,
61 Self::ConnectionFailed(msg) => msg,
62 Self::SchemaError(msg) => msg,
63 Self::Timeout(msg) => msg,
64 Self::DatabaseError(msg) => msg,
65 }
66 }
67}
68
69impl TableSchema {
70 #[instrument_trace]
71 pub fn with_dependency(mut self, dependency: String) -> Self {
72 self.get_mut_dependencies().push(dependency);
73 self
74 }
75}
76
77impl DatabasePlugin {
78 #[instrument_trace]
79 pub fn get_connection_timeout_duration() -> Duration {
80 let timeout_seconds: u64 = std::env::var(ENV_KEY_DB_CONNECTION_TIMEOUT_MILLIS)
81 .ok()
82 .and_then(|value: String| value.parse::<u64>().ok())
83 .unwrap_or(DEFAULT_DB_CONNECTION_TIMEOUT_MILLIS);
84 Duration::from_millis(timeout_seconds)
85 }
86
87 #[instrument_trace]
88 pub fn get_retry_duration() -> Duration {
89 let millis: u64 = std::env::var(ENV_KEY_DB_RETRY_INTERVAL_MILLIS)
90 .ok()
91 .and_then(|value: String| value.parse::<u64>().ok())
92 .unwrap_or(DEFAULT_DB_RETRY_INTERVAL_MILLIS);
93 Duration::from_millis(millis)
94 }
95
96 #[instrument_trace]
97 pub async fn initialize_auto_creation() -> Result<(), String> {
98 Self::initialize_auto_creation_with_schema(None, None, None).await
99 }
100
101 #[instrument_trace]
102 pub async fn initialize_auto_creation_with_schema(
103 mysql_schema: Option<DatabaseSchema>,
104 postgresql_schema: Option<DatabaseSchema>,
105 _redis_schema: Option<()>,
106 ) -> Result<(), String> {
107 if let Err(error) = AutoCreationConfig::validate() {
108 return Err(format!(
109 "Auto-creation configuration validation failed {error}"
110 ));
111 }
112 let env: &'static EnvConfig = EnvPlugin::get_or_init();
113 let mut initialization_results: Vec<String> = Vec::new();
114 for instance in env.get_mysql_instances() {
115 match MySqlPlugin::perform_auto_creation(instance, mysql_schema.clone()).await {
116 Ok(result) => {
117 initialization_results.push(format!(
118 "MySQL ({}) {}",
119 instance.get_name(),
120 if result.has_changes() {
121 "initialized with changes"
122 } else {
123 "verified"
124 }
125 ));
126 }
127 Err(error) => {
128 if !error.should_continue() {
129 return Err(format!(
130 "MySQL ({}) auto-creation failed {error}",
131 instance.get_name()
132 ));
133 }
134 initialization_results.push(format!(
135 "MySQL ({}) : failed but continuing ({error})",
136 instance.get_name()
137 ));
138 }
139 }
140 }
141 for instance in env.get_postgresql_instances() {
142 match PostgreSqlPlugin::perform_auto_creation(instance, postgresql_schema.clone()).await
143 {
144 Ok(result) => {
145 initialization_results.push(format!(
146 "PostgreSQL ({}) {}",
147 instance.get_name(),
148 if result.has_changes() {
149 "initialized with changes"
150 } else {
151 "verified"
152 }
153 ));
154 }
155 Err(error) => {
156 if !error.should_continue() {
157 return Err(format!(
158 "PostgreSQL ({}) auto-creation failed {error}",
159 instance.get_name()
160 ));
161 }
162 initialization_results.push(format!(
163 "PostgreSQL ({}) : failed but continuing ({error})",
164 instance.get_name()
165 ));
166 }
167 }
168 }
169 for instance in env.get_redis_instances() {
170 match RedisPlugin::perform_auto_creation(instance, None).await {
171 Ok(result) => {
172 initialization_results.push(format!(
173 "Redis ({}) {}",
174 instance.get_name(),
175 if result.has_changes() {
176 "initialized with changes"
177 } else {
178 "verified"
179 }
180 ));
181 }
182 Err(error) => {
183 if !error.should_continue() {
184 return Err(format!(
185 "Redis ({}) auto-creation failed {error}",
186 instance.get_name()
187 ));
188 }
189 initialization_results.push(format!(
190 "Redis ({}) : failed but continuing ({error})",
191 instance.get_name()
192 ));
193 }
194 }
195 }
196 if initialization_results.is_empty() {
197 info!("[AUTO-CREATION] No plugins enabled for auto-creation");
198 } else {
199 let results_summary: String = initialization_results.join(", ");
200 info!("[AUTO-CREATION] Initialization complete {results_summary}");
201 }
202 Ok(())
203 }
204}
205
206impl<T: Clone> ConnectionCache<T> {
207 #[instrument_trace]
208 pub fn new(result: Result<T, String>) -> Self {
209 Self {
210 result,
211 last_attempt: Instant::now(),
212 }
213 }
214
215 #[instrument_trace]
216 pub fn is_expired(&self, duration: Duration) -> bool {
217 self.get_last_attempt().elapsed() >= duration
218 }
219
220 #[instrument_trace]
221 pub fn should_retry(&self, duration: Duration) -> bool {
222 self.try_get_result().is_err() && self.is_expired(duration)
223 }
224}
225
226impl AutoCreationResult {
227 #[instrument_trace]
228 pub fn has_changes(&self) -> bool {
229 self.get_database_created() || !self.get_tables_created().is_empty()
230 }
231
232 #[instrument_trace]
233 pub fn has_errors(&self) -> bool {
234 !self.get_errors().is_empty()
235 }
236}
237
238impl DatabaseSchema {
239 #[instrument_trace]
240 pub fn add_table(mut self, table: TableSchema) -> Self {
241 self.get_mut_tables().push(table);
242 self
243 }
244
245 #[instrument_trace]
246 pub fn add_index(mut self, index: String) -> Self {
247 self.get_mut_indexes().push(index);
248 self
249 }
250
251 #[instrument_trace]
252 pub fn add_constraint(mut self, constraint: String) -> Self {
253 self.get_mut_constraints().push(constraint);
254 self
255 }
256
257 #[instrument_trace]
258 pub fn add_init_data(mut self, init_data: String) -> Self {
259 self.get_mut_init_data().push(init_data);
260 self
261 }
262
263 #[instrument_trace]
264 pub fn ordered_tables(&self) -> Vec<&TableSchema> {
265 let mut ordered: Vec<&TableSchema> = Vec::new();
266 let mut remaining: Vec<&TableSchema> = self.get_tables().iter().collect();
267 while !remaining.is_empty() {
268 let mut added_any: bool = false;
269 remaining.retain(|table: &&TableSchema| {
270 let dependencies_satisfied: bool =
271 table.get_dependencies().iter().all(|dep: &String| {
272 ordered.iter().any(|ordered_table: &&TableSchema| {
273 ordered_table.get_name().as_str() == dep.as_str()
274 })
275 });
276 if dependencies_satisfied {
277 ordered.push(table);
278 added_any = true;
279 false
280 } else {
281 true
282 }
283 });
284 if !added_any && !remaining.is_empty() {
285 for table in remaining {
286 ordered.push(table);
287 }
288 break;
289 }
290 }
291 ordered
292 }
293}
294
295impl AutoCreationConfig {
296 #[instrument_trace]
297 pub fn validate() -> Result<(), String> {
298 let env: &'static EnvConfig = EnvPlugin::get_or_init();
299 if env.get_mysql_instances().is_empty() {
300 return Err("At least one MySQL instance is required".to_string());
301 }
302 if env.get_postgresql_instances().is_empty() {
303 return Err("At least one PostgreSQL instance is required".to_string());
304 }
305 if env.get_redis_instances().is_empty() {
306 return Err("At least one Redis instance is required".to_string());
307 }
308 Ok(())
309 }
310
311 #[instrument_trace]
312 pub fn for_plugin(plugin_name: &str) -> PluginAutoCreationConfig {
313 PluginAutoCreationConfig {
314 plugin_name: plugin_name.to_string(),
315 }
316 }
317}
318
319impl PluginAutoCreationConfig {
320 #[instrument_trace]
321 pub fn is_plugin_enabled(&self) -> bool {
322 PluginType::from_str(self.get_plugin_name()).is_ok()
323 }
324
325 #[instrument_trace]
326 pub fn get_database_name(&self) -> String {
327 let env: &'static EnvConfig = EnvPlugin::get_or_init();
328 if let Ok(plugin_type) = PluginType::from_str(self.get_plugin_name()) {
329 match plugin_type {
330 PluginType::MySQL => {
331 if let Some(instance) = env.get_default_mysql_instance() {
332 instance.get_database().clone()
333 } else {
334 "unknown".to_string()
335 }
336 }
337 PluginType::PostgreSQL => {
338 if let Some(instance) = env.get_default_postgresql_instance() {
339 instance.get_database().clone()
340 } else {
341 "unknown".to_string()
342 }
343 }
344 PluginType::Redis => "default".to_string(),
345 }
346 } else {
347 "unknown".to_string()
348 }
349 }
350
351 #[instrument_trace]
352 pub fn get_connection_info(&self) -> String {
353 let env: &'static EnvConfig = EnvPlugin::get_or_init();
354 if let Ok(plugin_type) = PluginType::from_str(self.get_plugin_name()) {
355 match plugin_type {
356 PluginType::MySQL => {
357 if let Some(instance) = env.get_default_mysql_instance() {
358 format!(
359 "{}:{}:{}",
360 instance.get_host(),
361 instance.get_port(),
362 instance.get_database()
363 )
364 } else {
365 "unknown".to_string()
366 }
367 }
368 PluginType::PostgreSQL => {
369 if let Some(instance) = env.get_default_postgresql_instance() {
370 format!(
371 "{}:{}:{}",
372 instance.get_host(),
373 instance.get_port(),
374 instance.get_database()
375 )
376 } else {
377 "unknown".to_string()
378 }
379 }
380 PluginType::Redis => {
381 if let Some(instance) = env.get_default_redis_instance() {
382 format!("{}:{}", instance.get_host(), instance.get_port())
383 } else {
384 "unknown".to_string()
385 }
386 }
387 }
388 } else {
389 "unknown".to_string()
390 }
391 }
392}
393
394impl AutoCreationLogger {
395 #[instrument_trace]
396 pub async fn log_auto_creation_start(plugin_type: PluginType, database_name: &str) {
397 info!(
398 "[AUTO-CREATION] Starting auto-creation for {plugin_type} database '{database_name}'"
399 );
400 }
401
402 #[instrument_trace]
403 pub async fn log_auto_creation_complete(plugin_type: PluginType, result: &AutoCreationResult) {
404 if result.has_errors() {
405 info!(
406 "[AUTO-CREATION] Auto-creation completed for {plugin_type} with warnings {}",
407 result.get_errors().join(", ")
408 );
409 } else {
410 info!("[AUTO-CREATION] Auto-creation completed successfully for {plugin_type}");
411 }
412 }
413
414 #[instrument_trace]
415 pub async fn log_auto_creation_error(
416 error: &AutoCreationError,
417 operation: &str,
418 plugin_type: PluginType,
419 database_name: Option<&str>,
420 ) {
421 error!(
422 "[AUTO-CREATION] {operation} failed for {plugin_type} database '{}' {error}",
423 database_name.unwrap_or("unknown")
424 );
425 }
426
427 #[instrument_trace]
428 pub async fn log_connection_verification(
429 plugin_type: PluginType,
430 database_name: &str,
431 success: bool,
432 error: Option<&str>,
433 ) {
434 if success {
435 info!(
436 "[AUTO-CREATION] Connection verification successful for {plugin_type} database '{database_name}'"
437 );
438 } else {
439 error!(
440 "[AUTO-CREATION] Connection verification failed for {plugin_type} database '{database_name}' {}",
441 error.unwrap_or("Unknown error")
442 );
443 };
444 }
445
446 #[instrument_trace]
447 pub async fn log_database_created(database_name: &str, plugin_type: PluginType) {
448 info!(
449 "[AUTO-CREATION] Successfully created database '{database_name}' for {plugin_type} plugin"
450 );
451 }
452
453 #[instrument_trace]
454 pub async fn log_database_exists(database_name: &str, plugin_type: PluginType) {
455 info!("[AUTO-CREATION] Database '{database_name}' already exists for {plugin_type} plugin");
456 }
457
458 #[instrument_trace]
459 pub async fn log_table_created(table_name: &str, database_name: &str, plugin_type: PluginType) {
460 info!(
461 "[AUTO-CREATION] Successfully created table '{table_name}' in database '{database_name}' for {plugin_type} plugin"
462 );
463 }
464
465 #[instrument_trace]
466 pub async fn log_table_exists(table_name: &str, database_name: &str, plugin_type: PluginType) {
467 info!(
468 "[AUTO-CREATION] Table '{table_name}' already exists in database '{database_name}' for {plugin_type} plugin"
469 );
470 }
471
472 #[instrument_trace]
473 pub async fn log_tables_created(
474 tables: &[String],
475 database_name: &str,
476 plugin_type: PluginType,
477 ) {
478 if tables.is_empty() {
479 info!(
480 "[AUTO-CREATION] No new tables created in database '{database_name}' for {plugin_type} plugin"
481 );
482 } else {
483 info!(
484 "[AUTO-CREATION] Created tables [{}] in database '{database_name}' for {plugin_type} plugin",
485 tables.join(", ")
486 );
487 }
488 }
489}