1use super::{ConstraintValidator, Operator, OperatorResult, PropertySource};
9use crate::execution::chunk::{DataChunk, DataChunkBuilder};
10use crate::graph::{GraphStore, GraphStoreMut};
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
/// Configuration for a node `MERGE` executed by [`MergeOperator`].
pub struct MergeConfig {
    /// Name of the variable being merged. It is returned by
    /// [`MergeOperator::variable`] and quoted in the error raised when a bound
    /// variable turns out to be NULL.
    pub variable: String,
    /// Labels a node must carry to count as a match; the same labels are
    /// applied to a node that has to be created.
    pub labels: Vec<String>,
    /// Properties identifying the node. They are resolved per row, compared
    /// against candidate nodes, and written to the node when it is created.
    pub match_properties: Vec<(String, PropertySource)>,
    /// Additional properties applied only when a node is created. A key that
    /// also appears in `match_properties` replaces the match value.
    pub on_create_properties: Vec<(String, PropertySource)>,
    /// Properties written to the existing node when a match is found.
    pub on_match_properties: Vec<(String, PropertySource)>,
    /// Column types used to build every output chunk.
    pub output_schema: Vec<LogicalType>,
    /// Index of the output column that receives the merged node id.
    pub output_column: usize,
    /// Input column holding an already-bound value for the variable, if any.
    /// When set, a NULL (or a missing column) in any processed row aborts the
    /// operator with a type-mismatch error.
    pub bound_variable_column: Option<usize>,
}
37
/// Operator implementing node `MERGE`: reuse a node that matches the configured
/// labels and properties, or create one when no such node exists.
pub struct MergeOperator {
    /// Graph store that is searched for matches and mutated on create/update.
    store: Arc<dyn GraphStoreMut>,
    /// Optional upstream operator. With an input the merge runs once per
    /// selected input row; without one it runs exactly once.
    input: Option<Box<dyn Operator>>,
    /// Merge settings (variable, labels, properties, output layout).
    config: MergeConfig,
    /// Set once the standalone (no-input) merge has run; cleared by `reset`.
    executed: bool,
    /// Epoch supplied through `with_transaction_context`. It is only stored;
    /// nothing in this operator reads it.
    viewing_epoch: Option<EpochId>,
    /// Transaction id; when present, ON MATCH writes go through the versioned
    /// property setter.
    transaction_id: Option<TransactionId>,
    /// Optional validator consulted before creating a node or updating a
    /// property.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
61
62impl MergeOperator {
63 pub fn new(
65 store: Arc<dyn GraphStoreMut>,
66 input: Option<Box<dyn Operator>>,
67 config: MergeConfig,
68 ) -> Self {
69 Self {
70 store,
71 input,
72 config,
73 executed: false,
74 viewing_epoch: None,
75 transaction_id: None,
76 validator: None,
77 }
78 }
79
80 #[must_use]
82 pub fn variable(&self) -> &str {
83 &self.config.variable
84 }
85
86 pub fn with_transaction_context(
88 mut self,
89 epoch: EpochId,
90 transaction_id: Option<TransactionId>,
91 ) -> Self {
92 self.viewing_epoch = Some(epoch);
93 self.transaction_id = transaction_id;
94 self
95 }
96
97 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
99 self.validator = Some(validator);
100 self
101 }
102
103 fn resolve_properties(
105 props: &[(String, PropertySource)],
106 chunk: Option<&DataChunk>,
107 row: usize,
108 store: &dyn GraphStore,
109 ) -> Vec<(String, Value)> {
110 props
111 .iter()
112 .map(|(name, source)| {
113 let value = if let Some(chunk) = chunk {
114 source.resolve(chunk, row, store)
115 } else {
116 match source {
118 PropertySource::Constant(v) => v.clone(),
119 _ => Value::Null,
120 }
121 };
122 (name.clone(), value)
123 })
124 .collect()
125 }
126
127 fn find_matching_node(&self, resolved_match_props: &[(String, Value)]) -> Option<NodeId> {
129 let use_index = resolved_match_props
132 .iter()
133 .any(|(k, v)| !v.is_null() && self.store.has_property_index(k));
134
135 let candidates: Vec<NodeId> = if use_index {
136 let conditions: Vec<(&str, Value)> = resolved_match_props
137 .iter()
138 .filter(|(_, v)| !v.is_null())
139 .map(|(k, v)| (k.as_str(), v.clone()))
140 .collect();
141 self.store.find_nodes_by_properties(&conditions)
142 } else if let Some(first_label) = self.config.labels.first() {
143 self.store.nodes_by_label(first_label)
144 } else {
145 self.store.node_ids()
146 };
147
148 for node_id in candidates {
149 if let Some(node) = self.store.get_node(node_id) {
150 let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
151 if !has_all_labels {
152 continue;
153 }
154
155 let has_all_props = resolved_match_props.iter().all(|(key, expected_value)| {
156 let prop = node.properties.get(&PropertyKey::new(key.as_str()));
157 if expected_value.is_null() {
158 prop.map_or(true, |v| v.is_null())
160 } else {
161 prop.is_some_and(|v| v == expected_value)
162 }
163 });
164
165 if has_all_props {
166 return Some(node_id);
167 }
168 }
169 }
170
171 None
172 }
173
174 fn create_node(
176 &self,
177 resolved_match_props: &[(String, Value)],
178 resolved_create_props: &[(String, Value)],
179 ) -> Result<NodeId, super::OperatorError> {
180 if let Some(ref validator) = self.validator {
182 validator.validate_node_labels_allowed(&self.config.labels)?;
183
184 let all_props: Vec<(String, Value)> = resolved_match_props
185 .iter()
186 .chain(resolved_create_props.iter())
187 .map(|(k, v)| (k.clone(), v.clone()))
188 .collect();
189 for (name, value) in &all_props {
190 validator.validate_node_property(&self.config.labels, name, value)?;
191 validator.check_unique_node_property(&self.config.labels, name, value)?;
192 }
193 validator.validate_node_complete(&self.config.labels, &all_props)?;
194 }
195
196 let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
197 .iter()
198 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
199 .collect();
200
201 for (k, v) in resolved_create_props {
202 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
203 existing.1 = v.clone();
204 } else {
205 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
206 }
207 }
208
209 let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
210 Ok(self.store.create_node_with_props(&labels, &all_props))
211 }
212
213 fn merge_node_for_row(
215 &self,
216 chunk: Option<&DataChunk>,
217 row: usize,
218 ) -> Result<NodeId, super::OperatorError> {
219 let store_ref: &dyn GraphStore = self.store.as_ref();
220 let resolved_match =
221 Self::resolve_properties(&self.config.match_properties, chunk, row, store_ref);
222
223 if let Some(existing_id) = self.find_matching_node(&resolved_match) {
224 let resolved_on_match =
225 Self::resolve_properties(&self.config.on_match_properties, chunk, row, store_ref);
226 self.apply_on_match(existing_id, &resolved_on_match)?;
227 Ok(existing_id)
228 } else {
229 let resolved_on_create =
230 Self::resolve_properties(&self.config.on_create_properties, chunk, row, store_ref);
231 self.create_node(&resolved_match, &resolved_on_create)
232 }
233 }
234
235 fn apply_on_match(
237 &self,
238 node_id: NodeId,
239 resolved_on_match: &[(String, Value)],
240 ) -> Result<(), super::OperatorError> {
241 for (key, value) in resolved_on_match {
242 if let Some(ref validator) = self.validator {
243 validator.validate_node_property(&self.config.labels, key, value)?;
244 }
245 if let Some(tid) = self.transaction_id {
246 self.store
247 .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
248 } else {
249 self.store
250 .set_node_property(node_id, key.as_str(), value.clone());
251 }
252 }
253 Ok(())
254 }
255}
256
257impl Operator for MergeOperator {
258 fn next(&mut self) -> OperatorResult {
259 if let Some(ref mut input) = self.input {
262 if let Some(chunk) = input.next()? {
263 let mut builder =
264 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
265
266 for row in chunk.selected_indices() {
267 if let Some(bound_col) = self.config.bound_variable_column {
269 let is_null = chunk.column(bound_col).map_or(true, |col| col.is_null(row));
270 if is_null {
271 return Err(super::OperatorError::TypeMismatch {
272 expected: format!(
273 "non-null node for MERGE variable '{}'",
274 self.config.variable
275 ),
276 found: "NULL".to_string(),
277 });
278 }
279 }
280
281 let node_id = self.merge_node_for_row(Some(&chunk), row)?;
283
284 for col_idx in 0..chunk.column_count() {
286 if let (Some(src), Some(dst)) =
287 (chunk.column(col_idx), builder.column_mut(col_idx))
288 {
289 if let Some(val) = src.get_value(row) {
290 dst.push_value(val);
291 } else {
292 dst.push_value(Value::Null);
293 }
294 }
295 }
296
297 if let Some(dst) = builder.column_mut(self.config.output_column) {
299 dst.push_node_id(node_id);
300 }
301
302 builder.advance_row();
303 }
304
305 return Ok(Some(builder.finish()));
306 }
307 return Ok(None);
308 }
309
310 if self.executed {
312 return Ok(None);
313 }
314 self.executed = true;
315
316 let node_id = self.merge_node_for_row(None, 0)?;
317
318 let mut builder = DataChunkBuilder::new(&self.config.output_schema);
319 if let Some(dst) = builder.column_mut(self.config.output_column) {
320 dst.push_node_id(node_id);
321 }
322 builder.advance_row();
323
324 Ok(Some(builder.finish()))
325 }
326
327 fn reset(&mut self) {
328 self.executed = false;
329 if let Some(ref mut input) = self.input {
330 input.reset();
331 }
332 }
333
334 fn name(&self) -> &'static str {
335 "Merge"
336 }
337
338 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
339 self
340 }
341}
342
/// Configuration for a relationship `MERGE` executed by
/// [`MergeRelationshipOperator`].
pub struct MergeRelationshipConfig {
    /// Input column from which the source node id is read for each row.
    pub source_column: usize,
    /// Input column from which the target node id is read for each row.
    pub target_column: usize,
    /// Name of the source variable; quoted in the error raised when the source
    /// node id cannot be read.
    pub source_variable: String,
    /// Name of the target variable; quoted in the error raised when the target
    /// node id cannot be read.
    pub target_variable: String,
    /// Edge type an existing edge must have to match; also the type of a newly
    /// created edge.
    pub edge_type: String,
    /// Properties identifying the edge. They are resolved per row, compared
    /// against candidate edges, and written to the edge when it is created.
    pub match_properties: Vec<(String, PropertySource)>,
    /// Additional properties applied only when an edge is created. A key that
    /// also appears in `match_properties` replaces the match value.
    pub on_create_properties: Vec<(String, PropertySource)>,
    /// Properties written to the existing edge when a match is found.
    pub on_match_properties: Vec<(String, PropertySource)>,
    /// Column types used to build every output chunk.
    pub output_schema: Vec<LogicalType>,
    /// Index of the output column that receives the merged edge id. Every
    /// other output column is filled from the input column with the same index.
    pub edge_output_column: usize,
}
366
/// Operator implementing relationship `MERGE`: for each input row, reuse an
/// outgoing edge of the configured type between the source and target nodes,
/// or create one when none matches.
pub struct MergeRelationshipOperator {
    /// Graph store that is searched for matching edges and mutated on
    /// create/update.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the rows with source and target nodes.
    input: Box<dyn Operator>,
    /// Merge settings (columns, edge type, properties, output layout).
    config: MergeRelationshipConfig,
    /// Epoch supplied through `with_transaction_context`. It is only stored;
    /// nothing in this operator reads it.
    viewing_epoch: Option<EpochId>,
    /// Transaction id; when present, ON MATCH writes go through the versioned
    /// property setter.
    transaction_id: Option<TransactionId>,
    /// Optional validator consulted before creating an edge or updating a
    /// property.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
387
388impl MergeRelationshipOperator {
389 pub fn new(
391 store: Arc<dyn GraphStoreMut>,
392 input: Box<dyn Operator>,
393 config: MergeRelationshipConfig,
394 ) -> Self {
395 Self {
396 store,
397 input,
398 config,
399 viewing_epoch: None,
400 transaction_id: None,
401 validator: None,
402 }
403 }
404
405 pub fn with_transaction_context(
407 mut self,
408 epoch: EpochId,
409 transaction_id: Option<TransactionId>,
410 ) -> Self {
411 self.viewing_epoch = Some(epoch);
412 self.transaction_id = transaction_id;
413 self
414 }
415
416 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
418 self.validator = Some(validator);
419 self
420 }
421
422 fn find_matching_edge(
424 &self,
425 src: NodeId,
426 dst: NodeId,
427 resolved_match_props: &[(String, Value)],
428 ) -> Option<EdgeId> {
429 use crate::graph::Direction;
430
431 for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
432 if target != dst {
433 continue;
434 }
435
436 if let Some(edge) = self.store.get_edge(edge_id) {
437 if edge.edge_type.as_str() != self.config.edge_type {
438 continue;
439 }
440
441 let has_all_props = resolved_match_props
442 .iter()
443 .all(|(key, expected)| edge.get_property(key).is_some_and(|v| v == expected));
444
445 if has_all_props {
446 return Some(edge_id);
447 }
448 }
449 }
450
451 None
452 }
453
454 fn create_edge(
456 &self,
457 src: NodeId,
458 dst: NodeId,
459 resolved_match_props: &[(String, Value)],
460 resolved_create_props: &[(String, Value)],
461 ) -> Result<EdgeId, super::OperatorError> {
462 if let Some(ref validator) = self.validator {
464 validator.validate_edge_type_allowed(&self.config.edge_type)?;
465
466 let all_props: Vec<(String, Value)> = resolved_match_props
467 .iter()
468 .chain(resolved_create_props.iter())
469 .map(|(k, v)| (k.clone(), v.clone()))
470 .collect();
471 for (name, value) in &all_props {
472 validator.validate_edge_property(&self.config.edge_type, name, value)?;
473 }
474 validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
475 }
476
477 let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
478 .iter()
479 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
480 .collect();
481
482 for (k, v) in resolved_create_props {
483 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
484 existing.1 = v.clone();
485 } else {
486 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
487 }
488 }
489
490 Ok(self
491 .store
492 .create_edge_with_props(src, dst, &self.config.edge_type, &all_props))
493 }
494
495 fn apply_on_match_edge(
497 &self,
498 edge_id: EdgeId,
499 resolved_on_match: &[(String, Value)],
500 ) -> Result<(), super::OperatorError> {
501 for (key, value) in resolved_on_match {
502 if let Some(ref validator) = self.validator {
503 validator.validate_edge_property(&self.config.edge_type, key, value)?;
504 }
505 if let Some(tid) = self.transaction_id {
506 self.store
507 .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
508 } else {
509 self.store
510 .set_edge_property(edge_id, key.as_str(), value.clone());
511 }
512 }
513 Ok(())
514 }
515}
516
517impl Operator for MergeRelationshipOperator {
518 fn next(&mut self) -> OperatorResult {
519 use super::OperatorError;
520
521 if let Some(chunk) = self.input.next()? {
522 let mut builder =
523 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
524
525 for row in chunk.selected_indices() {
526 let src_val = chunk
527 .column(self.config.source_column)
528 .and_then(|c| c.get_node_id(row))
529 .ok_or_else(|| OperatorError::TypeMismatch {
530 expected: format!(
531 "non-null node for MERGE variable '{}'",
532 self.config.source_variable
533 ),
534 found: "NULL".to_string(),
535 })?;
536
537 let dst_val = chunk
538 .column(self.config.target_column)
539 .and_then(|c| c.get_node_id(row))
540 .ok_or_else(|| OperatorError::TypeMismatch {
541 expected: format!(
542 "non-null node for MERGE variable '{}'",
543 self.config.target_variable
544 ),
545 found: "None".to_string(),
546 })?;
547
548 let store_ref: &dyn GraphStore = self.store.as_ref();
549 let resolved_match = MergeOperator::resolve_properties(
550 &self.config.match_properties,
551 Some(&chunk),
552 row,
553 store_ref,
554 );
555
556 let edge_id = if let Some(existing) =
557 self.find_matching_edge(src_val, dst_val, &resolved_match)
558 {
559 let resolved_on_match = MergeOperator::resolve_properties(
560 &self.config.on_match_properties,
561 Some(&chunk),
562 row,
563 store_ref,
564 );
565 self.apply_on_match_edge(existing, &resolved_on_match)?;
566 existing
567 } else {
568 let resolved_on_create = MergeOperator::resolve_properties(
569 &self.config.on_create_properties,
570 Some(&chunk),
571 row,
572 store_ref,
573 );
574 self.create_edge(src_val, dst_val, &resolved_match, &resolved_on_create)?
575 };
576
577 for col_idx in 0..self.config.output_schema.len() {
579 if col_idx == self.config.edge_output_column {
580 if let Some(dst_col) = builder.column_mut(col_idx) {
581 dst_col.push_edge_id(edge_id);
582 }
583 } else if let (Some(src_col), Some(dst_col)) =
584 (chunk.column(col_idx), builder.column_mut(col_idx))
585 && let Some(val) = src_col.get_value(row)
586 {
587 dst_col.push_value(val);
588 }
589 }
590
591 builder.advance_row();
592 }
593
594 return Ok(Some(builder.finish()));
595 }
596
597 Ok(None)
598 }
599
600 fn reset(&mut self) {
601 self.input.reset();
602 }
603
604 fn name(&self) -> &'static str {
605 "MergeRelationship"
606 }
607
608 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
609 self
610 }
611}
612
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// Wraps literal values as constant property sources.
    fn const_props(props: Vec<(&str, Value)>) -> Vec<(String, PropertySource)> {
        let mut sources = Vec::with_capacity(props.len());
        for (key, value) in props {
            sources.push((key.to_string(), PropertySource::Constant(value)));
        }
        sources
    }

    /// Creates a fresh, empty LPG store behind the mutable store trait.
    fn empty_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Builds a standalone `MERGE (n:Person {...})` operator over `store`
    /// that writes the merged node into output column 0.
    fn person_merge(
        store: &Arc<dyn GraphStoreMut>,
        match_properties: Vec<(String, PropertySource)>,
        on_create_properties: Vec<(String, PropertySource)>,
        on_match_properties: Vec<(String, PropertySource)>,
    ) -> MergeOperator {
        let config = MergeConfig {
            variable: "n".to_string(),
            labels: vec!["Person".to_string()],
            match_properties,
            on_create_properties,
            on_match_properties,
            output_schema: vec![LogicalType::Node],
            output_column: 0,
            bound_variable_column: None,
        };
        MergeOperator::new(Arc::clone(store), None, config)
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = empty_store();

        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Alix".into()))]),
            vec![],
            vec![],
        );

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // Exactly one Person node must now exist.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        let node = store.get_node(nodes[0]).unwrap();
        assert!(node.has_label("Person"));
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = empty_store();

        // Seed the node the merge is expected to find.
        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );

        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Gus".into()))]),
            vec![],
            vec![],
        );

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // No second node may have been created.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = empty_store();

        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Vincent".into()))]),
            const_props(vec![("created", Value::Bool(true))]),
            vec![],
        );

        let _ = merge.next().unwrap();

        // The created node carries both the match and the ON CREATE property.
        let nodes = store.nodes_by_label("Person");
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = empty_store();

        let node_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );

        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Jules".into()))]),
            vec![],
            const_props(vec![("updated", Value::Bool(true))]),
        );

        let _ = merge.next().unwrap();

        // The existing node must have received the ON MATCH property.
        let node = store.get_node(node_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_uses_property_index() {
        let lpg_store = Arc::new(LpgStore::new().unwrap());
        lpg_store.create_property_index("name");
        assert!(lpg_store.has_property_index("name"));

        let store: Arc<dyn GraphStoreMut> = lpg_store;

        // Fifty unrelated Person nodes surround the one the merge should hit.
        for i in 0..50u32 {
            store.create_node_with_props(
                &["Person"],
                &[(
                    PropertyKey::new("name"),
                    Value::String(format!("person_{i}").into()),
                )],
            );
        }

        let target_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Beatrix".into()))],
        );

        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Beatrix".into()))]),
            vec![],
            const_props(vec![("found", Value::Bool(true))]),
        );

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // The target node was matched and updated.
        let node = store.get_node(target_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("found")),
            Some(&Value::Bool(true))
        );

        // Nothing new was created: 50 fillers plus the target.
        let persons = store.nodes_by_label("Person");
        assert_eq!(persons.len(), 51);
    }

    #[test]
    fn test_merge_creates_via_index_miss() {
        let lpg_store = Arc::new(LpgStore::new().unwrap());
        lpg_store.create_property_index("name");

        let store: Arc<dyn GraphStoreMut> = lpg_store;

        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Django".into()))],
        );

        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Shosanna".into()))]),
            const_props(vec![("created", Value::Bool(true))]),
            vec![],
        );

        let result = merge.next().unwrap();
        assert!(result.is_some());

        let persons = store.nodes_by_label("Person");
        assert_eq!(persons.len(), 2);

        // Pick out the node the merge had to create.
        let wanted_name = Value::String("Shosanna".into());
        let mut new_nodes = Vec::new();
        for &id in persons.iter() {
            if let Some(node) = store.get_node(id) {
                if node.properties.get(&PropertyKey::new("name")) == Some(&wanted_name) {
                    new_nodes.push(node);
                }
            }
        }
        assert_eq!(new_nodes.len(), 1);
        assert_eq!(
            new_nodes[0].properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_into_any() {
        let store = empty_store();
        let op = person_merge(&store, vec![], vec![], vec![]);

        let any = Box::new(op).into_any();
        assert!(any.downcast::<MergeOperator>().is_ok());
    }
}
889}