Skip to main content

ma_core/ipfs/
kubo.rs

1//! Kubo RPC client for IPFS operations.
2//!
3//! HTTP helpers for the Kubo `/api/v0/` endpoints: data add/cat, DAG
4//! put/get, IPNS name publish/resolve, key management, and pinning.
5
6use anyhow::{anyhow, Result};
7use did_ma::{Did, Document};
8use reqwest::multipart;
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11use std::time::Duration;
12use tokio::time::sleep;
13use tracing::warn;
14
15// ─── Response types ─────────────────────────────────────────────────────────
16
17#[derive(Debug, Deserialize)]
18struct AddResponse {
19    #[serde(rename = "Hash")]
20    hash: String,
21}
22
23#[derive(Debug, Deserialize)]
24struct DagPutCid {
25    #[serde(rename = "/")]
26    slash: String,
27}
28
29#[derive(Debug, Deserialize)]
30struct DagPutResponse {
31    #[serde(default, rename = "Cid")]
32    cid_upper: Option<DagPutCid>,
33    #[serde(default)]
34    cid: Option<DagPutCid>,
35}
36
37#[derive(Debug, Deserialize)]
38struct NamePublishResponse {
39    #[serde(default, rename = "Value")]
40    value_upper: String,
41    #[serde(default, rename = "value")]
42    value_lower: String,
43}
44
45#[derive(Debug, Deserialize)]
46struct NameResolveResponse {
47    #[serde(default, rename = "Path")]
48    path_upper: String,
49    #[serde(default, rename = "path")]
50    path_lower: String,
51}
52
53#[derive(Debug, Deserialize)]
54struct VersionResponse {
55    #[serde(default, rename = "Version")]
56    version_upper: String,
57    #[serde(default, rename = "version")]
58    version_lower: String,
59}
60
61#[derive(Debug, Deserialize)]
62struct KeyListEntry {
63    #[serde(default, rename = "Name")]
64    name: String,
65    #[serde(default, rename = "name")]
66    name_lower: String,
67    #[serde(default, rename = "Id")]
68    id: String,
69    #[serde(default, rename = "id")]
70    id_lower: String,
71}
72
73#[derive(Debug, Deserialize)]
74struct KeyListResponse {
75    #[serde(default, rename = "Keys")]
76    keys: Vec<KeyListEntry>,
77}
78
79#[derive(Debug, Deserialize)]
80struct KeyImportResponse {
81    #[serde(default, rename = "Name")]
82    name_upper: String,
83    #[serde(default, rename = "name")]
84    name_lower: String,
85    #[serde(default, rename = "Id")]
86    id_upper: String,
87    #[serde(default, rename = "id")]
88    id_lower: String,
89}
90
/// A keystore entry as reported by Kubo: its name paired with its id.
///
/// Produced by `import_key` and `list_keys`.
#[derive(Debug, Clone)]
pub struct KuboKey {
    /// Key name, trimmed of surrounding whitespace by the producers above.
    pub name: String,
    /// Key id as returned by Kubo; `list_keys` may leave it empty.
    pub id: String,
}
96
97// ─── Publish options ────────────────────────────────────────────────────────
98
/// Tunables for a `/api/v0/name/publish` request.
///
/// Except for `timeout`, every field is forwarded as the query parameter of
/// the matching name by `name_publish_with_options`.
#[derive(Debug, Clone)]
pub struct IpnsPublishOptions {
    /// Overall timeout applied to the HTTP client issuing the request.
    pub timeout: Duration,
    /// Sent as the `allow-offline` query parameter.
    pub allow_offline: bool,
    /// Sent verbatim as the `lifetime` query parameter.
    pub lifetime: String,
    /// Sent verbatim as the `ttl` query parameter; omitted when `None`.
    pub ttl: Option<String>,
    /// Sent as the `resolve` query parameter.
    pub resolve: bool,
    /// Sent as the `quieter` query parameter.
    pub quieter: bool,
}
108
109impl Default for IpnsPublishOptions {
110    fn default() -> Self {
111        Self {
112            timeout: Duration::from_secs(15),
113            allow_offline: true,
114            lifetime: "8760h".to_string(),
115            ttl: None,
116            resolve: false,
117            quieter: true,
118        }
119    }
120}
121
122// ─── Readiness ──────────────────────────────────────────────────────────────
123
124pub async fn wait_for_api(kubo_url: &str, attempts: u32) -> Result<()> {
125    if attempts == 0 {
126        return Err(anyhow!("kubo readiness attempts must be >= 1"));
127    }
128
129    let base = kubo_url.trim_end_matches('/');
130    let url = format!("{base}/api/v0/version");
131    let client = reqwest::Client::builder()
132        .timeout(Duration::from_secs(6))
133        .build()?;
134
135    let mut backoff = Duration::from_millis(200);
136    let mut last_err: Option<anyhow::Error> = None;
137
138    for attempt in 1..=attempts {
139        let result = async {
140            let response = client.post(&url).send().await?.error_for_status()?;
141            let body = response.text().await?;
142            let parsed: VersionResponse = serde_json::from_str(&body)
143                .map_err(|e| anyhow!("failed parsing version response: {} body={}", e, body))?;
144            let version = if !parsed.version_upper.is_empty() {
145                parsed.version_upper
146            } else {
147                parsed.version_lower
148            };
149            if version.trim().is_empty() {
150                return Err(anyhow!("missing version field in response: {}", body));
151            }
152            Ok::<(), anyhow::Error>(())
153        }
154        .await;
155
156        match result {
157            Ok(()) => return Ok(()),
158            Err(err) => {
159                warn!("kubo readiness {}/{}: {}", attempt, attempts, err);
160                last_err = Some(err);
161                if attempt < attempts {
162                    sleep(backoff).await;
163                    let doubled = backoff.as_millis().saturating_mul(2);
164                    backoff = Duration::from_millis(std::cmp::min(doubled, 5_000) as u64);
165                }
166            }
167        }
168    }
169    Err(anyhow!(
170        "kubo API not ready after {} attempts: {}",
171        attempts,
172        last_err
173            .map(|e| e.to_string())
174            .unwrap_or_else(|| "unknown error".to_string())
175    ))
176}
177
178// ─── Add / Cat ──────────────────────────────────────────────────────────────
179
180pub async fn ipfs_add(kubo_url: &str, data: Vec<u8>) -> Result<String> {
181    let base = kubo_url.trim_end_matches('/');
182    let url = format!("{base}/api/v0/add");
183
184    let part = multipart::Part::bytes(data).file_name("data");
185    let form = multipart::Form::new().part("file", part);
186
187    let client = reqwest::Client::builder()
188        .timeout(Duration::from_secs(10))
189        .build()?;
190
191    let body = client
192        .post(url)
193        .query(&[("pin", "true")])
194        .multipart(form)
195        .send()
196        .await?
197        .error_for_status()?
198        .text()
199        .await?;
200
201    let parsed: AddResponse = serde_json::from_str(&body)
202        .map_err(|e| anyhow!("failed parsing add response: {} body={}", e, body))?;
203    Ok(parsed.hash)
204}
205
206pub async fn cat_bytes(kubo_url: &str, cid: &str) -> Result<Vec<u8>> {
207    let base = kubo_url.trim_end_matches('/');
208    let url = format!("{base}/api/v0/cat");
209
210    let client = reqwest::Client::builder()
211        .timeout(Duration::from_secs(30))
212        .build()?;
213
214    let bytes = client
215        .post(url)
216        .query(&[("arg", cid)])
217        .send()
218        .await?
219        .error_for_status()?
220        .bytes()
221        .await?;
222
223    Ok(bytes.to_vec())
224}
225
226pub async fn cat_text(kubo_url: &str, cid: &str) -> Result<String> {
227    let bytes = cat_bytes(kubo_url, cid).await?;
228    String::from_utf8(bytes).map_err(|e| anyhow!("non-utf8 content from {}: {}", cid, e))
229}
230
231// ─── DAG ────────────────────────────────────────────────────────────────────
232
233pub async fn dag_put<T: Serialize>(kubo_url: &str, value: &T) -> Result<String> {
234    let base = kubo_url.trim_end_matches('/');
235    let url = format!("{base}/api/v0/dag/put");
236    let payload = serde_json::to_vec(value)?;
237
238    let part = multipart::Part::bytes(payload)
239        .file_name("node.json")
240        .mime_str("application/json")?;
241    let form = multipart::Form::new().part("file", part);
242
243    let client = reqwest::Client::builder()
244        .timeout(Duration::from_secs(10))
245        .build()?;
246
247    let body = client
248        .post(url)
249        .query(&[
250            ("store-codec", "dag-cbor"),
251            ("input-codec", "dag-json"),
252            ("pin", "true"),
253        ])
254        .multipart(form)
255        .send()
256        .await?
257        .error_for_status()?
258        .text()
259        .await?;
260
261    let parsed: DagPutResponse = serde_json::from_str(&body)
262        .map_err(|e| anyhow!("failed parsing dag/put response: {} body={}", e, body))?;
263    parsed
264        .cid_upper
265        .or(parsed.cid)
266        .map(|c| c.slash)
267        .ok_or_else(|| anyhow!("missing CID in dag/put response: {}", body))
268}
269
270pub async fn dag_get<T: DeserializeOwned>(kubo_url: &str, cid: &str) -> Result<T> {
271    let base = kubo_url.trim_end_matches('/');
272    let url = format!("{base}/api/v0/dag/get");
273
274    let client = reqwest::Client::builder()
275        .timeout(Duration::from_secs(10))
276        .build()?;
277
278    let body = client
279        .post(url)
280        .query(&[("arg", cid)])
281        .send()
282        .await?
283        .error_for_status()?
284        .text()
285        .await?;
286
287    serde_json::from_str::<T>(&body).map_err(|e| {
288        anyhow!(
289            "failed parsing dag/get response for {}: {} body={}",
290            cid,
291            e,
292            body
293        )
294    })
295}
296
297// ─── Name publish / resolve ─────────────────────────────────────────────────
298
/// Canonicalizes a CID or IPFS path into the form `/ipfs/<rest>`.
///
/// Surrounding whitespace is trimmed, then every leading `/` and every
/// leading `ipfs/` segment is removed, in whatever order they appear, before
/// a single `/ipfs/` prefix is prepended. Inputs such as `Qm…`, `/ipfs/Qm…`,
/// `ipfs/Qm…`, `//ipfs/Qm…` and `/ipfs//ipfs/Qm…` therefore all normalize to
/// `/ipfs/Qm…`. Only the leading prefix is touched; an `ipfs/` segment
/// further down the path is preserved.
fn normalize_ipfs_arg(cid_or_path: &str) -> String {
    let mut rest = cid_or_path.trim();
    loop {
        // Peel slashes and one `ipfs/` segment per pass; repeating until the
        // slice stops shrinking handles interleaved prefixes without
        // allocating an intermediate String per step.
        let without_slashes = rest.trim_start_matches('/');
        let peeled = without_slashes
            .strip_prefix("ipfs/")
            .unwrap_or(without_slashes);
        if peeled.len() == rest.len() {
            break;
        }
        rest = peeled;
    }
    format!("/ipfs/{rest}")
}
309
310pub async fn name_publish(kubo_url: &str, key_name: &str, cid: &str) -> Result<String> {
311    let options = IpnsPublishOptions::default();
312    name_publish_with_options(kubo_url, key_name, cid, &options).await
313}
314
315pub async fn name_publish_with_options(
316    kubo_url: &str,
317    key_name: &str,
318    cid: &str,
319    options: &IpnsPublishOptions,
320) -> Result<String> {
321    let base = kubo_url.trim_end_matches('/');
322    let url = format!("{base}/api/v0/name/publish");
323    let arg = normalize_ipfs_arg(cid);
324
325    let client = reqwest::Client::builder()
326        .timeout(options.timeout)
327        .build()?;
328
329    let allow_offline = if options.allow_offline {
330        "true"
331    } else {
332        "false"
333    };
334    let resolve = if options.resolve { "true" } else { "false" };
335    let quieter = if options.quieter { "true" } else { "false" };
336
337    let mut params: Vec<(&str, &str)> = vec![
338        ("arg", arg.as_str()),
339        ("key", key_name),
340        ("allow-offline", allow_offline),
341        ("lifetime", options.lifetime.as_str()),
342        ("resolve", resolve),
343        ("quieter", quieter),
344    ];
345    if let Some(ref ttl) = options.ttl {
346        params.push(("ttl", ttl.as_str()));
347    }
348
349    let body = client
350        .post(url)
351        .query(&params)
352        .send()
353        .await?
354        .error_for_status()?
355        .text()
356        .await?;
357
358    let parsed: NamePublishResponse = serde_json::from_str(&body)
359        .map_err(|e| anyhow!("failed parsing name/publish response: {} body={}", e, body))?;
360    let value = if !parsed.value_upper.is_empty() {
361        parsed.value_upper
362    } else {
363        parsed.value_lower
364    };
365    if value.is_empty() {
366        return Err(anyhow!("missing value in name/publish response: {}", body));
367    }
368    Ok(value)
369}
370
371pub async fn name_publish_with_retry(
372    kubo_url: &str,
373    key_name: &str,
374    cid: &str,
375    options: &IpnsPublishOptions,
376    attempts: u32,
377    initial_backoff: Duration,
378) -> Result<String> {
379    if attempts == 0 {
380        return Err(anyhow!("name publish attempts must be >= 1"));
381    }
382
383    let mut backoff = initial_backoff;
384    let mut last_err: Option<anyhow::Error> = None;
385
386    for attempt in 1..=attempts {
387        match name_publish_with_options(kubo_url, key_name, cid, options).await {
388            Ok(value) => return Ok(value),
389            Err(err) => {
390                if let Ok(value) = verify_name_target_after_error(kubo_url, key_name, cid).await {
391                    warn!(
392                        "name publish attempt {}/{} reported error for key '{}' but resolve confirms target; accepting: {}",
393                        attempt, attempts, key_name, value
394                    );
395                    return Ok(value);
396                }
397                warn!(
398                    "name publish attempt {}/{} failed for key '{}' cid '{}': {}",
399                    attempt, attempts, key_name, cid, err
400                );
401                last_err = Some(err);
402                if attempt < attempts {
403                    sleep(backoff).await;
404                    let doubled = backoff.as_millis().saturating_mul(2);
405                    backoff = Duration::from_millis(std::cmp::min(doubled, 30_000) as u64);
406                }
407            }
408        }
409    }
410
411    Err(anyhow!(
412        "name publish failed after {} attempt(s): {}",
413        attempts,
414        last_err
415            .map(|e| e.to_string())
416            .unwrap_or_else(|| "unknown error".to_string())
417    ))
418}
419
420async fn verify_name_target_after_error(
421    kubo_url: &str,
422    key_name: &str,
423    cid: &str,
424) -> Result<String> {
425    let expected = normalize_ipfs_arg(cid);
426    let resolved = name_resolve(kubo_url, &format!("/ipns/{key_name}"), true).await?;
427    if resolved.trim() == expected {
428        return Ok(resolved);
429    }
430    Err(anyhow!(
431        "post-error resolve mismatch for key '{}': expected '{}' got '{}'",
432        key_name,
433        expected,
434        resolved
435    ))
436}
437
438pub async fn name_resolve(kubo_url: &str, path: &str, recursive: bool) -> Result<String> {
439    let base = kubo_url.trim_end_matches('/');
440    let url = format!("{base}/api/v0/name/resolve");
441
442    let client = reqwest::Client::builder()
443        .timeout(Duration::from_secs(15))
444        .build()?;
445
446    let recursive_flag = if recursive { "true" } else { "false" };
447    let body = client
448        .post(url)
449        .query(&[("arg", path), ("recursive", recursive_flag)])
450        .send()
451        .await?
452        .error_for_status()?
453        .text()
454        .await?;
455
456    let parsed: NameResolveResponse = serde_json::from_str(&body)
457        .map_err(|e| anyhow!("failed parsing name/resolve response: {} body={}", e, body))?;
458    let resolved = if !parsed.path_upper.is_empty() {
459        parsed.path_upper
460    } else {
461        parsed.path_lower
462    };
463    if resolved.is_empty() {
464        return Err(anyhow!("missing path in name/resolve response: {}", body));
465    }
466    Ok(resolved)
467}
468
469// ─── DID document fetch ─────────────────────────────────────────────────────
470
471pub async fn fetch_did_document(kubo_url: &str, did: &Did) -> Result<Document> {
472    let ipns_path = format!("/ipns/{}", did.ipns);
473    let mut backoff = Duration::from_millis(150);
474    let mut last_err: Option<anyhow::Error> = None;
475    let mut document: Option<Document> = None;
476
477    for attempt in 1..=4 {
478        // DID documents are stored as DAG-CBOR via dag/put.
479        match dag_get::<Document>(kubo_url, &ipns_path).await {
480            Ok(doc) => {
481                document = Some(doc);
482                break;
483            }
484            Err(dag_err) => {
485                // Fallback: resolve IPNS manually then dag_get the CID.
486                match name_resolve(kubo_url, &ipns_path, true).await {
487                    Ok(resolved_path) => {
488                        match dag_get::<Document>(kubo_url, &resolved_path).await {
489                            Ok(doc) => {
490                                document = Some(doc);
491                                break;
492                            }
493                            Err(err) => {
494                                last_err = Some(anyhow!(
495                                    "dag_get failed for {}: direct={} resolved={}",
496                                    ipns_path,
497                                    dag_err,
498                                    err
499                                ));
500                            }
501                        }
502                    }
503                    Err(resolve_err) => {
504                        last_err = Some(anyhow!(
505                            "dag_get and name/resolve both failed for {}: dag={} resolve={}",
506                            ipns_path,
507                            dag_err,
508                            resolve_err
509                        ));
510                        if !should_retry_name_resolve_error(&resolve_err) {
511                            break;
512                        }
513                    }
514                }
515            }
516        }
517
518        if attempt < 4 {
519            sleep(backoff).await;
520            let doubled = backoff.as_millis().saturating_mul(2);
521            backoff = Duration::from_millis(std::cmp::min(doubled, 2_000) as u64);
522        }
523    }
524
525    let document = document.ok_or_else(|| {
526        anyhow!(
527            "failed to fetch DID document for {} via {} after retries: {}",
528            did.id(),
529            ipns_path,
530            last_err
531                .map(|e| e.to_string())
532                .unwrap_or_else(|| "unknown error".to_string())
533        )
534    })?;
535
536    document.validate()?;
537    document.verify()?;
538
539    let doc_did = Did::try_from(document.id.as_str())
540        .map_err(|e| anyhow!("DID document has invalid id '{}': {}", document.id, e))?;
541    if doc_did.ipns != did.ipns {
542        return Err(anyhow!(
543            "DID document IPNS mismatch: expected {} but document id is {}",
544            did.base_id(),
545            document.id
546        ));
547    }
548
549    Ok(document)
550}
551
552fn should_retry_name_resolve_error(err: &anyhow::Error) -> bool {
553    let text = err.to_string().to_ascii_lowercase();
554    if text.contains("http status client error") {
555        return false;
556    }
557    if text.contains("missing path in name/resolve response") {
558        return false;
559    }
560    true
561}
562
563// ─── Pin ────────────────────────────────────────────────────────────────────
564
565pub async fn pin_add_named(kubo_url: &str, cid: &str, name: &str) -> Result<()> {
566    let base = kubo_url.trim_end_matches('/');
567    let url = format!("{base}/api/v0/pin/add");
568    let arg = normalize_ipfs_arg(cid);
569
570    let client = reqwest::Client::builder()
571        .timeout(Duration::from_secs(10))
572        .build()?;
573
574    client
575        .post(url)
576        .query(&[("arg", arg.as_str()), ("recursive", "true"), ("name", name)])
577        .send()
578        .await?
579        .error_for_status()?;
580
581    Ok(())
582}
583
584pub async fn pin_rm(kubo_url: &str, cid: &str) -> Result<()> {
585    let base = kubo_url.trim_end_matches('/');
586    let url = format!("{base}/api/v0/pin/rm");
587    let arg = normalize_ipfs_arg(cid);
588
589    let client = reqwest::Client::builder()
590        .timeout(Duration::from_secs(10))
591        .build()?;
592
593    client
594        .post(url)
595        .query(&[("arg", arg.as_str()), ("recursive", "true")])
596        .send()
597        .await?
598        .error_for_status()?;
599
600    Ok(())
601}
602
603// ─── Key management ─────────────────────────────────────────────────────────
604
605pub async fn generate_key(kubo_url: &str, key_name: &str) -> Result<()> {
606    let base = kubo_url.trim_end_matches('/');
607    let url = format!("{base}/api/v0/key/gen");
608
609    reqwest::Client::builder()
610        .timeout(Duration::from_secs(10))
611        .build()?
612        .post(url)
613        .query(&[("arg", key_name), ("type", "ed25519")])
614        .send()
615        .await?
616        .error_for_status()?;
617
618    Ok(())
619}
620
621pub async fn import_key(kubo_url: &str, key_name: &str, key_bytes: Vec<u8>) -> Result<KuboKey> {
622    let base = kubo_url.trim_end_matches('/');
623    let url = format!("{base}/api/v0/key/import");
624
625    let part = multipart::Part::bytes(key_bytes)
626        .file_name("ipns.key")
627        .mime_str("application/octet-stream")?;
628    let form = multipart::Form::new().part("file", part);
629
630    let response = reqwest::Client::builder()
631        .timeout(Duration::from_secs(10))
632        .build()?
633        .post(url)
634        .query(&[("arg", key_name), ("allow-any-key-type", "true")])
635        .multipart(form)
636        .send()
637        .await?
638        .error_for_status()?;
639
640    let body = response.text().await?;
641    let parsed: KeyImportResponse = serde_json::from_str(&body)
642        .map_err(|e| anyhow!("failed parsing key/import response: {} body={}", e, body))?;
643
644    let name = if !parsed.name_upper.trim().is_empty() {
645        parsed.name_upper.trim().to_string()
646    } else {
647        parsed.name_lower.trim().to_string()
648    };
649    let id = if !parsed.id_upper.trim().is_empty() {
650        parsed.id_upper.trim().to_string()
651    } else {
652        parsed.id_lower.trim().to_string()
653    };
654
655    if name.is_empty() || id.is_empty() {
656        return Err(anyhow!("missing name/id in key/import response: {}", body));
657    }
658
659    Ok(KuboKey { name, id })
660}
661
662pub async fn list_keys(kubo_url: &str) -> Result<Vec<KuboKey>> {
663    let base = kubo_url.trim_end_matches('/');
664    let url = format!("{base}/api/v0/key/list");
665
666    let body = reqwest::Client::builder()
667        .timeout(Duration::from_secs(10))
668        .build()?
669        .post(url)
670        .send()
671        .await?
672        .error_for_status()?
673        .text()
674        .await?;
675
676    let parsed: KeyListResponse = serde_json::from_str(&body)
677        .map_err(|e| anyhow!("failed parsing key/list response: {} body={}", e, body))?;
678    Ok(parsed
679        .keys
680        .into_iter()
681        .filter_map(|k| {
682            let name = if !k.name.trim().is_empty() {
683                k.name.trim().to_string()
684            } else {
685                k.name_lower.trim().to_string()
686            };
687            let id = if !k.id.trim().is_empty() {
688                k.id.trim().to_string()
689            } else {
690                k.id_lower.trim().to_string()
691            };
692            if name.is_empty() {
693                None
694            } else {
695                Some(KuboKey { name, id })
696            }
697        })
698        .collect())
699}
700
701pub async fn list_key_names(kubo_url: &str) -> Result<Vec<String>> {
702    let keys = list_keys(kubo_url).await?;
703    Ok(keys.into_iter().map(|k| k.name).collect())
704}
705
706/// Remove a named key from the Kubo keystore.
707pub async fn remove_key(kubo_url: &str, key_name: &str) -> Result<()> {
708    let base = kubo_url.trim_end_matches('/');
709    let url = format!("{base}/api/v0/key/rm");
710
711    reqwest::Client::builder()
712        .timeout(Duration::from_secs(10))
713        .build()?
714        .post(url)
715        .query(&[("arg", key_name)])
716        .send()
717        .await?
718        .error_for_status()?;
719
720    Ok(())
721}
722
723// ─── Tests ──────────────────────────────────────────────────────────────────
724
#[cfg(test)]
mod tests {
    use super::*;

