Skip to main content

spider_pipeline/
transform.rs

//! Item pipeline for transforming scraped items.
//!
//! This module provides `TransformPipeline`, which applies declarative
//! transformation operations and custom closures to item JSON payloads.
5
6use crate::pipeline::Pipeline;
7use async_trait::async_trait;
8use log::{debug, warn};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use spider_util::{error::PipelineError, item::ScrapedItem};
13use std::marker::PhantomData;
14use std::sync::Arc;
15
16type TransformFn = dyn Fn(&mut Value) -> Result<(), String> + Send + Sync + 'static;
17
18/// Built-in transformation operations for top-level object fields.
19#[derive(Debug, Clone)]
20pub enum TransformOperation {
21    Trim { field: String },
22    Lowercase { field: String },
23    Uppercase { field: String },
24    Rename { from: String, to: String },
25    Remove { field: String },
26    Set { field: String, value: Value },
27    SetDefault { field: String, value: Value },
28}
29
30/// A pipeline that transforms items and forwards transformed items downstream.
31pub struct TransformPipeline<I>
32where
33    I: ScrapedItem + Serialize + DeserializeOwned,
34{
35    operations: Vec<TransformOperation>,
36    transforms: Vec<Arc<TransformFn>>,
37    _phantom: PhantomData<I>,
38}
39
40impl<I> TransformPipeline<I>
41where
42    I: ScrapedItem + Serialize + DeserializeOwned,
43{
44    /// Creates a new empty `TransformPipeline`.
45    pub fn new() -> Self {
46        Self {
47            operations: Vec::new(),
48            transforms: Vec::new(),
49            _phantom: PhantomData,
50        }
51    }
52
53    /// Adds a built-in transformation operation.
54    pub fn with_operation(mut self, operation: TransformOperation) -> Self {
55        self.operations.push(operation);
56        self
57    }
58
59    /// Adds a custom transformation closure.
60    pub fn with_transform<F>(mut self, transform: F) -> Self
61    where
62        F: Fn(&mut Value) -> Result<(), String> + Send + Sync + 'static,
63    {
64        self.transforms.push(Arc::new(transform));
65        self
66    }
67
68    fn apply_operation(value: &mut Value, operation: &TransformOperation) -> Result<(), String> {
69        let map = value
70            .as_object_mut()
71            .ok_or_else(|| "Item must be a JSON object for transformation.".to_string())?;
72
73        match operation {
74            TransformOperation::Trim { field } => {
75                if let Some(raw) = map.get_mut(field) {
76                    let text = raw
77                        .as_str()
78                        .ok_or_else(|| format!("Field '{}' must be a string for Trim.", field))?;
79                    *raw = Value::String(text.trim().to_string());
80                }
81                Ok(())
82            }
83            TransformOperation::Lowercase { field } => {
84                if let Some(raw) = map.get_mut(field) {
85                    let text = raw.as_str().ok_or_else(|| {
86                        format!("Field '{}' must be a string for Lowercase.", field)
87                    })?;
88                    *raw = Value::String(text.to_lowercase());
89                }
90                Ok(())
91            }
92            TransformOperation::Uppercase { field } => {
93                if let Some(raw) = map.get_mut(field) {
94                    let text = raw.as_str().ok_or_else(|| {
95                        format!("Field '{}' must be a string for Uppercase.", field)
96                    })?;
97                    *raw = Value::String(text.to_uppercase());
98                }
99                Ok(())
100            }
101            TransformOperation::Rename { from, to } => {
102                if let Some(value) = map.remove(from) {
103                    map.insert(to.clone(), value);
104                }
105                Ok(())
106            }
107            TransformOperation::Remove { field } => {
108                map.remove(field);
109                Ok(())
110            }
111            TransformOperation::Set { field, value } => {
112                map.insert(field.clone(), value.clone());
113                Ok(())
114            }
115            TransformOperation::SetDefault { field, value } => {
116                if !map.contains_key(field) {
117                    map.insert(field.clone(), value.clone());
118                }
119                Ok(())
120            }
121        }
122    }
123}
124
125impl<I> Default for TransformPipeline<I>
126where
127    I: ScrapedItem + Serialize + DeserializeOwned,
128{
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134#[async_trait]
135impl<I> Pipeline<I> for TransformPipeline<I>
136where
137    I: ScrapedItem + Serialize + DeserializeOwned,
138{
139    fn name(&self) -> &str {
140        "TransformPipeline"
141    }
142
143    async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
144        debug!("TransformPipeline processing item.");
145
146        let mut json = item.to_json_value();
147
148        for operation in &self.operations {
149            if let Err(err) = Self::apply_operation(&mut json, operation) {
150                warn!("Transform operation failed, dropping item: {}", err);
151                return Ok(None);
152            }
153        }
154
155        for transform in &self.transforms {
156            if let Err(err) = transform(&mut json) {
157                warn!("Custom transform failed, dropping item: {}", err);
158                return Ok(None);
159            }
160        }
161
162        match serde_json::from_value::<I>(json) {
163            Ok(transformed) => Ok(Some(transformed)),
164            Err(err) => {
165                warn!(
166                    "Failed to deserialize transformed item, dropping item: {}",
167                    err
168                );
169                Ok(None)
170            }
171        }
172    }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use spider_util::item::ScrapedItem;
    use std::any::Any;

    /// Three-field item used by most tests; `stock` gives a non-string field.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct ProductItem {
        title: String,
        slug: String,
        stock: i32,
    }

    impl ScrapedItem for ProductItem {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn box_clone(&self) -> Box<dyn ScrapedItem + Send + Sync> {
            Box::new(self.clone())
        }

        fn to_json_value(&self) -> Value {
            serde_json::to_value(self).expect("serialize test item")
        }
    }

    /// Single-field item for tests that only care about one key.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TitleOnlyItem {
        title: String,
    }

    impl ScrapedItem for TitleOnlyItem {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn box_clone(&self) -> Box<dyn ScrapedItem + Send + Sync> {
            Box::new(self.clone())
        }

        fn to_json_value(&self) -> Value {
            serde_json::to_value(self).expect("serialize test item")
        }
    }

    /// Shorthand constructor so each test states only the values it cares about.
    fn product(title: &str, slug: &str, stock: i32) -> ProductItem {
        ProductItem {
            title: title.to_string(),
            slug: slug.to_string(),
            stock,
        }
    }

    #[tokio::test]
    async fn applies_string_operations() {
        let pipeline = TransformPipeline::<ProductItem>::new()
            .with_operation(TransformOperation::Trim {
                field: "title".to_string(),
            })
            .with_operation(TransformOperation::Lowercase {
                field: "slug".to_string(),
            });

        let out = pipeline
            .process_item(product("  Book  ", "HELLO-WORLD", 1))
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.title, "Book");
        assert_eq!(out.slug, "hello-world");
        assert_eq!(out.stock, 1);
    }

    #[tokio::test]
    async fn applies_uppercase() {
        let pipeline =
            TransformPipeline::<TitleOnlyItem>::new().with_operation(TransformOperation::Uppercase {
                field: "title".to_string(),
            });

        let out = pipeline
            .process_item(TitleOnlyItem {
                title: "book".to_string(),
            })
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.title, "BOOK");
    }

    #[tokio::test]
    async fn applies_rename_remove_set_and_default() {
        let pipeline = TransformPipeline::<ProductItem>::new()
            // Moves the title into `slug`, leaving `title` absent.
            .with_operation(TransformOperation::Rename {
                from: "title".to_string(),
                to: "slug".to_string(),
            })
            // Fires only because the rename above removed `title`.
            .with_operation(TransformOperation::SetDefault {
                field: "title".to_string(),
                value: json!("untitled"),
            })
            // Must not override the value the rename placed in `slug`.
            .with_operation(TransformOperation::SetDefault {
                field: "slug".to_string(),
                value: json!("ignored"),
            })
            .with_operation(TransformOperation::Set {
                field: "stock".to_string(),
                value: json!(7),
            })
            .with_operation(TransformOperation::Remove {
                field: "missing".to_string(),
            });

        let out = pipeline
            .process_item(product("Book", "tmp", 1))
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.title, "untitled");
        assert_eq!(out.slug, "Book");
        assert_eq!(out.stock, 7);
    }

    #[test]
    fn apply_operation_edits_top_level_keys() {
        // Exercised on raw JSON so every key, not just deserialized ones, is visible.
        let mut payload = json!({ "title": "old", "extra": 1 });
        let steps = [
            TransformOperation::Rename {
                from: "title".to_string(),
                to: "name".to_string(),
            },
            TransformOperation::Remove {
                field: "extra".to_string(),
            },
            TransformOperation::Set {
                field: "stock".to_string(),
                value: json!(5),
            },
            TransformOperation::SetDefault {
                field: "stock".to_string(),
                value: json!(0),
            },
            TransformOperation::SetDefault {
                field: "slug".to_string(),
                value: json!("n/a"),
            },
        ];

        for step in &steps {
            TransformPipeline::<ProductItem>::apply_operation(&mut payload, step)
                .expect("operation should succeed");
        }

        assert_eq!(payload, json!({ "name": "old", "stock": 5, "slug": "n/a" }));
    }

    #[test]
    fn apply_operation_rejects_non_object() {
        let mut payload = json!([1, 2, 3]);
        let err = TransformPipeline::<ProductItem>::apply_operation(
            &mut payload,
            &TransformOperation::Remove {
                field: "title".to_string(),
            },
        )
        .expect_err("arrays are not transformable");

        assert_eq!(err, "Item must be a JSON object for transformation.");
    }

    #[tokio::test]
    async fn applies_custom_transform() {
        let pipeline = TransformPipeline::<ProductItem>::new().with_transform(|json| {
            let map = json
                .as_object_mut()
                .ok_or_else(|| "object expected".to_string())?;
            map.insert("stock".to_string(), json!(42));
            Ok(())
        });

        let out = pipeline
            .process_item(product("A", "b", 0))
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.stock, 42);
    }

    #[tokio::test]
    async fn drops_on_custom_transform_error() {
        let pipeline = TransformPipeline::<ProductItem>::new()
            .with_transform(|_json| Err("boom".to_string()));

        let out = pipeline
            .process_item(product("A", "b", 0))
            .await
            .expect("pipeline should not fail");

        assert!(out.is_none());
    }

    #[tokio::test]
    async fn drops_on_string_operation_over_non_string_field() {
        let pipeline =
            TransformPipeline::<ProductItem>::new().with_operation(TransformOperation::Trim {
                field: "stock".to_string(),
            });

        let out = pipeline
            .process_item(product("A", "b", 3))
            .await
            .expect("pipeline should not fail");

        assert!(out.is_none());
    }

    #[tokio::test]
    async fn drops_on_deserialize_failure_after_transform() {
        let pipeline = TransformPipeline::<ProductItem>::new().with_transform(|json| {
            let map = json
                .as_object_mut()
                .ok_or_else(|| "object expected".to_string())?;
            map.insert("stock".to_string(), json!("not_a_number"));
            Ok(())
        });

        let out = pipeline
            .process_item(product("A", "b", 0))
            .await
            .expect("pipeline should not fail");

        assert!(out.is_none());
    }

    #[tokio::test]
    async fn missing_field_operation_is_noop() {
        let pipeline =
            TransformPipeline::<ProductItem>::new().with_operation(TransformOperation::Uppercase {
                field: "missing".to_string(),
            });

        let out = pipeline
            .process_item(product("A", "b", 1))
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.title, "A");
        assert_eq!(out.slug, "b");
    }
}