rvoip_rtp_core/sync/
mapping.rs

1//! Timestamp mapping module
2//!
3//! This module provides mapping between different timestamp domains,
4//! such as RTP timestamps from different streams, NTP timestamps,
5//! and system time.
6
7use std::collections::HashMap;
8use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
9use std::sync::{Arc, Mutex};
10
11use crate::RtpSsrc;
12use crate::RtpTimestamp;
13use crate::packet::rtcp::NtpTimestamp;
14use super::clock::MediaClock;
15
16/// Mapping between a source stream and a target stream
17struct StreamMapping {
18    /// Source stream SSRC
19    source_ssrc: RtpSsrc,
20    
21    /// Target stream SSRC
22    target_ssrc: RtpSsrc,
23    
24    /// Source stream clock rate
25    source_rate: u32,
26    
27    /// Target stream clock rate
28    target_rate: u32,
29    
30    /// Source stream reference RTP timestamp
31    source_rtp_ref: RtpTimestamp,
32    
33    /// Target stream reference RTP timestamp
34    target_rtp_ref: RtpTimestamp,
35    
36    /// Common reference NTP timestamp
37    ntp_ref: NtpTimestamp,
38    
39    /// When this mapping was last updated
40    last_update: Instant,
41    
42    /// Measured clock drift between source and target (parts per million)
43    drift_ppm: f64,
44}
45
46impl StreamMapping {
47    /// Create a new mapping between two streams
48    pub fn new(
49        source_ssrc: RtpSsrc,
50        target_ssrc: RtpSsrc,
51        source_rate: u32,
52        target_rate: u32,
53        source_rtp: RtpTimestamp,
54        target_rtp: RtpTimestamp,
55        ntp: NtpTimestamp,
56    ) -> Self {
57        Self {
58            source_ssrc,
59            target_ssrc,
60            source_rate,
61            target_rate,
62            source_rtp_ref: source_rtp,
63            target_rtp_ref: target_rtp,
64            ntp_ref: ntp,
65            last_update: Instant::now(),
66            drift_ppm: 0.0,
67        }
68    }
69    
70    /// Map a source RTP timestamp to the equivalent in the target stream
71    pub fn map_timestamp(&self, source_rtp: RtpTimestamp) -> RtpTimestamp {
72        // Calculate time difference in source stream ticks
73        let source_diff = source_rtp.wrapping_sub(self.source_rtp_ref) as i64;
74        
75        // Convert to seconds, using source clock rate
76        let seconds = source_diff as f64 / self.source_rate as f64;
77        
78        // Apply drift correction
79        let corrected_seconds = seconds * (1.0 + self.drift_ppm / 1_000_000.0);
80        
81        // Convert to target stream ticks
82        let target_diff = (corrected_seconds * self.target_rate as f64) as i64;
83        
84        // Calculate target timestamp
85        self.target_rtp_ref.wrapping_add(target_diff as u32)
86    }
87    
88    /// Update the mapping with new reference timestamps
89    pub fn update(
90        &mut self, 
91        source_rtp: RtpTimestamp, 
92        target_rtp: RtpTimestamp, 
93        ntp: NtpTimestamp
94    ) -> f64 {
95        // Calculate elapsed time between updates
96        let now = Instant::now();
97        let elapsed = now.duration_since(self.last_update).as_secs_f64();
98        
99        if elapsed > 0.0 {
100            // Calculate drift between source and target
101            let source_ticks = source_rtp.wrapping_sub(self.source_rtp_ref) as i64;
102            let target_ticks = target_rtp.wrapping_sub(self.target_rtp_ref) as i64;
103            
104            let source_seconds = source_ticks as f64 / self.source_rate as f64;
105            let target_seconds = target_ticks as f64 / self.target_rate as f64;
106            
107            // Calculate drift in PPM (parts per million)
108            let drift = (target_seconds - source_seconds) / elapsed;
109            self.drift_ppm = drift * 1_000_000.0;
110        }
111        
112        // Update reference points
113        self.source_rtp_ref = source_rtp;
114        self.target_rtp_ref = target_rtp;
115        self.ntp_ref = ntp;
116        self.last_update = now;
117        
118        // Return drift in milliseconds per second
119        self.drift_ppm / 1000.0
120    }
121}
122
123/// Manages timestamp mappings between multiple streams
124#[derive(Clone)]
125pub struct TimestampMapper {
126    /// Maps from (source_ssrc, target_ssrc) to StreamMapping
127    mappings: Arc<Mutex<HashMap<(RtpSsrc, RtpSsrc), StreamMapping>>>,
128    
129    /// Maps from SSRC to its corresponding MediaClock
130    clocks: Arc<Mutex<HashMap<RtpSsrc, MediaClock>>>,
131}
132
133impl TimestampMapper {
134    /// Create a new timestamp mapper
135    pub fn new() -> Self {
136        Self {
137            mappings: Arc::new(Mutex::new(HashMap::new())),
138            clocks: Arc::new(Mutex::new(HashMap::new())),
139        }
140    }
141    
142    /// Register a new stream with its clock rate
143    pub fn register_stream(&self, ssrc: RtpSsrc, clock_rate: u32, initial_rtp: RtpTimestamp) {
144        if let Ok(mut clocks) = self.clocks.lock() {
145            // Create a new media clock for this stream
146            let clock = MediaClock::now(clock_rate, initial_rtp);
147            clocks.insert(ssrc, clock);
148        }
149    }
150    
151    /// Update stream timing information from an RTCP sender report
152    pub fn update_from_sr(&self, ssrc: RtpSsrc, ntp: NtpTimestamp, rtp: RtpTimestamp) {
153        if let Ok(mut clocks) = self.clocks.lock() {
154            // If we have a clock for this stream, update it
155            if let Some(clock) = clocks.get_mut(&ssrc) {
156                clock.update_reference(rtp, ntp);
157            } else {
158                // If not, create one if we know the clock rate
159                // (fallback to common rates based on payload type)
160                let clock_rate = 8000; // Default to 8kHz
161                let clock = MediaClock::new(clock_rate, rtp, ntp);
162                clocks.insert(ssrc, clock);
163            }
164        }
165    }
166    
167    /// Create or update a mapping between two streams
168    pub fn map_streams(
169        &self, 
170        source_ssrc: RtpSsrc,
171        target_ssrc: RtpSsrc,
172        source_rtp: RtpTimestamp,
173        target_rtp: RtpTimestamp,
174        ntp: NtpTimestamp,
175    ) -> Option<f64> {
176        // Get the clock rates
177        let (source_rate, target_rate) = if let Ok(clocks) = self.clocks.lock() {
178            let source_clock = clocks.get(&source_ssrc)?;
179            let target_clock = clocks.get(&target_ssrc)?;
180            (source_clock.clock_rate(), target_clock.clock_rate())
181        } else {
182            return None;
183        };
184        
185        if let Ok(mut mappings) = self.mappings.lock() {
186            let key = (source_ssrc, target_ssrc);
187            
188            // Check if mapping already exists
189            if let Some(mapping) = mappings.get_mut(&key) {
190                // Update existing mapping
191                let drift = mapping.update(source_rtp, target_rtp, ntp);
192                Some(drift)
193            } else {
194                // Create new mapping
195                let mapping = StreamMapping::new(
196                    source_ssrc, target_ssrc, source_rate, target_rate,
197                    source_rtp, target_rtp, ntp
198                );
199                mappings.insert(key, mapping);
200                Some(0.0) // No drift initially
201            }
202        } else {
203            None
204        }
205    }
206    
207    /// Map a timestamp from one stream to another
208    pub fn map_timestamp(
209        &self,
210        source_ssrc: RtpSsrc,
211        target_ssrc: RtpSsrc,
212        source_rtp: RtpTimestamp,
213    ) -> Option<RtpTimestamp> {
214        if let Ok(mappings) = self.mappings.lock() {
215            let key = (source_ssrc, target_ssrc);
216            
217            if let Some(mapping) = mappings.get(&key) {
218                // Direct mapping exists
219                Some(mapping.map_timestamp(source_rtp))
220            } else {
221                // Try indirect mapping via NTP timestamps
222                if let Ok(clocks) = self.clocks.lock() {
223                    let source_clock = clocks.get(&source_ssrc)?;
224                    let target_clock = clocks.get(&target_ssrc)?;
225                    
226                    // Convert source RTP to NTP
227                    let ntp = source_clock.rtp_to_ntp(source_rtp);
228                    
229                    // Convert NTP to target RTP
230                    Some(target_clock.ntp_to_rtp(ntp))
231                } else {
232                    None
233                }
234            }
235        } else {
236            None
237        }
238    }
239    
240    /// Get estimated clock drift between two streams in PPM
241    pub fn get_drift(&self, source_ssrc: RtpSsrc, target_ssrc: RtpSsrc) -> Option<f64> {
242        if let Ok(mappings) = self.mappings.lock() {
243            let key = (source_ssrc, target_ssrc);
244            
245            mappings.get(&key).map(|mapping| mapping.drift_ppm)
246        } else {
247            None
248        }
249    }
250    
251    /// Convert an RTP timestamp to wall clock time
252    pub fn rtp_to_wallclock(&self, ssrc: RtpSsrc, rtp: RtpTimestamp) -> Option<Instant> {
253        if let Ok(clocks) = self.clocks.lock() {
254            let clock = clocks.get(&ssrc)?;
255            Some(clock.rtp_to_system_time(rtp))
256        } else {
257            None
258        }
259    }
260    
261    /// Convert wall clock time to an RTP timestamp
262    pub fn wallclock_to_rtp(&self, ssrc: RtpSsrc, time: Instant) -> Option<RtpTimestamp> {
263        if let Ok(clocks) = self.clocks.lock() {
264            let clock = clocks.get(&ssrc)?;
265            Some(clock.system_time_to_rtp(time))
266        } else {
267            None
268        }
269    }
270    
271    /// Get the synchronization offset between two streams in milliseconds
272    ///
273    /// Returns a positive value if target stream is ahead of source stream
274    /// and should be delayed to achieve synchronization.
275    pub fn get_sync_offset(&self, source_ssrc: RtpSsrc, target_ssrc: RtpSsrc) -> Option<f64> {
276        // Try to get the direct mapping
277        if let Ok(mappings) = self.mappings.lock() {
278            let key = (source_ssrc, target_ssrc);
279            
280            if let Some(mapping) = mappings.get(&key) {
281                // Calculate how far the streams have drifted
282                let elapsed = Instant::now().duration_since(mapping.last_update).as_secs_f64();
283                
284                // Drift in ms per second * elapsed seconds = total drift in ms
285                let drift_ms = (mapping.drift_ppm / 1000.0) * elapsed;
286                
287                return Some(drift_ms);
288            }
289        }
290        
291        // If no direct mapping, try to calculate via NTP timestamps
292        if let Ok(clocks) = self.clocks.lock() {
293            let source_clock = clocks.get(&source_ssrc)?;
294            let target_clock = clocks.get(&target_ssrc)?;
295            
296            // Create a common reference point (now)
297            let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?;
298            let ntp = NtpTimestamp::from_duration_since_unix_epoch(now);
299            
300            // Convert to RTP timestamps in each stream's clock domain
301            let source_rtp = source_clock.ntp_to_rtp(ntp);
302            let target_rtp = target_clock.ntp_to_rtp(ntp);
303            
304            // Calculate playback points in seconds
305            let source_seconds = source_rtp as f64 / source_clock.clock_rate() as f64;
306            let target_seconds = target_rtp as f64 / target_clock.clock_rate() as f64;
307            
308            // Calculate offset in milliseconds
309            let offset_ms = (target_seconds - source_seconds) * 1000.0;
310            
311            Some(offset_ms)
312        } else {
313            None
314        }
315    }
316}
317
318impl Default for TimestampMapper {
319    fn default() -> Self {
320        Self::new()
321    }
322}
323
324#[cfg(test)]
325mod tests {
326    use super::*;
327    
328    #[test]
329    fn test_stream_mapping() {
330        // Create a mapping between two streams
331        let source_ssrc = 0x1234;
332        let target_ssrc = 0x5678;
333        let source_rate = 8000;  // 8 kHz audio
334        let target_rate = 90000; // 90 kHz video
335        let source_rtp = 1600;   // 200ms of audio
336        let target_rtp = 18000;  // 200ms of video
337        let ntp = NtpTimestamp::now();
338        
339        let mut mapping = StreamMapping::new(
340            source_ssrc, target_ssrc, source_rate, target_rate,
341            source_rtp, target_rtp, ntp
342        );
343        
344        // Test mapping 400ms (3200 samples) of audio to video
345        let source_rtp_400ms = source_rtp.wrapping_add(3200);
346        let target_rtp_400ms = mapping.map_timestamp(source_rtp_400ms);
347        
348        // Calculate expected value:
349        // 3200 - 1600 = 1600 audio samples = 200ms
350        // 200ms * 90000/1000 samples/ms = 18000 video samples
351        // 18000 + 18000 = 36000
352        let expected = target_rtp + 36000;
353        assert_eq!(target_rtp_400ms, expected);
354    }
355    
356    #[test]
357    fn test_timestamp_mapper() {
358        let mapper = TimestampMapper::new();
359        
360        // Register two streams
361        let audio_ssrc = 0x1234;
362        let video_ssrc = 0x5678;
363        let audio_rate = 8000;  // 8 kHz
364        let video_rate = 90000; // 90 kHz
365        
366        mapper.register_stream(audio_ssrc, audio_rate, 800); // 100ms
367        mapper.register_stream(video_ssrc, video_rate, 9000); // 100ms
368        
369        // Create mapping with both at 200ms
370        let ntp = NtpTimestamp::now();
371        let audio_rtp_200ms = 1600; // 200ms at 8kHz
372        let video_rtp_200ms = 18000; // 200ms at 90kHz
373        
374        mapper.map_streams(audio_ssrc, video_ssrc, audio_rtp_200ms, video_rtp_200ms, ntp);
375        
376        // Map 400ms of audio to video
377        let audio_rtp_400ms = 3200; // 400ms at 8kHz
378        let video_rtp_400ms = mapper.map_timestamp(audio_ssrc, video_ssrc, audio_rtp_400ms);
379        
380        // Expected: video_rtp_200ms + (400ms - 200ms) * 90kHz = video_rtp_200ms + 18000
381        assert_eq!(video_rtp_400ms, Some(video_rtp_200ms + 18000));
382    }
383}