kdb_codec - Kdb+ IPC Codec Library
A Rust library focused on handling the kdb+ IPC (Inter-Process Communication) wire protocol. This library provides efficient encoding, decoding, and communication with q/kdb+ processes using idiomatic Rust patterns.
Inspired by the original kdbplus crate, this library addresses critical cancellation safety issues while maintaining full compatibility with the kdb+ IPC protocol.
Why This Library?
The original kdbplus crate had a fundamental cancellation safety issue in its receive_message() implementation. When used with tokio::select! or other cancellation-aware patterns, partial reads could cause message corruption:
// ⚠️ UNSAFE - could lose data on cancellation in original kdbplus
select!
Our Solution: This library uses tokio-util::codec::Framed with a custom KdbCodec, ensuring true cancellation safety:
// ✅ SAFE - Framed maintains buffer state across cancellations
let mut framed = new;
select!
The Framed pattern maintains internal buffer state, so cancelled reads never lose data. All partial reads are preserved in the codec's buffer and properly reassembled on the next attempt.
Features
- Cancellation Safe: Built on tokio-util::codec::Framed for true cancellation safety
- Tokio Codec Pattern: Modern async/await interface with proper buffer management
- QStream Client: High-level async client for q/kdb+ communication
- Full Compression Support: Compatible with kdb+ -18! (compress) and -19! (decompress)
- Multiple Connection Methods: TCP, TLS, and Unix Domain Socket support
- Type-Safe: Strong typing for all kdb+ data types
- Minimal Dependencies: No async-recursion or unnecessary proc-macros
- Zero-Copy Operations: Efficient message handling with minimal allocations
Rust IPC Interface for q/kdb+
This library provides a Rust client for communicating with q/kdb+ processes. Queries to kdb+ are supported in two ways:
- Text queries: Send q code as strings
- Functional queries: Represented as compound lists (IPC details)
Compression/decompression of messages is fully implemented following the kdb+ specification.
Codec Pattern
The library provides a tokio codec implementation for kdb+ IPC communication, offering a cleaner and more idiomatic Rust interface. The codec pattern leverages tokio-util::codec traits for efficient message framing and streaming with guaranteed cancellation safety.
Key Features:
- ✅ Cancellation safe - buffer state preserved across cancellations
- ✅ Full compression/decompression support compatible with kdb+ (-18!/-19!)
- ✅ Automatic message framing and buffering
- ✅ Zero-copy operations where possible
- ✅ Type-safe encoder/decoder traits
- ✅ No async-recursion dependency (uses synchronous deserialization)
See CODEC_PATTERN.md for detailed documentation.
Quick Example:
use *;
use TcpStream;
use Framed;
use ;
async
Compression Control
The codec provides explicit control over compression behavior:
use *;
// Auto mode (default): compress large messages on remote connections only
let codec = new;
// Using with_options method
let codec = with_options;
// Using builder pattern (recommended)
let codec = builder
.is_local
.compression_mode
.validation_mode
.build;
Compression Modes:
- Auto (default): Compress large messages (>2000 bytes) only on remote connections
- Always: Attempt to compress messages larger than 2000 bytes even on local connections
- Never: Disable compression entirely
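The three modes can be read as a small decision function. The following is a minimal sketch using only the standard library; the enum, constant, and function names are hypothetical stand-ins and are not taken from the crate's API, and it assumes the 2000-byte threshold applies to the message length passed in.

```rust
/// Hypothetical mirror of the three modes described above; the crate's real
/// enum may be named or shaped differently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionMode {
    Auto,
    Always,
    Never,
}

/// Threshold taken from the mode descriptions (messages larger than 2000 bytes).
pub const COMPRESSION_THRESHOLD: usize = 2000;

/// Returns true when a message of `len` bytes should be offered to the compressor.
pub fn should_attempt_compression(mode: CompressionMode, is_local: bool, len: usize) -> bool {
    let large = len > COMPRESSION_THRESHOLD;
    match mode {
        // Auto: only large messages, and only when the peer is remote.
        CompressionMode::Auto => large && !is_local,
        // Always: large messages regardless of locality.
        CompressionMode::Always => large,
        // Never: compression is disabled entirely.
        CompressionMode::Never => false,
    }
}
```

Under these assumptions, `should_attempt_compression(CompressionMode::Auto, true, 5000)` is false because the connection is local, while the same call with `CompressionMode::Always` is true.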
Header Validation
The codec validates incoming message headers to detect protocol violations:
use *;
// Strict mode (default): reject invalid headers
let codec = with_options;
// Using builder pattern
let codec = builder
.validation_mode
.build;
Validation Modes:
- Strict (default): Validates that the compressed flag is 0 or 1 and that the message type is 0, 1, or 2
- Lenient: Accepts any header values (useful for debugging or handling non-standard implementations)
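To make the two modes concrete, here is a standard-library sketch of header parsing. It assumes the commonly documented 8-byte kdb+ IPC header layout (byte 0: endianness, byte 1: message type, byte 2: compressed flag, byte 3: reserved, bytes 4 to 7: total message length). All type and function names are hypothetical and are not the crate's actual API.

```rust
/// Fixed size of a kdb+ IPC message header in bytes.
pub const HEADER_LEN: usize = 8;

/// Hypothetical stand-in for the two validation modes described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValidationMode {
    Strict,
    Lenient,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Header {
    pub little_endian: bool,
    pub message_type: u8,
    pub compressed: u8,
    pub total_len: u32,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HeaderError {
    InvalidMessageType(u8),
    InvalidCompressedFlag(u8),
}

/// Parses the header and, in strict mode, rejects out-of-range field values.
pub fn parse_header(raw: &[u8; HEADER_LEN], mode: ValidationMode) -> Result<Header, HeaderError> {
    let little_endian = raw[0] == 1;
    let message_type = raw[1];
    let compressed = raw[2];
    if mode == ValidationMode::Strict {
        // Message type must be 0 (async), 1 (sync), or 2 (response).
        if message_type > 2 {
            return Err(HeaderError::InvalidMessageType(message_type));
        }
        // Compressed flag must be 0 or 1.
        if compressed > 1 {
            return Err(HeaderError::InvalidCompressedFlag(compressed));
        }
    }
    // The length field is written in the sender's byte order.
    let len_bytes = [raw[4], raw[5], raw[6], raw[7]];
    let total_len = if little_endian {
        u32::from_le_bytes(len_bytes)
    } else {
        u32::from_be_bytes(len_bytes)
    };
    Ok(Header { little_endian, message_type, compressed, total_len })
}
```

Lenient mode in this sketch skips both range checks but still decodes the length, which is one plausible reading of "accepts any header values".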
QStream - High-Level Client
For a more convenient API, use QStream which wraps the codec:
use *;
async
With Explicit Options:
use *;
async
Using Builder Pattern (recommended):
use *;
async
Tip: For advanced use cases requiring separate send/receive channels, you can split the underlying Framed stream:
let stream = connect.await?;
let framed = new;
let = framed.split;
// Use writer and reader independently
spawn;
writer.send.await?;
Connection Methods
Client interfaces for q/kdb+ usually do not provide a listener, because of the nature of the protocol. However, a Rust process sometimes connects to an upstream while q/kdb+ starts afterward or is restarted frequently. In that situation a listener on the Rust side is a natural fit, so this library provides one. The following ways of connecting to kdb+ are supported:
- TCP
- TLS
- Unix domain socket
Furthermore, to improve interoperability, some casting, getter, and setter methods are provided.
Environmental Variables
This crate uses q-native and crate-specific environment variables.
- KDBPLUS_ACCOUNT_FILE: Path to a credential file that an acceptor loads to manage access from q clients. Each line of the file contains a user name and a SHA-1 hashed password, delimited by ':' without any spaces. For example, a file containing the two credentials "mattew:oracle" and "reluctant:slowday" looks like this:
  mattew:431364b6450fc47ccdbf6a2205dfdb1baeb79412
  reluctant:d03f5cc1cdb11a77410ee34e26ca1102e67a893c
  The hashed password can be generated in q with the function .Q.sha1:
  q).Q.sha1 "slowday"
  0xd03f5cc1cdb11a77410ee34e26ca1102e67a893c
- KDBPLUS_TLS_KEY_FILE and KDBPLUS_TLS_KEY_FILE_SECRET: The PKCS#12 file and its password, used by the TLS acceptor.
- QUDSPATH (optional): q-native environment variable that defines an abstract namespace. The UDS acceptor uses this variable too. The abstract namespace is @${QUDSPATH}/kx.[server process port] if the variable is defined; otherwise it is @/tmp/kx.[server process port].
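The credential file format described for KDBPLUS_ACCOUNT_FILE is simple enough to parse with the standard library. The sketch below is illustrative only: the function names are hypothetical, malformed lines are silently skipped (the crate may instead report an error), and the caller is assumed to supply a password that has already been SHA-1 hashed.

```rust
use std::collections::HashMap;

/// Parses `user:sha1hex` lines into a map from user name to lowercase hex digest.
/// Lines that do not carry a 40-digit hexadecimal digest are skipped.
pub fn parse_account_file(contents: &str) -> HashMap<String, String> {
    let mut accounts = HashMap::new();
    for line in contents.lines() {
        let line = line.trim();
        // Split at the first ':'; a hex digest never contains one.
        if let Some((user, digest)) = line.split_once(':') {
            let is_sha1_hex =
                digest.len() == 40 && digest.chars().all(|c| c.is_ascii_hexdigit());
            if !user.is_empty() && is_sha1_hex {
                accounts.insert(user.to_string(), digest.to_ascii_lowercase());
            }
        }
    }
    accounts
}

/// Checks a user against an already-hashed password (40 hex digits).
pub fn is_authorized(accounts: &HashMap<String, String>, user: &str, sha1_hex: &str) -> bool {
    accounts
        .get(user)
        .map_or(false, |stored| stored.eq_ignore_ascii_case(sha1_hex))
}
```

With the two example lines above loaded, `is_authorized(&accounts, "reluctant", "d03f5cc1cdb11a77410ee34e26ca1102e67a893c")` returns true in this sketch.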
Notes:
- Messages are sent in the OS-native byte order.
- When using this crate as a TLS client, you need to set the two environment variables KX_SSL_CERT_FILE and KX_SSL_KEY_FILE on the q side so that q/kdb+ works as a TLS server. For details, see the KX website.
Type Mapping
All types are expressed as a K struct, which is quite similar to the K struct of the api module, but its structure is optimized for IPC usage and for convenient interaction. The table below shows, for each q type, the input type used to construct a K object.
Note that the input type can differ from the inner type. For example, timestamp has an input type of chrono::DateTime<Utc>, but the inner type is i64, denoting the elapsed time in nanoseconds since 2000.01.01D00:00:00.
| q | Rust |
|---|---|
| bool | bool |
| GUID | [u8; 16] |
| byte | u8 |
| short | i16 |
| int | i32 |
| long | i64 |
| real | f32 |
| float | f64 |
| char | char |
| symbol | String |
| timestamp | chrono::DateTime<Utc> |
| month | chrono::NaiveDate |
| date | chrono::NaiveDate |
| datetime | chrono::DateTime<Utc> |
| timespan | chrono::Duration |
| minute | chrono::Duration |
| second | chrono::Duration |
| time | chrono::Duration |
| list | Vec<Item> (Item is a corresponding type above) |
| compound list | Vec<K> |
| table | Vec<K> |
| dictionary | Vec<K> |
| null | () |
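The difference between input type and inner type for timestamp comes down to an epoch shift. As a rough illustration using only the standard library (the function and constant names are hypothetical, and the crate itself works with chrono types rather than raw integers), converting between Unix-epoch nanoseconds and nanoseconds since 2000.01.01D00:00:00 might look like this:

```rust
/// Seconds between 1970-01-01T00:00:00Z (Unix epoch) and 2000-01-01T00:00:00Z (kdb+ epoch).
pub const KDB_EPOCH_OFFSET_SECS: i64 = 946_684_800;
const NANOS_PER_SEC: i64 = 1_000_000_000;

/// Converts nanoseconds since the Unix epoch into nanoseconds since the kdb+ epoch.
/// Returns None if the result does not fit in an i64.
pub fn unix_nanos_to_kdb(unix_nanos: i64) -> Option<i64> {
    unix_nanos.checked_sub(KDB_EPOCH_OFFSET_SECS * NANOS_PER_SEC)
}

/// Converts nanoseconds since the kdb+ epoch back into nanoseconds since the Unix epoch.
/// Returns None if the result does not fit in an i64.
pub fn kdb_to_unix_nanos(kdb_nanos: i64) -> Option<i64> {
    kdb_nanos.checked_add(KDB_EPOCH_OFFSET_SECS * NANOS_PER_SEC)
}
```

The checked arithmetic matters at the extremes of the i64 range, where a naive addition would overflow.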
Examples
Client
use *;
async
Listener
use *;
async
A q client can then connect to this acceptor using the acceptor's host, port, and a credential configured in KDBPLUS_ACCOUNT_FILE:
q)h:hopen `::7000:reluctant:slowday
Architecture & Design
Cancellation Safety
The core innovation of this library is its use of tokio-util::codec::Framed which provides automatic buffer management:
- Buffer Preservation: Partial reads are stored in the codec's internal buffer
- Resumable Operations: Cancelled reads can be safely retried without data loss
- No Manual State Management: The Framed wrapper handles all buffer lifecycle
This is critical for production systems using patterns like:
- tokio::select! for timeouts or concurrent operations
- Graceful shutdown with cancellation
- Request racing or fallback logic
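The buffer-preservation idea can be shown without any async machinery. The sketch below is not the crate's KdbCodec; it is a standard-library approximation with hypothetical names, assuming a little-endian sender and the 8-byte header whose last four bytes hold the total message length. Bytes from partial reads stay in the buffer until a whole message is available, so abandoning a read attempt loses nothing.

```rust
/// Fixed size of a kdb+ IPC message header in bytes.
const HEADER_LEN: usize = 8;

/// Accumulates bytes across reads and yields only complete messages.
#[derive(Default)]
pub struct FrameBuffer {
    buf: Vec<u8>,
}

impl FrameBuffer {
    /// Appends the bytes from one (possibly partial) read.
    pub fn extend(&mut self, chunk: &[u8]) {
        self.buf.extend_from_slice(chunk);
    }

    /// Returns the next complete message (header included), `Ok(None)` when more
    /// bytes are needed, or an error when the declared length is impossible.
    pub fn try_next(&mut self) -> Result<Option<Vec<u8>>, &'static str> {
        if self.buf.len() < HEADER_LEN {
            return Ok(None);
        }
        // Total length (header + payload), assumed little-endian in this sketch.
        let total =
            u32::from_le_bytes([self.buf[4], self.buf[5], self.buf[6], self.buf[7]]) as usize;
        if total < HEADER_LEN {
            return Err("declared length is shorter than the header");
        }
        if self.buf.len() < total {
            // Incomplete: keep what we have and wait for the next read.
            return Ok(None);
        }
        // Remove exactly one message; any trailing bytes stay buffered.
        Ok(Some(self.buf.drain(..total).collect()))
    }
}
```

A Framed-based codec follows the same shape: the decoder is asked repeatedly whether the buffer holds a full frame, and the buffer outlives any single cancelled read future.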
Synchronous Deserialization
Unlike the original kdbplus crate, we use synchronous deserialization without async-recursion:
- Simpler: No async recursion complexity
- Faster: Eliminates async overhead for CPU-bound deserialization
- Smaller: No async-recursion proc-macro dependency
- Safer: Avoids potential stack overflow from deep async recursion
The deserialization happens in deserialize_sync.rs and is called from the codec's decode() method after the complete message is buffered.
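Because the whole message is already in memory by the time decoding starts, nested values can be handled with ordinary recursion. The following is a deliberately tiny sketch, not the contents of deserialize_sync.rs: it handles only int atoms (type byte -6), long atoms (type byte -7), and general lists (type byte 0), assumes little-endian payloads, and uses hypothetical names throughout, including the depth limit.

```rust
/// Tiny value model for this sketch; the crate's K type is far richer.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Int(i32),
    Long(i64),
    List(Vec<Value>),
}

/// Hypothetical guard against pathological nesting.
const MAX_DEPTH: usize = 64;

/// Reads exactly N bytes at `*pos` and advances the cursor.
fn take<const N: usize>(buf: &[u8], pos: &mut usize) -> Result<[u8; N], String> {
    let end = pos.checked_add(N).ok_or("offset overflow")?;
    let bytes = buf.get(*pos..end).ok_or("unexpected end of payload")?;
    let mut out = [0u8; N];
    out.copy_from_slice(bytes);
    *pos = end;
    Ok(out)
}

/// Decodes one value from a fully buffered payload using plain (non-async) recursion.
pub fn decode(buf: &[u8], pos: &mut usize, depth: usize) -> Result<Value, String> {
    if depth > MAX_DEPTH {
        return Err("nesting too deep".to_string());
    }
    let [type_byte] = take::<1>(buf, pos)?;
    match type_byte as i8 {
        -6 => Ok(Value::Int(i32::from_le_bytes(take::<4>(buf, pos)?))),
        -7 => Ok(Value::Long(i64::from_le_bytes(take::<8>(buf, pos)?))),
        0 => {
            // General list: attribute byte, element count, then the elements.
            let _attribute = take::<1>(buf, pos)?;
            let len = u32::from_le_bytes(take::<4>(buf, pos)?);
            let mut items = Vec::new();
            for _ in 0..len {
                items.push(decode(buf, pos, depth + 1)?);
            }
            Ok(Value::List(items))
        }
        other => Err(format!("unsupported type byte {}", other)),
    }
}
```

Nothing here needs to await, which is why a decode() step that runs after buffering can stay synchronous.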
Why Not Add split() to QStream?
While we show how to split the underlying Framed stream in examples, we don't recommend adding a split() method directly to QStream because:
- Protocol Semantics: kdb+ IPC is request-response oriented. Splitting would allow sending multiple requests before receiving responses, which can confuse message correlation.
- Complexity: Users would need to manually track which response corresponds to which request.
- Better Alternatives: For concurrent operations, use multiple QStream instances, or use the lower-level Framed API directly when you need full control.
If you need independent send/receive channels, access the underlying stream:
let stream = connect.await?;
let framed = new;
let = framed.split;
// Now you have full control
Installation
Add kdb_codec to your Cargo.toml:
[dependencies]
kdb_codec = "0.4"
The IPC feature is enabled by default.
Testing
Unit Tests
Run the standard unit tests (no kdb+ server required):
Integration Tests
Some tests require a running kdb+ server and are marked as #[ignore] by default. To run these tests:
- Start a kdb+ server on localhost:5001 with credentials kdbuser:pass:
- Run the ignored tests:
The integration tests include:
- functional_message_test: Tests various message types and operations
- compression_test: Tests compression functionality with large data
Note: These tests are automatically skipped in CI/CD unless a kdb+ server is explicitly configured.
Documentation
The full API documentation is available on docs.rs/kdb_codec.
For details of the kdb+ IPC protocol, see:
License
This library is licensed under Apache-2.0.
See LICENSE.