1use std::{borrow::Cow, collections::HashMap, path::Path, str::FromStr, sync::Arc};
4
5use enum_iterator::Sequence;
6use reqwest::{
7 Method, StatusCode,
8 header::{ACCEPT, HeaderMap, HeaderName, InvalidHeaderValue},
9 multipart,
10};
11use serde::{Deserialize, Serialize, de::DeserializeOwned};
12use tracing::{debug, trace};
13
14use crate::{
15 Error, Group, Result, SavedView, User,
16 document::{Document, DocumentData},
17 document_query::DocumentQueryBuilder,
18 dto::{CreateDto, Item, UpdateDto},
19 id::{
20 CorrespondentId, CustomFieldId, DocumentId, DocumentTypeId, GroupId, ItemId, StoragePathId,
21 TagId, TaskId, UserId,
22 },
23 metadata::{
24 correspondent::Correspondent, custom_field::CustomField, document_type::DocumentType,
25 storage_path::StoragePath, tag::Tag,
26 },
27 task::Task,
28 util,
29 workflow::Workflow,
30};
31
/// Selects which kinds of metadata [`PaperlessClient::refresh`] reloads into
/// the client's cache.
///
/// The enum derives `Sequence`, which is how
/// [`PaperlessClient::refresh_all`] enumerates every variant.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Sequence)]
#[non_exhaustive]
pub enum RefreshMetaData {
    /// Reload the cached tags.
    Tags,
    /// Reload the cached custom fields.
    CustomFields,
    /// Reload the cached correspondents.
    Correspondents,
    /// Reload the cached document types.
    DocumentTypes,
    /// Reload the cached groups.
    Groups,
    /// Reload the cached users.
    Users,
    /// Reload the cached storage paths.
    StoragePaths,
}
47
/// Client for the Paperless HTTP API.
///
/// Holds the HTTP client, the base URL and a cache of metadata that is filled
/// by `refresh`. The base URL and the cache are stored behind `Arc`s.
#[derive(Debug, Clone)]
pub struct PaperlessClient {
    /// When `true`, the full-permissions query parameter is added to the
    /// default query parameters.
    pub request_full_permissions: bool,

    /// When `false`, the truncate-content query parameter is added to the
    /// default query parameters.
    pub request_full_content: bool,

    /// Prefix that is prepended to every endpoint when a request URL is built.
    pub(crate) base_url: Arc<str>,

