Struct ksqldb::KsqlDB[][src]

pub struct KsqlDB { /* fields omitted */ }

A KSQL-DB Client, ready to make requests to the server

Implementations

impl KsqlDB[src]

pub async fn create(
    &self,
    statement: &str,
    stream_properties: &HashMap<String, String>,
    command_sequence_number: Option<u32>
) -> Result<Vec<CreateResponse>>
[src]

Runs a CREATE statement

Examples

use reqwest::Client;
use ksqldb::KsqlDB;

let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();

let query = r#"
CREATE STREAM MY_STREAM (
    id VARCHAR KEY
) WITH (
    kafka_topic = 'my_topic',
    partitions = 1,
    value_format = 'JSON'
);
"#;

let response = ksql.create(&query, &Default::default(), None).await;

pub async fn drop(
    &self,
    statement: &str,
    stream_properties: &HashMap<String, String>,
    command_sequence_number: Option<u32>
) -> Result<Vec<DropResponse>>
[src]

Runs a DROP statement

Examples

use reqwest::Client;
use ksqldb::KsqlDB;

let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();

let query = r#"DROP TABLE MY_TABLE;"#;

let response = ksql.drop(&query, &Default::default(), None).await;

pub async fn terminate(
    &self,
    statement: &str,
    stream_properties: &HashMap<String, String>,
    command_sequence_number: Option<u32>
) -> Result<Vec<TerminateResponse>>
[src]

Runs a TERMINATE statement

Examples

use reqwest::Client;
use ksqldb::KsqlDB;

let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();

let query = r#"TERMINATE my_query_id;"#;

let response = ksql.terminate(&query, &Default::default(), None).await;

pub async fn select<T>(
    &self,
    query: &str,
    stream_properties: &HashMap<String, String>
) -> Result<impl Stream<Item = Result<T>>> where
    T: DeserializeOwned
[src]

Runs a SELECT statement

Examples

use reqwest::Client;
use ksqldb::KsqlDB;
use futures_util::stream::StreamExt;

#[derive(Debug, Deserialize)]
struct MyResponse {
    id: String,
    data: Vec<u32>
}

let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();

let query = r#"SELECT * FROM MY_STREAM EMIT CHANGES;"#;

let mut stream = ksql.select::<MyResponse>(&query, &Default::default()).await.unwrap();

while let Some(data) = stream.next().await {
    println!("{:#?}", data);
}

pub async fn list_streams(
    &self,
    statement: &str,
    stream_properties: &HashMap<String, String>,
    command_sequence_number: Option<u32>
) -> Result<Vec<ListStreamsResponse>>
[src]

Runs a LIST STREAMS or SHOW STREAMS statement. They both have the same response structure so this method can be used to execute either.

Examples

use reqwest::Client;
use ksqldb::KsqlDB;

let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();

let query = r#"SHOW STREAMS;"#;

let response = ksql.list_streams(&query, &Default::default(), None).await;

pub async fn list_tables(
    &self,
    statement: &str,
    stream_properties: &HashMap<String, String>,
    command_sequence_number: Option<u32>
) -> Result<Vec<ListTablesResponse>>
[src]

Runs a LIST TABLES or SHOW TABLES statement. They both have the same response structure so this method can be used to execute either.

Examples

use reqwest::Client;
use ksqldb::KsqlDB;

let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();

let query = r#"SHOW TABLES EXTENDED;"#;

let response = ksql.list_tables(&query, &Default::default(), None).await;

pub async fn list_queries(
    &self,
    statement: &str,
    stream_properties: &HashMap<String, String>,
    command_sequence_number: Option<u32>
) -> Result<Vec<ListQueriesResponse>>
[src]

Runs a LIST QUERIES or SHOW QUERIES statement. They both have the same response structure so this method can be used to execute either.

Examples

use reqwest::Client;
use ksqldb::KsqlDB;

let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();

let query = r#"SHOW QUERIES;"#;

let response = ksql.list_queries(&query, &Default::default(), None).await;

pub async fn list_properties(
    &self,
    statement: &str,
    stream_properties: &HashMap<String, String>,
    command_sequence_number: Option<u32>
) -> Result<Vec<Properties>>
[src]

Runs a SHOW PROPERTIES statement.

Examples

use reqwest::Client;
use ksqldb::KsqlDB;

let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();

let query = r#"SHOW PROPERTIES;"#;

let response = ksql.list_properties(&query, &Default::default(), None).await;

pub async fn describe(
    &self,
    statement: &str,
    stream_properties: &HashMap<String, String>,
    command_sequence_number: Option<u32>
) -> Result<Vec<DescribeResponse>>
[src]

Runs a DESCRIBE (stream_name | table_name) statement.

Examples

use reqwest::Client;
use ksqldb::KsqlDB;

let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();

let query = r#"DESCRIBE EXTENDED MY_STREAM;"#;

let response = ksql.describe(&query, &Default::default(), None).await;

pub async fn explain(
    &self,
    statement: &str,
    stream_properties: &HashMap<String, String>,
    command_sequence_number: Option<u32>
) -> Result<Vec<ExplainResponse>>
[src]

Runs an EXPLAIN (sql_expression | query_id) statement.

Examples

use reqwest::Client;
use ksqldb::KsqlDB;

let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();

let query = r#"EXPLAIN my_query_id;"#;

let response = ksql.explain(&query, &Default::default(), None).await;

impl KsqlDB[src]

pub async fn status(&self)[src]

