// ma_core/ipfs/gateway_resolver.rs

//! IPFS gateway DID document resolver traits and implementations.
2
3use crate::Document;
4#[cfg(target_arch = "wasm32")]
5use async_trait::async_trait;
6#[cfg(not(target_arch = "wasm32"))]
7use async_trait::async_trait;
8use std::collections::HashMap;
9use std::sync::Mutex;
10use web_time::{Duration, Instant};
11
12/// Trait for resolving a DID to its DID document.
13///
14/// Ship with `IpfsGatewayResolver` for HTTP gateway resolution.
15/// Implement this trait for custom resolution strategies.
16#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
17#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
18pub trait DidDocumentResolver: Send + Sync {
19    async fn resolve(&self, did: &str) -> crate::error::Result<Document>;
20
21    /// Update resolver cache TTLs at runtime.
22    ///
23    /// Default implementation is a no-op for resolvers without mutable cache policy.
24    fn set_cache_ttls(&self, _positive_ttl: Duration, _negative_ttl: Duration) {}
25
26    /// Return current resolver cache TTLs when supported.
27    fn cache_ttls(&self) -> Option<(Duration, Duration)> {
28        None
29    }
30}
31
32/// Resolves DID documents via an IPFS/IPNS HTTP gateway.
33///
34/// The gateway must serve DID documents at `/ipns/<key-id>`.
35pub struct IpfsGatewayResolver {
36    gateways: Vec<String>,
37    client: reqwest::Client,
38    positive_ttl: Mutex<Duration>,
39    negative_ttl: Mutex<Duration>,
40    localhost_cooldown: Duration,
41    cache: Mutex<HashMap<String, CacheEntry>>,
42    localhost_blocked_until: Mutex<Option<Instant>>,
43}
44
45#[derive(Clone)]
46struct CacheEntry {
47    expires_at: Instant,
48    value: CacheValue,
49}
50
/// Outcome of a resolution attempt as stored in the cache.
#[derive(Clone)]
enum CacheValue {
    /// Raw response body of a document that was fetched and decoded successfully.
    Hit(Vec<u8>),
    /// Failure detail recorded when every gateway failed.
    Miss(String),
}
56
57impl IpfsGatewayResolver {
58    const LOCALHOST_GATEWAY: &'static str = "http://127.0.0.1:8080/";
59    const DEFAULT_PUBLIC_GATEWAYS: [&'static str; 2] = ["https://dweb.link/", "https://w3s.link/"];
60
61    pub fn new(gateway_url: impl Into<String>) -> Self {
62        let primary = normalize_gateway_url(&gateway_url.into());
63
64        let mut gateways = Vec::new();
65        push_gateway(&mut gateways, Self::LOCALHOST_GATEWAY);
66        push_gateway(&mut gateways, &primary);
67        for fallback in Self::DEFAULT_PUBLIC_GATEWAYS {
68            push_gateway(&mut gateways, fallback);
69        }
70
71        #[cfg(not(target_arch = "wasm32"))]
72        let client = reqwest::Client::builder()
73            .timeout(Duration::from_secs(4))
74            .build()
75            .unwrap_or_else(|_| reqwest::Client::new());
76
77        #[cfg(target_arch = "wasm32")]
78        let client = reqwest::Client::builder()
79            .build()
80            .unwrap_or_else(|_| reqwest::Client::new());
81
82        Self {
83            gateways,
84            client,
85            positive_ttl: Mutex::new(Duration::from_mins(1)),
86            negative_ttl: Mutex::new(Duration::from_secs(10)),
87            localhost_cooldown: Duration::from_secs(20),
88            cache: Mutex::new(HashMap::new()),
89            localhost_blocked_until: Mutex::new(None),
90        }
91    }
92
93    #[must_use]
94    pub fn with_cache_ttls(self, positive_ttl: Duration, negative_ttl: Duration) -> Self {
95        self.set_cache_ttls_inner(positive_ttl, negative_ttl);
96        self
97    }
98
99    fn set_cache_ttls_inner(&self, positive_ttl: Duration, negative_ttl: Duration) {
100        if let Ok(mut ttl) = self.positive_ttl.lock() {
101            *ttl = positive_ttl;
102        }
103        if let Ok(mut ttl) = self.negative_ttl.lock() {
104            *ttl = negative_ttl;
105        }
106    }
107
108    fn positive_ttl(&self) -> Duration {
109        self.positive_ttl
110            .lock()
111            .map_or(Duration::from_secs(0), |ttl| *ttl)
112    }
113
114    fn negative_ttl(&self) -> Duration {
115        self.negative_ttl
116            .lock()
117            .map_or(Duration::from_secs(0), |ttl| *ttl)
118    }
119
120    #[must_use]
121    pub fn with_localhost_cooldown(mut self, cooldown: Duration) -> Self {
122        self.localhost_cooldown = cooldown;
123        self
124    }
125}
126
127#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
128#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
129impl DidDocumentResolver for IpfsGatewayResolver {
130    async fn resolve(&self, did: &str) -> crate::error::Result<Document> {
131        let parsed = crate::Did::try_from(did).map_err(crate::error::Error::Validation)?;
132        let did_key = did.to_string();
133        let positive_ttl = self.positive_ttl();
134        let negative_ttl = self.negative_ttl();
135        let cache_hit_enabled = !positive_ttl.is_zero();
136        let cache_miss_enabled = !negative_ttl.is_zero();
137
138        if let Some(cached) = self.read_cache(&did_key, cache_hit_enabled, cache_miss_enabled) {
139            return match cached {
140                CacheValue::Hit(body) => {
141                    parse_document_bytes(&body).map_err(|detail| crate::error::Error::Resolution {
142                        did: did_key,
143                        detail: format!("cached document parse failed: {detail}"),
144                    })
145                }
146                CacheValue::Miss(detail) => Err(crate::error::Error::Resolution {
147                    did: did_key,
148                    detail,
149                }),
150            };
151        }
152
153        let mut errors = Vec::new();
154        let now = Instant::now();
155
156        for gateway in &self.gateways {
157            if is_localhost_gateway(gateway) && self.localhost_is_blocked(now) {
158                errors.push(format!("{} -> skipped (cooldown)", gateway));
159                continue;
160            }
161
162            let url = format!("{}ipns/{}", gateway, parsed.ipns);
163
164            let response = match self.client.get(&url).send().await {
165                Ok(response) => response,
166                Err(err) => {
167                    if is_localhost_gateway(gateway) {
168                        self.block_localhost_until(Some(now + self.localhost_cooldown));
169                    }
170                    errors.push(format!("{url} -> {err}"));
171                    continue;
172                }
173            };
174
175            if !response.status().is_success() {
176                if is_localhost_gateway(gateway) {
177                    self.block_localhost_until(Some(now + self.localhost_cooldown));
178                }
179                errors.push(format!("{url} -> HTTP {}", response.status()));
180                continue;
181            }
182
183            let body = match response.bytes().await {
184                Ok(body) => body,
185                Err(err) => {
186                    if is_localhost_gateway(gateway) {
187                        self.block_localhost_until(Some(now + self.localhost_cooldown));
188                    }
189                    errors.push(format!("{url} -> {err}"));
190                    continue;
191                }
192            };
193
194            let doc = match parse_document_bytes(body.as_ref()) {
195                Ok(doc) => doc,
196                Err(detail) => {
197                    errors.push(format!("{url} -> invalid DID document: {detail}"));
198                    continue;
199                }
200            };
201
202            if is_localhost_gateway(gateway) {
203                self.block_localhost_until(None);
204            }
205
206            if cache_hit_enabled {
207                self.write_cache(
208                    did_key.clone(),
209                    CacheValue::Hit(body.to_vec()),
210                    now + positive_ttl,
211                );
212            }
213            return Ok(doc);
214        }
215
216        let detail = format!("all gateways failed: {}", errors.join(" | "));
217        if cache_miss_enabled {
218            self.write_cache(
219                did_key.clone(),
220                CacheValue::Miss(detail.clone()),
221                now + negative_ttl,
222            );
223        }
224
225        Err(crate::error::Error::Resolution {
226            did: did_key,
227            detail,
228        })
229    }
230
231    fn set_cache_ttls(&self, positive_ttl: Duration, negative_ttl: Duration) {
232        self.set_cache_ttls_inner(positive_ttl, negative_ttl);
233    }
234
235    fn cache_ttls(&self) -> Option<(Duration, Duration)> {
236        Some((self.positive_ttl(), self.negative_ttl()))
237    }
238}
239
240impl IpfsGatewayResolver {
241    fn read_cache(
242        &self,
243        did: &str,
244        cache_hit_enabled: bool,
245        cache_miss_enabled: bool,
246    ) -> Option<CacheValue> {
247        if !cache_hit_enabled && !cache_miss_enabled {
248            return None;
249        }
250
251        let mut cache = self.cache.lock().ok()?;
252        let entry = cache.get(did).cloned()?;
253        if entry.expires_at <= Instant::now() {
254            cache.remove(did);
255            return None;
256        }
257
258        match entry.value {
259            CacheValue::Hit(value) if cache_hit_enabled => Some(CacheValue::Hit(value)),
260            CacheValue::Miss(value) if cache_miss_enabled => Some(CacheValue::Miss(value)),
261            _ => None,
262        }
263    }
264
265    fn write_cache(&self, did: String, value: CacheValue, expires_at: Instant) {
266        if let Ok(mut cache) = self.cache.lock() {
267            cache.insert(did, CacheEntry { expires_at, value });
268        }
269    }
270
271    fn localhost_is_blocked(&self, now: Instant) -> bool {
272        let guard = match self.localhost_blocked_until.lock() {
273            Ok(guard) => guard,
274            Err(_) => return false,
275        };
276        guard.as_ref().is_some_and(|blocked| *blocked > now)
277    }
278
279    fn block_localhost_until(&self, until: Option<Instant>) {
280        if let Ok(mut guard) = self.localhost_blocked_until.lock() {
281            *guard = until;
282        }
283    }
284}
285
/// Trim surrounding whitespace from `input` and make sure the result ends with `/`.
fn normalize_gateway_url(input: &str) -> String {
    let trimmed = input.trim();
    if trimmed.ends_with('/') {
        trimmed.to_owned()
    } else {
        format!("{trimmed}/")
    }
}
293
294fn push_gateway(gateways: &mut Vec<String>, candidate: &str) {
295    let normalized = normalize_gateway_url(candidate);
296    if !gateways.iter().any(|g| g.eq_ignore_ascii_case(&normalized)) {
297        gateways.push(normalized);
298    }
299}
300
/// Report whether `gateway` is a plain-HTTP URL whose host is `127.0.0.1` or `localhost`.
///
/// Scheme and host name are compared ASCII-case-insensitively and the port is
/// optional, so `http://localhost/` and `HTTP://LocalHost:8080/` are both
/// recognised. Every URL beginning with `http://127.0.0.1:` or
/// `http://localhost:` is still accepted.
fn is_localhost_gateway(gateway: &str) -> bool {
    let (scheme, rest) = match gateway.split_once("://") {
        Some(parts) => parts,
        None => return false,
    };
    if !scheme.eq_ignore_ascii_case("http") {
        return false;
    }

    // The authority ends at the first path, query or fragment delimiter.
    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(rest);
    // Drop an optional `:port` suffix to isolate the host.
    let host = authority.split(':').next().unwrap_or(authority);

    host == "127.0.0.1" || host.eq_ignore_ascii_case("localhost")
}
304
305fn parse_document_bytes(bytes: &[u8]) -> std::result::Result<Document, String> {
306    Document::decode(bytes).map_err(|err| format!("CBOR decode failed: {err}"))
307}
308
#[cfg(test)]
mod tests {
    use super::parse_document_bytes;
    use crate::generate_identity_from_secret;

    /// An encoded identity document must decode back to an equal document.
    #[test]
    fn parses_dag_cbor_documents() {
        let secret = [7u8; 32];
        let identity = generate_identity_from_secret(secret).expect("identity");
        let expected = &identity.document;
        let encoded = expected.encode().expect("cbor");
        let decoded = parse_document_bytes(&encoded).expect("parsed cbor");
        assert_eq!(&decoded, expected);
    }

    /// A payload that is not a document is rejected with the decode-failure message.
    #[test]
    fn rejects_non_document_payloads() {
        let payload: &[u8] = b"<html>nope</html>";
        let message = parse_document_bytes(payload).expect_err("invalid payload");
        assert!(message.contains("CBOR decode failed"));
    }
}
327}