clickhouse_arrow/client/
response.rs

1use std::pin::Pin;
2
3use futures_util::stream::StreamExt;
4use futures_util::{Stream, TryStreamExt};
5use tokio::sync::mpsc;
6use tokio_stream::wrappers::ReceiverStream;
7use tracing::{error, trace};
8
9use super::ClientFormat;
10use crate::prelude::{ATT_CID, ATT_QID};
11use crate::{Qid, Result};
12
13pub(crate) fn create_response_stream<T: ClientFormat>(
14    rx: mpsc::Receiver<Result<T::Data>>,
15    qid: Qid,
16    cid: u16,
17) -> impl Stream<Item = Result<T::Data>> + 'static {
18    ReceiverStream::new(rx)
19        .inspect_ok(move |_| trace!({ ATT_CID } = cid, { ATT_QID } = %qid, "response"))
20        .inspect_err(move |error| error!(?error, { ATT_CID } = cid, { ATT_QID } = %qid, "response"))
21}
22
23pub(crate) fn handle_insert_response<T: ClientFormat>(
24    rx: mpsc::Receiver<Result<T::Data>>,
25    qid: Qid,
26    cid: u16,
27) -> impl Stream<Item = Result<()>> + 'static {
28    ReceiverStream::new(rx)
29        .inspect_ok(move |_| trace!({ ATT_CID } = cid, { ATT_QID } = %qid, "response"))
30        .inspect_err(move |error| error!(?error, { ATT_CID } = cid, { ATT_QID } = %qid, "response"))
31        .filter_map(move |response| async move {
32            match response {
33                Ok(_) => None,
34                Err(e) => Some(Err(e)),
35            }
36        })
37}
38
39#[pin_project::pin_project]
40pub struct ClickHouseResponse<T> {
41    #[pin]
42    stream: Pin<Box<dyn Stream<Item = Result<T>> + Send + 'static>>,
43}
44
45impl<T> ClickHouseResponse<T> {
46    pub fn new(stream: Pin<Box<dyn Stream<Item = Result<T>> + Send + 'static>>) -> Self {
47        Self { stream }
48    }
49
50    pub fn from_stream<S>(stream: S) -> Self
51    where
52        S: Stream<Item = Result<T>> + Send + 'static,
53    {
54        Self::new(Box::pin(stream))
55    }
56}
57
58impl<T> Stream for ClickHouseResponse<T>
59where
60    T: Send + 'static,
61{
62    type Item = Result<T>;
63
64    fn poll_next(
65        self: Pin<&mut Self>,
66        cx: &mut std::task::Context<'_>,
67    ) -> std::task::Poll<Option<Self::Item>> {
68        self.project().stream.poll_next(cx)
69    }
70}