Skip to main content

xos_storage/
ipfs.rs

1//! IPFS HTTP API client.
2//!
3//! Talks to an IPFS node (local or remote gateway) via its HTTP API.
4//! No heavy libp2p dependency — just plain reqwest.
5
6use crate::{Result, StorageError};
7use serde::Deserialize;
8use tracing::info;
9
10/// Response from `POST /api/v0/add`.
11#[derive(Debug, Deserialize)]
12#[serde(rename_all = "PascalCase")]
13#[allow(dead_code)]
14struct AddResponse {
15    hash: String,
16    size: String,
17}
18
19/// A single link returned by `POST /api/v0/ls`.
20#[derive(Debug, Deserialize)]
21#[serde(rename_all = "PascalCase")]
22struct LsLink {
23    hash: String,
24    name: String,
25    size: u64,
26}
27
28/// Object wrapper returned by `POST /api/v0/ls`.
29#[derive(Debug, Deserialize)]
30#[serde(rename_all = "PascalCase")]
31struct LsObject {
32    links: Vec<LsLink>,
33}
34
35/// Top-level response from `POST /api/v0/ls`.
36#[derive(Debug, Deserialize)]
37#[serde(rename_all = "PascalCase")]
38struct LsResponse {
39    objects: Vec<LsObject>,
40}
41
42/// IPFS file entry returned by [`IpfsClient::ls`].
43#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
44pub struct IpfsEntry {
45    pub cid: String,
46    pub name: String,
47    pub size: u64,
48}
49
50/// Client for the IPFS HTTP API.
51pub struct IpfsClient {
52    api_url: String,
53    gateway_url: String,
54    client: reqwest::Client,
55    timeout: std::time::Duration,
56}
57
58impl IpfsClient {
59    /// Connect to a local IPFS node at `http://127.0.0.1:5001`.
60    pub fn new() -> Self {
61        Self::with_api_url("http://127.0.0.1:5001")
62    }
63
64    /// Connect to a custom IPFS API endpoint.
65    pub fn with_api_url(api_url: &str) -> Self {
66        Self {
67            api_url: api_url.trim_end_matches('/').to_string(),
68            gateway_url: "https://ipfs.io/ipfs".to_string(),
69            client: reqwest::Client::new(),
70            timeout: std::time::Duration::from_secs(30),
71        }
72    }
73
74    /// Set a custom public gateway URL for read-only fetches.
75    pub fn with_gateway(mut self, gateway_url: &str) -> Self {
76        self.gateway_url = gateway_url.trim_end_matches('/').to_string();
77        self
78    }
79
80    /// Set request timeout.
81    pub fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
82        self.timeout = timeout;
83        self
84    }
85
86    /// Return the configured API URL.
87    pub fn api_url(&self) -> &str {
88        &self.api_url
89    }
90
91    /// Return the configured gateway URL.
92    pub fn gateway_url(&self) -> &str {
93        &self.gateway_url
94    }
95
96    /// Upload data to IPFS. Returns the CID.
97    pub async fn add(&self, data: &[u8]) -> Result<String> {
98        info!("Uploading {} bytes to IPFS", data.len());
99        let url = format!("{}/api/v0/add", self.api_url);
100
101        let part = reqwest::multipart::Part::bytes(data.to_vec()).file_name("data");
102        let form = reqwest::multipart::Form::new().part("file", part);
103
104        let resp: AddResponse = self
105            .client
106            .post(&url)
107            .multipart(form)
108            .timeout(self.timeout)
109            .send()
110            .await
111            .map_err(|e| StorageError::Ipfs(format!("add request failed: {e}")))?
112            .json()
113            .await
114            .map_err(|e| StorageError::Ipfs(format!("add response parse failed: {e}")))?;
115
116        info!("Uploaded to IPFS: CID={}", resp.hash);
117        Ok(resp.hash)
118    }
119
120    /// Download data from IPFS by CID (tries local node first, then gateway).
121    pub async fn cat(&self, cid: &str) -> Result<Vec<u8>> {
122        info!("Fetching CID {cid} from IPFS");
123
124        // Try local API first
125        let url = format!("{}/api/v0/cat?arg={cid}", self.api_url);
126        match self
127            .client
128            .post(&url)
129            .timeout(self.timeout)
130            .send()
131            .await
132        {
133            Ok(resp) if resp.status().is_success() => {
134                let bytes = resp
135                    .bytes()
136                    .await
137                    .map_err(|e| StorageError::Ipfs(format!("read body failed: {e}")))?;
138                return Ok(bytes.to_vec());
139            }
140            _ => {}
141        }
142
143        // Fallback to public gateway
144        let gw_url = format!("{}/{cid}", self.gateway_url);
145        let bytes = self
146            .client
147            .get(&gw_url)
148            .timeout(self.timeout)
149            .send()
150            .await
151            .map_err(|e| StorageError::Ipfs(format!("gateway fetch failed: {e}")))?
152            .bytes()
153            .await
154            .map_err(|e| StorageError::Ipfs(format!("gateway read failed: {e}")))?;
155
156        Ok(bytes.to_vec())
157    }
158
159    /// Pin a CID so the local node keeps it.
160    pub async fn pin_add(&self, cid: &str) -> Result<()> {
161        let url = format!("{}/api/v0/pin/add?arg={cid}", self.api_url);
162        self.client
163            .post(&url)
164            .timeout(self.timeout)
165            .send()
166            .await
167            .map_err(|e| StorageError::Ipfs(format!("pin add failed: {e}")))?;
168        info!("Pinned CID {cid}");
169        Ok(())
170    }
171
172    /// Unpin a CID.
173    pub async fn pin_rm(&self, cid: &str) -> Result<()> {
174        let url = format!("{}/api/v0/pin/rm?arg={cid}", self.api_url);
175        self.client
176            .post(&url)
177            .timeout(self.timeout)
178            .send()
179            .await
180            .map_err(|e| StorageError::Ipfs(format!("pin rm failed: {e}")))?;
181        info!("Unpinned CID {cid}");
182        Ok(())
183    }
184
185    /// List directory contents of a CID.
186    pub async fn ls(&self, cid: &str) -> Result<Vec<IpfsEntry>> {
187        let url = format!("{}/api/v0/ls?arg={cid}", self.api_url);
188        let resp: LsResponse = self
189            .client
190            .post(&url)
191            .timeout(self.timeout)
192            .send()
193            .await
194            .map_err(|e| StorageError::Ipfs(format!("ls failed: {e}")))?
195            .json()
196            .await
197            .map_err(|e| StorageError::Ipfs(format!("ls parse failed: {e}")))?;
198
199        let entries = resp
200            .objects
201            .into_iter()
202            .flat_map(|obj| {
203                obj.links.into_iter().map(|link| IpfsEntry {
204                    cid: link.hash,
205                    name: link.name,
206                    size: link.size,
207                })
208            })
209            .collect();
210
211        Ok(entries)
212    }
213}
214
215impl Default for IpfsClient {
216    fn default() -> Self {
217        Self::new()
218    }
219}
220
221#[cfg(test)]
222mod tests {
223    use super::*;
224
225    #[test]
226    fn client_default_urls() {
227        let client = IpfsClient::new();
228        assert_eq!(client.api_url(), "http://127.0.0.1:5001");
229        assert!(client.gateway_url().contains("ipfs.io"));
230    }
231
232    #[test]
233    fn custom_api_url() {
234        let client = IpfsClient::with_api_url("http://localhost:9001");
235        assert_eq!(client.api_url(), "http://localhost:9001");
236    }
237
238    #[test]
239    fn custom_gateway() {
240        let client = IpfsClient::new().with_gateway("https://gateway.pinata.cloud/ipfs");
241        assert!(client.gateway_url().contains("pinata"));
242    }
243
244    #[test]
245    fn custom_timeout() {
246        let client = IpfsClient::new().with_timeout(std::time::Duration::from_secs(60));
247        assert_eq!(client.timeout, std::time::Duration::from_secs(60));
248    }
249
250    #[test]
251    fn trailing_slash_stripped() {
252        let client = IpfsClient::with_api_url("http://localhost:5001/");
253        assert_eq!(client.api_url(), "http://localhost:5001");
254    }
255}