strecken_info/request/
revision.rs

1//! Revisions are like versions of disruptions. To get the disruptions you need a revision.
2//! ```no_run
3//! use strecken_info::revision::get_revision;
4//!
5//! #[tokio::main]
6//! async fn main() {
7//!     let revision: u32 = get_revision().await.unwrap();
8//! }
9//! ```
10//! If you want to wait for a new revision try this:
11//! ```no_run
12//! use strecken_info::revision::RevisionContext;
13//!
14//! #[tokio::main]
15//! async fn main() {
16//!     let mut ctx = RevisionContext::connect().await.unwrap();
17//!     let first_revision: u32 = ctx.get_first_revision().await.unwrap();
18//!     println!("First revision: {first_revision}");
19//!     loop {
20//!         let revision = ctx.wait_for_new_revision().await.unwrap();
21//!         println!("Got new revision: {revision}");
22//!     }
23//! }
24//! ```
25
26use std::{
27    io::ErrorKind,
28    time::{Duration, SystemTime},
29};
30
31use futures_util::{SinkExt, StreamExt};
32use serde::Deserialize;
33use tokio::net::TcpStream;
34use tokio_tungstenite::{
35    connect_async,
36    tungstenite::{error::ProtocolError, Error, Message},
37    MaybeTlsStream, WebSocketStream,
38};
39
40use crate::error::StreckenInfoError;
41
42const WEBSOCKET_PATH: &str = "wss://strecken-info.de/api/websocket";
43
44pub struct RevisionContext {
45    stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
46    old_revision: Option<u32>,
47}
48
49#[derive(Deserialize)]
50struct FirstRevisionJson {
51    revision: u32,
52}
53
54#[derive(Deserialize)]
55#[serde(tag = "type")]
56enum UpdateJson {
57    #[serde(alias = "NEW_REVISION")]
58    NewRevision {
59        revision: UpdateRevisionJson,
60    },
61    Other(()),
62}
63
64#[derive(Deserialize)]
65struct UpdateRevisionJson {
66    #[serde(alias = "nummer")]
67    number: u32,
68    #[serde(alias = "stoerungen")]
69    disruptions: Vec<serde_json::Value>,
70}
71
72/// returns `true` if `err` is
73/// - Error::ProtocolError::ResetWithoutClosingHandshake
74/// - Error::ConnectionClosed
75fn revision_error_should_retry(err: &Error) -> bool {
76    if let Error::Protocol(prtctl_err) = err {
77        return matches!(prtctl_err, ProtocolError::ResetWithoutClosingHandshake);
78    }
79
80    if let Error::Io(io_err) = err {
81        return io_err.kind() == ErrorKind::UnexpectedEof;
82    }
83
84    matches!(err, Error::ConnectionClosed)
85}
86
87impl RevisionContext {
88    pub async fn connect() -> Result<Self, StreckenInfoError> {
89        let (ws, _) = connect_async(WEBSOCKET_PATH).await?;
90        Ok(Self {
91            stream: ws,
92            old_revision: None,
93        })
94    }
95
96    /// close the open stream and reopen it (doing the handshake with `get_first_revision` is mandatory)
97    async fn reconnect(&mut self) -> Result<(), StreckenInfoError> {
98        // ignore close result (we want to force the close)
99        let _ = self.stream.close(None).await;
100        *self = Self::connect().await?;
101        Ok(())
102    }
103
104    pub async fn get_first_revision(&mut self) -> Result<u32, StreckenInfoError> {
105        self.stream
106            .send(Message::text("{\"type\":\"HANDSHAKE\",\"revision\":null}"))
107            .await?;
108        let msg = self
109            .stream
110            .next()
111            .await
112            .ok_or(StreckenInfoError::WebSocketNoRevisionError)??;
113        let json: FirstRevisionJson = serde_json::from_slice(msg.into_data().iter().as_slice())?;
114        self.old_revision = Some(json.revision);
115        Ok(json.revision)
116    }
117
118    /// return the new revision after timeout even if only_new_disruptions is true but no new disruption sent
119    pub async fn wait_for_new_revision_filtered_timeout(
120        &mut self,
121        only_new_disruptions: bool,
122        timeout: Option<Duration>,
123    ) -> Result<u32, StreckenInfoError> {
124        if self.old_revision.is_none() {
125            return self.get_first_revision().await;
126        }
127
128        let since = SystemTime::now();
129
130        // just one retry is allowed
131        let mut retry = true;
132
133        while let Some(msg) = self.stream.next().await {
134            if let Err(err) = msg {
135                if revision_error_should_retry(&err) && retry {
136                    let old_revision = self.old_revision;
137                    self.reconnect().await?;
138                    let revision = self.get_first_revision().await?;
139                    if old_revision != Some(revision) {
140                        return Ok(revision);
141                    }
142
143                    retry = false;
144                    continue;
145                } else {
146                    return Err(StreckenInfoError::WebsocketError(err));
147                }
148            }
149
150            let text = msg?.into_text()?;
151            let text = text.as_str();
152            retry = true;
153            // no json (e.g. a 'PING')
154            if !text.starts_with('{') {
155                continue;
156            }
157
158            let json: UpdateJson = serde_json::from_str(text)?;
159            if let UpdateJson::NewRevision { revision } = json {
160                self.old_revision = Some(revision.number);
161                if only_new_disruptions && revision.disruptions.is_empty() {
162                    if let Some(timeout) = timeout {
163                        // unwrap because since couldn't be later than now
164                        if since.elapsed().unwrap() >= timeout {
165                            return Ok(revision.number);
166                        }
167                    }
168                    continue;
169                }
170                return Ok(revision.number);
171            }
172        }
173        Err(StreckenInfoError::WebSocketNoRevisionError)
174    }
175
176    pub async fn wait_for_new_revision_filtered(
177        &mut self,
178        only_new_disruptions: bool,
179    ) -> Result<u32, StreckenInfoError> {
180        self.wait_for_new_revision_filtered_timeout(only_new_disruptions, None)
181            .await
182    }
183
184    pub async fn wait_for_new_revision(&mut self) -> Result<u32, StreckenInfoError> {
185        self.wait_for_new_revision_filtered(false).await
186    }
187}
188
189pub async fn get_revision() -> Result<u32, StreckenInfoError> {
190    let mut ctx = RevisionContext::connect().await?;
191    ctx.get_first_revision().await
192}