import multiprocessing
import os.path
import os
import glob
import re
from hftbacktest.data.utils import tardis
sod_date = 20240501
csv_path = '.'
npz_path = '.'
num_processors = 2
buffer_size = 100_000_000
def convert(in_path, out_path, exchange, symbol, yyyymmdd, sod):
files = [
os.path.join(in_path, f'{exchange}_incremental_book_L2_{symbol}_{yyyymmdd}.csv.gz'),
os.path.join(in_path, f'{exchange}_trades_{symbol}_{yyyymmdd}.csv.gz')
]
try:
tardis.convert(
files,
output_filename=os.path.join(out_path, f'{symbol}_{yyyymmdd}.npz'),
buffer_size=buffer_size,
snapshot_mode='process' if sod else 'ignore',
)
for file in files:
os.remove(file)
except Exception as e:
print(e, exchange, symbol, yyyymmdd)
args = []
pat = re.compile(r'^([\w\-]+)_incremental_book_L2_(\w+)_([0-9]{8})\.csv\.gz$')
for file in glob.glob(os.path.join(csv_path, '*.csv.gz')):
match = pat.match(os.path.basename(file))
if match is not None:
exchange = match.group(1)
symbol = match.group(2)
yyyymmdd = match.group(3)
args.append((csv_path, npz_path, exchange, symbol, yyyymmdd, yyyymmdd == sod_date))
with multiprocessing.Pool(num_processors) as pool:
pool.starmap(convert, args)