omnigraph/
instrumentation.rs1use std::sync::Arc;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use async_trait::async_trait;
23use lance::Dataset;
24use lance::dataset::builder::DatasetBuilder;
25use lance::io::{ObjectStoreParams, WrappingObjectStore};
26
27use crate::error::{OmniError, Result};
28use crate::storage::StorageAdapter;
29
30#[derive(Clone, Default)]
37pub struct QueryIoProbes {
38 pub manifest_wrapper: Option<Arc<dyn WrappingObjectStore>>,
39 pub commit_graph_wrapper: Option<Arc<dyn WrappingObjectStore>>,
40 pub table_wrapper: Option<Arc<dyn WrappingObjectStore>>,
45 pub probe_count: Arc<AtomicU64>,
46}
47
48tokio::task_local! {
49 static QUERY_IO_PROBES: QueryIoProbes;
50}
51
52pub async fn with_query_io_probes<F>(probes: QueryIoProbes, fut: F) -> F::Output
55where
56 F: std::future::Future,
57{
58 QUERY_IO_PROBES.scope(probes, fut).await
59}
60
61fn current<R>(f: impl FnOnce(&QueryIoProbes) -> R) -> Option<R> {
62 QUERY_IO_PROBES.try_with(f).ok()
63}
64
65pub(crate) fn manifest_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
66 current(|p| p.manifest_wrapper.clone()).flatten()
67}
68
69pub(crate) fn commit_graph_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
70 current(|p| p.commit_graph_wrapper.clone()).flatten()
71}
72
73pub(crate) fn table_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
74 current(|p| p.table_wrapper.clone()).flatten()
75}
76
77pub(crate) fn record_probe() {
80 let _ = current(|p| p.probe_count.fetch_add(1, Ordering::Relaxed));
81}
82
/// Counters for the write-side operations reported through the `record_*`
/// helpers in this module.
///
/// Each counter is an `Arc<AtomicU64>`, so cloning a `MergeWriteProbes`
/// yields a handle onto the *same* counters: a caller can keep one clone,
/// hand another to [`with_merge_write_probes`], and read the totals
/// afterwards. `Default` starts every counter at zero.
///
/// `Debug` is derived so the probes can be printed in assertions and logs,
/// matching [`StorageReadCounts`].
#[derive(Clone, Debug, Default)]
pub struct MergeWriteProbes {
    /// Number of `record_stage_append` calls observed.
    pub stage_append_calls: Arc<AtomicU64>,
    /// Sum of the `rows` arguments passed to `record_stage_append`.
    pub stage_append_rows: Arc<AtomicU64>,
    /// Number of `record_stage_merge_insert` calls observed.
    pub stage_merge_insert_calls: Arc<AtomicU64>,
    /// Sum of the `rows` arguments passed to `record_stage_merge_insert`.
    pub stage_merge_insert_rows: Arc<AtomicU64>,
    /// Number of `record_create_vector_index` calls observed.
    pub create_vector_index_calls: Arc<AtomicU64>,
    /// Number of `record_scan_staged_combined` calls observed.
    pub scan_staged_combined_calls: Arc<AtomicU64>,
}

impl MergeWriteProbes {
    /// Current value of the `stage_append_calls` counter (relaxed load).
    pub fn stage_append_calls(&self) -> u64 {
        self.stage_append_calls.load(Ordering::Relaxed)
    }

    /// Current value of the `stage_append_rows` counter (relaxed load).
    pub fn stage_append_rows(&self) -> u64 {
        self.stage_append_rows.load(Ordering::Relaxed)
    }

    /// Current value of the `stage_merge_insert_calls` counter (relaxed load).
    pub fn stage_merge_insert_calls(&self) -> u64 {
        self.stage_merge_insert_calls.load(Ordering::Relaxed)
    }

    /// Current value of the `stage_merge_insert_rows` counter (relaxed load).
    pub fn stage_merge_insert_rows(&self) -> u64 {
        self.stage_merge_insert_rows.load(Ordering::Relaxed)
    }

    /// Current value of the `create_vector_index_calls` counter (relaxed load).
    pub fn create_vector_index_calls(&self) -> u64 {
        self.create_vector_index_calls.load(Ordering::Relaxed)
    }

