1#![deny(clippy::all)]
21
22mod app;
23mod error;
24mod hyperext;
25mod lfs;
26mod logger;
27mod lru;
28mod sha256;
29mod storage;
30mod util;
31
32use std::convert::Infallible;
33use std::net::SocketAddr;
34use std::path::PathBuf;
35use std::sync::Arc;
36
37use futures::future::{self, Future, TryFutureExt};
38use hyper::{
39 self,
40 server::conn::{AddrIncoming, AddrStream},
41 service::make_service_fn,
42};
43
44use crate::app::App;
45use crate::error::Error;
46use crate::logger::Logger;
47use crate::storage::{Cached, Disk, Encrypted, Retrying, Storage, Verify, S3};
48
49#[cfg(feature = "faulty")]
50use crate::storage::Faulty;
51
52pub trait Server: Future<Output = hyper::Result<()>> {
54 fn addr(&self) -> SocketAddr;
56}
57
58impl<S, E> Server for hyper::Server<AddrIncoming, S, E>
59where
60 hyper::Server<AddrIncoming, S, E>: Future<Output = hyper::Result<()>>,
61{
62 fn addr(&self) -> SocketAddr {
63 self.local_addr()
64 }
65}
66
67#[derive(Debug)]
68pub struct Cache {
69 dir: PathBuf,
71
72 max_size: u64,
74}
75
76impl Cache {
77 pub fn new(dir: PathBuf, max_size: u64) -> Self {
78 Self { dir, max_size }
79 }
80}
81
82#[derive(Debug)]
83pub struct S3ServerBuilder {
84 bucket: String,
85 key: Option<[u8; 32]>,
86 prefix: Option<String>,
87 cdn: Option<String>,
88 cache: Option<Cache>,
89}
90
91impl S3ServerBuilder {
92 pub fn new(bucket: String) -> Self {
93 Self {
94 bucket,
95 prefix: None,
96 cdn: None,
97 key: None,
98 cache: None,
99 }
100 }
101
102 pub fn bucket(&mut self, bucket: String) -> &mut Self {
104 self.bucket = bucket;
105 self
106 }
107
108 pub fn key(&mut self, key: [u8; 32]) -> &mut Self {
110 self.key = Some(key);
111 self
112 }
113
114 pub fn prefix(&mut self, prefix: String) -> &mut Self {
116 self.prefix = Some(prefix);
117 self
118 }
119
120 pub fn cdn(&mut self, url: String) -> &mut Self {
123 self.cdn = Some(url);
124 self
125 }
126
127 pub fn cache(&mut self, cache: Cache) -> &mut Self {
130 self.cache = Some(cache);
131 self
132 }
133
134 pub async fn spawn(
137 mut self,
138 addr: SocketAddr,
139 ) -> Result<Box<dyn Server + Unpin + Send>, Box<dyn std::error::Error>>
140 {
141 let prefix = self.prefix.unwrap_or_else(|| String::from("lfs"));
142
143 if self.cdn.is_some() {
144 tracing::warn!(
145 "A CDN was specified. Since uploads and downloads do not flow \
146 through Rudolfs in this case, they will *not* be encrypted."
147 );
148
149 if self.cache.take().is_some() {
150 tracing::warn!(
151 "A local disk cache does not work with a CDN and will be \
152 disabled."
153 );
154 }
155 }
156
157 let s3 = S3::new(self.bucket, prefix, self.cdn)
158 .map_err(Error::from)
159 .await?;
160
161 let s3 = Retrying::new(s3);
163
164 #[cfg(feature = "faulty")]
166 let s3 = Faulty::new(s3);
167
168 match self.cache {
169 Some(cache) => {
170 let disk = Disk::new(cache.dir).map_err(Error::from).await?;
172
173 #[cfg(feature = "faulty")]
174 let disk = Faulty::new(disk);
175
176 let cache = Cached::new(cache.max_size, disk, s3).await?;
177
178 match self.key {
179 Some(key) => {
180 let storage = Verify::new(Encrypted::new(key, cache));
181 Ok(Box::new(spawn_server(storage, &addr)))
182 }
183 None => {
184 let storage = Verify::new(cache);
185 Ok(Box::new(spawn_server(storage, &addr)))
186 }
187 }
188 }
189 None => match self.key {
190 Some(key) => {
191 let storage = Verify::new(Encrypted::new(key, s3));
192 Ok(Box::new(spawn_server(storage, &addr)))
193 }
194 None => {
195 let storage = Verify::new(s3);
196 Ok(Box::new(spawn_server(storage, &addr)))
197 }
198 },
199 }
200 }
201
202 pub async fn run(
205 self,
206 addr: SocketAddr,
207 ) -> Result<(), Box<dyn std::error::Error>> {
208 let server = self.spawn(addr).await?;
209
210 tracing::info!("Listening on {}", server.addr());
211
212 server.await?;
213 Ok(())
214 }
215}
216
217#[derive(Debug)]
218pub struct LocalServerBuilder {
219 path: PathBuf,
220 key: Option<[u8; 32]>,
221 cache: Option<Cache>,
222}
223
224impl LocalServerBuilder {
225 pub fn new(path: PathBuf) -> Self {
228 Self {
229 path,
230 key: None,
231 cache: None,
232 }
233 }
234
235 pub fn key(&mut self, key: [u8; 32]) -> &mut Self {
237 self.key = Some(key);
238 self
239 }
240
241 pub fn cache(&mut self, cache: Cache) -> &mut Self {
247 self.cache = Some(cache);
248 self
249 }
250
251 pub async fn spawn(
254 self,
255 addr: SocketAddr,
256 ) -> Result<Box<dyn Server + Unpin + Send>, Box<dyn std::error::Error>>
257 {
258 let storage = Disk::new(self.path).map_err(Error::from).await?;
259
260 match self.key {
261 Some(key) => {
262 let storage = Verify::new(Encrypted::new(key, storage));
263 tracing::info!(
264 "Local disk storage initialized (with encryption)."
265 );
266 Ok(Box::new(spawn_server(storage, &addr)))
267 }
268 None => {
269 let storage = Verify::new(storage);
270 tracing::info!(
271 "Local disk storage initialized (without encryption)."
272 );
273 Ok(Box::new(spawn_server(storage, &addr)))
274 }
275 }
276 }
277
278 pub async fn run(
281 self,
282 addr: SocketAddr,
283 ) -> Result<(), Box<dyn std::error::Error>> {
284 let server = self.spawn(addr).await?;
285
286 tracing::info!("Listening on {}", server.addr());
287
288 server.await?;
289 Ok(())
290 }
291}
292
293fn spawn_server<S>(storage: S, addr: &SocketAddr) -> impl Server
294where
295 S: Storage + Send + Sync + 'static,
296 S::Error: Into<Error>,
297 Error: From<S::Error>,
298{
299 let storage = Arc::new(storage);
300
301 let new_service = make_service_fn(move |socket: &AddrStream| {
302 let service = App::new(storage.clone());
304
305 future::ok::<_, Infallible>(Logger::new(socket.remote_addr(), service))
307 });
308
309 hyper::Server::bind(addr).serve(new_service)
310}