Skip to main content

omnigraph/exec/
mutation.rs

1use super::*;
2
3use super::query::literal_to_sql;
4
5// ─── Mutation helpers ────────────────────────────────────────────────────────
6
7/// Resolve an IRExpr to a concrete Literal value at runtime.
8fn resolve_expr_value(expr: &IRExpr, params: &ParamMap) -> Result<Literal> {
9    match expr {
10        IRExpr::Literal(lit) => Ok(lit.clone()),
11        IRExpr::Param(name) => params
12            .get(name)
13            .cloned()
14            .ok_or_else(|| OmniError::manifest(format!("parameter '{}' not provided", name))),
15        other => Err(OmniError::manifest(format!(
16            "unsupported expression in mutation: {:?}",
17            other
18        ))),
19    }
20}
21
/// Create a single-element or N-element array from a Literal, matching the target DataType.
///
/// The literal is broadcast: each of the `num_rows` slots receives the same
/// value. Supported pairings are (Null, any type), matching scalar types,
/// typed lists, and Float32 fixed-size lists (vectors). Any other
/// (literal, type) pair is a manifest error.
fn literal_to_typed_array(
    lit: &Literal,
    data_type: &DataType,
    num_rows: usize,
) -> Result<ArrayRef> {
    Ok(match (lit, data_type) {
        // A null literal broadcasts to a null array of any target type.
        (Literal::Null, _) => arrow_array::new_null_array(data_type, num_rows),
        (Literal::String(s), DataType::Utf8) => {
            Arc::new(StringArray::from(vec![s.as_str(); num_rows])) as ArrayRef
        }
        // NOTE(review): the scalar integer narrowing below uses `as`, which
        // silently wraps out-of-range values, while the list path in
        // `typed_list_literal_to_array` errors via `try_from` — confirm the
        // inconsistency is intended.
        (Literal::Integer(n), DataType::Int32) => {
            Arc::new(Int32Array::from(vec![*n as i32; num_rows]))
        }
        (Literal::Integer(n), DataType::Int64) => Arc::new(Int64Array::from(vec![*n; num_rows])),
        (Literal::Integer(n), DataType::UInt32) => {
            Arc::new(UInt32Array::from(vec![*n as u32; num_rows]))
        }
        (Literal::Integer(n), DataType::UInt64) => {
            Arc::new(UInt64Array::from(vec![*n as u64; num_rows]))
        }
        (Literal::Float(f), DataType::Float32) => {
            Arc::new(Float32Array::from(vec![*f as f32; num_rows]))
        }
        (Literal::Float(f), DataType::Float64) => Arc::new(Float64Array::from(vec![*f; num_rows])),
        (Literal::Bool(b), DataType::Boolean) => Arc::new(BooleanArray::from(vec![*b; num_rows])),
        // Date literals are parsed once; the parsed value is then repeated.
        (Literal::Date(s), DataType::Date32) => {
            let days = crate::loader::parse_date32_literal(s)?;
            Arc::new(Date32Array::from(vec![days; num_rows]))
        }
        (Literal::DateTime(s), DataType::Date64) => Arc::new(Date64Array::from(vec![
            crate::loader::parse_date64_literal(s)?;
            num_rows
        ])),
        // Variable-length lists delegate to the per-item-type builder.
        (Literal::List(items), DataType::List(field)) => {
            typed_list_literal_to_array(items, field.data_type(), num_rows)?
        }
        // Float32 fixed-size lists: the literal must supply exactly `dim`
        // numeric elements, repeated identically for every row.
        (Literal::List(items), DataType::FixedSizeList(field, dim))
            if field.data_type() == &DataType::Float32 =>
        {
            if items.len() != *dim as usize {
                return Err(OmniError::manifest(format!(
                    "vector property expects {} dimensions, got {}",
                    dim,
                    items.len()
                )));
            }
            // Pre-size the child builder for all rows' elements up front.
            let mut builder = FixedSizeListBuilder::with_capacity(
                Float32Builder::with_capacity(num_rows * (*dim as usize)),
                *dim,
                num_rows,
            )
            .with_field(field.clone());
            for _ in 0..num_rows {
                for item in items {
                    match item {
                        // Integers are accepted and cast to f32.
                        Literal::Integer(value) => builder.values().append_value(*value as f32),
                        Literal::Float(value) => builder.values().append_value(*value as f32),
                        _ => {
                            return Err(OmniError::manifest(
                                "vector elements must be numeric".to_string(),
                            ));
                        }
                    }
                }
                builder.append(true);
            }
            Arc::new(builder.finish())
        }
        _ => {
            return Err(OmniError::manifest(format!(
                "cannot convert {:?} to {:?}",
                lit, data_type
            )));
        }
    })
}
99
100fn typed_list_literal_to_array(
101    items: &[Literal],
102    item_type: &DataType,
103    num_rows: usize,
104) -> Result<ArrayRef> {
105    match item_type {
106        DataType::Utf8 => {
107            let mut builder = ListBuilder::new(StringBuilder::new());
108            for _ in 0..num_rows {
109                for item in items {
110                    match item {
111                        Literal::String(value) => builder.values().append_value(value),
112                        _ => builder.values().append_null(),
113                    }
114                }
115                builder.append(true);
116            }
117            Ok(Arc::new(builder.finish()))
118        }
119        DataType::Boolean => {
120            let mut builder = ListBuilder::new(BooleanBuilder::new());
121            for _ in 0..num_rows {
122                for item in items {
123                    match item {
124                        Literal::Bool(value) => builder.values().append_value(*value),
125                        _ => builder.values().append_null(),
126                    }
127                }
128                builder.append(true);
129            }
130            Ok(Arc::new(builder.finish()))
131        }
132        DataType::Int32 => {
133            let mut builder = ListBuilder::new(Int32Builder::new());
134            for _ in 0..num_rows {
135                for item in items {
136                    match item {
137                        Literal::Integer(value) => {
138                            let value = i32::try_from(*value).map_err(|_| {
139                                OmniError::manifest(format!(
140                                    "list value {} exceeds Int32 range",
141                                    value
142                                ))
143                            })?;
144                            builder.values().append_value(value);
145                        }
146                        _ => builder.values().append_null(),
147                    }
148                }
149                builder.append(true);
150            }
151            Ok(Arc::new(builder.finish()))
152        }
153        DataType::Int64 => {
154            let mut builder = ListBuilder::new(Int64Builder::new());
155            for _ in 0..num_rows {
156                for item in items {
157                    match item {
158                        Literal::Integer(value) => builder.values().append_value(*value),
159                        _ => builder.values().append_null(),
160                    }
161                }
162                builder.append(true);
163            }
164            Ok(Arc::new(builder.finish()))
165        }
166        DataType::UInt32 => {
167            let mut builder = ListBuilder::new(UInt32Builder::new());
168            for _ in 0..num_rows {
169                for item in items {
170                    match item {
171                        Literal::Integer(value) => {
172                            let value = u32::try_from(*value).map_err(|_| {
173                                OmniError::manifest(format!(
174                                    "list value {} exceeds UInt32 range",
175                                    value
176                                ))
177                            })?;
178                            builder.values().append_value(value);
179                        }
180                        _ => builder.values().append_null(),
181                    }
182                }
183                builder.append(true);
184            }
185            Ok(Arc::new(builder.finish()))
186        }
187        DataType::UInt64 => {
188            let mut builder = ListBuilder::new(UInt64Builder::new());
189            for _ in 0..num_rows {
190                for item in items {
191                    match item {
192                        Literal::Integer(value) => {
193                            let value = u64::try_from(*value).map_err(|_| {
194                                OmniError::manifest(format!(
195                                    "list value {} exceeds UInt64 range",
196                                    value
197                                ))
198                            })?;
199                            builder.values().append_value(value);
200                        }
201                        _ => builder.values().append_null(),
202                    }
203                }
204                builder.append(true);
205            }
206            Ok(Arc::new(builder.finish()))
207        }
208        DataType::Float32 => {
209            let mut builder = ListBuilder::new(Float32Builder::new());
210            for _ in 0..num_rows {
211                for item in items {
212                    match item {
213                        Literal::Integer(value) => builder.values().append_value(*value as f32),
214                        Literal::Float(value) => builder.values().append_value(*value as f32),
215                        _ => builder.values().append_null(),
216                    }
217                }
218                builder.append(true);
219            }
220            Ok(Arc::new(builder.finish()))
221        }
222        DataType::Float64 => {
223            let mut builder = ListBuilder::new(Float64Builder::new());
224            for _ in 0..num_rows {
225                for item in items {
226                    match item {
227                        Literal::Integer(value) => builder.values().append_value(*value as f64),
228                        Literal::Float(value) => builder.values().append_value(*value),
229                        _ => builder.values().append_null(),
230                    }
231                }
232                builder.append(true);
233            }
234            Ok(Arc::new(builder.finish()))
235        }
236        DataType::Date32 => {
237            let mut builder = ListBuilder::new(Date32Builder::new());
238            for _ in 0..num_rows {
239                for item in items {
240                    match item {
241                        Literal::Date(value) => builder
242                            .values()
243                            .append_value(crate::loader::parse_date32_literal(value)?),
244                        _ => builder.values().append_null(),
245                    }
246                }
247                builder.append(true);
248            }
249            Ok(Arc::new(builder.finish()))
250        }
251        DataType::Date64 => {
252            let mut builder = ListBuilder::new(Date64Builder::new());
253            for _ in 0..num_rows {
254                for item in items {
255                    match item {
256                        Literal::DateTime(value) => builder
257                            .values()
258                            .append_value(crate::loader::parse_date64_literal(value)?),
259                        _ => builder.values().append_null(),
260                    }
261                }
262                builder.append(true);
263            }
264            Ok(Arc::new(builder.finish()))
265        }
266        other => Err(OmniError::manifest(format!(
267            "cannot convert list literal to {:?}",
268            other
269        ))),
270    }
271}
272
273/// Build a single-element blob array from a URI or base64 value string.
274fn build_blob_array_from_value(value: &str) -> Result<ArrayRef> {
275    let mut builder = BlobArrayBuilder::new(1);
276    crate::loader::append_blob_value(&mut builder, value)?;
277    builder
278        .finish()
279        .map_err(|e| OmniError::Lance(e.to_string()))
280}
281
282/// Build a null blob array with one element.
283fn build_null_blob_array() -> Result<ArrayRef> {
284    let mut builder = BlobArrayBuilder::new(1);
285    builder
286        .push_null()
287        .map_err(|e| OmniError::Lance(e.to_string()))?;
288    builder
289        .finish()
290        .map_err(|e| OmniError::Lance(e.to_string()))
291}
292
/// Build a single-row RecordBatch from resolved assignments.
///
/// Column resolution order, per schema field:
/// 1. `id` — always filled from the caller-provided `id`.
/// 2. blob properties — built from a string (URI/base64) assignment, or
///    a null blob when unassigned and nullable; error otherwise.
/// 3. `src` / `dst` — populated from the logical `from` / `to`
///    assignment keys.
/// 4. any other assigned property — converted via `literal_to_typed_array`.
/// 5. unassigned nullable field — null column; unassigned non-nullable
///    field — manifest error.
///
/// NOTE(review): step 3 assumes only edge schemas contain columns named
/// `src`/`dst`; a node property with either name would incorrectly demand
/// a `from`/`to` assignment — confirm the schema layer forbids that.
fn build_insert_batch(
    schema: &SchemaRef,
    id: &str,
    assignments: &HashMap<String, Literal>,
    blob_properties: &HashSet<String>,
) -> Result<RecordBatch> {
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());

    for field in schema.fields() {
        if field.name() == "id" {
            columns.push(Arc::new(StringArray::from(vec![id])));
        } else if blob_properties.contains(field.name()) {
            // NOTE(review): a non-string assignment on a blob property
            // silently falls through to the null/error branches — confirm
            // upstream typechecking rules that out.
            if let Some(Literal::String(uri)) = assignments.get(field.name()) {
                columns.push(build_blob_array_from_value(uri)?);
            } else if field.is_nullable() {
                columns.push(build_null_blob_array()?);
            } else {
                return Err(OmniError::manifest(format!(
                    "missing required blob property '{}'",
                    field.name()
                )));
            }
        } else if field.name() == "src" {
            // Edge endpoint: physical `src` column comes from logical `from`.
            let lit = assignments.get("from").ok_or_else(|| {
                OmniError::manifest("missing required edge endpoint 'from'".to_string())
            })?;
            columns.push(literal_to_typed_array(lit, field.data_type(), 1)?);
        } else if field.name() == "dst" {
            // Edge endpoint: physical `dst` column comes from logical `to`.
            let lit = assignments.get("to").ok_or_else(|| {
                OmniError::manifest("missing required edge endpoint 'to'".to_string())
            })?;
            columns.push(literal_to_typed_array(lit, field.data_type(), 1)?);
        } else if let Some(lit) = assignments.get(field.name()) {
            columns.push(literal_to_typed_array(lit, field.data_type(), 1)?);
        } else if field.is_nullable() {
            columns.push(arrow_array::new_null_array(field.data_type(), 1));
        } else {
            return Err(OmniError::manifest(format!(
                "missing required property '{}'",
                field.name()
            )));
        }
    }

    RecordBatch::try_new(schema.clone(), columns).map_err(|e| OmniError::Lance(e.to_string()))
}
340
341async fn validate_edge_insert_endpoints(
342    db: &Omnigraph,
343    staging: &MutationStaging,
344    branch: Option<&str>,
345    edge_name: &str,
346    assignments: &HashMap<String, Literal>,
347) -> Result<()> {
348    let edge_type = db
349        .catalog()
350        .edge_types
351        .get(edge_name)
352        .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_name)))?;
353    let from = match assignments.get("from") {
354        Some(Literal::String(value)) => value.as_str(),
355        Some(other) => {
356            return Err(OmniError::manifest(format!(
357                "edge {} from endpoint must be a string id, got {}",
358                edge_name,
359                literal_to_sql(other)
360            )));
361        }
362        None => {
363            return Err(OmniError::manifest(format!(
364                "edge {} missing 'from' endpoint",
365                edge_name
366            )));
367        }
368    };
369    let to = match assignments.get("to") {
370        Some(Literal::String(value)) => value.as_str(),
371        Some(other) => {
372            return Err(OmniError::manifest(format!(
373                "edge {} to endpoint must be a string id, got {}",
374                edge_name,
375                literal_to_sql(other)
376            )));
377        }
378        None => {
379            return Err(OmniError::manifest(format!(
380                "edge {} missing 'to' endpoint",
381                edge_name
382            )));
383        }
384    };
385
386    ensure_node_id_exists(db, staging, branch, &edge_type.from_type, from, "src").await?;
387    ensure_node_id_exists(db, staging, branch, &edge_type.to_type, to, "dst").await?;
388    Ok(())
389}
390
391/// Quick scan of pending batches for an `id` value match. Used by the
392/// mutation path's edge endpoint validation to satisfy read-your-writes
393/// for same-query inserts before they're committed to Lance.
394fn pending_batches_contain_id(batches: &[RecordBatch], id: &str) -> bool {
395    for batch in batches {
396        let Some(col) = batch.column_by_name("id") else {
397            continue;
398        };
399        let Some(arr) = col.as_any().downcast_ref::<StringArray>() else {
400            continue;
401        };
402        for i in 0..arr.len() {
403            if arr.is_valid(i) && arr.value(i) == id {
404                return true;
405            }
406        }
407    }
408    false
409}
410
/// Verify a node with `id` exists in `node_type`, honoring
/// read-your-writes: in-query pending insert batches are consulted
/// before the committed branch snapshot. `label` ("src"/"dst") names
/// the endpoint in the not-found error.
async fn ensure_node_id_exists(
    db: &Omnigraph,
    staging: &MutationStaging,
    branch: Option<&str>,
    node_type: &str,
    id: &str,
    label: &str,
) -> Result<()> {
    let table_key = format!("node:{}", node_type);

    // Prefer the in-query pending accumulator so a same-query insert of
    // the referenced node is visible to this validation. Fall back to
    // the pre-mutation manifest snapshot when nothing pending matches.
    let pending = staging.pending_batches(&table_key);
    if pending_batches_contain_id(pending, id) {
        return Ok(());
    }

    // Double single quotes so the id can't break out of the SQL string
    // literal in the pushed-down filter.
    let filter = format!("id = '{}'", id.replace('\'', "''"));
    let snapshot = db.snapshot_for_branch(branch).await?;
    let ds = snapshot.open(&table_key).await?;
    let exists = ds
        .count_rows(Some(filter))
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))?
        > 0;

    if exists {
        Ok(())
    } else {
        Err(OmniError::manifest(format!(
            "{} '{}' not found in {}",
            label, id, node_type
        )))
    }
}
447
448/// Convert an IRMutationPredicate to a Lance SQL filter string.
449fn predicate_to_sql(
450    predicate: &IRMutationPredicate,
451    params: &ParamMap,
452    is_edge: bool,
453) -> Result<String> {
454    let column = if is_edge {
455        match predicate.property.as_str() {
456            "from" => "src".to_string(),
457            "to" => "dst".to_string(),
458            other => other.to_string(),
459        }
460    } else {
461        predicate.property.clone()
462    };
463
464    let value = resolve_expr_value(&predicate.value, params)?;
465    let value_sql = literal_to_sql(&value);
466
467    let op = match predicate.op {
468        CompOp::Eq => "=",
469        CompOp::Ne => "!=",
470        CompOp::Gt => ">",
471        CompOp::Lt => "<",
472        CompOp::Ge => ">=",
473        CompOp::Le => "<=",
474        CompOp::Contains => {
475            return Err(OmniError::manifest(
476                "contains predicate not supported in mutations".to_string(),
477            ));
478        }
479    };
480
481    Ok(format!("{} {} {}", column, op, value_sql))
482}
483
484/// Replace specific columns in a RecordBatch with new literal values.
485///
486/// Blob columns may or may not be present in `batch` depending on the
487/// caller's scan projection:
488/// - If `batch` does NOT contain a blob column AND it has no assignment,
489///   the column is OMITTED from the output. `merge_insert` leaves it
490///   untouched.
491/// - If `batch` DOES contain a blob column AND it has no assignment, the
492///   column is COPIED to the output. This enables coalescing of
493///   different-shape updates into a single full-schema merge batch (the
494///   per-table accumulator in `MutationStaging` requires consistent
495///   schemas across pending batches for `concat_batches`). The
496///   round-tripping cost is acceptable for typical agent-driven
497///   mutations; tables with large blobs and unassigned-blob updates may
498///   want to be split into separate queries.
499/// - If a blob column has a string-URI assignment, build the blob array
500///   inline.
fn apply_assignments(
    full_schema: &SchemaRef,
    batch: &RecordBatch,
    assignments: &HashMap<String, Literal>,
    blob_properties: &HashSet<String>,
) -> Result<RecordBatch> {
    // Output columns/fields are accumulated in full-schema order. Blob
    // columns may be omitted (see doc comment above), so the output
    // schema can be narrower than `full_schema`.
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(full_schema.fields().len());
    let mut out_fields: Vec<Field> = Vec::with_capacity(full_schema.fields().len());

    for field in full_schema.fields().iter() {
        if blob_properties.contains(field.name()) {
            // NOTE(review): a non-string literal assigned to a blob
            // property silently falls into the copy/omit branches below —
            // confirm upstream typechecking prevents that.
            if let Some(Literal::String(uri)) = assignments.get(field.name()) {
                // Assigned: build a single blob column from the URI,
                // repeated once per updated row.
                let mut builder = BlobArrayBuilder::new(batch.num_rows());
                for _ in 0..batch.num_rows() {
                    crate::loader::append_blob_value(&mut builder, uri)?;
                }
                // Assigned blob fields are emitted nullable unconditionally.
                let blob_field = lance::blob::blob_field(field.name(), true);
                out_fields.push(blob_field);
                columns.push(
                    builder
                        .finish()
                        .map_err(|e| OmniError::Lance(e.to_string()))?,
                );
            } else if let Some(col) = batch.column_by_name(field.name()) {
                // Unassigned but scan included it: copy through (writes
                // back the same blob, no observable change but uniform
                // schema for the accumulator).
                let blob_field = lance::blob::blob_field(field.name(), field.is_nullable());
                out_fields.push(blob_field);
                columns.push(col.clone());
            }
            // else: scan did not include this blob column and no
            // assignment — omit. Caller's accumulator must accept the
            // narrower schema (legacy single-merge_insert path).
        } else if let Some(lit) = assignments.get(field.name()) {
            // Assigned non-blob column: broadcast the literal over the
            // batch's rows with the field's declared type.
            out_fields.push(field.as_ref().clone());
            columns.push(literal_to_typed_array(
                lit,
                field.data_type(),
                batch.num_rows(),
            )?);
        } else {
            // Unassigned non-blob column MUST be present in the scan —
            // its values pass through unchanged.
            let col = batch.column_by_name(field.name()).ok_or_else(|| {
                OmniError::Lance(format!(
                    "column '{}' not found in scan result",
                    field.name()
                ))
            })?;
            out_fields.push(field.as_ref().clone());
            columns.push(col.clone());
        }
    }

    RecordBatch::try_new(Arc::new(Schema::new(out_fields)), columns)
        .map_err(|e| OmniError::Lance(e.to_string()))
}
558
559// ─── Mutation execution ──────────────────────────────────────────────────────
560
561use super::staging::{MutationStaging, PendingMode};
562
563/// Open a sub-table dataset for read or inline-commit-write within the
564/// current mutation query, capturing pre-write metadata in `staging` on
565/// first touch. The captured version is the publisher's CAS fence at
566/// end-of-query (per-table OCC).
567///
568/// On first touch, opens the dataset at HEAD on the requested branch
569/// via `open_for_mutation_on_branch`, which compares Lance HEAD against
570/// the manifest's pinned version — that fence is the engine's
571/// publisher-style OCC catching cross-writer drift before we make any
572/// changes.
573///
574/// On subsequent touches *within the same query*, behavior depends on
575/// whether the table has already been inline-committed by a delete op:
576///
577/// - **Insert / update path (no inline commit between touches).** Lance
578///   HEAD has not moved since first touch, so a fresh
579///   `open_for_mutation_on_branch` would still match the manifest
580///   pinned version. We just go through it again; `ensure_path` is a
581///   no-op (idempotent on the captured `expected_version`).
582/// - **Delete cascade or multi-delete on the same table.** A prior
583///   `delete_where` on this table has already advanced Lance HEAD past
584///   the manifest's pinned version (the manifest doesn't move until
585///   end-of-query). Going through `open_for_mutation_on_branch` again
586///   would trip its `ensure_expected_version` equality check
587///   (`actual = pinned + 1` vs `expected = pinned`). Instead we route
588///   through `reopen_for_mutation` at the post-inline-commit Lance
589///   version captured in `staging.inline_committed[table_key]`, which
590///   is the source of truth for "where is Lance HEAD right now on
591///   this table within this query."
592///
593/// The `inline_committed` reopen branch closes the multi-delete-on-same-table
594/// failure path that pre-staged-write engines inherited. The branch goes
595/// away once Lance exposes a two-phase delete API
596/// ([lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658))
597/// and we can stage deletes on the same path as inserts/updates.
async fn open_table_for_mutation(
    db: &Omnigraph,
    staging: &mut MutationStaging,
    branch: Option<&str>,
    table_key: &str,
) -> Result<(Dataset, String, Option<String>)> {
    // Reopen path: a prior delete inline-committed this table, so Lance
    // HEAD is already past the manifest's pinned version. Reopen at the
    // version recorded by that commit instead of re-running the
    // manifest-pin OCC fence (which would reject the drift).
    if let Some(prior) = staging.inline_committed.get(table_key) {
        // Invariant: any inline-committed table was first-touched and so
        // must have a paths entry; a miss is an internal bug.
        let path = staging.paths.get(table_key).ok_or_else(|| {
            OmniError::manifest_internal(format!(
                "open_table_for_mutation: inline_committed[{}] without paths entry",
                table_key
            ))
        })?;
        let ds = db
            .reopen_for_mutation(
                table_key,
                &path.full_path,
                path.table_branch.as_deref(),
                prior.table_version,
            )
            .await?;
        return Ok((ds, path.full_path.clone(), path.table_branch.clone()));
    }
    // First touch (or repeat touch with no intervening inline commit):
    // open at HEAD with the manifest-pin OCC check, then record the
    // pre-write version as the publisher's CAS fence. `ensure_path` is
    // idempotent on repeat touches (see doc comment above).
    let (ds, full_path, table_branch) =
        db.open_for_mutation_on_branch(branch, table_key).await?;
    let expected_version = ds.version().version;
    staging.ensure_path(
        table_key,
        full_path.clone(),
        table_branch.clone(),
        expected_version,
    );
    Ok((ds, full_path, table_branch))
}
632
633/// D₂ parse-time check: a single mutation query is either insert/update-only
634/// or delete-only. Mixed → reject before any I/O.
635///
636/// Reason: under the staged-write writer, inserts and updates
637/// accumulate in memory and commit at end-of-query, while deletes still
638/// inline-commit (Lance lacks a public two-phase delete in 4.0.0).
639/// Mixing creates ordering hazards (same-row insert→delete becomes a no-op
640/// because the staged insert isn't visible to delete; cascading deletes
641/// of just-inserted edges break referential integrity by silent design).
642/// Until Lance exposes `DeleteJob::execute_uncommitted`, the parse-time
643/// rejection keeps both paths atomic and correct.
644fn enforce_no_mixed_destructive_constructive(
645    ir: &omnigraph_compiler::ir::MutationIR,
646) -> Result<()> {
647    let mut has_constructive = false;
648    let mut has_delete = false;
649    for op in &ir.ops {
650        match op {
651            MutationOpIR::Insert { .. } | MutationOpIR::Update { .. } => {
652                has_constructive = true;
653            }
654            MutationOpIR::Delete { .. } => {
655                has_delete = true;
656            }
657        }
658    }
659    if has_constructive && has_delete {
660        return Err(OmniError::manifest(format!(
661            "mutation '{}' on the same query mixes inserts/updates and deletes; \
662             split into separate mutations: (1) inserts and updates, then (2) deletes. \
663             This restriction lifts when Lance exposes a two-phase delete API \
664             (tracked: lance-format/lance#6658).",
665            ir.name
666        )));
667    }
668    Ok(())
669}
670
671impl Omnigraph {
    /// Execute a named mutation query on `branch` with no explicit audit
    /// actor (delegates to `mutate_as` with `actor_id = None`).
    pub async fn mutate(
        &mut self,
        branch: &str,
        query_source: &str,
        query_name: &str,
        params: &ParamMap,
    ) -> Result<MutationResult> {
        self.mutate_as(branch, query_source, query_name, params, None)
            .await
    }
682
    /// Execute a named mutation while attributing audit records to
    /// `actor_id` for the duration of the call.
    ///
    /// The previous `audit_actor_id` is saved up front and restored after
    /// the inner call completes — on success or error alike (the result
    /// is captured before restoration, then returned).
    pub async fn mutate_as(
        &mut self,
        branch: &str,
        query_source: &str,
        query_name: &str,
        params: &ParamMap,
        actor_id: Option<&str>,
    ) -> Result<MutationResult> {
        let previous_actor = self.audit_actor_id.clone();
        self.audit_actor_id = actor_id.map(str::to_string);
        let result = self
            .mutate_with_current_actor(branch, query_source, query_name, params)
            .await;
        // Restore before returning so subsequent calls see the caller's
        // original actor.
        self.audit_actor_id = previous_actor;
        result
    }
699
    /// Core mutation driver under the current audit actor.
    ///
    /// Flow: validate schema state and branch name → enrich params →
    /// execute the named mutation against a per-query staging
    /// accumulator → when anything was staged, finalize the staged
    /// writes and publish the manifest with per-table expected-version
    /// CAS.
    async fn mutate_with_current_actor(
        &mut self,
        branch: &str,
        query_source: &str,
        query_name: &str,
        params: &ParamMap,
    ) -> Result<MutationResult> {
        self.ensure_schema_state_valid().await?;
        let requested = Self::normalize_branch_name(branch)?;
        // Reject internal `__run__*` / system-prefixed branches at the
        // public write boundary. Direct-publish paths assert this
        // explicitly so a caller can't write to legacy or system
        // staging branches by passing the prefix verbatim.
        if let Some(name) = requested.as_deref() {
            crate::db::ensure_public_branch_ref(name, "mutate")?;
        }
        let resolved_params = enrich_mutation_params(params)?;

        // Per-query staging accumulator. Inserts and updates push batches
        // into `pending`; deletes still inline-commit and record into
        // `inline_committed`. At end-of-query, `finalize` issues one
        // `stage_*` + `commit_staged` per pending table, then the
        // publisher commits the manifest atomically across all touched
        // tables. Branch is threaded explicitly — no coordinator swap.
        let mut staging = MutationStaging::default();

        let exec_result = self
            .execute_named_mutation(
                query_source,
                query_name,
                &resolved_params,
                requested.as_deref(),
                &mut staging,
            )
            .await;

        match exec_result {
            Err(e) => Err(e),
            // Nothing staged: no finalize/publish step needed here.
            Ok(total) if staging.is_empty() => Ok(total),
            Ok(total) => {
                let (updates, expected_versions) = staging
                    .finalize(self, requested.as_deref())
                    .await?;
                // Failpoint that wedges the documented finalize→publisher
                // residual: per-table `commit_staged` calls already
                // advanced Lance HEAD on every touched table; a failure
                // injected here mirrors the production-rare case where
                // the publisher's CAS pre-check rejects (or the manifest
                // write throws) after staged commits succeeded. Used by
                // `tests/failpoints.rs::finalize_publisher_residual_*`
                // to pin the documented residual behavior. See
                // `docs/runs.md` "Finalize → publisher residual".
                crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
                self.commit_updates_on_branch_with_expected(
                    requested.as_deref(),
                    &updates,
                    &expected_versions,
                )
                .await?;
                Ok(total)
            }
        }
    }
763
764    async fn execute_named_mutation(
765        &mut self,
766        query_source: &str,
767        query_name: &str,
768        params: &ParamMap,
769        branch: Option<&str>,
770        staging: &mut MutationStaging,
771    ) -> Result<MutationResult> {
772        let query_decl = omnigraph_compiler::find_named_query(query_source, query_name)
773            .map_err(|e| OmniError::manifest(e.to_string()))?;
774
775        let checked = typecheck_query_decl(self.catalog(), &query_decl)?;
776        match checked {
777            CheckedQuery::Mutation(_) => {}
778            CheckedQuery::Read(_) => {
779                return Err(OmniError::manifest(
780                    "mutation execution called on a read query; use query instead".to_string(),
781                ));
782            }
783        }
784
785        let ir = lower_mutation_query(&query_decl)?;
786        // D₂: reject mixed insert/update + delete before any I/O.
787        enforce_no_mixed_destructive_constructive(&ir)?;
788
789        let mut total = MutationResult::default();
790        for op in &ir.ops {
791            let result = match op {
792                MutationOpIR::Insert {
793                    type_name,
794                    assignments,
795                } => {
796                    self.execute_insert(type_name, assignments, params, branch, staging)
797                        .await?
798                }
799                MutationOpIR::Update {
800                    type_name,
801                    assignments,
802                    predicate,
803                } => {
804                    self.execute_update(
805                        type_name,
806                        assignments,
807                        predicate,
808                        params,
809                        branch,
810                        staging,
811                    )
812                    .await?
813                }
814                MutationOpIR::Delete {
815                    type_name,
816                    predicate,
817                } => {
818                    self.execute_delete(type_name, predicate, params, branch, staging)
819                        .await?
820                }
821            };
822            total.affected_nodes += result.affected_nodes;
823            total.affected_edges += result.affected_edges;
824        }
825        Ok(total)
826    }
827
828    async fn execute_insert(
829        &mut self,
830        type_name: &str,
831        assignments: &[IRAssignment],
832        params: &ParamMap,
833        branch: Option<&str>,
834        staging: &mut MutationStaging,
835    ) -> Result<MutationResult> {
836        let mut resolved: HashMap<String, Literal> = HashMap::new();
837        for a in assignments {
838            resolved.insert(a.property.clone(), resolve_expr_value(&a.value, params)?);
839        }
840
841        let is_node = self.catalog().node_types.contains_key(type_name);
842        let is_edge = self.catalog().edge_types.contains_key(type_name);
843
844        if is_node {
845            let node_type = &self.catalog().node_types[type_name];
846            let schema = node_type.arrow_schema.clone();
847            let blob_props = node_type.blob_properties.clone();
848            let id = if let Some(key_prop) = node_type.key_property() {
849                match resolved.get(key_prop) {
850                    Some(Literal::String(s)) => s.clone(),
851                    Some(other) => literal_to_sql(other).trim_matches('\'').to_string(),
852                    None => {
853                        return Err(OmniError::manifest(format!(
854                            "insert missing @key property '{}'",
855                            key_prop
856                        )));
857                    }
858                }
859            } else {
860                ulid::Ulid::new().to_string()
861            };
862
863            let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
864            crate::loader::validate_value_constraints(&batch, node_type)?;
865            crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?;
866            let unique_props = crate::loader::unique_property_names_for_node(node_type);
867            if !unique_props.is_empty() {
868                crate::loader::enforce_unique_constraints_intra_batch(
869                    &batch,
870                    type_name,
871                    &unique_props,
872                )?;
873            }
874            let has_key = node_type.key_property().is_some();
875            let table_key = format!("node:{}", type_name);
876            // Capture pre-write metadata on first touch (no Lance write).
877            let (_ds, _full_path, _table_branch) =
878                open_table_for_mutation(self, staging, branch, &table_key).await?;
879            // Accumulate. @key inserts go into the Merge stream (so a
880            // later update on the same id coalesces correctly); no-key
881            // inserts go into the Append stream.
882            let mode = if has_key {
883                PendingMode::Merge
884            } else {
885                PendingMode::Append
886            };
887            staging.append_batch(&table_key, schema, mode, batch)?;
888
889            Ok(MutationResult {
890                affected_nodes: 1,
891                affected_edges: 0,
892            })
893        } else if is_edge {
894            let edge_type = &self.catalog().edge_types[type_name];
895            let schema = edge_type.arrow_schema.clone();
896            let blob_props = edge_type.blob_properties.clone();
897            let id = ulid::Ulid::new().to_string();
898
899            let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
900            validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?;
901            crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?;
902            let unique_props = crate::loader::unique_property_names_for_edge(edge_type);
903            if !unique_props.is_empty() {
904                crate::loader::enforce_unique_constraints_intra_batch(
905                    &batch,
906                    type_name,
907                    &unique_props,
908                )?;
909            }
910            let table_key = format!("edge:{}", type_name);
911            // Capture pre-write metadata on first touch (no Lance write).
912            let (ds, _full_path, _table_branch) =
913                open_table_for_mutation(self, staging, branch, &table_key).await?;
914            // Accumulate the new edge row. Edge IDs are ULID-generated so
915            // Append mode is correct (no key-based dedup needed).
916            staging.append_batch(&table_key, schema, PendingMode::Append, batch.clone())?;
917
918            // Edge cardinality validation: scan committed edges via Lance
919            // + iterate pending edges in-memory for the `src` column,
920            // group-by-src. The pending side already includes the row
921            // we just appended (above).
922            validate_edge_cardinality_with_pending(
923                self,
924                &ds,
925                staging,
926                &table_key,
927                edge_type,
928            )
929            .await?;
930
931            self.invalidate_graph_index().await;
932
933            Ok(MutationResult {
934                affected_nodes: 0,
935                affected_edges: 1,
936            })
937        } else {
938            Err(OmniError::manifest(format!("unknown type '{}'", type_name)))
939        }
940    }
941
942    async fn execute_update(
943        &mut self,
944        type_name: &str,
945        assignments: &[IRAssignment],
946        predicate: &IRMutationPredicate,
947        params: &ParamMap,
948        branch: Option<&str>,
949        staging: &mut MutationStaging,
950    ) -> Result<MutationResult> {
951        // Defense in depth: ensure this is a node type
952        if !self.catalog().node_types.contains_key(type_name) {
953            return Err(OmniError::manifest(format!(
954                "update is only supported for node types, not '{}'",
955                type_name
956            )));
957        }
958
959        // Reject updates to @key properties — identity is immutable
960        if let Some(key_prop) = self.catalog().node_types[type_name].key_property() {
961            if assignments.iter().any(|a| a.property == key_prop) {
962                return Err(OmniError::manifest(format!(
963                    "cannot update @key property '{}' — delete and re-insert instead",
964                    key_prop
965                )));
966            }
967        }
968
969        let pred_sql = predicate_to_sql(predicate, params, false)?;
970        let schema = self.catalog().node_types[type_name].arrow_schema.clone();
971        let blob_props = self.catalog().node_types[type_name].blob_properties.clone();
972
973        let table_key = format!("node:{}", type_name);
974        let (ds, _full_path, _table_branch) =
975            open_table_for_mutation(self, staging, branch, &table_key).await?;
976
977        // Scan committed via Lance + apply the same predicate to pending
978        // batches via DataFusion `MemTable` (read-your-writes for prior
979        // ops in this query). The pending side may include rows from
980        // earlier `insert` / `update` ops on the same table.
981        //
982        // For blob tables we project away the blob columns: Lance's
983        // scanner doesn't accept the standard projection path on blob
984        // descriptors and would panic with a `Field::project` assertion.
985        // The downstream `apply_assignments` synthesizes blob columns
986        // from explicit assignments and omits unassigned blobs (Lance's
987        // merge_insert leaves them untouched). Tables without blob
988        // columns scan the full schema unprojected.
989        let non_blob_cols: Vec<&str> = schema
990            .fields()
991            .iter()
992            .filter(|f| !blob_props.contains(f.name()))
993            .map(|f| f.name().as_str())
994            .collect();
995        let projection: Option<&[&str]> =
996            (!blob_props.is_empty()).then_some(non_blob_cols.as_slice());
997        let pending_batches = staging.pending_batches(&table_key);
998        let pending_schema = staging.pending_schema(&table_key);
999        // Use merge semantics on the union: a committed row whose `id`
1000        // also appears in pending has been logically updated by an
1001        // earlier op in this query and is shadowed from the scan,
1002        // otherwise the predicate runs against stale committed values
1003        // and a chained `update where <pred>` can match a row whose
1004        // pending value no longer satisfies <pred>.
1005        let batches = self
1006            .table_store()
1007            .scan_with_pending(
1008                &ds,
1009                pending_batches,
1010                pending_schema,
1011                projection,
1012                Some(&pred_sql),
1013                Some("id"),
1014            )
1015            .await?;
1016
1017        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
1018            return Ok(MutationResult {
1019                affected_nodes: 0,
1020                affected_edges: 0,
1021            });
1022        }
1023
1024        // Concat the matched batches (committed + pending) into one. The
1025        // helper trusts that both sides share a schema — Lance returns
1026        // dataset-schema-ordered columns and DataFusion returns
1027        // MemTable-schema-ordered columns; both should match the catalog's
1028        // arrow_schema when the projection is consistent. If they
1029        // diverge (typically a blob-table mid-schema-shift), the helper
1030        // surfaces a clear error directing the caller to split the
1031        // mutation.
1032        let matched = concat_match_batches_to_schema(&schema, &blob_props, batches)?;
1033
1034        let affected_count = matched.num_rows();
1035
1036        let mut resolved: HashMap<String, Literal> = HashMap::new();
1037        for a in assignments {
1038            resolved.insert(a.property.clone(), resolve_expr_value(&a.value, params)?);
1039        }
1040        let updated = apply_assignments(&schema, &matched, &resolved, &blob_props)?;
1041        let node_type = &self.catalog().node_types[type_name];
1042        crate::loader::validate_value_constraints(&updated, node_type)?;
1043        crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?;
1044        let unique_props = crate::loader::unique_property_names_for_node(node_type);
1045        if !unique_props.is_empty() {
1046            crate::loader::enforce_unique_constraints_intra_batch(
1047                &updated,
1048                type_name,
1049                &unique_props,
1050            )?;
1051        }
1052
1053        // Accumulate the updated batch into the Merge-mode pending stream.
1054        // The accumulator may now contain entries with the same id as a
1055        // prior insert or update on this table; `MutationStaging::finalize`
1056        // dedupes by id (last-occurrence wins) before issuing the single
1057        // `stage_merge_insert` call at end-of-query.
1058        let updated_schema = updated.schema();
1059        staging.append_batch(&table_key, updated_schema, PendingMode::Merge, updated)?;
1060
1061        Ok(MutationResult {
1062            affected_nodes: affected_count,
1063            affected_edges: 0,
1064        })
1065    }
1066
1067    async fn execute_delete(
1068        &mut self,
1069        type_name: &str,
1070        predicate: &IRMutationPredicate,
1071        params: &ParamMap,
1072        branch: Option<&str>,
1073        staging: &mut MutationStaging,
1074    ) -> Result<MutationResult> {
1075        let is_node = self.catalog().node_types.contains_key(type_name);
1076        if is_node {
1077            self.execute_delete_node(type_name, predicate, params, branch, staging)
1078                .await
1079        } else {
1080            self.execute_delete_edge(type_name, predicate, params, branch, staging)
1081                .await
1082        }
1083    }
1084
1085    async fn execute_delete_node(
1086        &mut self,
1087        type_name: &str,
1088        predicate: &IRMutationPredicate,
1089        params: &ParamMap,
1090        branch: Option<&str>,
1091        staging: &mut MutationStaging,
1092    ) -> Result<MutationResult> {
1093        let pred_sql = predicate_to_sql(predicate, params, false)?;
1094
1095        let table_key = format!("node:{}", type_name);
1096        let (ds, full_path, table_branch) =
1097            open_table_for_mutation(self, staging, branch, &table_key).await?;
1098        let initial_version = ds.version().version;
1099
1100        // Scan matching IDs for cascade. Per D₂ this never overlaps with
1101        // staged inserts (mixed insert/delete in one query is rejected at
1102        // parse time), so we scan committed only.
1103        let batches = self
1104            .table_store()
1105            .scan(&ds, Some(&["id"]), Some(&pred_sql), None)
1106            .await?;
1107
1108        let deleted_ids: Vec<String> = batches
1109            .iter()
1110            .flat_map(|batch| {
1111                let ids = batch
1112                    .column(0)
1113                    .as_any()
1114                    .downcast_ref::<StringArray>()
1115                    .unwrap();
1116                (0..ids.len())
1117                    .map(|i| ids.value(i).to_string())
1118                    .collect::<Vec<_>>()
1119            })
1120            .collect();
1121
1122        if deleted_ids.is_empty() {
1123            return Ok(MutationResult {
1124                affected_nodes: 0,
1125                affected_edges: 0,
1126            });
1127        }
1128
1129        let affected_nodes = deleted_ids.len();
1130
1131        // Delete nodes — still inline-commit (Lance's `Dataset::delete` is
1132        // not exposed as a two-phase op in 4.0.0). D₂ keeps inserts and
1133        // deletes from coexisting in one query, so this advance of Lance
1134        // HEAD is the only HEAD movement during the query and the
1135        // publisher's CAS captures it intact.
1136        let mut ds = self
1137            .reopen_for_mutation(
1138                &table_key,
1139                &full_path,
1140                table_branch.as_deref(),
1141                initial_version,
1142            )
1143            .await?;
1144        let delete_state = self
1145            .table_store()
1146            .delete_where(&full_path, &mut ds, &pred_sql)
1147            .await?;
1148
1149        staging.record_inline(crate::db::SubTableUpdate {
1150            table_key: table_key.clone(),
1151            table_version: delete_state.version,
1152            table_branch: table_branch.clone(),
1153            row_count: delete_state.row_count,
1154            version_metadata: delete_state.version_metadata,
1155        });
1156
1157        let mut affected_edges = 0usize;
1158        let escaped: Vec<String> = deleted_ids
1159            .iter()
1160            .map(|id| format!("'{}'", id.replace('\'', "''")))
1161            .collect();
1162        let id_list = escaped.join(", ");
1163
1164        let edge_info: Vec<(String, String, String)> = self
1165            .catalog()
1166            .edge_types
1167            .iter()
1168            .map(|(name, et)| (name.clone(), et.from_type.clone(), et.to_type.clone()))
1169            .collect();
1170
1171        for (edge_name, from_type, to_type) in &edge_info {
1172            let mut cascade_filters = Vec::new();
1173            if from_type == type_name {
1174                cascade_filters.push(format!("src IN ({})", id_list));
1175            }
1176            if to_type == type_name {
1177                cascade_filters.push(format!("dst IN ({})", id_list));
1178            }
1179            if cascade_filters.is_empty() {
1180                continue;
1181            }
1182
1183            let edge_table_key = format!("edge:{}", edge_name);
1184            let cascade_filter = cascade_filters.join(" OR ");
1185            let (mut edge_ds, edge_full_path, edge_table_branch) =
1186                open_table_for_mutation(self, staging, branch, &edge_table_key).await?;
1187
1188            let edge_delete = self
1189                .table_store()
1190                .delete_where(&edge_full_path, &mut edge_ds, &cascade_filter)
1191                .await?;
1192
1193            affected_edges += edge_delete.deleted_rows;
1194
1195            if edge_delete.deleted_rows > 0 {
1196                staging.record_inline(crate::db::SubTableUpdate {
1197                    table_key: edge_table_key,
1198                    table_version: edge_delete.version,
1199                    table_branch: edge_table_branch,
1200                    row_count: edge_delete.row_count,
1201                    version_metadata: edge_delete.version_metadata,
1202                });
1203            }
1204        }
1205
1206        if affected_edges > 0 {
1207            self.invalidate_graph_index().await;
1208        }
1209
1210        Ok(MutationResult {
1211            affected_nodes,
1212            affected_edges,
1213        })
1214    }
1215
1216    async fn execute_delete_edge(
1217        &mut self,
1218        type_name: &str,
1219        predicate: &IRMutationPredicate,
1220        params: &ParamMap,
1221        branch: Option<&str>,
1222        staging: &mut MutationStaging,
1223    ) -> Result<MutationResult> {
1224        let pred_sql = predicate_to_sql(predicate, params, true)?;
1225
1226        let table_key = format!("edge:{}", type_name);
1227        let (mut ds, full_path, table_branch) =
1228            open_table_for_mutation(self, staging, branch, &table_key).await?;
1229
1230        let delete_state = self
1231            .table_store()
1232            .delete_where(&full_path, &mut ds, &pred_sql)
1233            .await?;
1234        let affected = delete_state.deleted_rows;
1235
1236        if affected > 0 {
1237            staging.record_inline(crate::db::SubTableUpdate {
1238                table_key,
1239                table_version: delete_state.version,
1240                table_branch,
1241                row_count: delete_state.row_count,
1242                version_metadata: delete_state.version_metadata,
1243            });
1244            self.invalidate_graph_index().await;
1245        }
1246
1247        Ok(MutationResult {
1248            affected_nodes: 0,
1249            affected_edges: affected,
1250        })
1251    }
1252}
1253
1254/// Concat the matched batches from `scan_with_pending` into a single batch.
1255/// `scan_with_pending` returns committed-side and pending-side batches in
1256/// order; both should share a schema if pending was produced through
1257/// `apply_assignments` with full-schema scan input. If schemas drift,
1258/// surface a clear error so the user can split the query.
1259fn concat_match_batches_to_schema(
1260    _schema: &SchemaRef,
1261    _blob_properties: &HashSet<String>,
1262    batches: Vec<RecordBatch>,
1263) -> Result<RecordBatch> {
1264    if batches.len() == 1 {
1265        return Ok(batches.into_iter().next().unwrap());
1266    }
1267    let common = batches[0].schema();
1268    arrow_select::concat::concat_batches(&common, &batches).map_err(|e| {
1269        OmniError::Lance(format!(
1270            "scan_with_pending returned batches with mismatched schemas \
1271             across the committed/pending boundary; this typically indicates \
1272             a blob-column shape mismatch between the committed table and a \
1273             prior in-query insert/update. Split blob-touching mutations \
1274             into separate queries. ({})",
1275            e
1276        ))
1277    })
1278}
1279
1280/// Validate `@card` bounds against committed (Lance) + pending (in-memory)
1281/// edges for one edge table. Engine path: each insert produces a fresh
1282/// ULID id, so committed and pending cannot share a primary key — no
1283/// dedup needed (`dedupe_key_column = None`).
1284async fn validate_edge_cardinality_with_pending(
1285    db: &Omnigraph,
1286    committed_ds: &Dataset,
1287    staging: &MutationStaging,
1288    table_key: &str,
1289    edge_type: &omnigraph_compiler::catalog::EdgeType,
1290) -> Result<()> {
1291    if edge_type.cardinality.is_default() {
1292        return Ok(());
1293    }
1294    let counts = super::staging::count_src_per_edge(
1295        db,
1296        committed_ds,
1297        table_key,
1298        staging,
1299        None,
1300    )
1301    .await?;
1302    super::staging::enforce_cardinality_bounds(edge_type, &counts)
1303}
1304
1305fn enrich_mutation_params(params: &ParamMap) -> Result<ParamMap> {
1306    let mut resolved = params.clone();
1307    if !resolved.contains_key(NOW_PARAM_NAME) {
1308        let now = OffsetDateTime::now_utc()
1309            .format(&Rfc3339)
1310            .map_err(|e| OmniError::manifest(format!("failed to format now(): {}", e)))?;
1311        resolved.insert(NOW_PARAM_NAME.to_string(), Literal::DateTime(now));
1312    }
1313    Ok(resolved)
1314}