1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// SPDX-License-Identifier: BUSL-1.1
//! Concurrent session store — keyed by socket address.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::RwLock;
use nodedb_types::DatabaseId;
use crate::types::TenantId;
use super::state::{PgSession, TransactionState};
/// Concurrent session store — keyed by socket address.
pub struct SessionStore {
sessions: RwLock<HashMap<SocketAddr, PgSession>>,
}
impl Default for SessionStore {
fn default() -> Self {
Self::new()
}
}
impl SessionStore {
pub fn new() -> Self {
Self {
sessions: RwLock::new(HashMap::new()),
}
}
/// Ensure a session exists for this address.
pub fn ensure_session(&self, addr: SocketAddr) {
let mut sessions = self.sessions.write().unwrap_or_else(|p| p.into_inner());
sessions.entry(addr).or_insert_with(PgSession::new);
}
/// Remove a session (connection closed).
pub fn remove(&self, addr: &SocketAddr) {
let mut sessions = self.sessions.write().unwrap_or_else(|p| p.into_inner());
sessions.remove(addr);
}
/// List all active sessions as (peer_address, transaction_state) pairs.
pub fn all_sessions(&self) -> Vec<(String, String)> {
let sessions = self.sessions.read().unwrap_or_else(|p| p.into_inner());
sessions
.iter()
.map(|(addr, session)| {
let tx = match session.tx_state {
TransactionState::Idle => "idle",
TransactionState::InBlock => "in_transaction",
TransactionState::Failed => "failed",
};
(addr.to_string(), tx.to_string())
})
.collect()
}
/// Number of active sessions.
pub fn count(&self) -> usize {
let sessions = self.sessions.read().unwrap_or_else(|p| p.into_inner());
sessions.len()
}
/// Look up cached physical tasks for a SQL string in the
/// session's plan cache. `current_version` maps each
/// recorded descriptor id to its current persisted version
/// (or `None` if dropped). The cache returns a hit only
/// when every recorded `(id, version)` pair still matches.
///
/// On a hit returns both the cached tasks and the
/// `DescriptorVersionSet` they were built against — the
/// caller passes the set into
/// `SharedState::acquire_plan_lease_scope` so cache hits
/// and fresh plans share the same lease-acquisition path.
pub fn get_cached_plan<F>(
&self,
addr: &SocketAddr,
sql: &str,
current_version: F,
) -> Option<(
Vec<nodedb_physical::physical_task::PhysicalTask>,
crate::control::planner::descriptor_set::DescriptorVersionSet,
)>
where
F: Fn(&nodedb_cluster::DescriptorId) -> Option<u64>,
{
let mut sessions = self.sessions.write().unwrap_or_else(|p| p.into_inner());
sessions
.get_mut(addr)
.and_then(|s| s.plan_cache.get(sql, current_version))
}
/// Store compiled physical tasks in the session's plan
/// cache along with the descriptor version set they were
/// built against.
pub fn put_cached_plan(
&self,
addr: &SocketAddr,
sql: &str,
tasks: Vec<nodedb_physical::physical_task::PhysicalTask>,
versions: crate::control::planner::descriptor_set::DescriptorVersionSet,
) {
let mut sessions = self.sessions.write().unwrap_or_else(|p| p.into_inner());
if let Some(session) = sessions.get_mut(addr) {
session.plan_cache.put(sql, tasks, versions);
}
}
/// Retrieve the `current_database` for a connection, or `None` if the session
/// does not exist or has not had a database bound yet.
pub fn get_current_database(&self, addr: &SocketAddr) -> Option<DatabaseId> {
let sessions = self.sessions.read().unwrap_or_else(|p| p.into_inner());
sessions.get(addr)?.current_database
}
/// Bind a database to a session. Called at pgwire startup once the database
/// name from the StartupMessage has been resolved to a `DatabaseId`.
pub fn set_current_database(&self, addr: &SocketAddr, db_id: DatabaseId) {
let mut sessions = self.sessions.write().unwrap_or_else(|p| p.into_inner());
if let Some(session) = sessions.get_mut(addr) {
session.current_database = Some(db_id);
}
}
/// Read the session's superuser tenant override, if any. Returns `None`
/// when the session has never run `SET TENANT` (the common case).
pub fn get_effective_tenant_id(&self, addr: &SocketAddr) -> Option<TenantId> {
let sessions = self.sessions.read().unwrap_or_else(|p| p.into_inner());
sessions.get(addr).and_then(|s| s.effective_tenant_id)
}
/// Install or clear the session's tenant override. Callers MUST have
/// already verified the connection is a superuser and is not inside an
/// active transaction — this method performs no policy checks.
///
/// Invalidates the session's plan cache and SQL-level prepared statements
/// so plans built against the prior tenant's catalog cannot be reused.
pub fn set_effective_tenant_id(&self, addr: &SocketAddr, tenant: Option<TenantId>) {
let mut sessions = self.sessions.write().unwrap_or_else(|p| p.into_inner());
if let Some(session) = sessions.get_mut(addr) {
session.effective_tenant_id = tenant;
session.plan_cache.clear();
session.prepared_stmts.clear();
}
}
/// Reset per-session state for a `USE DATABASE` switch:
/// 1. Aborts any open transaction (discards tx_buffer, resets state to Idle).
/// 2. Clears all SQL-level prepared statements.
/// 3. Clears the wire-level plan cache.
/// 4. Rebinds `current_database` to the new id.
pub fn reset_for_database_switch(&self, addr: &SocketAddr, new_db: DatabaseId) {
let mut sessions = self.sessions.write().unwrap_or_else(|p| p.into_inner());
if let Some(session) = sessions.get_mut(addr) {
// Abort open transaction.
session.tx_state = TransactionState::Idle;
session.tx_buffer.clear();
session.tx_snapshot_lsn = None;
session.tx_read_set.clear();
session.savepoints.clear();
session.pending_offset_commits.clear();
session.pending_notifies.clear();
// Invalidate prepared statements and plan cache.
session.prepared_stmts.clear();
session.plan_cache.clear();
// A USE DATABASE switch crosses out of any tenant override — the
// new database may not exist (or have the same id) in the override
// tenant, so the safe contract is to drop the override on switch.
session.effective_tenant_id = None;
// Rebind database.
session.current_database = Some(new_db);
}
}
/// Access the session map with a read lock for use by other session submodules.
pub(super) fn read_session<R>(
&self,
addr: &SocketAddr,
f: impl FnOnce(&PgSession) -> R,
) -> Option<R> {
let sessions = self.sessions.read().unwrap_or_else(|p| p.into_inner());
sessions.get(addr).map(f)
}
/// Access the session map with a write lock for use by other session submodules.
pub(super) fn write_session<R>(
&self,
addr: &SocketAddr,
f: impl FnOnce(&mut PgSession) -> R,
) -> Option<R> {
let mut sessions = self.sessions.write().unwrap_or_else(|p| p.into_inner());
sessions.get_mut(addr).map(f)
}
}