use std::collections::HashMap;
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::catalog::Catalog;
use crate::error::DbError;
use crate::storage::Storage;
use crate::types::Value;
use super::clock::{Clock, SystemClock};
use super::context::Variables;
use super::durability::{DurabilitySink, NoopDurability};
use super::journal::{Journal, NoopJournal};
use super::locks::{LockTable, SessionId, TxWorkspace};
use super::tooling::SessionOptions;
use super::transaction::TransactionManager;
use super::wal::Wal;
pub trait SessionManager {
fn create_session(&self) -> SessionId;
fn reset_session(&self, session_id: SessionId) -> Result<(), DbError>;
fn close_session(&self, session_id: SessionId) -> Result<(), DbError>;
fn set_session_journal(
&self,
session_id: SessionId,
journal: Box<dyn Journal>,
) -> Result<(), DbError>;
}
#[derive(Debug, Clone)]
pub struct IdentityState {
pub(crate) last_identity: Option<i64>,
pub(crate) scope_stack: Vec<Option<i64>>,
}
impl Default for IdentityState {
fn default() -> Self {
Self::new()
}
}
impl IdentityState {
pub fn new() -> Self {
Self {
last_identity: None,
scope_stack: vec![None],
}
}
pub fn reset(&mut self) {
self.last_identity = None;
self.scope_stack = vec![None];
}
}
#[derive(Debug, Clone)]
pub struct TableState {
pub(crate) temp_map: HashMap<String, String>,
pub(crate) var_map: HashMap<String, String>,
pub(crate) var_counter: u64,
}
impl Default for TableState {
fn default() -> Self {
Self::new()
}
}
impl TableState {
pub fn new() -> Self {
Self {
temp_map: HashMap::new(),
var_map: HashMap::new(),
var_counter: 0,
}
}
pub fn reset(&mut self) {
self.temp_map.clear();
self.var_map.clear();
self.var_counter = 0;
}
}
#[derive(Debug, Clone)]
pub struct BulkLoadState {
pub active: bool,
pub table: Option<crate::ast::ObjectName>,
pub columns: Option<Vec<crate::ast::statements::ddl::ColumnSpec>>,
pub received_metadata: bool,
}
impl Default for BulkLoadState {
fn default() -> Self {
Self::new()
}
}
impl BulkLoadState {
pub fn new() -> Self {
Self {
active: false,
table: None,
columns: None,
received_metadata: false,
}
}
pub fn reset(&mut self) {
self.active = false;
self.table = None;
self.columns = None;
self.received_metadata = false;
}
}
#[derive(Debug, Clone)]
pub struct CursorState {
pub(crate) map: HashMap<String, super::model::Cursor>,
pub(crate) fetch_status: i32,
pub(crate) next_cursor_handle: i32,
pub(crate) handle_map: HashMap<i32, String>,
}
impl Default for CursorState {
fn default() -> Self {
Self::new()
}
}
impl CursorState {
pub fn new() -> Self {
Self {
map: HashMap::new(),
fetch_status: -1,
next_cursor_handle: 1,
handle_map: HashMap::new(),
}
}
pub fn reset(&mut self) {
self.map.clear();
self.fetch_status = -1;
self.next_cursor_handle = 1;
self.handle_map.clear();
}
}
#[derive(Debug, Clone)]
pub struct SessionSnapshot {
pub variables: Variables,
pub identities: IdentityState,
pub tables: TableState,
pub cursors: CursorState,
pub options: SessionOptions,
pub random_state: u64,
pub context_info: Vec<u8>,
pub session_context: HashMap<String, (Value, bool)>,
}
#[derive(Debug, Clone)]
pub struct DiagnosticsState {
pub(crate) print_output: Vec<String>,
}
impl Default for DiagnosticsState {
fn default() -> Self {
Self::new()
}
}
impl DiagnosticsState {
pub fn new() -> Self {
Self {
print_output: Vec::new(),
}
}
pub fn reset(&mut self) {
self.print_output.clear();
}
}
pub struct SessionRuntime<C, S> {
pub(crate) clock: Box<dyn Clock>,
pub(crate) tx_manager: TransactionManager<C, S, SessionSnapshot>,
pub(crate) journal: Box<dyn Journal>,
pub(crate) variables: Variables,
pub(crate) identities: IdentityState,
pub(crate) tables: TableState,
pub(crate) cursors: CursorState,
pub(crate) diagnostics: DiagnosticsState,
pub(crate) workspace: Option<TxWorkspace<C, S>>,
pub(crate) options: SessionOptions,
pub(crate) random_state: u64,
pub(crate) current_database: String,
pub(crate) original_database: String,
pub(crate) user: Option<String>,
pub(crate) app_name: Option<String>,
pub(crate) host_name: Option<String>,
pub(crate) context_info: Vec<u8>,
pub(crate) session_context: HashMap<String, (Value, bool)>,
pub(crate) bulk_load: BulkLoadState,
}
impl<C, S> SessionRuntime<C, S>
where
C: Catalog + Serialize + DeserializeOwned + Clone + 'static,
S: Storage + Serialize + DeserializeOwned + Clone + 'static + Default,
{
pub(crate) fn new() -> Self {
Self {
clock: Box::new(SystemClock),
tx_manager: TransactionManager::default(),
journal: Box::new(NoopJournal),
variables: Variables::new(),
identities: IdentityState::new(),
tables: TableState::new(),
cursors: CursorState::new(),
diagnostics: DiagnosticsState::new(),
workspace: None,
options: SessionOptions::default(),
random_state: 1,
current_database: "master".to_string(),
original_database: "master".to_string(),
user: None,
app_name: None,
host_name: None,
context_info: vec![0u8; 128],
session_context: HashMap::new(),
bulk_load: BulkLoadState::new(),
}
}
pub(crate) fn reset(&mut self) {
self.tx_manager = TransactionManager::default();
self.variables.clear();
self.identities.reset();
self.tables.reset();
self.cursors.reset();
self.diagnostics.reset();
self.workspace = None;
self.options = SessionOptions::default();
self.random_state = 1;
self.current_database = "master".to_string();
self.original_database = "master".to_string();
self.user = None;
self.app_name = None;
self.host_name = None;
self.context_info = vec![0u8; 128];
self.session_context.clear();
self.bulk_load.reset();
}
}
pub struct SharedStorage<C, S> {
pub(crate) catalog: C,
pub(crate) storage: S,
pub(crate) commit_ts: u64,
pub(crate) table_versions: HashMap<String, u64>,
}
impl<C, S> SharedStorage<C, S> {
pub fn get_mut_refs(&mut self) -> (&mut C, &mut S) {
(&mut self.catalog, &mut self.storage)
}
pub fn get_refs(&self) -> (&C, &S) {
(&self.catalog, &self.storage)
}
}
pub struct SharedState<C, S> {
pub(crate) storage: parking_lot::RwLock<SharedStorage<C, S>>,
pub(crate) table_locks: parking_lot::Mutex<LockTable>,
pub(crate) durability: parking_lot::Mutex<Box<dyn DurabilitySink<C>>>,
pub(crate) wal: parking_lot::Mutex<Option<Wal>>,
pub(crate) next_tx_id: std::sync::atomic::AtomicU64,
pub(crate) sessions: dashmap::DashMap<SessionId, parking_lot::Mutex<SessionRuntime<C, S>>>,
pub(crate) deadlock_priorities: dashmap::DashMap<SessionId, i32>,
pub(crate) next_session_id: std::sync::atomic::AtomicU64,
pub(crate) dirty_buffer: std::sync::Arc<parking_lot::Mutex<super::dirty_buffer::DirtyBuffer>>,
}
impl<C, S> SharedState<C, S>
where
C: Catalog + Serialize + DeserializeOwned + Clone + 'static,
S: Storage + Serialize + DeserializeOwned + Clone + 'static + Default,
{
pub fn with_initial(catalog: C, storage: S) -> Self {
Self {
storage: parking_lot::RwLock::new(SharedStorage {
catalog,
storage,
commit_ts: 0,
table_versions: HashMap::new(),
}),
table_locks: parking_lot::Mutex::new(LockTable::new()),
durability: parking_lot::Mutex::new(Box::new(NoopDurability::default())),
wal: parking_lot::Mutex::new(None),
next_tx_id: std::sync::atomic::AtomicU64::new(1),
sessions: dashmap::DashMap::new(),
deadlock_priorities: dashmap::DashMap::new(),
next_session_id: std::sync::atomic::AtomicU64::new(1),
dirty_buffer: std::sync::Arc::new(parking_lot::Mutex::new(
super::dirty_buffer::DirtyBuffer::new(),
)),
}
}
}