mode_change_test/
mode_change_test.rs1use kiteticker_async_manager::{
2 KiteManagerConfig, KiteTickerManager, Mode, TickerMessage,
3};
4use log::info;
5use std::time::{Duration, Instant};
6use tokio::time::{sleep, timeout};
7
8#[tokio::main]
9async fn main() -> Result<(), String> {
10 env_logger::init();
12
13 println!("๐ KiteTicker Mode Change Test");
14 println!("โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ");
15
16 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
17 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
18
19 if api_key.is_empty() || access_token.is_empty() {
20 println!(
21 "โ ๏ธ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
22 );
23 demonstrate_mode_change_issue().await;
24 return Ok(());
25 }
26
27 let config = KiteManagerConfig {
29 max_symbols_per_connection: 3000,
30 max_connections: 3,
31 connection_buffer_size: 5000,
32 parser_buffer_size: 10000,
33 connection_timeout: Duration::from_secs(30),
34 health_check_interval: Duration::from_secs(5),
35 max_reconnect_attempts: 5,
36 reconnect_delay: Duration::from_secs(2),
37 enable_dedicated_parsers: true,
38 default_mode: Mode::LTP,
39 };
40
41 println!("๐ง Starting manager...");
42 let mut manager = KiteTickerManager::new(api_key, access_token, config);
43
44 manager.start().await?;
45 println!("โ
Manager started successfully");
46
47 test_mode_change_issue(&mut manager).await?;
49
50 println!("\n๐ Stopping manager...");
52 manager.stop().await?;
53
54 println!("๐ Mode change test completed!");
55 Ok(())
56}
57
58async fn test_mode_change_issue(
59 manager: &mut KiteTickerManager,
60) -> Result<(), String> {
61 println!("\n๐งช Testing Mode Change Issue");
62 println!("=============================");
63
64 let test_symbol = 738561; println!("\n1๏ธโฃ Subscribing to symbol {} with LTP mode", test_symbol);
67 manager
68 .subscribe_symbols(&[test_symbol], Some(Mode::LTP))
69 .await?;
70
71 let channels = manager.get_all_channels();
73 let mut tick_listeners = Vec::new();
74
75 for (channel_id, mut receiver) in channels {
76 let task = tokio::spawn(async move {
77 let mut ticks_received = 0;
78 let start = Instant::now();
79
80 while start.elapsed() < Duration::from_secs(5) && ticks_received < 3 {
81 match timeout(Duration::from_millis(500), receiver.recv()).await {
82 Ok(Ok(message)) => {
83 if let TickerMessage::Ticks(ticks) = message {
84 for tick in &ticks {
85 if tick.instrument_token == test_symbol {
86 ticks_received += 1;
87 println!(
88 " ๐ Received tick for {}: Mode={:?}, LTP={:?}",
89 tick.instrument_token,
90 tick.content.mode,
91 tick.content.last_price
92 );
93
94 if tick.content.ohlc.is_some() {
96 println!(
97 " โ ๏ธ OHLC data present in LTP mode - unexpected!"
98 );
99 } else {
100 println!(" โ
No OHLC data in LTP mode - correct");
101 }
102 }
103 }
104 }
105 }
106 _ => continue,
107 }
108 }
109 (channel_id, ticks_received)
110 });
111 tick_listeners.push(task);
112 }
113
114 for task in tick_listeners {
116 if let Ok((channel_id, count)) = task.await {
117 println!(
118 " ๐ {:?}: Received {} ticks for initial LTP subscription",
119 channel_id, count
120 );
121 }
122 }
123
124 sleep(Duration::from_secs(2)).await;
125
126 println!(
128 "\n2๏ธโฃ Attempting to change mode from LTP to Full for symbol {}",
129 test_symbol
130 );
131 match manager.change_mode(&[test_symbol], Mode::Full).await {
132 Ok(()) => {
133 println!(" โ
Mode change command sent successfully");
134 }
135 Err(e) => {
136 println!(" โ Mode change command failed: {}", e);
137 return Err(e);
138 }
139 }
140
141 println!("\n3๏ธโฃ Monitoring ticks after mode change...");
143 let channels = manager.get_all_channels();
144 let mut post_change_listeners = Vec::new();
145
146 for (channel_id, mut receiver) in channels {
147 let task = tokio::spawn(async move {
148 let mut ticks_received = 0;
149 let mut ohlc_present = 0;
150 let mut depth_present = 0;
151 let start = Instant::now();
152
153 while start.elapsed() < Duration::from_secs(10) && ticks_received < 5 {
154 match timeout(Duration::from_millis(500), receiver.recv()).await {
155 Ok(Ok(message)) => {
156 if let TickerMessage::Ticks(ticks) = message {
157 for tick in &ticks {
158 if tick.instrument_token == test_symbol {
159 ticks_received += 1;
160 println!(
161 " ๐ Post-change tick for {}: Mode={:?}, LTP={:?}",
162 tick.instrument_token,
163 tick.content.mode,
164 tick.content.last_price
165 );
166
167 if let Some(ohlc) = &tick.content.ohlc {
169 ohlc_present += 1;
170 println!(
171 " โ
OHLC data present: O:{} H:{} L:{} C:{}",
172 ohlc.open, ohlc.high, ohlc.low, ohlc.close
173 );
174 } else {
175 println!(" โ OHLC data missing - mode change may not have worked!");
176 }
177
178 if let Some(depth) = &tick.content.depth {
179 depth_present += 1;
180 println!(
181 " โ
Market depth present: {} buy, {} sell orders",
182 depth.buy.len(),
183 depth.sell.len()
184 );
185 } else {
186 println!(" โ Market depth missing - mode change may not have worked!");
187 }
188
189 info!("Tick mode reported: {:?}", tick.content.mode);
191 }
192 }
193 }
194 }
195 _ => continue,
196 }
197 }
198 (channel_id, ticks_received, ohlc_present, depth_present)
199 });
200 post_change_listeners.push(task);
201 }
202
203 let mut total_ticks = 0;
205 let mut total_ohlc = 0;
206 let mut total_depth = 0;
207
208 for task in post_change_listeners {
209 if let Ok((channel_id, ticks, ohlc, depth)) = task.await {
210 println!(
211 " ๐ {:?}: {} ticks, {} with OHLC, {} with depth",
212 channel_id, ticks, ohlc, depth
213 );
214 total_ticks += ticks;
215 total_ohlc += ohlc;
216 total_depth += depth;
217 }
218 }
219
220 println!("\n4๏ธโฃ Test Results Analysis");
222 println!("========================");
223
224 if total_ticks == 0 {
225 println!(" โ ๏ธ No ticks received after mode change - connection issue?");
226 } else if total_ohlc == 0 && total_depth == 0 {
227 println!(" โ ISSUE CONFIRMED: Mode change command sent but Full mode data not received");
228 println!(" ๐ก This confirms that set_mode() alone doesn't work - need unsubscribe+resubscribe");
229 } else if total_ohlc > 0 && total_depth > 0 {
230 println!(" โ
Mode change worked - Full mode data received");
231 } else {
232 println!(
233 " โ ๏ธ Partial mode change - some Full mode data received but not all"
234 );
235 }
236
237 println!("\n๐ Summary:");
238 println!(" Total ticks received: {}", total_ticks);
239 println!(" Ticks with OHLC data: {}", total_ohlc);
240 println!(" Ticks with market depth: {}", total_depth);
241
242 Ok(())
243}
244
245async fn demonstrate_mode_change_issue() {
246 println!("\n๐ Mode Change Issue Demonstration");
247 println!("==================================");
248
249 println!("\n๐ The Issue:");
250 println!(
251 " When a WebSocket token is set to a specific mode (LTP, Quote, Full),"
252 );
253 println!(
254 " the Kite Connect WebSocket API doesn't allow direct mode changes."
255 );
256 println!(
257 " Simply sending a 'mode' command doesn't upgrade the subscription."
258 );
259
260 println!("\nโ Current Implementation (Broken):");
261 println!(" ```rust");
262 println!(" // This only sends a mode command but doesn't work");
263 println!(" manager.change_mode(&[symbol], Mode::Full).await?;");
264 println!(" ```");
265
266 println!("\nโ
Required Solution:");
267 println!(" ```rust");
268 println!(" // Must unsubscribe first, then resubscribe with new mode");
269 println!(" manager.unsubscribe_symbols(&[symbol]).await?;");
270 println!(" manager.subscribe_symbols(&[symbol], Some(Mode::Full)).await?;");
271 println!(" ```");
272
273 println!("\n๐ง Fix Needed:");
274 println!(" The change_mode() method should internally:");
275 println!(" 1. Unsubscribe from the symbols");
276 println!(" 2. Resubscribe with the new mode");
277 println!(" 3. Maintain symbol tracking across the operation");
278
279 println!("\n๐ To test with real data:");
280 println!(" export KITE_API_KEY=your_api_key");
281 println!(" export KITE_ACCESS_TOKEN=your_access_token");
282 println!(" export RUST_LOG=debug");
283 println!(" cargo run --example mode_change_test");
284}