kdb_codec 0.1.0

Kdb+ IPC codec library for handling kdb+ wire protocol data with Rust.
Documentation

kdb_codec - Kdb+ IPC Codec Library

Tests

A Rust library focused on handling the kdb+ IPC (Inter-Process Communication) wire protocol. This library provides efficient encoding, decoding, and communication with q/kdb+ processes using idiomatic Rust patterns.

Inspired by the original kdbplus crate, this library addresses critical cancellation safety issues while maintaining full compatibility with the kdb+ IPC protocol.

Why This Library?

The original kdbplus crate had a fundamental cancellation safety issue in its receive_message() implementation. When used with tokio::select! or other cancellation-aware patterns, partial reads could cause message corruption:

// ⚠️ UNSAFE - could lose data on cancellation in original kdbplus
select! {
    msg = socket.receive_message() => { /* ... */ }
    _ = timeout => { /* partial read gets lost */ }
}

Our Solution: This library uses tokio-util::codec::Framed with a custom KdbCodec, ensuring true cancellation safety:

// ✅ SAFE - Framed maintains buffer state across cancellations
let mut framed = Framed::new(stream, KdbCodec::new(true));
select! {
    msg = framed.next() => { /* buffer state preserved */ }
    _ = timeout => { /* can safely retry */ }
}

The Framed pattern maintains internal buffer state, so cancelled reads never lose data. All partial reads are preserved in the codec's buffer and properly reassembled on the next attempt.

Features

  • Cancellation Safe: Built on tokio-util::codec::Framed for true cancellation safety
  • Tokio Codec Pattern: Modern async/await interface with proper buffer management
  • QStream Client: High-level async client for q/kdb+ communication
  • Full Compression Support: Compatible with kdb+ -18! (compress) and -19! (decompress)
  • Multiple Connection Methods: TCP, TLS, and Unix Domain Socket support
  • Type-Safe: Strong typing for all kdb+ data types
  • Minimal Dependencies: No async-recursion or unnecessary proc-macros
  • Zero-Copy Operations: Efficient message handling with minimal allocations

Rust IPC Interface for q/kdb+

This library provides a Rust client for communicating with q/kdb+ processes. Queries to kdb+ are supported in two ways:

  • Text queries: Send q code as strings
  • Functional queries: Represented as compound lists (IPC details)

Compression/decompression of messages is fully implemented following the kdb+ specification.

Codec Pattern

The library provides a tokio codec implementation for kdb+ IPC communication, offering a cleaner and more idiomatic Rust interface. The codec pattern leverages tokio-util::codec traits for efficient message framing and streaming with guaranteed cancellation safety.

Key Features:

  • Cancellation safe - buffer state preserved across cancellations
  • ✅ Full compression/decompression support compatible with kdb+ (-18!/-19!)
  • ✅ Automatic message framing and buffering
  • ✅ Zero-copy operations where possible
  • ✅ Type-safe encoder/decoder traits
  • ✅ No async-recursion dependency (uses synchronous deserialization)

See CODEC_PATTERN.md for detailed documentation.

Quick Example:

use kdb_codec::*;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::{SinkExt, StreamExt};

#[tokio::main]
async fn main() -> Result<()> {
    let stream = TcpStream::connect("127.0.0.1:5000").await?;
    let mut framed = Framed::new(stream, KdbCodec::new(true));
    
    // Send query - cancellation safe!
    let query = K::new_string("1+1".to_string(), 0);
    let msg = KdbMessage::new(qmsg_type::synchronous, query);
    framed.send(msg).await?;
    
    // Receive response - even if cancelled, buffer state is preserved
    if let Some(Ok(response)) = framed.next().await {
        println!("Result: {}", response.payload);
    }
    Ok(())
}

QStream - High-Level Client

For a more convenient API, use QStream which wraps the codec:

use kdb_codec::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut stream = QStream::connect(
        ConnectionMethod::TCP, 
        "localhost", 
        5000, 
        "user:pass"
    ).await?;
    
    // All operations are cancellation safe
    let result = stream.send_sync_message(&"2+2").await?;
    println!("Result: {}", result.get_int()?);
    
    Ok(())
}

