Skip to main content

paperless_api/
client.rs

1//! The central client for interacting with Paperless.
2
3use std::{collections::HashMap, path::Path, str::FromStr, sync::Arc};
4
5use enum_iterator::Sequence;
6use reqwest::{
7    Method, StatusCode,
8    header::{ACCEPT, HeaderMap, HeaderName, InvalidHeaderValue},
9    multipart,
10};
11use serde::{Deserialize, de::DeserializeOwned};
12use tracing::{debug, trace};
13
14use crate::{
15    Error, Group, Result, SavedView, User,
16    document::{Document, DocumentData},
17    document_query::DocumentQueryBuilder,
18    dto::{CreateDto, Item, UpdateDto},
19    id::{
20        CorrespondentId, CustomFieldId, DocumentId, DocumentTypeId, GroupId, StoragePathId, TagId,
21        TaskId, UserId,
22    },
23    metadata::{
24        correspondent::Correspondent, custom_field::CustomField, document_type::DocumentType,
25        storage_path::StoragePath, tag::Tag,
26    },
27    task::Task,
28    util,
29    workflow::Workflow,
30};
31
/// Selects which cached metadata to refresh.
///
/// Cached data is data which is rarely updated;
/// refreshing it is normally not necessary on every request.
///
/// Each variant names one of the caches held by [`PaperlessClient`] and is
/// consumed by [`PaperlessClient::refresh`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Sequence)]
#[non_exhaustive]
pub enum RefreshMetaData {
    /// Refresh the tag cache.
    Tags,
    /// Refresh the custom field cache.
    CustomFields,
    /// Refresh the correspondent cache.
    Correspondents,
    /// Refresh the document type cache.
    DocumentTypes,
    /// Refresh the group cache.
    Groups,
    /// Refresh the user cache.
    Users,
    /// Refresh the storage path cache.
    StoragePaths,
}
47
/// Client to interact with Paperless.
///
/// Cloning the client yields an independent handle: a later
/// [`refresh`](PaperlessClient::refresh) on one clone does not change the
/// cached metadata seen by the others.
#[derive(Debug, Clone)]
pub struct PaperlessClient {
    /// Whether to request full permissions data for items.
    pub request_full_permissions: bool,

    /// Whether to always request the full document content.
    pub request_full_content: bool,

    /// Base URL that is prepended to every endpoint path when a request is built.
    pub(crate) base_url: Arc<str>,

