tpex_api/
lib.rs

1#![cfg(feature="client")]
2
3mod tests;
4mod shared;
5
6#[cfg(feature="server")]
7pub mod server;
8
9use futures::{StreamExt, TryStreamExt};
10use reqwest::StatusCode;
11use reqwest_websocket::{Message, RequestBuilderExt};
12pub use shared::*;
13use tpex::{AssetId, AssetInfo, State, StateSync};
14
15pub use shared::Token;
16
17#[derive(Debug)]
18pub enum Error {
19    RequestFailure(reqwest::Error),
20    WebSocketFailure(reqwest_websocket::Error),
21    TPExFailure(ErrorInfo),
22    Unknown(Option<StatusCode>)
23}
24impl std::fmt::Display for Error {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        match self {
27            Error::RequestFailure(err) => write!(f, "Request failure: {err}"),
28            Error::WebSocketFailure(err) => write!(f, "WebSocket failure: {err}"),
29            Error::TPExFailure(err) => write!(f, "TPEx failure: {}", err.error),
30            Error::Unknown(Some(code)) => write!(f, "Unknown failure with status code {code}"),
31            Error::Unknown(None) => write!(f, "Unknown failutre"),
32        }
33    }
34}
35impl std::error::Error for Error {}
36impl From<reqwest::Error> for Error {
37    fn from(value: reqwest::Error) -> Self { Error::RequestFailure(value) }
38}
39impl From<reqwest_websocket::Error> for Error {
40    fn from(value: reqwest_websocket::Error) -> Self { Error::WebSocketFailure(value) }
41}
42impl From<ErrorInfo> for Error {
43    fn from(value: ErrorInfo) -> Self { Error::TPExFailure(value) }
44}
45impl From<tpex::Error> for Error {
46    fn from(value: tpex::Error) -> Self { Error::TPExFailure(ErrorInfo { error: value.to_string() }) }
47}
48
49
50pub type Result<T> = core::result::Result<T, Error>;
51
52pub struct Remote {
53    client: reqwest::Client,
54    endpoint: reqwest::Url
55}
56impl Remote {
57    pub fn new(endpoint: reqwest::Url, token: Token) -> Remote {
58        let mut headers = reqwest::header::HeaderMap::new();
59        headers.append(
60            "Authorization",
61            reqwest::header::HeaderValue::from_str(&format!("Bearer {token}")).expect("Unable to make token header"));
62        Remote {
63            client: reqwest::Client::builder().default_headers(headers).build().expect("Unable to build reqwest client"),
64            endpoint
65        }
66    }
67    async fn check_response(response: reqwest::Response) -> Result<reqwest::Response> {
68        let status = response.status();
69        if status.is_success() { Ok(response) }
70        else if let Ok(err) = response.json().await {
71            Err(Error::TPExFailure(err))
72        }
73        else {
74            Err(Error::Unknown(Some(status)))
75        }
76    }
77
78    pub async fn get_state(&self, from: u64) -> Result<Vec<u8>> {
79        let mut target = self.endpoint.clone();
80        target.query_pairs_mut().append_pair("from", &from.to_string());
81        target.path_segments_mut().expect("Unable to nav to /state").push("state");
82
83        Ok(Self::check_response(self.client.get(target).send().await?).await?.bytes().await?.to_vec())
84    }
85    pub async fn stream_state(&self, from: u64) -> Result<impl futures::Stream<Item=Result<tpex::WrappedAction>> + use<>> {
86        let mut target = self.endpoint.clone();
87        target.query_pairs_mut().append_pair("from", &from.to_string());
88        target.path_segments_mut().expect("Unable to nav to /state").push("state");
89
90        let ws = self.client.get(target)
91            .upgrade()
92            .send().await?
93            .into_websocket().await?;
94
95        Ok(ws.filter_map(|msg| async {
96            let ret: Option<Result<tpex::WrappedAction>> = match msg {
97                Ok(Message::Text(text)) => Some(serde_json::from_str(&text).map_err(|_| Error::Unknown(None))),
98                Ok(Message::Binary(binary)) => Some(serde_json::from_slice(&binary).map_err(|_| Error::Unknown(None))),
99                Err(e) => Some(Err(e.into())),
100                _ => None
101            };
102            ret
103        }))
104    }
105    pub async fn apply(&self, action: &tpex::Action) -> Result<u64> {
106        let mut target = self.endpoint.clone();
107        target.path_segments_mut().expect("Unable to nav to /state").push("state");
108
109        Ok(Self::check_response(self.client.patch(target).json(action).send().await?).await?.json().await?)
110    }
111    pub async fn get_token(&self, token: &Token) -> Result<TokenInfo> {
112        let mut target = self.endpoint.clone();
113        target.path_segments_mut().expect("Unable to nav to /token").push("token");
114
115        Ok(Self::check_response(self.client.post(target).json(token).send().await?).await?.json().await?)
116    }
117    pub async fn create_token(&self, args: &TokenPostArgs) -> Result<Token> {
118        let mut target = self.endpoint.clone();
119        target.path_segments_mut().expect("Unable to nav to /token").push("token");
120
121        Ok(Self::check_response(self.client.post(target).json(args).send().await?).await?.json().await?)
122    }
123    pub async fn delete_token(&self, args: &TokenDeleteArgs) -> Result<()> {
124        let mut target = self.endpoint.clone();
125        target.path_segments_mut().expect("Unable to nav to /token").push("token");
126
127        Ok(Self::check_response(self.client.delete(target).json(args).send().await?).await?.json().await?)
128    }
129    pub async fn fastsync(&self) -> Result<StateSync> {
130        let mut target = self.endpoint.clone();
131        target.path_segments_mut().expect("Unable to nav to /fastsync").push("fastsync");
132
133        Ok(Self::check_response(self.client.get(target).send().await?).await?.json().await?)
134    }
135    pub async fn stream_fastsync(&self) -> Result<impl futures::Stream<Item=Result<tpex::StateSync>>> {
136        let mut target = self.endpoint.clone();
137        target.path_segments_mut().expect("Unable to nav to /fastsync").push("fastsync");
138
139        let ws = self.client.get(target)
140            .upgrade()
141            .send().await?
142            .into_websocket().await?;
143
144        Ok(ws.filter_map(|msg| async {
145            let ret: Option<Result<tpex::StateSync>> = match msg {
146                Ok(Message::Text(text)) => Some(serde_json::from_str(&text).map_err(|_| Error::Unknown(None))),
147                Ok(Message::Binary(binary)) => Some(serde_json::from_slice(&binary).map_err(|_| Error::Unknown(None))),
148                Err(e) => Some(Err(e.into())),
149                _ => None
150            };
151            ret
152        }))
153    }
154}
155
156pub struct Mirrored {
157    pub remote: Remote,
158    state: tokio::sync::RwLock<State>
159}
160impl Mirrored {
161    pub fn new(endpoint: reqwest::Url, token: Token) -> Mirrored {
162        Mirrored {
163            remote: Remote::new(endpoint, token),
164            state: tokio::sync::RwLock::new(State::new())
165        }
166    }
167    pub async fn update_asset_info(&self, asset_info: std::collections::HashMap<AssetId, AssetInfo>) {
168        self.state.write().await.update_asset_info(asset_info)
169    }
170    pub async fn fastsync(&self) -> Result<tokio::sync::RwLockReadGuard<State>> {
171        let new_state: State = self.remote.fastsync().await?.try_into()?;
172        let mut state = self.state.write().await;
173        *state = new_state;
174        Ok(state.downgrade())
175    }
176    pub async fn sync(&self) -> Result<tokio::sync::RwLockReadGuard<State>> {
177        let mut state = self.state.write().await;
178        let cursor = std::io::Cursor::new(self.remote.get_state(state.get_next_id() - 1).await?);
179        let mut buf = tokio::io::BufReader::new(cursor);
180        state.replay(&mut buf, true).await.expect("State unable to replay");
181        Ok(state.downgrade())
182    }
183    pub async fn apply(&self, action: tpex::Action) -> Result<u64> {
184        // The remote could be desynced, so we send our update
185        let id = self.remote.apply(&action).await?;
186        drop(self.sync().await);
187        Ok(id)
188    }
189    // This isn't synced
190    pub async fn asset_info(&self, asset: &AssetId) -> std::result::Result<AssetInfo, tpex::Error> {
191        self.state.read().await.asset_info(asset)
192    }
193    pub async fn stream(self: std::sync::Arc<Self>) -> Result<impl futures::Stream<Item=Result<(std::sync::Arc<Self>, tpex::WrappedAction)>>> {
194        let next_id = self.state.read().await.get_next_id();
195        let this: std::sync::Arc<Self> = self.clone();
196        let stream = self.remote.stream_state(next_id).await?;
197        Ok(stream.and_then(move |wrapped_action| { let this = this.clone(); async move  {
198            this.state.write().await.apply(wrapped_action.action.clone(), tokio::io::sink()).await?;
199            Ok((this, wrapped_action))
200        }}))
201    }
202}