newton-core 0.4.16

newton protocol core sdk
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
//! Two-tier IPFS content caching service.
//!
//! Provides L1 (in-memory via moka) and L2 (Redis) caching for IPFS content.
//! IPFS content is immutable by CID, so cache entries never need invalidation.

use std::{sync::Arc, time::Duration};

use moka::future::Cache;
use redis::{aio::ConnectionManager, AsyncCommands, Client};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};

use crate::{
    common::{retry_with_backoff, RetryConfig},
    config::ipfs::IpfsConfig,
};

/// Error types for IPFS cache operations.
///
/// Within this file only `ResponseTooLarge` is constructed (by `read_bounded_response`).
/// The remaining variants are part of the public API — NOTE(review): confirm they are still
/// produced or matched elsewhere in the crate.
#[derive(Debug, thiserror::Error)]
pub enum IpfsCacheError {
    /// A Redis connection could not be established; carries a description of the failure.
    #[error("Redis connection error: {0}")]
    RedisConnection(String),
    /// A Redis command failed; carries a description of the failure.
    #[error("Redis operation error: {0}")]
    RedisOperation(String),
    /// Fetching content from an IPFS gateway failed; carries a description of the failure.
    #[error("Network fetch error: {0}")]
    NetworkFetch(String),
    /// A gateway response exceeded the configured byte limit. Carries either the advertised
    /// `Content-Length`, or the byte count reached when a streamed chunk crossed the limit.
    #[error("Fetched IPFS content exceeds maximum allowed size: {0} bytes")]
    ResponseTooLarge(usize),
    /// The requested content was not found in the cache.
    #[error("Cache miss")]
    CacheMiss,
}

/// Two-tier IPFS content cache service.
///
/// Provides fast content retrieval with:
/// - L1: In-memory moka cache (sub-ms access)
/// - L2: Redis distributed cache (~1ms access)
/// - Fallback: IPFS gateway network fetch (~900-2000ms)
///
/// Build it with `IpfsCacheService::new`, then call `initialize` to connect the L2 tier.
/// Until a Redis connection is installed, lookups skip Redis entirely. This also holds if
/// connecting fails: lookups go straight from L1 to the network.
pub struct IpfsCacheService {
    /// Gateway URLs, size limit, and cache capacity/TTL settings.
    config: IpfsConfig,
    /// L1: In-memory cache (CID -> content bytes)
    /// `None` when `config.cache_enabled` is false. Values are `Arc`-wrapped so the copies
    /// handed out by the cache are reference-count bumps rather than byte copies.
    memory_cache: Option<Cache<String, Arc<Vec<u8>>>>,
    /// L2: Redis connection manager
    /// Stays `None` until `initialize` connects successfully. It sits behind an async lock so
    /// `initialize` can install it after construction.
    redis_connection: RwLock<Option<ConnectionManager>>,
    /// Redis client for initialization
    /// `Some` only when both cache flags are enabled and the Redis URL parsed in `new`.
    redis_client: Option<Client>,
    /// Shared HTTP client for IPFS gateway requests (connection pooling).
    /// Used for both primary and fallback gateways. It is created with `reqwest::Client::new()`,
    /// i.e. reqwest defaults — NOTE(review): the default client has no overall request timeout.
    http_client: reqwest::Client,
}

impl std::fmt::Debug for IpfsCacheService {
    /// Render the configuration plus which cache tiers are enabled.
    ///
    /// Live handles (caches, Redis connection, HTTP client) are reported only as
    /// enabled/disabled flags rather than dumped.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let l1_enabled = self.memory_cache.is_some();
        let l2_enabled = self.redis_client.is_some();

        let mut out = f.debug_struct("IpfsCacheService");
        out.field("config", &self.config);
        out.field("memory_cache_enabled", &l1_enabled);
        out.field("redis_enabled", &l2_enabled);
        out.finish()
    }
}

impl IpfsCacheService {
    /// Create a new IPFS cache service with the given configuration.
    ///
    /// # Arguments
    /// * `config` - IPFS configuration including cache settings
    /// * `redis_url` - Optional Redis URL for L2 cache
    pub fn new(config: IpfsConfig, redis_url: Option<&str>) -> Self {
        let memory_cache = if config.cache_enabled {
            let cache = Cache::builder()
                .max_capacity(config.memory_cache_size as u64)
                .time_to_live(Duration::from_secs(config.memory_cache_ttl_secs))
                .build();
            info!(
                "IPFS L1 cache initialized: max_entries={}, ttl={}s",
                config.memory_cache_size, config.memory_cache_ttl_secs
            );
            Some(cache)
        } else {
            info!("IPFS L1 cache disabled");
            None
        };

        let redis_client = if config.cache_enabled && config.redis_cache_enabled {
            match redis_url {
                Some(url) => match Client::open(url) {
                    Ok(client) => {
                        info!("IPFS L2 Redis client created: ttl={}s", config.redis_cache_ttl_secs);
                        Some(client)
                    }
                    Err(e) => {
                        warn!(error = %e, "Failed to create Redis client for IPFS cache, L2 disabled");
                        None
                    }
                },
                None => {
                    info!("No Redis URL provided, IPFS L2 cache disabled");
                    None
                }
            }
        } else {
            None
        };

        Self {
            config,
            memory_cache,
            redis_connection: RwLock::new(None),
            redis_client,
            http_client: reqwest::Client::new(),
        }
    }

