tpex_api/
lib.rs

1#![cfg(feature="client")]
2
3mod tests;
4mod shared;
5
6#[cfg(feature="server")]
7pub mod server;
8
9use futures::{StreamExt, TryStreamExt};
10use reqwest::StatusCode;
11use reqwest_websocket::{Message, RequestBuilderExt};
12pub use shared::*;
13use tpex::{AssetId, State, StateSync};
14
15pub use shared::Token;
16
17#[derive(Debug)]
18pub enum Error {
19    RequestFailure(reqwest::Error),
20    WebSocketFailure(reqwest_websocket::Error),
21    TPExFailure(ErrorInfo),
22    Unknown(Option<StatusCode>)
23}
24impl std::fmt::Display for Error {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        match self {
27            Error::RequestFailure(err) => write!(f, "Request failure: {err}"),
28            Error::WebSocketFailure(err) => write!(f, "WebSocket failure: {err}"),
29            Error::TPExFailure(err) => write!(f, "TPEx failure: {}", err.error),
30            Error::Unknown(Some(code)) => write!(f, "Unknown failure with status code {code}"),
31            Error::Unknown(None) => write!(f, "Unknown failutre"),
32        }
33    }
34}
35impl std::error::Error for Error {}
36impl From<reqwest::Error> for Error {
37    fn from(value: reqwest::Error) -> Self { Error::RequestFailure(value) }
38}
39impl From<reqwest_websocket::Error> for Error {
40    fn from(value: reqwest_websocket::Error) -> Self { Error::WebSocketFailure(value) }
41}
42impl From<ErrorInfo> for Error {
43    fn from(value: ErrorInfo) -> Self { Error::TPExFailure(value) }
44}
45impl From<tpex::Error> for Error {
46    fn from(value: tpex::Error) -> Self { Error::TPExFailure(ErrorInfo { error: value.to_string() }) }
47}
48
49
50pub type Result<T> = core::result::Result<T, Error>;
51
52pub struct Remote {
53    client: reqwest::Client,
54    endpoint: reqwest::Url
55}
56impl Remote {
57    pub fn new(endpoint: reqwest::Url, token: Token) -> Remote {
58        let mut headers = reqwest::header::HeaderMap::new();
59        headers.append(
60            "Authorization",
61            reqwest::header::HeaderValue::from_str(&format!("Bearer {token}")).expect("Unable to make token header"));
62        Remote {
63            client: reqwest::Client::builder().default_headers(headers).build().expect("Unable to build reqwest client"),
64            endpoint
65        }
66    }
67    async fn check_response(response: reqwest::Response) -> Result<reqwest::Response> {
68        let status = response.status();
69        if status.is_success() { Ok(response) }
70        else if let Ok(err) = response.json().await {
71            Err(Error::TPExFailure(err))
72        }
73        else {
74            Err(Error::Unknown(Some(status)))
75        }
76    }
77
78    pub async fn get_state(&self, from: u64) -> Result<Vec<u8>> {
79        let mut target = self.endpoint.clone();
80        target.query_pairs_mut().append_pair("from", &from.to_string());
81        target.path_segments_mut().expect("Unable to nav to /state").push("state");
82
83        Ok(Self::check_response(self.client.get(target).send().await?).await?.bytes().await?.to_vec())
84    }
85    pub async fn stream_state(&self, from: u64) -> Result<impl futures::Stream<Item=Result<tpex::WrappedAction>> + use<>> {
86        let mut target = self.endpoint.clone();
87        target.query_pairs_mut().append_pair("from", &from.to_string());
88        target.path_segments_mut().expect("Unable to nav to /state").push("state");
89
90        let ws = self.client.get(target)
91            .upgrade()
92            .send().await?
93            .into_websocket().await?;
94
95        Ok(ws.filter_map(|msg| async {
96            let ret: Option<Result<tpex::WrappedAction>> = match msg {
97                Ok(Message::Text(text)) => Some(serde_json::from_str(&text).map_err(|_| Error::Unknown(None))),
98                Ok(Message::Binary(binary)) => Some(serde_json::from_slice(&binary).map_err(|_| Error::Unknown(None))),
99                Err(e) => Some(Err(e.into())),
100                _ => None
101            };
102            ret
103        }))
104    }
105    pub async fn apply(&self, action: &tpex::Action) -> Result<u64> {
106        let mut target = self.endpoint.clone();
107        target.path_segments_mut().expect("Unable to nav to /state").push("state");
108
109        Ok(Self::check_response(self.client.patch(target).json(action).send().await?).await?.json().await?)
110    }
111    pub async fn get_token(&self, token: &Token) -> Result<TokenInfo> {
112        let mut target = self.endpoint.clone();
113        target.path_segments_mut().expect("Unable to nav to /token").push("token");
114
115        Ok(Self::check_response(self.client.post(target).json(token).send().await?).await?.json().await?)
116    }
117    pub async fn create_token(&self, args: &TokenPostArgs) -> Result<Token> {
118        let mut target = self.endpoint.clone();
119        target.path_segments_mut().expect("Unable to nav to /token").push("token");
120
121        Ok(Self::check_response(self.client.post(target).json(args).send().await?).await?.json().await?)
122    }
123    pub async fn delete_token(&self, args: &TokenDeleteArgs) -> Result<()> {
124        let mut target = self.endpoint.clone();
125        target.path_segments_mut().expect("Unable to nav to /token").push("token");
126
127        Ok(Self::check_response(self.client.delete(target).json(args).send().await?).await?.json().await?)
128    }
129    pub async fn fastsync(&self) -> Result<StateSync> {
130        let mut target = self.endpoint.clone();
131        target.path_segments_mut().expect("Unable to nav to /fastsync").push("fastsync");
132
133        Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
134    }
135    pub async fn stream_fastsync(&self) -> Result<impl futures::Stream<Item=Result<tpex::StateSync>>> {
136        let mut target = self.endpoint.clone();
137        target.path_segments_mut().expect("Unable to nav to /fastsync").push("fastsync");
138
139        let ws = self.client.get(target)
140            .upgrade()
141            .send().await?
142            .into_websocket().await?;
143
144        Ok(ws.filter_map(|msg| async {
145            let ret: Option<Result<tpex::StateSync>> = match msg {
146                Ok(Message::Text(text)) => Some(serde_json::from_str(&text).map_err(|_| Error::Unknown(None))),
147                Ok(Message::Binary(binary)) => Some(serde_json::from_slice(&binary).map_err(|_| Error::Unknown(None))),
148                Err(e) => Some(Err(e.into())),
149                _ => None
150            };
151            ret
152        }))
153    }
154    pub async fn get_balance(&self, player: &tpex::PlayerId) -> Result<tpex::Coins> {
155        let mut target = self.endpoint.clone();
156        target.path_segments_mut().expect("Unable to nav to /inspect/balance").push("inspect").push("balance");
157        target.query_pairs_mut().append_pair("player", player.get_raw_name());
158
159        Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
160    }
161    pub async fn get_assets(&self, player: &tpex::PlayerId) -> Result<std::collections::HashMap<AssetId, u64>> {
162        let mut target = self.endpoint.clone();
163        target.path_segments_mut().expect("Unable to nav to /inspect/assets").push("inspect").push("assets");
164        target.query_pairs_mut().append_pair("player", player.get_raw_name());
165
166        Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
167    }
168    pub async fn itemised_audit(&self) -> Result<tpex::ItemisedAudit> {
169        let mut target = self.endpoint.clone();
170        target.path_segments_mut().expect("Unable to nav to /inspect/audit").push("inspect").push("audit");
171
172        Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
173    }
174}
175
176pub struct Mirrored {
177    pub remote: Remote,
178    state: tokio::sync::RwLock<State>
179}
180impl Mirrored {
181    pub fn new(endpoint: reqwest::Url, token: Token) -> Mirrored {
182        Mirrored {
183            remote: Remote::new(endpoint, token),
184            state: tokio::sync::RwLock::new(State::new())
185        }
186    }
187    pub async fn fastsync(&'_ self) -> Result<tokio::sync::RwLockReadGuard<'_, State>> {
188        let new_state: State = self.remote.fastsync().await?.try_into()?;
189        let mut state = self.state.write().await;
190        *state = new_state;
191        Ok(state.downgrade())
192    }
193    pub async fn sync(&'_ self) -> Result<tokio::sync::RwLockReadGuard<'_, State>> {
194        let mut state = self.state.write().await;
195        let cursor = std::io::Cursor::new(self.remote.get_state(state.get_next_id() - 1).await?);
196        let mut buf = tokio::io::BufReader::new(cursor);
197        state.replay(&mut buf, true).await.expect("State unable to replay");
198        Ok(state.downgrade())
199    }
200    pub async fn apply(&self, action: tpex::Action) -> Result<u64> {
201        // The remote could be desynced, so we send our update
202        let id = self.remote.apply(&action).await?;
203        drop(self.sync().await);
204        Ok(id)
205    }
206    pub async fn stream(self: std::sync::Arc<Self>) -> Result<impl futures::Stream<Item=Result<(std::sync::Arc<Self>, tpex::WrappedAction)>>> {
207        let next_id = self.state.read().await.get_next_id();
208        let this: std::sync::Arc<Self> = self.clone();
209        let stream = self.remote.stream_state(next_id).await?;
210        Ok(stream.and_then(move |wrapped_action| { let this = this.clone(); async move  {
211            this.state.write().await.apply(wrapped_action.action.clone(), tokio::io::sink()).await?;
212            Ok((this, wrapped_action))
213        }}))
214    }
215}