oversync_transforms/
lib.rs1mod parse;
2pub mod steps;
3#[cfg(feature = "wasm")]
4pub mod wasm;
5
6use async_trait::async_trait;
7use oversync_core::error::OversyncError;
8use oversync_core::model::EventEnvelope;
9use oversync_core::traits::TransformHook;
10
11pub use parse::parse_steps;
12pub use steps::*;
13
14pub trait TransformStep: Send + Sync {
19 fn apply(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError>;
20
21 fn step_name(&self) -> &str;
22}
23
24pub struct StepChain {
29 steps: Vec<Box<dyn TransformStep>>,
30}
31
32impl std::fmt::Debug for StepChain {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 let names: Vec<&str> = self.steps.iter().map(|s| s.step_name()).collect();
35 f.debug_struct("StepChain").field("steps", &names).finish()
36 }
37}
38
39impl StepChain {
40 pub fn new(steps: Vec<Box<dyn TransformStep>>) -> Self {
41 Self { steps }
42 }
43
44 pub fn apply_one(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError> {
45 for step in &self.steps {
46 if !step.apply(data)? {
47 return Ok(false);
48 }
49 }
50 Ok(true)
51 }
52
53 pub fn len(&self) -> usize {
54 self.steps.len()
55 }
56
57 pub fn is_empty(&self) -> bool {
58 self.steps.is_empty()
59 }
60
61 pub fn filter_rows(
64 &self,
65 rows: Vec<oversync_core::model::RawRow>,
66 ) -> Result<Vec<oversync_core::model::RawRow>, OversyncError> {
67 let mut kept = Vec::with_capacity(rows.len());
68 for mut row in rows {
69 if self.apply_one(&mut row.row_data)? {
70 kept.push(row);
71 }
72 }
73 Ok(kept)
74 }
75}
76
77#[async_trait]
78impl TransformHook for StepChain {
79 async fn transform(
80 &self,
81 envelopes: Vec<EventEnvelope>,
82 ) -> Result<Vec<EventEnvelope>, OversyncError> {
83 let mut output = Vec::with_capacity(envelopes.len());
84 for mut envelope in envelopes {
85 if self.apply_one(&mut envelope.data)? {
86 output.push(envelope);
87 }
88 }
89 Ok(output)
90 }
91}
92
#[cfg(test)]
mod tests {
    use super::*;
    use oversync_core::model::{EventMeta, OpType, RawRow};
    use serde_json::json;

    // ---- fixtures -------------------------------------------------------

    /// Step that keeps every record and leaves it untouched.
    struct AlwaysKeep;
    impl TransformStep for AlwaysKeep {
        fn step_name(&self) -> &str {
            "always_keep"
        }
        fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
            Ok(true)
        }
    }

    /// Step that drops every record.
    struct AlwaysDrop;
    impl TransformStep for AlwaysDrop {
        fn step_name(&self) -> &str {
            "always_drop"
        }
        fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
            Ok(false)
        }
    }

    /// Step that always fails with an internal error.
    struct FailStep;
    impl TransformStep for FailStep {
        fn step_name(&self) -> &str {
            "fail"
        }
        fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
            Err(OversyncError::Internal("step failed".into()))
        }
    }

    /// Wraps `data` in an envelope with fixed placeholder metadata.
    fn envelope_with(data: serde_json::Value) -> EventEnvelope {
        let meta = EventMeta {
            op: OpType::Created,
            origin_id: "test".into(),
            query_id: "q".into(),
            key: "k".into(),
            hash: "h".into(),
            cycle_id: 1,
            timestamp: chrono::Utc::now(),
        };
        EventEnvelope { meta, data }
    }

    /// Two envelopes with distinct payloads, shared by the hook tests.
    fn envelope_pair() -> Vec<EventEnvelope> {
        vec![
            envelope_with(json!({"a": 1})),
            envelope_with(json!({"b": 2})),
        ]
    }

    /// Three rows keyed "1".."3" whose `name` is alice / bob / charlie.
    fn sample_rows() -> Vec<RawRow> {
        [("1", "alice"), ("2", "bob"), ("3", "charlie")]
            .iter()
            .map(|&(key, name)| RawRow {
                row_key: key.into(),
                row_data: json!({"name": name}),
            })
            .collect()
    }

    // ---- apply_one ------------------------------------------------------

    #[test]
    fn empty_chain_is_passthrough() {
        let chain = StepChain::new(Vec::new());
        assert!(chain.is_empty());

        let expected = json!({"x": 1});
        let mut record = expected.clone();
        assert!(chain.apply_one(&mut record).unwrap());
        assert_eq!(record, expected);
    }

    #[test]
    fn chain_keeps_record() {
        let chain = StepChain::new(vec![Box::new(AlwaysKeep)]);
        let mut record = json!({"x": 1});
        let kept = chain.apply_one(&mut record).unwrap();
        assert!(kept);
    }

    #[test]
    fn chain_drops_record() {
        let chain = StepChain::new(vec![Box::new(AlwaysDrop)]);
        let mut record = json!({"x": 1});
        let kept = chain.apply_one(&mut record).unwrap();
        assert!(!kept);
    }

    #[test]
    fn chain_short_circuits_on_drop() {
        // If the drop did not stop the chain, FailStep would turn this into an error.
        let chain = StepChain::new(vec![Box::new(AlwaysDrop), Box::new(FailStep)]);
        let mut record = json!({});
        let kept = chain.apply_one(&mut record).unwrap();
        assert!(!kept);
    }

    #[test]
    fn chain_short_circuits_on_error() {
        let chain = StepChain::new(vec![Box::new(FailStep), Box::new(AlwaysKeep)]);
        let mut record = json!({});
        let outcome = chain.apply_one(&mut record);
        assert!(outcome.is_err());
    }

    // ---- TransformHook --------------------------------------------------

    #[tokio::test]
    async fn chain_as_transform_hook() {
        let chain = StepChain::new(vec![Box::new(AlwaysKeep)]);
        let output = chain.transform(envelope_pair()).await.unwrap();
        assert_eq!(output.len(), 2);
    }

    #[tokio::test]
    async fn chain_hook_filters_records() {
        let chain = StepChain::new(vec![Box::new(AlwaysDrop)]);
        let output = chain.transform(envelope_pair()).await.unwrap();
        assert!(output.is_empty());
    }

    #[tokio::test]
    async fn chain_hook_propagates_error() {
        let chain = StepChain::new(vec![Box::new(FailStep)]);
        let input = vec![envelope_with(json!({}))];
        let outcome = chain.transform(input).await;
        assert!(outcome.is_err());
    }

    // ---- filter_rows ----------------------------------------------------

    #[test]
    fn filter_rows_keeps_all() {
        let chain = StepChain::new(vec![Box::new(AlwaysKeep)]);
        let kept = chain.filter_rows(sample_rows()).unwrap();
        assert_eq!(kept.len(), 3);
    }

    #[test]
    fn filter_rows_drops_all() {
        let chain = StepChain::new(vec![Box::new(AlwaysDrop)]);
        let kept = chain.filter_rows(sample_rows()).unwrap();
        assert!(kept.is_empty());
    }

    #[test]
    fn filter_rows_empty_chain_keeps_all() {
        let chain = StepChain::new(Vec::new());
        let kept = chain.filter_rows(sample_rows()).unwrap();
        assert_eq!(kept.len(), 3);
    }

    #[test]
    fn filter_rows_error_propagates() {
        let chain = StepChain::new(vec![Box::new(FailStep)]);
        let outcome = chain.filter_rows(sample_rows());
        assert!(outcome.is_err());
    }

    #[test]
    fn filter_rows_partial() {
        use crate::steps::{Filter, FilterOp};

        let only_alice = Filter {
            field: "name".into(),
            op: FilterOp::Eq,
            value: json!("alice"),
        };
        let chain = StepChain::new(vec![Box::new(only_alice)]);

        let kept = chain.filter_rows(sample_rows()).unwrap();
        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0].row_key, "1");
    }
}