use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use async_trait::async_trait;
use lance::Dataset;
use lance::dataset::builder::DatasetBuilder;
use lance::io::{ObjectStoreParams, WrappingObjectStore};
use crate::error::{OmniError, Result};
use crate::storage::StorageAdapter;
/// Bundle of optional object-store wrappers plus a shared counter.
///
/// A value of this type is installed for the duration of a future by
/// `with_query_io_probes`; the crate-private accessors in this file read it
/// back from the task-local slot. Every field defaults to "nothing installed"
/// (`None` / a zeroed counter) via the derived `Default`.
#[derive(Clone, Default)]
pub struct QueryIoProbes {
/// Wrapper handed out by `manifest_wrapper()` while a probe scope is active.
pub manifest_wrapper: Option<Arc<dyn WrappingObjectStore>>,
/// Wrapper handed out by `commit_graph_wrapper()` while a probe scope is active.
pub commit_graph_wrapper: Option<Arc<dyn WrappingObjectStore>>,
/// Wrapper handed out by `table_wrapper()`; `open_table_dataset` attaches it
/// to the dataset's object-store parameters.
pub table_wrapper: Option<Arc<dyn WrappingObjectStore>>,
/// Counter incremented by one on every `record_probe()` call made inside the
/// scope. Shared through the `Arc`, so clones observe the same value.
pub probe_count: Arc<AtomicU64>,
}
// Task-local slot holding the `QueryIoProbes` for the current scope.
// It is populated by `with_query_io_probes` and read through `current`;
// outside such a scope the slot is unset.
tokio::task_local! {
static QUERY_IO_PROBES: QueryIoProbes;
}
/// Runs `fut` to completion with `probes` installed in the task-local
/// `QUERY_IO_PROBES` slot and returns whatever `fut` produces.
///
/// The accessors in this file (`manifest_wrapper`, `table_wrapper`,
/// `record_probe`, ...) see `probes` only while called from within `fut`.
pub async fn with_query_io_probes<F: std::future::Future>(
    probes: QueryIoProbes,
    fut: F,
) -> F::Output {
    let scoped = QUERY_IO_PROBES.scope(probes, fut);
    scoped.await
}
/// Applies `f` to the probes installed for the current task, if any.
///
/// Returns `Some(result)` when a `QUERY_IO_PROBES` scope is active and `None`
/// when the task-local cannot be accessed (for example, outside any scope).
fn current<R>(f: impl FnOnce(&QueryIoProbes) -> R) -> Option<R> {
    match QUERY_IO_PROBES.try_with(f) {
        Ok(value) => Some(value),
        // The access error carries no useful detail here; absence is the answer.
        Err(_) => None,
    }
}
/// Returns a clone of the manifest wrapper from the active probe scope.
///
/// Yields `None` both when no scope is active and when the scope has no
/// manifest wrapper configured.
pub(crate) fn manifest_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
    let slot = current(|probes| probes.manifest_wrapper.clone());
    // Outer `None` (no scope) collapses to the inner default, which is `None`.
    slot.unwrap_or_default()
}
/// Returns a clone of the commit-graph wrapper from the active probe scope.
///
/// Yields `None` both when no scope is active and when the scope has no
/// commit-graph wrapper configured.
pub(crate) fn commit_graph_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
    let slot = current(|probes| probes.commit_graph_wrapper.clone());
    // Outer `None` (no scope) collapses to the inner default, which is `None`.
    slot.unwrap_or_default()
}
/// Returns a clone of the table wrapper from the active probe scope.
///
/// Yields `None` both when no scope is active and when the scope has no
/// table wrapper configured.
pub(crate) fn table_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
    let slot = current(|probes| probes.table_wrapper.clone());
    // Outer `None` (no scope) collapses to the inner default, which is `None`.
    slot.unwrap_or_default()
}
pub(crate) fn record_probe() {
let _ = current(|p| p.probe_count.fetch_add(1, Ordering::Relaxed));
}
/// Shared counters for merge/write staging activity.
///
/// Each field is an `Arc<AtomicU64>`, so cloning the struct yields handles to
/// the same underlying counters. The `record_*` functions in this file bump
/// them while a `with_merge_write_probes` scope is active.
#[derive(Clone, Default)]
pub struct MergeWriteProbes {
    /// Number of `record_stage_append` calls.
    pub stage_append_calls: Arc<AtomicU64>,
    /// Sum of the `rows` arguments passed to `record_stage_append`.
    pub stage_append_rows: Arc<AtomicU64>,
    /// Number of `record_stage_merge_insert` calls.
    pub stage_merge_insert_calls: Arc<AtomicU64>,
    /// Sum of the `rows` arguments passed to `record_stage_merge_insert`.
    pub stage_merge_insert_rows: Arc<AtomicU64>,
    /// Number of `record_create_vector_index` calls.
    pub create_vector_index_calls: Arc<AtomicU64>,
    /// Number of `record_scan_staged_combined` calls.
    pub scan_staged_combined_calls: Arc<AtomicU64>,
}

