Skip to main content

omnigraph/
instrumentation.rs

//! Read- and write-path cost instrumentation (test seam).
2//!
3//! Two boundary instruments let cost-budget tests assert that a warm read does
4//! no redundant IO, the way LanceDB's IO-counted tests do (see
5//! `docs/dev/testing.md`, "Cost-budget tests"):
6//!
7//! - **Lance object store** — a per-query [`WrappingObjectStore`] attached to the
8//!   datasets a query opens, so a test counts real `read_iops`. Delivered through
9//!   a task-local ([`QueryIoProbes`]) set by the test; production leaves it unset,
10//!   so the open helpers attach nothing (one unset-`Option` check per open).
11//! - **omnigraph `StorageAdapter`** — [`CountingStorageAdapter`], a decorator that
12//!   counts per-method calls (the schema-contract reads on the query path).
13//!
14//! Nothing here changes runtime behavior: the wrappers only observe, and the
15//! decorator delegates every call. `IOTracker` (the concrete counter) lives in
16//! tests via the `lance-io` dev-dependency; this module stays generic over the
17//! `lance::io`-re-exported trait, so it adds no production dependency.
18
19use std::sync::Arc;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use async_trait::async_trait;
23use lance::Dataset;
24use lance::dataset::builder::DatasetBuilder;
25use lance::io::{ObjectStoreParams, WrappingObjectStore};
26
27use crate::error::{OmniError, Result};
28use crate::storage::StorageAdapter;
29
30/// Per-query IO probes, installed for a query's task via [`with_query_io_probes`].
31///
32/// Each wrapper is attached (when present) to the datasets that category opens,
33/// so a test reads `read_iops` off its own `IOTracker` handle. `probe_count`
34/// records calls to the version probe (which runs on the coordinator's already-open
35/// handle, so it is counted by invocation rather than by the per-query wrappers).
36#[derive(Clone, Default)]
37pub struct QueryIoProbes {
38    pub manifest_wrapper: Option<Arc<dyn WrappingObjectStore>>,
39    pub commit_graph_wrapper: Option<Arc<dyn WrappingObjectStore>>,
40    /// Attached to the per-table data opens a query performs (the cache-miss
41    /// path in `SubTableEntry::open`). Lets a cost test assert how many tables
42    /// a query actually opened — N on a cold read, 0 on a warm repeat once the
43    /// handle cache (Fix 3) serves them.
44    pub table_wrapper: Option<Arc<dyn WrappingObjectStore>>,
45    pub probe_count: Arc<AtomicU64>,
46    /// Counts DATA-table open CALLS through the two instrumented chokepoints
47    /// (`open_dataset_tracked` / `open_table_dataset`), classified by URI so the
48    /// internal/system tables (`__manifest`, `_graph_commits*`) are EXCLUDED — the
49    /// publisher CAS and commit-graph append open those every write, and counting
50    /// them would make the `data_open_count <= |touched_tables|` write gate
51    /// (RFC-013 step 3b) unreachable by threading alone. Unlike the opener-read
52    /// term (which mixes with the merge-insert/RI scan on the write path), this is
53    /// an exact open-invocation count. `forbidden_apis` keeps engine code OUTSIDE the
54    /// storage layer (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) from opening
55    /// datasets except through these chokepoints, so the count is complete for the
56    /// keyed-write data path the gate measures. (`table_store.rs` is allow-listed and
57    /// does hold direct `Dataset::open`s — but only for branch-management ops
58    /// (`delete_branch`/`list_branches`/`force_delete_branch`), never that hot path.)
59    pub data_open_count: Arc<AtomicU64>,
60    /// Internal/system-table (`__manifest`, `_graph_commits*`) open CALLS — the
61    /// complement of `data_open_count`, kept for symmetry and debugging.
62    pub internal_open_count: Arc<AtomicU64>,
63}
64
65tokio::task_local! {
66    static QUERY_IO_PROBES: QueryIoProbes;
67}
68
69/// Run `fut` with per-query IO probes installed. Test-only entry point; nothing
70/// in production sets the probes, so the accessors below return `None`/no-op.
71pub async fn with_query_io_probes<F>(probes: QueryIoProbes, fut: F) -> F::Output
72where
73    F: std::future::Future,
74{
75    QUERY_IO_PROBES.scope(probes, fut).await
76}
77
78fn current<R>(f: impl FnOnce(&QueryIoProbes) -> R) -> Option<R> {
79    QUERY_IO_PROBES.try_with(f).ok()
80}
81
82pub(crate) fn manifest_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
83    current(|p| p.manifest_wrapper.clone()).flatten()
84}
85
86pub(crate) fn commit_graph_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
87    current(|p| p.commit_graph_wrapper.clone()).flatten()
88}
89
90pub(crate) fn table_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
91    current(|p| p.table_wrapper.clone()).flatten()
92}
93
94/// Record one version-probe invocation against the active per-query probes.
95/// No-op when no probes are installed (production).
96pub(crate) fn record_probe() {
97    let _ = current(|p| p.probe_count.fetch_add(1, Ordering::Relaxed));
98}
99
/// Directory names of the internal/system tables. Opening one of them is a
/// metadata open (publisher CAS, commit-graph append, recovery audit) and NOT a
/// data-table open. Must stay in sync with the dir constants in
/// `db/manifest/layout.rs`, `db/commit_graph.rs`, and `db/recovery_audit.rs`.
const INTERNAL_TABLE_DIRS: [&str; 4] = [
    "__manifest",
    "_graph_commits.lance",
    "_graph_commit_actors.lance",
    "_graph_commit_recoveries.lance",
];

