1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
//! Live telemetry connection for Windows
use crate::Result;
#[cfg(windows)]
use {
crate::driver::Driver,
crate::provider::Provider,
crate::providers::live::LiveProvider,
crate::stream::ThrottleExt,
crate::types::{FramePacket, UpdateRate},
crate::{FrameAdapter, SessionInfo, VariableSchema},
futures::{Stream, StreamExt},
std::sync::Arc,
std::time::Duration,
tokio::sync::watch,
tokio_stream::wrappers::WatchStream,
tokio_util::sync::CancellationToken,
tracing::{debug, info},
};
/// Live connection to iRacing telemetry
#[cfg(windows)]
pub struct LiveConnection {
/// Frame watch receiver
frames: watch::Receiver<Option<Arc<FramePacket>>>,
/// Session watch receiver
sessions: watch::Receiver<Option<Arc<SessionInfo>>>,
/// Variable schema
schema: Arc<VariableSchema>,
/// Source frequency
source_hz: f64,
/// Cancellation token for stopping tasks
cancel: CancellationToken,
}
#[cfg(windows)]
impl LiveConnection {
/// Create a new live connection.
///
/// This method establishes a connection to iRacing's shared memory and starts
/// monitoring for telemetry data. The connection will wait for iRacing to
/// start a session before streaming frames.
pub async fn connect() -> Result<Self> {
info!("Connecting to iRacing live telemetry");
// Create provider and extract metadata
let provider = LiveProvider::new()?;
let schema = provider.schema();
let source_hz = provider.tick_rate();
// Spawn driver tasks - they will wait for iRacing to start
let channels = Driver::spawn(provider);
// Don't wait for frames here - let the streams handle waiting
// This allows the connection to be established even if iRacing isn't
// in a session yet. The streams will wait for data.
info!("Live connection established ({}Hz) - waiting for iRacing session", source_hz);
Ok(Self {
frames: channels.frames,
sessions: channels.sessions,
schema,
source_hz,
cancel: channels.cancel,
})
}
/// Subscribe to telemetry frames
pub fn subscribe<T>(&self, rate: UpdateRate) -> impl Stream<Item = T> + 'static
where
T: FrameAdapter + Send + 'static,
{
// Validate schema once at subscription time
let validation = T::validate_schema(&self.schema).expect("Schema validation failed");
// Create base frame stream from watch channel
// Important: WatchStream yields the current value immediately. If no frames
// have arrived yet, this will be None. We must handle this carefully to avoid
// the stream appearing to end when it's actually just waiting for data.
//
// We skip initial None values to keep the stream alive while waiting for iRacing.
// Once we receive our first frame, any subsequent None indicates the provider stopped.
let frames = WatchStream::new(self.frames.clone())
.skip_while(|opt| {
// Skip leading None values (waiting for iRacing)
let is_none = opt.is_none();
async move { is_none }
})
.take_while(|opt| {
// After skipping initial Nones, stop on the first None (provider ended)
let is_some = opt.is_some();
async move { is_some }
})
.filter_map(|opt| async move { opt });
// Apply rate control and adaptation
let effective_rate = rate.normalize(self.source_hz);
match effective_rate {
UpdateRate::Native => {
// Direct adaptation, no throttling
frames.map(move |packet| T::adapt(&packet, &validation)).boxed()
}
UpdateRate::Max(hz) => {
// Throttle then adapt
let interval = Duration::from_secs_f64(1.0 / hz as f64);
frames.throttle(interval).map(move |packet| T::adapt(&packet, &validation)).boxed()
}
}
}
/// Get session updates as a stream
///
/// Sessions are automatically detected by the Driver when session versions
/// change, and YAML is parsed asynchronously without blocking frame processing.
///
/// This stream will emit the current session immediately (if available), then
/// emit subsequent session changes.
///
/// Uses WatchStream which automatically handles initial state correctly:
/// - Yields current session on subscription (if any)
/// - Yields subsequent updates as they arrive
/// - No manual skip/dedup needed - watch channel semantics handle it
pub fn session_updates(&self) -> impl Stream<Item = Arc<SessionInfo>> + 'static {
WatchStream::new(self.sessions.clone()).filter_map(|opt| async move { opt })
}
/// Get current session info (if any)
pub fn current_session(&self) -> Option<Arc<SessionInfo>> {
self.sessions.borrow().clone()
}
/// Get the source telemetry frequency
pub fn source_hz(&self) -> f64 {
self.source_hz
}
/// Get the variable schema
pub fn schema(&self) -> &VariableSchema {
&self.schema
}
}
#[cfg(windows)]
impl Drop for LiveConnection {
fn drop(&mut self) {
debug!("Dropping live connection");
// Cancel tasks on drop for clean shutdown
self.cancel.cancel();
}
}
// Non-Windows stub implementation
#[cfg(not(windows))]
pub struct LiveConnection {
_private: (),
}
#[cfg(not(windows))]
impl LiveConnection {
/// Attempt to create a live connection on non-Windows platforms.
///
/// This always returns an error as live telemetry is only available on Windows.
/// Consider using `Pitwall::open()` with an IBT file for cross-platform testing.
pub async fn connect() -> Result<Self> {
Err(crate::TelemetryError::unsupported_platform("Live telemetry", "Windows"))
}
}