impl MergeWriteProbes {
    /// Reads a counter with relaxed ordering; shared by all getters below.
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current value of `stage_append_calls`.
    pub fn stage_append_calls(&self) -> u64 {
        Self::snapshot(&self.stage_append_calls)
    }

    /// Current value of `stage_append_rows`.
    pub fn stage_append_rows(&self) -> u64 {
        Self::snapshot(&self.stage_append_rows)
    }

    /// Current value of `stage_merge_insert_calls`.
    pub fn stage_merge_insert_calls(&self) -> u64 {
        Self::snapshot(&self.stage_merge_insert_calls)
    }

    /// Current value of `stage_merge_insert_rows`.
    pub fn stage_merge_insert_rows(&self) -> u64 {
        Self::snapshot(&self.stage_merge_insert_rows)
    }

    /// Current value of `create_vector_index_calls`.
    pub fn create_vector_index_calls(&self) -> u64 {
        Self::snapshot(&self.create_vector_index_calls)
    }

    /// Current value of `scan_staged_combined_calls`.
    pub fn scan_staged_combined_calls(&self) -> u64 {
        Self::snapshot(&self.scan_staged_combined_calls)
    }
}
// Task-local slot holding the `MergeWriteProbes` for the current scope.
// It is populated by `with_merge_write_probes` and read by the `record_*`
// functions below via `try_with`; outside such a scope the slot is unset.
tokio::task_local! {
static MERGE_WRITE_PROBES: MergeWriteProbes;
}
/// Runs `fut` to completion with `probes` installed in the task-local
/// `MERGE_WRITE_PROBES` slot and returns whatever `fut` produces.
///
/// The `record_*` merge/write functions in this file update `probes` only
/// while called from within `fut`.
pub async fn with_merge_write_probes<F: std::future::Future>(
    probes: MergeWriteProbes,
    fut: F,
) -> F::Output {
    let scoped = MERGE_WRITE_PROBES.scope(probes, fut);
    scoped.await
}
pub(crate) fn record_stage_append(rows: u64) {
let _ = MERGE_WRITE_PROBES.try_with(|p| {
p.stage_append_calls.fetch_add(1, Ordering::Relaxed);
p.stage_append_rows.fetch_add(rows, Ordering::Relaxed);
});
}
pub(crate) fn record_stage_merge_insert(rows: u64) {
let _ = MERGE_WRITE_PROBES.try_with(|p| {
p.stage_merge_insert_calls.fetch_add(1, Ordering::Relaxed);
p.stage_merge_insert_rows.fetch_add(rows, Ordering::Relaxed);
});
}
pub(crate) fn record_create_vector_index() {
let _ = MERGE_WRITE_PROBES.try_with(|p| {
p.create_vector_index_calls.fetch_add(1, Ordering::Relaxed);
});
}
pub(crate) fn record_scan_staged_combined() {
let _ = MERGE_WRITE_PROBES.try_with(|p| {
p.scan_staged_combined_calls.fetch_add(1, Ordering::Relaxed);
});
}
/// Opens the Lance dataset at `uri`, optionally routing object-store access
/// through `wrapper`.
///
/// With `wrapper == None` this is a plain `Dataset::open`. Otherwise the
/// dataset is loaded through a `DatasetBuilder` whose `ObjectStoreParams`
/// carry the wrapper (all other parameters left at their defaults).
///
/// # Errors
///
/// Any Lance error is converted to `OmniError::Lance` holding the error's
/// string form.
pub(crate) async fn open_dataset_tracked(
    uri: &str,
    wrapper: Option<Arc<dyn WrappingObjectStore>>,
) -> Result<Dataset> {
    let opened = if let Some(store_wrapper) = wrapper {
        let params = ObjectStoreParams {
            object_store_wrapper: Some(store_wrapper),
            ..Default::default()
        };
        DatasetBuilder::from_uri(uri)
            .with_store_params(params)
            .load()
            .await
    } else {
        Dataset::open(uri).await
    };

    match opened {
        Ok(dataset) => Ok(dataset),
        Err(err) => Err(OmniError::Lance(err.to_string())),
    }
}
/// Opens the Lance dataset at `location` pinned to `version`.
///
/// When `session` is provided it is attached to the builder. When the active
/// query probe scope supplies a table wrapper (see `table_wrapper`), it is
/// installed through `ObjectStoreParams` with all other parameters left at
/// their defaults.
///
/// # Errors
///
/// Any Lance error is converted to `OmniError::Lance` holding the error's
/// string form.
pub(crate) async fn open_table_dataset(
    location: &str,
    version: u64,
    session: Option<&Arc<lance::session::Session>>,
) -> Result<Dataset> {
    let pinned = DatasetBuilder::from_uri(location).with_version(version);

    let with_session = match session {
        Some(shared) => pinned.with_session(Arc::clone(shared)),
        None => pinned,
    };

    let configured = match table_wrapper() {
        Some(store_wrapper) => with_session.with_store_params(ObjectStoreParams {
            object_store_wrapper: Some(store_wrapper),
            ..Default::default()
        }),
        None => with_session,
    };

    match configured.load().await {
        Ok(dataset) => Ok(dataset),
        Err(err) => Err(OmniError::Lance(err.to_string())),
    }
}
/// Per-operation call counters maintained by `CountingStorageAdapter`.
///
/// Each field counts invocations of the same-named `StorageAdapter` method on
/// the counting adapter. All counters start at zero.
#[derive(Debug, Default)]
pub struct StorageReadCounts {
    /// Calls to `read_text`.
    pub read_text: AtomicU64,
    /// Calls to `exists`.
    pub exists: AtomicU64,
    /// Calls to `read_text_versioned`.
    pub read_text_versioned: AtomicU64,
    /// Calls to `list_dir`.
    pub list_dir: AtomicU64,
}

