// spider_pipeline/pipeline.rs

//! Pipeline trait and lifecycle hooks.
//!
//! A pipeline receives each scraped item after parsing. It may pass the item
//! through unchanged, transform it, drop it, write it to external storage, or
//! keep its own state for checkpointing.
6
7use async_trait::async_trait;
8use serde_json::Value;
9use spider_util::error::PipelineError;
10use spider_util::item::ScrapedItem;
11
12/// Contract implemented by item-processing pipelines.
13///
14/// Pipelines receive items after parsing and before final output is considered
15/// complete. A pipeline may transform an item and pass it on, drop it by
16/// returning `Ok(None)`, persist it externally, or keep internal state for
17/// checkpoint/resume.
18#[async_trait]
19pub trait Pipeline<I: ScrapedItem>: Send + Sync + 'static {
20    /// Returns the name of the pipeline.
21    fn name(&self) -> &str;
22
23    /// Processes a single scraped item.
24    ///
25    /// This method can perform any processing on the item, such as storing it, validating it,
26    /// or passing it to another pipeline. It can also choose to drop the item by returning `Ok(None)`.
27    ///
28    /// Pipelines are ordered. Returning `Ok(Some(item))` forwards the item to
29    /// the next pipeline, while `Ok(None)` stops the item from moving further
30    /// down the chain without treating it as an error.
31    ///
32    /// # Errors
33    ///
34    /// Returns an error when item processing fails.
35    async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError>;
36
37    /// Called when the spider is closing.
38    ///
39    /// This method can be used to perform any cleanup tasks, such as closing file handles or
40    /// database connections.
41    ///
42    /// # Errors
43    ///
44    /// Returns an error if cleanup fails.
45    async fn close(&self) -> Result<(), PipelineError> {
46        Ok(())
47    }
48
49    /// Returns the current state of the pipeline as a JSON value.
50    ///
51    /// This method is called during checkpointing to save the pipeline's state.
52    /// The returned state should be sufficient to restore the pipeline to its current
53    /// state using `restore_state`.
54    ///
55    /// Pipelines that do not need checkpoint support can keep the default
56    /// implementation and return `Ok(None)`.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error when state capture or serialization fails.
61    async fn get_state(&self) -> Result<Option<Value>, PipelineError> {
62        Ok(None)
63    }
64
65    /// Restores the pipeline's state from a JSON value.
66    ///
67    /// This method is called when resuming from a checkpoint. The provided state
68    /// should be used to restore the pipeline to the state it was in when the
69    /// checkpoint was created.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error when deserializing or applying state fails.
74    async fn restore_state(&self, _state: Value) -> Result<(), PipelineError> {
75        Ok(())
76    }
77}