chromey 2.46.5

Concurrent Chrome DevTools Protocol automation library for Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
use base64::engine::general_purpose;
use base64::prelude::Engine as _;
use hashbrown::HashMap;
use http_cache_reqwest::CacheManager;
use http_cache_semantics::CachePolicy;
use http_global_cache::CACACHE_MANAGER;
use lazy_static::lazy_static;
use reqwest::header::HeaderValue;
use reqwest::Method;
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use url::Url;

use crate::cache::manager::site_key_for_target_url;
use crate::http::{convert_headers, HttpRequestLike, HttpResponseLike, HttpVersion};

lazy_static! {
    /// Global HTTP client reused for all remote cache dumps.
    ///
    /// Built once so connection pooling is shared across every dump and
    /// fetch; idle pooled connections are kept alive for 90 s.
    pub static ref HYBRID_CACHE_CLIENT: Client = Client::builder()
        .pool_idle_timeout(std::time::Duration::from_secs(90))
        .build()
        .expect("failed to build HYBRID_CACHE_CLIENT");
    /// Base URL of your remote hybrid cache server.
    ///
    /// Example: "http://127.0.0.1:8080"
    ///
    /// Override via env:
    ///   HYBRID_CACHE_ENDPOINT=http://remote-cache:8080
    pub static ref HYBRID_CACHE_ENDPOINT: String = std::env::var("HYBRID_CACHE_ENDPOINT")
        .unwrap_or_else(|_| "http://127.0.0.1:8080".to_string());
    /// In-memory session cache: site key -> (entry key -> (response, policy)).
    ///
    /// Cleared per run via `clear_local_session_cache`; growth is bounded by
    /// `SESSION_CACHE_MAX_SITES` / `SESSION_CACHE_MAX_PER_SITE`.
    pub static ref LOCAL_SESSION_CACHE: dashmap::DashMap<String, HashMap<String, (http_cache_reqwest::HttpResponse, CachePolicy)>> = dashmap::DashMap::new();
    /// Max concurrent remote cache dumps across the whole process.
    pub static ref REMOTE_CACHE_DUMP_SEM: Semaphore = Semaphore::new(1000);
    /// URLs currently being streamed via `Fetch.takeResponseBodyAsStream`.
    /// Checked by the `Network.responseReceived` listener to avoid a
    /// redundant `getResponseBody` call for the same resource.
    pub(crate) static ref PENDING_STREAM_URLS: dashmap::DashSet<String> = dashmap::DashSet::new();
}

/// Payload shape for the remote hybrid cache server `/cache/index` endpoint.
///
/// Serialized as JSON both when dumping (`POST /cache/index`) and when
/// fetching (`GET /cache/site/{key}` and `GET /cache/resource/{key}`).
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct HybridCachePayload {
    /// Optional website-level key (defaults to URL host if None).
    #[serde(default)]
    website_key: Option<String>,
    /// Resource-level cache key used when seeding the local cache.
    resource_key: String,
    /// Absolute URL of the cached resource.
    url: String,
    /// HTTP method of the original request, e.g. "GET".
    method: String,
    /// HTTP status code of the cached response.
    status: u16,
    /// Headers sent with the original request.
    request_headers: std::collections::HashMap<String, String>,
    /// Headers received with the cached response.
    response_headers: std::collections::HashMap<String, String>,
    /// HTTP protocol version of the cached response.
    http_version: HttpVersion,
    /// Base64-encoded HTTP body for JSON transport.
    body_base64: String,
}

pub async fn dump_to_remote_cache_parts(
    cache_key: &str,
    cache_site: &str,
    url_str: &str,
    body: &[u8],
    method: &str,
    status: u16,
    http_request_headers: &std::collections::HashMap<String, String>,
    response_headers: &std::collections::HashMap<String, String>,
    http_version: &HttpVersion,
    dump_remote: Option<&str>,
) {
    let _permit = match REMOTE_CACHE_DUMP_SEM.acquire().await {
        Ok(p) => p,
        Err(_) => return,
    };

    let website_key = url::Url::parse(url_str)
        .ok()
        .and_then(|u| u.host_str().map(|h| h.to_string()));

    let body_base64 = general_purpose::STANDARD.encode(body);

    let payload = HybridCachePayload {
        website_key,
        resource_key: cache_key.to_string(),
        url: url_str.to_string(),
        method: method.to_string(),
        status,
        http_version: *http_version,
        request_headers: http_request_headers.clone(),
        response_headers: response_headers.clone(),
        body_base64,
    };

    let mut base_url = HYBRID_CACHE_ENDPOINT.as_str();

    if let Some(remote) = dump_remote {
        if remote != "true" {
            base_url = remote.trim_ascii();
        }
    }

    let endpoint = format!("{}/cache/index", &*base_url);

    let result = HYBRID_CACHE_CLIENT
        .post(&endpoint)
        .json(&payload)
        .header(
            "x-cache-site",
            HeaderValue::from_str(cache_site).unwrap_or(HeaderValue::from_static("")),
        )
        .send()
        .await;

    match result {
        Ok(resp) => {
            if !resp.status().is_success() {
                tracing::warn!(
                    "remote cache dump: non-success status for {}: {}",
                    cache_key,
                    resp.status()
                );
            } else {
                tracing::info!(
                    "remote cache dump: success status for {}: {}",
                    cache_key,
                    resp.status()
                );
            }
        }
        Err(err) => {
            tracing::warn!(
                "remote cache dump: failed to POST {} to {}: {}",
                cache_key,
                endpoint,
                err
            );
        }
    }
}