/// Reports whether the final path segment of `uri` is one of the
/// internal/system table directories. Trailing `/` characters are ignored.
fn open_is_internal(uri: &str) -> bool {
    let without_trailing = uri.trim_end_matches('/');
    // Everything after the last separator, or the whole string when there is none.
    let segment = match without_trailing.rfind('/') {
        Some(idx) => &without_trailing[idx + 1..],
        None => without_trailing,
    };
    INTERNAL_TABLE_DIRS.iter().any(|dir| *dir == segment)
}
117
118/// Record one table-open call against the active per-query probes, classified by
119/// table class (the URI's last segment) so the write gate counts DATA-table opens
120/// only and ignores the publisher/commit-graph metadata opens. No-op in production
121/// (the classification runs only inside the probe closure, which `current` skips
122/// when no probes are installed). Called at both open chokepoints.
123pub(crate) fn record_open(uri: &str) {
124    let _ = current(|p| {
125        if open_is_internal(uri) {
126            p.internal_open_count.fetch_add(1, Ordering::Relaxed);
127        } else {
128            p.data_open_count.fetch_add(1, Ordering::Relaxed);
129        }
130    });
131}
132
/// Staged-write counters for one operation, installed on a task through
/// [`with_merge_write_probes`]. A cost-budget test uses them to assert WHICH
/// staged-write primitive an operation invokes — for example that an append-only
/// fast-forward merge sends new rows through `stage_append` and performs **zero**
/// `stage_merge_insert` (the full-outer hash join). Only the publish-path
/// primitives are counted; merge-staging temp tables go through
/// `append_or_create_batch`, which is not tracked here.
#[derive(Clone, Default)]
pub struct MergeWriteProbes {
    /// Number of `stage_append` calls.
    pub stage_append_calls: Arc<AtomicU64>,
    /// Total rows passed to `stage_append`.
    pub stage_append_rows: Arc<AtomicU64>,
    /// Number of `stage_merge_insert` calls.
    pub stage_merge_insert_calls: Arc<AtomicU64>,
    /// Total rows passed to `stage_merge_insert`.
    pub stage_merge_insert_rows: Arc<AtomicU64>,
    /// Inline vector-index (IVF) builds. Because the fast-forward adopt path
    /// leaves index coverage to the reconciler, an adopt merge must record 0.
    pub create_vector_index_calls: Arc<AtomicU64>,
    /// How often the merge materialized a staged delta into a single in-memory
    /// batch (`scan_staged_combined`). The append path streams instead, so an
    /// append-only fast-forward merge must record 0.
    pub scan_staged_combined_calls: Arc<AtomicU64>,
}

