Skip to main content

spider_pipeline/
pipeline.rs

1//! Trait for defining item processing pipelines in `spider-pipeline`.
2//!
3//! This module provides the `Pipeline` trait, which is a fundamental abstraction
4//! for post-processing `ScrapedItem`s after they have been extracted by a spider.
5//! Pipelines allow for a modular approach to handling scraped data, enabling
6//! operations such as:
7//! - Storing items in databases or files.
8//! - Validating and cleaning data.
9//! - Deduplicating entries.
10//! - Performing further transformations.
11//!
12//! Implementors of this trait define the `process_item` method, which receives
13//! a `ScrapedItem` and can modify, drop, or pass it along the pipeline.
14//! Pipelines also support state management for checkpointing and cleanup operations.
15
16use async_trait::async_trait;
17use serde_json::Value;
18use spider_util::error::PipelineError;
19use spider_util::item::ScrapedItem;
20
21/// The `Pipeline` trait defines the contract for item processing pipelines.
22///
23/// Pipelines are responsible for processing scraped items, such as storing them in a database,
24/// writing them to a file, or performing data validation.
25#[async_trait]
26pub trait Pipeline<I: ScrapedItem>: Send + Sync + 'static {
27    /// Returns the name of the pipeline.
28    fn name(&self) -> &str;
29
30    /// Processes a single scraped item.
31    ///
32    /// This method can perform any processing on the item, such as storing it, validating it,
33    /// or passing it to another pipeline. It can also choose to drop the item by returning `Ok(None)`.
34    ///
35    /// # Errors
36    ///
37    /// Returns an error when item processing fails.
38    async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError>;
39
40    /// Called when the spider is closing.
41    ///
42    /// This method can be used to perform any cleanup tasks, such as closing file handles or
43    /// database connections.
44    ///
45    /// # Errors
46    ///
47    /// Returns an error if cleanup fails.
48    async fn close(&self) -> Result<(), PipelineError> {
49        Ok(())
50    }
51
52    /// Returns the current state of the pipeline as a JSON value.
53    ///
54    /// This method is called during checkpointing to save the pipeline's state.
55    /// The returned state should be sufficient to restore the pipeline to its current
56    /// state using `restore_state`.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error when state capture or serialization fails.
61    async fn get_state(&self) -> Result<Option<Value>, PipelineError> {
62        Ok(None)
63    }
64
65    /// Restores the pipeline's state from a JSON value.
66    ///
67    /// This method is called when resuming from a checkpoint. The provided state
68    /// should be used to restore the pipeline to the state it was in when the
69    /// checkpoint was created.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error when deserializing or applying state fails.
74    async fn restore_state(&self, _state: Value) -> Result<(), PipelineError> {
75        Ok(())
76    }
77}