Tip: For advanced use cases requiring separate send/receive channels, you can split the underlying Framed stream:

let stream = TcpStream::connect("127.0.0.1:5000").await?;
let framed = Framed::new(stream, KdbCodec::new(true));
let (mut writer, mut reader) = framed.split();

// Use writer and reader independently
tokio::spawn(async move {
    while let Some(Ok(msg)) = reader.next().await {
        println!("Received: {:?}", msg);
    }
});

writer.send(msg).await?;

Connection Methods

As for connect method, usually client interfaces of q/kdb+ do not provide a listener due to its protocol. However, sometimes Rust process is connecting to an upstream and q/kdb+ starts afterward or is restarted more frequently. Then providing a listener method is a natural direction and it was achieved here. Following ways are supported to connect to kdb+:

  • TCP
  • TLS
  • Unix domain socket

Furthermore, in order to improve inter-operatability some casting, getter and setter methods are provided.

Environmental Variables

This crate uses q-native or crate-specific environmental variables.

  • KDBPLUS_ACCOUNT_FILE: A file path to a credential file which an acceptor loads in order to manage access from a q client. This file contains a user name and SHA-1 hashed password in each line which are delimited by ':' without any space. For example, a file containing two credentials "mattew:oracle" and "reluctant:slowday" looks like this:

    mattew:431364b6450fc47ccdbf6a2205dfdb1baeb79412
    reluctant:d03f5cc1cdb11a77410ee34e26ca1102e67a893c
    

    The hashed password can be generated with q using a function .Q.sha1:

    q).Q.sha1 "slowday"
    0xd03f5cc1cdb11a77410ee34e26ca1102e67a893c
    
  • KDBPLUS_TLS_KEY_FILE and KDBPLUS_TLS_KEY_FILE_SECRET: The pkcs12 file and its password which TLS acceptor uses.

  • QUDSPATH (optional): q-native environmental variable to define an astract namespace. This environmental variable is used by UDS acceptor too. The abstract nameapace will be @${QUDSPATH}/kx.[server process port] if this environmental variable is defined; otherwise it will be @/tmp/kx.[server process port].

Notes:

  • Messages will be sent with OS native endian.
  • When using this crate for a TLS client you need to set two environmental variables KX_SSL_CERT_FILE and KX_SSL_KEY_FILE on q side to make q/kdb+ to work as a TLS server. For details, see the KX website.

Type Mapping

All types are expressed as K struct which is quite similar to the K struct of api module but its structure is optimized for IPC usage and for the convenience to interact with. The table below shows the input types of each q type which is used to construct K object. Note that the input type can be different from the inner type. For example, timestamp has an input type of chrono::DateTime<Utc> but the inner type is i64 denoting an elapsed time in nanoseconds since 2000.01.01D00:00:00.

q Rust
bool bool
GUID [u8; 16]
byte u8
short i16
int i32
long i64
real f32
float f64
char char
symbol String
timestamp chrono::DateTime<Utc>
month chrono::NaiveDate
date chrono::NaiveDate
datetime chrono::DateTime<Utc>
timespan chrono::Duration
minute chrono::Duration
second chrono::Duration
time chrono::Duration
list Vec<Item> (Item is a corrsponding type above)
compound list Vec<K>
table Vec<K>
dictionary Vec<K>
null ()

Examples

Client

use kdb_codec::*;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<()> {

    // Connect to qprocess running on localhost:5000 via UDS
    let mut socket = QStream::connect(ConnectionMethod::UDS, "", 5000_u16, "ideal:person").await?;
    println!("Connection type: {}", socket.get_connection_type());

    // Set remote function with asynchronous message
    socket.send_async_message(&"collatz:{[n] seq:enlist n; while[not n = 1; seq,: n:$[n mod 2; 1 + 3 * n; `long$n % 2]]; seq}").await?;

    // Send a query synchronously
    let mut result = socket.send_sync_message(&"collatz[12]").await?;
    println!("collatz[12]: {}", result);

    result = socket.send_sync_message(&"collatz[`a]").await?;
    println!("collatz[`a]: {}", result);

    // Send a functional query.
    let mut message = K::new_compound_list(vec![K::new_symbol(String::from("collatz")), K::new_long(100)]);
    result = socket.send_sync_message(&message).await?;
    println!("collatz[100]: {}", result);

    // Modify query to (`collatz; 20)
    message.pop().unwrap();
    message.push(&K::new_long(20)).unwrap();
    result=socket.send_sync_message(&message).await?;
    println!("collatz[20]: {}", result);

    // Send a functional asynchronous query.
    message = K::new_compound_list(vec![K::new_string(String::from("show"), qattribute::NONE), K::new_symbol(String::from("goodbye"))]);
    socket.send_async_message(&message).await?;

    socket.shutdown().await?;

    Ok(())
}

