Skip to main content

oxpulse_sfu_kit/client/
keyframe.rs

1//! Keyframe-request plumbing — both directions.
2//!
3//! - **Upstream**: when str0m gives us non-contiguous media on an incoming
4//!   track, ask the source peer for a keyframe (throttled to avoid storms).
5//! - **Downstream**: when a subscriber's decoder stalls and it asks the SFU
6//!   for a keyframe on an outgoing track, relay that to the origin client.
7
8use std::time::{Duration, Instant};
9
10use str0m::media::{KeyframeRequestKind, Mid, Rid};
11
12use super::tracks::TrackOut;
13use super::Client;
14use crate::ids::SfuMid;
15use crate::keyframe::SfuKeyframeRequest;
16use crate::propagate::Propagated;
17
/// Minimum gap between PLI/FIR requests for the same track.
///
/// Matches str0m's `chat.rs` 1-second floor — fast enough to unblock receivers,
/// slow enough to avoid keyframe request storms.
///
/// Consulted by `Client::request_keyframe_throttled`, which compares it with
/// the time elapsed since the incoming track's last recorded request and
/// skips the request while less than this gap has passed.
const KEYFRAME_REQUEST_MIN_GAP: Duration = Duration::from_secs(1);
23
24impl Client {
25    /// Ask the source to cut a keyframe, throttled to [`KEYFRAME_REQUEST_MIN_GAP`].
26    pub(super) fn request_keyframe_throttled(
27        &mut self,
28        mid: Mid,
29        rid: Option<Rid>,
30        kind: KeyframeRequestKind,
31    ) {
32        let Some(mut writer) = self.rtc.writer(mid) else {
33            return;
34        };
35        let Some(entry) = self.tracks_in.iter_mut().find(|t| t.id.mid == mid) else {
36            return;
37        };
38        if entry
39            .last_keyframe_request
40            .map(|t| t.elapsed() < KEYFRAME_REQUEST_MIN_GAP)
41            .unwrap_or(false)
42        {
43            return;
44        }
45        let _ = writer.request_keyframe(rid, kind);
46        entry.last_keyframe_request = Some(Instant::now());
47    }
48
49    /// Translate a subscriber's keyframe request to the origin client's track.
50    pub(super) fn incoming_keyframe_req(
51        &self,
52        mut req: str0m::media::KeyframeRequest,
53    ) -> Propagated {
54        let Some(track_out): Option<&TrackOut> =
55            self.tracks_out.iter().find(|t| t.mid() == Some(req.mid))
56        else {
57            return Propagated::Noop;
58        };
59        let Some(track_in) = track_out.track_in.upgrade() else {
60            return Propagated::Noop;
61        };
62        req.rid = self.chosen_rid;
63        if track_in.relay_source {
64            // The publisher is on another SFU edge — we cannot send PLI/FIR to
65            // it because it has no inbound negotiation for this direction.
66            // Emit UpstreamKeyframeRequest for the application to relay upstream.
67            return Propagated::UpstreamKeyframeRequest {
68                source_relay_id: track_in.origin,
69                req: SfuKeyframeRequest::from_str0m(req),
70                source_mid: SfuMid::from_str0m(track_in.mid),
71            };
72        }
73        Propagated::KeyframeRequest(
74            self.id,
75            SfuKeyframeRequest::from_str0m(req),
76            track_in.origin,
77            SfuMid::from_str0m(track_in.mid),
78        )
79    }
80
81    /// Handle a propagated keyframe request: pass it through to str0m's writer
82    /// if this client owns the matching incoming track.
83    pub fn handle_keyframe_request(&mut self, req: SfuKeyframeRequest, mid_in: SfuMid) {
84        let mid_in = mid_in.to_str0m();
85        if !self.tracks_in.iter().any(|i| i.id.mid == mid_in) {
86            return;
87        }
88        let Some(mut writer) = self.rtc.writer(mid_in) else {
89            return;
90        };
91        let rid = req.rid().map(|r| r.to_str0m());
92        let kind = req.kind().to_str0m();
93        if let Err(e) = writer.request_keyframe(rid, kind) {
94            tracing::info!(client = *self.id, error = ?e, "request_keyframe failed");
95        }
96    }
97}
98
#[cfg(any(test, feature = "test-utils"))]
impl Client {
    /// Test-only entry point that forwards to the module-private
    /// `incoming_keyframe_req` and returns its result unchanged.
    ///
    /// Compiled only under `cfg(test)` or with the `test-utils` feature.
    pub fn incoming_keyframe_req_for_tests(
        &self,
        req: str0m::media::KeyframeRequest,
    ) -> crate::propagate::Propagated {
        Self::incoming_keyframe_req(self, req)
    }
}