1use anyhow::{anyhow, Result};
7use did_ma::{Did, Document};
8use reqwest::multipart;
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11use std::time::Duration;
12use tokio::time::sleep;
13use tracing::warn;
14
15pub const DEFAULT_KUBO_API_URL: &str = "http://127.0.0.1:5001";
16
17#[derive(Debug, Deserialize)]
20struct AddResponse {
21 #[serde(rename = "Hash")]
22 hash: String,
23}
24
25#[derive(Debug, Deserialize)]
26struct DagPutCid {
27 #[serde(rename = "/")]
28 slash: String,
29}
30
31#[derive(Debug, Deserialize)]
32struct DagPutResponse {
33 #[serde(default, rename = "Cid")]
34 cid_upper: Option<DagPutCid>,
35 #[serde(default)]
36 cid: Option<DagPutCid>,
37}
38
39#[derive(Debug, Deserialize)]
40struct NamePublishResponse {
41 #[serde(default, rename = "Value")]
42 value_upper: String,
43 #[serde(default, rename = "value")]
44 value_lower: String,
45}
46
47#[derive(Debug, Deserialize)]
48struct NameResolveResponse {
49 #[serde(default, rename = "Path")]
50 path_upper: String,
51 #[serde(default, rename = "path")]
52 path_lower: String,
53}
54
55#[derive(Debug, Deserialize)]
56struct VersionResponse {
57 #[serde(default, rename = "Version")]
58 version_upper: String,
59 #[serde(default, rename = "version")]
60 version_lower: String,
61}
62
63#[derive(Debug, Deserialize)]
64struct KeyListEntry {
65 #[serde(default, rename = "Name")]
66 name: String,
67 #[serde(default, rename = "name")]
68 name_lower: String,
69 #[serde(default, rename = "Id")]
70 id: String,
71 #[serde(default, rename = "id")]
72 id_lower: String,
73}
74
75#[derive(Debug, Deserialize)]
76struct KeyListResponse {
77 #[serde(default, rename = "Keys")]
78 keys: Vec<KeyListEntry>,
79}
80
81#[derive(Debug, Deserialize)]
82struct KeyImportResponse {
83 #[serde(default, rename = "Name")]
84 name_upper: String,
85 #[serde(default, rename = "name")]
86 name_lower: String,
87 #[serde(default, rename = "Id")]
88 id_upper: String,
89 #[serde(default, rename = "id")]
90 id_lower: String,
91}
92
93#[derive(Clone, Debug)]
94pub struct KuboKey {
95 pub name: String,
96 pub id: String,
97}
98
99#[derive(Clone, Debug)]
102pub struct IpnsPublishOptions {
103 pub timeout: Duration,
104 pub allow_offline: bool,
105 pub lifetime: String,
106 pub ttl: Option<String>,
107 pub resolve: bool,
108 pub quieter: bool,
109}
110
111impl Default for IpnsPublishOptions {
112 fn default() -> Self {
113 Self {
114 timeout: Duration::from_secs(15),
115 allow_offline: true,
116 lifetime: "8760h".to_string(),
117 ttl: None,
118 resolve: false,
119 quieter: true,
120 }
121 }
122}
123
124pub async fn wait_for_api(kubo_url: &str, attempts: u32) -> Result<()> {
127 if attempts == 0 {
128 return Err(anyhow!("kubo readiness attempts must be >= 1"));
129 }
130
131 let base = kubo_url.trim_end_matches('/');
132 let url = format!("{base}/api/v0/version");
133 let client = reqwest::Client::builder()
134 .timeout(Duration::from_secs(6))
135 .build()?;
136
137 let mut backoff = Duration::from_millis(200);
138 let mut last_err: Option<anyhow::Error> = None;
139
140 for attempt in 1..=attempts {
141 let result = async {
142 let response = client.post(&url).send().await?.error_for_status()?;
143 let body = response.text().await?;
144 let parsed: VersionResponse = serde_json::from_str(&body)
145 .map_err(|e| anyhow!("failed parsing version response: {} body={}", e, body))?;
146 let version = if !parsed.version_upper.is_empty() {
147 parsed.version_upper
148 } else {
149 parsed.version_lower
150 };
151 if version.trim().is_empty() {
152 return Err(anyhow!("missing version field in response: {}", body));
153 }
154 Ok::<(), anyhow::Error>(())
155 }
156 .await;
157
158 match result {
159 Ok(()) => return Ok(()),
160 Err(err) => {
161 warn!("kubo readiness {}/{}: {}", attempt, attempts, err);
162 last_err = Some(err);
163 if attempt < attempts {
164 sleep(backoff).await;
165 let doubled = backoff.as_millis().saturating_mul(2);
166 backoff = Duration::from_millis(std::cmp::min(doubled, 5_000) as u64);
167 }
168 }
169 }
170 }
171 Err(anyhow!(
172 "kubo API not ready after {} attempts: {}",
173 attempts,
174 last_err
175 .map(|e| e.to_string())
176 .unwrap_or_else(|| "unknown error".to_string())
177 ))
178}
179
180pub async fn ipfs_add(kubo_url: &str, data: Vec<u8>) -> Result<String> {
183 let base = kubo_url.trim_end_matches('/');
184 let url = format!("{base}/api/v0/add");
185
186 let part = multipart::Part::bytes(data).file_name("data");
187 let form = multipart::Form::new().part("file", part);
188
189 let client = reqwest::Client::builder()
190 .timeout(Duration::from_secs(10))
191 .build()?;
192
193 let body = client
194 .post(url)
195 .query(&[("pin", "true")])
196 .multipart(form)
197 .send()
198 .await?
199 .error_for_status()?
200 .text()
201 .await?;
202
203 let parsed: AddResponse = serde_json::from_str(&body)
204 .map_err(|e| anyhow!("failed parsing add response: {} body={}", e, body))?;
205 Ok(parsed.hash)
206}
207
208pub async fn cat_bytes(kubo_url: &str, cid: &str) -> Result<Vec<u8>> {
209 let base = kubo_url.trim_end_matches('/');
210 let url = format!("{base}/api/v0/cat");
211
212 let client = reqwest::Client::builder()
213 .timeout(Duration::from_secs(30))
214 .build()?;
215
216 let bytes = client
217 .post(url)
218 .query(&[("arg", cid)])
219 .send()
220 .await?
221 .error_for_status()?
222 .bytes()
223 .await?;
224
225 Ok(bytes.to_vec())
226}
227
228pub async fn cat_text(kubo_url: &str, cid: &str) -> Result<String> {
229 let bytes = cat_bytes(kubo_url, cid).await?;
230 String::from_utf8(bytes).map_err(|e| anyhow!("non-utf8 content from {}: {}", cid, e))
231}
232
233pub async fn dag_put<T: Serialize>(kubo_url: &str, value: &T) -> Result<String> {
236 let base = kubo_url.trim_end_matches('/');
237 let url = format!("{base}/api/v0/dag/put");
238 let payload = serde_json::to_vec(value)?;
239
240 let part = multipart::Part::bytes(payload)
241 .file_name("node.json")
242 .mime_str("application/json")?;
243 let form = multipart::Form::new().part("file", part);
244
245 let client = reqwest::Client::builder()
246 .timeout(Duration::from_secs(10))
247 .build()?;
248
249 let body = client
250 .post(url)
251 .query(&[
252 ("store-codec", "dag-cbor"),
253 ("input-codec", "dag-json"),
254 ("pin", "true"),
255 ])
256 .multipart(form)
257 .send()
258 .await?
259 .error_for_status()?
260 .text()
261 .await?;
262
263 let parsed: DagPutResponse = serde_json::from_str(&body)
264 .map_err(|e| anyhow!("failed parsing dag/put response: {} body={}", e, body))?;
265 parsed
266 .cid_upper
267 .or(parsed.cid)
268 .map(|c| c.slash)
269 .ok_or_else(|| anyhow!("missing CID in dag/put response: {}", body))
270}
271
272pub async fn dag_get<T: DeserializeOwned>(kubo_url: &str, cid: &str) -> Result<T> {
273 let base = kubo_url.trim_end_matches('/');
274 let url = format!("{base}/api/v0/dag/get");
275
276 let client = reqwest::Client::builder()
277 .timeout(Duration::from_secs(10))
278 .build()?;
279
280 let body = client
281 .post(url)
282 .query(&[("arg", cid)])
283 .send()
284 .await?
285 .error_for_status()?
286 .text()
287 .await?;
288
289 serde_json::from_str::<T>(&body).map_err(|e| {
290 anyhow!(
291 "failed parsing dag/get response for {}: {} body={}",
292 cid,
293 e,
294 body
295 )
296 })
297}
298
299fn normalize_ipfs_arg(cid_or_path: &str) -> String {
302 let mut value = cid_or_path.trim().to_string();
303 while let Some(rest) = value.strip_prefix("/ipfs/") {
304 value = rest.to_string();
305 }
306 while let Some(rest) = value.strip_prefix('/') {
307 value = rest.to_string();
308 }
309 format!("/ipfs/{value}")
310}
311
312pub async fn name_publish(kubo_url: &str, key_name: &str, cid: &str) -> Result<String> {
313 let options = IpnsPublishOptions::default();
314 name_publish_with_options(kubo_url, key_name, cid, &options).await
315}
316
317pub async fn name_publish_with_options(
318 kubo_url: &str,
319 key_name: &str,
320 cid: &str,
321 options: &IpnsPublishOptions,
322) -> Result<String> {
323 let base = kubo_url.trim_end_matches('/');
324 let url = format!("{base}/api/v0/name/publish");
325 let arg = normalize_ipfs_arg(cid);
326
327 let client = reqwest::Client::builder()
328 .timeout(options.timeout)
329 .build()?;
330
331 let allow_offline = if options.allow_offline {
332 "true"
333 } else {
334 "false"
335 };
336 let resolve = if options.resolve { "true" } else { "false" };
337 let quieter = if options.quieter { "true" } else { "false" };
338
339 let mut params: Vec<(&str, &str)> = vec![
340 ("arg", arg.as_str()),
341 ("key", key_name),
342 ("allow-offline", allow_offline),
343 ("lifetime", options.lifetime.as_str()),
344 ("resolve", resolve),
345 ("quieter", quieter),
346 ];
347 if let Some(ref ttl) = options.ttl {
348 params.push(("ttl", ttl.as_str()));
349 }
350
351 let body = client
352 .post(url)
353 .query(¶ms)
354 .send()
355 .await?
356 .error_for_status()?
357 .text()
358 .await?;
359
360 let parsed: NamePublishResponse = serde_json::from_str(&body)
361 .map_err(|e| anyhow!("failed parsing name/publish response: {} body={}", e, body))?;
362 let value = if !parsed.value_upper.is_empty() {
363 parsed.value_upper
364 } else {
365 parsed.value_lower
366 };
367 if value.is_empty() {
368 return Err(anyhow!("missing value in name/publish response: {}", body));
369 }
370 Ok(value)
371}
372
373pub async fn name_publish_with_retry(
374 kubo_url: &str,
375 key_name: &str,
376 cid: &str,
377 options: &IpnsPublishOptions,
378 attempts: u32,
379 initial_backoff: Duration,
380) -> Result<String> {
381 if attempts == 0 {
382 return Err(anyhow!("name publish attempts must be >= 1"));
383 }
384
385 let mut backoff = initial_backoff;
386 let mut last_err: Option<anyhow::Error> = None;
387
388 for attempt in 1..=attempts {
389 match name_publish_with_options(kubo_url, key_name, cid, options).await {
390 Ok(value) => return Ok(value),
391 Err(err) => {
392 if let Ok(value) = verify_name_target_after_error(kubo_url, key_name, cid).await {
393 warn!(
394 "name publish attempt {}/{} reported error for key '{}' but resolve confirms target; accepting: {}",
395 attempt, attempts, key_name, value
396 );
397 return Ok(value);
398 }
399 warn!(
400 "name publish attempt {}/{} failed for key '{}' cid '{}': {}",
401 attempt, attempts, key_name, cid, err
402 );
403 last_err = Some(err);
404 if attempt < attempts {
405 sleep(backoff).await;
406 let doubled = backoff.as_millis().saturating_mul(2);
407 backoff = Duration::from_millis(std::cmp::min(doubled, 30_000) as u64);
408 }
409 }
410 }
411 }
412
413 Err(anyhow!(
414 "name publish failed after {} attempt(s): {}",
415 attempts,
416 last_err
417 .map(|e| e.to_string())
418 .unwrap_or_else(|| "unknown error".to_string())
419 ))
420}
421
422async fn verify_name_target_after_error(
423 kubo_url: &str,
424 key_name: &str,
425 cid: &str,
426) -> Result<String> {
427 let expected = normalize_ipfs_arg(cid);
428 let resolved = name_resolve(kubo_url, &format!("/ipns/{key_name}"), true).await?;
429 if resolved.trim() == expected {
430 return Ok(resolved);
431 }
432 Err(anyhow!(
433 "post-error resolve mismatch for key '{}': expected '{}' got '{}'",
434 key_name,
435 expected,
436 resolved
437 ))
438}
439
440pub async fn name_resolve(kubo_url: &str, path: &str, recursive: bool) -> Result<String> {
441 let base = kubo_url.trim_end_matches('/');
442 let url = format!("{base}/api/v0/name/resolve");
443
444 let client = reqwest::Client::builder()
445 .timeout(Duration::from_secs(15))
446 .build()?;
447
448 let recursive_flag = if recursive { "true" } else { "false" };
449 let body = client
450 .post(url)
451 .query(&[("arg", path), ("recursive", recursive_flag)])
452 .send()
453 .await?
454 .error_for_status()?
455 .text()
456 .await?;
457
458 let parsed: NameResolveResponse = serde_json::from_str(&body)
459 .map_err(|e| anyhow!("failed parsing name/resolve response: {} body={}", e, body))?;
460 let resolved = if !parsed.path_upper.is_empty() {
461 parsed.path_upper
462 } else {
463 parsed.path_lower
464 };
465 if resolved.is_empty() {
466 return Err(anyhow!("missing path in name/resolve response: {}", body));
467 }
468 Ok(resolved)
469}
470
471pub async fn fetch_did_document(kubo_url: &str, did: &Did) -> Result<Document> {
474 let ipns_path = format!("/ipns/{}", did.ipns);
475 let mut backoff = Duration::from_millis(150);
476 let mut last_err: Option<anyhow::Error> = None;
477 let mut document: Option<Document> = None;
478
479 for attempt in 1..=4 {
480 match dag_get::<Document>(kubo_url, &ipns_path).await {
482 Ok(doc) => {
483 document = Some(doc);
484 break;
485 }
486 Err(dag_err) => {
487 match name_resolve(kubo_url, &ipns_path, true).await {
489 Ok(resolved_path) => {
490 match dag_get::<Document>(kubo_url, &resolved_path).await {
491 Ok(doc) => {
492 document = Some(doc);
493 break;
494 }
495 Err(err) => {
496 last_err = Some(anyhow!(
497 "dag_get failed for {}: direct={} resolved={}",
498 ipns_path,
499 dag_err,
500 err
501 ));
502 }
503 }
504 }
505 Err(resolve_err) => {
506 last_err = Some(anyhow!(
507 "dag_get and name/resolve both failed for {}: dag={} resolve={}",
508 ipns_path,
509 dag_err,
510 resolve_err
511 ));
512 if !should_retry_name_resolve_error(&resolve_err) {
513 break;
514 }
515 }
516 }
517 }
518 }
519
520 if attempt < 4 {
521 sleep(backoff).await;
522 let doubled = backoff.as_millis().saturating_mul(2);
523 backoff = Duration::from_millis(std::cmp::min(doubled, 2_000) as u64);
524 }
525 }
526
527 let document = document.ok_or_else(|| {
528 anyhow!(
529 "failed to fetch DID document for {} via {} after retries: {}",
530 did.id(),
531 ipns_path,
532 last_err
533 .map(|e| e.to_string())
534 .unwrap_or_else(|| "unknown error".to_string())
535 )
536 })?;
537
538 document.validate()?;
539 document.verify()?;
540
541 let doc_did = Did::try_from(document.id.as_str())
542 .map_err(|e| anyhow!("DID document has invalid id '{}': {}", document.id, e))?;
543 if doc_did.ipns != did.ipns {
544 return Err(anyhow!(
545 "DID document IPNS mismatch: expected {} but document id is {}",
546 did.base_id(),
547 document.id
548 ));
549 }
550
551 Ok(document)
552}
553
554fn should_retry_name_resolve_error(err: &anyhow::Error) -> bool {
555 let text = err.to_string().to_ascii_lowercase();
556 if text.contains("http status client error") {
557 return false;
558 }
559 if text.contains("missing path in name/resolve response") {
560 return false;
561 }
562 true
563}
564
565pub async fn pin_add_named(kubo_url: &str, cid: &str, name: &str) -> Result<()> {
568 let base = kubo_url.trim_end_matches('/');
569 let url = format!("{base}/api/v0/pin/add");
570 let arg = normalize_ipfs_arg(cid);
571
572 let client = reqwest::Client::builder()
573 .timeout(Duration::from_secs(10))
574 .build()?;
575
576 client
577 .post(url)
578 .query(&[("arg", arg.as_str()), ("recursive", "true"), ("name", name)])
579 .send()
580 .await?
581 .error_for_status()?;
582
583 Ok(())
584}
585
586pub async fn pin_rm(kubo_url: &str, cid: &str) -> Result<()> {
587 let base = kubo_url.trim_end_matches('/');
588 let url = format!("{base}/api/v0/pin/rm");
589 let arg = normalize_ipfs_arg(cid);
590
591 let client = reqwest::Client::builder()
592 .timeout(Duration::from_secs(10))
593 .build()?;
594
595 client
596 .post(url)
597 .query(&[("arg", arg.as_str()), ("recursive", "true")])
598 .send()
599 .await?
600 .error_for_status()?;
601
602 Ok(())
603}
604
605pub async fn generate_key(kubo_url: &str, key_name: &str) -> Result<()> {
608 let base = kubo_url.trim_end_matches('/');
609 let url = format!("{base}/api/v0/key/gen");
610
611 reqwest::Client::builder()
612 .timeout(Duration::from_secs(10))
613 .build()?
614 .post(url)
615 .query(&[("arg", key_name), ("type", "ed25519")])
616 .send()
617 .await?
618 .error_for_status()?;
619
620 Ok(())
621}
622
623pub async fn import_key(kubo_url: &str, key_name: &str, key_bytes: Vec<u8>) -> Result<KuboKey> {
624 let base = kubo_url.trim_end_matches('/');
625 let url = format!("{base}/api/v0/key/import");
626
627 let part = multipart::Part::bytes(key_bytes)
628 .file_name("ipns.key")
629 .mime_str("application/octet-stream")?;
630 let form = multipart::Form::new().part("file", part);
631
632 let response = reqwest::Client::builder()
633 .timeout(Duration::from_secs(10))
634 .build()?
635 .post(url)
636 .query(&[("arg", key_name), ("allow-any-key-type", "true")])
637 .multipart(form)
638 .send()
639 .await?
640 .error_for_status()?;
641
642 let body = response.text().await?;
643 let parsed: KeyImportResponse = serde_json::from_str(&body)
644 .map_err(|e| anyhow!("failed parsing key/import response: {} body={}", e, body))?;
645
646 let name = if !parsed.name_upper.trim().is_empty() {
647 parsed.name_upper.trim().to_string()
648 } else {
649 parsed.name_lower.trim().to_string()
650 };
651 let id = if !parsed.id_upper.trim().is_empty() {
652 parsed.id_upper.trim().to_string()
653 } else {
654 parsed.id_lower.trim().to_string()
655 };
656
657 if name.is_empty() || id.is_empty() {
658 return Err(anyhow!("missing name/id in key/import response: {}", body));
659 }
660
661 Ok(KuboKey { name, id })
662}
663
664pub async fn list_keys(kubo_url: &str) -> Result<Vec<KuboKey>> {
665 let base = kubo_url.trim_end_matches('/');
666 let url = format!("{base}/api/v0/key/list");
667
668 let body = reqwest::Client::builder()
669 .timeout(Duration::from_secs(10))
670 .build()?
671 .post(url)
672 .send()
673 .await?
674 .error_for_status()?
675 .text()
676 .await?;
677
678 let parsed: KeyListResponse = serde_json::from_str(&body)
679 .map_err(|e| anyhow!("failed parsing key/list response: {} body={}", e, body))?;
680 Ok(parsed
681 .keys
682 .into_iter()
683 .filter_map(|k| {
684 let name = if !k.name.trim().is_empty() {
685 k.name.trim().to_string()
686 } else {
687 k.name_lower.trim().to_string()
688 };
689 let id = if !k.id.trim().is_empty() {
690 k.id.trim().to_string()
691 } else {
692 k.id_lower.trim().to_string()
693 };
694 if name.is_empty() {
695 None
696 } else {
697 Some(KuboKey { name, id })
698 }
699 })
700 .collect())
701}
702
703pub async fn list_key_names(kubo_url: &str) -> Result<Vec<String>> {
704 let keys = list_keys(kubo_url).await?;
705 Ok(keys.into_iter().map(|k| k.name).collect())
706}
707
708pub async fn remove_key(kubo_url: &str, key_name: &str) -> Result<()> {
710 let base = kubo_url.trim_end_matches('/');
711 let url = format!("{base}/api/v0/key/rm");
712
713 reqwest::Client::builder()
714 .timeout(Duration::from_secs(10))
715 .build()?
716 .post(url)
717 .query(&[("arg", key_name)])
718 .send()
719 .await?
720 .error_for_status()?;
721
722 Ok(())
723}
724
725#[cfg(test)]
728mod tests {
729 use super::*;
730
731 #[test]
732 fn normalize_ipfs_arg_from_raw_cid() {
733 assert_eq!(normalize_ipfs_arg("QmExampleCid"), "/ipfs/QmExampleCid");
734 }
735
736 #[test]
737 fn normalize_ipfs_arg_from_prefixed_path() {
738 assert_eq!(
739 normalize_ipfs_arg("/ipfs/QmExampleCid"),
740 "/ipfs/QmExampleCid"
741 );
742 }
743
744 #[test]
745 fn normalize_ipfs_arg_from_double_prefixed_path() {
746 assert_eq!(
747 normalize_ipfs_arg("/ipfs//ipfs/QmExampleCid"),
748 "/ipfs/QmExampleCid"
749 );
750 }
751
752 #[test]
753 fn does_not_retry_http_client_status_errors() {
754 let err = anyhow!(
755 "HTTP status client error (404 Not Found) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
756 );
757 assert!(!should_retry_name_resolve_error(&err));
758 }
759
760 #[test]
761 fn retries_http_server_status_errors() {
762 let err = anyhow!(
763 "HTTP status server error (500 Internal Server Error) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
764 );
765 assert!(should_retry_name_resolve_error(&err));
766 }
767
768 #[test]
769 fn retries_network_errors() {
770 let err =
771 anyhow!("error sending request for url (http://127.0.0.1:5001/api/v0/name/resolve)");
772 assert!(should_retry_name_resolve_error(&err));
773 }
774}