oversync_transforms/
js.rs1use oversync_core::error::OversyncError;
2
3use crate::TransformStep;
4
5pub struct JsStep {
20 name: String,
21 ctx: rquickjs::Context,
22}
23
24impl std::fmt::Debug for JsStep {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 f.debug_struct("JsStep").field("name", &self.name).finish()
27 }
28}
29
30impl JsStep {
31 pub fn new(name: &str, function_source: &str) -> Result<Self, OversyncError> {
35 let rt = rquickjs::Runtime::new()
36 .map_err(|e| OversyncError::Plugin(format!("js runtime init '{name}': {e}")))?;
37
38 rt.set_memory_limit(32 * 1024 * 1024);
40 rt.set_max_stack_size(1024 * 1024);
42
43 let ctx = rquickjs::Context::full(&rt)
44 .map_err(|e| OversyncError::Plugin(format!("js context init '{name}': {e}")))?;
45
46 ctx.with(|ctx| {
48 ctx.eval::<(), _>(function_source)
49 .map_err(|e| OversyncError::Config(format!("js compile '{name}': {e}")))?;
50
51 let globals = ctx.globals();
53 let has_transform: bool = globals
54 .get::<_, rquickjs::Value>("transform")
55 .map(|v| v.is_function())
56 .unwrap_or(false);
57
58 if !has_transform {
59 return Err(OversyncError::Config(format!(
60 "js '{name}': source must define a 'transform' function"
61 )));
62 }
63
64 Ok(())
65 })?;
66
67 Ok(Self {
68 name: name.to_string(),
69 ctx,
70 })
71 }
72}
73
74impl TransformStep for JsStep {
75 fn apply(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError> {
76 self.ctx.with(|ctx| {
77 let js_input = rquickjs_serde::to_value(ctx.clone(), &*data)
79 .map_err(|e| OversyncError::Plugin(format!("js serialize: {e}")))?;
80
81 let transform: rquickjs::Function = ctx
83 .globals()
84 .get("transform")
85 .map_err(|e| OversyncError::Plugin(format!("js get transform: {e}")))?;
86
87 let result: rquickjs::Value = transform
89 .call((js_input,))
90 .map_err(|e| OversyncError::Plugin(format!("js transform: {e}")))?;
91
92 if result.is_null() || result.is_undefined() {
94 return Ok(false);
95 }
96
97 let output: serde_json::Value = rquickjs_serde::from_value(result)
99 .map_err(|e| OversyncError::Plugin(format!("js deserialize: {e}")))?;
100
101 *data = output;
102 Ok(true)
103 })
104 }
105
106 fn step_name(&self) -> &str {
107 &self.name
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a step called `"test"` from `source`, panicking if construction fails.
    fn compile(source: &str) -> JsStep {
        JsStep::new("test", source).unwrap()
    }

    /// Returning the argument unchanged keeps the row as-is.
    #[test]
    fn passthrough() {
        let step = compile("function transform(row) { return row; }");
        let mut row = json!({"name": "alice", "age": 30});
        let kept = step.apply(&mut row).unwrap();
        assert!(kept);
        assert_eq!(row, json!({"name": "alice", "age": 30}));
    }

    /// A script may add a field derived from existing ones.
    #[test]
    fn add_computed_field() {
        let step =
            compile("function transform(row) { return { ...row, total: row.price * row.qty } }");
        let mut row = json!({"price": 10.5, "qty": 3});
        let kept = step.apply(&mut row).unwrap();
        assert!(kept);
        assert_eq!(row["total"], 31.5);
        assert_eq!(row["price"], 10.5);
    }

    /// Returning `null` makes `apply` report `false`.
    #[test]
    fn filter_by_returning_null() {
        let step = compile("function transform(row) { return row.active ? row : null; }");

        let mut accepted = json!({"active": true, "name": "alice"});
        assert!(step.apply(&mut accepted).unwrap());

        let mut rejected = json!({"active": false, "name": "bob"});
        assert!(!step.apply(&mut rejected).unwrap());
    }

    /// Nested input can be flattened into a new object.
    #[test]
    fn reshape_jsonb() {
        let source = r#"function transform(row) {
                var meta = typeof row.metadata === 'string' ? JSON.parse(row.metadata) : row.metadata;
                return {
                    id: row.id,
                    email: meta.contact.email,
                    tier: meta.subscription.tier,
                    active: meta.subscription.active
                };
            }"#;
        let step = compile(source);

        let mut row = json!({
            "id": "u1",
            "metadata": {
                "contact": {"email": "a@b.com", "phone": "555"},
                "subscription": {"tier": "pro", "active": true}
            }
        });
        assert!(step.apply(&mut row).unwrap());

        let expected = json!({
            "id": "u1",
            "email": "a@b.com",
            "tier": "pro",
            "active": true
        });
        assert_eq!(row, expected);
    }

    /// Script-level globals survive from one `apply` call to the next.
    #[test]
    fn multiple_calls_reuse_context() {
        let step = compile(
            "var count = 0; function transform(row) { count++; row.seq = count; return row; }",
        );

        let mut first = json!({"x": 1});
        step.apply(&mut first).unwrap();
        assert_eq!(first["seq"], 1);

        let mut second = json!({"x": 2});
        step.apply(&mut second).unwrap();
        assert_eq!(second["seq"], 2);
    }

    /// A script without a `transform` function is rejected at construction.
    #[test]
    fn missing_transform_function_errors() {
        let message = JsStep::new("test", "function foo(row) { return row; }")
            .unwrap_err()
            .to_string();
        assert!(message.contains("must define a 'transform' function"));
    }

    /// Invalid JavaScript is reported as a compile error at construction.
    #[test]
    fn syntax_error_in_source() {
        let message = JsStep::new("test", "function transform(row { return row; }")
            .unwrap_err()
            .to_string();
        assert!(message.contains("js compile"));
    }

    /// An exception thrown by the script becomes an `apply` error.
    #[test]
    fn runtime_error_propagates() {
        let step = compile("function transform(row) { throw new Error('boom'); }");
        let mut row = json!({});
        let err = step.apply(&mut row).unwrap_err();
        assert!(err.to_string().contains("js transform"), "got: {err}");
    }

    /// A function that returns nothing makes `apply` report `false`.
    #[test]
    fn return_undefined_filters() {
        let step = compile("function transform(row) { }");
        let mut row = json!({"x": 1});
        assert!(!step.apply(&mut row).unwrap());
    }

    /// Arrays round-trip through the script and back into JSON.
    #[test]
    fn array_and_nested_objects() {
        let source = r#"function transform(row) {
                return { tags: row.tags.map(function(t) { return t.toUpperCase(); }), count: row.tags.length };
            }"#;
        let step = compile(source);

        let mut row = json!({"tags": ["foo", "bar"]});
        step.apply(&mut row).unwrap();
        assert_eq!(row["tags"], json!(["FOO", "BAR"]));
        assert_eq!(row["count"], 2);
    }
}