Skip to main content

rlstatsapi/
client.rs

1use futures_util::stream::Stream;
2use serde_json::Value;
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio::net::TcpStream;
5use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
6use tokio::time::{Duration, sleep};
7
8use crate::config::{
9    ClientOptions, ConnectionConfig, prepare_connection_config,
10};
11use crate::error::RlStatsError;
12use crate::events::{StatsEvent, parse_stats_event_value};
13use crate::filters::EventFilter;
14
15pub struct RocketLeagueStatsClient {
16    reader: OwnedReadHalf,
17    writer: OwnedWriteHalf,
18    read_buffer: Vec<u8>,
19    connection: ConnectionConfig,
20}
21
22impl RocketLeagueStatsClient {
23    pub async fn connect(options: ClientOptions) -> Result<Self, RlStatsError> {
24        let connection = prepare_connection_config(&options)?;
25        let (reader, writer) = connect_socket(&connection).await?;
26
27        Ok(Self {
28            reader,
29            writer,
30            read_buffer: Vec::new(),
31            connection,
32        })
33    }
34
35    pub fn connection(&self) -> &ConnectionConfig {
36        &self.connection
37    }
38
39    pub async fn connect_with_retry(
40        options: ClientOptions,
41        max_attempts: usize,
42        retry_delay: Duration,
43    ) -> Result<Self, RlStatsError> {
44        let attempts = max_attempts.max(1);
45
46        for attempt in 0..attempts {
47            match Self::connect(options.clone()).await {
48                Ok(client) => return Ok(client),
49                Err(error) => {
50                    if attempt + 1 == attempts {
51                        return Err(error);
52                    }
53
54                    sleep(retry_delay).await;
55                }
56            }
57        }
58
59        unreachable!("attempts is always >= 1")
60    }
61
62    pub async fn reconnect(&mut self) -> Result<(), RlStatsError> {
63        let (reader, writer) = connect_socket(&self.connection).await?;
64        self.reader = reader;
65        self.writer = writer;
66        self.read_buffer.clear();
67        Ok(())
68    }
69
70    pub async fn next_event(
71        &mut self,
72    ) -> Result<Option<StatsEvent>, RlStatsError> {
73        loop {
74            if let Some(event) = self.try_parse_event_from_buffer()? {
75                return Ok(Some(event));
76            }
77
78            let bytes_read =
79                self.reader.read_buf(&mut self.read_buffer).await?;
80            if bytes_read == 0 {
81                return self.try_parse_event_from_buffer();
82            }
83        }
84    }
85
86    pub async fn next_filtered_event(
87        &mut self,
88        filter: &EventFilter,
89    ) -> Result<Option<StatsEvent>, RlStatsError> {
90        loop {
91            match self.next_event().await? {
92                Some(event) => {
93                    if filter.matches(&event) {
94                        return Ok(Some(event));
95                    }
96                }
97                None => return Ok(None),
98            }
99        }
100    }
101
102    fn try_parse_event_from_buffer(
103        &mut self,
104    ) -> Result<Option<StatsEvent>, RlStatsError> {
105        discard_leading_whitespace(&mut self.read_buffer);
106
107        if self.read_buffer.is_empty() {
108            return Ok(None);
109        }
110
111        let mut stream =
112            serde_json::Deserializer::from_slice(&self.read_buffer)
113                .into_iter::<Value>();
114
115        match stream.next() {
116            Some(Ok(value)) => {
117                let consumed = stream.byte_offset();
118                self.read_buffer.drain(0..consumed);
119                Ok(Some(parse_stats_event_value(value)?))
120            }
121            Some(Err(error)) if error.is_eof() => Ok(None),
122            Some(Err(error)) => Err(RlStatsError::Json(error)),
123            None => Ok(None),
124        }
125    }
126
127    pub fn into_event_stream(
128        self,
129    ) -> impl Stream<Item = Result<StatsEvent, RlStatsError>> {
130        futures_util::stream::unfold(self, |mut client| async move {
131            match client.next_event().await {
132                Ok(Some(event)) => Some((Ok(event), client)),
133                Ok(None) => None,
134                Err(error) => Some((Err(error), client)),
135            }
136        })
137    }
138
139    pub fn into_filtered_event_stream(
140        self,
141        filter: EventFilter,
142    ) -> impl Stream<Item = Result<StatsEvent, RlStatsError>> {
143        futures_util::stream::unfold(
144            (self, filter),
145            |(mut client, filter)| async move {
146                loop {
147                    match client.next_event().await {
148                        Ok(Some(event)) => {
149                            if filter.matches(&event) {
150                                return Some((Ok(event), (client, filter)));
151                            }
152                        }
153                        Ok(None) => return None,
154                        Err(error) => {
155                            return Some((Err(error), (client, filter)));
156                        }
157                    }
158                }
159            },
160        )
161    }
162
163    pub async fn close(mut self) -> Result<(), RlStatsError> {
164        self.writer.shutdown().await?;
165        Ok(())
166    }
167}
168
169async fn connect_socket(
170    connection: &ConnectionConfig,
171) -> Result<(OwnedReadHalf, OwnedWriteHalf), RlStatsError> {
172    let stream = TcpStream::connect(connection.socket_address()).await?;
173    let (reader, writer) = stream.into_split();
174
175    Ok((reader, writer))
176}
177
/// Removes every leading ASCII-whitespace byte from `buffer` in place.
///
/// Bytes after the first non-whitespace byte are left untouched; a buffer
/// consisting only of whitespace ends up empty.
fn discard_leading_whitespace(buffer: &mut Vec<u8>) {
    // Index of the first byte to keep, or the full length if there is none.
    let first_kept = buffer
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(buffer.len());

    // Draining an empty range is a no-op, so no guard is needed.
    buffer.drain(..first_kept);
}