apalis_core/backend/impls/json/mod.rs

use serde_json::Value;
use std::{
    collections::BTreeMap,
    fs::{File, OpenOptions},
    io::{BufRead, BufReader, BufWriter, Write},
    path::PathBuf,
    sync::{Arc, RwLock},
};

use self::{
    meta::JsonMapMetadata,
    util::{TaskKey, TaskWithMeta},
};
use crate::{
    features_table,
    task::{
        Task,
        status::Status,
        task_id::{RandomId, TaskId},
    },
};

mod backend;
mod meta;
mod shared;
mod sink;
mod util;

pub use self::shared::SharedJsonStore;

/// A task storage backed by a JSON Lines file on disk.
#[doc = features_table! {
    setup = r#"
    # {
    #     use apalis_core::backend::json::JsonStorage;
    #     let mut backend = JsonStorage::new_temp().unwrap();
    #     backend
    # };
    "#,
    Backend => supported("Basic Backend functionality", true),
    TaskSink => supported("Ability to push new tasks", true),
    Serialization => limited("Serialization support for arguments. Only accepts `json`", false),
    WebUI => not_implemented("Expose a web interface for monitoring tasks"),
    FetchById => not_implemented("Allow fetching a task by its ID"),
    RegisterWorker => not_supported("Allow registering a worker with the backend"),
    "[`PipeExt`]" => supported("Allow other backends to pipe to this backend", false),
    MakeShared => supported("Share the same JSON storage across multiple workers via [`SharedJsonStore`]", false),
    Workflow => supported("Flexible enough to support workflows", true),
    WaitForCompletion => supported("Wait for tasks to complete without blocking", true),
    ResumeById => not_implemented("Resume a task by its ID"),
    ResumeAbandoned => not_implemented("Resume abandoned tasks"),
    ListWorkers => not_supported("List all workers registered with the backend"),
    ListTasks => not_implemented("List all tasks in the backend"),
}]
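/// # Example
///
/// A minimal sketch of pushing tasks through this backend's `TaskSink`
/// implementation (the `u32` argument type is purely illustrative):
///
/// ```no_run
/// # async fn example() {
/// use apalis_core::backend::{TaskSink, json::JsonStorage};
///
/// let mut store: JsonStorage<u32> = JsonStorage::new_temp().unwrap();
/// store.push(42).await.unwrap();
/// # }
/// ```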
#[derive(Debug)]
pub struct JsonStorage<Args> {
    tasks: Arc<RwLock<BTreeMap<TaskKey, TaskWithMeta>>>,
    buffer: Vec<Task<Value, JsonMapMetadata>>,
    path: PathBuf,
    _marker: std::marker::PhantomData<Args>,
}

/// A single persisted line in the JSON Lines file.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone)]
struct StorageEntry {
    task_id: TaskId,
    status: Status,
    task: TaskWithMeta,
}

impl<Args> JsonStorage<Args> {
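    /// Opens a store at `path`, loading any entries already persisted on
    /// disk into the in-memory index.
    ///
    /// A minimal sketch (the file name is an assumption for the example):
    ///
    /// ```no_run
    /// # use apalis_core::backend::json::JsonStorage;
    /// # fn demo() -> std::io::Result<()> {
    /// let store: JsonStorage<u32> = JsonStorage::new("tasks.json")?;
    /// # Ok(())
    /// # }
    /// ```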
    pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
        let path = path.into();
        let mut data = BTreeMap::new();

        if path.exists() {
            let file = File::open(&path)?;
            let reader = BufReader::new(file);

            for line in reader.lines() {
                let line = line?;
                if line.trim().is_empty() {
                    continue;
                }

                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
                    let key = TaskKey {
                        status: entry.status,
                        task_id: entry.task_id,
                        queue: std::any::type_name::<Args>().to_owned(),
                    };
                    data.insert(key, entry.task);
                }
            }
        }

        Ok(JsonStorage {
            path,
            tasks: Arc::new(RwLock::new(data)),
            buffer: Vec::new(),
            _marker: std::marker::PhantomData,
        })
    }

    /// Creates a store backed by a uniquely named file in the system's
    /// temporary directory.
    pub fn new_temp() -> Result<JsonStorage<Args>, std::io::Error> {
        let p = std::env::temp_dir().join(format!("apalis-json-store-{}", RandomId::default()));
        Self::new(p)
    }

    fn insert(&mut self, k: &TaskKey, v: TaskWithMeta) -> Result<(), std::io::Error> {
        self.tasks.try_write().unwrap().insert(k.clone(), v);
        Ok(())
    }

    /// Removes the entry for `key`, persisting the change to disk if
    /// anything was actually removed.
    pub fn remove(&mut self, key: &TaskKey) -> std::io::Result<Option<TaskWithMeta>> {
        let removed = self.tasks.try_write().unwrap().remove(key);

        if removed.is_some() {
            self.persist_to_disk()?;
        }

        Ok(removed)
    }

    fn persist_to_disk(&self) -> std::io::Result<()> {
        let tmp_path = self.path.with_extension("tmp");

        {
            let tmp_file = OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(&tmp_path)?;
            let mut writer = BufWriter::new(tmp_file);

            for (key, value) in self.tasks.try_read().unwrap().iter() {
                let entry = StorageEntry {
                    status: key.status.clone(),
                    task_id: key.task_id.clone(),
                    task: value.clone(),
                };
                let line = serde_json::to_string(&entry)?;
                writeln!(writer, "{}", line)?;
            }

            writer.flush()?;
        }

        // Write-then-rename keeps the store intact if we crash mid-write: the
        // old file is only replaced once the temp file is fully flushed.
        std::fs::rename(tmp_path, &self.path)?;
        Ok(())
    }
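
    /// Discards the in-memory task map and re-reads every entry from the
    /// backing file.
    ///
    /// A minimal sketch of the intended call pattern (illustrative only):
    ///
    /// ```no_run
    /// # use apalis_core::backend::json::JsonStorage;
    /// # fn demo() -> std::io::Result<()> {
    /// let mut store: JsonStorage<u32> = JsonStorage::new_temp()?;
    /// // ... the backing file is rewritten elsewhere, e.g. by another clone ...
    /// store.reload()?;
    /// # Ok(())
    /// # }
    /// ```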
    pub fn reload(&mut self) -> std::io::Result<()> {
        let mut new_data = BTreeMap::new();

        if self.path.exists() {
            let file = File::open(&self.path)?;
            let reader = BufReader::new(file);

            for line in reader.lines() {
                let line = line?;
                if line.trim().is_empty() {
                    continue;
                }

                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
                    let key = TaskKey {
                        status: entry.status,
                        task_id: entry.task_id,
                        queue: std::any::type_name::<Args>().to_owned(),
                    };
                    new_data.insert(key, entry.task);
                }
            }
        }

        *self.tasks.try_write().unwrap() = new_data;
        Ok(())
    }

    /// Removes all tasks from memory and truncates the backing file.
    pub fn clear(&mut self) -> std::io::Result<()> {
        self.tasks.try_write().unwrap().clear();

        // Opening with `truncate(true)` empties (or creates) the file.
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&self.path)?;
        drop(file);

        Ok(())
    }

    /// Re-keys the task at `old_key` under `new_status`, returning `true`
    /// if the task existed.
    pub fn update_status(
        &mut self,
        old_key: &TaskKey,
        new_status: Status,
    ) -> std::io::Result<bool> {
        let mut tasks = self.tasks.try_write().unwrap();
        if let Some(value) = tasks.remove(old_key) {
            let new_key = TaskKey {
                status: new_status,
                task_id: old_key.task_id.clone(),
                queue: old_key.queue.clone(),
            };
            tasks.insert(new_key, value);
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Returns a clone of the task stored under `key`, if any.
    pub fn get(&self, key: &TaskKey) -> Option<TaskWithMeta> {
        self.tasks.try_read().unwrap().get(key).cloned()
    }

    fn update_result(&self, key: &TaskKey, status: Status, val: Value) -> std::io::Result<bool> {
        let mut tasks = self.tasks.try_write().unwrap();
        if let Some(mut task) = tasks.remove(key) {
            let new_key = TaskKey {
                status,
                task_id: key.task_id.clone(),
                queue: key.queue.clone(),
            };
            task.result = Some(val);

            tasks.insert(new_key, task);
            Ok(true)
        } else {
            Ok(false)
        }
    }
}

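// Manual `Clone` impl so that `Args` itself does not need to be `Clone`:
// clones share the same task map through the `Arc`, while each clone starts
// with an empty push buffer.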
impl<Args> Clone for JsonStorage<Args> {
    fn clone(&self) -> Self {
        Self {
            tasks: self.tasks.clone(),
            buffer: Vec::new(),
            path: self.path.clone(),
            _marker: std::marker::PhantomData,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{
        backend::{TaskSink, json::JsonStorage},
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    const ITEMS: u32 = 100;

    #[tokio::test]
    async fn basic_worker() {
        let mut json_store = JsonStorage::new_temp().unwrap();
        for i in 0..ITEMS {
            json_store.push(i).await.unwrap();
        }

        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if task == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!".into());
            }
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(json_store)
            .on_event(|ctx, ev| {
                println!("On Event = {:?} from = {}", ev, ctx.name());
            })
            .build(task);
        worker.run().await.unwrap();
    }
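
    // A minimal sketch of the on-disk lifecycle using only this module's own
    // API: `clear` creates (or truncates) the backing file and empties the
    // in-memory map, and `reload` repopulates the map from the file.
    #[test]
    fn clear_then_reload() {
        let mut store: JsonStorage<u32> = JsonStorage::new_temp().unwrap();

        store.clear().unwrap(); // creates an empty backing file
        assert!(store.path.exists());

        store.reload().unwrap(); // reloading the empty file yields no tasks
        assert!(store.tasks.try_read().unwrap().is_empty());
    }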
}