impl MergeWriteProbes {
    /// Reads `counter` with relaxed ordering.
    fn relaxed(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current `stage_append` call count.
    pub fn stage_append_calls(&self) -> u64 {
        Self::relaxed(&self.stage_append_calls)
    }

    /// Current `stage_append` row total.
    pub fn stage_append_rows(&self) -> u64 {
        Self::relaxed(&self.stage_append_rows)
    }

    /// Current `stage_merge_insert` call count.
    pub fn stage_merge_insert_calls(&self) -> u64 {
        Self::relaxed(&self.stage_merge_insert_calls)
    }

    /// Current `stage_merge_insert` row total.
    pub fn stage_merge_insert_rows(&self) -> u64 {
        Self::relaxed(&self.stage_merge_insert_rows)
    }

    /// Current inline vector-index build count.
    pub fn create_vector_index_calls(&self) -> u64 {
        Self::relaxed(&self.create_vector_index_calls)
    }

    /// Current `scan_staged_combined` materialization count.
    pub fn scan_staged_combined_calls(&self) -> u64 {
        Self::relaxed(&self.scan_staged_combined_calls)
    }
}
174
175tokio::task_local! {
176    static MERGE_WRITE_PROBES: MergeWriteProbes;
177}
178
179/// Run `fut` with staged-write probes installed. Test-only entry point; nothing
180/// in production sets the probes, so `record_stage_*` below are no-ops.
181pub async fn with_merge_write_probes<F>(probes: MergeWriteProbes, fut: F) -> F::Output
182where
183    F: std::future::Future,
184{
185    MERGE_WRITE_PROBES.scope(probes, fut).await
186}
187
188/// Record one `stage_append` of `rows` rows against the active probes. No-op in
189/// production (no probes installed).
190pub(crate) fn record_stage_append(rows: u64) {
191    let _ = MERGE_WRITE_PROBES.try_with(|p| {
192        p.stage_append_calls.fetch_add(1, Ordering::Relaxed);
193        p.stage_append_rows.fetch_add(rows, Ordering::Relaxed);
194    });
195}
196
197/// Record one `stage_merge_insert` of `rows` rows against the active probes.
198/// No-op in production (no probes installed).
199pub(crate) fn record_stage_merge_insert(rows: u64) {
200    let _ = MERGE_WRITE_PROBES.try_with(|p| {
201        p.stage_merge_insert_calls.fetch_add(1, Ordering::Relaxed);
202        p.stage_merge_insert_rows.fetch_add(rows, Ordering::Relaxed);
203    });
204}
205
206/// Record one inline vector-index build against the active probes. No-op in
207/// production (no probes installed).
208pub(crate) fn record_create_vector_index() {
209    let _ = MERGE_WRITE_PROBES.try_with(|p| {
210        p.create_vector_index_calls.fetch_add(1, Ordering::Relaxed);
211    });
212}
213
214/// Record one `scan_staged_combined` materialization against the active probes.
215/// No-op in production (no probes installed).
216pub(crate) fn record_scan_staged_combined() {
217    let _ = MERGE_WRITE_PROBES.try_with(|p| {
218        p.scan_staged_combined_calls.fetch_add(1, Ordering::Relaxed);
219    });
220}
221
222/// Open a Lance dataset at `uri`, attaching `wrapper` (for IO counting) when
223/// present. With no wrapper this is exactly `Dataset::open(uri)`. The wrapper is
224/// set via `ObjectStoreParams` on the builder so the open itself is counted
225/// (`Dataset::with_object_store_wrappers` only wraps an already-open store).
226pub(crate) async fn open_dataset_tracked(
227    uri: &str,
228    wrapper: Option<Arc<dyn WrappingObjectStore>>,
229) -> Result<Dataset> {
230    record_open(uri);
231    let result = match wrapper {
232        None => Dataset::open(uri).await,
233        Some(wrapper) => {
234            DatasetBuilder::from_uri(uri)
235                .with_store_params(ObjectStoreParams {
236                    object_store_wrapper: Some(wrapper),
237                    ..Default::default()
238                })
239                .load()
240                .await
241        }
242    };
243    result.map_err(|e| OmniError::Lance(e.to_string()))
244}
245
246/// Open a data-table dataset at `location` pinned to `version` — the cache-miss
247/// path of the data-read boundary (`SubTableEntry::open`). Attaches the shared
248/// per-graph `Session` (warms metadata/index caches across opens, LanceDB's
249/// one-session-per-connection pattern) and the per-query `table_wrapper` (for IO
250/// counting) when present. With neither, this is exactly the Fix-2
251/// `from_uri(location).with_version(version)` open.
252pub(crate) async fn open_table_dataset(
253    location: &str,
254    version: u64,
255    session: Option<&Arc<lance::session::Session>>,
256) -> Result<Dataset> {
257    record_open(location);
258    let mut builder = DatasetBuilder::from_uri(location).with_version(version);
259    if let Some(session) = session {
260        builder = builder.with_session(session.clone());
261    }
262    if let Some(wrapper) = table_wrapper() {
263        builder = builder.with_store_params(ObjectStoreParams {
264            object_store_wrapper: Some(wrapper),
265            ..Default::default()
266        });
267    }
268    builder
269        .load()
270        .await
271        .map_err(|e| OmniError::Lance(e.to_string()))
272}
273
/// Call counters, one per read-facing method, filled in by
/// [`CountingStorageAdapter`].
#[derive(Debug, Default)]
pub struct StorageReadCounts {
    /// Calls to `read_text`.
    pub read_text: AtomicU64,
    /// Calls to `exists`.
    pub exists: AtomicU64,
    /// Calls to `read_text_versioned`.
    pub read_text_versioned: AtomicU64,
    /// Calls to `list_dir`.
    pub list_dir: AtomicU64,
}

impl StorageReadCounts {
    /// Reads `counter` with relaxed ordering.
    fn relaxed(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current `read_text` call count.
    pub fn read_text(&self) -> u64 {
        Self::relaxed(&self.read_text)
    }

    /// Current `exists` call count.
    pub fn exists(&self) -> u64 {
        Self::relaxed(&self.exists)
    }

    /// Current `read_text_versioned` call count.
    pub fn read_text_versioned(&self) -> u64 {
        Self::relaxed(&self.read_text_versioned)
    }

    /// Current `list_dir` call count.
    pub fn list_dir(&self) -> u64 {
        Self::relaxed(&self.list_dir)
    }
}
297
298/// Boundary decorator over a [`StorageAdapter`] that counts read-facing calls.
299/// Reads delegate after incrementing; writes delegate unchanged. Construct with
300/// [`CountingStorageAdapter::new`] and open an engine via
301/// `Omnigraph::open_with_storage` to count its non-Lance storage IO.
302#[derive(Debug)]
303pub struct CountingStorageAdapter {
304    inner: Arc<dyn StorageAdapter>,
305    counts: Arc<StorageReadCounts>,
306}
307
308impl CountingStorageAdapter {
309    /// Wrap `inner`, returning the adapter and a shared handle to its counts.
310    pub fn new(inner: Arc<dyn StorageAdapter>) -> (Arc<dyn StorageAdapter>, Arc<StorageReadCounts>) {
311        let counts = Arc::new(StorageReadCounts::default());
312        let adapter: Arc<dyn StorageAdapter> = Arc::new(Self {
313            inner,
314            counts: Arc::clone(&counts),
315        });
316        (adapter, counts)
317    }
318}
319
320#[async_trait]
321impl StorageAdapter for CountingStorageAdapter {
322    async fn read_text(&self, uri: &str) -> Result<String> {
323        self.counts.read_text.fetch_add(1, Ordering::Relaxed);
324        self.inner.read_text(uri).await
325    }
326
327    async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
328        self.inner.write_text(uri, contents).await
329    }
330
331    async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
332        self.inner.write_text_if_absent(uri, contents).await
333    }
334
335    async fn exists(&self, uri: &str) -> Result<bool> {
336        self.counts.exists.fetch_add(1, Ordering::Relaxed);
337        self.inner.exists(uri).await
338    }
339
340    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
341        self.inner.rename_text(from_uri, to_uri).await
342    }
343
344    async fn delete(&self, uri: &str) -> Result<()> {
345        self.inner.delete(uri).await
346    }
347
348    async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
349        self.counts.list_dir.fetch_add(1, Ordering::Relaxed);
350        self.inner.list_dir(dir_uri).await
351    }
352
353    async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
354        self.counts.read_text_versioned.fetch_add(1, Ordering::Relaxed);
355        self.inner.read_text_versioned(uri).await
356    }
357
358    async fn write_text_if_match(
359        &self,
360        uri: &str,
361        contents: &str,
362        expected_version: &str,
363    ) -> Result<Option<String>> {
364        self.inner
365            .write_text_if_match(uri, contents, expected_version)
366            .await
367    }
368
369    async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
370        self.inner.delete_prefix(prefix_uri).await
371    }
372}