1use core::ops::{Deref, DerefMut, Range};
12
13use postgres_protocol::message::{backend, frontend};
14use xitca_io::bytes::BytesMut;
15
16use crate::ExecuteBlocking;
17
18use super::{
19 column::Column,
20 driver::codec::{self, encode::Encode, Response},
21 error::{Completed, Error},
22 execute::{Execute, ExecuteMut},
23 iter::AsyncLendingIterator,
24 query::Query,
25 row::Row,
26 BoxedFuture,
27};
28
/// A batch of encoded queries that is sent to the database in one go.
///
/// - `B` is the buffer type holding the encoded bytes: [`Owned`] (the default) or [`Borrowed`].
/// - `SYNC_MODE` is forwarded to the query encoder and decides how many responses are expected
///   when the pipeline is executed: one per queued query when `true`, a single one when `false`
///   (in that case a single `Sync` message is appended right before sending).
pub struct Pipeline<'a, B = Owned, const SYNC_MODE: bool = true> {
    /// Column descriptions, one entry per successfully encoded query, in encoding order.
    pub(crate) columns: Vec<&'a [Column]>,
    /// Encoded bytes of all queued queries.
    pub(crate) buf: B,
}
70
/// Pipeline buffer borrowed from the caller. The borrowed buffer is cleared when this wrapper is
/// dropped.
pub struct Borrowed<'a>(&'a mut BytesMut);
73
/// Pipeline buffer owned by the pipeline itself.
pub struct Owned(BytesMut);
76
77impl Deref for Borrowed<'_> {
78 type Target = BytesMut;
79
80 #[inline(always)]
81 fn deref(&self) -> &Self::Target {
82 self.0
83 }
84}
85
86impl DerefMut for Borrowed<'_> {
87 #[inline(always)]
88 fn deref_mut(&mut self) -> &mut Self::Target {
89 self.0
90 }
91}
92
93impl Drop for Borrowed<'_> {
94 fn drop(&mut self) {
95 self.0.clear();
96 }
97}
98
99impl Deref for Owned {
100 type Target = BytesMut;
101
102 #[inline(always)]
103 fn deref(&self) -> &Self::Target {
104 &self.0
105 }
106}
107
108impl DerefMut for Owned {
109 #[inline(always)]
110 fn deref_mut(&mut self) -> &mut Self::Target {
111 &mut self.0
112 }
113}
114
115impl<'a> From<Borrowed<'a>> for Owned {
116 fn from(buf: Borrowed<'a>) -> Self {
117 Self(BytesMut::from(buf.as_ref()))
118 }
119}
120
121fn _assert_pipe_send() {
122 crate::_assert_send2::<Pipeline<'_, Owned>>();
123 crate::_assert_send2::<Pipeline<'_, Borrowed<'_>>>();
124}
125
126impl Pipeline<'_, Owned, true> {
127 #[inline]
137 pub fn new() -> Self {
138 Self::with_capacity(0)
139 }
140
141 #[inline]
145 pub fn with_capacity(cap: usize) -> Self {
146 Self::_with_capacity(cap)
147 }
148}
149
150impl Pipeline<'_, Owned, false> {
151 #[inline]
159 pub fn unsync() -> Self {
160 Self::unsync_with_capacity(0)
161 }
162
163 #[inline]
167 pub fn unsync_with_capacity(cap: usize) -> Self {
168 Self::_with_capacity(cap)
169 }
170}
171
172impl<'a> Pipeline<'_, Borrowed<'a>, true> {
173 #[inline]
184 pub fn from_buf(buf: &'a mut BytesMut) -> Self {
185 Self::with_capacity_from_buf(0, buf)
186 }
187
188 #[inline]
192 pub fn with_capacity_from_buf(cap: usize, buf: &'a mut BytesMut) -> Self {
193 Self::_with_capacity_from_buf(cap, buf)
194 }
195}
196
197impl<'a> Pipeline<'_, Borrowed<'a>, false> {
198 #[inline]
206 pub fn unsync_from_buf(buf: &'a mut BytesMut) -> Self {
207 Self::unsync_with_capacity_from_buf(0, buf)
208 }
209
210 #[inline]
214 pub fn unsync_with_capacity_from_buf(cap: usize, buf: &'a mut BytesMut) -> Self {
215 Self::_with_capacity_from_buf(cap, buf)
216 }
217}
218
219impl<const SYNC_MODE: bool> Pipeline<'_, Owned, SYNC_MODE> {
220 fn _with_capacity(cap: usize) -> Self {
221 Self {
222 columns: Vec::with_capacity(cap),
223 buf: Owned(BytesMut::new()),
224 }
225 }
226}
227
228impl<'b, const SYNC_MODE: bool> Pipeline<'_, Borrowed<'b>, SYNC_MODE> {
229 fn _with_capacity_from_buf(cap: usize, buf: &'b mut BytesMut) -> Self {
230 debug_assert!(buf.is_empty(), "pipeline is borrowing potential polluted buffer");
231 Self {
232 columns: Vec::with_capacity(cap),
233 buf: Borrowed(buf),
234 }
235 }
236}
237
238impl<'a, B, E, const SYNC_MODE: bool> ExecuteMut<'_, Pipeline<'a, B, SYNC_MODE>> for E
239where
240 B: DerefMut<Target = BytesMut>,
241 E: Encode<Output = &'a [Column]>,
242{
243 type ExecuteMutOutput = Self::QueryMutOutput;
244 type QueryMutOutput = Result<(), Error>;
245
246 #[inline]
247 fn execute_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::ExecuteMutOutput {
248 self.query_mut(pipe)
249 }
250
251 fn query_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::QueryMutOutput {
252 let len = pipe.buf.len();
253
254 self.encode::<SYNC_MODE>(&mut pipe.buf)
255 .map(|columns| pipe.columns.push(columns))
256 .inspect_err(|_| pipe.buf.truncate(len))
258 }
259}
260
/// Everything handed to the client when a [`Pipeline`] is executed.
pub struct PipelineQuery<'a, 'b> {
    /// `columns.len()` for a sync-mode pipeline, `1` for an unsync-mode one.
    /// NOTE(review): presumably the number of responses the driver waits for — confirm against
    /// the `Query` implementation.
    pub(crate) count: usize,
    /// Column descriptions, one per queued query.
    pub(crate) columns: Vec<&'a [Column]>,
    /// Encoded bytes of the whole pipeline.
    pub(crate) buf: &'b [u8],
}
266
267impl<'p, C, B, const SYNC_MODE: bool> Execute<'_, C> for Pipeline<'p, B, SYNC_MODE>
268where
269 C: Query,
270 B: DerefMut<Target = BytesMut>,
271{
272 type ExecuteOutput = BoxedFuture<'p, Result<u64, Error>>;
273 type QueryOutput = Result<PipelineStream<'p>, Error>;
274
275 fn execute(self, cli: &C) -> Self::ExecuteOutput {
276 let res = self.query(cli);
277 Box::pin(async move {
278 let mut res = res?;
279 let mut row_affected = 0;
280 while let Some(item) = res.try_next().await? {
281 row_affected += item.row_affected().await?;
282 }
283 Ok(row_affected)
284 })
285 }
286
287 #[inline]
288 fn query(self, cli: &C) -> Self::QueryOutput {
289 let Pipeline { columns, mut buf } = self;
290 assert!(!buf.is_empty());
291
292 let count = if SYNC_MODE {
293 columns.len()
294 } else {
295 frontend::sync(&mut buf);
296 1
297 };
298
299 cli._query(PipelineQuery {
300 count,
301 columns,
302 buf: buf.as_ref(),
303 })
304 }
305}
306
307impl<'p, C, B, const SYNC_MODE: bool> ExecuteBlocking<'_, C> for Pipeline<'p, B, SYNC_MODE>
308where
309 C: Query,
310 B: DerefMut<Target = BytesMut>,
311{
312 type ExecuteOutput = Result<u64, Error>;
313 type QueryOutput = Result<PipelineStream<'p>, Error>;
314
315 fn execute_blocking(self, cli: &C) -> Result<u64, Error> {
316 let mut res = self.query_blocking(cli)?;
317 let mut row_affected = 0;
318
319 loop {
320 match res.res.blocking_recv()? {
321 backend::Message::BindComplete => {
322 let item = PipelineItem {
323 finished: false,
324 res: &mut res.res,
325 ranges: &mut res.ranges,
326 columns: res.columns.pop_front(),
327 };
328 row_affected += item.row_affected_blocking()?;
329 }
330 backend::Message::ReadyForQuery(_) => {
331 if res.columns.is_empty() {
332 return Ok(row_affected);
333 }
334 }
335 _ => return Err(Error::unexpected()),
336 }
337 }
338 }
339
340 fn query_blocking(self, cli: &C) -> Self::QueryOutput {
341 self.query(cli)
342 }
343}
344
/// Response stream of an executed pipeline. Yields one [`PipelineItem`] per `BindComplete`
/// message received.
pub struct PipelineStream<'a> {
    /// Source of backend messages.
    res: Response,
    /// Column descriptions of the queries that have not produced an item yet.
    columns: Columns<'a>,
    /// Scratch storage lent to every item for building rows.
    ranges: Ranges,
}
352
353impl<'a> PipelineStream<'a> {
354 pub(crate) const fn new(res: Response, columns: Vec<&'a [Column]>) -> Self {
355 Self {
356 res,
357 columns: Columns { columns, next: 0 },
358 ranges: Vec::new(),
359 }
360 }
361}
362
/// Scratch list of index ranges shared by all rows of a [`PipelineStream`].
/// NOTE(review): presumably the byte range of each column value inside a data row — confirm
/// against `Row::try_new`.
type Ranges = Vec<Range<usize>>;
364
/// Column descriptions of all pipelined queries plus a cursor marking the next one to hand out.
struct Columns<'a> {
    /// One entry per pipelined query. Entries are never removed; only `next` advances.
    columns: Vec<&'a [Column]>,
    /// Index of the entry the next `pop_front` call returns.
    next: usize,
}
369
370impl<'a> Columns<'a> {
371 fn pop_front(&mut self) -> &'a [Column] {
374 let off = self.next;
375 self.next += 1;
376 self.columns[off]
377 }
378
379 fn len(&self) -> usize {
380 self.columns.len() - self.next
381 }
382
383 fn is_empty(&self) -> bool {
384 self.len() == 0
385 }
386}
387
388impl<'a> AsyncLendingIterator for PipelineStream<'a> {
389 type Ok<'i>
390 = PipelineItem<'i>
391 where
392 Self: 'i;
393 type Err = Error;
394
395 async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
396 loop {
397 match self.res.recv().await? {
398 backend::Message::BindComplete => {
399 return Ok(Some(PipelineItem {
400 finished: false,
401 res: &mut self.res,
402 ranges: &mut self.ranges,
403 columns: self.columns.pop_front(),
404 }));
405 }
406 backend::Message::DataRow(_) | backend::Message::CommandComplete(_) => {
407 }
410 backend::Message::ReadyForQuery(_) => {
411 if self.columns.is_empty() {
412 return Ok(None);
413 }
414 }
415 _ => return Err(Error::unexpected()),
416 }
417 }
418 }
419
420 #[inline]
421 fn size_hint(&self) -> (usize, Option<usize>) {
422 let len = self.columns.len();
423 (len, Some(len))
424 }
425}
426
/// Result of a single query inside a pipeline. Borrows the message source and the range scratch
/// space from its [`PipelineStream`].
pub struct PipelineItem<'a> {
    /// Set once the `CommandComplete` message of this query has been received.
    finished: bool,
    /// Source of backend messages, shared with the parent stream.
    res: &'a mut Response,
    /// Scratch storage handed to `Row::try_new` for every data row.
    ranges: &'a mut Ranges,
    /// Column description of this query's rows.
    columns: &'a [Column],
}
435
436impl PipelineItem<'_> {
437 pub async fn row_affected(mut self) -> Result<u64, Error> {
439 if self.finished {
440 return Err(Completed.into());
441 }
442
443 loop {
444 match self.res.recv().await? {
445 backend::Message::DataRow(_) => {}
446 backend::Message::CommandComplete(body) => {
447 self.finished = true;
448 return codec::body_to_affected_rows(&body);
449 }
450 _ => return Err(Error::unexpected()),
451 }
452 }
453 }
454
455 pub fn row_affected_blocking(mut self) -> Result<u64, Error> {
457 if self.finished {
458 return Err(Completed.into());
459 }
460
461 loop {
462 match self.res.blocking_recv()? {
463 backend::Message::DataRow(_) => {}
464 backend::Message::CommandComplete(body) => {
465 self.finished = true;
466 return codec::body_to_affected_rows(&body);
467 }
468 _ => return Err(Error::unexpected()),
469 }
470 }
471 }
472}
473
474impl AsyncLendingIterator for PipelineItem<'_> {
475 type Ok<'i>
476 = Row<'i>
477 where
478 Self: 'i;
479 type Err = Error;
480
481 async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
482 if !self.finished {
483 match self.res.recv().await? {
484 backend::Message::DataRow(body) => {
485 return Row::try_new(self.columns, body, self.ranges).map(Some);
486 }
487 backend::Message::CommandComplete(_) => self.finished = true,
488 _ => return Err(Error::unexpected()),
489 }
490 }
491
492 Ok(None)
493 }
494}