rudolfs/
lib.rs

1// Copyright (c) 2021 Jason White
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19// SOFTWARE.
20#![deny(clippy::all)]
21
22mod app;
23mod error;
24mod hyperext;
25mod lfs;
26mod logger;
27mod lru;
28mod sha256;
29mod storage;
30mod util;
31
32use std::convert::Infallible;
33use std::net::SocketAddr;
34use std::path::PathBuf;
35use std::sync::Arc;
36
37use futures::future::{self, Future, TryFutureExt};
38use hyper::{
39    self,
40    server::conn::{AddrIncoming, AddrStream},
41    service::make_service_fn,
42};
43
44use crate::app::App;
45use crate::error::Error;
46use crate::logger::Logger;
47use crate::storage::{Cached, Disk, Encrypted, Retrying, Storage, Verify, S3};
48
49#[cfg(feature = "faulty")]
50use crate::storage::Faulty;
51
/// Represents a running LFS server.
///
/// A `Server` is itself a future whose output is `hyper::Result<()>`; it has
/// to be awaited in order to accept incoming client connections and run.
pub trait Server: Future<Output = hyper::Result<()>> {
    /// Returns the local address this server is bound to.
    fn addr(&self) -> SocketAddr;
}
57
58impl<S, E> Server for hyper::Server<AddrIncoming, S, E>
59where
60    hyper::Server<AddrIncoming, S, E>: Future<Output = hyper::Result<()>>,
61{
62    fn addr(&self) -> SocketAddr {
63        self.local_addr()
64    }
65}
66
/// Settings for a local disk cache: where it lives and how large it may get.
#[derive(Debug)]
pub struct Cache {
    /// Path to the cache.
    dir: PathBuf,

    /// Maximum size of the cache, in bytes.
    max_size: u64,
}

