1use super::*;
2
3use super::query::literal_to_sql;
4
5fn resolve_expr_value(expr: &IRExpr, params: &ParamMap) -> Result<Literal> {
9 match expr {
10 IRExpr::Literal(lit) => Ok(lit.clone()),
11 IRExpr::Param(name) => params
12 .get(name)
13 .cloned()
14 .ok_or_else(|| OmniError::manifest(format!("parameter '{}' not provided", name))),
15 other => Err(OmniError::manifest(format!(
16 "unsupported expression in mutation: {:?}",
17 other
18 ))),
19 }
20}
21
22fn literal_to_typed_array(
24 lit: &Literal,
25 data_type: &DataType,
26 num_rows: usize,
27) -> Result<ArrayRef> {
28 Ok(match (lit, data_type) {
29 (Literal::Null, _) => arrow_array::new_null_array(data_type, num_rows),
30 (Literal::String(s), DataType::Utf8) => {
31 Arc::new(StringArray::from(vec![s.as_str(); num_rows])) as ArrayRef
32 }
33 (Literal::Integer(n), DataType::Int32) => {
34 Arc::new(Int32Array::from(vec![*n as i32; num_rows]))
35 }
36 (Literal::Integer(n), DataType::Int64) => Arc::new(Int64Array::from(vec![*n; num_rows])),
37 (Literal::Integer(n), DataType::UInt32) => {
38 Arc::new(UInt32Array::from(vec![*n as u32; num_rows]))
39 }
40 (Literal::Integer(n), DataType::UInt64) => {
41 Arc::new(UInt64Array::from(vec![*n as u64; num_rows]))
42 }
43 (Literal::Float(f), DataType::Float32) => {
44 Arc::new(Float32Array::from(vec![*f as f32; num_rows]))
45 }
46 (Literal::Float(f), DataType::Float64) => Arc::new(Float64Array::from(vec![*f; num_rows])),
47 (Literal::Bool(b), DataType::Boolean) => Arc::new(BooleanArray::from(vec![*b; num_rows])),
48 (Literal::Date(s), DataType::Date32) => {
49 let days = crate::loader::parse_date32_literal(s)?;
50 Arc::new(Date32Array::from(vec![days; num_rows]))
51 }
52 (Literal::DateTime(s), DataType::Date64) => Arc::new(Date64Array::from(vec![
53 crate::loader::parse_date64_literal(s)?;
54 num_rows
55 ])),
56 (Literal::List(items), DataType::List(field)) => {
57 typed_list_literal_to_array(items, field.data_type(), num_rows)?
58 }
59 (Literal::List(items), DataType::FixedSizeList(field, dim))
60 if field.data_type() == &DataType::Float32 =>
61 {
62 if items.len() != *dim as usize {
63 return Err(OmniError::manifest(format!(
64 "vector property expects {} dimensions, got {}",
65 dim,
66 items.len()
67 )));
68 }
69 let mut builder = FixedSizeListBuilder::with_capacity(
70 Float32Builder::with_capacity(num_rows * (*dim as usize)),
71 *dim,
72 num_rows,
73 )
74 .with_field(field.clone());
75 for _ in 0..num_rows {
76 for item in items {
77 match item {
78 Literal::Integer(value) => builder.values().append_value(*value as f32),
79 Literal::Float(value) => builder.values().append_value(*value as f32),
80 _ => {
81 return Err(OmniError::manifest(
82 "vector elements must be numeric".to_string(),
83 ));
84 }
85 }
86 }
87 builder.append(true);
88 }
89 Arc::new(builder.finish())
90 }
91 _ => {
92 return Err(OmniError::manifest(format!(
93 "cannot convert {:?} to {:?}",
94 lit, data_type
95 )));
96 }
97 })
98}
99
100fn typed_list_literal_to_array(
101 items: &[Literal],
102 item_type: &DataType,
103 num_rows: usize,
104) -> Result<ArrayRef> {
105 match item_type {
106 DataType::Utf8 => {
107 let mut builder = ListBuilder::new(StringBuilder::new());
108 for _ in 0..num_rows {
109 for item in items {
110 match item {
111 Literal::String(value) => builder.values().append_value(value),
112 _ => builder.values().append_null(),
113 }
114 }
115 builder.append(true);
116 }
117 Ok(Arc::new(builder.finish()))
118 }
119 DataType::Boolean => {
120 let mut builder = ListBuilder::new(BooleanBuilder::new());
121 for _ in 0..num_rows {
122 for item in items {
123 match item {
124 Literal::Bool(value) => builder.values().append_value(*value),
125 _ => builder.values().append_null(),
126 }
127 }
128 builder.append(true);
129 }
130 Ok(Arc::new(builder.finish()))
131 }
132 DataType::Int32 => {
133 let mut builder = ListBuilder::new(Int32Builder::new());
134 for _ in 0..num_rows {
135 for item in items {
136 match item {
137 Literal::Integer(value) => {
138 let value = i32::try_from(*value).map_err(|_| {
139 OmniError::manifest(format!(
140 "list value {} exceeds Int32 range",
141 value
142 ))
143 })?;
144 builder.values().append_value(value);
145 }
146 _ => builder.values().append_null(),
147 }
148 }
149 builder.append(true);
150 }
151 Ok(Arc::new(builder.finish()))
152 }
153 DataType::Int64 => {
154 let mut builder = ListBuilder::new(Int64Builder::new());
155 for _ in 0..num_rows {
156 for item in items {
157 match item {
158 Literal::Integer(value) => builder.values().append_value(*value),
159 _ => builder.values().append_null(),
160 }
161 }
162 builder.append(true);
163 }
164 Ok(Arc::new(builder.finish()))
165 }
166 DataType::UInt32 => {
167 let mut builder = ListBuilder::new(UInt32Builder::new());
168 for _ in 0..num_rows {
169 for item in items {
170 match item {
171 Literal::Integer(value) => {
172 let value = u32::try_from(*value).map_err(|_| {
173 OmniError::manifest(format!(
174 "list value {} exceeds UInt32 range",
175 value
176 ))
177 })?;
178 builder.values().append_value(value);
179 }
180 _ => builder.values().append_null(),
181 }
182 }
183 builder.append(true);
184 }
185 Ok(Arc::new(builder.finish()))
186 }
187 DataType::UInt64 => {
188 let mut builder = ListBuilder::new(UInt64Builder::new());
189 for _ in 0..num_rows {
190 for item in items {
191 match item {
192 Literal::Integer(value) => {
193 let value = u64::try_from(*value).map_err(|_| {
194 OmniError::manifest(format!(
195 "list value {} exceeds UInt64 range",
196 value
197 ))
198 })?;
199 builder.values().append_value(value);
200 }
201 _ => builder.values().append_null(),
202 }
203 }
204 builder.append(true);
205 }
206 Ok(Arc::new(builder.finish()))
207 }
208 DataType::Float32 => {
209 let mut builder = ListBuilder::new(Float32Builder::new());
210 for _ in 0..num_rows {
211 for item in items {
212 match item {
213 Literal::Integer(value) => builder.values().append_value(*value as f32),
214 Literal::Float(value) => builder.values().append_value(*value as f32),
215 _ => builder.values().append_null(),
216 }
217 }
218 builder.append(true);
219 }
220 Ok(Arc::new(builder.finish()))
221 }
222 DataType::Float64 => {
223 let mut builder = ListBuilder::new(Float64Builder::new());
224 for _ in 0..num_rows {
225 for item in items {
226 match item {
227 Literal::Integer(value) => builder.values().append_value(*value as f64),
228 Literal::Float(value) => builder.values().append_value(*value),
229 _ => builder.values().append_null(),
230 }
231 }
232 builder.append(true);
233 }
234 Ok(Arc::new(builder.finish()))
235 }
236 DataType::Date32 => {
237 let mut builder = ListBuilder::new(Date32Builder::new());
238 for _ in 0..num_rows {
239 for item in items {
240 match item {
241 Literal::Date(value) => builder
242 .values()
243 .append_value(crate::loader::parse_date32_literal(value)?),
244 _ => builder.values().append_null(),
245 }
246 }
247 builder.append(true);
248 }
249 Ok(Arc::new(builder.finish()))
250 }
251 DataType::Date64 => {
252 let mut builder = ListBuilder::new(Date64Builder::new());
253 for _ in 0..num_rows {
254 for item in items {
255 match item {
256 Literal::DateTime(value) => builder
257 .values()
258 .append_value(crate::loader::parse_date64_literal(value)?),
259 _ => builder.values().append_null(),
260 }
261 }
262 builder.append(true);
263 }
264 Ok(Arc::new(builder.finish()))
265 }
266 other => Err(OmniError::manifest(format!(
267 "cannot convert list literal to {:?}",
268 other
269 ))),
270 }
271}
272
273fn build_blob_array_from_value(value: &str) -> Result<ArrayRef> {
275 let mut builder = BlobArrayBuilder::new(1);
276 crate::loader::append_blob_value(&mut builder, value)?;
277 builder
278 .finish()
279 .map_err(|e| OmniError::Lance(e.to_string()))
280}
281
282fn build_null_blob_array() -> Result<ArrayRef> {
284 let mut builder = BlobArrayBuilder::new(1);
285 builder
286 .push_null()
287 .map_err(|e| OmniError::Lance(e.to_string()))?;
288 builder
289 .finish()
290 .map_err(|e| OmniError::Lance(e.to_string()))
291}
292
293fn build_insert_batch(
295 schema: &SchemaRef,
296 id: &str,
297 assignments: &HashMap<String, Literal>,
298 blob_properties: &HashSet<String>,
299) -> Result<RecordBatch> {
300 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
301
302 for field in schema.fields() {
303 if field.name() == "id" {
304 columns.push(Arc::new(StringArray::from(vec![id])));
305 } else if blob_properties.contains(field.name()) {
306 if let Some(Literal::String(uri)) = assignments.get(field.name()) {
307 columns.push(build_blob_array_from_value(uri)?);
308 } else if field.is_nullable() {
309 columns.push(build_null_blob_array()?);
310 } else {
311 return Err(OmniError::manifest(format!(
312 "missing required blob property '{}'",
313 field.name()
314 )));
315 }
316 } else if field.name() == "src" {
317 let lit = assignments.get("from").ok_or_else(|| {
318 OmniError::manifest("missing required edge endpoint 'from'".to_string())
319 })?;
320 columns.push(literal_to_typed_array(lit, field.data_type(), 1)?);
321 } else if field.name() == "dst" {
322 let lit = assignments.get("to").ok_or_else(|| {
323 OmniError::manifest("missing required edge endpoint 'to'".to_string())
324 })?;
325 columns.push(literal_to_typed_array(lit, field.data_type(), 1)?);
326 } else if let Some(lit) = assignments.get(field.name()) {
327 columns.push(literal_to_typed_array(lit, field.data_type(), 1)?);
328 } else if field.is_nullable() {
329 columns.push(arrow_array::new_null_array(field.data_type(), 1));
330 } else {
331 return Err(OmniError::manifest(format!(
332 "missing required property '{}'",
333 field.name()
334 )));
335 }
336 }
337
338 RecordBatch::try_new(schema.clone(), columns).map_err(|e| OmniError::Lance(e.to_string()))
339}
340
341async fn validate_edge_insert_endpoints(
342 db: &Omnigraph,
343 staging: &MutationStaging,
344 branch: Option<&str>,
345 edge_name: &str,
346 assignments: &HashMap<String, Literal>,
347) -> Result<()> {
348 let catalog = db.catalog();
349 let edge_type = catalog
350 .edge_types
351 .get(edge_name)
352 .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_name)))?;
353 let from = match assignments.get("from") {
354 Some(Literal::String(value)) => value.as_str(),
355 Some(other) => {
356 return Err(OmniError::manifest(format!(
357 "edge {} from endpoint must be a string id, got {}",
358 edge_name,
359 literal_to_sql(other)
360 )));
361 }
362 None => {
363 return Err(OmniError::manifest(format!(
364 "edge {} missing 'from' endpoint",
365 edge_name
366 )));
367 }
368 };
369 let to = match assignments.get("to") {
370 Some(Literal::String(value)) => value.as_str(),
371 Some(other) => {
372 return Err(OmniError::manifest(format!(
373 "edge {} to endpoint must be a string id, got {}",
374 edge_name,
375 literal_to_sql(other)
376 )));
377 }
378 None => {
379 return Err(OmniError::manifest(format!(
380 "edge {} missing 'to' endpoint",
381 edge_name
382 )));
383 }
384 };
385
386 ensure_node_id_exists(db, staging, branch, &edge_type.from_type, from, "src").await?;
387 ensure_node_id_exists(db, staging, branch, &edge_type.to_type, to, "dst").await?;
388 Ok(())
389}
390
391fn pending_batches_contain_id(batches: &[RecordBatch], id: &str) -> bool {
395 for batch in batches {
396 let Some(col) = batch.column_by_name("id") else {
397 continue;
398 };
399 let Some(arr) = col.as_any().downcast_ref::<StringArray>() else {
400 continue;
401 };
402 for i in 0..arr.len() {
403 if arr.is_valid(i) && arr.value(i) == id {
404 return true;
405 }
406 }
407 }
408 false
409}
410
411async fn ensure_node_id_exists(
412 db: &Omnigraph,
413 staging: &MutationStaging,
414 branch: Option<&str>,
415 node_type: &str,
416 id: &str,
417 label: &str,
418) -> Result<()> {
419 let table_key = format!("node:{}", node_type);
420
421 let pending = staging.pending_batches(&table_key);
425 if pending_batches_contain_id(pending, id) {
426 return Ok(());
427 }
428
429 let filter = format!("id = '{}'", id.replace('\'', "''"));
430 let snapshot = db.snapshot_for_branch(branch).await?;
431 let ds = db
432 .storage()
433 .open_snapshot_at_table(&snapshot, &table_key)
434 .await?;
435 let exists = db.storage().count_rows(&ds, Some(filter)).await? > 0;
436
437 if exists {
438 Ok(())
439 } else {
440 Err(OmniError::manifest(format!(
441 "{} '{}' not found in {}",
442 label, id, node_type
443 )))
444 }
445}
446
447fn predicate_to_sql(
449 predicate: &IRMutationPredicate,
450 params: &ParamMap,
451 is_edge: bool,
452) -> Result<String> {
453 let column = if is_edge {
454 match predicate.property.as_str() {
455 "from" => "src".to_string(),
456 "to" => "dst".to_string(),
457 other => other.to_string(),
458 }
459 } else {
460 predicate.property.clone()
461 };
462
463 let value = resolve_expr_value(&predicate.value, params)?;
464 let value_sql = literal_to_sql(&value);
465
466 let op = match predicate.op {
467 CompOp::Eq => "=",
468 CompOp::Ne => "!=",
469 CompOp::Gt => ">",
470 CompOp::Lt => "<",
471 CompOp::Ge => ">=",
472 CompOp::Le => "<=",
473 CompOp::Contains => {
474 return Err(OmniError::manifest(
475 "contains predicate not supported in mutations".to_string(),
476 ));
477 }
478 };
479
480 Ok(format!("{} {} {}", column, op, value_sql))
487}
488
489fn apply_assignments(
507 full_schema: &SchemaRef,
508 batch: &RecordBatch,
509 assignments: &HashMap<String, Literal>,
510 blob_properties: &HashSet<String>,
511) -> Result<RecordBatch> {
512 let mut columns: Vec<ArrayRef> = Vec::with_capacity(full_schema.fields().len());
513 let mut out_fields: Vec<Field> = Vec::with_capacity(full_schema.fields().len());
514
515 for field in full_schema.fields().iter() {
516 if blob_properties.contains(field.name()) {
517 if let Some(Literal::String(uri)) = assignments.get(field.name()) {
518 let mut builder = BlobArrayBuilder::new(batch.num_rows());
520 for _ in 0..batch.num_rows() {
521 crate::loader::append_blob_value(&mut builder, uri)?;
522 }
523 let blob_field = lance::blob::blob_field(field.name(), true);
524 out_fields.push(blob_field);
525 columns.push(
526 builder
527 .finish()
528 .map_err(|e| OmniError::Lance(e.to_string()))?,
529 );
530 } else if let Some(col) = batch.column_by_name(field.name()) {
531 let blob_field = lance::blob::blob_field(field.name(), field.is_nullable());
535 out_fields.push(blob_field);
536 columns.push(col.clone());
537 }
538 } else if let Some(lit) = assignments.get(field.name()) {
542 out_fields.push(field.as_ref().clone());
543 columns.push(literal_to_typed_array(
544 lit,
545 field.data_type(),
546 batch.num_rows(),
547 )?);
548 } else {
549 let col = batch.column_by_name(field.name()).ok_or_else(|| {
550 OmniError::Lance(format!(
551 "column '{}' not found in scan result",
552 field.name()
553 ))
554 })?;
555 out_fields.push(field.as_ref().clone());
556 columns.push(col.clone());
557 }
558 }
559
560 RecordBatch::try_new(Arc::new(Schema::new(out_fields)), columns)
561 .map_err(|e| OmniError::Lance(e.to_string()))
562}
563
564use super::staging::{MutationStaging, PendingMode};
567
568async fn open_table_for_mutation(
605 db: &Omnigraph,
606 staging: &mut MutationStaging,
607 branch: Option<&str>,
608 table_key: &str,
609 op_kind: crate::db::MutationOpKind,
610) -> Result<(SnapshotHandle, String, Option<String>)> {
611 if let Some(prior) = staging.inline_committed.get(table_key) {
612 let path = staging.paths.get(table_key).ok_or_else(|| {
613 OmniError::manifest_internal(format!(
614 "open_table_for_mutation: inline_committed[{}] without paths entry",
615 table_key
616 ))
617 })?;
618 let ds = db
619 .reopen_for_mutation(
620 table_key,
621 &path.full_path,
622 path.table_branch.as_deref(),
623 prior.table_version,
624 op_kind,
625 )
626 .await?;
627 return Ok((ds, path.full_path.clone(), path.table_branch.clone()));
628 }
629 let (ds, full_path, table_branch) = db
630 .open_for_mutation_on_branch(branch, table_key, op_kind)
631 .await?;
632 let expected_version = ds.version();
633 staging.ensure_path(
634 table_key,
635 full_path.clone(),
636 table_branch.clone(),
637 expected_version,
638 op_kind,
639 );
640 Ok((ds, full_path, table_branch))
641}
642
643fn enforce_no_mixed_destructive_constructive(
655 ir: &omnigraph_compiler::ir::MutationIR,
656) -> Result<()> {
657 let mut has_constructive = false;
658 let mut has_delete = false;
659 for op in &ir.ops {
660 match op {
661 MutationOpIR::Insert { .. } | MutationOpIR::Update { .. } => {
662 has_constructive = true;
663 }
664 MutationOpIR::Delete { .. } => {
665 has_delete = true;
666 }
667 }
668 }
669 if has_constructive && has_delete {
670 return Err(OmniError::manifest(format!(
671 "mutation '{}' on the same query mixes inserts/updates and deletes; \
672 split into separate mutations: (1) inserts and updates, then (2) deletes. \
673 This restriction lifts when Lance exposes a two-phase delete API \
674 (tracked: lance-format/lance#6658).",
675 ir.name
676 )));
677 }
678 Ok(())
679}
680
681impl Omnigraph {
682 pub async fn mutate(
683 &self,
684 branch: &str,
685 query_source: &str,
686 query_name: &str,
687 params: &ParamMap,
688 ) -> Result<MutationResult> {
689 self.mutate_as(branch, query_source, query_name, params, None)
690 .await
691 }
692
693 pub async fn mutate_as(
694 &self,
695 branch: &str,
696 query_source: &str,
697 query_name: &str,
698 params: &ParamMap,
699 actor_id: Option<&str>,
700 ) -> Result<MutationResult> {
701 self.enforce(
707 omnigraph_policy::PolicyAction::Change,
708 &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
709 actor_id,
710 )?;
711 self.mutate_with_current_actor(branch, query_source, query_name, params, actor_id)
712 .await
713 }
714
    /// Executes a named mutation on `branch` and commits it, attributing the
    /// commit to `actor_id`.
    ///
    /// This method performs no policy check itself; in this file `mutate_as`
    /// calls `enforce` before delegating here.
    ///
    /// Flow:
    /// 1. Validate schema state and heal pending recovery sidecars.
    /// 2. Normalize the branch name. A named branch must be a public branch ref.
    /// 3. Enrich parameters and lower/typecheck the named mutation.
    /// 4. On a named branch, if any touched table's snapshot entry is not yet on
    ///    that branch, acquire write-queue guards for all touched tables. This
    ///    presumably means the table still has to be forked onto the branch;
    ///    TODO confirm.
    /// 5. Execute every op into `staging`. If nothing was staged, return without
    ///    committing.
    /// 6. Otherwise stage and commit all tables, then publish the manifest
    ///    update with the expected versions.
    /// 7. Best-effort delete the recovery sidecar; a failure is only logged.
    ///
    /// # Errors
    /// Propagates errors from validation, lowering, execution, staging, commit
    /// and publish, plus the `mutation.post_finalize_pre_publisher` failpoint.
    /// Sidecar cleanup failures are logged, not returned.
    async fn mutate_with_current_actor(
        &self,
        branch: &str,
        query_source: &str,
        query_name: &str,
        params: &ParamMap,
        actor_id: Option<&str>,
    ) -> Result<MutationResult> {
        self.ensure_schema_state_valid().await?;
        self.heal_pending_recovery_sidecars().await?;
        // `requested` is optional: `None` presumably means the default/main
        // branch; TODO confirm in `normalize_branch_name`.
        let requested = Self::normalize_branch_name(branch)?;
        if let Some(name) = requested.as_deref() {
            crate::db::ensure_public_branch_ref(name, "mutate")?;
        }
        let resolved_params = enrich_mutation_params(params)?;

        let mut staging = MutationStaging::default();

        let ir = self.lower_named_mutation(query_source, query_name)?;

        // Only taken on a named branch. A table counts as needing a fork when its
        // snapshot entry exists but its `table_branch` is not the active branch.
        // Tables absent from the snapshot do not count. When a fork is needed,
        // the guards for every touched table are handed to `commit_all` below.
        let fork_queue_guards: Option<(
            Vec<(String, Option<String>)>,
            Vec<tokio::sync::OwnedMutexGuard<()>>,
        )> = if let Some(active) = requested.as_deref() {
            let snapshot = self.snapshot_for_branch(Some(active)).await?;
            let touched: Vec<(String, Option<String>)> = self
                .touched_table_keys(&ir)
                .into_iter()
                .map(|k| (k, Some(active.to_string())))
                .collect();
            let needs_fork = touched.iter().any(|(table_key, _)| {
                snapshot
                    .entry(table_key)
                    .map(|e| e.table_branch.as_deref() != Some(active))
                    .unwrap_or(false)
            });
            if needs_fork {
                let guards = self.write_queue().acquire_many(&touched).await;
                Some((touched, guards))
            } else {
                None
            }
        } else {
            None
        };

        let exec_result = self
            .execute_named_mutation(&ir, &resolved_params, requested.as_deref(), &mut staging)
            .await;

        match exec_result {
            Err(e) => Err(e),
            // Nothing staged (e.g. no rows matched): skip the commit entirely.
            Ok(total) if staging.is_empty() => Ok(total),
            Ok(total) => {
                let staged = staging.stage_all(self, requested.as_deref()).await?;
                // `_queue_guards` is a named binding (not `_`), so whatever
                // `commit_all` hands back stays alive until the end of this arm,
                // i.e. through the publisher commit below. Presumably these are
                // the write-queue guards; TODO confirm.
                let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
                    .commit_all(
                        self,
                        requested.as_deref(),
                        crate::db::manifest::SidecarKind::Mutation,
                        actor_id,
                        fork_queue_guards,
                    )
                    .await?;
                crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
                self.commit_updates_on_branch_with_expected(
                    requested.as_deref(),
                    &updates,
                    &expected_versions,
                    actor_id,
                )
                .await?;
                // Sidecar cleanup is best-effort: per the log message, a leftover
                // sidecar is resolved by the next open's recovery sweep.
                if let Some(handle) = sidecar_handle {
                    if let Err(err) =
                        crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
                    {
                        tracing::warn!(
                            error = %err,
                            operation_id = handle.operation_id.as_str(),
                            "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
                        );
                    }
                }
                Ok(total)
            }
        }
    }
857
858 fn lower_named_mutation(
867 &self,
868 query_source: &str,
869 query_name: &str,
870 ) -> Result<omnigraph_compiler::ir::MutationIR> {
871 let query_decl = omnigraph_compiler::find_named_query(query_source, query_name)
872 .map_err(|e| OmniError::manifest(e.to_string()))?;
873
874 let checked = typecheck_query_decl(&self.catalog(), &query_decl)?;
875 match checked {
876 CheckedQuery::Mutation(_) => {}
877 CheckedQuery::Read(_) => {
878 return Err(OmniError::manifest(
879 "mutation execution called on a read query; use query instead".to_string(),
880 ));
881 }
882 }
883
884 let ir = lower_mutation_query(&query_decl)?;
885 enforce_no_mixed_destructive_constructive(&ir)?;
887 Ok(ir)
888 }
889
890 fn touched_table_keys(&self, ir: &omnigraph_compiler::ir::MutationIR) -> Vec<String> {
905 use omnigraph_compiler::ir::MutationOpIR;
906 let catalog = self.catalog();
907 let mut keys: Vec<String> = Vec::new();
908 for op in &ir.ops {
909 let type_name = match op {
910 MutationOpIR::Insert { type_name, .. }
911 | MutationOpIR::Update { type_name, .. }
912 | MutationOpIR::Delete { type_name, .. } => type_name,
913 };
914 if catalog.node_types.contains_key(type_name) {
915 keys.push(format!("node:{type_name}"));
916 if matches!(op, MutationOpIR::Delete { .. }) {
920 for (edge_name, edge_type) in &catalog.edge_types {
921 if edge_type.from_type == *type_name || edge_type.to_type == *type_name {
922 keys.push(format!("edge:{edge_name}"));
923 }
924 }
925 }
926 } else if catalog.edge_types.contains_key(type_name) {
927 keys.push(format!("edge:{type_name}"));
928 }
929 }
930 keys.sort();
931 keys.dedup();
932 keys
933 }
934
935 async fn execute_named_mutation(
936 &self,
937 ir: &omnigraph_compiler::ir::MutationIR,
938 params: &ParamMap,
939 branch: Option<&str>,
940 staging: &mut MutationStaging,
941 ) -> Result<MutationResult> {
942 let mut total = MutationResult::default();
943 for op in &ir.ops {
944 let result = match op {
945 MutationOpIR::Insert {
946 type_name,
947 assignments,
948 } => {
949 self.execute_insert(type_name, assignments, params, branch, staging)
950 .await?
951 }
952 MutationOpIR::Update {
953 type_name,
954 assignments,
955 predicate,
956 } => {
957 self.execute_update(type_name, assignments, predicate, params, branch, staging)
958 .await?
959 }
960 MutationOpIR::Delete {
961 type_name,
962 predicate,
963 } => {
964 self.execute_delete(type_name, predicate, params, branch, staging)
965 .await?
966 }
967 };
968 total.affected_nodes += result.affected_nodes;
969 total.affected_edges += result.affected_edges;
970 }
971 Ok(total)
972 }
973
    /// Executes one `insert` op for a node or edge type by staging a single row.
    ///
    /// Assignments are resolved against `params` first. Then:
    /// - Node: the id is the `@key` property value if the type has one,
    ///   otherwise a fresh ULID. The row is validated and staged in `Merge` mode
    ///   for keyed types and `Append` mode otherwise.
    /// - Edge: the id is always a fresh ULID. Both endpoints must exist, the row
    ///   is staged in `Append` mode, cardinality is checked including staged
    ///   rows, and the graph index is invalidated.
    ///
    /// Returns one affected node or one affected edge.
    ///
    /// # Errors
    /// Returns a manifest error if `type_name` is neither a node nor an edge type
    /// or a keyed node insert lacks its `@key` property. Propagates resolution,
    /// validation, constraint, storage and staging errors.
    async fn execute_insert(
        &self,
        type_name: &str,
        assignments: &[IRAssignment],
        params: &ParamMap,
        branch: Option<&str>,
        staging: &mut MutationStaging,
    ) -> Result<MutationResult> {
        let mut resolved: HashMap<String, Literal> = HashMap::new();
        for a in assignments {
            resolved.insert(a.property.clone(), resolve_expr_value(&a.value, params)?);
        }

        let is_node = self.catalog().node_types.contains_key(type_name);
        let is_edge = self.catalog().edge_types.contains_key(type_name);

        if is_node {
            let node_type = &self.catalog().node_types[type_name];
            let schema = node_type.arrow_schema.clone();
            let blob_props = node_type.blob_properties.clone();
            // Keyed types use the key value as the row id; non-string keys are
            // rendered via `literal_to_sql` with surrounding quotes trimmed.
            // NOTE(review): a `Literal::Null` key becomes whatever
            // `literal_to_sql` renders for null; confirm the typechecker rejects it.
            let id = if let Some(key_prop) = node_type.key_property() {
                match resolved.get(key_prop) {
                    Some(Literal::String(s)) => s.clone(),
                    Some(other) => literal_to_sql(other).trim_matches('\'').to_string(),
                    None => {
                        return Err(OmniError::manifest(format!(
                            "insert missing @key property '{}'",
                            key_prop
                        )));
                    }
                }
            } else {
                ulid::Ulid::new().to_string()
            };

            let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
            crate::loader::validate_value_constraints(&batch, node_type)?;
            crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?;
            let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
            if !unique_groups.is_empty() {
                crate::loader::enforce_unique_constraints_intra_batch(
                    &batch,
                    type_name,
                    &unique_groups,
                )?;
            }
            let has_key = node_type.key_property().is_some();
            let table_key = format!("node:{}", type_name);
            // Keyed inserts are merges, so re-inserting an existing key is handled
            // by the merge path rather than appended as a duplicate row.
            let insert_kind = if has_key {
                crate::db::MutationOpKind::Merge
            } else {
                crate::db::MutationOpKind::Insert
            };
            // Opened only for its effect on `staging`: on first touch,
            // `open_table_for_mutation` registers the table's path and version.
            let (_ds, _full_path, _table_branch) =
                open_table_for_mutation(self, staging, branch, &table_key, insert_kind).await?;
            let mode = if has_key {
                PendingMode::Merge
            } else {
                PendingMode::Append
            };
            staging.append_batch(&table_key, schema, mode, batch)?;

            Ok(MutationResult {
                affected_nodes: 1,
                affected_edges: 0,
            })
        } else if is_edge {
            let edge_type = &self.catalog().edge_types[type_name];
            let schema = edge_type.arrow_schema.clone();
            let blob_props = edge_type.blob_properties.clone();
            let id = ulid::Ulid::new().to_string();

            let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
            // Endpoints may refer to nodes staged earlier in this same mutation.
            validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?;
            // NOTE(review): unlike the node path, `validate_value_constraints` is
            // not called here; confirm edge properties do not need it.
            crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?;
            let unique_groups = crate::loader::unique_constraint_groups_for_edge(edge_type);
            if !unique_groups.is_empty() {
                crate::loader::enforce_unique_constraints_intra_batch(
                    &batch,
                    type_name,
                    &unique_groups,
                )?;
            }
            let table_key = format!("edge:{}", type_name);
            let (ds, _full_path, _table_branch) = open_table_for_mutation(
                self,
                staging,
                branch,
                &table_key,
                crate::db::MutationOpKind::Insert,
            )
            .await?;
            staging.append_batch(&table_key, schema, PendingMode::Append, batch.clone())?;

            // Checked after staging so the new edge counts toward the limits.
            validate_edge_cardinality_with_pending(self, &ds, staging, &table_key, edge_type)
                .await?;

            self.invalidate_graph_index().await;

            Ok(MutationResult {
                affected_nodes: 0,
                affected_edges: 1,
            })
        } else {
            Err(OmniError::manifest(format!("unknown type '{}'", type_name)))
        }
    }
1092
    /// Executes one `update` op against a node type, staging the rewritten rows.
    ///
    /// Steps:
    /// 1. Select rows matching `predicate` from the table, including rows
    ///    already staged for it in this mutation.
    /// 2. Apply the assignments and validate the result.
    /// 3. Append the result to `staging` in `Merge` mode.
    ///
    /// The visible body writes only to `staging`; it makes no storage write
    /// calls itself. Returns the number of matched nodes.
    ///
    /// # Errors
    /// Returns a manifest error if `type_name` is not a node type or an
    /// assignment targets its `@key` property. Propagates predicate, scan,
    /// assignment, validation and staging errors.
    async fn execute_update(
        &self,
        type_name: &str,
        assignments: &[IRAssignment],
        predicate: &IRMutationPredicate,
        params: &ParamMap,
        branch: Option<&str>,
        staging: &mut MutationStaging,
    ) -> Result<MutationResult> {
        if !self.catalog().node_types.contains_key(type_name) {
            return Err(OmniError::manifest(format!(
                "update is only supported for node types, not '{}'",
                type_name
            )));
        }

        // The key is the row identity, so changing it must be a delete + insert.
        if let Some(key_prop) = self.catalog().node_types[type_name].key_property() {
            if assignments.iter().any(|a| a.property == key_prop) {
                return Err(OmniError::manifest(format!(
                    "cannot update @key property '{}' — delete and re-insert instead",
                    key_prop
                )));
            }
        }

        let pred_sql = predicate_to_sql(predicate, params, false)?;
        let schema = self.catalog().node_types[type_name].arrow_schema.clone();
        let blob_props = self.catalog().node_types[type_name].blob_properties.clone();

        let table_key = format!("node:{}", type_name);
        let (ds, _full_path, _table_branch) = open_table_for_mutation(
            self,
            staging,
            branch,
            &table_key,
            crate::db::MutationOpKind::Update,
        )
        .await?;

        // When the type has blob properties, scan only the non-blob columns;
        // otherwise scan everything (`projection == None`).
        let non_blob_cols: Vec<&str> = schema
            .fields()
            .iter()
            .filter(|f| !blob_props.contains(f.name()))
            .map(|f| f.name().as_str())
            .collect();
        let projection: Option<&[&str]> =
            (!blob_props.is_empty()).then_some(non_blob_cols.as_slice());
        let pending_batches = staging.pending_batches(&table_key);
        let pending_schema = staging.pending_schema(&table_key);
        // Presumably overlays staged rows on the committed table, keyed by `id`
        // (the last argument); TODO confirm in `scan_with_pending`.
        let batches = self
            .storage()
            .scan_with_pending(
                &ds,
                pending_batches,
                pending_schema,
                projection,
                Some(&pred_sql),
                Some("id"),
            )
            .await?;

        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
            return Ok(MutationResult {
                affected_nodes: 0,
                affected_edges: 0,
            });
        }

        let matched = concat_match_batches_to_schema(&schema, &blob_props, batches)?;

        let affected_count = matched.num_rows();

        // NOTE(review): assignments are resolved only after a non-empty match,
        // so a missing parameter goes unreported when the predicate matches
        // nothing.
        let mut resolved: HashMap<String, Literal> = HashMap::new();
        for a in assignments {
            resolved.insert(a.property.clone(), resolve_expr_value(&a.value, params)?);
        }
        let updated = apply_assignments(&schema, &matched, &resolved, &blob_props)?;
        let node_type = &self.catalog().node_types[type_name];
        crate::loader::validate_value_constraints(&updated, node_type)?;
        crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?;
        let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
        if !unique_groups.is_empty() {
            crate::loader::enforce_unique_constraints_intra_batch(
                &updated,
                type_name,
                &unique_groups,
            )?;
        }

        // Staged with the batch's own schema, which can omit unassigned blob
        // columns (see `apply_assignments`).
        let updated_schema = updated.schema();
        staging.append_batch(&table_key, updated_schema, PendingMode::Merge, updated)?;

        Ok(MutationResult {
            affected_nodes: affected_count,
            affected_edges: 0,
        })
    }
