rustdx-complete 0.5.0

功能完整的 A 股数据获取库,完全对标 pytdx
#!/usr/bin/env python3
"""
测试pytdx的get_security_quotes功能,并打印调试信息
"""
from pytdx.hq import TdxHq_API
import socket

# 创建一个socket来捕获原始数据
original_connect = TdxHq_API.connect

def debug_connect(self, host, port):
    result = original_connect(self, host, port)
    if result:
        print(f"✅ 连接成功到 {host}:{port}")
    return result

TdxHq_API.connect = debug_connect

api = TdxHq_API()

if api.connect('115.238.56.198', 7709):
    print("\n📤 发送请求...")

    # 创建请求
    stocks = [(0, '000001')]
    api.get_security_quotes.prepare_data(api, stocks)

    # 获取请求包
    if hasattr(api, 'send_pkg'):
        print(f"请求包长度: {len(api.send_pkg)} 字节")
        print(f"请求包内容(hex): {api.send_pkg.hex()}")
        print(f"请求包内容(前30字节):")
        for i in range(0, min(30, len(api.send_pkg)), 8):
            hex_str = ' '.join(f'{b:02x}' for b in api.send_pkg[i:i+8])
            print(f"  字节 {i:2d}-{i+7:2d}: {hex_str}")

    print("\n📥 接收响应...")
    data = api.get_security_quotes(stocks)

    print(f"\n✅ 返回数据:")
    print(f"  返回数量: {len(data)}")
    if data and len(data) > 0:
        quote = data[0]
        print(f"  股票代码: {quote.get('code', 'N/A')}")
        print(f"  当前价: {quote.get('price', 'N/A')}")
        print(f"  活跃1: {quote.get('active1', 'N/A')}")

    api.disconnect()
else:
    print("❌ 连接失败")