plex_api/server/transcode/
session.rs

1use futures::AsyncWrite;
2use http::StatusCode;
3use isahc::AsyncReadResponseExt;
4use serde::Deserialize;
5
6use crate::{
7    isahc_compat::StatusCodeExt,
8    media_container::{
9        server::{
10            library::{
11                AudioCodec, AudioStream, ContainerFormat, Decision, Media as MediaMetadata,
12                Metadata, Protocol, Stream, VideoCodec, VideoStream,
13            },
14            Feature,
15        },
16        MediaContainer, MediaContainerWrapper,
17    },
18    server::Query,
19    transcode::{
20        bs, get_transcode_params, session_id, Context, DecisionResult, TranscodeOptions,
21        TranscodeSessionStats,
22    },
23    url::{
24        SERVER_TRANSCODE_DECISION, SERVER_TRANSCODE_DOWNLOAD, SERVER_TRANSCODE_SESSIONS,
25        SERVER_TRANSCODE_STOP,
26    },
27    Error, HttpClient, Result,
28};
29
/// Media container payload used when querying the server's transcode
/// sessions.
///
/// JSON keys are matched in camelCase.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct TranscodeSessionsMediaContainer {
    /// The sessions listed under the `TranscodeSession` key of the response.
    /// Deserializes to an empty list when the key is absent.
    #[serde(default, rename = "TranscodeSession")]
    pub(crate) transcode_sessions: Vec<TranscodeSessionStats>,
}
36
/// Media container payload parsed from the response of the transcode
/// decision request issued by `transcode_decision`.
///
/// JSON keys are matched in camelCase unless a field overrides the name.
#[derive(Debug, Clone, Deserialize)]
// NOTE(review): several fields below are never read in this file; presumably
// they are declared so that the `tests_deny_unknown_fields` feature does not
// reject responses containing them. Confirm before removing this allow.
#[allow(dead_code)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(feature = "tests_deny_unknown_fields", serde(deny_unknown_fields))]
struct TranscodeDecisionMediaContainer {
    /// Decision codes and texts, read directly from the container's own keys
    /// (flattened) rather than from a nested object.
    #[serde(flatten)]
    decision_result: DecisionResult,

    /// Mandatory in the response: deserialization fails when it is missing.
    allow_sync: String,
    /// Read from the `librarySectionID` key.
    #[serde(rename = "librarySectionID")]
    library_section_id: Option<String>,
    library_section_title: Option<String>,
    /// Read from the `librarySectionUUID` key.
    #[serde(rename = "librarySectionUUID")]
    library_section_uuid: Option<String>,
    media_tag_prefix: Option<String>,
    media_tag_version: Option<String>,
    resource_session: Option<String>,

    /// The generic media container attributes, also flattened into this
    /// object.
    #[serde(flatten)]
    media_container: MediaContainer,

