Skip to main content

ma_core/
kubo.rs

//! Kubo RPC client for IPFS operations.
//!
//! HTTP helpers for the Kubo `/api/v0/` endpoints: data add/cat, DAG
//! put/get, IPNS name publish/resolve, key management, and pinning.
5
6use anyhow::{anyhow, Result};
7use did_ma::{Did, Document};
8use reqwest::multipart;
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11use std::time::Duration;
12use tokio::time::sleep;
13use tracing::warn;
14
/// Default base URL of the Kubo RPC API (loopback address, port 5001).
///
/// The helpers in this module append `/api/v0/...` to whatever base URL they
/// are given.
pub const DEFAULT_KUBO_API_URL: &str = "http://127.0.0.1:5001";
16
17// ─── Response types ─────────────────────────────────────────────────────────
18
19#[derive(Debug, Deserialize)]
20struct AddResponse {
21    #[serde(rename = "Hash")]
22    hash: String,
23}
24
25#[derive(Debug, Deserialize)]
26struct DagPutCid {
27    #[serde(rename = "/")]
28    slash: String,
29}
30
31#[derive(Debug, Deserialize)]
32struct DagPutResponse {
33    #[serde(default, rename = "Cid")]
34    cid_upper: Option<DagPutCid>,
35    #[serde(default)]
36    cid: Option<DagPutCid>,
37}
38
39#[derive(Debug, Deserialize)]
40struct NamePublishResponse {
41    #[serde(default, rename = "Value")]
42    value_upper: String,
43    #[serde(default, rename = "value")]
44    value_lower: String,
45}
46
47#[derive(Debug, Deserialize)]
48struct NameResolveResponse {
49    #[serde(default, rename = "Path")]
50    path_upper: String,
51    #[serde(default, rename = "path")]
52    path_lower: String,
53}
54
55#[derive(Debug, Deserialize)]
56struct VersionResponse {
57    #[serde(default, rename = "Version")]
58    version_upper: String,
59    #[serde(default, rename = "version")]
60    version_lower: String,
61}
62
63#[derive(Debug, Deserialize)]
64struct KeyListEntry {
65    #[serde(default, rename = "Name")]
66    name: String,
67    #[serde(default, rename = "name")]
68    name_lower: String,
69    #[serde(default, rename = "Id")]
70    id: String,
71    #[serde(default, rename = "id")]
72    id_lower: String,
73}
74
75#[derive(Debug, Deserialize)]
76struct KeyListResponse {
77    #[serde(default, rename = "Keys")]
78    keys: Vec<KeyListEntry>,
79}
80
81#[derive(Debug, Deserialize)]
82struct KeyImportResponse {
83    #[serde(default, rename = "Name")]
84    name_upper: String,
85    #[serde(default, rename = "name")]
86    name_lower: String,
87    #[serde(default, rename = "Id")]
88    id_upper: String,
89    #[serde(default, rename = "id")]
90    id_lower: String,
91}
92
/// A keystore entry as reported by the key list / key import endpoints.
#[derive(Debug, Clone)]
pub struct KuboKey {
    /// Name of the key in the keystore.
    pub name: String,
    /// Identifier reported for the key.
    pub id: String,
}
98
99// ─── Publish options ────────────────────────────────────────────────────────
100
/// Tunables for an IPNS `name/publish` request.
#[derive(Debug, Clone)]
pub struct IpnsPublishOptions {
    /// Timeout applied to the HTTP client that sends the publish request.
    pub timeout: Duration,
    /// Sent as the `allow-offline` query parameter.
    pub allow_offline: bool,
    /// Sent as the `lifetime` query parameter.
    pub lifetime: String,
    /// Sent as the `ttl` query parameter when present; omitted when `None`.
    pub ttl: Option<String>,
    /// Sent as the `resolve` query parameter.
    pub resolve: bool,
    /// Sent as the `quieter` query parameter.
    pub quieter: bool,
}
110
111impl Default for IpnsPublishOptions {
112    fn default() -> Self {
113        Self {
114            timeout: Duration::from_secs(15),
115            allow_offline: true,
116            lifetime: "8760h".to_string(),
117            ttl: None,
118            resolve: false,
119            quieter: true,
120        }
121    }
122}
123
124// ─── Readiness ──────────────────────────────────────────────────────────────
125
126pub async fn wait_for_api(kubo_url: &str, attempts: u32) -> Result<()> {
127    if attempts == 0 {
128        return Err(anyhow!("kubo readiness attempts must be >= 1"));
129    }
130
131    let base = kubo_url.trim_end_matches('/');
132    let url = format!("{base}/api/v0/version");
133    let client = reqwest::Client::builder()
134        .timeout(Duration::from_secs(6))
135        .build()?;
136
137    let mut backoff = Duration::from_millis(200);
138    let mut last_err: Option<anyhow::Error> = None;
139
140    for attempt in 1..=attempts {
141        let result = async {
142            let response = client.post(&url).send().await?.error_for_status()?;
143            let body = response.text().await?;
144            let parsed: VersionResponse = serde_json::from_str(&body)
145                .map_err(|e| anyhow!("failed parsing version response: {} body={}", e, body))?;
146            let version = if !parsed.version_upper.is_empty() {
147                parsed.version_upper
148            } else {
149                parsed.version_lower
150            };
151            if version.trim().is_empty() {
152                return Err(anyhow!("missing version field in response: {}", body));
153            }
154            Ok::<(), anyhow::Error>(())
155        }
156        .await;
157
158        match result {
159            Ok(()) => return Ok(()),
160            Err(err) => {
161                warn!("kubo readiness {}/{}: {}", attempt, attempts, err);
162                last_err = Some(err);
163                if attempt < attempts {
164                    sleep(backoff).await;
165                    let doubled = backoff.as_millis().saturating_mul(2);
166                    backoff = Duration::from_millis(std::cmp::min(doubled, 5_000) as u64);
167                }
168            }
169        }
170    }
171    Err(anyhow!(
172        "kubo API not ready after {} attempts: {}",
173        attempts,
174        last_err
175            .map(|e| e.to_string())
176            .unwrap_or_else(|| "unknown error".to_string())
177    ))
178}
179
180// ─── Add / Cat ──────────────────────────────────────────────────────────────
181
182pub async fn ipfs_add(kubo_url: &str, data: Vec<u8>) -> Result<String> {
183    let base = kubo_url.trim_end_matches('/');
184    let url = format!("{base}/api/v0/add");
185
186    let part = multipart::Part::bytes(data).file_name("data");
187    let form = multipart::Form::new().part("file", part);
188
189    let client = reqwest::Client::builder()
190        .timeout(Duration::from_secs(10))
191        .build()?;
192
193    let body = client
194        .post(url)
195        .query(&[("pin", "true")])
196        .multipart(form)
197        .send()
198        .await?
199        .error_for_status()?
200        .text()
201        .await?;
202
203    let parsed: AddResponse = serde_json::from_str(&body)
204        .map_err(|e| anyhow!("failed parsing add response: {} body={}", e, body))?;
205    Ok(parsed.hash)
206}
207
208pub async fn cat_bytes(kubo_url: &str, cid: &str) -> Result<Vec<u8>> {
209    let base = kubo_url.trim_end_matches('/');
210    let url = format!("{base}/api/v0/cat");
211
212    let client = reqwest::Client::builder()
213        .timeout(Duration::from_secs(30))
214        .build()?;
215
216    let bytes = client
217        .post(url)
218        .query(&[("arg", cid)])
219        .send()
220        .await?
221        .error_for_status()?
222        .bytes()
223        .await?;
224
225    Ok(bytes.to_vec())
226}
227
228pub async fn cat_text(kubo_url: &str, cid: &str) -> Result<String> {
229    let bytes = cat_bytes(kubo_url, cid).await?;
230    String::from_utf8(bytes).map_err(|e| anyhow!("non-utf8 content from {}: {}", cid, e))
231}
232
233// ─── DAG ────────────────────────────────────────────────────────────────────
234
235pub async fn dag_put<T: Serialize>(kubo_url: &str, value: &T) -> Result<String> {
236    let base = kubo_url.trim_end_matches('/');
237    let url = format!("{base}/api/v0/dag/put");
238    let payload = serde_json::to_vec(value)?;
239
240    let part = multipart::Part::bytes(payload)
241        .file_name("node.json")
242        .mime_str("application/json")?;
243    let form = multipart::Form::new().part("file", part);
244
245    let client = reqwest::Client::builder()
246        .timeout(Duration::from_secs(10))
247        .build()?;
248
249    let body = client
250        .post(url)
251        .query(&[
252            ("store-codec", "dag-cbor"),
253            ("input-codec", "dag-json"),
254            ("pin", "true"),
255        ])
256        .multipart(form)
257        .send()
258        .await?
259        .error_for_status()?
260        .text()
261        .await?;
262
263    let parsed: DagPutResponse = serde_json::from_str(&body)
264        .map_err(|e| anyhow!("failed parsing dag/put response: {} body={}", e, body))?;
265    parsed
266        .cid_upper
267        .or(parsed.cid)
268        .map(|c| c.slash)
269        .ok_or_else(|| anyhow!("missing CID in dag/put response: {}", body))
270}
271
272pub async fn dag_get<T: DeserializeOwned>(kubo_url: &str, cid: &str) -> Result<T> {
273    let base = kubo_url.trim_end_matches('/');
274    let url = format!("{base}/api/v0/dag/get");
275
276    let client = reqwest::Client::builder()
277        .timeout(Duration::from_secs(10))
278        .build()?;
279
280    let body = client
281        .post(url)
282        .query(&[("arg", cid)])
283        .send()
284        .await?
285        .error_for_status()?
286        .text()
287        .await?;
288
289    serde_json::from_str::<T>(&body).map_err(|e| {
290        anyhow!(
291            "failed parsing dag/get response for {}: {} body={}",
292            cid,
293            e,
294            body
295        )
296    })
297}
298
299// ─── Name publish / resolve ─────────────────────────────────────────────────
300
/// Normalize a CID or IPFS path to the canonical `/ipfs/<rest>` form.
///
/// Surrounding whitespace is trimmed, then every leading `/ipfs/` prefix and
/// every stray leading `/` is removed, in whatever order they occur, before a
/// single `/ipfs/` prefix is put back. Stripping both kinds of prefix in one
/// loop means inputs such as `//ipfs/<cid>` or `/ipfs///ipfs/<cid>` collapse
/// to `/ipfs/<cid>` instead of `/ipfs/ipfs/<cid>`.
///
/// Only the leading part of the input is touched; anything after the first
/// non-prefix character (for example a sub-path) is preserved verbatim.
fn normalize_ipfs_arg(cid_or_path: &str) -> String {
    let mut rest = cid_or_path.trim();
    loop {
        // Try the longer prefix first so "/ipfs/" is consumed as a unit.
        if let Some(tail) = rest.strip_prefix("/ipfs/") {
            rest = tail;
        } else if let Some(tail) = rest.strip_prefix('/') {
            rest = tail;
        } else {
            break;
        }
    }
    format!("/ipfs/{rest}")
}
311
312pub async fn name_publish(kubo_url: &str, key_name: &str, cid: &str) -> Result<String> {
313    let options = IpnsPublishOptions::default();
314    name_publish_with_options(kubo_url, key_name, cid, &options).await
315}
316
317pub async fn name_publish_with_options(
318    kubo_url: &str,
319    key_name: &str,
320    cid: &str,
321    options: &IpnsPublishOptions,
322) -> Result<String> {
323    let base = kubo_url.trim_end_matches('/');
324    let url = format!("{base}/api/v0/name/publish");
325    let arg = normalize_ipfs_arg(cid);
326
327    let client = reqwest::Client::builder()
328        .timeout(options.timeout)
329        .build()?;
330
331    let allow_offline = if options.allow_offline {
332        "true"
333    } else {
334        "false"
335    };
336    let resolve = if options.resolve { "true" } else { "false" };
337    let quieter = if options.quieter { "true" } else { "false" };
338
339    let mut params: Vec<(&str, &str)> = vec![
340        ("arg", arg.as_str()),
341        ("key", key_name),
342        ("allow-offline", allow_offline),
343        ("lifetime", options.lifetime.as_str()),
344        ("resolve", resolve),
345        ("quieter", quieter),
346    ];
347    if let Some(ref ttl) = options.ttl {
348        params.push(("ttl", ttl.as_str()));
349    }
350
351    let body = client
352        .post(url)
353        .query(&params)
354        .send()
355        .await?
356        .error_for_status()?
357        .text()
358        .await?;
359
360    let parsed: NamePublishResponse = serde_json::from_str(&body)
361        .map_err(|e| anyhow!("failed parsing name/publish response: {} body={}", e, body))?;
362    let value = if !parsed.value_upper.is_empty() {
363        parsed.value_upper
364    } else {
365        parsed.value_lower
366    };
367    if value.is_empty() {
368        return Err(anyhow!("missing value in name/publish response: {}", body));
369    }
370    Ok(value)
371}
372
373pub async fn name_publish_with_retry(
374    kubo_url: &str,
375    key_name: &str,
376    cid: &str,
377    options: &IpnsPublishOptions,
378    attempts: u32,
379    initial_backoff: Duration,
380) -> Result<String> {
381    if attempts == 0 {
382        return Err(anyhow!("name publish attempts must be >= 1"));
383    }
384
385    let mut backoff = initial_backoff;
386    let mut last_err: Option<anyhow::Error> = None;
387
388    for attempt in 1..=attempts {
389        match name_publish_with_options(kubo_url, key_name, cid, options).await {
390            Ok(value) => return Ok(value),
391            Err(err) => {
392                if let Ok(value) = verify_name_target_after_error(kubo_url, key_name, cid).await {
393                    warn!(
394                        "name publish attempt {}/{} reported error for key '{}' but resolve confirms target; accepting: {}",
395                        attempt, attempts, key_name, value
396                    );
397                    return Ok(value);
398                }
399                warn!(
400                    "name publish attempt {}/{} failed for key '{}' cid '{}': {}",
401                    attempt, attempts, key_name, cid, err
402                );
403                last_err = Some(err);
404                if attempt < attempts {
405                    sleep(backoff).await;
406                    let doubled = backoff.as_millis().saturating_mul(2);
407                    backoff = Duration::from_millis(std::cmp::min(doubled, 30_000) as u64);
408                }
409            }
410        }
411    }
412
413    Err(anyhow!(
414        "name publish failed after {} attempt(s): {}",
415        attempts,
416        last_err
417            .map(|e| e.to_string())
418            .unwrap_or_else(|| "unknown error".to_string())
419    ))
420}
421
422async fn verify_name_target_after_error(
423    kubo_url: &str,
424    key_name: &str,
425    cid: &str,
426) -> Result<String> {
427    let expected = normalize_ipfs_arg(cid);
428    let resolved = name_resolve(kubo_url, &format!("/ipns/{key_name}"), true).await?;
429    if resolved.trim() == expected {
430        return Ok(resolved);
431    }
432    Err(anyhow!(
433        "post-error resolve mismatch for key '{}': expected '{}' got '{}'",
434        key_name,
435        expected,
436        resolved
437    ))
438}
439
440pub async fn name_resolve(kubo_url: &str, path: &str, recursive: bool) -> Result<String> {
441    let base = kubo_url.trim_end_matches('/');
442    let url = format!("{base}/api/v0/name/resolve");
443
444    let client = reqwest::Client::builder()
445        .timeout(Duration::from_secs(15))
446        .build()?;
447
448    let recursive_flag = if recursive { "true" } else { "false" };
449    let body = client
450        .post(url)
451        .query(&[("arg", path), ("recursive", recursive_flag)])
452        .send()
453        .await?
454        .error_for_status()?
455        .text()
456        .await?;
457
458    let parsed: NameResolveResponse = serde_json::from_str(&body)
459        .map_err(|e| anyhow!("failed parsing name/resolve response: {} body={}", e, body))?;
460    let resolved = if !parsed.path_upper.is_empty() {
461        parsed.path_upper
462    } else {
463        parsed.path_lower
464    };
465    if resolved.is_empty() {
466        return Err(anyhow!("missing path in name/resolve response: {}", body));
467    }
468    Ok(resolved)
469}
470
471// ─── DID document fetch ─────────────────────────────────────────────────────
472
473pub async fn fetch_did_document(kubo_url: &str, did: &Did) -> Result<Document> {
474    let ipns_path = format!("/ipns/{}", did.ipns);
475    let mut backoff = Duration::from_millis(150);
476    let mut last_err: Option<anyhow::Error> = None;
477    let mut document: Option<Document> = None;
478
479    for attempt in 1..=4 {
480        // DID documents are stored as DAG-CBOR via dag/put.
481        match dag_get::<Document>(kubo_url, &ipns_path).await {
482            Ok(doc) => {
483                document = Some(doc);
484                break;
485            }
486            Err(dag_err) => {
487                // Fallback: resolve IPNS manually then dag_get the CID.
488                match name_resolve(kubo_url, &ipns_path, true).await {
489                    Ok(resolved_path) => {
490                        match dag_get::<Document>(kubo_url, &resolved_path).await {
491                            Ok(doc) => {
492                                document = Some(doc);
493                                break;
494                            }
495                            Err(err) => {
496                                last_err = Some(anyhow!(
497                                    "dag_get failed for {}: direct={} resolved={}",
498                                    ipns_path,
499                                    dag_err,
500                                    err
501                                ));
502                            }
503                        }
504                    }
505                    Err(resolve_err) => {
506                        last_err = Some(anyhow!(
507                            "dag_get and name/resolve both failed for {}: dag={} resolve={}",
508                            ipns_path,
509                            dag_err,
510                            resolve_err
511                        ));
512                        if !should_retry_name_resolve_error(&resolve_err) {
513                            break;
514                        }
515                    }
516                }
517            }
518        }
519
520        if attempt < 4 {
521            sleep(backoff).await;
522            let doubled = backoff.as_millis().saturating_mul(2);
523            backoff = Duration::from_millis(std::cmp::min(doubled, 2_000) as u64);
524        }
525    }
526
527    let document = document.ok_or_else(|| {
528        anyhow!(
529            "failed to fetch DID document for {} via {} after retries: {}",
530            did.id(),
531            ipns_path,
532            last_err
533                .map(|e| e.to_string())
534                .unwrap_or_else(|| "unknown error".to_string())
535        )
536    })?;
537
538    document.validate()?;
539    document.verify()?;
540
541    let doc_did = Did::try_from(document.id.as_str())
542        .map_err(|e| anyhow!("DID document has invalid id '{}': {}", document.id, e))?;
543    if doc_did.ipns != did.ipns {
544        return Err(anyhow!(
545            "DID document IPNS mismatch: expected {} but document id is {}",
546            did.base_id(),
547            document.id
548        ));
549    }
550
551    Ok(document)
552}
553
554fn should_retry_name_resolve_error(err: &anyhow::Error) -> bool {
555    let text = err.to_string().to_ascii_lowercase();
556    if text.contains("http status client error") {
557        return false;
558    }
559    if text.contains("missing path in name/resolve response") {
560        return false;
561    }
562    true
563}
564
565// ─── Pin ────────────────────────────────────────────────────────────────────
566
567pub async fn pin_add_named(kubo_url: &str, cid: &str, name: &str) -> Result<()> {
568    let base = kubo_url.trim_end_matches('/');
569    let url = format!("{base}/api/v0/pin/add");
570    let arg = normalize_ipfs_arg(cid);
571
572    let client = reqwest::Client::builder()
573        .timeout(Duration::from_secs(10))
574        .build()?;
575
576    client
577        .post(url)
578        .query(&[("arg", arg.as_str()), ("recursive", "true"), ("name", name)])
579        .send()
580        .await?
581        .error_for_status()?;
582
583    Ok(())
584}
585
586pub async fn pin_rm(kubo_url: &str, cid: &str) -> Result<()> {
587    let base = kubo_url.trim_end_matches('/');
588    let url = format!("{base}/api/v0/pin/rm");
589    let arg = normalize_ipfs_arg(cid);
590
591    let client = reqwest::Client::builder()
592        .timeout(Duration::from_secs(10))
593        .build()?;
594
595    client
596        .post(url)
597        .query(&[("arg", arg.as_str()), ("recursive", "true")])
598        .send()
599        .await?
600        .error_for_status()?;
601
602    Ok(())
603}
604
605// ─── Key management ─────────────────────────────────────────────────────────
606
607pub async fn generate_key(kubo_url: &str, key_name: &str) -> Result<()> {
608    let base = kubo_url.trim_end_matches('/');
609    let url = format!("{base}/api/v0/key/gen");
610
611    reqwest::Client::builder()
612        .timeout(Duration::from_secs(10))
613        .build()?
614        .post(url)
615        .query(&[("arg", key_name), ("type", "ed25519")])
616        .send()
617        .await?
618        .error_for_status()?;
619
620    Ok(())
621}
622
623pub async fn import_key(kubo_url: &str, key_name: &str, key_bytes: Vec<u8>) -> Result<KuboKey> {
624    let base = kubo_url.trim_end_matches('/');
625    let url = format!("{base}/api/v0/key/import");
626
627    let part = multipart::Part::bytes(key_bytes)
628        .file_name("ipns.key")
629        .mime_str("application/octet-stream")?;
630    let form = multipart::Form::new().part("file", part);
631
632    let response = reqwest::Client::builder()
633        .timeout(Duration::from_secs(10))
634        .build()?
635        .post(url)
636        .query(&[("arg", key_name), ("allow-any-key-type", "true")])
637        .multipart(form)
638        .send()
639        .await?
640        .error_for_status()?;
641
642    let body = response.text().await?;
643    let parsed: KeyImportResponse = serde_json::from_str(&body)
644        .map_err(|e| anyhow!("failed parsing key/import response: {} body={}", e, body))?;
645
646    let name = if !parsed.name_upper.trim().is_empty() {
647        parsed.name_upper.trim().to_string()
648    } else {
649        parsed.name_lower.trim().to_string()
650    };
651    let id = if !parsed.id_upper.trim().is_empty() {
652        parsed.id_upper.trim().to_string()
653    } else {
654        parsed.id_lower.trim().to_string()
655    };
656
657    if name.is_empty() || id.is_empty() {
658        return Err(anyhow!("missing name/id in key/import response: {}", body));
659    }
660
661    Ok(KuboKey { name, id })
662}
663
664pub async fn list_keys(kubo_url: &str) -> Result<Vec<KuboKey>> {
665    let base = kubo_url.trim_end_matches('/');
666    let url = format!("{base}/api/v0/key/list");
667
668    let body = reqwest::Client::builder()
669        .timeout(Duration::from_secs(10))
670        .build()?
671        .post(url)
672        .send()
673        .await?
674        .error_for_status()?
675        .text()
676        .await?;
677
678    let parsed: KeyListResponse = serde_json::from_str(&body)
679        .map_err(|e| anyhow!("failed parsing key/list response: {} body={}", e, body))?;
680    Ok(parsed
681        .keys
682        .into_iter()
683        .filter_map(|k| {
684            let name = if !k.name.trim().is_empty() {
685                k.name.trim().to_string()
686            } else {
687                k.name_lower.trim().to_string()
688            };
689            let id = if !k.id.trim().is_empty() {
690                k.id.trim().to_string()
691            } else {
692                k.id_lower.trim().to_string()
693            };
694            if name.is_empty() {
695                None
696            } else {
697                Some(KuboKey { name, id })
698            }
699        })
700        .collect())
701}
702
703pub async fn list_key_names(kubo_url: &str) -> Result<Vec<String>> {
704    let keys = list_keys(kubo_url).await?;
705    Ok(keys.into_iter().map(|k| k.name).collect())
706}
707
708/// Remove a named key from the Kubo keystore.
709pub async fn remove_key(kubo_url: &str, key_name: &str) -> Result<()> {
710    let base = kubo_url.trim_end_matches('/');
711    let url = format!("{base}/api/v0/key/rm");
712
713    reqwest::Client::builder()
714        .timeout(Duration::from_secs(10))
715        .build()?
716        .post(url)
717        .query(&[("arg", key_name)])
718        .send()
719        .await?
720        .error_for_status()?;
721
722    Ok(())
723}
724
725// ─── Tests ──────────────────────────────────────────────────────────────────
726
#[cfg(test)]
mod tests {
    use super::*;

