use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, VecDeque},
fmt::Debug,
marker::PhantomData,
sync::{Arc, RwLock},
};
use thiserror::Error;
use crate::{context::Context, workflow::WorkItem};
/// A snapshot pairing a workflow context with a queue of work items.
///
/// The explicit serde `bound` replaces the bounds the derive would infer:
/// both type parameters must be `Serialize` and deserializable for any
/// lifetime for the (de)serialization impls to apply.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(
    bound = "C: Serialize + for<'de2> Deserialize<'de2>, WI: Serialize + for<'de2> Deserialize<'de2>"
)]
pub struct Checkpoint<C, WI>
where
    C: Context,
    WI: WorkItem,
{
    /// The context value captured in this checkpoint.
    pub context: C,
    /// The work items captured in this checkpoint, in queue order.
    pub queue: VecDeque<WI>,
}
impl<C: Context, WI: WorkItem> Checkpoint<C, WI> {
    /// Builds a checkpoint that owns the given `context` and `queue`.
    pub fn new(context: C, queue: VecDeque<WI>) -> Self {
        Self { context, queue }
    }
}
#[derive(Debug, Error)]
pub enum CheckpointError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("Store error: {0}")]
Store(String),
}
/// Asynchronous storage for [`Checkpoint`] values keyed by a workflow id.
#[async_trait]
pub trait CheckpointStore<C, WI>
where
    C: Context,
    WI: WorkItem,
{
    /// Stores `checkpoint` under `workflow_id`.
    ///
    /// # Errors
    ///
    /// Returns a [`CheckpointError`] if the store cannot record the checkpoint.
    async fn save(
        &self,
        workflow_id: &str,
        checkpoint: &Checkpoint<C, WI>,
    ) -> Result<(), CheckpointError>;

    /// Looks up the checkpoint stored under `workflow_id`.
    ///
    /// The `Option` in the return type lets an implementation report
    /// "nothing stored for this id" as `Ok(None)` rather than as an error.
    ///
    /// # Errors
    ///
    /// Returns a [`CheckpointError`] if the lookup itself fails.
    async fn load(&self, workflow_id: &str) -> Result<Option<Checkpoint<C, WI>>, CheckpointError>;
}
/// A checkpoint store backed by a `HashMap` held in memory.
///
/// The map lives behind `Arc<RwLock<..>>`, so a clone of the store is another
/// handle to the same underlying map rather than an independent copy.
#[derive(Clone)]
pub struct InMemoryCheckpointStore<C, WI>
where
    C: Context,
    WI: WorkItem,
{
    /// Checkpoints keyed by workflow id.
    inner: Arc<RwLock<HashMap<String, Checkpoint<C, WI>>>>,
    /// Zero-sized marker for the work-item type parameter.
    _phantom: PhantomData<WI>,
}
impl<C: Context, WI: WorkItem> InMemoryCheckpointStore<C, WI> {
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(HashMap::new())),
_phantom: PhantomData,
}
}
}
impl<C: Context, WI: WorkItem> Default for InMemoryCheckpointStore<C, WI> {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl<C: Context, WI: WorkItem> CheckpointStore<C, WI> for InMemoryCheckpointStore<C, WI> {
async fn save(
&self,
workflow_id: &str,
checkpoint: &Checkpoint<C, WI>,
) -> Result<(), CheckpointError> {
let mut map = self.inner.write().unwrap();
map.insert(workflow_id.to_string(), checkpoint.clone());
Ok(())
}
async fn load(&self, workflow_id: &str) -> Result<Option<Checkpoint<C, WI>>, CheckpointError> {
let map = self.inner.read().unwrap();
let maybe_ck = map.get(workflow_id).cloned();
match maybe_ck {
Some(ck) => Ok(Some(ck)),
None => Ok(None),
}
}
}