1use crate::pipeline::Pipeline;
7use async_trait::async_trait;
8use log::{debug, warn};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use spider_util::{error::PipelineError, item::ScrapedItem};
13use std::marker::PhantomData;
14use std::sync::Arc;
15
16type TransformFn = dyn Fn(&mut Value) -> Result<(), String> + Send + Sync + 'static;
17
18#[derive(Debug, Clone)]
20pub enum TransformOperation {
21 Trim { field: String },
22 Lowercase { field: String },
23 Uppercase { field: String },
24 Rename { from: String, to: String },
25 Remove { field: String },
26 Set { field: String, value: Value },
27 SetDefault { field: String, value: Value },
28}
29
30pub struct TransformPipeline<I>
32where
33 I: ScrapedItem + Serialize + DeserializeOwned,
34{
35 operations: Vec<TransformOperation>,
36 transforms: Vec<Arc<TransformFn>>,
37 _phantom: PhantomData<I>,
38}
39
40impl<I> TransformPipeline<I>
41where
42 I: ScrapedItem + Serialize + DeserializeOwned,
43{
44 pub fn new() -> Self {
46 Self {
47 operations: Vec::new(),
48 transforms: Vec::new(),
49 _phantom: PhantomData,
50 }
51 }
52
53 pub fn with_operation(mut self, operation: TransformOperation) -> Self {
55 self.operations.push(operation);
56 self
57 }
58
59 pub fn with_transform<F>(mut self, transform: F) -> Self
61 where
62 F: Fn(&mut Value) -> Result<(), String> + Send + Sync + 'static,
63 {
64 self.transforms.push(Arc::new(transform));
65 self
66 }
67
68 fn apply_operation(value: &mut Value, operation: &TransformOperation) -> Result<(), String> {
69 let map = value
70 .as_object_mut()
71 .ok_or_else(|| "Item must be a JSON object for transformation.".to_string())?;
72
73 match operation {
74 TransformOperation::Trim { field } => {
75 if let Some(raw) = map.get_mut(field) {
76 let text = raw
77 .as_str()
78 .ok_or_else(|| format!("Field '{}' must be a string for Trim.", field))?;
79 *raw = Value::String(text.trim().to_string());
80 }
81 Ok(())
82 }
83 TransformOperation::Lowercase { field } => {
84 if let Some(raw) = map.get_mut(field) {
85 let text = raw.as_str().ok_or_else(|| {
86 format!("Field '{}' must be a string for Lowercase.", field)
87 })?;
88 *raw = Value::String(text.to_lowercase());
89 }
90 Ok(())
91 }
92 TransformOperation::Uppercase { field } => {
93 if let Some(raw) = map.get_mut(field) {
94 let text = raw.as_str().ok_or_else(|| {
95 format!("Field '{}' must be a string for Uppercase.", field)
96 })?;
97 *raw = Value::String(text.to_uppercase());
98 }
99 Ok(())
100 }
101 TransformOperation::Rename { from, to } => {
102 if let Some(value) = map.remove(from) {
103 map.insert(to.clone(), value);
104 }
105 Ok(())
106 }
107 TransformOperation::Remove { field } => {
108 map.remove(field);
109 Ok(())
110 }
111 TransformOperation::Set { field, value } => {
112 map.insert(field.clone(), value.clone());
113 Ok(())
114 }
115 TransformOperation::SetDefault { field, value } => {
116 if !map.contains_key(field) {
117 map.insert(field.clone(), value.clone());
118 }
119 Ok(())
120 }
121 }
122 }
123}
124
125impl<I> Default for TransformPipeline<I>
126where
127 I: ScrapedItem + Serialize + DeserializeOwned,
128{
129 fn default() -> Self {
130 Self::new()
131 }
132}
133
134#[async_trait]
135impl<I> Pipeline<I> for TransformPipeline<I>
136where
137 I: ScrapedItem + Serialize + DeserializeOwned,
138{
139 fn name(&self) -> &str {
140 "TransformPipeline"
141 }
142
143 async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
144 debug!("TransformPipeline processing item.");
145
146 let mut json = item.to_json_value();
147
148 for operation in &self.operations {
149 if let Err(err) = Self::apply_operation(&mut json, operation) {
150 warn!("Transform operation failed, dropping item: {}", err);
151 return Ok(None);
152 }
153 }
154
155 for transform in &self.transforms {
156 if let Err(err) = transform(&mut json) {
157 warn!("Custom transform failed, dropping item: {}", err);
158 return Ok(None);
159 }
160 }
161
162 match serde_json::from_value::<I>(json) {
163 Ok(transformed) => Ok(Some(transformed)),
164 Err(err) => {
165 warn!(
166 "Failed to deserialize transformed item, dropping item: {}",
167 err
168 );
169 Ok(None)
170 }
171 }
172 }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use spider_util::item::ScrapedItem;
    use std::any::Any;

    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct ProductItem {
        title: String,
        slug: String,
        stock: i32,
    }

    impl ScrapedItem for ProductItem {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn box_clone(&self) -> Box<dyn ScrapedItem + Send + Sync> {
            Box::new(self.clone())
        }

        fn to_json_value(&self) -> Value {
            serde_json::to_value(self).expect("serialize test item")
        }
    }

    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TitleOnlyItem {
        title: String,
    }

    impl ScrapedItem for TitleOnlyItem {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn box_clone(&self) -> Box<dyn ScrapedItem + Send + Sync> {
            Box::new(self.clone())
        }

        fn to_json_value(&self) -> Value {
            serde_json::to_value(self).expect("serialize test item")
        }
    }

    #[tokio::test]
    async fn applies_string_operations() {
        let pipeline = TransformPipeline::<ProductItem>::new()
            .with_operation(TransformOperation::Trim {
                field: "title".to_string(),
            })
            .with_operation(TransformOperation::Lowercase {
                field: "slug".to_string(),
            });

        let out = pipeline
            .process_item(ProductItem {
                title: "  Book  ".to_string(),
                slug: "HELLO-WORLD".to_string(),
                stock: 1,
            })
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.title, "Book");
        assert_eq!(out.slug, "hello-world");
    }

    #[tokio::test]
    async fn applies_rename_remove_set_and_default() {
        let pipeline = TransformPipeline::<TitleOnlyItem>::new()
            .with_operation(TransformOperation::Rename {
                from: "title".to_string(),
                to: "title".to_string(),
            })
            .with_operation(TransformOperation::SetDefault {
                field: "title".to_string(),
                value: json!("fallback"),
            })
            .with_operation(TransformOperation::Set {
                field: "title".to_string(),
                value: json!("final"),
            })
            .with_operation(TransformOperation::Remove {
                field: "missing".to_string(),
            });

        let out = pipeline
            .process_item(TitleOnlyItem {
                title: "old".to_string(),
            })
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.title, "final");
    }

    #[tokio::test]
    async fn rename_moves_value_and_overwrites_target() {
        // Moving `title` onto `slug` leaves `title` absent, which SetDefault refills.
        let pipeline = TransformPipeline::<ProductItem>::new()
            .with_operation(TransformOperation::Rename {
                from: "title".to_string(),
                to: "slug".to_string(),
            })
            .with_operation(TransformOperation::SetDefault {
                field: "title".to_string(),
                value: json!("untitled"),
            });

        let out = pipeline
            .process_item(ProductItem {
                title: "Moved".to_string(),
                slug: "old-slug".to_string(),
                stock: 3,
            })
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.slug, "Moved");
        assert_eq!(out.title, "untitled");
        assert_eq!(out.stock, 3);
    }

    #[tokio::test]
    async fn set_default_keeps_existing_value() {
        let pipeline =
            TransformPipeline::<TitleOnlyItem>::new().with_operation(TransformOperation::SetDefault {
                field: "title".to_string(),
                value: json!("fallback"),
            });

        let out = pipeline
            .process_item(TitleOnlyItem {
                title: "kept".to_string(),
            })
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.title, "kept");
    }

    #[tokio::test]
    async fn drops_item_when_string_operation_targets_non_string() {
        let pipeline =
            TransformPipeline::<ProductItem>::new().with_operation(TransformOperation::Uppercase {
                field: "stock".to_string(),
            });

        let out = pipeline
            .process_item(ProductItem {
                title: "A".to_string(),
                slug: "b".to_string(),
                stock: 7,
            })
            .await
            .expect("pipeline should not fail");

        assert!(out.is_none());
    }

    #[tokio::test]
    async fn drops_item_when_custom_transform_fails() {
        let pipeline = TransformPipeline::<ProductItem>::new()
            .with_transform(|_json| Err("rejected by test".to_string()));

        let out = pipeline
            .process_item(ProductItem {
                title: "A".to_string(),
                slug: "b".to_string(),
                stock: 1,
            })
            .await
            .expect("pipeline should not fail");

        assert!(out.is_none());
    }

    #[tokio::test]
    async fn operations_run_before_custom_transforms() {
        // The transform is registered first but must observe the operation's output.
        let pipeline = TransformPipeline::<ProductItem>::new()
            .with_transform(|json| {
                let map = json
                    .as_object_mut()
                    .ok_or_else(|| "object expected".to_string())?;
                let title = map.get("title").cloned().unwrap_or(Value::Null);
                map.insert("slug".to_string(), title);
                Ok(())
            })
            .with_operation(TransformOperation::Set {
                field: "title".to_string(),
                value: json!("from-op"),
            });

        let out = pipeline
            .process_item(ProductItem {
                title: "original".to_string(),
                slug: "b".to_string(),
                stock: 1,
            })
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.title, "from-op");
        assert_eq!(out.slug, "from-op");
    }

    #[tokio::test]
    async fn applies_custom_transform() {
        let pipeline = TransformPipeline::<ProductItem>::new().with_transform(|json| {
            let map = json
                .as_object_mut()
                .ok_or_else(|| "object expected".to_string())?;
            map.insert("stock".to_string(), json!(42));
            Ok(())
        });

        let out = pipeline
            .process_item(ProductItem {
                title: "A".to_string(),
                slug: "b".to_string(),
                stock: 0,
            })
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.stock, 42);
    }

    #[tokio::test]
    async fn drops_on_deserialize_failure_after_transform() {
        let pipeline = TransformPipeline::<ProductItem>::new().with_transform(|json| {
            let map = json
                .as_object_mut()
                .ok_or_else(|| "object expected".to_string())?;
            map.insert("stock".to_string(), json!("not_a_number"));
            Ok(())
        });

        let out = pipeline
            .process_item(ProductItem {
                title: "A".to_string(),
                slug: "b".to_string(),
                stock: 0,
            })
            .await
            .expect("pipeline should not fail");

        assert!(out.is_none());
    }

    #[tokio::test]
    async fn missing_field_operation_is_noop() {
        let pipeline =
            TransformPipeline::<ProductItem>::new().with_operation(TransformOperation::Uppercase {
                field: "missing".to_string(),
            });

        let out = pipeline
            .process_item(ProductItem {
                title: "A".to_string(),
                slug: "b".to_string(),
                stock: 1,
            })
            .await
            .expect("pipeline should not fail")
            .expect("item should pass");

        assert_eq!(out.title, "A");
        assert_eq!(out.slug, "b");
    }
}