Skip to main content

hyperlane_plugin/redis/
impl.rs

1use super::*;
2
3impl Default for RedisAutoCreation {
4    #[instrument_trace]
5    fn default() -> Self {
6        let env: &'static EnvConfig = get_global_env_config();
7        if let Some(instance) = env.get_default_redis_instance() {
8            Self::new(instance.clone())
9        } else {
10            let default_instance: RedisInstanceConfig = RedisInstanceConfig::default();
11            Self::new(default_instance)
12        }
13    }
14}
15
16impl RedisAutoCreation {
17    #[instrument_trace]
18    async fn create_mutable_connection(&self) -> Result<Connection, AutoCreationError> {
19        let db_url: String = self.instance.get_connection_url();
20        let client: Client = Client::open(db_url).map_err(|error: RedisError| {
21            let error_msg: String = error.to_string();
22            if error_msg.contains("authentication failed") || error_msg.contains("NOAUTH") {
23                AutoCreationError::InsufficientPermissions(format!(
24                    "Redis authentication failed{COLON_SPACE}{error_msg}"
25                ))
26            } else if error_msg.contains("Connection refused") || error_msg.contains("timeout") {
27                AutoCreationError::ConnectionFailed(format!(
28                    "Cannot connect to Redis server{COLON_SPACE}{error_msg}"
29                ))
30            } else {
31                AutoCreationError::DatabaseError(format!(
32                    "Redis connection error{COLON_SPACE}{error_msg}"
33                ))
34            }
35        })?;
36        let timeout_duration: Duration = get_connection_timeout_duration();
37        let timeout_seconds: u64 = timeout_duration.as_secs();
38        let connection_task: tokio::task::JoinHandle<Result<Connection, RedisError>> =
39            tokio::task::spawn_blocking(move || client.get_connection());
40        let connection: Connection = match timeout(timeout_duration, connection_task).await {
41            Ok(join_result) => match join_result {
42                Ok(result) => result.map_err(|error: RedisError| {
43                    let error_msg: String = error.to_string();
44                    if error_msg.contains("authentication failed") || error_msg.contains("NOAUTH") {
45                        AutoCreationError::InsufficientPermissions(format!(
46                            "Redis authentication failed{COLON_SPACE}{error_msg}"
47                        ))
48                    } else if error_msg.contains("Connection refused")
49                        || error_msg.contains("timeout")
50                    {
51                        AutoCreationError::ConnectionFailed(format!(
52                            "Cannot connect to Redis server{COLON_SPACE}{error_msg}"
53                        ))
54                    } else {
55                        AutoCreationError::DatabaseError(format!(
56                            "Redis connection error{COLON_SPACE}{error_msg}"
57                        ))
58                    }
59                })?,
60                Err(_) => {
61                    return Err(AutoCreationError::ConnectionFailed(
62                        "Redis connection task failed".to_string(),
63                    ));
64                }
65            },
66            Err(_) => {
67                return Err(AutoCreationError::Timeout(format!(
68                    "Redis connection timeout after {timeout_seconds} seconds"
69                )));
70            }
71        };
72        Ok(connection)
73    }
74
75    #[instrument_trace]
76    async fn validate_redis_server(&self) -> Result<(), AutoCreationError> {
77        let mut conn: Connection = self.create_mutable_connection().await?;
78        let pong: String = redis::cmd("PING")
79            .query(&mut conn)
80            .map_err(|error: RedisError| {
81                AutoCreationError::ConnectionFailed(format!(
82                    "Redis PING failed{COLON_SPACE}{error}"
83                ))
84            })?;
85        if pong != "PONG" {
86            return Err(AutoCreationError::ConnectionFailed(
87                "Redis PING returned unexpected response".to_string(),
88            ));
89        }
90        let info: String =
91            redis::cmd("INFO")
92                .arg("server")
93                .query(&mut conn)
94                .map_err(|error: RedisError| {
95                    AutoCreationError::DatabaseError(format!(
96                        "Failed to get Redis server info{COLON_SPACE}{error}"
97                    ))
98                })?;
99        if info.contains("redis_version:") {
100            AutoCreationLogger::log_connection_verification(
101                database::PluginType::Redis,
102                &self.instance.name,
103                true,
104                None,
105            )
106            .await;
107        }
108        Ok(())
109    }
110
111    #[instrument_trace]
112    async fn setup_redis_namespace(&self) -> Result<Vec<String>, AutoCreationError> {
113        let mut setup_operations: Vec<String> = Vec::new();
114        let mut conn: Connection = self.create_mutable_connection().await?;
115        let app_key: String = format!("{}:initialized", self.instance.name);
116        let exists: i32 = redis::cmd("EXISTS")
117            .arg(&app_key)
118            .query(&mut conn)
119            .map_err(|error: RedisError| {
120                AutoCreationError::DatabaseError(format!(
121                    "Failed to check Redis key existence{COLON_SPACE}{error}"
122                ))
123            })?;
124        if exists == 0 {
125            let _: () = redis::cmd("SET")
126                .arg(&app_key)
127                .arg("true")
128                .query(&mut conn)
129                .map_err(|error: RedisError| {
130                    AutoCreationError::DatabaseError(format!(
131                        "Failed to set Redis initialization key{COLON_SPACE}{error}"
132                    ))
133                })?;
134            setup_operations.push(app_key.clone());
135            let config_key: String = format!("{}:config:version", self.instance.name);
136            let _: () = redis::cmd("SET")
137                .arg(&config_key)
138                .arg("1.0.0")
139                .query(&mut conn)
140                .map_err(|error: RedisError| {
141                    AutoCreationError::DatabaseError(format!(
142                        "Failed to set Redis config key{COLON_SPACE}{error}"
143                    ))
144                })?;
145            setup_operations.push(config_key);
146        }
147        Ok(setup_operations)
148    }
149}
150
151impl DatabaseAutoCreation for RedisAutoCreation {
152    #[instrument_trace]
153    async fn create_database_if_not_exists(&self) -> Result<bool, AutoCreationError> {
154        self.validate_redis_server().await?;
155        AutoCreationLogger::log_database_exists(&self.instance.name, database::PluginType::Redis)
156            .await;
157        Ok(false)
158    }
159
160    #[instrument_trace]
161    async fn create_tables_if_not_exist(&self) -> Result<Vec<String>, AutoCreationError> {
162        let setup_operations: Vec<String> = self.setup_redis_namespace().await?;
163        if !setup_operations.is_empty() {
164            AutoCreationLogger::log_tables_created(
165                &setup_operations,
166                &self.instance.name,
167                database::PluginType::Redis,
168            )
169            .await;
170        } else {
171            AutoCreationLogger::log_tables_created(
172                &[],
173                &self.instance.name,
174                database::PluginType::Redis,
175            )
176            .await;
177        }
178        Ok(setup_operations)
179    }
180
181    #[instrument_trace]
182    async fn verify_connection(&self) -> Result<(), AutoCreationError> {
183        match self.validate_redis_server().await {
184            Ok(_) => {
185                AutoCreationLogger::log_connection_verification(
186                    database::PluginType::Redis,
187                    &self.instance.name,
188                    true,
189                    None,
190                )
191                .await;
192                Ok(())
193            }
194            Err(error) => {
195                AutoCreationLogger::log_connection_verification(
196                    database::PluginType::Redis,
197                    &self.instance.name,
198                    false,
199                    Some(&error.to_string()),
200                )
201                .await;
202                Err(error)
203            }
204        }
205    }
206}