// spider_pipeline/pipeline.rs
//! Trait for defining item processing pipelines in `spider-pipeline`.
//!
//! This module provides the [`Pipeline`] trait, the core abstraction for
//! post-processing [`ScrapedItem`]s after a spider has extracted them.
//! Pipelines allow a modular approach to handling scraped data, enabling
//! operations such as:
//! - Storing items in databases or files.
//! - Validating and cleaning data.
//! - Deduplicating entries.
//! - Performing further transformations.
//!
//! Implementors of this trait define the `process_item` method, which receives
//! a `ScrapedItem` and can modify it, drop it, or pass it along the pipeline.
//! Pipelines can also optionally save and restore their state for checkpointing,
//! and perform cleanup when the spider closes.

16use async_trait::async_trait;
17use serde_json::Value;
18use spider_util::error::PipelineError;
19use spider_util::item::ScrapedItem;
20
21/// The `Pipeline` trait defines the contract for item processing pipelines.
22///
23/// Pipelines are responsible for processing scraped items, such as storing them in a database,
24/// writing them to a file, or performing data validation.
25#[async_trait]
26pub trait Pipeline<I: ScrapedItem>: Send + Sync + 'static {
27 /// Returns the name of the pipeline.
28 fn name(&self) -> &str;
29
30 /// Processes a single scraped item.
31 ///
32 /// This method can perform any processing on the item, such as storing it, validating it,
33 /// or passing it to another pipeline. It can also choose to drop the item by returning `Ok(None)`.
34 ///
35 /// # Errors
36 ///
37 /// Returns an error when item processing fails.
38 async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError>;
39
40 /// Called when the spider is closing.
41 ///
42 /// This method can be used to perform any cleanup tasks, such as closing file handles or
43 /// database connections.
44 ///
45 /// # Errors
46 ///
47 /// Returns an error if cleanup fails.
48 async fn close(&self) -> Result<(), PipelineError> {
49 Ok(())
50 }
51
52 /// Returns the current state of the pipeline as a JSON value.
53 ///
54 /// This method is called during checkpointing to save the pipeline's state.
55 /// The returned state should be sufficient to restore the pipeline to its current
56 /// state using `restore_state`.
57 ///
58 /// # Errors
59 ///
60 /// Returns an error when state capture or serialization fails.
61 async fn get_state(&self) -> Result<Option<Value>, PipelineError> {
62 Ok(None)
63 }
64
65 /// Restores the pipeline's state from a JSON value.
66 ///
67 /// This method is called when resuming from a checkpoint. The provided state
68 /// should be used to restore the pipeline to the state it was in when the
69 /// checkpoint was created.
70 ///
71 /// # Errors
72 ///
73 /// Returns an error when deserializing or applying state fails.
74 async fn restore_state(&self, _state: Value) -> Result<(), PipelineError> {
75 Ok(())
76 }
77}