use crate::chains::ChainApi;
use crate::partition_index::ChainPartitionIndex;
use crate::storage::Location;
use crate::storage::{Persistable, StorageApi, StorageConf};
use crate::table_api::TableApi;
use anyhow::{bail, Result};
use datafusion::prelude::{SessionConfig, SessionContext};
use log::{debug, warn};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use super::schema::{Catalog, GlobalCatalogs, DEFAULT_CATALOG};
/// Top-level context bundling the shared `CtxState`, a DataFusion
/// `SessionContext`, and the catalog list registered with that session.
pub struct Ctx {
/// Shared state; handed out as a cloned `Arc` by `Ctx::state`.
state: Arc<CtxState>,
/// DataFusion session built in `Ctx::new`.
df_ctx: SessionContext,
/// Catalog list registered with `df_ctx`; `Ctx::catalog` reads from it.
global_catalogs: Arc<GlobalCatalogs>,
}
impl std::fmt::Debug for Ctx {
    /// Renders the context as a struct named `Ctx` showing only the `state`
    /// field; the DataFusion session and catalog list are left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Ctx");
        builder.field("state", &self.state);
        builder.finish()
    }
}
impl Default for Ctx {
fn default() -> Self {
Self::new()
}
}
/// Reference-counted handle to a `CtxState`.
pub type CtxStateRef = Arc<CtxState>;
impl Ctx {
pub fn new() -> Self {
let state: CtxState = Default::default();
let state = Arc::new(state);
let global_catalogs = Arc::new(GlobalCatalogs::new(state.clone()));
let conf = SessionConfig::default()
.with_information_schema(true)
.with_default_catalog_and_schema(DEFAULT_CATALOG, "eth");
let mut df_ctx = SessionContext::with_config(conf);
df_ctx.register_catalog_list(global_catalogs.clone());
Self {
df_ctx,
state,
global_catalogs,
}
}
pub fn catalog(&self) -> Arc<Catalog> {
self.global_catalogs.catalog()
}
pub fn ctx(&self) -> &SessionContext {
&self.df_ctx
}
pub fn ctx_mut(&mut self) -> &mut SessionContext {
&mut self.df_ctx
}
pub fn state(&self) -> Arc<CtxState> {
self.state.clone()
}
pub async fn add_storage_conf(&self, name: &str, conf: &StorageConf) -> Result<()> {
self.state().add_store(name, conf);
let ctx = self.ctx();
ctx.runtime_env().register_object_store(
conf.scheme(),
conf.bucket().unwrap_or(""),
self.state.chain_idx_stores.get_store_api(conf).await?,
);
Ok(())
}
pub fn register_chain(&self, chain: Arc<dyn ChainApi>) {
self.catalog().register_chain(chain);
}
pub async fn chain_store_for_loc(
&self,
loc: &Location,
) -> Result<Arc<StorageApi<ChainPartitionIndex>>> {
self.state().chain_store_for_loc(loc).await
}
}
// A chain paired with a list of tables (presumably the tables registered for
// that chain — TODO confirm). Not referenced elsewhere in the visible file.
type RegChain = (Arc<dyn ChainApi>, Vec<Arc<dyn TableApi>>);
/// State shared behind an `Arc` by `Ctx`: block-range settings, the named
/// storage configs, and the cache of chain-index stores built from them.
#[derive(Debug)]
pub struct CtxState {
/// Batch size in blocks; the `Default` impl sets it to 100.
blocks_per_batch: u64,
/// Optional first block; `None` by default.
start_block: Option<u64>,
/// Optional last block; `None` by default.
end_block: Option<u64>,
/// `None` by default; no accessor is visible in this file. Presumably
/// "process only the last N blocks" — TODO confirm.
last_n: Option<u64>,
/// Storage configs keyed by the name passed to `add_store`.
store_confs: RwLock<HashMap<String, StorageConf>>,
/// Lazily created chain-index stores, one per distinct `StorageConf`.
chain_idx_stores: StorgeApiMap<ChainPartitionIndex>,
}
impl Default for CtxState {
    /// Initial state: batches of 100 blocks, no block bounds, no `last_n`,
    /// and empty config and store maps.
    fn default() -> Self {
        let store_confs = RwLock::new(HashMap::new());
        let chain_idx_stores = StorgeApiMap::new();
        Self {
            blocks_per_batch: 100,
            start_block: None,
            end_block: None,
            last_n: None,
            store_confs,
            chain_idx_stores,
        }
    }
}
impl CtxState {
    /// Records `conf` under `name`, replacing any config already stored
    /// under that name.
    pub fn add_store(&self, name: &str, conf: &StorageConf) {
        self.store_confs
            .write()
            .insert(name.to_owned(), conf.to_owned());
    }

