drawbridge_server/store/
entity.rs

1// SPDX-FileCopyrightText: 2022 Profian Inc. <opensource@profian.com>
2// SPDX-License-Identifier: Apache-2.0
3
4use std::io;
5use std::os::unix::fs::DirBuilderExt;
6
7use drawbridge_type::Meta;
8
9use anyhow::Context;
10use axum::http::StatusCode;
11use axum::response::{IntoResponse, Response};
12use camino::{Utf8Path, Utf8PathBuf};
13use cap_async_std::fs_utf8::{Dir, DirBuilder, ReadDir};
14use drawbridge_type::digest::ContentDigest;
15use futures::future::TryFutureExt;
16use futures::io::copy;
17use futures::try_join;
18use futures::{AsyncRead, AsyncWrite};
19use serde::{Deserialize, Serialize};
20use tracing::{debug, trace};
21
22const STORAGE_FAILURE_RESPONSE: (StatusCode, &str) =
23    (StatusCode::INTERNAL_SERVER_ERROR, "Storage backend failure");
24
25#[derive(Debug)]
26pub enum CreateError<E> {
27    Occupied,
28    LengthMismatch { expected: u64, got: u64 },
29    DigestMismatch,
30    Internal(E),
31}
32
33impl<E> IntoResponse for CreateError<E> {
34    fn into_response(self) -> Response {
35        match self {
36            CreateError::Occupied => (StatusCode::CONFLICT, "Already exists").into_response(),
37            CreateError::DigestMismatch => {
38                (StatusCode::BAD_REQUEST, "Content digest mismatch").into_response()
39            }
40            CreateError::LengthMismatch { expected, got } => (
41                StatusCode::BAD_REQUEST,
42                format!("Content length mismatch, expected: {expected}, got {got}"),
43            )
44                .into_response(),
45            CreateError::Internal(_) => STORAGE_FAILURE_RESPONSE.into_response(),
46        }
47    }
48}
49
50#[derive(Clone, Debug, PartialEq, Eq)]
51pub enum GetError<E> {
52    NotFound,
53    Internal(E),
54}
55
56impl<E> IntoResponse for GetError<E> {
57    fn into_response(self) -> Response {
58        match self {
59            GetError::NotFound => (StatusCode::NOT_FOUND, "Not found"),
60            GetError::Internal(_) => STORAGE_FAILURE_RESPONSE,
61        }
62        .into_response()
63    }
64}
65
66#[derive(Debug)]
67pub enum GetToWriterError<E> {
68    IO(io::Error),
69    Get(GetError<E>),
70}
71
72impl<E> IntoResponse for GetToWriterError<E> {
73    fn into_response(self) -> Response {
74        match self {
75            GetToWriterError::Get(GetError::NotFound) => {
76                (StatusCode::NOT_FOUND, "Repository does not exist")
77            }
78            GetToWriterError::Get(GetError::Internal(_)) => {
79                (StatusCode::INTERNAL_SERVER_ERROR, "Storage backend failure")
80            }
81            GetToWriterError::IO(_) => (StatusCode::INTERNAL_SERVER_ERROR, "I/O error"),
82        }
83        .into_response()
84    }
85}
86
87#[derive(Copy, Clone, Debug)]
88pub struct Entity<'a, P> {
89    root: &'a Dir,
90    prefix: P,
91}
92
93async fn create_verified(
94    dir: &Dir,
95    path: impl AsRef<Utf8Path>,
96    hash: ContentDigest,
97    size: u64,
98    rdr: impl Unpin + AsyncRead,
99) -> Result<(), CreateError<anyhow::Error>> {
100    let mut file = dir.create(path).await.map_err(|e| match e.kind() {
101        io::ErrorKind::AlreadyExists => CreateError::Occupied,
102        _ => CreateError::Internal(anyhow::Error::new(e).context("failed to create file")),
103    })?;
104    match copy(hash.verifier(rdr), &mut file).await {
105        Err(e) if e.kind() == io::ErrorKind::InvalidData => Err(CreateError::DigestMismatch),
106        Err(e) => Err(CreateError::Internal(
107            anyhow::Error::new(e).context("failed to write file"),
108        )),
109        Ok(n) if n != size => Err(CreateError::LengthMismatch {
110            expected: size,
111            got: n,
112        }),
113        Ok(_) => Ok(()),
114    }
115}
116
117impl<'a> Entity<'a, &'static str> {
118    pub fn new(root: &'a Dir) -> Self {
119        Self { root, prefix: "" }
120    }
121}
122
123impl<'a, P: AsRef<Utf8Path>> Entity<'a, P> {
124    /// Returns a child [Entity] rooted at `path`.
125    pub fn child(&self, path: impl AsRef<Utf8Path>) -> Entity<'a, Utf8PathBuf> {
126        Entity {
127            root: self.root,
128            prefix: self.path(path),
129        }
130    }
131
132    fn path(&self, path: impl AsRef<Utf8Path>) -> Utf8PathBuf {
133        self.prefix.as_ref().join(path)
134    }
135
136    fn meta_path(&self) -> Utf8PathBuf {
137        self.path("meta.json")
138    }
139
140    fn content_path(&self) -> Utf8PathBuf {
141        self.path("content")
142    }
143
144    pub(super) async fn create_from_reader(
145        &self,
146        meta: Meta,
147        rdr: impl Unpin + AsyncRead,
148    ) -> Result<(), CreateError<anyhow::Error>> {
149        trace!(target: "app::store::Entity::create_from_reader", "create entity at `{}`", self.prefix.as_ref());
150        let meta_json = serde_json::to_vec(&meta)
151            .context("failed to encode metadata")
152            .map_err(CreateError::Internal)?;
153        try_join!(
154            self.root
155                .write(self.meta_path(), meta_json)
156                .map_err(|e| match e.kind() {
157                    io::ErrorKind::AlreadyExists => CreateError::Occupied,
158                    _ => CreateError::Internal(
159                        anyhow::Error::new(e).context("failed to write metadata"),
160                    ),
161                })
162                .map_err(|e| {
163                    debug!(target: "app::store::Entity::create_from_reader", "failed to create meta file `{:?}`", e);
164                    e
165                }),
166            create_verified(self.root, self.content_path(), meta.hash, meta.size, rdr).map_err(|e| {
167                debug!(target: "app::store::Entity::create_from_reader", "failed to create content file `{:?}`", e);
168                e
169            })
170        )?;
171        Ok(())
172    }
173
174    pub(super) async fn create_json(
175        &self,
176        meta: Meta,
177        val: &impl Serialize,
178    ) -> Result<(), CreateError<anyhow::Error>> {
179        let buf = serde_json::to_vec(val)
180            .context("failed to encode value to JSON")
181            .map_err(CreateError::Internal)?;
182        self.create_from_reader(meta, buf.as_slice()).await
183    }
184
185    pub(super) async fn create_dir(
186        &self,
187        path: impl AsRef<Utf8Path>,
188    ) -> Result<(), CreateError<anyhow::Error>> {
189        let path = self.path(path);
190        debug_assert_ne!(path, self.meta_path());
191        debug_assert_ne!(path, self.content_path());
192
193        trace!(target: "app::store::Entity::create_dir", "create directory at `{path}`");
194        self.root
195            .create_dir_with(path, DirBuilder::new().mode(0o700))
196            .map_err(|e| match e.kind() {
197                io::ErrorKind::AlreadyExists => CreateError::Occupied,
198                _ => CreateError::Internal(
199                    anyhow::Error::new(e).context("failed to create directory"),
200                ),
201            })
202            .map_err(|e| {
203                debug!(target: "app::store::Entity::create_dir", "failed to create directory: `{:?}`", e);
204                e
205            })
206    }
207
208    pub(super) async fn read_dir(
209        &self,
210        path: impl AsRef<Utf8Path>,
211    ) -> Result<ReadDir, GetError<anyhow::Error>> {
212        self.root
213            .read_dir(self.path(path))
214            .await
215            .map_err(|e| match e.kind() {
216                io::ErrorKind::NotFound => GetError::NotFound,
217                _ => GetError::Internal(anyhow::Error::new(e).context("failed to read directory")),
218            })
219    }
220
221    /// Returns metadata of the entity.
222    pub async fn get_meta(&self) -> Result<Meta, GetError<anyhow::Error>> {
223        let buf = self
224            .root
225            .read(self.meta_path())
226            .await
227            .map_err(|e| match e.kind() {
228                io::ErrorKind::NotFound => GetError::NotFound,
229                _ => GetError::Internal(anyhow::Error::new(e).context("failed to read metadata")),
230            })?;
231        serde_json::from_slice(&buf)
232            .context("failed to decode metadata")
233            .map_err(GetError::Internal)
234    }
235
236    /// Returns contents of the entity as [AsyncRead].
237    pub async fn get_content(&self) -> Result<impl '_ + AsyncRead, GetError<anyhow::Error>> {
238        self.root
239            .open(self.content_path())
240            .map_err(|e| match e.kind() {
241                io::ErrorKind::NotFound => GetError::NotFound,
242                _ => {
243                    GetError::Internal(anyhow::Error::new(e).context("failed to open content file"))
244                }
245            })
246            .await
247    }
248
249    /// Reads contents of the entity.
250    pub async fn read_content(&self) -> Result<Vec<u8>, GetError<anyhow::Error>> {
251        self.root
252            .read(self.content_path())
253            .map_err(|e| match e.kind() {
254                io::ErrorKind::NotFound => GetError::NotFound,
255                _ => {
256                    GetError::Internal(anyhow::Error::new(e).context("failed to read content file"))
257                }
258            })
259            .await
260    }
261
262    /// Returns the contents of the entity as JSON.
263    #[allow(single_use_lifetimes)]
264    pub async fn get_content_json<T>(&self) -> Result<T, GetError<anyhow::Error>>
265    where
266        for<'de> T: Deserialize<'de>,
267    {
268        let buf = self.read_content().await?;
269        serde_json::from_slice(&buf)
270            .context("failed to decode content as JSON")
271            .map_err(GetError::Internal)
272    }
273
274    /// Returns metadata of the entity and a reader of its contents.
275    pub async fn get(&self) -> Result<(Meta, impl '_ + AsyncRead), GetError<anyhow::Error>> {
276        try_join!(self.get_meta(), self.get_content())
277    }
278
279    /// Returns metadata of the entity and writes its contents into `dst`.
280    pub async fn get_to_writer(
281        &self,
282        dst: &mut (impl Unpin + AsyncWrite),
283    ) -> Result<Meta, GetToWriterError<anyhow::Error>> {
284        let (meta, rdr) = self.get().await.map_err(GetToWriterError::Get)?;
285        _ = copy(rdr, dst).await.map_err(GetToWriterError::IO)?;
286        // TODO: Validate size
287        Ok(meta)
288    }
289}