use crate::event::{Event, EventSequence};
use crate::snapshot::Snapshot;
use crate::tenant::{Tenant, TenantId, DEFAULT_TENANT};
use async_trait::async_trait;
use jamjet_core::workflow::{ExecutionId, WorkflowExecution, WorkflowStatus};
use thiserror::Error;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WorkflowDefinition {
pub workflow_id: String,
pub version: String,
pub ir: serde_json::Value,
pub created_at: chrono::DateTime<chrono::Utc>,
#[serde(default = "default_tenant_string")]
pub tenant_id: String,
}
fn default_tenant_string() -> String {
DEFAULT_TENANT.to_string()
}
#[derive(Debug, Error)]
pub enum StateBackendError {
#[error("execution not found: {0}")]
NotFound(String),
#[error("optimistic concurrency conflict: sequence mismatch for {0}")]
SequenceConflict(String),
#[error("database error: {0}")]
Database(String),
#[error("serialization error: {0}")]
Serialization(#[from] serde_json::Error),
}
pub type BackendResult<T> = Result<T, StateBackendError>;
#[async_trait]
pub trait StateBackend: Send + Sync {
async fn store_workflow(&self, def: WorkflowDefinition) -> BackendResult<()>;
async fn get_workflow(
&self,
workflow_id: &str,
version: &str,
) -> BackendResult<Option<WorkflowDefinition>>;
async fn create_execution(&self, execution: WorkflowExecution) -> BackendResult<()>;
async fn get_execution(&self, id: &ExecutionId) -> BackendResult<Option<WorkflowExecution>>;
async fn update_execution_status(
&self,
id: &ExecutionId,
status: WorkflowStatus,
) -> BackendResult<()>;
async fn update_execution_current_state(
&self,
id: &ExecutionId,
current_state: &serde_json::Value,
) -> BackendResult<()>;
async fn patch_append_array(
&self,
execution_id: &ExecutionId,
key: &str,
value: serde_json::Value,
) -> BackendResult<()>;
async fn list_executions(
&self,
status: Option<WorkflowStatus>,
limit: u32,
offset: u32,
) -> BackendResult<Vec<WorkflowExecution>>;
async fn append_event(&self, event: Event) -> BackendResult<EventSequence>;
async fn get_events(&self, execution_id: &ExecutionId) -> BackendResult<Vec<Event>>;
async fn get_events_since(
&self,
execution_id: &ExecutionId,
since_sequence: EventSequence,
) -> BackendResult<Vec<Event>>;
async fn latest_sequence(&self, execution_id: &ExecutionId) -> BackendResult<EventSequence>;
async fn write_snapshot(&self, snapshot: Snapshot) -> BackendResult<()>;
async fn latest_snapshot(&self, execution_id: &ExecutionId) -> BackendResult<Option<Snapshot>>;
async fn enqueue_work_item(&self, item: WorkItem) -> BackendResult<WorkItemId>;
async fn claim_work_item(
&self,
worker_id: &str,
queue_types: &[&str],
) -> BackendResult<Option<WorkItem>>;
async fn renew_lease(&self, item_id: WorkItemId, worker_id: &str) -> BackendResult<()>;
async fn complete_work_item(&self, item_id: WorkItemId) -> BackendResult<()>;
async fn fail_work_item(&self, item_id: WorkItemId, error: &str) -> BackendResult<()>;
async fn reclaim_expired_leases(&self) -> BackendResult<ReclaimResult>;
async fn move_to_dead_letter(&self, item_id: WorkItemId, last_error: &str)
-> BackendResult<()>;
async fn create_token(&self, name: &str, role: &str) -> BackendResult<(String, ApiToken)>;
async fn validate_token(&self, token: &str) -> BackendResult<Option<ApiToken>>;
async fn create_tenant(&self, _tenant: Tenant) -> BackendResult<()> {
Err(StateBackendError::Database(
"tenant management not supported".into(),
))
}
async fn get_tenant(&self, _id: &TenantId) -> BackendResult<Option<Tenant>> {
Ok(None)
}
async fn list_tenants(&self) -> BackendResult<Vec<Tenant>> {
Ok(vec![])
}
async fn update_tenant(&self, _tenant: Tenant) -> BackendResult<()> {
Err(StateBackendError::Database(
"tenant management not supported".into(),
))
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApiToken {
pub id: String,
pub name: String,
pub role: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
#[serde(default = "default_tenant_string")]
pub tenant_id: String,
}
pub type WorkItemId = uuid::Uuid;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WorkItem {
pub id: WorkItemId,
pub execution_id: ExecutionId,
pub node_id: String,
pub queue_type: String,
pub payload: serde_json::Value,
pub attempt: u32,
pub max_attempts: u32,
pub created_at: chrono::DateTime<chrono::Utc>,
pub lease_expires_at: Option<chrono::DateTime<chrono::Utc>>,
pub worker_id: Option<String>,
#[serde(default = "default_tenant_string")]
pub tenant_id: String,
}
#[derive(Debug, Default)]
pub struct ReclaimResult {
pub retryable: Vec<WorkItem>,
pub exhausted: Vec<WorkItem>,
}