    /// Initialize Redis connection manager for L2 cache.
    pub async fn initialize(&self) -> Result<(), IpfsCacheError> {
        if let Some(ref client) = self.redis_client {
            info!("Initializing IPFS cache Redis connection...");
            match tokio::time::timeout(Duration::from_secs(10), client.get_connection_manager()).await {
                Ok(Ok(manager)) => {
                    *self.redis_connection.write().await = Some(manager);
                    info!("IPFS cache Redis connection established");
                }
                Ok(Err(e)) => {
                    warn!(error = %e, "Failed to connect to Redis for IPFS cache, continuing without L2");
                }
                Err(_) => {
                    warn!("Redis connection timeout for IPFS cache, continuing without L2");
                }
            }
        }
        Ok(())
    }

    /// Get content from IPFS with caching.
    ///
    /// Lookup order:
    /// 1. L1 memory cache (sub-ms)
    /// 2. L2 Redis cache (~1ms)
    /// 3. IPFS gateway network fetch (~900-2000ms)
    ///
    /// Successfully fetched content is stored in both cache tiers.
    pub async fn get(&self, cid: &str) -> eyre::Result<Vec<u8>> {
        // L1: Check memory cache first
        if let Some(ref cache) = self.memory_cache {
            if let Some(data) = cache.get(cid).await {
                debug!(cid = cid, "IPFS cache L1 hit");
                return Ok((*data).clone());
            }
        }

        // L2: Check Redis cache
        if let Some(data) = self.get_from_redis(cid).await {
            debug!(cid = cid, "IPFS cache L2 hit");
            // Populate L1 from L2
            if let Some(ref cache) = self.memory_cache {
                cache.insert(cid.to_string(), Arc::new(data.clone())).await;
            }
            return Ok(data);
        }

        // L3: Fetch from IPFS network
        debug!(cid = cid, "IPFS cache miss, fetching from network");
        let data = self.fetch_from_network(cid).await?;

        // Store in both caches
        self.set(cid, data.clone()).await;

        Ok(data)
    }

    /// Get content as UTF-8 text.
    pub async fn get_text(&self, cid: &str) -> eyre::Result<String> {
        let data = self.get(cid).await?;
        String::from_utf8(data).map_err(|e| eyre::eyre!("Invalid UTF-8 content: {}", e))
    }

    /// Store content in both cache tiers.
    async fn set(&self, cid: &str, data: Vec<u8>) {
        let data_arc = Arc::new(data);

        // Store in L1
        if let Some(ref cache) = self.memory_cache {
            cache.insert(cid.to_string(), data_arc.clone()).await;
        }

        // Store in L2
        self.set_in_redis(cid, &data_arc).await;
    }

    /// Get content from Redis L2 cache.
    async fn get_from_redis(&self, cid: &str) -> Option<Vec<u8>> {
        let guard = self.redis_connection.read().await;
        let conn = guard.as_ref()?;

        let key = Self::redis_key(cid);
        let mut conn = conn.clone();

        match conn.get::<_, Option<Vec<u8>>>(&key).await {
            Ok(Some(data)) => Some(data),
            Ok(None) => None,
            Err(e) => {
                warn!(error = %e, cid = cid, "Redis GET failed for IPFS content");
                None
            }
        }
    }

    /// Store content in Redis L2 cache.
    async fn set_in_redis(&self, cid: &str, data: &[u8]) {
        let guard = self.redis_connection.read().await;
        let Some(conn) = guard.as_ref() else { return };

        let key = Self::redis_key(cid);
        let ttl_secs = self.config.redis_cache_ttl_secs;
        let mut conn = conn.clone();

        if let Err(e) = conn.set_ex::<_, _, ()>(&key, data, ttl_secs).await {
            warn!(error = %e, cid = cid, "Redis SET failed for IPFS content");
        }
    }

    /// Generate Redis key for IPFS content.
    fn redis_key(cid: &str) -> String {
        format!("ipfs:content:{}", cid)
    }

