apalis_core/backend/impls/json/
mod.rs

1//! # File based Backend using JSON
2//!
3//! A JSON file-based backend for persisting tasks and results.
4//!
5//! ## Features
6//!
7//! - **Sink support**: Ability to push new tasks.
8//! - **Codec Support**: Serialization support for arguments using JSON.
9//! - **Workflow Support**: Flexible enough to support workflows.
10//! - **Ack Support**: Allows acknowledgement of task completion.
11//! - **WaitForCompletion**: Wait for tasks to complete without blocking.
12//!
13//! ## Usage Example
14//!
15//! ```rust
16//! # use apalis_core::backend::json::JsonStorage;
17//! # use apalis_core::worker::builder::WorkerBuilder;
18//! # use std::time::Duration;
19//! # use apalis_core::worker::context::WorkerContext;
20//! # use apalis_core::backend::TaskSink;
21//! # use apalis_core::error::BoxDynError;
22//! # use apalis_core::worker::ext::event_listener::EventListenerExt;
23//!
24//! #[tokio::main]
25//! async fn main() {
26//!     let mut json_store = JsonStorage::new_temp().unwrap();
27//!     json_store.push(42).await.unwrap();
//!
29//!
30//!     async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
31//!         tokio::time::sleep(Duration::from_secs(1)).await;
32//!         ctx.stop().unwrap();
33//!         Ok(())
34//!     }
35//!
36//!     let worker = WorkerBuilder::new("rango-tango")
37//!         .backend(json_store)
38//!         .on_event(|ctx, ev| {
39//!             println!("On Event = {:?}", ev);
40//!         })
41//!         .build(task);
42//!     worker.run().await.unwrap();
43//! }
44//! ```
45//!
46//! ## Implementation Notes
47//!
48//! - Tasks are stored in a file, each line representing a serialized task entry.
49//! - All operations are thread-safe using `RwLock`.
50//! - Data is atomically persisted to disk to avoid corruption.
51//! - Supports temporary storage for testing and ephemeral use cases.
52//!
53use serde_json::Value;
54use std::{
55    collections::BTreeMap,
56    fs::{File, OpenOptions},
57    io::{BufRead, Write},
58    path::PathBuf,
59    sync::{Arc, RwLock},
60};
61
62use self::{
63    meta::JsonMapMetadata,
64    util::{TaskKey, TaskWithMeta},
65};
66use crate::{
67    features_table,
68    task::{
69        Task,
70        status::Status,
71        task_id::{RandomId, TaskId},
72    },
73};
74use std::io::{BufReader, BufWriter};
75
76mod backend;
77mod meta;
78mod shared;
79mod sink;
80mod util;
81
82pub use self::shared::SharedJsonStore;
83/// A backend that persists to a file using json encoding
84///
85/// *Warning*: This backend is not optimized for high-throughput scenarios and is best suited for development, testing, or low-volume workloads.
86///
87/// # Example
88///
89/// Creates a temporary JSON storage backend
90/// ```rust
91/// # use apalis_core::backend::json::JsonStorage;
92/// # pub fn setup_json_storage() -> JsonStorage<u32> {
93/// let mut backend = JsonStorage::new_temp().unwrap();
94/// # backend
95/// # }
96/// ```
97#[doc = features_table! {
98    setup = r#"
99        # {
100        #   use apalis_core::backend::json::JsonStorage;
101        #   let mut backend = JsonStorage::new_temp().unwrap();
102        #   backend
103        # };
104    "#,
105    Backend => supported("Basic Backend functionality", true),
106    TaskSink => supported("Ability to push new tasks", true),
107    Serialization => limited("Serialization support for arguments. Only accepts `json`", false),
108    WebUI => not_implemented("Expose a web interface for monitoring tasks"),
109    FetchById => not_implemented("Allow fetching a task by its ID"),
110    RegisterWorker => not_supported("Allow registering a worker with the backend"),
111    "[`PipeExt`]" => supported("Allow other backends to pipe to this backend", false),
112    MakeShared => supported("Share the same JSON storage across multiple workers via [`SharedJsonStore`]", false),
113    Workflow => supported("Flexible enough to support workflows", true),
114    WaitForCompletion => supported("Wait for tasks to complete without blocking", true),
115    ResumeById => not_implemented("Resume a task by its ID"),
116    ResumeAbandoned => not_implemented("Resume abandoned tasks"),
117    ListWorkers => not_supported("List all workers registered with the backend"),
118    ListTasks => not_implemented("List all tasks in the backend"),
119}]
120///
121/// [`PipeExt`]: crate::backend::pipe::PipeExt
#[derive(Debug)]
pub struct JsonStorage<Args> {
    /// Shared, lock-protected map of all task entries, ordered by [`TaskKey`].
    /// The `Arc` is what makes clones of this store see the same tasks.
    tasks: Arc<RwLock<BTreeMap<TaskKey, TaskWithMeta>>>,
    /// Scratch buffer of tasks. Nothing in this module populates or drains it,
    /// and `Clone` resets it to empty — presumably used by the sink/backend
    /// submodules; confirm against `sink.rs` / `backend.rs`.
    buffer: Vec<Task<Value, JsonMapMetadata>>,
    /// Path of the JSON-lines file backing this store on disk.
    path: PathBuf,
    /// Ties the store to the argument type `Args` without storing a value;
    /// `Args` also determines the `queue` name via `std::any::type_name`.
    _marker: std::marker::PhantomData<Args>,
}
129
/// On-disk representation of a single task: one JSON object per line of the
/// backing file. The `queue` component of [`TaskKey`] is not stored; it is
/// re-derived from the `Args` type on load.
// NOTE(review): the serde derives are gated on the `serde` feature, but this
// module calls `serde_json::from_str::<StorageEntry>` and `serde_json::to_string`
// unconditionally — presumably this backend requires/implies the `serde`
// feature; confirm the crate's feature wiring.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone)]
struct StorageEntry {
    /// Unique identifier of the task.
    task_id: TaskId,
    /// Status at the time the entry was written (becomes part of the key on load).
    status: Status,
    /// The task payload together with its metadata (and optional result).
    task: TaskWithMeta,
}
137
138impl<Args> JsonStorage<Args> {
139    /// Creates a new `JsonStorage` instance using the specified file path.
140    pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
141        let path = path.into();
142        let mut data = BTreeMap::new();
143
144        if path.exists() {
145            let file = File::open(&path)?;
146            let reader = BufReader::new(file);
147
148            for line in reader.lines() {
149                let line = line?;
150                if line.trim().is_empty() {
151                    continue;
152                }
153
154                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
155                    let key = TaskKey {
156                        status: entry.status,
157                        task_id: entry.task_id,
158                        queue: std::any::type_name::<Args>().to_owned(),
159                    };
160                    data.insert(key, entry.task);
161                }
162            }
163        }
164
165        Ok(JsonStorage {
166            path,
167            tasks: Arc::new(RwLock::new(data)),
168            buffer: Vec::new(),
169            _marker: std::marker::PhantomData,
170        })
171    }
172
173    /// Creates a new temporary `JsonStorage` instance.
174    pub fn new_temp() -> Result<JsonStorage<Args>, std::io::Error> {
175        let p = std::env::temp_dir().join(format!("apalis-json-store-{}", RandomId::default()));
176        Self::new(p)
177    }
178
179    fn insert(&mut self, k: &TaskKey, v: TaskWithMeta) -> Result<(), std::io::Error> {
180        self.tasks.try_write().unwrap().insert(k.clone(), v);
181        Ok(())
182    }
183
184    /// Removes a task from the storage.
185    pub fn remove(&mut self, key: &TaskKey) -> std::io::Result<Option<TaskWithMeta>> {
186        let removed = self.tasks.try_write().unwrap().remove(key);
187
188        if removed.is_some() {
189            self.persist_to_disk()?;
190        }
191
192        Ok(removed)
193    }
194
195    /// Persist all current data to disk by rewriting the file
196    fn persist_to_disk(&self) -> std::io::Result<()> {
197        let tmp_path = self.path.with_extension("tmp");
198
199        {
200            let tmp_file = OpenOptions::new()
201                .write(true)
202                .create(true)
203                .truncate(true)
204                .open(&tmp_path)?;
205            let mut writer = BufWriter::new(tmp_file);
206
207            for (key, value) in self.tasks.try_read().unwrap().iter() {
208                let entry = StorageEntry {
209                    status: key.status.clone(),
210                    task_id: key.task_id.clone(),
211                    task: value.clone(),
212                };
213                let line = serde_json::to_string(&entry)?;
214                writeln!(writer, "{}", line)?;
215            }
216
217            writer.flush()?;
218        } // BufWriter is dropped here, ensuring all data is written
219
220        // Atomically replace the old file with the new one
221        std::fs::rename(tmp_path, &self.path)?;
222        Ok(())
223    }
224    /// Reload data from disk, useful if the file was modified externally
225    pub fn reload(&mut self) -> std::io::Result<()> {
226        let mut new_data = BTreeMap::new();
227
228        if self.path.exists() {
229            let file = File::open(&self.path)?;
230            let reader = BufReader::new(file);
231
232            for line in reader.lines() {
233                let line = line?;
234                if line.trim().is_empty() {
235                    continue;
236                }
237
238                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
239                    let key = TaskKey {
240                        status: entry.status,
241                        task_id: entry.task_id,
242                        queue: std::any::type_name::<Args>().to_owned(),
243                    };
244                    new_data.insert(key, entry.task);
245                }
246            }
247        }
248
249        *self.tasks.try_write().unwrap() = new_data;
250        Ok(())
251    }
252    /// Clear all data from memory and file
253    pub fn clear(&mut self) -> std::io::Result<()> {
254        self.tasks.try_write().unwrap().clear();
255
256        // Create an empty file
257        let file = OpenOptions::new()
258            .write(true)
259            .create(true)
260            .truncate(true)
261            .open(&self.path)?;
262        drop(file);
263
264        Ok(())
265    }
266
267    /// Update the status of an existing key
268    pub fn update_status(
269        &mut self,
270        old_key: &TaskKey,
271        new_status: Status,
272    ) -> std::io::Result<bool> {
273        let mut tasks = self.tasks.try_write().unwrap();
274        if let Some(value) = tasks.remove(old_key) {
275            let new_key = TaskKey {
276                status: new_status,
277                task_id: old_key.task_id.clone(),
278                queue: old_key.queue.clone(),
279            };
280            tasks.insert(new_key, value);
281            Ok(true)
282        } else {
283            Ok(false)
284        }
285    }
286
287    /// Retrieves a task from the storage.
288    pub fn get(&self, key: &TaskKey) -> Option<TaskWithMeta> {
289        let tasks = self.tasks.try_read().unwrap();
290        let res = tasks.get(key);
291        res.cloned()
292    }
293
294    fn update_result(&self, key: &TaskKey, status: Status, val: Value) -> std::io::Result<bool> {
295        let mut tasks = self.tasks.try_write().unwrap();
296        if let Some(mut task) = tasks.remove(key) {
297            let new_key = TaskKey {
298                status,
299                task_id: key.task_id.clone(),
300                queue: key.queue.clone(),
301            };
302            task.result = Some(val);
303
304            tasks.insert(new_key, task);
305            Ok(true)
306        } else {
307            Ok(false)
308        }
309    }
310}
311
312impl<Args> Clone for JsonStorage<Args> {
313    fn clone(&self) -> Self {
314        Self {
315            tasks: self.tasks.clone(),
316            buffer: Vec::new(),
317            path: self.path.clone(),
318            _marker: std::marker::PhantomData,
319        }
320    }
321}
322
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{
        backend::{TaskSink, json::JsonStorage},
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    /// Number of tasks pushed into the store before the worker starts.
    const ITEMS: u32 = 100;

    /// End-to-end smoke test: push `ITEMS` tasks into a temporary store,
    /// run a worker over them, and stop the worker from inside the handler
    /// when it reaches the final task.
    #[tokio::test]
    async fn basic_worker() {
        // Handler: sleep briefly per task, then shut the worker down (with
        // an error) once the last task is observed.
        async fn job(value: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if value == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!".into());
            }
            Ok(())
        }

        let mut store = JsonStorage::new_temp().unwrap();
        for value in 0..ITEMS {
            store.push(value).await.unwrap();
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(store)
            .on_event(|ctx, ev| println!("On Event = {:?} from = {}", ev, ctx.name()))
            .build(job);
        worker.run().await.unwrap();
    }
}
361}