1use anyhow::{anyhow, Result};
7use ma_did::{Did, Document};
8use reqwest::multipart;
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11use std::time::Duration;
12use tokio::time::sleep;
13use tracing::warn;
14
15#[derive(Debug, Deserialize)]
18struct AddResponse {
19 #[serde(rename = "Hash")]
20 hash: String,
21}
22
23#[derive(Debug, Deserialize)]
24struct DagPutCid {
25 #[serde(rename = "/")]
26 slash: String,
27}
28
29#[derive(Debug, Deserialize)]
30struct DagPutResponse {
31 #[serde(default, rename = "Cid")]
32 cid_upper: Option<DagPutCid>,
33 #[serde(default)]
34 cid: Option<DagPutCid>,
35}
36
37#[derive(Debug, Deserialize)]
38struct NamePublishResponse {
39 #[serde(default, rename = "Value")]
40 value_upper: String,
41 #[serde(default, rename = "value")]
42 value_lower: String,
43}
44
45#[derive(Debug, Deserialize)]
46struct NameResolveResponse {
47 #[serde(default, rename = "Path")]
48 path_upper: String,
49 #[serde(default, rename = "path")]
50 path_lower: String,
51}
52
53#[derive(Debug, Deserialize)]
54struct VersionResponse {
55 #[serde(default, rename = "Version")]
56 version_upper: String,
57 #[serde(default, rename = "version")]
58 version_lower: String,
59}
60
61#[derive(Debug, Deserialize)]
62struct KeyListEntry {
63 #[serde(default, rename = "Name")]
64 name: String,
65 #[serde(default, rename = "name")]
66 name_lower: String,
67 #[serde(default, rename = "Id")]
68 id: String,
69 #[serde(default, rename = "id")]
70 id_lower: String,
71}
72
73#[derive(Debug, Deserialize)]
74struct KeyListResponse {
75 #[serde(default, rename = "Keys")]
76 keys: Vec<KeyListEntry>,
77}
78
79#[derive(Debug, Deserialize)]
80struct KeyImportResponse {
81 #[serde(default, rename = "Name")]
82 name_upper: String,
83 #[serde(default, rename = "name")]
84 name_lower: String,
85 #[serde(default, rename = "Id")]
86 id_upper: String,
87 #[serde(default, rename = "id")]
88 id_lower: String,
89}
90
91#[derive(Clone, Debug)]
92pub struct KuboKey {
93 pub name: String,
94 pub id: String,
95}
96
97#[derive(Clone, Debug)]
100pub struct IpnsPublishOptions {
101 pub timeout: Duration,
102 pub allow_offline: bool,
103 pub lifetime: String,
104 pub ttl: Option<String>,
105 pub resolve: bool,
106 pub quieter: bool,
107}
108
109impl Default for IpnsPublishOptions {
110 fn default() -> Self {
111 Self {
112 timeout: Duration::from_secs(15),
113 allow_offline: true,
114 lifetime: "8760h".to_string(),
115 ttl: None,
116 resolve: false,
117 quieter: true,
118 }
119 }
120}
121
122pub async fn wait_for_api(kubo_url: &str, attempts: u32) -> Result<()> {
125 if attempts == 0 {
126 return Err(anyhow!("kubo readiness attempts must be >= 1"));
127 }
128
129 let base = kubo_url.trim_end_matches('/');
130 let url = format!("{base}/api/v0/version");
131 let client = reqwest::Client::builder()
132 .timeout(Duration::from_secs(6))
133 .build()?;
134
135 let mut backoff = Duration::from_millis(200);
136 let mut last_err: Option<anyhow::Error> = None;
137
138 for attempt in 1..=attempts {
139 let result = async {
140 let response = client.post(&url).send().await?.error_for_status()?;
141 let body = response.text().await?;
142 let parsed: VersionResponse = serde_json::from_str(&body)
143 .map_err(|e| anyhow!("failed parsing version response: {} body={}", e, body))?;
144 let version = if !parsed.version_upper.is_empty() {
145 parsed.version_upper
146 } else {
147 parsed.version_lower
148 };
149 if version.trim().is_empty() {
150 return Err(anyhow!("missing version field in response: {}", body));
151 }
152 Ok::<(), anyhow::Error>(())
153 }
154 .await;
155
156 match result {
157 Ok(()) => return Ok(()),
158 Err(err) => {
159 warn!("kubo readiness {}/{}: {}", attempt, attempts, err);
160 last_err = Some(err);
161 if attempt < attempts {
162 sleep(backoff).await;
163 let doubled = backoff.as_millis().saturating_mul(2);
164 backoff = Duration::from_millis(std::cmp::min(doubled, 5_000) as u64);
165 }
166 }
167 }
168 }
169 Err(anyhow!(
170 "kubo API not ready after {} attempts: {}",
171 attempts,
172 last_err
173 .map(|e| e.to_string())
174 .unwrap_or_else(|| "unknown error".to_string())
175 ))
176}
177
178pub async fn ipfs_add(kubo_url: &str, data: Vec<u8>) -> Result<String> {
181 let base = kubo_url.trim_end_matches('/');
182 let url = format!("{base}/api/v0/add");
183
184 let part = multipart::Part::bytes(data).file_name("data");
185 let form = multipart::Form::new().part("file", part);
186
187 let client = reqwest::Client::builder()
188 .timeout(Duration::from_secs(10))
189 .build()?;
190
191 let body = client
192 .post(url)
193 .query(&[("pin", "true")])
194 .multipart(form)
195 .send()
196 .await?
197 .error_for_status()?
198 .text()
199 .await?;
200
201 let parsed: AddResponse = serde_json::from_str(&body)
202 .map_err(|e| anyhow!("failed parsing add response: {} body={}", e, body))?;
203 Ok(parsed.hash)
204}
205
206pub async fn cat_bytes(kubo_url: &str, cid: &str) -> Result<Vec<u8>> {
207 let base = kubo_url.trim_end_matches('/');
208 let url = format!("{base}/api/v0/cat");
209
210 let client = reqwest::Client::builder()
211 .timeout(Duration::from_secs(30))
212 .build()?;
213
214 let bytes = client
215 .post(url)
216 .query(&[("arg", cid)])
217 .send()
218 .await?
219 .error_for_status()?
220 .bytes()
221 .await?;
222
223 Ok(bytes.to_vec())
224}
225
226pub async fn cat_text(kubo_url: &str, cid: &str) -> Result<String> {
227 let bytes = cat_bytes(kubo_url, cid).await?;
228 String::from_utf8(bytes).map_err(|e| anyhow!("non-utf8 content from {}: {}", cid, e))
229}
230
231pub async fn dag_put<T: Serialize>(kubo_url: &str, value: &T) -> Result<String> {
234 let base = kubo_url.trim_end_matches('/');
235 let url = format!("{base}/api/v0/dag/put");
236 let payload = serde_json::to_vec(value)?;
237
238 let part = multipart::Part::bytes(payload)
239 .file_name("node.json")
240 .mime_str("application/json")?;
241 let form = multipart::Form::new().part("file", part);
242
243 let client = reqwest::Client::builder()
244 .timeout(Duration::from_secs(10))
245 .build()?;
246
247 let body = client
248 .post(url)
249 .query(&[
250 ("store-codec", "dag-cbor"),
251 ("input-codec", "dag-json"),
252 ("pin", "true"),
253 ])
254 .multipart(form)
255 .send()
256 .await?
257 .error_for_status()?
258 .text()
259 .await?;
260
261 let parsed: DagPutResponse = serde_json::from_str(&body)
262 .map_err(|e| anyhow!("failed parsing dag/put response: {} body={}", e, body))?;
263 parsed
264 .cid_upper
265 .or(parsed.cid)
266 .map(|c| c.slash)
267 .ok_or_else(|| anyhow!("missing CID in dag/put response: {}", body))
268}
269
270pub async fn dag_get<T: DeserializeOwned>(kubo_url: &str, cid: &str) -> Result<T> {
271 let base = kubo_url.trim_end_matches('/');
272 let url = format!("{base}/api/v0/dag/get");
273
274 let client = reqwest::Client::builder()
275 .timeout(Duration::from_secs(10))
276 .build()?;
277
278 let body = client
279 .post(url)
280 .query(&[("arg", cid)])
281 .send()
282 .await?
283 .error_for_status()?
284 .text()
285 .await?;
286
287 serde_json::from_str::<T>(&body).map_err(|e| {
288 anyhow!(
289 "failed parsing dag/get response for {}: {} body={}",
290 cid,
291 e,
292 body
293 )
294 })
295}
296
297fn normalize_ipfs_arg(cid_or_path: &str) -> String {
300 let mut value = cid_or_path.trim().to_string();
301 while let Some(rest) = value.strip_prefix("/ipfs/") {
302 value = rest.to_string();
303 }
304 while let Some(rest) = value.strip_prefix('/') {
305 value = rest.to_string();
306 }
307 format!("/ipfs/{value}")
308}
309
310pub async fn name_publish(kubo_url: &str, key_name: &str, cid: &str) -> Result<String> {
311 let options = IpnsPublishOptions::default();
312 name_publish_with_options(kubo_url, key_name, cid, &options).await
313}
314
315pub async fn name_publish_with_options(
316 kubo_url: &str,
317 key_name: &str,
318 cid: &str,
319 options: &IpnsPublishOptions,
320) -> Result<String> {
321 let base = kubo_url.trim_end_matches('/');
322 let url = format!("{base}/api/v0/name/publish");
323 let arg = normalize_ipfs_arg(cid);
324
325 let client = reqwest::Client::builder()
326 .timeout(options.timeout)
327 .build()?;
328
329 let allow_offline = if options.allow_offline {
330 "true"
331 } else {
332 "false"
333 };
334 let resolve = if options.resolve { "true" } else { "false" };
335 let quieter = if options.quieter { "true" } else { "false" };
336
337 let mut params: Vec<(&str, &str)> = vec![
338 ("arg", arg.as_str()),
339 ("key", key_name),
340 ("allow-offline", allow_offline),
341 ("lifetime", options.lifetime.as_str()),
342 ("resolve", resolve),
343 ("quieter", quieter),
344 ];
345 if let Some(ref ttl) = options.ttl {
346 params.push(("ttl", ttl.as_str()));
347 }
348
349 let body = client
350 .post(url)
351 .query(¶ms)
352 .send()
353 .await?
354 .error_for_status()?
355 .text()
356 .await?;
357
358 let parsed: NamePublishResponse = serde_json::from_str(&body)
359 .map_err(|e| anyhow!("failed parsing name/publish response: {} body={}", e, body))?;
360 let value = if !parsed.value_upper.is_empty() {
361 parsed.value_upper
362 } else {
363 parsed.value_lower
364 };
365 if value.is_empty() {
366 return Err(anyhow!("missing value in name/publish response: {}", body));
367 }
368 Ok(value)
369}
370
371pub async fn name_publish_with_retry(
372 kubo_url: &str,
373 key_name: &str,
374 cid: &str,
375 options: &IpnsPublishOptions,
376 attempts: u32,
377 initial_backoff: Duration,
378) -> Result<String> {
379 if attempts == 0 {
380 return Err(anyhow!("name publish attempts must be >= 1"));
381 }
382
383 let mut backoff = initial_backoff;
384 let mut last_err: Option<anyhow::Error> = None;
385
386 for attempt in 1..=attempts {
387 match name_publish_with_options(kubo_url, key_name, cid, options).await {
388 Ok(value) => return Ok(value),
389 Err(err) => {
390 if let Ok(value) = verify_name_target_after_error(kubo_url, key_name, cid).await {
391 warn!(
392 "name publish attempt {}/{} reported error for key '{}' but resolve confirms target; accepting: {}",
393 attempt, attempts, key_name, value
394 );
395 return Ok(value);
396 }
397 warn!(
398 "name publish attempt {}/{} failed for key '{}' cid '{}': {}",
399 attempt, attempts, key_name, cid, err
400 );
401 last_err = Some(err);
402 if attempt < attempts {
403 sleep(backoff).await;
404 let doubled = backoff.as_millis().saturating_mul(2);
405 backoff = Duration::from_millis(std::cmp::min(doubled, 30_000) as u64);
406 }
407 }
408 }
409 }
410
411 Err(anyhow!(
412 "name publish failed after {} attempt(s): {}",
413 attempts,
414 last_err
415 .map(|e| e.to_string())
416 .unwrap_or_else(|| "unknown error".to_string())
417 ))
418}
419
420async fn verify_name_target_after_error(
421 kubo_url: &str,
422 key_name: &str,
423 cid: &str,
424) -> Result<String> {
425 let expected = normalize_ipfs_arg(cid);
426 let resolved = name_resolve(kubo_url, &format!("/ipns/{key_name}"), true).await?;
427 if resolved.trim() == expected {
428 return Ok(resolved);
429 }
430 Err(anyhow!(
431 "post-error resolve mismatch for key '{}': expected '{}' got '{}'",
432 key_name,
433 expected,
434 resolved
435 ))
436}
437
438pub async fn name_resolve(kubo_url: &str, path: &str, recursive: bool) -> Result<String> {
439 let base = kubo_url.trim_end_matches('/');
440 let url = format!("{base}/api/v0/name/resolve");
441
442 let client = reqwest::Client::builder()
443 .timeout(Duration::from_secs(15))
444 .build()?;
445
446 let recursive_flag = if recursive { "true" } else { "false" };
447 let body = client
448 .post(url)
449 .query(&[("arg", path), ("recursive", recursive_flag)])
450 .send()
451 .await?
452 .error_for_status()?
453 .text()
454 .await?;
455
456 let parsed: NameResolveResponse = serde_json::from_str(&body)
457 .map_err(|e| anyhow!("failed parsing name/resolve response: {} body={}", e, body))?;
458 let resolved = if !parsed.path_upper.is_empty() {
459 parsed.path_upper
460 } else {
461 parsed.path_lower
462 };
463 if resolved.is_empty() {
464 return Err(anyhow!("missing path in name/resolve response: {}", body));
465 }
466 Ok(resolved)
467}
468
469pub async fn fetch_did_document(kubo_url: &str, did: &Did) -> Result<Document> {
472 let ipns_path = format!("/ipns/{}", did.ipns);
473 let mut backoff = Duration::from_millis(150);
474 let mut last_err: Option<anyhow::Error> = None;
475 let mut document: Option<Document> = None;
476
477 for attempt in 1..=4 {
478 match dag_get::<Document>(kubo_url, &ipns_path).await {
480 Ok(doc) => {
481 document = Some(doc);
482 break;
483 }
484 Err(dag_err) => {
485 match name_resolve(kubo_url, &ipns_path, true).await {
487 Ok(resolved_path) => {
488 match dag_get::<Document>(kubo_url, &resolved_path).await {
489 Ok(doc) => {
490 document = Some(doc);
491 break;
492 }
493 Err(err) => {
494 last_err = Some(anyhow!(
495 "dag_get failed for {}: direct={} resolved={}",
496 ipns_path,
497 dag_err,
498 err
499 ));
500 }
501 }
502 }
503 Err(resolve_err) => {
504 last_err = Some(anyhow!(
505 "dag_get and name/resolve both failed for {}: dag={} resolve={}",
506 ipns_path,
507 dag_err,
508 resolve_err
509 ));
510 if !should_retry_name_resolve_error(&resolve_err) {
511 break;
512 }
513 }
514 }
515 }
516 }
517
518 if attempt < 4 {
519 sleep(backoff).await;
520 let doubled = backoff.as_millis().saturating_mul(2);
521 backoff = Duration::from_millis(std::cmp::min(doubled, 2_000) as u64);
522 }
523 }
524
525 let document = document.ok_or_else(|| {
526 anyhow!(
527 "failed to fetch DID document for {} via {} after retries: {}",
528 did.id(),
529 ipns_path,
530 last_err
531 .map(|e| e.to_string())
532 .unwrap_or_else(|| "unknown error".to_string())
533 )
534 })?;
535
536 document.validate()?;
537 document.verify()?;
538
539 let doc_did = Did::try_from(document.id.as_str())
540 .map_err(|e| anyhow!("DID document has invalid id '{}': {}", document.id, e))?;
541 if doc_did.ipns != did.ipns {
542 return Err(anyhow!(
543 "DID document IPNS mismatch: expected {} but document id is {}",
544 did.base_id(),
545 document.id
546 ));
547 }
548
549 Ok(document)
550}
551
552fn should_retry_name_resolve_error(err: &anyhow::Error) -> bool {
553 let text = err.to_string().to_ascii_lowercase();
554 if text.contains("http status client error") {
555 return false;
556 }
557 if text.contains("missing path in name/resolve response") {
558 return false;
559 }
560 true
561}
562
563pub async fn pin_add_named(kubo_url: &str, cid: &str, name: &str) -> Result<()> {
566 let base = kubo_url.trim_end_matches('/');
567 let url = format!("{base}/api/v0/pin/add");
568 let arg = normalize_ipfs_arg(cid);
569
570 let client = reqwest::Client::builder()
571 .timeout(Duration::from_secs(10))
572 .build()?;
573
574 client
575 .post(url)
576 .query(&[("arg", arg.as_str()), ("recursive", "true"), ("name", name)])
577 .send()
578 .await?
579 .error_for_status()?;
580
581 Ok(())
582}
583
584pub async fn pin_rm(kubo_url: &str, cid: &str) -> Result<()> {
585 let base = kubo_url.trim_end_matches('/');
586 let url = format!("{base}/api/v0/pin/rm");
587 let arg = normalize_ipfs_arg(cid);
588
589 let client = reqwest::Client::builder()
590 .timeout(Duration::from_secs(10))
591 .build()?;
592
593 client
594 .post(url)
595 .query(&[("arg", arg.as_str()), ("recursive", "true")])
596 .send()
597 .await?
598 .error_for_status()?;
599
600 Ok(())
601}
602
603pub async fn generate_key(kubo_url: &str, key_name: &str) -> Result<()> {
606 let base = kubo_url.trim_end_matches('/');
607 let url = format!("{base}/api/v0/key/gen");
608
609 reqwest::Client::builder()
610 .timeout(Duration::from_secs(10))
611 .build()?
612 .post(url)
613 .query(&[("arg", key_name), ("type", "ed25519")])
614 .send()
615 .await?
616 .error_for_status()?;
617
618 Ok(())
619}
620
621pub async fn import_key(kubo_url: &str, key_name: &str, key_bytes: Vec<u8>) -> Result<KuboKey> {
622 let base = kubo_url.trim_end_matches('/');
623 let url = format!("{base}/api/v0/key/import");
624
625 let part = multipart::Part::bytes(key_bytes)
626 .file_name("ipns.key")
627 .mime_str("application/octet-stream")?;
628 let form = multipart::Form::new().part("file", part);
629
630 let response = reqwest::Client::builder()
631 .timeout(Duration::from_secs(10))
632 .build()?
633 .post(url)
634 .query(&[("arg", key_name), ("allow-any-key-type", "true")])
635 .multipart(form)
636 .send()
637 .await?
638 .error_for_status()?;
639
640 let body = response.text().await?;
641 let parsed: KeyImportResponse = serde_json::from_str(&body)
642 .map_err(|e| anyhow!("failed parsing key/import response: {} body={}", e, body))?;
643
644 let name = if !parsed.name_upper.trim().is_empty() {
645 parsed.name_upper.trim().to_string()
646 } else {
647 parsed.name_lower.trim().to_string()
648 };
649 let id = if !parsed.id_upper.trim().is_empty() {
650 parsed.id_upper.trim().to_string()
651 } else {
652 parsed.id_lower.trim().to_string()
653 };
654
655 if name.is_empty() || id.is_empty() {
656 return Err(anyhow!("missing name/id in key/import response: {}", body));
657 }
658
659 Ok(KuboKey { name, id })
660}
661
662pub async fn list_keys(kubo_url: &str) -> Result<Vec<KuboKey>> {
663 let base = kubo_url.trim_end_matches('/');
664 let url = format!("{base}/api/v0/key/list");
665
666 let body = reqwest::Client::builder()
667 .timeout(Duration::from_secs(10))
668 .build()?
669 .post(url)
670 .send()
671 .await?
672 .error_for_status()?
673 .text()
674 .await?;
675
676 let parsed: KeyListResponse = serde_json::from_str(&body)
677 .map_err(|e| anyhow!("failed parsing key/list response: {} body={}", e, body))?;
678 Ok(parsed
679 .keys
680 .into_iter()
681 .filter_map(|k| {
682 let name = if !k.name.trim().is_empty() {
683 k.name.trim().to_string()
684 } else {
685 k.name_lower.trim().to_string()
686 };
687 let id = if !k.id.trim().is_empty() {
688 k.id.trim().to_string()
689 } else {
690 k.id_lower.trim().to_string()
691 };
692 if name.is_empty() {
693 None
694 } else {
695 Some(KuboKey { name, id })
696 }
697 })
698 .collect())
699}
700
701pub async fn list_key_names(kubo_url: &str) -> Result<Vec<String>> {
702 let keys = list_keys(kubo_url).await?;
703 Ok(keys.into_iter().map(|k| k.name).collect())
704}
705
706pub async fn remove_key(kubo_url: &str, key_name: &str) -> Result<()> {
708 let base = kubo_url.trim_end_matches('/');
709 let url = format!("{base}/api/v0/key/rm");
710
711 reqwest::Client::builder()
712 .timeout(Duration::from_secs(10))
713 .build()?
714 .post(url)
715 .query(&[("arg", key_name)])
716 .send()
717 .await?
718 .error_for_status()?;
719
720 Ok(())
721}
722
723#[cfg(test)]
726mod tests {
727 use super::*;
728
729 #[test]
730 fn normalize_ipfs_arg_from_raw_cid() {
731 assert_eq!(normalize_ipfs_arg("QmExampleCid"), "/ipfs/QmExampleCid");
732 }
733
734 #[test]
735 fn normalize_ipfs_arg_from_prefixed_path() {
736 assert_eq!(
737 normalize_ipfs_arg("/ipfs/QmExampleCid"),
738 "/ipfs/QmExampleCid"
739 );
740 }
741
742 #[test]
743 fn normalize_ipfs_arg_from_double_prefixed_path() {
744 assert_eq!(
745 normalize_ipfs_arg("/ipfs//ipfs/QmExampleCid"),
746 "/ipfs/QmExampleCid"
747 );
748 }
749
750 #[test]
751 fn does_not_retry_http_client_status_errors() {
752 let err = anyhow!(
753 "HTTP status client error (404 Not Found) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
754 );
755 assert!(!should_retry_name_resolve_error(&err));
756 }
757
758 #[test]
759 fn retries_http_server_status_errors() {
760 let err = anyhow!(
761 "HTTP status server error (500 Internal Server Error) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
762 );
763 assert!(should_retry_name_resolve_error(&err));
764 }
765
766 #[test]
767 fn retries_network_errors() {
768 let err =
769 anyhow!("error sending request for url (http://127.0.0.1:5001/api/v0/name/resolve)");
770 assert!(should_retry_name_resolve_error(&err));
771 }
772}