/// Best-effort dump of a cached response into the remote hybrid cache server [experimental]
///
/// Convenience wrapper around [`dump_to_remote_cache_parts`] that splits an
/// `HttpResponse` into its url/body/status/header/version parts. Errors are
/// logged by the inner call; nothing is returned.
pub async fn dump_to_remote_cache(
    cache_key: &str,
    cache_site: &str,
    http_response: &crate::http::HttpResponse,
    method: &str,
    http_request_headers: &std::collections::HashMap<String, String>,
    dump_remote: Option<&str>,
) {
    dump_to_remote_cache_parts(
        cache_key,
        cache_site,
        http_response.url.as_str(),
        &http_response.body,
        method,
        http_response.status,
        http_request_headers,
        &http_response.headers,
        &http_response.version,
        dump_remote,
    )
    .await
}

/// Get the cache for a website from the remote cache server and seed
/// our local hybrid cache (CACACHE_MANAGER) with **all** entries [experimental].
///
/// The remote lookup key is the site-level key derived from `target_url`
/// (plus optional `auth`/`namespace`), e.g. "example.com". `remote`
/// overrides the env-configured endpoint unless it is the literal `"true"`.
/// All failures are logged and swallowed — seeding is best-effort.
pub async fn get_cache_site(
    target_url: &str,
    auth: Option<&str>,
    remote: Option<&str>,
    namespace: Option<&str>,
) {
    // Expression form instead of a mutable rebind; `&*base_url` was a
    // redundant deref of an existing `&str`.
    let base_url = match remote {
        Some(remote) if remote != "true" => remote.trim_ascii(),
        _ => HYBRID_CACHE_ENDPOINT.as_str(),
    };

    let cache_key = site_key_for_target_url(target_url, auth, namespace);

    let endpoint = format!("{base_url}/cache/site/{cache_key}");

    // Fetch all entries for this website from the remote cache server.
    let resp = match HYBRID_CACHE_CLIENT.get(&endpoint).send().await {
        Ok(resp) => resp,
        Err(err) => {
            tracing::warn!(
                "remote cache get: failed to GET {} from {}: {}",
                cache_key,
                endpoint,
                err
            );
            return;
        }
    };

    if !resp.status().is_success() {
        tracing::warn!(
            "remote cache get: non-success status for {}: {}",
            cache_key,
            resp.status()
        );
        return;
    }

    // Parse JSON payloads: Vec<HybridCachePayload>
    let payloads: Vec<Box<HybridCachePayload>> = match resp.json().await {
        Ok(p) => p,
        Err(err) => {
            tracing::warn!(
                "remote cache get: failed to parse JSON for {} from {}: {}",
                cache_key,
                endpoint,
                err
            );
            return;
        }
    };

    tracing::debug!(
        "remote cache get: seeding {} entries locally for website {}",
        payloads.len(),
        cache_key
    );

    for payload in payloads {
        // `target_url` is already a `&str`; the previous `&target_url`
        // compiled only via `&&str -> &str` deref coercion.
        if let Err(err) = seed_payload_into_local_cache(&cache_key, &payload, target_url).await {
            tracing::warn!(
                "remote cache get: failed to seed resource {} for website {}: {}",
                payload.resource_key,
                cache_key,
                err
            );
        }
    }
}

