// apalis_core/backend/impls/json/mod.rs
use serde_json::Value;
54use std::{
55 collections::BTreeMap,
56 fs::{File, OpenOptions},
57 io::{BufRead, Write},
58 path::PathBuf,
59 sync::{Arc, RwLock},
60};
61
62use self::{
63 meta::JsonMapMetadata,
64 util::{TaskKey, TaskWithMeta},
65};
66use crate::{
67 features_table,
68 task::{
69 status::Status,
70 task_id::{RandomId, TaskId},
71 Task,
72 },
73};
74use std::io::{BufReader, BufWriter};
75
76mod backend;
77mod meta;
78mod shared;
79mod sink;
80mod util;
81
82pub use self::shared::SharedJsonStore;
83
// A file-backed task store: the full task map lives in memory and is
// mirrored to a JSON-lines file on disk. The feature table below is
// rendered into the rustdoc for this type.
#[doc = features_table! {
    setup = {
        use apalis_core::backend::json::JsonStorage;
        JsonStorage::new_temp().unwrap()
    };,
    TaskSink => supported("Ability to push new tasks"),
    Serialization => limited("Serialization support for arguments. Only accepts `json`", false),
    FetchById => not_implemented("Allow fetching a task by its ID"),
    RegisterWorker => not_supported("Allow registering a worker with the backend"),
    PipeExt => supported("Allow other backends to pipe to this backend", false),
    MakeShared => supported("Share the same JSON storage across multiple workers", false),
    Workflow => supported("Flexible enough to support workflows", false),
    WaitForCompletion => supported("Wait for tasks to complete without blocking", false),
    ResumeById => not_implemented("Resume a task by its ID"),
    ResumeAbandoned => not_implemented("Resume abandoned tasks"),
    ListWorkers => not_supported("List all workers registered with the backend"),
    ListTasks => not_implemented("List all tasks in the backend"),
}]
#[derive(Debug)]
pub struct JsonStorage<Args> {
    // Shared, thread-safe map of every known task, keyed by
    // status + task id + namespace. Clones share this via the `Arc`.
    tasks: Arc<RwLock<BTreeMap<TaskKey, TaskWithMeta>>>,
    // Per-instance buffer of tasks; NOT shared — each clone starts with an
    // empty one (see the `Clone` impl). Presumably a staging area used by
    // the sink module — TODO confirm against `sink.rs`.
    buffer: Vec<Task<Args, JsonMapMetadata>>,
    // Location of the backing JSON-lines file on disk.
    path: PathBuf,
}
110
// On-disk representation of a single task: one `StorageEntry` is written
// per line of the backing file as a standalone JSON object.
//
// NOTE(review): the serde derives are gated on the "serde" feature, yet
// `serde_json::from_str::<StorageEntry>` / `to_string` are called
// unconditionally elsewhere in this module — confirm the feature wiring.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone)]
struct StorageEntry {
    // Task identifier; combined with `status` to rebuild the in-memory key.
    task_id: TaskId,
    // Status at the time the entry was persisted.
    status: Status,
    // The task payload together with its metadata.
    task: TaskWithMeta,
}
118
119impl<Args> JsonStorage<Args> {
120 pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
122 let path = path.into();
123 let mut data = BTreeMap::new();
124
125 if path.exists() {
126 let file = File::open(&path)?;
127 let reader = BufReader::new(file);
128
129 for line in reader.lines() {
130 let line = line?;
131 if line.trim().is_empty() {
132 continue;
133 }
134
135 if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
136 let key = TaskKey {
137 status: entry.status,
138 task_id: entry.task_id,
139 namespace: std::any::type_name::<Args>().to_owned(),
140 };
141 data.insert(key, entry.task);
142 }
143 }
144 }
145
146 Ok(JsonStorage {
147 path,
148 tasks: Arc::new(RwLock::new(data)),
149 buffer: Vec::new(),
150 })
151 }
152
153 pub fn new_temp() -> Result<JsonStorage<Args>, std::io::Error> {
155 let p = std::env::temp_dir().join(format!("apalis-json-store-{}", RandomId::default()));
156 Self::new(p)
157 }
158
159 fn insert(&mut self, k: &TaskKey, v: TaskWithMeta) -> Result<(), std::io::Error> {
160 self.tasks.try_write().unwrap().insert(k.clone(), v);
161 Ok(())
162 }
163
164 pub fn remove(&mut self, key: &TaskKey) -> std::io::Result<Option<TaskWithMeta>> {
166 let removed = self.tasks.try_write().unwrap().remove(key);
167
168 if removed.is_some() {
169 self.persist_to_disk()?;
170 }
171
172 Ok(removed)
173 }
174
175 fn persist_to_disk(&self) -> std::io::Result<()> {
177 let tmp_path = self.path.with_extension("tmp");
178
179 {
180 let tmp_file = OpenOptions::new()
181 .write(true)
182 .create(true)
183 .truncate(true)
184 .open(&tmp_path)?;
185 let mut writer = BufWriter::new(tmp_file);
186
187 for (key, value) in self.tasks.try_read().unwrap().iter() {
188 let entry = StorageEntry {
189 status: key.status.clone(),
190 task_id: key.task_id.clone(),
191 task: value.clone(),
192 };
193 let line = serde_json::to_string(&entry)?;
194 writeln!(writer, "{}", line)?;
195 }
196
197 writer.flush()?;
198 } std::fs::rename(tmp_path, &self.path)?;
202 Ok(())
203 }
204 pub fn reload(&mut self) -> std::io::Result<()> {
206 let mut new_data = BTreeMap::new();
207
208 if self.path.exists() {
209 let file = File::open(&self.path)?;
210 let reader = BufReader::new(file);
211
212 for line in reader.lines() {
213 let line = line?;
214 if line.trim().is_empty() {
215 continue;
216 }
217
218 if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
219 let key = TaskKey {
220 status: entry.status,
221 task_id: entry.task_id,
222 namespace: std::any::type_name::<Args>().to_owned(),
223 };
224 new_data.insert(key, entry.task);
225 }
226 }
227 }
228
229 *self.tasks.try_write().unwrap() = new_data;
230 Ok(())
231 }
232 pub fn clear(&mut self) -> std::io::Result<()> {
234 self.tasks.try_write().unwrap().clear();
235
236 let file = OpenOptions::new()
238 .write(true)
239 .create(true)
240 .truncate(true)
241 .open(&self.path)?;
242 drop(file);
243
244 Ok(())
245 }
246
247 pub fn update_status(
249 &mut self,
250 old_key: &TaskKey,
251 new_status: Status,
252 ) -> std::io::Result<bool> {
253 let mut tasks = self.tasks.try_write().unwrap();
254 if let Some(value) = tasks.remove(old_key) {
255 let new_key = TaskKey {
256 status: new_status,
257 task_id: old_key.task_id.clone(),
258 namespace: old_key.namespace.clone(),
259 };
260 tasks.insert(new_key, value);
261 Ok(true)
262 } else {
263 Ok(false)
264 }
265 }
266
267 pub fn get(&self, key: &TaskKey) -> Option<TaskWithMeta> {
269 let tasks = self.tasks.try_read().unwrap();
270 let res = tasks.get(key);
271 res.cloned()
272 }
273
274 fn update_result(&self, key: &TaskKey, status: Status, val: Value) -> std::io::Result<bool> {
275 let mut tasks = self.tasks.try_write().unwrap();
276 if let Some(mut task) = tasks.remove(key) {
277 let new_key = TaskKey {
278 status,
279 task_id: key.task_id.clone(),
280 namespace: key.namespace.clone(),
281 };
282 task.result = Some(val);
283
284 tasks.insert(new_key, task);
285 Ok(true)
286 } else {
287 Ok(false)
288 }
289 }
290}
291
292impl<Args> Clone for JsonStorage<Args> {
293 fn clone(&self) -> Self {
294 Self {
295 tasks: self.tasks.clone(),
296 buffer: Vec::new(),
297 path: self.path.clone(),
298 }
299 }
300}
301
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{
        backend::{json::JsonStorage, TaskSink},
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    // Number of tasks pushed into the store before the worker starts.
    const ITEMS: u32 = 100;

    // End-to-end smoke test: fill a temp JSON store, run a worker over it,
    // and stop the worker when the last task is reached.
    #[tokio::test]
    async fn basic_worker() {
        let mut json_store = JsonStorage::new_temp().unwrap();
        for i in 0..ITEMS {
            json_store.push(i).await.unwrap();
        }

        // Handler: sleeps per task; on the final task it requests a worker
        // stop and returns an error (both paths exercised deliberately).
        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if task == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(json_store)
            .on_event(|ctx, ev| {
                println!("On Event = {:?} from = {}", ev, ctx.name());
            })
            .build(task);
        // Runs until the handler calls `ctx.stop()` above.
        worker.run().await.unwrap();
    }
}