    /// Returns the chain-index store whose config accepts `loc`.
    ///
    /// When several configs are valid for `loc`, a warning is logged and the
    /// one with the lexicographically smallest name is used. Sorting by name
    /// keeps the choice stable across runs; plain `HashMap` iteration order
    /// is arbitrary.
    ///
    /// # Errors
    ///
    /// Fails with "no valid configs found" when no recorded config accepts
    /// `loc`, or with the store-creation error for the selected config.
    pub async fn chain_store_for_loc(
        &self,
        loc: &Location,
    ) -> Result<Arc<StorageApi<ChainPartitionIndex>>> {
        // Scoped so the read guard is released before the `.await` below.
        let conf = {
            let all_stores = self.store_confs.read();
            let mut valid_stores: Vec<_> = all_stores
                .iter()
                .filter(|(_, v)| v.location_is_valid(loc))
                .collect();
            valid_stores.sort_unstable_by(|a, b| a.0.cmp(b.0));

            match valid_stores.first() {
                Some(&(name, conf)) => {
                    if valid_stores.len() > 1 {
                        warn!("detected multiple valid stores for '{loc}'. using store '{name}'",);
                    }
                    debug!("using storage conf '{name}' for {loc}");
                    conf.to_owned()
                }
                None => bail!("no valid configs found"),
            }
        };
        self.chain_idx_stores.get_store_api(&conf).await
    }

    /// Returns the chain-index store for `conf`, creating it on first use.
    ///
    /// # Errors
    ///
    /// Propagates the error from creating the store.
    pub async fn get_chain_idx_store(
        &self,
        conf: &StorageConf,
    ) -> Result<Arc<StorageApi<ChainPartitionIndex>>> {
        self.chain_idx_stores.get_store_api(conf).await
    }

    /// Configured batch size in blocks.
    pub fn blocks_per_batch(&self) -> u64 {
        self.blocks_per_batch
    }

    /// Configured first block, if any.
    pub fn start_block(&self) -> Option<u64> {
        self.start_block
    }

    /// Configured last block, if any.
    pub fn end_block(&self) -> Option<u64> {
        self.end_block
    }
}
/// Cache of `StorageApi<T>` instances keyed by the `StorageConf` they were
/// created from, guarded by an async mutex.
#[derive(Debug)]
struct StorgeApiMap<T>
where
T: Send + Sync + Persistable + 'static,
{
/// Created stores; entries are only added by `get_store_api`.
data: Mutex<HashMap<StorageConf, Arc<StorageApi<T>>>>,
}
impl<T> StorgeApiMap<T>
where
    T: Send + Sync + Persistable + 'static,
{
    /// Creates an empty cache.
    fn new() -> Self {
        let data = Mutex::new(HashMap::new());
        Self { data }
    }

    /// Returns the cached store for `conf`, creating and caching it on the
    /// first request.
    ///
    /// The map lock is held for the whole call, including store creation.
    /// A failed creation leaves the map unchanged.
    ///
    /// # Errors
    ///
    /// Returns the creation error, wrapped with context, when the store
    /// cannot be built from `conf`.
    async fn get_store_api(&self, conf: &StorageConf) -> Result<Arc<StorageApi<T>>> {
        let mut cache = self.data.lock().await;

        if let Some(hit) = cache.get(conf) {
            return Ok(Arc::clone(hit));
        }

        let created = StorageApi::try_new(conf.clone()).await.map_err(|e| {
            e.context(
                "StoreMap got invalid storage conf! \
(failed to create store within get_store_api)",
            )
        })?;
        let created = Arc::new(created);
        cache.insert(conf.clone(), Arc::clone(&created));
        Ok(created)
    }

    /// Number of stores currently cached.
    async fn len(&self) -> usize {
        let cache = self.data.lock().await;
        cache.len()
    }
}
#[cfg(test)]
mod tests {
    use crate::storage::ObjStorePath;
    use itertools::Itertools;
    use std::path::PathBuf;

    use super::*;
    use crate::{storage::Location, test::TestDir};

