use chrono::{DateTime, TimeDelta, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::SyncStats;
use crate::config::PortalType;
/// Status of a harvest job (the type of `HarvestJob::status`).
///
/// Through `#[serde(rename_all = "lowercase")]` each variant is
/// (de)serialized as its lowercase name, which matches the strings used by
/// `JobStatus::as_str` and the `FromStr` implementation in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum JobStatus {
/// Textual form `"pending"`; not terminal according to `is_terminal`.
Pending,
/// Textual form `"running"`; not terminal according to `is_terminal`.
Running,
/// Textual form `"completed"`; terminal according to `is_terminal`.
Completed,
/// Textual form `"failed"`; terminal according to `is_terminal`.
Failed,
/// Textual form `"cancelled"`; terminal according to `is_terminal`.
Cancelled,
}
impl JobStatus {
pub fn as_str(&self) -> &'static str {
match self {
JobStatus::Pending => "pending",
JobStatus::Running => "running",
JobStatus::Completed => "completed",
JobStatus::Failed => "failed",
JobStatus::Cancelled => "cancelled",
}
}
pub fn is_terminal(&self) -> bool {
matches!(
self,
JobStatus::Completed | JobStatus::Failed | JobStatus::Cancelled
)
}
}
/// Error produced when a string cannot be parsed into a [`JobStatus`].
///
/// The wrapped `String` is a copy of the input that was rejected; it is
/// echoed back in the `Display` output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobStatusError(String);
impl std::fmt::Display for ParseJobStatusError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "invalid job status: {}", self.0)
}
}
// Marks `ParseJobStatusError` as a standard error type; the empty body keeps
// every default method of `std::error::Error`.
impl std::error::Error for ParseJobStatusError {}
impl std::str::FromStr for JobStatus {
    type Err = ParseJobStatusError;

