pitwall 0.1.0

Modern, type-safe Rust library for iRacing telemetry data
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
//! Integration tests for connection layer
//!
//! These tests verify that telemetry streaming and session info propagation
//! work correctly with both live and replay connections.

#[cfg(test)]
use super::*;
#[cfg(test)]
use crate::{
    UpdateRate,
    adapters::{AdapterValidation, FieldExtraction, FrameAdapter},
};
#[cfg(test)]
use futures::StreamExt;
#[cfg(test)]
use std::time::Duration;
#[cfg(test)]
use tracing::info;

// Manual implementation for testing to avoid derive macro issues
#[cfg(all(test, windows))]
#[derive(Debug)]
struct BasicTelemetry {
    speed: f32,
    rpm: f32,
    gear: i32,
}

#[cfg(all(test, windows))]
impl FrameAdapter for BasicTelemetry {
    fn validate_schema(schema: &crate::VariableSchema) -> crate::Result<AdapterValidation> {
        let mut extraction_plan = Vec::new();

        if let Some(var_info) = schema.variables.get("Speed") {
            extraction_plan.push(FieldExtraction::Required {
                name: "Speed".to_string(),
                var_info: var_info.clone(),
            });
        }

        if let Some(var_info) = schema.variables.get("RPM") {
            extraction_plan.push(FieldExtraction::Required {
                name: "RPM".to_string(),
                var_info: var_info.clone(),
            });
        }

        if let Some(var_info) = schema.variables.get("Gear") {
            extraction_plan.push(FieldExtraction::Required {
                name: "Gear".to_string(),
                var_info: var_info.clone(),
            });
        }

        Ok(AdapterValidation::new(extraction_plan))
    }

    fn adapt(packet: &crate::types::FramePacket, validation: &AdapterValidation) -> Self {
        use crate::VarData;

        let data = packet.data.as_ref();

        let speed = if let Some(index) = validation.index_of("Speed") {
            if let Some(FieldExtraction::Required { var_info, .. }) =
                validation.extraction_plan.get(index)
            {
                f32::from_bytes(data, var_info).unwrap_or(0.0)
            } else {
                0.0
            }
        } else {
            0.0
        };

        let rpm = if let Some(index) = validation.index_of("RPM") {
            if let Some(FieldExtraction::Required { var_info, .. }) =
                validation.extraction_plan.get(index)
            {
                f32::from_bytes(data, var_info).unwrap_or(0.0)
            } else {
                0.0
            }
        } else {
            0.0
        };

        let gear = if let Some(index) = validation.index_of("Gear") {
            if let Some(FieldExtraction::Required { var_info, .. }) =
                validation.extraction_plan.get(index)
            {
                i32::from_bytes(data, var_info).unwrap_or(0)
            } else {
                0
            }
        } else {
            0
        };

        BasicTelemetry { speed, rpm, gear }
    }
}

// Simple frame for throttle testing
#[cfg(test)]
#[derive(Debug)]
struct SimpleFrame {
    #[allow(dead_code)]
    speed: f32,
}

#[cfg(test)]
impl FrameAdapter for SimpleFrame {
    fn validate_schema(schema: &crate::VariableSchema) -> crate::Result<AdapterValidation> {
        let mut extraction_plan = Vec::new();

        if let Some(var_info) = schema.variables.get("Speed") {
            extraction_plan.push(FieldExtraction::Required {
                name: "Speed".to_string(),
                var_info: var_info.clone(),
            });
        }

        Ok(AdapterValidation::new(extraction_plan))
    }

    fn adapt(packet: &crate::types::FramePacket, validation: &AdapterValidation) -> Self {
        use crate::VarData;

        let data = packet.data.as_ref();

        let speed = if let Some(index) = validation.index_of("Speed") {
            if let Some(FieldExtraction::Required { var_info, .. }) =
                validation.extraction_plan.get(index)
            {
                f32::from_bytes(data, var_info).unwrap_or(0.0)
            } else {
                0.0
            }
        } else {
            0.0
        };

        SimpleFrame { speed }
    }
}

