1use std::collections::{HashMap, VecDeque};
2
3use cluster::ClusterTrackStats;
4use transport::TrackId;
5
6use crate::rpc::ReceiverLayerLimit;
7
8const SINGLE_STREAM_BASED_BITRATE: u32 = 80_000; const SIMULCAST_BASED_BITRATE: u32 = 60_000; const SVC_BASED_BITRATE: u32 = 60_000; fn get_next_bitrate(layers: &[[u32; 3]; 3], max_spatial: u8, max_temporal: u8, current: u32) -> u32 {
13 let mut next = current;
14 for spatial in 0..(max_spatial + 1) {
15 if next > current || spatial as usize > layers.len() {
16 break;
17 }
18 for temporal in 0..(max_temporal + 1) {
19 if temporal as usize > layers[spatial as usize].len() {
20 break;
21 }
22 if layers[spatial as usize][temporal as usize] > current {
23 next = layers[spatial as usize][temporal as usize];
24 break;
25 }
26 }
27 }
28 if next == current {
29 next * 11 / 10
30 } else {
31 next
32 }
33}
34
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum LocalTrackTarget {
37 WaitStart,
38 Pause,
39 Single { key_only: bool },
40 Scalable { spatial: u8, temporal: u8, key_only: bool },
41}
42
43#[derive(Debug, PartialEq, Eq)]
44pub enum BitrateAllocationAction {
45 LimitLocalTrack(TrackId, LocalTrackTarget),
46 LimitLocalTrackBitrate(TrackId, u32),
47 ConfigEgressBitrate { current: u32, desired: u32 },
48}
49
50struct TrackTarget {
51 changed: Option<LocalTrackTarget>,
52 current: u32,
53 desired: u32,
54}
55
56struct TrackSlot {
57 track_id: u16,
58 limit: ReceiverLayerLimit,
59 target: Option<LocalTrackTarget>,
60 source: Option<ClusterTrackStats>,
61}
62
63impl TrackSlot {
64 pub fn priority(&self) -> u16 {
65 self.limit.priority
66 }
67
68 pub fn based_bitrate(&self) -> u32 {
69 match &self.source {
70 Some(source) => match source {
71 ClusterTrackStats::Single { bitrate: _ } => SINGLE_STREAM_BASED_BITRATE,
72 ClusterTrackStats::Simulcast { bitrate: _, layers: _ } => SIMULCAST_BASED_BITRATE,
73 ClusterTrackStats::Svc { bitrate: _, layers: _ } => SVC_BASED_BITRATE,
74 },
75 None => SINGLE_STREAM_BASED_BITRATE,
76 }
77 }
78
79 pub fn update_target(&mut self, bitrate: u32) -> TrackTarget {
80 let (new_target, current, desired) = match &self.source {
81 Some(ClusterTrackStats::Single { bitrate: source_bitrate }) => {
82 if bitrate >= SINGLE_STREAM_BASED_BITRATE {
83 (LocalTrackTarget::Single { key_only: false }, *source_bitrate, *source_bitrate * 6 / 5)
84 } else {
85 (LocalTrackTarget::Pause, 0, SINGLE_STREAM_BASED_BITRATE)
86 }
87 }
88 Some(ClusterTrackStats::Simulcast { bitrate: _, layers }) => {
89 if bitrate < SIMULCAST_BASED_BITRATE {
90 (LocalTrackTarget::Pause, 0, SIMULCAST_BASED_BITRATE)
91 } else {
92 let mut current_bitrate = layers[0][0];
93 let min_spatial = self.limit.min_spatial.unwrap_or(0);
94 let min_temporal = self.limit.min_temporal.unwrap_or(0);
95 let mut target_spatial = 0;
96 let mut target_temporal = 0;
97
98 for spatial in 0..(self.limit.max_spatial + 1) {
99 for temporal in 0..(self.limit.max_temporal + 1) {
100 if layers[spatial as usize][temporal as usize] == 0 {
101 break;
102 }
103 if layers[spatial as usize][temporal as usize] <= bitrate || (spatial <= min_spatial && temporal <= min_temporal) {
104 current_bitrate = layers[spatial as usize][temporal as usize];
105 target_spatial = spatial as u8;
106 target_temporal = temporal as u8;
107 } else {
108 break;
109 }
110 }
111 }
112 let desired_bitrate = get_next_bitrate(layers, self.limit.max_spatial, self.limit.max_temporal, current_bitrate);
113
114 let target = LocalTrackTarget::Scalable {
115 spatial: target_spatial,
116 temporal: target_temporal,
117 key_only: false,
118 };
119 (target, current_bitrate, desired_bitrate)
120 }
121 }
122 Some(ClusterTrackStats::Svc { bitrate: _, layers }) => {
123 if bitrate < SVC_BASED_BITRATE {
124 (LocalTrackTarget::Pause, 0, SVC_BASED_BITRATE)
125 } else {
126 let mut current_bitrate = layers[0][0];
127 let min_spatial = self.limit.min_spatial.unwrap_or(0);
128 let min_temporal = self.limit.min_temporal.unwrap_or(0);
129 let mut target_spatial = 0;
130 let mut target_temporal = 0;
131
132 for spatial in 0..(self.limit.max_spatial + 1) {
133 for temporal in 0..(self.limit.max_temporal + 1) {
134 if layers[spatial as usize][temporal as usize] == 0 {
135 break;
136 }
137 if layers[spatial as usize][temporal as usize] <= bitrate || (spatial <= min_spatial && temporal <= min_temporal) {
138 current_bitrate = layers[spatial as usize][temporal as usize];
139 target_spatial = spatial as u8;
140 target_temporal = temporal as u8;
141 } else {
142 break;
143 }
144 }
145 }
146
147 let desired_bitrate = get_next_bitrate(layers, self.limit.max_spatial, self.limit.max_temporal, current_bitrate);
148
149 let target = LocalTrackTarget::Scalable {
150 spatial: target_spatial,
151 temporal: target_temporal,
152 key_only: false,
153 };
154 (target, current_bitrate, desired_bitrate)
155 }
156 }
157 None => {
158 (LocalTrackTarget::WaitStart, 0, SINGLE_STREAM_BASED_BITRATE)
160 }
161 };
162
163 let changed = if self.target != Some(new_target.clone()) {
164 self.target = Some(new_target.clone());
165 Some(new_target)
166 } else {
167 None
168 };
169
170 TrackTarget { changed, current, desired }
171 }
172}
173
174pub struct BitrateAllocator {
175 send_bps: u32,
176 tracks: Vec<TrackSlot>,
177 out_actions: VecDeque<BitrateAllocationAction>,
178}
179
180impl BitrateAllocator {
181 pub fn new(send_bps: u32) -> Self {
182 Self {
183 send_bps,
184 tracks: Default::default(),
185 out_actions: Default::default(),
186 }
187 }
188
189 pub fn tick(&mut self) {
190 self.refresh();
191 }
192
193 pub fn set_est_bitrate(&mut self, bps: u32) {
194 self.send_bps = bps;
195 self.refresh();
196 }
197
198 pub fn add_local_track(&mut self, track: TrackId, priority: u16) {
199 log::info!("[BitrateAllocator] add track {} priority {}", track, priority);
200 self.tracks.retain(|slot| slot.track_id != track);
202 self.tracks.push(TrackSlot {
203 track_id: track,
204 source: None,
205 limit: ReceiverLayerLimit {
206 priority,
207 min_spatial: None,
208 max_spatial: 2,
209 min_temporal: None,
210 max_temporal: 2,
211 },
212 target: None,
213 });
214 self.tracks.sort_by_key(|t| t.priority());
215 }
216
217 pub fn update_local_track_limit(&mut self, track: TrackId, limit: ReceiverLayerLimit) {
218 log::info!("[BitrateAllocator] update track {} limit {:?}", track, limit);
219 self.tracks.iter_mut().find(|slot| slot.track_id == track).map(|slot| slot.limit = limit);
221 self.tracks.sort_by_key(|t| t.priority());
222 }
223
224 pub fn remove_local_track(&mut self, track: TrackId) {
225 log::info!("[BitrateAllocator] remove track {}", track);
226 self.tracks.retain(|slot| slot.track_id != track);
227 }
228
229 pub fn update_source_bitrate(&mut self, track: TrackId, stats: ClusterTrackStats) {
230 self.tracks.iter_mut().find(|slot| slot.track_id == track).map(|slot| slot.source = Some(stats));
231 }
232
233 fn refresh(&mut self) {
234 let mut current_bitrate = 0;
235 let mut desired_bitrate = 0;
236 let mut used_bitrate = 0;
237 let mut track_bitrates: HashMap<TrackId, u32> = Default::default();
238 let mut sum_priority = 0;
239
240 for track in &self.tracks {
241 used_bitrate += track.based_bitrate();
242 sum_priority += track.priority() as u32;
243 track_bitrates.insert(track.track_id, track.based_bitrate());
244 if used_bitrate > self.send_bps {
245 break;
246 }
247 }
248
249 if sum_priority > 0 && self.send_bps > used_bitrate {
250 let remain_bitrate = (self.send_bps - used_bitrate) as u64;
251 for track in &self.tracks {
252 if let Some(bitrate) = track_bitrates.get_mut(&track.track_id) {
253 *bitrate += (remain_bitrate * (track.priority() as u64) / sum_priority as u64) as u32;
254 }
255 }
256 }
257
258 for track in self.tracks.iter_mut() {
259 let bitrate = track_bitrates.get(&track.track_id).unwrap_or(&0);
260 let output = track.update_target(*bitrate);
261 if *bitrate > 0 {
262 self.out_actions.push_back(BitrateAllocationAction::LimitLocalTrackBitrate(track.track_id, *bitrate));
263 }
264 if let Some(target) = output.changed {
265 self.out_actions.push_back(BitrateAllocationAction::LimitLocalTrack(track.track_id, target));
266 }
267 current_bitrate += output.current;
268 desired_bitrate += output.desired;
269 }
270
271 self.out_actions.push_back(BitrateAllocationAction::ConfigEgressBitrate {
272 current: current_bitrate,
273 desired: desired_bitrate * 6 / 5,
274 });
275 }
276
277 pub fn pop_action(&mut self) -> Option<BitrateAllocationAction> {
278 self.out_actions.pop_front()
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use cluster::ClusterTrackStats;
285 use transport::TrackId;
286
287 use crate::{endpoint::internal::DEFAULT_BITRATE_OUT_BPS, rpc::ReceiverLayerLimit};
288
289 use super::{get_next_bitrate, BitrateAllocationAction, BitrateAllocator, LocalTrackTarget, SINGLE_STREAM_BASED_BITRATE};
290
291 fn create_receiver_limit(priority: u16, max_spatial: u8, max_temporal: u8) -> ReceiverLayerLimit {
292 ReceiverLayerLimit {
293 priority,
294 min_spatial: None,
295 max_spatial,
296 min_temporal: None,
297 max_temporal,
298 }
299 }
300
301 fn create_receiver_limit_full(priority: u16, max_spatial: u8, max_temporal: u8, min_spatial: u8, min_temporal: u8) -> ReceiverLayerLimit {
302 ReceiverLayerLimit {
303 priority,
304 min_spatial: Some(min_spatial),
305 max_spatial,
306 min_temporal: Some(min_temporal),
307 max_temporal,
308 }
309 }
310
311 enum Data {
312 Tick,
313 SetEstBitrate(u32),
315 AddLocalTrack(TrackId, u16),
317 UpdateLocalTrack(TrackId, ReceiverLayerLimit),
318 RemoveLocalTrack(TrackId),
319 UpdateSourceBitrate(TrackId, ClusterTrackStats),
320 Output(Option<BitrateAllocationAction>),
321 }
322
323 fn test(default_send: u32, data: Vec<Data>) {
324 let mut allocator = BitrateAllocator::new(default_send);
325
326 let mut index = 0;
327 for row in data {
328 index += 1;
329 match row {
330 Data::SetEstBitrate(bps) => allocator.set_est_bitrate(bps),
331 Data::Tick => allocator.tick(),
332 Data::AddLocalTrack(track, priority) => allocator.add_local_track(track, priority),
333 Data::UpdateLocalTrack(track, limit) => allocator.update_local_track_limit(track, limit),
334 Data::RemoveLocalTrack(track) => allocator.remove_local_track(track),
335 Data::UpdateSourceBitrate(track, stats) => allocator.update_source_bitrate(track, stats),
336 Data::Output(expected) => assert_eq!(allocator.pop_action(), expected, "Wrong in row {}", index),
337 }
338 }
339 }
340
341 #[test]
342 fn next_bitrate_increasing() {
343 let layers = [[100, 200, 300], [400, 500, 600], [700, 800, 900]];
344 assert_eq!(get_next_bitrate(&layers, 2, 2, 100), 200);
345 assert_eq!(get_next_bitrate(&layers, 2, 2, 200), 300);
346 assert_eq!(get_next_bitrate(&layers, 2, 2, 300), 400);
347 assert_eq!(get_next_bitrate(&layers, 2, 2, 900), 990);
348 }
349
350 #[test]
351 fn next_bitrate_disconnect() {
352 let layers = [[100, 200, 500], [400, 500, 800], [700, 800, 900]];
353 assert_eq!(get_next_bitrate(&layers, 2, 2, 100), 200);
354 assert_eq!(get_next_bitrate(&layers, 2, 2, 200), 500);
355 assert_eq!(get_next_bitrate(&layers, 2, 2, 500), 800);
356 assert_eq!(get_next_bitrate(&layers, 2, 2, 800), 900);
357 assert_eq!(get_next_bitrate(&layers, 2, 2, 900), 990);
358 }
359
360 #[test]
361 fn single_track() {
362 test(
363 DEFAULT_BITRATE_OUT_BPS,
364 vec![
365 Data::AddLocalTrack(1, 10000),
366 Data::Tick,
367 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(1, DEFAULT_BITRATE_OUT_BPS))),
368 Data::Output(Some(BitrateAllocationAction::LimitLocalTrack(1, LocalTrackTarget::WaitStart))),
369 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
370 current: 0,
371 desired: SINGLE_STREAM_BASED_BITRATE * 6 / 5,
372 })),
373 Data::Output(None),
374 Data::UpdateSourceBitrate(1, ClusterTrackStats::Single { bitrate: 100_000 }),
375 Data::Tick,
376 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(1, DEFAULT_BITRATE_OUT_BPS))),
377 Data::Output(Some(BitrateAllocationAction::LimitLocalTrack(1, LocalTrackTarget::Single { key_only: false }))),
378 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
379 current: 100_000,
380 desired: 120_000 * 6 / 5,
381 })),
382 Data::Output(None),
383 Data::RemoveLocalTrack(1),
384 Data::Tick,
385 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate { current: 0, desired: 0 })),
386 Data::Output(None),
387 ],
388 );
389 }
390
391 #[test]
392 fn multi_track() {
393 test(
394 DEFAULT_BITRATE_OUT_BPS,
395 vec![
396 Data::AddLocalTrack(1, 100),
397 Data::AddLocalTrack(2, 300),
398 Data::Tick,
399 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(
400 1,
401 SINGLE_STREAM_BASED_BITRATE + (DEFAULT_BITRATE_OUT_BPS - SINGLE_STREAM_BASED_BITRATE * 2) * 1 / 4,
402 ))),
403 Data::Output(Some(BitrateAllocationAction::LimitLocalTrack(1, LocalTrackTarget::WaitStart))),
404 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(
405 2,
406 SINGLE_STREAM_BASED_BITRATE + (DEFAULT_BITRATE_OUT_BPS - SINGLE_STREAM_BASED_BITRATE * 2) * 3 / 4,
407 ))),
408 Data::Output(Some(BitrateAllocationAction::LimitLocalTrack(2, LocalTrackTarget::WaitStart))),
409 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
410 current: 0,
411 desired: SINGLE_STREAM_BASED_BITRATE * 2 * 6 / 5,
412 })),
413 Data::Output(None),
414 Data::UpdateLocalTrack(1, create_receiver_limit(300, 2, 2)),
415 Data::UpdateLocalTrack(2, create_receiver_limit(100, 2, 2)),
416 Data::Tick,
417 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(
418 2,
419 SINGLE_STREAM_BASED_BITRATE + (DEFAULT_BITRATE_OUT_BPS - SINGLE_STREAM_BASED_BITRATE * 2) * 1 / 4,
420 ))),
421 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(
422 1,
423 SINGLE_STREAM_BASED_BITRATE + (DEFAULT_BITRATE_OUT_BPS - SINGLE_STREAM_BASED_BITRATE * 2) * 3 / 4,
424 ))),
425 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
426 current: 0,
427 desired: SINGLE_STREAM_BASED_BITRATE * 2 * 6 / 5,
428 })),
429 Data::Output(None),
430 ],
431 );
432 }
433
434 #[test]
435 fn simulcast_single_track() {
436 test(
437 DEFAULT_BITRATE_OUT_BPS,
438 vec![
439 Data::AddLocalTrack(1, 100),
440 Data::UpdateSourceBitrate(
441 1,
442 ClusterTrackStats::Simulcast {
443 bitrate: 100000,
444 layers: [[100_000, 150_000, 200_000], [200_000, 300_000, 400_000], [400_000, 600_000, 800_000]],
445 },
446 ),
447 Data::Tick,
448 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(1, DEFAULT_BITRATE_OUT_BPS))),
449 Data::Output(Some(BitrateAllocationAction::LimitLocalTrack(
450 1,
451 LocalTrackTarget::Scalable {
452 spatial: 2,
453 temporal: 2,
454 key_only: false,
455 },
456 ))),
457 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
458 current: 800_000,
459 desired: 880_000 * 6 / 5,
460 })),
461 Data::Output(None),
462 Data::UpdateLocalTrack(1, create_receiver_limit_full(100, 2, 2, 1, 1)),
464 Data::Tick,
465 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(1, DEFAULT_BITRATE_OUT_BPS))),
466 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
467 current: 800_000,
468 desired: 880_000 * 6 / 5,
469 })),
470 Data::Output(None),
471 Data::SetEstBitrate(100_000),
472 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(1, 100_000))),
473 Data::Output(Some(BitrateAllocationAction::LimitLocalTrack(
474 1,
475 LocalTrackTarget::Scalable {
476 spatial: 1,
477 temporal: 1,
478 key_only: false,
479 },
480 ))),
481 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
482 current: 300_000,
483 desired: 400_000 * 6 / 5,
484 })),
485 Data::Output(None),
486 Data::UpdateLocalTrack(1, create_receiver_limit_full(100, 0, 0, 0, 0)),
488 Data::SetEstBitrate(1_000_000),
489 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(1, 1_000_000))),
490 Data::Output(Some(BitrateAllocationAction::LimitLocalTrack(
491 1,
492 LocalTrackTarget::Scalable {
493 spatial: 0,
494 temporal: 0,
495 key_only: false,
496 },
497 ))),
498 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
499 current: 100_000,
500 desired: 110_000 * 6 / 5,
501 })),
502 Data::Output(None),
503 Data::RemoveLocalTrack(1),
504 Data::Tick,
505 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate { current: 0, desired: 0 })),
506 Data::Output(None),
507 ],
508 );
509 }
510
511 #[test]
512 fn simulcast_min_spatial_overwrite() {
513 test(
514 100000,
515 vec![
516 Data::AddLocalTrack(1, 100),
517 Data::UpdateSourceBitrate(
518 1,
519 ClusterTrackStats::Simulcast {
520 bitrate: 100000,
521 layers: [[100_000, 150_000, 200_000], [200_000, 300_000, 400_000], [0, 0, 0]],
522 },
523 ),
524 Data::UpdateLocalTrack(1, create_receiver_limit_full(100, 2, 2, 2, 2)),
525 Data::Tick,
526 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(1, 100000))),
527 Data::Output(Some(BitrateAllocationAction::LimitLocalTrack(
528 1,
529 LocalTrackTarget::Scalable {
530 spatial: 1,
531 temporal: 2,
532 key_only: false,
533 },
534 ))),
535 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
536 current: 400_000,
537 desired: 440_000 * 6 / 5,
538 })),
539 Data::Output(None),
540 Data::UpdateLocalTrack(1, create_receiver_limit_full(100, 2, 0, 2, 0)),
542 Data::Tick,
543 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(1, 100000))),
544 Data::Output(Some(BitrateAllocationAction::LimitLocalTrack(
545 1,
546 LocalTrackTarget::Scalable {
547 spatial: 1,
548 temporal: 0,
549 key_only: false,
550 },
551 ))),
552 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
553 current: 200_000,
554 desired: 220_000 * 6 / 5,
555 })),
556 Data::Output(None),
557 ],
558 );
559 }
560
561 #[test]
562 fn svc_min_spatial_overwrite() {
563 test(
564 100000,
565 vec![
566 Data::AddLocalTrack(1, 100),
567 Data::UpdateSourceBitrate(
568 1,
569 ClusterTrackStats::Svc {
570 bitrate: 100000,
571 layers: [[100_000, 150_000, 200_000], [200_000, 300_000, 400_000], [0, 0, 0]],
572 },
573 ),
574 Data::UpdateLocalTrack(1, create_receiver_limit_full(100, 2, 2, 2, 2)),
575 Data::Tick,
576 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(1, 100000))),
577 Data::Output(Some(BitrateAllocationAction::LimitLocalTrack(
578 1,
579 LocalTrackTarget::Scalable {
580 spatial: 1,
581 temporal: 2,
582 key_only: false,
583 },
584 ))),
585 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
586 current: 400_000,
587 desired: 440_000 * 6 / 5,
588 })),
589 Data::Output(None),
590 Data::UpdateLocalTrack(1, create_receiver_limit_full(100, 2, 0, 2, 0)),
591 Data::Tick,
592 Data::Output(Some(BitrateAllocationAction::LimitLocalTrackBitrate(1, 100000))),
593 Data::Output(Some(BitrateAllocationAction::LimitLocalTrack(
594 1,
595 LocalTrackTarget::Scalable {
596 spatial: 1,
597 temporal: 0,
598 key_only: false,
599 },
600 ))),
601 Data::Output(Some(BitrateAllocationAction::ConfigEgressBitrate {
602 current: 200_000,
603 desired: 220_000 * 6 / 5,
604 })),
605 Data::Output(None),
606 ],
607 );
608 }
609}