1use std::net::SocketAddr;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::Context as _;
6use anyhow::Error;
7use anyhow::Result;
8
9use itertools::Itertools;
10use serde::Deserialize;
11use serde::Serialize;
12use tokio::net::TcpListener;
13use tokio_stream::wrappers::TcpListenerStream;
14use tracing::info;
15
16use warp::http::StatusCode;
17use warp::http::Uri;
18use warp::reject::Reject;
19use warp::Filter;
20use warp::Rejection;
21
22use crate::index::handle_git;
23use crate::index::Index;
24use crate::publish::crate_file_name;
25use crate::publish::crate_path;
26use crate::publish::publish_crate;
27use crate::serve_frontend;
28
29#[derive(Debug)]
30pub(crate) struct ServerError(pub(crate) anyhow::Error);
31
32impl Reject for ServerError {}
33
34#[derive(Debug, Default, Deserialize, Serialize)]
36struct RegistryError {
37 detail: String,
38}
39
40#[derive(Debug, Default, Deserialize, Serialize)]
42struct RegistryErrors {
43 errors: Vec<RegistryError>,
44}
45
46impl From<Error> for RegistryErrors {
47 fn from(error: Error) -> Self {
48 Self {
49 errors: error
50 .chain()
51 .map(ToString::to_string)
52 .map(|err| RegistryError { detail: err })
53 .collect(),
54 }
55 }
56}
57
58pub enum ServerBinding {
59 Addr(SocketAddr),
60 Listener(TcpListener),
61}
62
63impl From<SocketAddr> for ServerBinding {
64 fn from(binding_addr: SocketAddr) -> Self {
65 Self::Addr(binding_addr)
66 }
67}
68
69impl From<TcpListener> for ServerBinding {
70 fn from(listener: TcpListener) -> Self {
71 Self::Listener(listener)
72 }
73}
74
75impl ServerBinding {
76 async fn to_listener(self) -> Result<TcpListener> {
77 Ok(match self {
78 ServerBinding::Addr(addr) => TcpListener::bind(addr).await?,
79 ServerBinding::Listener(listener) => listener,
80 })
81 }
82}
83
84fn response<T>(result: Result<T>) -> Result<impl warp::Reply, warp::Rejection>
86where
87 T: warp::Reply,
88{
89 match result {
90 Ok(inner) => {
91 info!("request status: success");
92 Ok(warp::reply::with_status(
93 inner.into_response(),
94 StatusCode::OK,
95 ))
96 }
97 Err(err) => Err(warp::reject::custom(ServerError(err))),
98 }
99 }
104
105pub async fn serve(root: &Path, binding: impl Into<ServerBinding>, server_addr: SocketAddr) -> Result<()> {
107 let frontend = serve_frontend(root);
108 let crates_folder = Arc::new(root.join("crates"));
109 let index_folder = root.join("index");
110 let git_index = Arc::new(
111 Index::new(&index_folder, &server_addr)
112 .await
113 .with_context(|| {
114 format!(
115 "failed to create/instantiate crate index at {}",
116 index_folder.display()
117 )
118 })?,
119 );
120
121 let path_for_git = index_folder.to_path_buf();
122 let index = warp::path("git")
124 .and(warp::path("index"))
125 .and(warp::path::tail())
126 .and(warp::method())
127 .and(warp::header::optional::<String>("Content-Type"))
128 .and(warp::addr::remote())
129 .and(warp::body::stream())
130 .and(warp::query::raw().or_else(|_| async { Ok::<(String,), Rejection>((String::new(),)) }))
131 .and_then(
132 move |path_tail, method, content_type, remote, body, query| {
133 let mirror_path = path_for_git.clone();
134 async move {
135 response(
136 handle_git(
137 mirror_path,
138 path_tail,
139 method,
140 content_type,
141 remote,
142 body,
143 query,
144 )
145 .await,
146 )
147 }
148 },
149 );
150 let crates = warp::path("crates")
157 .and(warp::fs::dir(crates_folder.to_path_buf()))
158 .with(warp::trace::request());
159 let download = warp::get()
160 .and(warp::path("api"))
161 .and(warp::path("v1"))
162 .and(warp::path("crates"))
163 .and(warp::path::param())
164 .and(warp::path::param())
165 .and(warp::path("download"))
166 .map(move |name: String, version: String| {
167 let crate_path = crate_path(&name).join(crate_file_name(&name, &version));
168 let path = format!(
169 "/crates/{}",
170 crate_path
171 .components()
172 .map(|c| format!("{}", c.as_os_str().to_str().unwrap()))
173 .join("/")
174 );
175
176 path.parse::<Uri>().map(warp::redirect).unwrap()
181 })
182 .with(warp::trace::request());
183 let publish = warp::put()
184 .and(warp::path("api"))
185 .and(warp::path("v1"))
186 .and(warp::path("crates"))
187 .and(warp::path("new"))
188 .and(warp::path::end())
189 .and(warp::body::bytes())
190 .and(warp::body::content_length_limit(20 * 1024 * 1024))
193 .and_then(move |body| {
194 let index = git_index.clone();
195 let crates_folder = crates_folder.clone();
196 async move {
197 response(
198 publish_crate(body, index, crates_folder.as_path())
199 .await
200 .map(|()| String::new()),
201 )
202 }
203 })
204 .with(warp::trace::request());
205
206 let dist_dir = warp::path::path("dist").and(warp::fs::dir(root.join("dist")));
208 let rustup_dir = warp::path::path("rustup").and(warp::fs::dir(root.join("rustup")));
209
210 let routes = frontend
211 .or(crates)
212 .or(download)
213 .or(publish)
214 .or(dist_dir)
215 .or(rustup_dir)
216 .or(index);
218 warp::serve(routes)
223 .run_incoming(TcpListenerStream::new(binding.into().to_listener().await?))
224 .await;
225
226 Ok(())
227}
228
#[cfg(test)]
mod tests {
    use super::*;

    use serde_json::to_string;

    /// Check that a list of registry errors is encoded as the expected
    /// JSON document.
    #[test]
    fn registry_error_encoding() {
        let detail = "error message text".to_string();
        let errors = RegistryErrors {
            errors: vec![RegistryError { detail }],
        };
        let encoded = to_string(&errors).unwrap();

        assert_eq!(encoded, r#"{"errors":[{"detail":"error message text"}]}"#);
    }
}