Skip to main content

omnigraph/exec/
mutation.rs

1use super::*;
2
3use super::query::literal_to_sql;
4
5// ─── Mutation helpers ────────────────────────────────────────────────────────
6
7/// Resolve an IRExpr to a concrete Literal value at runtime.
8fn resolve_expr_value(expr: &IRExpr, params: &ParamMap) -> Result<Literal> {
9    match expr {
10        IRExpr::Literal(lit) => Ok(lit.clone()),
11        IRExpr::Param(name) => params
12            .get(name)
13            .cloned()
14            .ok_or_else(|| OmniError::manifest(format!("parameter '{}' not provided", name))),
15        other => Err(OmniError::manifest(format!(
16            "unsupported expression in mutation: {:?}",
17            other
18        ))),
19    }
20}
21
22/// Create a single-element or N-element array from a Literal, matching the target DataType.
23fn literal_to_typed_array(
24    lit: &Literal,
25    data_type: &DataType,
26    num_rows: usize,
27) -> Result<ArrayRef> {
28    Ok(match (lit, data_type) {
29        (Literal::Null, _) => arrow_array::new_null_array(data_type, num_rows),
30        (Literal::String(s), DataType::Utf8) => {
31            Arc::new(StringArray::from(vec![s.as_str(); num_rows])) as ArrayRef
32        }
33        (Literal::Integer(n), DataType::Int32) => {
34            Arc::new(Int32Array::from(vec![*n as i32; num_rows]))
35        }
36        (Literal::Integer(n), DataType::Int64) => Arc::new(Int64Array::from(vec![*n; num_rows])),
37        (Literal::Integer(n), DataType::UInt32) => {
38            Arc::new(UInt32Array::from(vec![*n as u32; num_rows]))
39        }
40        (Literal::Integer(n), DataType::UInt64) => {
41            Arc::new(UInt64Array::from(vec![*n as u64; num_rows]))
42        }
43        (Literal::Float(f), DataType::Float32) => {
44            Arc::new(Float32Array::from(vec![*f as f32; num_rows]))
45        }
46        (Literal::Float(f), DataType::Float64) => Arc::new(Float64Array::from(vec![*f; num_rows])),
47        (Literal::Bool(b), DataType::Boolean) => Arc::new(BooleanArray::from(vec![*b; num_rows])),
48        (Literal::Date(s), DataType::Date32) => {
49            let days = crate::loader::parse_date32_literal(s)?;
50            Arc::new(Date32Array::from(vec![days; num_rows]))
51        }
52        (Literal::DateTime(s), DataType::Date64) => Arc::new(Date64Array::from(vec![
53            crate::loader::parse_date64_literal(s)?;
54            num_rows
55        ])),
56        (Literal::List(items), DataType::List(field)) => {
57            typed_list_literal_to_array(items, field.data_type(), num_rows)?
58        }
59        (Literal::List(items), DataType::FixedSizeList(field, dim))
60            if field.data_type() == &DataType::Float32 =>
61        {
62            if items.len() != *dim as usize {
63                return Err(OmniError::manifest(format!(
64                    "vector property expects {} dimensions, got {}",
65                    dim,
66                    items.len()
67                )));
68            }
69            let mut builder = FixedSizeListBuilder::with_capacity(
70                Float32Builder::with_capacity(num_rows * (*dim as usize)),
71                *dim,
72                num_rows,
73            )
74            .with_field(field.clone());
75            for _ in 0..num_rows {
76                for item in items {
77                    match item {
78                        Literal::Integer(value) => builder.values().append_value(*value as f32),
79                        Literal::Float(value) => builder.values().append_value(*value as f32),
80                        _ => {
81                            return Err(OmniError::manifest(
82                                "vector elements must be numeric".to_string(),
83                            ));
84                        }
85                    }
86                }
87                builder.append(true);
88            }
89            Arc::new(builder.finish())
90        }
91        _ => {
92            return Err(OmniError::manifest(format!(
93                "cannot convert {:?} to {:?}",
94                lit, data_type
95            )));
96        }
97    })
98}
99
100fn typed_list_literal_to_array(
101    items: &[Literal],
102    item_type: &DataType,
103    num_rows: usize,
104) -> Result<ArrayRef> {
105    match item_type {
106        DataType::Utf8 => {
107            let mut builder = ListBuilder::new(StringBuilder::new());
108            for _ in 0..num_rows {
109                for item in items {
110                    match item {
111                        Literal::String(value) => builder.values().append_value(value),
112                        _ => builder.values().append_null(),
113                    }
114                }
115                builder.append(true);
116            }
117            Ok(Arc::new(builder.finish()))
118        }
119        DataType::Boolean => {
120            let mut builder = ListBuilder::new(BooleanBuilder::new());
121            for _ in 0..num_rows {
122                for item in items {
123                    match item {
124                        Literal::Bool(value) => builder.values().append_value(*value),
125                        _ => builder.values().append_null(),
126                    }
127                }
128                builder.append(true);
129            }
130            Ok(Arc::new(builder.finish()))
131        }
132        DataType::Int32 => {
133            let mut builder = ListBuilder::new(Int32Builder::new());
134            for _ in 0..num_rows {
135                for item in items {
136                    match item {
137                        Literal::Integer(value) => {
138                            let value = i32::try_from(*value).map_err(|_| {
139                                OmniError::manifest(format!(
140                                    "list value {} exceeds Int32 range",
141                                    value
142                                ))
143                            })?;
144                            builder.values().append_value(value);
145                        }
146                        _ => builder.values().append_null(),
147                    }
148                }
149                builder.append(true);
150            }
151            Ok(Arc::new(builder.finish()))
152        }
153        DataType::Int64 => {
154            let mut builder = ListBuilder::new(Int64Builder::new());
155            for _ in 0..num_rows {
156                for item in items {
157                    match item {
158                        Literal::Integer(value) => builder.values().append_value(*value),
159                        _ => builder.values().append_null(),
160                    }
161                }
162                builder.append(true);
163            }
164            Ok(Arc::new(builder.finish()))
165        }
166        DataType::UInt32 => {
167            let mut builder = ListBuilder::new(UInt32Builder::new());
168            for _ in 0..num_rows {
169                for item in items {
170                    match item {
171                        Literal::Integer(value) => {
172                            let value = u32::try_from(*value).map_err(|_| {
173                                OmniError::manifest(format!(
174                                    "list value {} exceeds UInt32 range",
175                                    value
176                                ))
177                            })?;
178                            builder.values().append_value(value);
179                        }
180                        _ => builder.values().append_null(),
181                    }
182                }
183                builder.append(true);
184            }
185            Ok(Arc::new(builder.finish()))
186        }
187        DataType::UInt64 => {
188            let mut builder = ListBuilder::new(UInt64Builder::new());
189            for _ in 0..num_rows {
190                for item in items {
191                    match item {
192                        Literal::Integer(value) => {
193                            let value = u64::try_from(*value).map_err(|_| {
194                                OmniError::manifest(format!(
195                                    "list value {} exceeds UInt64 range",
196                                    value
197                                ))
198                            })?;
199                            builder.values().append_value(value);
200                        }
201                        _ => builder.values().append_null(),
202                    }
203                }
204                builder.append(true);
205            }
206            Ok(Arc::new(builder.finish()))
207        }
208        DataType::Float32 => {
209            let mut builder = ListBuilder::new(Float32Builder::new());
210            for _ in 0..num_rows {
211                for item in items {
212                    match item {
213                        Literal::Integer(value) => builder.values().append_value(*value as f32),
214                        Literal::Float(value) => builder.values().append_value(*value as f32),
215                        _ => builder.values().append_null(),
216                    }
217                }
218                builder.append(true);
219            }
220            Ok(Arc::new(builder.finish()))
221        }
222        DataType::Float64 => {
223            let mut builder = ListBuilder::new(Float64Builder::new());
224            for _ in 0..num_rows {
225                for item in items {
226                    match item {
227                        Literal::Integer(value) => builder.values().append_value(*value as f64),
228                        Literal::Float(value) => builder.values().append_value(*value),
229                        _ => builder.values().append_null(),
230                    }
231                }
232                builder.append(true);
233            }
234            Ok(Arc::new(builder.finish()))
235        }
236        DataType::Date32 => {
237            let mut builder = ListBuilder::new(Date32Builder::new());
238            for _ in 0..num_rows {
239                for item in items {
240                    match item {
241                        Literal::Date(value) => builder
242                            .values()
243                            .append_value(crate::loader::parse_date32_literal(value)?),
244                        _ => builder.values().append_null(),
245                    }
246                }
247                builder.append(true);
248            }
249            Ok(Arc::new(builder.finish()))
250        }
251        DataType::Date64 => {
252            let mut builder = ListBuilder::new(Date64Builder::new());
253            for _ in 0..num_rows {
254                for item in items {
255                    match item {
256                        Literal::DateTime(value) => builder
257                            .values()
258                            .append_value(crate::loader::parse_date64_literal(value)?),
259                        _ => builder.values().append_null(),
260                    }
261                }
262                builder.append(true);
263            }
264            Ok(Arc::new(builder.finish()))
265        }
266        other => Err(OmniError::manifest(format!(
267            "cannot convert list literal to {:?}",
268            other
269        ))),
270    }
271}
272
273/// Build a single-element blob array from a URI or base64 value string.
274fn build_blob_array_from_value(value: &str) -> Result<ArrayRef> {
275    let mut builder = BlobArrayBuilder::new(1);
276    crate::loader::append_blob_value(&mut builder, value)?;
277    builder
278        .finish()
279        .map_err(|e| OmniError::Lance(e.to_string()))
280}
281
282/// Build a null blob array with one element.
283fn build_null_blob_array() -> Result<ArrayRef> {
284    let mut builder = BlobArrayBuilder::new(1);
285    builder
286        .push_null()
287        .map_err(|e| OmniError::Lance(e.to_string()))?;
288    builder
289        .finish()
290        .map_err(|e| OmniError::Lance(e.to_string()))
291}
292
293/// Build a single-row RecordBatch from resolved assignments.
294fn build_insert_batch(
295    schema: &SchemaRef,
296    id: &str,
297    assignments: &HashMap<String, Literal>,
298    blob_properties: &HashSet<String>,
299) -> Result<RecordBatch> {
300    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
301
302    for field in schema.fields() {
303        if field.name() == "id" {
304            columns.push(Arc::new(StringArray::from(vec![id])));
305        } else if blob_properties.contains(field.name()) {
306            if let Some(Literal::String(uri)) = assignments.get(field.name()) {
307                columns.push(build_blob_array_from_value(uri)?);
308            } else if field.is_nullable() {
309                columns.push(build_null_blob_array()?);
310            } else {
311                return Err(OmniError::manifest(format!(
312                    "missing required blob property '{}'",
313                    field.name()
314                )));
315            }
316        } else if field.name() == "src" {
317            let lit = assignments.get("from").ok_or_else(|| {
318                OmniError::manifest("missing required edge endpoint 'from'".to_string())
319            })?;
320            columns.push(literal_to_typed_array(lit, field.data_type(), 1)?);
321        } else if field.name() == "dst" {
322            let lit = assignments.get("to").ok_or_else(|| {
323                OmniError::manifest("missing required edge endpoint 'to'".to_string())
324            })?;
325            columns.push(literal_to_typed_array(lit, field.data_type(), 1)?);
326        } else if let Some(lit) = assignments.get(field.name()) {
327            columns.push(literal_to_typed_array(lit, field.data_type(), 1)?);
328        } else if field.is_nullable() {
329            columns.push(arrow_array::new_null_array(field.data_type(), 1));
330        } else {
331            return Err(OmniError::manifest(format!(
332                "missing required property '{}'",
333                field.name()
334            )));
335        }
336    }
337
338    RecordBatch::try_new(schema.clone(), columns).map_err(|e| OmniError::Lance(e.to_string()))
339}
340
341async fn validate_edge_insert_endpoints(
342    db: &Omnigraph,
343    staging: &MutationStaging,
344    branch: Option<&str>,
345    edge_name: &str,
346    assignments: &HashMap<String, Literal>,
347) -> Result<()> {
348    let catalog = db.catalog();
349    let edge_type = catalog
350        .edge_types
351        .get(edge_name)
352        .ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_name)))?;
353    let from = match assignments.get("from") {
354        Some(Literal::String(value)) => value.as_str(),
355        Some(other) => {
356            return Err(OmniError::manifest(format!(
357                "edge {} from endpoint must be a string id, got {}",
358                edge_name,
359                literal_to_sql(other)
360            )));
361        }
362        None => {
363            return Err(OmniError::manifest(format!(
364                "edge {} missing 'from' endpoint",
365                edge_name
366            )));
367        }
368    };
369    let to = match assignments.get("to") {
370        Some(Literal::String(value)) => value.as_str(),
371        Some(other) => {
372            return Err(OmniError::manifest(format!(
373                "edge {} to endpoint must be a string id, got {}",
374                edge_name,
375                literal_to_sql(other)
376            )));
377        }
378        None => {
379            return Err(OmniError::manifest(format!(
380                "edge {} missing 'to' endpoint",
381                edge_name
382            )));
383        }
384    };
385
386    ensure_node_id_exists(db, staging, branch, &edge_type.from_type, from, "src").await?;
387    ensure_node_id_exists(db, staging, branch, &edge_type.to_type, to, "dst").await?;
388    Ok(())
389}
390
391/// Quick scan of pending batches for an `id` value match. Used by the
392/// mutation path's edge endpoint validation to satisfy read-your-writes
393/// for same-query inserts before they're committed to Lance.
394fn pending_batches_contain_id(batches: &[RecordBatch], id: &str) -> bool {
395    for batch in batches {
396        let Some(col) = batch.column_by_name("id") else {
397            continue;
398        };
399        let Some(arr) = col.as_any().downcast_ref::<StringArray>() else {
400            continue;
401        };
402        for i in 0..arr.len() {
403            if arr.is_valid(i) && arr.value(i) == id {
404                return true;
405            }
406        }
407    }
408    false
409}
410
411async fn ensure_node_id_exists(
412    db: &Omnigraph,
413    staging: &MutationStaging,
414    branch: Option<&str>,
415    node_type: &str,
416    id: &str,
417    label: &str,
418) -> Result<()> {
419    let table_key = format!("node:{}", node_type);
420
421    // Prefer the in-query pending accumulator so a same-query insert of
422    // the referenced node is visible to this validation. Fall back to
423    // the pre-mutation manifest snapshot when nothing pending matches.
424    let pending = staging.pending_batches(&table_key);
425    if pending_batches_contain_id(pending, id) {
426        return Ok(());
427    }
428
429    let filter = format!("id = '{}'", id.replace('\'', "''"));
430    let snapshot = db.snapshot_for_branch(branch).await?;
431    let ds = db
432        .storage()
433        .open_snapshot_at_table(&snapshot, &table_key)
434        .await?;
435    let exists = db.storage().count_rows(&ds, Some(filter)).await? > 0;
436
437    if exists {
438        Ok(())
439    } else {
440        Err(OmniError::manifest(format!(
441            "{} '{}' not found in {}",
442            label, id, node_type
443        )))
444    }
445}
446
447/// Convert an IRMutationPredicate to a Lance SQL filter string.
448fn predicate_to_sql(
449    predicate: &IRMutationPredicate,
450    params: &ParamMap,
451    is_edge: bool,
452) -> Result<String> {
453    let column = if is_edge {
454        match predicate.property.as_str() {
455            "from" => "src".to_string(),
456            "to" => "dst".to_string(),
457            other => other.to_string(),
458        }
459    } else {
460        predicate.property.clone()
461    };
462
463    let value = resolve_expr_value(&predicate.value, params)?;
464    let value_sql = literal_to_sql(&value);
465
466    let op = match predicate.op {
467        CompOp::Eq => "=",
468        CompOp::Ne => "!=",
469        CompOp::Gt => ">",
470        CompOp::Lt => "<",
471        CompOp::Ge => ">=",
472        CompOp::Le => "<=",
473        CompOp::Contains => {
474            return Err(OmniError::manifest(
475                "contains predicate not supported in mutations".to_string(),
476            ));
477        }
478    };
479
480    // #283: emit the column UNQUOTED. Lance's `Scanner::filter(&str)` (the
481    // committed-scan consumer) preserves an unquoted identifier's case but
482    // treats a double-quoted `"col"` as a string literal, so quoting here
483    // would silently match zero committed rows. The pending-batch MemTable
484    // query is instead made case-preserving by disabling DataFusion identifier
485    // normalization on its `SessionContext` (see `scan_pending_batches`).
486    Ok(format!("{} {} {}", column, op, value_sql))
487}
488
489/// Replace specific columns in a RecordBatch with new literal values.
490///
491/// Blob columns may or may not be present in `batch` depending on the
492/// caller's scan projection:
493/// - If `batch` does NOT contain a blob column AND it has no assignment,
494///   the column is OMITTED from the output. `merge_insert` leaves it
495///   untouched.
496/// - If `batch` DOES contain a blob column AND it has no assignment, the
497///   column is COPIED to the output. This enables coalescing of
498///   different-shape updates into a single full-schema merge batch (the
499///   per-table accumulator in `MutationStaging` requires consistent
500///   schemas across pending batches for `concat_batches`). The
501///   round-tripping cost is acceptable for typical agent-driven
502///   mutations; tables with large blobs and unassigned-blob updates may
503///   want to be split into separate queries.
504/// - If a blob column has a string-URI assignment, build the blob array
505///   inline.
506fn apply_assignments(
507    full_schema: &SchemaRef,
508    batch: &RecordBatch,
509    assignments: &HashMap<String, Literal>,
510    blob_properties: &HashSet<String>,
511) -> Result<RecordBatch> {
512    let mut columns: Vec<ArrayRef> = Vec::with_capacity(full_schema.fields().len());
513    let mut out_fields: Vec<Field> = Vec::with_capacity(full_schema.fields().len());
514
515    for field in full_schema.fields().iter() {
516        if blob_properties.contains(field.name()) {
517            if let Some(Literal::String(uri)) = assignments.get(field.name()) {
518                // Assigned: build a single blob column from the URI.
519                let mut builder = BlobArrayBuilder::new(batch.num_rows());
520                for _ in 0..batch.num_rows() {
521                    crate::loader::append_blob_value(&mut builder, uri)?;
522                }
523                let blob_field = lance::blob::blob_field(field.name(), true);
524                out_fields.push(blob_field);
525                columns.push(
526                    builder
527                        .finish()
528                        .map_err(|e| OmniError::Lance(e.to_string()))?,
529                );
530            } else if let Some(col) = batch.column_by_name(field.name()) {
531                // Unassigned but scan included it: copy through (writes
532                // back the same blob, no observable change but uniform
533                // schema for the accumulator).
534                let blob_field = lance::blob::blob_field(field.name(), field.is_nullable());
535                out_fields.push(blob_field);
536                columns.push(col.clone());
537            }
538            // else: scan did not include this blob column and no
539            // assignment — omit. Caller's accumulator must accept the
540            // narrower schema (legacy single-merge_insert path).
541        } else if let Some(lit) = assignments.get(field.name()) {
542            out_fields.push(field.as_ref().clone());
543            columns.push(literal_to_typed_array(
544                lit,
545                field.data_type(),
546                batch.num_rows(),
547            )?);
548        } else {
549            let col = batch.column_by_name(field.name()).ok_or_else(|| {
550                OmniError::Lance(format!(
551                    "column '{}' not found in scan result",
552                    field.name()
553                ))
554            })?;
555            out_fields.push(field.as_ref().clone());
556            columns.push(col.clone());
557        }
558    }
559
560    RecordBatch::try_new(Arc::new(Schema::new(out_fields)), columns)
561        .map_err(|e| OmniError::Lance(e.to_string()))
562}
563
564// ─── Mutation execution ──────────────────────────────────────────────────────
565
566use super::staging::{MutationStaging, PendingMode};
567
568/// Open a sub-table dataset for read or inline-commit-write within the
569/// current mutation query, capturing pre-write metadata in `staging` on
570/// first touch. The captured version is the publisher's CAS fence at
571/// end-of-query (per-table OCC).
572///
573/// On first touch, opens the dataset at HEAD on the requested branch
574/// via `open_for_mutation_on_branch`, which compares Lance HEAD against
575/// the manifest's pinned version — that fence is the engine's
576/// publisher-style OCC catching cross-writer drift before we make any
577/// changes. For delete-only queries, this strict open is also the uncovered
578/// drift guard that runs before `delete_where` can inline-commit.
579///
580/// On subsequent touches *within the same query*, behavior depends on
581/// whether the table has already been inline-committed by a delete op:
582///
583/// - **Insert / update path (no inline commit between touches).** Lance
584///   HEAD has not moved since first touch, so a fresh
585///   `open_for_mutation_on_branch` would still match the manifest
586///   pinned version. We just go through it again; `ensure_path` is a
587///   no-op (idempotent on the captured `expected_version`).
588/// - **Delete cascade or multi-delete on the same table.** A prior
589///   `delete_where` on this table has already advanced Lance HEAD past
590///   the manifest's pinned version (the manifest doesn't move until
591///   end-of-query). Going through `open_for_mutation_on_branch` again
592///   would trip its `ensure_expected_version` equality check
593///   (`actual = pinned + 1` vs `expected = pinned`). Instead we route
594///   through `reopen_for_mutation` at the post-inline-commit Lance
595///   version captured in `staging.inline_committed[table_key]`, which
596///   is the source of truth for "where is Lance HEAD right now on
597///   this table within this query."
598///
599/// The `inline_committed` reopen branch closes the multi-delete-on-same-table
600/// failure path that pre-staged-write engines inherited. The branch goes
601/// away once Lance exposes a two-phase delete API
602/// ([lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658))
603/// and we can stage deletes on the same path as inserts/updates.
604impl Omnigraph {
605    /// Resolve a LIVE-HEAD read handle for an edge table's committed-state `@card`
606    /// scan when collapse #1 skipped the accumulation open. The edge-insert path no
607    /// longer opens the edge dataset (non-strict op + txn), but cardinality is
608    /// validated ONCE (never rechecked at commit), so the scan must observe the
609    /// freshest committed edges — NOT the pinned `txn.base`. A concurrent writer can
610    /// commit edges to this table after `txn` capture; counting against the stale
611    /// base undercounts and lets a violating insert through (invariant 9). The table
612    /// LOCATION is read from the pinned entry (stable across versions); the dataset is
613    /// opened at live HEAD via `open_dataset_head_for_write` (a read here despite the
614    /// name — no lock/stage), restoring the pre-3b image (the mutation's own open).
615    /// The residual validate→commit race (a writer committing between this scan and
616    /// the end-of-query commit) is the §7.1 gap, closed by RFC-013 step 4.
617    async fn edge_cardinality_read_handle(
618        &self,
619        txn: Option<&crate::db::WriteTxn>,
620        table_key: &str,
621    ) -> Result<SnapshotHandle> {
622        let branch = txn.and_then(|t| t.branch.as_deref());
623        match txn.and_then(|t| t.base.entry(table_key)) {
624            Some(entry) => {
625                let full_path = self.storage().dataset_uri(&entry.table_path);
626                self.storage()
627                    .open_dataset_head_for_write(table_key, &full_path, branch)
628                    .await
629            }
630            // Unreachable today (the `None` handle only reaches here under a txn whose
631            // base contains the table). Defensive: resolve the table fresh (live)
632            // without the schema re-validation `snapshot_for_branch` would re-run.
633            None => {
634                let snapshot = self.fresh_snapshot_for_branch_unchecked(branch).await?;
635                self.storage().open_snapshot_at_table(&snapshot, table_key).await
636            }
637        }
638    }
639}
640
641async fn open_table_for_mutation(
642    db: &Omnigraph,
643    staging: &mut MutationStaging,
644    branch: Option<&str>,
645    table_key: &str,
646    op_kind: crate::db::MutationOpKind,
647    txn: Option<&crate::db::WriteTxn>,
648) -> Result<(Option<SnapshotHandle>, String, Option<String>)> {
649    if let Some(prior) = staging.inline_committed.get(table_key) {
650        let path = staging.paths.get(table_key).ok_or_else(|| {
651            OmniError::manifest_internal(format!(
652                "open_table_for_mutation: inline_committed[{}] without paths entry",
653                table_key
654            ))
655        })?;
656        // The inline-committed reopen does NOT validate the schema contract
657        // (it reopens at the post-inline-commit Lance version directly), so it
658        // takes no `txn` — threading it here would change nothing. Deletes are
659        // strict ops, so this always opens (returns `Some`).
660        let ds = db
661            .reopen_for_mutation(
662                table_key,
663                &path.full_path,
664                path.table_branch.as_deref(),
665                prior.table_version,
666                op_kind,
667            )
668            .await?;
669        return Ok((Some(ds), path.full_path.clone(), path.table_branch.clone()));
670    }
671    // `open_for_mutation_on_branch` returns the expected version even when it
672    // skips the open (collapse #1, the non-strict insert/merge path): the version
673    // is the pinned base's, identical to the opened handle's `.version()`. Use it
674    // directly for `ensure_path` so the no-open path still captures the publisher
675    // CAS fence.
676    let opened = db
677        .open_for_mutation_on_branch(branch, table_key, op_kind, txn)
678        .await?;
679    // Pin the open-skip contract (collapse #1): a missing handle is legal ONLY on
680    // the non-strict `txn` path. A future change that returns `None` elsewhere
681    // (e.g. a new strict arm) trips this in debug builds rather than silently
682    // handing a `None` to a `require_handle` consumer.
683    debug_assert!(
684        opened.handle.is_some() || (txn.is_some() && !op_kind.strict_pre_stage_version_check()),
685        "open_for_mutation_on_branch returned no handle outside the non-strict txn open-skip path",
686    );
687    staging.ensure_path(
688        table_key,
689        opened.full_path.clone(),
690        opened.table_branch.clone(),
691        opened.expected_version,
692        op_kind,
693    );
694    Ok((opened.handle, opened.full_path, opened.table_branch))
695}
696
697/// D₂ parse-time check: a single mutation query is either insert/update-only
698/// or delete-only. Mixed → reject before any I/O.
699///
700/// Reason: under the staged-write writer, inserts and updates
701/// accumulate in memory and commit at end-of-query, while deletes still
702/// inline-commit (Lance lacks a public two-phase delete in 6.0.1).
703/// Mixing creates ordering hazards (same-row insert→delete becomes a no-op
704/// because the staged insert isn't visible to delete; cascading deletes
705/// of just-inserted edges break referential integrity by silent design).
706/// Until Lance exposes `DeleteJob::execute_uncommitted`, the parse-time
707/// rejection keeps both paths atomic and correct.
708fn enforce_no_mixed_destructive_constructive(
709    ir: &omnigraph_compiler::ir::MutationIR,
710) -> Result<()> {
711    let mut has_constructive = false;
712    let mut has_delete = false;
713    for op in &ir.ops {
714        match op {
715            MutationOpIR::Insert { .. } | MutationOpIR::Update { .. } => {
716                has_constructive = true;
717            }
718            MutationOpIR::Delete { .. } => {
719                has_delete = true;
720            }
721        }
722    }
723    if has_constructive && has_delete {
724        return Err(OmniError::manifest(format!(
725            "mutation '{}' on the same query mixes inserts/updates and deletes; \
726             split into separate mutations: (1) inserts and updates, then (2) deletes. \
727             This restriction lifts when Lance exposes a two-phase delete API \
728             (tracked: lance-format/lance#6658).",
729            ir.name
730        )));
731    }
732    Ok(())
733}
734
735impl Omnigraph {
736    pub async fn mutate(
737        &self,
738        branch: &str,
739        query_source: &str,
740        query_name: &str,
741        params: &ParamMap,
742    ) -> Result<MutationResult> {
743        self.mutate_as(branch, query_source, query_name, params, None)
744            .await
745    }
746
747    pub async fn mutate_as(
748        &self,
749        branch: &str,
750        query_source: &str,
751        query_name: &str,
752        params: &ParamMap,
753        actor_id: Option<&str>,
754    ) -> Result<MutationResult> {
755        // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
756        // `Branch(branch)` to match the HTTP-layer convention at
757        // `server_change` (branch=Some(branch), target_branch=None). When no
758        // PolicyChecker is installed this is a no-op; with policy installed
759        // and actor=None this fails hard (forget-the-actor footgun guard).
760        self.enforce(
761            omnigraph_policy::PolicyAction::Change,
762            &omnigraph_policy::ResourceScope::Branch(branch.to_string()),
763            actor_id,
764        )?;
765        self.mutate_with_current_actor(branch, query_source, query_name, params, actor_id)
766            .await
767    }
768
769    async fn mutate_with_current_actor(
770        &self,
771        branch: &str,
772        query_source: &str,
773        query_name: &str,
774        params: &ParamMap,
775        actor_id: Option<&str>,
776    ) -> Result<MutationResult> {
777        // Converge any pending recovery sidecar (a previously failed
778        // writer's Phase B → Phase C residual) before executing: the
779        // inline delete path advances Lance HEAD during execution and
780        // the staged path's commit-time drift guard refuses
781        // sidecar-covered drift, so a long-lived handle must heal here
782        // — not at restart. One `list_dir` when no sidecars exist (the
783        // steady state). MUST run before `open_write_txn` below — the heal
784        // may advance the manifest, so the pinned base must be captured after.
785        self.heal_pending_recovery_sidecars().await?;
786        let requested = Self::normalize_branch_name(branch)?;
787        // Reject internal `__run__*` / system-prefixed branches at the
788        // public write boundary. Direct-publish paths assert this
789        // explicitly so a caller can't write to legacy or system
790        // staging branches by passing the prefix verbatim.
791        if let Some(name) = requested.as_deref() {
792            crate::db::ensure_public_branch_ref(name, "mutate")?;
793        }
794        // Capture-once write transaction (RFC-013 step 3b). `open_write_txn`
795        // validates the schema contract ONCE (it resolves the branch target,
796        // whose first line is `ensure_schema_state_valid`) and pins the base
797        // snapshot for this write. Threaded as `Some(&txn)` through execution,
798        // staging commit, and the manifest publish so the per-table opens and
799        // the commit-time OCC re-read reuse the pinned base instead of
800        // re-validating the contract at every resolve point. Captured AFTER the
801        // recovery heal (which may advance the manifest) and AFTER `requested`
802        // is known so it pins the post-heal snapshot for the correct branch.
803        let txn = self.open_write_txn(requested.as_deref()).await?;
804        let resolved_params = enrich_mutation_params(params)?;
805
806        // Per-query staging accumulator. Inserts and updates push batches
807        // into `pending`; deletes still inline-commit and record into
808        // `inline_committed`. At end-of-query, `finalize` issues one
809        // `stage_*` + `commit_staged` per pending table, then the
810        // publisher commits the manifest atomically across all touched
811        // tables. Branch is threaded explicitly — no coordinator swap.
812        let mut staging = MutationStaging::default();
813
814        // Lower + validate up front so the touched-table set is known before
815        // execution. A lowering/validation error returns exactly as it did
816        // when this happened inside execute_named_mutation.
817        let ir = self.lower_named_mutation(query_source, query_name)?;
818
819        // Up-front fork-queue acquisition (see the loader for the full
820        // rationale): if this mutation will fork any touched table onto a
821        // non-main branch, acquire the per-(table, branch) write queues for
822        // every touched table before the first fork and hold them through the
823        // publish, so the orphan-fork reclaim can't race a concurrent
824        // in-process fork. The touched set is derived from the lowered IR.
825        let fork_queue_guards: Option<(
826            Vec<(String, Option<String>)>,
827            Vec<tokio::sync::OwnedMutexGuard<()>>,
828        )> = if let Some(active) = requested.as_deref() {
829            let snapshot = self.snapshot_for_branch(Some(active)).await?;
830            let touched: Vec<(String, Option<String>)> = self
831                .touched_table_keys(&ir)
832                .into_iter()
833                .map(|k| (k, Some(active.to_string())))
834                .collect();
835            let needs_fork = touched.iter().any(|(table_key, _)| {
836                snapshot
837                    .entry(table_key)
838                    .map(|e| e.table_branch.as_deref() != Some(active))
839                    .unwrap_or(false)
840            });
841            if needs_fork {
842                let guards = self.write_queue().acquire_many(&touched).await;
843                Some((touched, guards))
844            } else {
845                None
846            }
847        } else {
848            None
849        };
850
851        let exec_result = self
852            .execute_named_mutation(
853                &ir,
854                &resolved_params,
855                requested.as_deref(),
856                &mut staging,
857                Some(&txn),
858            )
859            .await;
860
861        match exec_result {
862            Err(e) => Err(e),
863            Ok(total) if staging.is_empty() => Ok(total),
864            Ok(total) => {
865                let staged = staging.stage_all(self, requested.as_deref()).await?;
866                // `_queue_guards` holds per-(table_key, branch) write
867                // queues acquired inside `commit_all`. Held across the
868                // manifest publish below so no concurrent writer can
869                // interleave between our commit_staged and our publish
870                // (which would correctly fail our CAS but leave Lance
871                // HEAD advanced — the residual class MR-870 recovers).
872                let super::staging::CommittedMutation {
873                    updates,
874                    expected_versions,
875                    sidecar_handle,
876                    guards: _queue_guards,
877                    committed_handles,
878                } = staged
879                    .commit_all(
880                        self,
881                        requested.as_deref(),
882                        crate::db::manifest::SidecarKind::Mutation,
883                        actor_id,
884                        fork_queue_guards,
885                        Some(&txn),
886                    )
887                    .await?;
888                // Failpoint that wedges the documented finalize→publisher
889                // residual: per-table `commit_staged` calls already
890                // advanced Lance HEAD on every touched table; a failure
891                // injected here mirrors the production-rare case where
892                // the publisher's CAS pre-check rejects (or the manifest
893                // write throws) after staged commits succeeded. The
894                // sidecar written inside `staging.finalize()` persists
895                // across this failure so the next `Omnigraph::open`'s
896                // recovery sweep can roll forward — see
897                // `tests/failpoints.rs::recovery_rolls_forward_after_finalize_publisher_failure`.
898                crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_POST_FINALIZE_PRE_PUBLISHER)?;
899                self.commit_updates_on_branch_with_expected(
900                    requested.as_deref(),
901                    &updates,
902                    &expected_versions,
903                    actor_id,
904                    Some(&txn),
905                    committed_handles,
906                )
907                .await?;
908                // Phase C succeeded — sidecar can be deleted. If this
909                // delete fails, the next open's sweep classifies every
910                // table as NoMovement (manifest pin == Lance HEAD ==
911                // post_commit_pin) and the sidecar is treated as a
912                // stale artifact (cleaned up via the Phase 2 logic).
913                if let Some(handle) = sidecar_handle {
914                    // Best-effort cleanup: the manifest publish already
915                    // succeeded, so the user's mutation is durable. A
916                    // failed delete leaves the sidecar on disk; the
917                    // next open's recovery sweep classifies every table
918                    // as `NoMovement` (manifest pin == Lance HEAD ==
919                    // post_commit_pin) and tidies up. Failing the user
920                    // here would return an error for a write that
921                    // already landed.
922                    if let Err(err) =
923                        crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
924                    {
925                        tracing::warn!(
926                            error = %err,
927                            operation_id = handle.operation_id.as_str(),
928                            "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
929                        );
930                    }
931                }
932                Ok(total)
933            }
934        }
935    }
936
937    /// Lower + validate a named mutation query into its IR.
938    ///
939    /// Hoisted out of [`Self::execute_named_mutation`] so the caller can
940    /// inspect the IR before execution — specifically to compute the
941    /// touched-table set (see [`Self::touched_table_keys`]) for up-front
942    /// write-queue acquisition. Performs the same find → typecheck → lower
943    /// → D₂ checks that execution previously did inline, so error behavior
944    /// is unchanged.
945    fn lower_named_mutation(
946        &self,
947        query_source: &str,
948        query_name: &str,
949    ) -> Result<omnigraph_compiler::ir::MutationIR> {
950        let query_decl = omnigraph_compiler::find_named_query(query_source, query_name)
951            .map_err(|e| OmniError::manifest(e.to_string()))?;
952
953        let checked = typecheck_query_decl(&self.catalog(), &query_decl)?;
954        match checked {
955            CheckedQuery::Mutation(_) => {}
956            CheckedQuery::Read(_) => {
957                return Err(OmniError::manifest(
958                    "mutation execution called on a read query; use query instead".to_string(),
959                ));
960            }
961        }
962
963        let ir = lower_mutation_query(&query_decl)?;
964        // D₂: reject mixed insert/update + delete before any I/O.
965        enforce_no_mixed_destructive_constructive(&ir)?;
966        Ok(ir)
967    }
968
969    /// The COMPLETE set of `(node|edge):{type}` table keys a mutation IR can
970    /// touch at execution time, keyed as `MutationStaging`/`commit_all` key
971    /// them. Must be a superset of everything execution forks/commits, since
972    /// it drives the up-front fork-queue acquisition and `commit_all`'s
973    /// held-guard coverage check — a miss means an unserialized fork/commit.
974    ///
975    /// The set is a pure function of (IR ops + catalog). For each op it mirrors
976    /// the execute path's node-vs-edge dispatch (`node_types` first, then
977    /// `edge_types`). A `delete <Node>` additionally **cascades** to every edge
978    /// type whose endpoint is that node (see `execute_delete_node`), forking
979    /// those edge tables during execution — so they are included here, derived
980    /// the same way the executor derives them (`from_type`/`to_type` match).
981    /// Unknown types are skipped (the execute path surfaces the error).
982    /// Sorted + deduped for one-shot `acquire_many`.
983    fn touched_table_keys(&self, ir: &omnigraph_compiler::ir::MutationIR) -> Vec<String> {
984        use omnigraph_compiler::ir::MutationOpIR;
985        let catalog = self.catalog();
986        let mut keys: Vec<String> = Vec::new();
987        for op in &ir.ops {
988            let type_name = match op {
989                MutationOpIR::Insert { type_name, .. }
990                | MutationOpIR::Update { type_name, .. }
991                | MutationOpIR::Delete { type_name, .. } => type_name,
992            };
993            if catalog.node_types.contains_key(type_name) {
994                keys.push(format!("node:{type_name}"));
995                // A node delete cascades to every edge touching this node type,
996                // forking those edge tables. Include them so the up-front
997                // acquisition covers the cascade (mirrors execute_delete_node).
998                if matches!(op, MutationOpIR::Delete { .. }) {
999                    for (edge_name, edge_type) in &catalog.edge_types {
1000                        if edge_type.from_type == *type_name || edge_type.to_type == *type_name {
1001                            keys.push(format!("edge:{edge_name}"));
1002                        }
1003                    }
1004                }
1005            } else if catalog.edge_types.contains_key(type_name) {
1006                keys.push(format!("edge:{type_name}"));
1007            }
1008        }
1009        keys.sort();
1010        keys.dedup();
1011        keys
1012    }
1013
1014    async fn execute_named_mutation(
1015        &self,
1016        ir: &omnigraph_compiler::ir::MutationIR,
1017        params: &ParamMap,
1018        branch: Option<&str>,
1019        staging: &mut MutationStaging,
1020        txn: Option<&crate::db::WriteTxn>,
1021    ) -> Result<MutationResult> {
1022        let mut total = MutationResult::default();
1023        for op in &ir.ops {
1024            let result = match op {
1025                MutationOpIR::Insert {
1026                    type_name,
1027                    assignments,
1028                } => {
1029                    self.execute_insert(type_name, assignments, params, branch, staging, txn)
1030                        .await?
1031                }
1032                MutationOpIR::Update {
1033                    type_name,
1034                    assignments,
1035                    predicate,
1036                } => {
1037                    self.execute_update(
1038                        type_name, assignments, predicate, params, branch, staging, txn,
1039                    )
1040                    .await?
1041                }
1042                MutationOpIR::Delete {
1043                    type_name,
1044                    predicate,
1045                } => {
1046                    self.execute_delete(type_name, predicate, params, branch, staging, txn)
1047                        .await?
1048                }
1049            };
1050            total.affected_nodes += result.affected_nodes;
1051            total.affected_edges += result.affected_edges;
1052        }
1053        Ok(total)
1054    }
1055
1056    async fn execute_insert(
1057        &self,
1058        type_name: &str,
1059        assignments: &[IRAssignment],
1060        params: &ParamMap,
1061        branch: Option<&str>,
1062        staging: &mut MutationStaging,
1063        txn: Option<&crate::db::WriteTxn>,
1064    ) -> Result<MutationResult> {
1065        let mut resolved: HashMap<String, Literal> = HashMap::new();
1066        for a in assignments {
1067            resolved.insert(a.property.clone(), resolve_expr_value(&a.value, params)?);
1068        }
1069
1070        let is_node = self.catalog().node_types.contains_key(type_name);
1071        let is_edge = self.catalog().edge_types.contains_key(type_name);
1072
1073        if is_node {
1074            let node_type = &self.catalog().node_types[type_name];
1075            let schema = node_type.arrow_schema.clone();
1076            let blob_props = node_type.blob_properties.clone();
1077            let id = if let Some(key_prop) = node_type.key_property() {
1078                match resolved.get(key_prop) {
1079                    Some(Literal::String(s)) => s.clone(),
1080                    Some(other) => literal_to_sql(other).trim_matches('\'').to_string(),
1081                    None => {
1082                        return Err(OmniError::manifest(format!(
1083                            "insert missing @key property '{}'",
1084                            key_prop
1085                        )));
1086                    }
1087                }
1088            } else {
1089                ulid::Ulid::new().to_string()
1090            };
1091
1092            let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
1093            crate::loader::validate_value_constraints(&batch, node_type)?;
1094            crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?;
1095            let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
1096            if !unique_groups.is_empty() {
1097                crate::loader::enforce_unique_constraints_intra_batch(
1098                    &batch,
1099                    type_name,
1100                    &unique_groups,
1101                )?;
1102            }
1103            let has_key = node_type.key_property().is_some();
1104            let table_key = format!("node:{}", type_name);
1105            // Capture pre-write metadata on first touch (no Lance write).
1106            let insert_kind = if has_key {
1107                crate::db::MutationOpKind::Merge
1108            } else {
1109                crate::db::MutationOpKind::Insert
1110            };
1111            // Node inserts are non-strict (Insert/Merge), so with a `WriteTxn`
1112            // this opens NOTHING (collapse #1) — the handle is discarded anyway;
1113            // only `ensure_path`'s captured version (read inside
1114            // `open_table_for_mutation`) is used downstream.
1115            let (_ds, _full_path, _table_branch) =
1116                open_table_for_mutation(self, staging, branch, &table_key, insert_kind, txn).await?;
1117            // Accumulate. @key inserts go into the Merge stream (so a
1118            // later update on the same id coalesces correctly); no-key
1119            // inserts go into the Append stream.
1120            let mode = if has_key {
1121                PendingMode::Merge
1122            } else {
1123                PendingMode::Append
1124            };
1125            staging.append_batch(&table_key, schema, mode, batch)?;
1126
1127            Ok(MutationResult {
1128                affected_nodes: 1,
1129                affected_edges: 0,
1130            })
1131        } else if is_edge {
1132            let edge_type = &self.catalog().edge_types[type_name];
1133            let schema = edge_type.arrow_schema.clone();
1134            let blob_props = edge_type.blob_properties.clone();
1135            let id = ulid::Ulid::new().to_string();
1136
1137            let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
1138            validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?;
1139            crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?;
1140            let unique_groups = crate::loader::unique_constraint_groups_for_edge(edge_type);
1141            if !unique_groups.is_empty() {
1142                crate::loader::enforce_unique_constraints_intra_batch(
1143                    &batch,
1144                    type_name,
1145                    &unique_groups,
1146                )?;
1147            }
1148            let table_key = format!("edge:{}", type_name);
1149            // Capture pre-write metadata on first touch. Edge inserts are
1150            // non-strict, so with a `WriteTxn` this opens NOTHING (collapse #1)
1151            // and returns `None`.
1152            let (handle, _full_path, _table_branch) = open_table_for_mutation(
1153                self,
1154                staging,
1155                branch,
1156                &table_key,
1157                crate::db::MutationOpKind::Insert,
1158                txn,
1159            )
1160            .await?;
1161            // Accumulate the new edge row. Edge IDs are ULID-generated so
1162            // Append mode is correct (no key-based dedup needed).
1163            staging.append_batch(&table_key, schema, PendingMode::Append, batch.clone())?;
1164
1165            // Edge cardinality validation: scan committed edges via Lance
1166            // + iterate pending edges in-memory for the `src` column,
1167            // group-by-src. The pending side already includes the row
1168            // we just appended (above). When the open was skipped (collapse
1169            // #1), resolve a read handle for the committed scan at LIVE HEAD
1170            // (`edge_cardinality_read_handle`, #298) — NOT the pinned txn.base,
1171            // which would undercount edges a concurrent writer committed since
1172            // capture. Only when cardinality is non-default, so the common
1173            // default-cardinality edge keeps the open-free path. (The residual
1174            // validate→commit race is the §7.1 gap — step 4.)
1175            if !edge_type.cardinality.is_default() {
1176                let committed_ds = match handle {
1177                    Some(h) => h,
1178                    None => self.edge_cardinality_read_handle(txn, &table_key).await?,
1179                };
1180                validate_edge_cardinality_with_pending(
1181                    self,
1182                    &committed_ds,
1183                    staging,
1184                    &table_key,
1185                    edge_type,
1186                )
1187                .await?;
1188            }
1189
1190            self.invalidate_graph_index().await;
1191
1192            Ok(MutationResult {
1193                affected_nodes: 0,
1194                affected_edges: 1,
1195            })
1196        } else {
1197            Err(OmniError::manifest(format!("unknown type '{}'", type_name)))
1198        }
1199    }
1200
1201    async fn execute_update(
1202        &self,
1203        type_name: &str,
1204        assignments: &[IRAssignment],
1205        predicate: &IRMutationPredicate,
1206        params: &ParamMap,
1207        branch: Option<&str>,
1208        staging: &mut MutationStaging,
1209        txn: Option<&crate::db::WriteTxn>,
1210    ) -> Result<MutationResult> {
1211        // Defense in depth: ensure this is a node type
1212        if !self.catalog().node_types.contains_key(type_name) {
1213            return Err(OmniError::manifest(format!(
1214                "update is only supported for node types, not '{}'",
1215                type_name
1216            )));
1217        }
1218
1219        // Reject updates to @key properties — identity is immutable
1220        if let Some(key_prop) = self.catalog().node_types[type_name].key_property() {
1221            if assignments.iter().any(|a| a.property == key_prop) {
1222                return Err(OmniError::manifest(format!(
1223                    "cannot update @key property '{}' — delete and re-insert instead",
1224                    key_prop
1225                )));
1226            }
1227        }
1228
1229        let pred_sql = predicate_to_sql(predicate, params, false)?;
1230        let schema = self.catalog().node_types[type_name].arrow_schema.clone();
1231        let blob_props = self.catalog().node_types[type_name].blob_properties.clone();
1232
1233        let table_key = format!("node:{}", type_name);
1234        let (handle, _full_path, _table_branch) = open_table_for_mutation(
1235            self,
1236            staging,
1237            branch,
1238            &table_key,
1239            crate::db::MutationOpKind::Update,
1240            txn,
1241        )
1242        .await?;
1243        // Update is a STRICT op, so collapse #1 never skips its open — the
1244        // handle is always `Some` (and it's needed for the committed scan below).
1245        let ds = handle.expect("strict Update op always opens its dataset");
1246
1247        // Scan committed via Lance + apply the same predicate to pending
1248        // batches via DataFusion `MemTable` (read-your-writes for prior
1249        // ops in this query). The pending side may include rows from
1250        // earlier `insert` / `update` ops on the same table.
1251        //
1252        // For blob tables we project away the blob columns: Lance's
1253        // scanner doesn't accept the standard projection path on blob
1254        // descriptors and would panic with a `Field::project` assertion.
1255        // The downstream `apply_assignments` synthesizes blob columns
1256        // from explicit assignments and omits unassigned blobs (Lance's
1257        // merge_insert leaves them untouched). Tables without blob
1258        // columns scan the full schema unprojected.
1259        let non_blob_cols: Vec<&str> = schema
1260            .fields()
1261            .iter()
1262            .filter(|f| !blob_props.contains(f.name()))
1263            .map(|f| f.name().as_str())
1264            .collect();
1265        let projection: Option<&[&str]> =
1266            (!blob_props.is_empty()).then_some(non_blob_cols.as_slice());
1267        let pending_batches = staging.pending_batches(&table_key);
1268        let pending_schema = staging.pending_schema(&table_key);
1269        // Use merge semantics on the union: a committed row whose `id`
1270        // also appears in pending has been logically updated by an
1271        // earlier op in this query and is shadowed from the scan,
1272        // otherwise the predicate runs against stale committed values
1273        // and a chained `update where <pred>` can match a row whose
1274        // pending value no longer satisfies <pred>.
1275        let batches = self
1276            .storage()
1277            .scan_with_pending(
1278                &ds,
1279                pending_batches,
1280                pending_schema,
1281                projection,
1282                Some(&pred_sql),
1283                Some("id"),
1284            )
1285            .await?;
1286
1287        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
1288            return Ok(MutationResult {
1289                affected_nodes: 0,
1290                affected_edges: 0,
1291            });
1292        }
1293
1294        // Concat the matched batches (committed + pending) into one. The
1295        // helper trusts that both sides share a schema — Lance returns
1296        // dataset-schema-ordered columns and DataFusion returns
1297        // MemTable-schema-ordered columns; both should match the catalog's
1298        // arrow_schema when the projection is consistent. If they
1299        // diverge (typically a blob-table mid-schema-shift), the helper
1300        // surfaces a clear error directing the caller to split the
1301        // mutation.
1302        let matched = concat_match_batches_to_schema(&schema, &blob_props, batches)?;
1303
1304        let affected_count = matched.num_rows();
1305
1306        let mut resolved: HashMap<String, Literal> = HashMap::new();
1307        for a in assignments {
1308            resolved.insert(a.property.clone(), resolve_expr_value(&a.value, params)?);
1309        }
1310        let updated = apply_assignments(&schema, &matched, &resolved, &blob_props)?;
1311        let node_type = &self.catalog().node_types[type_name];
1312        crate::loader::validate_value_constraints(&updated, node_type)?;
1313        crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?;
1314        let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
1315        if !unique_groups.is_empty() {
1316            crate::loader::enforce_unique_constraints_intra_batch(
1317                &updated,
1318                type_name,
1319                &unique_groups,
1320            )?;
1321        }
1322
1323        // Accumulate the updated batch into the Merge-mode pending stream.
1324        // The accumulator may now contain entries with the same id as a
1325        // prior insert or update on this table; `MutationStaging::finalize`
1326        // dedupes by id (last-occurrence wins) before issuing the single
1327        // `stage_merge_insert` call at end-of-query.
1328        let updated_schema = updated.schema();
1329        staging.append_batch(&table_key, updated_schema, PendingMode::Merge, updated)?;
1330
1331        Ok(MutationResult {
1332            affected_nodes: affected_count,
1333            affected_edges: 0,
1334        })
1335    }
1336
1337    async fn execute_delete(
1338        &self,
1339        type_name: &str,
1340        predicate: &IRMutationPredicate,
1341        params: &ParamMap,
1342        branch: Option<&str>,
1343        staging: &mut MutationStaging,
1344        txn: Option<&crate::db::WriteTxn>,
1345    ) -> Result<MutationResult> {
1346        let is_node = self.catalog().node_types.contains_key(type_name);
1347        if is_node {
1348            self.execute_delete_node(type_name, predicate, params, branch, staging, txn)
1349                .await
1350        } else {
1351            self.execute_delete_edge(type_name, predicate, params, branch, staging, txn)
1352                .await
1353        }
1354    }
1355
1356    async fn execute_delete_node(
1357        &self,
1358        type_name: &str,
1359        predicate: &IRMutationPredicate,
1360        params: &ParamMap,
1361        branch: Option<&str>,
1362        staging: &mut MutationStaging,
1363        txn: Option<&crate::db::WriteTxn>,
1364    ) -> Result<MutationResult> {
1365        let pred_sql = predicate_to_sql(predicate, params, false)?;
1366
1367        let table_key = format!("node:{}", type_name);
1368        let (handle, full_path, table_branch) = open_table_for_mutation(
1369            self,
1370            staging,
1371            branch,
1372            &table_key,
1373            crate::db::MutationOpKind::Delete,
1374            txn,
1375        )
1376        .await?;
1377        // Delete is a STRICT op, so collapse #1 never skips its open.
1378        let ds = handle.expect("strict Delete op always opens its dataset");
1379        let initial_version = ds.version();
1380
1381        // Scan matching IDs for cascade. Per D₂ this never overlaps with
1382        // staged inserts (mixed insert/delete in one query is rejected at
1383        // parse time), so we scan committed only.
1384        let batches = self
1385            .storage()
1386            .scan(&ds, Some(&["id"]), Some(&pred_sql), None)
1387            .await?;
1388
1389        let deleted_ids: Vec<String> = batches
1390            .iter()
1391            .flat_map(|batch| {
1392                let ids = batch
1393                    .column(0)
1394                    .as_any()
1395                    .downcast_ref::<StringArray>()
1396                    .unwrap();
1397                (0..ids.len())
1398                    .map(|i| ids.value(i).to_string())
1399                    .collect::<Vec<_>>()
1400            })
1401            .collect();
1402
1403        if deleted_ids.is_empty() {
1404            return Ok(MutationResult {
1405                affected_nodes: 0,
1406                affected_edges: 0,
1407            });
1408        }
1409
1410        let affected_nodes = deleted_ids.len();
1411
1412        // Delete nodes — still inline-commit (Lance's `Dataset::delete` is
1413        // not exposed as a two-phase op in 6.0.1). D₂ keeps inserts and
1414        // deletes from coexisting in one query, so this advance of Lance
1415        // HEAD is the only HEAD movement during the query and the
1416        // publisher's CAS captures it intact.
1417        let ds = self
1418            .reopen_for_mutation(
1419                &table_key,
1420                &full_path,
1421                table_branch.as_deref(),
1422                initial_version,
1423                crate::db::MutationOpKind::Delete,
1424            )
1425            .await?;
1426        crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_DELETE_NODE_PRE_PRIMARY_DELETE)?;
1427        let (_new_ds, delete_state) = self
1428            .storage_inline_residual()
1429            .delete_where(&full_path, ds, &pred_sql)
1430            .await?;
1431
1432        staging.record_inline(crate::db::SubTableUpdate {
1433            table_key: table_key.clone(),
1434            table_version: delete_state.version,
1435            table_branch: table_branch.clone(),
1436            row_count: delete_state.row_count,
1437            version_metadata: delete_state.version_metadata,
1438        });
1439
1440        let mut affected_edges = 0usize;
1441        let escaped: Vec<String> = deleted_ids
1442            .iter()
1443            .map(|id| format!("'{}'", id.replace('\'', "''")))
1444            .collect();
1445        let id_list = escaped.join(", ");
1446
1447        let edge_info: Vec<(String, String, String)> = self
1448            .catalog()
1449            .edge_types
1450            .iter()
1451            .map(|(name, et)| (name.clone(), et.from_type.clone(), et.to_type.clone()))
1452            .collect();
1453
1454        for (edge_name, from_type, to_type) in &edge_info {
1455            let mut cascade_filters = Vec::new();
1456            if from_type == type_name {
1457                cascade_filters.push(format!("src IN ({})", id_list));
1458            }
1459            if to_type == type_name {
1460                cascade_filters.push(format!("dst IN ({})", id_list));
1461            }
1462            if cascade_filters.is_empty() {
1463                continue;
1464            }
1465
1466            let edge_table_key = format!("edge:{}", edge_name);
1467            let cascade_filter = cascade_filters.join(" OR ");
1468            let (edge_handle, edge_full_path, edge_table_branch) = open_table_for_mutation(
1469                self,
1470                staging,
1471                branch,
1472                &edge_table_key,
1473                crate::db::MutationOpKind::Delete,
1474                txn,
1475            )
1476            .await?;
1477            // Delete is a STRICT op, so collapse #1 never skips its open.
1478            let edge_ds = edge_handle.expect("strict Delete op always opens its dataset");
1479
1480            let (_new_edge_ds, edge_delete) = self
1481                .storage_inline_residual()
1482                .delete_where(&edge_full_path, edge_ds, &cascade_filter)
1483                .await?;
1484
1485            affected_edges += edge_delete.deleted_rows;
1486
1487            if edge_delete.deleted_rows > 0 {
1488                staging.record_inline(crate::db::SubTableUpdate {
1489                    table_key: edge_table_key,
1490                    table_version: edge_delete.version,
1491                    table_branch: edge_table_branch,
1492                    row_count: edge_delete.row_count,
1493                    version_metadata: edge_delete.version_metadata,
1494                });
1495            }
1496        }
1497
1498        if affected_edges > 0 {
1499            self.invalidate_graph_index().await;
1500        }
1501
1502        Ok(MutationResult {
1503            affected_nodes,
1504            affected_edges,
1505        })
1506    }
1507
1508    async fn execute_delete_edge(
1509        &self,
1510        type_name: &str,
1511        predicate: &IRMutationPredicate,
1512        params: &ParamMap,
1513        branch: Option<&str>,
1514        staging: &mut MutationStaging,
1515        txn: Option<&crate::db::WriteTxn>,
1516    ) -> Result<MutationResult> {
1517        let pred_sql = predicate_to_sql(predicate, params, true)?;
1518
1519        let table_key = format!("edge:{}", type_name);
1520        let (handle, full_path, table_branch) = open_table_for_mutation(
1521            self,
1522            staging,
1523            branch,
1524            &table_key,
1525            crate::db::MutationOpKind::Delete,
1526            txn,
1527        )
1528        .await?;
1529        // Delete is a STRICT op, so collapse #1 never skips its open.
1530        let ds = handle.expect("strict Delete op always opens its dataset");
1531
1532        let (_new_ds, delete_state) = self
1533            .storage_inline_residual()
1534            .delete_where(&full_path, ds, &pred_sql)
1535            .await?;
1536        let affected = delete_state.deleted_rows;
1537
1538        if affected > 0 {
1539            staging.record_inline(crate::db::SubTableUpdate {
1540                table_key,
1541                table_version: delete_state.version,
1542                table_branch,
1543                row_count: delete_state.row_count,
1544                version_metadata: delete_state.version_metadata,
1545            });
1546            self.invalidate_graph_index().await;
1547        }
1548
1549        Ok(MutationResult {
1550            affected_nodes: 0,
1551            affected_edges: affected,
1552        })
1553    }
1554}
1555
1556/// Concat the matched batches from `scan_with_pending` into a single batch.
1557/// `scan_with_pending` returns committed-side and pending-side batches in
1558/// order; both should share a schema if pending was produced through
1559/// `apply_assignments` with full-schema scan input. If schemas drift,
1560/// surface a clear error so the user can split the query.
1561fn concat_match_batches_to_schema(
1562    _schema: &SchemaRef,
1563    _blob_properties: &HashSet<String>,
1564    batches: Vec<RecordBatch>,
1565) -> Result<RecordBatch> {
1566    if batches.len() == 1 {
1567        return Ok(batches.into_iter().next().unwrap());
1568    }
1569    let common = batches[0].schema();
1570    arrow_select::concat::concat_batches(&common, &batches).map_err(|e| {
1571        OmniError::Lance(format!(
1572            "scan_with_pending returned batches with mismatched schemas \
1573             across the committed/pending boundary; this typically indicates \
1574             a blob-column shape mismatch between the committed table and a \
1575             prior in-query insert/update. Split blob-touching mutations \
1576             into separate queries. ({})",
1577            e
1578        ))
1579    })
1580}
1581
1582/// Validate `@card` bounds against committed (Lance) + pending (in-memory)
1583/// edges for one edge table. Engine path: each insert produces a fresh
1584/// ULID id, so committed and pending cannot share a primary key — no
1585/// dedup needed (`dedupe_key_column = None`).
1586async fn validate_edge_cardinality_with_pending(
1587    db: &Omnigraph,
1588    committed_ds: &SnapshotHandle,
1589    staging: &MutationStaging,
1590    table_key: &str,
1591    edge_type: &omnigraph_compiler::catalog::EdgeType,
1592) -> Result<()> {
1593    if edge_type.cardinality.is_default() {
1594        return Ok(());
1595    }
1596    let counts =
1597        super::staging::count_src_per_edge(db, committed_ds, table_key, staging, None).await?;
1598    super::staging::enforce_cardinality_bounds(edge_type, &counts)
1599}
1600
1601fn enrich_mutation_params(params: &ParamMap) -> Result<ParamMap> {
1602    let mut resolved = params.clone();
1603    if !resolved.contains_key(NOW_PARAM_NAME) {
1604        let now = OffsetDateTime::now_utc()
1605            .format(&Rfc3339)
1606            .map_err(|e| OmniError::manifest(format!("failed to format now(): {}", e)))?;
1607        resolved.insert(NOW_PARAM_NAME.to_string(), Literal::DateTime(now));
1608    }
1609    Ok(resolved)
1610}
1611
#[cfg(test)]
mod predicate_sql_tests {
    use super::*;

    // #283: a mutation predicate on a camelCase column has to render the
    // column UNQUOTED with its case intact. Lance's `Scanner::filter(&str)`,
    // which consumes the committed-scan predicate, keeps the case of an
    // unquoted identifier but reads a double-quoted `"col"` as a string
    // literal (silently matching zero rows), so the predicate string must
    // not quote the column. The pending MemTable path stays case-preserving
    // by disabling DataFusion identifier normalization on its context, not
    // by quoting here.
    #[test]
    fn predicate_to_sql_preserves_camelcase_column_unquoted() {
        let no_params = ParamMap::new();
        let camel_case_eq = IRMutationPredicate {
            property: "repoName".to_string(),
            op: CompOp::Eq,
            value: IRExpr::Literal(Literal::String("acme".into())),
        };

        let sql = predicate_to_sql(&camel_case_eq, &no_params, false).unwrap();

        assert_eq!(
            sql, "repoName = 'acme'",
            "column must be unquoted and case-preserved, got {sql}"
        );
    }
}