/// Get the cache for a single resource from the remote cache server and
/// seed our local hybrid cache (CACACHE_MANAGER) with it [experimental].
///
/// The remote lookup key is derived from `target_url` (plus optional
/// `auth`/`namespace`). `remote` overrides the env-configured endpoint
/// unless it is the literal `"true"`. All failures are logged and
/// swallowed — seeding is best-effort.
pub async fn get_cache_resource(
    target_url: &str,
    auth: Option<&str>,
    remote: Option<&str>,
    namespace: Option<&str>,
) {
    // Expression form instead of a mutable rebind; `&*base_url` was a
    // redundant deref of an existing `&str`.
    let base_url = match remote {
        Some(remote) if remote != "true" => remote.trim_ascii(),
        _ => HYBRID_CACHE_ENDPOINT.as_str(),
    };

    let cache_key = site_key_for_target_url(target_url, auth, namespace);

    let endpoint = format!("{base_url}/cache/resource/{cache_key}");

    // Fetch the single entry for this resource from the remote cache server
    // (the previous comment was copy-pasted from the site-level fetch).
    let resp = match HYBRID_CACHE_CLIENT.get(&endpoint).send().await {
        Ok(resp) => resp,
        Err(err) => {
            tracing::warn!(
                "remote cache get: failed to GET {} from {}: {}",
                cache_key,
                endpoint,
                err
            );
            return;
        }
    };

    if !resp.status().is_success() {
        tracing::warn!(
            "remote cache get: non-success status for {}: {}",
            cache_key,
            resp.status()
        );
        return;
    }

    let payload: Box<HybridCachePayload> = match resp.json().await {
        Ok(p) => p,
        Err(err) => {
            tracing::warn!(
                "remote cache get: failed to parse JSON for {} from {}: {}",
                cache_key,
                endpoint,
                err
            );
            return;
        }
    };

    // Typo fix in the log message: "entrie" -> "entry".
    tracing::debug!(
        "remote cache get: seeding 1 entry locally for website {}",
        cache_key
    );

    if let Err(err) = seed_payload_into_local_cache(&cache_key, &payload, target_url).await {
        tracing::warn!(
            "remote cache get: failed to seed resource {} for website {}: {}",
            payload.resource_key,
            cache_key,
            err
        );
    }
}

/// Remove item from local session cache.
///
/// Drops the entire per-site sub-map stored under `cache_key`.
/// Declared `async` for call-site compatibility even though the removal
/// itself does not await.
pub async fn clear_local_session_cache(cache_key: &str) {
    LOCAL_SESSION_CACHE.remove(cache_key);
}

/// Maximum number of top-level site keys in the local session cache.
/// Each site key maps to a sub-map of resource entries. This cap prevents
/// unbounded memory growth when crawling many distinct sites in a single
/// process lifetime. When exceeded, ~25 % of site entries are dropped in
/// bulk; DashMap iteration order is unspecified, so the evicted subset is
/// arbitrary rather than strictly the oldest.
const SESSION_CACHE_MAX_SITES: usize = 2_000;

/// Maximum number of resource entries per site key.
const SESSION_CACHE_MAX_PER_SITE: usize = 10_000;

/// Insert the item into the dashmap
///
/// Stores `(http_res, cache_policy)` under `entry_key` in the per-site
/// sub-map keyed by `cache_key`, enforcing both cache caps:
/// * an existing site's sub-map stops accepting new entries once it holds
///   `SESSION_CACHE_MAX_PER_SITE` items (the new entry is silently dropped);
/// * before a *new* site key is added while the map already holds
///   `SESSION_CACHE_MAX_SITES` sites, ~25 % of site entries (an arbitrary
///   subset — DashMap iteration order is unspecified) are evicted in bulk.
pub fn session_cache_insert(
    cache_key: &str,
    http_res: http_cache_reqwest::HttpResponse,
    cache_policy: CachePolicy,
    entry_key: &str,
) {
    use dashmap::mapref::entry::Entry;

    // Evict BEFORE taking the entry guard. DashMap's `entry` holds a shard
    // write lock, and the previous implementation called `iter()` and
    // `remove()` on the same map while that guard was alive — a documented
    // deadlock hazard when those operations hit the locked shard.
    if !LOCAL_SESSION_CACHE.contains_key(cache_key)
        && LOCAL_SESSION_CACHE.len() >= SESSION_CACHE_MAX_SITES
    {
        // Shed ~25 % of entries (cheap bulk eviction without requiring an
        // LRU structure).
        let to_remove: Vec<String> = LOCAL_SESSION_CACHE
            .iter()
            .take(SESSION_CACHE_MAX_SITES / 4)
            .map(|r| r.key().clone())
            .collect();
        for key in to_remove {
            LOCAL_SESSION_CACHE.remove(&key);
        }
    }

    match LOCAL_SESSION_CACHE.entry(cache_key.to_string()) {
        Entry::Occupied(mut occ) => {
            let inner = occ.get_mut();
            // Cap per-site entries to avoid a single site exhausting memory.
            if inner.len() < SESSION_CACHE_MAX_PER_SITE {
                inner.insert(entry_key.into(), (http_res, cache_policy));
            }
        }
        Entry::Vacant(vac) => {
            let mut m: HashMap<String, (http_cache_reqwest::HttpResponse, CachePolicy)> =
                HashMap::new();

            m.insert(entry_key.into(), (http_res, cache_policy));

            vac.insert(m);
        }
    }
}

