omnigraph-engine 0.8.0

Runtime engine for the Omnigraph graph database.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
//! Read-path cost instrumentation (test seam).
//!
//! Boundary instruments that let cost-budget tests assert that a warm read does
//! no redundant IO, the way LanceDB's IO-counted tests do (see
//! `docs/dev/testing.md`, "Cost-budget tests"):
//!
//! - **Lance object store** — a per-query [`WrappingObjectStore`] attached to the
//!   datasets a query opens, so a test counts real `read_iops`. Delivered through
//!   a task-local ([`QueryIoProbes`]) set by the test; production leaves it unset,
//!   so the open helpers attach nothing (one task-local lookup per open).
//! - **omnigraph `StorageAdapter`** — [`CountingStorageAdapter`], a decorator that
//!   counts per-method calls (the schema-contract reads on the query path).
//! - **Staged-write primitives** — [`MergeWriteProbes`], task-local counters that
//!   record which publish-path write primitives an operation invokes.
//!
//! Apart from [`with_traversal_mode`] — a scoped override that deliberately forces
//! the Expand execution mode — nothing here changes runtime behavior: the wrappers
//! only observe, and the decorator delegates every call. `IOTracker` (the concrete
//! counter) lives in tests via the `lance-io` dev-dependency; this module stays
//! generic over the `lance::io`-re-exported trait, so it adds no production
//! dependency.

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use async_trait::async_trait;
use lance::Dataset;
use lance::dataset::builder::DatasetBuilder;
use lance::io::{ObjectStoreParams, WrappingObjectStore};

use crate::error::{OmniError, Result};
use crate::storage::StorageAdapter;

