use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::Arc;
use moka::future::Cache;
use tokio::sync::OwnedMutexGuard;
use super::*;
use crate::catalog::TableRefId;
use crate::storage::Table;
/// Handle to a single table of the secondary storage engine.
///
/// The handle derives `Clone`; apart from `table_ref_id` and `column_map`, its fields are
/// `Arc`s or a `Cache` handle.
#[derive(Clone)]
pub struct SecondaryTable {
/// Catalog reference identifying this table.
pub table_ref_id: TableRefId,
/// Column catalogs of the table, in the order they were passed to `new`.
pub columns: Arc<[ColumnCatalog]>,
/// Maps each column id to the index of that column inside `columns`.
pub column_map: HashMap<ColumnId, usize>,
/// Storage configuration; its `path` is the base directory for rowset and `.dv` file paths.
pub storage_options: Arc<StorageOptions>,
/// Version manager handle supplied to `new`.
// NOTE(review): presumably shared across all tables of the engine — confirm with the caller.
pub version: Arc<VersionManager>,
/// Transaction manager; `lock_for_deletion` delegates to it.
pub txn_mgr: Arc<TransactionManager>,
/// Cache of `Block`s keyed by `BlockCacheKey`.
pub block_cache: Cache<BlockCacheKey, Block>,
/// Id counters: `.0` hands out rowset ids (`u32`), `.1` hands out dv ids (`u64`).
next_id: Arc<(AtomicU32, AtomicU64)>,
}
impl SecondaryTable {
pub fn new(
storage_options: Arc<StorageOptions>,
table_ref_id: TableRefId,
columns: &[ColumnCatalog],
next_id: Arc<(AtomicU32, AtomicU64)>,
version: Arc<VersionManager>,
block_cache: Cache<BlockCacheKey, Block>,
txn_mgr: Arc<TransactionManager>,
) -> Self {
Self {
columns: columns.into(),
column_map: columns
.iter()
.enumerate()
.map(|(idx, col)| (col.id(), idx))
.collect(),
table_ref_id,
storage_options,
next_id,
version,
block_cache,
txn_mgr,
}
}
pub fn generate_rowset_id(&self) -> u32 {
self.next_id
.0
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}
pub fn generate_dv_id(&self) -> u64 {
self.next_id
.1
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}
pub fn get_rowset_path(&self, rowset_id: u32) -> PathBuf {
self.storage_options
.path
.join(format!("{}_{}", self.table_id(), rowset_id))
}
pub fn get_dv_path(&self, rowset_id: u32, dv_id: u64) -> PathBuf {
self.storage_options
.path
.join(format!("dv/{}_{}_{}.dv", self.table_id(), rowset_id, dv_id))
}
pub fn table_id(&self) -> u32 {
self.table_ref_id.table_id
}
pub async fn lock_for_deletion(&self) -> OwnedMutexGuard<()> {
self.txn_mgr.lock_for_deletion(self.table_id()).await
}
}
/// `Table` implementation backed by `SecondaryTransaction`.
///
/// Every transaction constructor forwards to `SecondaryTransaction::start` with two boolean
/// flags. NOTE(review): the flag names used below (`read_only`, `update`) are inferred from
/// which method sets which flag — confirm against `SecondaryTransaction::start`.
impl Table for SecondaryTable {
    type Transaction = SecondaryTransaction;

    /// Returns a new handle to the shared column catalog slice (no column is copied).
    fn columns(&self) -> StorageResult<Arc<[ColumnCatalog]>> {
        let shared = Arc::clone(&self.columns);
        Ok(shared)
    }

    /// Returns the catalog reference id of this table.
    fn table_id(&self) -> TableRefId {
        self.table_ref_id
    }

    /// Starts a transaction with both flags cleared.
    async fn write(&self) -> StorageResult<SecondaryTransaction> {
        let (read_only, update) = (false, false);
        SecondaryTransaction::start(self, read_only, update).await
    }

    /// Starts a transaction with only the first flag set.
    async fn read(&self) -> StorageResult<SecondaryTransaction> {
        let (read_only, update) = (true, false);
        SecondaryTransaction::start(self, read_only, update).await
    }

    /// Starts a transaction with only the second flag set.
    async fn update(&self) -> StorageResult<SecondaryTransaction> {
        let (read_only, update) = (false, true);
        SecondaryTransaction::start(self, read_only, update).await
    }
}