Skip to main content

oversync_transforms/
js.rs

1use oversync_core::error::OversyncError;
2
3use crate::TransformStep;
4
5/// A JavaScript transform step powered by QuickJS (via rquickjs).
6///
7/// The user provides a JS function body that receives a row object and returns
8/// a transformed object (or `null`/`undefined` to filter the record out).
9///
10/// ```json
11/// {
12///   "type": "js",
13///   "function": "function transform(row) { return { ...row, total: row.price * row.qty } }"
14/// }
15/// ```
16///
17/// The function is compiled once at parse time. Each `apply()` call serializes
18/// the row to JS, invokes the function, and deserializes the result back.
19pub struct JsStep {
20	name: String,
21	ctx: rquickjs::Context,
22}
23
24impl std::fmt::Debug for JsStep {
25	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26		f.debug_struct("JsStep").field("name", &self.name).finish()
27	}
28}
29
30impl JsStep {
31	/// Create a new JS transform from a function source string.
32	///
33	/// The source must define a global `transform` function that accepts one argument.
34	pub fn new(name: &str, function_source: &str) -> Result<Self, OversyncError> {
35		let rt = rquickjs::Runtime::new()
36			.map_err(|e| OversyncError::Plugin(format!("js runtime init '{name}': {e}")))?;
37
38		// Memory limit: 32MB to prevent runaway scripts
39		rt.set_memory_limit(32 * 1024 * 1024);
40		// Max stack size: 1MB
41		rt.set_max_stack_size(1024 * 1024);
42
43		let ctx = rquickjs::Context::full(&rt)
44			.map_err(|e| OversyncError::Plugin(format!("js context init '{name}': {e}")))?;
45
46		// Compile the user's function
47		ctx.with(|ctx| {
48			ctx.eval::<(), _>(function_source)
49				.map_err(|e| OversyncError::Config(format!("js compile '{name}': {e}")))?;
50
51			// Verify 'transform' function exists
52			let globals = ctx.globals();
53			let has_transform: bool = globals
54				.get::<_, rquickjs::Value>("transform")
55				.map(|v| v.is_function())
56				.unwrap_or(false);
57
58			if !has_transform {
59				return Err(OversyncError::Config(format!(
60					"js '{name}': source must define a 'transform' function"
61				)));
62			}
63
64			Ok(())
65		})?;
66
67		Ok(Self {
68			name: name.to_string(),
69			ctx,
70		})
71	}
72}
73
74impl TransformStep for JsStep {
75	fn apply(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError> {
76		self.ctx.with(|ctx| {
77			// Serialize row to JS value
78			let js_input = rquickjs_serde::to_value(ctx.clone(), &*data)
79				.map_err(|e| OversyncError::Plugin(format!("js serialize: {e}")))?;
80
81			// Get transform function from globals
82			let transform: rquickjs::Function = ctx
83				.globals()
84				.get("transform")
85				.map_err(|e| OversyncError::Plugin(format!("js get transform: {e}")))?;
86
87			// Call transform(row)
88			let result: rquickjs::Value = transform
89				.call((js_input,))
90				.map_err(|e| OversyncError::Plugin(format!("js transform: {e}")))?;
91
92			// null/undefined → filter out
93			if result.is_null() || result.is_undefined() {
94				return Ok(false);
95			}
96
97			// Deserialize result back to serde_json::Value
98			let output: serde_json::Value = rquickjs_serde::from_value(result)
99				.map_err(|e| OversyncError::Plugin(format!("js deserialize: {e}")))?;
100
101			*data = output;
102			Ok(true)
103		})
104	}
105
106	fn step_name(&self) -> &str {
107		&self.name
108	}
109}
110
#[cfg(test)]
mod tests {
	use super::*;
	use serde_json::json;

	/// Builds a step named "test" from `source`, panicking if construction fails.
	fn step_from(source: &str) -> JsStep {
		JsStep::new("test", source).unwrap()
	}

	/// Returning the input unchanged keeps the record and its contents.
	#[test]
	fn passthrough() {
		let step = step_from("function transform(row) { return row; }");
		let mut row = json!({"name": "alice", "age": 30});

		let kept = step.apply(&mut row).unwrap();

		assert!(kept);
		assert_eq!(row, json!({"name": "alice", "age": 30}));
	}

	/// A field computed from existing fields is added alongside them.
	#[test]
	fn add_computed_field() {
		let step =
			step_from("function transform(row) { return { ...row, total: row.price * row.qty } }");
		let mut row = json!({"price": 10.5, "qty": 3});

		let kept = step.apply(&mut row).unwrap();

		assert!(kept);
		assert_eq!(row["total"], 31.5);
		assert_eq!(row["price"], 10.5);
	}

	/// `null` drops the record; a returned object keeps it.
	#[test]
	fn filter_by_returning_null() {
		let step = step_from("function transform(row) { return row.active ? row : null; }");

		let mut active_row = json!({"active": true, "name": "alice"});
		let kept = step.apply(&mut active_row).unwrap();
		assert!(kept);

		let mut inactive_row = json!({"active": false, "name": "bob"});
		let kept = step.apply(&mut inactive_row).unwrap();
		assert!(!kept);
	}

	/// Nested metadata can be flattened into a new top-level shape.
	#[test]
	fn reshape_jsonb() {
		let source = r#"function transform(row) {
				var meta = typeof row.metadata === 'string' ? JSON.parse(row.metadata) : row.metadata;
				return {
					id: row.id,
					email: meta.contact.email,
					tier: meta.subscription.tier,
					active: meta.subscription.active
				};
			}"#;
		let step = step_from(source);

		let mut row = json!({
			"id": "u1",
			"metadata": {
				"contact": {"email": "a@b.com", "phone": "555"},
				"subscription": {"tier": "pro", "active": true}
			}
		});
		let kept = step.apply(&mut row).unwrap();
		assert!(kept);

		let expected = json!({
			"id": "u1",
			"email": "a@b.com",
			"tier": "pro",
			"active": true
		});
		assert_eq!(row, expected);
	}

	/// Script-level globals survive between `apply` calls on the same step.
	#[test]
	fn multiple_calls_reuse_context() {
		let step = step_from(
			"var count = 0; function transform(row) { count++; row.seq = count; return row; }",
		);

		let mut first = json!({"x": 1});
		step.apply(&mut first).unwrap();
		assert_eq!(first["seq"], 1);

		let mut second = json!({"x": 2});
		step.apply(&mut second).unwrap();
		assert_eq!(second["seq"], 2);
	}

	/// Source that never defines `transform` is rejected at construction.
	#[test]
	fn missing_transform_function_errors() {
		let outcome = JsStep::new("test", "function foo(row) { return row; }");
		let message = outcome.unwrap_err().to_string();
		assert!(message.contains("must define a 'transform' function"));
	}

	/// Unparseable source is reported as a compile error.
	#[test]
	fn syntax_error_in_source() {
		let outcome = JsStep::new("test", "function transform(row { return row; }");
		let message = outcome.unwrap_err().to_string();
		assert!(message.contains("js compile"));
	}

	/// An exception thrown by `transform` becomes an `apply` error.
	#[test]
	fn runtime_error_propagates() {
		let step = step_from("function transform(row) { throw new Error('boom'); }");
		let mut row = json!({});

		let err = step.apply(&mut row).unwrap_err();

		assert!(err.to_string().contains("js transform"), "got: {err}");
	}

	/// A function with no `return` yields `undefined`, which filters the record.
	#[test]
	fn return_undefined_filters() {
		let step = step_from("function transform(row) { }");
		let mut row = json!({"x": 1});

		let kept = step.apply(&mut row).unwrap();

		assert!(!kept);
	}

	/// Arrays round-trip through JS and can be mapped over.
	#[test]
	fn array_and_nested_objects() {
		let source = r#"function transform(row) {
				return { tags: row.tags.map(function(t) { return t.toUpperCase(); }), count: row.tags.length };
			}"#;
		let step = step_from(source);
		let mut row = json!({"tags": ["foo", "bar"]});

		step.apply(&mut row).unwrap();

		assert_eq!(row["tags"], json!(["FOO", "BAR"]));
		assert_eq!(row["count"], 2);
	}
}