QStream

Struct QStream 

Source
pub struct QStream { /* private fields */ }
Expand description

Stream to communicate with q/kdb+.

Implementations§

Source§

impl QStream

Source

pub async fn connect( method: ConnectionMethod, host: &str, port: u16, credential: &str, ) -> Result<Self>

Connect to q/kdb+ specifying a connection method, destination host, destination port and access credential.

§Parameters
  • method: Connection method. One of the following:
    • TCP
    • TLS
    • UDS
  • host: Hostname or IP address of the target q process. Empty str for Unix domain socket.
  • port: Port of the target q process.
  • credential: Credential in the form of username:password to connect to the target q process.
§Example
use kdb_codec::*;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<()> {
    let mut socket =
        QStream::connect(ConnectionMethod::UDS, "", 5000_u16, "ideal:person").await?;
    println!("Connection type: {}", socket.get_connection_type());

    // Set remote function with asynchronous message
    socket.send_async_message(&"collatz:{[n] seq:enlist n; while[not n = 1; seq,: n:$[n mod 2; 1 + 3 * n; `long$n % 2]]; seq}").await?;

    // Send a query synchronously
    let mut result = socket.send_sync_message(&"collatz[12]").await?;
    println!("collatz[12]: {}", result);

    // Send a functional query.
    let mut message = K::new_compound_list(vec![
        K::new_symbol(String::from("collatz")),
        K::new_long(100),
    ]);
    result = socket.send_sync_message(&message).await?;
    println!("collatz[100]: {}", result);

    // Send a functional asynchronous query.
    message = K::new_compound_list(vec![
        K::new_string(String::from("show"), qattribute::NONE),
        K::new_symbol(String::from("goodbye")),
    ]);
    socket.send_async_message(&message).await?;

    socket.shutdown().await?;

    Ok(())
}
Source

pub async fn connect_with_options( method: ConnectionMethod, host: &str, port: u16, credential: &str, compression_mode: CompressionMode, validation_mode: ValidationMode, ) -> Result<Self>

Connect to q/kdb+ with explicit compression and validation options

§Parameters
  • method: Connection method (TCP, TLS, or UDS)
  • host: Hostname or IP address of the target q process. Empty str for Unix domain socket.
  • port: Port of the target q process.
  • credential: Credential in the form of username:password to connect to the target q process.
  • compression_mode: How to handle message compression
  • validation_mode: How strictly to validate incoming messages
§Example
use kdb_codec::*;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<()> {
    // Connect with always compress and lenient validation
    let mut socket = QStream::connect_with_options(
        ConnectionMethod::TCP,
        "localhost",
        5000,
        "user:pass",
        CompressionMode::Always,
        ValidationMode::Lenient
    ).await?;

    let result = socket.send_sync_message(&"2+2").await?;
    println!("Result: {}", result.get_int()?);

    socket.shutdown().await?;
    Ok(())
}
Source

pub async fn accept( method: ConnectionMethod, host: &str, port: u16, ) -> Result<Self>

Accept a connection and perform the handshake.

§Parameters
  • method: Connection method. One of the following:
    • TCP
    • TLS
    • UDS
  • host: Hostname or IP address of this listener. Empty str for Unix domain socket.
  • port: Listening port.
§Example
use kdb_codec::*;
  
#[tokio::main]
async fn main() -> Result<()> {
    // Start listening over UDS on port 7000 with authentication enabled.
    while let Ok(mut socket) = QStream::accept(ConnectionMethod::UDS, "", 7000).await {
        tokio::task::spawn(async move {
            loop {
                match socket.receive_message().await {
                    Ok((_, message)) => {
                        println!("request: {}", message);
                    }
                    _ => {
                        socket.shutdown().await.unwrap();
                        break;
                    }
                }
            }
        });
    }

    Ok(())
}

q processes can connect and send messages to this acceptor.

q)// Process1
q)h:hopen `:unix://7000:reluctant:slowday
q)neg[h] (`monalizza; 3.8)
q)neg[h] (`pizza; 125)
q)// Process2
q)h:hopen `:unix://7000:mattew:oracle
q)neg[h] (`teddy; "bear")
§Note
  • The TLS acceptor sets .kdbplus.close_tls_connection_ on the q client via an asynchronous message. This function is necessary to close the socket from the server side without crashing the server-side application.
  • The TLS acceptor and the UDS acceptor rely on specific environment variables to work. See the Environmental Variable section for details.
Source

pub async fn accept_with_options( method: ConnectionMethod, host: &str, port: u16, compression_mode: CompressionMode, validation_mode: ValidationMode, ) -> Result<Self>

Accept connection with explicit compression and validation options

§Parameters
  • method: Connection method (TCP, TLS, or UDS)
  • host: Hostname or IP address of this listener. Empty str for Unix domain socket.
  • port: Listening port.
  • compression_mode: How to handle message compression
  • validation_mode: How strictly to validate incoming messages
§Example
use kdb_codec::*;
  
#[tokio::main]
async fn main() -> Result<()> {
    // Start listening with never compress and lenient validation
    let mut socket = QStream::accept_with_options(
        ConnectionMethod::TCP,
        "127.0.0.1",
        7000,
        CompressionMode::Never,
        ValidationMode::Lenient
    ).await?;
     
    let greeting = socket.send_sync_message(&"string `Hello").await?;
    println!("Greeting: {}", greeting);
     
    socket.shutdown().await?;
    Ok(())
}
Source

pub async fn shutdown(self) -> Result<()>

Shutdown the socket for a q process.

§Example

See the example of connect.

Source

pub async fn send_message( &mut self, message: &dyn Query, message_type: u8, ) -> Result<()>

Send a message with the specified message type without waiting for a response, even for a synchronous message. If you need to receive a response, use receive_message.

§Note

Using this function for a synchronous message makes it possible to handle an asynchronous or synchronous message sent by the remote function during its execution.

§Parameters
  • message: q command to execute on the remote q process.
    • &str: q command in a string form.
    • K: Query in a functional form.
  • message_type: Asynchronous or synchronous.
§Example

See the example of connect.

Source

pub async fn send_async_message(&mut self, message: &dyn Query) -> Result<()>

Send a message asynchronously.

§Parameters
  • message: q command to execute on the remote q process.
    • &str: q command in a string form.
    • K: Query in a functional form.
§Example

See the example of connect.

Source

pub async fn send_sync_message(&mut self, message: &dyn Query) -> Result<K>

Send a message synchronously.

§Note

The remote function must NOT send back an asynchronous or synchronous message during its execution.

§Parameters
  • message: q command to execute on the remote q process.
    • &str: q command in a string form.
    • K: Query in a functional form.
§Example

See the example of connect.

Source

pub async fn receive_message(&mut self) -> Result<(u8, K)>

Receive a message from a remote q process. The received message is parsed as K, and the message type is returned as the first element of the tuple.

§Example

See the example of accept.

Source

pub fn get_connection_type(&self) -> &str

Return underlying connection type. One of TCP, TLS or UDS.

§Example

See the example of connect.

Source

pub fn builder() -> QStreamBuilder

Create a builder for connecting to q/kdb+ with a fluent API.

§Example
use kdb_codec::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Using builder pattern
    let mut stream = QStream::builder()
        .method(ConnectionMethod::TCP)
        .host("localhost")
        .port(5000)
        .credential("user:pass")
        .compression_mode(CompressionMode::Always)
        .validation_mode(ValidationMode::Lenient)
        .connect()
        .await?;
     
    let result = stream.send_sync_message(&"2+2").await?;
    println!("Result: {}", result.get_int()?);
     
    stream.shutdown().await?;
    Ok(())
}

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more