drawbridge_client/
entity.rs

1// SPDX-FileCopyrightText: 2022 Profian Inc. <opensource@profian.com>
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{scope, Client, Result, Scope};
5
6use std::io::{copy, Read, Write};
7use std::marker::PhantomData;
8use std::str::FromStr;
9
10use drawbridge_type::digest::{Algorithms, ContentDigest};
11use drawbridge_type::Meta;
12
13use anyhow::{anyhow, bail, ensure, Context};
14use http::header::{CONTENT_LENGTH, CONTENT_TYPE};
15use http::StatusCode;
16use mime::Mime;
17use ureq::serde::{Deserialize, Serialize};
18use ureq::{Request, Response};
19
20fn parse_header<T>(req: &Response, name: &str) -> Result<T>
21where
22    T: FromStr,
23    T::Err: 'static + Sync + Send + std::error::Error,
24{
25    req.header(name)
26        .ok_or_else(|| anyhow!("missing `{name}` header"))?
27        .parse()
28        .context(format!("failed to parse `{name}` header"))
29}
30
31#[derive(Clone, Debug)]
32pub struct Entity<'a, C: Scope, E: Scope> {
33    client: &'a Client<C>,
34    path: String,
35    phantom: PhantomData<E>,
36}
37
38fn parse_ureq_error(e: ureq::Error) -> anyhow::Error {
39    match e {
40        ureq::Error::Status(code, msg) => match msg.into_string() {
41            Ok(msg) if !msg.is_empty() => {
42                anyhow!(msg).context(format!("request failed with status code `{code}`"))
43            }
44            _ => anyhow!("request failed with status code `{code}`"),
45        },
46
47        ureq::Error::Transport(e) => anyhow::Error::new(e).context("transport layer failure"),
48    }
49}
50
51impl<'a, C: Scope> Entity<'a, C, C> {
52    pub fn new(client: &'a Client<C>) -> Self {
53        Self {
54            client,
55            path: Default::default(),
56            phantom: PhantomData,
57        }
58    }
59}
60
61impl<'a> Entity<'a, scope::Unknown, scope::Unknown> {
62    /// Changes the scope of the entity.
63    pub fn scope<O: Scope>(self) -> Entity<'a, scope::Unknown, O> {
64        Entity {
65            client: self.client,
66            path: self.path,
67            phantom: PhantomData,
68        }
69    }
70}
71
72impl<'a, C: Scope, E: Scope> Entity<'a, C, E> {
73    /// Returns a child [Entity] rooted at `path`.
74    pub fn child<O: Scope>(&self, path: &str) -> Entity<'a, C, O> {
75        Entity {
76            client: self.client,
77            path: format!("{}/{}", self.path, path),
78            phantom: PhantomData,
79        }
80    }
81
82    pub(super) fn create_request(&self, hash: &ContentDigest, mime: &Mime) -> Result<Request> {
83        let token = self.client.token.as_ref().ok_or_else(|| {
84            anyhow!("endpoint requires authorization, but no token was configured")
85        })?;
86        let url = self.client.url(&self.path)?;
87        Ok(self
88            .client
89            .inner
90            .put(url.as_str())
91            .set("Authorization", &format!("Bearer {token}"))
92            .set("Content-Digest", &hash.to_string())
93            .set(CONTENT_TYPE.as_str(), mime.as_ref()))
94    }
95
96    pub(super) fn create_bytes(&self, mime: &Mime, data: impl AsRef<[u8]>) -> Result<bool> {
97        let data = data.as_ref();
98        let (n, hash) = Algorithms::default()
99            .read_sync(data)
100            .context("failed to compute content digest")?;
101        ensure!(
102            n == data.len() as u64,
103            "invalid amount of bytes read, expected {}, read {n}",
104            data.len(),
105        );
106        let res = self
107            .create_request(&hash, mime)?
108            .send_bytes(data)
109            .map_err(parse_ureq_error)?;
110        match StatusCode::from_u16(res.status()) {
111            Ok(StatusCode::CREATED) => Ok(true),
112            Ok(StatusCode::OK) => Ok(false),
113            _ => bail!("unexpected status code: {}", res.status()),
114        }
115    }
116
117    pub(super) fn create_json(&self, mime: &Mime, val: &impl Serialize) -> Result<bool> {
118        let buf = serde_json::to_vec(val).context("failed to encode value to JSON")?;
119        self.create_bytes(mime, buf)
120    }
121
122    pub(super) fn create_from(
123        &self,
124        Meta { hash, size, mime }: &Meta,
125        rdr: impl Read,
126    ) -> Result<bool> {
127        let res = self
128            .create_request(hash, mime)?
129            .set(CONTENT_LENGTH.as_str(), &size.to_string())
130            .send(rdr)
131            .map_err(parse_ureq_error)?;
132        match StatusCode::from_u16(res.status()) {
133            Ok(StatusCode::CREATED) => Ok(true),
134            Ok(StatusCode::OK) => Ok(false),
135            _ => bail!("unexpected status code: {}", res.status()),
136        }
137    }
138
139    pub fn get(&self, limit: u64) -> Result<(Meta, impl Read)> {
140        let url = self.client.url(&self.path)?;
141        let mut req = self.client.inner.get(url.as_str());
142        if let Some(ref token) = self.client.token {
143            req = req.set("Authorization", &format!("Bearer {token}"))
144        }
145        let res = req
146            .set("Accept-Encoding", "")
147            .call()
148            .map_err(parse_ureq_error)
149            .context("GET request failed")?;
150
151        let hash: ContentDigest = parse_header(&res, "Content-Digest")?;
152        let mime = parse_header(&res, CONTENT_TYPE.as_str())?;
153        let size = parse_header(&res, CONTENT_LENGTH.as_str())?;
154        ensure!(
155            size <= limit,
156            "response size of `{size}` exceeds the limit of `{limit}`"
157        );
158        match StatusCode::from_u16(res.status()) {
159            Ok(StatusCode::OK) => Ok((
160                Meta {
161                    hash: hash.clone(),
162                    size,
163                    mime,
164                },
165                hash.verifier(res.into_reader().take(size)),
166            )),
167            _ => bail!("unexpected status code: {}", res.status()),
168        }
169    }
170
171    pub fn get_to(&self, limit: u64, dst: &mut impl Write) -> Result<Meta> {
172        let (meta @ Meta { size, .. }, mut rdr) = self.get(limit)?;
173        let n = copy(&mut rdr, dst)?;
174        ensure!(
175            n == size,
176            "invalid amount of bytes read, expected {size}, read {n}"
177        );
178        Ok(meta)
179    }
180
181    #[allow(single_use_lifetimes)]
182    pub fn get_json<T>(&self, limit: u64) -> Result<(Meta, T)>
183    where
184        for<'de> T: Deserialize<'de>,
185    {
186        let (meta, rdr) = self.get(limit)?;
187        let v = serde_json::from_reader(rdr).context("failed to decode JSON")?;
188        Ok((meta, v))
189    }
190
191    pub fn get_bytes(&self, limit: u64) -> Result<(Meta, Vec<u8>)> {
192        let (meta @ Meta { size, .. }, rdr) = self.get(limit)?;
193        let mut rdr = rdr.take(limit);
194        let mut buf =
195            Vec::with_capacity(size.try_into().context("failed to convert u64 to usize")?);
196        let n = copy(&mut rdr, &mut buf).context("I/O failure")?;
197        ensure!(
198            n == size,
199            "invalid amount of bytes read, expected {size}, read {n}"
200        );
201        Ok((meta, buf))
202    }
203
204    pub fn get_string(&self, limit: u64) -> Result<(Meta, String)> {
205        let (meta @ Meta { size, .. }, mut rdr) = self.get(limit)?;
206        let size = size.try_into().context("failed to convert u64 to usize")?;
207        let mut s = String::with_capacity(size);
208        let n = rdr.read_to_string(&mut s).context("I/O failure")?;
209        ensure!(
210            n == size,
211            "invalid amount of bytes read, expected {size}, read {n}"
212        );
213        Ok((meta, s))
214    }
215}