#[cfg(all(test, windows))]
#[tokio::test]
#[ignore = "iracing_required"]
async fn live_session_immediate_delivery() {
    // Initialize logging for debugging
    let _ = tracing_subscriber::fmt::try_init();

    info!("Testing LIVE session immediate delivery");

    let connection = match live::LiveConnection::connect().await {
        Ok(conn) => conn,
        Err(e) => {
            eprintln!("Failed to connect to iRacing: {}", e);
            eprintln!("Make sure iRacing is running with a session loaded");
            panic!("Cannot test without iRacing");
        }
    };

    info!("Connected to live iRacing");

    // Get session updates stream
    let mut session_stream = Box::pin(connection.session_updates());

    // CRITICAL TEST: Session should be available on FIRST call to stream.next()
    // This is the bug we're fixing - stream should yield immediately, not hang
    info!("Calling stream.next() - should return within 1 second");
    let start = std::time::Instant::now();

    let session = tokio::time::timeout(Duration::from_secs(1), session_stream.next())
        .await
        .expect(
            "TIMEOUT! Stream did not yield session within 1 second - WatchStream bug not fixed!",
        )
        .expect("Stream returned None - no session available");

    let elapsed = start.elapsed();

    // Verify session has expected fields
    assert!(!session.weekend_info.track_name.is_empty(), "Track name should not be empty");
    assert!(!session.weekend_info.track_length.is_empty(), "Track length should not be empty");

    info!("Session delivered in {:?}", elapsed);
    info!("Track: {}", session.weekend_info.track_name);
    info!("Length: {}", session.weekend_info.track_length);
    info!("Sessions: {}", session.session_info.sessions.len());
    info!("Current session num: {}", session.session_info.current_session_num);
}

#[cfg(all(test, windows))]
#[tokio::test]
#[ignore = "iracing_required"]
async fn live_session_info_propagation() {
    // Initialize logging for debugging
    let _ = tracing_subscriber::fmt::try_init();

    info!("Connecting to live telemetry...");
    let connection = match live::LiveConnection::connect().await {
        Ok(conn) => conn,
        Err(e) => {
            eprintln!("Failed to connect to iRacing: {}", e);
            eprintln!("Make sure iRacing is running with a session loaded");
            panic!("Cannot test without iRacing");
        }
    };

    info!("Connected! Testing session info stream...");

    // Get session updates stream
    let mut session_stream = Box::pin(connection.session_updates());

    // Get the initial session info (there's typically only one unless session changes)
    let mut session_count = 0;

    // Use timeout to avoid hanging if no updates - only expect 1 session
    let timeout = Duration::from_secs(2);
    let start = tokio::time::Instant::now();

    while session_count < 1 && start.elapsed() < timeout {
        match tokio::time::timeout(Duration::from_secs(1), session_stream.next()).await {
            Ok(Some(session)) => {
                session_count += 1;

                // Verify session has expected fields
                assert!(
                    !session.weekend_info.track_name.is_empty(),
                    "Track name should not be empty"
                );
                assert!(
                    !session.weekend_info.track_length.is_empty(),
                    "Track length should not be empty"
                );

                info!(
                    "Session {}: Track={}, Length={}, Sessions={}",
                    session_count,
                    session.weekend_info.track_name,
                    session.weekend_info.track_length,
                    session.session_info.sessions.len()
                );
            }
            Ok(None) => {
                info!("Session stream ended");
                break;
            }
            Err(_) => {
                // Timeout is fine - might not have session changes
                info!("No session update within timeout");
            }
        }
    }

    assert!(session_count > 0, "Should receive at least one session info");
    info!("Successfully received {} session updates", session_count);
}

#[cfg(windows)]
#[tokio::test]
#[ignore = "iracing_required"]
async fn live_telemetry_with_session_correlation() {
    let _ = tracing_subscriber::fmt::try_init();

    info!("Connecting for telemetry correlation test...");
    let connection = live::LiveConnection::connect().await.expect("Failed to connect to iRacing");

    // Subscribe to both telemetry and session info
    let mut telemetry_stream =
        Box::pin(connection.subscribe::<BasicTelemetry>(UpdateRate::Max(10)));
    let mut session_stream = Box::pin(connection.session_updates());

    // Get initial session info
    let initial_session =
        tokio::time::timeout(Duration::from_secs(2), session_stream.next()).await.ok().flatten();

    if let Some(session) = initial_session {
        info!("Initial session: {}", session.weekend_info.track_name);
    }

    // Collect some telemetry frames
    let mut frame_count = 0;
    let timeout = Duration::from_secs(3);
    let start = tokio::time::Instant::now();

    while frame_count < 10 && start.elapsed() < timeout {
        match tokio::time::timeout(Duration::from_millis(200), telemetry_stream.next()).await {
            Ok(Some(frame)) => {
                frame_count += 1;
                info!(
                    "Frame {}: Speed={:.1} km/h, RPM={:.0}, Gear={}",
                    frame_count, frame.speed, frame.rpm, frame.gear
                );
            }
            Ok(None) => break,
            Err(_) => continue,
        }
    }

    assert!(frame_count > 0, "Should receive telemetry frames");
    info!("Successfully received {} telemetry frames", frame_count);
}

