// apalis_core/backend/impls/json/mod.rs
use serde_json::Value;
54use std::{
55 collections::BTreeMap,
56 fs::{File, OpenOptions},
57 io::{BufRead, Write},
58 path::PathBuf,
59 sync::{Arc, RwLock},
60};
61
62use self::{
63 meta::JsonMapMetadata,
64 util::{TaskKey, TaskWithMeta},
65};
66use crate::{
67 features_table,
68 task::{
69 Task,
70 status::Status,
71 task_id::{RandomId, TaskId},
72 },
73};
74use std::io::{BufReader, BufWriter};
75
76mod backend;
77mod meta;
78mod shared;
79mod sink;
80mod util;
81
82pub use self::shared::SharedJsonStore;
83
// `features_table!` renders the backend feature-support matrix into this
// type's rustdoc; the `setup` block is the preamble used by its doctests.
#[doc = features_table! {
    setup = {
        use apalis_core::backend::json::JsonStorage;
        JsonStorage::new_temp().unwrap()
    };,
    TaskSink => supported("Ability to push new tasks"),
    Serialization => limited("Serialization support for arguments. Only accepts `json`", false),
    FetchById => not_implemented("Allow fetching a task by its ID"),
    RegisterWorker => not_supported("Allow registering a worker with the backend"),
    PipeExt => supported("Allow other backends to pipe to this backend", false),
    MakeShared => supported("Share the same JSON storage across multiple workers", false),
    Workflow => supported("Flexible enough to support workflows", false),
    WaitForCompletion => supported("Wait for tasks to complete without blocking", false),
    ResumeById => not_implemented("Resume a task by its ID"),
    ResumeAbandoned => not_implemented("Resume abandoned tasks"),
    ListWorkers => not_supported("List all workers registered with the backend"),
    ListTasks => not_implemented("List all tasks in the backend"),
}]
#[derive(Debug)]
// A file-backed task store using the JSON-lines format: one serialized
// `StorageEntry` per line. Clones share the in-memory map (see the `Clone`
// impl), which is how the same storage serves multiple workers.
pub struct JsonStorage<Args> {
    // In-memory index of all tasks, ordered by `TaskKey`; shared across clones.
    tasks: Arc<RwLock<BTreeMap<TaskKey, TaskWithMeta>>>,
    // Staging area for pushed tasks. NOTE(review): never read or written in
    // this view — presumably drained by the `sink` module; confirm before
    // relying on it.
    buffer: Vec<Task<Value, JsonMapMetadata>>,
    // Path of the backing JSON-lines file on disk.
    path: PathBuf,
    // Ties the store to its argument type without storing a value of it.
    _marker: std::marker::PhantomData<Args>,
}
112
/// On-disk representation of a single task: one JSON object per line of the
/// backing file (JSON-lines format).
// NOTE(review): the serde derives are gated behind the `serde` feature, yet
// `serde_json::from_str::<StorageEntry>` is called unconditionally below —
// verify this module is itself compiled only when `serde` is enabled.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone)]
struct StorageEntry {
    // Unique task id; combined with `status` to rebuild the in-memory `TaskKey`.
    task_id: TaskId,
    // Lifecycle status at the time the entry was persisted.
    status: Status,
    // The task payload together with its metadata (and optional result).
    task: TaskWithMeta,
}
120
121impl<Args> JsonStorage<Args> {
122 pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
124 let path = path.into();
125 let mut data = BTreeMap::new();
126
127 if path.exists() {
128 let file = File::open(&path)?;
129 let reader = BufReader::new(file);
130
131 for line in reader.lines() {
132 let line = line?;
133 if line.trim().is_empty() {
134 continue;
135 }
136
137 if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
138 let key = TaskKey {
139 status: entry.status,
140 task_id: entry.task_id,
141 queue: std::any::type_name::<Args>().to_owned(),
142 };
143 data.insert(key, entry.task);
144 }
145 }
146 }
147
148 Ok(JsonStorage {
149 path,
150 tasks: Arc::new(RwLock::new(data)),
151 buffer: Vec::new(),
152 _marker: std::marker::PhantomData,
153 })
154 }
155
156 pub fn new_temp() -> Result<JsonStorage<Args>, std::io::Error> {
158 let p = std::env::temp_dir().join(format!("apalis-json-store-{}", RandomId::default()));
159 Self::new(p)
160 }
161
162 fn insert(&mut self, k: &TaskKey, v: TaskWithMeta) -> Result<(), std::io::Error> {
163 self.tasks.try_write().unwrap().insert(k.clone(), v);
164 Ok(())
165 }
166
167 pub fn remove(&mut self, key: &TaskKey) -> std::io::Result<Option<TaskWithMeta>> {
169 let removed = self.tasks.try_write().unwrap().remove(key);
170
171 if removed.is_some() {
172 self.persist_to_disk()?;
173 }
174
175 Ok(removed)
176 }
177
178 fn persist_to_disk(&self) -> std::io::Result<()> {
180 let tmp_path = self.path.with_extension("tmp");
181
182 {
183 let tmp_file = OpenOptions::new()
184 .write(true)
185 .create(true)
186 .truncate(true)
187 .open(&tmp_path)?;
188 let mut writer = BufWriter::new(tmp_file);
189
190 for (key, value) in self.tasks.try_read().unwrap().iter() {
191 let entry = StorageEntry {
192 status: key.status.clone(),
193 task_id: key.task_id.clone(),
194 task: value.clone(),
195 };
196 let line = serde_json::to_string(&entry)?;
197 writeln!(writer, "{}", line)?;
198 }
199
200 writer.flush()?;
201 } std::fs::rename(tmp_path, &self.path)?;
205 Ok(())
206 }
207 pub fn reload(&mut self) -> std::io::Result<()> {
209 let mut new_data = BTreeMap::new();
210
211 if self.path.exists() {
212 let file = File::open(&self.path)?;
213 let reader = BufReader::new(file);
214
215 for line in reader.lines() {
216 let line = line?;
217 if line.trim().is_empty() {
218 continue;
219 }
220
221 if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
222 let key = TaskKey {
223 status: entry.status,
224 task_id: entry.task_id,
225 queue: std::any::type_name::<Args>().to_owned(),
226 };
227 new_data.insert(key, entry.task);
228 }
229 }
230 }
231
232 *self.tasks.try_write().unwrap() = new_data;
233 Ok(())
234 }
235 pub fn clear(&mut self) -> std::io::Result<()> {
237 self.tasks.try_write().unwrap().clear();
238
239 let file = OpenOptions::new()
241 .write(true)
242 .create(true)
243 .truncate(true)
244 .open(&self.path)?;
245 drop(file);
246
247 Ok(())
248 }
249
250 pub fn update_status(
252 &mut self,
253 old_key: &TaskKey,
254 new_status: Status,
255 ) -> std::io::Result<bool> {
256 let mut tasks = self.tasks.try_write().unwrap();
257 if let Some(value) = tasks.remove(old_key) {
258 let new_key = TaskKey {
259 status: new_status,
260 task_id: old_key.task_id.clone(),
261 queue: old_key.queue.clone(),
262 };
263 tasks.insert(new_key, value);
264 Ok(true)
265 } else {
266 Ok(false)
267 }
268 }
269
270 pub fn get(&self, key: &TaskKey) -> Option<TaskWithMeta> {
272 let tasks = self.tasks.try_read().unwrap();
273 let res = tasks.get(key);
274 res.cloned()
275 }
276
277 fn update_result(&self, key: &TaskKey, status: Status, val: Value) -> std::io::Result<bool> {
278 let mut tasks = self.tasks.try_write().unwrap();
279 if let Some(mut task) = tasks.remove(key) {
280 let new_key = TaskKey {
281 status,
282 task_id: key.task_id.clone(),
283 queue: key.queue.clone(),
284 };
285 task.result = Some(val);
286
287 tasks.insert(new_key, task);
288 Ok(true)
289 } else {
290 Ok(false)
291 }
292 }
293}
294
295impl<Args> Clone for JsonStorage<Args> {
296 fn clone(&self) -> Self {
297 Self {
298 tasks: self.tasks.clone(),
299 buffer: Vec::new(),
300 path: self.path.clone(),
301 _marker: std::marker::PhantomData,
302 }
303 }
304}
305
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{
        backend::{TaskSink, json::JsonStorage},
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    const ITEMS: u32 = 100;

    /// End-to-end smoke test: push `ITEMS` tasks into a temp store, run a
    /// worker over them, and stop the worker once the last task is seen.
    #[tokio::test]
    async fn basic_worker() {
        // Handler: sleeps briefly per task, then shuts the worker down (via
        // an error) when the final item arrives.
        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if task == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok(())
        }

        let mut store = JsonStorage::new_temp().unwrap();
        for item in 0..ITEMS {
            store.push(item).await.unwrap();
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(store)
            .on_event(|ctx, ev| println!("On Event = {:?} from = {}", ev, ctx.name()))
            .build(task);
        worker.run().await.unwrap();
    }
}