wingfoil 4.0.0

graph based stream processing framework
Documentation
# KDB+ Adapter

This directory contains the KDB+ adapter for wingfoil, enabling reading from and writing to KDB+ databases.

## Module Structure

```
kdb/
  mod.rs               # Public API, connection types, error handling, Sym
  read.rs              # kdb_read() and shared helpers (compute_time_slices, KdbExt, etc.)
  read_cached.rs       # kdb_read_cached() — file-cached variant of kdb_read
  write.rs             # kdb_write() - writing stream data to KDB+ tables
  integration_tests.rs # Integration tests (requires running KDB+ instance)
```

## Key Components

### Reading from KDB+

- `kdb_read()` - Time-partitioned reads driven by a caller-supplied query closure
  - Computes time slices from `RunMode`/`RunFor` and calls `query_fn` for each slice
  - `query_fn((t0, t1), kdb_date, iteration) -> String` — half-open [t0, t1)
  - Use `time >= t0j, time < t1j` in the q filter for clean round-number boundaries
  - Requires `RunMode::HistoricalFrom` (non-zero start) and `RunFor::Duration`
  - Caller constructs the full query — date/time filters, partition hints, etc.
  - Terminates automatically when all slices are exhausted
- `kdb_read_cached()` - Cached variant of `kdb_read`
  - Same signature as `kdb_read` plus a `cache_dir: impl Into<PathBuf>` parameter
  - Checks `<cache_dir>/<hash>.cache` before each slice query; writes on miss
  - Cache files persist until manually deleted — no TTL, no eviction
  - Lazy TCP connection: if all slices are cache hits, no KDB connection is opened
  - `T` must additionally implement `serde::Serialize + serde::Deserialize`; `Sync` is also required
  - `Sym` is fully supported — it serializes as a plain string (interning not restored on load)
  - If a cache file is corrupt, logs a warning and falls back to KDB, then overwrites the bad file
  - **Schema evolution:** `bincode` is not self-describing — delete the cache dir if `T` changes
- `KdbDeserialize` trait - Convert KDB+ rows to `(NanoTime, T)` tuples
  - `from_kdb_row` returns `Result<(NanoTime, Self), KdbError>` — implementor owns time extraction
  - Use `row.get_timestamp(col)` to extract a KDB timestamp column as `NanoTime`
  - Your struct should only contain business data (sym, price, qty, etc.)

### Writing to KDB+

- `kdb_write()` - Write stream data to KDB+ tables
  - Time is automatically extracted from graph tuples `(NanoTime, T)` and prepended to rows
- `KdbSerialize` trait - Convert Rust types to KDB+ rows
  - `to_kdb_row()` - Returns K object with business data only (no time)
  - **IMPORTANT**: Do NOT include the time field - it's prepended automatically
  - Your struct should only contain business data (sym, price, qty, etc.)

### Connection

- `KdbConnection::new(host, port)` - Configure connection
- `.with_credentials(user, pass)` - Add authentication

## Pre-Commit Requirements

Before committing changes to the KDB adapter, you MUST:

1. **Run integration tests with a live KDB+ instance:**

   ```bash
   # Start KDB+ on port 5000
   # In a separate terminal:
   q -p 5000

   # Then run integration tests:
   cargo test --features kdb-integration-test -p wingfoil
   ```

2. **Run standard formatting and linting:**

   ```bash
   cargo fmt --all
   cargo clippy --workspace --all-targets --all-features
   ```

3. **Run unit tests:**

   ```bash
   cargo test -p wingfoil
   ```

## Integration Test Details

The integration tests in `integration_tests.rs` are gated behind the `kdb-integration-test` feature flag to avoid requiring a running KDB+ instance for normal development.

**Environment variables:**
- `KDB_TEST_HOST` - KDB+ host (default: "localhost")
- `KDB_TEST_PORT` - KDB+ port (default: 5000)

**Test coverage:**
- Round-trip write/read verification
- Serialization/deserialization correctness
- Connection handling
- Time-sliced reads across multiple days and periods
- Edge cases: empty tables, bad queries, bad time columns

## Development Tips

- Always implement both `KdbSerialize` and `KdbDeserialize` for types used in round-trip scenarios
- **Time management**: Time is stored on-graph in tuples `(NanoTime, T)`, not in record structs
  - When reading: time is extracted from the KDB time column and passed as the first tuple element
  - When writing: time is extracted from the tuple and prepended to the serialized row
  - Your structs should ONLY contain business data (no time field)
- Write operations use K object functional queries: `(insert; `tablename; row_values)`
- Connection pooling: Each read/write call opens its own connection

### Example: Record Structure

```rust
// CORRECT - No time field, use Sym for interned symbols
#[derive(Debug, Clone, Default)]
struct Trade {
    sym: Sym,
    price: f64,
    qty: i64,
}

impl KdbDeserialize for Trade {
    fn from_kdb_row(row: Row<'_>, _columns: &[String], interner: &mut SymbolInterner) -> Result<(NanoTime, Self), KdbError> {
        let time = row.get_timestamp(1)?; // col 0: date, col 1: time
        Ok((time, Trade {
            sym: row.get_sym(2, interner)?,
            price: row.get(3)?.get_float()?,
            qty: row.get(4)?.get_long()?,
        }))
    }
}

impl KdbSerialize for Trade {
    fn to_kdb_row(&self) -> K {
        // Return business data only - time prepended automatically
        K::new_compound_list(vec![
            K::new_symbol(self.sym.to_string()),
            K::new_float(self.price),
            K::new_long(self.qty),
        ])
    }
}
```

### Reading with kdb_read_cached

```rust
// Same query closure as kdb_read, plus a cache directory.
// T must also implement serde::Serialize + serde::Deserialize + Sync.
let stream = kdb_read_cached::<Trade, _>(
    conn,
    std::time::Duration::from_secs(3600),
    "/tmp/my-backtest-cache",
    |(t0, t1), date, _| {
        format!(
            "select from trades where date=2000.01.01+{}, \
             time >= (`timestamp$){}j, time < (`timestamp$){}j",
            date, t0.to_kdb_timestamp(), t1.to_kdb_timestamp()
        )
    },
);
```

### Reading with kdb_read

```rust
// Date-partitioned table: filter by date and time range in each slice
let stream = kdb_read::<Trade, _>(
    conn,
    std::time::Duration::from_secs(3600), // 1-hour slices
    |(t0, t1), date, _| {
        format!(
            "select from trades where date=2000.01.01+{}, \
             time >= (`timestamp$){}j, time < (`timestamp$){}j",
            date, t0.to_kdb_timestamp(), t1.to_kdb_timestamp()
        )
    },
);
stream
    .collapse()
    .run(
        RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(start_kdb)),
        RunFor::Duration(std::time::Duration::from_secs(86400)),
    )
    .unwrap();
```