use serde_json::Value;
use std::{
    collections::BTreeMap,
    fs::{File, OpenOptions},
    io::{BufRead, BufReader, BufWriter, Write},
    path::PathBuf,
    sync::{Arc, RwLock},
};

use self::util::{TaskKey, TaskWithMeta};
use apalis_core::{
    features_table,
    task::{
        Task,
        status::Status,
        task_id::{RandomId, TaskId},
    },
};

mod backend;
mod meta;
mod shared;
mod sink;
mod util;

pub use self::shared::SharedJsonStore;
pub use meta::JsonMapMetadata;
#[doc = features_table! {
    setup = r#"
        # {
        # use apalis_file_storage::JsonStorage;
        # let mut backend = JsonStorage::new_temp().unwrap();
        # backend
        # };
    "#,
    Backend => supported("Basic Backend functionality", true),
    TaskSink => supported("Ability to push new tasks", true),
    Serialization => limited("Serialization support for arguments. Only accepts `json`", false),
    WebUI => not_implemented("Expose a web interface for monitoring tasks"),
    FetchById => not_implemented("Allow fetching a task by its ID"),
    RegisterWorker => not_supported("Allow registering a worker with the backend"),
    "[`PipeExt`]" => supported("Allow other backends to pipe to this backend", false),
    MakeShared => supported("Share the same JSON storage across multiple workers via [`SharedJsonStore`]", false),
    Workflow => supported("Flexible enough to support workflows", true),
    WaitForCompletion => supported("Wait for tasks to complete without blocking", true),
    ResumeById => not_implemented("Resume a task by its ID"),
    ResumeAbandoned => not_implemented("Resume abandoned tasks"),
    ListWorkers => not_supported("List all workers registered with the backend"),
    ListTasks => not_implemented("List all tasks in the backend"),
}]
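/// A task storage that persists tasks as newline-delimited JSON on disk.
///
/// # Example
///
/// A minimal usage sketch (it mirrors this crate's tests; `push` comes from
/// `apalis_core::backend::TaskSink`):
///
/// ```rust,no_run
/// use apalis_core::backend::TaskSink;
/// use apalis_file_storage::JsonStorage;
///
/// # async fn example() {
/// // Back the store with a uniquely named file in the temp directory.
/// let mut store = JsonStorage::new_temp().unwrap();
/// // Push a task; the argument type fixes the store's `Args` parameter.
/// store.push(42u32).await.unwrap();
/// # }
/// ```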
#[derive(Debug)]
pub struct JsonStorage<Args> {
    tasks: Arc<RwLock<BTreeMap<TaskKey, TaskWithMeta>>>,
    buffer: Vec<Task<Value, JsonMapMetadata, RandomId>>,
    path: PathBuf,
    _marker: std::marker::PhantomData<Args>,
}

/// A single persisted record: one JSON object per line in the backing file.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
struct StorageEntry {
    task_id: TaskId<RandomId>,
    status: Status,
    task: TaskWithMeta,
}

impl<Args> JsonStorage<Args> {
    /// Opens (or creates) a storage backed by the file at `path`, loading any
    /// entries already persisted there into memory.
    pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
        let path = path.into();
        let mut data = BTreeMap::new();

        if path.exists() {
            let file = File::open(&path)?;
            let reader = BufReader::new(file);

            for line in reader.lines() {
                let line = line?;
                if line.trim().is_empty() {
                    continue;
                }

                // Skip malformed lines instead of failing the whole load.
                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
                    let key = TaskKey {
                        status: entry.status,
                        task_id: entry.task_id,
                        queue: std::any::type_name::<Args>().to_owned(),
                    };
                    data.insert(key, entry.task);
                }
            }
        }

        Ok(Self {
            path,
            tasks: Arc::new(RwLock::new(data)),
            buffer: Vec::new(),
            _marker: std::marker::PhantomData,
        })
    }

    /// Creates a storage backed by a uniquely named file in the system
    /// temporary directory.
    pub fn new_temp() -> Result<Self, std::io::Error> {
        let p = std::env::temp_dir().join(format!("apalis-json-store-{}", RandomId::default()));
        Self::new(p)
    }

    fn insert(&self, k: &TaskKey, v: TaskWithMeta) -> Result<(), std::io::Error> {
        self.tasks.try_write().unwrap().insert(k.clone(), v);
        Ok(())
    }

    /// Removes a task by key, persisting the change to disk if anything was
    /// actually removed.
    pub fn remove(&mut self, key: &TaskKey) -> std::io::Result<Option<TaskWithMeta>> {
        let removed = self.tasks.try_write().unwrap().remove(key);

        if removed.is_some() {
            self.persist_to_disk()?;
        }

        Ok(removed)
    }

    /// Atomically rewrites the backing file from the in-memory map: entries
    /// are written to a temporary sibling file which is then renamed over the
    /// real path, so readers never observe a half-written file.
    fn persist_to_disk(&self) -> std::io::Result<()> {
        let tmp_path = self.path.with_extension("tmp");
        {
            let tmp_file = OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(&tmp_path)?;
            let mut writer = BufWriter::new(tmp_file);

            for (key, value) in self.tasks.try_read().unwrap().iter() {
                let entry = StorageEntry {
                    status: key.status.clone(),
                    task_id: key.task_id.clone(),
                    task: value.clone(),
                };
                let line = serde_json::to_string(&entry)?;
                writeln!(writer, "{line}")?;
            }

            writer.flush()?;
        }
        std::fs::rename(&tmp_path, &self.path)?;
        Ok(())
    }

    /// Replaces the in-memory map with the current contents of the backing
    /// file, discarding any in-memory changes that were never persisted.
    pub fn reload(&mut self) -> std::io::Result<()> {
        let mut new_data = BTreeMap::new();

        if self.path.exists() {
            let file = File::open(&self.path)?;
            let reader = BufReader::new(file);

            for line in reader.lines() {
                let line = line?;
                if line.trim().is_empty() {
                    continue;
                }

                // Skip malformed lines instead of failing the whole reload.
                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
                    let key = TaskKey {
                        status: entry.status,
                        task_id: entry.task_id,
                        queue: std::any::type_name::<Args>().to_owned(),
                    };
                    new_data.insert(key, entry.task);
                }
            }
        }

        *self.tasks.try_write().unwrap() = new_data;
        Ok(())
    }

    /// Drops every task from memory and truncates the backing file.
    pub fn clear(&mut self) -> std::io::Result<()> {
        self.tasks.try_write().unwrap().clear();

        // Opening with `truncate(true)` and immediately dropping the handle
        // empties the file on disk.
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&self.path)?;
        drop(file);

        Ok(())
    }

    /// Re-keys a task under `new_status`, returning whether the task was
    /// found. The change is applied in memory only.
    pub fn update_status(
        &mut self,
        old_key: &TaskKey,
        new_status: Status,
    ) -> std::io::Result<bool> {
        let mut tasks = self.tasks.try_write().unwrap();
        if let Some(value) = tasks.remove(old_key) {
            let new_key = TaskKey {
                status: new_status,
                task_id: old_key.task_id.clone(),
                queue: old_key.queue.clone(),
            };
            tasks.insert(new_key, value);
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Returns a clone of the task stored under `key`, if any.
    #[must_use]
    pub fn get(&self, key: &TaskKey) -> Option<TaskWithMeta> {
        self.tasks.try_read().unwrap().get(key).cloned()
    }

    /// Records a task's result value and re-keys it under `status`, returning
    /// whether the task was found. The change is applied in memory only.
    fn update_result(&self, key: &TaskKey, status: Status, val: Value) -> std::io::Result<bool> {
        let mut tasks = self.tasks.try_write().unwrap();
        if let Some(mut task) = tasks.remove(key) {
            let new_key = TaskKey {
                status,
                task_id: key.task_id.clone(),
                queue: key.queue.clone(),
            };
            task.result = Some(val);

            tasks.insert(new_key, task);
            Ok(true)
        } else {
            Ok(false)
        }
    }
}

// Implemented by hand so `Args` does not need to be `Clone`: clones share the
// same task map but start with an empty local buffer.
impl<Args> Clone for JsonStorage<Args> {
    fn clone(&self) -> Self {
        Self {
            tasks: self.tasks.clone(),
            buffer: Vec::new(),
            path: self.path.clone(),
            _marker: std::marker::PhantomData,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use apalis_core::{
        backend::TaskSink,
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    const ITEMS: u32 = 100;

    #[tokio::test]
    async fn basic_worker() {
        let mut json_store = JsonStorage::new_temp().unwrap();
        for i in 0..ITEMS {
            json_store.push(i).await.unwrap();
        }

        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if task == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!".into());
            }
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(json_store)
            .on_event(|ctx, ev| {
                println!("On Event = {ev:?} from = {}", ctx.name());
            })
            .build(task);
        worker.run().await.unwrap();
    }
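
    // A persistence round-trip sketch (not part of the original suite). It
    // assumes `push` behaves as in `basic_worker` above, that `clear`
    // truncates the backing file, and that `reload` rebuilds the in-memory
    // map from whatever the file contains, which after `clear` is nothing.
    #[tokio::test]
    async fn clear_then_reload_leaves_store_empty() {
        let mut json_store = JsonStorage::new_temp().unwrap();
        for i in 0..ITEMS {
            json_store.push(i).await.unwrap();
        }

        json_store.clear().unwrap();
        json_store.reload().unwrap();

        // The tests module can reach the parent module's private `tasks` field.
        assert!(json_store.tasks.try_read().unwrap().is_empty());
    }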
359}