Skip to main content

faucet_core/
write_mode.rs

1//! Unified write-mode types + planner shared by every upsert-capable sink.
2
3use crate::error::FaucetError;
4use serde::{Deserialize, Serialize};
5use serde_json::{Map, Value};
6use std::collections::HashMap;
7
8/// Write semantics for a sink. Serialized snake_case. Default `Append`.
9#[derive(
10    Debug, Clone, Copy, Default, Serialize, Deserialize, schemars::JsonSchema, PartialEq, Eq,
11)]
12#[serde(rename_all = "snake_case")]
13pub enum WriteMode {
14    /// Insert every record (today's behaviour).
15    #[default]
16    Append,
17    /// Insert-or-update by `key`; optionally route delete-marked rows to deletes.
18    Upsert,
19    /// Delete by `key` for every record.
20    Delete,
21}
22
23impl WriteMode {
24    /// Lowercase wire name, for error messages.
25    pub fn as_str(&self) -> &'static str {
26        match self {
27            WriteMode::Append => "append",
28            WriteMode::Upsert => "upsert",
29            WriteMode::Delete => "delete",
30        }
31    }
32}
33
34/// Identifies a record as a delete (vs. an upsert) by a marker field's value.
35/// e.g. `{ field: "__op", values: ["d", "delete"] }`.
36#[derive(Debug, Clone, Default, Serialize, Deserialize, schemars::JsonSchema, PartialEq, Eq)]
37pub struct DeleteMarker {
38    /// Field name whose value flags a delete.
39    pub field: String,
40    /// Values of `field` that mean "this row is a delete".
41    pub values: Vec<String>,
42}
43
44/// Shared write-mode config, embedded in each upsert-capable sink config via
45/// `#[serde(flatten)]` so `write_mode` / `key` / `delete_marker` appear at the
46/// sink-config top level.
47#[derive(Debug, Clone, Default, Serialize, Deserialize, schemars::JsonSchema)]
48pub struct WriteSpec {
49    /// Append (default), upsert, or delete.
50    #[serde(default)]
51    pub write_mode: WriteMode,
52    /// Key columns. Required and non-empty for upsert/delete; ignored for append.
53    #[serde(default)]
54    pub key: Vec<String>,
55    /// Optional. Upsert only: rows whose `field` matches one of `values` are
56    /// deletes; all others are upserts. The marker field is stripped from
57    /// upsert rows before writing.
58    #[serde(default, skip_serializing_if = "Option::is_none")]
59    pub delete_marker: Option<DeleteMarker>,
60}
61
62impl WriteSpec {
63    /// Validate internal consistency at config-load time.
64    pub fn validate(&self) -> Result<(), FaucetError> {
65        if matches!(self.write_mode, WriteMode::Upsert | WriteMode::Delete) && self.key.is_empty() {
66            return Err(FaucetError::Config(format!(
67                "write_mode: {} requires a non-empty `key`",
68                self.write_mode.as_str()
69            )));
70        }
71        Ok(())
72    }
73}
74
75/// Ordered key column → value pairs, in `key` declaration order.
76#[derive(Debug, Clone, PartialEq)]
77pub struct KeyTuple(pub Vec<(String, Value)>);
78
79/// The partition of a page by write mode. Infallible to build — per-row
80/// failures (missing/null key) land in `failed` with their original page index
81/// so the caller can route them to a DLQ or abort.
82#[derive(Debug, Default)]
83pub struct WritePlan {
84    /// Rows to insert-or-update, deduped (last-write-wins), marker stripped.
85    pub upserts: Vec<Value>,
86    /// Key tuples to delete, deduped.
87    pub deletes: Vec<KeyTuple>,
88    /// `(page_index, message)` for rows whose key could not be extracted.
89    pub failed: Vec<(usize, String)>,
90}
91
92#[derive(Clone)]
93enum Action {
94    Upsert(Value),
95    Delete(KeyTuple),
96}
97
98/// Partition `page` into upserts + deletes per `spec`. The single place all six
99/// sinks share. `WriteMode::Append` should never reach here (callers route
100/// append separately); if it does, every row is treated as an upsert.
101pub fn plan_writes(page: &[Value], spec: &WriteSpec) -> WritePlan {
102    debug_assert!(
103        spec.write_mode != WriteMode::Append,
104        "plan_writes called with WriteMode::Append — callers must route append separately"
105    );
106    let mut plan = WritePlan::default();
107    let mut index: HashMap<String, usize> = HashMap::new();
108    let mut order: Vec<Action> = Vec::new();
109
110    for (i, rec) in page.iter().enumerate() {
111        let key_tuple = match extract_key(rec, &spec.key) {
112            Ok(k) => k,
113            Err(msg) => {
114                plan.failed.push((i, msg));
115                continue;
116            }
117        };
118        let canon = canonical(&key_tuple);
119
120        let is_delete = match spec.write_mode {
121            WriteMode::Delete => true,
122            WriteMode::Upsert => is_delete_marked(rec, spec.delete_marker.as_ref()),
123            WriteMode::Append => false,
124        };
125
126        let action = if is_delete {
127            Action::Delete(key_tuple)
128        } else {
129            Action::Upsert(strip_marker(rec.clone(), spec.delete_marker.as_ref()))
130        };
131
132        match index.get(&canon) {
133            Some(&slot) => order[slot] = action,
134            None => {
135                index.insert(canon, order.len());
136                order.push(action);
137            }
138        }
139    }
140
141    for action in order {
142        match action {
143            Action::Upsert(v) => plan.upserts.push(v),
144            Action::Delete(k) => plan.deletes.push(k),
145        }
146    }
147    plan
148}
149
150/// Pull the key columns out of a record in `key` order. Missing key or null
151/// key value is an error.
152fn extract_key(rec: &Value, key: &[String]) -> Result<KeyTuple, String> {
153    let obj = rec
154        .as_object()
155        .ok_or_else(|| "record is not a JSON object".to_string())?;
156    let mut out = Vec::with_capacity(key.len());
157    for col in key {
158        match obj.get(col) {
159            None => return Err(format!("missing key column '{col}'")),
160            Some(Value::Null) => return Err(format!("null value for key column '{col}'")),
161            Some(v) => out.push((col.clone(), v.clone())),
162        }
163    }
164    Ok(KeyTuple(out))
165}
166
167fn is_delete_marked(rec: &Value, marker: Option<&DeleteMarker>) -> bool {
168    let Some(dm) = marker else { return false };
169    let Some(v) = rec.get(&dm.field) else {
170        return false;
171    };
172    let Some(s) = v.as_str() else { return false };
173    dm.values.iter().any(|m| m == s)
174}
175
176fn strip_marker(mut rec: Value, marker: Option<&DeleteMarker>) -> Value {
177    if let (Some(dm), Value::Object(map)) = (marker, &mut rec) {
178        map.remove(&dm.field);
179    }
180    rec
181}
182
183/// Stable canonical string for a key tuple, for dedup.
184fn canonical(k: &KeyTuple) -> String {
185    let arr: Vec<&Value> = k.0.iter().map(|(_, v)| v).collect();
186    serde_json::to_string(&arr).expect("a Vec<&serde_json::Value> always serializes")
187}
188
189/// Join a key tuple's values into a single document id (Elasticsearch `_id`,
190/// composite keys). Each value rendered as its plain string / JSON form.
191///
192/// Assumes each key column has a consistent JSON type across records (the
193/// normal case for SQL and CDC sources); it does not disambiguate, e.g., the
194/// integer `7` from the string `"7"` in the same column.
195pub fn key_to_doc_id(k: &KeyTuple, separator: &str) -> String {
196    k.0.iter()
197        .map(|(_, v)| match v {
198            Value::String(s) => s.clone(),
199            other => other.to_string(),
200        })
201        .collect::<Vec<_>>()
202        .join(separator)
203}
204
205/// Build a Mongo/ES filter document `{ col: value, … }` from a key tuple.
206pub fn key_to_filter(k: &KeyTuple) -> Map<String, Value> {
207    k.0.iter().map(|(c, v)| (c.clone(), v.clone())).collect()
208}
209
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Upsert spec keyed on `keys`, with no delete marker.
    fn upsert_spec(keys: &[&str]) -> WriteSpec {
        WriteSpec {
            write_mode: WriteMode::Upsert,
            key: keys.iter().map(|s| s.to_string()).collect(),
            delete_marker: None,
        }
    }

    /// Upsert spec keyed on `id` where `__op == "d"` marks a delete.
    fn upsert_spec_with_op_marker() -> WriteSpec {
        WriteSpec {
            delete_marker: Some(DeleteMarker {
                field: "__op".into(),
                values: vec!["d".into()],
            }),
            ..upsert_spec(&["id"])
        }
    }

    /// Delete-mode spec keyed on `keys`.
    fn delete_spec(keys: &[&str]) -> WriteSpec {
        WriteSpec {
            write_mode: WriteMode::Delete,
            ..upsert_spec(keys)
        }
    }

    #[test]
    fn upsert_extracts_key_and_keeps_row() {
        let plan = plan_writes(&[json!({"id": 1, "name": "a"})], &upsert_spec(&["id"]));
        assert_eq!(plan.upserts, vec![json!({"id": 1, "name": "a"})]);
        assert!(plan.deletes.is_empty());
        assert!(plan.failed.is_empty());
    }

    #[test]
    fn missing_key_goes_to_failed_with_original_index() {
        let plan = plan_writes(
            &[json!({"id": 1}), json!({"name": "no-key"})],
            &upsert_spec(&["id"]),
        );
        assert_eq!(plan.upserts.len(), 1);
        assert_eq!(plan.failed.len(), 1);
        assert_eq!(plan.failed[0].0, 1, "failed row keeps its page index");
    }

    #[test]
    fn null_key_value_is_a_failure() {
        let plan = plan_writes(&[json!({"id": null})], &upsert_spec(&["id"]));
        assert!(plan.upserts.is_empty());
        assert_eq!(plan.failed.len(), 1);
    }

    #[test]
    fn non_object_record_is_a_failure() {
        let plan = plan_writes(&[json!([1, 2])], &upsert_spec(&["id"]));
        assert!(plan.upserts.is_empty());
        assert_eq!(
            plan.failed,
            vec![(0_usize, "record is not a JSON object".to_string())]
        );
    }

    #[test]
    fn delete_marker_routes_to_deletes_and_strips_marker() {
        let plan = plan_writes(
            &[
                json!({"id": 1, "name": "a", "__op": "u"}),
                json!({"id": 2, "__op": "d"}),
            ],
            &upsert_spec_with_op_marker(),
        );
        assert_eq!(plan.upserts, vec![json!({"id": 1, "name": "a"})]);
        assert_eq!(plan.deletes.len(), 1);
        assert_eq!(plan.deletes[0].0, vec![("id".to_string(), json!(2))]);
    }

    #[test]
    fn non_string_marker_value_is_an_upsert() {
        // Only string marker values are compared; the field is still stripped.
        let plan = plan_writes(
            &[json!({"id": 1, "__op": 1})],
            &upsert_spec_with_op_marker(),
        );
        assert!(plan.deletes.is_empty());
        assert_eq!(plan.upserts, vec![json!({"id": 1})]);
    }

    #[test]
    fn last_write_wins_dedup_keeps_final_upsert() {
        let plan = plan_writes(
            &[json!({"id": 1, "v": "old"}), json!({"id": 1, "v": "new"})],
            &upsert_spec(&["id"]),
        );
        assert_eq!(plan.upserts, vec![json!({"id": 1, "v": "new"})]);
    }

    #[test]
    fn last_write_wins_delete_after_upsert_is_a_delete() {
        let plan = plan_writes(
            &[json!({"id": 1, "__op": "u"}), json!({"id": 1, "__op": "d"})],
            &upsert_spec_with_op_marker(),
        );
        assert!(plan.upserts.is_empty());
        assert_eq!(plan.deletes.len(), 1);
    }

    #[test]
    fn last_write_wins_upsert_after_delete_is_an_upsert() {
        // Inverse of the delete-after-upsert case: [delete, upsert] → upsert wins.
        let plan = plan_writes(
            &[
                json!({"id": 1, "__op": "d"}),
                json!({"id": 1, "v": 9, "__op": "u"}),
            ],
            &upsert_spec_with_op_marker(),
        );
        assert!(plan.deletes.is_empty());
        assert_eq!(plan.upserts, vec![json!({"id": 1, "v": 9})]);
    }

    #[test]
    fn dedup_keeps_first_appearance_order() {
        // A repeated key replaces its row but keeps its original position.
        let plan = plan_writes(
            &[
                json!({"id": 1, "v": "a"}),
                json!({"id": 2, "v": "b"}),
                json!({"id": 1, "v": "c"}),
            ],
            &upsert_spec(&["id"]),
        );
        assert_eq!(
            plan.upserts,
            vec![json!({"id": 1, "v": "c"}), json!({"id": 2, "v": "b"})]
        );
    }

    #[test]
    fn delete_mode_routes_every_row_to_deletes() {
        let plan = plan_writes(&[json!({"id": 1}), json!({"id": 2})], &delete_spec(&["id"]));
        assert!(plan.upserts.is_empty());
        assert_eq!(plan.deletes.len(), 2);
    }

    #[test]
    fn delete_mode_dedups_repeated_key() {
        // Same key deleted twice in one page collapses to a single delete.
        let plan = plan_writes(&[json!({"id": 1}), json!({"id": 1})], &delete_spec(&["id"]));
        assert_eq!(plan.deletes.len(), 1);
    }

    #[test]
    fn composite_key_tuple_is_ordered() {
        // Tuple order follows `key` declaration order, not record field order.
        let plan = plan_writes(&[json!({"a": 1, "b": 2})], &delete_spec(&["b", "a"]));
        assert_eq!(
            plan.deletes,
            vec![KeyTuple(vec![
                ("b".to_string(), json!(2)),
                ("a".to_string(), json!(1)),
            ])]
        );
        let plan2 = plan_writes(
            &[
                json!({"a": 1, "b": 2, "v": "x"}),
                json!({"a": 1, "b": 3, "v": "y"}),
            ],
            &upsert_spec(&["a", "b"]),
        );
        assert_eq!(plan2.upserts.len(), 2, "(1,2) and (1,3) are distinct keys");
    }

    #[test]
    fn empty_page_produces_empty_plan() {
        let plan = plan_writes(&[], &upsert_spec(&["id"]));
        assert!(plan.upserts.is_empty());
        assert!(plan.deletes.is_empty());
        assert!(plan.failed.is_empty());
    }

    #[test]
    fn validate_rejects_upsert_without_key() {
        assert!(upsert_spec(&[]).validate().is_err());
    }

    #[test]
    fn validate_rejects_delete_without_key() {
        assert!(delete_spec(&[]).validate().is_err());
    }

    #[test]
    fn validate_accepts_upsert_with_key() {
        assert!(upsert_spec(&["id"]).validate().is_ok());
    }

    #[test]
    fn validate_allows_append_without_key() {
        assert!(WriteSpec::default().validate().is_ok());
    }

    #[test]
    fn key_to_doc_id_renders_strings_bare_and_others_as_json() {
        let k = KeyTuple(vec![
            ("a".to_string(), json!("x")),
            ("b".to_string(), json!(7)),
            ("c".to_string(), json!(true)),
        ]);
        assert_eq!(key_to_doc_id(&k, "|"), "x|7|true");
    }

    #[test]
    fn key_to_filter_maps_columns_to_values() {
        let k = KeyTuple(vec![
            ("a".to_string(), json!("x")),
            ("b".to_string(), json!(7)),
        ]);
        let filter = key_to_filter(&k);
        assert_eq!(filter.len(), 2);
        assert_eq!(filter["a"], json!("x"));
        assert_eq!(filter["b"], json!(7));
    }
}