1use std::io;
5use std::os::unix::fs::DirBuilderExt;
6
7use drawbridge_type::Meta;
8
9use anyhow::Context;
10use axum::http::StatusCode;
11use axum::response::{IntoResponse, Response};
12use camino::{Utf8Path, Utf8PathBuf};
13use cap_async_std::fs_utf8::{Dir, DirBuilder, ReadDir};
14use drawbridge_type::digest::ContentDigest;
15use futures::future::TryFutureExt;
16use futures::io::copy;
17use futures::try_join;
18use futures::{AsyncRead, AsyncWrite};
19use serde::{Deserialize, Serialize};
20use tracing::{debug, trace};
21
22const STORAGE_FAILURE_RESPONSE: (StatusCode, &str) =
23 (StatusCode::INTERNAL_SERVER_ERROR, "Storage backend failure");
24
/// Failure modes of entity creation.
///
/// `E` is the type of the underlying (internal) error.
#[derive(Debug)]
pub enum CreateError<E> {
    /// The target location is already taken.
    Occupied,
    /// The number of bytes received (`got`) differs from the number announced (`expected`).
    LengthMismatch {
        /// Announced content length.
        expected: u64,
        /// Content length actually observed.
        got: u64,
    },
    /// The content does not match the announced digest.
    DigestMismatch,
    /// Any other failure, carrying the underlying error.
    Internal(E),
}
32
33impl<E> IntoResponse for CreateError<E> {
34 fn into_response(self) -> Response {
35 match self {
36 CreateError::Occupied => (StatusCode::CONFLICT, "Already exists").into_response(),
37 CreateError::DigestMismatch => {
38 (StatusCode::BAD_REQUEST, "Content digest mismatch").into_response()
39 }
40 CreateError::LengthMismatch { expected, got } => (
41 StatusCode::BAD_REQUEST,
42 format!("Content length mismatch, expected: {expected}, got {got}"),
43 )
44 .into_response(),
45 CreateError::Internal(_) => STORAGE_FAILURE_RESPONSE.into_response(),
46 }
47 }
48}
49
/// Failure modes of entity retrieval.
///
/// `E` is the type of the underlying (internal) error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GetError<E> {
    /// Nothing exists at the requested location.
    NotFound,
    /// Any other failure, carrying the underlying error.
    Internal(E),
}
55
56impl<E> IntoResponse for GetError<E> {
57 fn into_response(self) -> Response {
58 match self {
59 GetError::NotFound => (StatusCode::NOT_FOUND, "Not found"),
60 GetError::Internal(_) => STORAGE_FAILURE_RESPONSE,
61 }
62 .into_response()
63 }
64}
65
66#[derive(Debug)]
67pub enum GetToWriterError<E> {
68 IO(io::Error),
69 Get(GetError<E>),
70}
71
72impl<E> IntoResponse for GetToWriterError<E> {
73 fn into_response(self) -> Response {
74 match self {
75 GetToWriterError::Get(GetError::NotFound) => {
76 (StatusCode::NOT_FOUND, "Repository does not exist")
77 }
78 GetToWriterError::Get(GetError::Internal(_)) => {
79 (StatusCode::INTERNAL_SERVER_ERROR, "Storage backend failure")
80 }
81 GetToWriterError::IO(_) => (StatusCode::INTERNAL_SERVER_ERROR, "I/O error"),
82 }
83 .into_response()
84 }
85}
86
87#[derive(Copy, Clone, Debug)]
88pub struct Entity<'a, P> {
89 root: &'a Dir,
90 prefix: P,
91}
92
93async fn create_verified(
94 dir: &Dir,
95 path: impl AsRef<Utf8Path>,
96 hash: ContentDigest,
97 size: u64,
98 rdr: impl Unpin + AsyncRead,
99) -> Result<(), CreateError<anyhow::Error>> {
100 let mut file = dir.create(path).await.map_err(|e| match e.kind() {
101 io::ErrorKind::AlreadyExists => CreateError::Occupied,
102 _ => CreateError::Internal(anyhow::Error::new(e).context("failed to create file")),
103 })?;
104 match copy(hash.verifier(rdr), &mut file).await {
105 Err(e) if e.kind() == io::ErrorKind::InvalidData => Err(CreateError::DigestMismatch),
106 Err(e) => Err(CreateError::Internal(
107 anyhow::Error::new(e).context("failed to write file"),
108 )),
109 Ok(n) if n != size => Err(CreateError::LengthMismatch {
110 expected: size,
111 got: n,
112 }),
113 Ok(_) => Ok(()),
114 }
115}
116
117impl<'a> Entity<'a, &'static str> {
118 pub fn new(root: &'a Dir) -> Self {
119 Self { root, prefix: "" }
120 }
121}
122
123impl<'a, P: AsRef<Utf8Path>> Entity<'a, P> {
124 pub fn child(&self, path: impl AsRef<Utf8Path>) -> Entity<'a, Utf8PathBuf> {
126 Entity {
127 root: self.root,
128 prefix: self.path(path),
129 }
130 }
131
132 fn path(&self, path: impl AsRef<Utf8Path>) -> Utf8PathBuf {
133 self.prefix.as_ref().join(path)
134 }
135
136 fn meta_path(&self) -> Utf8PathBuf {
137 self.path("meta.json")
138 }
139
140 fn content_path(&self) -> Utf8PathBuf {
141 self.path("content")
142 }
143
144 pub(super) async fn create_from_reader(
145 &self,
146 meta: Meta,
147 rdr: impl Unpin + AsyncRead,
148 ) -> Result<(), CreateError<anyhow::Error>> {
149 trace!(target: "app::store::Entity::create_from_reader", "create entity at `{}`", self.prefix.as_ref());
150 let meta_json = serde_json::to_vec(&meta)
151 .context("failed to encode metadata")
152 .map_err(CreateError::Internal)?;
153 try_join!(
154 self.root
155 .write(self.meta_path(), meta_json)
156 .map_err(|e| match e.kind() {
157 io::ErrorKind::AlreadyExists => CreateError::Occupied,
158 _ => CreateError::Internal(
159 anyhow::Error::new(e).context("failed to write metadata"),
160 ),
161 })
162 .map_err(|e| {
163 debug!(target: "app::store::Entity::create_from_reader", "failed to create meta file `{:?}`", e);
164 e
165 }),
166 create_verified(self.root, self.content_path(), meta.hash, meta.size, rdr).map_err(|e| {
167 debug!(target: "app::store::Entity::create_from_reader", "failed to create content file `{:?}`", e);
168 e
169 })
170 )?;
171 Ok(())
172 }
173
174 pub(super) async fn create_json(
175 &self,
176 meta: Meta,
177 val: &impl Serialize,
178 ) -> Result<(), CreateError<anyhow::Error>> {
179 let buf = serde_json::to_vec(val)
180 .context("failed to encode value to JSON")
181 .map_err(CreateError::Internal)?;
182 self.create_from_reader(meta, buf.as_slice()).await
183 }
184
185 pub(super) async fn create_dir(
186 &self,
187 path: impl AsRef<Utf8Path>,
188 ) -> Result<(), CreateError<anyhow::Error>> {
189 let path = self.path(path);
190 debug_assert_ne!(path, self.meta_path());
191 debug_assert_ne!(path, self.content_path());
192
193 trace!(target: "app::store::Entity::create_dir", "create directory at `{path}`");
194 self.root
195 .create_dir_with(path, DirBuilder::new().mode(0o700))
196 .map_err(|e| match e.kind() {
197 io::ErrorKind::AlreadyExists => CreateError::Occupied,
198 _ => CreateError::Internal(
199 anyhow::Error::new(e).context("failed to create directory"),
200 ),
201 })
202 .map_err(|e| {
203 debug!(target: "app::store::Entity::create_dir", "failed to create directory: `{:?}`", e);
204 e
205 })
206 }
207
208 pub(super) async fn read_dir(
209 &self,
210 path: impl AsRef<Utf8Path>,
211 ) -> Result<ReadDir, GetError<anyhow::Error>> {
212 self.root
213 .read_dir(self.path(path))
214 .await
215 .map_err(|e| match e.kind() {
216 io::ErrorKind::NotFound => GetError::NotFound,
217 _ => GetError::Internal(anyhow::Error::new(e).context("failed to read directory")),
218 })
219 }
220
221 pub async fn get_meta(&self) -> Result<Meta, GetError<anyhow::Error>> {
223 let buf = self
224 .root
225 .read(self.meta_path())
226 .await
227 .map_err(|e| match e.kind() {
228 io::ErrorKind::NotFound => GetError::NotFound,
229 _ => GetError::Internal(anyhow::Error::new(e).context("failed to read metadata")),
230 })?;
231 serde_json::from_slice(&buf)
232 .context("failed to decode metadata")
233 .map_err(GetError::Internal)
234 }
235
236 pub async fn get_content(&self) -> Result<impl '_ + AsyncRead, GetError<anyhow::Error>> {
238 self.root
239 .open(self.content_path())
240 .map_err(|e| match e.kind() {
241 io::ErrorKind::NotFound => GetError::NotFound,
242 _ => {
243 GetError::Internal(anyhow::Error::new(e).context("failed to open content file"))
244 }
245 })
246 .await
247 }
248
249 pub async fn read_content(&self) -> Result<Vec<u8>, GetError<anyhow::Error>> {
251 self.root
252 .read(self.content_path())
253 .map_err(|e| match e.kind() {
254 io::ErrorKind::NotFound => GetError::NotFound,
255 _ => {
256 GetError::Internal(anyhow::Error::new(e).context("failed to read content file"))
257 }
258 })
259 .await
260 }
261
262 #[allow(single_use_lifetimes)]
264 pub async fn get_content_json<T>(&self) -> Result<T, GetError<anyhow::Error>>
265 where
266 for<'de> T: Deserialize<'de>,
267 {
268 let buf = self.read_content().await?;
269 serde_json::from_slice(&buf)
270 .context("failed to decode content as JSON")
271 .map_err(GetError::Internal)
272 }
273
274 pub async fn get(&self) -> Result<(Meta, impl '_ + AsyncRead), GetError<anyhow::Error>> {
276 try_join!(self.get_meta(), self.get_content())
277 }
278
279 pub async fn get_to_writer(
281 &self,
282 dst: &mut (impl Unpin + AsyncWrite),
283 ) -> Result<Meta, GetToWriterError<anyhow::Error>> {
284 let (meta, rdr) = self.get().await.map_err(GetToWriterError::Get)?;
285 _ = copy(rdr, dst).await.map_err(GetToWriterError::IO)?;
286 Ok(meta)
288 }
289}