crates_registry/
serve.rs

1use std::net::SocketAddr;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::Context as _;
6use anyhow::Error;
7use anyhow::Result;
8
9use itertools::Itertools;
10use serde::Deserialize;
11use serde::Serialize;
12use tokio::net::TcpListener;
13use tokio_stream::wrappers::TcpListenerStream;
14use tracing::info;
15
16use warp::http::StatusCode;
17use warp::http::Uri;
18use warp::reject::Reject;
19use warp::Filter;
20use warp::Rejection;
21
22use crate::index::handle_git;
23use crate::index::Index;
24use crate::publish::crate_file_name;
25use crate::publish::crate_path;
26use crate::publish::publish_crate;
27use crate::serve_frontend;
28
29#[derive(Debug)]
30pub(crate) struct ServerError(pub(crate) anyhow::Error);
31
32impl Reject for ServerError {}
33
34/// A single error that the registry returns.
35#[derive(Debug, Default, Deserialize, Serialize)]
36struct RegistryError {
37    detail: String,
38}
39
40/// A list of errors that the registry returns in its response.
41#[derive(Debug, Default, Deserialize, Serialize)]
42struct RegistryErrors {
43    errors: Vec<RegistryError>,
44}
45
46impl From<Error> for RegistryErrors {
47    fn from(error: Error) -> Self {
48        Self {
49            errors: error
50                .chain()
51                .map(ToString::to_string)
52                .map(|err| RegistryError { detail: err })
53                .collect(),
54        }
55    }
56}
57
58pub enum ServerBinding {
59    Addr(SocketAddr),
60    Listener(TcpListener),
61}
62
63impl From<SocketAddr> for ServerBinding {
64    fn from(binding_addr: SocketAddr) -> Self {
65        Self::Addr(binding_addr)
66    }
67}
68
69impl From<TcpListener> for ServerBinding {
70    fn from(listener: TcpListener) -> Self {
71        Self::Listener(listener)
72    }
73}
74
75impl ServerBinding {
76    async fn to_listener(self) -> Result<TcpListener> {
77        Ok(match self {
78            ServerBinding::Addr(addr) => TcpListener::bind(addr).await?,
79            ServerBinding::Listener(listener) => listener,
80        })
81    }
82}
83
84/// Convert a result back into a response.
85fn response<T>(result: Result<T>) -> Result<impl warp::Reply, warp::Rejection>
86where
87    T: warp::Reply,
88{
89    match result {
90        Ok(inner) => {
91            info!("request status: success");
92            Ok(warp::reply::with_status(
93                inner.into_response(),
94                StatusCode::OK,
95            ))
96        }
97        Err(err) => Err(warp::reject::custom(ServerError(err))),
98    }
99    // // Registries always respond with OK and use the JSON error array to
100    // // indicate problems.
101    // let reply = warp::reply::with_status(response, StatusCode::OK);
102    // Ok(reply)
103}
104
105/// Serve a registry at the given path on the given socket address.
106pub async fn serve(root: &Path, binding: impl Into<ServerBinding>, server_addr: SocketAddr) -> Result<()> {
107    let frontend = serve_frontend(root);
108    let crates_folder = Arc::new(root.join("crates"));
109    let index_folder = root.join("index");
110    let git_index = Arc::new(
111        Index::new(&index_folder, &server_addr)
112            .await
113            .with_context(|| {
114                format!(
115                    "failed to create/instantiate crate index at {}",
116                    index_folder.display()
117                )
118            })?,
119    );
120
121    let path_for_git = index_folder.to_path_buf();
122    // Serve git client requests to /git/index
123    let index = warp::path("git")
124        .and(warp::path("index"))
125        .and(warp::path::tail())
126        .and(warp::method())
127        .and(warp::header::optional::<String>("Content-Type"))
128        .and(warp::addr::remote())
129        .and(warp::body::stream())
130        .and(warp::query::raw().or_else(|_| async { Ok::<(String,), Rejection>((String::new(),)) }))
131        .and_then(
132            move |path_tail, method, content_type, remote, body, query| {
133                let mirror_path = path_for_git.clone();
134                async move {
135                    response(
136                        handle_git(
137                            mirror_path,
138                            path_tail,
139                            method,
140                            content_type,
141                            remote,
142                            body,
143                            query,
144                        )
145                        .await,
146                    )
147                }
148            },
149        );
150    // Handle sparse index requests at /index/
151    // let sparse_index = warp::path("index").and(warp::fs::dir(index_folder.clone()));
152
153    // Serve the contents of <root>/ at /crates. This allows for directly
154    // downloading the .crate files, to which we redirect from the
155    // download handler below.
156    let crates = warp::path("crates")
157        .and(warp::fs::dir(crates_folder.to_path_buf()))
158        .with(warp::trace::request());
159    let download = warp::get()
160        .and(warp::path("api"))
161        .and(warp::path("v1"))
162        .and(warp::path("crates"))
163        .and(warp::path::param())
164        .and(warp::path::param())
165        .and(warp::path("download"))
166        .map(move |name: String, version: String| {
167            let crate_path = crate_path(&name).join(crate_file_name(&name, &version));
168            let path = format!(
169                "/crates/{}",
170                crate_path
171                    .components()
172                    .map(|c| format!("{}", c.as_os_str().to_str().unwrap()))
173                    .join("/")
174            );
175
176            // TODO: Ideally we shouldn't unwrap here. That's not that easily
177            //       possible, though, because then we'd need to handle errors
178            //       and we can't use the response function because it will
179            //       overwrite the HTTP status even on success.
180            path.parse::<Uri>().map(warp::redirect).unwrap()
181        })
182        .with(warp::trace::request());
183    let publish = warp::put()
184        .and(warp::path("api"))
185        .and(warp::path("v1"))
186        .and(warp::path("crates"))
187        .and(warp::path("new"))
188        .and(warp::path::end())
189        .and(warp::body::bytes())
190        // We cap total body size to 20 MiB to have some upper bound. At the
191        // time of last check, crates.io employed a limit of 10 MiB.
192        .and(warp::body::content_length_limit(20 * 1024 * 1024))
193        .and_then(move |body| {
194            let index = git_index.clone();
195            let crates_folder = crates_folder.clone();
196            async move {
197                response(
198                    publish_crate(body, index, crates_folder.as_path())
199                        .await
200                        .map(|()| String::new()),
201                )
202            }
203        })
204        .with(warp::trace::request());
205
206    // For Rust installation
207    let dist_dir = warp::path::path("dist").and(warp::fs::dir(root.join("dist")));
208    let rustup_dir = warp::path::path("rustup").and(warp::fs::dir(root.join("rustup")));
209
210    let routes = frontend
211        .or(crates)
212        .or(download)
213        .or(publish)
214        .or(dist_dir)
215        .or(rustup_dir)
216        // .or(sparse_index)
217        .or(index);
218    // Despite the claim that this function "Returns [...] a Future that
219    // can be executed on any runtime." not even the call itself can
220    // happen outside of a tokio runtime. Boy.
221
222    warp::serve(routes)
223        .run_incoming(TcpListenerStream::new(binding.into().to_listener().await?))
224        .await;
225
226    Ok(())
227}
228
#[cfg(test)]
mod tests {
    use super::*;

    use serde_json::to_string;

    /// Check that a list of registry errors is encoded as a JSON object
    /// with an `errors` array of `detail` entries.
    #[test]
    fn registry_error_encoding() {
        let single = RegistryError {
            detail: "error message text".to_string(),
        };
        let errors = RegistryErrors {
            errors: vec![single],
        };

        let encoded = to_string(&errors).unwrap();
        let expected = r#"{"errors":[{"detail":"error message text"}]}"#;

        assert_eq!(encoded, expected);
    }
}