    /// Fetch content from IPFS gateway network.
    async fn fetch_from_network(&self, cid: &str) -> eyre::Result<Vec<u8>> {
        let url = self.build_ipfs_url(cid);
        info!(url = %url, "Fetching from IPFS network");

        let response = self.http_client.get(&url).send().await;

        match response {
            Ok(resp) if resp.status().is_success() => {
                let bytes = read_bounded_response(resp, self.config.max_fetch_size_bytes)
                    .await
                    .map_err(|e| eyre::eyre!("Failed to read IPFS response: {}", e))?;
                info!(cid = cid, bytes = bytes.len(), "IPFS fetch successful");
                Ok(bytes)
            }
            Ok(resp) => {
                warn!(
                    cid = cid,
                    status = %resp.status(),
                    "Primary IPFS gateway failed, trying fallback"
                );
                self.fetch_from_fallback(cid).await
            }
            Err(e) => {
                warn!(error = %e, cid = cid, "Primary IPFS gateway error, trying fallback");
                self.fetch_from_fallback(cid).await
            }
        }
    }

    /// Fetch from public IPFS gateway as fallback.
    async fn fetch_from_fallback(&self, cid: &str) -> eyre::Result<Vec<u8>> {
        retry_with_backoff(
            &fallback_retry_config(),
            "ipfs_gateway_fallback_fetch",
            || async { self.fetch_from_fallback_once(cid).await },
            |e| {
                metrics::counter!("ipfs_cache_gateway_fallback_retry_attempts_total").increment(1);
                is_retryable_fallback_error(e)
            },
        )
        .await
    }

    async fn fetch_from_fallback_once(&self, cid: &str) -> eyre::Result<Vec<u8>> {
        let url = format!("{}{}", self.config.fallback_gateway, cid);
        info!(url = %url, "Fetching from fallback IPFS gateway");

        let response = self
            .http_client
            .get(&url)
            .send()
            .await
            .map_err(FallbackFetchError::Request)?;

        if response.status().is_success() {
            let bytes = read_bounded_response(response, self.config.max_fetch_size_bytes).await?;
            info!(cid = cid, bytes = bytes.len(), "Fallback IPFS fetch successful");
            Ok(bytes)
        } else {
            Err(FallbackFetchError::Status(response.status()).into())
        }
    }

    /// Build IPFS gateway URL from config.
    fn build_ipfs_url(&self, cid: &str) -> String {
        let base = &self.config.gateway;
        let params = &self.config.params;

        if params.is_empty() {
            format!("{}{}", base, cid)
        } else {
            format!("{}{}?{}", base, cid, params)
        }
    }

    /// Check if Redis L2 cache is available.
    pub async fn is_redis_available(&self) -> bool {
        let guard = self.redis_connection.read().await;
        if let Some(ref conn) = *guard {
            let mut conn = conn.clone();
            conn.get::<&str, Option<String>>("__ipfs_cache_test__").await.is_ok()
        } else {
            false
        }
    }
}

fn fallback_retry_config() -> RetryConfig {
    RetryConfig {
        max_retries: 5,
        ..Default::default()
    }
}

#[derive(Debug, thiserror::Error)]
enum FallbackFetchError {
    #[error("Fallback IPFS fetch failed: {0}")]
    Request(reqwest::Error),
    #[error("Failed to read fallback IPFS response: {0}")]
    Body(reqwest::Error),
    #[error("Fallback IPFS fetch failed with status: {0}")]
    Status(reqwest::StatusCode),
}

fn is_retryable_fallback_error(error: &eyre::Report) -> bool {
    match error.downcast_ref::<FallbackFetchError>() {
        Some(FallbackFetchError::Request(err) | FallbackFetchError::Body(err)) => {
            err.is_timeout() || err.is_connect() || err.status().is_none()
        }
        Some(FallbackFetchError::Status(status)) => status.is_server_error(),
        None => false,
    }
}

