oversync_transforms/
lib.rs1#[cfg(feature = "js")]
2pub mod js;
3mod parse;
4pub mod steps;
5#[cfg(feature = "wasm")]
6pub mod wasm;
7
8use async_trait::async_trait;
9use oversync_core::error::OversyncError;
10use oversync_core::model::EventEnvelope;
11use oversync_core::traits::TransformHook;
12
13pub use parse::parse_steps;
14pub use steps::*;
15
16pub trait TransformStep: Send + Sync {
21 fn apply(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError>;
22
23 fn step_name(&self) -> &str;
24}
25
26pub struct StepChain {
31 steps: Vec<Box<dyn TransformStep>>,
32}
33
34impl std::fmt::Debug for StepChain {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 let names: Vec<&str> = self.steps.iter().map(|s| s.step_name()).collect();
37 f.debug_struct("StepChain").field("steps", &names).finish()
38 }
39}
40
41impl StepChain {
42 pub fn new(steps: Vec<Box<dyn TransformStep>>) -> Self {
43 Self { steps }
44 }
45
46 pub fn apply_one(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError> {
47 for step in &self.steps {
48 if !step.apply(data)? {
49 return Ok(false);
50 }
51 }
52 Ok(true)
53 }
54
55 pub fn len(&self) -> usize {
56 self.steps.len()
57 }
58
59 pub fn is_empty(&self) -> bool {
60 self.steps.is_empty()
61 }
62
63 pub fn filter_rows(
66 &self,
67 rows: Vec<oversync_core::model::RawRow>,
68 ) -> Result<Vec<oversync_core::model::RawRow>, OversyncError> {
69 let mut kept = Vec::with_capacity(rows.len());
70 for mut row in rows {
71 if self.apply_one(&mut row.row_data)? {
72 kept.push(row);
73 }
74 }
75 Ok(kept)
76 }
77}
78
79#[async_trait]
80impl TransformHook for StepChain {
81 async fn transform(
82 &self,
83 envelopes: Vec<EventEnvelope>,
84 ) -> Result<Vec<EventEnvelope>, OversyncError> {
85 let mut output = Vec::with_capacity(envelopes.len());
86 for mut envelope in envelopes {
87 if self.apply_one(&mut envelope.data)? {
88 output.push(envelope);
89 }
90 }
91 Ok(output)
92 }
93}
94
#[cfg(test)]
mod tests {
    use super::*;
    use oversync_core::model::{EventMeta, OpType, RawRow};
    use serde_json::json;

    // ---------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------

    /// Wraps `data` in an envelope with fixed placeholder metadata.
    fn test_envelope(data: serde_json::Value) -> EventEnvelope {
        let meta = EventMeta {
            op: OpType::Created,
            origin_id: "test".into(),
            query_id: "q".into(),
            key: "k".into(),
            hash: "h".into(),
            cycle_id: 1,
            timestamp: chrono::Utc::now(),
        };
        EventEnvelope { meta, data }
    }

    /// Three rows keyed "1".."3" whose payload is `{"name": <name>}`.
    fn test_rows() -> Vec<RawRow> {
        vec![("1", "alice"), ("2", "bob"), ("3", "charlie")]
            .into_iter()
            .map(|(key, name)| RawRow {
                row_key: key.into(),
                row_data: json!({ "name": name }),
            })
            .collect()
    }

    /// Shorthand for building a chain from boxed steps.
    fn chain_of(steps: Vec<Box<dyn TransformStep>>) -> StepChain {
        StepChain::new(steps)
    }

    /// Step that keeps every record and leaves it untouched.
    struct AlwaysKeep;

    impl TransformStep for AlwaysKeep {
        fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
            Ok(true)
        }

        fn step_name(&self) -> &str {
            "always_keep"
        }
    }

    /// Step that drops every record.
    struct AlwaysDrop;

    impl TransformStep for AlwaysDrop {
        fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
            Ok(false)
        }

        fn step_name(&self) -> &str {
            "always_drop"
        }
    }

    /// Step that always fails with an internal error.
    struct FailStep;

    impl TransformStep for FailStep {
        fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
            Err(OversyncError::Internal("step failed".into()))
        }

        fn step_name(&self) -> &str {
            "fail"
        }
    }

    // ---------------------------------------------------------------------
    // apply_one
    // ---------------------------------------------------------------------

    #[test]
    fn empty_chain_is_passthrough() {
        let chain = chain_of(Vec::new());
        assert!(chain.is_empty());

        let expected = json!({"x": 1});
        let mut data = expected.clone();
        let kept = chain.apply_one(&mut data).unwrap();
        assert!(kept);
        assert_eq!(data, expected);
    }

    #[test]
    fn chain_keeps_record() {
        let chain = chain_of(vec![Box::new(AlwaysKeep)]);
        let mut data = json!({"x": 1});
        let kept = chain.apply_one(&mut data).unwrap();
        assert!(kept);
    }

    #[test]
    fn chain_drops_record() {
        let chain = chain_of(vec![Box::new(AlwaysDrop)]);
        let mut data = json!({"x": 1});
        let kept = chain.apply_one(&mut data).unwrap();
        assert!(!kept);
    }

    #[test]
    fn chain_short_circuits_on_drop() {
        // If the chain kept going after the drop, `FailStep` would turn the
        // result into an error and `unwrap` would panic.
        let chain = chain_of(vec![Box::new(AlwaysDrop), Box::new(FailStep)]);
        let mut data = json!({});
        let kept = chain.apply_one(&mut data).unwrap();
        assert!(!kept);
    }

    #[test]
    fn chain_short_circuits_on_error() {
        let chain = chain_of(vec![Box::new(FailStep), Box::new(AlwaysKeep)]);
        let mut data = json!({});
        let outcome = chain.apply_one(&mut data);
        assert!(outcome.is_err());
    }

    // ---------------------------------------------------------------------
    // TransformHook
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn chain_as_transform_hook() {
        let chain = chain_of(vec![Box::new(AlwaysKeep)]);
        let input = vec![
            test_envelope(json!({"a": 1})),
            test_envelope(json!({"b": 2})),
        ];
        let output = chain.transform(input).await.unwrap();
        assert_eq!(output.len(), 2);
    }

    #[tokio::test]
    async fn chain_hook_filters_records() {
        let chain = chain_of(vec![Box::new(AlwaysDrop)]);
        let input = vec![
            test_envelope(json!({"a": 1})),
            test_envelope(json!({"b": 2})),
        ];
        let output = chain.transform(input).await.unwrap();
        assert!(output.is_empty());
    }

    #[tokio::test]
    async fn chain_hook_propagates_error() {
        let chain = chain_of(vec![Box::new(FailStep)]);
        let input = vec![test_envelope(json!({}))];
        let outcome = chain.transform(input).await;
        assert!(outcome.is_err());
    }

    // ---------------------------------------------------------------------
    // filter_rows
    // ---------------------------------------------------------------------

    #[test]
    fn filter_rows_keeps_all() {
        let chain = chain_of(vec![Box::new(AlwaysKeep)]);
        let kept = chain.filter_rows(test_rows()).unwrap();
        assert_eq!(kept.len(), 3);
    }

    #[test]
    fn filter_rows_drops_all() {
        let chain = chain_of(vec![Box::new(AlwaysDrop)]);
        let kept = chain.filter_rows(test_rows()).unwrap();
        assert!(kept.is_empty());
    }

    #[test]
    fn filter_rows_empty_chain_keeps_all() {
        let chain = chain_of(Vec::new());
        let kept = chain.filter_rows(test_rows()).unwrap();
        assert_eq!(kept.len(), 3);
    }

    #[test]
    fn filter_rows_error_propagates() {
        let chain = chain_of(vec![Box::new(FailStep)]);
        let outcome = chain.filter_rows(test_rows());
        assert!(outcome.is_err());
    }

    #[test]
    fn filter_rows_partial() {
        use crate::steps::{Filter, FilterOp};

        let only_alice = Filter {
            field: "name".into(),
            op: FilterOp::Eq,
            value: json!("alice"),
        };
        let chain = chain_of(vec![Box::new(only_alice)]);

        let kept = chain.filter_rows(test_rows()).unwrap();
        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0].row_key, "1");
    }
}