impl Cache {
    /// Creates cache settings for the directory `dir` with a size limit of
    /// `max_size` bytes.
    pub fn new(dir: PathBuf, max_size: u64) -> Self {
        Cache { dir, max_size }
    }
}
81
/// Builder for an LFS server that stores its objects in S3.
#[derive(Debug)]
pub struct S3ServerBuilder {
    /// Bucket to use; handed to `S3::new` when the server is spawned.
    bucket: String,
    /// Optional encryption key. When set, the storage is wrapped in
    /// `Encrypted`.
    key: Option<[u8; 32]>,
    /// Optional prefix; `"lfs"` is used when this is `None`.
    prefix: Option<String>,
    /// Optional base URL of a CDN; handed to `S3::new`.
    cdn: Option<String>,
    /// Optional local disk cache settings.
    cache: Option<Cache>,
}
90
91impl S3ServerBuilder {
92    pub fn new(bucket: String) -> Self {
93        Self {
94            bucket,
95            prefix: None,
96            cdn: None,
97            key: None,
98            cache: None,
99        }
100    }
101
102    /// Sets the bucket to use.
103    pub fn bucket(&mut self, bucket: String) -> &mut Self {
104        self.bucket = bucket;
105        self
106    }
107
108    /// Sets the encryption key to use.
109    pub fn key(&mut self, key: [u8; 32]) -> &mut Self {
110        self.key = Some(key);
111        self
112    }
113
114    /// Sets the prefix to use.
115    pub fn prefix(&mut self, prefix: String) -> &mut Self {
116        self.prefix = Some(prefix);
117        self
118    }
119
120    /// Sets the base URL of the CDN to use. This is incompatible with
121    /// encryption since the LFS object is not sent to Rudolfs.
122    pub fn cdn(&mut self, url: String) -> &mut Self {
123        self.cdn = Some(url);
124        self
125    }
126
127    /// Sets the cache to use. If not specified, then no local disk cache is
128    /// used. All objects will get sent directly to S3.
129    pub fn cache(&mut self, cache: Cache) -> &mut Self {
130        self.cache = Some(cache);
131        self
132    }
133
134    /// Spawns the server. The server must be awaited on in order to accept
135    /// incoming client connections and run.
136    pub async fn spawn(
137        mut self,
138        addr: SocketAddr,
139    ) -> Result<Box<dyn Server + Unpin + Send>, Box<dyn std::error::Error>>
140    {
141        let prefix = self.prefix.unwrap_or_else(|| String::from("lfs"));
142
143        if self.cdn.is_some() {
144            tracing::warn!(
145                "A CDN was specified. Since uploads and downloads do not flow \
146                 through Rudolfs in this case, they will *not* be encrypted."
147            );
148
149            if self.cache.take().is_some() {
150                tracing::warn!(
151                    "A local disk cache does not work with a CDN and will be \
152                     disabled."
153                );
154            }
155        }
156
157        let s3 = S3::new(self.bucket, prefix, self.cdn)
158            .map_err(Error::from)
159            .await?;
160
161        // Retry certain operations to S3 to make it more reliable.
162        let s3 = Retrying::new(s3);
163
164        // Add a little instability for testing purposes.
165        #[cfg(feature = "faulty")]
166        let s3 = Faulty::new(s3);
167
168        match self.cache {
169            Some(cache) => {
170                // Use disk storage as a cache.
171                let disk = Disk::new(cache.dir).map_err(Error::from).await?;
172
173                #[cfg(feature = "faulty")]
174                let disk = Faulty::new(disk);
175
176                let cache = Cached::new(cache.max_size, disk, s3).await?;
177
178                match self.key {
179                    Some(key) => {
180                        let storage = Verify::new(Encrypted::new(key, cache));
181                        Ok(Box::new(spawn_server(storage, &addr)))
182                    }
183                    None => {
184                        let storage = Verify::new(cache);
185                        Ok(Box::new(spawn_server(storage, &addr)))
186                    }
187                }
188            }
189            None => match self.key {
190                Some(key) => {
191                    let storage = Verify::new(Encrypted::new(key, s3));
192                    Ok(Box::new(spawn_server(storage, &addr)))
193                }
194                None => {
195                    let storage = Verify::new(s3);
196                    Ok(Box::new(spawn_server(storage, &addr)))
197                }
198            },
199        }
200    }
201
202    /// Spawns the server and runs it to completion. This will run forever
203    /// unless there is an error or the server shuts down gracefully.
204    pub async fn run(
205        self,
206        addr: SocketAddr,
207    ) -> Result<(), Box<dyn std::error::Error>> {
208        let server = self.spawn(addr).await?;
209
210        tracing::info!("Listening on {}", server.addr());
211
212        server.await?;
213        Ok(())
214    }
215}
216
/// Builder for an LFS server that stores its objects on the local disk.
#[derive(Debug)]
pub struct LocalServerBuilder {
    /// Path to the folder where all of the LFS data is stored.
    path: PathBuf,
    /// Optional encryption key. When set, the storage is wrapped in
    /// `Encrypted`.
    key: Option<[u8; 32]>,
    /// Optional local disk cache settings, as given to `cache`.
    cache: Option<Cache>,
}
223
224impl LocalServerBuilder {
225    /// Creates a local server builder. `path` is the path to the folder where
226    /// all of the LFS data will be stored.
227    pub fn new(path: PathBuf) -> Self {
228        Self {
229            path,
230            key: None,
231            cache: None,
232        }
233    }
234
235    /// Sets the encryption key to use.
236    pub fn key(&mut self, key: [u8; 32]) -> &mut Self {
237        self.key = Some(key);
238        self
239    }
240
241    /// Sets the cache to use. If not specified, then no local disk cache is
242    /// used. It is uncommon to want to use this when the object storage is
243    /// already local. However, a cache may be useful when the data storage path
244    /// is on a mounted network file system. In such a case, the network file
245    /// system could be slow and the local disk storage could be fast.
246    pub fn cache(&mut self, cache: Cache) -> &mut Self {
247        self.cache = Some(cache);
248        self
249    }
250
251    /// Spawns the server. The server must be awaited on in order to accept
252    /// incoming client connections and run.
253    pub async fn spawn(
254        self,
255        addr: SocketAddr,
256    ) -> Result<Box<dyn Server + Unpin + Send>, Box<dyn std::error::Error>>
257    {
258        let storage = Disk::new(self.path).map_err(Error::from).await?;
259
260        match self.key {
261            Some(key) => {
262                let storage = Verify::new(Encrypted::new(key, storage));
263                tracing::info!(
264                    "Local disk storage initialized (with encryption)."
265                );
266                Ok(Box::new(spawn_server(storage, &addr)))
267            }
268            None => {
269                let storage = Verify::new(storage);
270                tracing::info!(
271                    "Local disk storage initialized (without encryption)."
272                );
273                Ok(Box::new(spawn_server(storage, &addr)))
274            }
275        }
276    }
277
278    /// Spawns the server and runs it to completion. This will run forever
279    /// unless there is an error or the server shuts down gracefully.
280    pub async fn run(
281        self,
282        addr: SocketAddr,
283    ) -> Result<(), Box<dyn std::error::Error>> {
284        let server = self.spawn(addr).await?;
285
286        tracing::info!("Listening on {}", server.addr());
287
288        server.await?;
289        Ok(())
290    }
291}
292
293fn spawn_server<S>(storage: S, addr: &SocketAddr) -> impl Server
294where
295    S: Storage + Send + Sync + 'static,
296    S::Error: Into<Error>,
297    Error: From<S::Error>,
298{
299    let storage = Arc::new(storage);
300
301    let new_service = make_service_fn(move |socket: &AddrStream| {
302        // Create our app.
303        let service = App::new(storage.clone());
304
305        // Add logging middleware
306        future::ok::<_, Infallible>(Logger::new(socket.remote_addr(), service))
307    });
308
309    hyper::Server::bind(addr).serve(new_service)
310}