// apalis_core/task/builder.rs
use crate::task::{
28 Parts, Task, attempt::Attempt, extensions::Extensions, metadata::MetadataExt, status::Status,
29 task_id::TaskId,
30};
31use std::time::{Duration, SystemTime, UNIX_EPOCH};
32
/// Builder that collects the parts of a `Task` before `build` assembles them.
///
/// Only `args` is mandatory. Every field held as an `Option` may stay `None`;
/// `build` then fills in a default for it (or, for `task_id` and
/// `idempotency_key`, passes the `None` through unchanged).
#[derive(Debug)]
pub struct TaskBuilder<Args, Ctx, IdType> {
    /// Arguments of the task; moved into `Task::args` by `build`.
    pub(super) args: Args,
    #[doc(hidden)]
    /// Context value; starts as `Ctx::default()` in `new` and ends up in `Parts::ctx`.
    pub ctx: Ctx,
    /// Extension data attached to the task; ends up in `Parts::data`.
    pub(super) data: Extensions,
    /// Explicit task id, if one was supplied; forwarded as-is (still optional).
    pub(super) task_id: Option<TaskId<IdType>>,
    /// Attempt tracker; `build` falls back to `Attempt::default()` when unset.
    pub(super) attempt: Option<Attempt>,
    /// Initial status; `build` falls back to `Status::Pending` when unset.
    pub(super) status: Option<Status>,
    /// Scheduled run time as seconds since the Unix epoch; `build` falls back
    /// to the current system time when unset.
    pub(super) run_at: Option<u64>,
    /// Optional idempotency key; forwarded as-is (still optional).
    pub(super) idempotency_key: Option<String>,
}
47
48impl<Args, Ctx, IdType> TaskBuilder<Args, Ctx, IdType> {
49 #[must_use]
51 pub fn new(args: Args) -> Self
52 where
53 Ctx: Default,
54 {
55 Self {
56 args,
57 ctx: Default::default(),
58 data: Extensions::default(),
59 task_id: None,
60 attempt: None,
61 status: None,
62 run_at: None,
63 idempotency_key: Default::default(),
64 }
65 }
66
67 #[must_use]
69 pub fn with_ctx(mut self, ctx: Ctx) -> Self {
70 self.ctx = ctx;
71 self
72 }
73
74 #[must_use]
76 pub fn with_data(mut self, data: Extensions) -> Self {
77 self.data = data;
78 self
79 }
80
81 #[must_use]
83 pub fn data<D: Clone + Send + Sync + 'static>(mut self, value: D) -> Self {
84 self.data.insert(value);
85 self
86 }
87
88 #[must_use]
90 pub fn meta<M>(mut self, value: M) -> Self
91 where
92 Ctx: MetadataExt<M>,
93 {
94 self.ctx
95 .inject(value)
96 .unwrap_or_else(|_| panic!("Failed to inject item into context"));
97 self
98 }
99
100 #[must_use]
102 pub fn with_task_id(mut self, task_id: TaskId<IdType>) -> Self {
103 self.task_id = Some(task_id);
104 self
105 }
106
107 #[must_use]
109 pub fn with_attempt(mut self, attempt: Attempt) -> Self {
110 self.attempt = Some(attempt);
111 self
112 }
113
114 #[must_use]
116 pub fn with_status(mut self, status: Status) -> Self {
117 self.status = Some(status);
118 self
119 }
120
121 #[must_use]
123 pub fn run_at_timestamp(mut self, timestamp: u64) -> Self {
124 self.run_at = Some(timestamp);
125 self
126 }
127
128 #[must_use]
130 pub fn run_at_time(mut self, time: SystemTime) -> Self {
131 let timestamp = time
132 .duration_since(UNIX_EPOCH)
133 .expect("Time went backwards")
134 .as_secs();
135 self.run_at = Some(timestamp);
136 self
137 }
138
139 #[must_use]
141 pub fn run_after(mut self, delay: Duration) -> Self {
142 let now = SystemTime::now();
143 let run_time = now + delay;
144 let timestamp = run_time
145 .duration_since(UNIX_EPOCH)
146 .expect("Time went backwards")
147 .as_secs();
148 self.run_at = Some(timestamp);
149 self
150 }
151
152 #[must_use]
154 pub fn run_in_seconds(self, seconds: u64) -> Self {
155 self.run_after(Duration::from_secs(seconds))
156 }
157
158 #[must_use]
160 pub fn run_in_minutes(self, minutes: u64) -> Self {
161 self.run_after(Duration::from_secs(minutes * 60))
162 }
163
164 #[must_use]
166 pub fn run_in_hours(self, hours: u64) -> Self {
167 self.run_after(Duration::from_secs(hours * 3600))
168 }
169
170 #[must_use]
172 pub fn with_idempotency_key<S: AsRef<str>>(mut self, idempotency_key: S) -> Self {
173 self.idempotency_key = Some(idempotency_key.as_ref().to_owned());
174 self
175 }
176
177 #[must_use]
179 pub fn build(self) -> Task<Args, Ctx, IdType> {
180 let current_time = || {
181 SystemTime::now()
182 .duration_since(UNIX_EPOCH)
183 .expect("Time went backwards")
184 .as_secs()
185 };
186
187 Task {
188 args: self.args,
189 parts: Parts {
190 task_id: self.task_id,
191 data: self.data,
192 attempt: self.attempt.unwrap_or_default(),
193 ctx: self.ctx,
194 status: self.status.unwrap_or(Status::Pending).into(),
195 run_at: self.run_at.unwrap_or_else(current_time),
196 idempotency_key: self.idempotency_key,
197 },
198 }
199 }
200}
201
202impl<Args, Ctx: Default, IdType> Task<Args, Ctx, IdType> {
204 pub fn builder(args: Args) -> TaskBuilder<Args, Ctx, IdType> {
206 TaskBuilder::new(args)
207 }
208}