Skip to main content

omnigraph/
instrumentation.rs

//! Read-path cost instrumentation (test seam).
//!
//! Two boundary instruments let cost-budget tests assert that a warm read does
//! no redundant IO, the way LanceDB's IO-counted tests do (see
//! `docs/dev/testing.md`, "Cost-budget tests"):
//!
//! - **Lance object store** — a per-query [`WrappingObjectStore`] attached to the
//!   datasets a query opens, so a test counts real `read_iops`. Delivered through
//!   a task-local ([`QueryIoProbes`]) set by the test; production leaves it unset,
//!   so the open helpers attach nothing (one unset-`Option` check per open).
//! - **omnigraph `StorageAdapter`** — [`CountingStorageAdapter`], a decorator that
//!   counts per-method calls (the schema-contract reads on the query path).
//!
//! The module also hosts [`MergeWriteProbes`], a second task-local set of
//! counters that records which staged-write primitives an operation invokes
//! (installed by tests via [`with_merge_write_probes`]; unset in production).
//!
//! Nothing here changes runtime behavior: the wrappers only observe, and the
//! decorator delegates every call. `IOTracker` (the concrete counter) lives in
//! tests via the `lance-io` dev-dependency; this module stays generic over the
//! `lance::io`-re-exported trait, so it adds no production dependency.
18
19use std::sync::Arc;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use async_trait::async_trait;
23use lance::Dataset;
24use lance::dataset::builder::DatasetBuilder;
25use lance::io::{ObjectStoreParams, WrappingObjectStore};
26
27use crate::error::{OmniError, Result};
28use crate::storage::StorageAdapter;
29
30/// Per-query IO probes, installed for a query's task via [`with_query_io_probes`].
31///
32/// Each wrapper is attached (when present) to the datasets that category opens,
33/// so a test reads `read_iops` off its own `IOTracker` handle. `probe_count`
34/// records calls to the version probe (which runs on the coordinator's already-open
35/// handle, so it is counted by invocation rather than by the per-query wrappers).
36#[derive(Clone, Default)]
37pub struct QueryIoProbes {
38    pub manifest_wrapper: Option<Arc<dyn WrappingObjectStore>>,
39    pub commit_graph_wrapper: Option<Arc<dyn WrappingObjectStore>>,
40    /// Attached to the per-table data opens a query performs (the cache-miss
41    /// path in `SubTableEntry::open`). Lets a cost test assert how many tables
42    /// a query actually opened — N on a cold read, 0 on a warm repeat once the
43    /// handle cache (Fix 3) serves them.
44    pub table_wrapper: Option<Arc<dyn WrappingObjectStore>>,
45    pub probe_count: Arc<AtomicU64>,
46}
47
48tokio::task_local! {
49    static QUERY_IO_PROBES: QueryIoProbes;
50}
51
52/// Run `fut` with per-query IO probes installed. Test-only entry point; nothing
53/// in production sets the probes, so the accessors below return `None`/no-op.
54pub async fn with_query_io_probes<F>(probes: QueryIoProbes, fut: F) -> F::Output
55where
56    F: std::future::Future,
57{
58    QUERY_IO_PROBES.scope(probes, fut).await
59}
60
61fn current<R>(f: impl FnOnce(&QueryIoProbes) -> R) -> Option<R> {
62    QUERY_IO_PROBES.try_with(f).ok()
63}
64
65pub(crate) fn manifest_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
66    current(|p| p.manifest_wrapper.clone()).flatten()
67}
68
69pub(crate) fn commit_graph_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
70    current(|p| p.commit_graph_wrapper.clone()).flatten()
71}
72
73pub(crate) fn table_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
74    current(|p| p.table_wrapper.clone()).flatten()
75}
76
77/// Record one version-probe invocation against the active per-query probes.
78/// No-op when no probes are installed (production).
79pub(crate) fn record_probe() {
80    let _ = current(|p| p.probe_count.fetch_add(1, Ordering::Relaxed));
81}
82
/// Staged-write counters for one operation, installed for a task via
/// [`with_merge_write_probes`].
///
/// A cost-budget test uses these to assert WHICH staged-write primitive an
/// operation invokes — for example that an append-only fast-forward merge
/// sends new rows through `stage_append` and performs **zero**
/// `stage_merge_insert` calls (the full-outer hash join). Only the
/// publish-path primitives are counted; merge-staging temp tables go through
/// `append_or_create_batch`, which is not tracked here.
///
/// Every counter sits behind an `Arc`, so a clone observes and updates the
/// same values as the original.
#[derive(Clone, Default)]
pub struct MergeWriteProbes {
    /// How many times `stage_append` was recorded.
    pub stage_append_calls: Arc<AtomicU64>,
    /// Total rows reported across recorded `stage_append` calls.
    pub stage_append_rows: Arc<AtomicU64>,
    /// How many times `stage_merge_insert` was recorded.
    pub stage_merge_insert_calls: Arc<AtomicU64>,
    /// Total rows reported across recorded `stage_merge_insert` calls.
    pub stage_merge_insert_rows: Arc<AtomicU64>,
    /// Inline vector-index (IVF) builds. The fast-forward adopt path defers
    /// index coverage to the reconciler, so an adopt merge must do 0 of these.
    pub create_vector_index_calls: Arc<AtomicU64>,
    /// Times the merge materialized a staged delta into one in-memory batch
    /// (`scan_staged_combined`). The append path streams instead, so an
    /// append-only fast-forward merge must do 0 of these.
    pub scan_staged_combined_calls: Arc<AtomicU64>,
}

