apalis_file_storage/
lib.rs

//! # File-based Backend using JSON
//!
//! A JSON file-based backend for persisting tasks and results.
//!
//! ## Features
//!
//! - **Sink support**: Ability to push new tasks.
//! - **Codec support**: Serialization support for arguments using JSON.
//! - **Workflow support**: Flexible enough to support workflows.
//! - **Ack support**: Allows acknowledgement of task completion.
//! - **WaitForCompletion**: Wait for tasks to complete without blocking.
//!
//! ## Usage Example
//!
//! ```rust
//! # use apalis_file_storage::JsonStorage;
//! # use apalis_core::worker::builder::WorkerBuilder;
//! # use std::time::Duration;
//! # use apalis_core::worker::context::WorkerContext;
//! # use apalis_core::backend::TaskSink;
//! # use apalis_core::error::BoxDynError;
//! # use apalis_core::worker::ext::event_listener::EventListenerExt;
//!
//! #[tokio::main]
//! async fn main() {
//!     let mut json_store = JsonStorage::new_temp().unwrap();
//!     json_store.push(42).await.unwrap();
//!
//!     async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
//!         tokio::time::sleep(Duration::from_secs(1)).await;
//!         ctx.stop().unwrap();
//!         Ok(())
//!     }
//!
//!     let worker = WorkerBuilder::new("rango-tango")
//!         .backend(json_store)
//!         .on_event(|ctx, ev| {
//!             println!("On Event = {ev:?}");
//!         })
//!         .build(task);
//!     worker.run().await.unwrap();
//! }
//! ```
//!
//! ## Implementation Notes
//!
//! - Tasks are stored in a single file, with each line holding one serialized task entry.
//! - In-memory state is guarded by an `RwLock` that is shared across clones of the storage.
//! - Data is persisted to disk via a write-and-rename so a crash mid-write cannot corrupt the file.
//! - Supports temporary storage for testing and ephemeral use cases.
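//!
//! For illustration, a single stored line is a JSON object of the following
//! shape (a sketch only; the exact field encodings depend on the serde
//! representations of the task id, status, and metadata types):
//!
//! ```text
//! {"task_id":"...","status":"...","task":{...}}
//! ```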
//!
use serde_json::Value;
use std::{
    collections::BTreeMap,
    fs::{File, OpenOptions},
    io::{BufRead, Write},
    path::PathBuf,
    sync::{Arc, RwLock},
};

use self::util::{TaskKey, TaskWithMeta};
use apalis_core::{
    features_table,
    task::{
        Task,
        status::Status,
        task_id::{RandomId, TaskId},
    },
};
use std::io::{BufReader, BufWriter};

mod backend;
mod meta;
mod shared;
mod sink;
mod util;

pub use self::shared::SharedJsonStore;
pub use meta::JsonMapMetadata;
/// A backend that persists tasks to a file using JSON encoding.
///
/// *Warning*: This backend is not optimized for high-throughput scenarios and is best suited for development, testing, or low-volume workloads.
///
/// # Example
///
/// Creating a temporary JSON storage backend:
/// ```rust
/// # use apalis_file_storage::JsonStorage;
/// # pub fn setup_json_storage() -> JsonStorage<u32> {
/// let mut backend = JsonStorage::new_temp().unwrap();
/// # backend
/// # }
/// ```
#[doc = features_table! {
    setup = r#"
        # {
        #   use apalis_file_storage::JsonStorage;
        #   let mut backend = JsonStorage::new_temp().unwrap();
        #   backend
        # };
    "#,
    Backend => supported("Basic Backend functionality", true),
    TaskSink => supported("Ability to push new tasks", true),
    Serialization => limited("Serialization support for arguments. Only accepts `json`", false),
    WebUI => not_implemented("Expose a web interface for monitoring tasks"),
    FetchById => not_implemented("Allow fetching a task by its ID"),
    RegisterWorker => not_supported("Allow registering a worker with the backend"),
    "[`PipeExt`]" => supported("Allow other backends to pipe to this backend", false),
    MakeShared => supported("Share the same JSON storage across multiple workers via [`SharedJsonStore`]", false),
    Workflow => supported("Flexible enough to support workflows", true),
    WaitForCompletion => supported("Wait for tasks to complete without blocking", true),
    ResumeById => not_implemented("Resume a task by its ID"),
    ResumeAbandoned => not_implemented("Resume abandoned tasks"),
    ListWorkers => not_supported("List all workers registered with the backend"),
    ListTasks => not_implemented("List all tasks in the backend"),
}]
///
/// [`PipeExt`]: crate::backend::pipe::PipeExt
#[derive(Debug)]
pub struct JsonStorage<Args> {
    tasks: Arc<RwLock<BTreeMap<TaskKey, TaskWithMeta>>>,
    buffer: Vec<Task<Value, JsonMapMetadata, RandomId>>,
    path: PathBuf,
    _marker: std::marker::PhantomData<Args>,
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
struct StorageEntry {
    task_id: TaskId<RandomId>,
    status: Status,
    task: TaskWithMeta,
}

impl<Args> JsonStorage<Args> {
    /// Creates a new `JsonStorage` instance using the specified file path, loading any existing entries from that file.
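    ///
    /// A minimal sketch (the file path here is hypothetical):
    ///
    /// ```rust,no_run
    /// # use apalis_file_storage::JsonStorage;
    /// let backend: JsonStorage<u32> = JsonStorage::new("/tmp/apalis-tasks.json").unwrap();
    /// ```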
    pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
        let path = path.into();
        let mut data = BTreeMap::new();

        if path.exists() {
            let file = File::open(&path)?;
            let reader = BufReader::new(file);

            for line in reader.lines() {
                let line = line?;
                if line.trim().is_empty() {
                    continue;
                }

                // Skip lines that fail to deserialize instead of aborting the load
                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
                    let key = TaskKey {
                        status: entry.status,
                        task_id: entry.task_id,
                        queue: std::any::type_name::<Args>().to_owned(),
                    };
                    data.insert(key, entry.task);
                }
            }
        }

        Ok(Self {
            path,
            tasks: Arc::new(RwLock::new(data)),
            buffer: Vec::new(),
            _marker: std::marker::PhantomData,
        })
    }

    /// Creates a new temporary `JsonStorage` instance.
    pub fn new_temp() -> Result<Self, std::io::Error> {
        let p = std::env::temp_dir().join(format!("apalis-json-store-{}", RandomId::default()));
        Self::new(p)
    }

    fn insert(&self, k: &TaskKey, v: TaskWithMeta) -> Result<(), std::io::Error> {
        self.tasks.try_write().unwrap().insert(k.clone(), v);
        Ok(())
    }

    /// Removes a task from the storage.
    pub fn remove(&mut self, key: &TaskKey) -> std::io::Result<Option<TaskWithMeta>> {
        let removed = self.tasks.try_write().unwrap().remove(key);

        if removed.is_some() {
            self.persist_to_disk()?;
        }

        Ok(removed)
    }

    /// Persist all current data to disk by rewriting the file
    fn persist_to_disk(&self) -> std::io::Result<()> {
        // Write to a sibling temp file first so a crash mid-write cannot
        // corrupt the existing data file.
        let tmp_path = self.path.with_extension("tmp");
        {
            let tmp_file = OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(&tmp_path)?;
            let mut writer = BufWriter::new(tmp_file);

            for (key, value) in self.tasks.try_read().unwrap().iter() {
                let entry = StorageEntry {
                    status: key.status.clone(),
                    task_id: key.task_id.clone(),
                    task: value.clone(),
                };
                let line = serde_json::to_string(&entry)?;
                writeln!(writer, "{line}")?;
            }

            writer.flush()?;
        } // BufWriter is dropped here, ensuring all data is written

        // Atomically replace the old file with the new one
        std::fs::rename(&tmp_path, &self.path)?;
        Ok(())
    }

    /// Reload data from disk, useful if the file was modified externally.
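    ///
    /// A minimal sketch:
    ///
    /// ```rust
    /// # use apalis_file_storage::JsonStorage;
    /// let mut backend: JsonStorage<u32> = JsonStorage::new_temp().unwrap();
    /// // ... the file may be modified by another process here ...
    /// backend.reload().unwrap();
    /// ```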
    pub fn reload(&mut self) -> std::io::Result<()> {
        let mut new_data = BTreeMap::new();

        if self.path.exists() {
            let file = File::open(&self.path)?;
            let reader = BufReader::new(file);

            for line in reader.lines() {
                let line = line?;
                if line.trim().is_empty() {
                    continue;
                }

                // Skip lines that fail to deserialize instead of aborting the reload
                if let Ok(entry) = serde_json::from_str::<StorageEntry>(&line) {
                    let key = TaskKey {
                        status: entry.status,
                        task_id: entry.task_id,
                        queue: std::any::type_name::<Args>().to_owned(),
                    };
                    new_data.insert(key, entry.task);
                }
            }
        }

        *self.tasks.try_write().unwrap() = new_data;
        Ok(())
    }

    /// Clear all data from memory and file.
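    ///
    /// A minimal sketch:
    ///
    /// ```rust
    /// # use apalis_file_storage::JsonStorage;
    /// let mut backend: JsonStorage<u32> = JsonStorage::new_temp().unwrap();
    /// backend.clear().unwrap();
    /// ```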
    pub fn clear(&mut self) -> std::io::Result<()> {
        self.tasks.try_write().unwrap().clear();

        // Create an empty file
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&self.path)?;
        drop(file);

        Ok(())
    }

    /// Update the status of an existing key.
    ///
    /// Returns `Ok(true)` if the key existed and was re-inserted under the new status.
    pub fn update_status(
        &mut self,
        old_key: &TaskKey,
        new_status: Status,
    ) -> std::io::Result<bool> {
        let mut tasks = self.tasks.try_write().unwrap();
        if let Some(value) = tasks.remove(old_key) {
            let new_key = TaskKey {
                status: new_status,
                task_id: old_key.task_id.clone(),
                queue: old_key.queue.clone(),
            };
            tasks.insert(new_key, value);
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Retrieves a task from the storage, or `None` if the key is not present.
    #[must_use]
    pub fn get(&self, key: &TaskKey) -> Option<TaskWithMeta> {
        self.tasks.try_read().unwrap().get(key).cloned()
    }

    fn update_result(&self, key: &TaskKey, status: Status, val: Value) -> std::io::Result<bool> {
        let mut tasks = self.tasks.try_write().unwrap();
        if let Some(mut task) = tasks.remove(key) {
            let new_key = TaskKey {
                status,
                task_id: key.task_id.clone(),
                queue: key.queue.clone(),
            };
            task.result = Some(val);

            tasks.insert(new_key, task);
            Ok(true)
        } else {
            Ok(false)
        }
    }
}

impl<Args> Clone for JsonStorage<Args> {
    fn clone(&self) -> Self {
        Self {
            tasks: self.tasks.clone(),
            buffer: Vec::new(),
            path: self.path.clone(),
            _marker: std::marker::PhantomData,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use apalis_core::{
        backend::TaskSink,
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    const ITEMS: u32 = 100;

    #[tokio::test]
    async fn basic_worker() {
        let mut json_store = JsonStorage::new_temp().unwrap();
        for i in 0..ITEMS {
            json_store.push(i).await.unwrap();
        }

        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if task == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!".into());
            }
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(json_store)
            .on_event(|ctx, ev| {
                println!("On Event = {ev:?} from = {}", ctx.name());
            })
            .build(task);
        worker.run().await.unwrap();
    }
}