1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
use crate::prelude::{ProtonClient, ProtonClientError, Result};
use clickhouse::query::RowCursor;
use clickhouse::Row;
use serde::Deserialize;

impl ProtonClient {
    /// Executes a streaming query, returning a [`RowCursor`] to obtain results
    /// as they become available from the stream.
    ///
    /// The key difference compared to `fetch` is that, for a streaming query,
    /// the returned result is an unbounded stream. A `fetch_stream` query will
    /// keep running continuously, returning fresh data until the application
    /// terminates.
    ///
    /// # Errors
    ///
    /// Returns [`ProtonClientError::FetchFailed`] if the underlying client
    /// fails to prepare or start the streaming query.
    ///
    /// # Example
    ///
    /// ```no_run
    ///  use proton_client::ProtonClient;
    ///  use proton_client::prelude::Result;
    ///
    ///  async fn example() -> Result<()> {
    ///
    /// #[derive(Debug, clickhouse::Row, serde::Deserialize)]
    /// struct MyRow {
    ///     no: u32,
    ///     name: String,
    /// }
    ///
    /// let client = ProtonClient::new("http://localhost:3218");
    ///
    ///  let mut cursor = client
    ///     .fetch_stream::<MyRow>("SELECT ?fields from (test_stream) WHERE no BETWEEN 500 AND 504")
    ///     .await
    ///     .expect("[main/fetch]: Failed to fetch stream data");
    ///
    /// while let Some(MyRow { name, no }) = cursor.next().await.expect("[main/fetch]: Failed to fetch data") {
    ///     println!("{name}: {no}");
    /// }
    /// # Ok(()) }
    /// ```
    pub async fn fetch_stream<T>(&self, query: &str) -> Result<RowCursor<T>>
    where
        T: Row + for<'b> Deserialize<'b>,
    {
        // Deliberately use the client without compression: compression breaks
        // streaming reads. For details, see:
        // https://github.com/timeplus-io/proton-rust-client/issues/6
        self.client_without_compression
            .query(query)
            .fetch::<T>()
            .map_err(|e| ProtonClientError::FetchFailed(e.to_string()))
    }
}