/// Read response body with an explicit byte limit to avoid unbounded memory growth.
async fn read_bounded_response(mut response: reqwest::Response, max_bytes: usize) -> eyre::Result<Vec<u8>> {
    let content_length = response.content_length().map(|v| v as usize);
    if let Some(len) = content_length {
        if len > max_bytes {
            return Err(IpfsCacheError::ResponseTooLarge(len).into());
        }
    }

    let mut bytes = Vec::with_capacity(content_length.unwrap_or(8192).min(max_bytes));
    while let Some(chunk) = response
        .chunk()
        .await
        .map_err(|e| eyre::eyre!("Failed to read response chunk: {}", e))?
    {
        if bytes.len() + chunk.len() > max_bytes {
            return Err(IpfsCacheError::ResponseTooLarge(bytes.len() + chunk.len()).into());
        }
        bytes.extend_from_slice(&chunk);
    }
    Ok(bytes)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_redis_key_format() {
        let key = IpfsCacheService::redis_key("QmTest123");
        assert_eq!(key, "ipfs:content:QmTest123");
    }

    #[test]
    fn test_build_ipfs_url_without_params() {
        let config = IpfsConfig {
            gateway: "https://ipfs.example.com/ipfs/".to_string(),
            params: String::new(),
            ..Default::default()
        };
        let service = IpfsCacheService::new(config, None);
        let url = service.build_ipfs_url("QmTest123");
        assert_eq!(url, "https://ipfs.example.com/ipfs/QmTest123");
    }

    #[test]
    fn test_build_ipfs_url_with_params() {
        let config = IpfsConfig {
            gateway: "https://ipfs.example.com/ipfs/".to_string(),
            params: "format=dag-json".to_string(),
            ..Default::default()
        };
        let service = IpfsCacheService::new(config, None);
        let url = service.build_ipfs_url("QmTest123");
        assert_eq!(url, "https://ipfs.example.com/ipfs/QmTest123?format=dag-json");
    }

    #[tokio::test]
    async fn test_memory_cache_disabled() {
        let config = IpfsConfig {
            cache_enabled: false,
            ..Default::default()
        };
        let service = IpfsCacheService::new(config, None);
        assert!(service.memory_cache.is_none());
    }

    #[tokio::test]
    async fn test_memory_cache_enabled() {
        let config = IpfsConfig {
            cache_enabled: true,
            memory_cache_size: 50,
            memory_cache_ttl_secs: 600,
            ..Default::default()
        };
        let service = IpfsCacheService::new(config, None);
        assert!(service.memory_cache.is_some());
    }

    #[tokio::test]
    async fn test_redis_disabled_without_url() {
        let config = IpfsConfig {
            cache_enabled: true,
            redis_cache_enabled: true,
            ..Default::default()
        };
        let service = IpfsCacheService::new(config, None);
        assert!(service.redis_client.is_none());
        assert!(!service.is_redis_available().await);
    }

    #[tokio::test]
    async fn test_redis_disabled_with_invalid_url() {
        let config = IpfsConfig {
            cache_enabled: true,
            redis_cache_enabled: true,
            ..Default::default()
        };
        let service = IpfsCacheService::new(config, Some("not a redis url"));
        assert!(service.redis_client.is_none());
    }

    #[tokio::test]
    async fn test_redis_unavailable_before_initialize() {
        let config = IpfsConfig {
            cache_enabled: true,
            redis_cache_enabled: true,
            ..Default::default()
        };
        // `Client::open` only validates the URL; no connection is attempted here.
        let service = IpfsCacheService::new(config, Some("redis://127.0.0.1:6379"));
        assert!(service.redis_client.is_some());
        assert!(!service.is_redis_available().await);
    }

    #[tokio::test]
    async fn test_get_serves_l1_hit_without_network() {
        let config = IpfsConfig {
            cache_enabled: true,
            redis_cache_enabled: false,
            memory_cache_size: 10,
            memory_cache_ttl_secs: 600,
            ..Default::default()
        };
        let service = IpfsCacheService::new(config, None);
        service
            .memory_cache
            .as_ref()
            .expect("L1 cache enabled by config")
            .insert("QmCached".to_string(), Arc::new(b"hello".to_vec()))
            .await;

        assert_eq!(service.get("QmCached").await.unwrap(), b"hello".to_vec());
        assert_eq!(service.get_text("QmCached").await.unwrap(), "hello");
    }

    #[test]
    fn fallback_fetch_uses_five_retries() {
        assert_eq!(fallback_retry_config().max_retries, 5);
    }

    #[test]
    fn fallback_retry_classifies_http_statuses() {
        let not_found: eyre::Report = FallbackFetchError::Status(reqwest::StatusCode::NOT_FOUND).into();
        assert!(!is_retryable_fallback_error(&not_found));

        let service_unavailable: eyre::Report =
            FallbackFetchError::Status(reqwest::StatusCode::SERVICE_UNAVAILABLE).into();
        assert!(is_retryable_fallback_error(&service_unavailable));
    }

    #[test]
    fn fallback_retry_ignores_non_fallback_errors() {
        let too_large: eyre::Report = IpfsCacheError::ResponseTooLarge(1024).into();
        assert!(!is_retryable_fallback_error(&too_large));

        let unrelated = eyre::eyre!("unrelated failure");
        assert!(!is_retryable_fallback_error(&unrelated));
    }

    #[test]
    fn response_too_large_message_includes_size() {
        assert_eq!(
            IpfsCacheError::ResponseTooLarge(42).to_string(),
            "Fetched IPFS content exceeds maximum allowed size: 42 bytes"
        );
    }
}