1#![cfg(feature="client")]
2
3mod tests;
4mod shared;
5
6#[cfg(feature="server")]
7pub mod server;
8
9use futures::{StreamExt, TryStreamExt};
10use reqwest::StatusCode;
11use reqwest_websocket::{Message, RequestBuilderExt};
12pub use shared::*;
13use tpex::{AssetId, State, StateSync};
14
15pub use shared::Token;
16
/// Failures that can be reported by this client.
#[derive(Debug)]
pub enum Error {
    /// An HTTP-level failure; wraps the underlying `reqwest` error.
    RequestFailure(reqwest::Error),
    /// A WebSocket-level failure; wraps the underlying `reqwest_websocket` error.
    WebSocketFailure(reqwest_websocket::Error),
    /// A TPEx-level failure described by an `ErrorInfo` payload.
    TPExFailure(ErrorInfo),
    /// A failure with no further detail, optionally tagged with the HTTP status code.
    Unknown(Option<StatusCode>)
}
24impl std::fmt::Display for Error {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 match self {
27 Error::RequestFailure(err) => write!(f, "Request failure: {err}"),
28 Error::WebSocketFailure(err) => write!(f, "WebSocket failure: {err}"),
29 Error::TPExFailure(err) => write!(f, "TPEx failure: {}", err.error),
30 Error::Unknown(Some(code)) => write!(f, "Unknown failure with status code {code}"),
31 Error::Unknown(None) => write!(f, "Unknown failutre"),
32 }
33 }
34}
// Marker impl only: `source()` keeps its default because the wrapped errors
// are already rendered by the `Display` impl.
impl std::error::Error for Error {}
36impl From<reqwest::Error> for Error {
37 fn from(value: reqwest::Error) -> Self { Error::RequestFailure(value) }
38}
39impl From<reqwest_websocket::Error> for Error {
40 fn from(value: reqwest_websocket::Error) -> Self { Error::WebSocketFailure(value) }
41}
42impl From<ErrorInfo> for Error {
43 fn from(value: ErrorInfo) -> Self { Error::TPExFailure(value) }
44}
45impl From<tpex::Error> for Error {
46 fn from(value: tpex::Error) -> Self { Error::TPExFailure(ErrorInfo { error: value.to_string() }) }
47}
48
49
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = core::result::Result<T, Error>;
51
/// Handle to a remote endpoint: an HTTP client plus the base URL that requests are built from.
pub struct Remote {
    /// Client used for every request; `Remote::new` installs the `Authorization` header as a default.
    client: reqwest::Client,
    /// Base URL; each method clones it and appends its own path segments.
    endpoint: reqwest::Url
}
56impl Remote {
57 pub fn new(endpoint: reqwest::Url, token: Token) -> Remote {
58 let mut headers = reqwest::header::HeaderMap::new();
59 headers.append(
60 "Authorization",
61 reqwest::header::HeaderValue::from_str(&format!("Bearer {token}")).expect("Unable to make token header"));
62 Remote {
63 client: reqwest::Client::builder().default_headers(headers).build().expect("Unable to build reqwest client"),
64 endpoint
65 }
66 }
67 async fn check_response(response: reqwest::Response) -> Result<reqwest::Response> {
68 let status = response.status();
69 if status.is_success() { Ok(response) }
70 else if let Ok(err) = response.json().await {
71 Err(Error::TPExFailure(err))
72 }
73 else {
74 Err(Error::Unknown(Some(status)))
75 }
76 }
77
78 pub async fn get_state(&self, from: u64) -> Result<Vec<u8>> {
79 let mut target = self.endpoint.clone();
80 target.query_pairs_mut().append_pair("from", &from.to_string());
81 target.path_segments_mut().expect("Unable to nav to /state").push("state");
82
83 Ok(Self::check_response(self.client.get(target).send().await?).await?.bytes().await?.to_vec())
84 }
85 pub async fn stream_state(&self, from: u64) -> Result<impl futures::Stream<Item=Result<tpex::WrappedAction>> + use<>> {
86 let mut target = self.endpoint.clone();
87 target.query_pairs_mut().append_pair("from", &from.to_string());
88 target.path_segments_mut().expect("Unable to nav to /state").push("state");
89
90 let ws = self.client.get(target)
91 .upgrade()
92 .send().await?
93 .into_websocket().await?;
94
95 Ok(ws.filter_map(|msg| async {
96 let ret: Option<Result<tpex::WrappedAction>> = match msg {
97 Ok(Message::Text(text)) => Some(serde_json::from_str(&text).map_err(|_| Error::Unknown(None))),
98 Ok(Message::Binary(binary)) => Some(serde_json::from_slice(&binary).map_err(|_| Error::Unknown(None))),
99 Err(e) => Some(Err(e.into())),
100 _ => None
101 };
102 ret
103 }))
104 }
105 pub async fn apply(&self, action: &tpex::Action) -> Result<u64> {
106 let mut target = self.endpoint.clone();
107 target.path_segments_mut().expect("Unable to nav to /state").push("state");
108
109 Ok(Self::check_response(self.client.patch(target).json(action).send().await?).await?.json().await?)
110 }
111 pub async fn get_token(&self, token: &Token) -> Result<TokenInfo> {
112 let mut target = self.endpoint.clone();
113 target.path_segments_mut().expect("Unable to nav to /token").push("token");
114
115 Ok(Self::check_response(self.client.post(target).json(token).send().await?).await?.json().await?)
116 }
117 pub async fn create_token(&self, args: &TokenPostArgs) -> Result<Token> {
118 let mut target = self.endpoint.clone();
119 target.path_segments_mut().expect("Unable to nav to /token").push("token");
120
121 Ok(Self::check_response(self.client.post(target).json(args).send().await?).await?.json().await?)
122 }
123 pub async fn delete_token(&self, args: &TokenDeleteArgs) -> Result<()> {
124 let mut target = self.endpoint.clone();
125 target.path_segments_mut().expect("Unable to nav to /token").push("token");
126
127 Ok(Self::check_response(self.client.delete(target).json(args).send().await?).await?.json().await?)
128 }
129 pub async fn fastsync(&self) -> Result<StateSync> {
130 let mut target = self.endpoint.clone();
131 target.path_segments_mut().expect("Unable to nav to /fastsync").push("fastsync");
132
133 Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
134 }
135 pub async fn stream_fastsync(&self) -> Result<impl futures::Stream<Item=Result<tpex::StateSync>>> {
136 let mut target = self.endpoint.clone();
137 target.path_segments_mut().expect("Unable to nav to /fastsync").push("fastsync");
138
139 let ws = self.client.get(target)
140 .upgrade()
141 .send().await?
142 .into_websocket().await?;
143
144 Ok(ws.filter_map(|msg| async {
145 let ret: Option<Result<tpex::StateSync>> = match msg {
146 Ok(Message::Text(text)) => Some(serde_json::from_str(&text).map_err(|_| Error::Unknown(None))),
147 Ok(Message::Binary(binary)) => Some(serde_json::from_slice(&binary).map_err(|_| Error::Unknown(None))),
148 Err(e) => Some(Err(e.into())),
149 _ => None
150 };
151 ret
152 }))
153 }
154 pub async fn get_balance(&self, player: &tpex::PlayerId) -> Result<tpex::Coins> {
155 let mut target = self.endpoint.clone();
156 target.path_segments_mut().expect("Unable to nav to /inspect/balance").push("inspect").push("balance");
157 target.query_pairs_mut().append_pair("player", player.get_raw_name());
158
159 Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
160 }
161 pub async fn get_assets(&self, player: &tpex::PlayerId) -> Result<std::collections::HashMap<AssetId, u64>> {
162 let mut target = self.endpoint.clone();
163 target.path_segments_mut().expect("Unable to nav to /inspect/assets").push("inspect").push("assets");
164 target.query_pairs_mut().append_pair("player", player.get_raw_name());
165
166 Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
167 }
168 pub async fn itemised_audit(&self) -> Result<tpex::ItemisedAudit> {
169 let mut target = self.endpoint.clone();
170 target.path_segments_mut().expect("Unable to nav to /inspect/audit").push("inspect").push("audit");
171
172 Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
173 }
174}
175
/// A `Remote` paired with a local `State` guarded by an async read-write lock.
pub struct Mirrored {
    /// The remote this mirror sends requests to.
    pub remote: Remote,
    /// Local state; replaced by `fastsync` and advanced by `sync` and `stream`.
    state: tokio::sync::RwLock<State>
}
180impl Mirrored {
181 pub fn new(endpoint: reqwest::Url, token: Token) -> Mirrored {
182 Mirrored {
183 remote: Remote::new(endpoint, token),
184 state: tokio::sync::RwLock::new(State::new())
185 }
186 }
187 pub async fn fastsync(&'_ self) -> Result<tokio::sync::RwLockReadGuard<'_, State>> {
188 let new_state: State = self.remote.fastsync().await?.try_into()?;
189 let mut state = self.state.write().await;
190 *state = new_state;
191 Ok(state.downgrade())
192 }
193 pub async fn sync(&'_ self) -> Result<tokio::sync::RwLockReadGuard<'_, State>> {
194 let mut state = self.state.write().await;
195 let cursor = std::io::Cursor::new(self.remote.get_state(state.get_next_id() - 1).await?);
196 let mut buf = tokio::io::BufReader::new(cursor);
197 state.replay(&mut buf, true).await.expect("State unable to replay");
198 Ok(state.downgrade())
199 }
200 pub async fn apply(&self, action: tpex::Action) -> Result<u64> {
201 let id = self.remote.apply(&action).await?;
203 drop(self.sync().await);
204 Ok(id)
205 }
206 pub async fn stream(self: std::sync::Arc<Self>) -> Result<impl futures::Stream<Item=Result<(std::sync::Arc<Self>, tpex::WrappedAction)>>> {
207 let next_id = self.state.read().await.get_next_id();
208 let this: std::sync::Arc<Self> = self.clone();
209 let stream = self.remote.stream_state(next_id).await?;
210 Ok(stream.and_then(move |wrapped_action| { let this = this.clone(); async move {
211 this.state.write().await.apply(wrapped_action.action.clone(), tokio::io::sink()).await?;
212 Ok((this, wrapped_action))
213 }}))
214 }
215}