switchboard_common/
ipfs.rs1use futures::stream::TryStreamExt;
2use ipfs_api::TryFromUri;
3use ipfs_api::{IpfsApi, IpfsClient};
4use serde::{Deserialize, Serialize};
5use serde_json;
6use std::default::Default;
7use std::io::Cursor;
8use std::result::Result;
9use crate::SbError;
10use tokio::runtime::Handle;
11use tokio::sync::mpsc;
12use tokio;
13
14pub struct IpfsManager {
15 client: IpfsClient,
16}
17
18impl IpfsManager {
19
20 pub fn from_env() -> Result<Self, SbError> {
21 let ipfs_url = std::env::var("IPFS_URL")
22 .or(Err(SbError::EnvVariableMissing("IPFS_URL".to_string())))?;
23 let ipfs_username = std::env::var("IPFS_USERNAME").or(Err(SbError::EnvVariableMissing(
24 "IPFS_USERNAME".to_string(),
25 )))?;
26 let ipfs_password = std::env::var("IPFS_PASSWORD").or(Err(SbError::EnvVariableMissing(
27 "IPFS_PASSWORD".to_string(),
28 )))?;
29 Ok(Self::new(&ipfs_url, &ipfs_username, &ipfs_password))
30 }
31
32 pub fn new(ipfs_url: &str, ipfs_key: &str, ipfs_secret: &str) -> Self {
33 Self {
34 client: IpfsClient::from_str(ipfs_url)
35 .unwrap()
36 .with_credentials(ipfs_key, ipfs_secret),
37 }
38 }
39
40 pub async fn get_bytes(&self, cid: String) -> Result<Vec<u8>, SbError> {
41 let handle = Handle::current();
42
43 let client = self.client.clone();
44 let res: Vec<u8> = handle
45 .spawn_blocking(move || {
46 let handle = Handle::current();
47 handle.block_on(client.cat(&cid).map_ok(|chunk| chunk.to_vec()).try_concat())
48 })
49 .await
50 .map_err(|join_error| {
51 println!("IPFS get_object JoinError: {:?}", join_error);
52 SbError::IpfsNetworkError
53 })?
54 .map_err(|e| {
55 println!("IPFS get_object Error: {:?}", e);
56 SbError::IpfsNetworkError
57 })?;
58
59 Ok(res)
60 }
61
62 pub async fn get_object<T>(&self, cid: String) -> Result<T, SbError>
63 where
64 T: for<'a> Deserialize<'a> + Default,
65 {
66 let bytes: Vec<u8> = self.get_bytes(cid).await?;
67
68 match serde_json::from_slice(&bytes) {
69 Ok(res) => Ok(res),
70 Err(e) => {
71 println!("IPFS serde_json Error: {:#?}", e);
72 Err(SbError::IpfsParseError)
73 }
74 }
75 }
76
77 pub async fn set_object<T>(&self, my_object: T) -> Result<String, SbError>
78 where
79 T: Serialize + Default,
80 {
81 let content = serde_json::to_string(&my_object).map_err(|_| SbError::IpfsParseError)?;
82 let content = content.as_bytes().to_vec();
83 let cursor = Cursor::new(content);
84
85 let client = self.client.clone();
86 let handle = Handle::current();
87 let (tx, mut rx) = mpsc::channel(1);
88 let _res = handle
89 .spawn_blocking(move || {
90 let handle = Handle::current();
91 handle.block_on(async move {
92 let add_result = client.add(cursor).await.unwrap();
93 tx.send(add_result.hash).await.unwrap();
94 });
95 })
96 .await
97 .map_err(|e| {
98 println!("IPFS set_object Error: {:?}", e);
99 SbError::IpfsNetworkError
100 })?;
101
102 let cid = rx.recv().await.unwrap();
103 Ok(cid)
104 }
105
106 pub async fn set_object_async<T>(&self, my_object: T) -> Result<String, SbError>
107 where
108 T: Serialize + Default,
109 {
110 let content = serde_json::to_string(&my_object).map_err(|_| SbError::IpfsParseError)?;
111 let content = content.as_bytes().to_vec();
112 let cursor = Cursor::new(content);
113
114 let client = self.client.clone();
115 let handle = Handle::current();
116 let (tx, mut rx) = mpsc::channel(1);
117 let _res = handle
118 .spawn_blocking(move || {
119 let handle = Handle::current();
120 handle.block_on(async move {
121 let add_result = client.add(cursor).await.unwrap();
122 tx.send(add_result.hash).await.unwrap();
123 });
124 })
125 .await
126 .map_err(|e| {
127 println!("IPFS set_object Error: {:?}", e);
128 SbError::IpfsNetworkError
129 })?;
130
131 let cid = rx.recv().await.unwrap();
132 Ok(cid)
133 }
134}