/// Per-query IO probes, installed for a query's task via [`with_query_io_probes`].
///
/// Each wrapper is attached (when present) to the datasets that category opens,
/// so a test reads `read_iops` off its own `IOTracker` handle. `probe_count`
/// records calls to the version probe (which runs on the coordinator's already-open
/// handle, so it is counted by invocation rather than by the per-query wrappers).
/// Per-query IO probes, installed for a query's task via [`with_query_io_probes`].
///
/// Each wrapper is attached (when present) to the datasets that category opens,
/// so a test reads `read_iops` off its own `IOTracker` handle. `probe_count`
/// records calls to the version probe (which runs on the coordinator's already-open
/// handle, so it is counted by invocation rather than by the per-query wrappers).
///
/// Every counter is an `Arc<AtomicU64>` and `Clone` is derived, so a clone kept
/// outside the probed scope observes the same counters the scope increments.
#[derive(Clone, Default)]
pub struct QueryIoProbes {
    /// Wrapper returned by `manifest_wrapper()`. Presumably attached to the
    /// manifest opens a query performs (the call sites live outside this module —
    /// confirm there). `None` leaves those opens unwrapped.
    pub manifest_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Attached to the per-table data opens a query performs (the cache-miss
    /// path in `SubTableEntry::open`). Lets a cost test assert how many tables
    /// a query actually opened — N on a cold read, 0 on a warm repeat once the
    /// handle cache (Fix 3) serves them.
    pub table_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Version-probe invocations; incremented once per `record_probe()` call.
    pub probe_count: Arc<AtomicU64>,
    /// Counts DATA-table open CALLS through the two instrumented chokepoints
    /// (`open_dataset_tracked` / `open_table_dataset`), classified by URI so the
    /// internal/system tables (`INTERNAL_TABLE_DIRS`: `__manifest` and
    /// `_graph_commit_recoveries.lance`) are EXCLUDED — the publisher CAS
    /// opens those every write, and counting them would make the
    /// `data_open_count <= |touched_tables|` write gate
    /// (RFC-013 step 3b) unreachable by threading alone. Unlike the opener-read
    /// term (which mixes with the merge-insert/RI scan on the write path), this is
    /// an exact open-invocation count. `forbidden_apis` keeps engine code OUTSIDE the
    /// storage layer (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) from opening
    /// datasets except through these chokepoints, so the count is complete for the
    /// keyed-write data path the gate measures. (`table_store.rs` is allow-listed and
    /// does hold direct `Dataset::open`s — but only for branch-management ops
    /// (`delete_branch`/`list_branches`/`force_delete_branch`), never that hot path.)
    pub data_open_count: Arc<AtomicU64>,
    /// Internal/system-table open CALLS (URIs whose last segment is in
    /// `INTERNAL_TABLE_DIRS`: `__manifest`, `_graph_commit_recoveries.lance`) — the
    /// complement of `data_open_count`, kept for symmetry and debugging.
    pub internal_open_count: Arc<AtomicU64>,
    /// Counts topology-index builds (the `RuntimeCache::graph_index` cache-miss
    /// path). A cost test asserts a fresh branch whose edge tables are unchanged
    /// from main reuses main's cached index (0 builds) rather than rebuilding it.
    pub graph_build_count: Arc<AtomicU64>,
    /// Edge tables included in topology builds this query (summed over build
    /// invocations). A cost test asserts a query referencing one edge builds only
    /// that edge, not every catalog edge (the cold-build shrink A2 ships).
    pub graph_edges_built: Arc<AtomicU64>,
}

// Task-local slot holding the per-query IO probes. Installed only by
// `with_query_io_probes`; read through `current`, which yields `None` when unset.
tokio::task_local! {
    static QUERY_IO_PROBES: QueryIoProbes;
}

/// Drive `fut` to completion with `probes` installed as this task's per-query IO
/// probes. Only tests call this: production never installs probes, so the
/// accessors below return `None` and the recorders do nothing.
pub async fn with_query_io_probes<F>(probes: QueryIoProbes, fut: F) -> F::Output
where
    F: std::future::Future,
{
    let scoped = QUERY_IO_PROBES.scope(probes, fut);
    scoped.await
}

/// Apply `f` to the probes installed for the current task and return its result.
/// Returns `None` when no probes are in scope (always the case in production).
fn current<R>(f: impl FnOnce(&QueryIoProbes) -> R) -> Option<R> {
    match QUERY_IO_PROBES.try_with(f) {
        Ok(value) => Some(value),
        Err(_) => None,
    }
}

// Task-local traversal-mode override. Installed only by `with_traversal_mode`
// (always as `Some(mode)`); read through `traversal_mode_override`.
tokio::task_local! {
    static TRAVERSAL_MODE_OVERRIDE: Option<&'static str>;
}

/// Run `fut` with the Expand execution mode forced to `mode` (`"indexed"` |
/// `"csr"`), without touching the process-global `OMNIGRAPH_TRAVERSAL_MODE` env var.
///
/// This is the general traversal-mode test seam, and it mirrors
/// [`with_query_io_probes`]:
/// - It is scope-bound: the override disappears when `fut` resolves or unwinds,
///   so it cannot leak.
/// - It is process-safe: no shared state is mutated, so a forced-mode test never
///   affects a concurrent test in the same binary. That removes the need for
///   `#[serial]` and a dedicated all-serial binary.
///
/// The env var remains the production/ops escape hatch; this scoped override takes
/// precedence over it (`exec::query::traversal_indexed_override`).
pub async fn with_traversal_mode<F>(mode: &'static str, fut: F) -> F::Output
where
    F: std::future::Future,
{
    let forced = Some(mode);
    TRAVERSAL_MODE_OVERRIDE.scope(forced, fut).await
}

/// The traversal mode forced by an enclosing [`with_traversal_mode`] scope, if any.
/// In production no scope is installed, so this is `None` and callers fall back
/// to the env var.
pub(crate) fn traversal_mode_override() -> Option<&'static str> {
    TRAVERSAL_MODE_OVERRIDE
        .try_with(|forced| *forced)
        .unwrap_or(None)
}

/// The manifest object-store wrapper from the active per-query probes; `None`
/// when no probes are installed or the probes carry no manifest wrapper.
pub(crate) fn manifest_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
    current(|probes| probes.manifest_wrapper.as_ref().map(Arc::clone)).flatten()
}

/// The data-table object-store wrapper from the active per-query probes; `None`
/// when no probes are installed or the probes carry no table wrapper.
pub(crate) fn table_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
    current(|probes| probes.table_wrapper.as_ref().map(Arc::clone)).flatten()
}

/// Count one version-probe invocation on the active per-query probes. Does
/// nothing when no probes are installed (production).
pub(crate) fn record_probe() {
    current(|probes| {
        probes.probe_count.fetch_add(1, Ordering::Relaxed);
    });
}

/// Directory names of the internal/system tables. Opening one of these is a
/// metadata open (publisher CAS, recovery audit), not a data-table open. Must stay
/// in sync with the dir constants in `db/manifest/layout.rs` and
/// `db/recovery_audit.rs`.
const INTERNAL_TABLE_DIRS: [&str; 2] = ["__manifest", "_graph_commit_recoveries.lance"];

