1use futures_util::stream::Stream;
2use serde_json::Value;
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio::net::TcpStream;
5use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
6use tokio::time::{Duration, sleep};
7
8use crate::config::{
9 ClientOptions, ConnectionConfig, prepare_connection_config,
10};
11use crate::error::RlStatsError;
12use crate::events::{StatsEvent, parse_stats_event_value};
13use crate::filters::EventFilter;
14
15pub struct RocketLeagueStatsClient {
16 reader: OwnedReadHalf,
17 writer: OwnedWriteHalf,
18 read_buffer: Vec<u8>,
19 connection: ConnectionConfig,
20}
21
22impl RocketLeagueStatsClient {
23 pub async fn connect(options: ClientOptions) -> Result<Self, RlStatsError> {
24 let connection = prepare_connection_config(&options)?;
25 let (reader, writer) = connect_socket(&connection).await?;
26
27 Ok(Self {
28 reader,
29 writer,
30 read_buffer: Vec::new(),
31 connection,
32 })
33 }
34
35 pub fn connection(&self) -> &ConnectionConfig {
36 &self.connection
37 }
38
39 pub async fn connect_with_retry(
40 options: ClientOptions,
41 max_attempts: usize,
42 retry_delay: Duration,
43 ) -> Result<Self, RlStatsError> {
44 let attempts = max_attempts.max(1);
45
46 for attempt in 0..attempts {
47 match Self::connect(options.clone()).await {
48 Ok(client) => return Ok(client),
49 Err(error) => {
50 if attempt + 1 == attempts {
51 return Err(error);
52 }
53
54 sleep(retry_delay).await;
55 }
56 }
57 }
58
59 unreachable!("attempts is always >= 1")
60 }
61
62 pub async fn reconnect(&mut self) -> Result<(), RlStatsError> {
63 let (reader, writer) = connect_socket(&self.connection).await?;
64 self.reader = reader;
65 self.writer = writer;
66 self.read_buffer.clear();
67 Ok(())
68 }
69
70 pub async fn next_event(
71 &mut self,
72 ) -> Result<Option<StatsEvent>, RlStatsError> {
73 loop {
74 if let Some(event) = self.try_parse_event_from_buffer()? {
75 return Ok(Some(event));
76 }
77
78 let bytes_read =
79 self.reader.read_buf(&mut self.read_buffer).await?;
80 if bytes_read == 0 {
81 return self.try_parse_event_from_buffer();
82 }
83 }
84 }
85
86 pub async fn next_filtered_event(
87 &mut self,
88 filter: &EventFilter,
89 ) -> Result<Option<StatsEvent>, RlStatsError> {
90 loop {
91 match self.next_event().await? {
92 Some(event) => {
93 if filter.matches(&event) {
94 return Ok(Some(event));
95 }
96 }
97 None => return Ok(None),
98 }
99 }
100 }
101
102 fn try_parse_event_from_buffer(
103 &mut self,
104 ) -> Result<Option<StatsEvent>, RlStatsError> {
105 discard_leading_whitespace(&mut self.read_buffer);
106
107 if self.read_buffer.is_empty() {
108 return Ok(None);
109 }
110
111 let mut stream =
112 serde_json::Deserializer::from_slice(&self.read_buffer)
113 .into_iter::<Value>();
114
115 match stream.next() {
116 Some(Ok(value)) => {
117 let consumed = stream.byte_offset();
118 self.read_buffer.drain(0..consumed);
119 Ok(Some(parse_stats_event_value(value)?))
120 }
121 Some(Err(error)) if error.is_eof() => Ok(None),
122 Some(Err(error)) => Err(RlStatsError::Json(error)),
123 None => Ok(None),
124 }
125 }
126
127 pub fn into_event_stream(
128 self,
129 ) -> impl Stream<Item = Result<StatsEvent, RlStatsError>> {
130 futures_util::stream::unfold(self, |mut client| async move {
131 match client.next_event().await {
132 Ok(Some(event)) => Some((Ok(event), client)),
133 Ok(None) => None,
134 Err(error) => Some((Err(error), client)),
135 }
136 })
137 }
138
139 pub fn into_filtered_event_stream(
140 self,
141 filter: EventFilter,
142 ) -> impl Stream<Item = Result<StatsEvent, RlStatsError>> {
143 futures_util::stream::unfold(
144 (self, filter),
145 |(mut client, filter)| async move {
146 loop {
147 match client.next_event().await {
148 Ok(Some(event)) => {
149 if filter.matches(&event) {
150 return Some((Ok(event), (client, filter)));
151 }
152 }
153 Ok(None) => return None,
154 Err(error) => {
155 return Some((Err(error), (client, filter)));
156 }
157 }
158 }
159 },
160 )
161 }
162
163 pub async fn close(mut self) -> Result<(), RlStatsError> {
164 self.writer.shutdown().await?;
165 Ok(())
166 }
167}
168
169async fn connect_socket(
170 connection: &ConnectionConfig,
171) -> Result<(OwnedReadHalf, OwnedWriteHalf), RlStatsError> {
172 let stream = TcpStream::connect(connection.socket_address()).await?;
173 let (reader, writer) = stream.into_split();
174
175 Ok((reader, writer))
176}
177
178fn discard_leading_whitespace(buffer: &mut Vec<u8>) {
179 let to_drop = buffer
180 .iter()
181 .take_while(|byte| byte.is_ascii_whitespace())
182 .count();
183
184 if to_drop > 0 {
185 buffer.drain(0..to_drop);
186 }
187}