apalis_core/task/
builder.rs1use crate::task::{
28 Parts, Task, attempt::Attempt, extensions::Extensions, metadata::MetadataExt, status::Status,
29 task_id::TaskId,
30};
31use std::time::{Duration, SystemTime, UNIX_EPOCH};
32
33#[derive(Debug)]
35pub struct TaskBuilder<Args, Ctx, IdType> {
36 pub(super) args: Args,
37 pub(super) ctx: Ctx,
38 pub(super) data: Extensions,
39 pub(super) task_id: Option<TaskId<IdType>>,
40 pub(super) attempt: Option<Attempt>,
41 pub(super) status: Option<Status>,
42 pub(super) run_at: Option<u64>,
43}
44
45impl<Args, Ctx, IdType> TaskBuilder<Args, Ctx, IdType> {
46 #[must_use]
48 pub fn new(args: Args) -> Self
49 where
50 Ctx: Default,
51 {
52 Self {
53 args,
54 ctx: Default::default(),
55 data: Extensions::default(),
56 task_id: None,
57 attempt: None,
58 status: None,
59 run_at: None,
60 }
61 }
62
63 #[must_use]
65 pub fn with_ctx(mut self, ctx: Ctx) -> Self {
66 self.ctx = ctx;
67 self
68 }
69
70 #[must_use]
72 pub fn with_data(mut self, data: Extensions) -> Self {
73 self.data = data;
74 self
75 }
76
77 #[must_use]
79 pub fn data<D: Clone + Send + Sync + 'static>(mut self, value: D) -> Self {
80 self.data.insert(value);
81 self
82 }
83
84 #[must_use]
86 pub fn meta<M>(mut self, value: M) -> Self
87 where
88 Ctx: MetadataExt<M>,
89 {
90 self.ctx
91 .inject(value)
92 .unwrap_or_else(|_| panic!("Failed to inject item into context"));
93 self
94 }
95
96 #[must_use]
98 pub fn with_task_id(mut self, task_id: TaskId<IdType>) -> Self {
99 self.task_id = Some(task_id);
100 self
101 }
102
103 #[must_use]
105 pub fn with_attempt(mut self, attempt: Attempt) -> Self {
106 self.attempt = Some(attempt);
107 self
108 }
109
110 #[must_use]
112 pub fn with_status(mut self, status: Status) -> Self {
113 self.status = Some(status);
114 self
115 }
116
117 #[must_use]
119 pub fn run_at_timestamp(mut self, timestamp: u64) -> Self {
120 self.run_at = Some(timestamp);
121 self
122 }
123
124 #[must_use]
126 pub fn run_at_time(mut self, time: SystemTime) -> Self {
127 let timestamp = time
128 .duration_since(UNIX_EPOCH)
129 .expect("Time went backwards")
130 .as_secs();
131 self.run_at = Some(timestamp);
132 self
133 }
134
135 #[must_use]
137 pub fn run_after(mut self, delay: Duration) -> Self {
138 let now = SystemTime::now();
139 let run_time = now + delay;
140 let timestamp = run_time
141 .duration_since(UNIX_EPOCH)
142 .expect("Time went backwards")
143 .as_secs();
144 self.run_at = Some(timestamp);
145 self
146 }
147
148 #[must_use]
150 pub fn run_in_seconds(self, seconds: u64) -> Self {
151 self.run_after(Duration::from_secs(seconds))
152 }
153
154 #[must_use]
156 pub fn run_in_minutes(self, minutes: u64) -> Self {
157 self.run_after(Duration::from_secs(minutes * 60))
158 }
159
160 #[must_use]
162 pub fn run_in_hours(self, hours: u64) -> Self {
163 self.run_after(Duration::from_secs(hours * 3600))
164 }
165
166 #[must_use]
168 pub fn build(self) -> Task<Args, Ctx, IdType> {
169 let current_time = || {
170 SystemTime::now()
171 .duration_since(UNIX_EPOCH)
172 .expect("Time went backwards")
173 .as_secs()
174 };
175
176 Task {
177 args: self.args,
178 parts: Parts {
179 task_id: self.task_id,
180 data: self.data,
181 attempt: self.attempt.unwrap_or_default(),
182 ctx: self.ctx,
183 status: self.status.unwrap_or(Status::Pending).into(),
184 run_at: self.run_at.unwrap_or_else(current_time),
185 },
186 }
187 }
188}
189
190impl<Args, Ctx: Default, IdType> Task<Args, Ctx, IdType> {
192 pub fn builder(args: Args) -> TaskBuilder<Args, Ctx, IdType> {
194 TaskBuilder::new(args)
195 }
196}