// strecken_info/request/revision.rs
use std::{
27 io::ErrorKind,
28 time::{Duration, SystemTime},
29};
30
31use futures_util::{SinkExt, StreamExt};
32use serde::Deserialize;
33use tokio::net::TcpStream;
34use tokio_tungstenite::{
35 connect_async,
36 tungstenite::{error::ProtocolError, Error, Message},
37 MaybeTlsStream, WebSocketStream,
38};
39
40use crate::error::StreckenInfoError;
41
42const WEBSOCKET_PATH: &str = "wss://strecken-info.de/api/websocket";
43
44pub struct RevisionContext {
45 stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
46 old_revision: Option<u32>,
47}
48
49#[derive(Deserialize)]
50struct FirstRevisionJson {
51 revision: u32,
52}
53
54#[derive(Deserialize)]
55#[serde(tag = "type")]
56enum UpdateJson {
57 #[serde(alias = "NEW_REVISION")]
58 NewRevision {
59 revision: UpdateRevisionJson,
60 },
61 Other(()),
62}
63
64#[derive(Deserialize)]
65struct UpdateRevisionJson {
66 #[serde(alias = "nummer")]
67 number: u32,
68 #[serde(alias = "stoerungen")]
69 disruptions: Vec<serde_json::Value>,
70}
71
72fn revision_error_should_retry(err: &Error) -> bool {
76 if let Error::Protocol(prtctl_err) = err {
77 return matches!(prtctl_err, ProtocolError::ResetWithoutClosingHandshake);
78 }
79
80 if let Error::Io(io_err) = err {
81 return io_err.kind() == ErrorKind::UnexpectedEof;
82 }
83
84 matches!(err, Error::ConnectionClosed)
85}
86
87impl RevisionContext {
88 pub async fn connect() -> Result<Self, StreckenInfoError> {
89 let (ws, _) = connect_async(WEBSOCKET_PATH).await?;
90 Ok(Self {
91 stream: ws,
92 old_revision: None,
93 })
94 }
95
96 async fn reconnect(&mut self) -> Result<(), StreckenInfoError> {
98 let _ = self.stream.close(None).await;
100 *self = Self::connect().await?;
101 Ok(())
102 }
103
104 pub async fn get_first_revision(&mut self) -> Result<u32, StreckenInfoError> {
105 self.stream
106 .send(Message::text("{\"type\":\"HANDSHAKE\",\"revision\":null}"))
107 .await?;
108 let msg = self
109 .stream
110 .next()
111 .await
112 .ok_or(StreckenInfoError::WebSocketNoRevisionError)??;
113 let json: FirstRevisionJson = serde_json::from_slice(msg.into_data().iter().as_slice())?;
114 self.old_revision = Some(json.revision);
115 Ok(json.revision)
116 }
117
118 pub async fn wait_for_new_revision_filtered_timeout(
120 &mut self,
121 only_new_disruptions: bool,
122 timeout: Option<Duration>,
123 ) -> Result<u32, StreckenInfoError> {
124 if self.old_revision.is_none() {
125 return self.get_first_revision().await;
126 }
127
128 let since = SystemTime::now();
129
130 let mut retry = true;
132
133 while let Some(msg) = self.stream.next().await {
134 if let Err(err) = msg {
135 if revision_error_should_retry(&err) && retry {
136 let old_revision = self.old_revision;
137 self.reconnect().await?;
138 let revision = self.get_first_revision().await?;
139 if old_revision != Some(revision) {
140 return Ok(revision);
141 }
142
143 retry = false;
144 continue;
145 } else {
146 return Err(StreckenInfoError::WebsocketError(err));
147 }
148 }
149
150 let text = msg?.into_text()?;
151 let text = text.as_str();
152 retry = true;
153 if !text.starts_with('{') {
155 continue;
156 }
157
158 let json: UpdateJson = serde_json::from_str(text)?;
159 if let UpdateJson::NewRevision { revision } = json {
160 self.old_revision = Some(revision.number);
161 if only_new_disruptions && revision.disruptions.is_empty() {
162 if let Some(timeout) = timeout {
163 if since.elapsed().unwrap() >= timeout {
165 return Ok(revision.number);
166 }
167 }
168 continue;
169 }
170 return Ok(revision.number);
171 }
172 }
173 Err(StreckenInfoError::WebSocketNoRevisionError)
174 }
175
176 pub async fn wait_for_new_revision_filtered(
177 &mut self,
178 only_new_disruptions: bool,
179 ) -> Result<u32, StreckenInfoError> {
180 self.wait_for_new_revision_filtered_timeout(only_new_disruptions, None)
181 .await
182 }
183
184 pub async fn wait_for_new_revision(&mut self) -> Result<u32, StreckenInfoError> {
185 self.wait_for_new_revision_filtered(false).await
186 }
187}
188
189pub async fn get_revision() -> Result<u32, StreckenInfoError> {
190 let mut ctx = RevisionContext::connect().await?;
191 ctx.get_first_revision().await
192}