1223
1224 async fn execute_delete(
1225 &self,
1226 type_name: &str,
1227 predicate: &IRMutationPredicate,
1228 params: &ParamMap,
1229 branch: Option<&str>,
1230 staging: &mut MutationStaging,
1231 ) -> Result<MutationResult> {
1232 let is_node = self.catalog().node_types.contains_key(type_name);
1233 if is_node {
1234 self.execute_delete_node(type_name, predicate, params, branch, staging)
1235 .await
1236 } else {
1237 self.execute_delete_edge(type_name, predicate, params, branch, staging)
1238 .await
1239 }
1240 }
1241
1242 async fn execute_delete_node(
1243 &self,
1244 type_name: &str,
1245 predicate: &IRMutationPredicate,
1246 params: &ParamMap,
1247 branch: Option<&str>,
1248 staging: &mut MutationStaging,
1249 ) -> Result<MutationResult> {
1250 let pred_sql = predicate_to_sql(predicate, params, false)?;
1251
1252 let table_key = format!("node:{}", type_name);
1253 let (ds, full_path, table_branch) = open_table_for_mutation(
1254 self,
1255 staging,
1256 branch,
1257 &table_key,
1258 crate::db::MutationOpKind::Delete,
1259 )
1260 .await?;
1261 let initial_version = ds.version();
1262
1263 let batches = self
1267 .storage()
1268 .scan(&ds, Some(&["id"]), Some(&pred_sql), None)
1269 .await?;
1270
1271 let deleted_ids: Vec<String> = batches
1272 .iter()
1273 .flat_map(|batch| {
1274 let ids = batch
1275 .column(0)
1276 .as_any()
1277 .downcast_ref::<StringArray>()
1278 .unwrap();
1279 (0..ids.len())
1280 .map(|i| ids.value(i).to_string())
1281 .collect::<Vec<_>>()
1282 })
1283 .collect();
1284
1285 if deleted_ids.is_empty() {
1286 return Ok(MutationResult {
1287 affected_nodes: 0,
1288 affected_edges: 0,
1289 });
1290 }
1291
1292 let affected_nodes = deleted_ids.len();
1293
1294 let ds = self
1300 .reopen_for_mutation(
1301 &table_key,
1302 &full_path,
1303 table_branch.as_deref(),
1304 initial_version,
1305 crate::db::MutationOpKind::Delete,
1306 )
1307 .await?;
1308 crate::failpoints::maybe_fail("mutation.delete_node_pre_primary_delete")?;
1309 let (_new_ds, delete_state) = self
1310 .storage_inline_residual()
1311 .delete_where(&full_path, ds, &pred_sql)
1312 .await?;
1313
1314 staging.record_inline(crate::db::SubTableUpdate {
1315 table_key: table_key.clone(),
1316 table_version: delete_state.version,
1317 table_branch: table_branch.clone(),
1318 row_count: delete_state.row_count,
1319 version_metadata: delete_state.version_metadata,
1320 });
1321
1322 let mut affected_edges = 0usize;
1323 let escaped: Vec<String> = deleted_ids
1324 .iter()
1325 .map(|id| format!("'{}'", id.replace('\'', "''")))
1326 .collect();
1327 let id_list = escaped.join(", ");
1328
1329 let edge_info: Vec<(String, String, String)> = self
1330 .catalog()
1331 .edge_types
1332 .iter()
1333 .map(|(name, et)| (name.clone(), et.from_type.clone(), et.to_type.clone()))
1334 .collect();
1335
1336 for (edge_name, from_type, to_type) in &edge_info {
1337 let mut cascade_filters = Vec::new();
1338 if from_type == type_name {
1339 cascade_filters.push(format!("src IN ({})", id_list));
1340 }
1341 if to_type == type_name {
1342 cascade_filters.push(format!("dst IN ({})", id_list));
1343 }
1344 if cascade_filters.is_empty() {
1345 continue;
1346 }
1347
1348 let edge_table_key = format!("edge:{}", edge_name);
1349 let cascade_filter = cascade_filters.join(" OR ");
1350 let (edge_ds, edge_full_path, edge_table_branch) = open_table_for_mutation(
1351 self,
1352 staging,
1353 branch,
1354 &edge_table_key,
1355 crate::db::MutationOpKind::Delete,
1356 )
1357 .await?;
1358
1359 let (_new_edge_ds, edge_delete) = self
1360 .storage_inline_residual()
1361 .delete_where(&edge_full_path, edge_ds, &cascade_filter)
1362 .await?;
1363
1364 affected_edges += edge_delete.deleted_rows;
1365
1366 if edge_delete.deleted_rows > 0 {
1367 staging.record_inline(crate::db::SubTableUpdate {
1368 table_key: edge_table_key,
1369 table_version: edge_delete.version,
1370 table_branch: edge_table_branch,
1371 row_count: edge_delete.row_count,
1372 version_metadata: edge_delete.version_metadata,
1373 });
1374 }
1375 }
1376
1377 if affected_edges > 0 {
1378 self.invalidate_graph_index().await;
1379 }
1380
1381 Ok(MutationResult {
1382 affected_nodes,
1383 affected_edges,
1384 })
1385 }
1386
1387 async fn execute_delete_edge(
1388 &self,
1389 type_name: &str,
1390 predicate: &IRMutationPredicate,
1391 params: &ParamMap,
1392 branch: Option<&str>,
1393 staging: &mut MutationStaging,
1394 ) -> Result<MutationResult> {
1395 let pred_sql = predicate_to_sql(predicate, params, true)?;
1396
1397 let table_key = format!("edge:{}", type_name);
1398 let (ds, full_path, table_branch) = open_table_for_mutation(
1399 self,
1400 staging,
1401 branch,
1402 &table_key,
1403 crate::db::MutationOpKind::Delete,
1404 )
1405 .await?;
1406
1407 let (_new_ds, delete_state) = self
1408 .storage_inline_residual()
1409 .delete_where(&full_path, ds, &pred_sql)
1410 .await?;
1411 let affected = delete_state.deleted_rows;
1412
1413 if affected > 0 {
1414 staging.record_inline(crate::db::SubTableUpdate {
1415 table_key,
1416 table_version: delete_state.version,
1417 table_branch,
1418 row_count: delete_state.row_count,
1419 version_metadata: delete_state.version_metadata,
1420 });
1421 self.invalidate_graph_index().await;
1422 }
1423
1424 Ok(MutationResult {
1425 affected_nodes: 0,
1426 affected_edges: affected,
1427 })
1428 }
1429}
1430
1431fn concat_match_batches_to_schema(
1437 _schema: &SchemaRef,
1438 _blob_properties: &HashSet<String>,
1439 batches: Vec<RecordBatch>,
1440) -> Result<RecordBatch> {
1441 if batches.len() == 1 {
1442 return Ok(batches.into_iter().next().unwrap());
1443 }
1444 let common = batches[0].schema();
1445 arrow_select::concat::concat_batches(&common, &batches).map_err(|e| {
1446 OmniError::Lance(format!(
1447 "scan_with_pending returned batches with mismatched schemas \
1448 across the committed/pending boundary; this typically indicates \
1449 a blob-column shape mismatch between the committed table and a \
1450 prior in-query insert/update. Split blob-touching mutations \
1451 into separate queries. ({})",
1452 e
1453 ))
1454 })
1455}
1456
1457async fn validate_edge_cardinality_with_pending(
1462 db: &Omnigraph,
1463 committed_ds: &SnapshotHandle,
1464 staging: &MutationStaging,
1465 table_key: &str,
1466 edge_type: &omnigraph_compiler::catalog::EdgeType,
1467) -> Result<()> {
1468 if edge_type.cardinality.is_default() {
1469 return Ok(());
1470 }
1471 let counts =
1472 super::staging::count_src_per_edge(db, committed_ds, table_key, staging, None).await?;
1473 super::staging::enforce_cardinality_bounds(edge_type, &counts)
1474}
1475
1476fn enrich_mutation_params(params: &ParamMap) -> Result<ParamMap> {
1477 let mut resolved = params.clone();
1478 if !resolved.contains_key(NOW_PARAM_NAME) {
1479 let now = OffsetDateTime::now_utc()
1480 .format(&Rfc3339)
1481 .map_err(|e| OmniError::manifest(format!("failed to format now(): {}", e)))?;
1482 resolved.insert(NOW_PARAM_NAME.to_string(), Literal::DateTime(now));
1483 }
1484 Ok(resolved)
1485}
1486
#[cfg(test)]
mod predicate_sql_tests {
    use super::*;

    /// A camelCase property must be emitted as a bare, case-preserved column
    /// name, with no quoting or lowercasing.
    #[test]
    fn predicate_to_sql_preserves_camelcase_column_unquoted() {
        let no_params = ParamMap::new();
        let pred = IRMutationPredicate {
            property: String::from("repoName"),
            op: CompOp::Eq,
            value: IRExpr::Literal(Literal::String("acme".into())),
        };

        let sql = predicate_to_sql(&pred, &no_params, false).unwrap();

        assert_eq!(
            sql, "repoName = 'acme'",
            "column must be unquoted and case-preserved, got {sql}"
        );
    }
}
1511}