    /// Current value of the `scan_staged_combined_calls` counter (relaxed load).
    pub fn scan_staged_combined_calls(&self) -> u64 {
        self.scan_staged_combined_calls.load(Ordering::Relaxed)
    }
}
124
125tokio::task_local! {
126 static MERGE_WRITE_PROBES: MergeWriteProbes;
127}
128
129pub async fn with_merge_write_probes<F>(probes: MergeWriteProbes, fut: F) -> F::Output
132where
133 F: std::future::Future,
134{
135 MERGE_WRITE_PROBES.scope(probes, fut).await
136}
137
138pub(crate) fn record_stage_append(rows: u64) {
141 let _ = MERGE_WRITE_PROBES.try_with(|p| {
142 p.stage_append_calls.fetch_add(1, Ordering::Relaxed);
143 p.stage_append_rows.fetch_add(rows, Ordering::Relaxed);
144 });
145}
146
147pub(crate) fn record_stage_merge_insert(rows: u64) {
150 let _ = MERGE_WRITE_PROBES.try_with(|p| {
151 p.stage_merge_insert_calls.fetch_add(1, Ordering::Relaxed);
152 p.stage_merge_insert_rows.fetch_add(rows, Ordering::Relaxed);
153 });
154}
155
156pub(crate) fn record_create_vector_index() {
159 let _ = MERGE_WRITE_PROBES.try_with(|p| {
160 p.create_vector_index_calls.fetch_add(1, Ordering::Relaxed);
161 });
162}
163
164pub(crate) fn record_scan_staged_combined() {
167 let _ = MERGE_WRITE_PROBES.try_with(|p| {
168 p.scan_staged_combined_calls.fetch_add(1, Ordering::Relaxed);
169 });
170}
171
172pub(crate) async fn open_dataset_tracked(
177 uri: &str,
178 wrapper: Option<Arc<dyn WrappingObjectStore>>,
179) -> Result<Dataset> {
180 let result = match wrapper {
181 None => Dataset::open(uri).await,
182 Some(wrapper) => {
183 DatasetBuilder::from_uri(uri)
184 .with_store_params(ObjectStoreParams {
185 object_store_wrapper: Some(wrapper),
186 ..Default::default()
187 })
188 .load()
189 .await
190 }
191 };
192 result.map_err(|e| OmniError::Lance(e.to_string()))
193}
194
195pub(crate) async fn open_table_dataset(
202 location: &str,
203 version: u64,
204 session: Option<&Arc<lance::session::Session>>,
205) -> Result<Dataset> {
206 let mut builder = DatasetBuilder::from_uri(location).with_version(version);
207 if let Some(session) = session {
208 builder = builder.with_session(session.clone());
209 }
210 if let Some(wrapper) = table_wrapper() {
211 builder = builder.with_store_params(ObjectStoreParams {
212 object_store_wrapper: Some(wrapper),
213 ..Default::default()
214 });
215 }
216 builder
217 .load()
218 .await
219 .map_err(|e| OmniError::Lance(e.to_string()))
220}
221
/// Call counters for the read-style operations of a storage adapter.
///
/// [`CountingStorageAdapter`] increments the matching counter once per call
/// to `read_text`, `exists`, `read_text_versioned` and `list_dir`. `Default`
/// starts every counter at zero.
#[derive(Default, Debug)]
pub struct StorageReadCounts {
    /// Number of `read_text` calls.
    pub read_text: AtomicU64,
    /// Number of `exists` calls.
    pub exists: AtomicU64,
    /// Number of `read_text_versioned` calls.
    pub read_text_versioned: AtomicU64,
    /// Number of `list_dir` calls.
    pub list_dir: AtomicU64,
}

impl StorageReadCounts {
    /// Reads a single counter with relaxed ordering.
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current number of recorded `read_text` calls.
    pub fn read_text(&self) -> u64 {
        Self::snapshot(&self.read_text)
    }

    /// Current number of recorded `exists` calls.
    pub fn exists(&self) -> u64 {
        Self::snapshot(&self.exists)
    }

    /// Current number of recorded `read_text_versioned` calls.
    pub fn read_text_versioned(&self) -> u64 {
        Self::snapshot(&self.read_text_versioned)
    }

    /// Current number of recorded `list_dir` calls.
    pub fn list_dir(&self) -> u64 {
        Self::snapshot(&self.list_dir)
    }
}
245
246#[derive(Debug)]
251pub struct CountingStorageAdapter {
252 inner: Arc<dyn StorageAdapter>,
253 counts: Arc<StorageReadCounts>,
254}
255
256impl CountingStorageAdapter {
257 pub fn new(inner: Arc<dyn StorageAdapter>) -> (Arc<dyn StorageAdapter>, Arc<StorageReadCounts>) {
259 let counts = Arc::new(StorageReadCounts::default());
260 let adapter: Arc<dyn StorageAdapter> = Arc::new(Self {
261 inner,
262 counts: Arc::clone(&counts),
263 });
264 (adapter, counts)
265 }
266}
267
268#[async_trait]
269impl StorageAdapter for CountingStorageAdapter {
270 async fn read_text(&self, uri: &str) -> Result<String> {
271 self.counts.read_text.fetch_add(1, Ordering::Relaxed);
272 self.inner.read_text(uri).await
273 }
274
275 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
276 self.inner.write_text(uri, contents).await
277 }
278
279 async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
280 self.inner.write_text_if_absent(uri, contents).await
281 }
282
283 async fn exists(&self, uri: &str) -> Result<bool> {
284 self.counts.exists.fetch_add(1, Ordering::Relaxed);
285 self.inner.exists(uri).await
286 }
287
288 async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
289 self.inner.rename_text(from_uri, to_uri).await
290 }
291
292 async fn delete(&self, uri: &str) -> Result<()> {
293 self.inner.delete(uri).await
294 }
295
296 async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
297 self.counts.list_dir.fetch_add(1, Ordering::Relaxed);
298 self.inner.list_dir(dir_uri).await
299 }
300
301 async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
302 self.counts.read_text_versioned.fetch_add(1, Ordering::Relaxed);
303 self.inner.read_text_versioned(uri).await
304 }
305
306 async fn write_text_if_match(
307 &self,
308 uri: &str,
309 contents: &str,
310 expected_version: &str,
311 ) -> Result<Option<String>> {
312 self.inner
313 .write_text_if_match(uri, contents, expected_version)
314 .await
315 }
316
317 async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
318 self.inner.delete_prefix(prefix_uri).await
319 }
320}