/// Whether the final path segment of `uri` (trailing slashes ignored) is one of
/// [`INTERNAL_TABLE_DIRS`].
fn open_is_internal(uri: &str) -> bool {
    let path = uri.trim_end_matches('/');
    // '/' is a single ASCII byte, so `slash + 1` always lands on a char boundary.
    let segment = match path.rfind('/') {
        Some(slash) => &path[slash + 1..],
        None => path,
    };
    INTERNAL_TABLE_DIRS.iter().any(|dir| *dir == segment)
}

/// Count one table-open call on the active per-query probes.
///
/// The call is classified by the URI's last segment: internal/system tables go to
/// `internal_open_count`, everything else to `data_open_count`. This lets the write
/// gate count only DATA-table opens and ignore the publisher's metadata opens.
///
/// Called at both open chokepoints. Does nothing in production: `current` skips the
/// closure, and with it the classification, when no probes are installed.
pub(crate) fn record_open(uri: &str) {
    current(|probes| {
        let counter = if open_is_internal(uri) {
            &probes.internal_open_count
        } else {
            &probes.data_open_count
        };
        counter.fetch_add(1, Ordering::Relaxed);
    });
}

/// Count one topology-index build covering `edges` edge tables (the
/// `RuntimeCache::graph_index` cache-miss path). Does nothing when no probes are
/// installed (production).
pub(crate) fn record_graph_build(edges: usize) {
    let edge_count = edges as u64;
    current(move |probes| {
        probes.graph_build_count.fetch_add(1, Ordering::Relaxed);
        probes.graph_edges_built.fetch_add(edge_count, Ordering::Relaxed);
    });
}

/// Per-operation staged-write counts, installed for a task via
/// [`with_merge_write_probes`]. Lets a cost-budget test assert WHICH staged-write
/// primitive an operation invokes — e.g. that an append-only fast-forward merge
/// routes new rows through `stage_append` and does **zero** `stage_merge_insert`
/// (the full-outer hash join). Counts the publish-path primitives only;
/// merge-staging temp tables use `append_or_create_batch`, not these.
///
/// Every counter is an `Arc<AtomicU64>` and `Clone` is derived, so a clone kept
/// outside the probed scope observes the same counters. `Debug` is derived as well,
/// matching [`StorageReadCounts`], so a failing cost assertion can print every
/// count at once.
#[derive(Clone, Debug, Default)]
pub struct MergeWriteProbes {
    /// `stage_append` invocations (one per `record_stage_append` call).
    pub stage_append_calls: Arc<AtomicU64>,
    /// Rows passed to `stage_append`, summed over invocations.
    pub stage_append_rows: Arc<AtomicU64>,
    /// `stage_merge_insert` invocations (one per `record_stage_merge_insert` call).
    pub stage_merge_insert_calls: Arc<AtomicU64>,
    /// Rows passed to `stage_merge_insert`, summed over invocations.
    pub stage_merge_insert_rows: Arc<AtomicU64>,
    /// Inline vector-index (IVF) builds. The fast-forward adopt path defers
    /// index coverage to the reconciler, so an adopt merge must do 0 of these.
    pub create_vector_index_calls: Arc<AtomicU64>,
    /// Times the merge materialized a staged delta into one in-memory batch
    /// (`scan_staged_combined`). The append path streams instead, so an
    /// append-only fast-forward merge must do 0 of these.
    pub scan_staged_combined_calls: Arc<AtomicU64>,
}

impl MergeWriteProbes {
    /// Current value of [`Self::stage_append_calls`].
    pub fn stage_append_calls(&self) -> u64 {
        self.stage_append_calls.load(Ordering::Relaxed)
    }
    /// Current value of [`Self::stage_append_rows`].
    pub fn stage_append_rows(&self) -> u64 {
        self.stage_append_rows.load(Ordering::Relaxed)
    }
    /// Current value of [`Self::stage_merge_insert_calls`].
    pub fn stage_merge_insert_calls(&self) -> u64 {
        self.stage_merge_insert_calls.load(Ordering::Relaxed)
    }
    /// Current value of [`Self::stage_merge_insert_rows`].
    pub fn stage_merge_insert_rows(&self) -> u64 {
        self.stage_merge_insert_rows.load(Ordering::Relaxed)
    }
    /// Current value of [`Self::create_vector_index_calls`].
    pub fn create_vector_index_calls(&self) -> u64 {
        self.create_vector_index_calls.load(Ordering::Relaxed)
    }
    /// Current value of [`Self::scan_staged_combined_calls`].
    pub fn scan_staged_combined_calls(&self) -> u64 {
        self.scan_staged_combined_calls.load(Ordering::Relaxed)
    }
}

