plex_api/server/transcode/
session.rs1use futures::AsyncWrite;
2use http::StatusCode;
3use isahc::AsyncReadResponseExt;
4use serde::Deserialize;
5
6use crate::{
7 isahc_compat::StatusCodeExt,
8 media_container::{
9 server::{
10 library::{
11 AudioCodec, AudioStream, ContainerFormat, Decision, Media as MediaMetadata,
12 Metadata, Protocol, Stream, VideoCodec, VideoStream,
13 },
14 Feature,
15 },
16 MediaContainer, MediaContainerWrapper,
17 },
18 server::Query,
19 transcode::{
20 bs, get_transcode_params, session_id, Context, DecisionResult, TranscodeOptions,
21 TranscodeSessionStats,
22 },
23 url::{
24 SERVER_TRANSCODE_DECISION, SERVER_TRANSCODE_DOWNLOAD, SERVER_TRANSCODE_SESSIONS,
25 SERVER_TRANSCODE_STOP,
26 },
27 Error, HttpClient, Result,
28};
29
30#[derive(Debug, Clone, Deserialize)]
31#[serde(rename_all = "camelCase")]
32pub(crate) struct TranscodeSessionsMediaContainer {
33 #[serde(default, rename = "TranscodeSession")]
34 pub(crate) transcode_sessions: Vec<TranscodeSessionStats>,
35}
36
37#[derive(Debug, Clone, Deserialize)]
38#[allow(dead_code)]
39#[serde(rename_all = "camelCase")]
40#[cfg_attr(feature = "tests_deny_unknown_fields", serde(deny_unknown_fields))]
41struct TranscodeDecisionMediaContainer {
42 #[serde(flatten)]
43 decision_result: DecisionResult,
44
45 allow_sync: String,
46 #[serde(rename = "librarySectionID")]
47 library_section_id: Option<String>,
48 library_section_title: Option<String>,
49 #[serde(rename = "librarySectionUUID")]
50 library_section_uuid: Option<String>,
51 media_tag_prefix: Option<String>,
52 media_tag_version: Option<String>,
53 resource_session: Option<String>,
54
55 #[serde(flatten)]
56 media_container: MediaContainer,
57
58 #[serde(default, rename = "Metadata")]
59 metadata: Vec<Metadata>,
60}
61
62async fn transcode_decision(client: &HttpClient, params: &Query) -> Result<MediaMetadata> {
63 let path = format!("{SERVER_TRANSCODE_DECISION}?{params}");
64
65 let mut response = client
66 .get(path)
67 .header("Accept", "application/json")
68 .send()
69 .await?;
70
71 let text = match response.status().as_http_status() {
72 StatusCode::OK => response.text().await?,
73 _ => return Err(crate::Error::from_response(response).await),
74 };
75
76 let wrapper: MediaContainerWrapper<TranscodeDecisionMediaContainer> =
77 serde_json::from_str(&text)?;
78
79 if wrapper
80 .media_container
81 .decision_result
82 .general_decision_code
83 == Some(2011)
84 && wrapper
85 .media_container
86 .decision_result
87 .general_decision_text
88 == Some("Downloads not allowed".to_string())
89 {
90 return Err(Error::SubscriptionFeatureNotAvailable(Feature::SyncV3));
91 }
92
93 if wrapper
94 .media_container
95 .decision_result
96 .direct_play_decision_code
97 == Some(1000)
98 {
99 return Err(Error::TranscodeRefused);
100 }
101
102 wrapper
103 .media_container
104 .metadata
105 .into_iter()
106 .next()
107 .and_then(|m| m.media)
108 .and_then(|m| m.into_iter().find(|m| m.selected == Some(true)))
109 .ok_or_else(|| {
110 if let Some(text) = wrapper
111 .media_container
112 .decision_result
113 .transcode_decision_text
114 {
115 Error::TranscodeError(text)
116 } else {
117 Error::UnexpectedApiResponse {
118 status_code: response.status().as_u16(),
119 content: text,
120 }
121 }
122 })
123}
124
125pub(crate) async fn create_transcode_session<O: TranscodeOptions>(
126 client: &HttpClient,
127 item_metadata: &Metadata,
128 context: Context,
129 target_protocol: Protocol,
130 media_index: Option<usize>,
131 part_index: Option<usize>,
132 options: O,
133) -> Result<TranscodeSession> {
134 let id = session_id();
135
136 let mut params = get_transcode_params(
137 &id,
138 context,
139 target_protocol,
140 media_index,
141 part_index,
142 options,
143 )?
144 .param("path", item_metadata.key.clone());
145
146 if context == Context::Static {
147 params = params.param("offlineTranscode", bs(true));
148 }
149
150 let media_data = transcode_decision(client, ¶ms).await?;
151
152 if target_protocol != media_data.protocol.unwrap_or(Protocol::Http) {
153 return Err(Error::TranscodeError(
154 "Server returned an invalid protocol.".to_string(),
155 ));
156 }
157
158 TranscodeSession::from_metadata(
159 id,
160 client.clone(),
161 media_data,
162 context == Context::Static,
163 params,
164 )
165}
166
167pub(crate) async fn transcode_session_stats(
168 client: &HttpClient,
169 session_id: &str,
170) -> Result<TranscodeSessionStats> {
171 let wrapper: MediaContainerWrapper<TranscodeSessionsMediaContainer> = match client
172 .get(format!("{SERVER_TRANSCODE_SESSIONS}/{session_id}"))
173 .json()
174 .await
175 {
176 Ok(w) => w,
177 Err(Error::UnexpectedApiResponse {
178 status_code: 404,
179 content: _,
180 }) => {
181 return Err(crate::Error::ItemNotFound);
182 }
183 Err(e) => return Err(e),
184 };
185 wrapper
186 .media_container
187 .transcode_sessions
188 .first()
189 .cloned()
190 .ok_or(crate::Error::ItemNotFound)
191}
192
193#[derive(Clone, Copy)]
194pub enum TranscodeStatus {
195 Complete,
196 Error,
197 Transcoding {
198 remaining: Option<u32>,
200 progress: f32,
202 },
203}
204
205pub struct TranscodeSession {
206 id: String,
207 client: HttpClient,
208 offline: bool,
209 protocol: Protocol,
210 container: ContainerFormat,
211 video_transcode: Option<(Decision, VideoCodec)>,
212 audio_transcode: Option<(Decision, AudioCodec)>,
213 params: Query,
214}
215
216impl TranscodeSession {
217 pub(crate) fn from_stats(client: HttpClient, stats: TranscodeSessionStats) -> Self {
218 Self {
219 client,
220 params: Query::new().param("session", &stats.key),
223 offline: stats.offline_transcode,
224 container: stats.container,
225 protocol: stats.protocol,
226 video_transcode: stats.video_decision.zip(stats.video_codec),
227 audio_transcode: stats.audio_decision.zip(stats.audio_codec),
228 id: stats.key,
229 }
230 }
231
232 fn from_metadata(
233 id: String,
234 client: HttpClient,
235 media_data: MediaMetadata,
236 offline: bool,
237 params: Query,
238 ) -> Result<Self> {
239 let part_data = media_data
240 .parts
241 .iter()
242 .find(|p| p.selected == Some(true))
243 .ok_or_else(|| {
244 Error::TranscodeError("Server returned unexpected response".to_string())
245 })?;
246
247 let streams = part_data.streams.as_ref().ok_or_else(|| {
248 Error::TranscodeError("Server returned unexpected response".to_string())
249 })?;
250
251 let video_streams = streams
252 .iter()
253 .filter_map(|s| match s {
254 Stream::Video(s) => Some(s),
255 _ => None,
256 })
257 .collect::<Vec<&VideoStream>>();
258
259 let video_transcode = video_streams
260 .iter()
261 .find(|s| s.selected == Some(true))
262 .or_else(|| video_streams.first())
263 .map(|s| (s.decision.unwrap(), s.codec));
264
265 let audio_streams = streams
266 .iter()
267 .filter_map(|s| match s {
268 Stream::Audio(s) => Some(s),
269 _ => None,
270 })
271 .collect::<Vec<&AudioStream>>();
272
273 let audio_transcode = audio_streams
274 .iter()
275 .find(|s| s.selected == Some(true))
276 .or_else(|| audio_streams.first())
277 .map(|s| (s.decision.unwrap(), s.codec));
278
279 Ok(Self {
280 id,
281 client,
282 offline,
283 params,
284 container: media_data.container.unwrap(),
285 protocol: media_data.protocol.unwrap_or(Protocol::Http),
286 video_transcode,
287 audio_transcode,
288 })
289 }
290
291 pub fn session_id(&self) -> &str {
293 &self.id
294 }
295
296 pub fn is_offline(&self) -> bool {
297 self.offline
298 }
299
300 pub fn protocol(&self) -> Protocol {
302 self.protocol
303 }
304
305 pub fn container(&self) -> ContainerFormat {
307 self.container
308 }
309
310 pub fn video_transcode(&self) -> Option<(Decision, VideoCodec)> {
312 self.video_transcode
313 }
314
315 pub fn audio_transcode(&self) -> Option<(Decision, AudioCodec)> {
317 self.audio_transcode
318 }
319
320 #[tracing::instrument(level = "debug", skip_all)]
341 pub async fn download<W>(&self, writer: W) -> Result<()>
342 where
343 W: AsyncWrite + Unpin,
344 {
345 let ext = match (self.protocol, self.container) {
348 (Protocol::Dash, _) => "mpd".to_string(),
349 (Protocol::Hls, _) => "m3u8".to_string(),
350 (_, container) => container.to_string(),
351 };
352
353 let path = format!(
354 "{}?{}",
355 SERVER_TRANSCODE_DOWNLOAD.replace("{extension}", &ext),
356 self.params
357 );
358
359 let mut builder = self.client.get(path);
360 if self.offline {
361 builder = builder.timeout(None)
362 }
363 let mut response = builder.send().await?;
364
365 match response.status().as_http_status() {
366 StatusCode::OK => {
367 response.copy_to(writer).await?;
368 Ok(())
369 }
370 _ => Err(crate::Error::from_response(response).await),
371 }
372 }
373
374 #[tracing::instrument(level = "debug", skip_all)]
375 pub async fn status(&self) -> Result<TranscodeStatus> {
376 let stats = self.stats().await?;
377
378 if stats.error {
379 Ok(TranscodeStatus::Error)
380 } else if stats.complete {
381 Ok(TranscodeStatus::Complete)
382 } else {
383 Ok(TranscodeStatus::Transcoding {
384 remaining: stats.remaining,
385 progress: stats.progress,
386 })
387 }
388 }
389
390 #[tracing::instrument(level = "debug", skip_all)]
392 pub async fn stats(&self) -> Result<TranscodeSessionStats> {
393 transcode_session_stats(&self.client, &self.id).await
394 }
395
396 #[tracing::instrument(level = "debug", skip_all)]
403 pub async fn cancel(self) -> Result<()> {
404 let mut response = self
405 .client
406 .get(format!("{SERVER_TRANSCODE_STOP}?session={}", self.id))
407 .send()
408 .await?;
409
410 match response.status().as_http_status() {
411 StatusCode::OK | StatusCode::NOT_FOUND => Ok(response.consume().await?),
414 _ => Err(crate::Error::from_response(response).await),
415 }
416 }
417}