hyperlane_plugin/database/
impl.rs1use super::*;
2
3impl<T: Clone> ConnectionCache<T> {
4 #[instrument_trace]
5 pub fn new(result: Result<T, String>) -> Self {
6 Self {
7 result,
8 last_attempt: Instant::now(),
9 }
10 }
11
12 #[instrument_trace]
13 pub fn is_expired(&self, duration: Duration) -> bool {
14 self.get_last_attempt().elapsed() >= duration
15 }
16
17 #[instrument_trace]
18 pub fn should_retry(&self, duration: Duration) -> bool {
19 self.try_get_result().is_err() && self.is_expired(duration)
20 }
21}
22
23impl PluginType {
24 #[instrument_trace]
25 pub fn as_str(&self) -> &'static str {
26 match self {
27 Self::MySQL => "MySQL",
28 Self::PostgreSQL => "PostgreSQL",
29 Self::Redis => "Redis",
30 }
31 }
32}
33
34impl FromStr for PluginType {
35 type Err = ();
36
37 #[instrument_trace]
38 fn from_str(s: &str) -> Result<Self, Self::Err> {
39 match s {
40 "MySQL" => Ok(Self::MySQL),
41 "PostgreSQL" => Ok(Self::PostgreSQL),
42 "Redis" => Ok(Self::Redis),
43 _ => Err(()),
44 }
45 }
46}
47
48impl std::fmt::Display for PluginType {
49 #[instrument_trace]
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 write!(f, "{}", self.as_str())
52 }
53}
54
55impl AutoCreationError {
56 #[instrument_trace]
57 pub fn should_continue(&self) -> bool {
58 match self {
59 Self::InsufficientPermissions(_) => true,
60 Self::ConnectionFailed(_) => false,
61 Self::SchemaError(_) => true,
62 Self::Timeout(_) => true,
63 Self::DatabaseError(_) => true,
64 }
65 }
66
67 #[instrument_trace]
68 pub fn user_message(&self) -> &str {
69 match self {
70 Self::InsufficientPermissions(msg) => msg,
71 Self::ConnectionFailed(msg) => msg,
72 Self::SchemaError(msg) => msg,
73 Self::Timeout(msg) => msg,
74 Self::DatabaseError(msg) => msg,
75 }
76 }
77}
78
79impl std::fmt::Display for AutoCreationError {
80 #[instrument_trace]
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 match self {
83 Self::InsufficientPermissions(msg) => {
84 write!(f, "Insufficient permissions{COLON_SPACE}{msg}")
85 }
86 Self::ConnectionFailed(msg) => write!(f, "Connection failed{COLON_SPACE}{msg}"),
87 Self::SchemaError(msg) => write!(f, "Schema error{COLON_SPACE}{msg}"),
88 Self::Timeout(msg) => write!(f, "Timeout{COLON_SPACE}{msg}"),
89 Self::DatabaseError(msg) => write!(f, "Database error{COLON_SPACE}{msg}"),
90 }
91 }
92}
93
94impl std::error::Error for AutoCreationError {}
95
96impl AutoCreationResult {
97 #[instrument_trace]
98 pub fn has_changes(&self) -> bool {
99 self.get_database_created() || !self.get_tables_created().is_empty()
100 }
101
102 #[instrument_trace]
103 pub fn has_errors(&self) -> bool {
104 !self.get_errors().is_empty()
105 }
106}
107
108impl Default for AutoCreationResult {
109 #[instrument_trace]
110 fn default() -> Self {
111 Self {
112 database_created: false,
113 tables_created: Vec::new(),
114 errors: Vec::new(),
115 duration: Duration::from_secs(0),
116 }
117 }
118}
119
120impl TableSchema {
121 #[instrument_trace]
122 pub fn with_dependency(mut self, dependency: String) -> Self {
123 self.get_mut_dependencies().push(dependency);
124 self
125 }
126}
127
128impl DatabaseSchema {
129 #[instrument_trace]
130 pub fn add_table(mut self, table: TableSchema) -> Self {
131 self.get_mut_tables().push(table);
132 self
133 }
134
135 #[instrument_trace]
136 pub fn add_index(mut self, index: String) -> Self {
137 self.get_mut_indexes().push(index);
138 self
139 }
140
141 #[instrument_trace]
142 pub fn add_constraint(mut self, constraint: String) -> Self {
143 self.get_mut_constraints().push(constraint);
144 self
145 }
146
147 #[instrument_trace]
148 pub fn ordered_tables(&self) -> Vec<&TableSchema> {
149 let mut ordered: Vec<&TableSchema> = Vec::new();
150 let mut remaining: Vec<&TableSchema> = self.get_tables().iter().collect();
151 while !remaining.is_empty() {
152 let mut added_any: bool = false;
153 remaining.retain(|table: &&TableSchema| {
154 let dependencies_satisfied: bool =
155 table.get_dependencies().iter().all(|dep: &String| {
156 ordered.iter().any(|ordered_table: &&TableSchema| {
157 ordered_table.get_name().as_str() == dep.as_str()
158 })
159 });
160 if dependencies_satisfied {
161 ordered.push(table);
162 added_any = true;
163 false
164 } else {
165 true
166 }
167 });
168 if !added_any && !remaining.is_empty() {
169 for table in remaining {
170 ordered.push(table);
171 }
172 break;
173 }
174 }
175 ordered
176 }
177}
178
179impl AutoCreationConfig {
180 #[instrument_trace]
181 pub fn get_env() -> &'static env::EnvConfig {
182 env::get_global_env_config()
183 }
184
185 #[instrument_trace]
186 pub fn validate() -> Result<(), String> {
187 let env: &'static EnvConfig = Self::get_env();
188 if env.get_mysql_instances().is_empty() {
189 return Err("At least one MySQL instance is required".to_string());
190 }
191 if env.get_postgresql_instances().is_empty() {
192 return Err("At least one PostgreSQL instance is required".to_string());
193 }
194 if env.get_redis_instances().is_empty() {
195 return Err("At least one Redis instance is required".to_string());
196 }
197 Ok(())
198 }
199
200 #[instrument_trace]
201 pub fn for_plugin(plugin_name: &str) -> PluginAutoCreationConfig {
202 PluginAutoCreationConfig {
203 plugin_name: plugin_name.to_string(),
204 }
205 }
206}
207
208impl PluginAutoCreationConfig {
209 #[instrument_trace]
210 pub fn is_plugin_enabled(&self) -> bool {
211 PluginType::from_str(self.get_plugin_name()).is_ok()
212 }
213
214 #[instrument_trace]
215 pub fn get_database_name(&self) -> String {
216 let env: &'static EnvConfig = AutoCreationConfig::get_env();
217 if let Ok(plugin_type) = PluginType::from_str(self.get_plugin_name()) {
218 match plugin_type {
219 PluginType::MySQL => {
220 if let Some(instance) = env.get_default_mysql_instance() {
221 instance.get_database().clone()
222 } else {
223 "unknown".to_string()
224 }
225 }
226 PluginType::PostgreSQL => {
227 if let Some(instance) = env.get_default_postgresql_instance() {
228 instance.get_database().clone()
229 } else {
230 "unknown".to_string()
231 }
232 }
233 PluginType::Redis => "default".to_string(),
234 }
235 } else {
236 "unknown".to_string()
237 }
238 }
239
240 #[instrument_trace]
241 pub fn get_connection_info(&self) -> String {
242 let env: &'static EnvConfig = AutoCreationConfig::get_env();
243 if let Ok(plugin_type) = PluginType::from_str(self.get_plugin_name()) {
244 match plugin_type {
245 PluginType::MySQL => {
246 if let Some(instance) = env.get_default_mysql_instance() {
247 format!(
248 "{}:{}:{}",
249 instance.get_host(),
250 instance.get_port(),
251 instance.get_database()
252 )
253 } else {
254 "unknown".to_string()
255 }
256 }
257 PluginType::PostgreSQL => {
258 if let Some(instance) = env.get_default_postgresql_instance() {
259 format!(
260 "{}:{}:{}",
261 instance.get_host(),
262 instance.get_port(),
263 instance.get_database()
264 )
265 } else {
266 "unknown".to_string()
267 }
268 }
269 PluginType::Redis => {
270 if let Some(instance) = env.get_default_redis_instance() {
271 format!("{}:{}", instance.get_host(), instance.get_port())
272 } else {
273 "unknown".to_string()
274 }
275 }
276 }
277 } else {
278 "unknown".to_string()
279 }
280 }
281}
282
283impl AutoCreationLogger {
284 #[instrument_trace]
285 pub async fn log_auto_creation_start(plugin_type: PluginType, database_name: &str) {
286 info!(
287 "[AUTO-CREATION] Starting auto-creation for {plugin_type} database '{database_name}'"
288 );
289 }
290
291 #[instrument_trace]
292 pub async fn log_auto_creation_complete(plugin_type: PluginType, result: &AutoCreationResult) {
293 if result.has_errors() {
294 info!(
295 "[AUTO-CREATION] Auto-creation completed for {plugin_type} with warnings{COLON_SPACE}{}",
296 result.get_errors().join(", ")
297 );
298 } else {
299 info!("[AUTO-CREATION] Auto-creation completed successfully for {plugin_type}");
300 }
301 }
302
303 #[instrument_trace]
304 pub async fn log_auto_creation_error(
305 error: &AutoCreationError,
306 operation: &str,
307 plugin_type: PluginType,
308 database_name: Option<&str>,
309 ) {
310 error!(
311 "[AUTO-CREATION] {operation} failed for {plugin_type} database '{}'{COLON_SPACE}{error}",
312 database_name.unwrap_or("unknown")
313 );
314 }
315
316 #[instrument_trace]
317 pub async fn log_connection_verification(
318 plugin_type: PluginType,
319 database_name: &str,
320 success: bool,
321 error: Option<&str>,
322 ) {
323 if success {
324 info!(
325 "[AUTO-CREATION] Connection verification successful for {plugin_type} database '{database_name}'"
326 );
327 } else {
328 error!(
329 "[AUTO-CREATION] Connection verification failed for {plugin_type} database '{database_name}'{COLON_SPACE}{}",
330 error.unwrap_or("Unknown error")
331 );
332 };
333 }
334
335 #[instrument_trace]
336 pub async fn log_database_created(database_name: &str, plugin_type: PluginType) {
337 info!(
338 "[AUTO-CREATION] Successfully created database '{database_name}' for {plugin_type} plugin"
339 );
340 }
341
342 #[instrument_trace]
343 pub async fn log_database_exists(database_name: &str, plugin_type: PluginType) {
344 info!("[AUTO-CREATION] Database '{database_name}' already exists for {plugin_type} plugin");
345 }
346
347 #[instrument_trace]
348 pub async fn log_table_created(table_name: &str, database_name: &str, plugin_type: PluginType) {
349 info!(
350 "[AUTO-CREATION] Successfully created table '{table_name}' in database '{database_name}' for {plugin_type} plugin"
351 );
352 }
353
354 #[instrument_trace]
355 pub async fn log_table_exists(table_name: &str, database_name: &str, plugin_type: PluginType) {
356 info!(
357 "[AUTO-CREATION] Table '{table_name}' already exists in database '{database_name}' for {plugin_type} plugin"
358 );
359 }
360
361 #[instrument_trace]
362 pub async fn log_tables_created(
363 tables: &[String],
364 database_name: &str,
365 plugin_type: PluginType,
366 ) {
367 if tables.is_empty() {
368 info!(
369 "[AUTO-CREATION] No new tables created in database '{database_name}' for {plugin_type} plugin"
370 );
371 } else {
372 info!(
373 "[AUTO-CREATION] Created tables [{}] in database '{database_name}' for {plugin_type} plugin",
374 tables.join(", ")
375 );
376 }
377 }
378}