1#![allow(dead_code)]
7#![allow(clippy::cast_precision_loss)]
8#![allow(clippy::too_many_arguments)]
9
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
13pub type StreamId = u32;
15
16#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
18pub struct StreamOffset {
19 pub stream_id: StreamId,
21 pub reference_id: StreamId,
23 pub offset_samples: i64,
25 pub confidence: f64,
27}
28
29impl StreamOffset {
30 #[must_use]
32 pub fn new(
33 stream_id: StreamId,
34 reference_id: StreamId,
35 offset_samples: i64,
36 confidence: f64,
37 ) -> Self {
38 Self {
39 stream_id,
40 reference_id,
41 offset_samples,
42 confidence,
43 }
44 }
45
46 #[must_use]
48 pub fn to_ms(&self, sample_rate: u32) -> f64 {
49 self.offset_samples as f64 / f64::from(sample_rate) * 1000.0
50 }
51
52 #[must_use]
54 pub fn invert(&self) -> Self {
55 Self {
56 stream_id: self.reference_id,
57 reference_id: self.stream_id,
58 offset_samples: -self.offset_samples,
59 confidence: self.confidence,
60 }
61 }
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
66pub enum ReferenceStrategy {
67 HighestConfidence,
69 Manual(StreamId),
71 MostConnected,
73 MinTotalAdjustment,
75}
76
77#[derive(Debug, Clone)]
79pub struct StreamGroup {
80 pub name: String,
82 pub stream_ids: Vec<StreamId>,
84 offsets: Vec<StreamOffset>,
86 reference_id: Option<StreamId>,
88}
89
90impl StreamGroup {
91 #[must_use]
93 pub fn new(name: impl Into<String>) -> Self {
94 Self {
95 name: name.into(),
96 stream_ids: Vec::new(),
97 offsets: Vec::new(),
98 reference_id: None,
99 }
100 }
101
102 pub fn add_stream(&mut self, id: StreamId) {
104 if !self.stream_ids.contains(&id) {
105 self.stream_ids.push(id);
106 }
107 }
108
109 pub fn add_offset(&mut self, offset: StreamOffset) {
111 self.offsets.push(offset);
112 }
113
114 #[must_use]
116 pub fn len(&self) -> usize {
117 self.stream_ids.len()
118 }
119
120 #[must_use]
122 pub fn is_empty(&self) -> bool {
123 self.stream_ids.is_empty()
124 }
125
126 pub fn select_reference(&mut self, strategy: ReferenceStrategy) -> Option<StreamId> {
128 if self.stream_ids.is_empty() {
129 return None;
130 }
131
132 let ref_id = match strategy {
133 ReferenceStrategy::Manual(id) => {
134 if self.stream_ids.contains(&id) {
135 id
136 } else {
137 return None;
138 }
139 }
140 ReferenceStrategy::HighestConfidence => self.stream_with_highest_confidence(),
141 ReferenceStrategy::MostConnected => self.most_connected_stream(),
142 ReferenceStrategy::MinTotalAdjustment => self.min_adjustment_stream(),
143 };
144
145 self.reference_id = Some(ref_id);
146 Some(ref_id)
147 }
148
149 #[must_use]
151 pub fn reference_id(&self) -> Option<StreamId> {
152 self.reference_id
153 }
154
155 fn stream_with_highest_confidence(&self) -> StreamId {
157 let mut confidence_sum: HashMap<StreamId, (f64, usize)> = HashMap::new();
158
159 for offset in &self.offsets {
160 let entry = confidence_sum.entry(offset.stream_id).or_insert((0.0, 0));
161 entry.0 += offset.confidence;
162 entry.1 += 1;
163
164 let entry = confidence_sum
165 .entry(offset.reference_id)
166 .or_insert((0.0, 0));
167 entry.0 += offset.confidence;
168 entry.1 += 1;
169 }
170
171 self.stream_ids
172 .iter()
173 .copied()
174 .max_by(|&a, &b| {
175 let avg_a = confidence_sum.get(&a).map_or(0.0, |(s, c)| s / *c as f64);
176 let avg_b = confidence_sum.get(&b).map_or(0.0, |(s, c)| s / *c as f64);
177 avg_a
178 .partial_cmp(&avg_b)
179 .unwrap_or(std::cmp::Ordering::Equal)
180 })
181 .unwrap_or(self.stream_ids[0])
182 }
183
184 fn most_connected_stream(&self) -> StreamId {
186 let mut connections: HashMap<StreamId, usize> = HashMap::new();
187 for offset in &self.offsets {
188 *connections.entry(offset.stream_id).or_insert(0) += 1;
189 *connections.entry(offset.reference_id).or_insert(0) += 1;
190 }
191
192 self.stream_ids
193 .iter()
194 .copied()
195 .max_by_key(|id| connections.get(id).copied().unwrap_or(0))
196 .unwrap_or(self.stream_ids[0])
197 }
198
199 fn min_adjustment_stream(&self) -> StreamId {
201 self.stream_ids
202 .iter()
203 .copied()
204 .min_by(|&a, &b| {
205 let total_a = self.total_adjustment_for_reference(a);
206 let total_b = self.total_adjustment_for_reference(b);
207 total_a
208 .partial_cmp(&total_b)
209 .unwrap_or(std::cmp::Ordering::Equal)
210 })
211 .unwrap_or(self.stream_ids[0])
212 }
213
214 fn total_adjustment_for_reference(&self, ref_id: StreamId) -> f64 {
216 let bulk = self.compute_bulk_offsets_for_reference(ref_id);
217 bulk.values()
218 .map(|o| o.offset_samples.unsigned_abs() as f64)
219 .sum()
220 }
221
222 #[must_use]
224 pub fn compute_bulk_offsets(&self) -> HashMap<StreamId, StreamOffset> {
225 let ref_id = self
226 .reference_id
227 .unwrap_or_else(|| self.stream_ids.first().copied().unwrap_or(0));
228 self.compute_bulk_offsets_for_reference(ref_id)
229 }
230
231 #[must_use]
233 pub fn compute_bulk_offsets_for_reference(
234 &self,
235 ref_id: StreamId,
236 ) -> HashMap<StreamId, StreamOffset> {
237 let mut result = HashMap::new();
238
239 for &sid in &self.stream_ids {
240 if sid == ref_id {
241 result.insert(sid, StreamOffset::new(sid, ref_id, 0, 1.0));
242 continue;
243 }
244
245 let direct = self.offsets.iter().find(|o| {
247 (o.stream_id == sid && o.reference_id == ref_id)
248 || (o.stream_id == ref_id && o.reference_id == sid)
249 });
250
251 if let Some(off) = direct {
252 let offset = if off.stream_id == sid {
253 *off
254 } else {
255 off.invert()
256 };
257 result.insert(
258 sid,
259 StreamOffset::new(sid, ref_id, offset.offset_samples, offset.confidence),
260 );
261 }
262 }
263
264 result
265 }
266
267 #[must_use]
269 pub fn offsets(&self) -> &[StreamOffset] {
270 &self.offsets
271 }
272}
273
274#[derive(Debug, Clone)]
276pub struct MultiStreamAligner {
277 groups: HashMap<String, StreamGroup>,
279 strategy: ReferenceStrategy,
281 sample_rate: u32,
283}
284
285impl MultiStreamAligner {
286 #[must_use]
288 pub fn new(sample_rate: u32) -> Self {
289 Self {
290 groups: HashMap::new(),
291 strategy: ReferenceStrategy::HighestConfidence,
292 sample_rate,
293 }
294 }
295
296 pub fn add_group(&mut self, group: StreamGroup) {
298 self.groups.insert(group.name.clone(), group);
299 }
300
301 #[must_use]
303 pub fn group(&self, name: &str) -> Option<&StreamGroup> {
304 self.groups.get(name)
305 }
306
307 pub fn group_mut(&mut self, name: &str) -> Option<&mut StreamGroup> {
309 self.groups.get_mut(name)
310 }
311
312 pub fn set_strategy(&mut self, strategy: ReferenceStrategy) {
314 self.strategy = strategy;
315 }
316
317 pub fn align_all(&mut self) -> HashMap<String, HashMap<StreamId, StreamOffset>> {
319 let strategy = self.strategy;
320 let mut result = HashMap::new();
321
322 let names: Vec<String> = self.groups.keys().cloned().collect();
323 for name in names {
324 if let Some(group) = self.groups.get_mut(&name) {
325 group.select_reference(strategy);
326 let offsets = group.compute_bulk_offsets();
327 result.insert(name, offsets);
328 }
329 }
330
331 result
332 }
333
334 #[must_use]
336 pub fn total_streams(&self) -> usize {
337 self.groups.values().map(StreamGroup::len).sum()
338 }
339
340 #[must_use]
342 pub fn group_count(&self) -> usize {
343 self.groups.len()
344 }
345
346 #[must_use]
348 pub fn sample_rate(&self) -> u32 {
349 self.sample_rate
350 }
351}
352
353#[derive(Debug, Clone)]
355pub struct AlignmentSummary {
356 pub stream_count: usize,
358 pub max_offset_samples: i64,
360 pub average_confidence: f64,
362 pub low_confidence_count: usize,
364}
365
366impl AlignmentSummary {
367 #[must_use]
369 pub fn from_offsets(offsets: &HashMap<StreamId, StreamOffset>) -> Self {
370 let count = offsets.len();
371 if count == 0 {
372 return Self {
373 stream_count: 0,
374 max_offset_samples: 0,
375 average_confidence: 0.0,
376 low_confidence_count: 0,
377 };
378 }
379
380 let max_off = offsets
381 .values()
382 .map(|o| o.offset_samples.abs())
383 .max()
384 .unwrap_or(0);
385
386 let avg_conf = offsets.values().map(|o| o.confidence).sum::<f64>() / count as f64;
387
388 let low_conf = offsets.values().filter(|o| o.confidence < 0.5).count();
389
390 Self {
391 stream_count: count,
392 max_offset_samples: max_off,
393 average_confidence: avg_conf,
394 low_confidence_count: low_conf,
395 }
396 }
397}
398
399#[cfg(test)]
400mod tests {
401 use super::*;
402
403 fn make_group_with_offsets() -> StreamGroup {
404 let mut group = StreamGroup::new("cameras");
405 group.add_stream(1);
406 group.add_stream(2);
407 group.add_stream(3);
408 group.add_offset(StreamOffset::new(2, 1, -100, 0.9));
410 group.add_offset(StreamOffset::new(3, 1, 200, 0.8));
412 group
413 }
414
415 #[test]
416 fn test_stream_offset_creation() {
417 let off = StreamOffset::new(2, 1, 500, 0.9);
418 assert_eq!(off.stream_id, 2);
419 assert_eq!(off.reference_id, 1);
420 assert_eq!(off.offset_samples, 500);
421 }
422
423 #[test]
424 fn test_stream_offset_to_ms() {
425 let off = StreamOffset::new(2, 1, 4800, 0.9);
426 let ms = off.to_ms(48000);
427 assert!((ms - 100.0).abs() < 1e-6);
428 }
429
430 #[test]
431 fn test_stream_offset_invert() {
432 let off = StreamOffset::new(2, 1, 500, 0.9);
433 let inv = off.invert();
434 assert_eq!(inv.stream_id, 1);
435 assert_eq!(inv.reference_id, 2);
436 assert_eq!(inv.offset_samples, -500);
437 }
438
439 #[test]
440 fn test_stream_group_creation() {
441 let group = StreamGroup::new("test");
442 assert_eq!(group.name, "test");
443 assert!(group.is_empty());
444 }
445
446 #[test]
447 fn test_stream_group_add_stream() {
448 let mut group = StreamGroup::new("test");
449 group.add_stream(1);
450 group.add_stream(2);
451 group.add_stream(1); assert_eq!(group.len(), 2);
453 }
454
455 #[test]
456 fn test_stream_group_manual_reference() {
457 let mut group = make_group_with_offsets();
458 let ref_id = group.select_reference(ReferenceStrategy::Manual(1));
459 assert_eq!(ref_id, Some(1));
460 assert_eq!(group.reference_id(), Some(1));
461 }
462
463 #[test]
464 fn test_stream_group_manual_reference_invalid() {
465 let mut group = make_group_with_offsets();
466 let ref_id = group.select_reference(ReferenceStrategy::Manual(99));
467 assert_eq!(ref_id, None);
468 }
469
470 #[test]
471 fn test_stream_group_highest_confidence_reference() {
472 let mut group = make_group_with_offsets();
473 let ref_id = group.select_reference(ReferenceStrategy::HighestConfidence);
474 assert!(ref_id.is_some());
475 assert!(group
476 .stream_ids
477 .contains(&ref_id.expect("test expectation failed")));
478 }
479
480 #[test]
481 fn test_stream_group_most_connected_reference() {
482 let mut group = make_group_with_offsets();
483 let ref_id = group.select_reference(ReferenceStrategy::MostConnected);
484 assert_eq!(ref_id, Some(1));
486 }
487
488 #[test]
489 fn test_stream_group_bulk_offsets() {
490 let mut group = make_group_with_offsets();
491 group.select_reference(ReferenceStrategy::Manual(1));
492 let offsets = group.compute_bulk_offsets();
493 assert_eq!(offsets[&1].offset_samples, 0);
495 assert_eq!(offsets[&2].offset_samples, -100);
497 assert_eq!(offsets[&3].offset_samples, 200);
499 }
500
501 #[test]
502 fn test_multi_stream_aligner_creation() {
503 let aligner = MultiStreamAligner::new(48000);
504 assert_eq!(aligner.sample_rate(), 48000);
505 assert_eq!(aligner.group_count(), 0);
506 }
507
508 #[test]
509 fn test_multi_stream_aligner_add_group() {
510 let mut aligner = MultiStreamAligner::new(48000);
511 let mut group = StreamGroup::new("cameras");
512 group.add_stream(1);
513 group.add_stream(2);
514 aligner.add_group(group);
515 assert_eq!(aligner.group_count(), 1);
516 assert_eq!(aligner.total_streams(), 2);
517 }
518
519 #[test]
520 fn test_multi_stream_aligner_get_group() {
521 let mut aligner = MultiStreamAligner::new(48000);
522 aligner.add_group(StreamGroup::new("cameras"));
523 assert!(aligner.group("cameras").is_some());
524 assert!(aligner.group("missing").is_none());
525 }
526
527 #[test]
528 fn test_multi_stream_aligner_align_all() {
529 let mut aligner = MultiStreamAligner::new(48000);
530 let group = make_group_with_offsets();
531 aligner.add_group(group);
532 aligner.set_strategy(ReferenceStrategy::Manual(1));
533 let results = aligner.align_all();
534 assert!(results.contains_key("cameras"));
535 }
536
537 #[test]
538 fn test_alignment_summary_empty() {
539 let summary = AlignmentSummary::from_offsets(&HashMap::new());
540 assert_eq!(summary.stream_count, 0);
541 assert_eq!(summary.max_offset_samples, 0);
542 }
543
544 #[test]
545 fn test_alignment_summary_from_offsets() {
546 let mut offsets = HashMap::new();
547 offsets.insert(1u32, StreamOffset::new(1, 1, 0, 1.0));
548 offsets.insert(2u32, StreamOffset::new(2, 1, 200, 0.9));
549 offsets.insert(3u32, StreamOffset::new(3, 1, -300, 0.3));
550
551 let summary = AlignmentSummary::from_offsets(&offsets);
552 assert_eq!(summary.stream_count, 3);
553 assert_eq!(summary.max_offset_samples, 300);
554 assert_eq!(summary.low_confidence_count, 1); }
556}