// Task-local slot holding the staged-write probes. Installed only by
// `with_merge_write_probes`; the `record_*` helpers below skip recording when unset.
tokio::task_local! {
    static MERGE_WRITE_PROBES: MergeWriteProbes;
}

/// Drive `fut` to completion with `probes` installed as this task's staged-write
/// probes. Only tests call this: production never installs probes, so the
/// `record_stage_*` helpers below do nothing.
pub async fn with_merge_write_probes<F>(probes: MergeWriteProbes, fut: F) -> F::Output
where
    F: std::future::Future,
{
    let scoped = MERGE_WRITE_PROBES.scope(probes, fut);
    scoped.await
}

/// Record one `stage_append` of `rows` rows against the active probes. No-op in
/// production (no probes installed).
pub(crate) fn record_stage_append(rows: u64) {
    let _ = MERGE_WRITE_PROBES.try_with(|p| {
        p.stage_append_calls.fetch_add(1, Ordering::Relaxed);
        p.stage_append_rows.fetch_add(rows, Ordering::Relaxed);
    });
}

/// Record one `stage_merge_insert` of `rows` rows against the active probes.
/// No-op in production (no probes installed).
pub(crate) fn record_stage_merge_insert(rows: u64) {
    let _ = MERGE_WRITE_PROBES.try_with(|p| {
        p.stage_merge_insert_calls.fetch_add(1, Ordering::Relaxed);
        p.stage_merge_insert_rows.fetch_add(rows, Ordering::Relaxed);
    });
}

/// Count one inline vector-index build on the active staged-write probes. Does
/// nothing in production (no probes installed).
pub(crate) fn record_create_vector_index() {
    let _ = MERGE_WRITE_PROBES
        .try_with(|probes| probes.create_vector_index_calls.fetch_add(1, Ordering::Relaxed));
}

/// Count one `scan_staged_combined` materialization on the active staged-write
/// probes. Does nothing in production (no probes installed).
pub(crate) fn record_scan_staged_combined() {
    let _ = MERGE_WRITE_PROBES
        .try_with(|probes| probes.scan_staged_combined_calls.fetch_add(1, Ordering::Relaxed));
}

/// Open the Lance dataset at `uri`, counting the call via `record_open`.
///
/// When `wrapper` is `Some`, it is attached so the IO can be counted; with `None`
/// this is exactly `Dataset::open(uri)`. The wrapper goes in through
/// `ObjectStoreParams` on the builder so that the open itself is counted
/// (`Dataset::with_object_store_wrappers` only wraps an already-open store).
///
/// Lance errors are mapped to `OmniError::Lance`.
pub(crate) async fn open_dataset_tracked(
    uri: &str,
    wrapper: Option<Arc<dyn WrappingObjectStore>>,
) -> Result<Dataset> {
    record_open(uri);
    let opened = if let Some(wrapper) = wrapper {
        let params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper),
            ..Default::default()
        };
        DatasetBuilder::from_uri(uri)
            .with_store_params(params)
            .load()
            .await
    } else {
        Dataset::open(uri).await
    };
    opened.map_err(|err| OmniError::Lance(err.to_string()))
}

/// Open the data-table dataset at `location`, pinned to `version`, counting the
/// call via `record_open`. This is the cache-miss path of the data-read boundary
/// (`SubTableEntry::open`).
///
/// Two optional attachments:
/// - `session`, the shared per-graph `Session`, which warms metadata/index caches
///   across opens (LanceDB's one-session-per-connection pattern).
/// - The active per-query `table_wrapper()`, used for IO counting.
///
/// With neither attached, this is the plain `from_uri(location).with_version(version)`
/// open. Lance errors are mapped to `OmniError::Lance`.
pub(crate) async fn open_table_dataset(
    location: &str,
    version: u64,
    session: Option<&Arc<lance::session::Session>>,
) -> Result<Dataset> {
    record_open(location);
    let builder = DatasetBuilder::from_uri(location).with_version(version);
    let builder = match session {
        Some(shared) => builder.with_session(Arc::clone(shared)),
        None => builder,
    };
    let builder = match table_wrapper() {
        Some(wrapper) => builder.with_store_params(ObjectStoreParams {
            object_store_wrapper: Some(wrapper),
            ..Default::default()
        }),
        None => builder,
    };
    match builder.load().await {
        Ok(dataset) => Ok(dataset),
        Err(err) => Err(OmniError::Lance(err.to_string())),
    }
}

