apalis_core/backend/impls/json/
mod.rs

//! # File-based Backend using JSON
//!
//! A JSON file-based backend for persisting tasks and results.
//!
//! ## Features
//!
//! - **Sink support**: Ability to push new tasks.
//! - **Codec support**: Serialization support for arguments using JSON.
//! - **Workflow support**: Flexible enough to support workflows.
//! - **Ack support**: Allows acknowledgement of task completion.
//! - **WaitForCompletion**: Wait for tasks to complete without blocking.
//!
//! ## Usage Example
//!
//! ```rust
//! # use apalis_core::backend::json::JsonStorage;
//! # use apalis_core::worker::builder::WorkerBuilder;
//! # use std::time::Duration;
//! # use apalis_core::worker::context::WorkerContext;
//! # use apalis_core::backend::TaskSink;
//! # use apalis_core::error::BoxDynError;
//! # use apalis_core::worker::ext::event_listener::EventListenerExt;
//!
//! #[tokio::main]
//! async fn main() {
//!     let mut json_store = JsonStorage::new_temp().unwrap();
//!     json_store.push(42).await.unwrap();
//!
//!     async fn task(_item: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
//!         tokio::time::sleep(Duration::from_secs(1)).await;
//!         ctx.stop().unwrap();
//!         Ok(())
//!     }
//!
//!     let worker = WorkerBuilder::new("rango-tango")
//!         .backend(json_store)
//!         .on_event(|_ctx, ev| {
//!             println!("On Event = {:?}", ev);
//!         })
//!         .build(task);
//!     worker.run().await.unwrap();
//! }
//! ```
//!
//! ## Implementation Notes
//!
//! - Tasks are stored in a file, with each line representing a serialized task entry (see the sample line below).
//! - All operations are thread-safe using `RwLock`.
//! - Data is persisted to disk atomically to avoid corruption.
//! - Supports temporary storage for testing and ephemeral use cases.
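//!
//! For illustration, a persisted file contains one serialized `StorageEntry`
//! per line, holding the task id, its status, and the task payload with its
//! metadata. The values here are elided; only the field names are real:
//!
//! ```text
//! {"task_id":"...","status":"...","task":{...}}
//! ```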
//!
use serde_json::Value;
use std::{
    collections::BTreeMap,
    fs::{File, OpenOptions},
    io::{BufRead, BufReader, BufWriter, Write},
    path::PathBuf,
    sync::{Arc, RwLock},
};

use self::{
    meta::JsonMapMetadata,
    util::{TaskKey, TaskWithMeta},
};
use crate::{
    features_table,
    task::{
        Task,
        status::Status,
        task_id::{RandomId, TaskId},
    },
};

mod backend;
mod meta;
mod shared;
mod sink;
mod util;

pub use self::shared::SharedJsonStore;

/// A backend that persists to a file using JSON encoding.
///
/// *Warning*: This backend is not optimized for high-throughput scenarios and is best suited for development, testing, or low-volume workloads.
#[doc = features_table! {
    setup = {
        use apalis_core::backend::json::JsonStorage;
        JsonStorage::new_temp().unwrap()
    };,
    TaskSink => supported("Ability to push new tasks"),
    Serialization => limited("Serialization support for arguments. Only accepts `json`", false),
    FetchById => not_implemented("Allow fetching a task by its ID"),
    RegisterWorker => not_supported("Allow registering a worker with the backend"),
    PipeExt => supported("Allow other backends to pipe to this backend", false),
    MakeShared => supported("Share the same JSON storage across multiple workers", false),
    Workflow => supported("Flexible enough to support workflows", false),
    WaitForCompletion => supported("Wait for tasks to complete without blocking", false),
    ResumeById => not_implemented("Resume a task by its ID"),
    ResumeAbandoned => not_implemented("Resume abandoned tasks"),
    ListWorkers => not_supported("List all workers registered with the backend"),
    ListTasks => not_implemented("List all tasks in the backend"),
}]
#[derive(Debug)]
pub struct JsonStorage<Args> {
    tasks: Arc<RwLock<BTreeMap<TaskKey, TaskWithMeta>>>,
    buffer: Vec<Task<Value, JsonMapMetadata>>,
    path: PathBuf,
    _marker: std::marker::PhantomData<Args>,
}

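/// A single persisted record: one `StorageEntry` is written per line of the
/// backing file.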
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone)]
struct StorageEntry {
    task_id: TaskId,
    status: Status,
    task: TaskWithMeta,
}

impl<Args> JsonStorage<Args> {
    /// Creates a new `JsonStorage` instance using the specified file path.
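    ///
    /// `new` does not create or modify the file. If the file already exists,
    /// its entries are loaded into memory and malformed lines are skipped.
    ///
    /// A minimal sketch (the path is hypothetical):
    ///
    /// ```no_run
    /// use apalis_core::backend::json::JsonStorage;
    ///
    /// let _store = JsonStorage::<u32>::new("tasks.json").unwrap();
    /// ```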
    pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
        let path = path.into();
        let mut data = BTreeMap::new();

        if path.exists() {
            let file = File::open(&path)?;
            let reader = BufReader::new(file);

            for line in reader.lines() {
                let line = line?;
                if line.trim().is_empty() {
                    continue;
                }

                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
                    let key = TaskKey {
                        status: entry.status,
                        task_id: entry.task_id,
                        queue: std::any::type_name::<Args>().to_owned(),
                    };
                    data.insert(key, entry.task);
                }
            }
        }

        Ok(JsonStorage {
            path,
            tasks: Arc::new(RwLock::new(data)),
            buffer: Vec::new(),
            _marker: std::marker::PhantomData,
        })
    }

    /// Creates a new temporary `JsonStorage` instance.
    pub fn new_temp() -> Result<JsonStorage<Args>, std::io::Error> {
        let p = std::env::temp_dir().join(format!("apalis-json-store-{}", RandomId::default()));
        Self::new(p)
    }

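    /// Inserts a task into the in-memory map only; this method itself does not
    /// persist the change to disk.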
    fn insert(&mut self, k: &TaskKey, v: TaskWithMeta) -> Result<(), std::io::Error> {
        self.tasks.try_write().unwrap().insert(k.clone(), v);
        Ok(())
    }

    /// Removes a task from the storage.
    pub fn remove(&mut self, key: &TaskKey) -> std::io::Result<Option<TaskWithMeta>> {
        let removed = self.tasks.try_write().unwrap().remove(key);

        if removed.is_some() {
            self.persist_to_disk()?;
        }

        Ok(removed)
    }

    /// Persist all current data to disk by rewriting the file.
    fn persist_to_disk(&self) -> std::io::Result<()> {
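        // Write everything to a sibling `.tmp` file first; the live file is only
        // touched by the final rename below, so a crash mid-write cannot leave it
        // partially written.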
        let tmp_path = self.path.with_extension("tmp");

        {
            let tmp_file = OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(&tmp_path)?;
            let mut writer = BufWriter::new(tmp_file);

            for (key, value) in self.tasks.try_read().unwrap().iter() {
                let entry = StorageEntry {
                    status: key.status.clone(),
                    task_id: key.task_id.clone(),
                    task: value.clone(),
                };
                let line = serde_json::to_string(&entry)?;
                writeln!(writer, "{}", line)?;
            }

            writer.flush()?;
        } // `BufWriter` is dropped here, ensuring all data is written

        // Atomically replace the old file with the new one
        std::fs::rename(tmp_path, &self.path)?;
        Ok(())
    }

    /// Reload data from disk, useful if the file was modified externally.
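    ///
    /// A minimal sketch (the path is hypothetical), e.g. to pick up entries
    /// written by another process:
    ///
    /// ```no_run
    /// use apalis_core::backend::json::JsonStorage;
    ///
    /// let mut store = JsonStorage::<u32>::new("tasks.json").unwrap();
    /// // Replaces the in-memory view with whatever is currently on disk.
    /// store.reload().unwrap();
    /// ```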
    pub fn reload(&mut self) -> std::io::Result<()> {
        let mut new_data = BTreeMap::new();

        if self.path.exists() {
            let file = File::open(&self.path)?;
            let reader = BufReader::new(file);

            for line in reader.lines() {
                let line = line?;
                if line.trim().is_empty() {
                    continue;
                }

                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
                    let key = TaskKey {
                        status: entry.status,
                        task_id: entry.task_id,
                        queue: std::any::type_name::<Args>().to_owned(),
                    };
                    new_data.insert(key, entry.task);
                }
            }
        }

        *self.tasks.try_write().unwrap() = new_data;
        Ok(())
    }

    /// Clear all data from memory and the backing file.
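    ///
    /// A minimal sketch:
    ///
    /// ```no_run
    /// use apalis_core::backend::json::JsonStorage;
    ///
    /// let mut store = JsonStorage::<u32>::new_temp().unwrap();
    /// // Empties the in-memory map and truncates the backing file.
    /// store.clear().unwrap();
    /// ```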
    pub fn clear(&mut self) -> std::io::Result<()> {
        self.tasks.try_write().unwrap().clear();

        // Truncate the file to zero length, creating it if necessary
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&self.path)?;
        drop(file);

        Ok(())
    }

    /// Update the status of an existing key.
    ///
    /// Returns `Ok(true)` if a task with `old_key` existed and was re-keyed
    /// under `new_status`, or `Ok(false)` if no such task was found.
    pub fn update_status(
        &mut self,
        old_key: &TaskKey,
        new_status: Status,
    ) -> std::io::Result<bool> {
        let mut tasks = self.tasks.try_write().unwrap();
        if let Some(value) = tasks.remove(old_key) {
            let new_key = TaskKey {
                status: new_status,
                task_id: old_key.task_id.clone(),
                queue: old_key.queue.clone(),
            };
            tasks.insert(new_key, value);
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Retrieves a task from the storage.
    pub fn get(&self, key: &TaskKey) -> Option<TaskWithMeta> {
        self.tasks.try_read().unwrap().get(key).cloned()
    }

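    /// Re-keys a task under `status` and records `val` as its result.
    /// Returns `Ok(true)` if the task existed, `Ok(false)` otherwise.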
    fn update_result(&self, key: &TaskKey, status: Status, val: Value) -> std::io::Result<bool> {
        let mut tasks = self.tasks.try_write().unwrap();
        if let Some(mut task) = tasks.remove(key) {
            let new_key = TaskKey {
                status,
                task_id: key.task_id.clone(),
                queue: key.queue.clone(),
            };
            task.result = Some(val);

            tasks.insert(new_key, task);
            Ok(true)
        } else {
            Ok(false)
        }
    }
}

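// Clones share the same in-memory task map (via `Arc`) and file path, but each
// clone starts with its own empty push buffer.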
impl<Args> Clone for JsonStorage<Args> {
    fn clone(&self) -> Self {
        Self {
            tasks: self.tasks.clone(),
            buffer: Vec::new(),
            path: self.path.clone(),
            _marker: std::marker::PhantomData,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{
        backend::{TaskSink, json::JsonStorage},
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    const ITEMS: u32 = 100;

    #[tokio::test]
    async fn basic_worker() {
        let mut json_store = JsonStorage::new_temp().unwrap();
        for i in 0..ITEMS {
            json_store.push(i).await.unwrap();
        }

        async fn task(item: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if item == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!".into());
            }
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(json_store)
            .on_event(|ctx, ev| {
                println!("On Event = {:?} from = {}", ev, ctx.name());
            })
            .build(task);
        worker.run().await.unwrap();
    }
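
    // A persistence smoke test: a minimal sketch assuming `push` succeeds
    // outside a running worker (as it does in `basic_worker` above). It only
    // asserts that pushing and then clearing the store both succeed.
    #[tokio::test]
    async fn push_then_clear() {
        let mut json_store = JsonStorage::<u32>::new_temp().unwrap();
        json_store.push(1).await.unwrap();
        // `clear` empties the in-memory map and truncates the backing file.
        json_store.clear().unwrap();
    }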
}