//! `meilisearch_sdk/tasks.rs` — task status types and task-query builders.

1use serde::{Deserialize, Deserializer, Serialize};
2use serde_json::{Map, Value};
3use std::time::Duration;
4use time::OffsetDateTime;
5
6use crate::{
7    client::Client, client::SwapIndexes, errors::Error, errors::MeilisearchError, indexes::Index,
8    request::HttpClient, settings::Settings, task_info::TaskInfo,
9};
10
/// The kind of operation a [Task] performs, together with the
/// operation-specific `details` payload reported by the server.
///
/// Deserialized from the task object's `type` field; when the task object
/// carries a `details` object it is captured in the matching variant.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum TaskType {
    /// Legacy "customs" task type (no details payload).
    Customs,
    /// Documents were added to or updated in an index.
    DocumentAdditionOrUpdate {
        details: Option<DocumentAdditionOrUpdate>,
    },
    /// Documents were deleted from an index.
    DocumentDeletion {
        details: Option<DocumentDeletion>,
    },
    /// An index was created.
    IndexCreation {
        details: Option<IndexCreation>,
    },
    /// An index's primary key was updated.
    IndexUpdate {
        details: Option<IndexUpdate>,
    },
    /// An index was deleted.
    IndexDeletion {
        details: Option<IndexDeletion>,
    },
    /// An index's settings were updated.
    SettingsUpdate {
        // Boxed, presumably to keep this enum small since `Settings` is a
        // wide struct — TODO confirm against clippy::large_enum_variant.
        details: Box<Option<Settings>>,
    },
    /// A dump was created.
    DumpCreation {
        details: Option<DumpCreation>,
    },
    /// Two or more indexes were swapped.
    IndexSwap {
        details: Option<IndexSwap>,
    },
    /// Other tasks were canceled.
    TaskCancelation {
        details: Option<TaskCancelation>,
    },
    /// Other tasks were deleted.
    TaskDeletion {
        details: Option<TaskDeletion>,
    },
    /// A snapshot was created (no details payload).
    SnapshotCreation {
        details: Option<SnapshotCreation>,
    },
    /// An index was compacted (no details payload).
    IndexCompaction {
        details: Option<IndexCompaction>,
    },
}
52
/// One page of [Task]s as returned by the list-tasks route.
#[derive(Debug, Clone, Deserialize)]
pub struct TasksResults {
    /// The tasks in this page.
    pub results: Vec<Task>,
    /// Total number of tasks matching the query.
    pub total: u64,
    /// Maximum number of tasks returned in one page.
    pub limit: u32,
    /// Uid of the first task in this page, when present.
    pub from: Option<u32>,
    /// Cursor for the next page; `None` when there are no further tasks.
    pub next: Option<u32>,
}
61
/// Details of a [`TaskType::DocumentAdditionOrUpdate`] task.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DocumentAdditionOrUpdate {
    /// Number of documents actually indexed (`null`, i.e. `None`, while the
    /// task has not finished processing).
    pub indexed_documents: Option<usize>,
    /// Number of documents received in the payload.
    pub received_documents: usize,
}
68
/// Details of a [`TaskType::DocumentDeletion`] task.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DocumentDeletion {
    /// Number of document ids submitted for deletion, when deleting by id.
    pub provided_ids: Option<usize>,
    /// Number of documents actually deleted.
    pub deleted_documents: Option<usize>,
    /// The filter expression used, when deleting by filter.
    pub original_filter: Option<String>,
}
76
/// Details of a [`TaskType::IndexCreation`] task.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexCreation {
    /// Primary key the index was created with, if one was specified.
    pub primary_key: Option<String>,
}
82
/// Details of a [`TaskType::IndexUpdate`] task.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexUpdate {
    /// The new primary key of the index, if one was set.
    pub primary_key: Option<String>,
}
88
/// Details of a [`TaskType::IndexDeletion`] task.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexDeletion {
    /// Number of documents removed along with the index.
    pub deleted_documents: Option<usize>,
}
94
/// Details of a [`TaskType::SnapshotCreation`] task (the server reports no
/// fields for this task type).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SnapshotCreation {}
98
/// Details of a [`TaskType::IndexCompaction`] task (the server reports no
/// fields for this task type).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexCompaction {}
102
/// Details of a [`TaskType::DumpCreation`] task.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DumpCreation {
    /// Unique identifier of the generated dump.
    pub dump_uid: Option<String>,
}
108
/// Details of a [`TaskType::IndexSwap`] task.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexSwap {
    /// The pairs of indexes that were swapped.
    pub swaps: Vec<SwapIndexes>,
}
114
/// Details of a [`TaskType::TaskCancelation`] task.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskCancelation {
    /// Number of tasks matched by the cancelation filter.
    pub matched_tasks: usize,
    /// Number of tasks actually canceled (`None` until processing finishes).
    pub canceled_tasks: Option<usize>,
    /// The filter used to select the tasks to cancel.
    pub original_filter: String,
}
122
/// Details of a [`TaskType::TaskDeletion`] task.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskDeletion {
    /// Number of tasks matched by the deletion filter.
    pub matched_tasks: usize,
    /// Number of tasks actually deleted (`None` until processing finishes).
    pub deleted_tasks: Option<usize>,
    /// The filter used to select the tasks to delete.
    pub original_filter: String,
}
130
/// A [Task] that finished with an error.
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct FailedTask {
    /// The error that caused the task to fail.
    pub error: MeilisearchError,
    /// The rest of the task payload; a failed task carries the same fields
    /// as a succeeded one, so they are flattened into [SucceededTask].
    #[serde(flatten)]
    pub task: SucceededTask,
}
138
139impl AsRef<u32> for FailedTask {
140    fn as_ref(&self) -> &u32 {
141        &self.task.uid
142    }
143}
144
145fn deserialize_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
146where
147    D: Deserializer<'de>,
148{
149    let s = String::deserialize(deserializer)?;
150    let iso_duration = iso8601::duration(&s).map_err(serde::de::Error::custom)?;
151    Ok(iso_duration.into())
152}
153
/// A fully processed [Task]; also the payload shared by failed tasks
/// (see [FailedTask]).
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct SucceededTask {
    /// Time spent processing, parsed from an ISO 8601 duration string.
    #[serde(deserialize_with = "deserialize_duration")]
    pub duration: Duration,
    /// When the task was received by the server.
    #[serde(with = "time::serde::rfc3339")]
    pub enqueued_at: OffsetDateTime,
    /// When the server started processing the task.
    #[serde(with = "time::serde::rfc3339")]
    pub started_at: OffsetDateTime,
    /// When the server finished processing the task.
    #[serde(with = "time::serde::rfc3339")]
    pub finished_at: OffsetDateTime,
    /// Uid of the cancelation task that canceled this one, if any.
    pub canceled_by: Option<usize>,
    /// Uid of the index the task applies to; `None` for global tasks.
    pub index_uid: Option<String>,
    /// Error attached to the task, if any.
    pub error: Option<MeilisearchError>,
    /// Remotes object returned by the server for this task (present since Meilisearch 1.19)
    // NOTE(review): `skip_serializing_if` is inert here because this struct
    // only derives `Deserialize`; it would matter if `Serialize` is added.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub remotes: Option<Map<String, Value>>,
    /// Kind of operation this task performed, with its details.
    #[serde(flatten)]
    pub update_type: TaskType,
    /// Unique identifier of the task.
    pub uid: u32,
}
175
176impl AsRef<u32> for SucceededTask {
177    fn as_ref(&self) -> &u32 {
178        &self.uid
179    }
180}
181
/// A [Task] that has been received by the server but not yet started.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EnqueuedTask {
    /// When the task was received by the server.
    #[serde(with = "time::serde::rfc3339")]
    pub enqueued_at: OffsetDateTime,
    /// Uid of the index the task applies to; `None` for global tasks.
    pub index_uid: Option<String>,
    /// Remotes object returned by the server for this enqueued task
    // NOTE(review): `skip_serializing_if` is inert on a Deserialize-only struct.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub remotes: Option<Map<String, Value>>,
    /// Kind of operation this task will perform.
    #[serde(flatten)]
    pub update_type: TaskType,
    /// Unique identifier of the task.
    pub uid: u32,
}
195
196impl AsRef<u32> for EnqueuedTask {
197    fn as_ref(&self) -> &u32 {
198        &self.uid
199    }
200}
201
/// A [Task] the server is currently processing.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProcessingTask {
    /// When the task was received by the server.
    #[serde(with = "time::serde::rfc3339")]
    pub enqueued_at: OffsetDateTime,
    /// When the server started processing the task.
    #[serde(with = "time::serde::rfc3339")]
    pub started_at: OffsetDateTime,
    /// Uid of the index the task applies to; `None` for global tasks.
    pub index_uid: Option<String>,
    /// Remotes object returned by the server for this processing task
    // NOTE(review): `skip_serializing_if` is inert on a Deserialize-only struct.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub remotes: Option<Map<String, Value>>,
    /// Kind of operation this task performs.
    #[serde(flatten)]
    pub update_type: TaskType,
    /// Unique identifier of the task.
    pub uid: u32,
}
217
218impl AsRef<u32> for ProcessingTask {
219    fn as_ref(&self) -> &u32 {
220        &self.uid
221    }
222}
223
/// A task and its status, as returned by the tasks routes.
///
/// The JSON `status` field selects the variant; the remaining fields are
/// flattened into the variant's `content`.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase", tag = "status")]
pub enum Task {
    /// Received by the server, not yet started.
    Enqueued {
        #[serde(flatten)]
        content: EnqueuedTask,
    },
    /// Currently being processed.
    Processing {
        #[serde(flatten)]
        content: ProcessingTask,
    },
    /// Finished with an error.
    Failed {
        #[serde(flatten)]
        content: FailedTask,
    },
    /// Finished successfully.
    Succeeded {
        #[serde(flatten)]
        content: SucceededTask,
    },
}
244
245impl Task {
246    #[must_use]
247    pub fn get_uid(&self) -> u32 {
248        match self {
249            Self::Enqueued { content } => *content.as_ref(),
250            Self::Processing { content } => *content.as_ref(),
251            Self::Failed { content } => *content.as_ref(),
252            Self::Succeeded { content } => *content.as_ref(),
253        }
254    }
255
256    /// Wait until Meilisearch processes a [Task], and get its status.
257    ///
258    /// `interval` = The frequency at which the server should be polled. **Default = 50ms**
259    ///
260    /// `timeout` = The maximum time to wait for processing to complete. **Default = 5000ms**
261    ///
262    /// If the waited time exceeds `timeout` then an [`Error::Timeout`] will be returned.
263    ///
264    /// See also [`Client::wait_for_task`, `Index::wait_for_task`].
265    ///
266    /// # Example
267    ///
268    /// ```
269    /// # use meilisearch_sdk::{client::*, indexes::*, tasks::Task};
270    /// # use serde::{Serialize, Deserialize};
271    /// #
272    /// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700");
273    /// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey");
274    /// #
275    /// # #[derive(Debug, Serialize, Deserialize, PartialEq)]
276    /// # struct Document {
277    /// #    id: usize,
278    /// #    value: String,
279    /// #    kind: String,
280    /// # }
281    /// #
282    /// #
283    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
284    /// # let client = Client::new(MEILISEARCH_URL, Some(MEILISEARCH_API_KEY)).unwrap();
285    /// let movies = client.index("movies_wait_for_completion");
286    ///
287    /// let status = movies.add_documents(&[
288    ///     Document { id: 0, kind: "title".into(), value: "The Social Network".to_string() },
289    ///     Document { id: 1, kind: "title".into(), value: "Harry Potter and the Sorcerer's Stone".to_string() },
290    /// ], None)
291    ///     .await
292    ///     .unwrap()
293    ///     .wait_for_completion(&client, None, None)
294    ///     .await
295    ///     .unwrap();
296    ///
297    /// assert!(matches!(status, Task::Succeeded { .. }));
298    /// # movies.delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap();
299    /// # });
300    /// ```
301    pub async fn wait_for_completion<Http: HttpClient>(
302        self,
303        client: &Client<Http>,
304        interval: Option<Duration>,
305        timeout: Option<Duration>,
306    ) -> Result<Self, Error> {
307        client.wait_for_task(self, interval, timeout).await
308    }
309
310    /// Extract the [Index] from a successful `IndexCreation` task.
311    ///
312    /// If the task failed or was not an `IndexCreation` task it returns itself.
313    ///
314    /// # Example
315    ///
316    /// ```
317    /// # use meilisearch_sdk::{client::*, indexes::*};
318    /// #
319    /// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700");
320    /// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey");
321    /// #
322    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
323    /// # // create the client
324    /// # let client = Client::new(MEILISEARCH_URL, Some(MEILISEARCH_API_KEY)).unwrap();
325    /// let task = client.create_index("try_make_index", None).await.unwrap();
326    /// let index = client.wait_for_task(task, None, None).await.unwrap().try_make_index(&client).unwrap();
327    ///
328    /// // and safely access it
329    /// assert_eq!(index.as_ref(), "try_make_index");
330    /// # index.delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap();
331    /// # });
332    /// ```
333    #[allow(clippy::result_large_err)] // Since `self` has been consumed, this is not an issue
334    pub fn try_make_index<Http: HttpClient>(
335        self,
336        client: &Client<Http>,
337    ) -> Result<Index<Http>, Self> {
338        match self {
339            Self::Succeeded {
340                content:
341                    SucceededTask {
342                        index_uid,
343                        update_type: TaskType::IndexCreation { .. },
344                        ..
345                    },
346            } => Ok(client.index(index_uid.unwrap())),
347            _ => Err(self),
348        }
349    }
350
351    /// Unwrap the [`MeilisearchError`] from a [`Self::Failed`] [Task].
352    ///
353    /// Will panic if the task was not [`Self::Failed`].
354    ///
355    /// # Example
356    ///
357    /// ```
358    /// # use meilisearch_sdk::{client::*, indexes::*, errors::ErrorCode};
359    /// #
360    /// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700");
361    /// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey");
362    /// #
363    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
364    /// # let client = Client::new(MEILISEARCH_URL, Some(MEILISEARCH_API_KEY)).unwrap();
365    /// let task = client.create_index("unwrap_failure", None).await.unwrap();
366    /// let task = client
367    ///     .create_index("unwrap_failure", None)
368    ///     .await
369    ///     .unwrap()
370    ///     .wait_for_completion(&client, None, None)
371    ///     .await
372    ///     .unwrap();
373    ///
374    /// assert!(task.is_failure());
375    ///
376    /// let failure = task.unwrap_failure();
377    ///
378    /// assert_eq!(failure.error_code, ErrorCode::IndexAlreadyExists);
379    /// # client.index("unwrap_failure").delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap();
380    /// # });
381    /// ```
382    #[must_use]
383    pub fn unwrap_failure(self) -> MeilisearchError {
384        match self {
385            Self::Failed {
386                content: FailedTask { error, .. },
387            } => error,
388            _ => panic!("Called `unwrap_failure` on a non `Failed` task."),
389        }
390    }
391
392    /// Returns `true` if the [Task] is [`Self::Failed`].
393    ///
394    /// # Example
395    ///
396    /// ```
397    /// # use meilisearch_sdk::{client::*, indexes::*, errors::ErrorCode};
398    /// #
399    /// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700");
400    /// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey");
401    /// #
402    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
403    /// # let client = Client::new(MEILISEARCH_URL, Some(MEILISEARCH_API_KEY)).unwrap();
404    /// let task = client.create_index("is_failure", None).await.unwrap();
405    /// // create an index with a conflicting uid
406    /// let task = client
407    ///     .create_index("is_failure", None)
408    ///     .await
409    ///     .unwrap()
410    ///     .wait_for_completion(&client, None, None)
411    ///     .await
412    ///     .unwrap();
413    ///
414    /// assert!(task.is_failure());
415    /// # client.index("is_failure").delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap();
416    /// # });
417    /// ```
418    #[must_use]
419    pub fn is_failure(&self) -> bool {
420        matches!(self, Self::Failed { .. })
421    }
422
423    /// Returns `true` if the [Task] is [`Self::Succeeded`].
424    ///
425    /// # Example
426    ///
427    /// ```
428    /// # use meilisearch_sdk::{client::*, indexes::*, errors::ErrorCode};
429    /// #
430    /// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700");
431    /// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey");
432    /// #
433    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
434    /// # let client = Client::new(MEILISEARCH_URL, Some(MEILISEARCH_API_KEY)).unwrap();
435    /// let task = client
436    ///     .create_index("is_success", None)
437    ///     .await
438    ///     .unwrap()
439    ///     .wait_for_completion(&client, None, None)
440    ///     .await
441    ///     .unwrap();
442    ///
443    /// assert!(task.is_success());
444    /// # task.try_make_index(&client).unwrap().delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap();
445    /// # });
446    /// ```
447    #[must_use]
448    pub fn is_success(&self) -> bool {
449        matches!(self, Self::Succeeded { .. })
450    }
451
452    /// Returns `true` if the [Task] is pending ([`Self::Enqueued`] or [`Self::Processing`]).
453    ///
454    /// # Example
455    /// ```no_run
456    /// # // The test is not run because it checks for an enqueued or processed status
457    /// # // and the task might already be processed when checking the status after the get_task call
458    /// # use meilisearch_sdk::{client::*, indexes::*, errors::ErrorCode};
459    /// #
460    /// # let MEILISEARCH_URL = option_env!("MEILISEARCH_URL").unwrap_or("http://localhost:7700");
461    /// # let MEILISEARCH_API_KEY = option_env!("MEILISEARCH_API_KEY").unwrap_or("masterKey");
462    /// #
463    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
464    /// # let client = Client::new(MEILISEARCH_URL, Some(MEILISEARCH_API_KEY)).unwrap();
465    /// let task_info = client
466    ///     .create_index("is_pending", None)
467    ///     .await
468    ///     .unwrap();
469    /// let task = client.get_task(task_info).await.unwrap();
470    ///
471    /// assert!(task.is_pending());
472    /// # task.wait_for_completion(&client, None, None).await.unwrap().try_make_index(&client).unwrap().delete().await.unwrap().wait_for_completion(&client, None, None).await.unwrap();
473    /// # });
474    /// ```
475    #[must_use]
476    pub fn is_pending(&self) -> bool {
477        matches!(self, Self::Enqueued { .. } | Self::Processing { .. })
478    }
479}
480
481impl AsRef<u32> for Task {
482    fn as_ref(&self) -> &u32 {
483        match self {
484            Self::Enqueued { content } => content.as_ref(),
485            Self::Processing { content } => content.as_ref(),
486            Self::Succeeded { content } => content.as_ref(),
487            Self::Failed { content } => content.as_ref(),
488        }
489    }
490}
491
/// Pagination filters available on the list-tasks query
/// (see [TasksSearchQuery]).
#[derive(Debug, Serialize, Clone)]
pub struct TasksPaginationFilters {
    /// Maximum number of tasks to return.
    #[serde(skip_serializing_if = "Option::is_none")]
    limit: Option<u32>,
    /// The first task uid that should be returned.
    #[serde(skip_serializing_if = "Option::is_none")]
    from: Option<u32>,
}
501
/// Marker type: the cancel-tasks query supports no pagination filters.
#[derive(Debug, Serialize, Clone)]
pub struct TasksCancelFilters {}
504
/// Marker type: the delete-tasks query supports no pagination filters.
#[derive(Debug, Serialize, Clone)]
pub struct TasksDeleteFilters {}
507
/// Query for listing tasks, with pagination filters.
pub type TasksSearchQuery<'a, Http> = TasksQuery<'a, TasksPaginationFilters, Http>;
/// Query for canceling tasks.
pub type TasksCancelQuery<'a, Http> = TasksQuery<'a, TasksCancelFilters, Http>;
/// Query for deleting tasks.
pub type TasksDeleteQuery<'a, Http> = TasksQuery<'a, TasksDeleteFilters, Http>;
511
/// Builder for task queries (list, cancel, or delete), parameterized by the
/// filter set `T` ([TasksPaginationFilters], [TasksCancelFilters], or
/// [TasksDeleteFilters]).
///
/// Serialized as the query string of the request; unset filters are omitted.
#[derive(Debug, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct TasksQuery<'a, T, Http: HttpClient> {
    // The client used to execute the query; never serialized.
    #[serde(skip_serializing)]
    client: &'a Client<Http>,
    /// Index uids array to only retrieve the tasks of the indexes.
    #[serde(skip_serializing_if = "Option::is_none")]
    index_uids: Option<Vec<&'a str>>,
    /// Statuses array to only retrieve the tasks with these statuses.
    #[serde(skip_serializing_if = "Option::is_none")]
    statuses: Option<Vec<&'a str>>,
    /// Types array to only retrieve the tasks with these [`TaskType`]s.
    #[serde(skip_serializing_if = "Option::is_none", rename = "types")]
    task_types: Option<Vec<&'a str>>,
    /// Uids of the tasks to retrieve.
    #[serde(skip_serializing_if = "Option::is_none")]
    uids: Option<Vec<&'a u32>>,
    /// Uids of the tasks that canceled other tasks.
    #[serde(skip_serializing_if = "Option::is_none")]
    canceled_by: Option<Vec<&'a u32>>,
    /// Date to retrieve all tasks that were enqueued before it.
    #[serde(
        skip_serializing_if = "Option::is_none",
        serialize_with = "time::serde::rfc3339::option::serialize"
    )]
    before_enqueued_at: Option<OffsetDateTime>,
    /// Date to retrieve all tasks that were enqueued after it.
    #[serde(
        skip_serializing_if = "Option::is_none",
        serialize_with = "time::serde::rfc3339::option::serialize"
    )]
    after_enqueued_at: Option<OffsetDateTime>,
    /// Date to retrieve all tasks that were started before it.
    #[serde(
        skip_serializing_if = "Option::is_none",
        serialize_with = "time::serde::rfc3339::option::serialize"
    )]
    before_started_at: Option<OffsetDateTime>,
    /// Date to retrieve all tasks that were started after it.
    #[serde(
        skip_serializing_if = "Option::is_none",
        serialize_with = "time::serde::rfc3339::option::serialize"
    )]
    after_started_at: Option<OffsetDateTime>,
    /// Date to retrieve all tasks that were finished before it.
    #[serde(
        skip_serializing_if = "Option::is_none",
        serialize_with = "time::serde::rfc3339::option::serialize"
    )]
    before_finished_at: Option<OffsetDateTime>,
    /// Date to retrieve all tasks that were finished after it.
    #[serde(
        skip_serializing_if = "Option::is_none",
        serialize_with = "time::serde::rfc3339::option::serialize"
    )]
    after_finished_at: Option<OffsetDateTime>,

    // Filter set specific to the query kind, flattened into the query string.
    #[serde(flatten)]
    pagination: T,

    /// Whether to reverse the sort
    #[serde(skip_serializing_if = "Option::is_none")]
    reverse: Option<bool>,
}
576
577#[allow(missing_docs)]
578impl<'a, T, Http: HttpClient> TasksQuery<'a, T, Http> {
579    pub fn with_index_uids<'b>(
580        &'b mut self,
581        index_uids: impl IntoIterator<Item = &'a str>,
582    ) -> &'b mut TasksQuery<'a, T, Http> {
583        self.index_uids = Some(index_uids.into_iter().collect());
584        self
585    }
586    pub fn with_statuses<'b>(
587        &'b mut self,
588        statuses: impl IntoIterator<Item = &'a str>,
589    ) -> &'b mut TasksQuery<'a, T, Http> {
590        self.statuses = Some(statuses.into_iter().collect());
591        self
592    }
593    pub fn with_types<'b>(
594        &'b mut self,
595        task_types: impl IntoIterator<Item = &'a str>,
596    ) -> &'b mut TasksQuery<'a, T, Http> {
597        self.task_types = Some(task_types.into_iter().collect());
598        self
599    }
600    pub fn with_uids<'b>(
601        &'b mut self,
602        uids: impl IntoIterator<Item = &'a u32>,
603    ) -> &'b mut TasksQuery<'a, T, Http> {
604        self.uids = Some(uids.into_iter().collect());
605        self
606    }
607    pub fn with_before_enqueued_at<'b>(
608        &'b mut self,
609        before_enqueued_at: &'a OffsetDateTime,
610    ) -> &'b mut TasksQuery<'a, T, Http> {
611        self.before_enqueued_at = Some(*before_enqueued_at);
612        self
613    }
614    pub fn with_after_enqueued_at<'b>(
615        &'b mut self,
616        after_enqueued_at: &'a OffsetDateTime,
617    ) -> &'b mut TasksQuery<'a, T, Http> {
618        self.after_enqueued_at = Some(*after_enqueued_at);
619        self
620    }
621    pub fn with_before_started_at<'b>(
622        &'b mut self,
623        before_started_at: &'a OffsetDateTime,
624    ) -> &'b mut TasksQuery<'a, T, Http> {
625        self.before_started_at = Some(*before_started_at);
626        self
627    }
628    pub fn with_after_started_at<'b>(
629        &'b mut self,
630        after_started_at: &'a OffsetDateTime,
631    ) -> &'b mut TasksQuery<'a, T, Http> {
632        self.after_started_at = Some(*after_started_at);
633        self
634    }
635    pub fn with_before_finished_at<'b>(
636        &'b mut self,
637        before_finished_at: &'a OffsetDateTime,
638    ) -> &'b mut TasksQuery<'a, T, Http> {
639        self.before_finished_at = Some(*before_finished_at);
640        self
641    }
642    pub fn with_after_finished_at<'b>(
643        &'b mut self,
644        after_finished_at: &'a OffsetDateTime,
645    ) -> &'b mut TasksQuery<'a, T, Http> {
646        self.after_finished_at = Some(*after_finished_at);
647        self
648    }
649    pub fn with_canceled_by<'b>(
650        &'b mut self,
651        task_uids: impl IntoIterator<Item = &'a u32>,
652    ) -> &'b mut TasksQuery<'a, T, Http> {
653        self.canceled_by = Some(task_uids.into_iter().collect());
654        self
655    }
656    pub fn with_reverse<'b>(&'b mut self, reverse: bool) -> &'b mut TasksQuery<'a, T, Http> {
657        self.reverse = Some(reverse);
658        self
659    }
660}
661
662impl<'a, Http: HttpClient> TasksQuery<'a, TasksCancelFilters, Http> {
663    #[must_use]
664    pub fn new(client: &'a Client<Http>) -> TasksQuery<'a, TasksCancelFilters, Http> {
665        TasksQuery {
666            client,
667            index_uids: None,
668            statuses: None,
669            task_types: None,
670            uids: None,
671            canceled_by: None,
672            before_enqueued_at: None,
673            after_enqueued_at: None,
674            before_started_at: None,
675            after_started_at: None,
676            before_finished_at: None,
677            after_finished_at: None,
678            reverse: None,
679            pagination: TasksCancelFilters {},
680        }
681    }
682
683    pub async fn execute(&'a self) -> Result<TaskInfo, Error> {
684        self.client.cancel_tasks_with(self).await
685    }
686}
687
688impl<'a, Http: HttpClient> TasksQuery<'a, TasksDeleteFilters, Http> {
689    #[must_use]
690    pub fn new(client: &'a Client<Http>) -> TasksQuery<'a, TasksDeleteFilters, Http> {
691        TasksQuery {
692            client,
693            index_uids: None,
694            statuses: None,
695            task_types: None,
696            uids: None,
697            canceled_by: None,
698            before_enqueued_at: None,
699            after_enqueued_at: None,
700            before_started_at: None,
701            after_started_at: None,
702            before_finished_at: None,
703            after_finished_at: None,
704            pagination: TasksDeleteFilters {},
705            reverse: None,
706        }
707    }
708
709    pub async fn execute(&'a self) -> Result<TaskInfo, Error> {
710        self.client.delete_tasks_with(self).await
711    }
712}
713
714impl<'a, Http: HttpClient> TasksQuery<'a, TasksPaginationFilters, Http> {
715    #[must_use]
716    pub fn new(client: &'a Client<Http>) -> TasksQuery<'a, TasksPaginationFilters, Http> {
717        TasksQuery {
718            client,
719            index_uids: None,
720            statuses: None,
721            task_types: None,
722            uids: None,
723            canceled_by: None,
724            before_enqueued_at: None,
725            after_enqueued_at: None,
726            before_started_at: None,
727            after_started_at: None,
728            before_finished_at: None,
729            after_finished_at: None,
730            pagination: TasksPaginationFilters {
731                limit: None,
732                from: None,
733            },
734            reverse: None,
735        }
736    }
737    pub fn with_limit<'b>(
738        &'b mut self,
739        limit: u32,
740    ) -> &'b mut TasksQuery<'a, TasksPaginationFilters, Http> {
741        self.pagination.limit = Some(limit);
742        self
743    }
744    pub fn with_from<'b>(
745        &'b mut self,
746        from: u32,
747    ) -> &'b mut TasksQuery<'a, TasksPaginationFilters, Http> {
748        self.pagination.from = Some(from);
749        self
750    }
751    pub async fn execute(&'a self) -> Result<TasksResults, Error> {
752        self.client.get_tasks_with(self).await
753    }
754}
755
756#[cfg(test)]
757mod test {
758
759    #[test]
760    fn test_deserialize_enqueued_task_with_remotes() {
761        let json = r#"{
762          "enqueuedAt": "2022-02-03T13:02:38.369634Z",
763          "indexUid": "movies",
764          "status": "enqueued",
765          "type": "indexUpdate",
766          "uid": 12,
767          "remotes": { "ms-00": { "status": "ok" } }
768        }"#;
769        let task: Task = serde_json::from_str(json).unwrap();
770        match task {
771            Task::Enqueued { content } => {
772                let remotes = content.remotes.expect("remotes should be present");
773                assert!(remotes.contains_key("ms-00"));
774            }
775            _ => panic!("expected enqueued task"),
776        }
777    }
778
779    #[test]
780    fn test_deserialize_processing_task_with_remotes() {
781        let json = r#"{
782          "details": {
783            "indexedDocuments": null,
784            "receivedDocuments": 10
785          },
786          "duration": null,
787          "enqueuedAt": "2022-02-03T15:17:02.801341Z",
788          "finishedAt": null,
789          "indexUid": "movies",
790          "startedAt": "2022-02-03T15:17:02.812338Z",
791          "status": "processing",
792          "type": "documentAdditionOrUpdate",
793          "uid": 14,
794          "remotes": { "ms-00": { "status": "ok" } }
795        }"#;
796        let task: Task = serde_json::from_str(json).unwrap();
797        match task {
798            Task::Processing { content } => {
799                let remotes = content.remotes.expect("remotes should be present");
800                assert!(remotes.contains_key("ms-00"));
801            }
802            _ => panic!("expected processing task"),
803        }
804    }
805
806    use super::*;
807    use crate::{
808        client::*,
809        errors::{ErrorCode, ErrorType},
810    };
811    use big_s::S;
812    use meilisearch_test_macro::meilisearch_test;
813    use serde::{Deserialize, Serialize};
814    use std::time::Duration;
815
816    #[derive(Debug, Serialize, Deserialize, PartialEq)]
817    struct Document {
818        id: usize,
819        value: String,
820        kind: String,
821    }
822
823    #[test]
824    fn test_deserialize_task() {
825        let datetime = OffsetDateTime::parse(
826            "2022-02-03T13:02:38.369634Z",
827            &time::format_description::well_known::Rfc3339,
828        )
829        .unwrap();
830
831        let task: Task = serde_json::from_str(
832            r#"
833{
834  "enqueuedAt": "2022-02-03T13:02:38.369634Z",
835  "indexUid": "meili",
836  "status": "enqueued",
837  "type": "documentAdditionOrUpdate",
838  "uid": 12
839}"#,
840        )
841        .unwrap();
842
843        assert!(matches!(
844            task,
845            Task::Enqueued {
846                content: EnqueuedTask {
847                    enqueued_at,
848                    index_uid: Some(index_uid),
849                    update_type: TaskType::DocumentAdditionOrUpdate { details: None },
850                    uid: 12, .. }
851            }
852        if enqueued_at == datetime && index_uid == "meili"));
853
854        let task: Task = serde_json::from_str(
855            r#"
856{
857  "details": {
858    "indexedDocuments": null,
859    "receivedDocuments": 19547
860  },
861  "duration": null,
862  "enqueuedAt": "2022-02-03T15:17:02.801341Z",
863  "finishedAt": null,
864  "indexUid": "meili",
865  "startedAt": "2022-02-03T15:17:02.812338Z",
866  "status": "processing",
867  "type": "documentAdditionOrUpdate",
868  "uid": 14
869}"#,
870        )
871        .unwrap();
872
873        assert!(matches!(
874            task,
875            Task::Processing {
876                content: ProcessingTask {
877                    started_at,
878                    update_type: TaskType::DocumentAdditionOrUpdate {
879                        details: Some(DocumentAdditionOrUpdate {
880                            received_documents: 19547,
881                            indexed_documents: None,
882                        })
883                    },
884                    uid: 14,
885                    ..
886                }
887            }
888            if started_at == OffsetDateTime::parse(
889                "2022-02-03T15:17:02.812338Z",
890                &time::format_description::well_known::Rfc3339
891            ).unwrap()
892        ));
893
894        let task: Task = serde_json::from_str(
895            r#"
896{
897  "details": {
898    "indexedDocuments": 19546,
899    "receivedDocuments": 19547
900  },
901  "duration": "PT10.848957S",
902  "enqueuedAt": "2022-02-03T15:17:02.801341Z",
903  "finishedAt": "2022-02-03T15:17:13.661295Z",
904  "indexUid": "meili",
905  "startedAt": "2022-02-03T15:17:02.812338Z",
906  "status": "succeeded",
907  "type": "documentAdditionOrUpdate",
908  "uid": 14
909}"#,
910        )
911        .unwrap();
912
913        assert!(matches!(
914            task,
915            Task::Succeeded {
916                content: SucceededTask {
917                    update_type: TaskType::DocumentAdditionOrUpdate {
918                        details: Some(DocumentAdditionOrUpdate {
919                            received_documents: 19547,
920                            indexed_documents: Some(19546),
921                        })
922                    },
923                    uid: 14,
924                    duration,
925                    ..
926                }
927            }
928            if duration == Duration::from_millis(10_848)
929        ));
930    }
931
932    #[meilisearch_test]
933    async fn test_wait_for_task_with_args(client: Client, movies: Index) -> Result<(), Error> {
934        let task = movies
935            .add_documents(
936                &[
937                    Document {
938                        id: 0,
939                        kind: "title".into(),
940                        value: S("The Social Network"),
941                    },
942                    Document {
943                        id: 1,
944                        kind: "title".into(),
945                        value: S("Harry Potter and the Sorcerer's Stone"),
946                    },
947                ],
948                None,
949            )
950            .await?
951            .wait_for_completion(
952                &client,
953                Some(Duration::from_millis(1)),
954                Some(Duration::from_millis(6000)),
955            )
956            .await?;
957
958        assert!(matches!(task, Task::Succeeded { .. }));
959        Ok(())
960    }
961
962    #[meilisearch_test]
963    async fn test_get_tasks_no_params() -> Result<(), Error> {
964        let mut s = mockito::Server::new_async().await;
965        let mock_server_url = s.url();
966        let client = Client::new(mock_server_url, Some("masterKey")).unwrap();
967        let path = "/tasks";
968
969        let mock_res = s.mock("GET", path).with_status(200).create_async().await;
970        let _ = client.get_tasks().await;
971        mock_res.assert_async().await;
972
973        Ok(())
974    }
975
976    #[meilisearch_test]
977    async fn test_get_tasks_with_params() -> Result<(), Error> {
978        let mut s = mockito::Server::new_async().await;
979        let mock_server_url = s.url();
980        let client = Client::new(mock_server_url, Some("masterKey")).unwrap();
981        let path =
982            "/tasks?indexUids=movies,test&statuses=equeued&types=documentDeletion&uids=1&limit=0&from=1&reverse=true";
983
984        let mock_res = s.mock("GET", path).with_status(200).create_async().await;
985
986        let mut query = TasksSearchQuery::new(&client);
987        query
988            .with_index_uids(["movies", "test"])
989            .with_statuses(["equeued"])
990            .with_types(["documentDeletion"])
991            .with_from(1)
992            .with_limit(0)
993            .with_uids([&1])
994            .with_reverse(true);
995
996        let _ = client.get_tasks_with(&query).await;
997
998        mock_res.assert_async().await;
999
1000        Ok(())
1001    }
1002
1003    #[meilisearch_test]
1004    async fn test_get_tasks_with_date_params() -> Result<(), Error> {
1005        let mut s = mockito::Server::new_async().await;
1006        let mock_server_url = s.url();
1007        let client = Client::new(mock_server_url, Some("masterKey")).unwrap();
1008        let path = "/tasks?\
1009            beforeEnqueuedAt=2022-02-03T13%3A02%3A38.369634Z\
1010            &afterEnqueuedAt=2023-02-03T13%3A02%3A38.369634Z\
1011            &beforeStartedAt=2024-02-03T13%3A02%3A38.369634Z\
1012            &afterStartedAt=2025-02-03T13%3A02%3A38.369634Z\
1013            &beforeFinishedAt=2026-02-03T13%3A02%3A38.369634Z\
1014            &afterFinishedAt=2027-02-03T13%3A02%3A38.369634Z";
1015
1016        let mock_res = s.mock("GET", path).with_status(200).create_async().await;
1017
1018        let before_enqueued_at = OffsetDateTime::parse(
1019            "2022-02-03T13:02:38.369634Z",
1020            &time::format_description::well_known::Rfc3339,
1021        )
1022        .unwrap();
1023        let after_enqueued_at = OffsetDateTime::parse(
1024            "2023-02-03T13:02:38.369634Z",
1025            &time::format_description::well_known::Rfc3339,
1026        )
1027        .unwrap();
1028        let before_started_at = OffsetDateTime::parse(
1029            "2024-02-03T13:02:38.369634Z",
1030            &time::format_description::well_known::Rfc3339,
1031        )
1032        .unwrap();
1033
1034        let after_started_at = OffsetDateTime::parse(
1035            "2025-02-03T13:02:38.369634Z",
1036            &time::format_description::well_known::Rfc3339,
1037        )
1038        .unwrap();
1039
1040        let before_finished_at = OffsetDateTime::parse(
1041            "2026-02-03T13:02:38.369634Z",
1042            &time::format_description::well_known::Rfc3339,
1043        )
1044        .unwrap();
1045
1046        let after_finished_at = OffsetDateTime::parse(
1047            "2027-02-03T13:02:38.369634Z",
1048            &time::format_description::well_known::Rfc3339,
1049        )
1050        .unwrap();
1051
1052        let mut query = TasksSearchQuery::new(&client);
1053        query
1054            .with_before_enqueued_at(&before_enqueued_at)
1055            .with_after_enqueued_at(&after_enqueued_at)
1056            .with_before_started_at(&before_started_at)
1057            .with_after_started_at(&after_started_at)
1058            .with_before_finished_at(&before_finished_at)
1059            .with_after_finished_at(&after_finished_at);
1060
1061        let _ = client.get_tasks_with(&query).await;
1062
1063        mock_res.assert_async().await;
1064
1065        Ok(())
1066    }
1067
1068    #[meilisearch_test]
1069    async fn test_get_tasks_on_struct_with_params() -> Result<(), Error> {
1070        let mut s = mockito::Server::new_async().await;
1071        let mock_server_url = s.url();
1072        let client = Client::new(mock_server_url, Some("masterKey")).unwrap();
1073        let path =
1074            "/tasks?indexUids=movies,test&statuses=equeued&types=documentDeletion&canceledBy=9";
1075
1076        let mock_res = s.mock("GET", path).with_status(200).create_async().await;
1077
1078        let mut query = TasksSearchQuery::new(&client);
1079        let _ = query
1080            .with_index_uids(["movies", "test"])
1081            .with_statuses(["equeued"])
1082            .with_types(["documentDeletion"])
1083            .with_canceled_by([&9])
1084            .execute()
1085            .await;
1086
1087        mock_res.assert_async().await;
1088
1089        Ok(())
1090    }
1091
1092    #[meilisearch_test]
1093    async fn test_get_tasks_with_none_existant_index_uids(client: Client) -> Result<(), Error> {
1094        let mut query = TasksSearchQuery::new(&client);
1095        query.with_index_uids(["no_name"]);
1096        let tasks = client.get_tasks_with(&query).await.unwrap();
1097
1098        assert_eq!(tasks.results.len(), 0);
1099        Ok(())
1100    }
1101
1102    #[meilisearch_test]
1103    async fn test_get_tasks_with_execute(client: Client) -> Result<(), Error> {
1104        let tasks = TasksSearchQuery::new(&client)
1105            .with_index_uids(["no_name"])
1106            .execute()
1107            .await
1108            .unwrap();
1109
1110        assert_eq!(tasks.results.len(), 0);
1111        Ok(())
1112    }
1113
1114    #[meilisearch_test]
1115    async fn test_failing_task(client: Client, index: Index) -> Result<(), Error> {
1116        let task_info = client.create_index(index.uid, None).await.unwrap();
1117        let task = client.get_task(task_info).await?;
1118        let task = client.wait_for_task(task, None, None).await?;
1119
1120        let error = task.unwrap_failure();
1121        assert_eq!(error.error_code, ErrorCode::IndexAlreadyExists);
1122        assert_eq!(error.error_type, ErrorType::InvalidRequest);
1123        Ok(())
1124    }
1125
1126    #[meilisearch_test]
1127    async fn test_cancel_tasks_with_params() -> Result<(), Error> {
1128        let mut s = mockito::Server::new_async().await;
1129        let mock_server_url = s.url();
1130        let client = Client::new(mock_server_url, Some("masterKey")).unwrap();
1131        let path =
1132            "/tasks/cancel?indexUids=movies,test&statuses=equeued&types=documentDeletion&uids=1";
1133
1134        let mock_res = s.mock("POST", path).with_status(200).create_async().await;
1135
1136        let mut query = TasksCancelQuery::new(&client);
1137        query
1138            .with_index_uids(["movies", "test"])
1139            .with_statuses(["equeued"])
1140            .with_types(["documentDeletion"])
1141            .with_uids([&1]);
1142
1143        let _ = client.cancel_tasks_with(&query).await;
1144
1145        mock_res.assert_async().await;
1146
1147        Ok(())
1148    }
1149
1150    #[meilisearch_test]
1151    async fn test_cancel_tasks_with_params_execute() -> Result<(), Error> {
1152        let mut s = mockito::Server::new_async().await;
1153        let mock_server_url = s.url();
1154        let client = Client::new(mock_server_url, Some("masterKey")).unwrap();
1155        let path =
1156            "/tasks/cancel?indexUids=movies,test&statuses=equeued&types=documentDeletion&uids=1";
1157
1158        let mock_res = s.mock("POST", path).with_status(200).create_async().await;
1159
1160        let mut query = TasksCancelQuery::new(&client);
1161        let _ = query
1162            .with_index_uids(["movies", "test"])
1163            .with_statuses(["equeued"])
1164            .with_types(["documentDeletion"])
1165            .with_uids([&1])
1166            .execute()
1167            .await;
1168
1169        mock_res.assert_async().await;
1170
1171        Ok(())
1172    }
1173
1174    #[meilisearch_test]
1175    async fn test_delete_tasks_with_params() -> Result<(), Error> {
1176        let mut s = mockito::Server::new_async().await;
1177        let mock_server_url = s.url();
1178        let client = Client::new(mock_server_url, Some("masterKey")).unwrap();
1179        let path = "/tasks?indexUids=movies,test&statuses=equeued&types=documentDeletion&uids=1";
1180
1181        let mock_res = s.mock("DELETE", path).with_status(200).create_async().await;
1182
1183        let mut query = TasksDeleteQuery::new(&client);
1184        query
1185            .with_index_uids(["movies", "test"])
1186            .with_statuses(["equeued"])
1187            .with_types(["documentDeletion"])
1188            .with_uids([&1]);
1189
1190        let _ = client.delete_tasks_with(&query).await;
1191
1192        mock_res.assert_async().await;
1193
1194        Ok(())
1195    }
1196
1197    #[meilisearch_test]
1198    async fn test_delete_tasks_with_params_execute() -> Result<(), Error> {
1199        let mut s = mockito::Server::new_async().await;
1200        let mock_server_url = s.url();
1201        let client = Client::new(mock_server_url, Some("masterKey")).unwrap();
1202        let path = "/tasks?indexUids=movies,test&statuses=equeued&types=documentDeletion&uids=1";
1203
1204        let mock_res = s.mock("DELETE", path).with_status(200).create_async().await;
1205
1206        let mut query = TasksDeleteQuery::new(&client);
1207        let _ = query
1208            .with_index_uids(["movies", "test"])
1209            .with_statuses(["equeued"])
1210            .with_types(["documentDeletion"])
1211            .with_uids([&1])
1212            .execute()
1213            .await;
1214
1215        mock_res.assert_async().await;
1216
1217        Ok(())
1218    }
1219}