1use crate::key::KeySet;
16use crate::model::batch_write_request::MutationGroup as ProtoMutationGroup;
17use crate::model::mutation::Operation;
18use crate::to_value::ToValue;
19use crate::value::Value;
20use rand::seq::IteratorRandom;
21use std::slice::Iter;
22use std::vec::IntoIter;
23
24#[derive(Clone, Debug, PartialEq)]
38pub struct Mutation {
39 pub(crate) inner: InternalMutation,
40}
41
42#[derive(Clone, Debug, PartialEq)]
43pub(crate) enum InternalMutation {
44 Insert(Write),
47 Update(Write),
50 InsertOrUpdate(Write),
53 Replace(Write),
56 Delete(Delete),
58}
59
60#[derive(Clone, Debug, PartialEq)]
62pub(crate) struct Write {
63 pub(crate) table: String,
64 pub(crate) columns: Vec<String>,
65 pub(crate) values: Vec<Value>,
66}
67
68#[derive(Clone, Debug, PartialEq)]
70pub(crate) struct Delete {
71 pub(crate) table: String,
72 pub(crate) key_set: KeySet,
75}
76
77impl Mutation {
78 pub fn new_insert_builder(table: impl Into<String>) -> WriteBuilder {
88 WriteBuilder::new(table, MutationType::Insert)
89 }
90
91 pub fn new_update_builder(table: impl Into<String>) -> WriteBuilder {
102 WriteBuilder::new(table, MutationType::Update)
103 }
104
105 pub fn new_insert_or_update_builder(table: impl Into<String>) -> WriteBuilder {
116 WriteBuilder::new(table, MutationType::InsertOrUpdate)
117 }
118
119 pub fn new_replace_builder(table: impl Into<String>) -> WriteBuilder {
130 WriteBuilder::new(table, MutationType::Replace)
131 }
132
133 pub fn delete(table: impl Into<String>, key_set: KeySet) -> Mutation {
140 Mutation {
141 inner: InternalMutation::Delete(Delete {
142 table: table.into(),
143 key_set,
144 }),
145 }
146 }
147
148 pub(crate) fn build_proto(self) -> crate::model::Mutation {
149 match self.inner {
150 InternalMutation::Insert(write) => {
151 crate::model::Mutation::new().set_insert(write.into_proto())
152 }
153 InternalMutation::Update(write) => {
154 crate::model::Mutation::new().set_update(write.into_proto())
155 }
156 InternalMutation::InsertOrUpdate(write) => {
157 crate::model::Mutation::new().set_insert_or_update(write.into_proto())
158 }
159 InternalMutation::Replace(write) => {
160 crate::model::Mutation::new().set_replace(write.into_proto())
161 }
162 InternalMutation::Delete(delete) => {
163 crate::model::Mutation::new().set_delete(delete.into_proto())
164 }
165 }
166 }
167
168 pub(crate) fn select_mutation_key(
175 mutations: &[crate::model::Mutation],
176 ) -> Option<crate::model::Mutation> {
177 if mutations.is_empty() {
178 return None;
179 }
180
181 let selected_non_insert = mutations
183 .iter()
184 .filter(|m| {
185 m.operation.as_ref().is_some_and(|op| {
186 !matches!(
187 op,
188 Operation::Insert(_) | Operation::Send(_) | Operation::Ack(_)
189 )
190 })
191 })
192 .choose(&mut rand::rng())
193 .cloned();
194
195 if selected_non_insert.is_some() {
196 return selected_non_insert;
197 }
198
199 let max_insert = mutations
201 .iter()
202 .filter_map(|m| match &m.operation {
203 Some(Operation::Insert(write)) => Some((m, write.values.len())),
204 _ => None,
205 })
206 .max_by_key(|&(_, rows)| rows)
207 .map(|(m, _)| m);
208
209 max_insert.cloned().or_else(|| mutations.first().cloned())
210 }
211}
212
213impl Write {
214 fn into_proto(self) -> crate::model::mutation::Write {
215 crate::model::mutation::Write::new()
216 .set_table(self.table)
217 .set_columns(self.columns)
218 .set_values(vec![
219 self.values
220 .into_iter()
221 .map(Value::into_serde_value)
222 .collect::<wkt::ListValue>(),
223 ])
224 }
225}
226
227impl Delete {
228 fn into_proto(self) -> crate::model::mutation::Delete {
229 crate::model::mutation::Delete::new()
230 .set_table(self.table)
231 .set_key_set(self.key_set.into_proto())
232 }
233}
234
235pub struct WriteBuilder {
237 table: String,
238 mutation_type: MutationType,
239 columns: Vec<String>,
240 values: Vec<Value>,
241}
242
/// The write operation a [`WriteBuilder`] will produce when built.
enum MutationType {
    /// Build an insert mutation.
    Insert,
    /// Build an update mutation.
    Update,
    /// Build an insert-or-update mutation.
    InsertOrUpdate,
    /// Build a replace mutation.
    Replace,
}
249
250impl WriteBuilder {
251 fn new(table: impl Into<String>, mutation_type: MutationType) -> Self {
252 Self {
253 table: table.into(),
254 mutation_type,
255 columns: Vec::new(),
256 values: Vec::new(),
257 }
258 }
259
260 pub fn set(self, column_name: impl Into<String>) -> ValueBinder {
270 ValueBinder {
271 builder: self,
272 column: column_name.into(),
273 }
274 }
275
276 pub fn build(self) -> Mutation {
278 let write = Write {
279 table: self.table,
280 columns: self.columns,
281 values: self.values,
282 };
283 let inner = match self.mutation_type {
284 MutationType::Insert => InternalMutation::Insert(write),
285 MutationType::Update => InternalMutation::Update(write),
286 MutationType::InsertOrUpdate => InternalMutation::InsertOrUpdate(write),
287 MutationType::Replace => InternalMutation::Replace(write),
288 };
289 Mutation { inner }
290 }
291}
292
293pub struct ValueBinder {
295 builder: WriteBuilder,
296 column: String,
297}
298
299impl ValueBinder {
300 pub fn to<T: ToValue + ?Sized>(mut self, value: &T) -> WriteBuilder {
302 self.builder.columns.push(self.column);
303 self.builder.values.push(value.to_value());
304 self.builder
305 }
306}
307
308#[derive(Clone, Debug, PartialEq)]
310#[non_exhaustive]
311pub struct MutationGroup {
312 mutations: Vec<Mutation>,
313}
314
315impl MutationGroup {
316 pub fn new(mutations: Vec<Mutation>) -> Self {
318 Self { mutations }
319 }
320
321 pub fn mutations(&self) -> &[Mutation] {
323 &self.mutations
324 }
325
326 #[allow(dead_code)]
327 pub(crate) fn build_proto(self) -> ProtoMutationGroup {
328 ProtoMutationGroup::new().set_mutations(self.mutations.into_iter().map(|m| m.build_proto()))
329 }
330}
331
332impl IntoIterator for MutationGroup {
333 type Item = Mutation;
334 type IntoIter = IntoIter<Mutation>;
335
336 fn into_iter(self) -> Self::IntoIter {
337 self.mutations.into_iter()
338 }
339}
340
341impl<'a> IntoIterator for &'a MutationGroup {
342 type Item = &'a Mutation;
343 type IntoIter = Iter<'a, Mutation>;
344
345 fn into_iter(self) -> Self::IntoIter {
346 self.mutations.iter()
347 }
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    /// Finishes `builder` as a single-column mutation binding `UserId` to `id`.
    fn user_id_mutation<T: ToValue + ?Sized>(builder: WriteBuilder, id: &T) -> Mutation {
        builder.set("UserId").to(id).build()
    }

    /// Two insert mutations on `Users`, with `UserId` 1 and 2 respectively.
    fn two_user_inserts() -> (Mutation, Mutation) {
        let first = user_id_mutation(Mutation::new_insert_builder("Users"), &1);
        let second = user_id_mutation(Mutation::new_insert_builder("Users"), &2);
        (first, second)
    }

    /// Checks the payload produced by `user_id_mutation(.., &1)`.
    #[track_caller]
    fn assert_single_user_id_write(write: &Write) {
        assert_eq!(write.table, "Users");
        assert_eq!(write.columns, vec!["UserId"]);
        assert_eq!(write.values.len(), 1);
        assert_eq!(write.values[0].as_string(), "1");
    }

    /// Checks the proto payload of a single-column write on `Users`.
    #[track_caller]
    fn assert_single_column_proto(write: &crate::model::mutation::Write) {
        assert_eq!(write.table, "Users");
        assert_eq!(write.columns, vec!["UserId"]);
        assert_eq!(write.values.len(), 1);
    }

    #[test]
    fn auto_traits() {
        use static_assertions::assert_impl_all;

        assert_impl_all!(Mutation: Send, Sync, Clone, std::fmt::Debug);
        assert_impl_all!(Write: Send, Sync, Clone, std::fmt::Debug);
        assert_impl_all!(Delete: Send, Sync, Clone, std::fmt::Debug);
        assert_impl_all!(WriteBuilder: Send, Sync);
        assert_impl_all!(ValueBinder: Send, Sync);
        assert_impl_all!(MutationGroup: Send, Sync, Clone, std::fmt::Debug);
    }

    #[test]
    fn mutation_group() {
        let (first, second) = two_user_inserts();
        let group = MutationGroup::new(vec![first.clone(), second.clone()]);
        assert_eq!(group.mutations.len(), 2);
        assert_eq!(group.mutations[0], first);
        assert_eq!(group.mutations[1], second);
    }

    #[test]
    fn mutation_group_into_iter() {
        let (first, second) = two_user_inserts();
        let group = MutationGroup::new(vec![first.clone(), second.clone()]);

        let collected: Vec<Mutation> = group.into_iter().collect();
        assert_eq!(collected, vec![first, second]);
    }

    #[test]
    fn mutation_group_iter_ref() {
        let (first, second) = two_user_inserts();
        let group = MutationGroup::new(vec![first.clone(), second.clone()]);

        let collected: Vec<&Mutation> = (&group).into_iter().collect();
        assert_eq!(collected, vec![&first, &second]);
    }

    #[test]
    fn insert_builder() {
        let mutation = Mutation::new_insert_builder("Users")
            .set("UserId")
            .to(&1)
            .set("UserName")
            .to(&"Alice")
            .build();

        let InternalMutation::Insert(write) = mutation.inner else {
            panic!("Expected Insert mutation");
        };
        assert_eq!(write.table, "Users");
        assert_eq!(write.columns, vec!["UserId", "UserName"]);
        assert_eq!(write.values.len(), 2);
        assert_eq!(write.values[0].as_string(), "1");
        assert_eq!(write.values[1].as_string(), "Alice");
    }

    #[test]
    fn update_builder() {
        let mutation = user_id_mutation(Mutation::new_update_builder("Users"), &1);

        let InternalMutation::Update(write) = mutation.inner else {
            panic!("Expected Update mutation");
        };
        assert_single_user_id_write(&write);
    }

    #[test]
    fn insert_or_update_builder() {
        let mutation = user_id_mutation(Mutation::new_insert_or_update_builder("Users"), &1);

        let InternalMutation::InsertOrUpdate(write) = mutation.inner else {
            panic!("Expected InsertOrUpdate mutation");
        };
        assert_single_user_id_write(&write);
    }

    #[test]
    fn replace_builder() {
        let mutation = user_id_mutation(Mutation::new_replace_builder("Users"), &1);

        let InternalMutation::Replace(write) = mutation.inner else {
            panic!("Expected Replace mutation");
        };
        assert_single_user_id_write(&write);
    }

    #[test]
    fn build_proto_insert() {
        let proto = Mutation::new_insert_builder("Users")
            .set("UserId")
            .to(&1)
            .set("UserName")
            .to(&"Alice")
            .build()
            .build_proto();

        let Some(Operation::Insert(write)) = &proto.operation else {
            panic!("Expected Insert operation, got {:?}", proto.operation);
        };
        assert_eq!(write.table, "Users");
        assert_eq!(write.columns, vec!["UserId", "UserName"]);
        assert_eq!(write.values.len(), 1);
        assert_eq!(write.values[0].len(), 2);
        assert_eq!(write.values[0][0], serde_json::json!("1"));
        assert_eq!(write.values[0][1], serde_json::json!("Alice"));
    }

    #[test]
    fn build_proto_update() {
        let proto = user_id_mutation(Mutation::new_update_builder("Users"), &1).build_proto();

        let Some(Operation::Update(write)) = &proto.operation else {
            panic!("Expected Update operation, got {:?}", proto.operation);
        };
        assert_single_column_proto(write);
    }

    #[test]
    fn build_proto_insert_or_update() {
        let proto =
            user_id_mutation(Mutation::new_insert_or_update_builder("Users"), &1).build_proto();

        let Some(Operation::InsertOrUpdate(write)) = &proto.operation else {
            panic!(
                "Expected InsertOrUpdate operation, got {:?}",
                proto.operation
            );
        };
        assert_single_column_proto(write);
    }

    #[test]
    fn build_proto_replace() {
        let proto = user_id_mutation(Mutation::new_replace_builder("Users"), &1).build_proto();

        let Some(Operation::Replace(write)) = &proto.operation else {
            panic!("Expected Replace operation, got {:?}", proto.operation);
        };
        assert_single_column_proto(write);
    }

    #[test]
    fn build_proto_delete() {
        let key_set = crate::key::KeySet::builder().build();
        let proto = Mutation::delete("Users", key_set).build_proto();

        let Some(Operation::Delete(delete)) = &proto.operation else {
            panic!("Expected Delete operation, got {:?}", proto.operation);
        };
        assert_eq!(delete.table, "Users");
    }

    #[test]
    fn test_select_mutation_key_empty() {
        let nothing: Vec<crate::model::Mutation> = Vec::new();
        assert!(Mutation::select_mutation_key(&nothing).is_none());
    }

    #[test]
    fn test_select_mutation_key_prefers_insert_or_update_over_insert() {
        let insert = user_id_mutation(Mutation::new_insert_builder("Users"), &1).build_proto();
        let upsert =
            user_id_mutation(Mutation::new_insert_or_update_builder("Users"), &2).build_proto();

        let candidates = vec![insert, upsert.clone()];
        let key = Mutation::select_mutation_key(&candidates);
        assert_eq!(key, Some(upsert));
    }

    #[test]
    fn test_select_mutation_key_only_insert_prefers_largest() {
        let single_row = user_id_mutation(Mutation::new_insert_builder("Users"), &1).build_proto();

        // Built by hand because the builder only ever produces one row.
        let row = |id: &str| -> wkt::ListValue {
            vec![serde_json::json!(id)].into_iter().collect()
        };
        let two_rows = crate::model::mutation::Write::new()
            .set_table("Users")
            .set_columns(vec!["UserId".to_string()])
            .set_values(vec![row("2"), row("3")]);
        let largest = crate::model::Mutation::new().set_insert(two_rows);

        let candidates = vec![single_row, largest.clone()];
        let key = Mutation::select_mutation_key(&candidates);
        assert_eq!(key, Some(largest));
    }

    #[test]
    fn test_select_mutation_key_mix() {
        let insert = user_id_mutation(Mutation::new_insert_builder("Users"), &1).build_proto();
        let update = user_id_mutation(Mutation::new_update_builder("Users"), &2).build_proto();
        let upsert =
            user_id_mutation(Mutation::new_insert_or_update_builder("Users"), &3).build_proto();

        let candidates = vec![insert, update.clone(), upsert.clone()];
        let key = Mutation::select_mutation_key(&candidates).expect("Expected a key");
        // The pick is random, so either non-insert mutation is acceptable.
        assert!(
            key == update || key == upsert,
            "Expected either m2 or m3 to be selected, got {:?}",
            key
        );
    }

    #[test]
    fn test_select_mutation_key_only_non_insert() {
        let update = user_id_mutation(Mutation::new_update_builder("Users"), &1).build_proto();
        let replace = user_id_mutation(Mutation::new_replace_builder("Users"), &2).build_proto();

        let candidates = vec![update.clone(), replace.clone()];
        let key = Mutation::select_mutation_key(&candidates).expect("Expected a key");
        // The pick is random, so either mutation is acceptable.
        assert!(
            key == update || key == replace,
            "Expected either m1 or m2 to be selected, got {:?}",
            key
        );
    }

    #[test]
    fn test_select_mutation_key_operation_none() {
        let blank = crate::model::Mutation::default();
        let candidates = vec![blank.clone(), crate::model::Mutation::default()];
        let key = Mutation::select_mutation_key(&candidates);
        assert_eq!(key, Some(blank));
    }
}