@TODO

pub async fn info(&self)[src]

@TODO

pub async fn execute_statement<T>(
    &self,
    statement: &str,
    stream_properties: &HashMap<String, String>,
    command_sequence_number: Option<u32>
) -> Result<Vec<T>> where
    T: DeserializeOwned
[src]

This is a lower level entry point to the /ksql endpoint.

This resource runs a sequence of 1 or more SQL statements. All statements, except those starting with SELECT, can be run.

To run SELECT statements, use the KsqlDB::query method.

This KSQL-DB endpoint has a variable response, generally depending on the sorts of statements you’re executing. It requires that you pass a type T to the function, dictating what you want to deserialize from the response. In the event that you’re sending multiple requests which all contain different response structures, it might be easier to specify the value as a serde_json::Value and handle the parsing in your application.

Examples

use reqwest::Client;
use serde::Deserialize;
use serde_json::Value;

use ksqldb::KsqlDB;

#[derive(Debug, Deserialize)]
#[serde(rename_all(serialize = "snake_case", deserialize = "camelCase"))]
struct StatementResponse {
    // You can handle reserved keywords by renaming
    // This maps the Rust key: `ident` -> JSON key: `@type`
    // You can also use this to rename fields to be more meaningful for you
    #[serde(rename = "@type")]
    ident: String,
    statement_text: String,
    // If you're not entirely sure about the type, you can leave it as JSON
    // Although you lose the benefits of Rust (and make it harder to extract in
    // the future if you do so)
    warnings: Vec<Value>,
    streams: Vec<StreamData>,
}

// In this case we're defining our own type, alternatively you can use the
// ones located in the `common::ksqldb::types` module.
// Feel free to add new ones as appropriate
#[derive(Debug, Deserialize)]
#[serde(rename_all(serialize = "snake_case", deserialize = "camelCase"))]
struct StreamData {
    #[serde(rename = "type")]
    data_type: String,
    name: String,
    topic: String,
    format: String,
}

#[tokio::main]
async fn main() {
    let ksql = KsqlDB::new("localhost:8088".to_string(), Client::builder(), false).unwrap();

    let query = "show streams;";
    let result = ksql
        .execute_statement::<StatementResponse>(&query, &Default::default(), None)
        .await;
}

API Docs

pub async fn execute_statement_raw(
    &self,
    statement: &str,
    stream_properties: &HashMap<String, String>,
    command_sequence_number: Option<u32>
) -> Result<Value>
[src]

This method just makes the request to the execute statement endpoint and returns the raw JSON response back to you, not doing any parsing or error checking.

It’s unlikely that you would want to use this in any of your application code, however it might be useful for development or debugging, which is why it will be left in.

The caveat to this is that by using this function the caller has to do all parsing (Ok or Err) themselves.

impl KsqlDB[src]

pub fn new(
    url: String,
    builder: ClientBuilder,
    https_only: bool
) -> Result<Self>
[src]

Initialises the KSQL DB Client with the provided reqwest::ClientBuilder.

Any authentication or common headers should be attached to the client prior to calling this method.

pub async fn query<T>(
    &self,
    statement: &str,
    properties: &HashMap<String, String>
) -> Result<impl Stream<Item = Result<T>>> where
    T: DeserializeOwned
[src]

This method lets you stream the output records of a SELECT statement via HTTP/2 streams. The response is streamed back until the LIMIT specified in the statement is reached, or the client closes the connection.

If no LIMIT is specified in the statement, then the response is streamed until the client closes the connection.

This method requires the http2 feature be enabled.

This crate also offers an HTTP/1-compatible approach to streaming results via Transfer-Encoding: chunked. To enable this, turn off default features and enable the http1 feature.

Notes

  • The T provided must be able to directly Deserialize the response; it will error if any mandatory fields are missing
  • In the example below, if you were to change the query to be SELECT ID FROM EVENT_REPLAY_STREAM EMIT CHANGES, the query would error, because all of the other fields within the struct are mandatory fields.

Example

use futures_util::StreamExt;
use reqwest::Client;
use serde::Deserialize;

use ksqldb::KsqlDB;

#[derive(Debug, Deserialize)]
struct Response {
    id: String,
    is_keyframe: bool,
    sequence_number: u32,
    events_since_keyframe: u32,
    event_data: String,
}

#[tokio::main]
async fn main() {
    let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();
    let query = "SELECT * FROM EVENT_REPLAY_STREAM EMIT CHANGES;";

    let mut stream = ksql
        .query::<Response>(&query, &Default::default())
        .await
        .unwrap();
    while let Some(r) = stream.next().await {
        match r {
            Ok(data) => {
                println!("{:#?}", data);
            }
            Err(e) => {
                eprintln!("Found Error {}", e);
            }
        }
    }
}

API Docs

Auto Trait Implementations

impl !RefUnwindSafe for KsqlDB

impl Send for KsqlDB

impl Sync for KsqlDB

impl Unpin for KsqlDB

impl !UnwindSafe for KsqlDB

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized
[src]

impl<T> Borrow<T> for T where
    T: ?Sized
[src]

impl<T> BorrowMut<T> for T where
    T: ?Sized
[src]

impl<T> From<T> for T[src]

impl<T> Instrument for T[src]

impl<T> Instrument for T[src]

impl<T, U> Into<U> for T where
    U: From<T>, 
[src]

impl<T, U> TryFrom<U> for T where
    U: Into<T>, 
[src]

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>, 
[src]

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.