use crate::key::KeySet;
use crate::model::batch_write_request::MutationGroup as ProtoMutationGroup;
use crate::model::mutation::Operation;
use crate::to_value::ToValue;
use crate::value::Value;
use rand::seq::IteratorRandom;
use std::slice::Iter;
use std::vec::IntoIter;
#[derive(Clone, Debug, PartialEq)]
pub struct Mutation {
pub(crate) inner: InternalMutation,
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum InternalMutation {
Insert(Write),
Update(Write),
InsertOrUpdate(Write),
Replace(Write),
Delete(Delete),
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct Write {
pub(crate) table: String,
pub(crate) columns: Vec<String>,
pub(crate) values: Vec<Value>,
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct Delete {
pub(crate) table: String,
pub(crate) key_set: KeySet,
}
impl Mutation {
pub fn new_insert_builder(table: impl Into<String>) -> WriteBuilder {
WriteBuilder::new(table, MutationType::Insert)
}
pub fn new_update_builder(table: impl Into<String>) -> WriteBuilder {
WriteBuilder::new(table, MutationType::Update)
}
pub fn new_insert_or_update_builder(table: impl Into<String>) -> WriteBuilder {
WriteBuilder::new(table, MutationType::InsertOrUpdate)
}
pub fn new_replace_builder(table: impl Into<String>) -> WriteBuilder {
WriteBuilder::new(table, MutationType::Replace)
}
pub fn delete(table: impl Into<String>, key_set: KeySet) -> Mutation {
Mutation {
inner: InternalMutation::Delete(Delete {
table: table.into(),
key_set,
}),
}
}
pub(crate) fn build_proto(self) -> crate::model::Mutation {
match self.inner {
InternalMutation::Insert(write) => {
crate::model::Mutation::new().set_insert(write.into_proto())
}
InternalMutation::Update(write) => {
crate::model::Mutation::new().set_update(write.into_proto())
}
InternalMutation::InsertOrUpdate(write) => {
crate::model::Mutation::new().set_insert_or_update(write.into_proto())
}
InternalMutation::Replace(write) => {
crate::model::Mutation::new().set_replace(write.into_proto())
}
InternalMutation::Delete(delete) => {
crate::model::Mutation::new().set_delete(delete.into_proto())
}
}
}
pub(crate) fn select_mutation_key(
mutations: &[crate::model::Mutation],
) -> Option<crate::model::Mutation> {
if mutations.is_empty() {
return None;
}
let selected_non_insert = mutations
.iter()
.filter(|m| {
m.operation.as_ref().is_some_and(|op| {
!matches!(
op,
Operation::Insert(_) | Operation::Send(_) | Operation::Ack(_)
)
})
})
.choose(&mut rand::rng())
.cloned();
if selected_non_insert.is_some() {
return selected_non_insert;
}
let max_insert = mutations
.iter()
.filter_map(|m| match &m.operation {
Some(Operation::Insert(write)) => Some((m, write.values.len())),
_ => None,
})
.max_by_key(|&(_, rows)| rows)
.map(|(m, _)| m);
max_insert.cloned().or_else(|| mutations.first().cloned())
}
}
impl Write {
fn into_proto(self) -> crate::model::mutation::Write {
crate::model::mutation::Write::new()
.set_table(self.table)
.set_columns(self.columns)
.set_values(vec![
self.values
.into_iter()
.map(Value::into_serde_value)
.collect::<wkt::ListValue>(),
])
}
}
impl Delete {
fn into_proto(self) -> crate::model::mutation::Delete {
crate::model::mutation::Delete::new()
.set_table(self.table)
.set_key_set(self.key_set.into_proto())
}
}
pub struct WriteBuilder {
table: String,
mutation_type: MutationType,
columns: Vec<String>,
values: Vec<Value>,
}
enum MutationType {
Insert,
Update,
InsertOrUpdate,
Replace,
}
impl WriteBuilder {
fn new(table: impl Into<String>, mutation_type: MutationType) -> Self {
Self {
table: table.into(),
mutation_type,
columns: Vec::new(),
values: Vec::new(),
}
}
pub fn set(self, column_name: impl Into<String>) -> ValueBinder {
ValueBinder {
builder: self,
column: column_name.into(),
}
}
pub fn build(self) -> Mutation {
let write = Write {
table: self.table,
columns: self.columns,
values: self.values,
};
let inner = match self.mutation_type {
MutationType::Insert => InternalMutation::Insert(write),
MutationType::Update => InternalMutation::Update(write),
MutationType::InsertOrUpdate => InternalMutation::InsertOrUpdate(write),
MutationType::Replace => InternalMutation::Replace(write),
};
Mutation { inner }
}
}
pub struct ValueBinder {
builder: WriteBuilder,
column: String,
}
impl ValueBinder {
pub fn to<T: ToValue + ?Sized>(mut self, value: &T) -> WriteBuilder {
self.builder.columns.push(self.column);
self.builder.values.push(value.to_value());
self.builder
}
}
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub struct MutationGroup {
mutations: Vec<Mutation>,
}
impl MutationGroup {
pub fn new(mutations: Vec<Mutation>) -> Self {
Self { mutations }
}
pub fn mutations(&self) -> &[Mutation] {
&self.mutations
}
#[allow(dead_code)]
pub(crate) fn build_proto(self) -> ProtoMutationGroup {
ProtoMutationGroup::new().set_mutations(self.mutations.into_iter().map(|m| m.build_proto()))
}
}
impl IntoIterator for MutationGroup {
type Item = Mutation;
type IntoIter = IntoIter<Mutation>;
fn into_iter(self) -> Self::IntoIter {
self.mutations.into_iter()
}
}
impl<'a> IntoIterator for &'a MutationGroup {
type Item = &'a Mutation;
type IntoIter = Iter<'a, Mutation>;
fn into_iter(self) -> Self::IntoIter {
self.mutations.iter()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn auto_traits() {
static_assertions::assert_impl_all!(Mutation: Send, Sync, Clone, std::fmt::Debug);
static_assertions::assert_impl_all!(Write: Send, Sync, Clone, std::fmt::Debug);
static_assertions::assert_impl_all!(Delete: Send, Sync, Clone, std::fmt::Debug);
static_assertions::assert_impl_all!(WriteBuilder: Send, Sync);
static_assertions::assert_impl_all!(ValueBinder: Send, Sync);
static_assertions::assert_impl_all!(MutationGroup: Send, Sync, Clone, std::fmt::Debug);
}
#[test]
fn mutation_group() {
let mutation1 = Mutation::new_insert_builder("Users")
.set("UserId")
.to(&1)
.build();
let mutation2 = Mutation::new_insert_builder("Users")
.set("UserId")
.to(&2)
.build();
let group = MutationGroup::new(vec![mutation1.clone(), mutation2.clone()]);
assert_eq!(group.mutations.len(), 2);
assert_eq!(group.mutations[0], mutation1);
assert_eq!(group.mutations[1], mutation2);
}
#[test]
fn mutation_group_into_iter() {
let mutation1 = Mutation::new_insert_builder("Users")
.set("UserId")
.to(&1)
.build();
let mutation2 = Mutation::new_insert_builder("Users")
.set("UserId")
.to(&2)
.build();
let group = MutationGroup::new(vec![mutation1.clone(), mutation2.clone()]);
let mutations: Vec<_> = group.into_iter().collect();
assert_eq!(mutations, vec![mutation1, mutation2]);
}
#[test]
fn mutation_group_iter_ref() {
let mutation1 = Mutation::new_insert_builder("Users")
.set("UserId")
.to(&1)
.build();
let mutation2 = Mutation::new_insert_builder("Users")
.set("UserId")
.to(&2)
.build();
let group = MutationGroup::new(vec![mutation1.clone(), mutation2.clone()]);
let mutations: Vec<_> = (&group).into_iter().collect();
assert_eq!(mutations, vec![&mutation1, &mutation2]);
}
#[test]
fn insert_builder() {
let mutation = Mutation::new_insert_builder("Users")
.set("UserId")
.to(&1)
.set("UserName")
.to(&"Alice")
.build();
match mutation.inner {
InternalMutation::Insert(write) => {
assert_eq!(write.table, "Users");
assert_eq!(write.columns, vec!["UserId", "UserName"]);
assert_eq!(write.values.len(), 2);
assert_eq!(write.values[0].as_string(), "1");
assert_eq!(write.values[1].as_string(), "Alice");
}
_ => panic!("Expected Insert mutation"),
}
}
#[test]
fn update_builder() {
let mutation = Mutation::new_update_builder("Users")
.set("UserId")
.to(&1)
.build();
match mutation.inner {
InternalMutation::Update(write) => {
assert_eq!(write.table, "Users");
assert_eq!(write.columns, vec!["UserId"]);
assert_eq!(write.values.len(), 1);
assert_eq!(write.values[0].as_string(), "1");
}
_ => panic!("Expected Update mutation"),
}
}
#[test]
fn insert_or_update_builder() {
let mutation = Mutation::new_insert_or_update_builder("Users")
.set("UserId")
.to(&1)
.build();
match mutation.inner {
InternalMutation::InsertOrUpdate(write) => {
assert_eq!(write.table, "Users");
assert_eq!(write.columns, vec!["UserId"]);
assert_eq!(write.values.len(), 1);
assert_eq!(write.values[0].as_string(), "1");
}
_ => panic!("Expected InsertOrUpdate mutation"),
}
}
#[test]
fn replace_builder() {
let mutation = Mutation::new_replace_builder("Users")
.set("UserId")
.to(&1)
.build();
match mutation.inner {
InternalMutation::Replace(write) => {
assert_eq!(write.table, "Users");
assert_eq!(write.columns, vec!["UserId"]);
assert_eq!(write.values.len(), 1);
assert_eq!(write.values[0].as_string(), "1");
}
_ => panic!("Expected Replace mutation"),
}
}
#[test]
fn build_proto_insert() {
let mutation = Mutation::new_insert_builder("Users")
.set("UserId")
.to(&1)
.set("UserName")
.to(&"Alice")
.build();
let proto = mutation.build_proto();
match proto.operation {
Some(Operation::Insert(write)) => {
assert_eq!(write.table, "Users");
assert_eq!(write.columns, vec!["UserId", "UserName"]);
assert_eq!(write.values.len(), 1);
assert_eq!(write.values[0].len(), 2);
assert_eq!(write.values[0][0], serde_json::json!("1"));
assert_eq!(write.values[0][1], serde_json::json!("Alice"));
}
_ => panic!("Expected Insert operation, got {:?}", proto.operation),
}
}
#[test]
fn build_proto_update() {
let mutation = Mutation::new_update_builder("Users")
.set("UserId")
.to(&1)
.build();
let proto = mutation.build_proto();
match proto.operation {
Some(Operation::Update(write)) => {
assert_eq!(write.table, "Users");
assert_eq!(write.columns, vec!["UserId"]);
assert_eq!(write.values.len(), 1);
}
_ => panic!("Expected Update operation, got {:?}", proto.operation),
}
}
#[test]
fn build_proto_insert_or_update() {
let mutation = Mutation::new_insert_or_update_builder("Users")
.set("UserId")
.to(&1)
.build();
let proto = mutation.build_proto();
match proto.operation {
Some(Operation::InsertOrUpdate(write)) => {
assert_eq!(write.table, "Users");
assert_eq!(write.columns, vec!["UserId"]);
assert_eq!(write.values.len(), 1);
}
_ => panic!(
"Expected InsertOrUpdate operation, got {:?}",
proto.operation
),
}
}
#[test]
fn build_proto_replace() {
let mutation = Mutation::new_replace_builder("Users")
.set("UserId")
.to(&1)
.build();
let proto = mutation.build_proto();
match proto.operation {
Some(Operation::Replace(write)) => {
assert_eq!(write.table, "Users");
assert_eq!(write.columns, vec!["UserId"]);
assert_eq!(write.values.len(), 1);
}
_ => panic!("Expected Replace operation, got {:?}", proto.operation),
}
}
#[test]
fn build_proto_delete() {
let key_set = crate::key::KeySet::builder().build();
let mutation = Mutation::delete("Users", key_set);
let proto = mutation.build_proto();
match proto.operation {
Some(Operation::Delete(delete)) => {
assert_eq!(delete.table, "Users");
}
_ => panic!("Expected Delete operation, got {:?}", proto.operation),
}
}
#[test]
fn test_select_mutation_key_empty() {
let mutations = vec![];
let key = Mutation::select_mutation_key(&mutations);
assert!(key.is_none());
}
#[test]
fn test_select_mutation_key_prefers_insert_or_update_over_insert() {
let m1 = Mutation::new_insert_builder("Users")
.set("UserId")
.to(&1)
.build()
.build_proto();
let m2 = Mutation::new_insert_or_update_builder("Users")
.set("UserId")
.to(&2)
.build()
.build_proto();
let mutations = vec![m1.clone(), m2.clone()];
let key = Mutation::select_mutation_key(&mutations);
assert_eq!(key, Some(m2));
}
#[test]
fn test_select_mutation_key_only_insert_prefers_largest() {
let m1 = Mutation::new_insert_builder("Users")
.set("UserId")
.to(&1)
.build()
.build_proto();
let row1 = vec![serde_json::json!("2")]
.into_iter()
.collect::<wkt::ListValue>();
let row2 = vec![serde_json::json!("3")]
.into_iter()
.collect::<wkt::ListValue>();
let write2 = crate::model::mutation::Write::new()
.set_table("Users")
.set_columns(vec!["UserId".to_string()])
.set_values(vec![row1, row2]);
let m2 = crate::model::Mutation::new().set_insert(write2);
let mutations = vec![m1.clone(), m2.clone()];
let key = Mutation::select_mutation_key(&mutations);
assert_eq!(key, Some(m2));
}
#[test]
fn test_select_mutation_key_mix() {
let m1 = Mutation::new_insert_builder("Users")
.set("UserId")
.to(&1)
.build()
.build_proto();
let m2 = Mutation::new_update_builder("Users")
.set("UserId")
.to(&2)
.build()
.build_proto();
let m3 = Mutation::new_insert_or_update_builder("Users")
.set("UserId")
.to(&3)
.build()
.build_proto();
let mutations = vec![m1.clone(), m2.clone(), m3.clone()];
let key = Mutation::select_mutation_key(&mutations).expect("Expected a key");
assert!(
key == m2 || key == m3,
"Expected either m2 or m3 to be selected, got {:?}",
key
);
}
#[test]
fn test_select_mutation_key_only_non_insert() {
let m1 = Mutation::new_update_builder("Users")
.set("UserId")
.to(&1)
.build()
.build_proto();
let m2 = Mutation::new_replace_builder("Users")
.set("UserId")
.to(&2)
.build()
.build_proto();
let mutations = vec![m1.clone(), m2.clone()];
let key = Mutation::select_mutation_key(&mutations).expect("Expected a key");
assert!(
key == m1 || key == m2,
"Expected either m1 or m2 to be selected, got {:?}",
key
);
}
#[test]
fn test_select_mutation_key_operation_none() {
let m1 = crate::model::Mutation::default();
let m2 = crate::model::Mutation::default();
let mutations = vec![m1.clone(), m2.clone()];
let key = Mutation::select_mutation_key(&mutations);
assert_eq!(key, Some(m1));
}
}