switchboard_common/
ipfs.rs

1use futures::stream::TryStreamExt;
2use ipfs_api::TryFromUri;
3use ipfs_api::{IpfsApi, IpfsClient};
4use serde::{Deserialize, Serialize};
5use serde_json;
6use std::default::Default;
7use std::io::Cursor;
8use std::result::Result;
9use crate::SbError;
10use tokio::runtime::Handle;
11use tokio::sync::mpsc;
12use tokio;
13
14pub struct IpfsManager {
15    client: IpfsClient,
16}
17
18impl IpfsManager {
19
20    pub fn from_env() -> Result<Self, SbError> {
21        let ipfs_url = std::env::var("IPFS_URL")
22            .or(Err(SbError::EnvVariableMissing("IPFS_URL".to_string())))?;
23        let ipfs_username = std::env::var("IPFS_USERNAME").or(Err(SbError::EnvVariableMissing(
24            "IPFS_USERNAME".to_string(),
25        )))?;
26        let ipfs_password = std::env::var("IPFS_PASSWORD").or(Err(SbError::EnvVariableMissing(
27            "IPFS_PASSWORD".to_string(),
28        )))?;
29        Ok(Self::new(&ipfs_url, &ipfs_username, &ipfs_password))
30    }
31
32    pub fn new(ipfs_url: &str, ipfs_key: &str, ipfs_secret: &str) -> Self {
33        Self {
34            client: IpfsClient::from_str(ipfs_url)
35                .unwrap()
36                .with_credentials(ipfs_key, ipfs_secret),
37        }
38    }
39
40    pub async fn get_bytes(&self, cid: String) -> Result<Vec<u8>, SbError> {
41        let handle = Handle::current();
42
43        let client = self.client.clone();
44        let res: Vec<u8> = handle
45            .spawn_blocking(move || {
46                let handle = Handle::current();
47                handle.block_on(client.cat(&cid).map_ok(|chunk| chunk.to_vec()).try_concat())
48            })
49            .await
50            .map_err(|join_error| {
51                println!("IPFS get_object JoinError: {:?}", join_error);
52                SbError::IpfsNetworkError
53            })?
54            .map_err(|e| {
55                println!("IPFS get_object Error: {:?}", e);
56                SbError::IpfsNetworkError
57            })?;
58
59        Ok(res)
60    }
61
62    pub async fn get_object<T>(&self, cid: String) -> Result<T, SbError>
63    where
64        T: for<'a> Deserialize<'a> + Default,
65    {
66        let bytes: Vec<u8> = self.get_bytes(cid).await?;
67
68        match serde_json::from_slice(&bytes) {
69            Ok(res) => Ok(res),
70            Err(e) => {
71                println!("IPFS serde_json Error: {:#?}", e);
72                Err(SbError::IpfsParseError)
73            }
74        }
75    }
76
77    pub async fn set_object<T>(&self, my_object: T) -> Result<String, SbError>
78    where
79        T: Serialize + Default,
80    {
81        let content = serde_json::to_string(&my_object).map_err(|_| SbError::IpfsParseError)?;
82        let content = content.as_bytes().to_vec();
83        let cursor = Cursor::new(content);
84
85        let client = self.client.clone();
86        let handle = Handle::current();
87        let (tx, mut rx) = mpsc::channel(1);
88        let _res = handle
89            .spawn_blocking(move || {
90                let handle = Handle::current();
91                handle.block_on(async move {
92                    let add_result = client.add(cursor).await.unwrap();
93                    tx.send(add_result.hash).await.unwrap();
94                });
95            })
96            .await
97            .map_err(|e| {
98                println!("IPFS set_object Error: {:?}", e);
99                SbError::IpfsNetworkError
100            })?;
101
102        let cid = rx.recv().await.unwrap();
103        Ok(cid)
104    }
105
106    pub async fn set_object_async<T>(&self, my_object: T) -> Result<String, SbError>
107    where
108        T: Serialize + Default,
109    {
110        let content = serde_json::to_string(&my_object).map_err(|_| SbError::IpfsParseError)?;
111        let content = content.as_bytes().to_vec();
112        let cursor = Cursor::new(content);
113
114        let client = self.client.clone();
115        let handle = Handle::current();
116        let (tx, mut rx) = mpsc::channel(1);
117        let _res = handle
118            .spawn_blocking(move || {
119                let handle = Handle::current();
120                handle.block_on(async move {
121                    let add_result = client.add(cursor).await.unwrap();
122                    tx.send(add_result.hash).await.unwrap();
123                });
124            })
125            .await
126            .map_err(|e| {
127                println!("IPFS set_object Error: {:?}", e);
128                SbError::IpfsNetworkError
129            })?;
130
131        let cid = rx.recv().await.unwrap();
132        Ok(cid)
133    }
134}