apalis_core/backend/impls/json/
mod.rs

1//! # File based Backend using JSON
2//!
3//! A JSON file-based backend for persisting tasks and results.
4//!
5//! ## Features
6//!
7//! - **Sink support**: Ability to push new tasks.
8//! - **Codec Support**: Serialization support for arguments using JSON.
9//! - **Workflow Support**: Flexible enough to support workflows.
10//! - **Ack Support**: Allows acknowledgement of task completion.
11//! - **WaitForCompletion**: Wait for tasks to complete without blocking.
12//!
13//! ## Usage Example
14//!
15//! ```rust
16//! # use apalis_core::backend::json::JsonStorage;
17//! # use apalis_core::worker::builder::WorkerBuilder;
18//! # use std::time::Duration;
19//! # use apalis_core::worker::context::WorkerContext;
20//! # use apalis_core::backend::TaskSink;
21//! # use apalis_core::error::BoxDynError;
22//! # use apalis_core::worker::ext::event_listener::EventListenerExt;
23//!
24//! #[tokio::main]
25//! async fn main() {
26//!     let mut json_store = JsonStorage::new_temp().unwrap();
27//!     json_store.push(42).await.unwrap();
//!
29//!
30//!     async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
31//!         tokio::time::sleep(Duration::from_secs(1)).await;
32//!         ctx.stop().unwrap();
33//!         Ok(())
34//!     }
35//!
36//!     let worker = WorkerBuilder::new("rango-tango")
37//!         .backend(json_store)
38//!         .on_event(|ctx, ev| {
39//!             println!("On Event = {:?}", ev);
40//!         })
41//!         .build(task);
42//!     worker.run().await.unwrap();
43//! }
44//! ```
45//!
46//! ## Implementation Notes
47//!
48//! - Tasks are stored in a file, each line representing a serialized task entry.
49//! - All operations are thread-safe using `RwLock`.
50//! - Data is atomically persisted to disk to avoid corruption.
51//! - Supports temporary storage for testing and ephemeral use cases.
52//!
53use serde_json::Value;
54use std::{
55    collections::BTreeMap,
56    fs::{File, OpenOptions},
57    io::{BufRead, Write},
58    path::PathBuf,
59    sync::{Arc, RwLock},
60};
61
62use self::{
63    meta::JsonMapMetadata,
64    util::{TaskKey, TaskWithMeta},
65};
66use crate::{
67    features_table,
68    task::{
69        status::Status,
70        task_id::{RandomId, TaskId},
71        Task,
72    },
73};
74use std::io::{BufReader, BufWriter};
75
76mod backend;
77mod meta;
78mod shared;
79mod sink;
80mod util;
81
82pub use self::shared::SharedJsonStore;
83
84/// A backend that persists to a file using json encoding
85///
86#[doc = features_table! {
87    setup = {
88        use apalis_core::backend::json::JsonStorage;
89        JsonStorage::new_temp().unwrap()
90    };,
91    TaskSink => supported("Ability to push new tasks"),
92    Serialization => limited("Serialization support for arguments. Only accepts `json`", false),
93    FetchById => not_implemented("Allow fetching a task by its ID"),
94    RegisterWorker => not_supported("Allow registering a worker with the backend"),
95    PipeExt => supported("Allow other backends to pipe to this backend", false),
96    MakeShared => supported("Share the same JSON storage across multiple workers", false),
97    Workflow => supported("Flexible enough to support workflows", false),
98    WaitForCompletion => supported("Wait for tasks to complete without blocking", false),
99    ResumeById => not_implemented("Resume a task by its ID"),
100    ResumeAbandoned => not_implemented("Resume abandoned tasks"),
101    ListWorkers => not_supported("List all workers registered with the backend"),
102    ListTasks => not_implemented("List all tasks in the backend"),
103}]
#[derive(Debug)]
pub struct JsonStorage<Args> {
    /// In-memory index of tasks keyed by `(status, task_id, namespace)`;
    /// shared between clones of this storage via the `Arc`.
    tasks: Arc<RwLock<BTreeMap<TaskKey, TaskWithMeta>>>,
    /// Per-instance staging area of tasks; not shared between clones
    /// (presumably drained by the sink implementation — see `sink` module).
    buffer: Vec<Task<Args, JsonMapMetadata>>,
    /// Path of the backing JSON-lines file (one serialized entry per line).
    path: PathBuf,
}
110
/// On-disk representation of a single task: one JSON object per line of the
/// backing file. The key parts (`task_id`, `status`) are stored alongside the
/// payload so the in-memory `TaskKey` can be rebuilt on load.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone)]
struct StorageEntry {
    /// Unique identifier of the task.
    task_id: TaskId,
    /// Lifecycle status at the time the entry was persisted.
    status: Status,
    /// The task payload with its metadata (and optional result).
    task: TaskWithMeta,
}
118
119impl<Args> JsonStorage<Args> {
120    /// Creates a new `JsonStorage` instance using the specified file path.
121    pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
122        let path = path.into();
123        let mut data = BTreeMap::new();
124
125        if path.exists() {
126            let file = File::open(&path)?;
127            let reader = BufReader::new(file);
128
129            for line in reader.lines() {
130                let line = line?;
131                if line.trim().is_empty() {
132                    continue;
133                }
134
135                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
136                    let key = TaskKey {
137                        status: entry.status,
138                        task_id: entry.task_id,
139                        namespace: std::any::type_name::<Args>().to_owned(),
140                    };
141                    data.insert(key, entry.task);
142                }
143            }
144        }
145
146        Ok(JsonStorage {
147            path,
148            tasks: Arc::new(RwLock::new(data)),
149            buffer: Vec::new(),
150        })
151    }
152
153    /// Creates a new temporary `JsonStorage` instance.
154    pub fn new_temp() -> Result<JsonStorage<Args>, std::io::Error> {
155        let p = std::env::temp_dir().join(format!("apalis-json-store-{}", RandomId::default()));
156        Self::new(p)
157    }
158
159    fn insert(&mut self, k: &TaskKey, v: TaskWithMeta) -> Result<(), std::io::Error> {
160        self.tasks.try_write().unwrap().insert(k.clone(), v);
161        Ok(())
162    }
163
164    /// Removes a task from the storage.
165    pub fn remove(&mut self, key: &TaskKey) -> std::io::Result<Option<TaskWithMeta>> {
166        let removed = self.tasks.try_write().unwrap().remove(key);
167
168        if removed.is_some() {
169            self.persist_to_disk()?;
170        }
171
172        Ok(removed)
173    }
174
175    /// Persist all current data to disk by rewriting the file
176    fn persist_to_disk(&self) -> std::io::Result<()> {
177        let tmp_path = self.path.with_extension("tmp");
178
179        {
180            let tmp_file = OpenOptions::new()
181                .write(true)
182                .create(true)
183                .truncate(true)
184                .open(&tmp_path)?;
185            let mut writer = BufWriter::new(tmp_file);
186
187            for (key, value) in self.tasks.try_read().unwrap().iter() {
188                let entry = StorageEntry {
189                    status: key.status.clone(),
190                    task_id: key.task_id.clone(),
191                    task: value.clone(),
192                };
193                let line = serde_json::to_string(&entry)?;
194                writeln!(writer, "{}", line)?;
195            }
196
197            writer.flush()?;
198        } // BufWriter is dropped here, ensuring all data is written
199
200        // Atomically replace the old file with the new one
201        std::fs::rename(tmp_path, &self.path)?;
202        Ok(())
203    }
204    /// Reload data from disk, useful if the file was modified externally
205    pub fn reload(&mut self) -> std::io::Result<()> {
206        let mut new_data = BTreeMap::new();
207
208        if self.path.exists() {
209            let file = File::open(&self.path)?;
210            let reader = BufReader::new(file);
211
212            for line in reader.lines() {
213                let line = line?;
214                if line.trim().is_empty() {
215                    continue;
216                }
217
218                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
219                    let key = TaskKey {
220                        status: entry.status,
221                        task_id: entry.task_id,
222                        namespace: std::any::type_name::<Args>().to_owned(),
223                    };
224                    new_data.insert(key, entry.task);
225                }
226            }
227        }
228
229        *self.tasks.try_write().unwrap() = new_data;
230        Ok(())
231    }
232    /// Clear all data from memory and file
233    pub fn clear(&mut self) -> std::io::Result<()> {
234        self.tasks.try_write().unwrap().clear();
235
236        // Create an empty file
237        let file = OpenOptions::new()
238            .write(true)
239            .create(true)
240            .truncate(true)
241            .open(&self.path)?;
242        drop(file);
243
244        Ok(())
245    }
246
247    /// Update the status of an existing key
248    pub fn update_status(
249        &mut self,
250        old_key: &TaskKey,
251        new_status: Status,
252    ) -> std::io::Result<bool> {
253        let mut tasks = self.tasks.try_write().unwrap();
254        if let Some(value) = tasks.remove(old_key) {
255            let new_key = TaskKey {
256                status: new_status,
257                task_id: old_key.task_id.clone(),
258                namespace: old_key.namespace.clone(),
259            };
260            tasks.insert(new_key, value);
261            Ok(true)
262        } else {
263            Ok(false)
264        }
265    }
266
267    /// Retrieves a task from the storage.
268    pub fn get(&self, key: &TaskKey) -> Option<TaskWithMeta> {
269        let tasks = self.tasks.try_read().unwrap();
270        let res = tasks.get(key);
271        res.cloned()
272    }
273
274    fn update_result(&self, key: &TaskKey, status: Status, val: Value) -> std::io::Result<bool> {
275        let mut tasks = self.tasks.try_write().unwrap();
276        if let Some(mut task) = tasks.remove(key) {
277            let new_key = TaskKey {
278                status,
279                task_id: key.task_id.clone(),
280                namespace: key.namespace.clone(),
281            };
282            task.result = Some(val);
283
284            tasks.insert(new_key, task);
285            Ok(true)
286        } else {
287            Ok(false)
288        }
289    }
290}
291
292impl<Args> Clone for JsonStorage<Args> {
293    fn clone(&self) -> Self {
294        Self {
295            tasks: self.tasks.clone(),
296            buffer: Vec::new(),
297            path: self.path.clone(),
298        }
299    }
300}
301
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{
        backend::{json::JsonStorage, TaskSink},
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    /// How many tasks are queued before the worker starts consuming.
    const ITEMS: u32 = 100;

    /// Task handler: sleeps one second per task; once the last queued item
    /// is reached it stops the worker and reports an error.
    async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
        tokio::time::sleep(Duration::from_secs(1)).await;
        if task != ITEMS - 1 {
            return Ok(());
        }
        ctx.stop().unwrap();
        Err("Worker stopped!")?
    }

    #[tokio::test]
    async fn basic_worker() {
        // Seed a fresh temporary store with ITEMS tasks.
        let mut json_store = JsonStorage::new_temp().unwrap();
        for i in 0..ITEMS {
            json_store.push(i).await.unwrap();
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(json_store)
            .on_event(|ctx, ev| {
                println!("On Event = {:?} from = {}", ev, ctx.name());
            })
            .build(task);
        worker.run().await.unwrap();
    }
}
340}