plex_api/server/transcode/
download_queue.rs1use std::{fmt, ops::RangeBounds, str::FromStr};
2
3use content_disposition::parse_content_disposition;
4use futures::AsyncWrite;
5use http::StatusCode;
6use isahc::{
7 http::header::CONTENT_DISPOSITION, http::header::CONTENT_LENGTH, AsyncReadResponseExt,
8};
9use serde::Deserialize;
10use serde_json::Value;
11
12use crate::{
13 isahc_compat::StatusCodeExt,
14 media_container::{
15 server::library::{ContainerFormat, Metadata, Protocol},
16 MediaContainerWrapper,
17 },
18 transcode::{
19 get_transcode_params, session_id, Context, DecisionResult, TranscodeOptions,
20 TranscodeSessionStats,
21 },
22 url::{
23 DOWNLOAD_QUEUE_ADD, DOWNLOAD_QUEUE_CREATE, DOWNLOAD_QUEUE_DOWNLOAD, DOWNLOAD_QUEUE_ITEM,
24 DOWNLOAD_QUEUE_LIST,
25 },
26 Error, HttpClient, Result,
27};
28
29#[derive(Deserialize)]
30#[serde(rename_all = "lowercase")]
31enum QueueStatus {
32 Deciding,
33 Waiting,
34 Processing,
35 Done,
36 Error,
37}
38
39#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
40#[serde(rename_all = "lowercase")]
41pub enum QueueItemStatus {
42 Deciding,
44 Waiting,
46 Processing,
48 Available,
50 Error,
52 Expired,
54}
55
56#[derive(Deserialize)]
57#[allow(dead_code)]
58#[serde(rename_all = "camelCase")]
59#[cfg_attr(feature = "tests_deny_unknown_fields", serde(deny_unknown_fields))]
60struct QueueSpec {
61 id: u32,
62 owner: Option<u32>,
63 client_identifier: Option<String>,
64 item_count: u32,
65 status: QueueStatus,
66}
67
68#[derive(Deserialize)]
69struct DownloadQueueContainer {
70 #[serde(rename = "DownloadQueue", default)]
71 queues: Vec<QueueSpec>,
72}
73
74#[derive(Deserialize)]
75struct QueueAddedItem {
76 key: String,
77 id: u32,
78}
79
80#[derive(Deserialize)]
81struct QueueAddedContainer {
82 #[serde(rename = "AddedQueueItems", default)]
83 items: Vec<QueueAddedItem>,
84}
85
86#[derive(Clone, Debug)]
87pub struct DownloadQueue {
91 client: HttpClient,
92 id: u32,
93}
94
95impl PartialEq for DownloadQueue {
96 fn eq(&self, other: &Self) -> bool {
97 self.id == other.id
98 && self.client.x_plex_client_identifier == other.client.x_plex_client_identifier
99 }
100}
101
102impl DownloadQueue {
103 pub(crate) async fn get_or_create(client: HttpClient) -> Result<Self> {
104 let wrapper: MediaContainerWrapper<DownloadQueueContainer> =
105 client.post(DOWNLOAD_QUEUE_CREATE).json().await?;
106
107 if let Some(queue) = wrapper.media_container.queues.first() {
108 Ok(Self {
109 client,
110 id: queue.id,
111 })
112 } else {
113 Err(Error::ItemNotFound)
114 }
115 }
116
117 pub async fn items(&self) -> Result<Vec<QueueItem>> {
119 Ok(self
120 .client
121 .get(DOWNLOAD_QUEUE_LIST.replace("{queueId}", &self.id.to_string()))
122 .json::<MediaContainerWrapper<QueueItemContainer>>()
123 .await?
124 .media_container
125 .items
126 .into_iter()
127 .map(|item| QueueItem {
128 client: self.client.clone(),
129 state: item,
130 })
131 .collect())
132 }
133
134 pub async fn item(&self, id: u32) -> Result<QueueItem> {
136 let state = QueueItemState::fetch(&self.client, self.id, id).await?;
137
138 Ok(QueueItem {
139 client: self.client.clone(),
140 state,
141 })
142 }
143
144 pub(crate) async fn add_item<O: TranscodeOptions>(
150 &self,
151 metadata: &Metadata,
152 media_index: Option<usize>,
153 part_index: Option<usize>,
154 options: O,
155 ) -> Result<QueueItem> {
156 let id = session_id();
157 let key = &metadata.key;
158
159 let params = get_transcode_params(
160 &id,
161 Context::Static,
162 Protocol::Http,
163 media_index,
164 part_index,
165 options,
166 )?
167 .param("keys", &metadata.key)
168 .param("path", &metadata.key);
169
170 let wrapper: MediaContainerWrapper<QueueAddedContainer> = self
171 .client
172 .post(format!(
173 "{}?{params}",
174 DOWNLOAD_QUEUE_ADD.replace("{queueId}", &self.id.to_string())
175 ))
176 .json()
177 .await?;
178
179 if let Some(item) = wrapper.media_container.items.iter().find(|i| &i.key == key) {
180 let item = QueueItemState::fetch(&self.client, self.id, item.id).await?;
181
182 Ok(QueueItem {
183 client: self.client.clone(),
184 state: item,
185 })
186 } else {
187 Err(Error::ItemNotFound)
188 }
189 }
190}
191
192#[derive(Debug, Clone, Deserialize)]
193#[allow(dead_code)]
194#[serde(rename_all = "camelCase")]
195#[cfg_attr(feature = "tests_deny_unknown_fields", serde(deny_unknown_fields))]
196struct QueueItemState {
197 id: u32,
198 queue_id: u32,
199 key: String,
200 status: QueueItemStatus,
201 error: Option<String>,
202 transcode: Option<Value>,
205 #[serde(rename = "DecisionResult")]
206 decision_result: DecisionResult,
207 #[serde(rename = "TranscodeSession")]
208 session_stats: Option<TranscodeSessionStats>,
209}
210
211impl QueueItemState {
212 async fn fetch(client: &HttpClient, queue_id: u32, id: u32) -> Result<Self> {
213 let items = client
214 .get(
215 DOWNLOAD_QUEUE_ITEM
216 .replace("{queueId}", &queue_id.to_string())
217 .replace("{itemId}", &id.to_string()),
218 )
219 .json::<MediaContainerWrapper<QueueItemContainer>>()
220 .await?
221 .media_container
222 .items;
223
224 items.into_iter().next().ok_or_else(|| Error::ItemNotFound)
225 }
226}
227
228#[derive(Deserialize)]
229struct QueueItemContainer {
230 #[serde(rename = "DownloadQueueItem", default)]
231 items: Vec<QueueItemState>,
232}
233
234pub struct QueueItem {
236 client: HttpClient,
237 state: QueueItemState,
238}
239
240impl fmt::Debug for QueueItem {
241 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242 self.state.fmt(f)
243 }
244}
245
246impl QueueItem {
247 pub fn id(&self) -> u32 {
248 self.state.id
249 }
250
251 pub fn queue(&self) -> DownloadQueue {
252 DownloadQueue {
253 client: self.client.clone(),
254 id: self.state.queue_id,
255 }
256 }
257
258 pub fn key(&self) -> &str {
259 &self.state.key
260 }
261
262 pub fn status(&self) -> QueueItemStatus {
263 self.state.status.clone()
264 }
265
266 pub fn stats(&self) -> Option<TranscodeSessionStats> {
269 self.state.session_stats.clone()
270 }
271
272 pub fn error(&self) -> Option<&str> {
274 self.state.error.as_deref()
275 }
276
277 pub fn is_transcode(&self) -> bool {
280 self.state.decision_result.direct_play_decision_code != Some(1000)
281 }
282
283 pub async fn container(&self) -> Result<ContainerFormat> {
287 let path = DOWNLOAD_QUEUE_DOWNLOAD
295 .replace("{queueId}", &self.state.queue_id.to_string())
296 .replace("{itemId}", &self.state.id.to_string());
297
298 let response = self.client.head(path).send().await?;
299 match response.status().as_http_status() {
300 StatusCode::OK => {
301 if let Some(val) = response.headers().get(CONTENT_DISPOSITION) {
302 if let Ok(st) = val.to_str() {
303 if let Some((_, Some(ext))) = parse_content_disposition(st).filename() {
304 match ContainerFormat::from_str(&ext) {
305 Ok(c) => Ok(c),
306 Err(_) => Err(Error::UnknownContainerFormat(ext.to_string())),
307 }
308 } else {
309 Err(Error::InvalidHeaderValue)
310 }
311 } else {
312 Err(Error::InvalidHeaderValue)
313 }
314 } else {
315 Err(Error::InvalidHeaderValue)
316 }
317 }
318 StatusCode::SERVICE_UNAVAILABLE => Err(Error::TranscodeIncomplete),
319 _ => Err(crate::Error::from_response(response).await),
320 }
321 }
322
323 pub async fn content_length(&self) -> Result<Option<u64>> {
327 let path = DOWNLOAD_QUEUE_DOWNLOAD
328 .replace("{queueId}", &self.state.queue_id.to_string())
329 .replace("{itemId}", &self.state.id.to_string());
330
331 let response = self.client.head(path).send().await?;
332 match response.status().as_http_status() {
333 StatusCode::OK => {
334 if let Some(val) = response.headers().get(CONTENT_LENGTH) {
335 if let Ok(st) = val.to_str() {
336 Ok(st.parse::<u64>().ok())
337 } else {
338 Ok(None)
339 }
340 } else {
341 Ok(None)
342 }
343 }
344 StatusCode::SERVICE_UNAVAILABLE => Err(Error::TranscodeIncomplete),
345 _ => Err(crate::Error::from_response(response).await),
346 }
347 }
348
349 pub async fn update(&mut self) -> Result<()> {
351 let state = QueueItemState::fetch(&self.client, self.state.queue_id, self.state.id).await?;
352 self.state = state;
353 Ok(())
354 }
355
356 pub async fn download<W, R>(&self, writer: W, range: R) -> Result
360 where
361 W: AsyncWrite + Unpin,
362 R: RangeBounds<u64>,
363 {
364 let path = DOWNLOAD_QUEUE_DOWNLOAD
365 .replace("{queueId}", &self.state.queue_id.to_string())
366 .replace("{itemId}", &self.state.id.to_string());
367
368 let start = match range.start_bound() {
369 std::ops::Bound::Included(v) => *v,
370 std::ops::Bound::Excluded(v) => v + 1,
371 std::ops::Bound::Unbounded => 0,
372 };
373
374 let end = match range.end_bound() {
375 std::ops::Bound::Included(v) => Some(*v),
376 std::ops::Bound::Excluded(v) => Some(v - 1),
377 std::ops::Bound::Unbounded => None,
378 };
379
380 let mut builder = self.client.get(path).timeout(None);
381 if start != 0 || end.is_some() {
382 let end = end.map(|v| v.to_string()).unwrap_or_default();
384 builder = builder.header("Range", format!("bytes={start}-{end}"))
385 }
386
387 let mut response = builder.send().await?;
388 match response.status().as_http_status() {
389 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
390 response.copy_to(writer).await?;
391 Ok(())
392 }
393 StatusCode::SERVICE_UNAVAILABLE => Err(Error::TranscodeIncomplete),
394 _ => Err(crate::Error::from_response(response).await),
395 }
396 }
397
398 pub async fn delete(self) -> Result<()> {
400 self.client
401 .delete(
402 DOWNLOAD_QUEUE_ITEM
403 .replace("{queueId}", &self.state.queue_id.to_string())
404 .replace("{itemId}", &self.state.id.to_string()),
405 )
406 .send()
407 .await?;
408
409 Ok(())
410 }
411}