plex_api/server/transcode/
download_queue.rs

1use std::{fmt, ops::RangeBounds, str::FromStr};
2
3use content_disposition::parse_content_disposition;
4use futures::AsyncWrite;
5use http::StatusCode;
6use isahc::{
7    http::header::CONTENT_DISPOSITION, http::header::CONTENT_LENGTH, AsyncReadResponseExt,
8};
9use serde::Deserialize;
10use serde_json::Value;
11
12use crate::{
13    isahc_compat::StatusCodeExt,
14    media_container::{
15        server::library::{ContainerFormat, Metadata, Protocol},
16        MediaContainerWrapper,
17    },
18    transcode::{
19        get_transcode_params, session_id, Context, DecisionResult, TranscodeOptions,
20        TranscodeSessionStats,
21    },
22    url::{
23        DOWNLOAD_QUEUE_ADD, DOWNLOAD_QUEUE_CREATE, DOWNLOAD_QUEUE_DOWNLOAD, DOWNLOAD_QUEUE_ITEM,
24        DOWNLOAD_QUEUE_LIST,
25    },
26    Error, HttpClient, Result,
27};
28
29#[derive(Deserialize)]
30#[serde(rename_all = "lowercase")]
31enum QueueStatus {
32    Deciding,
33    Waiting,
34    Processing,
35    Done,
36    Error,
37}
38
39#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
40#[serde(rename_all = "lowercase")]
41pub enum QueueItemStatus {
42    /// The server is deciding whether to transcode this item or not.
43    Deciding,
44    /// The item is waiting in the queue to be transcoded.
45    Waiting,
46    /// The item is currently being transcoded.
47    Processing,
48    /// The item is available for download. Either transcoding is complete or was not required.
49    Available,
50    /// An error occurred.
51    Error,
52    /// The transcoded item has timed out and is no longer available.
53    Expired,
54}
55
56#[derive(Deserialize)]
57#[allow(dead_code)]
58#[serde(rename_all = "camelCase")]
59#[cfg_attr(feature = "tests_deny_unknown_fields", serde(deny_unknown_fields))]
60struct QueueSpec {
61    id: u32,
62    owner: Option<u32>,
63    client_identifier: Option<String>,
64    item_count: u32,
65    status: QueueStatus,
66}
67
68#[derive(Deserialize)]
69struct DownloadQueueContainer {
70    #[serde(rename = "DownloadQueue", default)]
71    queues: Vec<QueueSpec>,
72}
73
74#[derive(Deserialize)]
75struct QueueAddedItem {
76    key: String,
77    id: u32,
78}
79
80#[derive(Deserialize)]
81struct QueueAddedContainer {
82    #[serde(rename = "AddedQueueItems", default)]
83    items: Vec<QueueAddedItem>,
84}
85
86#[derive(Clone, Debug)]
87/// A download queue on the server.
88///
89/// Each server maintains one download queue per user per device.
90pub struct DownloadQueue {
91    client: HttpClient,
92    id: u32,
93}
94
95impl PartialEq for DownloadQueue {
96    fn eq(&self, other: &Self) -> bool {
97        self.id == other.id
98            && self.client.x_plex_client_identifier == other.client.x_plex_client_identifier
99    }
100}
101
102impl DownloadQueue {
103    pub(crate) async fn get_or_create(client: HttpClient) -> Result<Self> {
104        let wrapper: MediaContainerWrapper<DownloadQueueContainer> =
105            client.post(DOWNLOAD_QUEUE_CREATE).json().await?;
106
107        if let Some(queue) = wrapper.media_container.queues.first() {
108            Ok(Self {
109                client,
110                id: queue.id,
111            })
112        } else {
113            Err(Error::ItemNotFound)
114        }
115    }
116
117    /// Lists the items in this download queue.
118    pub async fn items(&self) -> Result<Vec<QueueItem>> {
119        Ok(self
120            .client
121            .get(DOWNLOAD_QUEUE_LIST.replace("{queueId}", &self.id.to_string()))
122            .json::<MediaContainerWrapper<QueueItemContainer>>()
123            .await?
124            .media_container
125            .items
126            .into_iter()
127            .map(|item| QueueItem {
128                client: self.client.clone(),
129                state: item,
130            })
131            .collect())
132    }
133
134    /// Gets a specific item in this download queue by its ID.
135    pub async fn item(&self, id: u32) -> Result<QueueItem> {
136        let state = QueueItemState::fetch(&self.client, self.id, id).await?;
137
138        Ok(QueueItem {
139            client: self.client.clone(),
140            state,
141        })
142    }
143
144    /// Adds a media item to this download queue with the given transcode options.
145    ///
146    /// Adding the same media with the same options will return the existing item in the queue.
147    /// You can pass either the main item (in which case the server selects which media to use and
148    /// combines all parts) or specific media or a specific part.
149    pub(crate) async fn add_item<O: TranscodeOptions>(
150        &self,
151        metadata: &Metadata,
152        media_index: Option<usize>,
153        part_index: Option<usize>,
154        options: O,
155    ) -> Result<QueueItem> {
156        let id = session_id();
157        let key = &metadata.key;
158
159        let params = get_transcode_params(
160            &id,
161            Context::Static,
162            Protocol::Http,
163            media_index,
164            part_index,
165            options,
166        )?
167        .param("keys", &metadata.key)
168        .param("path", &metadata.key);
169
170        let wrapper: MediaContainerWrapper<QueueAddedContainer> = self
171            .client
172            .post(format!(
173                "{}?{params}",
174                DOWNLOAD_QUEUE_ADD.replace("{queueId}", &self.id.to_string())
175            ))
176            .json()
177            .await?;
178
179        if let Some(item) = wrapper.media_container.items.iter().find(|i| &i.key == key) {
180            let item = QueueItemState::fetch(&self.client, self.id, item.id).await?;
181
182            Ok(QueueItem {
183                client: self.client.clone(),
184                state: item,
185            })
186        } else {
187            Err(Error::ItemNotFound)
188        }
189    }
190}
191
192#[derive(Debug, Clone, Deserialize)]
193#[allow(dead_code)]
194#[serde(rename_all = "camelCase")]
195#[cfg_attr(feature = "tests_deny_unknown_fields", serde(deny_unknown_fields))]
196struct QueueItemState {
197    id: u32,
198    queue_id: u32,
199    key: String,
200    status: QueueItemStatus,
201    error: Option<String>,
202    // The API docs says this is the transcode session object. I've never seen it as anything other
203    // than null though.
204    transcode: Option<Value>,
205    #[serde(rename = "DecisionResult")]
206    decision_result: DecisionResult,
207    #[serde(rename = "TranscodeSession")]
208    session_stats: Option<TranscodeSessionStats>,
209}
210
211impl QueueItemState {
212    async fn fetch(client: &HttpClient, queue_id: u32, id: u32) -> Result<Self> {
213        let items = client
214            .get(
215                DOWNLOAD_QUEUE_ITEM
216                    .replace("{queueId}", &queue_id.to_string())
217                    .replace("{itemId}", &id.to_string()),
218            )
219            .json::<MediaContainerWrapper<QueueItemContainer>>()
220            .await?
221            .media_container
222            .items;
223
224        items.into_iter().next().ok_or_else(|| Error::ItemNotFound)
225    }
226}
227
228#[derive(Deserialize)]
229struct QueueItemContainer {
230    #[serde(rename = "DownloadQueueItem", default)]
231    items: Vec<QueueItemState>,
232}
233
234/// An item in a download queue.
235pub struct QueueItem {
236    client: HttpClient,
237    state: QueueItemState,
238}
239
240impl fmt::Debug for QueueItem {
241    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242        self.state.fmt(f)
243    }
244}
245
246impl QueueItem {
247    pub fn id(&self) -> u32 {
248        self.state.id
249    }
250
251    pub fn queue(&self) -> DownloadQueue {
252        DownloadQueue {
253            client: self.client.clone(),
254            id: self.state.queue_id,
255        }
256    }
257
258    pub fn key(&self) -> &str {
259        &self.state.key
260    }
261
262    pub fn status(&self) -> QueueItemStatus {
263        self.state.status.clone()
264    }
265
266    /// If this item is currently being transcoded this will return the current
267    /// transcode stats.
268    pub fn stats(&self) -> Option<TranscodeSessionStats> {
269        self.state.session_stats.clone()
270    }
271
272    /// If the status is an error this may reveal more information.
273    pub fn error(&self) -> Option<&str> {
274        self.state.error.as_deref()
275    }
276
277    /// Returns true if this item was or is being transcoded. If false then
278    /// downloading will just download the original media file.
279    pub fn is_transcode(&self) -> bool {
280        self.state.decision_result.direct_play_decision_code != Some(1000)
281    }
282
283    /// Returns the container format of the file that will be downloaded.
284    ///
285    /// This will fail if the item is not available.
286    pub async fn container(&self) -> Result<ContainerFormat> {
287        // The API doesn't appear to expose the container format in a
288        // a particularly nice way. If the item is in the middle of transcoding
289        // then it is available in the transcode stats, but if already complete
290        // the stats are no longer exposed. However the content-disposition
291        // header of the download endpoint does include the filename complete
292        // with correct extension for the container so we can use that.
293
294        let path = DOWNLOAD_QUEUE_DOWNLOAD
295            .replace("{queueId}", &self.state.queue_id.to_string())
296            .replace("{itemId}", &self.state.id.to_string());
297
298        let response = self.client.head(path).send().await?;
299        match response.status().as_http_status() {
300            StatusCode::OK => {
301                if let Some(val) = response.headers().get(CONTENT_DISPOSITION) {
302                    if let Ok(st) = val.to_str() {
303                        if let Some((_, Some(ext))) = parse_content_disposition(st).filename() {
304                            match ContainerFormat::from_str(&ext) {
305                                Ok(c) => Ok(c),
306                                Err(_) => Err(Error::UnknownContainerFormat(ext.to_string())),
307                            }
308                        } else {
309                            Err(Error::InvalidHeaderValue)
310                        }
311                    } else {
312                        Err(Error::InvalidHeaderValue)
313                    }
314                } else {
315                    Err(Error::InvalidHeaderValue)
316                }
317            }
318            StatusCode::SERVICE_UNAVAILABLE => Err(Error::TranscodeIncomplete),
319            _ => Err(crate::Error::from_response(response).await),
320        }
321    }
322
323    /// Returns the expected length of the download.
324    ///
325    /// This will fail if the item is not available.
326    pub async fn content_length(&self) -> Result<Option<u64>> {
327        let path = DOWNLOAD_QUEUE_DOWNLOAD
328            .replace("{queueId}", &self.state.queue_id.to_string())
329            .replace("{itemId}", &self.state.id.to_string());
330
331        let response = self.client.head(path).send().await?;
332        match response.status().as_http_status() {
333            StatusCode::OK => {
334                if let Some(val) = response.headers().get(CONTENT_LENGTH) {
335                    if let Ok(st) = val.to_str() {
336                        Ok(st.parse::<u64>().ok())
337                    } else {
338                        Ok(None)
339                    }
340                } else {
341                    Ok(None)
342                }
343            }
344            StatusCode::SERVICE_UNAVAILABLE => Err(Error::TranscodeIncomplete),
345            _ => Err(crate::Error::from_response(response).await),
346        }
347    }
348
349    /// Updates the state of this item by re-fetching it from the server.
350    pub async fn update(&mut self) -> Result<()> {
351        let state = QueueItemState::fetch(&self.client, self.state.queue_id, self.state.id).await?;
352        self.state = state;
353        Ok(())
354    }
355
356    /// Downloads the item to the provided writer.
357    ///
358    /// This will fail if the item is not available.
359    pub async fn download<W, R>(&self, writer: W, range: R) -> Result
360    where
361        W: AsyncWrite + Unpin,
362        R: RangeBounds<u64>,
363    {
364        let path = DOWNLOAD_QUEUE_DOWNLOAD
365            .replace("{queueId}", &self.state.queue_id.to_string())
366            .replace("{itemId}", &self.state.id.to_string());
367
368        let start = match range.start_bound() {
369            std::ops::Bound::Included(v) => *v,
370            std::ops::Bound::Excluded(v) => v + 1,
371            std::ops::Bound::Unbounded => 0,
372        };
373
374        let end = match range.end_bound() {
375            std::ops::Bound::Included(v) => Some(*v),
376            std::ops::Bound::Excluded(v) => Some(v - 1),
377            std::ops::Bound::Unbounded => None,
378        };
379
380        let mut builder = self.client.get(path).timeout(None);
381        if start != 0 || end.is_some() {
382            // We're requesting part of the file.
383            let end = end.map(|v| v.to_string()).unwrap_or_default();
384            builder = builder.header("Range", format!("bytes={start}-{end}"))
385        }
386
387        let mut response = builder.send().await?;
388        match response.status().as_http_status() {
389            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
390                response.copy_to(writer).await?;
391                Ok(())
392            }
393            StatusCode::SERVICE_UNAVAILABLE => Err(Error::TranscodeIncomplete),
394            _ => Err(crate::Error::from_response(response).await),
395        }
396    }
397
398    /// Deletes this item from the download queue.
399    pub async fn delete(self) -> Result<()> {
400        self.client
401            .delete(
402                DOWNLOAD_QUEUE_ITEM
403                    .replace("{queueId}", &self.state.queue_id.to_string())
404                    .replace("{itemId}", &self.state.id.to_string()),
405            )
406            .send()
407            .await?;
408
409        Ok(())
410    }
411}