    /// Parses one of the exact lowercase names `pending`, `running`,
    /// `completed`, `failed` or `cancelled`.
    ///
    /// # Errors
    ///
    /// Any other input (including different casing or surrounding
    /// whitespace) yields a `ParseJobStatusError` carrying a copy of `s`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let status = match s {
            "pending" => Self::Pending,
            "running" => Self::Running,
            "completed" => Self::Completed,
            "failed" => Self::Failed,
            "cancelled" => Self::Cancelled,
            unrecognized => return Err(ParseJobStatusError(unrecognized.to_owned())),
        };
        Ok(status)
    }
}
impl std::fmt::Display for JobStatus {
    /// Writes the same lowercase name that `JobStatus::as_str` returns.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Written verbatim: width and alignment flags are not applied.
        f.write_str(self.as_str())
    }
}
/// Settings that drive retry scheduling.
#[derive(Debug, Clone)]
pub struct RetryConfig {
/// Retry limit; `Default` sets it to 3.
/// NOTE(review): no method on this type reads it in this file — presumably
/// it is copied into a job's own `max_retries`; confirm against the caller.
pub max_retries: u32,
/// Ceiling applied by `delay_for_attempt` to every delay for attempt >= 1.
pub max_delay: TimeDelta,
}
impl Default for RetryConfig {
    /// Default policy: 3 retries and a 60-minute ceiling on the delay.
    fn default() -> Self {
        let max_delay = TimeDelta::minutes(60);
        Self {
            max_retries: 3,
            max_delay,
        }
    }
}
impl RetryConfig {
    /// Returns how long to wait before the given retry attempt.
    ///
    /// The schedule is a fixed table: attempt 1 waits 1 minute, attempt 2
    /// waits 5 minutes, attempt 3 waits 30 minutes and every later attempt
    /// waits 60 minutes. The table value is then limited to `max_delay`.
    ///
    /// Attempt 0 always yields a zero delay, without consulting `max_delay`.
    pub fn delay_for_attempt(&self, attempt: u32) -> TimeDelta {
        let table_minutes = match attempt {
            // Returns before the cap is applied.
            0 => return TimeDelta::zero(),
            1 => 1,
            2 => 5,
            3 => 30,
            _ => 60,
        };
        Ord::min(TimeDelta::minutes(table_minutes), self.max_delay)
    }
}
/// A harvest job record for one portal: its status, timestamps, retry
/// bookkeeping and the options it carries.
///
/// NOTE(review): apart from `retry_count` and `max_retries`, no field is read
/// by code in this file, so the field notes below are inferred from names and
/// types — confirm against the job store and worker code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HarvestJob {
/// Identifier of the job.
pub id: Uuid,
/// URL of the portal this job targets.
pub portal_url: String,
/// Optional name for the portal.
pub portal_name: Option<String>,
/// Kind of portal (project-defined `PortalType`).
pub portal_type: PortalType,
/// Current status of the job.
pub status: JobStatus,
/// Presumably when the record was created — TODO confirm.
pub created_at: DateTime<Utc>,
/// Presumably when the record was last modified — TODO confirm.
pub updated_at: DateTime<Utc>,
/// Presumably when processing started; `None` if it has not — TODO confirm.
pub started_at: Option<DateTime<Utc>>,
/// Presumably when processing finished; `None` if it has not — TODO confirm.
pub completed_at: Option<DateTime<Utc>>,
/// Retry counter; `can_retry` compares it against `max_retries`, and
/// `calculate_next_retry` uses `retry_count + 1` as the attempt number.
pub retry_count: u32,
/// Limit that `retry_count` must stay below for `can_retry` to return `true`.
pub max_retries: u32,
/// Presumably the earliest time for the next retry — TODO confirm.
pub next_retry_at: Option<DateTime<Utc>>,
/// Presumably the message of the most recent failure — TODO confirm.
pub error_message: Option<String>,
/// Optional `SyncStats` (project-defined) attached to the job.
pub sync_stats: Option<SyncStats>,
/// Presumably the id of the worker handling the job — TODO confirm.
pub worker_id: Option<String>,
/// Flag requesting a full sync; exact effect is defined by the consumer.
pub force_full_sync: bool,
/// Optional URL template; semantics are defined outside this file.
pub url_template: Option<String>,
/// Optional language setting; semantics are defined outside this file.
pub language: Option<String>,
/// Optional profile setting; semantics are defined outside this file.
pub profile: Option<String>,
/// Optional SPARQL endpoint; semantics are defined outside this file.
pub sparql_endpoint: Option<String>,
}
impl HarvestJob {
    /// Returns `true` while `retry_count` is strictly below `max_retries`.
    pub fn can_retry(&self) -> bool {
        self.retry_count < self.max_retries
    }

    /// Computes when the next retry should happen: the current UTC time plus
    /// the delay `config` assigns to attempt number `retry_count + 1`.
    ///
    /// The attempt number saturates at `u32::MAX`. A plain `+ 1` would panic
    /// with overflow checks enabled, or wrap to attempt 0 (a zero delay)
    /// without them, when `retry_count` is already `u32::MAX`. That value is
    /// reachable because the field is public and deserializable.
    ///
    /// # Panics
    ///
    /// Adding the delay to the current time uses chrono's `+` operator, which
    /// panics if the resulting date is out of range.
    pub fn calculate_next_retry(&self, config: &RetryConfig) -> DateTime<Utc> {
        let next_attempt = self.retry_count.saturating_add(1);
        let delay = config.delay_for_attempt(next_attempt);
        Utc::now() + delay
    }
}
/// Parameters for creating a job, assembled with `CreateJobRequest::new` and
/// the `with_*` builder methods.
///
/// NOTE(review): how each option is interpreted is decided by the code that
/// consumes the request, which is not part of this file.
#[derive(Debug, Clone)]
pub struct CreateJobRequest {
/// URL of the portal; the only value `new` requires.
pub portal_url: String,
/// Optional portal name; set by `with_name`.
pub portal_name: Option<String>,
/// Full-sync flag; `false` from `new`, set to `true` by `with_full_sync`.
pub force_full_sync: bool,
/// Optional retry limit; `None` from `new`, set by `with_max_retries`.
/// Presumably `None` means "use a default chosen elsewhere" — TODO confirm.
pub max_retries: Option<u32>,
/// Optional URL template; set by `with_url_template`.
pub url_template: Option<String>,
/// Optional language; set by `with_language`.
pub language: Option<String>,
/// Portal type; `PortalType::default()` from `new`, set by `with_portal_type`.
pub portal_type: PortalType,
/// Optional profile; set by `with_profile`.
pub profile: Option<String>,
/// Optional SPARQL endpoint; set by `with_sparql_endpoint`.
pub sparql_endpoint: Option<String>,
}
impl CreateJobRequest {
    /// Starts a request for `portal_url` with every option unset: no name,
    /// no full sync, no retry override, no template, language, profile or
    /// SPARQL endpoint, and the default `PortalType`.
    pub fn new(portal_url: impl Into<String>) -> Self {
        let portal_url = portal_url.into();
        Self {
            portal_url,
            portal_name: None,
            force_full_sync: false,
            max_retries: None,
            url_template: None,
            language: None,
            portal_type: PortalType::default(),
            profile: None,
            sparql_endpoint: None,
        }
    }

