use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use qbice_stable_type_id::Identifiable;
use qbice_storage::{
kv_database::{DiscriminantEncoding, WideColumn, WideColumnValue},
single_map::SingleMap as _,
storage_engine::StorageEngine,
write_manager::WriteManager,
};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
use crate::{
Config, Engine,
engine::computation_graph::database::{
SingleMap, Timestamp, WriteTransaction,
},
};
/// Marker type naming the wide column in which the engine timestamp is
/// stored.
///
/// Its `WideColumn` implementation uses `()` for both the key and the
/// discriminant, and this file only ever reads and writes the entry at key
/// `()`.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Identifiable,
)]
pub struct TimestampColumn;
/// Describes how [`TimestampColumn`] is keyed and how its discriminant is
/// encoded.
impl WideColumn for TimestampColumn {
/// Unit key: the column is addressed with `()` everywhere in this file.
type Key = ();
/// Unit discriminant: the only value type registered for this column in this
/// file is `Timestamp`.
type Discriminant = ();
/// Selects the `Prefixed` discriminant encoding.
// NOTE(review): the exact on-disk meaning of `Prefixed` is defined by
// `qbice_storage`, not here — consult that crate before changing it.
fn discriminant_encoding() -> DiscriminantEncoding {
DiscriminantEncoding::Prefixed
}
}
/// Registers [`Timestamp`] as a value type that can be stored in
/// [`TimestampColumn`].
impl WideColumnValue<TimestampColumn> for Timestamp {
/// Returns `()`, the same type as `TimestampColumn`'s `Discriminant`.
fn discriminant() {}
}
/// Synchronization state reached through
/// `engine.computation_graph.database.sync`: the write manager, the in-memory
/// timestamp, the phase lock, and the storage-backed timestamp map.
// NOTE(review): this type shadows the prelude's `Sync` marker trait inside
// this module; spell the trait as `std::marker::Sync` if it is ever needed
// here.
pub struct Sync<C: Config> {
/// Creates write batches and accepts them back for submission.
write_manager: <C::StorageEngine as StorageEngine>::WriteManager,
/// In-memory copy of the current timestamp. Initialised from
/// `timestamp_map` in `Sync::new` and incremented when an input session is
/// acquired.
timestamp: AtomicU64,
/// Lock acquired in read mode for computations and in write mode for input
/// sessions.
phase_mutex: Arc<RwLock<()>>,
/// Storage-backed map holding the timestamp under the key `()`.
timestamp_map: SingleMap<C, TimestampColumn, Timestamp>,
}
/// Cloneable handle wrapping a shared read guard on the phase lock, as
/// returned by `Engine::acquire_active_computation_guard`.
///
/// The guard lives inside an `Arc`, so the read lock is released only when
/// the last clone of this handle is dropped.
#[derive(Debug, Clone)]
pub struct ActiveComputationGuard(
// Not read in this file; the field exists so that dropping the handle drops
// the lock guard.
#[allow(unused)] Arc<OwnedRwLockReadGuard<()>>,
);
/// Handle wrapping the exclusive write guard on the phase lock, as returned
/// by `Engine::acquire_active_input_session_guard`.
///
/// Unlike `ActiveComputationGuard` this type does not derive `Clone`.
#[derive(Debug)]
pub struct ActiveInputSessionGuard(
// Not read in this file; the field exists so that dropping the handle drops
// the lock guard.
#[allow(unused)] Arc<OwnedRwLockWriteGuard<()>>,
);
impl<C: Config> Sync<C> {
    /// Builds the synchronization state on top of the storage engine `db`.
    ///
    /// The timestamp stored under key `()` becomes the starting value of the
    /// in-memory counter. When no entry exists yet, `Timestamp(0)` is
    /// inserted through a fresh write batch and that batch is submitted
    /// before this function returns.
    pub async fn new(db: &C::StorageEngine) -> Self {
        let write_manager = db.new_write_manager();
        let timestamp_map = db.new_single_map::<TimestampColumn, Timestamp>();

        // Bind the lookup result to a local first so that nothing produced
        // by the read is still alive while the seeding write below runs.
        let stored = timestamp_map.get(&()).await;
        let initial = match stored {
            Some(existing) => existing,
            None => {
                // First start on this storage: seed the column with zero.
                let mut seed_batch = write_manager.new_write_batch();
                timestamp_map.insert((), Timestamp(0), &mut seed_batch).await;
                write_manager.submit_write_batch(seed_batch);
                Timestamp(0)
            }
        };

        Self {
            write_manager,
            timestamp: AtomicU64::new(initial.0),
            phase_mutex: Arc::new(RwLock::new(())),
            timestamp_map,
        }
    }
}
impl<C: Config> Engine<C> {
    /// Creates a fresh write transaction from the engine's write manager.
    ///
    /// Nothing is written until the transaction is handed back through
    /// [`Self::submit_write_buffer`].
    pub(in crate::engine::computation_graph) fn new_write_transaction(
        &self,
    ) -> WriteTransaction<C> {
        let sync = &self.computation_graph.database.sync;
        sync.write_manager.new_write_batch()
    }

