omnigraph-engine 0.4.1

Runtime engine for the Omnigraph graph database.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
use arrow_array::{RecordBatch, UInt64Array};
use arrow_schema::SchemaRef;
use arrow_select::concat::concat_batches;
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder};
use lance::dataset::{
    CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
    WriteParams,
};
use lance::datatypes::BlobHandling;
use lance::index::scalar::IndexDetails;
use lance_file::version::LanceFileVersion;
use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
use lance_index::{DatasetIndexExt, IndexType, is_system_index};
use lance_linalg::distance::MetricType;
use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
use lance_table::rowids::{RowIdSequence, write_row_ids};
use std::sync::Arc;

use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
use crate::db::{Snapshot, SubTableEntry};
use crate::error::{OmniError, Result};

/// Snapshot of a table immediately after a committed write: the Lance
/// dataset version, the total row count at that version, and the
/// manifest-level metadata derived from the dataset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableState {
    /// Lance dataset version after the operation.
    pub version: u64,
    /// Total rows in the table at `version`.
    pub row_count: u64,
    /// Manifest metadata computed via `TableVersionMetadata::from_dataset`.
    pub(crate) version_metadata: TableVersionMetadata,
}

/// Result of a `delete_where` call: the post-delete table state plus how
/// many rows the delete removed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteState {
    /// Lance dataset version after the delete committed.
    pub version: u64,
    /// Rows remaining in the table at `version`.
    pub row_count: u64,
    /// Number of rows removed by the delete (from Lance's delete result).
    pub deleted_rows: usize,
    /// Manifest metadata for the post-delete dataset.
    pub(crate) version_metadata: TableVersionMetadata,
}

/// A Lance write that has produced fragment files on object storage but is
/// not yet committed to the dataset's manifest. The staged-write primitives
/// are consumed by `MutationStaging` (`exec/staging.rs`,
/// `exec/mutation.rs`) and the bulk loader (`loader/mod.rs`). The
/// intent: defer Lance commits to end-of-query so a mid-query failure
/// leaves the touched table at the pre-mutation HEAD instead of
/// drifting ahead. See `docs/runs.md` for the publisher-CAS contract
/// this builds on.
///
/// `transaction` is opaque from our side — Lance owns its semantics. We
/// commit it via `CommitBuilder::execute(transaction)` (see
/// `TableStore::commit_staged`).
///
/// For read-your-writes within the same query, `new_fragments` and
/// `removed_fragment_ids` together describe the post-stage view delta:
/// `scan_with_staged` (and `count_rows_with_staged`) compose
/// `committed - removed + new` so subsequent reads see the staged result
/// without double-counting fragments that `Operation::Update` rewrote.
/// Without `removed_fragment_ids`, a `stage_merge_insert` that rewrites
/// existing fragments would yield duplicate rows (the original fragment
/// stays in the committed manifest while its rewrite shows up in `new_fragments`).
#[derive(Debug, Clone)]
pub struct StagedWrite {
    /// Uncommitted Lance transaction produced by an
    /// `execute_uncommitted` call; held until end-of-query and then
    /// committed (or dropped, leaving the fragments unreferenced).
    pub transaction: Transaction,
    /// Fragments to surface alongside the committed manifest in
    /// `Scanner::with_fragments(committed - removed + new)`. For
    /// `Operation::Append` these are the freshly-appended fragments. For
    /// `Operation::Update` (merge_insert) these are
    /// `updated_fragments + new_fragments` (rewrites + freshly-inserted
    /// rows).
    pub new_fragments: Vec<Fragment>,
    /// Fragment IDs that this staged write supersedes. The committed
    /// manifest must filter these out before being combined with
    /// `new_fragments` for read-your-writes scans, otherwise rewrites
    /// yield duplicate rows. Empty for `stage_append` (`Operation::Append`
    /// adds without removing anything); populated from
    /// `Operation::Update.removed_fragment_ids` for `stage_merge_insert`.
    pub removed_fragment_ids: Vec<u64>,
}

/// Thin facade over Lance datasets rooted at `root_uri`. Methods take and
/// return `lance::Dataset` handles; the store itself only carries the
/// normalized root URI.
#[derive(Debug, Clone)]
pub struct TableStore {
    // Normalized in `new`: trailing '/' stripped so `dataset_uri` can
    // always join with a single separator.
    root_uri: String,
}

impl TableStore {
    /// Build a store rooted at `root_uri`, normalizing away any trailing
    /// slashes so later path joins use exactly one separator.
    pub fn new(root_uri: &str) -> Self {
        let root_uri = root_uri.trim_end_matches('/').to_string();
        Self { root_uri }
    }

    /// Borrow the normalized root URI (no trailing slash).
    pub fn root_uri(&self) -> &str {
        self.root_uri.as_str()
    }

    /// Absolute dataset URI for a table path relative to the root.
    pub fn dataset_uri(&self, table_path: &str) -> String {
        [self.root_uri.as_str(), table_path].join("/")
    }

    /// Invert `dataset_uri`: strip the root prefix back off, and drop any
    /// trailing `/tree/<branch>` segment so the result is the bare table
    /// path. Errors if `dataset_uri` does not live under `root_uri`.
    fn table_path_from_dataset_uri(&self, dataset_uri: &str) -> Result<String> {
        let root = self.root_uri.trim_end_matches('/');
        let prefix = format!("{}/", root);
        let rest = match dataset_uri.strip_prefix(&prefix) {
            Some(rest) => rest,
            None => {
                return Err(OmniError::manifest_internal(format!(
                    "dataset uri '{}' is not under root '{}'",
                    dataset_uri, self.root_uri
                )));
            }
        };
        // Branch checkouts embed "/tree/<branch>"; keep only the table path.
        let table_path = match rest.split_once("/tree/") {
            Some((path, _branch)) => path,
            None => rest,
        };
        Ok(table_path.to_string())
    }

    /// Build `TableVersionMetadata` for `ds`, resolving `dataset_uri` back
    /// to its table path under the store root first.
    fn dataset_version_metadata(
        &self,
        dataset_uri: &str,
        ds: &Dataset,
    ) -> Result<TableVersionMetadata> {
        let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
        TableVersionMetadata::from_dataset(&self.root_uri, &table_path, ds)
    }

    /// Open the dataset for `table_key` as pinned by `snapshot`.
    /// Pure delegation to `Snapshot::open`.
    pub async fn open_snapshot_table(
        &self,
        snapshot: &Snapshot,
        table_key: &str,
    ) -> Result<Dataset> {
        snapshot.open(table_key).await
    }

    /// Open the dataset described by a sub-table entry, resolved against
    /// this store's root URI. Pure delegation to `SubTableEntry::open`.
    pub async fn open_at_entry(&self, entry: &SubTableEntry) -> Result<Dataset> {
        entry.open(&self.root_uri).await
    }

