use crate::error::FaucetError;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::collections::HashMap;
// Strategy for applying a page of records to the destination.
//
// Serialized in snake_case ("append", "upsert", "delete"). A `WriteSpec` that
// omits `write_mode` gets `Append`. These are plain `//` comments on purpose:
// schemars copies `///` doc comments into the generated schema descriptions.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[derive(Serialize, Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum WriteMode {
    // Not planned by `plan_writes`; callers route append separately.
    #[default]
    Append,
    // Insert-or-replace by `WriteSpec::key`; rows matching the delete marker become deletes.
    Upsert,
    // Every row becomes a delete of its key.
    Delete,
}
impl WriteMode {
pub fn as_str(&self) -> &'static str {
match self {
WriteMode::Append => "append",
WriteMode::Upsert => "upsert",
WriteMode::Delete => "delete",
}
}
}
// Per-row delete marker used in upsert mode. A row whose `field` matches one of
// `values` is planned as a delete of its key instead of an upsert, and `field` is
// stripped from the rows that are upserted. Plain `//` comments on purpose:
// schemars copies `///` doc comments into the generated schema descriptions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[derive(Serialize, Deserialize, schemars::JsonSchema)]
pub struct DeleteMarker {
    // Name of the record field inspected for the marker.
    pub field: String,
    // Marker values that flag a row as deleted.
    pub values: Vec<String>,
}
// Write configuration: the write mode, the ordered key columns that identify a
// row, and an optional delete marker. Every field may be omitted when
// deserializing (`write_mode` -> append, `key` -> empty, `delete_marker` -> none),
// and `delete_marker` is left out of serialized output when unset. Plain `//`
// comments on purpose: schemars copies `///` doc comments into schema descriptions.
#[derive(Debug, Clone, Default)]
#[derive(Serialize, Deserialize, schemars::JsonSchema)]
pub struct WriteSpec {
    #[serde(default)]
    pub write_mode: WriteMode,
    // Ordered key columns; must be non-empty for upsert and delete modes.
    #[serde(default)]
    pub key: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub delete_marker: Option<DeleteMarker>,
}
impl WriteSpec {
    /// Checks that the spec is internally consistent before any page is planned.
    ///
    /// # Errors
    ///
    /// Returns [`FaucetError::Config`] when:
    /// - `write_mode` is `upsert` or `delete` and `key` is empty. Without key
    ///   columns every record shares the same (empty) key, so deduplication
    ///   would collapse a page onto its last record.
    /// - `write_mode` is `upsert` and `delete_marker.values` is empty. The
    ///   marker could never match, silently disabling deletes.
    /// - `write_mode` is `upsert` and `delete_marker.field` is also a key column.
    ///   The marker field is stripped from upserted rows, which would drop that
    ///   key column from the written record.
    pub fn validate(&self) -> Result<(), FaucetError> {
        let mode = self.write_mode;
        if matches!(mode, WriteMode::Upsert | WriteMode::Delete) && self.key.is_empty() {
            return Err(FaucetError::Config(format!(
                "write_mode: {} requires a non-empty `key`",
                mode.as_str()
            )));
        }
        // Only upsert mode routes rows by the marker and strips it from upserts;
        // in the other modes a configured marker is inert, so it is not checked.
        if let (WriteMode::Upsert, Some(dm)) = (mode, self.delete_marker.as_ref()) {
            if dm.values.is_empty() {
                return Err(FaucetError::Config(format!(
                    "write_mode: upsert with delete_marker.field '{}' requires at least one \
                     entry in delete_marker.values",
                    dm.field
                )));
            }
            if self.key.iter().any(|col| col == &dm.field) {
                return Err(FaucetError::Config(format!(
                    "write_mode: upsert with delete_marker.field '{}' conflicts with a `key` \
                     column of the same name; the marker field is stripped from upserted rows",
                    dm.field
                )));
            }
        }
        Ok(())
    }
}
/// The key of one record: `(column, value)` pairs in the order the columns are
/// listed in [`WriteSpec::key`]. When produced by [`plan_writes`], no value is
/// JSON `null`.
#[derive(Clone, Debug, PartialEq)]
pub struct KeyTuple(pub Vec<(String, Value)>);
/// Outcome of [`plan_writes`] for one page.
#[derive(Default, Debug)]
pub struct WritePlan {
    /// Records to upsert (delete-marker field removed), at most one per key,
    /// ordered by the key's first appearance in the page.
    pub upserts: Vec<Value>,
    /// Keys to delete, at most one per key, ordered by first appearance in the page.
    pub deletes: Vec<KeyTuple>,
    /// `(index in page, reason)` for records whose key could not be extracted.
    pub failed: Vec<(usize, String)>,
}
/// The currently surviving action for one key while a page is being planned.
#[derive(Clone)]
enum Action {
    /// Write this record (already stripped of the delete-marker field).
    Upsert(Value),
    /// Delete the row identified by this key.
    Delete(KeyTuple),
}
pub fn plan_writes(page: &[Value], spec: &WriteSpec) -> WritePlan {
debug_assert!(
spec.write_mode != WriteMode::Append,
"plan_writes called with WriteMode::Append — callers must route append separately"
);
let mut plan = WritePlan::default();
let mut index: HashMap<String, usize> = HashMap::new();
let mut order: Vec<Action> = Vec::new();
for (i, rec) in page.iter().enumerate() {
let key_tuple = match extract_key(rec, &spec.key) {
Ok(k) => k,
Err(msg) => {
plan.failed.push((i, msg));
continue;
}
};
let canon = canonical(&key_tuple);
let is_delete = match spec.write_mode {
WriteMode::Delete => true,
WriteMode::Upsert => is_delete_marked(rec, spec.delete_marker.as_ref()),
WriteMode::Append => false,
};
let action = if is_delete {
Action::Delete(key_tuple)
} else {
Action::Upsert(strip_marker(rec.clone(), spec.delete_marker.as_ref()))
};
match index.get(&canon) {
Some(&slot) => order[slot] = action,
None => {
index.insert(canon, order.len());
order.push(action);
}
}
}
for action in order {
match action {
Action::Upsert(v) => plan.upserts.push(v),
Action::Delete(k) => plan.deletes.push(k),
}
}
plan
}
fn extract_key(rec: &Value, key: &[String]) -> Result<KeyTuple, String> {
let obj = rec
.as_object()
.ok_or_else(|| "record is not a JSON object".to_string())?;
let mut out = Vec::with_capacity(key.len());
for col in key {
match obj.get(col) {
None => return Err(format!("missing key column '{col}'")),
Some(Value::Null) => return Err(format!("null value for key column '{col}'")),
Some(v) => out.push((col.clone(), v.clone())),
}
}
Ok(KeyTuple(out))
}
fn is_delete_marked(rec: &Value, marker: Option<&DeleteMarker>) -> bool {
let Some(dm) = marker else { return false };
let Some(v) = rec.get(&dm.field) else {
return false;
};
let Some(s) = v.as_str() else { return false };
dm.values.iter().any(|m| m == s)
}
fn strip_marker(mut rec: Value, marker: Option<&DeleteMarker>) -> Value {
if let (Some(dm), Value::Object(map)) = (marker, &mut rec) {
map.remove(&dm.field);
}
rec
}
fn canonical(k: &KeyTuple) -> String {
let arr: Vec<&Value> = k.0.iter().map(|(_, v)| v).collect();
serde_json::to_string(&arr).expect("a Vec<&serde_json::Value> always serializes")
}
/// Renders a key as a document id by joining its values with `separator`.
///
/// String values are used as-is (no quotes); every other value uses its JSON
/// text. An empty key yields an empty string. Distinct keys can map to the same
/// id, e.g.:
/// - the string `"1"` and the number `1`;
/// - values that themselves contain `separator`.
pub fn key_to_doc_id(k: &KeyTuple, separator: &str) -> String {
    let mut id = String::new();
    for (position, (_, value)) in k.0.iter().enumerate() {
        if position > 0 {
            id.push_str(separator);
        }
        match value {
            Value::String(text) => id.push_str(text),
            other => id.push_str(&other.to_string()),
        }
    }
    id
}
pub fn key_to_filter(k: &KeyTuple) -> Map<String, Value> {
k.0.iter().map(|(c, v)| (c.clone(), v.clone())).collect()
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Spec in `mode` keyed on `keys`, without a delete marker.
    fn spec(mode: WriteMode, keys: &[&str]) -> WriteSpec {
        WriteSpec {
            write_mode: mode,
            key: keys.iter().map(|k| k.to_string()).collect(),
            delete_marker: None,
        }
    }

    fn upsert_spec(keys: &[&str]) -> WriteSpec {
        spec(WriteMode::Upsert, keys)
    }

    /// Upsert spec keyed on `id` where `"__op": "d"` marks a delete.
    fn op_marker_spec() -> WriteSpec {
        WriteSpec {
            delete_marker: Some(DeleteMarker {
                field: "__op".into(),
                values: vec!["d".into()],
            }),
            ..upsert_spec(&["id"])
        }
    }

    // ---- basic planning -------------------------------------------------

    #[test]
    fn empty_page_produces_empty_plan() {
        let plan = plan_writes(&[], &upsert_spec(&["id"]));
        assert!(plan.upserts.is_empty());
        assert!(plan.deletes.is_empty());
        assert!(plan.failed.is_empty());
    }

    #[test]
    fn upsert_extracts_key_and_keeps_row() {
        let row = json!({"id": 1, "name": "a"});
        let plan = plan_writes(&[row.clone()], &upsert_spec(&["id"]));
        assert_eq!(plan.upserts, vec![row]);
        assert!(plan.deletes.is_empty());
        assert!(plan.failed.is_empty());
    }

    #[test]
    fn composite_key_tuple_is_ordered() {
        let keys = ["a", "b"];
        let single = plan_writes(&[json!({"a": 1, "b": 2, "v": 9})], &upsert_spec(&keys));
        assert_eq!(single.upserts.len(), 1);
        let pair = plan_writes(
            &[
                json!({"a": 1, "b": 2, "v": "x"}),
                json!({"a": 1, "b": 3, "v": "y"}),
            ],
            &upsert_spec(&keys),
        );
        assert_eq!(pair.upserts.len(), 2, "(1,2) and (1,3) are distinct keys");
    }

    // ---- key extraction failures ----------------------------------------

    #[test]
    fn missing_key_goes_to_failed_with_original_index() {
        let page = [json!({"id": 1}), json!({"name": "no-key"})];
        let plan = plan_writes(&page, &upsert_spec(&["id"]));
        assert_eq!(plan.upserts.len(), 1);
        assert_eq!(plan.failed.len(), 1);
        assert_eq!(plan.failed[0].0, 1, "failed row keeps its page index");
    }

    #[test]
    fn null_key_value_is_a_failure() {
        let plan = plan_writes(&[json!({"id": null})], &upsert_spec(&["id"]));
        assert!(plan.upserts.is_empty());
        assert_eq!(plan.failed.len(), 1);
    }

    // ---- delete marker --------------------------------------------------

    #[test]
    fn delete_marker_routes_to_deletes_and_strips_marker() {
        let page = [
            json!({"id": 1, "name": "a", "__op": "u"}),
            json!({"id": 2, "__op": "d"}),
        ];
        let plan = plan_writes(&page, &op_marker_spec());
        assert_eq!(plan.upserts, vec![json!({"id": 1, "name": "a"})]);
        assert_eq!(plan.deletes.len(), 1);
        assert_eq!(plan.deletes[0].0, vec![("id".to_string(), json!(2))]);
    }

    // ---- last-write-wins dedup ------------------------------------------

    #[test]
    fn last_write_wins_dedup_keeps_final_upsert() {
        let page = [json!({"id": 1, "v": "old"}), json!({"id": 1, "v": "new"})];
        let plan = plan_writes(&page, &upsert_spec(&["id"]));
        assert_eq!(plan.upserts, vec![json!({"id": 1, "v": "new"})]);
    }

    #[test]
    fn last_write_wins_delete_after_upsert_is_a_delete() {
        let page = [json!({"id": 1, "__op": "u"}), json!({"id": 1, "__op": "d"})];
        let plan = plan_writes(&page, &op_marker_spec());
        assert!(plan.upserts.is_empty());
        assert_eq!(plan.deletes.len(), 1);
    }

    #[test]
    fn last_write_wins_upsert_after_delete_is_an_upsert() {
        let page = [
            json!({"id": 1, "__op": "d"}),
            json!({"id": 1, "v": 9, "__op": "u"}),
        ];
        let plan = plan_writes(&page, &op_marker_spec());
        assert!(plan.deletes.is_empty());
        assert_eq!(plan.upserts, vec![json!({"id": 1, "v": 9})]);
    }

    // ---- delete mode ----------------------------------------------------

    #[test]
    fn delete_mode_routes_every_row_to_deletes() {
        let page = [json!({"id": 1}), json!({"id": 2})];
        let plan = plan_writes(&page, &spec(WriteMode::Delete, &["id"]));
        assert!(plan.upserts.is_empty());
        assert_eq!(plan.deletes.len(), 2);
    }

    #[test]
    fn delete_mode_dedups_repeated_key() {
        let page = [json!({"id": 1}), json!({"id": 1})];
        let plan = plan_writes(&page, &spec(WriteMode::Delete, &["id"]));
        assert_eq!(plan.deletes.len(), 1);
    }

    // ---- validation -----------------------------------------------------

    #[test]
    fn validate_rejects_upsert_without_key() {
        assert!(spec(WriteMode::Upsert, &[]).validate().is_err());
    }

    #[test]
    fn validate_allows_append_without_key() {
        assert!(WriteSpec::default().validate().is_ok());
    }
}