impl MergeWriteProbes {
    /// Relaxed read of a single counter; shared by all accessors below.
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current value of `stage_append_calls`.
    pub fn stage_append_calls(&self) -> u64 {
        Self::snapshot(&self.stage_append_calls)
    }

    /// Current value of `stage_append_rows`.
    pub fn stage_append_rows(&self) -> u64 {
        Self::snapshot(&self.stage_append_rows)
    }

    /// Current value of `stage_merge_insert_calls`.
    pub fn stage_merge_insert_calls(&self) -> u64 {
        Self::snapshot(&self.stage_merge_insert_calls)
    }

    /// Current value of `stage_merge_insert_rows`.
    pub fn stage_merge_insert_rows(&self) -> u64 {
        Self::snapshot(&self.stage_merge_insert_rows)
    }

    /// Current value of `create_vector_index_calls`.
    pub fn create_vector_index_calls(&self) -> u64 {
        Self::snapshot(&self.create_vector_index_calls)
    }

    /// Current value of `scan_staged_combined_calls`.
    pub fn scan_staged_combined_calls(&self) -> u64 {
        Self::snapshot(&self.scan_staged_combined_calls)
    }
}
124
125tokio::task_local! {
126    static MERGE_WRITE_PROBES: MergeWriteProbes;
127}
128
129/// Run `fut` with staged-write probes installed. Test-only entry point; nothing
130/// in production sets the probes, so `record_stage_*` below are no-ops.
131pub async fn with_merge_write_probes<F>(probes: MergeWriteProbes, fut: F) -> F::Output
132where
133    F: std::future::Future,
134{
135    MERGE_WRITE_PROBES.scope(probes, fut).await
136}
137
138/// Record one `stage_append` of `rows` rows against the active probes. No-op in
139/// production (no probes installed).
140pub(crate) fn record_stage_append(rows: u64) {
141    let _ = MERGE_WRITE_PROBES.try_with(|p| {
142        p.stage_append_calls.fetch_add(1, Ordering::Relaxed);
143        p.stage_append_rows.fetch_add(rows, Ordering::Relaxed);
144    });
145}
146
147/// Record one `stage_merge_insert` of `rows` rows against the active probes.
148/// No-op in production (no probes installed).
149pub(crate) fn record_stage_merge_insert(rows: u64) {
150    let _ = MERGE_WRITE_PROBES.try_with(|p| {
151        p.stage_merge_insert_calls.fetch_add(1, Ordering::Relaxed);
152        p.stage_merge_insert_rows.fetch_add(rows, Ordering::Relaxed);
153    });
154}
155
156/// Record one inline vector-index build against the active probes. No-op in
157/// production (no probes installed).
158pub(crate) fn record_create_vector_index() {
159    let _ = MERGE_WRITE_PROBES.try_with(|p| {
160        p.create_vector_index_calls.fetch_add(1, Ordering::Relaxed);
161    });
162}
163
164/// Record one `scan_staged_combined` materialization against the active probes.
165/// No-op in production (no probes installed).
166pub(crate) fn record_scan_staged_combined() {
167    let _ = MERGE_WRITE_PROBES.try_with(|p| {
168        p.scan_staged_combined_calls.fetch_add(1, Ordering::Relaxed);
169    });
170}
171
172/// Open a Lance dataset at `uri`, attaching `wrapper` (for IO counting) when
173/// present. With no wrapper this is exactly `Dataset::open(uri)`. The wrapper is
174/// set via `ObjectStoreParams` on the builder so the open itself is counted
175/// (`Dataset::with_object_store_wrappers` only wraps an already-open store).
176pub(crate) async fn open_dataset_tracked(
177    uri: &str,
178    wrapper: Option<Arc<dyn WrappingObjectStore>>,
179) -> Result<Dataset> {
180    let result = match wrapper {
181        None => Dataset::open(uri).await,
182        Some(wrapper) => {
183            DatasetBuilder::from_uri(uri)
184                .with_store_params(ObjectStoreParams {
185                    object_store_wrapper: Some(wrapper),
186                    ..Default::default()
187                })
188                .load()
189                .await
190        }
191    };
192    result.map_err(|e| OmniError::Lance(e.to_string()))
193}
194
195/// Open a data-table dataset at `location` pinned to `version` — the cache-miss
196/// path of the data-read boundary (`SubTableEntry::open`). Attaches the shared
197/// per-graph `Session` (warms metadata/index caches across opens, LanceDB's
198/// one-session-per-connection pattern) and the per-query `table_wrapper` (for IO
199/// counting) when present. With neither, this is exactly the Fix-2
200/// `from_uri(location).with_version(version)` open.
201pub(crate) async fn open_table_dataset(
202    location: &str,
203    version: u64,
204    session: Option<&Arc<lance::session::Session>>,
205) -> Result<Dataset> {
206    let mut builder = DatasetBuilder::from_uri(location).with_version(version);
207    if let Some(session) = session {
208        builder = builder.with_session(session.clone());
209    }
210    if let Some(wrapper) = table_wrapper() {
211        builder = builder.with_store_params(ObjectStoreParams {
212            object_store_wrapper: Some(wrapper),
213            ..Default::default()
214        });
215    }
216    builder
217        .load()
218        .await
219        .map_err(|e| OmniError::Lance(e.to_string()))
220}
221
/// Call counters, one per read-facing method, maintained by
/// [`CountingStorageAdapter`].
///
/// All counters start at zero (`Default`) and are read with relaxed ordering
/// through the accessor methods of the same name.
#[derive(Debug, Default)]
pub struct StorageReadCounts {
    /// Number of `read_text` calls.
    pub read_text: AtomicU64,
    /// Number of `exists` calls.
    pub exists: AtomicU64,
    /// Number of `read_text_versioned` calls.
    pub read_text_versioned: AtomicU64,
    /// Number of `list_dir` calls.
    pub list_dir: AtomicU64,
}