impl StorageReadCounts {
    /// Reads a counter with relaxed ordering; shared by all getters below.
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Number of `read_text` calls observed so far.
    pub fn read_text(&self) -> u64 {
        Self::snapshot(&self.read_text)
    }

    /// Number of `exists` calls observed so far.
    pub fn exists(&self) -> u64 {
        Self::snapshot(&self.exists)
    }

    /// Number of `read_text_versioned` calls observed so far.
    pub fn read_text_versioned(&self) -> u64 {
        Self::snapshot(&self.read_text_versioned)
    }

    /// Number of `list_dir` calls observed so far.
    pub fn list_dir(&self) -> u64 {
        Self::snapshot(&self.list_dir)
    }
}
/// `StorageAdapter` decorator that forwards every call to `inner` and counts
/// the read-style calls (`read_text`, `exists`, `list_dir`,
/// `read_text_versioned`) in `counts`.
#[derive(Debug)]
pub struct CountingStorageAdapter {
/// The adapter that actually performs each operation.
inner: Arc<dyn StorageAdapter>,
/// Shared counters; the same `Arc` is also returned to the caller of `new`.
counts: Arc<StorageReadCounts>,
}
impl CountingStorageAdapter {
    /// Wraps `inner` in a counting adapter.
    ///
    /// Returns the wrapper (already type-erased to `Arc<dyn StorageAdapter>`)
    /// together with a handle to its freshly zeroed counters, so the caller
    /// can inspect call counts after exercising the adapter.
    pub fn new(inner: Arc<dyn StorageAdapter>) -> (Arc<dyn StorageAdapter>, Arc<StorageReadCounts>) {
        let counts = Arc::<StorageReadCounts>::default();
        let wrapped = Self {
            inner,
            counts: counts.clone(),
        };
        let adapter: Arc<dyn StorageAdapter> = Arc::new(wrapped);
        (adapter, counts)
    }
}
/// Delegates every operation to the wrapped adapter. The four read-style
/// calls (`read_text`, `exists`, `list_dir`, `read_text_versioned`) bump their
/// counter before delegating; write, rename and delete calls pass through
/// uncounted.
#[async_trait]
impl StorageAdapter for CountingStorageAdapter {
    /// Counts the call, then forwards to the inner adapter.
    async fn read_text(&self, uri: &str) -> Result<String> {
        let calls = &self.counts.read_text;
        calls.fetch_add(1, Ordering::Relaxed);
        self.inner.read_text(uri).await
    }

    /// Uncounted pass-through.
    async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
        let delegate = &self.inner;
        delegate.write_text(uri, contents).await
    }

    /// Uncounted pass-through.
    async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
        let delegate = &self.inner;
        delegate.write_text_if_absent(uri, contents).await
    }

    /// Counts the call, then forwards to the inner adapter.
    async fn exists(&self, uri: &str) -> Result<bool> {
        let calls = &self.counts.exists;
        calls.fetch_add(1, Ordering::Relaxed);
        self.inner.exists(uri).await
    }

    /// Uncounted pass-through.
    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
        let delegate = &self.inner;
        delegate.rename_text(from_uri, to_uri).await
    }

    /// Uncounted pass-through.
    async fn delete(&self, uri: &str) -> Result<()> {
        let delegate = &self.inner;
        delegate.delete(uri).await
    }

    /// Counts the call, then forwards to the inner adapter.
    async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
        let calls = &self.counts.list_dir;
        calls.fetch_add(1, Ordering::Relaxed);
        self.inner.list_dir(dir_uri).await
    }

    /// Counts the call, then forwards to the inner adapter.
    async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
        let calls = &self.counts.read_text_versioned;
        calls.fetch_add(1, Ordering::Relaxed);
        self.inner.read_text_versioned(uri).await
    }

    /// Uncounted pass-through.
    async fn write_text_if_match(
        &self,
        uri: &str,
        contents: &str,
        expected_version: &str,
    ) -> Result<Option<String>> {
        let delegate = &self.inner;
        delegate
            .write_text_if_match(uri, contents, expected_version)
            .await
    }

    /// Uncounted pass-through.
    async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
        let delegate = &self.inner;
        delegate.delete_prefix(prefix_uri).await
    }
}