1use std::{
2 collections::HashMap, future::Future, sync::{Arc, Mutex}
3};
4
5use crate::models::Match;
6
7use super::{
8 DataAdapter, Gettable, InfoPublisher, Insertable, Matcher, Publishable, Removable, Searchable,
9 Updateable,
10};
11pub use redis::{Commands, Connection, FromRedisValue, Msg, Pipeline, PubSub, ToRedisArgs};
12use tracing::{error, info};
13
14mod io;
15pub mod publisher;
16
/// State of a match that is being assembled from incoming pub/sub messages.
///
/// All fields start out empty (see `Default`) and are filled in one message
/// at a time.
#[derive(Default, Debug)]
pub struct MatchProposal {
    /// Payload of the latest `region` message, if one was received.
    pub found_region: Option<String>,
    /// Payload of the latest `mode` message, if one was received.
    pub found_mode: Option<String>,
    /// Payload of the latest `game` message, if one was received.
    pub found_game: Option<String>,
    /// Player payloads, grouped by the first segment of the channel they arrived on.
    pub found_players: HashMap<String, Vec<String>>,
    /// AI flag of the latest `ai` message, if one was received.
    pub found_ai: Option<bool>,
}

impl MatchProposal {
    /// Reports whether players, a region and the AI flag are all present.
    ///
    /// Mode and game are not part of this check.
    #[inline]
    pub fn is_complete(&self) -> bool {
        let something_missing = self.found_players.is_empty()
            || self.found_region.is_none()
            || self.found_ai.is_none();
        !something_missing
    }
}
34
35pub type RedisAdapterDefault = RedisAdapter<redis::Connection>;
36
37pub struct RedisAdapter<I> {
39 pub client: redis::Client,
40 auto_delete: Option<i64>,
41 connection: Arc<Mutex<redis::Connection>>,
42 publisher: Option<Arc<Mutex<dyn InfoPublisher<I> + Send + Sync>>>,
43 handlers: Arc<Mutex<Vec<Arc<dyn Send + Sync + 'static + Fn(Match) -> ()>>>>,
44}
45
46impl<I> From<redis::Client> for RedisAdapter<I> {
47 fn from(client: redis::Client) -> Self {
48 let connection =
49 Arc::new(Mutex::new(client.get_connection().expect(
50 format!("Could not connect to redis server at {:?}", client).as_str(),
51 )));
52 Self {
53 client,
54 connection,
55 publisher: None,
56 handlers: Arc::new(Mutex::new(Vec::new())),
57 auto_delete: None,
58 }
59 }
60}
61
62impl<I> Clone for RedisAdapter<I> {
63 fn clone(&self) -> Self {
64 let client = self.client.clone();
65 Self {
66 connection: Arc::new(Mutex::new(client.get_connection().unwrap())),
67 publisher: self.publisher.clone(),
68 client,
69 handlers: self.handlers.clone(),
70 auto_delete: self.auto_delete,
71 }
72 }
73}
74
75impl<I> RedisAdapter<I>
76where
77 I: 'static,
78 std::string::String: Publishable<I>,
79{
80 pub fn connect(url: &str) -> Result<Self, Box<dyn std::error::Error>> {
93 let client = redis::Client::open(url)?;
94 Ok(Self::from(client))
95 }
96
97 pub fn with_publisher(
98 mut self,
99 publisher: impl InfoPublisher<I> + Send + Sync + 'static,
100 ) -> Self {
101 self.publisher = Some(Arc::new(Mutex::new(publisher)));
102 self
103 }
104
105 pub fn with_auto_timeout(mut self, timeout: i64) -> Self {
106 self.auto_delete = Some(timeout);
107 self
108 }
109
110 pub fn reconnect(&self) -> Result<Connection, Box<dyn std::error::Error>> {
111 Ok(self.client.get_connection()?)
112 }
113
114 pub fn start_match_check(&self) -> tokio::task::JoinHandle<()> {
120 let self_clone = self.clone();
121 tokio::task::spawn(async move {
122 self_clone.match_check().unwrap();
123 })
124 }
125
126 pub fn match_check(self) -> Result<(), Box<dyn std::error::Error>> {
133 let mut connection = self.client.get_connection()?;
135 let mut connection = connection.as_pubsub();
136
137 connection.psubscribe("*:match:*")?;
138 info!("Subscribed to match events");
139
140 self.acc_searchers(connection)
141 }
142
143 fn acc_searchers(mut self, mut connection: PubSub) -> Result<(), Box<dyn std::error::Error>> {
144 let mut match_proposal = MatchProposal::default();
145
146 loop {
148 let msg = connection.get_message().unwrap();
149 info!("Message received: {:?}", msg);
150 self.handle_msg(msg, &mut match_proposal);
151 }
152 }
153
154 fn handle_msg(&mut self, msg: Msg, match_proposal: &mut MatchProposal) {
155 let payload = msg.get_payload::<String>().unwrap();
156
157 info!("Payload: {:?}", payload);
158
159 let channel = msg.get_channel_name().split(":").collect::<Vec<&str>>();
160
161 let last = channel.last().unwrap();
162 let uuid = channel.first().unwrap().to_string();
163
164 if last.to_string() == "region".to_string() {
165 match_proposal.found_region = Some(payload);
166 return;
167 }
168
169 if last.to_string() == "mode".to_string() {
170 match_proposal.found_mode = Some(payload);
171 return;
172 }
173
174 if last.to_string() == "game".to_string() {
175 match_proposal.found_game = Some(payload);
176 return;
177 }
178
179 if last.to_string() == "ai".to_string() {
180 match_proposal.found_ai = Some(payload.parse::<i32>().unwrap() == 1);
181 return;
182 }
183
184 if last.to_string() == "done".to_string() {
185 self.on_done_msg(&uuid, payload, &match_proposal).unwrap();
186 return;
188 }
189
190 if channel.get(channel.len() - 2).unwrap().to_string() == "players".to_string() {
191 if let Some(players) = match_proposal.found_players.get_mut(&uuid) {
192 players.push(payload);
193 return;
194 }
195 match_proposal
196 .found_players
197 .insert(channel.first().unwrap().to_string(), vec![payload]);
198 }
199 }
200
201 fn on_done_msg(
202 &mut self,
203 uuid: &String,
204 payload: String,
205 match_proposal: &MatchProposal,
206 ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>> {
207 let region = match_proposal.found_region.as_ref();
208 if region.is_none() {
209 todo!("Handle the server error accordingly");
210 }
211 let region = region.unwrap().clone();
212
213 let players = match_proposal.found_players.get(uuid);
214 if players.is_none() || players.unwrap().len() == 0 {
215 todo!("Handle the player error accordingly");
216 }
217 let players = players.unwrap().clone();
218 if players.len() as i32 != payload.parse::<i32>()? {
219 todo!("Handle the player count error accordingly");
220 }
221
222 let new_match = Match {
223 region,
224 players: players.clone(),
225 mode: match_proposal.found_mode.as_ref().unwrap().clone(),
226 game: match_proposal.found_game.as_ref().unwrap().clone(),
227 ai: match_proposal.found_ai.unwrap(),
228 };
229
230 let handles: Vec<_> = self
231 .handlers
232 .lock()
233 .unwrap()
234 .iter()
235 .cloned()
236 .collect::<Vec<_>>()
237 .into_iter()
238 .map(move |fun| {
239 let match_clone = new_match.clone();
240 tokio::task::spawn(async move { fun(match_clone) })
241 })
242 .collect();
243
244 let self_clone = self.clone();
245 Ok(tokio::task::spawn(async move {
246 for handle in handles {
248 handle.await.unwrap();
249 }
250
251 players.iter().for_each(|player| {
252 if let Err(err) =
253 self_clone.remove(&player.splitn(3, ":").take(2).collect::<Vec<_>>().join(":"))
254 {
255 error!("Error removing player 'uuid: {}': {}", player, err);
256 }
257 });
258 }))
259 }
260}
261
/// Predicate over values of type `T`, used when searching the store.
pub trait RedisFilter<T> {
    /// Returns `true` when `check` passes this filter.
    fn is_ok(&self, check: &T) -> bool;
}
265
266pub trait RedisUpdater<T> {
267 fn update(&self, pipe: &mut Pipeline, uuid: &str) -> Result<(), Box<dyn std::error::Error>>;
268}
269
270pub trait RedisIdentifiable {
271 fn name() -> String;
272 fn next_uuid(connection: &mut Connection) -> Result<String, Box<dyn std::error::Error>> {
273 let counter: i64 = connection.incr("uuid_inc", 1)?;
274 Ok(format!("{}:{}", counter, Self::name()))
275 }
276}
277
278pub trait RedisExpireable {
279 fn expire(&self, pipe: &mut Pipeline, base_key: &str, timeout: i64) -> Result<(), Box<dyn std::error::Error>>;
280}
281
282pub trait RedisInsertWriter {
283 fn write(&self, pipe: &mut Pipeline, base_key: &str) -> Result<(), Box<dyn std::error::Error>>;
284}
285
286pub trait NotifyOnRedisEvent<I> {
290 fn on_update<T>(
291 connection: &RedisAdapter<I>,
292 handler: impl FnMut(T) -> () + Send + Sync + 'static,
293 ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
294 where
295 T: FromRedisValue;
296
297 fn on_insert<T>(
298 connection: &RedisAdapter<I>,
299 handler: impl FnMut(T) -> () + Send + Sync + 'static,
300 ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
301 where
302 T: FromRedisValue;
303
304 fn on_delete<T>(
305 connection: &RedisAdapter<I>,
306 handler: impl FnMut(T) -> () + Send + Sync + 'static,
307 ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
308 where
309 T: FromRedisValue;
310}
311
312pub trait RedisOutputReader
313where
314 Self: Sized,
315{
316 fn read(
317 connection: &mut Connection,
318 base_key: &str,
319 ) -> Result<Self, Box<dyn std::error::Error>>;
320}
321
322impl<I> Removable for RedisAdapter<I>
323where
324 std::string::String: Publishable<I>,
325{
326 fn remove(&self, uuid: &str) -> Result<(), Box<dyn std::error::Error>> {
327 let mut connection = self.connection.lock().unwrap();
328 let iter = connection
329 .scan_match(format!("{}*", uuid))?
330 .into_iter()
331 .collect::<Vec<String>>();
332
333 redis::transaction(&mut connection, iter.as_slice(), |conn, pipe| {
334 iter.iter().for_each(|key| {
335 pipe.del(key).ignore();
336 });
337 pipe.query(conn)
338 })?;
339
340 if let Some(publisher) = self.publisher.as_ref() {
341 publisher
342 .lock()
343 .unwrap()
344 .publish(&uuid.to_string(), format!("remove:{uuid}"))?;
345 }
346 Ok(())
347 }
348}
349
350impl<T, I> Insertable<T> for RedisAdapter<I>
351where
352 T: RedisInsertWriter + RedisExpireable + RedisIdentifiable + Clone,
353 std::string::String: Publishable<I>,
354{
355 fn insert(&self, data: T) -> Result<String, Box<dyn std::error::Error>> {
356 let key = { T::next_uuid(&mut self.connection.lock().unwrap())? };
357
358 let mut pipe = redis::pipe();
359 pipe.atomic();
360 data.write(&mut pipe, &key)?;
361 pipe.set(key.clone(), "");
362
363 if let Some(auto_delete) = self.auto_delete {
364 pipe.expire(key.clone(), auto_delete);
365 data.expire(&mut pipe, &key, auto_delete)?;
366 }
367
368 'query: {
369 let mut connection = self.connection.lock().unwrap();
370 pipe.query(&mut connection)?;
371
372 if self.publisher.is_none() {
373 break 'query;
374 }
375
376 self.publisher
377 .as_ref()
378 .unwrap()
379 .lock()
380 .unwrap()
381 .publish(&key, format!("insert:{key}"))?;
382 }
383
384 let mut split = key.split(":");
385 Ok(split
386 .next()
387 .expect(format!("Invalid id on object of type {}", T::name()).as_str())
388 .to_string()
389 + ":"
390 + split
391 .next()
392 .expect(format!("Invalid id on object of type {}", T::name()).as_str()))
393 }
394}
395
396impl<'a, O, I> Gettable<'a, O> for RedisAdapter<I>
397where
398 O: RedisOutputReader + RedisIdentifiable,
399{
400 type Type = Box<dyn Iterator<Item = O> + 'a>;
401
402 fn all(&'a self) -> Result<Self::Type, Box<dyn std::error::Error>> {
403 let mut iter = self
404 .connection
405 .lock()
406 .unwrap()
407 .scan_match(format!("*:{}", O::name()))?
408 .collect::<Vec<String>>()
409 .into_iter();
410
411 let connection_ref = self.connection.clone();
412 let iter_fun = std::iter::from_fn(move || {
413 if let Some(key) = iter.next() {
414 let res = O::read(&mut connection_ref.lock().unwrap(), &key).ok()?;
415 return Some(res);
416 }
417 None
418 });
419
420 Ok(Box::new(iter_fun))
421 }
422
423 fn get(&self, uuid: &str) -> Result<O, Box<dyn std::error::Error>> {
424 O::read(&mut self.connection.lock().unwrap(), uuid)
425 }
426}
427
428impl<'a, O, F, I> Searchable<'a, O, F> for RedisAdapter<I>
429where
430 O: RedisOutputReader + RedisIdentifiable + 'a,
431 F: RedisFilter<O> + Default + 'a,
432{
433 type Type = Box<dyn Iterator<Item = O> + 'a>;
434
435 fn filter(&'a self, filter: F) -> Result<Self::Type, Box<dyn std::error::Error>> {
436 let mut iter = self
437 .connection
438 .lock()
439 .unwrap()
440 .scan_match(format!("*:{}", O::name()))?
441 .collect::<Vec<String>>()
442 .into_iter();
443
444 let connection_ref = self.connection.clone();
445 let iter = std::iter::from_fn(move || {
446 while let Some(key) = iter.next() {
447 let res = O::read(&mut connection_ref.lock().unwrap(), &key).ok()?;
448 if filter.is_ok(&res) {
449 return Some(res);
450 }
451 }
452 None
453 });
454
455 Ok(Box::new(iter))
456 }
457}
458
459impl<T, U, I> Updateable<T, U> for RedisAdapter<I>
460where
461 U: RedisUpdater<T> + Clone,
462 std::string::String: Publishable<I>,
463{
464 fn update(&self, uuid: &str, data: U) -> Result<(), Box<dyn std::error::Error>> {
465 let mut pipe = redis::pipe();
466 pipe.atomic();
467 data.clone().update(&mut pipe, uuid)?;
468
469 let mut connection = self.connection.lock().unwrap();
470 pipe.query(&mut connection)?;
471
472 if let Some(publisher) = self.publisher.as_ref() {
473 publisher
474 .lock()
475 .unwrap()
476 .publish(&uuid.to_string(), format!("update:{uuid}"))?;
477 }
478 Ok(())
479 }
480}
481
482impl<I> Matcher for RedisAdapter<I> {
483 fn on_match<T>(&self, handler: T)
485 where
486 T: Send + Sync + 'static + Fn(Match) -> (),
487 {
488 self.handlers.lock().unwrap().push(Arc::new(handler));
489 }
490}
491
492impl<'a, T, O, F, U> DataAdapter<'a, T, O, F, U> for RedisAdapter<redis::Connection>
493where
494 T: Clone + RedisInsertWriter + RedisExpireable + RedisIdentifiable + 'a,
495 O: RedisOutputReader + RedisIdentifiable + 'a,
496 F: RedisFilter<O> + Default + 'a,
497 U: RedisUpdater<T> + Clone + 'a,
498{
499}