/// Call counters for the read-facing methods of [`CountingStorageAdapter`], one
/// counter per method. The adapter increments them; tests read them through the
/// accessor methods.
#[derive(Debug, Default)]
pub struct StorageReadCounts {
    /// Calls to `read_text`.
    pub read_text: AtomicU64,
    /// Calls to `exists`.
    pub exists: AtomicU64,
    /// Calls to `read_text_versioned`.
    pub read_text_versioned: AtomicU64,
    /// Calls to `list_dir`.
    pub list_dir: AtomicU64,
}

impl StorageReadCounts {
    /// Relaxed snapshot of one counter. The counts are independent tallies that
    /// order no other memory, so no stronger ordering is needed.
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Number of `read_text` calls observed so far.
    pub fn read_text(&self) -> u64 {
        Self::snapshot(&self.read_text)
    }

    /// Number of `exists` calls observed so far.
    pub fn exists(&self) -> u64 {
        Self::snapshot(&self.exists)
    }

    /// Number of `read_text_versioned` calls observed so far.
    pub fn read_text_versioned(&self) -> u64 {
        Self::snapshot(&self.read_text_versioned)
    }

    /// Number of `list_dir` calls observed so far.
    pub fn list_dir(&self) -> u64 {
        Self::snapshot(&self.list_dir)
    }
}

/// Decorator over a [`StorageAdapter`] that counts its read-facing calls.
///
/// Read methods bump their counter and then delegate; write methods delegate
/// untouched. Build one with [`CountingStorageAdapter::new`], then open an engine
/// via `Omnigraph::open_with_storage` to count its non-Lance storage IO.
#[derive(Debug)]
pub struct CountingStorageAdapter {
    inner: Arc<dyn StorageAdapter>,
    counts: Arc<StorageReadCounts>,
}

impl CountingStorageAdapter {
    /// Wrap `inner` in a counting decorator.
    ///
    /// Returns the decorated adapter (type-erased) and a handle to the counters it
    /// increments. The decorator holds the other clone of the counters.
    pub fn new(inner: Arc<dyn StorageAdapter>) -> (Arc<dyn StorageAdapter>, Arc<StorageReadCounts>) {
        let counts: Arc<StorageReadCounts> = Arc::default();
        let decorated = Self {
            inner,
            counts: Arc::clone(&counts),
        };
        let adapter: Arc<dyn StorageAdapter> = Arc::new(decorated);
        (adapter, counts)
    }
}

#[async_trait]
impl StorageAdapter for CountingStorageAdapter {
    async fn read_text(&self, uri: &str) -> Result<String> {
        self.counts.read_text.fetch_add(1, Ordering::Relaxed);
        self.inner.read_text(uri).await
    }

    async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
        self.inner.write_text(uri, contents).await
    }

    async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
        self.inner.write_text_if_absent(uri, contents).await
    }

    async fn exists(&self, uri: &str) -> Result<bool> {
        self.counts.exists.fetch_add(1, Ordering::Relaxed);
        self.inner.exists(uri).await
    }

    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
        self.inner.rename_text(from_uri, to_uri).await
    }

    async fn delete(&self, uri: &str) -> Result<()> {
        self.inner.delete(uri).await
    }

    async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
        self.counts.list_dir.fetch_add(1, Ordering::Relaxed);
        self.inner.list_dir(dir_uri).await
    }

    async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
        self.counts.read_text_versioned.fetch_add(1, Ordering::Relaxed);
        self.inner.read_text_versioned(uri).await
    }

    async fn write_text_if_match(
        &self,
        uri: &str,
        contents: &str,
        expected_version: &str,
    ) -> Result<Option<String>> {
        self.inner
            .write_text_if_match(uri, contents, expected_version)
            .await
    }

    async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
        self.inner.delete_prefix(prefix_uri).await
    }
}