pgdog_plugin/
context.rs

1//! Context passed to and from the plugins.
2
3use std::ops::Deref;
4
5use crate::{
6    bindings::PdRouterContext, parameters::Parameters, PdParameters, PdRoute, PdStatement,
7};
8
9/// PostgreSQL statement, parsed by [`pg_query`].
10///
11/// Implements [`Deref`] on [`PdStatement`], which is passed
12/// in using the FFI interface.
13/// Use the [`PdStatement::protobuf`] method to obtain a reference
14/// to the Abstract Syntax Tree.
15///
16/// ### Example
17///
18/// ```no_run
19/// # use pgdog_plugin::Context;
20/// # let context = unsafe { Context::doc_test() };
21/// # let statement = context.statement();
22/// use pgdog_plugin::pg_query::NodeEnum;
23///
24/// let ast = statement.protobuf();
25/// let root = ast
26///     .stmts
27///     .first()
28///     .unwrap()
29///     .stmt
30///     .as_ref()
31///     .unwrap()
32///     .node
33///     .as_ref();
34///
35/// if let Some(NodeEnum::SelectStmt(stmt)) = root {
36///     println!("SELECT statement: {:#?}", stmt);
37/// }
38///
39/// ```
40pub struct Statement {
41    ffi: PdStatement,
42}
43
44impl Deref for Statement {
45    type Target = PdStatement;
46
47    fn deref(&self) -> &Self::Target {
48        &self.ffi
49    }
50}
51
52/// Context information provided by PgDog to the plugin at statement execution. It contains the actual statement and several metadata about
53/// the state of the database cluster:
54///
55/// - Number of shards
56/// - Does it have replicas
57/// - Does it have a primary
58///
59/// ### Example
60///
61/// ```
62/// use pgdog_plugin::{Context, Route, macros, Shard, ReadWrite};
63///
64/// #[macros::route]
65/// fn route(context: Context) -> Route {
66///     let shards = context.shards();
67///     let read_only = context.read_only();
68///     let ast = context.statement().protobuf();
69///
70///     println!("shards: {} (read_only: {})", shards, read_only);
71///     println!("ast: {:#?}", ast);
72///
73///     let read_write = if read_only {
74///         ReadWrite::Read
75///     } else {
76///         ReadWrite::Write
77///     };
78///
79///     Route::new(Shard::Direct(0), read_write)
80/// }
81/// ```
82///
83pub struct Context {
84    ffi: PdRouterContext,
85}
86
87impl From<PdRouterContext> for Context {
88    fn from(value: PdRouterContext) -> Self {
89        Self { ffi: value }
90    }
91}
92
93impl Context {
94    /// Returns a reference to the Abstract Syntax Tree (AST) created by [`pg_query`].
95    ///
96    /// # Example
97    ///
98    /// ```no_run
99    /// # use pgdog_plugin::Context;
100    /// # let context = unsafe { Context::doc_test() };
101    /// # let statement = context.statement();
102    /// let ast = context.statement().protobuf();
103    /// let nodes = ast.nodes();
104    /// ```
105    pub fn statement(&self) -> Statement {
106        Statement {
107            ffi: self.ffi.query,
108        }
109    }
110
111    /// Returns true if the database cluster doesn't have a primary database and can only serve
112    /// read queries.
113    ///
114    /// # Example
115    ///
116    /// ```
117    /// # use pgdog_plugin::Context;
118    /// # let context = unsafe { Context::doc_test() };
119    ///
120    /// let read_only = context.read_only();
121    ///
122    /// if read_only {
123    ///     println!("Database cluster doesn't have a primary, only replicas.");
124    /// }
125    /// ```
126    pub fn read_only(&self) -> bool {
127        self.ffi.has_primary == 0
128    }
129
130    /// Returns true if the database cluster has replica databases.
131    ///
132    /// # Example
133    ///
134    /// ```
135    /// # use pgdog_plugin::Context;
136    /// # let context = unsafe { Context::doc_test() };
137    /// let has_replicas = context.has_replicas();
138    ///
139    /// if has_replicas {
140    ///     println!("Database cluster can load balance read queries.")
141    /// }
142    /// ```
143    pub fn has_replicas(&self) -> bool {
144        self.ffi.has_replicas == 1
145    }
146
147    /// Returns true if the database cluster has a primary database and can serve write queries.
148    ///
149    /// # Example
150    ///
151    /// ```
152    /// # use pgdog_plugin::Context;
153    /// # let context = unsafe { Context::doc_test() };
154    /// let has_primary = context.has_primary();
155    ///
156    /// if has_primary {
157    ///     println!("Database cluster can serve write queries.");
158    /// }
159    /// ```
160    pub fn has_primary(&self) -> bool {
161        !self.read_only()
162    }
163
164    /// Returns the number of shards in the database cluster.
165    ///
166    /// # Example
167    ///
168    /// ```
169    /// # use pgdog_plugin::Context;
170    /// # let context = unsafe { Context::doc_test() };
171    /// let shards = context.shards();
172    ///
173    /// if shards > 1 {
174    ///     println!("Plugin should consider which shard to route the query to.");
175    /// }
176    /// ```
177    pub fn shards(&self) -> usize {
178        self.ffi.shards as usize
179    }
180
181    /// Returns true if the database cluster has more than one shard.
182    ///
183    /// # Example
184    ///
185    /// ```
186    /// # use pgdog_plugin::Context;
187    /// # let context = unsafe { Context::doc_test() };
188    /// let sharded = context.sharded();
189    /// let shards = context.shards();
190    ///
191    /// if sharded {
192    ///     assert!(shards > 1);
193    /// } else {
194    ///     assert_eq!(shards, 1);
195    /// }
196    /// ```
197    pub fn sharded(&self) -> bool {
198        self.shards() > 1
199    }
200
201    /// Returns true if PgDog strongly believes the statement should be sent to a primary. This indicates
202    /// that the statement is **not** a `SELECT` (e.g. `UPDATE`, `DELETE`, etc.), or a `SELECT` that is very likely to write data to the database, e.g.:
203    ///
204    /// ```sql
205    /// WITH users AS (
206    ///     INSERT INTO users VALUES (1, 'test@acme.com') RETURNING *
207    /// )
208    /// SELECT * FROM users;
209    /// ```
210    ///
211    /// # Example
212    ///
213    /// ```
214    /// # use pgdog_plugin::Context;
215    /// # let context = unsafe { Context::doc_test() };
216    /// if context.write_override() {
217    ///     println!("We should really send this query to the primary.");
218    /// }
219    /// ```
220    pub fn write_override(&self) -> bool {
221        self.ffi.write_override == 1
222    }
223
224    /// Returns a list of parameters bound on the statement. If using the simple protocol,
225    /// this is going to be empty and parameters will be in the actual query text.
226    ///
227    /// # Example
228    ///
229    /// ```
230    /// use pgdog_plugin::prelude::*;
231    /// # let context = unsafe { Context::doc_test() };
232    /// let params = context.parameters();
233    /// if let Some(param) = params.get(0) {
234    ///     let value = param.decode(params.parameter_format(0));
235    ///     println!("{:?}", value);
236    /// }
237    /// ```
238    pub fn parameters(&self) -> Parameters {
239        self.ffi.params.into()
240    }
241}
242
243impl Context {
244    /// Used for doc tests only. **Do not use**.
245    ///
246    /// # Safety
247    ///
248    /// Not safe, don't use. We use it for doc tests only.
249    ///
250    pub unsafe fn doc_test() -> Context {
251        use std::{os::raw::c_void, ptr::null};
252
253        Context {
254            ffi: PdRouterContext {
255                shards: 1,
256                has_replicas: 1,
257                has_primary: 1,
258                in_transaction: 0,
259                write_override: 0,
260                query: PdStatement {
261                    version: 1,
262                    len: 0,
263                    data: null::<c_void>() as *mut c_void,
264                },
265                params: PdParameters::default(),
266            },
267        }
268    }
269}
270
271/// What shard, if any, the statement should be sent to.
272///
273/// ### Example
274///
275/// ```
276/// use pgdog_plugin::Shard;
277///
278/// // Send query to shard 2.
279/// let direct = Shard::Direct(2);
280///
281/// // Send query to all shards.
282/// let cross_shard = Shard::All;
283///
284/// // Let PgDog handle sharding.
285/// let unknown = Shard::Unknown;
286/// ```
287#[derive(Debug, Copy, Clone, PartialEq, Eq)]
288pub enum Shard {
289    /// Direct-to-shard statement, sent to the specified shard only.
290    Direct(usize),
291    /// Send statement to all shards and let PgDog collect and transform the results.
292    All,
293    /// Not clear which shard it should go to, so let PgDog decide.
294    /// Use this if you don't want to handle sharding inside the plugin.
295    Unknown,
296    /// The statement is blocked from executing.
297    Blocked,
298}
299
300impl From<Shard> for i64 {
301    fn from(value: Shard) -> Self {
302        match value {
303            Shard::Direct(value) => value as i64,
304            Shard::All => -1,
305            Shard::Unknown => -2,
306            Shard::Blocked => -3,
307        }
308    }
309}
310
311impl TryFrom<i64> for Shard {
312    type Error = ();
313    fn try_from(value: i64) -> Result<Self, Self::Error> {
314        Ok(if value == -1 {
315            Shard::All
316        } else if value == -2 {
317            Shard::Unknown
318        } else if value == -3 {
319            Shard::Blocked
320        } else if value >= 0 {
321            Shard::Direct(value as usize)
322        } else {
323            return Err(());
324        })
325    }
326}
327
328impl TryFrom<u8> for ReadWrite {
329    type Error = ();
330
331    fn try_from(value: u8) -> Result<Self, Self::Error> {
332        Ok(if value == 0 {
333            ReadWrite::Write
334        } else if value == 1 {
335            ReadWrite::Read
336        } else if value == 2 {
337            ReadWrite::Unknown
338        } else {
339            return Err(());
340        })
341    }
342}
343
344/// Indicates if the statement is a read or a write. Read statements are sent to a replica, if one is configured.
345/// Write statements are sent to the primary.
346///
347/// ### Example
348///
349/// ```
350/// use pgdog_plugin::ReadWrite;
351///
352/// // The statement should go to a replica.
353/// let read = ReadWrite::Read;
354///
355/// // The statement should go the primary.
356/// let write = ReadWrite::Write;
357///
358/// // Skip and let PgDog decide.
359/// let unknown = ReadWrite::Unknown;
360/// ```
361#[derive(Debug, Copy, Clone, PartialEq, Eq)]
362pub enum ReadWrite {
363    /// Send the statement to a replica, if any are configured.
364    Read,
365    /// Send the statement to the primary.
366    Write,
367    /// Plugin doesn't know if the statement is a read or write. This let's PgDog decide.
368    /// Use this if you don't want to make this decision in the plugin.
369    Unknown,
370}
371
372impl From<ReadWrite> for u8 {
373    fn from(value: ReadWrite) -> Self {
374        match value {
375            ReadWrite::Write => 0,
376            ReadWrite::Read => 1,
377            ReadWrite::Unknown => 2,
378        }
379    }
380}
381
382impl Default for PdRoute {
383    fn default() -> Self {
384        Route::unknown().ffi
385    }
386}
387
388/// Statement route.
389///
390/// PgDog uses this to decide where a query should be sent to. Read statements are sent to a replica,
391/// while write ones are sent the primary. If the cluster has more than one shard, the statement can be
392/// sent to a specific database, or all of them.
393///
394/// ### Example
395///
396/// ```
397/// use pgdog_plugin::{Shard, ReadWrite, Route};
398///
399/// // This sends the query to the primary database of shard 0.
400/// let route = Route::new(Shard::Direct(0), ReadWrite::Write);
401///
402/// // This sends the query to all shards, routing them to a replica
403/// // of each shard, if any are configured.
404/// let route = Route::new(Shard::All, ReadWrite::Read);
405///
406/// // No routing information is available. PgDog will ignore it
407/// // and make its own decision.
408/// let route = Route::unknown();
409pub struct Route {
410    ffi: PdRoute,
411}
412
413impl Default for Route {
414    fn default() -> Self {
415        Self::unknown()
416    }
417}
418
419impl Deref for Route {
420    type Target = PdRoute;
421
422    fn deref(&self) -> &Self::Target {
423        &self.ffi
424    }
425}
426
427impl From<PdRoute> for Route {
428    fn from(value: PdRoute) -> Self {
429        Self { ffi: value }
430    }
431}
432
433impl From<Route> for PdRoute {
434    fn from(value: Route) -> Self {
435        value.ffi
436    }
437}
438
439impl Route {
440    /// Create new route.
441    ///
442    /// # Arguments
443    ///
444    /// * `shard`: Which shard the statement should be sent to.
445    /// * `read_write`: Does the statement read or write data. Read statements are sent to a replica. Write statements are sent to the primary.
446    ///
447    pub fn new(shard: Shard, read_write: ReadWrite) -> Route {
448        Self {
449            ffi: PdRoute {
450                shard: shard.into(),
451                read_write: read_write.into(),
452            },
453        }
454    }
455
456    /// Create new route with no sharding or read/write information.
457    /// Use this if you don't want your plugin to do query routing.
458    /// Plugins that do something else with queries, e.g., logging, metrics,
459    /// can return this route.
460    pub fn unknown() -> Route {
461        Self {
462            ffi: PdRoute {
463                shard: -2,
464                read_write: 2,
465            },
466        }
467    }
468
469    /// Block the query from being sent to a database. PgDog will abort the query
470    /// and return an error to the client, telling them which plugin blocked it.
471    pub fn block() -> Route {
472        Self {
473            ffi: PdRoute {
474                shard: -3,
475                read_write: 2,
476            },
477        }
478    }
479}