1use crate::error::SpiderError;
10use crate::item::ScrapedItem;
11use crate::pipeline::Pipeline;
12use crate::request::Request;
13use crate::spider::Spider;
14use dashmap::DashSet;
15use rmp_serde;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::{HashMap, VecDeque};
19use std::fs;
20use std::path::Path;
21use std::sync::Arc;
22use tracing::{info, warn};
23
/// Serializable snapshot of the scheduler's state, stored inside a [`Checkpoint`].
///
/// NOTE(review): the field order may be part of the on-disk layout if the encoder writes
/// structs positionally — confirm before reordering or inserting fields.
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
pub struct SchedulerCheckpoint {
    /// Queue of requests captured in the snapshot — presumably the ones still pending when the
    /// snapshot was taken; verify against the scheduler.
    pub request_queue: VecDeque<Request>,
    /// Requests recorded as "salvaged"; `save_checkpoint` logs a warning with their count when
    /// this queue is non-empty. How they are produced is not visible here — TODO confirm.
    pub salvaged_requests: VecDeque<Request>,
    /// Set of URL strings recorded as visited — presumably used for de-duplication; verify
    /// against the scheduler.
    pub visited_urls: DashSet<String>,
}
34
/// Top-level checkpoint payload that `save_checkpoint` serializes and writes to disk.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Checkpoint {
    /// Snapshot of the scheduler state.
    pub scheduler: SchedulerCheckpoint,
    /// Per-pipeline state, keyed by the pipeline's name. `save_checkpoint` only inserts an entry
    /// for pipelines whose `get_state` returned a value.
    pub pipelines: HashMap<String, Value>,
}
43
44pub async fn save_checkpoint<S: Spider>(
45 path: &Path,
46 scheduler_checkpoint: SchedulerCheckpoint,
47 pipelines: &Arc<Vec<Box<dyn Pipeline<S::Item>>>>,
48) -> Result<(), SpiderError>
49where
50 S::Item: ScrapedItem,
51{
52 info!("Saving checkpoint to {:?}", path);
53
54 let mut pipelines_checkpoint_map = HashMap::new();
55 for pipeline in pipelines.iter() {
56 if let Some(state) = pipeline.get_state().await? {
57 pipelines_checkpoint_map.insert(pipeline.name().to_string(), state);
58 }
59 }
60
61 if !scheduler_checkpoint.salvaged_requests.is_empty() {
62 warn!(
63 "Found {} salvaged requests during checkpoint. These have been added to the request queue.",
64 scheduler_checkpoint.salvaged_requests.len()
65 );
66 }
67
68 let checkpoint = Checkpoint {
69 scheduler: scheduler_checkpoint,
70 pipelines: pipelines_checkpoint_map,
71 };
72
73 let tmp_path = path.with_extension("tmp");
74 let encoded = rmp_serde::to_vec(&checkpoint)
75 .map_err(|e| SpiderError::GeneralError(format!("Failed to serialize checkpoint: {}", e)))?;
76 fs::write(&tmp_path, encoded).map_err(|e| {
77 SpiderError::GeneralError(format!(
78 "Failed to write checkpoint to temporary file: {}",
79 e
80 ))
81 })?;
82 fs::rename(&tmp_path, path).map_err(|e| {
83 SpiderError::GeneralError(format!("Failed to rename temporary checkpoint file: {}", e))
84 })?;
85
86 info!("Checkpoint saved successfully.");
87 Ok(())
88}