omnigraph/
instrumentation.rs1use std::sync::Arc;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use async_trait::async_trait;
23use lance::Dataset;
24use lance::dataset::builder::DatasetBuilder;
25use lance::io::{ObjectStoreParams, WrappingObjectStore};
26
27use crate::error::{OmniError, Result};
28use crate::storage::StorageAdapter;
29
30#[derive(Clone, Default)]
37pub struct QueryIoProbes {
38 pub manifest_wrapper: Option<Arc<dyn WrappingObjectStore>>,
39 pub commit_graph_wrapper: Option<Arc<dyn WrappingObjectStore>>,
40 pub table_wrapper: Option<Arc<dyn WrappingObjectStore>>,
45 pub probe_count: Arc<AtomicU64>,
46 pub data_open_count: Arc<AtomicU64>,
60 pub internal_open_count: Arc<AtomicU64>,
63}
64
65tokio::task_local! {
66 static QUERY_IO_PROBES: QueryIoProbes;
67}
68
69pub async fn with_query_io_probes<F>(probes: QueryIoProbes, fut: F) -> F::Output
72where
73 F: std::future::Future,
74{
75 QUERY_IO_PROBES.scope(probes, fut).await
76}
77
78fn current<R>(f: impl FnOnce(&QueryIoProbes) -> R) -> Option<R> {
79 QUERY_IO_PROBES.try_with(f).ok()
80}
81
82pub(crate) fn manifest_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
83 current(|p| p.manifest_wrapper.clone()).flatten()
84}
85
86pub(crate) fn commit_graph_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
87 current(|p| p.commit_graph_wrapper.clone()).flatten()
88}
89
90pub(crate) fn table_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
91 current(|p| p.table_wrapper.clone()).flatten()
92}
93
94pub(crate) fn record_probe() {
97 let _ = current(|p| p.probe_count.fetch_add(1, Ordering::Relaxed));
98}
99
/// Final path segments that `open_is_internal` treats as internal tables.
const INTERNAL_TABLE_DIRS: [&str; 4] = [
    "__manifest",
    "_graph_commits.lance",
    "_graph_commit_actors.lance",
    "_graph_commit_recoveries.lance",
];

/// Reports whether the last `/`-separated segment of `uri` (after dropping any
/// trailing slashes) is one of `INTERNAL_TABLE_DIRS`.
fn open_is_internal(uri: &str) -> bool {
    let without_trailing = uri.trim_end_matches('/');
    // With no '/' left, the whole string is the segment to compare.
    let leaf = match without_trailing.rsplit_once('/') {
        Some((_parent, tail)) => tail,
        None => without_trailing,
    };
    INTERNAL_TABLE_DIRS.iter().any(|dir| *dir == leaf)
}
117
118pub(crate) fn record_open(uri: &str) {
124 let _ = current(|p| {
125 if open_is_internal(uri) {
126 p.internal_open_count.fetch_add(1, Ordering::Relaxed);
127 } else {
128 p.data_open_count.fetch_add(1, Ordering::Relaxed);
129 }
130 });
131}
132
/// Counters carried by the `MERGE_WRITE_PROBES` task-local.
///
/// Each counter sits behind an `Arc`, so a clone observes the same values as
/// the instance it was cloned from. `Debug` is derived so the probes can be
/// printed in logs and assertion messages, matching the other public counter
/// types in this file.
#[derive(Clone, Debug, Default)]
pub struct MergeWriteProbes {
    /// Number of `record_stage_append` calls.
    pub stage_append_calls: Arc<AtomicU64>,
    /// Sum of the `rows` arguments passed to `record_stage_append`.
    pub stage_append_rows: Arc<AtomicU64>,
    /// Number of `record_stage_merge_insert` calls.
    pub stage_merge_insert_calls: Arc<AtomicU64>,
    /// Sum of the `rows` arguments passed to `record_stage_merge_insert`.
    pub stage_merge_insert_rows: Arc<AtomicU64>,
    /// Number of `record_create_vector_index` calls.
    pub create_vector_index_calls: Arc<AtomicU64>,
    /// Number of `record_scan_staged_combined` calls.
    pub scan_staged_combined_calls: Arc<AtomicU64>,
}