    /// Underlying HTTP client; built with the default headers (including the
    /// authorization token).
    client: reqwest::Client,
    /// Cached metadata maps; empty after construction, replaced by `refresh`.
    cached_data: Arc<CachedData>,
}
62
/// Metadata cached by [`PaperlessClient`], one map per item kind, keyed by the
/// item's id.
#[derive(Debug, Clone)]
struct CachedData {
    /// Cached correspondents by id.
    correspondents: HashMap<CorrespondentId, Correspondent>,
    /// Cached custom fields by id.
    custom_fields: HashMap<CustomFieldId, CustomField>,
    /// Cached document types by id.
    document_types: HashMap<DocumentTypeId, DocumentType>,
    /// Cached groups by id.
    groups: HashMap<GroupId, Group>,
    /// Cached storage paths by id.
    storage_paths: HashMap<StoragePathId, StoragePath>,
    /// Cached tags by id.
    tags: HashMap<TagId, Tag>,
    /// Cached users by id.
    users: HashMap<UserId, User>,
}
73
/// One page of a paginated list response as consumed by `fetch_all_pages`.
#[derive(Debug, Deserialize)]
struct PaginatedResponse<T> {
    /// Items contained in this page.
    results: Vec<T>,
    /// URL of the following page, or `None` when this is the last page.
    next: Option<String>,
}
79
80impl PaperlessClient {
81 pub fn new(
89 base_url: &str,
90 token: &str,
91 headers: Option<&HashMap<String, String>>,
92 ) -> std::result::Result<Self, String> {
93 Self::new_with_client(
94 base_url,
95 token,
96 headers,
97 reqwest::Client::builder().zstd(true),
98 )
99 }
100
101 pub fn new_with_client(
113 base_url: &str,
114 token: &str,
115 headers: Option<&HashMap<String, String>>,
116 client_builder: reqwest::ClientBuilder,
117 ) -> std::result::Result<Self, String> {
118 let mut headers_map = HeaderMap::new();
119
120 if let Some(headers) = headers {
122 for (key, value) in headers {
123 headers_map.insert(
124 HeaderName::from_str(key).map_err(|err| err.to_string())?,
125 value
126 .parse()
127 .map_err(|err: InvalidHeaderValue| err.to_string())?,
128 );
129 }
130 }
131
132 headers_map.insert(
134 HeaderName::from_str("Authorization").map_err(|err| err.to_string())?,
135 format!("Token {token}")
136 .parse()
137 .map_err(|err: InvalidHeaderValue| err.to_string())?,
138 );
139
140 Ok(Self {
141 request_full_permissions: false,
142 request_full_content: false,
143 base_url: base_url.into(),
144 client: client_builder
145 .default_headers(headers_map)
146 .build()
147 .map_err(|err| err.to_string())?,
148 cached_data: Arc::new(CachedData {
149 custom_fields: HashMap::new(),
150 correspondents: HashMap::new(),
151 document_types: HashMap::new(),
152 groups: HashMap::new(),
153 storage_paths: HashMap::new(),
154 tags: HashMap::new(),
155 users: HashMap::new(),
156 }),
157 })
158 }
159
160 #[must_use]
165 pub fn with_full_permissions(mut self, req: bool) -> Self {
166 self.request_full_permissions = req;
167 self
168 }
169
170 #[must_use]
171 pub fn with_full_content(mut self, full_content: bool) -> Self {
172 self.request_full_content = full_content;
173 self
174 }
175
176 pub async fn load_items<T: Item + DeserializeOwned>(&self) -> Result<HashMap<T::Id, T>> {
178 let endpoint = format!("/api/{}/", T::endpoint());
179 debug!(endpoint, "Loading");
180
181 let items: Vec<T> = self.fetch_all_pages(&endpoint, None).await?;
182 Ok(items.into_iter().map(|item| (item.id(), item)).collect())
183 }
184
185 fn default_query_params(&self) -> Option<HashMap<&'static str, Cow<'_, str>>> {
186 let mut params = HashMap::new();
187
188 if self.request_full_permissions {
189 params.insert(
190 crate::document_query::QUERY_PARAM_FULL_PERMISSIONS,
191 Cow::Borrowed("true"),
192 );
193 }
194 if !self.request_full_content {
195 params.insert(
196 crate::document_query::QUERY_PARAM_TRUNCATE_CONTENT,
197 Cow::Borrowed("true"),
198 );
199 }
200
201 if params.is_empty() {
202 None
203 } else {
204 Some(params)
205 }
206 }
207
208 pub async fn refresh_all(&mut self) -> Result<()> {
212 self.refresh(enum_iterator::all::<RefreshMetaData>()).await
213 }
214
215 pub async fn refresh(&mut self, data: impl IntoIterator<Item = RefreshMetaData>) -> Result<()> {
224 #[rustfmt::skip]
225 async fn inner(
226 client: &mut PaperlessClient,
227 data: &mut dyn Iterator<Item = RefreshMetaData>,
228 ) -> Result<()> {
229 let selected: std::collections::HashSet<_> = data.into_iter().collect();
230
231 if selected.is_empty() {
232 return Ok(());
233 }
234
235 let (tags, custom_fields, correspondents, document_types, groups, users, storage_paths) =
236 futures_util::try_join!(
237 async {
238 if selected.contains(&RefreshMetaData::Tags) {
239 Ok(Some(client.load_items::<Tag>().await?))
240 } else {
241 Ok::<Option<_>, Error>(None)
242 }
243 },
244 async {
245 if selected.contains(&RefreshMetaData::CustomFields) {
246 Ok(Some(client.load_items::<CustomField>().await?))
247 } else {
248 Ok(None)
249 }
250 },
251 async {
252 if selected.contains(&RefreshMetaData::Correspondents) {
253 Ok(Some(client.load_items::<Correspondent>().await?))
254 } else {
255 Ok(None)
256 }
257 },
258 async {
259 if selected.contains(&RefreshMetaData::DocumentTypes) {
260 Ok(Some(client.load_items::<DocumentType>().await?))
261 } else {
262 Ok(None)
263 }
264 },
265 async {
266 if selected.contains(&RefreshMetaData::Groups) {
267 Ok(Some(client.load_items::<Group>().await?))
268 } else {
269 Ok(None)
270 }
271 },
272 async {
273 if selected.contains(&RefreshMetaData::Users) {
274 Ok(Some(client.load_items::<User>().await?))
275 } else {
276 Ok(None)
277 }
278 },
279 async {
280 if selected.contains(&RefreshMetaData::StoragePaths) {
281 Ok(Some(client.load_items::<StoragePath>().await?))
282 } else {
283 Ok(None)
284 }
285 },
286 )?;
287
288 let cached_data = Arc::make_mut(&mut client.cached_data);
289
290 if let Some(value) = custom_fields { cached_data.custom_fields = value; }
291 if let Some(value) = correspondents { cached_data.correspondents = value; }
292 if let Some(value) = document_types { cached_data.document_types = value; }
293 if let Some(value) = groups { cached_data.groups = value; }
294 if let Some(value) = storage_paths { cached_data.storage_paths = value; }
295 if let Some(value) = tags { cached_data.tags = value; }
296 if let Some(value) = users { cached_data.users = value; }
297
298 Ok(())
299 }
300
301 inner(self, &mut data.into_iter()).await
302 }
303
304 pub async fn query_documents(&self, query: DocumentQueryBuilder) -> Result<Vec<Document>> {
306 let full_content = query.full_content;
307 let query_params = query.build();
308 let query: HashMap<&str, Cow<str>> = query_params
309 .query
310 .into_iter()
311 .map(|(k, v)| (k, Cow::Owned(v)))
312 .collect();
313
314 let documents: Vec<_> = self
315 .fetch_all_pages::<DocumentData>("/api/documents/", Some(&query))
316 .await?
317 .into_iter()
318 .map(|data| Document::new(data, Arc::new(self.clone()), !full_content))
319 .collect();
320
321 Ok(documents)
322 }
323
324 pub fn get_documents_by_tags(
326 &self,
327 tag_ids: &[TagId],
328 ) -> impl Future<Output = Result<Vec<Document>>> {
329 let query = DocumentQueryBuilder::default()
330 .full_content(self.request_full_content)
331 .full_permissions(self.request_full_permissions)
332 .tags_id_in(tag_ids.to_vec());
333
334 self.query_documents(query)
335 }
336
337 pub(crate) async fn get_document_data_by_id(
338 &self,
339 id: DocumentId,
340 full_content: Option<bool>,
341 full_permissions: Option<bool>,
342 ) -> Result<DocumentData> {
343 let mut params = self.default_query_params();
344
345 if full_content.is_some() || full_permissions.is_some() {
346 let mut updated_params = params.unwrap_or_default();
347
348 if let Some(full_content) = full_content {
349 updated_params.insert(
350 crate::document_query::QUERY_PARAM_TRUNCATE_CONTENT,
351 Cow::Owned((!full_content).to_string()),
352 );
353 }
354
355 if let Some(full_permissions) = full_permissions {
356 updated_params.insert(
357 crate::document_query::QUERY_PARAM_FULL_PERMISSIONS,
358 Cow::Owned(full_permissions.to_string()),
359 );
360 }
361
362 params = Some(updated_params);
363 }
364
365 self.request_json_no_body(
366 Method::GET,
367 &format!("/api/documents/{}/", id.0),
368 params.as_ref(),
369 )
370 .await
371 }
372
373 pub async fn get_document_by_id(
375 &self,
376 id: DocumentId,
377 full_content: Option<bool>,
378 full_permissions: Option<bool>,
379 ) -> Result<Document> {
380 let content_is_truncated = !full_content.unwrap_or(self.request_full_content);
381 Ok(Document::new(
382 self.get_document_data_by_id(id, full_content, full_permissions)
383 .await?,
384 Arc::new(self.clone()),
385 content_is_truncated,
386 ))
387 }
388
389 pub(crate) fn request_json_no_body<T: serde::de::DeserializeOwned>(
391 &self,
392 method: Method,
393 endpoint: &str,
394 query_params: Option<&HashMap<&str, Cow<str>>>,
395 ) -> impl Future<Output = Result<T>> {
396 self.request_json(method, endpoint, None::<&()>, query_params)
397 }
398
399 pub(crate) async fn request_json<T: serde::de::DeserializeOwned>(
401 &self,
402 method: Method,
403 endpoint: &str,
404 body: Option<&impl Serialize>,
405 query_params: Option<&HashMap<&str, Cow<'_, str>>>,
406 ) -> Result<T> {
407 let resp = self.request(method, endpoint, body, query_params).await?;
408
409 if tracing::enabled!(tracing::Level::TRACE) {
410 let response_text = resp.text().await.unwrap_or_default();
412 trace!(body = %response_text, "Response");
413
414 Ok(serde_json::from_str(&response_text)
415 .map_err(|e| Error::InvalidJson(format!("Failed to parse response body: {e:?}")))?)
416 } else {
417 Ok(resp
418 .json()
419 .await
420 .map_err(|e| Error::InvalidJson(format!("Failed to parse response body: {e:?}")))?)
421 }
422 }
423
424 pub(crate) fn request_no_body(
426 &self,
427 method: Method,
428 endpoint: &str,
429 query_params: Option<&HashMap<&str, Cow<'_, str>>>,
430 ) -> impl Future<Output = Result<reqwest::Response>> {
431 self.request(method, endpoint, None::<&()>, query_params)
432 }
433
434 pub(crate) async fn request(
436 &self,
437 method: Method,
438 endpoint: &str,
439 body: Option<&impl Serialize>,
440 query_params: Option<&HashMap<&str, Cow<'_, str>>>,
441 ) -> Result<reqwest::Response> {
442 let mut req = self
443 .client
444 .request(method, format!("{}{endpoint}", self.base_url))
445 .header(ACCEPT, "application/json");
446
447 if let Some(params) = query_params {
448 req = req.query(params);
449 }
450
451 if let Some(json_body) = body {
453 req = req.json(json_body);
454 }
455
456 let req = req.build().map_err(|e| Error::Request(e.into()))?;
457
458 if tracing::enabled!(tracing::Level::TRACE)
459 && let Some(body) = req.body().and_then(|b| b.as_bytes())
460 {
461 trace!(
462 method = ?req.method(),
463 url = ?req.url(),
464 body = %String::from_utf8_lossy(body),
465 "Sending request to Paperless API");
466 } else {
467 debug!(
468 method = ?req.method(),
469 url = ?req.url(),
470 "Sending request to Paperless API");
471 }
472
473 let resp = self
474 .client
475 .execute(req)
476 .await
477 .map_err(|e| Error::Other(format!("Failed to send request: {e}")))?;
478
479 debug!(status = ?resp.status(), "Response");
481
482 if resp.status() == StatusCode::NOT_FOUND {
483 return Err(Error::NotFound);
484 }
485
486 if !resp.status().is_success() {
487 return Err(Error::Response {
488 status_code: resp.status().as_u16(),
489 body: resp.text().await.unwrap_or_default(),
490 });
491 }
492
493 Ok(resp)
494 }
495
496 pub(crate) async fn fetch_all_pages<T: for<'de> Deserialize<'de>>(
497 &self,
498 endpoint: &str,
499 query_params: Option<&HashMap<&str, Cow<'_, str>>>,
500 ) -> Result<Vec<T>> {
501 let mut results = vec![];
502 let mut all_query_params = self.default_query_params().unwrap_or_default();
503 if let Some(query_params) = query_params {
504 all_query_params.extend(query_params.clone());
505 }
506
507 let mut all_query_params = Some(all_query_params);
508
509 let mut current_url = Some(endpoint.to_string());
510
511 while let Some(url) = current_url {
512 debug!("Fetching page: {url}");
513
514 let page: PaginatedResponse<T> = self
515 .request_json_no_body(Method::GET, &url, all_query_params.as_ref())
516 .await?;
517
518 results.extend(page.results);
519
520 current_url = page.next.and_then(|next_url| {
521 next_url
523 .strip_prefix(&*self.base_url)
524 .unwrap_or(&next_url)
525 .to_string()
526 .into()
527 });
528 all_query_params = None;
529 }
530
531 Ok(results)
532 }
533
534 pub async fn get_task_status(
536 &self,
537 task_id: Option<&TaskId>,
538 task_name: Option<&str>,
539 acknowledged: Option<bool>,
540 ) -> Result<Vec<Task>> {
541 let mut query = Vec::new();
542
543 if let Some(id) = task_id {
544 query.push(("task_id", id.to_string()));
545 }
546
547 if let Some(name) = task_name {
548 query.push(("task_name", name.to_string()));
549 }
550
551 if let Some(ack) = acknowledged {
552 query.push(("acknowledged", ack.to_string()));
553 }
554
555 let resp = self
556 .request_no_body(
557 Method::GET,
558 &format!(
559 "/api/tasks/?{}",
560 serde_urlencoded::to_string(&query)
561 .map_err(|e| Error::Other(format!("Failed to serialize query: {e}")))?
562 ),
563 None,
564 )
565 .await?;
566
567 let body = resp
568 .text()
569 .await
570 .map_err(|e| Error::Other(format!("Failed to read response body: {e:?}")))?;
571
572 trace!("get_task_status response: {:?}", body);
573
574 let tasks: Vec<Task> = match serde_json::from_str(&body) {
575 Ok(t) => t,
576 Err(e) => {
577 return Err(Error::InvalidJson(format!(
578 "Failed to parse response body: {e:?}"
579 )));
580 }
581 };
582
583 if tasks.is_empty() {
584 return Err(Error::NotFound);
585 }
586
587 Ok(tasks)
588 }
589
590 pub fn get_workflows(&self) -> impl Future<Output = Result<Vec<Workflow>>> {
592 self.fetch_all_pages("/api/workflows/", None)
593 }
594
595 pub fn get_saved_views(&self) -> impl Future<Output = Result<Vec<SavedView>>> {
597 self.fetch_all_pages("/api/saved_views/", None)
598 }
599
600 pub fn get_statistics(&self) -> impl Future<Output = Result<util::Statistics>> {
602 self.request_json_no_body(Method::GET, "/api/statistics/", None)
603 }
604
605 pub fn get_status(&self) -> impl Future<Output = Result<util::ServerStatus>> {
607 self.request_json_no_body(Method::GET, "/api/status/", None)
608 }
609
610 pub async fn create<T>(&self, new_item: &T) -> Result<T::BaseType>
616 where
617 T: CreateDto,
618 T::BaseType: Item,
619 {
620 let url = format!("/api/{}/", <T::BaseType as Item>::endpoint());
621 self.request_json(Method::POST, &url, Some(&new_item), None)
622 .await
623 }
624
625 pub async fn update<T>(&self, id: T::Id, update: &T) -> Result<T::BaseType>
631 where
632 T: UpdateDto,
633 T::BaseType: Item,
634 {
635 let url = format!("/api/{}/{}/", <T::BaseType as Item>::endpoint(), id);
636 self.request_json::<T::BaseType>(Method::PATCH, &url, Some(&update), None)
637 .await
638 }
639
640 pub async fn delete<T: ItemId>(&self, id: T) -> Result<()> {
644 let url = format!("/api/{}/{}/", T::endpoint(), id);
645 self.request_no_body(Method::DELETE, &url, None).await?;
646 Ok(())
647 }
648
649 pub async fn load_by_id<T: Item>(&self, id: T::Id) -> Result<Option<T>> {
653 let url = format!("/api/{}/{}/", T::endpoint(), id);
654 match self.request_json_no_body(Method::GET, &url, None).await {
655 found_item @ Ok(_) => found_item,
656 Err(Error::NotFound) => Ok(None),
657 err @ Err(_) => err,
658 }
659 }
660
661 pub async fn upload_document(&self, file_path: &Path, filename: &str) -> Result<TaskId> {
665 let stream = tokio::fs::File::open(file_path)
666 .await
667 .map_err(|e| Error::Other(format!("Failed to open file: {e}")))?;
668
669 let form = multipart::Form::new().part(
670 "document",
671 multipart::Part::stream(stream).file_name(filename.to_string()),
672 );
673
674 let url = format!("{}/api/documents/post_document/", self.base_url);
675
676 let resp = self
677 .client
678 .post(&url)
679 .multipart(form)
680 .send()
681 .await
682 .map_err(|e| Error::Other(format!("Failed to send request: {e}")))?;
683
684 let status = resp.status();
685 if !resp.status().is_success() {
686 return Err(Error::Response {
687 status_code: status.as_u16(),
688 body: resp.text().await.unwrap_or_default(),
689 });
690 }
691
692 let task_id: String = resp
693 .json()
694 .await
695 .map_err(|e| Error::Other(format!("Failed to parse task ID: {e:?}")))?;
696 Ok(TaskId(task_id))
697 }
698
699 #[inline]
701 #[must_use]
702 pub fn tags(&self) -> &HashMap<TagId, Tag> {
703 &self.cached_data.tags
704 }
705
706 #[inline]
708 #[must_use]
709 pub fn storage_paths(&self) -> &HashMap<StoragePathId, StoragePath> {
710 &self.cached_data.storage_paths
711 }
712
713 #[must_use]
715 pub fn find_tag_by_name(&self, name: &str) -> Option<&Tag> {
716 self.cached_data.tags.values().find(|tag| tag.name == name)
717 }
718
719 #[inline]
721 #[must_use]
722 pub fn document_types(&self) -> &HashMap<DocumentTypeId, DocumentType> {
723 &self.cached_data.document_types
724 }
725
726 #[must_use]
728 pub fn find_document_type_by_name(&self, name: &str) -> Option<&DocumentType> {
729 self.cached_data
730 .document_types
731 .values()
732 .find(|dt| dt.name == name)
733 }
734
735 #[inline]
737 #[must_use]
738 pub fn correspondents(&self) -> &HashMap<CorrespondentId, Correspondent> {
739 &self.cached_data.correspondents
740 }
741
742 #[inline]
744 #[must_use]
745 pub fn custom_fields(&self) -> &HashMap<CustomFieldId, CustomField> {
746 &self.cached_data.custom_fields
747 }
748
749 #[must_use]
751 pub fn find_custom_field_by_name(&self, name: &str) -> Option<&CustomField> {
752 self.cached_data
753 .custom_fields
754 .values()
755 .find(|field| field.name == name)
756 }
757
758 #[inline]
760 #[must_use]
761 pub fn users(&self) -> &HashMap<UserId, User> {
762 &self.cached_data.users
763 }
764
765 #[inline]
767 #[must_use]
768 pub fn groups(&self) -> &HashMap<GroupId, Group> {
769 &self.cached_data.groups
770 }
771}