spider_pipeline/
transform.rs1use crate::pipeline::Pipeline;
7use async_trait::async_trait;
8use log::{debug, warn};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use spider_util::{error::PipelineError, item::ScrapedItem};
13use std::marker::PhantomData;
14use std::sync::Arc;
15
/// Signature of a custom transform stored by `TransformPipeline`.
///
/// The closure receives the item's JSON value by mutable reference and edits
/// it in place. Returning `Err(message)` reports a failure; the message is a
/// plain `String`. The `Send + Sync + 'static` bounds are part of the alias
/// itself, so every stored closure must satisfy them.
type TransformFn = dyn Fn(&mut Value) -> Result<(), String> + Send + Sync + 'static;
17
18#[derive(Debug, Clone)]
20pub enum TransformOperation {
21 Trim { field: String },
22 Lowercase { field: String },
23 Uppercase { field: String },
24 Rename { from: String, to: String },
25 Remove { field: String },
26 Set { field: String, value: Value },
27 SetDefault { field: String, value: Value },
28}
29
30pub struct TransformPipeline<I>
32where
33 I: ScrapedItem + Serialize + DeserializeOwned,
34{
35 operations: Vec<TransformOperation>,
36 transforms: Vec<Arc<TransformFn>>,
37 _phantom: PhantomData<I>,
38}
39
40impl<I> TransformPipeline<I>
41where
42 I: ScrapedItem + Serialize + DeserializeOwned,
43{
44 pub fn new() -> Self {
46 Self {
47 operations: Vec::new(),
48 transforms: Vec::new(),
49 _phantom: PhantomData,
50 }
51 }
52
53 pub fn with_operation(mut self, operation: TransformOperation) -> Self {
55 self.operations.push(operation);
56 self
57 }
58
59 pub fn with_transform<F>(mut self, transform: F) -> Self
61 where
62 F: Fn(&mut Value) -> Result<(), String> + Send + Sync + 'static,
63 {
64 self.transforms.push(Arc::new(transform));
65 self
66 }
67
68 fn apply_operation(value: &mut Value, operation: &TransformOperation) -> Result<(), String> {
69 let map = value
70 .as_object_mut()
71 .ok_or_else(|| "Item must be a JSON object for transformation.".to_string())?;
72
73 match operation {
74 TransformOperation::Trim { field } => {
75 if let Some(raw) = map.get_mut(field) {
76 let text = raw
77 .as_str()
78 .ok_or_else(|| format!("Field '{}' must be a string for Trim.", field))?;
79 *raw = Value::String(text.trim().to_string());
80 }
81 Ok(())
82 }
83 TransformOperation::Lowercase { field } => {
84 if let Some(raw) = map.get_mut(field) {
85 let text = raw.as_str().ok_or_else(|| {
86 format!("Field '{}' must be a string for Lowercase.", field)
87 })?;
88 *raw = Value::String(text.to_lowercase());
89 }
90 Ok(())
91 }
92 TransformOperation::Uppercase { field } => {
93 if let Some(raw) = map.get_mut(field) {
94 let text = raw.as_str().ok_or_else(|| {
95 format!("Field '{}' must be a string for Uppercase.", field)
96 })?;
97 *raw = Value::String(text.to_uppercase());
98 }
99 Ok(())
100 }
101 TransformOperation::Rename { from, to } => {
102 if let Some(value) = map.remove(from) {
103 map.insert(to.clone(), value);
104 }
105 Ok(())
106 }
107 TransformOperation::Remove { field } => {
108 map.remove(field);
109 Ok(())
110 }
111 TransformOperation::Set { field, value } => {
112 map.insert(field.clone(), value.clone());
113 Ok(())
114 }
115 TransformOperation::SetDefault { field, value } => {
116 if !map.contains_key(field) {
117 map.insert(field.clone(), value.clone());
118 }
119 Ok(())
120 }
121 }
122 }
123}
124
125impl<I> Default for TransformPipeline<I>
126where
127 I: ScrapedItem + Serialize + DeserializeOwned,
128{
129 fn default() -> Self {
130 Self::new()
131 }
132}
133
134#[async_trait]
135impl<I> Pipeline<I> for TransformPipeline<I>
136where
137 I: ScrapedItem + Serialize + DeserializeOwned,
138{
139 fn name(&self) -> &str {
140 "TransformPipeline"
141 }
142
143 async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
144 debug!("TransformPipeline processing item.");
145
146 let mut json = item.to_json_value();
147
148 for operation in &self.operations {
149 if let Err(err) = Self::apply_operation(&mut json, operation) {
150 warn!("Transform operation failed, dropping item: {}", err);
151 return Ok(None);
152 }
153 }
154
155 for transform in &self.transforms {
156 if let Err(err) = transform(&mut json) {
157 warn!("Custom transform failed, dropping item: {}", err);
158 return Ok(None);
159 }
160 }
161
162 match serde_json::from_value::<I>(json) {
163 Ok(transformed) => Ok(Some(transformed)),
164 Err(err) => {
165 warn!(
166 "Failed to deserialize transformed item, dropping item: {}",
167 err
168 );
169 Ok(None)
170 }
171 }
172 }
173}