1use crate::{Did, Document};
7use anyhow::{anyhow, Result};
8use reqwest::multipart;
9use serde::de::DeserializeOwned;
10use serde::{Deserialize, Serialize};
11use tokio::time::sleep;
12use tracing::warn;
13use web_time::Duration;
14
/// JSON body of an `/api/v0/add` response; only the `Hash` field is read.
#[derive(Debug, Deserialize)]
struct AddResponse {
    /// Value of the `Hash` key, returned to callers of `ipfs_add`.
    #[serde(rename = "Hash")]
    hash: String,
}
22
/// CID link object of the form `{"/": "<cid>"}` found in `dag/put` responses.
#[derive(Debug, Deserialize)]
struct DagPutCid {
    /// String stored under the `/` key; `dag_put` returns it as the CID.
    #[serde(rename = "/")]
    slash: String,
}
28
/// JSON body of an `/api/v0/dag/put` response.
///
/// The CID link is accepted under either the `Cid` or the `cid` key; both are
/// optional so a missing link can be reported with a dedicated error.
#[derive(Debug, Deserialize)]
struct DagPutResponse {
    /// Link found under the `Cid` key, if present (checked first).
    #[serde(default, rename = "Cid")]
    cid_upper: Option<DagPutCid>,
    /// Link found under the `cid` key, if present (fallback).
    #[serde(default)]
    cid: Option<DagPutCid>,
}
36
/// JSON body of an `/api/v0/name/publish` response.
///
/// The published value is accepted under `Value` or `value`; an absent key
/// deserializes to an empty string.
#[derive(Debug, Deserialize)]
struct NamePublishResponse {
    /// Contents of the `Value` key (preferred when non-empty).
    #[serde(default, rename = "Value")]
    value_upper: String,
    /// Contents of the `value` key (fallback).
    #[serde(default, rename = "value")]
    value_lower: String,
}
44
/// JSON body of an `/api/v0/name/resolve` response.
///
/// The resolved path is accepted under `Path` or `path`; an absent key
/// deserializes to an empty string.
#[derive(Debug, Deserialize)]
struct NameResolveResponse {
    /// Contents of the `Path` key (preferred when non-empty).
    #[serde(default, rename = "Path")]
    path_upper: String,
    /// Contents of the `path` key (fallback).
    #[serde(default, rename = "path")]
    path_lower: String,
}
52
/// JSON body of an `/api/v0/version` response, used as a readiness probe.
///
/// The version string is accepted under `Version` or `version`; an absent key
/// deserializes to an empty string.
#[derive(Debug, Deserialize)]
struct VersionResponse {
    /// Contents of the `Version` key (preferred when non-empty).
    #[serde(default, rename = "Version")]
    version_upper: String,
    /// Contents of the `version` key (fallback).
    #[serde(default, rename = "version")]
    version_lower: String,
}
60
/// One element of the `Keys` array in an `/api/v0/key/list` response.
///
/// Both capitalized and lowercase key spellings are accepted; absent keys
/// deserialize to empty strings.
#[derive(Debug, Deserialize)]
struct KeyListEntry {
    /// Contents of the `Name` key (preferred when non-blank).
    #[serde(default, rename = "Name")]
    name: String,
    /// Contents of the `name` key (fallback).
    #[serde(default, rename = "name")]
    name_lower: String,
    /// Contents of the `Id` key (preferred when non-blank).
    #[serde(default, rename = "Id")]
    id: String,
    /// Contents of the `id` key (fallback).
    #[serde(default, rename = "id")]
    id_lower: String,
}
72
/// JSON body of an `/api/v0/key/list` response.
#[derive(Debug, Deserialize)]
struct KeyListResponse {
    /// Entries under the `Keys` key; empty when the key is absent.
    #[serde(default, rename = "Keys")]
    keys: Vec<KeyListEntry>,
}
78
/// JSON body of an `/api/v0/key/import` response.
///
/// Both capitalized and lowercase key spellings are accepted; absent keys
/// deserialize to empty strings.
#[derive(Debug, Deserialize)]
struct KeyImportResponse {
    /// Contents of the `Name` key (preferred when non-blank).
    #[serde(default, rename = "Name")]
    name_upper: String,
    /// Contents of the `name` key (fallback).
    #[serde(default, rename = "name")]
    name_lower: String,
    /// Contents of the `Id` key (preferred when non-blank).
    #[serde(default, rename = "Id")]
    id_upper: String,
    /// Contents of the `id` key (fallback).
    #[serde(default, rename = "id")]
    id_lower: String,
}
90
/// A key as reported by the `key/list` or `key/import` endpoints.
#[derive(Clone, Debug)]
pub struct KuboKey {
    /// Trimmed key name from the response.
    pub name: String,
    /// Trimmed key identifier from the response.
    pub id: String,
}
96
/// Tunables for a `name/publish` request issued by `name_publish_with_options`.
#[derive(Clone, Debug)]
pub struct IpnsPublishOptions {
    /// Overall HTTP client timeout for the publish request.
    pub timeout: Duration,
    /// Sent as the `allow-offline` query parameter.
    pub allow_offline: bool,
    /// Sent verbatim as the `lifetime` query parameter.
    pub lifetime: String,
    /// Sent as the `ttl` query parameter only when `Some`.
    pub ttl: Option<String>,
    /// Sent as the `resolve` query parameter.
    pub resolve: bool,
    /// Sent as the `quieter` query parameter.
    pub quieter: bool,
}
108
109impl Default for IpnsPublishOptions {
110 fn default() -> Self {
111 Self {
112 timeout: Duration::from_mins(2),
113 allow_offline: true,
114 lifetime: "8760h".to_string(),
115 ttl: None,
116 resolve: false,
117 quieter: true,
118 }
119 }
120}
121
122pub async fn wait_for_api(kubo_url: &str, attempts: u32) -> Result<()> {
125 if attempts == 0 {
126 return Err(anyhow!("kubo readiness attempts must be >= 1"));
127 }
128
129 let base = kubo_url.trim_end_matches('/');
130 let url = format!("{base}/api/v0/version");
131 let client = reqwest::Client::builder()
132 .timeout(Duration::from_secs(6))
133 .build()?;
134
135 let mut fib_prev = Duration::from_millis(0);
136 let mut fib_curr = Duration::from_millis(200);
137 let mut last_err: Option<anyhow::Error> = None;
138
139 for attempt in 1..=attempts {
140 let result = async {
141 let response = client.post(&url).send().await?.error_for_status()?;
142 let body = response.text().await?;
143 let parsed: VersionResponse = serde_json::from_str(&body)
144 .map_err(|e| anyhow!("failed parsing version response: {} body={}", e, body))?;
145 let version = if !parsed.version_upper.is_empty() {
146 parsed.version_upper
147 } else {
148 parsed.version_lower
149 };
150 if version.trim().is_empty() {
151 return Err(anyhow!("missing version field in response: {}", body));
152 }
153 Ok::<(), anyhow::Error>(())
154 }
155 .await;
156
157 match result {
158 Ok(()) => return Ok(()),
159 Err(err) => {
160 warn!("kubo readiness {}/{}: {}", attempt, attempts, err);
161 last_err = Some(err);
162 if attempt < attempts {
163 sleep(fib_curr).await;
164 let next_ms = fib_prev.as_millis().saturating_add(fib_curr.as_millis());
165 fib_prev = fib_curr;
166 fib_curr = Duration::from_millis(std::cmp::min(next_ms, 5_000) as u64);
167 }
168 }
169 }
170 }
171 Err(anyhow!(
172 "kubo API not ready after {} attempts: {}",
173 attempts,
174 last_err
175 .map(|e| e.to_string())
176 .unwrap_or_else(|| "unknown error".to_string())
177 ))
178}
179
180pub async fn ipfs_add(kubo_url: &str, data: Vec<u8>) -> Result<String> {
183 let base = kubo_url.trim_end_matches('/');
184 let url = format!("{base}/api/v0/add");
185
186 let part = multipart::Part::bytes(data).file_name("data");
187 let form = multipart::Form::new().part("file", part);
188
189 let client = reqwest::Client::builder()
190 .timeout(Duration::from_secs(10))
191 .build()?;
192
193 let body = client
194 .post(url)
195 .query(&[("pin", "true")])
196 .multipart(form)
197 .send()
198 .await?
199 .error_for_status()?
200 .text()
201 .await?;
202
203 let parsed: AddResponse = serde_json::from_str(&body)
204 .map_err(|e| anyhow!("failed parsing add response: {} body={}", e, body))?;
205 Ok(parsed.hash)
206}
207
208pub async fn cat_bytes(kubo_url: &str, cid: &str) -> Result<Vec<u8>> {
209 let base = kubo_url.trim_end_matches('/');
210 let url = format!("{base}/api/v0/cat");
211
212 let client = reqwest::Client::builder()
213 .timeout(Duration::from_secs(30))
214 .build()?;
215
216 let bytes = client
217 .post(url)
218 .query(&[("arg", cid)])
219 .send()
220 .await?
221 .error_for_status()?
222 .bytes()
223 .await?;
224
225 Ok(bytes.to_vec())
226}
227
228pub async fn cat_text(kubo_url: &str, cid: &str) -> Result<String> {
229 let bytes = cat_bytes(kubo_url, cid).await?;
230 String::from_utf8(bytes).map_err(|e| anyhow!("non-utf8 content from {}: {}", cid, e))
231}
232
233pub async fn dag_put<T: Serialize>(kubo_url: &str, value: &T) -> Result<String> {
236 let base = kubo_url.trim_end_matches('/');
237 let url = format!("{base}/api/v0/dag/put");
238 let payload = serde_json::to_vec(value)?;
239
240 let part = multipart::Part::bytes(payload)
241 .file_name("node.json")
242 .mime_str("application/json")?;
243 let form = multipart::Form::new().part("file", part);
244
245 let client = reqwest::Client::builder()
246 .timeout(Duration::from_secs(10))
247 .build()?;
248
249 let body = client
250 .post(url)
251 .query(&[
252 ("store-codec", "dag-cbor"),
253 ("input-codec", "dag-json"),
254 ("pin", "true"),
255 ])
256 .multipart(form)
257 .send()
258 .await?
259 .error_for_status()?
260 .text()
261 .await?;
262
263 let parsed: DagPutResponse = serde_json::from_str(&body)
264 .map_err(|e| anyhow!("failed parsing dag/put response: {} body={}", e, body))?;
265 parsed
266 .cid_upper
267 .or(parsed.cid)
268 .map(|c| c.slash)
269 .ok_or_else(|| anyhow!("missing CID in dag/put response: {}", body))
270}
271
272pub async fn dag_get<T: DeserializeOwned>(kubo_url: &str, cid: &str) -> Result<T> {
273 let base = kubo_url.trim_end_matches('/');
274 let url = format!("{base}/api/v0/dag/get");
275
276 let client = reqwest::Client::builder()
277 .timeout(Duration::from_secs(10))
278 .build()?;
279
280 let body = client
281 .post(url)
282 .query(&[("arg", cid)])
283 .send()
284 .await?
285 .error_for_status()?
286 .text()
287 .await?;
288
289 serde_json::from_str::<T>(&body).map_err(|e| {
290 anyhow!(
291 "failed parsing dag/get response for {}: {} body={}",
292 cid,
293 e,
294 body
295 )
296 })
297}
298
/// Normalizes a CID or IPFS path into the canonical `/ipfs/<rest>` form.
///
/// Surrounding whitespace is trimmed, then every leading `/ipfs/` prefix and
/// stray leading slash is removed, in whatever order they appear, before a
/// single `/ipfs/` prefix is applied. As a result `"QmFoo"`, `"/ipfs/QmFoo"`,
/// `"/ipfs//ipfs/QmFoo"` and `"//ipfs/QmFoo"` all become `"/ipfs/QmFoo"`.
/// Anything after the leading prefixes (such as a sub-path) is kept as is.
fn normalize_ipfs_arg(cid_or_path: &str) -> String {
    let mut rest = cid_or_path.trim();

    // Apply both strip rules in an interleaved fashion: removing a stray slash
    // may expose another `/ipfs/` prefix, which would survive if the two rules
    // ran as separate sequential passes. Slicing avoids per-step allocation.
    loop {
        if let Some(stripped) = rest.strip_prefix("/ipfs/") {
            rest = stripped;
        } else if let Some(stripped) = rest.strip_prefix('/') {
            rest = stripped;
        } else {
            break;
        }
    }

    format!("/ipfs/{rest}")
}
311
312pub async fn name_publish(kubo_url: &str, key_name: &str, cid: &str) -> Result<String> {
313 let options = IpnsPublishOptions::default();
314 name_publish_with_options(kubo_url, key_name, cid, &options).await
315}
316
317pub async fn name_publish_with_options(
318 kubo_url: &str,
319 key_name: &str,
320 cid: &str,
321 options: &IpnsPublishOptions,
322) -> Result<String> {
323 let base = kubo_url.trim_end_matches('/');
324 let url = format!("{base}/api/v0/name/publish");
325 let arg = normalize_ipfs_arg(cid);
326
327 let client = reqwest::Client::builder()
328 .timeout(options.timeout)
329 .build()?;
330
331 let allow_offline = if options.allow_offline {
332 "true"
333 } else {
334 "false"
335 };
336 let resolve = if options.resolve { "true" } else { "false" };
337 let quieter = if options.quieter { "true" } else { "false" };
338
339 let mut params: Vec<(&str, &str)> = vec![
340 ("arg", arg.as_str()),
341 ("key", key_name),
342 ("allow-offline", allow_offline),
343 ("lifetime", options.lifetime.as_str()),
344 ("resolve", resolve),
345 ("quieter", quieter),
346 ];
347 if let Some(ref ttl) = options.ttl {
348 params.push(("ttl", ttl.as_str()));
349 }
350
351 let body = client
352 .post(url)
353 .query(¶ms)
354 .send()
355 .await?
356 .error_for_status()?
357 .text()
358 .await?;
359
360 let parsed: NamePublishResponse = serde_json::from_str(&body)
361 .map_err(|e| anyhow!("failed parsing name/publish response: {} body={}", e, body))?;
362 let value = if !parsed.value_upper.is_empty() {
363 parsed.value_upper
364 } else {
365 parsed.value_lower
366 };
367 if value.is_empty() {
368 return Err(anyhow!("missing value in name/publish response: {}", body));
369 }
370 Ok(value)
371}
372
373pub async fn name_publish_with_retry(
374 kubo_url: &str,
375 key_name: &str,
376 cid: &str,
377 options: &IpnsPublishOptions,
378 attempts: u32,
379 initial_backoff: Duration,
380) -> Result<String> {
381 if attempts == 0 {
382 return Err(anyhow!("name publish attempts must be >= 1"));
383 }
384
385 let mut fib_prev = Duration::from_millis(0);
386 let mut fib_curr = initial_backoff;
387 let mut last_err: Option<anyhow::Error> = None;
388
389 for attempt in 1..=attempts {
390 match name_publish_with_options(kubo_url, key_name, cid, options).await {
391 Ok(value) => return Ok(value),
392 Err(err) => {
393 if let Ok(value) = verify_name_target_after_error(kubo_url, key_name, cid).await {
394 warn!(
395 "name publish attempt {}/{} reported error for key '{}' but resolve confirms target; accepting: {}",
396 attempt, attempts, key_name, value
397 );
398 return Ok(value);
399 }
400 warn!(
401 "name publish attempt {}/{} failed for key '{}' cid '{}': {}",
402 attempt, attempts, key_name, cid, err
403 );
404 last_err = Some(err);
405 if attempt < attempts {
406 sleep(fib_curr).await;
407 let next_ms = fib_prev.as_millis().saturating_add(fib_curr.as_millis());
408 fib_prev = fib_curr;
409 fib_curr = Duration::from_millis(std::cmp::min(next_ms, 30_000) as u64);
410 }
411 }
412 }
413 }
414
415 Err(anyhow!(
416 "name publish failed after {} attempt(s): {}",
417 attempts,
418 last_err
419 .map(|e| e.to_string())
420 .unwrap_or_else(|| "unknown error".to_string())
421 ))
422}
423
424async fn verify_name_target_after_error(
425 kubo_url: &str,
426 key_name: &str,
427 cid: &str,
428) -> Result<String> {
429 let expected = normalize_ipfs_arg(cid);
430 let resolved = name_resolve(kubo_url, &format!("/ipns/{key_name}"), true).await?;
431 if resolved.trim() == expected {
432 return Ok(resolved);
433 }
434 Err(anyhow!(
435 "post-error resolve mismatch for key '{}': expected '{}' got '{}'",
436 key_name,
437 expected,
438 resolved
439 ))
440}
441
442pub async fn name_resolve(kubo_url: &str, path: &str, recursive: bool) -> Result<String> {
443 let base = kubo_url.trim_end_matches('/');
444 let url = format!("{base}/api/v0/name/resolve");
445
446 let client = reqwest::Client::builder()
447 .timeout(Duration::from_secs(15))
448 .build()?;
449
450 let recursive_flag = if recursive { "true" } else { "false" };
451 let body = client
452 .post(url)
453 .query(&[("arg", path), ("recursive", recursive_flag)])
454 .send()
455 .await?
456 .error_for_status()?
457 .text()
458 .await?;
459
460 let parsed: NameResolveResponse = serde_json::from_str(&body)
461 .map_err(|e| anyhow!("failed parsing name/resolve response: {} body={}", e, body))?;
462 let resolved = if !parsed.path_upper.is_empty() {
463 parsed.path_upper
464 } else {
465 parsed.path_lower
466 };
467 if resolved.is_empty() {
468 return Err(anyhow!("missing path in name/resolve response: {}", body));
469 }
470 Ok(resolved)
471}
472
473pub async fn fetch_did_document(kubo_url: &str, did: &Did) -> Result<Document> {
476 let ipns_path = format!("/ipns/{}", did.ipns);
477 let mut fib_prev = Duration::from_millis(0);
478 let mut fib_curr = Duration::from_millis(150);
479 let mut last_err: Option<anyhow::Error> = None;
480 let mut document: Option<Document> = None;
481
482 for attempt in 1..=4 {
483 match dag_get::<Document>(kubo_url, &ipns_path).await {
485 Ok(doc) => {
486 document = Some(doc);
487 break;
488 }
489 Err(dag_err) => {
490 match name_resolve(kubo_url, &ipns_path, true).await {
492 Ok(resolved_path) => {
493 match dag_get::<Document>(kubo_url, &resolved_path).await {
494 Ok(doc) => {
495 document = Some(doc);
496 break;
497 }
498 Err(err) => {
499 last_err = Some(anyhow!(
500 "dag_get failed for {}: direct={} resolved={}",
501 ipns_path,
502 dag_err,
503 err
504 ));
505 }
506 }
507 }
508 Err(resolve_err) => {
509 last_err = Some(anyhow!(
510 "dag_get and name/resolve both failed for {}: dag={} resolve={}",
511 ipns_path,
512 dag_err,
513 resolve_err
514 ));
515 if !should_retry_name_resolve_error(&resolve_err) {
516 break;
517 }
518 }
519 }
520 }
521 }
522
523 if attempt < 4 {
524 sleep(fib_curr).await;
525 let next_ms = fib_prev.as_millis().saturating_add(fib_curr.as_millis());
526 fib_prev = fib_curr;
527 fib_curr = Duration::from_millis(std::cmp::min(next_ms, 2_000) as u64);
528 }
529 }
530
531 let document = document.ok_or_else(|| {
532 anyhow!(
533 "failed to fetch DID document for {} via {} after retries: {}",
534 did.id(),
535 ipns_path,
536 last_err
537 .map(|e| e.to_string())
538 .unwrap_or_else(|| "unknown error".to_string())
539 )
540 })?;
541
542 document.validate()?;
543 document.verify()?;
544
545 let doc_did = Did::try_from(document.id.as_str())
546 .map_err(|e| anyhow!("DID document has invalid id '{}': {}", document.id, e))?;
547 if doc_did.ipns != did.ipns {
548 return Err(anyhow!(
549 "DID document IPNS mismatch: expected {} but document id is {}",
550 did.base_id(),
551 document.id
552 ));
553 }
554
555 Ok(document)
556}
557
558fn should_retry_name_resolve_error(err: &anyhow::Error) -> bool {
559 let text = err.to_string().to_ascii_lowercase();
560 if text.contains("http status client error") {
561 return false;
562 }
563 if text.contains("missing path in name/resolve response") {
564 return false;
565 }
566 true
567}
568
569pub async fn pin_add_named(kubo_url: &str, cid: &str, name: &str) -> Result<()> {
572 let base = kubo_url.trim_end_matches('/');
573 let url = format!("{base}/api/v0/pin/add");
574 let arg = normalize_ipfs_arg(cid);
575
576 let client = reqwest::Client::builder()
577 .timeout(Duration::from_secs(10))
578 .build()?;
579
580 client
581 .post(url)
582 .query(&[("arg", arg.as_str()), ("recursive", "true"), ("name", name)])
583 .send()
584 .await?
585 .error_for_status()?;
586
587 Ok(())
588}
589
590pub async fn pin_rm(kubo_url: &str, cid: &str) -> Result<()> {
591 let base = kubo_url.trim_end_matches('/');
592 let url = format!("{base}/api/v0/pin/rm");
593 let arg = normalize_ipfs_arg(cid);
594
595 let client = reqwest::Client::builder()
596 .timeout(Duration::from_secs(10))
597 .build()?;
598
599 client
600 .post(url)
601 .query(&[("arg", arg.as_str()), ("recursive", "true")])
602 .send()
603 .await?
604 .error_for_status()?;
605
606 Ok(())
607}
608
609pub async fn generate_key(kubo_url: &str, key_name: &str) -> Result<()> {
612 let base = kubo_url.trim_end_matches('/');
613 let url = format!("{base}/api/v0/key/gen");
614
615 reqwest::Client::builder()
616 .timeout(Duration::from_secs(10))
617 .build()?
618 .post(url)
619 .query(&[("arg", key_name), ("type", "ed25519")])
620 .send()
621 .await?
622 .error_for_status()?;
623
624 Ok(())
625}
626
627pub async fn import_key(kubo_url: &str, key_name: &str, key_bytes: Vec<u8>) -> Result<KuboKey> {
628 let base = kubo_url.trim_end_matches('/');
629 let url = format!("{base}/api/v0/key/import");
630
631 let part = multipart::Part::bytes(key_bytes)
632 .file_name("ipns.key")
633 .mime_str("application/octet-stream")?;
634 let form = multipart::Form::new().part("file", part);
635
636 let response = reqwest::Client::builder()
637 .timeout(Duration::from_secs(10))
638 .build()?
639 .post(url)
640 .query(&[
641 ("arg", key_name),
642 ("ipns-base", "base36"),
643 ("allow-any-key-type", "true"),
644 ])
645 .multipart(form)
646 .send()
647 .await?
648 .error_for_status()?;
649
650 let body = response.text().await?;
651 let parsed: KeyImportResponse = serde_json::from_str(&body)
652 .map_err(|e| anyhow!("failed parsing key/import response: {} body={}", e, body))?;
653
654 let name = if !parsed.name_upper.trim().is_empty() {
655 parsed.name_upper.trim().to_string()
656 } else {
657 parsed.name_lower.trim().to_string()
658 };
659 let id = if !parsed.id_upper.trim().is_empty() {
660 parsed.id_upper.trim().to_string()
661 } else {
662 parsed.id_lower.trim().to_string()
663 };
664
665 if name.is_empty() || id.is_empty() {
666 return Err(anyhow!("missing name/id in key/import response: {}", body));
667 }
668
669 Ok(KuboKey { name, id })
670}
671
672pub async fn list_keys(kubo_url: &str) -> Result<Vec<KuboKey>> {
673 let base = kubo_url.trim_end_matches('/');
674 let url = format!("{base}/api/v0/key/list");
675
676 let body = reqwest::Client::builder()
677 .timeout(Duration::from_secs(10))
678 .build()?
679 .post(url)
680 .send()
681 .await?
682 .error_for_status()?
683 .text()
684 .await?;
685
686 let parsed: KeyListResponse = serde_json::from_str(&body)
687 .map_err(|e| anyhow!("failed parsing key/list response: {} body={}", e, body))?;
688 Ok(parsed
689 .keys
690 .into_iter()
691 .filter_map(|k| {
692 let name = if !k.name.trim().is_empty() {
693 k.name.trim().to_string()
694 } else {
695 k.name_lower.trim().to_string()
696 };
697 let id = if !k.id.trim().is_empty() {
698 k.id.trim().to_string()
699 } else {
700 k.id_lower.trim().to_string()
701 };
702 if name.is_empty() {
703 None
704 } else {
705 Some(KuboKey { name, id })
706 }
707 })
708 .collect())
709}
710
711pub async fn list_key_names(kubo_url: &str) -> Result<Vec<String>> {
712 let keys = list_keys(kubo_url).await?;
713 Ok(keys.into_iter().map(|k| k.name).collect())
714}
715
716pub async fn remove_key(kubo_url: &str, key_name: &str) -> Result<()> {
718 let base = kubo_url.trim_end_matches('/');
719 let url = format!("{base}/api/v0/key/rm");
720
721 reqwest::Client::builder()
722 .timeout(Duration::from_secs(10))
723 .build()?
724 .post(url)
725 .query(&[("arg", key_name)])
726 .send()
727 .await?
728 .error_for_status()?;
729
730 Ok(())
731}
732
#[cfg(test)]
mod tests {
    use super::*;

