1use std::{
4 io::ErrorKind,
5 net::Ipv4Addr,
6 path::PathBuf,
7 process::Command,
8 time::{Duration, Instant},
9};
10
11use eyre::WrapErr;
12use tokio::{io::BufReader, net::TcpStream};
13use treasury_api::{
14 get_port, recv_message, send_handshake, send_message, FetchUrlResponse, FindResponse,
15 OpenRequest, OpenResponse, Request, StoreResponse,
16};
17use url::Url;
18
19pub use treasury_id::AssetId;
20
/// Describes which treasury a [`Client`] is attached to.
#[derive(Debug, serde::Deserialize)]
enum Treasury {
    /// A treasury addressed by a filesystem path; `Client::local` stores the
    /// path it was given here.
    Local(PathBuf),
}
26
/// Client side of a connection to a treasury server.
///
/// All request methods take `&mut self`: each one writes a request to the
/// stream and then reads the matching response from the same stream.
#[derive(Debug)]
pub struct Client {
    /// The treasury this client was opened for. It is set on construction and
    /// not read by any method visible in this file, hence the `allow`.
    #[allow(unused)]
    treasury: Treasury,
    /// Buffered TCP stream used for every request/response exchange.
    stream: BufReader<TcpStream>,
}
33
34impl Client {
35 pub async fn local(treasury: PathBuf, init: bool) -> eyre::Result<Self> {
36 let path = treasury
37 .to_str()
38 .ok_or_else(|| eyre::eyre!("Treasury path must not contain non UTF8 characters"))?;
39
40 let mut stream = BufReader::new(connect_local().await?);
41
42 send_handshake(&mut stream)
43 .await
44 .wrap_err("Failed to send handshake to treasury server")?;
45
46 send_message(
47 &mut stream,
48 OpenRequest {
49 init,
50 path: path.into(),
51 },
52 )
53 .await
54 .wrap_err("Failed to send Open message to treasury server")?;
55
56 match recv_message(&mut stream)
57 .await
58 .wrap_err("Failed to receive response for Open request")?
59 {
60 None => {
61 return Err(eyre::eyre!(
62 "Failed to receive response for Open request. Connection lost."
63 ));
64 }
65 Some(OpenResponse::Success) => {}
66 Some(OpenResponse::Failure { description }) => {
67 return Err(eyre::eyre!("Open request failure. {}", description));
68 }
69 }
70
71 Ok(Client {
72 stream,
73 treasury: Treasury::Local(treasury),
74 })
75 }
76
77 #[tracing::instrument]
79 pub async fn store_asset(
80 &mut self,
81 source: &Url,
82 format: Option<&str>,
83 target: &str,
84 ) -> eyre::Result<(AssetId, Box<str>)> {
85 send_message(
86 &mut self.stream,
87 Request::Store {
88 source: source.as_str().into(),
89 format: format.map(|f| f.into()),
90 target: target.into(),
91 },
92 )
93 .await
94 .wrap_err("Failed to send Store request")?;
95
96 match recv_message(&mut self.stream)
97 .await
98 .wrap_err("Failed to receive response for Store request")?
99 {
100 None => Err(eyre::eyre!(
101 "Failed to receive response for Store request. Connection lost."
102 )),
103 Some(StoreResponse::Success { id, path }) => {
104 tracing::info!("Store requested succeeded");
105 Ok((id, path))
106 }
107 Some(StoreResponse::Failure { description }) => {
108 Err(eyre::eyre!("Store request failure. {}", description))
109 }
110 Some(StoreResponse::NeedData { url }) => Err(eyre::eyre!(
111 "Treasury requires access to '{}' to finish store operation",
112 url
113 )),
114 }
115 }
116
117 #[tracing::instrument]
119 pub async fn fetch(&mut self, id: AssetId) -> eyre::Result<Option<Url>> {
120 send_message(&mut self.stream, Request::FetchUrl { id })
121 .await
122 .wrap_err("Failed to send Store request")?;
123
124 match recv_message(&mut self.stream)
125 .await
126 .wrap_err("Failed to receive response for Find request")?
127 {
128 None => Err(eyre::eyre!(
129 "Failed to receive response for Find request. Connection lost."
130 )),
131 Some(FetchUrlResponse::Success { artifact }) => {
132 tracing::info!("Find requested succeeded");
133 let url = Url::parse(&artifact).wrap_err_with(|| {
134 format!(
135 "Failed to parse URL from '{}' from server response",
136 artifact
137 )
138 })?;
139
140 Ok(Some(url))
141 }
142 Some(FetchUrlResponse::Failure { description }) => {
143 Err(eyre::eyre!("Find request failure. {}", description))
144 }
145 Some(FetchUrlResponse::NotFound) => Ok(None),
146 }
147 }
148
149 #[tracing::instrument]
151 pub async fn find(
152 &mut self,
153 source: &Url,
154 target: &str,
155 ) -> eyre::Result<Option<(AssetId, Box<str>)>> {
156 send_message(
157 &mut self.stream,
158 Request::FindAsset {
159 source: source.as_str().into(),
160 target: target.into(),
161 },
162 )
163 .await
164 .wrap_err("Failed to send Store request")?;
165
166 match recv_message(&mut self.stream)
167 .await
168 .wrap_err("Failed to receive response for Find request")?
169 {
170 None => Err(eyre::eyre!(
171 "Failed to receive response for Find request. Connection lost."
172 )),
173 Some(FindResponse::Success { id, path }) => {
174 tracing::info!("Find requested succeeded");
175
176 Ok(Some((id, path)))
177 }
178 Some(FindResponse::Failure { description }) => {
179 Err(eyre::eyre!("Find request failure. {}", description))
180 }
181 Some(FindResponse::NotFound) => Ok(None),
182 }
183 }
184}
185
186async fn connect_local() -> eyre::Result<TcpStream> {
187 let port = get_port();
188
189 match TcpStream::connect((Ipv4Addr::LOCALHOST, port)).await {
190 Ok(stream) => {
191 tracing::info!("Connected to running server");
192 Ok(stream)
193 }
194 Err(err) if err.kind() == ErrorKind::ConnectionRefused => {
195 tracing::info!("Failed to connect to treasury server. Run provisional instance");
196
197 match Command::new("treasury-server")
198 .env("TREASURY_PENDING_TIMEOUT", "5")
199 .spawn()
200 {
201 Err(err) => {
202 return Err(eyre::eyre!(
203 "Failed to spawn provisional treasury server. {:#}",
204 err
205 ));
206 }
207 Ok(mut child) => {
208 let ten_ms = Duration::from_millis(10);
209 let second = Duration::from_secs(10);
210 let now = Instant::now();
211 let deadline = now + second;
212
213 while Instant::now() < deadline {
214 tokio::time::sleep(ten_ms).await;
216
217 match TcpStream::connect((Ipv4Addr::LOCALHOST, port)).await {
218 Ok(stream) => {
219 drop(child);
221 return Ok(stream);
222 }
223 Err(err) if err.kind() == ErrorKind::ConnectionRefused => continue,
224 Err(err) => {
225 let _ = child.kill();
226 return Err(eyre::eyre!(
227 "Failed to connect to provisional treasury server. {:#}",
228 err
229 ));
230 }
231 }
232 }
233
234 let _ = child.kill();
236 return Err(eyre::eyre!(
237 "Failed to connect to provisional treasury server before timeout"
238 ));
239 }
240 }
241 }
242 Err(err) => Err(eyre::eyre!(
243 "Failed to connect to treasury server. {:#}",
244 err
245 )),
246 }
247}