tpex_api/
lib.rs

1#![cfg(feature="client")]
2
3mod tests;
4mod shared;
5
6#[cfg(feature="server")]
7pub mod server;
8
9use std::sync::Arc;
10
11use futures::{SinkExt, StreamExt, TryStreamExt};
12use reqwest::StatusCode;
13use reqwest_websocket::{Message, RequestBuilderExt};
14pub use shared::*;
15use tpex::{AssetId, State, StateSync};
16
17pub use shared::Token;
18
19#[derive(Debug)]
20pub enum Error {
21    RequestFailure(reqwest::Error),
22    WebSocketFailure(reqwest_websocket::Error),
23    TPExFailure(ErrorInfo),
24    Unknown(Option<StatusCode>),
25}
26impl std::fmt::Display for Error {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        match self {
29            Error::RequestFailure(err) => write!(f, "Request failure: {err}"),
30            Error::WebSocketFailure(err) => write!(f, "WebSocket failure: {err}"),
31            Error::TPExFailure(err) => write!(f, "TPEx failure: {}", err.error),
32            Error::Unknown(Some(code)) => write!(f, "Unknown failure with status code {code}"),
33            Error::Unknown(None) => write!(f, "Unknown failutre"),
34        }
35    }
36}
37impl std::error::Error for Error {}
38impl From<reqwest::Error> for Error {
39    fn from(value: reqwest::Error) -> Self { Error::RequestFailure(value) }
40}
41impl From<reqwest_websocket::Error> for Error {
42    fn from(value: reqwest_websocket::Error) -> Self { Error::WebSocketFailure(value) }
43}
44impl From<ErrorInfo> for Error {
45    fn from(value: ErrorInfo) -> Self { Error::TPExFailure(value) }
46}
47impl From<tpex::Error> for Error {
48    fn from(value: tpex::Error) -> Self { Error::TPExFailure(ErrorInfo { error: value.to_string() }) }
49}
50
51
52pub type Result<T> = core::result::Result<T, Error>;
53
54pub struct Remote {
55    client: reqwest::Client,
56    endpoint: reqwest::Url
57}
58impl Remote {
59    pub fn new(endpoint: reqwest::Url, token: Token) -> Remote {
60        let mut headers = reqwest::header::HeaderMap::new();
61        headers.append(
62            "Authorization",
63            reqwest::header::HeaderValue::from_str(&format!("Bearer {token}")).expect("Unable to make token header"));
64        Remote {
65            client: reqwest::Client::builder().default_headers(headers).build().expect("Unable to build reqwest client"),
66            endpoint
67        }
68    }
69    async fn check_response(response: reqwest::Response) -> Result<reqwest::Response> {
70        let status = response.status();
71        if status.is_success() { Ok(response) }
72        else if let Ok(err) = response.json().await {
73            Err(Error::TPExFailure(err))
74        }
75        else {
76            Err(Error::Unknown(Some(status)))
77        }
78    }
79
80    pub async fn get_state(&self, from: u64) -> Result<Vec<u8>> {
81        let mut target = self.endpoint.clone();
82        target.query_pairs_mut().append_pair("from", &from.to_string());
83        target.path_segments_mut().expect("Unable to nav to /state").push("state");
84
85        Ok(Self::check_response(self.client.get(target).send().await?).await?.bytes().await?.to_vec())
86    }
87    pub async fn stream_state(&self, from: u64) -> Result<impl futures::Stream<Item=Result<tpex::WrappedAction>> + use<>> {
88        let mut target = self.endpoint.clone();
89        target.query_pairs_mut().append_pair("from", &from.to_string());
90        target.path_segments_mut().expect("Unable to nav to /state").push("state");
91
92        let (sink, stream) = self.client.get(target)
93            .upgrade()
94            .send().await?
95            .into_websocket().await?
96            .split();
97        let sink = Arc::new(tokio::sync::Mutex::new(sink));
98
99        Ok(stream.filter_map(move |msg| { let sink = sink.clone(); async move {
100            match msg {
101                Ok(Message::Text(text)) => Some(serde_json::from_str(&text).map_err(|_| Error::Unknown(None))),
102                Ok(Message::Binary(binary)) => Some(serde_json::from_slice(&binary).map_err(|_| Error::Unknown(None))),
103                Ok(Message::Ping(payload)) => {
104                    let _ = sink.lock().await.send(Message::Pong(payload)).await;
105                    None
106                }
107                Err(e) => Some(Err(e.into())),
108                _ => None
109            }
110        }}))
111    }
112    pub async fn apply(&self, action: &tpex::Action) -> Result<u64> {
113        let mut target = self.endpoint.clone();
114        target.path_segments_mut().expect("Unable to nav to /state").push("state");
115
116        Ok(Self::check_response(self.client.patch(target).json(action).send().await?).await?.json().await?)
117    }
118    pub async fn get_token(&self) -> Result<TokenInfo> {
119        let mut target = self.endpoint.clone();
120        target.path_segments_mut().expect("Unable to nav to /token").push("token");
121
122        Ok(Self::check_response(self.client.post(target).send().await?).await?.json().await?)
123    }
124    pub async fn create_token(&self, args: &TokenPostArgs) -> Result<Token> {
125        let mut target = self.endpoint.clone();
126        target.path_segments_mut().expect("Unable to nav to /token").push("token");
127
128        Ok(Self::check_response(self.client.post(target).json(args).send().await?).await?.json().await?)
129    }
130    pub async fn delete_token(&self, args: &TokenDeleteArgs) -> Result<()> {
131        let mut target = self.endpoint.clone();
132        target.path_segments_mut().expect("Unable to nav to /token").push("token");
133
134        Ok(Self::check_response(self.client.delete(target).json(args).send().await?).await?.json().await?)
135    }
136    pub async fn fastsync(&self) -> Result<StateSync> {
137        let mut target = self.endpoint.clone();
138        target.path_segments_mut().expect("Unable to nav to /fastsync").push("fastsync");
139
140        Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
141    }
142    pub async fn stream_fastsync(&self) -> Result<impl futures::Stream<Item=Result<tpex::StateSync>>> {
143        let mut target = self.endpoint.clone();
144        target.path_segments_mut().expect("Unable to nav to /fastsync").push("fastsync");
145
146        let (sink, stream) = self.client.get(target)
147            .upgrade()
148            .send().await?
149            .into_websocket().await?
150            .split();
151        let sink = Arc::new(tokio::sync::Mutex::new(sink));
152
153        Ok(stream.filter_map(move |msg| { let sink = sink.clone(); async move {
154            match msg {
155                Ok(Message::Text(text)) => Some(serde_json::from_str(&text).map_err(|_| Error::Unknown(None))),
156                Ok(Message::Binary(binary)) => Some(serde_json::from_slice(&binary).map_err(|_| Error::Unknown(None))),
157                Ok(Message::Ping(payload)) => {
158                    let _ = sink.lock().await.send(Message::Pong(payload)).await;
159                    None
160                }
161                Err(e) => Some(Err(e.into())),
162                _ => None
163            }
164        }}))
165    }
166    pub async fn get_balance(&self, player: &tpex::PlayerId) -> Result<tpex::Coins> {
167        let mut target = self.endpoint.clone();
168        target.path_segments_mut().expect("Unable to nav to /inspect/balance").push("inspect").push("balance");
169        target.query_pairs_mut().append_pair("player", player.get_raw_name());
170
171        Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
172    }
173    pub async fn get_assets(&self, player: &tpex::PlayerId) -> Result<std::collections::HashMap<AssetId, u64>> {
174        let mut target = self.endpoint.clone();
175        target.path_segments_mut().expect("Unable to nav to /inspect/assets").push("inspect").push("assets");
176        target.query_pairs_mut().append_pair("player", player.get_raw_name());
177
178        Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
179    }
180    pub async fn itemised_audit(&self) -> Result<tpex::ItemisedAudit> {
181        let mut target = self.endpoint.clone();
182        target.path_segments_mut().expect("Unable to nav to /inspect/audit").push("inspect").push("audit");
183
184        Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
185    }
186    pub async fn price_history(&self, asset: &AssetId) -> Result<Vec<PriceSummary>> {
187        let mut target = self.endpoint.clone();
188        target.path_segments_mut().expect("Unable to nav to /price/history").push("price").push("history");
189        target.query_pairs_mut().append_pair("asset", asset);
190
191        Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
192    }
193}
194
195pub struct Mirrored {
196    pub remote: Remote,
197    state: tokio::sync::RwLock<State>
198}
199impl Mirrored {
200    pub fn new(endpoint: reqwest::Url, token: Token) -> Mirrored {
201        Mirrored {
202            remote: Remote::new(endpoint, token),
203            state: tokio::sync::RwLock::new(State::new())
204        }
205    }
206    pub async fn fastsync(&'_ self) -> Result<tokio::sync::RwLockReadGuard<'_, State>> {
207        let new_state: State = self.remote.fastsync().await?.try_into()?;
208        let mut state = self.state.write().await;
209        *state = new_state;
210        Ok(state.downgrade())
211    }
212    pub async fn sync(&'_ self) -> Result<tokio::sync::RwLockReadGuard<'_, State>> {
213        let mut state = self.state.write().await;
214        let cursor = std::io::Cursor::new(self.remote.get_state(state.get_next_id() - 1).await?);
215        let mut buf = tokio::io::BufReader::new(cursor);
216        state.replay(&mut buf, true).await.expect("State unable to replay");
217        Ok(state.downgrade())
218    }
219    pub async fn unsynced(&'_ self) -> tokio::sync::RwLockReadGuard<'_, State> {
220        self.state.read().await
221    }
222    pub async fn apply(&self, action: tpex::Action) -> Result<u64> {
223        // The remote could be desynced, so we send our update
224        let id = self.remote.apply(&action).await?;
225        drop(self.sync().await);
226        Ok(id)
227    }
228    pub async fn stream(self: std::sync::Arc<Self>) -> Result<impl futures::Stream<Item=Result<(std::sync::Arc<Self>, tpex::WrappedAction)>>> {
229        let next_id = self.state.read().await.get_next_id();
230        let this: std::sync::Arc<Self> = self.clone();
231        let stream = self.remote.stream_state(next_id).await?;
232        Ok(stream.and_then(move |wrapped_action| { let this = this.clone(); async move  {
233            let mut state = this.state.write().await;
234            if state.get_next_id() != wrapped_action.id {
235                return Err(tpex::Error::InvalidId { id: wrapped_action.id }.into());
236            }
237            state.apply_wrapped(wrapped_action.clone(), tokio::io::sink()).await?;
238            drop(state);
239            Ok((this, wrapped_action))
240        }}))
241    }
242}