Listener

use kdb_codec::*;

#[tokio::main]
async fn main() -> Result<()> {

  // Start listenening over TCP at the port 7000 with authentication enabled.
  let mut socket_tcp = QStream::accept(ConnectionMethod::TCP, "127.0.0.1", 7000).await?;

  // Send a query with the socket.
  let greeting = socket_tcp.send_sync_message(&"string `Hello").await?;
  println!("Greeting: {}", greeting);

  socket_tcp.shutdown().await?;

  Ok(())
}

Then q client can connect to this acceptor with the acceptor's host, port and the credential configured in KDBPLUS_ACCOUNT_FILE:

q)h:hopen `::7000:reluctant:slowday

Architecture & Design

Cancellation Safety

The core innovation of this library is its use of tokio-util::codec::Framed which provides automatic buffer management:

  • Buffer Preservation: Partial reads are stored in the codec's internal buffer
  • Resumable Operations: Cancelled reads can be safely retried without data loss
  • No Manual State Management: The Framed wrapper handles all buffer lifecycle

This is critical for production systems using patterns like:

  • tokio::select! for timeouts or concurrent operations
  • Graceful shutdown with cancellation
  • Request racing or fallback logic

Synchronous Deserialization

Unlike the original kdbplus crate, we use synchronous deserialization without async-recursion:

  • Simpler: No async recursion complexity
  • Faster: Eliminates async overhead for CPU-bound deserialization
  • Smaller: No async-recursion proc-macro dependency
  • Safer: Avoids potential stack overflow from deep async recursion

The deserialization happens in deserialize_sync.rs and is called from the codec's decode() method after the complete message is buffered.

Why Not Add split() to QStream?

While we show how to split the underlying Framed stream in examples, we don't recommend adding a split() method directly to QStream because:

  1. Protocol Semantics: KDB+ IPC is request-response oriented. Splitting would allow sending multiple requests before receiving responses, which can confuse message correlation.

  2. Complexity: Users would need to manually track which response corresponds to which request.

  3. Better Alternatives: For concurrent operations, use multiple QStream instances or the lower-level Framed API directly when you need full control.

If you need independent send/receive channels, access the underlying stream:

let stream = TcpStream::connect("127.0.0.1:5000").await?;
let framed = Framed::new(stream, KdbCodec::new(true));
let (writer, reader) = framed.split();
// Now you have full control

Installation

Add kdb_codec to your Cargo.toml:

[dependencies]
kdb_codec = "0.4"

The IPC feature is enabled by default.

Testing

Unit Tests

Run the standard unit tests (no kdb+ server required):

cargo test --package kdb_codec --lib --tests

Integration Tests

Some tests require a running kdb+ server and are marked as #[ignore] by default. To run these tests:

  1. Start a kdb+ server on localhost:5001 with credentials kdbuser:pass:

    q -p 5001 -u path/to/passwd/file
    
  2. Run the ignored tests:

    cargo test --package kdb_codec --tests -- --ignored
    

The integration tests include:

  • functional_message_test: Tests various message types and operations
  • compression_test: Tests compression functionality with large data

Note: These tests are automatically skipped in CI/CD unless a kdb+ server is explicitly configured.

Documentation

The full API documentation is available on docs.rs/kdb_codec.

For details of the kdb+ IPC protocol, see:

License

This library is licensed under Apache-2.0.

See LICENSE.