    /// HTTP client; its default headers carry the authentication token.
    client: reqwest::Client,
    /// Metadata caches, populated by `refresh` / `refresh_all`.
    cached_data: Arc<CachedData>,
}
62
/// Metadata caches of a [`PaperlessClient`], each keyed by the item's ID.
///
/// All maps start out empty and are only filled by refreshing the client.
#[derive(Debug, Clone)]
struct CachedData {
    correspondents: HashMap<CorrespondentId, Correspondent>,
    custom_fields: HashMap<CustomFieldId, CustomField>,
    document_types: HashMap<DocumentTypeId, DocumentType>,
    groups: HashMap<GroupId, Group>,
    storage_paths: HashMap<StoragePathId, StoragePath>,
    tags: HashMap<TagId, Tag>,
    users: HashMap<UserId, User>,
}
73
/// One page of a paginated list response.
#[derive(Debug, Deserialize)]
struct PaginatedResponse<T> {
    /// The items contained in this page.
    results: Vec<T>,
    /// URL of the following page; `None` on the last page.
    next: Option<String>,
}
79
80impl PaperlessClient {
81    /// Create a new Paperless client.
82    ///
83    /// # Arguments
84    ///
85    /// * `base_url` - The base URL of the Paperless API.
86    /// * `token` - The authentication token for the Paperless API.
87    /// * `headers` - Optional additional headers to include in requests.
88    pub fn new(
89        base_url: &str,
90        token: &str,
91        headers: Option<&HashMap<String, String>>,
92    ) -> std::result::Result<Self, String> {
93        Self::new_with_client(
94            base_url,
95            token,
96            headers,
97            reqwest::Client::builder().zstd(true),
98        )
99    }
100
101    /// Create a new Paperless client.
102    ///
103    /// Provide a [`reqwest::ClientBuilder`] to customize the HTTP client,
104    /// such as adding custom headers or disabling compression.
105    ///
106    /// # Arguments
107    ///
108    /// * `base_url` - The base URL of the Paperless API.
109    /// * `token` - The authentication token for the Paperless API.
110    /// * `headers` - Optional additional headers to include in requests.
111    /// * `client_builder` - [`reqwest::ClientBuilder`] to use for creating the HTTP client.
112    pub fn new_with_client(
113        base_url: &str,
114        token: &str,
115        headers: Option<&HashMap<String, String>>,
116        client_builder: reqwest::ClientBuilder,
117    ) -> std::result::Result<Self, String> {
118        let mut headers_map = HeaderMap::new();
119
120        // Add additional headers if provided
121        if let Some(headers) = headers {
122            for (key, value) in headers {
123                headers_map.insert(
124                    HeaderName::from_str(key).map_err(|err| err.to_string())?,
125                    value
126                        .parse()
127                        .map_err(|err: InvalidHeaderValue| err.to_string())?,
128                );
129            }
130        }
131
132        // Add the Paperless token header
133        headers_map.insert(
134            HeaderName::from_str("Authorization").map_err(|err| err.to_string())?,
135            format!("Token {token}")
136                .parse()
137                .map_err(|err: InvalidHeaderValue| err.to_string())?,
138        );
139
140        Ok(Self {
141            request_full_permissions: false,
142            request_full_content: false,
143            base_url: base_url.into(),
144            client: client_builder
145                .default_headers(headers_map)
146                .build()
147                .map_err(|err| err.to_string())?,
148            cached_data: Arc::new(CachedData {
149                custom_fields: HashMap::new(),
150                correspondents: HashMap::new(),
151                document_types: HashMap::new(),
152                groups: HashMap::new(),
153                storage_paths: HashMap::new(),
154                tags: HashMap::new(),
155                users: HashMap::new(),
156            }),
157        })
158    }
159
160    /// Sets whether to request full permissions data for items during refresh.
161    ///
162    /// If not enabled only simple permission data is loaded.
163    /// See [`ItemPermissions`](crate::metadata::permission::ItemPermissions) for more details.
164    #[must_use]
165    pub fn with_full_permissions(mut self, req: bool) -> Self {
166        self.request_full_permissions = req;
167        self
168    }
169
170    #[must_use]
171    pub fn with_full_content(mut self, full_content: bool) -> Self {
172        self.request_full_content = full_content;
173        self
174    }
175
176    /// Loads all items of the given type from the API.
177    async fn load_items<T: Item + DeserializeOwned>(&self) -> Result<HashMap<T::Id, T>> {
178        debug!("Loading {}", T::endpoint());
179        let endpoint = format!("/api/{}/", T::endpoint());
180
181        let items: Vec<T> = self.fetch_all_pages(&endpoint, None).await?;
182        Ok(items.into_iter().map(|item| (item.id(), item)).collect())
183    }
184
185    fn default_query_params(&self) -> Option<Vec<(&'static str, &'static str)>> {
186        let mut params = Vec::with_capacity(2);
187
188        if self.request_full_permissions {
189            params.push((crate::document_query::QUERY_PARAM_FULL_PERMISSIONS, "true"));
190        }
191        if !self.request_full_content {
192            params.push((crate::document_query::QUERY_PARAM_TRUNCATE_CONTENT, "true"));
193        }
194
195        if params.is_empty() {
196            None
197        } else {
198            Some(params)
199        }
200    }
201
202    /// Refresh and cache all metadata.
203    ///
204    /// Only updates the cache for this instance, cloned instances will not see the changes.
205    pub async fn refresh_all(&mut self) -> Result<()> {
206        self.refresh(enum_iterator::all::<RefreshMetaData>()).await
207    }
208
209    /// Refresh and cache the selected metadata.
210    ///
211    /// Only updates the cache for this instance, cloned instances will not see the changes.
212    ///
213    /// # Arguments
214    ///
215    /// * `data` - The metadata to refresh.
216    /// * `full_permissions` - Whether to use request full permissions data for the items being refreshed.
217    pub async fn refresh(&mut self, data: impl IntoIterator<Item = RefreshMetaData>) -> Result<()> {
218        #[rustfmt::skip]
219        async fn inner(
220            client: &mut PaperlessClient,
221            data: &mut dyn Iterator<Item = RefreshMetaData>,
222        ) -> Result<()> {
223            let selected: std::collections::HashSet<_> = data.into_iter().collect();
224
225            if selected.is_empty() {
226                return Ok(());
227            }
228
229            let (tags, custom_fields, correspondents, document_types, groups, users, storage_paths) =
230                futures_util::try_join!(
231                    async {
232                        if selected.contains(&RefreshMetaData::Tags) {
233                            Ok(Some(client.load_items::<Tag>().await?))
234                        } else {
235                            Ok::<Option<_>, Error>(None)
236                        }
237                    },
238                    async {
239                        if selected.contains(&RefreshMetaData::CustomFields) {
240                            Ok(Some(client.load_items::<CustomField>().await?))
241                        } else {
242                            Ok(None)
243                        }
244                    },
245                    async {
246                        if selected.contains(&RefreshMetaData::Correspondents) {
247                            Ok(Some(client.load_items::<Correspondent>().await?))
248                        } else {
249                            Ok(None)
250                        }
251                    },
252                    async {
253                        if selected.contains(&RefreshMetaData::DocumentTypes) {
254                            Ok(Some(client.load_items::<DocumentType>().await?))
255                        } else {
256                            Ok(None)
257                        }
258                    },
259                    async {
260                        if selected.contains(&RefreshMetaData::Groups) {
261                            Ok(Some(client.load_items::<Group>().await?))
262                        } else {
263                            Ok(None)
264                        }
265                    },
266                    async {
267                        if selected.contains(&RefreshMetaData::Users) {
268                            Ok(Some(client.load_items::<User>().await?))
269                        } else {
270                            Ok(None)
271                        }
272                    },
273                    async {
274                        if selected.contains(&RefreshMetaData::StoragePaths) {
275                            Ok(Some(client.load_items::<StoragePath>().await?))
276                        } else {
277                            Ok(None)
278                        }
279                    },
280                )?;
281
282            let cached_data = Arc::make_mut(&mut client.cached_data);
283
284            if let Some(value) = custom_fields { cached_data.custom_fields = value; }
285            if let Some(value) = correspondents { cached_data.correspondents = value; }
286            if let Some(value) = document_types { cached_data.document_types = value; }
287            if let Some(value) = groups { cached_data.groups = value; }
288            if let Some(value) = storage_paths { cached_data.storage_paths = value; }
289            if let Some(value) = tags { cached_data.tags = value; }
290            if let Some(value) = users { cached_data.users = value; }
291
292            Ok(())
293        }
294
295        inner(self, &mut data.into_iter()).await
296    }
297
298    /// Query documents using the given [`DocumentQueryBuilder`].
299    pub async fn query_documents(&self, query: DocumentQueryBuilder) -> Result<Vec<Document>> {
300        let full_content = query.full_content;
301        let query_params = query.build();
302        let query_vec: Vec<_> = query_params
303            .query
304            .iter()
305            .map(|(k, v)| (*k, v.as_str()))
306            .collect();
307        let query_slice = query_vec.as_slice();
308
309        let documents: Vec<_> = self
310            .fetch_all_pages::<DocumentData>("/api/documents/", Some(query_slice))
311            .await?
312            .into_iter()
313            .map(|data| Document::new(data, Arc::new(self.clone()), !full_content))
314            .collect();
315
316        Ok(documents)
317    }
318
319    /// Get all documents with any of the given tags.
320    pub fn get_documents_by_tags(
321        &self,
322        tag_ids: &[TagId],
323    ) -> impl Future<Output = Result<Vec<Document>>> {
324        let query = DocumentQueryBuilder::default()
325            .full_content(self.request_full_content)
326            .full_permissions(self.request_full_permissions)
327            .tags_id_in(tag_ids.to_vec());
328
329        self.query_documents(query)
330    }
331
332    pub(crate) async fn get_document_data_by_id(&self, id: DocumentId) -> Result<DocumentData> {
333        self.request_json(
334            Method::GET,
335            &format!("/api/documents/{}/", id.0),
336            None,
337            self.default_query_params().as_deref(),
338        )
339        .await
340    }
341
342    /// Get a document by its ID.
343    pub async fn get_document_by_id(&self, id: DocumentId) -> Result<Document> {
344        Ok(Document::new(
345            self.get_document_data_by_id(id).await?,
346            Arc::new(self.clone()),
347            false,
348        ))
349    }
350
351    /// Make a request and parse the response as JSON.
352    pub(crate) async fn request_json<T: serde::de::DeserializeOwned>(
353        &self,
354        method: Method,
355        endpoint: &str,
356        body: Option<&serde_json::Value>,
357        query_params: Option<&[(&str, &str)]>,
358    ) -> Result<T> {
359        let resp = self.request(method, endpoint, body, query_params).await?;
360
361        if tracing::enabled!(tracing::Level::TRACE) {
362            // Only log the response body if trace logging is enabled to avoid unnecessary overhead
363            let response_text = resp.text().await.unwrap_or_default();
364            trace!(body = %response_text, "Response");
365
366            Ok(serde_json::from_str(&response_text)
367                .map_err(|e| Error::InvalidJson(format!("Failed to parse response body: {e:?}")))?)
368        } else {
369            Ok(resp
370                .json()
371                .await
372                .map_err(|e| Error::InvalidJson(format!("Failed to parse response body: {e:?}")))?)
373        }
374    }
375
376    /// Make a request and return the raw [`reqwest::Response`].
377    pub(crate) async fn request(
378        &self,
379        method: Method,
380        endpoint: &str,
381        body: Option<&serde_json::Value>,
382        query_params: Option<&[(&str, &str)]>,
383    ) -> Result<reqwest::Response> {
384        let mut req = self
385            .client
386            .request(method, format!("{}{endpoint}", self.base_url))
387            .header(ACCEPT, "application/json");
388
389        if let Some(params) = query_params {
390            req = req.query(params);
391        }
392
393        // Set payload body if provided
394        if let Some(json_body) = body {
395            req = req.json(json_body);
396        }
397
398        let req = req.build().map_err(|e| Error::Request(e.into()))?;
399
400        if tracing::enabled!(tracing::Level::TRACE)
401            && let Some(body) = req.body().and_then(|b| b.as_bytes())
402        {
403            trace!(
404                method = ?req.method(),
405                url = ?req.url(),
406                body = %String::from_utf8_lossy(body),
407                "Sending request to Paperless API");
408        } else {
409            debug!(
410                method = ?req.method(),
411                url = ?req.url(),
412                "Sending request to Paperless API");
413        }
414
415        let resp = self
416            .client
417            .execute(req)
418            .await
419            .map_err(|e| Error::Other(format!("Failed to send request: {e}")))?;
420
421        // Log the response body for debugging
422        debug!(status = ?resp.status(), "Response");
423
424        if resp.status() == StatusCode::NOT_FOUND {
425            return Err(Error::NotFound);
426        }
427
428        if !resp.status().is_success() {
429            return Err(Error::Response {
430                status_code: resp.status().as_u16(),
431                body: resp.text().await.unwrap_or_default(),
432            });
433        }
434
435        Ok(resp)
436    }
437
438    pub(crate) async fn fetch_all_pages<T: for<'de> Deserialize<'de>>(
439        &self,
440        endpoint: &str,
441        query_params: Option<&[(&str, &str)]>,
442    ) -> Result<Vec<T>> {
443        let mut results = vec![];
444        let mut all_query_params = self.default_query_params().unwrap_or_default();
445        all_query_params.extend(query_params.unwrap_or_default());
446        let mut all_query_params = Some(all_query_params);
447
448        let mut current_url = Some(endpoint.to_string());
449
450        while let Some(url) = current_url {
451            debug!("Fetching page: {url}");
452
453            let page: PaginatedResponse<T> = self
454                .request_json(Method::GET, &url, None, all_query_params.as_deref())
455                .await?;
456
457            results.extend(page.results);
458
459            current_url = page.next.and_then(|next_url| {
460                // Extract just the path from the full URL
461                next_url
462                    .strip_prefix(&*self.base_url)
463                    .unwrap_or(&next_url)
464                    .to_string()
465                    .into()
466            });
467            all_query_params = None;
468        }
469
470        Ok(results)
471    }
472
473    /// Get all tasks with optional filtering by ID, name, or acknowledged status.
474    pub async fn get_task_status(
475        &self,
476        task_id: Option<&TaskId>,
477        task_name: Option<&str>,
478        acknowledged: Option<bool>,
479    ) -> Result<Vec<Task>> {
480        let mut query = Vec::new();
481
482        if let Some(id) = task_id {
483            query.push(("task_id", id.to_string()));
484        }
485
486        if let Some(name) = task_name {
487            query.push(("task_name", name.to_string()));
488        }
489
490        if let Some(ack) = acknowledged {
491            query.push(("acknowledged", ack.to_string()));
492        }
493
494        let resp = self
495            .request(
496                Method::GET,
497                &format!(
498                    "/api/tasks/?{}",
499                    serde_urlencoded::to_string(&query)
500                        .map_err(|e| Error::Other(format!("Failed to serialize query: {e}")))?
501                ),
502                None::<&serde_json::Value>,
503                None,
504            )
505            .await?;
506
507        let body = resp
508            .text()
509            .await
510            .map_err(|e| Error::Other(format!("Failed to read response body: {e:?}")))?;
511
512        trace!("get_task_status response: {:?}", body);
513
514        let tasks: Vec<Task> = match serde_json::from_str(&body) {
515            Ok(t) => t,
516            Err(e) => {
517                return Err(Error::InvalidJson(format!(
518                    "Failed to parse response body: {e:?}"
519                )));
520            }
521        };
522
523        if tasks.is_empty() {
524            return Err(Error::NotFound);
525        }
526
527        Ok(tasks)
528    }
529
530    /// Get all workflows.
531    pub fn get_workflows(&self) -> impl Future<Output = Result<Vec<Workflow>>> {
532        self.fetch_all_pages("/api/workflows/", None)
533    }
534
535    /// Get all saved views.
536    pub fn get_saved_views(&self) -> impl Future<Output = Result<Vec<SavedView>>> {
537        self.fetch_all_pages("/api/saved_views/", None)
538    }
539
540    /// Get server statistics.
541    pub fn get_statistics(&self) -> impl Future<Output = Result<util::Statistics>> {
542        self.request_json(Method::GET, "/api/statistics/", None, None)
543    }
544
545    /// Get server status.
546    pub fn get_status(&self) -> impl Future<Output = Result<util::ServerStatus>> {
547        self.request_json(Method::GET, "/api/status/", None, None)
548    }
549
550    /// Create a new item on the server.
551    ///
552    /// All structs which implement [`CreateDtoObject`](crate::dto::CreateDtoObject) can be used as `new_item`.
553    ///
554    /// Returns the created item.
555    pub async fn create<T: CreateDto>(&self, new_item: &T) -> Result<T::BaseType> {
556        let url = format!("/api/{}/", T::endpoint());
557        self.request_json(
558            Method::POST,
559            &url,
560            Some(&serde_json::to_value(new_item).map_err(|e| Error::Other(e.to_string()))?),
561            None,
562        )
563        .await
564    }
565
566    /// Updates an existing.
567    ///
568    /// All structs which implement [`UpdateDtoObject`](crate::dto::UpdateDtoObject) can be used as `item`.
569    ///
570    /// Returns the updated item
571    pub async fn update<T: UpdateDto>(&self, id: T::Id, update: &T) -> Result<T::BaseType> {
572        let url = format!("/api/{}/{}/", T::endpoint(), id);
573        self.request_json::<T::BaseType>(
574            Method::PATCH,
575            &url,
576            Some(&serde_json::to_value(update).map_err(|e| Error::Other(e.to_string()))?),
577            None,
578        )
579        .await
580    }
581
582    /// Deletes an existing item.
583    ///
584    /// All structs which implement [`UpdateDtoObject`](crate::dto::UpdateDtoObject) can be used.
585    pub async fn delete<T: Item>(&self, id: T::Id) -> Result<()> {
586        let url = format!("/api/{}/{}/", T::endpoint(), id);
587        self.request(Method::DELETE, &url, None, None).await?;
588        Ok(())
589    }
590
591    /// Load an existing item directly from the server, bypassing the caches.
592    ///
593    /// All structs which implement [`Item`] can be used.
594    pub async fn load_by_id<T: Item>(&self, id: T::Id) -> Result<Option<T::BaseType>> {
595        let url = format!("/api/{}/{}/", T::endpoint(), id);
596        match self.request_json(Method::GET, &url, None, None).await {
597            found_item @ Ok(_) => found_item,
598            Err(Error::NotFound) => Ok(None),
599            err @ Err(_) => err,
600        }
601    }
602
603    /// Upload a document to Paperless.
604    ///
605    /// Returns the task ID on success.
606    pub async fn upload_document(&self, file_path: &Path, filename: &str) -> Result<TaskId> {
607        let stream = tokio::fs::File::open(file_path)
608            .await
609            .map_err(|e| Error::Other(format!("Failed to open file: {e}")))?;
610
611        let form = multipart::Form::new().part(
612            "document",
613            multipart::Part::stream(stream).file_name(filename.to_string()),
614        );
615
616        let url = format!("{}/api/documents/post_document/", self.base_url);
617
618        let resp = self
619            .client
620            .post(&url)
621            .multipart(form)
622            .send()
623            .await
624            .map_err(|e| Error::Other(format!("Failed to send request: {e}")))?;
625
626        let status = resp.status();
627        if !resp.status().is_success() {
628            return Err(Error::Response {
629                status_code: status.as_u16(),
630                body: resp.text().await.unwrap_or_default(),
631            });
632        }
633
634        let task_id: String = resp
635            .json()
636            .await
637            .map_err(|e| Error::Other(format!("Failed to parse task ID: {e:?}")))?;
638        Ok(TaskId(task_id))
639    }
640
641    /// Get the tags cache.
642    #[inline]
643    #[must_use]
644    pub fn tags(&self) -> &HashMap<TagId, Tag> {
645        &self.cached_data.tags
646    }
647
648    /// Get the storage paths cache.
649    #[inline]
650    #[must_use]
651    pub fn storage_paths(&self) -> &HashMap<StoragePathId, StoragePath> {
652        &self.cached_data.storage_paths
653    }
654
655    /// Find a tag by its name.
656    #[must_use]
657    pub fn find_tag_by_name(&self, name: &str) -> Option<&Tag> {
658        self.cached_data.tags.values().find(|tag| tag.name == name)
659    }
660
661    /// Get the document types cache.
662    #[inline]
663    #[must_use]
664    pub fn document_types(&self) -> &HashMap<DocumentTypeId, DocumentType> {
665        &self.cached_data.document_types
666    }
667
668    /// Find a document type by its name.
669    #[must_use]
670    pub fn find_document_type_by_name(&self, name: &str) -> Option<&DocumentType> {
671        self.cached_data
672            .document_types
673            .values()
674            .find(|dt| dt.name == name)
675    }
676
677    /// Get the correspondents cache.
678    #[inline]
679    #[must_use]
680    pub fn correspondents(&self) -> &HashMap<CorrespondentId, Correspondent> {
681        &self.cached_data.correspondents
682    }
683
684    /// Get the custom fields cache.
685    #[inline]
686    #[must_use]
687    pub fn custom_fields(&self) -> &HashMap<CustomFieldId, CustomField> {
688        &self.cached_data.custom_fields
689    }
690
691    /// Find a custom field by its name.
692    #[must_use]
693    pub fn find_custom_field_by_name(&self, name: &str) -> Option<&CustomField> {
694        self.cached_data
695            .custom_fields
696            .values()
697            .find(|field| field.name == name)
698    }
699
700    /// Get the users cache.
701    #[inline]
702    #[must_use]
703    pub fn users(&self) -> &HashMap<UserId, User> {
704        &self.cached_data.users
705    }
706
707    /// Get the groups cache.
708    #[inline]
709    #[must_use]
710    pub fn groups(&self) -> &HashMap<GroupId, Group> {
711        &self.cached_data.groups
712    }
713}