Skip to main content

gn_matchmaking_state/adapters/
redis.rs

1use std::{
2    collections::HashMap, future::Future, sync::{Arc, Mutex}
3};
4
5use crate::models::Match;
6
7use super::{
8    DataAdapter, Gettable, InfoPublisher, Insertable, Matcher, Publishable, Removable, Searchable,
9    Updateable,
10};
11pub use redis::{Commands, Connection, FromRedisValue, Msg, Pipeline, PubSub, ToRedisArgs};
12use tracing::{error, info};
13
14mod io;
15pub mod publisher;
16
17#[derive(Default, Debug)]
18pub struct MatchProposal {
19    pub found_region: Option<String>,
20    pub found_mode: Option<String>,
21    pub found_game: Option<String>,
22    pub found_players: HashMap<String, Vec<String>>,
23    pub found_ai: Option<bool>,
24}
25
26impl MatchProposal {
27    #[inline]
28    pub fn is_complete(&self) -> bool {
29        return !self.found_players.is_empty()
30            && self.found_region.is_some()
31            && self.found_ai.is_some();
32    }
33}
34
35pub type RedisAdapterDefault = RedisAdapter<redis::Connection>;
36
37// TODO: There are definetly some thread-mutability issues in the RedisAdapter due to the excesive use of Arc<Mutex>. Fix this in a #Refractoring
38pub struct RedisAdapter<I> {
39    pub client: redis::Client,
40    auto_delete: Option<i64>,
41    connection: Arc<Mutex<redis::Connection>>,
42    publisher: Option<Arc<Mutex<dyn InfoPublisher<I> + Send + Sync>>>,
43    handlers: Arc<Mutex<Vec<Arc<dyn Send + Sync + 'static + Fn(Match) -> ()>>>>,
44}
45
46impl<I> From<redis::Client> for RedisAdapter<I> {
47    fn from(client: redis::Client) -> Self {
48        let connection =
49            Arc::new(Mutex::new(client.get_connection().expect(
50                format!("Could not connect to redis server at {:?}", client).as_str(),
51            )));
52        Self {
53            client,
54            connection,
55            publisher: None,
56            handlers: Arc::new(Mutex::new(Vec::new())),
57            auto_delete: None,
58        }
59    }
60}
61
62impl<I> Clone for RedisAdapter<I> {
63    fn clone(&self) -> Self {
64        let client = self.client.clone();
65        Self {
66            connection: Arc::new(Mutex::new(client.get_connection().unwrap())),
67            publisher: self.publisher.clone(),
68            client,
69            handlers: self.handlers.clone(),
70            auto_delete: self.auto_delete,
71        }
72    }
73}
74
75impl<I> RedisAdapter<I>
76where
77    I: 'static,
78    std::string::String: Publishable<I>,
79{
80    /// Connects to a redis server using the given url.
81
82    ///
83    /// # Arguments
84    ///
85    /// * `url` - The url to connect to the redis server.
86    ///     - *format*: `redis://[<username>][:<password>@]<hostname>[:port][/<db>]`
87    ///     - *example*: `redis://john:password@127.0.0.1:6379/0`
88    ///
89    /// # Returns
90    ///
91    /// A `Result` with the any connection error. If Ok a new `RedisAdapter` object is returned.
92    pub fn connect(url: &str) -> Result<Self, Box<dyn std::error::Error>> {
93        let client = redis::Client::open(url)?;
94        Ok(Self::from(client))
95    }
96
97    pub fn with_publisher(
98        mut self,
99        publisher: impl InfoPublisher<I> + Send + Sync + 'static,
100    ) -> Self {
101        self.publisher = Some(Arc::new(Mutex::new(publisher)));
102        self
103    }
104
105    pub fn with_auto_timeout(mut self, timeout: i64) -> Self {
106        self.auto_delete = Some(timeout);
107        self
108    }
109
110    pub fn reconnect(&self) -> Result<Connection, Box<dyn std::error::Error>> {
111        Ok(self.client.get_connection()?)
112    }
113
114    /// Starts the match check in a new task. Creates a new seperate connection to the redis server.
115    ///
116    /// # Returns
117    ///
118    /// A `tokio::task::JoinHandle` that represents the spawned task.
119    pub fn start_match_check(&self) -> tokio::task::JoinHandle<()> {
120        let self_clone = self.clone();
121        tokio::task::spawn(async move {
122            self_clone.match_check().unwrap();
123        })
124    }
125
126    /// Starts the match check in the current thread. Creates a new seperate connection to the redis server using it as a pubsub connection for events.
127    ///
128    /// # Returns
129    ///
130    /// A `Result` with the error if any occured. Under normal conditions this function will not exit and therefore the result should be `!`.
131    /// This is currently an experimental feature in Rust and therefore not implemented here yet.
132    pub fn match_check(self) -> Result<(), Box<dyn std::error::Error>> {
133        // NOTE: Result should be '!' for Ok values. This is currently expermintal tough and therefore not implemented here.
134        let mut connection = self.client.get_connection()?;
135        let mut connection = connection.as_pubsub();
136
137        connection.psubscribe("*:match:*")?;
138        info!("Subscribed to match events");
139
140        self.acc_searchers(connection)
141    }
142
143    fn acc_searchers(mut self, mut connection: PubSub) -> Result<(), Box<dyn std::error::Error>> {
144        let mut match_proposal = MatchProposal::default();
145
146        // TODO: Multithread this as soon as the problem with the order of messages is fixed.
147        loop {
148            let msg = connection.get_message().unwrap();
149            info!("Message received: {:?}", msg);
150            self.handle_msg(msg, &mut match_proposal);
151        }
152    }
153
154    fn handle_msg(&mut self, msg: Msg, match_proposal: &mut MatchProposal) {
155        let payload = msg.get_payload::<String>().unwrap();
156
157        info!("Payload: {:?}", payload);
158
159        let channel = msg.get_channel_name().split(":").collect::<Vec<&str>>();
160
161        let last = channel.last().unwrap();
162        let uuid = channel.first().unwrap().to_string();
163
164        if last.to_string() == "region".to_string() {
165            match_proposal.found_region = Some(payload);
166            return;
167        }
168
169        if last.to_string() == "mode".to_string() {
170            match_proposal.found_mode = Some(payload);
171            return;
172        }
173
174        if last.to_string() == "game".to_string() {
175            match_proposal.found_game = Some(payload);
176            return;
177        }
178
179        if last.to_string() == "ai".to_string() {
180            match_proposal.found_ai = Some(payload.parse::<i32>().unwrap() == 1);
181            return;
182        }
183
184        if last.to_string() == "done".to_string() {
185            self.on_done_msg(&uuid, payload, &match_proposal).unwrap();
186            // TODO: The order of messages is likely but not guaranteed. This could be a potential error and should be handled accordingly.
187            return;
188        }
189
190        if channel.get(channel.len() - 2).unwrap().to_string() == "players".to_string() {
191            if let Some(players) = match_proposal.found_players.get_mut(&uuid) {
192                players.push(payload);
193                return;
194            }
195            match_proposal
196                .found_players
197                .insert(channel.first().unwrap().to_string(), vec![payload]);
198        }
199    }
200
201    fn on_done_msg(
202        &mut self,
203        uuid: &String,
204        payload: String,
205        match_proposal: &MatchProposal,
206    ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>> {
207        let region = match_proposal.found_region.as_ref();
208        if region.is_none() {
209            todo!("Handle the server error accordingly");
210        }
211        let region = region.unwrap().clone();
212
213        let players = match_proposal.found_players.get(uuid);
214        if players.is_none() || players.unwrap().len() == 0 {
215            todo!("Handle the player error accordingly");
216        }
217        let players = players.unwrap().clone();
218        if players.len() as i32 != payload.parse::<i32>()? {
219            todo!("Handle the player count error accordingly");
220        }
221
222        let new_match = Match {
223            region,
224            players: players.clone(),
225            mode: match_proposal.found_mode.as_ref().unwrap().clone(),
226            game: match_proposal.found_game.as_ref().unwrap().clone(),
227            ai: match_proposal.found_ai.unwrap(),
228        };
229
230        let handles: Vec<_> = self
231            .handlers
232            .lock()
233            .unwrap()
234            .iter()
235            .cloned()
236            .collect::<Vec<_>>()
237            .into_iter()
238            .map(move |fun| {
239                let match_clone = new_match.clone();
240                tokio::task::spawn(async move { fun(match_clone) })
241            })
242            .collect();
243
244        let self_clone = self.clone();
245        Ok(tokio::task::spawn(async move {
246            // TODO: Tasks should be joined in async
247            for handle in handles {
248                handle.await.unwrap();
249            }
250
251            players.iter().for_each(|player| {
252                if let Err(err) =
253                    self_clone.remove(&player.splitn(3, ":").take(2).collect::<Vec<_>>().join(":"))
254                {
255                    error!("Error removing player 'uuid: {}': {}", player, err);
256                }
257            });
258        }))
259    }
260}
261
262pub trait RedisFilter<T> {
263    fn is_ok(&self, check: &T) -> bool;
264}
265
266pub trait RedisUpdater<T> {
267    fn update(&self, pipe: &mut Pipeline, uuid: &str) -> Result<(), Box<dyn std::error::Error>>;
268}
269
270pub trait RedisIdentifiable {
271    fn name() -> String;
272    fn next_uuid(connection: &mut Connection) -> Result<String, Box<dyn std::error::Error>> {
273        let counter: i64 = connection.incr("uuid_inc", 1)?;
274        Ok(format!("{}:{}", counter, Self::name()))
275    }
276}
277
278pub trait RedisExpireable {
279    fn expire(&self, pipe: &mut Pipeline, base_key: &str, timeout: i64) -> Result<(), Box<dyn std::error::Error>>;
280}
281
282pub trait RedisInsertWriter {
283    fn write(&self, pipe: &mut Pipeline, base_key: &str) -> Result<(), Box<dyn std::error::Error>>;
284}
285
286/// TODO: Currently functions in this trait require the arguments to be static. This solution prohibits removing existing handlers.
287/// This should be fixed by using some Context-Manager which provides the PubSub connection and handler. When the Context-Manager is dropped the handler and handler-thread should be killed.
288/// This trait should also be moved to the super-module
289pub trait NotifyOnRedisEvent<I> {
290    fn on_update<T>(
291        connection: &RedisAdapter<I>,
292        handler: impl FnMut(T) -> () + Send + Sync + 'static,
293    ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
294    where
295        T: FromRedisValue;
296
297    fn on_insert<T>(
298        connection: &RedisAdapter<I>,
299        handler: impl FnMut(T) -> () + Send + Sync + 'static,
300    ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
301    where
302        T: FromRedisValue;
303
304    fn on_delete<T>(
305        connection: &RedisAdapter<I>,
306        handler: impl FnMut(T) -> () + Send + Sync + 'static,
307    ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
308    where
309        T: FromRedisValue;
310}
311
312pub trait RedisOutputReader
313where
314    Self: Sized,
315{
316    fn read(
317        connection: &mut Connection,
318        base_key: &str,
319    ) -> Result<Self, Box<dyn std::error::Error>>;
320}
321
322impl<I> Removable for RedisAdapter<I>
323where
324    std::string::String: Publishable<I>,
325{
326    fn remove(&self, uuid: &str) -> Result<(), Box<dyn std::error::Error>> {
327        let mut connection = self.connection.lock().unwrap();
328        let iter = connection
329            .scan_match(format!("{}*", uuid))?
330            .into_iter()
331            .collect::<Vec<String>>();
332
333        redis::transaction(&mut connection, iter.as_slice(), |conn, pipe| {
334            iter.iter().for_each(|key| {
335                pipe.del(key).ignore();
336            });
337            pipe.query(conn)
338        })?;
339
340        if let Some(publisher) = self.publisher.as_ref() {
341            publisher
342                .lock()
343                .unwrap()
344                .publish(&uuid.to_string(), format!("remove:{uuid}"))?;
345        }
346        Ok(())
347    }
348}
349
350impl<T, I> Insertable<T> for RedisAdapter<I>
351where
352    T: RedisInsertWriter + RedisExpireable + RedisIdentifiable + Clone,
353    std::string::String: Publishable<I>,
354{
355    fn insert(&self, data: T) -> Result<String, Box<dyn std::error::Error>> {
356        let key = { T::next_uuid(&mut self.connection.lock().unwrap())? };
357
358        let mut pipe = redis::pipe();
359        pipe.atomic();
360        data.write(&mut pipe, &key)?;
361        pipe.set(key.clone(), "");
362
363        if let Some(auto_delete) = self.auto_delete {
364            pipe.expire(key.clone(), auto_delete);
365            data.expire(&mut pipe, &key, auto_delete)?;
366        }
367
368        'query: {
369            let mut connection = self.connection.lock().unwrap();
370            pipe.query(&mut connection)?;
371
372            if self.publisher.is_none() {
373                break 'query;
374            }
375
376            self.publisher
377                .as_ref()
378                .unwrap()
379                .lock()
380                .unwrap()
381                .publish(&key, format!("insert:{key}"))?;
382        }
383
384        let mut split = key.split(":");
385        Ok(split
386            .next()
387            .expect(format!("Invalid id on object of type {}", T::name()).as_str())
388            .to_string()
389            + ":"
390            + split
391                .next()
392                .expect(format!("Invalid id on object of type {}", T::name()).as_str()))
393    }
394}
395
396impl<'a, O, I> Gettable<'a, O> for RedisAdapter<I>
397where
398    O: RedisOutputReader + RedisIdentifiable,
399{
400    type Type = Box<dyn Iterator<Item = O> + 'a>;
401
402    fn all(&'a self) -> Result<Self::Type, Box<dyn std::error::Error>> {
403        let mut iter = self
404            .connection
405            .lock()
406            .unwrap()
407            .scan_match(format!("*:{}", O::name()))?
408            .collect::<Vec<String>>()
409            .into_iter();
410
411        let connection_ref = self.connection.clone();
412        let iter_fun = std::iter::from_fn(move || {
413            if let Some(key) = iter.next() {
414                let res = O::read(&mut connection_ref.lock().unwrap(), &key).ok()?;
415                return Some(res);
416            }
417            None
418        });
419
420        Ok(Box::new(iter_fun))
421    }
422
423    fn get(&self, uuid: &str) -> Result<O, Box<dyn std::error::Error>> {
424        O::read(&mut self.connection.lock().unwrap(), uuid)
425    }
426}
427
428impl<'a, O, F, I> Searchable<'a, O, F> for RedisAdapter<I>
429where
430    O: RedisOutputReader + RedisIdentifiable + 'a,
431    F: RedisFilter<O> + Default + 'a,
432{
433    type Type = Box<dyn Iterator<Item = O> + 'a>;
434
435    fn filter(&'a self, filter: F) -> Result<Self::Type, Box<dyn std::error::Error>> {
436        let mut iter = self
437            .connection
438            .lock()
439            .unwrap()
440            .scan_match(format!("*:{}", O::name()))?
441            .collect::<Vec<String>>()
442            .into_iter();
443
444        let connection_ref = self.connection.clone();
445        let iter = std::iter::from_fn(move || {
446            while let Some(key) = iter.next() {
447                let res = O::read(&mut connection_ref.lock().unwrap(), &key).ok()?;
448                if filter.is_ok(&res) {
449                    return Some(res);
450                }
451            }
452            None
453        });
454
455        Ok(Box::new(iter))
456    }
457}
458
459impl<T, U, I> Updateable<T, U> for RedisAdapter<I>
460where
461    U: RedisUpdater<T> + Clone,
462    std::string::String: Publishable<I>,
463{
464    fn update(&self, uuid: &str, data: U) -> Result<(), Box<dyn std::error::Error>> {
465        let mut pipe = redis::pipe();
466        pipe.atomic();
467        data.clone().update(&mut pipe, uuid)?;
468
469        let mut connection = self.connection.lock().unwrap();
470        pipe.query(&mut connection)?;
471
472        if let Some(publisher) = self.publisher.as_ref() {
473            publisher
474                .lock()
475                .unwrap()
476                .publish(&uuid.to_string(), format!("update:{uuid}"))?;
477        }
478        Ok(())
479    }
480}
481
482impl<I> Matcher for RedisAdapter<I> {
483    // NOTE: This function is a temporary inefficient implementation and will be migrated to a server-side lua script using channels
484    fn on_match<T>(&self, handler: T)
485    where
486        T: Send + Sync + 'static + Fn(Match) -> (),
487    {
488        self.handlers.lock().unwrap().push(Arc::new(handler));
489    }
490}
491
492impl<'a, T, O, F, U> DataAdapter<'a, T, O, F, U> for RedisAdapter<redis::Connection>
493where
494    T: Clone + RedisInsertWriter + RedisExpireable + RedisIdentifiable + 'a,
495    O: RedisOutputReader + RedisIdentifiable + 'a,
496    F: RedisFilter<O> + Default + 'a,
497    U: RedisUpdater<T> + Clone + 'a,
498{
499}