impl StorageReadCounts {
    /// Relaxed read of a single counter; shared by all accessors below.
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the `read_text` counter.
    pub fn read_text(&self) -> u64 {
        Self::snapshot(&self.read_text)
    }

    /// Current value of the `exists` counter.
    pub fn exists(&self) -> u64 {
        Self::snapshot(&self.exists)
    }

    /// Current value of the `read_text_versioned` counter.
    pub fn read_text_versioned(&self) -> u64 {
        Self::snapshot(&self.read_text_versioned)
    }

    /// Current value of the `list_dir` counter.
    pub fn list_dir(&self) -> u64 {
        Self::snapshot(&self.list_dir)
    }
}
245
246/// Boundary decorator over a [`StorageAdapter`] that counts read-facing calls.
247/// Reads delegate after incrementing; writes delegate unchanged. Construct with
248/// [`CountingStorageAdapter::new`] and open an engine via
249/// `Omnigraph::open_with_storage` to count its non-Lance storage IO.
250#[derive(Debug)]
251pub struct CountingStorageAdapter {
252    inner: Arc<dyn StorageAdapter>,
253    counts: Arc<StorageReadCounts>,
254}
255
256impl CountingStorageAdapter {
257    /// Wrap `inner`, returning the adapter and a shared handle to its counts.
258    pub fn new(inner: Arc<dyn StorageAdapter>) -> (Arc<dyn StorageAdapter>, Arc<StorageReadCounts>) {
259        let counts = Arc::new(StorageReadCounts::default());
260        let adapter: Arc<dyn StorageAdapter> = Arc::new(Self {
261            inner,
262            counts: Arc::clone(&counts),
263        });
264        (adapter, counts)
265    }
266}
267
268#[async_trait]
269impl StorageAdapter for CountingStorageAdapter {
270    async fn read_text(&self, uri: &str) -> Result<String> {
271        self.counts.read_text.fetch_add(1, Ordering::Relaxed);
272        self.inner.read_text(uri).await
273    }
274
275    async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
276        self.inner.write_text(uri, contents).await
277    }
278
279    async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
280        self.inner.write_text_if_absent(uri, contents).await
281    }
282
283    async fn exists(&self, uri: &str) -> Result<bool> {
284        self.counts.exists.fetch_add(1, Ordering::Relaxed);
285        self.inner.exists(uri).await
286    }
287
288    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
289        self.inner.rename_text(from_uri, to_uri).await
290    }
291
292    async fn delete(&self, uri: &str) -> Result<()> {
293        self.inner.delete(uri).await
294    }
295
296    async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
297        self.counts.list_dir.fetch_add(1, Ordering::Relaxed);
298        self.inner.list_dir(dir_uri).await
299    }
300
301    async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
302        self.counts.read_text_versioned.fetch_add(1, Ordering::Relaxed);
303        self.inner.read_text_versioned(uri).await
304    }
305
306    async fn write_text_if_match(
307        &self,
308        uri: &str,
309        contents: &str,
310        expected_version: &str,
311    ) -> Result<Option<String>> {
312        self.inner
313            .write_text_if_match(uri, contents, expected_version)
314            .await
315    }
316
317    async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
318        self.inner.delete_prefix(prefix_uri).await
319    }
320}