# kdb_codec - Kdb+ IPC Codec Library
[](https://github.com/yshing/kdb_codec/actions/workflows/test.yml)
Cancellation-safe Rust codec + client for the kdb+ IPC wire protocol (q/kdb+).
Docs: https://yshing.github.io/kdb_codec/
## Key features
- Cancellation-safe message framing via `tokio-util::codec::Framed`
- IPC encode/decode, including kdb+ compression (`-18!`/`-19!`)
- Async client API (`QStream`) and lower-level codec API (`KdbCodec`)
- Multiple connection methods: TCP / TLS / Unix Domain Socket
- Ergonomic `K` value type for building/inspecting q objects
## Quick start
```rust
use futures::{SinkExt, StreamExt};
use kdb_codec::*;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
#[tokio::main]
async fn main() -> Result<()> {
let stream = TcpStream::connect("127.0.0.1:5000").await?;
let mut framed = Framed::new(stream, KdbCodec::new(true));
let query = K::new_string("1+1".to_string(), 0);
framed
.send(KdbMessage::new(qmsg_type::synchronous, query))
.await?;
if let Some(Ok(resp)) = framed.next().await {
println!("{}", resp.payload);
}
Ok(())
}
```
## Datatype coverage (IPC)
This project prioritizes safety when decoding untrusted IPC bytes (no panics, no OOM).
- Supported: basic atoms/lists (0–19), mixed lists, table (98), dictionary (99/127), null (101), error (-128)
- Not supported: enums (20–76), nested/other types (77+), function/derived types (100–112), foreign (112)
let mut dict = k!(dict: k!(sym: vec!["x"]) => k!(long: vec![42]));
dict[1] = k!(long: vec![100]); // Replace values
```
### Table Column Access
Access table columns by name using string indices:
```rust
use kdb_codec::*;
// Create a table
let table = k!(table: {
"fruit" => k!(sym: vec!["apple", "banana", "cherry"]),
"price" => k!(float: vec![1.5, 2.3, 3.8]),
"quantity" => k!(long: vec![100, 150, 75])
});
// Access columns by name
let fruits = &table["fruit"];
let prices = &table["price"];
let quantities = &table["quantity"];
println!("Fruits: {}", fruits); // `apple`banana`cherry
println!("Prices: {}", prices); // 1.5 2.3 3.8
println!("Quantities: {}", quantities); // 100 150 75
// Mutable access
let mut table = k!(table: {
"price" => k!(float: vec![1.5, 2.3])
});
table["price"] = k!(float: vec![2.0, 2.5]); // Update prices
```
### Safe Access Methods
For production code, use the safe `try_*` methods that return `Result` instead of panicking:
```rust
use kdb_codec::*;
let dict = k!(dict: k!(sym: vec!["x", "y"]) => k!(long: vec![10, 20]));
// Safe dictionary access
match dict.try_index(0) {
Ok(keys) => println!("Keys: {}", keys),
Err(e) => eprintln!("Error: {:?}", e),
}
// Try accessing out of bounds - won't panic
if dict.try_index(2).is_err() {
println!("Index 2 is out of bounds");
}
// Safe table column access
let table = k!(table: {
"name" => k!(sym: vec!["Alice", "Bob"])
});
match table.try_column("name") {
Ok(col) => println!("Names: {}", col),
Err(_) => println!("Column not found"),
}
// Check if column exists before accessing
if table.try_column("nonexistent").is_err() {
println!("Column 'nonexistent' not found");
}
```
### Compound List Access
Access elements in compound (heterogeneous) lists:
```rust
use kdb_codec::*;
let list = k!([
k!(long: 100),
k!(float: 3.14),
k!(sym: "hello"),
k!(bool: vec![true, false, true])
]);
// Safe access to list elements
if let Ok(first) = list.try_index(0) {
println!("First element: {}", first); // 100
}
if let Ok(second) = list.try_index(1) {
println!("Second element: {}", second); // 3.14
}
```
**Benefits:**
- ✅ Ergonomic `[]` syntax familiar to Rust developers
- ✅ Type-safe with compile-time borrow checking
- ✅ Both panicking (`[]`) and safe (`try_*`) variants available
- ✅ Works seamlessly with mutable access
- ✅ Supports dictionaries, tables, and compound lists
See `examples/index_trait_demo.rs` for more examples.
### Connection Methods
As for connect method, usually client interfaces of q/kdb+ do not provide a listener due to its protocol. However, sometimes Rust process is connecting to an upstream and q/kdb+ starts afterward or is restarted more frequently. Then providing a listener method is a natural direction and it was achieved here. Following ways are supported to connect to kdb+:
- TCP
- TLS
- Unix domain socket
Furthermore, in order to improve inter-operatability some casting, getter and setter methods are provided.
### Environmental Variables
This crate uses q-native or crate-specific environmental variables.
- `KDBPLUS_ACCOUNT_FILE`: A file path to a credential file which an acceptor loads in order to manage access from a q client. This file contains a user name and SHA-1 hashed password in each line which are delimited by `':'` without any space. For example, a file containing two credentials `"mattew:oracle"` and `"reluctant:slowday"` looks like this:
mattew:431364b6450fc47ccdbf6a2205dfdb1baeb79412
reluctant:d03f5cc1cdb11a77410ee34e26ca1102e67a893c
The hashed password can be generated with q using a function `.Q.sha1`:
q).Q.sha1 "slowday"
0xd03f5cc1cdb11a77410ee34e26ca1102e67a893c
- `KDBPLUS_TLS_KEY_FILE` and `KDBPLUS_TLS_KEY_FILE_SECRET`: The pkcs12 file and its password which TLS acceptor uses.
- `QUDSPATH` (optional): q-native environmental variable to define an astract namespace. This environmental variable is used by UDS acceptor too. The abstract nameapace will be `@${QUDSPATH}/kx.[server process port]` if this environmental variable is defined; otherwise it will be `@/tmp/kx.[server process port]`.
*Notes:*
- Messages will be sent with OS native endian.
- When using this crate for a TLS client you need to set two environmental variables `KX_SSL_CERT_FILE` and `KX_SSL_KEY_FILE` on q side to make q/kdb+ to work as a TLS server. For details, see [the KX website](https://code.kx.com/q/kb/ssl/).
### Type Mapping
All types are expressed as `K` struct which is quite similar to the `K` struct of `api` module but its structure is optimized for IPC
usage and for the convenience to interact with. The table below shows the input types of each q type which is used to construct `K` object.
Note that the input type can be different from the inner type. For example, timestamp has an input type of `chrono::DateTime<Utc>` but
the inner type is `i64` denoting an elapsed time in nanoseconds since `2000.01.01D00:00:00`.
| `bool` | `bool` |
| `GUID` | `[u8; 16]` |
| `byte` | `u8` |
| `short` | `i16` |
| `int` | `i32` |
| `long` | `i64` |
| `real` | `f32` |
| `float` | `f64` |
| `char` | `char` |
| `symbol` | `String` |
| `timestamp` | `chrono::DateTime<Utc>` |
| `month` | `chrono::NaiveDate` |
| `date` | `chrono::NaiveDate` |
| `datetime` | `chrono::DateTime<Utc>` |
| `timespan` | `chrono::Duration` |
| `minute` | `chrono::Duration` |
| `second` | `chrono::Duration` |
| `time` | `chrono::Duration` |
| `list` | `Vec<Item>` (`Item` is a corrsponding type above) |
| `compound list` | `Vec<K>` |
| `table` | `Vec<K>` |
| `dictionary` | `Vec<K>` |
| `null` | `()` |
### Examples
#### Client
```rust
use kdb_codec::*;
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<()> {
// Connect to qprocess running on localhost:5000 via UDS
let mut socket = QStream::connect(ConnectionMethod::UDS, "", 5000_u16, "ideal:person").await?;
println!("Connection type: {}", socket.get_connection_type());
// Set remote function with asynchronous message
socket.send_async_message(&"collatz:{[n] seq:enlist n; while[not n = 1; seq,: n:$[n mod 2; 1 + 3 * n; `long$n % 2]]; seq}").await?;
// Send a query synchronously
let mut result = socket.send_sync_message(&"collatz[12]").await?;
println!("collatz[12]: {}", result);
result = socket.send_sync_message(&"collatz[`a]").await?;
println!("collatz[`a]: {}", result);
// Send a functional query.
let mut message = K::new_compound_list(vec![K::new_symbol(String::from("collatz")), K::new_long(100)]);
result = socket.send_sync_message(&message).await?;
println!("collatz[100]: {}", result);
// Modify query to (`collatz; 20)
message.pop().unwrap();
message.push(&K::new_long(20)).unwrap();
result=socket.send_sync_message(&message).await?;
println!("collatz[20]: {}", result);
// Send a functional asynchronous query.
message = K::new_compound_list(vec![K::new_string(String::from("show"), qattribute::NONE), K::new_symbol(String::from("goodbye"))]);
socket.send_async_message(&message).await?;
socket.shutdown().await?;
Ok(())
}
```
#### Listener
```rust
use kdb_codec::*;
#[tokio::main]
async fn main() -> Result<()> {
// Start listenening over TCP at the port 7000 with authentication enabled.
let mut socket_tcp = QStream::accept(ConnectionMethod::TCP, "127.0.0.1", 7000).await?;
// Send a query with the socket.
let greeting = socket_tcp.send_sync_message(&"string `Hello").await?;
println!("Greeting: {}", greeting);
socket_tcp.shutdown().await?;
Ok(())
}
```
Then q client can connect to this acceptor with the acceptor's host, port and the credential configured in `KDBPLUS_ACCOUNT_FILE`:
```q
q)h:hopen `::7000:reluctant:slowday
```
## Architecture & Design
### Cancellation Safety
The core innovation of this library is its use of `tokio-util::codec::Framed` which provides automatic buffer management:
- **Buffer Preservation**: Partial reads are stored in the codec's internal buffer
- **Resumable Operations**: Cancelled reads can be safely retried without data loss
- **No Manual State Management**: The Framed wrapper handles all buffer lifecycle
This is critical for production systems using patterns like:
- `tokio::select!` for timeouts or concurrent operations
- Graceful shutdown with cancellation
- Request racing or fallback logic
### Synchronous Deserialization
Unlike the original kdbplus crate, we use **synchronous deserialization** without `async-recursion`:
- **Simpler**: No async recursion complexity
- **Faster**: Eliminates async overhead for CPU-bound deserialization
- **Smaller**: No `async-recursion` proc-macro dependency
- **Safer**: Avoids potential stack overflow from deep async recursion
The deserialization happens in `deserialize_sync.rs` and is called from the codec's `decode()` method after the complete message is buffered.
### Why Not Add `split()` to QStream?
While we show how to split the underlying Framed stream in examples, we **don't recommend** adding a `split()` method directly to `QStream` because:
1. **Protocol Semantics**: KDB+ IPC is request-response oriented. Splitting would allow sending multiple requests before receiving responses, which can confuse message correlation.
2. **Complexity**: Users would need to manually track which response corresponds to which request.
3. **Better Alternatives**: For concurrent operations, use multiple `QStream` instances or the lower-level `Framed` API directly when you need full control.
If you need independent send/receive channels, access the underlying stream:
```rust
let stream = TcpStream::connect("127.0.0.1:5000").await?;
let framed = Framed::new(stream, KdbCodec::new(true));
let (writer, reader) = framed.split();
// Now you have full control
```
### Installation
Add `kdb_codec` to your `Cargo.toml`:
```toml
[dependencies]
kdb_codec = "0.4"
```
The IPC feature is enabled by default.
## Testing
### Unit Tests
Run the standard unit tests (no kdb+ server required):
```bash
cargo test --package kdb_codec --lib --tests
```
### Integration Tests
Some tests require a running kdb+ server and are marked as `#[ignore]` by default. To run these tests:
1. Start a kdb+ server on `localhost:5001` with credentials `kdbuser:pass`:
```bash
q -p 5001 -u ./tests/test_kdb_passwd
```
2. Run the ignored tests:
```bash
cargo test --package kdb_codec --tests -- --ignored
cargo test --package kdb_codec --test e2e_acceptor -- e2e_q_script_to_rust_acceptor_echo_roundtrip --exact --nocapture --ignored
```
The integration tests include:
- `functional_message_test`: Tests various message types and operations
- `compression_test`: Tests compression functionality with large data
**Note**: These tests are automatically skipped in CI/CD unless a kdb+ server is explicitly configured.
## Documentation
The full API documentation is available on [docs.rs/kdb_codec](https://docs.rs/kdb_codec/).
For details of the kdb+ IPC protocol, see:
- [kdb+ IPC Reference](https://code.kx.com/q/basics/ipc/)
- [Serialization](https://code.kx.com/q/basics/serialization/)
## License
This library is licensed under Apache-2.0.
See [LICENSE](kdb_codec/LICENSE).