spider_pipeline/pipeline.rs
1//! Pipeline trait and lifecycle hooks.
2//!
3//! A pipeline receives each scraped item after parsing. It may keep the item,
4//! transform it, drop it, write it somewhere, or preserve its own state for
5//! checkpointing.
6
7use async_trait::async_trait;
8use serde_json::Value;
9use spider_util::error::PipelineError;
10use spider_util::item::ScrapedItem;
11
12/// Contract implemented by item-processing pipelines.
13///
14/// Pipelines receive items after parsing and before final output is considered
15/// complete. A pipeline may transform an item and pass it on, drop it by
16/// returning `Ok(None)`, persist it externally, or keep internal state for
17/// checkpoint/resume.
18#[async_trait]
19pub trait Pipeline<I: ScrapedItem>: Send + Sync + 'static {
20 /// Returns the name of the pipeline.
21 fn name(&self) -> &str;
22
23 /// Processes a single scraped item.
24 ///
25 /// This method can perform any processing on the item, such as storing it, validating it,
26 /// or passing it to another pipeline. It can also choose to drop the item by returning `Ok(None)`.
27 ///
28 /// Pipelines are ordered. Returning `Ok(Some(item))` forwards the item to
29 /// the next pipeline, while `Ok(None)` stops the item from moving further
30 /// down the chain without treating it as an error.
31 ///
32 /// # Errors
33 ///
34 /// Returns an error when item processing fails.
35 async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError>;
36
37 /// Called when the spider is closing.
38 ///
39 /// This method can be used to perform any cleanup tasks, such as closing file handles or
40 /// database connections.
41 ///
42 /// # Errors
43 ///
44 /// Returns an error if cleanup fails.
45 async fn close(&self) -> Result<(), PipelineError> {
46 Ok(())
47 }
48
49 /// Returns the current state of the pipeline as a JSON value.
50 ///
51 /// This method is called during checkpointing to save the pipeline's state.
52 /// The returned state should be sufficient to restore the pipeline to its current
53 /// state using `restore_state`.
54 ///
55 /// Pipelines that do not need checkpoint support can keep the default
56 /// implementation and return `Ok(None)`.
57 ///
58 /// # Errors
59 ///
60 /// Returns an error when state capture or serialization fails.
61 async fn get_state(&self) -> Result<Option<Value>, PipelineError> {
62 Ok(None)
63 }
64
65 /// Restores the pipeline's state from a JSON value.
66 ///
67 /// This method is called when resuming from a checkpoint. The provided state
68 /// should be used to restore the pipeline to the state it was in when the
69 /// checkpoint was created.
70 ///
71 /// # Errors
72 ///
73 /// Returns an error when deserializing or applying state fails.
74 async fn restore_state(&self, _state: Value) -> Result<(), PipelineError> {
75 Ok(())
76 }
77}