1use crate::error::FaucetError;
4use serde::{Deserialize, Serialize};
5use serde_json::{Map, Value};
6use std::collections::HashMap;
7
// Selects how the records of a page are written.
//
// Serialized in snake_case (`"append"`, `"upsert"`, `"delete"`); `Append` is the default.
//
// NOTE: plain `//` comments are used here on purpose. schemars copies `///` doc comments
// into the derived JSON schema as descriptions, so doc comments would change that output.
#[derive(
    Debug, Clone, Copy, Default, Serialize, Deserialize, schemars::JsonSchema, PartialEq, Eq,
)]
#[serde(rename_all = "snake_case")]
pub enum WriteMode {
    // Default mode. `WriteSpec::validate` does not require a key for it, and `plan_writes`
    // debug-asserts that it is never called with it.
    #[default]
    Append,
    // Rows are written by key; `WriteSpec::validate` requires a non-empty key. A row that
    // matches the spec's delete marker is planned as a delete instead of an upsert.
    Upsert,
    // Every row is planned as a delete of its key; `WriteSpec::validate` requires a
    // non-empty key.
    Delete,
}
22
23impl WriteMode {
24 pub fn as_str(&self) -> &'static str {
26 match self {
27 WriteMode::Append => "append",
28 WriteMode::Upsert => "upsert",
29 WriteMode::Delete => "delete",
30 }
31 }
32}
33
// Describes how a record signals "delete me" while planning in `WriteMode::Upsert`.
//
// NOTE: plain `//` comments are used here on purpose. schemars copies `///` doc comments
// into the derived JSON schema as descriptions, so doc comments would change that output.
#[derive(Debug, Clone, Default, Serialize, Deserialize, schemars::JsonSchema, PartialEq, Eq)]
pub struct DeleteMarker {
    // Name of the record field that carries the marker. `plan_writes` removes this field
    // from every record it plans as an upsert.
    pub field: String,
    // Marker values that mean "delete". The comparison is an exact string match; a field
    // value that is not a JSON string never matches.
    pub values: Vec<String>,
}
43
// Write configuration: the mode, the key columns, and an optional delete marker.
//
// Every field has a serde default, so an empty object deserializes to append mode with no
// key and no marker.
//
// NOTE: plain `//` comments are used here on purpose. schemars copies `///` doc comments
// into the derived JSON schema as descriptions, so doc comments would change that output.
#[derive(Debug, Clone, Default, Serialize, Deserialize, schemars::JsonSchema)]
pub struct WriteSpec {
    // How records are written; defaults to `WriteMode::Append`.
    #[serde(default)]
    pub write_mode: WriteMode,
    // Names of the record fields that form the key, in order. Must be non-empty for
    // upsert and delete modes (checked by `validate`).
    #[serde(default)]
    pub key: Vec<String>,
    // Optional marker that turns individual rows into deletes in upsert mode. Left out of
    // the serialized form when it is `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub delete_marker: Option<DeleteMarker>,
}
61
62impl WriteSpec {
63 pub fn validate(&self) -> Result<(), FaucetError> {
65 if matches!(self.write_mode, WriteMode::Upsert | WriteMode::Delete) && self.key.is_empty() {
66 return Err(FaucetError::Config(format!(
67 "write_mode: {} requires a non-empty `key`",
68 self.write_mode.as_str()
69 )));
70 }
71 Ok(())
72 }
73}
74
/// The key of one record: `(column name, value)` pairs.
///
/// When built by `extract_key` the pairs follow the order of the configured key columns
/// and never contain a JSON `null` value.
#[derive(Debug, Clone, PartialEq)]
pub struct KeyTuple(pub Vec<(String, Value)>);
78
/// Result of planning one page of records with [`plan_writes`].
#[derive(Debug, Default)]
pub struct WritePlan {
    /// Records to upsert, one per distinct key.
    pub upserts: Vec<Value>,
    /// Keys of the rows to delete, one per distinct key.
    pub deletes: Vec<KeyTuple>,
    /// Rows that could not be planned: `(index of the row in the page, reason)`.
    pub failed: Vec<(usize, String)>,
}
91
92#[derive(Clone)]
93enum Action {
94 Upsert(Value),
95 Delete(KeyTuple),
96}
97
98pub fn plan_writes(page: &[Value], spec: &WriteSpec) -> WritePlan {
102 debug_assert!(
103 spec.write_mode != WriteMode::Append,
104 "plan_writes called with WriteMode::Append — callers must route append separately"
105 );
106 let mut plan = WritePlan::default();
107 let mut index: HashMap<String, usize> = HashMap::new();
108 let mut order: Vec<Action> = Vec::new();
109
110 for (i, rec) in page.iter().enumerate() {
111 let key_tuple = match extract_key(rec, &spec.key) {
112 Ok(k) => k,
113 Err(msg) => {
114 plan.failed.push((i, msg));
115 continue;
116 }
117 };
118 let canon = canonical(&key_tuple);
119
120 let is_delete = match spec.write_mode {
121 WriteMode::Delete => true,
122 WriteMode::Upsert => is_delete_marked(rec, spec.delete_marker.as_ref()),
123 WriteMode::Append => false,
124 };
125
126 let action = if is_delete {
127 Action::Delete(key_tuple)
128 } else {
129 Action::Upsert(strip_marker(rec.clone(), spec.delete_marker.as_ref()))
130 };
131
132 match index.get(&canon) {
133 Some(&slot) => order[slot] = action,
134 None => {
135 index.insert(canon, order.len());
136 order.push(action);
137 }
138 }
139 }
140
141 for action in order {
142 match action {
143 Action::Upsert(v) => plan.upserts.push(v),
144 Action::Delete(k) => plan.deletes.push(k),
145 }
146 }
147 plan
148}
149
150fn extract_key(rec: &Value, key: &[String]) -> Result<KeyTuple, String> {
153 let obj = rec
154 .as_object()
155 .ok_or_else(|| "record is not a JSON object".to_string())?;
156 let mut out = Vec::with_capacity(key.len());
157 for col in key {
158 match obj.get(col) {
159 None => return Err(format!("missing key column '{col}'")),
160 Some(Value::Null) => return Err(format!("null value for key column '{col}'")),
161 Some(v) => out.push((col.clone(), v.clone())),
162 }
163 }
164 Ok(KeyTuple(out))
165}
166
167fn is_delete_marked(rec: &Value, marker: Option<&DeleteMarker>) -> bool {
168 let Some(dm) = marker else { return false };
169 let Some(v) = rec.get(&dm.field) else {
170 return false;
171 };
172 let Some(s) = v.as_str() else { return false };
173 dm.values.iter().any(|m| m == s)
174}
175
176fn strip_marker(mut rec: Value, marker: Option<&DeleteMarker>) -> Value {
177 if let (Some(dm), Value::Object(map)) = (marker, &mut rec) {
178 map.remove(&dm.field);
179 }
180 rec
181}
182
183fn canonical(k: &KeyTuple) -> String {
185 let arr: Vec<&Value> = k.0.iter().map(|(_, v)| v).collect();
186 serde_json::to_string(&arr).expect("a Vec<&serde_json::Value> always serializes")
187}
188
189pub fn key_to_doc_id(k: &KeyTuple, separator: &str) -> String {
196 k.0.iter()
197 .map(|(_, v)| match v {
198 Value::String(s) => s.clone(),
199 other => other.to_string(),
200 })
201 .collect::<Vec<_>>()
202 .join(separator)
203}
204
205pub fn key_to_filter(k: &KeyTuple) -> Map<String, Value> {
207 k.0.iter().map(|(c, v)| (c.clone(), v.clone())).collect()
208}
209
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a spec for `mode`, keyed on `keys`, with an optional delete marker.
    fn build_spec(mode: WriteMode, keys: &[&str], marker: Option<DeleteMarker>) -> WriteSpec {
        WriteSpec {
            write_mode: mode,
            key: keys.iter().map(|k| (*k).to_owned()).collect(),
            delete_marker: marker,
        }
    }

    /// Upsert spec without a delete marker.
    fn upsert_spec(keys: &[&str]) -> WriteSpec {
        build_spec(WriteMode::Upsert, keys, None)
    }

    /// Upsert spec keyed on `id` where `__op == "d"` marks a delete.
    fn marked_upsert_spec() -> WriteSpec {
        let marker = DeleteMarker {
            field: "__op".into(),
            values: vec!["d".into()],
        };
        build_spec(WriteMode::Upsert, &["id"], Some(marker))
    }

    /// Delete-mode spec keyed on `id`.
    fn delete_spec() -> WriteSpec {
        build_spec(WriteMode::Delete, &["id"], None)
    }

    #[test]
    fn upsert_extracts_key_and_keeps_row() {
        let page = [json!({"id": 1, "name": "a"})];
        let plan = plan_writes(&page, &upsert_spec(&["id"]));
        assert_eq!(plan.upserts, vec![json!({"id": 1, "name": "a"})]);
        assert!(plan.deletes.is_empty());
        assert!(plan.failed.is_empty());
    }

    #[test]
    fn missing_key_goes_to_failed_with_original_index() {
        let page = [json!({"id": 1}), json!({"name": "no-key"})];
        let plan = plan_writes(&page, &upsert_spec(&["id"]));
        assert_eq!(plan.upserts.len(), 1);
        assert_eq!(plan.failed.len(), 1);
        assert_eq!(plan.failed[0].0, 1, "failed row keeps its page index");
    }

    #[test]
    fn null_key_value_is_a_failure() {
        let page = [json!({"id": null})];
        let plan = plan_writes(&page, &upsert_spec(&["id"]));
        assert!(plan.upserts.is_empty());
        assert_eq!(plan.failed.len(), 1);
    }

    #[test]
    fn delete_marker_routes_to_deletes_and_strips_marker() {
        let page = [
            json!({"id": 1, "name": "a", "__op": "u"}),
            json!({"id": 2, "__op": "d"}),
        ];
        let plan = plan_writes(&page, &marked_upsert_spec());
        assert_eq!(plan.upserts, vec![json!({"id": 1, "name": "a"})]);
        assert_eq!(plan.deletes.len(), 1);
        assert_eq!(plan.deletes[0].0, vec![("id".to_string(), json!(2))]);
    }

    #[test]
    fn last_write_wins_dedup_keeps_final_upsert() {
        let page = [json!({"id": 1, "v": "old"}), json!({"id": 1, "v": "new"})];
        let plan = plan_writes(&page, &upsert_spec(&["id"]));
        assert_eq!(plan.upserts, vec![json!({"id": 1, "v": "new"})]);
    }

    #[test]
    fn last_write_wins_delete_after_upsert_is_a_delete() {
        let page = [json!({"id": 1, "__op": "u"}), json!({"id": 1, "__op": "d"})];
        let plan = plan_writes(&page, &marked_upsert_spec());
        assert!(plan.upserts.is_empty());
        assert_eq!(plan.deletes.len(), 1);
    }

    #[test]
    fn delete_mode_routes_every_row_to_deletes() {
        let page = [json!({"id": 1}), json!({"id": 2})];
        let plan = plan_writes(&page, &delete_spec());
        assert!(plan.upserts.is_empty());
        assert_eq!(plan.deletes.len(), 2);
    }

    #[test]
    fn composite_key_tuple_is_ordered() {
        let single = [json!({"a": 1, "b": 2, "v": 9})];
        let plan = plan_writes(&single, &upsert_spec(&["a", "b"]));
        assert_eq!(plan.upserts.len(), 1);

        let pair = [
            json!({"a": 1, "b": 2, "v": "x"}),
            json!({"a": 1, "b": 3, "v": "y"}),
        ];
        let plan2 = plan_writes(&pair, &upsert_spec(&["a", "b"]));
        assert_eq!(plan2.upserts.len(), 2, "(1,2) and (1,3) are distinct keys");
    }

    #[test]
    fn validate_rejects_upsert_without_key() {
        let spec = upsert_spec(&[]);
        assert!(spec.validate().is_err());
    }

    #[test]
    fn validate_allows_append_without_key() {
        assert!(WriteSpec::default().validate().is_ok());
    }

    #[test]
    fn last_write_wins_upsert_after_delete_is_an_upsert() {
        let page = [
            json!({"id": 1, "__op": "d"}),
            json!({"id": 1, "v": 9, "__op": "u"}),
        ];
        let plan = plan_writes(&page, &marked_upsert_spec());
        assert!(plan.deletes.is_empty());
        assert_eq!(plan.upserts, vec![json!({"id": 1, "v": 9})]);
    }

    #[test]
    fn empty_page_produces_empty_plan() {
        let plan = plan_writes(&[], &upsert_spec(&["id"]));
        assert!(plan.upserts.is_empty());
        assert!(plan.deletes.is_empty());
        assert!(plan.failed.is_empty());
    }

    #[test]
    fn delete_mode_dedups_repeated_key() {
        let page = [json!({"id": 1}), json!({"id": 1})];
        let plan = plan_writes(&page, &delete_spec());
        assert_eq!(plan.deletes.len(), 1);
    }
}