hftbacktest 0.9.4

A high-frequency trading and market-making backtesting tool that accounts for limit orders, queue positions, and latencies, utilizing full tick data for trades and order books.
import json
import multiprocessing
import os
import os.path
import subprocess
from datetime import timedelta, datetime

import numpy as np
from sklearn.model_selection import ParameterGrid

# Backtest date range as YYYYMMDD integers. Both ends are inclusive: one data file and one latency file per calendar
# day in this range is passed to the backtesting program.
date_from = 20240501
date_to = 20240531

# The path for the converted npz files for the Rust version. Both the per-symbol data files
# ("{symbol}_{yyyymmdd}.npz") and the latency files ("latency_{yyyymmdd}.npz") are looked up in this directory.
npz_path = '.'

# The path where the backtesting result is saved. It is passed to the program as --output-path.
out_path = '.'

# The path where the example backtesting program, "gridtrading_backtest_args", is located.
backtest_program = './gridtrading_backtest_args'

# Sets the number of processors for parallel processing during backtesting. The backtesting program itself doesn't use
# multiprocessing, but it runs multiple backtests for each pair in parallel to speed up the process.
num_processors = 8


# Per-symbol market information keyed by symbol. Each entry is read for the fields 'tick_size', 'lot_size',
# 'min_qty' and 'weighted_avg_price' when the backtest parameters are built.
with open('tickers.json', 'r') as f:
    tickers = json.load(f)


def backtest_rust(
        symbol,
        date_from_,
        date_to_,
        tick_size,
        lot_size,
        rel_half_spread,
        rel_grid_interval,
        grid_num,
        skew,
        order_qty,
        max_position
):
    """Run the external backtesting program for one symbol and one parameter set.

    For every calendar day from ``date_from_`` to ``date_to_`` (both inclusive) one data file
    ``{symbol}_{yyyymmdd}.npz`` and one latency file ``latency_{yyyymmdd}.npz`` under ``npz_path``
    are passed to ``backtest_program``, together with the strategy parameters.

    Args:
        symbol: Symbol name; used in the data file names and in the run name.
        date_from_: First day of the backtest, as ``YYYYMMDD`` (int or str).
        date_to_: Last day of the backtest, as ``YYYYMMDD`` (int or str).
        tick_size: Value passed as ``--tick-size``.
        lot_size: Value passed as ``--lot-size``.
        rel_half_spread: Value passed as ``--relative-half-spread``.
        rel_grid_interval: Value passed as ``--relative-grid-interval``.
        grid_num: Value passed as ``--grid-num``.
        skew: Value passed as ``--skew``.
        order_qty: Value passed as ``--order-qty``.
        max_position: Value passed as ``--max-position``.

    Returns:
        The exit code of the backtesting program.

    Raises:
        ValueError: If either date cannot be parsed as ``%Y%m%d``.
    """
    day = datetime.strptime(str(date_from_), '%Y%m%d')
    last_day = datetime.strptime(str(date_to_), '%Y%m%d')
    dates = []
    while day <= last_day:
        dates.append(day.strftime('%Y%m%d'))
        day += timedelta(days=1)

    data_files = [os.path.join(npz_path, f'{symbol}_{yyyymmdd}.npz') for yyyymmdd in dates]
    latency_files = [os.path.join(npz_path, f'latency_{yyyymmdd}.npz') for yyyymmdd in dates]

    # The command is passed as an argument list and executed without a shell, so paths or symbols containing spaces
    # or shell metacharacters reach the program unchanged instead of being split or interpreted by the shell.
    cmd = [
        str(backtest_program),
        '--name', f'{symbol}_{rel_half_spread}_{rel_grid_interval}_{grid_num}_{skew}_',
        '--data-files', *data_files,
        '--latency-files', *latency_files,
        '--output-path', str(out_path),
        '--tick-size', str(tick_size),
        '--lot-size', str(lot_size),
        '--relative-half-spread', str(rel_half_spread),
        '--relative-grid-interval', str(rel_grid_interval),
        '--grid-num', str(grid_num),
        '--skew', str(skew),
        '--order-qty', str(order_qty),
        '--max-position', str(max_position),
    ]
    return_code = subprocess.call(cmd)
    print(f'{symbol}: {return_code}\n')
    # Returned in addition to being printed so that a caller collecting results can detect failed runs.
    return return_code


# Sets parameters for the given symbol. To reduce the number of parameter sets to search, this uses the same value for
# rel_grid_interval as for rel_half_spread, and skew is set to rel_half_spread divided by grid_num. You can also search
# these parameters to broaden the search space. However, this may increase the risk of overfitting. You need to
# carefully select and limit the search space.
def params(symbol, rel_half_spread, grid_num):
    """Build the positional argument tuple that ``backtest_rust`` expects for one symbol.

    The grid interval reuses ``rel_half_spread``, the skew is ``rel_half_spread / grid_num`` and the maximum
    position is ``grid_num`` times the order quantity.

    Args:
        symbol: Key into ``tickers``.
        rel_half_spread: Relative half spread; also used as the relative grid interval.
        grid_num: Number of grids.

    Returns:
        A tuple ``(symbol, date_from, date_to, tick_size, lot_size, rel_half_spread, rel_grid_interval,
        grid_num, skew, order_qty, max_position)``.
    """
    info = tickers[symbol]
    tick = info['tick_size']
    lot = info['lot_size']
    minimum_qty = info['min_qty']
    skew_per_grid = rel_half_spread / grid_num

    # Order quantity is set to be equivalent to about $100. For symbols whose name starts with '1000' the weighted
    # average price is multiplied by 1000 first (presumably because one contract unit represents 1000 of the
    # underlying -- confirm against the exchange's symbol convention).
    price_multiplier = 1000 if symbol.startswith('1000') else 1
    raw_qty = 100 / (price_multiplier * float(info['weighted_avg_price']))
    # Round to the nearest whole number of lots, then never go below the minimum order quantity.
    lot_aligned_qty = round(raw_qty / float(lot)) * float(lot)
    qty = max(float(minimum_qty), lot_aligned_qty)

    return (
        symbol,
        date_from,
        date_to,
        tick,
        lot,
        rel_half_spread,
        rel_half_spread,
        grid_num,
        skew_per_grid,
        qty,
        grid_num * qty,
    )


# The search space: every symbol in tickers.json is combined with every relative half spread and grid number.
param_grid = {
    'symbol': list(tickers.keys()),
    'rel_half_spread': [0.0004, 0.0005, 0.0006, 0.0007, 0.0008],
    'grid_num': [5, 10, 15, 20]
}

# The pool must only be created when this file is executed as the main program. With the "spawn" start method
# (the default on Windows and macOS) every worker process re-imports the main module; without this guard each worker
# would try to start its own pool again.
if __name__ == '__main__':
    grid = ParameterGrid(param_grid)
    args = [params(**p) for p in grid]
    # Each worker runs one backtest (one symbol and parameter set) at a time.
    with multiprocessing.Pool(num_processors) as pool:
        pool.starmap(backtest_rust, args)