    /// Acquires the phase lock in shared (read) mode and returns the guard
    /// together with the timestamp observed while the lock is held.
    ///
    /// The timestamp is loaded only after the read lock has been obtained,
    /// so it cannot be read while an input session holds the write lock.
    pub(in crate::engine::computation_graph) async fn acquire_active_computation_guard(
        &self,
    ) -> (ActiveComputationGuard, Timestamp) {
        let sync = &self.computation_graph.database.sync;

        let read_guard = Arc::clone(&sync.phase_mutex).read_owned().await;
        let timestamp = Timestamp(sync.timestamp.load(Ordering::SeqCst));

        (ActiveComputationGuard(Arc::new(read_guard)), timestamp)
    }

    /// Acquires the phase lock in exclusive (write) mode, advances the
    /// timestamp by one, and returns a write transaction that already
    /// contains the new timestamp along with the guard.
    ///
    /// The caller is expected to submit the returned transaction (see
    /// [`Self::submit_write_buffer`]); the new timestamp is only staged in
    /// it, not yet submitted.
    pub(in crate::engine::computation_graph) async fn acquire_active_input_session_guard(
        &self,
    ) -> (WriteTransaction<C>, ActiveInputSessionGuard) {
        let sync = &self.computation_graph.database.sync;

        // Take the exclusive lock BEFORE touching the timestamp. Bumping the
        // counter first would let computations that still hold (or are just
        // acquiring) the read lock observe the new timestamp before the
        // input session exists, and a caller that gives up while waiting for
        // the lock would leave the in-memory counter advanced with nothing
        // staged for storage.
        let write_guard = Arc::clone(&sync.phase_mutex).write_owned().await;

        let mut write_buffer = sync.write_manager.new_write_batch();

        // `fetch_add` returns the previous value; the new one is one higher.
        let new_timestamp = sync.timestamp.fetch_add(1, Ordering::SeqCst) + 1;
        sync.timestamp_map
            .insert((), Timestamp(new_timestamp), &mut write_buffer)
            .await;

        (write_buffer, ActiveInputSessionGuard(Arc::new(write_guard)))
    }

    /// Hands a write transaction to the write manager for submission.
    pub(in crate::engine::computation_graph) fn submit_write_buffer(
        &self,
        write_buffer: WriteTransaction<C>,
    ) {
        let sync = &self.computation_graph.database.sync;
        sync.write_manager.submit_write_batch(write_buffer);
    }

    /// Loads the current in-memory timestamp without acquiring the phase
    /// lock.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in this file.
    /// Presumably the caller must already hold an active computation guard
    /// or input-session guard, so that the timestamp cannot be advanced
    /// while the returned value is in use — confirm against the call sites.
    pub(in crate::engine::computation_graph) unsafe fn get_current_timestamp_unchecked(
        &self,
    ) -> Timestamp {
        let sync = &self.computation_graph.database.sync;
        Timestamp(sync.timestamp.load(Ordering::SeqCst))
    }
}