Skip to main content

ma_core/kubo/
kubo.rs

1//! Kubo RPC client for IPFS operations.
2//!
3//! HTTP helpers for the Kubo `/api/v0/` endpoints: data add/cat, DAG
4//! put/get, IPNS name publish/resolve, key management, and pinning.
5
6use crate::{Did, Document};
7use anyhow::{anyhow, Result};
8use reqwest::multipart;
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11use tokio::time::sleep;
12use tracing::warn;
13use web_time::Duration;
14
15// ─── Response types ─────────────────────────────────────────────────────────
16
/// JSON body returned by `/api/v0/add`, as consumed by `ipfs_add`.
///
/// Only the `Hash` field is deserialized; any other fields are ignored.
#[derive(Debug, Deserialize)]
struct AddResponse {
    /// Value of the `Hash` field; `ipfs_add` returns it as its result.
    #[serde(rename = "Hash")]
    hash: String,
}
22
/// Link object of the form `{"/": "<cid>"}` found inside a `dag/put` response.
#[derive(Debug, Deserialize)]
struct DagPutCid {
    /// Value stored under the `"/"` key; `dag_put` returns it as the CID string.
    #[serde(rename = "/")]
    slash: String,
}
28
/// JSON body returned by `/api/v0/dag/put`.
///
/// The link is accepted under either `Cid` or `cid`; both fields default to
/// `None` when absent, and `dag_put` prefers the capitalised one.
#[derive(Debug, Deserialize)]
struct DagPutResponse {
    /// Link found under the `Cid` key, if present.
    #[serde(default, rename = "Cid")]
    cid_upper: Option<DagPutCid>,
    /// Link found under the `cid` key, if present.
    #[serde(default)]
    cid: Option<DagPutCid>,
}
36
/// JSON body returned by `/api/v0/name/publish`.
///
/// The published value is accepted under either `Value` or `value`; a missing
/// key deserializes to an empty string.
#[derive(Debug, Deserialize)]
struct NamePublishResponse {
    /// Contents of the `Value` key (empty when absent).
    #[serde(default, rename = "Value")]
    value_upper: String,
    /// Contents of the `value` key (empty when absent).
    #[serde(default, rename = "value")]
    value_lower: String,
}
44
/// JSON body returned by `/api/v0/name/resolve`.
///
/// The resolved path is accepted under either `Path` or `path`; a missing key
/// deserializes to an empty string.
#[derive(Debug, Deserialize)]
struct NameResolveResponse {
    /// Contents of the `Path` key (empty when absent).
    #[serde(default, rename = "Path")]
    path_upper: String,
    /// Contents of the `path` key (empty when absent).
    #[serde(default, rename = "path")]
    path_lower: String,
}
52
/// JSON body returned by `/api/v0/version`, used by `wait_for_api` as a
/// readiness probe.
///
/// The version string is accepted under either `Version` or `version`; a
/// missing key deserializes to an empty string.
#[derive(Debug, Deserialize)]
struct VersionResponse {
    /// Contents of the `Version` key (empty when absent).
    #[serde(default, rename = "Version")]
    version_upper: String,
    /// Contents of the `version` key (empty when absent).
    #[serde(default, rename = "version")]
    version_lower: String,
}
60
/// One element of the `Keys` array in a `/api/v0/key/list` response.
///
/// Name and id are each accepted under a capitalised or a lowercase key;
/// missing keys deserialize to empty strings. `list_keys` prefers the
/// capitalised variant when it is non-blank.
#[derive(Debug, Deserialize)]
struct KeyListEntry {
    /// Contents of the `Name` key (empty when absent).
    #[serde(default, rename = "Name")]
    name: String,
    /// Contents of the `name` key (empty when absent).
    #[serde(default, rename = "name")]
    name_lower: String,
    /// Contents of the `Id` key (empty when absent).
    #[serde(default, rename = "Id")]
    id: String,
    /// Contents of the `id` key (empty when absent).
    #[serde(default, rename = "id")]
    id_lower: String,
}
72
/// JSON body returned by `/api/v0/key/list`.
#[derive(Debug, Deserialize)]
struct KeyListResponse {
    /// Entries of the `Keys` array; an absent key yields an empty vector.
    #[serde(default, rename = "Keys")]
    keys: Vec<KeyListEntry>,
}
78
/// JSON body returned by `/api/v0/key/import`.
///
/// Name and id are each accepted under a capitalised or a lowercase key;
/// missing keys deserialize to empty strings. `import_key` prefers the
/// capitalised variant when it is non-blank.
#[derive(Debug, Deserialize)]
struct KeyImportResponse {
    /// Contents of the `Name` key (empty when absent).
    #[serde(default, rename = "Name")]
    name_upper: String,
    /// Contents of the `name` key (empty when absent).
    #[serde(default, rename = "name")]
    name_lower: String,
    /// Contents of the `Id` key (empty when absent).
    #[serde(default, rename = "Id")]
    id_upper: String,
    /// Contents of the `id` key (empty when absent).
    #[serde(default, rename = "id")]
    id_lower: String,
}
90
/// A keystore entry as reported by Kubo; returned by `import_key` and
/// `list_keys`.
#[derive(Clone, Debug)]
pub struct KuboKey {
    /// Trimmed key name taken from the response.
    pub name: String,
    /// Trimmed key id taken from the response (`list_keys` may leave it empty).
    pub id: String,
}
96
97// ─── Publish options ────────────────────────────────────────────────────────
98
99#[derive(Clone, Debug)]
100pub struct IpnsPublishOptions {
101    pub timeout: Duration,
102    pub allow_offline: bool,
103    pub lifetime: String,
104    pub ttl: Option<String>,
105    pub resolve: bool,
106    pub quieter: bool,
107}
108
109impl Default for IpnsPublishOptions {
110    fn default() -> Self {
111        Self {
112            timeout: Duration::from_mins(2),
113            allow_offline: true,
114            lifetime: "8760h".to_string(),
115            ttl: None,
116            resolve: false,
117            quieter: true,
118        }
119    }
120}
121
122// ─── Readiness ──────────────────────────────────────────────────────────────
123
124pub async fn wait_for_api(kubo_url: &str, attempts: u32) -> Result<()> {
125    if attempts == 0 {
126        return Err(anyhow!("kubo readiness attempts must be >= 1"));
127    }
128
129    let base = kubo_url.trim_end_matches('/');
130    let url = format!("{base}/api/v0/version");
131    let client = reqwest::Client::builder()
132        .timeout(Duration::from_secs(6))
133        .build()?;
134
135    let mut fib_prev = Duration::from_millis(0);
136    let mut fib_curr = Duration::from_millis(200);
137    let mut last_err: Option<anyhow::Error> = None;
138
139    for attempt in 1..=attempts {
140        let result = async {
141            let response = client.post(&url).send().await?.error_for_status()?;
142            let body = response.text().await?;
143            let parsed: VersionResponse = serde_json::from_str(&body)
144                .map_err(|e| anyhow!("failed parsing version response: {} body={}", e, body))?;
145            let version = if !parsed.version_upper.is_empty() {
146                parsed.version_upper
147            } else {
148                parsed.version_lower
149            };
150            if version.trim().is_empty() {
151                return Err(anyhow!("missing version field in response: {}", body));
152            }
153            Ok::<(), anyhow::Error>(())
154        }
155        .await;
156
157        match result {
158            Ok(()) => return Ok(()),
159            Err(err) => {
160                warn!("kubo readiness {}/{}: {}", attempt, attempts, err);
161                last_err = Some(err);
162                if attempt < attempts {
163                    sleep(fib_curr).await;
164                    let next_ms = fib_prev.as_millis().saturating_add(fib_curr.as_millis());
165                    fib_prev = fib_curr;
166                    fib_curr = Duration::from_millis(std::cmp::min(next_ms, 5_000) as u64);
167                }
168            }
169        }
170    }
171    Err(anyhow!(
172        "kubo API not ready after {} attempts: {}",
173        attempts,
174        last_err
175            .map(|e| e.to_string())
176            .unwrap_or_else(|| "unknown error".to_string())
177    ))
178}
179
180// ─── Add / Cat ──────────────────────────────────────────────────────────────
181
182pub async fn ipfs_add(kubo_url: &str, data: Vec<u8>) -> Result<String> {
183    let base = kubo_url.trim_end_matches('/');
184    let url = format!("{base}/api/v0/add");
185
186    let part = multipart::Part::bytes(data).file_name("data");
187    let form = multipart::Form::new().part("file", part);
188
189    let client = reqwest::Client::builder()
190        .timeout(Duration::from_secs(10))
191        .build()?;
192
193    let body = client
194        .post(url)
195        .query(&[("pin", "true")])
196        .multipart(form)
197        .send()
198        .await?
199        .error_for_status()?
200        .text()
201        .await?;
202
203    let parsed: AddResponse = serde_json::from_str(&body)
204        .map_err(|e| anyhow!("failed parsing add response: {} body={}", e, body))?;
205    Ok(parsed.hash)
206}
207
208pub async fn cat_bytes(kubo_url: &str, cid: &str) -> Result<Vec<u8>> {
209    let base = kubo_url.trim_end_matches('/');
210    let url = format!("{base}/api/v0/cat");
211
212    let client = reqwest::Client::builder()
213        .timeout(Duration::from_secs(30))
214        .build()?;
215
216    let bytes = client
217        .post(url)
218        .query(&[("arg", cid)])
219        .send()
220        .await?
221        .error_for_status()?
222        .bytes()
223        .await?;
224
225    Ok(bytes.to_vec())
226}
227
228pub async fn cat_text(kubo_url: &str, cid: &str) -> Result<String> {
229    let bytes = cat_bytes(kubo_url, cid).await?;
230    String::from_utf8(bytes).map_err(|e| anyhow!("non-utf8 content from {}: {}", cid, e))
231}
232
233// ─── DAG ────────────────────────────────────────────────────────────────────
234
235pub async fn dag_put<T: Serialize>(kubo_url: &str, value: &T) -> Result<String> {
236    let base = kubo_url.trim_end_matches('/');
237    let url = format!("{base}/api/v0/dag/put");
238    let payload = serde_json::to_vec(value)?;
239
240    let part = multipart::Part::bytes(payload)
241        .file_name("node.json")
242        .mime_str("application/json")?;
243    let form = multipart::Form::new().part("file", part);
244
245    let client = reqwest::Client::builder()
246        .timeout(Duration::from_secs(10))
247        .build()?;
248
249    let body = client
250        .post(url)
251        .query(&[
252            ("store-codec", "dag-cbor"),
253            ("input-codec", "dag-json"),
254            ("pin", "true"),
255        ])
256        .multipart(form)
257        .send()
258        .await?
259        .error_for_status()?
260        .text()
261        .await?;
262
263    let parsed: DagPutResponse = serde_json::from_str(&body)
264        .map_err(|e| anyhow!("failed parsing dag/put response: {} body={}", e, body))?;
265    parsed
266        .cid_upper
267        .or(parsed.cid)
268        .map(|c| c.slash)
269        .ok_or_else(|| anyhow!("missing CID in dag/put response: {}", body))
270}
271
272pub async fn dag_get<T: DeserializeOwned>(kubo_url: &str, cid: &str) -> Result<T> {
273    let base = kubo_url.trim_end_matches('/');
274    let url = format!("{base}/api/v0/dag/get");
275
276    let client = reqwest::Client::builder()
277        .timeout(Duration::from_secs(10))
278        .build()?;
279
280    let body = client
281        .post(url)
282        .query(&[("arg", cid)])
283        .send()
284        .await?
285        .error_for_status()?
286        .text()
287        .await?;
288
289    serde_json::from_str::<T>(&body).map_err(|e| {
290        anyhow!(
291            "failed parsing dag/get response for {}: {} body={}",
292            cid,
293            e,
294            body
295        )
296    })
297}
298
299// ─── Name publish / resolve ─────────────────────────────────────────────────
300
/// Normalizes a CID or IPFS path into the canonical `/ipfs/<rest>` form.
///
/// Surrounding whitespace is trimmed, then every leading `/ipfs/` prefix and
/// every leading `/` is removed — in any interleaving, so `//ipfs/Qm…` and
/// `/ipfs//ipfs/Qm…` both collapse to `/ipfs/Qm…`. Anything after the leading
/// prefixes (for example a sub-path below the CID) is kept untouched.
///
/// An empty or all-slash input yields `"/ipfs/"`.
fn normalize_ipfs_arg(cid_or_path: &str) -> String {
    let mut rest = cid_or_path.trim();
    // Peel prefixes off a borrowed slice; `/ipfs/` is tried first so that a
    // full prefix is removed in one step before falling back to a lone slash.
    while let Some(stripped) = rest
        .strip_prefix("/ipfs/")
        .or_else(|| rest.strip_prefix('/'))
    {
        rest = stripped;
    }
    format!("/ipfs/{rest}")
}
311
312pub async fn name_publish(kubo_url: &str, key_name: &str, cid: &str) -> Result<String> {
313    let options = IpnsPublishOptions::default();
314    name_publish_with_options(kubo_url, key_name, cid, &options).await
315}
316
317pub async fn name_publish_with_options(
318    kubo_url: &str,
319    key_name: &str,
320    cid: &str,
321    options: &IpnsPublishOptions,
322) -> Result<String> {
323    let base = kubo_url.trim_end_matches('/');
324    let url = format!("{base}/api/v0/name/publish");
325    let arg = normalize_ipfs_arg(cid);
326
327    let client = reqwest::Client::builder()
328        .timeout(options.timeout)
329        .build()?;
330
331    let allow_offline = if options.allow_offline {
332        "true"
333    } else {
334        "false"
335    };
336    let resolve = if options.resolve { "true" } else { "false" };
337    let quieter = if options.quieter { "true" } else { "false" };
338
339    let mut params: Vec<(&str, &str)> = vec![
340        ("arg", arg.as_str()),
341        ("key", key_name),
342        ("allow-offline", allow_offline),
343        ("lifetime", options.lifetime.as_str()),
344        ("resolve", resolve),
345        ("quieter", quieter),
346    ];
347    if let Some(ref ttl) = options.ttl {
348        params.push(("ttl", ttl.as_str()));
349    }
350
351    let body = client
352        .post(url)
353        .query(&params)
354        .send()
355        .await?
356        .error_for_status()?
357        .text()
358        .await?;
359
360    let parsed: NamePublishResponse = serde_json::from_str(&body)
361        .map_err(|e| anyhow!("failed parsing name/publish response: {} body={}", e, body))?;
362    let value = if !parsed.value_upper.is_empty() {
363        parsed.value_upper
364    } else {
365        parsed.value_lower
366    };
367    if value.is_empty() {
368        return Err(anyhow!("missing value in name/publish response: {}", body));
369    }
370    Ok(value)
371}
372
373pub async fn name_publish_with_retry(
374    kubo_url: &str,
375    key_name: &str,
376    cid: &str,
377    options: &IpnsPublishOptions,
378    attempts: u32,
379    initial_backoff: Duration,
380) -> Result<String> {
381    if attempts == 0 {
382        return Err(anyhow!("name publish attempts must be >= 1"));
383    }
384
385    let mut fib_prev = Duration::from_millis(0);
386    let mut fib_curr = initial_backoff;
387    let mut last_err: Option<anyhow::Error> = None;
388
389    for attempt in 1..=attempts {
390        match name_publish_with_options(kubo_url, key_name, cid, options).await {
391            Ok(value) => return Ok(value),
392            Err(err) => {
393                if let Ok(value) = verify_name_target_after_error(kubo_url, key_name, cid).await {
394                    warn!(
395                        "name publish attempt {}/{} reported error for key '{}' but resolve confirms target; accepting: {}",
396                        attempt, attempts, key_name, value
397                    );
398                    return Ok(value);
399                }
400                warn!(
401                    "name publish attempt {}/{} failed for key '{}' cid '{}': {}",
402                    attempt, attempts, key_name, cid, err
403                );
404                last_err = Some(err);
405                if attempt < attempts {
406                    sleep(fib_curr).await;
407                    let next_ms = fib_prev.as_millis().saturating_add(fib_curr.as_millis());
408                    fib_prev = fib_curr;
409                    fib_curr = Duration::from_millis(std::cmp::min(next_ms, 30_000) as u64);
410                }
411            }
412        }
413    }
414
415    Err(anyhow!(
416        "name publish failed after {} attempt(s): {}",
417        attempts,
418        last_err
419            .map(|e| e.to_string())
420            .unwrap_or_else(|| "unknown error".to_string())
421    ))
422}
423
424async fn verify_name_target_after_error(
425    kubo_url: &str,
426    key_name: &str,
427    cid: &str,
428) -> Result<String> {
429    let expected = normalize_ipfs_arg(cid);
430    let resolved = name_resolve(kubo_url, &format!("/ipns/{key_name}"), true).await?;
431    if resolved.trim() == expected {
432        return Ok(resolved);
433    }
434    Err(anyhow!(
435        "post-error resolve mismatch for key '{}': expected '{}' got '{}'",
436        key_name,
437        expected,
438        resolved
439    ))
440}
441
442pub async fn name_resolve(kubo_url: &str, path: &str, recursive: bool) -> Result<String> {
443    let base = kubo_url.trim_end_matches('/');
444    let url = format!("{base}/api/v0/name/resolve");
445
446    let client = reqwest::Client::builder()
447        .timeout(Duration::from_secs(15))
448        .build()?;
449
450    let recursive_flag = if recursive { "true" } else { "false" };
451    let body = client
452        .post(url)
453        .query(&[("arg", path), ("recursive", recursive_flag)])
454        .send()
455        .await?
456        .error_for_status()?
457        .text()
458        .await?;
459
460    let parsed: NameResolveResponse = serde_json::from_str(&body)
461        .map_err(|e| anyhow!("failed parsing name/resolve response: {} body={}", e, body))?;
462    let resolved = if !parsed.path_upper.is_empty() {
463        parsed.path_upper
464    } else {
465        parsed.path_lower
466    };
467    if resolved.is_empty() {
468        return Err(anyhow!("missing path in name/resolve response: {}", body));
469    }
470    Ok(resolved)
471}
472
473// ─── DID document fetch ─────────────────────────────────────────────────────
474
475pub async fn fetch_did_document(kubo_url: &str, did: &Did) -> Result<Document> {
476    let ipns_path = format!("/ipns/{}", did.ipns);
477    let mut fib_prev = Duration::from_millis(0);
478    let mut fib_curr = Duration::from_millis(150);
479    let mut last_err: Option<anyhow::Error> = None;
480    let mut document: Option<Document> = None;
481
482    for attempt in 1..=4 {
483        // DID documents are stored as DAG-CBOR via dag/put.
484        match dag_get::<Document>(kubo_url, &ipns_path).await {
485            Ok(doc) => {
486                document = Some(doc);
487                break;
488            }
489            Err(dag_err) => {
490                // Fallback: resolve IPNS manually then dag_get the CID.
491                match name_resolve(kubo_url, &ipns_path, true).await {
492                    Ok(resolved_path) => {
493                        match dag_get::<Document>(kubo_url, &resolved_path).await {
494                            Ok(doc) => {
495                                document = Some(doc);
496                                break;
497                            }
498                            Err(err) => {
499                                last_err = Some(anyhow!(
500                                    "dag_get failed for {}: direct={} resolved={}",
501                                    ipns_path,
502                                    dag_err,
503                                    err
504                                ));
505                            }
506                        }
507                    }
508                    Err(resolve_err) => {
509                        last_err = Some(anyhow!(
510                            "dag_get and name/resolve both failed for {}: dag={} resolve={}",
511                            ipns_path,
512                            dag_err,
513                            resolve_err
514                        ));
515                        if !should_retry_name_resolve_error(&resolve_err) {
516                            break;
517                        }
518                    }
519                }
520            }
521        }
522
523        if attempt < 4 {
524            sleep(fib_curr).await;
525            let next_ms = fib_prev.as_millis().saturating_add(fib_curr.as_millis());
526            fib_prev = fib_curr;
527            fib_curr = Duration::from_millis(std::cmp::min(next_ms, 2_000) as u64);
528        }
529    }
530
531    let document = document.ok_or_else(|| {
532        anyhow!(
533            "failed to fetch DID document for {} via {} after retries: {}",
534            did.id(),
535            ipns_path,
536            last_err
537                .map(|e| e.to_string())
538                .unwrap_or_else(|| "unknown error".to_string())
539        )
540    })?;
541
542    document.validate()?;
543    document.verify()?;
544
545    let doc_did = Did::try_from(document.id.as_str())
546        .map_err(|e| anyhow!("DID document has invalid id '{}': {}", document.id, e))?;
547    if doc_did.ipns != did.ipns {
548        return Err(anyhow!(
549            "DID document IPNS mismatch: expected {} but document id is {}",
550            did.base_id(),
551            document.id
552        ));
553    }
554
555    Ok(document)
556}
557
558fn should_retry_name_resolve_error(err: &anyhow::Error) -> bool {
559    let text = err.to_string().to_ascii_lowercase();
560    if text.contains("http status client error") {
561        return false;
562    }
563    if text.contains("missing path in name/resolve response") {
564        return false;
565    }
566    true
567}
568
569// ─── Pin ────────────────────────────────────────────────────────────────────
570
571pub async fn pin_add_named(kubo_url: &str, cid: &str, name: &str) -> Result<()> {
572    let base = kubo_url.trim_end_matches('/');
573    let url = format!("{base}/api/v0/pin/add");
574    let arg = normalize_ipfs_arg(cid);
575
576    let client = reqwest::Client::builder()
577        .timeout(Duration::from_secs(10))
578        .build()?;
579
580    client
581        .post(url)
582        .query(&[("arg", arg.as_str()), ("recursive", "true"), ("name", name)])
583        .send()
584        .await?
585        .error_for_status()?;
586
587    Ok(())
588}
589
590pub async fn pin_rm(kubo_url: &str, cid: &str) -> Result<()> {
591    let base = kubo_url.trim_end_matches('/');
592    let url = format!("{base}/api/v0/pin/rm");
593    let arg = normalize_ipfs_arg(cid);
594
595    let client = reqwest::Client::builder()
596        .timeout(Duration::from_secs(10))
597        .build()?;
598
599    client
600        .post(url)
601        .query(&[("arg", arg.as_str()), ("recursive", "true")])
602        .send()
603        .await?
604        .error_for_status()?;
605
606    Ok(())
607}
608
609// ─── Key management ─────────────────────────────────────────────────────────
610
611pub async fn generate_key(kubo_url: &str, key_name: &str) -> Result<()> {
612    let base = kubo_url.trim_end_matches('/');
613    let url = format!("{base}/api/v0/key/gen");
614
615    reqwest::Client::builder()
616        .timeout(Duration::from_secs(10))
617        .build()?
618        .post(url)
619        .query(&[("arg", key_name), ("type", "ed25519")])
620        .send()
621        .await?
622        .error_for_status()?;
623
624    Ok(())
625}
626
627pub async fn import_key(kubo_url: &str, key_name: &str, key_bytes: Vec<u8>) -> Result<KuboKey> {
628    let base = kubo_url.trim_end_matches('/');
629    let url = format!("{base}/api/v0/key/import");
630
631    let part = multipart::Part::bytes(key_bytes)
632        .file_name("ipns.key")
633        .mime_str("application/octet-stream")?;
634    let form = multipart::Form::new().part("file", part);
635
636    let response = reqwest::Client::builder()
637        .timeout(Duration::from_secs(10))
638        .build()?
639        .post(url)
640        .query(&[
641            ("arg", key_name),
642            ("ipns-base", "base36"),
643            ("allow-any-key-type", "true"),
644        ])
645        .multipart(form)
646        .send()
647        .await?
648        .error_for_status()?;
649
650    let body = response.text().await?;
651    let parsed: KeyImportResponse = serde_json::from_str(&body)
652        .map_err(|e| anyhow!("failed parsing key/import response: {} body={}", e, body))?;
653
654    let name = if !parsed.name_upper.trim().is_empty() {
655        parsed.name_upper.trim().to_string()
656    } else {
657        parsed.name_lower.trim().to_string()
658    };
659    let id = if !parsed.id_upper.trim().is_empty() {
660        parsed.id_upper.trim().to_string()
661    } else {
662        parsed.id_lower.trim().to_string()
663    };
664
665    if name.is_empty() || id.is_empty() {
666        return Err(anyhow!("missing name/id in key/import response: {}", body));
667    }
668
669    Ok(KuboKey { name, id })
670}
671
672pub async fn list_keys(kubo_url: &str) -> Result<Vec<KuboKey>> {
673    let base = kubo_url.trim_end_matches('/');
674    let url = format!("{base}/api/v0/key/list");
675
676    let body = reqwest::Client::builder()
677        .timeout(Duration::from_secs(10))
678        .build()?
679        .post(url)
680        .send()
681        .await?
682        .error_for_status()?
683        .text()
684        .await?;
685
686    let parsed: KeyListResponse = serde_json::from_str(&body)
687        .map_err(|e| anyhow!("failed parsing key/list response: {} body={}", e, body))?;
688    Ok(parsed
689        .keys
690        .into_iter()
691        .filter_map(|k| {
692            let name = if !k.name.trim().is_empty() {
693                k.name.trim().to_string()
694            } else {
695                k.name_lower.trim().to_string()
696            };
697            let id = if !k.id.trim().is_empty() {
698                k.id.trim().to_string()
699            } else {
700                k.id_lower.trim().to_string()
701            };
702            if name.is_empty() {
703                None
704            } else {
705                Some(KuboKey { name, id })
706            }
707        })
708        .collect())
709}
710
711pub async fn list_key_names(kubo_url: &str) -> Result<Vec<String>> {
712    let keys = list_keys(kubo_url).await?;
713    Ok(keys.into_iter().map(|k| k.name).collect())
714}
715
716/// Remove a named key from the Kubo keystore.
717pub async fn remove_key(kubo_url: &str, key_name: &str) -> Result<()> {
718    let base = kubo_url.trim_end_matches('/');
719    let url = format!("{base}/api/v0/key/rm");
720
721    reqwest::Client::builder()
722        .timeout(Duration::from_secs(10))
723        .build()?
724        .post(url)
725        .query(&[("arg", key_name)])
726        .send()
727        .await?
728        .error_for_status()?;
729
730    Ok(())
731}
732
733// ─── Tests ──────────────────────────────────────────────────────────────────
734
#[cfg(test)]
mod tests {
    use super::*;