    // ---- normalize_ipfs_arg -------------------------------------------

    /// A bare CID gains the `/ipfs/` prefix.
    #[test]
    fn normalize_ipfs_arg_from_raw_cid() {
        let normalized = normalize_ipfs_arg("QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    /// An already-prefixed path is returned unchanged.
    #[test]
    fn normalize_ipfs_arg_from_prefixed_path() {
        let normalized = normalize_ipfs_arg("/ipfs/QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    /// A doubled prefix collapses to a single one.
    #[test]
    fn normalize_ipfs_arg_from_double_prefixed_path() {
        let normalized = normalize_ipfs_arg("/ipfs//ipfs/QmExampleCid");
        assert_eq!(normalized, "/ipfs/QmExampleCid");
    }

    // ---- should_retry_name_resolve_error ------------------------------

    /// HTTP 4xx status errors are treated as permanent.
    #[test]
    fn does_not_retry_http_client_status_errors() {
        let client_error = anyhow!(
            "HTTP status client error (404 Not Found) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
        );
        let retry = should_retry_name_resolve_error(&client_error);
        assert!(!retry);
    }

    /// HTTP 5xx status errors are treated as transient.
    #[test]
    fn retries_http_server_status_errors() {
        let server_error = anyhow!(
            "HTTP status server error (500 Internal Server Error) for url (http://127.0.0.1:5001/api/v0/name/resolve)"
        );
        let retry = should_retry_name_resolve_error(&server_error);
        assert!(retry);
    }

    /// Transport-level failures are treated as transient.
    #[test]
    fn retries_network_errors() {
        let network_error =
            anyhow!("error sending request for url (http://127.0.0.1:5001/api/v0/name/resolve)");
        let retry = should_retry_name_resolve_error(&network_error);
        assert!(retry);
    }
}