1use std::collections::HashMap;
8use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
9use std::sync::{Arc, Mutex};
10
11use crate::RtpSsrc;
12use crate::RtpTimestamp;
13use crate::packet::rtcp::NtpTimestamp;
14use super::clock::MediaClock;
15
16struct StreamMapping {
18 source_ssrc: RtpSsrc,
20
21 target_ssrc: RtpSsrc,
23
24 source_rate: u32,
26
27 target_rate: u32,
29
30 source_rtp_ref: RtpTimestamp,
32
33 target_rtp_ref: RtpTimestamp,
35
36 ntp_ref: NtpTimestamp,
38
39 last_update: Instant,
41
42 drift_ppm: f64,
44}
45
46impl StreamMapping {
47 pub fn new(
49 source_ssrc: RtpSsrc,
50 target_ssrc: RtpSsrc,
51 source_rate: u32,
52 target_rate: u32,
53 source_rtp: RtpTimestamp,
54 target_rtp: RtpTimestamp,
55 ntp: NtpTimestamp,
56 ) -> Self {
57 Self {
58 source_ssrc,
59 target_ssrc,
60 source_rate,
61 target_rate,
62 source_rtp_ref: source_rtp,
63 target_rtp_ref: target_rtp,
64 ntp_ref: ntp,
65 last_update: Instant::now(),
66 drift_ppm: 0.0,
67 }
68 }
69
70 pub fn map_timestamp(&self, source_rtp: RtpTimestamp) -> RtpTimestamp {
72 let source_diff = source_rtp.wrapping_sub(self.source_rtp_ref) as i64;
74
75 let seconds = source_diff as f64 / self.source_rate as f64;
77
78 let corrected_seconds = seconds * (1.0 + self.drift_ppm / 1_000_000.0);
80
81 let target_diff = (corrected_seconds * self.target_rate as f64) as i64;
83
84 self.target_rtp_ref.wrapping_add(target_diff as u32)
86 }
87
88 pub fn update(
90 &mut self,
91 source_rtp: RtpTimestamp,
92 target_rtp: RtpTimestamp,
93 ntp: NtpTimestamp
94 ) -> f64 {
95 let now = Instant::now();
97 let elapsed = now.duration_since(self.last_update).as_secs_f64();
98
99 if elapsed > 0.0 {
100 let source_ticks = source_rtp.wrapping_sub(self.source_rtp_ref) as i64;
102 let target_ticks = target_rtp.wrapping_sub(self.target_rtp_ref) as i64;
103
104 let source_seconds = source_ticks as f64 / self.source_rate as f64;
105 let target_seconds = target_ticks as f64 / self.target_rate as f64;
106
107 let drift = (target_seconds - source_seconds) / elapsed;
109 self.drift_ppm = drift * 1_000_000.0;
110 }
111
112 self.source_rtp_ref = source_rtp;
114 self.target_rtp_ref = target_rtp;
115 self.ntp_ref = ntp;
116 self.last_update = now;
117
118 self.drift_ppm / 1000.0
120 }
121}
122
123#[derive(Clone)]
125pub struct TimestampMapper {
126 mappings: Arc<Mutex<HashMap<(RtpSsrc, RtpSsrc), StreamMapping>>>,
128
129 clocks: Arc<Mutex<HashMap<RtpSsrc, MediaClock>>>,
131}
132
133impl TimestampMapper {
134 pub fn new() -> Self {
136 Self {
137 mappings: Arc::new(Mutex::new(HashMap::new())),
138 clocks: Arc::new(Mutex::new(HashMap::new())),
139 }
140 }
141
142 pub fn register_stream(&self, ssrc: RtpSsrc, clock_rate: u32, initial_rtp: RtpTimestamp) {
144 if let Ok(mut clocks) = self.clocks.lock() {
145 let clock = MediaClock::now(clock_rate, initial_rtp);
147 clocks.insert(ssrc, clock);
148 }
149 }
150
151 pub fn update_from_sr(&self, ssrc: RtpSsrc, ntp: NtpTimestamp, rtp: RtpTimestamp) {
153 if let Ok(mut clocks) = self.clocks.lock() {
154 if let Some(clock) = clocks.get_mut(&ssrc) {
156 clock.update_reference(rtp, ntp);
157 } else {
158 let clock_rate = 8000; let clock = MediaClock::new(clock_rate, rtp, ntp);
162 clocks.insert(ssrc, clock);
163 }
164 }
165 }
166
167 pub fn map_streams(
169 &self,
170 source_ssrc: RtpSsrc,
171 target_ssrc: RtpSsrc,
172 source_rtp: RtpTimestamp,
173 target_rtp: RtpTimestamp,
174 ntp: NtpTimestamp,
175 ) -> Option<f64> {
176 let (source_rate, target_rate) = if let Ok(clocks) = self.clocks.lock() {
178 let source_clock = clocks.get(&source_ssrc)?;
179 let target_clock = clocks.get(&target_ssrc)?;
180 (source_clock.clock_rate(), target_clock.clock_rate())
181 } else {
182 return None;
183 };
184
185 if let Ok(mut mappings) = self.mappings.lock() {
186 let key = (source_ssrc, target_ssrc);
187
188 if let Some(mapping) = mappings.get_mut(&key) {
190 let drift = mapping.update(source_rtp, target_rtp, ntp);
192 Some(drift)
193 } else {
194 let mapping = StreamMapping::new(
196 source_ssrc, target_ssrc, source_rate, target_rate,
197 source_rtp, target_rtp, ntp
198 );
199 mappings.insert(key, mapping);
200 Some(0.0) }
202 } else {
203 None
204 }
205 }
206
207 pub fn map_timestamp(
209 &self,
210 source_ssrc: RtpSsrc,
211 target_ssrc: RtpSsrc,
212 source_rtp: RtpTimestamp,
213 ) -> Option<RtpTimestamp> {
214 if let Ok(mappings) = self.mappings.lock() {
215 let key = (source_ssrc, target_ssrc);
216
217 if let Some(mapping) = mappings.get(&key) {
218 Some(mapping.map_timestamp(source_rtp))
220 } else {
221 if let Ok(clocks) = self.clocks.lock() {
223 let source_clock = clocks.get(&source_ssrc)?;
224 let target_clock = clocks.get(&target_ssrc)?;
225
226 let ntp = source_clock.rtp_to_ntp(source_rtp);
228
229 Some(target_clock.ntp_to_rtp(ntp))
231 } else {
232 None
233 }
234 }
235 } else {
236 None
237 }
238 }
239
240 pub fn get_drift(&self, source_ssrc: RtpSsrc, target_ssrc: RtpSsrc) -> Option<f64> {
242 if let Ok(mappings) = self.mappings.lock() {
243 let key = (source_ssrc, target_ssrc);
244
245 mappings.get(&key).map(|mapping| mapping.drift_ppm)
246 } else {
247 None
248 }
249 }
250
251 pub fn rtp_to_wallclock(&self, ssrc: RtpSsrc, rtp: RtpTimestamp) -> Option<Instant> {
253 if let Ok(clocks) = self.clocks.lock() {
254 let clock = clocks.get(&ssrc)?;
255 Some(clock.rtp_to_system_time(rtp))
256 } else {
257 None
258 }
259 }
260
261 pub fn wallclock_to_rtp(&self, ssrc: RtpSsrc, time: Instant) -> Option<RtpTimestamp> {
263 if let Ok(clocks) = self.clocks.lock() {
264 let clock = clocks.get(&ssrc)?;
265 Some(clock.system_time_to_rtp(time))
266 } else {
267 None
268 }
269 }
270
271 pub fn get_sync_offset(&self, source_ssrc: RtpSsrc, target_ssrc: RtpSsrc) -> Option<f64> {
276 if let Ok(mappings) = self.mappings.lock() {
278 let key = (source_ssrc, target_ssrc);
279
280 if let Some(mapping) = mappings.get(&key) {
281 let elapsed = Instant::now().duration_since(mapping.last_update).as_secs_f64();
283
284 let drift_ms = (mapping.drift_ppm / 1000.0) * elapsed;
286
287 return Some(drift_ms);
288 }
289 }
290
291 if let Ok(clocks) = self.clocks.lock() {
293 let source_clock = clocks.get(&source_ssrc)?;
294 let target_clock = clocks.get(&target_ssrc)?;
295
296 let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?;
298 let ntp = NtpTimestamp::from_duration_since_unix_epoch(now);
299
300 let source_rtp = source_clock.ntp_to_rtp(ntp);
302 let target_rtp = target_clock.ntp_to_rtp(ntp);
303
304 let source_seconds = source_rtp as f64 / source_clock.clock_rate() as f64;
306 let target_seconds = target_rtp as f64 / target_clock.clock_rate() as f64;
307
308 let offset_ms = (target_seconds - source_seconds) * 1000.0;
310
311 Some(offset_ms)
312 } else {
313 None
314 }
315 }
316}
317
318impl Default for TimestampMapper {
319 fn default() -> Self {
320 Self::new()
321 }
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// A 400 ms step on an 8 kHz source must map to a 400 ms step on a 90 kHz target.
    #[test]
    fn test_stream_mapping() {
        // Reference pair: 200 ms into each stream (1600 @ 8 kHz, 18000 @ 90 kHz).
        let (audio_ref, video_ref) = (1600, 18000);
        let mapping = StreamMapping::new(
            0x1234,
            0x5678,
            8000,
            90000,
            audio_ref,
            video_ref,
            NtpTimestamp::now(),
        );

        // 3200 ticks at 8 kHz = 400 ms = 36000 ticks at 90 kHz.
        let mapped = mapping.map_timestamp(audio_ref.wrapping_add(3200));

        assert_eq!(mapped, video_ref + 36000);
    }

    /// Once two registered streams are tied together, the mapper converts
    /// later source timestamps relative to that pair.
    #[test]
    fn test_timestamp_mapper() {
        let mapper = TimestampMapper::new();

        let (audio_ssrc, video_ssrc) = (0x1234, 0x5678);
        mapper.register_stream(audio_ssrc, 8000, 800);
        mapper.register_stream(video_ssrc, 90000, 9000);

        // Tie the streams together at the 200 ms mark.
        let ntp = NtpTimestamp::now();
        let video_anchor = 18000;
        mapper.map_streams(audio_ssrc, video_ssrc, 1600, video_anchor, ntp);

        // Audio at 400 ms is 200 ms (18000 video ticks) past the anchor.
        let mapped = mapper.map_timestamp(audio_ssrc, video_ssrc, 3200);

        assert_eq!(mapped, Some(video_anchor + 18000));
    }
}