Skip to main content

spider_lib/
checkpoint.rs

//! Module for managing crawler checkpoints.
//!
//! This module defines the data structures (`SchedulerCheckpoint`, `Checkpoint`)
//! and functions for saving and loading the state of a crawler. Checkpoints enable
//! the crawler to gracefully recover from interruptions or to resume a crawl
//! at a later time. They capture the state of the scheduler (pending requests,
//! visited URLs, salvaged requests) and the item pipelines.
8
9use crate::error::SpiderError;
10use crate::item::ScrapedItem;
11use crate::pipeline::Pipeline;
12use crate::request::Request;
13use crate::spider::Spider;
14use dashmap::DashSet;
15use rmp_serde;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::{HashMap, VecDeque};
19use std::fs;
20use std::path::Path;
21use std::sync::Arc;
22use tracing::{info, warn};
23
24/// A snapshot of the scheduler's state.
25#[derive(Serialize, Deserialize, Default, Clone, Debug)]
26pub struct SchedulerCheckpoint {
27    /// The queue of pending requests.
28    pub request_queue: VecDeque<Request>,
29    /// Requests that could not be enqueued and were salvaged.
30    pub salvaged_requests: VecDeque<Request>,
31    /// The set of visited URL fingerprints.
32    pub visited_urls: DashSet<String>,
33}
34
35/// A complete checkpoint of the crawler's state.
36#[derive(Debug, Serialize, Deserialize, Default)]
37pub struct Checkpoint {
38    /// The state of the scheduler.
39    pub scheduler: SchedulerCheckpoint,
40    /// A map of pipeline states, keyed by pipeline name.
41    pub pipelines: HashMap<String, Value>,
42}
43
44pub async fn save_checkpoint<S: Spider>(
45    path: &Path,
46    scheduler_checkpoint: SchedulerCheckpoint,
47    pipelines: &Arc<Vec<Box<dyn Pipeline<S::Item>>>>,
48) -> Result<(), SpiderError>
49where
50    S::Item: ScrapedItem,
51{
52    info!("Saving checkpoint to {:?}", path);
53
54    let mut pipelines_checkpoint_map = HashMap::new();
55    for pipeline in pipelines.iter() {
56        if let Some(state) = pipeline.get_state().await? {
57            pipelines_checkpoint_map.insert(pipeline.name().to_string(), state);
58        }
59    }
60
61    if !scheduler_checkpoint.salvaged_requests.is_empty() {
62        warn!(
63            "Found {} salvaged requests during checkpoint. These have been added to the request queue.",
64            scheduler_checkpoint.salvaged_requests.len()
65        );
66    }
67
68    let checkpoint = Checkpoint {
69        scheduler: scheduler_checkpoint,
70        pipelines: pipelines_checkpoint_map,
71    };
72
73    let tmp_path = path.with_extension("tmp");
74    let encoded = rmp_serde::to_vec(&checkpoint)
75        .map_err(|e| SpiderError::GeneralError(format!("Failed to serialize checkpoint: {}", e)))?;
76    fs::write(&tmp_path, encoded).map_err(|e| {
77        SpiderError::GeneralError(format!(
78            "Failed to write checkpoint to temporary file: {}",
79            e
80        ))
81    })?;
82    fs::rename(&tmp_path, path).map_err(|e| {
83        SpiderError::GeneralError(format!("Failed to rename temporary checkpoint file: {}", e))
84    })?;
85
86    info!("Checkpoint saved successfully.");
87    Ok(())
88}