spider_lib/pipeline.rs
1//! Trait for defining item processing pipelines in `spider-lib`.
2//!
3//! This module provides the `Pipeline` trait, which is a fundamental abstraction
4//! for post-processing `ScrapedItem`s after they have been extracted by a spider.
5//! Pipelines allow for a modular approach to handling scraped data, enabling
6//! operations such as:
7//! - Storing items in databases or files.
8//! - Validating and cleaning data.
9//! - Deduplicating entries.
10//! - Performing further transformations.
11//!
12//! Implementors of this trait define the `process_item` method, which receives
13//! a `ScrapedItem` and can modify, drop, or pass it along the pipeline.
14//! Pipelines also support state management for checkpointing and cleanup operations.
15
16use crate::error::PipelineError;
17use crate::item::ScrapedItem;
18use async_trait::async_trait;
19use serde_json::Value;
20
21/// The `Pipeline` trait defines the contract for item processing pipelines.
22///
23/// Pipelines are responsible for processing scraped items, such as storing them in a database,
24/// writing them to a file, or performing data validation.
25#[async_trait]
26pub trait Pipeline<I: ScrapedItem>: Send + Sync + 'static {
27 /// Returns the name of the pipeline.
28 fn name(&self) -> &str;
29
30 /// Processes a single scraped item.
31 ///
32 /// This method can perform any processing on the item, such as storing it, validating it,
33 /// or passing it to another pipeline. It can also choose to drop the item by returning `Ok(None)`.
34 async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError>;
35
36 /// Called when the spider is closing.
37 ///
38 /// This method can be used to perform any cleanup tasks, such as closing file handles or
39 /// database connections.
40 async fn close(&self) -> Result<(), PipelineError> {
41 Ok(())
42 }
43
44 /// Returns the current state of the pipeline as a JSON value.
45 ///
46 /// This method is called during checkpointing to save the pipeline's state.
47 /// The returned state should be sufficient to restore the pipeline to its current
48 /// state using `restore_state`.
49 async fn get_state(&self) -> Result<Option<Value>, PipelineError> {
50 Ok(None)
51 }
52
53 /// Restores the pipeline's state from a JSON value.
54 ///
55 /// This method is called when resuming from a checkpoint. The provided state
56 /// should be used to restore the pipeline to the state it was in when the
57 /// checkpoint was created.
58 async fn restore_state(&self, _state: Value) -> Result<(), PipelineError> {
59 Ok(())
60 }
61}