    /// Items listed under the `Metadata` key. Deserializes to an empty list
    /// when the key is absent.
    #[serde(default, rename = "Metadata")]
    metadata: Vec<Metadata>,
}
61
62async fn transcode_decision(client: &HttpClient, params: &Query) -> Result<MediaMetadata> {
63    let path = format!("{SERVER_TRANSCODE_DECISION}?{params}");
64
65    let mut response = client
66        .get(path)
67        .header("Accept", "application/json")
68        .send()
69        .await?;
70
71    let text = match response.status().as_http_status() {
72        StatusCode::OK => response.text().await?,
73        _ => return Err(crate::Error::from_response(response).await),
74    };
75
76    let wrapper: MediaContainerWrapper<TranscodeDecisionMediaContainer> =
77        serde_json::from_str(&text)?;
78
79    if wrapper
80        .media_container
81        .decision_result
82        .general_decision_code
83        == Some(2011)
84        && wrapper
85            .media_container
86            .decision_result
87            .general_decision_text
88            == Some("Downloads not allowed".to_string())
89    {
90        return Err(Error::SubscriptionFeatureNotAvailable(Feature::SyncV3));
91    }
92
93    if wrapper
94        .media_container
95        .decision_result
96        .direct_play_decision_code
97        == Some(1000)
98    {
99        return Err(Error::TranscodeRefused);
100    }
101
102    wrapper
103        .media_container
104        .metadata
105        .into_iter()
106        .next()
107        .and_then(|m| m.media)
108        .and_then(|m| m.into_iter().find(|m| m.selected == Some(true)))
109        .ok_or_else(|| {
110            if let Some(text) = wrapper
111                .media_container
112                .decision_result
113                .transcode_decision_text
114            {
115                Error::TranscodeError(text)
116            } else {
117                Error::UnexpectedApiResponse {
118                    status_code: response.status().as_u16(),
119                    content: text,
120                }
121            }
122        })
123}
124
125pub(crate) async fn create_transcode_session<O: TranscodeOptions>(
126    client: &HttpClient,
127    item_metadata: &Metadata,
128    context: Context,
129    target_protocol: Protocol,
130    media_index: Option<usize>,
131    part_index: Option<usize>,
132    options: O,
133) -> Result<TranscodeSession> {
134    let id = session_id();
135
136    let mut params = get_transcode_params(
137        &id,
138        context,
139        target_protocol,
140        media_index,
141        part_index,
142        options,
143    )?
144    .param("path", item_metadata.key.clone());
145
146    if context == Context::Static {
147        params = params.param("offlineTranscode", bs(true));
148    }
149
150    let media_data = transcode_decision(client, &params).await?;
151
152    if target_protocol != media_data.protocol.unwrap_or(Protocol::Http) {
153        return Err(Error::TranscodeError(
154            "Server returned an invalid protocol.".to_string(),
155        ));
156    }
157
158    TranscodeSession::from_metadata(
159        id,
160        client.clone(),
161        media_data,
162        context == Context::Static,
163        params,
164    )
165}
166
167pub(crate) async fn transcode_session_stats(
168    client: &HttpClient,
169    session_id: &str,
170) -> Result<TranscodeSessionStats> {
171    let wrapper: MediaContainerWrapper<TranscodeSessionsMediaContainer> = match client
172        .get(format!("{SERVER_TRANSCODE_SESSIONS}/{session_id}"))
173        .json()
174        .await
175    {
176        Ok(w) => w,
177        Err(Error::UnexpectedApiResponse {
178            status_code: 404,
179            content: _,
180        }) => {
181            return Err(crate::Error::ItemNotFound);
182        }
183        Err(e) => return Err(e),
184    };
185    wrapper
186        .media_container
187        .transcode_sessions
188        .first()
189        .cloned()
190        .ok_or(crate::Error::ItemNotFound)
191}
192
/// The state of a transcode session as reported by
/// `TranscodeSession::status`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TranscodeStatus {
    /// The server reports the transcode as complete.
    Complete,
    /// The server reports that the transcode hit an error.
    Error,
    /// The transcode is neither complete nor in error.
    Transcoding {
        /// The server's estimate of how many seconds are left until complete.
        remaining: Option<u32>,
        /// Percent complete (0-100).
        progress: f32,
    },
}
204
/// A handle to a transcode session on the server.
///
/// Provides access to the outcome of the transcode decision and allows
/// downloading the transcoded data, polling progress and cancelling the
/// session.
pub struct TranscodeSession {
    /// The session ID, as returned by `session_id()`.
    id: String,
    /// The client used for all requests made by this session.
    client: HttpClient,
    /// Whether this is an offline transcode; download timeouts are disabled
    /// when set.
    offline: bool,
    /// The protocol selected for this session.
    protocol: Protocol,
    /// The container selected for this session.
    container: ContainerFormat,
    /// The decision and codec for the video stream, if known.
    video_transcode: Option<(Decision, VideoCodec)>,
    /// The decision and codec for the audio stream, if known.
    audio_transcode: Option<(Decision, AudioCodec)>,
    /// Query parameters appended to download requests.
    params: Query,
}
215
216impl TranscodeSession {
217    pub(crate) fn from_stats(client: HttpClient, stats: TranscodeSessionStats) -> Self {
218        Self {
219            client,
220            // Once the transcode session is started we only need the session ID
221            // to download.
222            params: Query::new().param("session", &stats.key),
223            offline: stats.offline_transcode,
224            container: stats.container,
225            protocol: stats.protocol,
226            video_transcode: stats.video_decision.zip(stats.video_codec),
227            audio_transcode: stats.audio_decision.zip(stats.audio_codec),
228            id: stats.key,
229        }
230    }
231
232    fn from_metadata(
233        id: String,
234        client: HttpClient,
235        media_data: MediaMetadata,
236        offline: bool,
237        params: Query,
238    ) -> Result<Self> {
239        let part_data = media_data
240            .parts
241            .iter()
242            .find(|p| p.selected == Some(true))
243            .ok_or_else(|| {
244                Error::TranscodeError("Server returned unexpected response".to_string())
245            })?;
246
247        let streams = part_data.streams.as_ref().ok_or_else(|| {
248            Error::TranscodeError("Server returned unexpected response".to_string())
249        })?;
250
251        let video_streams = streams
252            .iter()
253            .filter_map(|s| match s {
254                Stream::Video(s) => Some(s),
255                _ => None,
256            })
257            .collect::<Vec<&VideoStream>>();
258
259        let video_transcode = video_streams
260            .iter()
261            .find(|s| s.selected == Some(true))
262            .or_else(|| video_streams.first())
263            .map(|s| (s.decision.unwrap(), s.codec));
264
265        let audio_streams = streams
266            .iter()
267            .filter_map(|s| match s {
268                Stream::Audio(s) => Some(s),
269                _ => None,
270            })
271            .collect::<Vec<&AudioStream>>();
272
273        let audio_transcode = audio_streams
274            .iter()
275            .find(|s| s.selected == Some(true))
276            .or_else(|| audio_streams.first())
277            .map(|s| (s.decision.unwrap(), s.codec));
278
279        Ok(Self {
280            id,
281            client,
282            offline,
283            params,
284            container: media_data.container.unwrap(),
285            protocol: media_data.protocol.unwrap_or(Protocol::Http),
286            video_transcode,
287            audio_transcode,
288        })
289    }
290
291    /// The session ID allows for re-retrieving this session at a later date.
292    pub fn session_id(&self) -> &str {
293        &self.id
294    }
295
296    pub fn is_offline(&self) -> bool {
297        self.offline
298    }
299
300    /// The selected protocol.
301    pub fn protocol(&self) -> Protocol {
302        self.protocol
303    }
304
305    /// The selected container.
306    pub fn container(&self) -> ContainerFormat {
307        self.container
308    }
309
310    // The target video codec and the transcode decision.
311    pub fn video_transcode(&self) -> Option<(Decision, VideoCodec)> {
312        self.video_transcode
313    }
314
315    // The target audio codec and the transcode decision.
316    pub fn audio_transcode(&self) -> Option<(Decision, AudioCodec)> {
317        self.audio_transcode
318    }
319
320    /// Downloads the transcoded data to the provided writer.
321    ///
322    /// For streaming transcodes (MPEG-DASH or HLS) this will return the
323    /// playlist data. This crate doesn't contain any support for processing
324    /// these streaming formats and figuring out how to use them is currently
325    /// left as an exercise for the caller.
326    ///
327    /// For offline transcodes it is possible to start downloading before the
328    /// transcode is complete. In this case any data already transcoded is
329    /// downloaded and then the connection will remain open and more data will
330    /// be delivered to the writer as it becomes available. This can mean
331    /// that the HTTP connection is idle for long periods of time waiting for
332    /// more data to be transcoded and so the normal timeouts are disabled for
333    /// offline transcode downloads.
334    ///
335    /// Unfortunately there does not appear to be any way to restart downloads
336    /// from a specific point in the file. So if the download fails for
337    /// any reason you have to start downloading all over again. It may make
338    /// more sense to wait until the transcode is complete or nearly complete
339    /// before attempting download.
340    #[tracing::instrument(level = "debug", skip_all)]
341    pub async fn download<W>(&self, writer: W) -> Result<()>
342    where
343        W: AsyncWrite + Unpin,
344    {
345        // Strictly speaking it doesn't appear that the requested extension
346        // matters but we'll attempt to match other clients anyway.
347        let ext = match (self.protocol, self.container) {
348            (Protocol::Dash, _) => "mpd".to_string(),
349            (Protocol::Hls, _) => "m3u8".to_string(),
350            (_, container) => container.to_string(),
351        };
352
353        let path = format!(
354            "{}?{}",
355            SERVER_TRANSCODE_DOWNLOAD.replace("{extension}", &ext),
356            self.params
357        );
358
359        let mut builder = self.client.get(path);
360        if self.offline {
361            builder = builder.timeout(None)
362        }
363        let mut response = builder.send().await?;
364
365        match response.status().as_http_status() {
366            StatusCode::OK => {
367                response.copy_to(writer).await?;
368                Ok(())
369            }
370            _ => Err(crate::Error::from_response(response).await),
371        }
372    }
373
374    #[tracing::instrument(level = "debug", skip_all)]
375    pub async fn status(&self) -> Result<TranscodeStatus> {
376        let stats = self.stats().await?;
377
378        if stats.error {
379            Ok(TranscodeStatus::Error)
380        } else if stats.complete {
381            Ok(TranscodeStatus::Complete)
382        } else {
383            Ok(TranscodeStatus::Transcoding {
384                remaining: stats.remaining,
385                progress: stats.progress,
386            })
387        }
388    }
389
390    /// Retrieves the current transcode stats.
391    #[tracing::instrument(level = "debug", skip_all)]
392    pub async fn stats(&self) -> Result<TranscodeSessionStats> {
393        transcode_session_stats(&self.client, &self.id).await
394    }
395
396    /// Cancels the transcode and removes any transcoded data from the server.
397    ///
398    /// NB! Be careful with cancelling sessions too often! Cancelling a few transcoding
399    /// sessions in a short succession, or cancelling a session shortly after it was
400    /// initiated might crash the Plex server. At least the one running inside a Linux
401    /// Docker Container.
402    #[tracing::instrument(level = "debug", skip_all)]
403    pub async fn cancel(self) -> Result<()> {
404        let mut response = self
405            .client
406            .get(format!("{SERVER_TRANSCODE_STOP}?session={}", self.id))
407            .send()
408            .await?;
409
410        match response.status().as_http_status() {
411            // Sometimes the server will respond not found but still cancel the
412            // session.
413            StatusCode::OK | StatusCode::NOT_FOUND => Ok(response.consume().await?),
414            _ => Err(crate::Error::from_response(response).await),
415        }
416    }
417}