kdb_codec 0.3.0

Kdb+ IPC codec library for handling kdb+ wire protocol data with Rust.
# kdb_codec - Kdb+ IPC Codec Library

[![Tests](https://github.com/yshing/kdb_codec/actions/workflows/test.yml/badge.svg)](https://github.com/yshing/kdb_codec/actions/workflows/test.yml)

A Rust library focused on handling the kdb+ IPC (Inter-Process Communication) wire protocol. This library provides efficient encoding, decoding, and communication with q/kdb+ processes using idiomatic Rust patterns.

**Inspired by the original [kdbplus](https://crates.io/crates/kdbplus) crate**, this library addresses critical **cancellation safety** issues while maintaining full compatibility with the kdb+ IPC protocol.

## Why This Library?

The original kdbplus crate had a fundamental cancellation safety issue in its `receive_message()` implementation. When used with `tokio::select!` or other cancellation-aware patterns, partial reads could cause message corruption:

```rust
// ⚠️ UNSAFE - could lose data on cancellation in original kdbplus
select! {
    msg = socket.receive_message() => { /* ... */ }
    _ = timeout => { /* partial read gets lost */ }
}
```

**Our Solution:** This library uses `tokio-util::codec::Framed` with a custom `KdbCodec`, ensuring true cancellation safety:

```rust
// ✅ SAFE - Framed maintains buffer state across cancellations
let mut framed = Framed::new(stream, KdbCodec::new(true));
select! {
    msg = framed.next() => { /* buffer state preserved */ }
    _ = timeout => { /* can safely retry */ }
}
```

The Framed pattern maintains internal buffer state, so cancelled reads never lose data. All partial reads are preserved in the codec's buffer and properly reassembled on the next attempt.
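
The buffering idea can be sketched with the standard library alone. This is a minimal illustration, not the crate's `KdbCodec`: the `FrameBuffer` type and its `feed`/`try_decode` methods are hypothetical names, and the sketch assumes the 8-byte kdb+ IPC header layout (byte 0 is the endianness flag, bytes 4..8 are the total message length including the header).

```rust
/// Hypothetical stand-in for the buffer a framed codec keeps between reads.
pub struct FrameBuffer {
    buf: Vec<u8>,
}

impl FrameBuffer {
    pub fn new() -> Self {
        FrameBuffer { buf: Vec::new() }
    }

    /// Append whatever bytes the socket produced, however few.
    pub fn feed(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
    }

    /// Return one complete message if the buffer holds it, else `None`.
    /// Incomplete data stays in `buf`, so an abandoned read loses nothing.
    pub fn try_decode(&mut self) -> Option<Vec<u8>> {
        const HEADER_LEN: usize = 8;
        if self.buf.len() < HEADER_LEN {
            return None;
        }
        // Assumed layout: bytes 4..8 hold the total length, header included.
        let raw = [self.buf[4], self.buf[5], self.buf[6], self.buf[7]];
        let length = if self.buf[0] == 1 {
            u32::from_le_bytes(raw)
        } else {
            u32::from_be_bytes(raw)
        };
        let total = length as usize;
        // A real decoder would report a length below 8 as an error;
        // this sketch simply never yields such a frame.
        if total < HEADER_LEN || self.buf.len() < total {
            return None;
        }
        // Remove exactly one message; any following bytes stay buffered.
        Some(self.buf.drain(..total).collect())
    }
}
```

Feeding the first half of a message and calling `try_decode` yields `None`; feeding the rest and calling it again yields the whole message, which is the property that makes a retry after cancellation safe.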

## Features

- **Cancellation Safe**: Built on `tokio-util::codec::Framed` for true cancellation safety
- **Tokio Codec Pattern**: Modern async/await interface with proper buffer management
- **QStream Client**: High-level async client for q/kdb+ communication
- **Intuitive Data Access**: Index trait for ergonomic K object access with `[]` syntax
- **Full Compression Support**: Compatible with kdb+ `-18!` (compress) and `-19!` (decompress)
- **Multiple Connection Methods**: TCP, TLS, and Unix Domain Socket support
- **Type-Safe**: Strong typing for all kdb+ data types
- **Minimal Dependencies**: No `async-recursion` or unnecessary proc-macros
- **Zero-Copy Operations**: Efficient message handling with minimal allocations

## Rust IPC Interface for q/kdb+

This library provides a Rust client for communicating with q/kdb+ processes. Queries to kdb+ are supported in two ways:

- **Text queries**: Send q code as strings
- **Functional queries**: Represented as compound lists ([IPC details](https://code.kx.com/q4m3/11_IO/#116-interprocess-communication))

Compression/decompression of messages is fully implemented following the [kdb+ specification](https://code.kx.com/q/basics/ipc/#compression).

## Codec Pattern

The library provides a tokio codec implementation for kdb+ IPC communication, offering a cleaner and more idiomatic Rust interface. The codec pattern leverages `tokio-util::codec` traits for efficient message framing and streaming with **guaranteed cancellation safety**.

**Key Features:**
- ✅ **Cancellation safe**: buffer state preserved across cancellations
- ✅ Full compression/decompression support compatible with kdb+ (-18!/-19!)
- ✅ Automatic message framing and buffering
- ✅ Zero-copy operations where possible
- ✅ Type-safe encoder/decoder traits
- ✅ No `async-recursion` dependency (uses synchronous deserialization)

See [CODEC_PATTERN.md](CODEC_PATTERN.md) for detailed documentation.

**Quick Example:**
```rust
use kdb_codec::*;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use futures::{SinkExt, StreamExt};

#[tokio::main]
async fn main() -> Result<()> {
    let stream = TcpStream::connect("127.0.0.1:5000").await?;
    let mut framed = Framed::new(stream, KdbCodec::new(true));
    
    // Send query - cancellation safe!
    let query = K::new_string("1+1".to_string(), 0);
    let msg = KdbMessage::new(qmsg_type::synchronous, query);
    framed.send(msg).await?;
    
    // Receive response - even if cancelled, buffer state is preserved
    if let Some(Ok(response)) = framed.next().await {
        println!("Result: {}", response.payload);
    }
    Ok(())
}
```

### Compression Control

The codec provides explicit control over compression behavior:

```rust
use kdb_codec::*;

// Auto mode (default): compress large messages on remote connections only
let codec = KdbCodec::new(false);

// Using with_options method
let codec = KdbCodec::with_options(true, CompressionMode::Always, ValidationMode::Strict);

// Using builder pattern (recommended)
let codec = KdbCodec::builder()
    .is_local(false)
    .compression_mode(CompressionMode::Never)
    .validation_mode(ValidationMode::Strict)
    .build();
```

**Compression Modes:**
- `Auto` (default): Compress large messages (>2000 bytes) only on remote connections
- `Always`: Attempt to compress messages larger than 2000 bytes even on local connections
- `Never`: Disable compression entirely
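
The three modes reduce to a small decision rule. The sketch below restates the list above in code; `Mode`, `THRESHOLD`, and `should_try_compress` are hypothetical names chosen for illustration and are not taken from the crate.

```rust
/// Hypothetical mirror of the three compression modes described above.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Mode {
    Auto,
    Always,
    Never,
}

/// Size above which compression is attempted, per the list above.
const THRESHOLD: usize = 2000;

/// Decide whether a message of `len` bytes is a compression candidate.
pub fn should_try_compress(mode: Mode, is_local: bool, len: usize) -> bool {
    match mode {
        Mode::Never => false,
        // Always: large messages are candidates even on local connections.
        Mode::Always => len > THRESHOLD,
        // Auto: large messages are candidates only on remote connections.
        Mode::Auto => !is_local && len > THRESHOLD,
    }
}
```

Under this rule a 5000-byte message on a local connection is a candidate only in `Always` mode, and a message of 2000 bytes or fewer is never a candidate.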

### Header Validation

The codec validates incoming message headers to detect protocol violations:

```rust
use kdb_codec::*;

// Strict mode (default): reject invalid headers
let codec = KdbCodec::with_options(false, CompressionMode::Auto, ValidationMode::Strict);

// Using builder pattern
let codec = KdbCodec::builder()
    .validation_mode(ValidationMode::Lenient)
    .build();
```

**Validation Modes:**
- `Strict` (default): Validates that compressed flag is 0 or 1, and message type is 0, 1, or 2
- `Lenient`: Accepts any header values (useful for debugging or handling non-standard implementations)
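
As a rough sketch of what the strict check amounts to, the function below applies the two rules listed above to a raw 8-byte header. The names `Validation` and `check_header` are hypothetical, and the byte positions (message type in byte 1, compressed flag in byte 2) are an assumption based on the kdb+ IPC header layout rather than on this crate's source.

```rust
/// Hypothetical mirror of the two validation modes described above.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Validation {
    Strict,
    Lenient,
}

/// Check a raw 8-byte header.
/// Assumed layout: byte 1 = message type, byte 2 = compressed flag.
pub fn check_header(header: &[u8; 8], mode: Validation) -> Result<(), String> {
    if mode == Validation::Lenient {
        // Lenient mode accepts any header values.
        return Ok(());
    }
    let message_type = header[1];
    let compressed = header[2];
    if message_type > 2 {
        return Err(format!("invalid message type: {}", message_type));
    }
    if compressed > 1 {
        return Err(format!("invalid compressed flag: {}", compressed));
    }
    Ok(())
}
```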

### QStream - High-Level Client

For a more convenient API, use `QStream` which wraps the codec:

```rust
use kdb_codec::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut stream = QStream::connect(
        ConnectionMethod::TCP, 
        "localhost", 
        5000, 
        "user:pass"
    ).await?;
    
    // All operations are cancellation safe
    let result = stream.send_sync_message(&"2+2").await?;
    println!("Result: {}", result.get_int()?);
    
    Ok(())
}
```

**With Explicit Options:**

```rust
use kdb_codec::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Using connect_with_options method
    let mut stream = QStream::connect_with_options(
        ConnectionMethod::TCP, 
        "localhost", 
        5000, 
        "user:pass",
        CompressionMode::Always,
        ValidationMode::Lenient
    ).await?;
    
    let result = stream.send_sync_message(&"2+2").await?;
    println!("Result: {}", result.get_int()?);
    
    Ok(())
}
```

**Using Builder Pattern (recommended):**

```rust
use kdb_codec::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Using builder pattern for cleaner API
    let mut stream = QStream::builder()
        .method(ConnectionMethod::TCP)
        .host("localhost")
        .port(5000)
        .credential("user:pass")
        .compression_mode(CompressionMode::Always)
        .validation_mode(ValidationMode::Lenient)
        .connect()
        .await?;
    
    let result = stream.send_sync_message(&"2+2").await?;
    println!("Result: {}", result.get_int()?);
    
    Ok(())
}
```

**Tip:** For advanced use cases requiring separate send/receive channels, you can split the underlying Framed stream:

```rust
let stream = TcpStream::connect("127.0.0.1:5000").await?;
let framed = Framed::new(stream, KdbCodec::new(true));
let (mut writer, mut reader) = framed.split();

// Use writer and reader independently
tokio::spawn(async move {
    while let Some(Ok(msg)) = reader.next().await {
        println!("Received: {:?}", msg);
    }
});

writer.send(msg).await?;
```

## Intuitive Data Access with Index Trait

The library implements Rust's `Index` and `IndexMut` traits for ergonomic access to K object data using familiar `[]` syntax.

### Dictionary Access

Access dictionary keys and values directly using numeric indices:

```rust
use kdb_codec::*;

// Create a dictionary using k! macro
let dict = k!(dict: k!(sym: vec!["a", "b", "c"]) => k!(long: vec![10, 20, 30]));

// Access keys and values using [] syntax
let keys = &dict[0];    // Get dictionary keys
let values = &dict[1];  // Get dictionary values

println!("Keys: {}", keys);      // `a`b`c
println!("Values: {}", values);  // 10 20 30

// Mutable access
let mut dict = k!(dict: k!(sym: vec!["x"]) => k!(long: vec![42]));
dict[1] = k!(long: vec![100]);  // Replace values
```

### Table Column Access

Access table columns by name using string indices:

```rust
use kdb_codec::*;

// Create a table
let table = k!(table: {
    "fruit" => k!(sym: vec!["apple", "banana", "cherry"]),
    "price" => k!(float: vec![1.5, 2.3, 3.8]),
    "quantity" => k!(long: vec![100, 150, 75])
});

// Access columns by name
let fruits = &table["fruit"];
let prices = &table["price"];
let quantities = &table["quantity"];

println!("Fruits: {}", fruits);        // `apple`banana`cherry
println!("Prices: {}", prices);        // 1.5 2.3 3.8
println!("Quantities: {}", quantities); // 100 150 75

// Mutable access
let mut table = k!(table: {
    "price" => k!(float: vec![1.5, 2.3])
});
table["price"] = k!(float: vec![2.0, 2.5]);  // Update prices
```

### Safe Access Methods

For production code, use the safe `try_*` methods that return `Result` instead of panicking:

```rust
use kdb_codec::*;

let dict = k!(dict: k!(sym: vec!["x", "y"]) => k!(long: vec![10, 20]));

// Safe dictionary access
match dict.try_index(0) {
    Ok(keys) => println!("Keys: {}", keys),
    Err(e) => eprintln!("Error: {:?}", e),
}

// Try accessing out of bounds - won't panic
if dict.try_index(2).is_err() {
    println!("Index 2 is out of bounds");
}

// Safe table column access
let table = k!(table: {
    "name" => k!(sym: vec!["Alice", "Bob"])
});

match table.try_column("name") {
    Ok(col) => println!("Names: {}", col),
    Err(_) => println!("Column not found"),
}

// Check if column exists before accessing
if table.try_column("nonexistent").is_err() {
    println!("Column 'nonexistent' not found");
}
```

### Compound List Access

Access elements in compound (heterogeneous) lists:

```rust
use kdb_codec::*;

let list = k!([
    k!(long: 100),
    k!(float: 3.14),
    k!(sym: "hello"),
    k!(bool: vec![true, false, true])
]);

// Safe access to list elements
if let Ok(first) = list.try_index(0) {
    println!("First element: {}", first);  // 100
}

if let Ok(second) = list.try_index(1) {
    println!("Second element: {}", second); // 3.14
}
```

**Benefits:**
- ✅ Ergonomic `[]` syntax familiar to Rust developers
- ✅ Type-safe with compile-time borrow checking
- ✅ Both panicking (`[]`) and safe (`try_*`) variants available
- ✅ Works seamlessly with mutable access
- ✅ Supports dictionaries, tables, and compound lists

See `examples/index_trait_demo.rs` for more examples.
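
For readers unfamiliar with the mechanism, the pairing of a panicking `[]` operator with a non-panicking `try_*` method can be sketched on a toy type using only `std::ops`. `ToyTable` is a hypothetical stand-in and says nothing about how `K` is implemented internally.

```rust
use std::ops::{Index, IndexMut};

/// Toy column store; a hypothetical stand-in, not the crate's `K` type.
pub struct ToyTable {
    columns: Vec<(String, Vec<i64>)>,
}

impl ToyTable {
    pub fn new(columns: Vec<(&str, Vec<i64>)>) -> Self {
        ToyTable {
            columns: columns
                .into_iter()
                .map(|(name, values)| (name.to_string(), values))
                .collect(),
        }
    }

    /// Non-panicking lookup: returns an error for an unknown column.
    pub fn try_column(&self, name: &str) -> Result<&Vec<i64>, String> {
        self.columns
            .iter()
            .find(|(n, _)| n.as_str() == name)
            .map(|(_, values)| values)
            .ok_or_else(|| format!("no such column: {}", name))
    }
}

impl Index<&str> for ToyTable {
    type Output = Vec<i64>;

    /// Panicking lookup, which is what `table["name"]` desugars to.
    fn index(&self, name: &str) -> &Vec<i64> {
        self.try_column(name).expect("column not found")
    }
}

impl IndexMut<&str> for ToyTable {
    /// Mutable lookup, used by `table["name"] = ...`.
    fn index_mut(&mut self, name: &str) -> &mut Vec<i64> {
        self.columns
            .iter_mut()
            .find(|(n, _)| n.as_str() == name)
            .map(|(_, values)| values)
            .expect("column not found")
    }
}
```

The design choice is the usual one for indexable Rust types: `[]` stays terse for cases where a missing key is a programming error, while the `try_*` method lets callers handle the failure.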

### Connection Methods

Client interfaces for q/kdb+ usually do not provide a listener because of how the protocol works. However, a Rust process sometimes connects to an upstream service while q/kdb+ starts afterward or is restarted more frequently. In that case a listener on the Rust side is the natural choice, so this library provides one. The following connection methods are supported:

- TCP
- TLS
- Unix domain socket

Furthermore, to improve interoperability, some casting, getter, and setter methods are provided.

### Environmental Variables

This crate uses q-native or crate-specific environmental variables.

- `KDBPLUS_ACCOUNT_FILE`: Path to a credential file that an acceptor loads to manage access from q clients. Each line contains a user name and a SHA-1 hashed password, separated by `':'` with no spaces. For example, a file holding the two credentials `"mattew:oracle"` and `"reluctant:slowday"` looks like this:

      mattew:431364b6450fc47ccdbf6a2205dfdb1baeb79412
      reluctant:d03f5cc1cdb11a77410ee34e26ca1102e67a893c

      
    The hashed password can be generated with q using a function `.Q.sha1`:
 
      q).Q.sha1 "slowday"
      0xd03f5cc1cdb11a77410ee34e26ca1102e67a893c
 
- `KDBPLUS_TLS_KEY_FILE` and `KDBPLUS_TLS_KEY_FILE_SECRET`: The PKCS #12 file and its password, used by the TLS acceptor.
- `QUDSPATH` (optional): q-native environmental variable that defines an abstract namespace. The UDS acceptor uses this variable too. The abstract namespace is `@${QUDSPATH}/kx.[server process port]` if this variable is defined; otherwise it is `@/tmp/kx.[server process port]`.
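
Two of the formats described above are simple enough to sketch with the standard library: the `user:hash` lines of the account file and the abstract namespace name derived from `QUDSPATH`. The helper names `parse_accounts` and `uds_abstract_name` are hypothetical and are not part of the crate's API; the sketch only parses and formats strings and does not compute SHA-1 hashes.

```rust
use std::collections::HashMap;

/// Parse account-file contents into a map from user name to hex SHA-1 hash.
/// Lines without a ':' delimiter are skipped in this sketch.
pub fn parse_accounts(contents: &str) -> HashMap<String, String> {
    contents
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .filter_map(|line| line.split_once(':'))
        .map(|(user, hash)| (user.to_string(), hash.to_lowercase()))
        .collect()
}

/// Build the abstract namespace name for a UDS endpoint.
/// `qudspath` is the value of `QUDSPATH`, if it is set.
pub fn uds_abstract_name(qudspath: Option<&str>, port: u16) -> String {
    let dir = qudspath.unwrap_or("/tmp");
    format!("@{}/kx.{}", dir, port)
}
```

In a real process the first argument of `uds_abstract_name` would come from something like `std::env::var("QUDSPATH").ok()`.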

*Notes:*

- Messages are sent in the OS-native byte order.
- When using this crate as a TLS client, you need to set two environmental variables, `KX_SSL_CERT_FILE` and `KX_SSL_KEY_FILE`, on the q side so that q/kdb+ works as a TLS server. For details, see [the KX website](https://code.kx.com/q/kb/ssl/).

### Type Mapping

All types are expressed as the `K` struct, which is quite similar to the `K` struct of the `api` module, but its structure is optimized for IPC
usage and for convenient interaction. The table below shows, for each q type, the input type used to construct a `K` object.
Note that the input type can differ from the inner type. For example, timestamp has an input type of `chrono::DateTime<Utc>`, but
the inner type is `i64`, denoting the elapsed time in nanoseconds since `2000.01.01D00:00:00`.

| q                | Rust                                              |
|------------------|---------------------------------------------------|
| `bool`           | `bool`                                            |
| `GUID`           | `[u8; 16]`                                        |
| `byte`           | `u8`                                              |
| `short`          | `i16`                                             |
| `int`            | `i32`                                             |
| `long`           | `i64`                                             |
| `real`           | `f32`                                             |
| `float`          | `f64`                                             |
| `char`           | `char`                                            |
| `symbol`         | `String`                                          |
| `timestamp`      | `chrono::DateTime<Utc>`                           |
| `month`          | `chrono::NaiveDate`                               |
| `date`           | `chrono::NaiveDate`                               |
| `datetime`       | `chrono::DateTime<Utc>`                           |
| `timespan`       | `chrono::Duration`                                |
| `minute`         | `chrono::Duration`                                |
| `second`         | `chrono::Duration`                                |
| `time`           | `chrono::Duration`                                |
| `list`           | `Vec<Item>` (`Item` is a corresponding type above) |
| `compound list`  | `Vec<K>`                                          |
| `table`          | `Vec<K>`                                          |
| `dictionary`     | `Vec<K>`                                          |
| `null`           | `()`                                              |
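
The gap between input type and inner type is easiest to see for timestamps. The sketch below converts between nanoseconds since the Unix epoch and nanoseconds since `2000.01.01D00:00:00`; the function names are hypothetical, and the crate performs its own conversion from `chrono::DateTime<Utc>`, which may differ in detail.

```rust
/// Seconds from 1970-01-01T00:00:00Z to 2000-01-01T00:00:00Z (10957 days).
const KDB_EPOCH_OFFSET_SECS: i64 = 946_684_800;
const NANOS_PER_SEC: i64 = 1_000_000_000;

/// Convert nanoseconds since the Unix epoch to nanoseconds since the
/// kdb+ epoch. Returns `None` if the subtraction would overflow.
pub fn unix_nanos_to_kdb(unix_nanos: i64) -> Option<i64> {
    unix_nanos.checked_sub(KDB_EPOCH_OFFSET_SECS * NANOS_PER_SEC)
}

/// Inverse conversion. Returns `None` if the addition would overflow.
pub fn kdb_to_unix_nanos(kdb_nanos: i64) -> Option<i64> {
    kdb_nanos.checked_add(KDB_EPOCH_OFFSET_SECS * NANOS_PER_SEC)
}
```

With these definitions the kdb+ epoch itself maps to an inner value of `0`, and any instant before the year 2000 maps to a negative value.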
 
### Examples

#### Client

```rust
use kdb_codec::*;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<()> {

    // Connect to a q process running on localhost:5000 via UDS
    let mut socket = QStream::connect(ConnectionMethod::UDS, "", 5000_u16, "ideal:person").await?;
    println!("Connection type: {}", socket.get_connection_type());

    // Set remote function with asynchronous message
    socket.send_async_message(&"collatz:{[n] seq:enlist n; while[not n = 1; seq,: n:$[n mod 2; 1 + 3 * n; `long$n % 2]]; seq}").await?;

    // Send a query synchronously
    let mut result = socket.send_sync_message(&"collatz[12]").await?;
    println!("collatz[12]: {}", result);

    result = socket.send_sync_message(&"collatz[`a]").await?;
    println!("collatz[`a]: {}", result);

    // Send a functional query.
    let mut message = K::new_compound_list(vec![K::new_symbol(String::from("collatz")), K::new_long(100)]);
    result = socket.send_sync_message(&message).await?;
    println!("collatz[100]: {}", result);

    // Modify query to (`collatz; 20)
    message.pop().unwrap();
    message.push(&K::new_long(20)).unwrap();
    result = socket.send_sync_message(&message).await?;
    println!("collatz[20]: {}", result);

    // Send a functional asynchronous query.
    message = K::new_compound_list(vec![K::new_string(String::from("show"), qattribute::NONE), K::new_symbol(String::from("goodbye"))]);
    socket.send_async_message(&message).await?;

    socket.shutdown().await?;

    Ok(())
}
```

#### Listener

```rust
use kdb_codec::*;

#[tokio::main]
async fn main() -> Result<()> {

  // Start listening over TCP on port 7000 with authentication enabled.
  let mut socket_tcp = QStream::accept(ConnectionMethod::TCP, "127.0.0.1", 7000).await?;

  // Send a query with the socket.
  let greeting = socket_tcp.send_sync_message(&"string `Hello").await?;
  println!("Greeting: {}", greeting);

  socket_tcp.shutdown().await?;

  Ok(())
}
```

A q client can then connect to this acceptor using the acceptor's host and port and a credential configured in `KDBPLUS_ACCOUNT_FILE`:

```q
q)h:hopen `::7000:reluctant:slowday
```

## Architecture & Design

### Cancellation Safety

The core innovation of this library is its use of `tokio-util::codec::Framed` which provides automatic buffer management:

- **Buffer Preservation**: Partial reads are stored in the codec's internal buffer
- **Resumable Operations**: Cancelled reads can be safely retried without data loss
- **No Manual State Management**: The Framed wrapper handles all buffer lifecycle

This is critical for production systems using patterns like:
- `tokio::select!` for timeouts or concurrent operations
- Graceful shutdown with cancellation
- Request racing or fallback logic

### Synchronous Deserialization

Unlike the original kdbplus crate, we use **synchronous deserialization** without `async-recursion`:

- **Simpler**: No async recursion complexity
- **Faster**: Eliminates async overhead for CPU-bound deserialization
- **Smaller**: No `async-recursion` proc-macro dependency
- **Safer**: Avoids potential stack overflow from deep async recursion

The deserialization happens in `deserialize_sync.rs` and is called from the codec's `decode()` method after the complete message is buffered.
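
To illustrate why a fully buffered message makes plain recursion sufficient, here is a minimal synchronous decoder for a tiny subset of the kdb+ serialization format: long atoms (type `-7`), long lists (type `7`), and general lists (type `0`). It is a sketch, not the contents of `deserialize_sync.rs`; the `Value` enum and helper names are hypothetical, and it assumes a little-endian payload with the header already stripped.

```rust
/// Hypothetical value type covering only the subset handled here.
#[derive(Debug, PartialEq)]
pub enum Value {
    Long(i64),
    LongList(Vec<i64>),
    List(Vec<Value>),
}

/// Take `n` bytes from the front of `input`, or fail if it is too short.
fn take<'a>(input: &mut &'a [u8], n: usize) -> Result<&'a [u8], String> {
    let data: &'a [u8] = *input;
    if data.len() < n {
        return Err(format!("need {} bytes, have {}", n, data.len()));
    }
    let (head, tail) = data.split_at(n);
    *input = tail;
    Ok(head)
}

fn read_i64(input: &mut &[u8]) -> Result<i64, String> {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(take(input, 8)?);
    Ok(i64::from_le_bytes(raw))
}

/// Read a list prefix: one attribute byte (ignored) and a 4-byte length.
fn read_len(input: &mut &[u8]) -> Result<usize, String> {
    take(input, 1)?;
    let mut raw = [0u8; 4];
    raw.copy_from_slice(take(input, 4)?);
    Ok(u32::from_le_bytes(raw) as usize)
}

/// Decode one value, recursing for nested general lists. No `async` is
/// needed because every byte is already in memory.
pub fn deserialize(input: &mut &[u8]) -> Result<Value, String> {
    let type_byte = take(input, 1)?[0] as i8;
    match type_byte {
        -7 => Ok(Value::Long(read_i64(input)?)),
        7 => {
            let n = read_len(input)?;
            let mut items = Vec::new();
            for _ in 0..n {
                items.push(read_i64(input)?);
            }
            Ok(Value::LongList(items))
        }
        0 => {
            let n = read_len(input)?;
            let mut items = Vec::new();
            for _ in 0..n {
                items.push(deserialize(input)?);
            }
            Ok(Value::List(items))
        }
        other => Err(format!("unsupported type byte {}", other)),
    }
}
```

Because the function borrows an in-memory slice and advances it as it reads, truncated input surfaces as an ordinary `Err` rather than as a pending future.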

### Why Not Add `split()` to QStream?

While we show how to split the underlying Framed stream in examples, we **don't recommend** adding a `split()` method directly to `QStream` because:

1. **Protocol Semantics**: kdb+ IPC is request-response oriented. Splitting would allow sending multiple requests before receiving responses, which can confuse message correlation.

2. **Complexity**: Users would need to manually track which response corresponds to which request.

3. **Better Alternatives**: For concurrent operations, use multiple `QStream` instances or the lower-level `Framed` API directly when you need full control.

If you need independent send/receive channels, access the underlying stream:

```rust
let stream = TcpStream::connect("127.0.0.1:5000").await?;
let framed = Framed::new(stream, KdbCodec::new(true));
let (writer, reader) = framed.split();
// Now you have full control
```

### Installation

Add `kdb_codec` to your `Cargo.toml`:

```toml
[dependencies]
kdb_codec = "0.4"
```

The IPC feature is enabled by default.

## Testing

### Unit Tests

Run the standard unit tests (no kdb+ server required):

```bash
cargo test --package kdb_codec --lib --tests
```

### Integration Tests

Some tests require a running kdb+ server and are marked as `#[ignore]` by default. To run these tests:

1. Start a kdb+ server on `localhost:5001` with credentials `kdbuser:pass`:
   ```bash
   q -p 5001 -u path/to/passwd/file
   ```

2. Run the ignored tests:
   ```bash
   cargo test --package kdb_codec --tests -- --ignored
   ```

The integration tests include:
- `functional_message_test`: Tests various message types and operations
- `compression_test`: Tests compression functionality with large data

**Note**: These tests are automatically skipped in CI/CD unless a kdb+ server is explicitly configured.

## Documentation

The full API documentation is available on [docs.rs/kdb_codec](https://docs.rs/kdb_codec/).

For details of the kdb+ IPC protocol, see:

- [kdb+ IPC Reference](https://code.kx.com/q/basics/ipc/)
- [Serialization](https://code.kx.com/q/basics/serialization/)

## License

This library is licensed under Apache-2.0.

See [LICENSE](kdb_codec/LICENSE).