Struct ksqldb::KsqlDB [−][src]
A KSQL-DB Client, ready to make requests to the server
Implementations
impl KsqlDB
[src]
pub async fn create(
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<CreateResponse>>
[src]
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<CreateResponse>>
Runs a CREATE
statement
Examples
use reqwest::Client; use ksqldb::KsqlDB; let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap(); let query = r#" CREATE STREAM MY_STREAM ( id VARCHAR KEY ) WITH ( kafka_topic = 'my_topic', partitions = 1, value_format = 'JSON' ); "#; let response = ksql.create(&query, &Default::default(), None).await;
pub async fn drop(
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<DropResponse>>
[src]
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<DropResponse>>
Runs a DROP
statement
Examples
use reqwest::Client; use ksqldb::KsqlDB; let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap(); let query = r#"DROP TABLE MY_TABLE;"#; let response = ksql.drop(&query, &Default::default(), None).await;
pub async fn terminate(
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<TerminateResponse>>
[src]
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<TerminateResponse>>
Runs a TERMINATE
statement
Examples
use reqwest::Client; use ksqldb::KsqlDB; let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap(); let query = r#"TERMINATE my_query_id;"#; let response = ksql.terminate(&query, &Default::default(), None).await;
pub async fn select<T>(
&self,
query: &str,
stream_properties: &HashMap<String, String>
) -> Result<impl Stream<Item = Result<T>>> where
T: DeserializeOwned,
[src]
&self,
query: &str,
stream_properties: &HashMap<String, String>
) -> Result<impl Stream<Item = Result<T>>> where
T: DeserializeOwned,
Runs a SELECT
statement
Examples
use reqwest::Client; use ksqldb::KsqlDB; use futures_util::stream::StreamExt; #[derive(Debug, Deserialize)] struct MyResponse { id: String, data: Vec<u32> } let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap(); let query = r#"SELECT * FROM MY_STREAM EMIT CHANGES;"#; let mut stream = ksql.select::<MyResponse>(&query, &Default::default()).await.unwrap(); while let Some(data) = stream.next().await { println!("{:#?}", data); }
pub async fn list_streams(
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<ListStreamsResponse>>
[src]
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<ListStreamsResponse>>
Runs a LIST STREAMS
or SHOW STREAMS
statement. They both have the same
response structure so this method can be used to execute either.
Examples
use reqwest::Client; use ksqldb::KsqlDB; let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap(); let query = r#"SHOW STREAMS;"#; let response = ksql.list_streams(&query, &Default::default(), None).await;
pub async fn list_tables(
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<ListTablesResponse>>
[src]
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<ListTablesResponse>>
Runs a LIST TABLES
or SHOW TABLES
statement. They both have the same
response structure so this method can be used to execute either.
Examples
use reqwest::Client; use ksqldb::KsqlDB; let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap(); let query = r#"SHOW TABLES EXTENDED;"#; let response = ksql.list_tables(&query, &Default::default(), None).await;
pub async fn list_queries(
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<ListQueriesResponse>>
[src]
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<ListQueriesResponse>>
Runs a LIST QUERIES
or SHOW QUERIES
statement. They both have the same
response structure so this method can be used to execute either.
Examples
use reqwest::Client; use ksqldb::KsqlDB; let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap(); let query = r#"SHOW QUERIES;"#; let response = ksql.list_queries(&query, &Default::default(), None).await;
pub async fn list_properties(
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<Properties>>
[src]
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<Properties>>
Runs a SHOW PROPERTIES
statement.
Examples
use reqwest::Client; use ksqldb::KsqlDB; let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap(); let query = r#"SHOW PROPERTIES;"#; let response = ksql.list_properties(&query, &Default::default(), None).await;
pub async fn describe(
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<DescribeResponse>>
[src]
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<DescribeResponse>>
Runs a DESCRIBE (stream_name | table_name)
statement.
Examples
use reqwest::Client; use ksqldb::KsqlDB; let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap(); let query = r#"DESCRIBE EXTENDED MY_STREAM;"#; let response = ksql.describe(&query, &Default::default(), None).await;
pub async fn explain(
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<ExplainResponse>>
[src]
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<ExplainResponse>>
Runs a EXPLAIN (sql_expression | query_id)
statement.
Examples
use reqwest::Client; use ksqldb::KsqlDB; let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap(); let query = r#"EXPLAIN my_query_id;"#; let response = ksql.explain(&query, &Default::default(), None).await;
impl KsqlDB
[src]
pub async fn status(&self)
[src]
@TODO
pub async fn info(&self)
[src]
@TODO
pub async fn execute_statement<T>(
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<T>> where
T: DeserializeOwned,
[src]
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Vec<T>> where
T: DeserializeOwned,
This is a lower level entry point to the /ksql
endpoint.
This resource runs a sequence of 1 or more SQL
statements. All statements, except those starting with SELECT
can be run.
To run SELECT
statements use the KsqlDB::query
method.
This KSQL-DB endpoint has a variable response, generally depending on the sorts
of statements you’re executing. It requires that you pass a type T
to the function dicatating
what you want to deserialize from the response. In the event that you’re sending multiple
requests which all contain different response structures it might be easier to
specifiy the value as a serde_json::Value
and handle the parsing in your application.
Examples
use reqwest::Client; use serde::Deserialize; use serde_json::Value; use ksqldb::KsqlDB; #[derive(Debug, Deserialize)] #[serde(rename_all(serialize = "snake_case", deserialize = "camelCase"))] struct StatementResponse { // You can handle reserved keywords by renaming // This maps the Rust key: `ident` -> JSON key: `@type` // You can also use this to rename fields to be more meaningful for you #[serde(rename = "@type")] ident: String, statement_text: String, // If you're not entirely sure about the type, you can leave it as JSON // Although you lose the benefits of Rust (and make it harder to extract in // the future if you do so) warnings: Vec<Value>, streams: Vec<StreamData>, } // In this case we're defining our own type, alternatively you can use the // ones located in the `common::ksqldb::types` module. // Feel free to add new ones as appropriate #[derive(Debug, Deserialize)] #[serde(rename_all(serialize = "snake_case", deserialize = "camelCase"))] struct StreamData { #[serde(rename = "type")] data_type: String, name: String, topic: String, format: String, } #[tokio::main] async fn main() { let ksql = KsqlDB::new("localhost:8088".to_string(), Client::builder(), false).unwrap(); let query = "show streams;"; let result = ksql .execute_statement::<StatementResponse>(&query, &Default::default(), None) .await; }
pub async fn execute_statement_raw(
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Value>
[src]
&self,
statement: &str,
stream_properties: &HashMap<String, String>,
command_sequence_number: Option<u32>
) -> Result<Value>
This method just makes the request to the execute statement endpoint and returns
the raw JSON
response back to you, not doing any parsing or error checking.
It’s unlikely that you would want to use this in any of your application code, however it might be useful for development or debugging, which is why it will be left in.
The caveat to this is that by using this function the caller has to do all parsing
(Ok
or Err
) themselves.
impl KsqlDB
[src]
pub fn new(
url: String,
builder: ClientBuilder,
https_only: bool
) -> Result<Self>
[src]
url: String,
builder: ClientBuilder,
https_only: bool
) -> Result<Self>
Initialises the KSQL DB Client with the provided request::ClientBuilder
.
Any authentication or common headers should be attached to the client prior to calling this method.
pub async fn query<T>(
&self,
statement: &str,
properties: &HashMap<String, String>
) -> Result<impl Stream<Item = Result<T>>> where
T: DeserializeOwned,
[src]
&self,
statement: &str,
properties: &HashMap<String, String>
) -> Result<impl Stream<Item = Result<T>>> where
T: DeserializeOwned,
This method lets you stream the output records of a SELECT
statement
via HTTP/2 streams. The response is streamed back until the
LIMIT
specified in the statement is reached, or the client closes the connection.
If no LIMIT
is specified in the statement, then the response is streamed until the client closes the connection.
This method requires the http2
feature be enabled.
This crate also offers a HTTP/1 compatible approach to streaming results via
Transfer-Encoding: chunked
. To enable this turn off default features and enable the
http1
feature.
Notes
- The
T
provided, must be able to directlyserde::Deserialize
the response, it will error if there are missing mandatory fields - In the example below, if you were to change the query to be
SELECT ID FROM EVENT_REPLAY_STREAM EMIT CHANGES
, the query would error, because all of the other fields within the struct are mandatory fields.
Example
use futures_util::StreamExt; use reqwest::Client; use serde::Deserialize; use ksqldb::KsqlDB; #[derive(Debug, Deserialize)] struct Response { id: String, is_keyframe: bool, sequence_number: u32, events_since_keyframe: u32, event_data: String, } #[tokio::main] async fn main() { let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap(); let query = "SELECT * FROM EVENT_REPLAY_STREAM EMIT CHANGES;"; let mut stream = ksql .query::<Response>(&query, &Default::default()) .await .unwrap(); while let Some(r) = stream.next().await { match r { Ok(data) => { println!("{:#?}", data); } Err(e) => { eprintln!("Found Error {}", e); } } } }
Auto Trait Implementations
impl !RefUnwindSafe for KsqlDB
impl Send for KsqlDB
impl Sync for KsqlDB
impl Unpin for KsqlDB
impl !UnwindSafe for KsqlDB
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized,
[src]
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized,
[src]
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
[src]
T: ?Sized,
pub fn borrow_mut(&mut self) -> &mut T
[src]
impl<T> From<T> for T
[src]
impl<T> Instrument for T
[src]
pub fn instrument(self, span: Span) -> Instrumented<Self>
[src]
pub fn in_current_span(self) -> Instrumented<Self>
[src]
impl<T> Instrument for T
[src]
pub fn instrument(self, span: Span) -> Instrumented<Self>
[src]
pub fn in_current_span(self) -> Instrumented<Self>
[src]
impl<T, U> Into<U> for T where
U: From<T>,
[src]
U: From<T>,
impl<T, U> TryFrom<U> for T where
U: Into<T>,
[src]
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
pub fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
[src]
impl<T, U> TryInto<U> for T where
U: TryFrom<T>,
[src]
U: TryFrom<T>,