noosphere_sphere/
internal.rs1use super::{BodyChunkDecoder, SphereFile};
2use crate::{AsyncFileBody, HasSphereContext};
3use anyhow::{anyhow, Result};
4use async_trait::async_trait;
5use noosphere_storage::{BlockStore, Storage};
6use std::str::FromStr;
7use tokio_util::io::StreamReader;
8
9use cid::Cid;
10use noosphere_core::{
11 authority::Access,
12 data::{ContentType, Header, Link, MemoIpld},
13 stream::put_block_stream,
14};
15
16#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
19#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
20pub(crate) trait SphereContextInternal<S>
21where
22 S: Storage + 'static,
23{
24 async fn assert_write_access(&self) -> Result<()>;
28
29 async fn get_file(
30 &self,
31 sphere_revision: &Cid,
32 memo_link: Link<MemoIpld>,
33 ) -> Result<SphereFile<Box<dyn AsyncFileBody>>>;
34}
35
36#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
37#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
38impl<C, S> SphereContextInternal<S> for C
39where
40 C: HasSphereContext<S>,
41 S: Storage + 'static,
42{
43 async fn assert_write_access(&self) -> Result<()> {
44 let sphere_context = self.sphere_context().await?;
45 match sphere_context.access().await? {
46 Access::ReadOnly => Err(anyhow!(
47 "Cannot mutate sphere; author only has read access to its contents"
48 )),
49 _ => Ok(()),
50 }
51 }
52
53 async fn get_file(
54 &self,
55 sphere_revision: &Cid,
56 memo_link: Link<MemoIpld>,
57 ) -> Result<SphereFile<Box<dyn AsyncFileBody>>> {
58 let db = self.sphere_context().await?.db().clone();
59 let memo = memo_link.load_from(&db).await?;
60
61 if db.get_block(&memo.body).await?.is_none() {
64 let client = self
65 .sphere_context()
66 .await?
67 .client()
68 .await
69 .map_err(|error| {
70 warn!("Unable to initialize API client for replicating missing content");
71 error
72 })?;
73
74 let stream = client.replicate(&memo_link, None).await?;
79
80 put_block_stream(db.clone(), stream).await?;
81 }
82
83 let content_type = match memo.get_first_header(&Header::ContentType) {
84 Some(content_type) => Some(ContentType::from_str(content_type.as_str())?),
85 None => None,
86 };
87
88 let stream = match content_type {
89 Some(_) => BodyChunkDecoder(&memo.body, &db).stream(),
91 None => return Err(anyhow!("No content type specified")),
92 };
93
94 Ok(SphereFile {
95 sphere_identity: self.sphere_context().await?.identity().clone(),
96 sphere_version: sphere_revision.into(),
97 memo_version: memo_link,
98 memo,
99 contents: Box::new(StreamReader::new(stream)),
101 })
102 }
103}