d_id/endpoints/video/
streams.rs

1// File: streams.rs
2// Path: src/endpoints/video/streams.rs
3
4use super::*;
5
6use webrtc::{api::APIBuilder, peer_connection::configuration::RTCConfiguration, ice_transport::ice_server::*, peer_connection::sdp::session_description::*};
7
/// Base path of the streams endpoint; new streams are created by POSTing to it.
const STREAMS_PATH: &str = "/talks/streams";
/// Suffix appended after `{STREAMS_PATH}/{stream id}` when the SDP answer is submitted.
const SDP_PATH: &str = "/sdp";
/// Path suffix for ICE-related requests.
/// NOTE(review): not referenced by the code visible in this part of the file.
const ICE_PATH: &str = "/ice";
11
12#[derive(Serialize, Debug)]
13pub struct NewStreamRequestBody {
14    pub source_url: String,
15    #[serde(skip_serializing_if = "String::is_empty")]
16    pub driver_url: String,
17    #[serde(skip_serializing_if = "Option::is_none")]
18    pub face: Option<Face>,
19    #[serde(skip_serializing_if = "Option::is_none")]
20    pub config: Option<StreamConfig>,
21}
22
23impl NewStreamRequestBody {
24    pub async fn create_stream(&self) -> Result<NewStreamResponse> {
25        let c = ClientBuilder::new()?
26            .method(POST)?
27            .path(STREAMS_PATH)?
28            .header(ACCEPT, APPLICATION_JSON)?
29            .header(CONTENT_TYPE, APPLICATION_JSON)?
30            .build()?;
31
32        let body = serde_json::to_string(&self)?;
33
34        let resp = c.send_request(Full::<Bytes>::new(body.into())).await?;
35
36        let stream_resp = serde_json::from_slice::<NewStreamResponse>(&resp.as_ref())?;
37
38        Ok(stream_resp)
39    }
40}
41
42
43#[derive(Serialize, Debug)]
44pub struct NewStreamRequestBodyBuilder {
45    pub source_url: Option<String>,
46    pub driver_url: Option<String>,
47    pub face: Option<Face>,
48    pub config: Option<StreamConfig>,
49}
50
51#[derive(Serialize, Debug)]
52pub struct StreamConfig {
53    pub motion_factor: f64,
54    pub align_expand_factor: f64,
55    pub stitch: bool,
56}
57
58impl NewStreamRequestBodyBuilder {
59    pub fn new() -> Self {
60        Self {
61            source_url: None,
62            driver_url: None,
63            face: None,
64            config: None,
65        }
66    }
67
68    pub fn source_url(mut self, source_url: &str) -> Self {
69        self.source_url = Some(source_url.to_string());
70        self
71    }
72
73    pub fn driver_url(mut self, driver_url: &str) -> Self {
74        self.driver_url = Some(driver_url.to_string());
75        self
76    }
77
78    pub fn face(mut self, face: Face) -> Self {
79        self.face = Some(face);
80        self
81    }
82
83    pub fn config(mut self, config: StreamConfig) -> Self {
84        self.config = Some(config);
85        self
86    }
87
88    pub fn build(self) -> Result<NewStreamRequestBody> {
89        let source_url = self.source_url.ok_or(RequestBodyBuildError::SourceUrlNotSet)?;
90
91        Ok(NewStreamRequestBody {
92            source_url: source_url,
93            driver_url: self.driver_url.unwrap_or_default(),
94            face: self.face,
95            config: self.config,
96        })
97    }
98}
99
100
101#[derive(Deserialize, Debug)]
102pub struct NewStreamResponse {
103    id: String,
104    offer: Offer,
105    ice_servers: Vec<IceServer>,
106    session_id: String,
107}
108
109#[derive(Deserialize, Debug)]
110pub struct IceServer {
111    urls: Urls,
112    username: Option<String>,
113    credential: Option<String>,
114}
115
116#[derive(Deserialize, Debug)]
117#[serde(untagged)]
118pub enum Urls {
119    Stun(String),
120    Turns(Vec<String>),
121}
122
123#[derive(Deserialize, Debug)]
124pub struct Offer {
125    r#type: String,
126    sdp: String,
127}
128
129pub async fn start_stream(stream_response: NewStreamResponse) -> Result<serde_json::Value> {
130    let web_rtc_api = APIBuilder::new().build();
131
132    let ice_servers = stream_response.ice_servers.iter().map(|ice_server| {
133        let urls = match &ice_server.urls {
134            Urls::Stun(url) => vec![url.clone()],
135            Urls::Turns(urls) => urls.clone(),
136        };
137
138        RTCIceServer {
139            urls,
140            //username: ice_server.username.clone().unwrap_or_default(),
141            credential: ice_server.credential.clone().unwrap_or_default(),
142            //credential_type: RTCIceCredentialType::Password,
143            ..Default::default()
144        }
145    }).collect::<Vec<_>>();
146
147    let rtc_config = RTCConfiguration {
148        ice_servers,
149        peer_identity: stream_response.id.clone(),
150        ..Default::default()
151    };
152
153    let peer_connection = web_rtc_api.new_peer_connection(rtc_config).await?;
154
155    let rtc_session_offer = RTCSessionDescription::offer(stream_response.offer.sdp)?;
156
157    let _ = peer_connection.set_remote_description(rtc_session_offer).await?;
158
159    let rtc_session_answer = peer_connection.create_answer(None).await?;
160
161
162    let _ = peer_connection.set_local_description(rtc_session_answer.clone()).await?;
163
164    //dbg!(peer_connection.connection_state());
165
166    let c = ClientBuilder::new()?
167        .method(POST)?
168        .path(&format!("{}/{}{}", STREAMS_PATH, stream_response.id, SDP_PATH))?
169        .header(ACCEPT, APPLICATION_JSON)?
170        .header(CONTENT_TYPE, APPLICATION_JSON)?
171        .build()?;
172
173    let body = StartStreamRequestBody::new(rtc_session_answer.sdp, stream_response.session_id);
174
175    let body = serde_json::to_string(&body)?;
176
177    let resp = c.send_request(Full::<Bytes>::new(body.into())).await?;
178
179    let json = serde_json::from_slice::<serde_json::Value>(&resp.as_ref())?;
180
181    Ok(json)
182
183}
184
185#[derive(Serialize, Debug)]
186pub struct StartStreamRequestBody {
187    pub answer: Answer,
188    pub session_id: String,
189}
190
191#[derive(Serialize, Debug)]
192pub struct Answer {
193    pub r#type: String,
194    pub sdp: String,
195}
196
197impl StartStreamRequestBody {
198    pub fn new(sdp: String, session_id: String) -> Self {
199        Self {
200            answer: Answer {
201                r#type: "answer".to_string(),
202                sdp: sdp,
203            },
204            session_id: session_id,
205        }
206    }
207}
208