#[tokio::test]
async fn replay_session_immediate_delivery() {
    use crate::test_utils;

    let _ = tracing_subscriber::fmt::try_init();

    // Get a test IBT file
    let ibt_file = test_utils::get_smallest_ibt_test_file().expect("No IBT test files found");

    info!("Opening replay file: {:?}", ibt_file);
    let connection =
        replay::ReplayConnection::open(ibt_file).await.expect("Failed to open IBT file");

    // Get session updates stream
    let mut session_stream = Box::pin(connection.session_updates());

    // CRITICAL TEST: Session should be available on FIRST call to stream.next()
    // This validates WatchStream yields current value immediately
    let session = tokio::time::timeout(Duration::from_secs(1), session_stream.next())
        .await
        .expect("Timeout waiting for initial session - stream should yield immediately")
        .expect("Stream should not be empty - session should be available");

    // Verify session has expected data
    assert!(!session.weekend_info.track_name.is_empty(), "Track name should not be empty");
    assert!(!session.weekend_info.track_length.is_empty(), "Track length should not be empty");

    info!(
        "Initial session delivered immediately: Track={}, Sessions={}",
        session.weekend_info.track_name,
        session.session_info.sessions.len()
    );
}

#[tokio::test]
async fn replay_session_info_propagation() {
    use crate::test_utils;

    let _ = tracing_subscriber::fmt::try_init();

    // Get a test IBT file
    let ibt_file = test_utils::get_smallest_ibt_test_file().expect("No IBT test files found");

    info!("Opening replay file: {:?}", ibt_file);
    let connection =
        replay::ReplayConnection::open(ibt_file).await.expect("Failed to open IBT file");

    // Get session updates stream
    let mut session_stream = Box::pin(connection.session_updates());

    // Collect session updates
    let mut sessions = Vec::new();
    let timeout = Duration::from_secs(5);
    let start = tokio::time::Instant::now();

    while start.elapsed() < timeout {
        match tokio::time::timeout(Duration::from_millis(100), session_stream.next()).await {
            Ok(Some(session)) => {
                info!(
                    "Session: Track={}, Sessions={}",
                    session.weekend_info.track_name,
                    session.session_info.sessions.len()
                );
                sessions.push(session);
            }
            Ok(None) => {
                info!("Session stream ended");
                break;
            }
            Err(_) => {
                // No more updates
                break;
            }
        }
    }

    // Verify we got at least one session
    assert!(!sessions.is_empty(), "Should receive at least one session info");

    // Verify deduplication - shouldn't get duplicate sessions
    for i in 1..sessions.len() {
        // Session number should have changed if we got another update
        assert_ne!(
            sessions[i - 1].session_info.current_session_num,
            sessions[i].session_info.current_session_num,
            "Should not receive duplicate session updates"
        );
    }

    info!("Successfully received {} unique session updates", sessions.len());
}

#[tokio::test]
async fn replay_telemetry_stream_throttling() {
    use crate::test_utils;
    use std::time::Instant;

    let _ = tracing_subscriber::fmt::try_init();

    let ibt_file = test_utils::get_smallest_ibt_test_file().expect("No IBT test files found");

    let connection =
        replay::ReplayConnection::open(ibt_file).await.expect("Failed to open IBT file");

    // Subscribe with throttling to 5 Hz
    let mut stream = Box::pin(connection.subscribe::<SimpleFrame>(UpdateRate::Max(5)));

    let mut frames = Vec::new();
    let mut timestamps = Vec::new();
    let start = Instant::now();

    // Collect frames for 2 seconds
    while start.elapsed() < Duration::from_secs(2) {
        match tokio::time::timeout(Duration::from_millis(250), stream.next()).await {
            Ok(Some(frame)) => {
                timestamps.push(Instant::now());
                frames.push(frame);
            }
            Ok(None) => break,
            Err(_) => continue,
        }
    }

    // Should have received frames
    assert!(!frames.is_empty(), "Should receive frames");

    // Check throttling - should be approximately 5 Hz
    if timestamps.len() > 2 {
        let mut intervals = Vec::new();
        for i in 1..timestamps.len() {
            intervals.push(timestamps[i].duration_since(timestamps[i - 1]));
        }

        let avg_interval = intervals.iter().sum::<Duration>() / intervals.len() as u32;
        let expected_interval = Duration::from_millis(200); // 5 Hz = 200ms

        // Allow 50ms tolerance
        let diff = avg_interval.abs_diff(expected_interval);

        assert!(
            diff < Duration::from_millis(50),
            "Throttling not working correctly. Expected ~200ms, got {:?}",
            avg_interval
        );

        info!("Throttling working: avg interval = {:?}", avg_interval);
    }

    info!("Received {} frames over {:?}", frames.len(), start.elapsed());
}