    // ── normalize_ipfs_arg ──────────────────────────────────────────────

    /// A bare CID gains the `/ipfs/` prefix.
    #[test]
    fn normalize_ipfs_arg_from_raw_cid() {
        let normalized = normalize_ipfs_arg("QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    /// An already-prefixed path is returned unchanged.
    #[test]
    fn normalize_ipfs_arg_from_prefixed_path() {
        let normalized = normalize_ipfs_arg("/ipfs/QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    /// A doubled prefix collapses to a single one.
    #[test]
    fn normalize_ipfs_arg_from_double_prefixed_path() {
        let normalized = normalize_ipfs_arg("/ipfs//ipfs/QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    // ── should_retry_name_resolve_error ─────────────────────────────────

    /// Client-status (4xx) failures are permanent.
    #[test]
    fn does_not_retry_http_client_status_errors() {
        let client_error = anyhow!(
            "HTTP status client error (404 Not Found) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
        );
        let retry = should_retry_name_resolve_error(&client_error);
        assert!(!retry);
    }

    /// Server-status (5xx) failures are retried.
    #[test]
    fn retries_http_server_status_errors() {
        let server_error = anyhow!(
            "HTTP status server error (500 Internal Server Error) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
        );
        let retry = should_retry_name_resolve_error(&server_error);
        assert!(retry);
    }

    /// Transport-level failures are retried.
    #[test]
    fn retries_network_errors() {
        let network_error =
            anyhow!("error sending request for url (http://127.0.0.1:5001/api/v0/name/resolve)");
        let retry = should_retry_name_resolve_error(&network_error);
        assert!(retry);
    }
}