1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
use crate::{alias::Result, error::ProtonClientError, ProtonClient};
use clickhouse::query::RowCursor;
use clickhouse::Row;
use serde::Deserialize;
impl ProtonClient {
/// Executes the query, returning a [`RowCursor`] to obtain results.
///
/// # Example
///
/// ```no_run
/// use proton_client::ProtonClient;
/// use proton_client::prelude::Result;
///
/// async fn example() -> Result<()> {
///
/// #[derive(Debug, clickhouse::Row, serde::Deserialize)]
/// struct MyRow<'a> {
/// no: u32,
/// name: &'a str,
/// }
///
/// let client = ProtonClient::new("http://localhost:8123");
///
/// let mut cursor = client
/// .fetch::<MyRow<'_>>("SELECT ?fields from table(test_stream) WHERE no BETWEEN 500 AND 504")
/// .await
/// .expect("[main/fetch]: Failed to fetch data");
///
/// while let Some(MyRow { name, no }) = cursor.next().await.expect("[main/fetch]: Failed to fetch data") {
/// println!("{name}: {no}");
/// }
/// # Ok(()) }
/// ```
pub async fn fetch<T: Row>(&self, query: &str) -> Result<RowCursor<T>> {
match self.client.query(query).fetch::<T>() {
Ok(cursor) => Ok(cursor),
Err(e) => Err(ProtonClientError::FetchFailed(e.to_string())),
}
}
/// Executes the query and returns all the generated results, collected into a Vec.
///
/// Note that T must be owned.
///
/// # Errors
///
/// This function will return an error if:
///
/// - The API call fails
///
/// # Example
///
/// ```no_run
/// use proton_client::prelude::{ProtonClient, Result};
///
/// #[derive(clickhouse::Row, serde::Deserialize)]
/// struct MyRow{
/// no: u32,
/// name: String,
/// }
///
/// async fn example() -> Result<()> {
///
/// let client = ProtonClient::new("http://localhost:8123");
///
/// let query = "SELECT ?fields FROM test_stream WHERE no BETWEEN 0 AND 1";
/// let data = client.fetch_all::<MyRow>(query).await.unwrap();
///
/// println!("Received {} records", data.len());
///
/// Ok(())
/// }
/// ```
pub async fn fetch_all<T: Row>(&self, query: &str) -> Result<Vec<T>>
where
T: Row + for<'b> Deserialize<'b>,
{
match self.client.query(query).fetch_all::<T>().await {
Ok(cursor) => Ok(cursor),
Err(e) => Err(ProtonClientError::FetchAllFailed(e.to_string())),
}
}
/// Executes the query and returns just a single row.
///
/// Note that `T` must be owned.
///
/// # Errors
///
/// This function will return an error if:
///
/// - The API call fails
///
/// # Example
///
/// ```no_run
/// use proton_client::prelude::{ProtonClient, Result};
///
/// async fn example() -> Result<()> {
///
/// let client = ProtonClient::new("http://localhost:8123");
/// let query = "select count() from table(test_stream)";
/// let item = client.fetch_one::<u64>(query).await.unwrap();
///
/// println!("Single result: {:#?}", item);
///
/// Ok(())
/// }
/// ```
pub async fn fetch_one<T>(self, query: &str) -> Result<T>
where
T: Row + for<'b> Deserialize<'b>,
{
match self.client.query(query).fetch_one::<T>().await {
Ok(cursor) => Ok(cursor),
Err(e) => Err(ProtonClientError::FetchOneFailed(e.to_string())),
}
}
/// Executes the query and returns at most one row.
///
/// Note that `T` must be owned.
///
/// # Errors
///
/// This function will return an error if:
///
/// - The API call fails
///
/// # Example
///
/// ```no_run
/// use proton_client::prelude::{ProtonClient, Result};
///
/// #[derive(clickhouse::Row, serde::Deserialize, Debug)]
/// struct MyRow{
/// no: u32,
/// name: String,
/// }
///
/// async fn example() -> Result<()> {
///
/// let client = ProtonClient::new("http://localhost:8123");
/// let item_id = 42;
/// let query = "SELECT ?fields FROM test_stream WHERE no = 42";
/// let item = client.fetch_optional::<MyRow>(query).await.unwrap();
///
/// match item {
/// Some(item) => println!("Fetched: {:#?}", item),
/// None => println!("No item with id {}", item_id),
/// }
///
/// Ok(())
/// }
/// ```
pub async fn fetch_optional<T>(self, query: &str) -> Result<Option<T>>
where
T: Row + for<'b> Deserialize<'b>,
{
match self.client.query(query).fetch_optional::<T>().await {
Ok(cursor) => Ok(cursor),
Err(e) => Err(ProtonClientError::FetchOptionalFailed(e.to_string())),
}
}
}