apalis_core/task/
builder.rs1use crate::{
28 backend::queue::Queue,
29 task::{
30 Parts, Task, attempt::Attempt, extensions::Extensions, metadata::MetadataExt,
31 status::Status, task_id::TaskId,
32 },
33};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35
36#[derive(Debug)]
38pub struct TaskBuilder<Args, Ctx, IdType> {
39 pub(super) args: Args,
40 pub(super) ctx: Ctx,
41 pub(super) data: Extensions,
42 pub(super) task_id: Option<TaskId<IdType>>,
43 pub(super) attempt: Option<Attempt>,
44 pub(super) status: Option<Status>,
45 pub(super) run_at: Option<u64>,
46 pub(super) queue: Option<Queue>,
47}
48
49impl<Args, Ctx, IdType> TaskBuilder<Args, Ctx, IdType> {
50 pub fn new(args: Args) -> Self
52 where
53 Ctx: Default,
54 {
55 Self {
56 args,
57 ctx: Default::default(),
58 data: Extensions::default(),
59 task_id: None,
60 attempt: None,
61 status: None,
62 run_at: None,
63 queue: None,
64 }
65 }
66
67 pub fn with_ctx(mut self, ctx: Ctx) -> Self {
69 self.ctx = ctx;
70 self
71 }
72
73 pub fn with_data(mut self, data: Extensions) -> Self {
75 self.data = data;
76 self
77 }
78
79 pub fn data<D: Clone + Send + Sync + 'static>(mut self, value: D) -> Self {
81 self.data.insert(value);
82 self
83 }
84
85 pub fn meta<M>(mut self, value: M) -> Self
87 where
88 Ctx: MetadataExt<M>,
89 {
90 self.ctx
91 .inject(value)
92 .unwrap_or_else(|_| panic!("Failed to inject item into context"));
93 self
94 }
95
96 pub fn with_task_id(mut self, task_id: TaskId<IdType>) -> Self {
98 self.task_id = Some(task_id);
99 self
100 }
101
102 pub fn with_attempt(mut self, attempt: Attempt) -> Self {
104 self.attempt = Some(attempt);
105 self
106 }
107
108 pub fn with_status(mut self, status: Status) -> Self {
110 self.status = Some(status);
111 self
112 }
113
114 pub fn with_queue<S: AsRef<str>>(mut self, queue: S) -> Self {
116 self.queue = Some(Queue::from(queue.as_ref()));
117 self
118 }
119
120 pub fn run_at_timestamp(mut self, timestamp: u64) -> Self {
122 self.run_at = Some(timestamp);
123 self
124 }
125
126 pub fn run_at_time(mut self, time: SystemTime) -> Self {
128 let timestamp = time
129 .duration_since(UNIX_EPOCH)
130 .expect("Time went backwards")
131 .as_secs();
132 self.run_at = Some(timestamp);
133 self
134 }
135
136 pub fn run_after(mut self, delay: Duration) -> Self {
138 let now = SystemTime::now();
139 let run_time = now + delay;
140 let timestamp = run_time
141 .duration_since(UNIX_EPOCH)
142 .expect("Time went backwards")
143 .as_secs();
144 self.run_at = Some(timestamp);
145 self
146 }
147
148 pub fn run_in_seconds(self, seconds: u64) -> Self {
150 self.run_after(Duration::from_secs(seconds))
151 }
152
153 pub fn run_in_minutes(self, minutes: u64) -> Self {
155 self.run_after(Duration::from_secs(minutes * 60))
156 }
157
158 pub fn run_in_hours(self, hours: u64) -> Self {
160 self.run_after(Duration::from_secs(hours * 3600))
161 }
162
163 pub fn build(self) -> Task<Args, Ctx, IdType> {
165 let current_time = || {
166 SystemTime::now()
167 .duration_since(UNIX_EPOCH)
168 .expect("Time went backwards")
169 .as_secs()
170 };
171
172 Task {
173 args: self.args,
174 parts: Parts {
175 task_id: self.task_id,
176 data: self.data,
177 attempt: self.attempt.unwrap_or_default(),
178 ctx: self.ctx,
179 status: self.status.unwrap_or(Status::Pending).into(),
180 run_at: self.run_at.unwrap_or_else(current_time),
181 queue: self.queue,
182 },
183 }
184 }
185}
186
187impl<Args, Ctx: Default, IdType> Task<Args, Ctx, IdType> {
189 pub fn builder(args: Args) -> TaskBuilder<Args, Ctx, IdType> {
191 TaskBuilder::new(args)
192 }
193}