    /// Open the dataset at its current HEAD. When `branch` names anything
    /// other than "main", check out that branch; `None` or "main" keeps
    /// the default head as opened.
    pub async fn open_dataset_head(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<Dataset> {
        let ds = Dataset::open(dataset_uri)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        if let Some(name) = branch {
            if name != "main" {
                return ds
                    .checkout_branch(name)
                    .await
                    .map_err(|e| OmniError::Lance(e.to_string()));
            }
        }
        Ok(ds)
    }

    /// Open the writable HEAD of a table through the manifest layer.
    /// Resolves `dataset_uri` back to its table path and delegates to
    /// `open_table_head_for_write` (see `db/manifest.rs`).
    pub async fn open_dataset_head_for_write(
        &self,
        table_key: &str,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<Dataset> {
        let table_path = self.table_path_from_dataset_uri(dataset_uri)?;
        open_table_head_for_write(&self.root_uri, table_key, &table_path, branch).await
    }

    /// Delete `branch` from the dataset at `dataset_uri`.
    pub async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
        let mut ds = Dataset::open(dataset_uri)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        match ds.delete_branch(branch).await {
            Ok(()) => Ok(()),
            Err(e) => Err(OmniError::Lance(e.to_string())),
        }
    }

    /// Open a table at an exact historical version on the given branch:
    /// open the branch HEAD first, then check out `version`.
    pub async fn open_dataset_at_state(
        &self,
        table_path: &str,
        branch: Option<&str>,
        version: u64,
    ) -> Result<Dataset> {
        let uri = self.dataset_uri(table_path);
        let head = self.open_dataset_head(&uri, branch).await?;
        head.checkout_version(version)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// Verify the dataset is at `expected_version`, returning a structured
    /// mismatch error otherwise.
    ///
    /// The structured `ExpectedVersionMismatch` variant lets callers (and
    /// the HTTP server) match on details rather than parsing the message.
    /// Drift here is a publisher-style OCC failure: the caller's pre-write
    /// view of the table version is stale relative to the on-disk Lance
    /// head.
    pub fn ensure_expected_version(
        &self,
        ds: &Dataset,
        table_key: &str,
        expected_version: u64,
    ) -> Result<()> {
        let actual = ds.version().version;
        if actual == expected_version {
            return Ok(());
        }
        Err(OmniError::manifest_expected_version_mismatch(
            table_key,
            expected_version,
            actual,
        ))
    }

    /// Reopen the writable HEAD and fail fast if it has drifted away from
    /// `expected_version` (OCC check before a mutation).
    pub async fn reopen_for_mutation(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
        table_key: &str,
        expected_version: u64,
    ) -> Result<Dataset> {
        let reopened = self
            .open_dataset_head_for_write(table_key, dataset_uri, branch)
            .await?;
        self.ensure_expected_version(&reopened, table_key, expected_version)?;
        Ok(reopened)
    }

    /// Create `target_branch` at (`source_branch`, `source_version`) and
    /// return a dataset opened on the new branch.
    ///
    /// Tolerant of creation races: if `create_branch` fails but the target
    /// branch turns out to exist at the expected version, that existing
    /// branch is accepted instead of surfacing the creation error.
    pub async fn fork_branch_from_state(
        &self,
        dataset_uri: &str,
        source_branch: Option<&str>,
        table_key: &str,
        source_version: u64,
        target_branch: &str,
    ) -> Result<Dataset> {
        let mut source_ds = self
            .open_dataset_head(dataset_uri, source_branch)
            .await?
            .checkout_version(source_version)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        self.ensure_expected_version(&source_ds, table_key, source_version)?;

        match source_ds
            .create_branch(target_branch, source_version, None)
            .await
        {
            Ok(_) => {}
            // Creation failed — presumably a lost race with another writer
            // (TODO confirm: `create_branch` error kinds aren't inspected
            // here). If the branch now exists at the expected version,
            // treat it as success; otherwise report the original error.
            Err(create_err) => match self
                .open_dataset_head(dataset_uri, Some(target_branch))
                .await
            {
                Ok(ds) => {
                    self.ensure_expected_version(&ds, table_key, source_version)?;
                    return Ok(ds);
                }
                Err(_) => return Err(OmniError::Lance(create_err.to_string())),
            },
        }

        // We created the branch: reopen on it and sanity-check the version
        // before handing the dataset back.
        let ds = self
            .open_dataset_head(dataset_uri, Some(target_branch))
            .await?;
        self.ensure_expected_version(&ds, table_key, source_version)?;
        Ok(ds)
    }

    /// Full-table scan with no projection, filter, or ordering.
    pub async fn scan_batches(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
        self.scan(ds, None, None, None).await
    }

    /// Full-table scan intended for rewriting the table's contents. When
    /// the schema contains blob columns, scan with
    /// `BlobHandling::AllBinary` so blob data is materialized in the
    /// batches; otherwise this is a plain `scan_batches`.
    pub async fn scan_batches_for_rewrite(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {
        let any_blob = ds.schema().fields_pre_order().any(|f| f.is_blob());
        if !any_blob {
            return self.scan_batches(ds).await;
        }

        let mut scanner = ds.scan();
        scanner.blob_handling(BlobHandling::AllBinary);
        let stream = scanner
            .try_into_stream()
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        stream
            .try_collect()
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// Stream a scan with the common knobs (projection, filter, ordering,
    /// row IDs) and no extra scanner configuration. Delegates to
    /// `scan_stream_with` with a no-op hook.
    pub async fn scan_stream(
        ds: &Dataset,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<DatasetRecordBatchStream> {
        Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, |_| Ok(())).await
    }

    /// Build and stream a scan, applying the common knobs and then the
    /// caller-supplied `configure` hook (which runs last, so it can
    /// override or extend anything set here).
    pub async fn scan_stream_with<F>(
        ds: &Dataset,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
        configure: F,
    ) -> Result<DatasetRecordBatchStream>
    where
        F: FnOnce(&mut Scanner) -> Result<()>,
    {
        let mut sc = ds.scan();
        if with_row_id {
            sc.with_row_id();
        }
        if let Some(cols) = projection {
            sc.project(cols)
                .map_err(|e| OmniError::Lance(e.to_string()))?;
        }
        if let Some(predicate) = filter {
            sc.filter(predicate)
                .map_err(|e| OmniError::Lance(e.to_string()))?;
        }
        if let Some(ordering) = order_by {
            sc.order_by(Some(ordering))
                .map_err(|e| OmniError::Lance(e.to_string()))?;
        }
        configure(&mut sc)?;
        sc.try_into_stream()
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// Run a scan with the common knobs and collect every batch into a
    /// `Vec` (no row IDs).
    pub async fn scan(
        &self,
        ds: &Dataset,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
    ) -> Result<Vec<RecordBatch>> {
        let stream = Self::scan_stream(ds, projection, filter, order_by, false).await?;
        stream
            .try_collect()
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// Like `scan`, plus a `configure` hook for extra scanner settings;
    /// collects every batch into a `Vec`.
    pub async fn scan_with<F>(
        &self,
        ds: &Dataset,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
        configure: F,
    ) -> Result<Vec<RecordBatch>>
    where
        F: FnOnce(&mut Scanner) -> Result<()>,
    {
        let stream =
            Self::scan_stream_with(ds, projection, filter, order_by, with_row_id, configure)
                .await?;
        stream
            .try_collect()
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// Count rows, optionally restricted by a SQL-style filter.
    pub async fn count_rows(&self, ds: &Dataset, filter: Option<String>) -> Result<usize> {
        match ds.count_rows(filter).await {
            Ok(count) => Ok(count as usize),
            Err(e) => Err(OmniError::Lance(e.to_string())),
        }
    }

    /// The dataset's currently checked-out Lance version number.
    pub fn dataset_version(&self, ds: &Dataset) -> u64 {
        ds.version().version
    }

    /// Capture the table's current version, row count, and manifest
    /// metadata as a `TableState`.
    pub async fn table_state(&self, dataset_uri: &str, ds: &Dataset) -> Result<TableState> {
        let version = self.dataset_version(ds);
        let row_count = self.count_rows(ds, None).await? as u64;
        let version_metadata = self.dataset_version_metadata(dataset_uri, ds)?;
        Ok(TableState {
            version,
            row_count,
            version_metadata,
        })
    }

    /// Append one batch to the dataset and return the resulting state.
    /// An empty batch is a no-op: the current state is reported without
    /// touching the dataset.
    pub async fn append_batch(
        &self,
        dataset_uri: &str,
        ds: &mut Dataset,
        batch: RecordBatch,
    ) -> Result<TableState> {
        if batch.num_rows() == 0 {
            return self.table_state(dataset_uri, ds).await;
        }
        let schema = batch.schema();
        let source = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
        let write_params = WriteParams {
            allow_external_blob_outside_bases: true,
            mode: WriteMode::Append,
            ..Default::default()
        };
        ds.append(source, Some(write_params))
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        self.table_state(dataset_uri, ds).await
    }

    /// Append `batch` to an existing dataset, or create a new dataset at
    /// `dataset_uri` seeded with `batch` when `dataset` is `None`.
    ///
    /// New datasets get stable row IDs and the V2_2 data-storage version,
    /// matching `overwrite_dataset`.
    ///
    /// # Errors
    /// Returns `OmniError::Lance` when the underlying Lance write fails.
    pub async fn append_or_create_batch(
        dataset_uri: &str,
        dataset: Option<Dataset>,
        batch: RecordBatch,
    ) -> Result<Dataset> {
        // Take the schema handle before moving `batch` into the reader —
        // avoids the previous needless `batch.clone()` and matches the
        // pattern used by `append_batch`.
        let schema = batch.schema();
        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
        match dataset {
            Some(mut ds) => {
                let params = WriteParams {
                    mode: WriteMode::Append,
                    allow_external_blob_outside_bases: true,
                    ..Default::default()
                };
                ds.append(reader, Some(params))
                    .await
                    .map_err(|e| OmniError::Lance(e.to_string()))?;
                Ok(ds)
            }
            None => {
                let params = WriteParams {
                    mode: WriteMode::Create,
                    enable_stable_row_ids: true,
                    data_storage_version: Some(LanceFileVersion::V2_2),
                    allow_external_blob_outside_bases: true,
                    ..Default::default()
                };
                Dataset::write(reader, dataset_uri, Some(params))
                    .await
                    .map_err(|e| OmniError::Lance(e.to_string()))
            }
        }
    }

    /// Replace the table's contents with `batch`: truncate everything,
    /// then append. Returns the post-write state.
    pub async fn overwrite_batch(
        &self,
        dataset_uri: &str,
        ds: &mut Dataset,
        batch: RecordBatch,
    ) -> Result<TableState> {
        if let Err(e) = ds.truncate_table().await {
            return Err(OmniError::Lance(e.to_string()));
        }
        self.append_batch(dataset_uri, ds, batch).await
    }

    /// Overwrite (or create) the dataset at `dataset_uri` with `batch`,
    /// using stable row IDs and the V2_2 data-storage version (matching
    /// `append_or_create_batch`'s create path).
    ///
    /// # Errors
    /// Returns `OmniError::Lance` when the underlying Lance write fails.
    pub async fn overwrite_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
        // Take the schema handle before moving `batch` into the reader —
        // avoids the previous needless `batch.clone()`.
        let schema = batch.schema();
        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
        let params = WriteParams {
            mode: WriteMode::Overwrite,
            enable_stable_row_ids: true,
            data_storage_version: Some(LanceFileVersion::V2_2),
            allow_external_blob_outside_bases: true,
            ..Default::default()
        };
        Dataset::write(reader, dataset_uri, Some(params))
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// Run a committed merge_insert (upsert) of `batch` keyed on
    /// `key_columns`, with the given matched/not-matched policies, and
    /// return the post-merge state. An empty batch is a no-op.
    pub async fn merge_insert_batch(
        &self,
        dataset_uri: &str,
        ds: Dataset,
        batch: RecordBatch,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<TableState> {
        if batch.num_rows() == 0 {
            return self.table_state(dataset_uri, &ds).await;
        }

        // TODO(lance-upstream): MergeInsertBuilder does not accept WriteParams,
        // so allow_external_blob_outside_bases cannot be set here. External URI
        // blobs via merge_insert (LoadMode::Merge, mutations) are unsupported
        // until Lance exposes WriteParams on MergeInsertBuilder.
        let ds = Arc::new(ds);
        let job = MergeInsertBuilder::try_new(ds, key_columns)
            .map_err(|e| OmniError::Lance(e.to_string()))?
            .when_matched(when_matched)
            .when_not_matched(when_not_matched)
            .try_build()
            .map_err(|e| OmniError::Lance(e.to_string()))?;

        let schema = batch.schema();
        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
        // The job yields the post-merge dataset; report state from that,
        // not the (consumed) input handle.
        let (new_ds, _stats) = job
            .execute(lance_datafusion::utils::reader_to_stream(Box::new(reader)))
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        self.table_state(dataset_uri, &new_ds).await
    }

    /// Merge-insert several batches at once: concatenate them into a
    /// single batch (schemas must agree) and delegate to
    /// `merge_insert_batch`. An empty batch list is a no-op.
    pub async fn merge_insert_batches(
        &self,
        dataset_uri: &str,
        ds: Dataset,
        batches: Vec<RecordBatch>,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<TableState> {
        if batches.is_empty() {
            return self.table_state(dataset_uri, &ds).await;
        }
        // Fold the inputs into one batch so a single merge job covers them.
        let merged = match batches.len() {
            1 => batches.into_iter().next().expect("len checked above"),
            _ => {
                let schema = batches[0].schema();
                concat_batches(&schema, &batches)
                    .map_err(|e| OmniError::Lance(e.to_string()))?
            }
        };
        self.merge_insert_batch(
            dataset_uri,
            ds,
            merged,
            key_columns,
            when_matched,
            when_not_matched,
        )
        .await
    }

    /// Delete every row matching `filter` and report the post-delete
    /// state, including how many rows were removed.
    pub async fn delete_where(
        &self,
        dataset_uri: &str,
        ds: &mut Dataset,
        filter: &str,
    ) -> Result<DeleteState> {
        let outcome = ds
            .delete(filter)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        // The delete result carries the post-delete dataset; derive every
        // field of the state from that handle.
        let new_ds = &outcome.new_dataset;
        let row_count = self.count_rows(new_ds, None).await? as u64;
        Ok(DeleteState {
            version: new_ds.version().version,
            row_count,
            deleted_rows: outcome.num_deleted_rows as usize,
            version_metadata: self.dataset_version_metadata(dataset_uri, new_ds)?,
        })
    }

    // ─── Staged-write API ────────────────────────────────────────────────────
    //
    // These primitives wrap Lance's distributed-write API: each call writes
    // fragment files to object storage but does NOT advance the dataset's
    // HEAD or commit a manifest entry. The returned `Transaction` is held by
    // the caller (typically `MutationStaging` or the loader's accumulator)
    // and committed at end-of-query via `commit_staged`. On failure the
    // fragments remain unreferenced and are reclaimed by `cleanup_old_versions`.
    //
    // The extracted `Vec<Fragment>` is for read-your-writes within the same
    // query: subsequent ops construct a `Scanner` and call
    // `scanner.with_fragments(staged.clone())` to see staged data alongside
    // the committed snapshot. Lance's filter pushdown, vector search, and
    // FTS all respect the supplied fragment list.

    /// Stage an append: write fragment files for `batch`, return the
    /// uncommitted Lance transaction plus the new fragments for
    /// read-your-writes.
    ///
    /// `prior_stages` is the slice of staged writes already accumulated
    /// against the **same dataset** in the same query. Pass `&[]` for the
    /// first call; pass the accumulated stages for subsequent calls. The
    /// primitive uses this to offset row-ID assignment so chained
    /// `stage_append` calls don't produce overlapping `_rowid` ranges.
    /// Mirrors `scan_with_staged`'s `&[StagedWrite]` shape — the same
    /// slice gets passed to both.
    ///
    /// On stable-row-id datasets we manually populate `row_id_meta` on
    /// the cloned `new_fragments` we expose for `scan_with_staged`.
    /// Lance's `InsertBuilder::execute_uncommitted` produces fragments
    /// with `row_id_meta = None`; row IDs are normally assigned by
    /// `Transaction::assign_row_ids` during commit. Because
    /// `scan_with_staged` reads the staged fragments *before* commit,
    /// the scanner trips on a stable-row-id dataset
    /// (`Error::internal("Missing row id meta")` from
    /// `dataset/rowids.rs:22`). The transaction's internal fragment copy
    /// stays untouched — Lance assigns IDs there independently at commit
    /// time, and the two ID assignments don't have to agree because no
    /// caller threads `_rowid` from the staged scan into the commit
    /// path.
    ///
    /// **Contract: `prior_stages` must contain only previous
    /// `stage_append` results against the same dataset.** Mixing
    /// stage_merge_insert into `prior_stages` would over-count because
    /// merge_insert's `new_fragments` include rewrites that don't add
    /// rows. The engine's parse-time D₂′ check (per touched table: all
    /// stage_append OR exactly one stage_merge_insert) guarantees this
    /// upstream; on the primitive layer it's the caller's responsibility.
    pub async fn stage_append(
        &self,
        ds: &Dataset,
        batch: RecordBatch,
        prior_stages: &[StagedWrite],
    ) -> Result<StagedWrite> {
        // Reject empty batches up front: there is nothing to stage, and a
        // fragment-less transaction would be a caller bug.
        if batch.num_rows() == 0 {
            return Err(OmniError::manifest_internal(
                "stage_append called with empty batch".to_string(),
            ));
        }
        let params = WriteParams {
            mode: WriteMode::Append,
            allow_external_blob_outside_bases: true,
            ..Default::default()
        };
        // Write fragment files without committing; the transaction is
        // returned to the caller for a later `commit_staged`.
        let transaction = InsertBuilder::new(Arc::new(ds.clone()))
            .with_params(&params)
            .execute_uncommitted(vec![batch])
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        let mut new_fragments = match &transaction.operation {
            Operation::Append { fragments } => fragments.clone(),
            Operation::Overwrite { fragments, .. } => fragments.clone(),
            other => {
                return Err(OmniError::manifest_internal(format!(
                    "stage_append: unexpected Lance operation {:?}",
                    std::mem::discriminant(other)
                )));
            }
        };
        // Assign real fragment IDs. Lance's `InsertBuilder::execute_uncommitted`
        // returns fragments with `id = 0` ("Temporary ID" — see lance-4.0.0
        // `dataset/write.rs:1044/1712`); the real assignment happens during
        // commit via `Transaction::fragments_with_ids`. Because we expose
        // these fragments to `scan_with_staged` *before* commit, two staged
        // fragments (or one staged + the seed) would collide on `id = 0`,
        // causing Lance's scanner to mishandle the combined list (silent
        // duplicates / dropped rows). Mirror the commit-time renumbering
        // here, using `ds.manifest.max_fragment_id() + 1` as the base and
        // accounting for prior stages.
        // ds.manifest.max_fragment_id is Option<u32>; cast up to u64 because
        // Lance's Fragment::id (and the commit-time renumbering counter in
        // Transaction::fragments_with_ids) operate on u64.
        let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64
            + 1
            + prior_stages_fragment_count(prior_stages);
        assign_fragment_ids(&mut new_fragments, next_id_base);
        // Stable-row-id datasets additionally need row-id metadata
        // pre-assigned so pre-commit staged scans don't fail on missing
        // meta (see the doc comment above this fn).
        if ds.manifest.uses_stable_row_ids() {
            let prior_rows = prior_stages_row_count(prior_stages)?;
            let start_row_id = ds.manifest.next_row_id + prior_rows;
            assign_row_id_meta(&mut new_fragments, start_row_id)?;
        }
        Ok(StagedWrite {
            transaction,
            new_fragments,
            // Append never supersedes existing fragments.
            removed_fragment_ids: Vec::new(),
        })
    }

    /// Stage a merge_insert (upsert) without committing: writes fragment
    /// files describing the merge result and returns the uncommitted
    /// transaction plus the fragments that will be visible after commit
    /// (`new_fragments`), for read-your-writes.
    ///
    /// **Contract: never chain `stage_merge_insert` calls on the same
    /// table within one query.** Each call's `MergeInsertBuilder` runs
    /// against the supplied dataset's committed view and cannot see
    /// fragments produced by an earlier staged merge on that table. Two
    /// chained merges whose source rows share keys each emit an
    /// `Operation::Update` whose `new_fragments` carry a row for the
    /// shared key, so `scan_with_staged` / `count_rows_with_staged`
    /// would return **duplicates by key**.
    ///
    /// This limitation is intrinsic to the Lance API — there is no
    /// public way to feed uncommitted fragments into
    /// `MergeInsertBuilder`. The engine's `MutationStaging` accumulator
    /// works around it by concatenating per-table batches in memory and
    /// issuing exactly one `stage_merge_insert` per touched table at
    /// end-of-query (last-write-wins dedupe by id) — see
    /// `exec/staging.rs`. Direct callers must uphold the contract
    /// themselves.
    ///
    /// Lift path: a Lance extension letting `MergeInsertBuilder` accept
    /// staged fragments, or an in-memory pre-merge folding prior staged
    /// batches into the input stream. See `docs/runs.md`.
    pub async fn stage_merge_insert(
        &self,
        ds: Dataset,
        batch: RecordBatch,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<StagedWrite> {
        if batch.num_rows() == 0 {
            return Err(OmniError::manifest_internal(
                "stage_merge_insert called with empty batch".to_string(),
            ));
        }
        // Build the source stream before handing the dataset to Lance.
        let batch_schema = batch.schema();
        let source_reader =
            arrow_array::RecordBatchIterator::new(vec![Ok(batch)], batch_schema);
        let source = lance_datafusion::utils::reader_to_stream(Box::new(source_reader));
        let merge_job = MergeInsertBuilder::try_new(Arc::new(ds), key_columns)
            .map_err(|e| OmniError::Lance(e.to_string()))?
            .when_matched(when_matched)
            .when_not_matched(when_not_matched)
            .try_build()
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        let staged = merge_job
            .execute_uncommitted(source)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        // Operation::Update carries three pieces we care about:
        // `new_fragments` (freshly inserted rows), `updated_fragments`
        // (rewrites of existing fragments holding retained + updated
        // rows), and `removed_fragment_ids` (committed fragments those
        // rewrites supersede). For read-your-writes we surface
        // `updated_fragments + new_fragments` together with the removed
        // IDs so `scan_with_staged` can drop the superseded committed
        // fragments — otherwise one merge_insert would show each
        // rewritten row twice (committed original + staged rewrite).
        let (new_fragments, removed_fragment_ids) = match &staged.transaction.operation {
            Operation::Update {
                new_fragments,
                updated_fragments,
                removed_fragment_ids,
                ..
            } => {
                let mut visible = updated_fragments.to_vec();
                visible.extend_from_slice(new_fragments);
                (visible, removed_fragment_ids.to_vec())
            }
            Operation::Append { fragments } => (fragments.to_vec(), Vec::new()),
            other => {
                return Err(OmniError::manifest_internal(format!(
                    "stage_merge_insert: unexpected Lance operation {:?}",
                    std::mem::discriminant(other)
                )));
            }
        };
        Ok(StagedWrite {
            transaction: staged.transaction,
            new_fragments,
            removed_fragment_ids,
        })
    }

    /// Commit a previously-staged transaction onto `ds` and return the
    /// resulting dataset with HEAD advanced. Thin wrapper over
    /// `CommitBuilder::execute`; the publisher calls this at
    /// end-of-query to materialize all staged writes ahead of the
    /// meta-manifest commit.
    pub async fn commit_staged(
        &self,
        ds: Arc<Dataset>,
        transaction: Transaction,
    ) -> Result<Dataset> {
        let committed = CommitBuilder::new(ds).execute(transaction).await;
        committed.map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// Stage an overwrite (write_fragments + `Operation::Overwrite { schema, fragments }`)
    /// and return a `StagedWrite` carrying the replacement fragments.
    /// HEAD does NOT advance.
    ///
    /// Lance shape: `InsertBuilder::with_params(WriteParams { mode:
    /// Overwrite, .. }).execute_uncommitted(vec![batch])` yields a
    /// `Transaction` whose `Operation::Overwrite` carries the new
    /// schema + fragments; the transaction is later committed through
    /// `commit_staged` (same call as `stage_append`).
    ///
    /// MR-793 Phase 2: added for the schema_apply rewrite path. Lance
    /// API verified in `.context/mr-793-design.md` Appendix A.1.
    pub async fn stage_overwrite(
        &self,
        ds: &Dataset,
        batch: RecordBatch,
    ) -> Result<StagedWrite> {
        if batch.num_rows() == 0 {
            return Err(OmniError::manifest_internal(
                "stage_overwrite called with empty batch".to_string(),
            ));
        }
        let write_params = WriteParams {
            mode: WriteMode::Overwrite,
            allow_external_blob_outside_bases: true,
            ..Default::default()
        };
        let transaction = InsertBuilder::new(Arc::new(ds.clone()))
            .with_params(&write_params)
            .execute_uncommitted(vec![batch])
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        let Operation::Overwrite { fragments, .. } = &transaction.operation else {
            return Err(OmniError::manifest_internal(format!(
                "stage_overwrite: unexpected Lance operation {:?}",
                std::mem::discriminant(&transaction.operation)
            )));
        };
        let mut new_fragments = fragments.to_vec();
        // Overwrite replaces every committed fragment, and Lance restarts
        // the fragment-ID and row-ID counters at the post-commit version.
        // For the pre-commit staged view:
        //   1) Renumber the temporary fragment IDs (execute_uncommitted
        //      leaves them at `id = 0` — same fix as stage_append). No
        //      committed fragment can collide (all of them land in
        //      removed_fragment_ids below), so numbering starts at 1.
        //   2) On stable-row-id datasets, assign row_id_meta from 0
        //      (Overwrite is a fresh start) so `scan_with_staged` avoids
        //      the "Missing row id meta" panic in lance-4.0.0
        //      dataset/rowids.rs:22.
        assign_fragment_ids(&mut new_fragments, 1);
        if ds.manifest.uses_stable_row_ids() {
            assign_row_id_meta(&mut new_fragments, 0)?;
        }
        // Every committed fragment is superseded; listing them all in
        // removed_fragment_ids makes the post-stage read-your-writes
        // view show ONLY the staged fragments.
        let removed_fragment_ids: Vec<u64> =
            ds.manifest.fragments.iter().map(|f| f.id).collect();
        Ok(StagedWrite {
            transaction,
            new_fragments,
            removed_fragment_ids,
        })
    }

    /// Stage a BTREE scalar index build; the returned `StagedWrite`'s
    /// transaction commits via `commit_staged`. HEAD does NOT advance.
    ///
    /// Lance shape: `CreateIndexBuilder::execute_uncommitted` yields
    /// `IndexMetadata`; we wrap it ourselves in `Operation::CreateIndex
    /// { new_indices, removed_indices }` through the public
    /// `TransactionBuilder`, mirroring the simple
    /// (non-segment-commit-path) branch of Lance's
    /// `CreateIndexBuilder::execute` (lance-4.0.0 `src/index/create.rs:502-512`).
    ///
    /// `removed_indices` mirrors `execute()` lines 466-476: when this
    /// build replaces an existing same-named index, those entries are
    /// listed so the manifest commit can tombstone them.
    ///
    /// MR-793 Phase 2: only scalar index types (BTree, Inverted) are
    /// stage-able; vector indices need the segment commit path, whose
    /// `build_index_metadata_from_segments` is `pub(crate)` in
    /// lance-4.0.0 — see `create_vector_index` and Appendix A.3.
    pub async fn stage_create_btree_index(
        &self,
        ds: &Dataset,
        columns: &[&str],
    ) -> Result<StagedWrite> {
        let index_params = ScalarIndexParams::default();
        let mut builder_ds = ds.clone();
        let new_idx = builder_ds
            .create_index_builder(columns, IndexType::BTree, &index_params)
            .replace(true)
            .execute_uncommitted()
            .await
            .map_err(|e| {
                OmniError::Lance(format!("stage_create_btree_index: {}", e))
            })?;
        // Any committed index with the same name is being replaced.
        let existing = ds
            .load_indices()
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        let removed_indices: Vec<IndexMetadata> = existing
            .iter()
            .filter(|idx| idx.name == new_idx.name)
            .cloned()
            .collect();
        let dataset_version = new_idx.dataset_version;
        let transaction = TransactionBuilder::new(
            dataset_version,
            Operation::CreateIndex {
                new_indices: vec![new_idx],
                removed_indices,
            },
        )
        .build();
        Ok(StagedWrite {
            transaction,
            new_fragments: Vec::new(),
            removed_fragment_ids: Vec::new(),
        })
    }

    /// Stage an INVERTED (FTS) scalar index build. Identical shape to
    /// `stage_create_btree_index` — see its docs for the Lance API
    /// citation and contract notes.
    pub async fn stage_create_inverted_index(
        &self,
        ds: &Dataset,
        column: &str,
    ) -> Result<StagedWrite> {
        let index_params = InvertedIndexParams::default();
        let mut builder_ds = ds.clone();
        let new_idx = builder_ds
            .create_index_builder(&[column], IndexType::Inverted, &index_params)
            .replace(true)
            .execute_uncommitted()
            .await
            .map_err(|e| {
                OmniError::Lance(format!("stage_create_inverted_index: {}", e))
            })?;
        // Any committed index with the same name is being replaced.
        let existing = ds
            .load_indices()
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        let removed_indices: Vec<IndexMetadata> = existing
            .iter()
            .filter(|idx| idx.name == new_idx.name)
            .cloned()
            .collect();
        let dataset_version = new_idx.dataset_version;
        let transaction = TransactionBuilder::new(
            dataset_version,
            Operation::CreateIndex {
                new_indices: vec![new_idx],
                removed_indices,
            },
        )
        .build();
        Ok(StagedWrite {
            transaction,
            new_fragments: Vec::new(),
            removed_fragment_ids: Vec::new(),
        })
    }

    /// Scan with optional uncommitted staged writes visible alongside
    /// the committed snapshot; with an empty `staged` slice this is
    /// exactly `scan(...)`.
    ///
    /// The visible fragment set is `committed - removed + new`: the
    /// committed manifest's fragments, minus IDs superseded by staged
    /// `Operation::Update`s (merge_insert rewrites), plus the staged
    /// new/updated fragments. Skipping the `removed` filter would show
    /// a rewritten fragment twice — the committed original and the
    /// staged rewrite.
    ///
    /// **Filter contract is incomplete on staged fragments.** With
    /// `filter = Some(...)`, Lance pushes the predicate into
    /// per-fragment scans with stats-based pruning. Uncommitted
    /// fragments from `write_fragments_internal` lack the per-column
    /// statistics committed fragments carry, so the optimizer drops
    /// them from the filtered scan even when their data matches —
    /// staged rows silently vanish from the result.
    /// `scanner.use_stats(false)` does not help in lance 4.0.0.
    /// Callers needing correct filtered reads of staged data should use
    /// the engine's `MutationStaging` accumulator, which unions
    /// in-memory pending batches with the committed scan through a
    /// DataFusion `MemTable` (see `scan_with_pending`).
    ///
    /// Kept on the surface for primitive-level testing (unfiltered
    /// stage + scan works) and for callers that don't need filter
    /// pushdown.
    pub async fn scan_with_staged(
        &self,
        ds: &Dataset,
        staged: &[StagedWrite],
        projection: Option<&[&str]>,
        filter: Option<&str>,
    ) -> Result<Vec<RecordBatch>> {
        if staged.is_empty() {
            return self.scan(ds, projection, filter, None).await;
        }
        let mut scanner = ds.scan();
        if let Some(cols) = projection {
            let owned = cols.iter().map(|c| c.to_string()).collect::<Vec<String>>();
            scanner
                .project(&owned)
                .map_err(|e| OmniError::Lance(e.to_string()))?;
        }
        if let Some(predicate) = filter {
            scanner
                .filter(predicate)
                .map_err(|e| OmniError::Lance(e.to_string()))?;
        }
        scanner.with_fragments(combine_committed_with_staged(ds, staged));
        scanner
            .try_into_stream()
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?
            .try_collect()
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// Scan the committed snapshot via Lance and the in-memory pending
    /// batches via a DataFusion `MemTable`, applying the same filter to
    /// both sides, then concatenate. This is the engine-side
    /// replacement for `scan_with_staged`: it sidesteps the
    /// `Scanner::with_fragments` filter-pushdown limitation documented
    /// there by unioning the accumulator's pending batches with the
    /// committed scan at read time.
    ///
    /// `committed_ds` should be opened at the pre-mutation
    /// `expected_version` (captured in
    /// `MutationStaging::expected_versions` at first touch of the
    /// table). `pending_batches` are the per-table accumulator's
    /// batches in input shape; `pending_schema` is their schema, with
    /// `None` falling back to the first pending batch's schema.
    ///
    /// `filter` is a Lance / DataFusion SQL predicate applied to both
    /// sides — pushed down by Lance on the committed side, and run
    /// through a fresh DataFusion `SessionContext` (batches registered
    /// as a `MemTable` named `pending`) on the pending side.
    ///
    /// `key_column` selects the union strategy:
    /// - **`None` (union semantics)**: every matching committed row AND
    ///   every matching pending row is returned. Correct when committed
    ///   and pending cannot share a primary key — e.g. Append-mode
    ///   loads with ULID-generated ids, or reads where pending never
    ///   updated committed rows.
    /// - **`Some(col)` (merge / shadow semantics)**: committed rows
    ///   whose `col` value appears in any pending batch are EXCLUDED;
    ///   only pending's view of those rows is returned. Required for
    ///   Merge-mode reads (e.g. `execute_update` on the engine path) so
    ///   a chained `update` never sees stale committed values a prior
    ///   op already rewrote in pending — otherwise `where age > 30`
    ///   could match a row an earlier `set age = 20` moved out of
    ///   range.
    ///
    /// An empty `pending_batches` delegates to the regular scan path.
    pub async fn scan_with_pending(
        &self,
        committed_ds: &Dataset,
        pending_batches: &[RecordBatch],
        pending_schema: Option<SchemaRef>,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        key_column: Option<&str>,
    ) -> Result<Vec<RecordBatch>> {
        // Contract check: merge-shadow semantics need the key column in
        // the committed-side projection so shadowed committed rows can
        // be identified. Silently skipping the shadow when projection
        // omits the key would revert to union semantics behind the
        // caller's back, so reject loudly — callers either add the key
        // to the projection or drop `key_column` if union was intended.
        if let Some(key_col) = key_column {
            if let Some(cols) = projection {
                if !cols.contains(&key_col) {
                    return Err(OmniError::Lance(format!(
                        "scan_with_pending: key_column '{}' must appear in projection \
                         when merge-shadow semantics are requested (got projection = {:?})",
                        key_col, cols
                    )));
                }
            }
        }

        let mut committed = self.scan(committed_ds, projection, filter, None).await?;
        if pending_batches.is_empty() {
            return Ok(committed);
        }

        // Merge semantics, not naive union: any row with a pending
        // update is represented ONLY by its pending value — drop the
        // (stale) committed copy whose key also appears in pending.
        if let Some(key_col) = key_column {
            let shadowed_keys = collect_string_column_values(pending_batches, key_col)?;
            if !shadowed_keys.is_empty() {
                committed =
                    filter_out_rows_where_string_in(committed, key_col, &shadowed_keys)?;
            }
        }

        let pending = scan_pending_batches(
            pending_batches,
            pending_schema,
            projection,
            filter,
        )
        .await?;

        let mut out = committed;
        out.extend(pending);
        Ok(out)
    }

    /// `count_rows` variant that sees staged writes, used by
    /// edge-cardinality validation that must count staged edges before
    /// commit. Visible fragments follow the same
    /// `committed - removed + new` composition as `scan_with_staged`.
    pub async fn count_rows_with_staged(
        &self,
        ds: &Dataset,
        staged: &[StagedWrite],
        filter: Option<String>,
    ) -> Result<usize> {
        if staged.is_empty() {
            return self.count_rows(ds, filter).await;
        }
        let mut scanner = ds.scan();
        if let Some(predicate) = filter.as_deref() {
            scanner
                .filter(predicate)
                .map_err(|e| OmniError::Lance(e.to_string()))?;
        }
        scanner.with_fragments(combine_committed_with_staged(ds, staged));
        scanner
            .count_rows()
            .await
            .map(|n| n as usize)
            .map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// List the non-system indices that cover exactly the single field
    /// `column`. Errors if the dataset's schema has no such column.
    async fn user_indices_for_column(
        &self,
        ds: &Dataset,
        column: &str,
    ) -> Result<Vec<IndexMetadata>> {
        let field = ds.schema().field(column).ok_or_else(|| {
            OmniError::manifest_internal(format!(
                "dataset is missing expected index column '{}'",
                column
            ))
        })?;
        let field_id = field.id;
        let all_indices = ds
            .load_indices()
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        let matching = all_indices
            .iter()
            .filter(|index| !is_system_index(index))
            .filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
            .cloned()
            .collect();
        Ok(matching)
    }

    /// True when `column` carries a user-created BTree scalar index.
    ///
    /// An index whose `index_details` is absent counts as non-BTree.
    pub async fn has_btree_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
        let indices = self.user_indices_for_column(ds, column).await?;
        // `is_some_and` replaces `map(..).unwrap_or(false)`
        // (clippy::map_unwrap_or idiom); behavior is identical.
        Ok(indices.iter().any(|index| {
            index
                .index_details
                .as_ref()
                .is_some_and(|details| details.type_url.ends_with("BTreeIndexDetails"))
        }))
    }

    /// True when `column` carries a user-created index supporting
    /// full-text search.
    ///
    /// An index whose `index_details` is absent counts as non-FTS.
    pub async fn has_fts_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
        let indices = self.user_indices_for_column(ds, column).await?;
        // `is_some_and` replaces `map(..).unwrap_or(false)`
        // (clippy::map_unwrap_or idiom); behavior is identical.
        Ok(indices.iter().any(|index| {
            index
                .index_details
                .as_ref()
                .is_some_and(|details| IndexDetails(details.clone()).supports_fts())
        }))
    }

    /// True when `column` carries a user-created vector index.
    ///
    /// An index whose `index_details` is absent counts as non-vector.
    pub async fn has_vector_index(&self, ds: &Dataset, column: &str) -> Result<bool> {
        let indices = self.user_indices_for_column(ds, column).await?;
        // `is_some_and` replaces `map(..).unwrap_or(false)`
        // (clippy::map_unwrap_or idiom); behavior is identical.
        Ok(indices.iter().any(|index| {
            index
                .index_details
                .as_ref()
                .is_some_and(|details| IndexDetails(details.clone()).is_vector())
        }))
    }

    /// Build (or replace) a BTree scalar index over `columns`, committing
    /// it to the dataset.
    pub async fn create_btree_index(&self, ds: &mut Dataset, columns: &[&str]) -> Result<()> {
        let index_params = ScalarIndexParams::default();
        let built = ds
            .create_index_builder(columns, IndexType::BTree, &index_params)
            .replace(true)
            .await;
        built.map(|_| ()).map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// Build (or replace) an inverted (FTS) index over `column`,
    /// committing it to the dataset.
    pub async fn create_inverted_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
        let index_params = InvertedIndexParams::default();
        let built = ds
            .create_index_builder(&[column], IndexType::Inverted, &index_params)
            .replace(true)
            .await;
        built.map(|_| ()).map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// Build (or replace) an IVF-flat vector index (one partition, L2
    /// metric) over `column`, committing it to the dataset.
    pub async fn create_vector_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
        let index_params =
            lance::index::vector::VectorIndexParams::ivf_flat(1, MetricType::L2);
        let built = ds
            .create_index_builder(&[column], IndexType::Vector, &index_params)
            .replace(true)
            .await;
        built.map(|_| ()).map_err(|e| OmniError::Lance(e.to_string()))
    }

    /// Create a new dataset at `dataset_uri` holding zero rows of
    /// `schema`, via `write_dataset` with an empty batch.
    pub async fn create_empty_dataset(dataset_uri: &str, schema: &SchemaRef) -> Result<Dataset> {
        let empty = RecordBatch::new_empty(schema.clone());
        Self::write_dataset(dataset_uri, empty).await
    }

    /// Return the `_rowid` of the first row matching `filter`, or `None`
    /// when no batch yields one. Projects only `id` and requests row
    /// IDs from the scan; batches whose `_rowid` column is missing or
    /// not a `UInt64Array` are skipped, matching the lenient lookup
    /// behavior.
    pub async fn first_row_id_for_filter(&self, ds: &Dataset, filter: &str) -> Result<Option<u64>> {
        let batches: Vec<RecordBatch> =
            Self::scan_stream(ds, Some(&["id"]), Some(filter), None, true)
                .await?
                .try_collect()
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?;
        for batch in &batches {
            let Some(col) = batch.column_by_name("_rowid") else {
                continue;
            };
            let Some(ids) = col.as_any().downcast_ref::<UInt64Array>() else {
                continue;
            };
            if !ids.is_empty() {
                return Ok(Some(ids.value(0)));
            }
        }
        Ok(None)
    }

    /// Create a brand-new Lance dataset at `dataset_uri` seeded with
    /// `batch`, using `WriteMode::Create`, stable row IDs, and the V2_2
    /// data-storage format.
    pub async fn write_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
        // Capture the schema first, then move the batch into the reader.
        // The previous `batch.clone()` existed only because
        // `batch.schema()` was evaluated in the same expression that
        // consumed the batch; the clone is shallow (Arc'd columns) but
        // still redundant (clippy::redundant_clone).
        let schema = batch.schema();
        let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
        let params = WriteParams {
            mode: WriteMode::Create,
            enable_stable_row_ids: true,
            data_storage_version: Some(LanceFileVersion::V2_2),
            allow_external_blob_outside_bases: true,
            ..Default::default()
        };
        Dataset::write(reader, dataset_uri, Some(params))
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))
    }
}

// NOTE(review): the text below documents `combine_committed_with_staged`
// (defined near the end of this file), but as a `///` doc comment it was
// being attributed by rustdoc to the next item,
// `prior_stages_fragment_count`. Converted to a plain comment so the
// attribution is no longer misleading.
//
// Build the `Scanner::with_fragments` argument for read-your-writes:
// committed manifest fragments minus any fragment IDs superseded by the
// staged writes, plus the staged `new_fragments`. Order is:
//   1. committed fragments whose IDs are NOT in any staged
//      `removed_fragment_ids` (preserves committed order),
//   2. all staged `new_fragments` in stage order.
//
// Lance's `Scanner` does not require any particular ordering between
// committed and staged fragments — `with_fragments` scopes the scan to
// exactly the supplied list. The dedup matters because merge_insert
// rewrites a fragment in place at the Lance layer: the rewritten
// fragment is in `new_fragments`, the original (which it supersedes) is
// in `committed` until manifest commit, and including both would yield
// duplicate rows.
//
// **Inter-stage supersession is not handled here.** Each StagedWrite's
// `removed_fragment_ids` lists committed-manifest fragment IDs only; a
// later staged merge cannot know about an earlier staged merge's
// fragments (Lance's `MergeInsertBuilder` runs against the committed
// view). If two `stage_merge_insert`s on the same table produce rows
// with the same key, the combined view returns duplicates by key. The
// engine's mutation path enforces "per touched table: all stage_append
// OR exactly one stage_merge_insert" at parse time (D₂′ in
// `exec/mutation.rs`) so this primitive's caller never chains merges.
// See `stage_merge_insert` for the full contract.
/// Count the fragments across all `prior_stages` (NOT their rows — the
/// body sums `new_fragments.len()`, and row summing lives in
/// `prior_stages_row_count`). Used by `stage_append` to compute the
/// fragment-ID offset above `manifest.max_fragment_id` for chained
/// `stage_append` calls against the same dataset, so a later stage's
/// renumbered fragments don't collide with an earlier stage's.
///
/// Assumes `prior_stages` contains only `stage_append` results — see
/// `stage_append`'s D₂′ contract. For `stage_merge_insert` results the
/// `new_fragments` include rewrites of existing committed fragments,
/// which would inflate the offset.
fn prior_stages_fragment_count(prior_stages: &[StagedWrite]) -> u64 {
    prior_stages
        .iter()
        .map(|s| s.new_fragments.len() as u64)
        .sum()
}

/// Renumber placeholder fragment IDs sequentially from `start_id`,
/// mirroring Lance's commit-time `Transaction::fragments_with_ids`
/// (lance-4.0.0 `dataset/transaction.rs:1456`). Fragments coming out of
/// `InsertBuilder::execute_uncommitted` carry a temporary `id = 0`;
/// renumbering keeps them from colliding with committed fragments (and
/// with each other across chained stages) when the slice is handed to
/// `Scanner::with_fragments`. Fragments with a non-zero ID are left
/// untouched; their position still advances the counter, matching the
/// original enumerate-based numbering.
fn assign_fragment_ids(fragments: &mut [Fragment], start_id: u64) {
    let mut offset: u64 = 0;
    for fragment in fragments.iter_mut() {
        if fragment.id == 0 {
            fragment.id = start_id + offset;
        }
        offset += 1;
    }
}

/// Sum `physical_rows` across every fragment of every stage in
/// `prior_stages`. Used by `stage_append` to offset stable row IDs for
/// chained stages; errors if any fragment lacks `physical_rows`.
fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result<u64> {
    prior_stages
        .iter()
        .flat_map(|stage| stage.new_fragments.iter())
        .try_fold(0u64, |total, fragment| {
            let rows = fragment.physical_rows.ok_or_else(|| {
                OmniError::manifest_internal(
                    "prior_stages_row_count: fragment is missing physical_rows".to_string(),
                )
            })? as u64;
            Ok(total + rows)
        })
}

/// Give every fragment lacking `row_id_meta` a sequential inline row-ID
/// range, starting at `start_row_id`. Mirrors the `row_id_meta = None`
/// arm of Lance's `Transaction::assign_row_ids` (lance-4.0.0
/// `dataset/transaction.rs:2682`) for fragments produced by
/// `InsertBuilder::execute_uncommitted` on a stable-row-id dataset.
///
/// Only `stage_append` uses this, for read-your-writes — its docstring
/// explains why pre-commit assignment is needed and why diverging from
/// Lance's commit-time IDs is safe.
fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> {
    let mut cursor = start_row_id;
    // Fragments that already carry row-ID metadata are left alone.
    for fragment in fragments.iter_mut().filter(|f| f.row_id_meta.is_none()) {
        let physical_rows = fragment.physical_rows.ok_or_else(|| {
            OmniError::manifest_internal(
                "stage_append: fragment is missing physical_rows".to_string(),
            )
        })? as u64;
        let sequence = RowIdSequence::from(cursor..(cursor + physical_rows));
        fragment.row_id_meta = Some(RowIdMeta::Inline(write_row_ids(&sequence)));
        cursor += physical_rows;
    }
    Ok(())
}

/// Gather the distinct values of a Utf8 `column` across `batches`,
/// skipping NULLs. `scan_with_pending`'s merge-semantic path uses this
/// to find committed rows shadowed by pending writes.
fn collect_string_column_values(
    batches: &[RecordBatch],
    column: &str,
) -> Result<std::collections::HashSet<String>> {
    use arrow_array::{Array, StringArray};
    let mut values = std::collections::HashSet::new();
    for batch in batches {
        let col = batch.column_by_name(column).ok_or_else(|| {
            OmniError::Lance(format!(
                "scan_with_pending: pending batch missing key column '{}'",
                column
            ))
        })?;
        let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
            OmniError::Lance(format!(
                "scan_with_pending: key column '{}' is not Utf8",
                column
            ))
        })?;
        for i in 0..arr.len() {
            if !arr.is_valid(i) {
                continue; // NULL keys never shadow anything
            }
            values.insert(arr.value(i).to_string());
        }
    }
    Ok(values)
}

/// Remove rows whose Utf8 `column` value is in `excluded`, returning the
/// surviving rows. `scan_with_pending`'s merge-semantic path uses this
/// to shadow committed rows that pending has already rewritten; rows
/// with a NULL key are always kept.
///
/// `scan_with_pending` checks up front that the projection contains
/// `column`, so a missing column here is a programmer error — fail
/// loudly rather than pass batches through untouched (which would
/// silently restore the union semantics the caller opted out of).
fn filter_out_rows_where_string_in(
    batches: Vec<RecordBatch>,
    column: &str,
    excluded: &std::collections::HashSet<String>,
) -> Result<Vec<RecordBatch>> {
    use arrow_array::{Array, BooleanArray, StringArray};
    let mut surviving = Vec::with_capacity(batches.len());
    for batch in batches {
        if batch.num_rows() == 0 {
            surviving.push(batch);
            continue;
        }
        let col = batch.column_by_name(column).ok_or_else(|| {
            OmniError::manifest_internal(format!(
                "scan_with_pending: committed batch missing key column '{}' \
                 (the up-front projection check should have rejected this)",
                column
            ))
        })?;
        let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
            OmniError::Lance(format!(
                "scan_with_pending: committed column '{}' is not Utf8",
                column
            ))
        })?;
        // keep = NULL key, or key not excluded; short-circuit keeps us
        // from reading the value at NULL slots.
        let keep: Vec<bool> = (0..arr.len())
            .map(|i| !arr.is_valid(i) || !excluded.contains(arr.value(i)))
            .collect();
        let mask = BooleanArray::from(keep);
        let filtered = arrow_select::filter::filter_record_batch(&batch, &mask)
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        surviving.push(filtered);
    }
    Ok(surviving)
}

/// Run `projection` and `filter` over in-memory pending batches through
/// a fresh DataFusion `SessionContext` (batches registered as a
/// `MemTable` named `pending`). This is the read-your-writes side of
/// `scan_with_pending`'s in-memory staging accumulator.
///
/// `pending_batches` must be non-empty — the caller short-circuits on
/// empty before reaching here (the first batch's schema is the
/// fallback when `pending_schema` is `None`).
///
/// **SQL dialect contract.** The same `filter` string also goes to
/// Lance's scanner on the committed side. Both engines accept standard
/// comparison predicates (`col op literal`), which is all OmniGraph's
/// `predicate_to_sql` emits today (`=`, `!=`, `>`, `<`, `>=`, `<=`).
/// Should a caller ever put a Lance-specific scanner extension (vector
/// search, FTS, `_rowid`) into the filter, this function needs explicit
/// translation — DataFusion won't understand those against a `MemTable`.
async fn scan_pending_batches(
    pending_batches: &[RecordBatch],
    pending_schema: Option<SchemaRef>,
    projection: Option<&[&str]>,
    filter: Option<&str>,
) -> Result<Vec<RecordBatch>> {
    let schema = match pending_schema {
        Some(s) => s,
        None => pending_batches[0].schema(),
    };
    let mem_table = datafusion::datasource::MemTable::try_new(
        schema,
        vec![pending_batches.to_vec()],
    )
    .map_err(|e| OmniError::Lance(e.to_string()))?;
    let ctx = datafusion::execution::context::SessionContext::new();
    ctx.register_table("pending", Arc::new(mem_table))
        .map_err(|e| OmniError::Lance(e.to_string()))?;

    // Double-quote each projected identifier (embedded quotes doubled
    // per SQL rules) so arbitrary column names survive.
    let select_list = match projection {
        Some(cols) => cols
            .iter()
            .map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
            .collect::<Vec<_>>()
            .join(", "),
        None => "*".to_string(),
    };
    let where_clause = match filter {
        Some(f) => format!("WHERE {f}"),
        None => String::new(),
    };
    let sql = format!("SELECT {select_list} FROM pending {where_clause}");
    let frame = ctx
        .sql(&sql)
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))?;
    frame
        .collect()
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))
}

/// Project the dataset's fragment list through the staged
/// (uncommitted) writes: committed fragments whose ids appear in any
/// staged write's `removed_fragment_ids` are dropped, then every
/// staged write's `new_fragments` are appended in staging order.
fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
    use std::collections::HashSet;
    // Union of every fragment id deleted by some staged write.
    let mut deleted: HashSet<u64> = HashSet::new();
    for write in staged {
        deleted.extend(write.removed_fragment_ids.iter().copied());
    }
    // Surviving committed fragments first, staged additions after.
    let mut fragments: Vec<Fragment> = Vec::new();
    for frag in ds.manifest.fragments.iter() {
        if !deleted.contains(&frag.id) {
            fragments.push(frag.clone());
        }
    }
    fragments.extend(
        staged
            .iter()
            .flat_map(|w| w.new_fragments.iter().cloned()),
    );
    fragments
}