1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
use std::marker::PhantomData;
use std::ptr::NonNull;
use crate::datum::DatumWithOid;
use crate::pg_sys::{self, PgOid};
use crate::spi::{PreparedStatement, Query, Spi, SpiCursor, SpiError, SpiResult, SpiTupleTable};
use super::query::PreparableQuery;
// TODO: should `'conn` be invariant?
pub struct SpiClient<'conn> {
__marker: PhantomData<&'conn ()>,
}
impl<'conn> SpiClient<'conn> {
/// Connect to Postgres' SPI system.
pub(super) fn connect() -> SpiResult<Self> {
// SPI_connect() is documented as being able to return SPI_ERROR_CONNECT, so we have to
// assume it could. The truth seems to be that it never actually does.
Spi::check_status(unsafe { pg_sys::SPI_connect() })?;
Ok(SpiClient { __marker: PhantomData })
}
/// Prepares a statement that is valid for the lifetime of the client.
pub fn prepare<Q: PreparableQuery<'conn>>(
&self,
query: Q,
args: &[PgOid],
) -> SpiResult<PreparedStatement<'conn>> {
query.prepare(self, args)
}
/// Prepares a mutating statement that is valid for the lifetime of the client.
pub fn prepare_mut<Q: PreparableQuery<'conn>>(
&self,
query: Q,
args: &[PgOid],
) -> SpiResult<PreparedStatement<'conn>> {
query.prepare_mut(self, args)
}
/// Perform a `SELECT` statement in **read-only** SPI mode.
///
/// Read-only execution is cheaper (no per-statement snapshot work) but it
/// inherits PostgreSQL's volatility constraints: the query cannot use a
/// locking clause (`FOR UPDATE`, `FOR SHARE`, `SKIP LOCKED`, `NOWAIT`),
/// cannot call `VOLATILE` functions that depend on a writable snapshot,
/// and cannot run DML (`INSERT`, `UPDATE`, `DELETE`). Trying to do any
/// of those through this method will surface as an error from Postgres,
/// most commonly:
///
/// > ERROR: SELECT FOR UPDATE is not allowed in a non-volatile function
///
/// Use [`SpiClient::update`] for those cases — it switches the rest of
/// the transaction to writable SPI execution.
///
/// Whether the underlying `SPI_execute*` call is invoked with
/// `read_only = true` is decided by [`Spi::is_xact_still_immutable`].
/// Once any prior statement in this transaction has been executed via
/// [`SpiClient::update`] (or [`Spi::mark_mutable`] has been called
/// directly), subsequent `select` calls in the same transaction will
/// *also* run with `read_only = false` — pgrx does not flip the SPI
/// mode back inside one transaction. This matches the Postgres
/// guidance that mixing read-only and read-write SPI commands in a
/// single function is unwise.
pub fn select<'mcx, Q: Query<'conn>>(
&self,
query: Q,
limit: Option<libc::c_long>,
args: &[DatumWithOid<'mcx>],
) -> SpiResult<SpiTupleTable<'conn>> {
query.execute(self, limit, args)
}
/// Perform any query that requires **writable** SPI execution.
///
/// This is the right method for the obvious writers — `INSERT`,
/// `UPDATE`, `DELETE`, and utility statements (`CREATE`, `ALTER`,
/// `DROP`, …) — but it is *also* the right method for any `SELECT`
/// that uses a locking clause (`FOR UPDATE`, `FOR SHARE`, `SKIP
/// LOCKED`, `NOWAIT`) or that calls a function whose volatility
/// requires a writable snapshot. The first call in a transaction
/// invokes [`Spi::mark_mutable`], which forces a real `TransactionId`
/// to be assigned; from that point forward `SPI_execute*` is invoked
/// with `read_only = false` and the rest of the transaction's SPI
/// runs writable.
///
/// If you only need a plain `SELECT` and don't intend to write or
/// lock, prefer [`SpiClient::select`] — it leaves the transaction in
/// read-only mode and is cheaper.
pub fn update<'mcx, Q: Query<'conn>>(
&mut self,
query: Q,
limit: Option<libc::c_long>,
args: &[DatumWithOid<'mcx>],
) -> SpiResult<SpiTupleTable<'conn>> {
Spi::mark_mutable();
query.execute(self, limit, args)
}
pub(super) fn prepare_tuple_table(
status_code: i32,
) -> std::result::Result<SpiTupleTable<'conn>, SpiError> {
Ok(SpiTupleTable {
status_code: Spi::check_status(status_code)?,
// SAFETY: no concurrent access
table: unsafe { pg_sys::SPI_tuptable.as_mut() },
// SAFETY: no concurrent access
size: unsafe {
if pg_sys::SPI_tuptable.is_null() {
pg_sys::SPI_processed as usize
} else {
(*pg_sys::SPI_tuptable).numvals as usize
}
},
current: -1,
})
}
/// Set up a cursor that will execute the specified query.
///
/// Rows may be then fetched using [`SpiCursor::fetch`].
///
/// See [`SpiCursor`] docs for usage details.
///
/// See [`try_open_cursor`][Self::try_open_cursor] which will return an [`SpiError`] rather than panicking.
///
/// # Panics
///
/// Panics if a cursor wasn't opened.
pub fn open_cursor<'mcx, Q: Query<'conn>>(
&self,
query: Q,
args: &[DatumWithOid<'mcx>],
) -> SpiCursor<'conn> {
self.try_open_cursor(query, args).unwrap()
}
/// Set up a cursor that will execute the specified query.
///
/// Rows may be then fetched using [`SpiCursor::fetch`].
///
/// See [`SpiCursor`] docs for usage details.
pub fn try_open_cursor<'mcx, Q: Query<'conn>>(
&self,
query: Q,
args: &[DatumWithOid<'mcx>],
) -> SpiResult<SpiCursor<'conn>> {
query.try_open_cursor(self, args)
}
/// Set up a cursor that will execute the specified update (mutating) query.
///
/// Rows may be then fetched using [`SpiCursor::fetch`].
///
/// See [`SpiCursor`] docs for usage details.
///
/// See [`try_open_cursor_mut`][Self::try_open_cursor_mut] which will return an [`SpiError`] rather than panicking.
///
/// # Panics
///
/// Panics if a cursor wasn't opened.
pub fn open_cursor_mut<'mcx, Q: Query<'conn>>(
&mut self,
query: Q,
args: &[DatumWithOid<'mcx>],
) -> SpiCursor<'conn> {
Spi::mark_mutable();
self.try_open_cursor_mut(query, args).unwrap()
}
/// Set up a cursor that will execute the specified update (mutating) query.
///
/// Rows may be then fetched using [`SpiCursor::fetch`].
///
/// See [`SpiCursor`] docs for usage details.
pub fn try_open_cursor_mut<'mcx, Q: Query<'conn>>(
&mut self,
query: Q,
args: &[DatumWithOid<'mcx>],
) -> SpiResult<SpiCursor<'conn>> {
Spi::mark_mutable();
query.try_open_cursor(self, args)
}
/// Find a cursor in transaction by name.
///
/// A cursor for a query can be opened using [`SpiClient::open_cursor`].
/// Cursor are automatically closed on drop unless [`SpiCursor::detach_into_name`] is used.
/// Returned name can be used with this method to retrieve the open cursor.
///
/// See [`SpiCursor`] docs for usage details.
pub fn find_cursor(&self, name: &str) -> SpiResult<SpiCursor<'conn>> {
use pgrx_pg_sys::AsPgCStr;
let ptr = NonNull::new(unsafe { pg_sys::SPI_cursor_find(name.as_pg_cstr()) })
.ok_or(SpiError::CursorNotFound(name.to_string()))?;
Ok(SpiCursor { ptr, __marker: PhantomData })
}
}
impl Drop for SpiClient<'_> {
/// When `SpiClient` is dropped, we make sure to disconnect from SPI.
fn drop(&mut self) {
// Best efforts to disconnect from SPI
// SPI_finish() would only complain if we hadn't previously called SPI_connect() and
// SpiConnection should prevent that from happening (assuming users don't go unsafe{})
Spi::check_status(unsafe { pg_sys::SPI_finish() }).ok();
}
}