Struct kdbplus::ipc::QStream

source ·
pub struct QStream { /* private fields */ }
Expand description

Stream to communicate with q/kdb+.

Implementations§

source§

impl QStream

source

pub async fn connect( method: ConnectionMethod, host: &str, port: u16, credential: &str, ) -> Result<Self>

Connect to q/kdb+ specifying a connection method, destination host, destination port and access credential.

§Parameters
  • method: Connection method. One of the following:
    • TCP
    • TLS
    • UDS
  • host: Hostname or IP address of the target q process. Empty str for Unix domain socket.
  • port: Port of the target q process.
  • credential: Credential in the form of username:password to connect to the target q process.
§Example
use kdbplus::qattribute;
use kdbplus::ipc::*;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<()> {
    let mut socket =
        QStream::connect(ConnectionMethod::UDS, "", 5000_u16, "ideal:person").await?;
    println!("Connection type: {}", socket.get_connection_type());

    // Set remote function with asynchronous message
    socket.send_async_message(&"collatz:{[n] seq:enlist n; while[not n = 1; seq,: n:$[n mod 2; 1 + 3 * n; `long$n % 2]]; seq}").await?;

    // Send a query synchronously
    let mut result = socket.send_sync_message(&"collatz[12]").await?;
    println!("collatz[12]: {}", result);

    // Send a functional query.
    let mut message = K::new_compound_list(vec![
        K::new_symbol(String::from("collatz")),
        K::new_long(100),
    ]);
    result = socket.send_sync_message(&message).await?;
    println!("collatz[100]: {}", result);

    // Send a functional asynchronous query.
    message = K::new_compound_list(vec![
        K::new_string(String::from("show"), qattribute::NONE),
        K::new_symbol(String::from("goodbye")),
    ]);
    socket.send_async_message(&message).await?;

    socket.shutdown().await?;

    Ok(())
}
source

pub async fn accept( method: ConnectionMethod, host: &str, port: u16, ) -> Result<Self>

Accept a connection and perform the handshake.

§Parameters
  • method: Connection method. One of the following:
    • TCP
    • TLS
    • UDS
  • host: Hostname or IP address of this listener. Empty str for Unix domain socket.
  • port: Listening port.
§Example
use kdbplus::ipc::*;
  
#[tokio::main]
async fn main() -> Result<()> {
    // Start listening over UDS at the port 7000 with authentication enabled.
    while let Ok(mut socket) = QStream::accept(ConnectionMethod::UDS, "", 7000).await {
        tokio::task::spawn(async move {
            loop {
                match socket.receive_message().await {
                    Ok((_, message)) => {
                        println!("request: {}", message);
                    }
                    _ => {
                        socket.shutdown().await.unwrap();
                        break;
                    }
                }
            }
        });
    }

    Ok(())
}

q processes can connect and send messages to this acceptor.

q)// Process1
q)h:hopen `:unix://7000:reluctant:slowday
q)neg[h] (`monalizza; 3.8)
q)neg[h] (`pizza; 125)
q)// Process2
q)h:hopen `:unix://7000:mattew:oracle
q)neg[h] (`teddy; "bear")
§Note
  • TLS acceptor sets .kdbplus.close_tls_connection_ on the q client via an asynchronous message. This function is necessary to close the socket from the server side without crashing the server-side application.
  • TLS acceptor and UDS acceptor use specific environmental variables to work. See the Environmental Variable section for details.
source

pub async fn shutdown(self) -> Result<()>

Shutdown the socket for a q process.

§Example

See the example of connect.

source

pub async fn send_message( &mut self, message: &dyn Query, message_type: u8, ) -> Result<()>

Send a message with a specified message type without waiting for a response, even for a synchronous message. To receive a response, use receive_message.

§Note

The usage of this function for a synchronous message is to handle an asynchronous message or a synchronous message sent by a remote function during its execution.

§Parameters
  • message: q command to execute on the remote q process.
    • &str: q command in a string form.
    • K: Query in a functional form.
  • message_type: Asynchronous or synchronous.
§Example

See the example of connect.

source

pub async fn send_async_message(&mut self, message: &dyn Query) -> Result<()>

Send a message asynchronously.

§Parameters
  • message: q command to execute on the remote q process.
    • &str: q command in a string form.
    • K: Query in a functional form.
§Example

See the example of connect.

source

pub async fn send_sync_message(&mut self, message: &dyn Query) -> Result<K>

Send a message synchronously.

§Note

The remote function must NOT send back a message of asynchronous or synchronous type during execution of the function.

§Parameters
  • message: q command to execute on the remote q process.
    • &str: q command in a string form.
    • K: Query in a functional form.
§Example

See the example of connect.

source

pub async fn receive_message(&mut self) -> Result<(u8, K)>

Receive a message from a remote q process. The received message is parsed as K and message type is stored in the first returned value.

§Example

See the example of accept.

source

pub fn get_connection_type(&self) -> &str

Return the underlying connection type. One of TCP, TLS or UDS.

§Example

See the example of connect.

source

pub fn enforce_compression(&mut self)

Enforce compression if the size of a message exceeds 2000 regardless of the locality of the connection. This flag is intentionally not revertible.

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more