impl MergeWriteProbes {
    /// Current value of `stage_append_calls` (relaxed load).
    pub fn stage_append_calls(&self) -> u64 {
        self.stage_append_calls.load(Ordering::Relaxed)
    }

    /// Current value of `stage_append_rows` (relaxed load).
    pub fn stage_append_rows(&self) -> u64 {
        self.stage_append_rows.load(Ordering::Relaxed)
    }

    /// Current value of `stage_merge_insert_calls` (relaxed load).
    pub fn stage_merge_insert_calls(&self) -> u64 {
        self.stage_merge_insert_calls.load(Ordering::Relaxed)
    }

    /// Current value of `stage_merge_insert_rows` (relaxed load).
    pub fn stage_merge_insert_rows(&self) -> u64 {
        self.stage_merge_insert_rows.load(Ordering::Relaxed)
    }

    /// Current value of `create_vector_index_calls` (relaxed load).
    pub fn create_vector_index_calls(&self) -> u64 {
        self.create_vector_index_calls.load(Ordering::Relaxed)
    }

    /// Current value of `scan_staged_combined_calls` (relaxed load).
    pub fn scan_staged_combined_calls(&self) -> u64 {
        self.scan_staged_combined_calls.load(Ordering::Relaxed)
    }
}
174
175tokio::task_local! {
176 static MERGE_WRITE_PROBES: MergeWriteProbes;
177}
178
179pub async fn with_merge_write_probes<F>(probes: MergeWriteProbes, fut: F) -> F::Output
182where
183 F: std::future::Future,
184{
185 MERGE_WRITE_PROBES.scope(probes, fut).await
186}
187
188pub(crate) fn record_stage_append(rows: u64) {
191 let _ = MERGE_WRITE_PROBES.try_with(|p| {
192 p.stage_append_calls.fetch_add(1, Ordering::Relaxed);
193 p.stage_append_rows.fetch_add(rows, Ordering::Relaxed);
194 });
195}
196
197pub(crate) fn record_stage_merge_insert(rows: u64) {
200 let _ = MERGE_WRITE_PROBES.try_with(|p| {
201 p.stage_merge_insert_calls.fetch_add(1, Ordering::Relaxed);
202 p.stage_merge_insert_rows.fetch_add(rows, Ordering::Relaxed);
203 });
204}
205
206pub(crate) fn record_create_vector_index() {
209 let _ = MERGE_WRITE_PROBES.try_with(|p| {
210 p.create_vector_index_calls.fetch_add(1, Ordering::Relaxed);
211 });
212}
213
214pub(crate) fn record_scan_staged_combined() {
217 let _ = MERGE_WRITE_PROBES.try_with(|p| {
218 p.scan_staged_combined_calls.fetch_add(1, Ordering::Relaxed);
219 });
220}
221
222pub(crate) async fn open_dataset_tracked(
227 uri: &str,
228 wrapper: Option<Arc<dyn WrappingObjectStore>>,
229) -> Result<Dataset> {
230 record_open(uri);
231 let result = match wrapper {
232 None => Dataset::open(uri).await,
233 Some(wrapper) => {
234 DatasetBuilder::from_uri(uri)
235 .with_store_params(ObjectStoreParams {
236 object_store_wrapper: Some(wrapper),
237 ..Default::default()
238 })
239 .load()
240 .await
241 }
242 };
243 result.map_err(|e| OmniError::Lance(e.to_string()))
244}
245
246pub(crate) async fn open_table_dataset(
253 location: &str,
254 version: u64,
255 session: Option<&Arc<lance::session::Session>>,
256) -> Result<Dataset> {
257 record_open(location);
258 let mut builder = DatasetBuilder::from_uri(location).with_version(version);
259 if let Some(session) = session {
260 builder = builder.with_session(session.clone());
261 }
262 if let Some(wrapper) = table_wrapper() {
263 builder = builder.with_store_params(ObjectStoreParams {
264 object_store_wrapper: Some(wrapper),
265 ..Default::default()
266 });
267 }
268 builder
269 .load()
270 .await
271 .map_err(|e| OmniError::Lance(e.to_string()))
272}
273
/// Call counters for the `StorageAdapter` methods that `CountingStorageAdapter`
/// counts before forwarding.
#[derive(Debug, Default)]
pub struct StorageReadCounts {
    /// Number of `read_text` calls.
    pub read_text: AtomicU64,
    /// Number of `exists` calls.
    pub exists: AtomicU64,
    /// Number of `read_text_versioned` calls.
    pub read_text_versioned: AtomicU64,
    /// Number of `list_dir` calls.
    pub list_dir: AtomicU64,
}