    /// Three configs: one file store rooted at `datadir`, followed by two
    /// identical in-memory configs that share a bucket.
    fn testconfs(datadir: PathBuf) -> Vec<StorageConf> {
        vec![
            StorageConf::File {
                dirpath: datadir,
                filename: "testy.db".to_string(),
            },
            StorageConf::Memory {
                bucket: "bucket".to_owned(),
            },
            StorageConf::Memory {
                bucket: "bucket".to_owned(),
            },
        ]
    }

    /// File config for which the tests expect store creation to fail.
    fn badconf() -> StorageConf {
        StorageConf::File {
            dirpath: PathBuf::from("/wtf"),
            filename: "testy.db".to_string(),
        }
    }

    /// Builds a context with the three good configs ("n1", "n3", "n4") and
    /// the failing one ("n2").
    async fn ctx_with_stores(tdir: PathBuf) -> Ctx {
        let (conf1, conf3, conf4) = testconfs(tdir).into_iter().collect_tuple().unwrap();
        let badconf = badconf();
        let ctx = Ctx::new();
        ctx.add_storage_conf("n1", &conf1).await.unwrap();
        ctx.add_storage_conf("n2", &badconf).await.unwrap_err();
        ctx.add_storage_conf("n3", &conf3).await.unwrap();
        ctx.add_storage_conf("n4", &conf4).await.unwrap();
        ctx
    }

    #[tokio::test]
    async fn test_ctx_with_stores() {
        let dir = TestDir::new(true);
        let ctx = ctx_with_stores(dir.path.clone()).await;
        let state = ctx.state();
        let stores = state.store_confs.read();
        // "n2" is counted too: the conf is recorded before store creation is
        // attempted, so it is kept even though `add_storage_conf` failed.
        assert_eq!(stores.len(), 4);
        assert_eq!(stores.get("n1").unwrap().scheme(), "file");
    }

    #[tokio::test]
    async fn test_ctx_with_stores_datafusion_registry() {
        let dir = TestDir::new(true);
        let confs = testconfs(dir.path.clone());
        let ctx = ctx_with_stores(dir.path.clone()).await;
        let conf = &confs[0];
        let loc = Location::new(
            conf.scheme(),
            conf.bucket(),
            ObjStorePath::from_absolute_path(dir.path.join("testy.db")).unwrap(),
        );
        // Only exercises the registry lookup; the result is not asserted on.
        let _objstore = ctx
            .ctx()
            .runtime_env()
            .object_store_registry
            .get_by_url(loc);
    }

    #[tokio::test]
    async fn test_store_map() {
        let dir = TestDir::new(true);
        let m = StorgeApiMap::<ChainPartitionIndex>::new();
        let (goodconf, match1, match2) = testconfs(dir.path.clone())
            .into_iter()
            .collect_tuple()
            .unwrap();
        let badconf = badconf();

        // The same conf yields the same cached instance.
        let api1 = m.get_store_api(&goodconf).await.unwrap();
        let api2 = m.get_store_api(&goodconf).await.unwrap();
        assert!(Arc::ptr_eq(&api1, &api2));
        assert_eq!(m.len().await, 1);

        // A failed creation must not add an entry.
        m.get_store_api(&badconf).await.unwrap_err();
        assert_eq!(m.len().await, 1);

        // Equal confs share a single entry.
        let mem1 = m.get_store_api(&match1).await.unwrap();
        let mem2 = m.get_store_api(&match2).await.unwrap();
        assert_eq!(m.len().await, 2);
        assert!(Arc::ptr_eq(&mem1, &mem2));
    }

    #[tokio::test]
    async fn test_chain_idx_store() {
        let dir = TestDir::new(true);
        let ctx = ctx_with_stores(dir.path.clone()).await;
        let store = ctx
            .chain_store_for_loc(&Location::new(
                "file",
                None,
                ObjStorePath::from_absolute_path(dir.path.clone().join("ledata/lefile")).unwrap(),
            ))
            .await
            .unwrap();
        match store.conf() {
            StorageConf::File { dirpath, .. } => assert_eq!(dirpath, &dir.path),
            other => panic!("expected a file store, got {:?}", other),
        }

        let memstore = ctx
            .chain_store_for_loc(&Location::new(
                "memory",
                Some("bucket"),
                ObjStorePath::parse("/var/data/file").unwrap(),
            ))
            .await
            .unwrap();
        assert!(matches!(memstore.conf(), StorageConf::Memory { .. }));
    }
}