    /// Classify an error that carries only `message` as its text.
    fn retries(message: &str) -> bool {
        should_retry_name_resolve_error(&anyhow!("{}", message))
    }

    /// A bare CID gains the `/ipfs/` prefix.
    #[test]
    fn normalize_ipfs_arg_from_raw_cid() {
        let normalized = normalize_ipfs_arg("QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    /// An already-prefixed path is returned unchanged.
    #[test]
    fn normalize_ipfs_arg_from_prefixed_path() {
        let normalized = normalize_ipfs_arg("/ipfs/QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    /// A doubled prefix collapses to a single one.
    #[test]
    fn normalize_ipfs_arg_from_double_prefixed_path() {
        let normalized = normalize_ipfs_arg("/ipfs//ipfs/QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    /// HTTP 4xx responses are treated as permanent.
    #[test]
    fn does_not_retry_http_client_status_errors() {
        assert!(!retries(
            "HTTP status client error (404 Not Found) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
        ));
    }

    /// HTTP 5xx responses are retried.
    #[test]
    fn retries_http_server_status_errors() {
        assert!(retries(
            "HTTP status server error (500 Internal Server Error) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
        ));
    }

    /// Transport-level failures are retried.
    #[test]
    fn retries_network_errors() {
        assert!(retries(
            "error sending request for url (http://127.0.0.1:5001/api/v0/name/resolve)"
        ));
    }
}