noosphere_sphere/
internal.rs

1use super::{BodyChunkDecoder, SphereFile};
2use crate::{AsyncFileBody, HasSphereContext};
3use anyhow::{anyhow, Result};
4use async_trait::async_trait;
5use noosphere_storage::{BlockStore, Storage};
6use std::str::FromStr;
7use tokio_util::io::StreamReader;
8
9use cid::Cid;
10use noosphere_core::{
11    authority::Access,
12    data::{ContentType, Header, Link, MemoIpld},
13    stream::put_block_stream,
14};
15
16/// A module-private trait for internal trait methods; this is a workaround for
17/// the fact that all trait methods are implicitly public implementation
18#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
19#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
20pub(crate) trait SphereContextInternal<S>
21where
22    S: Storage + 'static,
23{
24    /// Returns an error result if the configured author of the [SphereContext]
25    /// does not have write access to it (as a matter of cryptographic
26    /// authorization).
27    async fn assert_write_access(&self) -> Result<()>;
28
29    async fn get_file(
30        &self,
31        sphere_revision: &Cid,
32        memo_link: Link<MemoIpld>,
33    ) -> Result<SphereFile<Box<dyn AsyncFileBody>>>;
34}
35
36#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
37#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
38impl<C, S> SphereContextInternal<S> for C
39where
40    C: HasSphereContext<S>,
41    S: Storage + 'static,
42{
43    async fn assert_write_access(&self) -> Result<()> {
44        let sphere_context = self.sphere_context().await?;
45        match sphere_context.access().await? {
46            Access::ReadOnly => Err(anyhow!(
47                "Cannot mutate sphere; author only has read access to its contents"
48            )),
49            _ => Ok(()),
50        }
51    }
52
53    async fn get_file(
54        &self,
55        sphere_revision: &Cid,
56        memo_link: Link<MemoIpld>,
57    ) -> Result<SphereFile<Box<dyn AsyncFileBody>>> {
58        let db = self.sphere_context().await?.db().clone();
59        let memo = memo_link.load_from(&db).await?;
60
61        // If we have a memo, but not the content it refers to, we should try to
62        // replicate from the gateway
63        if db.get_block(&memo.body).await?.is_none() {
64            let client = self
65                .sphere_context()
66                .await?
67                .client()
68                .await
69                .map_err(|error| {
70                    warn!("Unable to initialize API client for replicating missing content");
71                    error
72                })?;
73
74            // NOTE: This is kind of a hack, since we may be accessing a
75            // "read-only" context. Technically this should be acceptable
76            // because our mutation here is propagating immutable blocks
77            // into the local DB
78            let stream = client.replicate(&memo_link, None).await?;
79
80            put_block_stream(db.clone(), stream).await?;
81        }
82
83        let content_type = match memo.get_first_header(&Header::ContentType) {
84            Some(content_type) => Some(ContentType::from_str(content_type.as_str())?),
85            None => None,
86        };
87
88        let stream = match content_type {
89            // TODO(#86): Content-type aware decoding of body bytes
90            Some(_) => BodyChunkDecoder(&memo.body, &db).stream(),
91            None => return Err(anyhow!("No content type specified")),
92        };
93
94        Ok(SphereFile {
95            sphere_identity: self.sphere_context().await?.identity().clone(),
96            sphere_version: sphere_revision.into(),
97            memo_version: memo_link,
98            memo,
99            // NOTE: we have to box here because traits don't support `impl` types in return values
100            contents: Box::new(StreamReader::new(stream)),
101        })
102    }
103}