/// Seed a single `HybridCachePayload` into the local HTTP cache (CACACHE_MANAGER).
///
/// Decodes the base64 body, rebuilds request/response stand-ins to derive a
/// fresh `CachePolicy`, then:
/// * writes the entry through to the persistent `CACACHE_MANAGER` only when
///   the payload URL is byte-identical to the navigated `target_url`
///   (i.e. the main document);
/// * always inserts the entry into the in-memory session cache under the
///   key `"{METHOD}:{url}"`.
///
/// Returns `Err(String)` describing the first failed step (URI/URL parse,
/// base64 decode, or cache write). An empty body is skipped with `Ok(())`.
async fn seed_payload_into_local_cache(
    cache_key: &str,
    payload: &HybridCachePayload,
    target_url: &str,
) -> Result<(), String> {
    // Nothing useful to cache without a body.
    if payload.body_base64.is_empty() {
        return Ok(());
    }

    // Exact string comparison — query strings and fragments must match too.
    let same_document = payload.url == target_url;

    let uri = payload
        .url
        .parse()
        .map_err(|e| format!("invalid URI for {}: {e}", payload.url))?;

    let body = general_purpose::STANDARD
        .decode(&payload.body_base64)
        .map_err(|e| format!("invalid base64 body for {}: {e}", payload.resource_key))?;

    // Rebuild a request-like view; an unparseable method degrades to GET.
    let req = HttpRequestLike {
        uri,
        method: Method::from_bytes(payload.method.as_bytes()).unwrap_or(Method::GET),
        headers: convert_headers(&payload.request_headers),
    };

    // Rebuild a response-like view; an out-of-range status code degrades to
    // 417 EXPECTATION_FAILED rather than failing the seed.
    let res = HttpResponseLike {
        status: StatusCode::from_u16(payload.status).unwrap_or(StatusCode::EXPECTATION_FAILED),
        headers: convert_headers(&payload.response_headers),
    };

    // Derive freshness/validation rules from the reconstructed pair.
    let policy = CachePolicy::new(&req, &res);

    let url =
        Url::parse(&payload.url).map_err(|e| format!("invalid Url for {}: {e}", payload.url))?;

    let http_res = http_cache_reqwest::HttpResponse {
        url,
        headers: http_cache::HttpHeaders::Modern(crate::http::headers_to_multi(
            &payload.response_headers,
        )),
        version: payload.http_version.into(),
        status: payload.status,
        body,
        metadata: None,
    };

    let key = payload.resource_key.clone();
    // Session-cache key format: "{METHOD}:{url}".
    let session_key = format!("{}:{}", payload.method, http_res.url);

    // Persist to the durable cache only for the main document; subresources
    // live solely in the in-memory session cache below.
    if same_document {
        let put_result = CACACHE_MANAGER
            .put(key.clone(), http_res.clone(), policy.clone())
            .await;
        if let Err(e) = put_result {
            return Err(format!("CACACHE_MANAGER.put failed for {}: {e}", key));
        }
    }

    session_cache_insert(cache_key, http_res, policy, &session_key);

    Ok(())
}

/// Look up a single resource entry in the local session cache.
///
/// Returns a cloned `(response, policy)` pair, or `None` when either the
/// site key or the per-site resource entry is absent.
pub fn get_session_cache_item(
    cache_key: &str,
    target_url: &str,
) -> Option<(http_cache_reqwest::HttpResponse, CachePolicy)> {
    let site_entries = LOCAL_SESSION_CACHE.get(cache_key)?;
    site_entries.get(target_url).cloned()
}

/// Returns `true` when the resource is present in the local session cache.
pub fn check_session_cache_item(cache_key: &str, target_url: &str) -> bool {
    match LOCAL_SESSION_CACHE.get(cache_key) {
        Some(site_entries) => site_entries.contains_key(target_url),
        None => false,
    }
}

/// Mark a URL as "stream in-flight" so the Network listener skips it.
///
/// Adds `key` to the global `PENDING_STREAM_URLS` set; pair with
/// `clear_stream_pending` once the stream completes or fails.
pub fn mark_stream_pending(key: &str) {
    PENDING_STREAM_URLS.insert(key.to_string());
}

/// Remove the in-flight marker (called on success *and* failure).
///
/// No-op if `key` was never marked pending.
pub fn clear_stream_pending(key: &str) {
    PENDING_STREAM_URLS.remove(key);
}

/// Returns `true` when the URL is currently being body-streamed.
///
/// Read-only check against the global `PENDING_STREAM_URLS` set.
pub fn is_stream_pending(key: &str) -> bool {
    PENDING_STREAM_URLS.contains(key)
}