    /// Sets `portal_name` and returns the updated request.
    pub fn with_name(self, name: impl Into<String>) -> Self {
        Self {
            portal_name: Some(name.into()),
            ..self
        }
    }

    /// Turns on `force_full_sync` and returns the updated request.
    pub fn with_full_sync(self) -> Self {
        Self {
            force_full_sync: true,
            ..self
        }
    }

    /// Sets `max_retries` to `Some(max)` and returns the updated request.
    pub fn with_max_retries(self, max: u32) -> Self {
        Self {
            max_retries: Some(max),
            ..self
        }
    }

    /// Sets `url_template` and returns the updated request.
    pub fn with_url_template(self, template: impl Into<String>) -> Self {
        Self {
            url_template: Some(template.into()),
            ..self
        }
    }

    /// Sets `language` and returns the updated request.
    pub fn with_language(self, language: impl Into<String>) -> Self {
        Self {
            language: Some(language.into()),
            ..self
        }
    }

    /// Replaces `portal_type` and returns the updated request.
    pub fn with_portal_type(self, portal_type: PortalType) -> Self {
        Self {
            portal_type,
            ..self
        }
    }

    /// Sets `profile` and returns the updated request.
    pub fn with_profile(self, profile: impl Into<String>) -> Self {
        Self {
            profile: Some(profile.into()),
            ..self
        }
    }

    /// Sets `sparql_endpoint` and returns the updated request.
    pub fn with_sparql_endpoint(self, sparql_endpoint: impl Into<String>) -> Self {
        Self {
            sparql_endpoint: Some(sparql_endpoint.into()),
            ..self
        }
    }
}
/// Configuration for a worker.
#[derive(Debug, Clone)]
pub struct WorkerConfig {
/// Identifier of the worker; `Default` generates `worker-<random UUID v4>`.
pub worker_id: String,
/// Polling interval; `Default` sets it to 5 seconds.
/// NOTE(review): presumably the pause between queue polls — confirm against
/// the worker loop.
pub poll_interval: std::time::Duration,
/// Retry settings; `Default` uses `RetryConfig::default()`.
pub retry_config: RetryConfig,
}
impl Default for WorkerConfig {
    /// Default worker settings: an id of the form `worker-<uuid v4>`, a
    /// 5-second poll interval and the default `RetryConfig`.
    fn default() -> Self {
        let worker_id = format!("worker-{}", Uuid::new_v4());
        let poll_interval = std::time::Duration::from_secs(5);
        let retry_config = RetryConfig::default();
        Self {
            worker_id,
            poll_interval,
            retry_config,
        }
    }
}
impl WorkerConfig {
    /// Replaces `worker_id` and returns the updated configuration.
    pub fn with_worker_id(self, id: impl Into<String>) -> Self {
        Self {
            worker_id: id.into(),
            ..self
        }
    }

    /// Replaces `poll_interval` and returns the updated configuration.
    pub fn with_poll_interval(self, interval: std::time::Duration) -> Self {
        Self {
            poll_interval: interval,
            ..self
        }
    }