    // ── normalize_ipfs_arg ──────────────────────────────────────────────

    /// A bare CID gains the `/ipfs/` prefix.
    #[test]
    fn normalize_ipfs_arg_from_raw_cid() {
        let normalized = normalize_ipfs_arg("QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    /// An already-prefixed path is returned unchanged.
    #[test]
    fn normalize_ipfs_arg_from_prefixed_path() {
        let normalized = normalize_ipfs_arg("/ipfs/QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    /// A doubled prefix collapses to a single one.
    #[test]
    fn normalize_ipfs_arg_from_double_prefixed_path() {
        let normalized = normalize_ipfs_arg("/ipfs//ipfs/QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    // ── should_retry_name_resolve_error ─────────────────────────────────

    /// HTTP 4xx status errors are treated as permanent.
    #[test]
    fn does_not_retry_http_client_status_errors() {
        let err = anyhow!(
            "HTTP status client error (404 Not Found) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
        );
        let retry = should_retry_name_resolve_error(&err);
        assert!(!retry);
    }

    /// HTTP 5xx status errors are retried.
    #[test]
    fn retries_http_server_status_errors() {
        let err = anyhow!(
            "HTTP status server error (500 Internal Server Error) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
        );
        let retry = should_retry_name_resolve_error(&err);
        assert!(retry);
    }

    /// Transport-level failures are retried.
    #[test]
    fn retries_network_errors() {
        let err =
            anyhow!("error sending request for url (http://127.0.0.1:5001/api/v0/name/resolve)");
        let retry = should_retry_name_resolve_error(&err);
        assert!(retry);
    }
}