1#![cfg(feature="client")]
2
3mod tests;
4mod shared;
5
6#[cfg(feature="server")]
7pub mod server;
8
9use std::sync::Arc;
10
11use futures::{SinkExt, StreamExt, TryStreamExt};
12use reqwest::StatusCode;
13use reqwest_websocket::{Message, RequestBuilderExt};
14pub use shared::*;
15use tpex::{AssetId, State, StateSync};
16
17pub use shared::Token;
18
19#[derive(Debug)]
20pub enum Error {
21 RequestFailure(reqwest::Error),
22 WebSocketFailure(reqwest_websocket::Error),
23 TPExFailure(ErrorInfo),
24 Unknown(Option<StatusCode>),
25}
26impl std::fmt::Display for Error {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 match self {
29 Error::RequestFailure(err) => write!(f, "Request failure: {err}"),
30 Error::WebSocketFailure(err) => write!(f, "WebSocket failure: {err}"),
31 Error::TPExFailure(err) => write!(f, "TPEx failure: {}", err.error),
32 Error::Unknown(Some(code)) => write!(f, "Unknown failure with status code {code}"),
33 Error::Unknown(None) => write!(f, "Unknown failutre"),
34 }
35 }
36}
37impl std::error::Error for Error {}
38impl From<reqwest::Error> for Error {
39 fn from(value: reqwest::Error) -> Self { Error::RequestFailure(value) }
40}
41impl From<reqwest_websocket::Error> for Error {
42 fn from(value: reqwest_websocket::Error) -> Self { Error::WebSocketFailure(value) }
43}
44impl From<ErrorInfo> for Error {
45 fn from(value: ErrorInfo) -> Self { Error::TPExFailure(value) }
46}
47impl From<tpex::Error> for Error {
48 fn from(value: tpex::Error) -> Self { Error::TPExFailure(ErrorInfo { error: value.to_string() }) }
49}
50
51
52pub type Result<T> = core::result::Result<T, Error>;
53
54pub struct Remote {
55 client: reqwest::Client,
56 endpoint: reqwest::Url
57}
58impl Remote {
59 pub fn new(endpoint: reqwest::Url, token: Token) -> Remote {
60 let mut headers = reqwest::header::HeaderMap::new();
61 headers.append(
62 "Authorization",
63 reqwest::header::HeaderValue::from_str(&format!("Bearer {token}")).expect("Unable to make token header"));
64 Remote {
65 client: reqwest::Client::builder().default_headers(headers).build().expect("Unable to build reqwest client"),
66 endpoint
67 }
68 }
69 async fn check_response(response: reqwest::Response) -> Result<reqwest::Response> {
70 let status = response.status();
71 if status.is_success() { Ok(response) }
72 else if let Ok(err) = response.json().await {
73 Err(Error::TPExFailure(err))
74 }
75 else {
76 Err(Error::Unknown(Some(status)))
77 }
78 }
79
80 pub async fn get_state(&self, from: u64) -> Result<Vec<u8>> {
81 let mut target = self.endpoint.clone();
82 target.query_pairs_mut().append_pair("from", &from.to_string());
83 target.path_segments_mut().expect("Unable to nav to /state").push("state");
84
85 Ok(Self::check_response(self.client.get(target).send().await?).await?.bytes().await?.to_vec())
86 }
87 pub async fn stream_state(&self, from: u64) -> Result<impl futures::Stream<Item=Result<tpex::WrappedAction>> + use<>> {
88 let mut target = self.endpoint.clone();
89 target.query_pairs_mut().append_pair("from", &from.to_string());
90 target.path_segments_mut().expect("Unable to nav to /state").push("state");
91
92 let (sink, stream) = self.client.get(target)
93 .upgrade()
94 .send().await?
95 .into_websocket().await?
96 .split();
97 let sink = Arc::new(tokio::sync::Mutex::new(sink));
98
99 Ok(stream.filter_map(move |msg| { let sink = sink.clone(); async move {
100 match msg {
101 Ok(Message::Text(text)) => Some(serde_json::from_str(&text).map_err(|_| Error::Unknown(None))),
102 Ok(Message::Binary(binary)) => Some(serde_json::from_slice(&binary).map_err(|_| Error::Unknown(None))),
103 Ok(Message::Ping(payload)) => {
104 let _ = sink.lock().await.send(Message::Pong(payload)).await;
105 None
106 }
107 Err(e) => Some(Err(e.into())),
108 _ => None
109 }
110 }}))
111 }
112 pub async fn apply(&self, action: &tpex::Action) -> Result<u64> {
113 let mut target = self.endpoint.clone();
114 target.path_segments_mut().expect("Unable to nav to /state").push("state");
115
116 Ok(Self::check_response(self.client.patch(target).json(action).send().await?).await?.json().await?)
117 }
118 pub async fn get_token(&self) -> Result<TokenInfo> {
119 let mut target = self.endpoint.clone();
120 target.path_segments_mut().expect("Unable to nav to /token").push("token");
121
122 Ok(Self::check_response(self.client.post(target).send().await?).await?.json().await?)
123 }
124 pub async fn create_token(&self, args: &TokenPostArgs) -> Result<Token> {
125 let mut target = self.endpoint.clone();
126 target.path_segments_mut().expect("Unable to nav to /token").push("token");
127
128 Ok(Self::check_response(self.client.post(target).json(args).send().await?).await?.json().await?)
129 }
130 pub async fn delete_token(&self, args: &TokenDeleteArgs) -> Result<()> {
131 let mut target = self.endpoint.clone();
132 target.path_segments_mut().expect("Unable to nav to /token").push("token");
133
134 Ok(Self::check_response(self.client.delete(target).json(args).send().await?).await?.json().await?)
135 }
136 pub async fn fastsync(&self) -> Result<StateSync> {
137 let mut target = self.endpoint.clone();
138 target.path_segments_mut().expect("Unable to nav to /fastsync").push("fastsync");
139
140 Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
141 }
142 pub async fn stream_fastsync(&self) -> Result<impl futures::Stream<Item=Result<tpex::StateSync>>> {
143 let mut target = self.endpoint.clone();
144 target.path_segments_mut().expect("Unable to nav to /fastsync").push("fastsync");
145
146 let (sink, stream) = self.client.get(target)
147 .upgrade()
148 .send().await?
149 .into_websocket().await?
150 .split();
151 let sink = Arc::new(tokio::sync::Mutex::new(sink));
152
153 Ok(stream.filter_map(move |msg| { let sink = sink.clone(); async move {
154 match msg {
155 Ok(Message::Text(text)) => Some(serde_json::from_str(&text).map_err(|_| Error::Unknown(None))),
156 Ok(Message::Binary(binary)) => Some(serde_json::from_slice(&binary).map_err(|_| Error::Unknown(None))),
157 Ok(Message::Ping(payload)) => {
158 let _ = sink.lock().await.send(Message::Pong(payload)).await;
159 None
160 }
161 Err(e) => Some(Err(e.into())),
162 _ => None
163 }
164 }}))
165 }
166 pub async fn get_balance(&self, player: &tpex::PlayerId) -> Result<tpex::Coins> {
167 let mut target = self.endpoint.clone();
168 target.path_segments_mut().expect("Unable to nav to /inspect/balance").push("inspect").push("balance");
169 target.query_pairs_mut().append_pair("player", player.get_raw_name());
170
171 Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
172 }
173 pub async fn get_assets(&self, player: &tpex::PlayerId) -> Result<std::collections::HashMap<AssetId, u64>> {
174 let mut target = self.endpoint.clone();
175 target.path_segments_mut().expect("Unable to nav to /inspect/assets").push("inspect").push("assets");
176 target.query_pairs_mut().append_pair("player", player.get_raw_name());
177
178 Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
179 }
180 pub async fn itemised_audit(&self) -> Result<tpex::ItemisedAudit> {
181 let mut target = self.endpoint.clone();
182 target.path_segments_mut().expect("Unable to nav to /inspect/audit").push("inspect").push("audit");
183
184 Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
185 }
186 pub async fn price_history(&self, asset: &AssetId) -> Result<Vec<PriceSummary>> {
187 let mut target = self.endpoint.clone();
188 target.path_segments_mut().expect("Unable to nav to /price/history").push("price").push("history");
189 target.query_pairs_mut().append_pair("asset", asset);
190
191 Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
192 }
193}
194
195pub struct Mirrored {
196 pub remote: Remote,
197 state: tokio::sync::RwLock<State>
198}
199impl Mirrored {
200 pub fn new(endpoint: reqwest::Url, token: Token) -> Mirrored {
201 Mirrored {
202 remote: Remote::new(endpoint, token),
203 state: tokio::sync::RwLock::new(State::new())
204 }
205 }
206 pub async fn fastsync(&'_ self) -> Result<tokio::sync::RwLockReadGuard<'_, State>> {
207 let new_state: State = self.remote.fastsync().await?.try_into()?;
208 let mut state = self.state.write().await;
209 *state = new_state;
210 Ok(state.downgrade())
211 }
212 pub async fn sync(&'_ self) -> Result<tokio::sync::RwLockReadGuard<'_, State>> {
213 let mut state = self.state.write().await;
214 let cursor = std::io::Cursor::new(self.remote.get_state(state.get_next_id() - 1).await?);
215 let mut buf = tokio::io::BufReader::new(cursor);
216 state.replay(&mut buf, true).await.expect("State unable to replay");
217 Ok(state.downgrade())
218 }
219 pub async fn unsynced(&'_ self) -> tokio::sync::RwLockReadGuard<'_, State> {
220 self.state.read().await
221 }
222 pub async fn apply(&self, action: tpex::Action) -> Result<u64> {
223 let id = self.remote.apply(&action).await?;
225 drop(self.sync().await);
226 Ok(id)
227 }
228 pub async fn stream(self: std::sync::Arc<Self>) -> Result<impl futures::Stream<Item=Result<(std::sync::Arc<Self>, tpex::WrappedAction)>>> {
229 let next_id = self.state.read().await.get_next_id();
230 let this: std::sync::Arc<Self> = self.clone();
231 let stream = self.remote.stream_state(next_id).await?;
232 Ok(stream.and_then(move |wrapped_action| { let this = this.clone(); async move {
233 let mut state = this.state.write().await;
234 if state.get_next_id() != wrapped_action.id {
235 return Err(tpex::Error::InvalidId { id: wrapped_action.id }.into());
236 }
237 state.apply_wrapped(wrapped_action.clone(), tokio::io::sink()).await?;
238 drop(state);
239 Ok((this, wrapped_action))
240 }}))
241 }
242}