Skip to main content

spider_pipeline/
transform.rs

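//! Item transformation pipeline.
//!
//! [`TransformPipeline`] works on the JSON form of a scraped item: it applies the
//! configured built-in field edits ([`TransformOperation`]) and then any custom
//! closures, and deserializes the result back into the item type. An item whose
//! transformation fails is dropped.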
5
6use crate::pipeline::Pipeline;
7use async_trait::async_trait;
8use log::{debug, warn};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use spider_util::{error::PipelineError, item::ScrapedItem};
13use std::marker::PhantomData;
14use std::sync::Arc;
15
16type TransformFn = dyn Fn(&mut Value) -> Result<(), String> + Send + Sync + 'static;
17
18/// Built-in operations applied to top-level object fields.
19#[derive(Debug, Clone)]
20pub enum TransformOperation {
21    Trim { field: String },
22    Lowercase { field: String },
23    Uppercase { field: String },
24    Rename { from: String, to: String },
25    Remove { field: String },
26    Set { field: String, value: Value },
27    SetDefault { field: String, value: Value },
28}
29
30/// Pipeline that transforms items and forwards successful results downstream.
31pub struct TransformPipeline<I>
32where
33    I: ScrapedItem + Serialize + DeserializeOwned,
34{
35    operations: Vec<TransformOperation>,
36    transforms: Vec<Arc<TransformFn>>,
37    _phantom: PhantomData<I>,
38}
39
40impl<I> TransformPipeline<I>
41where
42    I: ScrapedItem + Serialize + DeserializeOwned,
43{
44    /// Creates a new empty `TransformPipeline`.
45    pub fn new() -> Self {
46        Self {
47            operations: Vec::new(),
48            transforms: Vec::new(),
49            _phantom: PhantomData,
50        }
51    }
52
53    /// Adds a built-in transformation operation.
54    pub fn with_operation(mut self, operation: TransformOperation) -> Self {
55        self.operations.push(operation);
56        self
57    }
58
59    /// Adds a custom transformation closure.
60    pub fn with_transform<F>(mut self, transform: F) -> Self
61    where
62        F: Fn(&mut Value) -> Result<(), String> + Send + Sync + 'static,
63    {
64        self.transforms.push(Arc::new(transform));
65        self
66    }
67
68    fn apply_operation(value: &mut Value, operation: &TransformOperation) -> Result<(), String> {
69        let map = value
70            .as_object_mut()
71            .ok_or_else(|| "Item must be a JSON object for transformation.".to_string())?;
72
73        match operation {
74            TransformOperation::Trim { field } => {
75                if let Some(raw) = map.get_mut(field) {
76                    let text = raw
77                        .as_str()
78                        .ok_or_else(|| format!("Field '{}' must be a string for Trim.", field))?;
79                    *raw = Value::String(text.trim().to_string());
80                }
81                Ok(())
82            }
83            TransformOperation::Lowercase { field } => {
84                if let Some(raw) = map.get_mut(field) {
85                    let text = raw.as_str().ok_or_else(|| {
86                        format!("Field '{}' must be a string for Lowercase.", field)
87                    })?;
88                    *raw = Value::String(text.to_lowercase());
89                }
90                Ok(())
91            }
92            TransformOperation::Uppercase { field } => {
93                if let Some(raw) = map.get_mut(field) {
94                    let text = raw.as_str().ok_or_else(|| {
95                        format!("Field '{}' must be a string for Uppercase.", field)
96                    })?;
97                    *raw = Value::String(text.to_uppercase());
98                }
99                Ok(())
100            }
101            TransformOperation::Rename { from, to } => {
102                if let Some(value) = map.remove(from) {
103                    map.insert(to.clone(), value);
104                }
105                Ok(())
106            }
107            TransformOperation::Remove { field } => {
108                map.remove(field);
109                Ok(())
110            }
111            TransformOperation::Set { field, value } => {
112                map.insert(field.clone(), value.clone());
113                Ok(())
114            }
115            TransformOperation::SetDefault { field, value } => {
116                if !map.contains_key(field) {
117                    map.insert(field.clone(), value.clone());
118                }
119                Ok(())
120            }
121        }
122    }
123}
124
125impl<I> Default for TransformPipeline<I>
126where
127    I: ScrapedItem + Serialize + DeserializeOwned,
128{
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134#[async_trait]
135impl<I> Pipeline<I> for TransformPipeline<I>
136where
137    I: ScrapedItem + Serialize + DeserializeOwned,
138{
139    fn name(&self) -> &str {
140        "TransformPipeline"
141    }
142
143    async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
144        debug!("TransformPipeline processing item.");
145
146        let mut json = item.to_json_value();
147
148        for operation in &self.operations {
149            if let Err(err) = Self::apply_operation(&mut json, operation) {
150                warn!("Transform operation failed, dropping item: {}", err);
151                return Ok(None);
152            }
153        }
154
155        for transform in &self.transforms {
156            if let Err(err) = transform(&mut json) {
157                warn!("Custom transform failed, dropping item: {}", err);
158                return Ok(None);
159            }
160        }
161
162        match serde_json::from_value::<I>(json) {
163            Ok(transformed) => Ok(Some(transformed)),
164            Err(err) => {
165                warn!(
166                    "Failed to deserialize transformed item, dropping item: {}",
167                    err
168                );
169                Ok(None)
170            }
171        }
172    }
173}