pub struct QStream { /* private fields */ }
Expand description
Stream to communicate with q/kdb+.
Implementations§
source§impl QStream
impl QStream
sourcepub async fn connect(
method: ConnectionMethod,
host: &str,
port: u16,
credential: &str,
) -> Result<Self>
pub async fn connect( method: ConnectionMethod, host: &str, port: u16, credential: &str, ) -> Result<Self>
Connect to q/kdb+ specifying a connection method, destination host, destination port and access credential.
§Parameters
method
: Connection method. One of followings:- TCP
- TLS
- UDS
host
: Hostname or IP address of the target q process. Emptystr
for Unix domain socket.port
: Port of the target q process.credential
: Credential in the form ofusername:password
to connect to the target q process.
§Example
use kdbplus::qattribute;
use kdbplus::ipc::*;
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<()> {
let mut socket =
QStream::connect(ConnectionMethod::UDS, "", 5000_u16, "ideal:person").await?;
println!("Connection type: {}", socket.get_connection_type());
// Set remote function with asynchronous message
socket.send_async_message(&"collatz:{[n] seq:enlist n; while[not n = 1; seq,: n:$[n mod 2; 1 + 3 * n; `long$n % 2]]; seq}").await?;
// Send a query synchronously
let mut result = socket.send_sync_message(&"collatz[12]").await?;
println!("collatz[12]: {}", result);
// Send a functional query.
let mut message = K::new_compound_list(vec![
K::new_symbol(String::from("collatz")),
K::new_long(100),
]);
result = socket.send_sync_message(&message).await?;
println!("collatz[100]: {}", result);
// Send a functional asynchronous query.
message = K::new_compound_list(vec![
K::new_string(String::from("show"), qattribute::NONE),
K::new_symbol(String::from("goodbye")),
]);
socket.send_async_message(&message).await?;
socket.shutdown().await?;
Ok(())
}
sourcepub async fn accept(
method: ConnectionMethod,
host: &str,
port: u16,
) -> Result<Self>
pub async fn accept( method: ConnectionMethod, host: &str, port: u16, ) -> Result<Self>
Accept connection and does handshake.
§Parameters
method
: Connection method. One of followings:- TCP
- TLS
- UDS
- host: Hostname or IP address of this listener. Empty
str
for Unix domain socket. - port: Listening port.
§Example
use kdbplus::ipc::*;
#[tokio::main]
async fn main() -> Result<()> {
// Start listenening over UDS at the port 7000 with authentication enabled.
while let Ok(mut socket) = QStream::accept(ConnectionMethod::UDS, "", 7000).await {
tokio::task::spawn(async move {
loop {
match socket.receive_message().await {
Ok((_, message)) => {
println!("request: {}", message);
}
_ => {
socket.shutdown().await.unwrap();
break;
}
}
}
});
}
Ok(())
}
q processes can connect and send messages to this acceptor.
q)// Process1
q)h:hopen `:unix://7000:reluctant:slowday
q)neg[h] (`monalizza; 3.8)
q)neg[h] (`pizza; 125)
q)// Process2
q)h:hopen `:unix://7000:mattew:oracle
q)neg[h] (`teddy; "bear")
§Note
- TLS acceptor sets
.kdbplus.close_tls_connection_
on q clien via an asynchronous message. This function is necessary to close the socket from the server side without crashing server side application. - TLS acceptor and UDS acceptor use specific environmental variables to work. See the Environmental Variable section for details.
sourcepub async fn send_message(
&mut self,
message: &dyn Query,
message_type: u8,
) -> Result<()>
pub async fn send_message( &mut self, message: &dyn Query, message_type: u8, ) -> Result<()>
Send a message with a specified message type without waiting for a response even for a synchronous message.
If you need to receive a response you need to use receive_message
.
§Note
The usage of this function for a synchronous message is to handle an asynchronous message or a synchronous message sent by a remote function during its execution.
§Parameters
message
: q command to execute on the remote q process.&str
: q command in a string form.K
: Query in a functional form.
message_type
: Asynchronous or synchronous.
§Example
See the example of connect
.
sourcepub async fn send_async_message(&mut self, message: &dyn Query) -> Result<()>
pub async fn send_async_message(&mut self, message: &dyn Query) -> Result<()>
sourcepub async fn send_sync_message(&mut self, message: &dyn Query) -> Result<K>
pub async fn send_sync_message(&mut self, message: &dyn Query) -> Result<K>
Send a message synchronously.
§Note
Remote function must NOT send back a message of asynchronous or synchronous type durning execution of the function.
§Parameters
message
: q command to execute on the remote q process.&str
: q command in a string form.K
: Query in a functional form.
§Example
See the example of connect
.
sourcepub async fn receive_message(&mut self) -> Result<(u8, K)>
pub async fn receive_message(&mut self) -> Result<(u8, K)>
sourcepub fn get_connection_type(&self) -> &str
pub fn get_connection_type(&self) -> &str
sourcepub fn enforce_compression(&mut self)
pub fn enforce_compression(&mut self)
Enforce compression if the size of a message exceeds 2000 regardless of locality of the connection. This flag is not revertible intentionally.