Skip to main content

oximedia_align/
multi_stream.rs

1//! Multi-stream alignment for synchronizing groups of audio/video streams.
2//!
3//! Provides group alignment, reference stream selection, and bulk offset
4//! computation for production workflows with many simultaneous camera/audio feeds.
5
6#![allow(dead_code)]
7#![allow(clippy::cast_precision_loss)]
8#![allow(clippy::too_many_arguments)]
9
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
13/// Identifier for a stream in a multi-stream group
14pub type StreamId = u32;
15
16/// Offset of one stream relative to a reference stream
17#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
18pub struct StreamOffset {
19    /// ID of this stream
20    pub stream_id: StreamId,
21    /// ID of the reference stream
22    pub reference_id: StreamId,
23    /// Offset in samples (positive = this stream is delayed relative to reference)
24    pub offset_samples: i64,
25    /// Confidence in this measurement (0.0 to 1.0)
26    pub confidence: f64,
27}
28
29impl StreamOffset {
30    /// Create a new stream offset
31    #[must_use]
32    pub fn new(
33        stream_id: StreamId,
34        reference_id: StreamId,
35        offset_samples: i64,
36        confidence: f64,
37    ) -> Self {
38        Self {
39            stream_id,
40            reference_id,
41            offset_samples,
42            confidence,
43        }
44    }
45
46    /// Convert offset to milliseconds
47    #[must_use]
48    pub fn to_ms(&self, sample_rate: u32) -> f64 {
49        self.offset_samples as f64 / f64::from(sample_rate) * 1000.0
50    }
51
52    /// Invert the offset (swap stream and reference perspective)
53    #[must_use]
54    pub fn invert(&self) -> Self {
55        Self {
56            stream_id: self.reference_id,
57            reference_id: self.stream_id,
58            offset_samples: -self.offset_samples,
59            confidence: self.confidence,
60        }
61    }
62}
63
64/// Strategy for selecting the reference stream
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
66pub enum ReferenceStrategy {
67    /// Use stream with highest overall confidence as reference
68    HighestConfidence,
69    /// Use a manually specified stream ID
70    Manual(StreamId),
71    /// Use the stream with the most connections to other streams
72    MostConnected,
73    /// Use the stream that minimizes total adjustment magnitude
74    MinTotalAdjustment,
75}
76
77/// Group of streams to be aligned together
78#[derive(Debug, Clone)]
79pub struct StreamGroup {
80    /// Name of this group
81    pub name: String,
82    /// Stream IDs in this group
83    pub stream_ids: Vec<StreamId>,
84    /// Pairwise offsets measured between streams
85    offsets: Vec<StreamOffset>,
86    /// Selected reference stream
87    reference_id: Option<StreamId>,
88}
89
90impl StreamGroup {
91    /// Create a new empty stream group
92    #[must_use]
93    pub fn new(name: impl Into<String>) -> Self {
94        Self {
95            name: name.into(),
96            stream_ids: Vec::new(),
97            offsets: Vec::new(),
98            reference_id: None,
99        }
100    }
101
102    /// Add a stream to the group
103    pub fn add_stream(&mut self, id: StreamId) {
104        if !self.stream_ids.contains(&id) {
105            self.stream_ids.push(id);
106        }
107    }
108
109    /// Add a measured offset between two streams
110    pub fn add_offset(&mut self, offset: StreamOffset) {
111        self.offsets.push(offset);
112    }
113
114    /// Number of streams in the group
115    #[must_use]
116    pub fn len(&self) -> usize {
117        self.stream_ids.len()
118    }
119
120    /// Returns true if the group is empty
121    #[must_use]
122    pub fn is_empty(&self) -> bool {
123        self.stream_ids.is_empty()
124    }
125
126    /// Select reference stream using the given strategy
127    pub fn select_reference(&mut self, strategy: ReferenceStrategy) -> Option<StreamId> {
128        if self.stream_ids.is_empty() {
129            return None;
130        }
131
132        let ref_id = match strategy {
133            ReferenceStrategy::Manual(id) => {
134                if self.stream_ids.contains(&id) {
135                    id
136                } else {
137                    return None;
138                }
139            }
140            ReferenceStrategy::HighestConfidence => self.stream_with_highest_confidence(),
141            ReferenceStrategy::MostConnected => self.most_connected_stream(),
142            ReferenceStrategy::MinTotalAdjustment => self.min_adjustment_stream(),
143        };
144
145        self.reference_id = Some(ref_id);
146        Some(ref_id)
147    }
148
149    /// Get the current reference stream
150    #[must_use]
151    pub fn reference_id(&self) -> Option<StreamId> {
152        self.reference_id
153    }
154
155    /// Stream with highest average confidence in its offset measurements
156    fn stream_with_highest_confidence(&self) -> StreamId {
157        let mut confidence_sum: HashMap<StreamId, (f64, usize)> = HashMap::new();
158
159        for offset in &self.offsets {
160            let entry = confidence_sum.entry(offset.stream_id).or_insert((0.0, 0));
161            entry.0 += offset.confidence;
162            entry.1 += 1;
163
164            let entry = confidence_sum
165                .entry(offset.reference_id)
166                .or_insert((0.0, 0));
167            entry.0 += offset.confidence;
168            entry.1 += 1;
169        }
170
171        self.stream_ids
172            .iter()
173            .copied()
174            .max_by(|&a, &b| {
175                let avg_a = confidence_sum.get(&a).map_or(0.0, |(s, c)| s / *c as f64);
176                let avg_b = confidence_sum.get(&b).map_or(0.0, |(s, c)| s / *c as f64);
177                avg_a
178                    .partial_cmp(&avg_b)
179                    .unwrap_or(std::cmp::Ordering::Equal)
180            })
181            .unwrap_or(self.stream_ids[0])
182    }
183
184    /// Stream with the most offset connections to other streams
185    fn most_connected_stream(&self) -> StreamId {
186        let mut connections: HashMap<StreamId, usize> = HashMap::new();
187        for offset in &self.offsets {
188            *connections.entry(offset.stream_id).or_insert(0) += 1;
189            *connections.entry(offset.reference_id).or_insert(0) += 1;
190        }
191
192        self.stream_ids
193            .iter()
194            .copied()
195            .max_by_key(|id| connections.get(id).copied().unwrap_or(0))
196            .unwrap_or(self.stream_ids[0])
197    }
198
199    /// Stream that minimizes total adjustment magnitude when used as reference
200    fn min_adjustment_stream(&self) -> StreamId {
201        self.stream_ids
202            .iter()
203            .copied()
204            .min_by(|&a, &b| {
205                let total_a = self.total_adjustment_for_reference(a);
206                let total_b = self.total_adjustment_for_reference(b);
207                total_a
208                    .partial_cmp(&total_b)
209                    .unwrap_or(std::cmp::Ordering::Equal)
210            })
211            .unwrap_or(self.stream_ids[0])
212    }
213
214    /// Compute total absolute adjustment if a given stream were used as reference
215    fn total_adjustment_for_reference(&self, ref_id: StreamId) -> f64 {
216        let bulk = self.compute_bulk_offsets_for_reference(ref_id);
217        bulk.values()
218            .map(|o| o.offset_samples.unsigned_abs() as f64)
219            .sum()
220    }
221
222    /// Compute bulk offsets for all streams relative to a given reference
223    #[must_use]
224    pub fn compute_bulk_offsets(&self) -> HashMap<StreamId, StreamOffset> {
225        let ref_id = self
226            .reference_id
227            .unwrap_or_else(|| self.stream_ids.first().copied().unwrap_or(0));
228        self.compute_bulk_offsets_for_reference(ref_id)
229    }
230
231    /// Compute bulk offsets for all streams relative to a specific reference
232    #[must_use]
233    pub fn compute_bulk_offsets_for_reference(
234        &self,
235        ref_id: StreamId,
236    ) -> HashMap<StreamId, StreamOffset> {
237        let mut result = HashMap::new();
238
239        for &sid in &self.stream_ids {
240            if sid == ref_id {
241                result.insert(sid, StreamOffset::new(sid, ref_id, 0, 1.0));
242                continue;
243            }
244
245            // Look for direct measurement
246            let direct = self.offsets.iter().find(|o| {
247                (o.stream_id == sid && o.reference_id == ref_id)
248                    || (o.stream_id == ref_id && o.reference_id == sid)
249            });
250
251            if let Some(off) = direct {
252                let offset = if off.stream_id == sid {
253                    *off
254                } else {
255                    off.invert()
256                };
257                result.insert(
258                    sid,
259                    StreamOffset::new(sid, ref_id, offset.offset_samples, offset.confidence),
260                );
261            }
262        }
263
264        result
265    }
266
267    /// Get all offsets in the group
268    #[must_use]
269    pub fn offsets(&self) -> &[StreamOffset] {
270        &self.offsets
271    }
272}
273
274/// Multi-stream aligner that manages multiple groups
275#[derive(Debug, Clone)]
276pub struct MultiStreamAligner {
277    /// Named groups of streams
278    groups: HashMap<String, StreamGroup>,
279    /// Default reference strategy
280    strategy: ReferenceStrategy,
281    /// Sample rate used for time calculations
282    sample_rate: u32,
283}
284
285impl MultiStreamAligner {
286    /// Create a new multi-stream aligner
287    #[must_use]
288    pub fn new(sample_rate: u32) -> Self {
289        Self {
290            groups: HashMap::new(),
291            strategy: ReferenceStrategy::HighestConfidence,
292            sample_rate,
293        }
294    }
295
296    /// Add a stream group
297    pub fn add_group(&mut self, group: StreamGroup) {
298        self.groups.insert(group.name.clone(), group);
299    }
300
301    /// Get a reference to a group by name
302    #[must_use]
303    pub fn group(&self, name: &str) -> Option<&StreamGroup> {
304        self.groups.get(name)
305    }
306
307    /// Get a mutable reference to a group by name
308    pub fn group_mut(&mut self, name: &str) -> Option<&mut StreamGroup> {
309        self.groups.get_mut(name)
310    }
311
312    /// Set the reference strategy
313    pub fn set_strategy(&mut self, strategy: ReferenceStrategy) {
314        self.strategy = strategy;
315    }
316
317    /// Align all groups using the current strategy
318    pub fn align_all(&mut self) -> HashMap<String, HashMap<StreamId, StreamOffset>> {
319        let strategy = self.strategy;
320        let mut result = HashMap::new();
321
322        let names: Vec<String> = self.groups.keys().cloned().collect();
323        for name in names {
324            if let Some(group) = self.groups.get_mut(&name) {
325                group.select_reference(strategy);
326                let offsets = group.compute_bulk_offsets();
327                result.insert(name, offsets);
328            }
329        }
330
331        result
332    }
333
334    /// Total number of streams across all groups
335    #[must_use]
336    pub fn total_streams(&self) -> usize {
337        self.groups.values().map(StreamGroup::len).sum()
338    }
339
340    /// Total number of groups
341    #[must_use]
342    pub fn group_count(&self) -> usize {
343        self.groups.len()
344    }
345
346    /// Get sample rate
347    #[must_use]
348    pub fn sample_rate(&self) -> u32 {
349        self.sample_rate
350    }
351}
352
353/// Summary statistics for a completed alignment
354#[derive(Debug, Clone)]
355pub struct AlignmentSummary {
356    /// Number of streams aligned
357    pub stream_count: usize,
358    /// Maximum offset magnitude in samples
359    pub max_offset_samples: i64,
360    /// Average confidence
361    pub average_confidence: f64,
362    /// Number of streams with low confidence
363    pub low_confidence_count: usize,
364}
365
366impl AlignmentSummary {
367    /// Build a summary from a bulk offset map
368    #[must_use]
369    pub fn from_offsets(offsets: &HashMap<StreamId, StreamOffset>) -> Self {
370        let count = offsets.len();
371        if count == 0 {
372            return Self {
373                stream_count: 0,
374                max_offset_samples: 0,
375                average_confidence: 0.0,
376                low_confidence_count: 0,
377            };
378        }
379
380        let max_off = offsets
381            .values()
382            .map(|o| o.offset_samples.abs())
383            .max()
384            .unwrap_or(0);
385
386        let avg_conf = offsets.values().map(|o| o.confidence).sum::<f64>() / count as f64;
387
388        let low_conf = offsets.values().filter(|o| o.confidence < 0.5).count();
389
390        Self {
391            stream_count: count,
392            max_offset_samples: max_off,
393            average_confidence: avg_conf,
394            low_confidence_count: low_conf,
395        }
396    }
397}
398
399#[cfg(test)]
400mod tests {
401    use super::*;
402
403    fn make_group_with_offsets() -> StreamGroup {
404        let mut group = StreamGroup::new("cameras");
405        group.add_stream(1);
406        group.add_stream(2);
407        group.add_stream(3);
408        // Stream 2 is 100 samples ahead of stream 1
409        group.add_offset(StreamOffset::new(2, 1, -100, 0.9));
410        // Stream 3 is 200 samples behind stream 1
411        group.add_offset(StreamOffset::new(3, 1, 200, 0.8));
412        group
413    }
414
415    #[test]
416    fn test_stream_offset_creation() {
417        let off = StreamOffset::new(2, 1, 500, 0.9);
418        assert_eq!(off.stream_id, 2);
419        assert_eq!(off.reference_id, 1);
420        assert_eq!(off.offset_samples, 500);
421    }
422
423    #[test]
424    fn test_stream_offset_to_ms() {
425        let off = StreamOffset::new(2, 1, 4800, 0.9);
426        let ms = off.to_ms(48000);
427        assert!((ms - 100.0).abs() < 1e-6);
428    }
429
430    #[test]
431    fn test_stream_offset_invert() {
432        let off = StreamOffset::new(2, 1, 500, 0.9);
433        let inv = off.invert();
434        assert_eq!(inv.stream_id, 1);
435        assert_eq!(inv.reference_id, 2);
436        assert_eq!(inv.offset_samples, -500);
437    }
438
439    #[test]
440    fn test_stream_group_creation() {
441        let group = StreamGroup::new("test");
442        assert_eq!(group.name, "test");
443        assert!(group.is_empty());
444    }
445
446    #[test]
447    fn test_stream_group_add_stream() {
448        let mut group = StreamGroup::new("test");
449        group.add_stream(1);
450        group.add_stream(2);
451        group.add_stream(1); // duplicate should not be added
452        assert_eq!(group.len(), 2);
453    }
454
455    #[test]
456    fn test_stream_group_manual_reference() {
457        let mut group = make_group_with_offsets();
458        let ref_id = group.select_reference(ReferenceStrategy::Manual(1));
459        assert_eq!(ref_id, Some(1));
460        assert_eq!(group.reference_id(), Some(1));
461    }
462
463    #[test]
464    fn test_stream_group_manual_reference_invalid() {
465        let mut group = make_group_with_offsets();
466        let ref_id = group.select_reference(ReferenceStrategy::Manual(99));
467        assert_eq!(ref_id, None);
468    }
469
470    #[test]
471    fn test_stream_group_highest_confidence_reference() {
472        let mut group = make_group_with_offsets();
473        let ref_id = group.select_reference(ReferenceStrategy::HighestConfidence);
474        assert!(ref_id.is_some());
475        assert!(group
476            .stream_ids
477            .contains(&ref_id.expect("test expectation failed")));
478    }
479
480    #[test]
481    fn test_stream_group_most_connected_reference() {
482        let mut group = make_group_with_offsets();
483        let ref_id = group.select_reference(ReferenceStrategy::MostConnected);
484        // Stream 1 appears in both offsets, so should be most connected
485        assert_eq!(ref_id, Some(1));
486    }
487
488    #[test]
489    fn test_stream_group_bulk_offsets() {
490        let mut group = make_group_with_offsets();
491        group.select_reference(ReferenceStrategy::Manual(1));
492        let offsets = group.compute_bulk_offsets();
493        // Stream 1 should have 0 offset (reference)
494        assert_eq!(offsets[&1].offset_samples, 0);
495        // Stream 2 is -100 relative to 1
496        assert_eq!(offsets[&2].offset_samples, -100);
497        // Stream 3 is +200 relative to 1
498        assert_eq!(offsets[&3].offset_samples, 200);
499    }
500
501    #[test]
502    fn test_multi_stream_aligner_creation() {
503        let aligner = MultiStreamAligner::new(48000);
504        assert_eq!(aligner.sample_rate(), 48000);
505        assert_eq!(aligner.group_count(), 0);
506    }
507
508    #[test]
509    fn test_multi_stream_aligner_add_group() {
510        let mut aligner = MultiStreamAligner::new(48000);
511        let mut group = StreamGroup::new("cameras");
512        group.add_stream(1);
513        group.add_stream(2);
514        aligner.add_group(group);
515        assert_eq!(aligner.group_count(), 1);
516        assert_eq!(aligner.total_streams(), 2);
517    }
518
519    #[test]
520    fn test_multi_stream_aligner_get_group() {
521        let mut aligner = MultiStreamAligner::new(48000);
522        aligner.add_group(StreamGroup::new("cameras"));
523        assert!(aligner.group("cameras").is_some());
524        assert!(aligner.group("missing").is_none());
525    }
526
527    #[test]
528    fn test_multi_stream_aligner_align_all() {
529        let mut aligner = MultiStreamAligner::new(48000);
530        let group = make_group_with_offsets();
531        aligner.add_group(group);
532        aligner.set_strategy(ReferenceStrategy::Manual(1));
533        let results = aligner.align_all();
534        assert!(results.contains_key("cameras"));
535    }
536
537    #[test]
538    fn test_alignment_summary_empty() {
539        let summary = AlignmentSummary::from_offsets(&HashMap::new());
540        assert_eq!(summary.stream_count, 0);
541        assert_eq!(summary.max_offset_samples, 0);
542    }
543
544    #[test]
545    fn test_alignment_summary_from_offsets() {
546        let mut offsets = HashMap::new();
547        offsets.insert(1u32, StreamOffset::new(1, 1, 0, 1.0));
548        offsets.insert(2u32, StreamOffset::new(2, 1, 200, 0.9));
549        offsets.insert(3u32, StreamOffset::new(3, 1, -300, 0.3));
550
551        let summary = AlignmentSummary::from_offsets(&offsets);
552        assert_eq!(summary.stream_count, 3);
553        assert_eq!(summary.max_offset_samples, 300);
554        assert_eq!(summary.low_confidence_count, 1); // stream 3 has 0.3
555    }
556}