use std::sync::Arc;
use anyhow::Context;
use arc_swap::ArcSwap;
use dashmap::DashMap;
use tokio::sync::Mutex as AsyncMutex;
use uni_common::core::fork::{ForkId, ForkInfo, SchemaDelta};
use uni_common::core::schema::{EdgeTypeMeta, LabelMeta};
use super::registry::{ForkHolderGuard, ForkRegistryHandle};
/// Kind tag recorded for a fork-local index.
///
/// Values of this type are stored per `(label, column)` pair in
/// `ForkScope::fork_local_indexes`.
///
/// The enum is `#[non_exhaustive]`: matches written outside the defining crate
/// must include a wildcard arm, which leaves room for new kinds to be added.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
#[non_exhaustive]
pub enum ForkLocalIndexKind {
// NOTE(review): the variants carry no data, and nothing in this file shows how
// each kind is built or queried. Their meaning is presumably what the names
// suggest (scalar B-tree, sorted, VID/UID lookup, vector, full-text) — confirm
// against the code that creates the indexes.
ScalarBtree,
Sorted,
VidUid,
Vector,
FullText,
}
/// Per-fork state: the fork's metadata, its schema overlay, and several
/// in-memory maps that are filled in while the scope is alive.
///
/// The mutable parts sit behind `Arc`-wrapped containers and are updated
/// through `&self` methods.
pub struct ForkScope {
// Copy of `fork_info.id`, taken once in `new`.
fork_id: ForkId,
// Shared fork metadata; `branch_for` reads its `datasets` map and the `Debug`
// impl reads its `name`.
fork_info: Arc<ForkInfo>,
// Current schema overlay. The `add_*_to_overlay` methods replace it wholesale
// with a modified clone; `overlay()` hands out the current `Arc`.
overlay: Arc<ArcSwap<SchemaDelta>>,
// Held for the whole clone / modify / persist / store sequence of the
// `add_*_to_overlay` methods.
overlay_lock: Arc<AsyncMutex<()>>,
// Registry handle used to register the holder in `new` and to persist overlay
// changes via `update_schema_overlay`.
registry: Arc<ForkRegistryHandle>,
// Dataset name -> branch name, filled by `register_dynamic_branch`. Consulted
// by `branch_for` only when `fork_info.datasets` has no entry for the dataset.
dynamic_branches: Arc<DashMap<String, String>>,
// Table name -> running sum of the `rows_added` values passed to
// `record_fork_fragment`.
fragment_counts: Arc<DashMap<String, u64>>,
// `(label, column)` -> kind of the index registered for that pair.
fork_local_indexes: Arc<DashMap<(String, String), ForkLocalIndexKind>>,
// Guard returned by `registry.register_holder`; never read, only kept so it is
// dropped together with the scope.
// NOTE(review): presumably dropping it unregisters this holder — confirm in
// `ForkHolderGuard`.
_holder: ForkHolderGuard,
}
impl std::fmt::Debug for ForkScope {
    /// Formats the scope as `ForkScope { fork_id, fork_name, .. }`.
    ///
    /// Only the fork's id and name are printed; the remaining fields are
    /// elided and signalled by the trailing `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendered = f.debug_struct("ForkScope");
        rendered.field("fork_id", &self.fork_id);
        rendered.field("fork_name", &self.fork_info.name);
        rendered.finish_non_exhaustive()
    }
}
impl ForkScope {
#[must_use]
pub fn new(
fork_info: Arc<ForkInfo>,
overlay: SchemaDelta,
registry: Arc<ForkRegistryHandle>,
) -> Self {
let holder = registry.register_holder(fork_info.id);
Self {
fork_id: fork_info.id,
fork_info,
overlay: Arc::new(ArcSwap::from_pointee(overlay)),
overlay_lock: Arc::new(AsyncMutex::new(())),
registry,
dynamic_branches: Arc::new(DashMap::new()),
fragment_counts: Arc::new(DashMap::new()),
fork_local_indexes: Arc::new(DashMap::new()),
_holder: holder,
}
}
pub fn record_fork_fragment(&self, table_name: &str, rows_added: u64) {
if rows_added == 0 {
return;
}
self.fragment_counts
.entry(table_name.to_string())
.and_modify(|c| *c += rows_added)
.or_insert(rows_added);
}
#[must_use]
pub fn fragment_count(&self, table_name: &str) -> u64 {
self.fragment_counts
.get(table_name)
.map(|r| *r.value())
.unwrap_or(0)
}
#[must_use]
pub fn all_fragment_counts(&self) -> Vec<(String, u64)> {
self.fragment_counts
.iter()
.map(|r| (r.key().clone(), *r.value()))
.collect()
}
pub fn register_fork_local_index(&self, label: &str, column: &str, kind: ForkLocalIndexKind) {
self.fork_local_indexes
.insert((label.to_string(), column.to_string()), kind);
}
#[must_use]
pub fn fork_local_index(&self, label: &str, column: &str) -> Option<ForkLocalIndexKind> {
self.fork_local_indexes
.get(&(label.to_string(), column.to_string()))
.map(|r| *r.value())
}
#[must_use]
pub fn all_fork_local_indexes(&self) -> Vec<((String, String), ForkLocalIndexKind)> {
self.fork_local_indexes
.iter()
.map(|r| (r.key().clone(), *r.value()))
.collect()
}
#[must_use]
pub fn fork_id(&self) -> ForkId {
self.fork_id
}
#[must_use]
pub fn fork_info(&self) -> Arc<ForkInfo> {
self.fork_info.clone()
}
#[must_use]
pub fn parent_fork_id(&self) -> Option<ForkId> {
self.fork_info.parent_fork_id
}
#[must_use]
pub fn overlay(&self) -> Arc<SchemaDelta> {
self.overlay.load_full()
}
#[must_use]
pub fn branch_for(&self, dataset_name: &str) -> Option<String> {
if let Some(b) = self.fork_info.datasets.get(dataset_name) {
return Some(b.clone());
}
self.dynamic_branches
.get(dataset_name)
.map(|r| r.value().clone())
}
pub fn register_dynamic_branch(&self, dataset: String, branch: String) {
self.dynamic_branches.insert(dataset, branch);
}
pub async fn add_label_to_overlay(&self, name: String, meta: LabelMeta) -> anyhow::Result<()> {
let _guard = self.overlay_lock.lock().await;
let mut next = (**self.overlay.load()).clone();
next.added_labels.push((name, meta));
self.registry
.update_schema_overlay(&self.fork_id, &next)
.await
.with_context(|| format!("persist schema overlay for fork {}", self.fork_id))?;
self.overlay.store(Arc::new(next));
Ok(())
}
pub async fn add_edge_type_to_overlay(
&self,
name: String,
meta: EdgeTypeMeta,
) -> anyhow::Result<()> {
let _guard = self.overlay_lock.lock().await;
let mut next = (**self.overlay.load()).clone();
next.added_edge_types.push((name, meta));
self.registry
.update_schema_overlay(&self.fork_id, &next)
.await
.with_context(|| format!("persist schema overlay for fork {}", self.fork_id))?;
self.overlay.store(Arc::new(next));
Ok(())
}
#[must_use]
pub fn registry(&self) -> Arc<ForkRegistryHandle> {
self.registry.clone()
}
}