treasury_client/
lib.rs

1//! Library for importing assets into treasury.
2
3use std::{
4    io::ErrorKind,
5    net::Ipv4Addr,
6    path::PathBuf,
7    process::Command,
8    time::{Duration, Instant},
9};
10
11use eyre::WrapErr;
12use tokio::{io::BufReader, net::TcpStream};
13use treasury_api::{
14    get_port, recv_message, send_handshake, send_message, FetchUrlResponse, FindResponse,
15    OpenRequest, OpenResponse, Request, StoreResponse,
16};
17use url::Url;
18
19pub use treasury_id::AssetId;
20
21#[derive(Debug, serde::Deserialize)]
22enum Treasury {
23    // Remote(Url),
24    Local(PathBuf),
25}
26
27#[derive(Debug)]
28pub struct Client {
29    #[allow(unused)]
30    treasury: Treasury,
31    stream: BufReader<TcpStream>,
32}
33
34impl Client {
35    pub async fn local(treasury: PathBuf, init: bool) -> eyre::Result<Self> {
36        let path = treasury
37            .to_str()
38            .ok_or_else(|| eyre::eyre!("Treasury path must not contain non UTF8 characters"))?;
39
40        let mut stream = BufReader::new(connect_local().await?);
41
42        send_handshake(&mut stream)
43            .await
44            .wrap_err("Failed to send handshake to treasury server")?;
45
46        send_message(
47            &mut stream,
48            OpenRequest {
49                init,
50                path: path.into(),
51            },
52        )
53        .await
54        .wrap_err("Failed to send Open message to treasury server")?;
55
56        match recv_message(&mut stream)
57            .await
58            .wrap_err("Failed to receive response for Open request")?
59        {
60            None => {
61                return Err(eyre::eyre!(
62                    "Failed to receive response for Open request. Connection lost."
63                ));
64            }
65            Some(OpenResponse::Success) => {}
66            Some(OpenResponse::Failure { description }) => {
67                return Err(eyre::eyre!("Open request failure. {}", description));
68            }
69        }
70
71        Ok(Client {
72            stream,
73            treasury: Treasury::Local(treasury),
74        })
75    }
76
77    /// Store asset into treasury from specified URL.
78    #[tracing::instrument]
79    pub async fn store_asset(
80        &mut self,
81        source: &Url,
82        format: Option<&str>,
83        target: &str,
84    ) -> eyre::Result<(AssetId, Box<str>)> {
85        send_message(
86            &mut self.stream,
87            Request::Store {
88                source: source.as_str().into(),
89                format: format.map(|f| f.into()),
90                target: target.into(),
91            },
92        )
93        .await
94        .wrap_err("Failed to send Store request")?;
95
96        match recv_message(&mut self.stream)
97            .await
98            .wrap_err("Failed to receive response for Store request")?
99        {
100            None => Err(eyre::eyre!(
101                "Failed to receive response for Store request. Connection lost."
102            )),
103            Some(StoreResponse::Success { id, path }) => {
104                tracing::info!("Store requested succeeded");
105                Ok((id, path))
106            }
107            Some(StoreResponse::Failure { description }) => {
108                Err(eyre::eyre!("Store request failure. {}", description))
109            }
110            Some(StoreResponse::NeedData { url }) => Err(eyre::eyre!(
111                "Treasury requires access to '{}' to finish store operation",
112                url
113            )),
114        }
115    }
116
117    /// Store asset into treasury from specified URL.
118    #[tracing::instrument]
119    pub async fn fetch(&mut self, id: AssetId) -> eyre::Result<Option<Url>> {
120        send_message(&mut self.stream, Request::FetchUrl { id })
121            .await
122            .wrap_err("Failed to send Store request")?;
123
124        match recv_message(&mut self.stream)
125            .await
126            .wrap_err("Failed to receive response for Find request")?
127        {
128            None => Err(eyre::eyre!(
129                "Failed to receive response for Find request. Connection lost."
130            )),
131            Some(FetchUrlResponse::Success { artifact }) => {
132                tracing::info!("Find requested succeeded");
133                let url = Url::parse(&artifact).wrap_err_with(|| {
134                    format!(
135                        "Failed to parse URL from '{}' from server response",
136                        artifact
137                    )
138                })?;
139
140                Ok(Some(url))
141            }
142            Some(FetchUrlResponse::Failure { description }) => {
143                Err(eyre::eyre!("Find request failure. {}", description))
144            }
145            Some(FetchUrlResponse::NotFound) => Ok(None),
146        }
147    }
148
149    /// Store asset into treasury from specified URL.
150    #[tracing::instrument]
151    pub async fn find(
152        &mut self,
153        source: &Url,
154        target: &str,
155    ) -> eyre::Result<Option<(AssetId, Box<str>)>> {
156        send_message(
157            &mut self.stream,
158            Request::FindAsset {
159                source: source.as_str().into(),
160                target: target.into(),
161            },
162        )
163        .await
164        .wrap_err("Failed to send Store request")?;
165
166        match recv_message(&mut self.stream)
167            .await
168            .wrap_err("Failed to receive response for Find request")?
169        {
170            None => Err(eyre::eyre!(
171                "Failed to receive response for Find request. Connection lost."
172            )),
173            Some(FindResponse::Success { id, path }) => {
174                tracing::info!("Find requested succeeded");
175
176                Ok(Some((id, path)))
177            }
178            Some(FindResponse::Failure { description }) => {
179                Err(eyre::eyre!("Find request failure. {}", description))
180            }
181            Some(FindResponse::NotFound) => Ok(None),
182        }
183    }
184}
185
186async fn connect_local() -> eyre::Result<TcpStream> {
187    let port = get_port();
188
189    match TcpStream::connect((Ipv4Addr::LOCALHOST, port)).await {
190        Ok(stream) => {
191            tracing::info!("Connected to running server");
192            Ok(stream)
193        }
194        Err(err) if err.kind() == ErrorKind::ConnectionRefused => {
195            tracing::info!("Failed to connect to treasury server. Run provisional instance");
196
197            match Command::new("treasury-server")
198                .env("TREASURY_PENDING_TIMEOUT", "5")
199                .spawn()
200            {
201                Err(err) => {
202                    return Err(eyre::eyre!(
203                        "Failed to spawn provisional treasury server. {:#}",
204                        err
205                    ));
206                }
207                Ok(mut child) => {
208                    let ten_ms = Duration::from_millis(10);
209                    let second = Duration::from_secs(10);
210                    let now = Instant::now();
211                    let deadline = now + second;
212
213                    while Instant::now() < deadline {
214                        // Dirty, I know.
215                        tokio::time::sleep(ten_ms).await;
216
217                        match TcpStream::connect((Ipv4Addr::LOCALHOST, port)).await {
218                            Ok(stream) => {
219                                // Not recommended for long-running processes to do so on UNIX systems.
220                                drop(child);
221                                return Ok(stream);
222                            }
223                            Err(err) if err.kind() == ErrorKind::ConnectionRefused => continue,
224                            Err(err) => {
225                                let _ = child.kill();
226                                return Err(eyre::eyre!(
227                                    "Failed to connect to provisional treasury server. {:#}",
228                                    err
229                                ));
230                            }
231                        }
232                    }
233
234                    // Failed to connect. Kill the bastard.
235                    let _ = child.kill();
236                    return Err(eyre::eyre!(
237                        "Failed to connect to provisional treasury server before timeout"
238                    ));
239                }
240            }
241        }
242        Err(err) => Err(eyre::eyre!(
243            "Failed to connect to treasury server. {:#}",
244            err
245        )),
246    }
247}