hyperlane_plugin/redis/
impl.rs1use super::*;
2
3impl Default for RedisAutoCreation {
4 #[instrument_trace]
5 fn default() -> Self {
6 let env: &'static EnvConfig = get_global_env_config();
7 if let Some(instance) = env.get_default_redis_instance() {
8 Self::new(instance.clone())
9 } else {
10 let default_instance: RedisInstanceConfig = RedisInstanceConfig::default();
11 Self::new(default_instance)
12 }
13 }
14}
15
16impl RedisAutoCreation {
17 #[instrument_trace]
18 async fn create_mutable_connection(&self) -> Result<Connection, AutoCreationError> {
19 let db_url: String = self.instance.get_connection_url();
20 let client: Client = Client::open(db_url).map_err(|error: RedisError| {
21 let error_msg: String = error.to_string();
22 if error_msg.contains("authentication failed") || error_msg.contains("NOAUTH") {
23 AutoCreationError::InsufficientPermissions(format!(
24 "Redis authentication failed{COLON_SPACE}{error_msg}"
25 ))
26 } else if error_msg.contains("Connection refused") || error_msg.contains("timeout") {
27 AutoCreationError::ConnectionFailed(format!(
28 "Cannot connect to Redis server{COLON_SPACE}{error_msg}"
29 ))
30 } else {
31 AutoCreationError::DatabaseError(format!(
32 "Redis connection error{COLON_SPACE}{error_msg}"
33 ))
34 }
35 })?;
36 let timeout_duration: Duration = get_connection_timeout_duration();
37 let timeout_seconds: u64 = timeout_duration.as_secs();
38 let connection_task: tokio::task::JoinHandle<Result<Connection, RedisError>> =
39 tokio::task::spawn_blocking(move || client.get_connection());
40 let connection: Connection = match timeout(timeout_duration, connection_task).await {
41 Ok(join_result) => match join_result {
42 Ok(result) => result.map_err(|error: RedisError| {
43 let error_msg: String = error.to_string();
44 if error_msg.contains("authentication failed") || error_msg.contains("NOAUTH") {
45 AutoCreationError::InsufficientPermissions(format!(
46 "Redis authentication failed{COLON_SPACE}{error_msg}"
47 ))
48 } else if error_msg.contains("Connection refused")
49 || error_msg.contains("timeout")
50 {
51 AutoCreationError::ConnectionFailed(format!(
52 "Cannot connect to Redis server{COLON_SPACE}{error_msg}"
53 ))
54 } else {
55 AutoCreationError::DatabaseError(format!(
56 "Redis connection error{COLON_SPACE}{error_msg}"
57 ))
58 }
59 })?,
60 Err(_) => {
61 return Err(AutoCreationError::ConnectionFailed(
62 "Redis connection task failed".to_string(),
63 ));
64 }
65 },
66 Err(_) => {
67 return Err(AutoCreationError::Timeout(format!(
68 "Redis connection timeout after {timeout_seconds} seconds"
69 )));
70 }
71 };
72 Ok(connection)
73 }
74
75 #[instrument_trace]
76 async fn validate_redis_server(&self) -> Result<(), AutoCreationError> {
77 let mut conn: Connection = self.create_mutable_connection().await?;
78 let pong: String = redis::cmd("PING")
79 .query(&mut conn)
80 .map_err(|error: RedisError| {
81 AutoCreationError::ConnectionFailed(format!(
82 "Redis PING failed{COLON_SPACE}{error}"
83 ))
84 })?;
85 if pong != "PONG" {
86 return Err(AutoCreationError::ConnectionFailed(
87 "Redis PING returned unexpected response".to_string(),
88 ));
89 }
90 let info: String =
91 redis::cmd("INFO")
92 .arg("server")
93 .query(&mut conn)
94 .map_err(|error: RedisError| {
95 AutoCreationError::DatabaseError(format!(
96 "Failed to get Redis server info{COLON_SPACE}{error}"
97 ))
98 })?;
99 if info.contains("redis_version:") {
100 AutoCreationLogger::log_connection_verification(
101 database::PluginType::Redis,
102 &self.instance.name,
103 true,
104 None,
105 )
106 .await;
107 }
108 Ok(())
109 }
110
111 #[instrument_trace]
112 async fn setup_redis_namespace(&self) -> Result<Vec<String>, AutoCreationError> {
113 let mut setup_operations: Vec<String> = Vec::new();
114 let mut conn: Connection = self.create_mutable_connection().await?;
115 let app_key: String = format!("{}:initialized", self.instance.name);
116 let exists: i32 = redis::cmd("EXISTS")
117 .arg(&app_key)
118 .query(&mut conn)
119 .map_err(|error: RedisError| {
120 AutoCreationError::DatabaseError(format!(
121 "Failed to check Redis key existence{COLON_SPACE}{error}"
122 ))
123 })?;
124 if exists == 0 {
125 let _: () = redis::cmd("SET")
126 .arg(&app_key)
127 .arg("true")
128 .query(&mut conn)
129 .map_err(|error: RedisError| {
130 AutoCreationError::DatabaseError(format!(
131 "Failed to set Redis initialization key{COLON_SPACE}{error}"
132 ))
133 })?;
134 setup_operations.push(app_key.clone());
135 let config_key: String = format!("{}:config:version", self.instance.name);
136 let _: () = redis::cmd("SET")
137 .arg(&config_key)
138 .arg("1.0.0")
139 .query(&mut conn)
140 .map_err(|error: RedisError| {
141 AutoCreationError::DatabaseError(format!(
142 "Failed to set Redis config key{COLON_SPACE}{error}"
143 ))
144 })?;
145 setup_operations.push(config_key);
146 }
147 Ok(setup_operations)
148 }
149}
150
151impl DatabaseAutoCreation for RedisAutoCreation {
152 #[instrument_trace]
153 async fn create_database_if_not_exists(&self) -> Result<bool, AutoCreationError> {
154 self.validate_redis_server().await?;
155 AutoCreationLogger::log_database_exists(&self.instance.name, database::PluginType::Redis)
156 .await;
157 Ok(false)
158 }
159
160 #[instrument_trace]
161 async fn create_tables_if_not_exist(&self) -> Result<Vec<String>, AutoCreationError> {
162 let setup_operations: Vec<String> = self.setup_redis_namespace().await?;
163 if !setup_operations.is_empty() {
164 AutoCreationLogger::log_tables_created(
165 &setup_operations,
166 &self.instance.name,
167 database::PluginType::Redis,
168 )
169 .await;
170 } else {
171 AutoCreationLogger::log_tables_created(
172 &[],
173 &self.instance.name,
174 database::PluginType::Redis,
175 )
176 .await;
177 }
178 Ok(setup_operations)
179 }
180
181 #[instrument_trace]
182 async fn verify_connection(&self) -> Result<(), AutoCreationError> {
183 match self.validate_redis_server().await {
184 Ok(_) => {
185 AutoCreationLogger::log_connection_verification(
186 database::PluginType::Redis,
187 &self.instance.name,
188 true,
189 None,
190 )
191 .await;
192 Ok(())
193 }
194 Err(error) => {
195 AutoCreationLogger::log_connection_verification(
196 database::PluginType::Redis,
197 &self.instance.name,
198 false,
199 Some(&error.to_string()),
200 )
201 .await;
202 Err(error)
203 }
204 }
205 }
206}