    /// Replaces `retry_config` and returns the updated configuration.
    pub fn with_retry_config(self, config: RetryConfig) -> Self {
        Self {
            retry_config: config,
            ..self
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every status paired with the lowercase name it maps to and from.
    const STATUS_NAMES: [(JobStatus, &str); 5] = [
        (JobStatus::Pending, "pending"),
        (JobStatus::Running, "running"),
        (JobStatus::Completed, "completed"),
        (JobStatus::Failed, "failed"),
        (JobStatus::Cancelled, "cancelled"),
    ];

    /// Builds a running job fixture with no retries used and a limit of three.
    fn running_job() -> HarvestJob {
        let now = Utc::now();
        HarvestJob {
            id: Uuid::new_v4(),
            portal_url: "https://example.com".to_string(),
            portal_name: None,
            portal_type: crate::config::PortalType::default(),
            status: JobStatus::Running,
            created_at: now,
            updated_at: now,
            started_at: Some(now),
            completed_at: None,
            retry_count: 0,
            max_retries: 3,
            next_retry_at: None,
            error_message: None,
            sync_stats: None,
            worker_id: Some("worker-1".to_string()),
            force_full_sync: false,
            url_template: None,
            language: None,
            profile: None,
            sparql_endpoint: None,
        }
    }

    #[test]
    fn test_job_status_as_str() {
        for (status, name) in STATUS_NAMES {
            assert_eq!(status.as_str(), name);
        }
    }

    #[test]
    fn test_job_status_from_str() {
        for (status, name) in STATUS_NAMES {
            assert_eq!(name.parse::<JobStatus>(), Ok(status));
        }
        assert!("unknown".parse::<JobStatus>().is_err());
    }

    #[test]
    fn test_job_status_is_terminal() {
        for status in [JobStatus::Pending, JobStatus::Running] {
            assert!(!status.is_terminal(), "{status} must not be terminal");
        }
        for status in [
            JobStatus::Completed,
            JobStatus::Failed,
            JobStatus::Cancelled,
        ] {
            assert!(status.is_terminal(), "{status} must be terminal");
        }
    }

    #[test]
    fn test_retry_config_default() {
        let RetryConfig {
            max_retries,
            max_delay,
        } = RetryConfig::default();
        assert_eq!(max_retries, 3);
        assert_eq!(max_delay, TimeDelta::minutes(60));
    }

    #[test]
    fn test_retry_delay_exponential() {
        let config = RetryConfig::default();
        assert_eq!(config.delay_for_attempt(0), TimeDelta::zero());

        // (attempt, expected delay in minutes); attempts past 3 stay at 60.
        let schedule = [(1, 1), (2, 5), (3, 30), (4, 60), (10, 60)];
        for (attempt, minutes) in schedule {
            assert_eq!(
                config.delay_for_attempt(attempt),
                TimeDelta::minutes(minutes),
                "attempt {attempt}"
            );
        }
    }

    #[test]
    fn test_create_job_request_builder() {
        let request = CreateJobRequest::new("https://example.com")
            .with_name("Example Portal")
            .with_full_sync()
            .with_max_retries(5);

        assert_eq!(request.portal_url, "https://example.com");
        assert_eq!(request.portal_name.as_deref(), Some("Example Portal"));
        assert!(request.force_full_sync);
        assert_eq!(request.max_retries, Some(5));
    }

    #[test]
    fn test_worker_config_default() {
        let WorkerConfig {
            worker_id,
            poll_interval,
            retry_config,
        } = WorkerConfig::default();

        assert!(worker_id.starts_with("worker-"));
        assert_eq!(poll_interval, std::time::Duration::from_secs(5));
        assert_eq!(retry_config.max_retries, 3);
    }

    #[test]
    fn test_worker_config_builder() {
        let ten_seconds = std::time::Duration::from_secs(10);
        let config = WorkerConfig::default()
            .with_worker_id("my-worker")
            .with_poll_interval(ten_seconds);

        assert_eq!(config.worker_id, "my-worker");
        assert_eq!(config.poll_interval, ten_seconds);
    }

    #[test]
    fn test_harvest_job_can_retry() {
        let mut job = running_job();

        // (retry_count, whether another retry is still allowed with a limit of 3)
        for (retry_count, allowed) in [(0, true), (2, true), (3, false)] {
            job.retry_count = retry_count;
            assert_eq!(job.can_retry(), allowed, "retry_count {retry_count}");
        }
    }
}