floxide_core/
checkpoint.rs1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::{
4 collections::{HashMap, VecDeque},
5 fmt::Debug,
6 marker::PhantomData,
7 sync::{Arc, RwLock},
8};
9use thiserror::Error;
10
11use crate::{context::Context, workflow::WorkItem};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15#[serde(
16 bound = "C: Serialize + for<'de2> Deserialize<'de2>, WI: Serialize + for<'de2> Deserialize<'de2>"
17)]
18pub struct Checkpoint<C: Context, WI: WorkItem> {
19 pub context: C,
21 pub queue: VecDeque<WI>,
23}
24
25impl<C, WI> Checkpoint<C, WI>
28where
29 C: Context,
30 WI: WorkItem,
31{
32 pub fn new(context: C, queue: VecDeque<WI>) -> Self {
34 Checkpoint { context, queue }
35 }
36}
37
38#[derive(Debug, Error)]
40pub enum CheckpointError {
41 #[error("I/O error: {0}")]
42 Io(#[from] std::io::Error),
43 #[error("Store error: {0}")]
44 Store(String),
45}
46
47#[async_trait]
49pub trait CheckpointStore<C: Context, WI: WorkItem> {
50 async fn save(
52 &self,
53 workflow_id: &str,
54 checkpoint: &Checkpoint<C, WI>,
55 ) -> Result<(), CheckpointError>;
56 async fn load(&self, workflow_id: &str) -> Result<Option<Checkpoint<C, WI>>, CheckpointError>;
58}
59
60#[derive(Clone)]
62pub struct InMemoryCheckpointStore<C: Context, WI: WorkItem> {
63 inner: Arc<RwLock<HashMap<String, Checkpoint<C, WI>>>>,
64 _phantom: PhantomData<WI>,
65}
66
67impl<C: Context, WI: WorkItem> InMemoryCheckpointStore<C, WI> {
68 pub fn new() -> Self {
69 Self {
70 inner: Arc::new(RwLock::new(HashMap::new())),
71 _phantom: PhantomData,
72 }
73 }
74}
75
76impl<C: Context, WI: WorkItem> Default for InMemoryCheckpointStore<C, WI> {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82#[async_trait]
83impl<C: Context, WI: WorkItem> CheckpointStore<C, WI> for InMemoryCheckpointStore<C, WI> {
84 async fn save(
85 &self,
86 workflow_id: &str,
87 checkpoint: &Checkpoint<C, WI>,
88 ) -> Result<(), CheckpointError> {
89 let mut map = self.inner.write().unwrap();
91 map.insert(workflow_id.to_string(), checkpoint.clone());
92 Ok(())
93 }
94 async fn load(&self, workflow_id: &str) -> Result<Option<Checkpoint<C, WI>>, CheckpointError> {
95 let map = self.inner.read().unwrap();
96 let maybe_ck = map.get(workflow_id).cloned();
97 match maybe_ck {
98 Some(ck) => Ok(Some(ck)),
99 None => Ok(None),
100 }
101 }
102}