clickhouse_arrow/client/
response.rs1use std::pin::Pin;
2
3use futures_util::stream::StreamExt;
4use futures_util::{Stream, TryStreamExt};
5use tokio::sync::mpsc;
6use tokio_stream::wrappers::ReceiverStream;
7use tracing::{error, trace};
8
9use super::ClientFormat;
10use crate::prelude::{ATT_CID, ATT_QID};
11use crate::{Qid, Result};
12
13pub(crate) fn create_response_stream<T: ClientFormat>(
14 rx: mpsc::Receiver<Result<T::Data>>,
15 qid: Qid,
16 cid: u16,
17) -> impl Stream<Item = Result<T::Data>> + 'static {
18 ReceiverStream::new(rx)
19 .inspect_ok(move |_| trace!({ ATT_CID } = cid, { ATT_QID } = %qid, "response"))
20 .inspect_err(move |error| error!(?error, { ATT_CID } = cid, { ATT_QID } = %qid, "response"))
21}
22
23pub(crate) fn handle_insert_response<T: ClientFormat>(
24 rx: mpsc::Receiver<Result<T::Data>>,
25 qid: Qid,
26 cid: u16,
27) -> impl Stream<Item = Result<()>> + 'static {
28 ReceiverStream::new(rx)
29 .inspect_ok(move |_| trace!({ ATT_CID } = cid, { ATT_QID } = %qid, "response"))
30 .inspect_err(move |error| error!(?error, { ATT_CID } = cid, { ATT_QID } = %qid, "response"))
31 .filter_map(move |response| async move {
32 match response {
33 Ok(_) => None,
34 Err(e) => Some(Err(e)),
35 }
36 })
37}
38
39#[pin_project::pin_project]
40pub struct ClickHouseResponse<T> {
41 #[pin]
42 stream: Pin<Box<dyn Stream<Item = Result<T>> + Send + 'static>>,
43}
44
45impl<T> ClickHouseResponse<T> {
46 pub fn new(stream: Pin<Box<dyn Stream<Item = Result<T>> + Send + 'static>>) -> Self {
47 Self { stream }
48 }
49
50 pub fn from_stream<S>(stream: S) -> Self
51 where
52 S: Stream<Item = Result<T>> + Send + 'static,
53 {
54 Self::new(Box::pin(stream))
55 }
56}
57
58impl<T> Stream for ClickHouseResponse<T>
59where
60 T: Send + 'static,
61{
62 type Item = Result<T>;
63
64 fn poll_next(
65 self: Pin<&mut Self>,
66 cx: &mut std::task::Context<'_>,
67 ) -> std::task::Poll<Option<Self::Item>> {
68 self.project().stream.poll_next(cx)
69 }
70}