xitca_postgres/
pipeline.rs

1//! explicit pipelining module
2//!
3//! this crate supports "implicit" pipeline like [`tokio-postgres`] does and explicit pipeline is an optional addition.
4//!
5//! making pipelined queries with explicit types and apis has following benefits:
6//! - reduced lock contention. explicit pipeline only lock client once when executed regardless query count
7//! - flexible transform between sync and un-sync pipeline. See [Pipeline::new] for detail
8//! - ordered response handling with a single stream type. reduce memory footprint and possibility of deadlock
9//!
10//! [`tokio-postgres`]: https://docs.rs/tokio-postgres/latest/tokio_postgres/#pipelining
11use core::ops::{Deref, DerefMut, Range};
12
13use postgres_protocol::message::{backend, frontend};
14use xitca_io::bytes::BytesMut;
15
16use crate::ExecuteBlocking;
17
18use super::{
19    column::Column,
20    driver::codec::{self, encode::Encode, Response},
21    error::{Completed, Error},
22    execute::{Execute, ExecuteMut},
23    iter::AsyncLendingIterator,
24    query::Query,
25    row::Row,
26    BoxedFuture,
27};
28
29/// A pipelined sql query type. It lazily batch queries into local buffer and try to send it
30/// with the least amount of syscall when pipeline starts.
31///
32/// # Examples
33/// ```rust
34/// use xitca_postgres::{iter::AsyncLendingIterator, pipeline::Pipeline, Client, Execute, ExecuteMut, Statement};
35///
36/// async fn pipeline(client: &Client) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
37///     // prepare a statement that will be called repeatedly.
38///     // it can be a collection of statements that will be called in iteration.
39///     let statement = Statement::named("SELECT * FROM public.users", &[]).execute(client).await?;
40///
41///     // create a new pipeline.
42///     let mut pipe = Pipeline::new();
43///
44///     // bind value param to statement and query with the pipeline.
45///     // pipeline can encode multiple queries locally before send it to database.
46///     statement.bind([] as [i32; 0]).query_mut(&mut pipe)?;
47///     statement.bind([] as [i32; 0]).query_mut(&mut pipe)?;
48///     statement.bind([] as [i32; 0]).query_mut(&mut pipe)?;
49///
50///     // query the pipeline and on success a streaming response will be returned.
51///     let mut res = pipe.query(client)?;
52///
53///     // iterate through the query responses. the response order is the same as the order of
54///     // queries encoded into pipeline with Pipeline::query_xxx api.
55///     while let Some(mut item) = res.try_next().await? {
56///         // every query can contain streaming rows.
57///         while let Some(row) = item.try_next().await? {
58///             let _: u32 = row.get("id");
59///         }
60///     }
61///
62///     Ok(())
63/// }
64/// ```
65pub struct Pipeline<'a, B = Owned, const SYNC_MODE: bool = true> {
66    pub(crate) columns: Vec<&'a [Column]>,
67    // type for either owned or borrowed bytes buffer.
68    pub(crate) buf: B,
69}
70
71/// borrowed bytes buffer supplied by api caller
72pub struct Borrowed<'a>(&'a mut BytesMut);
73
74/// owned bytes buffer created by [Pipeline]
75pub struct Owned(BytesMut);
76
77impl Deref for Borrowed<'_> {
78    type Target = BytesMut;
79
80    #[inline(always)]
81    fn deref(&self) -> &Self::Target {
82        self.0
83    }
84}
85
86impl DerefMut for Borrowed<'_> {
87    #[inline(always)]
88    fn deref_mut(&mut self) -> &mut Self::Target {
89        self.0
90    }
91}
92
93impl Drop for Borrowed<'_> {
94    fn drop(&mut self) {
95        self.0.clear();
96    }
97}
98
99impl Deref for Owned {
100    type Target = BytesMut;
101
102    #[inline(always)]
103    fn deref(&self) -> &Self::Target {
104        &self.0
105    }
106}
107
108impl DerefMut for Owned {
109    #[inline(always)]
110    fn deref_mut(&mut self) -> &mut Self::Target {
111        &mut self.0
112    }
113}
114
115impl<'a> From<Borrowed<'a>> for Owned {
116    fn from(buf: Borrowed<'a>) -> Self {
117        Self(BytesMut::from(buf.as_ref()))
118    }
119}
120
121fn _assert_pipe_send() {
122    crate::_assert_send2::<Pipeline<'_, Owned>>();
123    crate::_assert_send2::<Pipeline<'_, Borrowed<'_>>>();
124}
125
126impl Pipeline<'_, Owned, true> {
127    /// start a new pipeline.
128    ///
129    /// pipeline is sync by default. which means every query inside is considered separate binding
130    /// and the pipeline is transparent to database server. the pipeline only happen on socket
131    /// transport where minimal amount of syscall is needed.
132    ///
133    /// for more relaxed [Pipeline Mode][libpq_link] see [Pipeline::unsync] api.
134    ///
135    /// [libpq_link]: https://www.postgresql.org/docs/current/libpq-pipeline-mode.html
136    #[inline]
137    pub fn new() -> Self {
138        Self::with_capacity(0)
139    }
140
141    /// start a new pipeline with given capacity.
142    /// capacity represent how many queries will be contained by a single pipeline. a determined cap
143    /// can possibly reduce memory reallocation when constructing the pipeline.
144    #[inline]
145    pub fn with_capacity(cap: usize) -> Self {
146        Self::_with_capacity(cap)
147    }
148}
149
150impl Pipeline<'_, Owned, false> {
151    /// start a new un-sync pipeline.
152    ///
153    /// in un-sync mode pipeline treat all queries inside as one single binding and database server
154    /// can see them as no sync point in between which can result in potential performance gain.
155    ///
156    /// it behaves the same on transportation level as [Pipeline::new] where minimal amount
157    /// of socket syscall is needed.
158    #[inline]
159    pub fn unsync() -> Self {
160        Self::unsync_with_capacity(0)
161    }
162
163    /// start a new un-sync pipeline with given capacity.
164    /// capacity represent how many queries will be contained by a single pipeline. a determined cap
165    /// can possibly reduce memory reallocation when constructing the pipeline.
166    #[inline]
167    pub fn unsync_with_capacity(cap: usize) -> Self {
168        Self::_with_capacity(cap)
169    }
170}
171
172impl<'a> Pipeline<'_, Borrowed<'a>, true> {
173    /// start a new borrowed pipeline. pipeline will use borrowed bytes buffer to store encode messages
174    /// before sending it to database.
175    ///
176    /// pipeline is sync by default. which means every query inside is considered separate binding
177    /// and the pipeline is transparent to database server. the pipeline only happen on socket
178    /// transport where minimal amount of syscall is needed.
179    ///
180    /// for more relaxed [Pipeline Mode][libpq_link] see [Pipeline::unsync_from_buf] api.
181    ///
182    /// [libpq_link]: https://www.postgresql.org/docs/current/libpq-pipeline-mode.html
183    #[inline]
184    pub fn from_buf(buf: &'a mut BytesMut) -> Self {
185        Self::with_capacity_from_buf(0, buf)
186    }
187
188    /// start a new borrowed pipeline with given capacity.
189    /// capacity represent how many queries will be contained by a single pipeline. a determined cap
190    /// can possibly reduce memory reallocation when constructing the pipeline.
191    #[inline]
192    pub fn with_capacity_from_buf(cap: usize, buf: &'a mut BytesMut) -> Self {
193        Self::_with_capacity_from_buf(cap, buf)
194    }
195}
196
197impl<'a> Pipeline<'_, Borrowed<'a>, false> {
198    /// start a new borrowed un-sync pipeline.
199    ///
200    /// in un-sync mode pipeline treat all queries inside as one single binding and database server
201    /// can see them as no sync point in between which can result in potential performance gain.
202    ///
203    /// it behaves the same on transportation level as [Pipeline::from_buf] where minimal amount
204    /// of socket syscall is needed.
205    #[inline]
206    pub fn unsync_from_buf(buf: &'a mut BytesMut) -> Self {
207        Self::unsync_with_capacity_from_buf(0, buf)
208    }
209
210    /// start a new borrowed un-sync pipeline with given capacity.
211    /// capacity represent how many queries will be contained by a single pipeline. a determined cap
212    /// can possibly reduce memory reallocation when constructing the pipeline.
213    #[inline]
214    pub fn unsync_with_capacity_from_buf(cap: usize, buf: &'a mut BytesMut) -> Self {
215        Self::_with_capacity_from_buf(cap, buf)
216    }
217}
218
219impl<const SYNC_MODE: bool> Pipeline<'_, Owned, SYNC_MODE> {
220    fn _with_capacity(cap: usize) -> Self {
221        Self {
222            columns: Vec::with_capacity(cap),
223            buf: Owned(BytesMut::new()),
224        }
225    }
226}
227
228impl<'b, const SYNC_MODE: bool> Pipeline<'_, Borrowed<'b>, SYNC_MODE> {
229    fn _with_capacity_from_buf(cap: usize, buf: &'b mut BytesMut) -> Self {
230        debug_assert!(buf.is_empty(), "pipeline is borrowing potential polluted buffer");
231        Self {
232            columns: Vec::with_capacity(cap),
233            buf: Borrowed(buf),
234        }
235    }
236}
237
238impl<'a, B, E, const SYNC_MODE: bool> ExecuteMut<'_, Pipeline<'a, B, SYNC_MODE>> for E
239where
240    B: DerefMut<Target = BytesMut>,
241    E: Encode<Output = &'a [Column]>,
242{
243    type ExecuteMutOutput = Self::QueryMutOutput;
244    type QueryMutOutput = Result<(), Error>;
245
246    #[inline]
247    fn execute_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::ExecuteMutOutput {
248        self.query_mut(pipe)
249    }
250
251    fn query_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::QueryMutOutput {
252        let len = pipe.buf.len();
253
254        self.encode::<SYNC_MODE>(&mut pipe.buf)
255            .map(|columns| pipe.columns.push(columns))
256            // revert back to last pipelined query when encoding error occurred.
257            .inspect_err(|_| pipe.buf.truncate(len))
258    }
259}
260
261pub struct PipelineQuery<'a, 'b> {
262    pub(crate) count: usize,
263    pub(crate) columns: Vec<&'a [Column]>,
264    pub(crate) buf: &'b [u8],
265}
266
267impl<'p, C, B, const SYNC_MODE: bool> Execute<'_, C> for Pipeline<'p, B, SYNC_MODE>
268where
269    C: Query,
270    B: DerefMut<Target = BytesMut>,
271{
272    type ExecuteOutput = BoxedFuture<'p, Result<u64, Error>>;
273    type QueryOutput = Result<PipelineStream<'p>, Error>;
274
275    fn execute(self, cli: &C) -> Self::ExecuteOutput {
276        let res = self.query(cli);
277        Box::pin(async move {
278            let mut res = res?;
279            let mut row_affected = 0;
280            while let Some(item) = res.try_next().await? {
281                row_affected += item.row_affected().await?;
282            }
283            Ok(row_affected)
284        })
285    }
286
287    #[inline]
288    fn query(self, cli: &C) -> Self::QueryOutput {
289        let Pipeline { columns, mut buf } = self;
290        assert!(!buf.is_empty());
291
292        let count = if SYNC_MODE {
293            columns.len()
294        } else {
295            frontend::sync(&mut buf);
296            1
297        };
298
299        cli._query(PipelineQuery {
300            count,
301            columns,
302            buf: buf.as_ref(),
303        })
304    }
305}
306
307impl<'p, C, B, const SYNC_MODE: bool> ExecuteBlocking<'_, C> for Pipeline<'p, B, SYNC_MODE>
308where
309    C: Query,
310    B: DerefMut<Target = BytesMut>,
311{
312    type ExecuteOutput = Result<u64, Error>;
313    type QueryOutput = Result<PipelineStream<'p>, Error>;
314
315    fn execute_blocking(self, cli: &C) -> Result<u64, Error> {
316        let mut res = self.query_blocking(cli)?;
317        let mut row_affected = 0;
318
319        loop {
320            match res.res.blocking_recv()? {
321                backend::Message::BindComplete => {
322                    let item = PipelineItem {
323                        finished: false,
324                        res: &mut res.res,
325                        ranges: &mut res.ranges,
326                        columns: res.columns.pop_front(),
327                    };
328                    row_affected += item.row_affected_blocking()?;
329                }
330                backend::Message::ReadyForQuery(_) => {
331                    if res.columns.is_empty() {
332                        return Ok(row_affected);
333                    }
334                }
335                _ => return Err(Error::unexpected()),
336            }
337        }
338    }
339
340    fn query_blocking(self, cli: &C) -> Self::QueryOutput {
341        self.query(cli)
342    }
343}
344
345/// streaming response of pipeline.
346/// impl [AsyncLendingIterator] trait and can be collected asynchronously.
347pub struct PipelineStream<'a> {
348    res: Response,
349    columns: Columns<'a>,
350    ranges: Ranges,
351}
352
353impl<'a> PipelineStream<'a> {
354    pub(crate) const fn new(res: Response, columns: Vec<&'a [Column]>) -> Self {
355        Self {
356            res,
357            columns: Columns { columns, next: 0 },
358            ranges: Vec::new(),
359        }
360    }
361}
362
363type Ranges = Vec<Range<usize>>;
364
365struct Columns<'a> {
366    columns: Vec<&'a [Column]>,
367    next: usize,
368}
369
370impl<'a> Columns<'a> {
371    // only move the cursor by one.
372    // column references will be removed when pipeline stream is dropped.
373    fn pop_front(&mut self) -> &'a [Column] {
374        let off = self.next;
375        self.next += 1;
376        self.columns[off]
377    }
378
379    fn len(&self) -> usize {
380        self.columns.len() - self.next
381    }
382
383    fn is_empty(&self) -> bool {
384        self.len() == 0
385    }
386}
387
388impl<'a> AsyncLendingIterator for PipelineStream<'a> {
389    type Ok<'i>
390        = PipelineItem<'i>
391    where
392        Self: 'i;
393    type Err = Error;
394
395    async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
396        loop {
397            match self.res.recv().await? {
398                backend::Message::BindComplete => {
399                    return Ok(Some(PipelineItem {
400                        finished: false,
401                        res: &mut self.res,
402                        ranges: &mut self.ranges,
403                        columns: self.columns.pop_front(),
404                    }));
405                }
406                backend::Message::DataRow(_) | backend::Message::CommandComplete(_) => {
407                    // last PipelineItem dropped before finish. do some catch up until next
408                    // item arrives.
409                }
410                backend::Message::ReadyForQuery(_) => {
411                    if self.columns.is_empty() {
412                        return Ok(None);
413                    }
414                }
415                _ => return Err(Error::unexpected()),
416            }
417        }
418    }
419
420    #[inline]
421    fn size_hint(&self) -> (usize, Option<usize>) {
422        let len = self.columns.len();
423        (len, Some(len))
424    }
425}
426
427/// streaming item of certain query inside pipeline's [PipelineStream].
428/// impl [AsyncLendingIterator] and can be used to collect [Row] from item.
429pub struct PipelineItem<'a> {
430    finished: bool,
431    res: &'a mut Response,
432    ranges: &'a mut Ranges,
433    columns: &'a [Column],
434}
435
436impl PipelineItem<'_> {
437    /// collect rows affected by this pipelined query. [Row] information will be ignored.
438    pub async fn row_affected(mut self) -> Result<u64, Error> {
439        if self.finished {
440            return Err(Completed.into());
441        }
442
443        loop {
444            match self.res.recv().await? {
445                backend::Message::DataRow(_) => {}
446                backend::Message::CommandComplete(body) => {
447                    self.finished = true;
448                    return codec::body_to_affected_rows(&body);
449                }
450                _ => return Err(Error::unexpected()),
451            }
452        }
453    }
454
455    /// blocking version of [`PipelineItem::row_affected`]
456    pub fn row_affected_blocking(mut self) -> Result<u64, Error> {
457        if self.finished {
458            return Err(Completed.into());
459        }
460
461        loop {
462            match self.res.blocking_recv()? {
463                backend::Message::DataRow(_) => {}
464                backend::Message::CommandComplete(body) => {
465                    self.finished = true;
466                    return codec::body_to_affected_rows(&body);
467                }
468                _ => return Err(Error::unexpected()),
469            }
470        }
471    }
472}
473
474impl AsyncLendingIterator for PipelineItem<'_> {
475    type Ok<'i>
476        = Row<'i>
477    where
478        Self: 'i;
479    type Err = Error;
480
481    async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
482        if !self.finished {
483            match self.res.recv().await? {
484                backend::Message::DataRow(body) => {
485                    return Row::try_new(self.columns, body, self.ranges).map(Some);
486                }
487                backend::Message::CommandComplete(_) => self.finished = true,
488                _ => return Err(Error::unexpected()),
489            }
490        }
491
492        Ok(None)
493    }
494}