impl StorageReadCounts {
    /// Relaxed load shared by the public accessors.
    fn peek(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current `read_text` count.
    pub fn read_text(&self) -> u64 {
        Self::peek(&self.read_text)
    }

    /// Current `exists` count.
    pub fn exists(&self) -> u64 {
        Self::peek(&self.exists)
    }

    /// Current `read_text_versioned` count.
    pub fn read_text_versioned(&self) -> u64 {
        Self::peek(&self.read_text_versioned)
    }

    /// Current `list_dir` count.
    pub fn list_dir(&self) -> u64 {
        Self::peek(&self.list_dir)
    }
}
297
298#[derive(Debug)]
303pub struct CountingStorageAdapter {
304 inner: Arc<dyn StorageAdapter>,
305 counts: Arc<StorageReadCounts>,
306}
307
308impl CountingStorageAdapter {
309 pub fn new(inner: Arc<dyn StorageAdapter>) -> (Arc<dyn StorageAdapter>, Arc<StorageReadCounts>) {
311 let counts = Arc::new(StorageReadCounts::default());
312 let adapter: Arc<dyn StorageAdapter> = Arc::new(Self {
313 inner,
314 counts: Arc::clone(&counts),
315 });
316 (adapter, counts)
317 }
318}
319
320#[async_trait]
321impl StorageAdapter for CountingStorageAdapter {
322 async fn read_text(&self, uri: &str) -> Result<String> {
323 self.counts.read_text.fetch_add(1, Ordering::Relaxed);
324 self.inner.read_text(uri).await
325 }
326
327 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
328 self.inner.write_text(uri, contents).await
329 }
330
331 async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
332 self.inner.write_text_if_absent(uri, contents).await
333 }
334
335 async fn exists(&self, uri: &str) -> Result<bool> {
336 self.counts.exists.fetch_add(1, Ordering::Relaxed);
337 self.inner.exists(uri).await
338 }
339
340 async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
341 self.inner.rename_text(from_uri, to_uri).await
342 }
343
344 async fn delete(&self, uri: &str) -> Result<()> {
345 self.inner.delete(uri).await
346 }
347
348 async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
349 self.counts.list_dir.fetch_add(1, Ordering::Relaxed);
350 self.inner.list_dir(dir_uri).await
351 }
352
353 async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
354 self.counts.read_text_versioned.fetch_add(1, Ordering::Relaxed);
355 self.inner.read_text_versioned(uri).await
356 }
357
358 async fn write_text_if_match(
359 &self,
360 uri: &str,
361 contents: &str,
362 expected_version: &str,
363 ) -> Result<Option<String>> {
364 self.inner
365 .write_text_if_match(uri, contents, expected_version)
366 .await
367 }
368
369 async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
370 self.inner.delete_prefix(prefix_uri).await
371 }
372}