floxide_core/
checkpoint.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::{
4    collections::{HashMap, VecDeque},
5    fmt::Debug,
6    marker::PhantomData,
7    sync::{Arc, RwLock},
8};
9use thiserror::Error;
10
11use crate::{context::Context, workflow::WorkItem};
12
13/// A snapshot of a workflow's pending work and its context.
14#[derive(Debug, Clone, Serialize, Deserialize)]
15#[serde(
16    bound = "C: Serialize + for<'de2> Deserialize<'de2>, WI: Serialize + for<'de2> Deserialize<'de2>"
17)]
18pub struct Checkpoint<C: Context, WI: WorkItem> {
19    /// The user-provided context for the workflow
20    pub context: C,
21    /// The queue of pending work items
22    pub queue: VecDeque<WI>,
23}
24
25// Note: Serialization of Checkpoint<C, W> is the responsibility of the user-provided store.
26// The Checkpoint type itself is Serialize + Deserialize, and can be cloned.
27impl<C, WI> Checkpoint<C, WI>
28where
29    C: Context,
30    WI: WorkItem,
31{
32    /// Create a new checkpoint from context and initial queue
33    pub fn new(context: C, queue: VecDeque<WI>) -> Self {
34        Checkpoint { context, queue }
35    }
36}
37
38/// Errors occurring during checkpoint persistence.
39#[derive(Debug, Error)]
40pub enum CheckpointError {
41    #[error("I/O error: {0}")]
42    Io(#[from] std::io::Error),
43    #[error("Store error: {0}")]
44    Store(String),
45}
46
47/// A trait for persisting and loading workflow checkpoints.
48#[async_trait]
49pub trait CheckpointStore<C: Context, WI: WorkItem> {
50    /// Persist the given checkpoint under `workflow_id`.
51    async fn save(
52        &self,
53        workflow_id: &str,
54        checkpoint: &Checkpoint<C, WI>,
55    ) -> Result<(), CheckpointError>;
56    /// Load the last-saved checkpoint for `workflow_id`, if any.
57    async fn load(&self, workflow_id: &str) -> Result<Option<Checkpoint<C, WI>>, CheckpointError>;
58}
59
60/// A simple in-memory checkpoint store using JSON serialization
61#[derive(Clone)]
62pub struct InMemoryCheckpointStore<C: Context, WI: WorkItem> {
63    inner: Arc<RwLock<HashMap<String, Checkpoint<C, WI>>>>,
64    _phantom: PhantomData<WI>,
65}
66
67impl<C: Context, WI: WorkItem> InMemoryCheckpointStore<C, WI> {
68    pub fn new() -> Self {
69        Self {
70            inner: Arc::new(RwLock::new(HashMap::new())),
71            _phantom: PhantomData,
72        }
73    }
74}
75
76impl<C: Context, WI: WorkItem> Default for InMemoryCheckpointStore<C, WI> {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82#[async_trait]
83impl<C: Context, WI: WorkItem> CheckpointStore<C, WI> for InMemoryCheckpointStore<C, WI> {
84    async fn save(
85        &self,
86        workflow_id: &str,
87        checkpoint: &Checkpoint<C, WI>,
88    ) -> Result<(), CheckpointError> {
89        // serialize checkpoint to JSON bytes
90        let mut map = self.inner.write().unwrap();
91        map.insert(workflow_id.to_string(), checkpoint.clone());
92        Ok(())
93    }
94    async fn load(&self, workflow_id: &str) -> Result<Option<Checkpoint<C, WI>>, CheckpointError> {
95        let map = self.inner.read().unwrap();
96        let maybe_ck = map.get(workflow_id).cloned();
97        match maybe_ck {
98            Some(ck) => Ok(Some(ck)),
99            None => Ok(None),
100        }
101    }
102}