1use chrono::{DateTime, TimeDelta, Utc};
26use serde::{Deserialize, Serialize};
27use uuid::Uuid;
28
29use crate::SyncStats;
30use crate::config::PortalType;
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38#[serde(rename_all = "lowercase")]
39pub enum JobStatus {
40 Pending,
42 Running,
44 Completed,
46 Failed,
48 Cancelled,
50}
51
52impl JobStatus {
53 pub fn as_str(&self) -> &'static str {
55 match self {
56 JobStatus::Pending => "pending",
57 JobStatus::Running => "running",
58 JobStatus::Completed => "completed",
59 JobStatus::Failed => "failed",
60 JobStatus::Cancelled => "cancelled",
61 }
62 }
63
64 pub fn is_terminal(&self) -> bool {
66 matches!(
67 self,
68 JobStatus::Completed | JobStatus::Failed | JobStatus::Cancelled
69 )
70 }
71}
72
/// Error returned when a string cannot be parsed into a job status.
///
/// The wrapped `String` is the rejected input; it is echoed verbatim in the
/// `Display` output.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseJobStatusError(String);

impl std::fmt::Display for ParseJobStatusError {
    /// Writes `invalid job status: ` followed by the rejected input.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(rejected) = self;
        write!(f, "invalid job status: {}", rejected)
    }
}

// No underlying cause to expose, so the default `Error` methods suffice.
impl std::error::Error for ParseJobStatusError {}
84
85impl std::str::FromStr for JobStatus {
86 type Err = ParseJobStatusError;
87
88 fn from_str(s: &str) -> Result<Self, Self::Err> {
89 match s {
90 "pending" => Ok(JobStatus::Pending),
91 "running" => Ok(JobStatus::Running),
92 "completed" => Ok(JobStatus::Completed),
93 "failed" => Ok(JobStatus::Failed),
94 "cancelled" => Ok(JobStatus::Cancelled),
95 _ => Err(ParseJobStatusError(s.to_string())),
96 }
97 }
98}
99
100impl std::fmt::Display for JobStatus {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 write!(f, "{}", self.as_str())
103 }
104}
105
106#[derive(Debug, Clone)]
112pub struct RetryConfig {
113 pub max_retries: u32,
115 pub max_delay: TimeDelta,
117}
118
119impl Default for RetryConfig {
120 fn default() -> Self {
121 Self {
122 max_retries: 3,
123 max_delay: TimeDelta::minutes(60), }
125 }
126}
127
128impl RetryConfig {
129 pub fn delay_for_attempt(&self, attempt: u32) -> TimeDelta {
136 if attempt == 0 {
137 return TimeDelta::zero();
138 }
139
140 let minutes = match attempt {
142 1 => 1,
143 2 => 5,
144 3 => 30,
145 _ => 60, };
147
148 let delay = TimeDelta::minutes(minutes);
149 std::cmp::min(delay, self.max_delay)
150 }
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
159pub struct HarvestJob {
160 pub id: Uuid,
162
163 pub portal_url: String,
165
166 pub portal_name: Option<String>,
168
169 pub portal_type: PortalType,
171
172 pub status: JobStatus,
174
175 pub created_at: DateTime<Utc>,
177
178 pub updated_at: DateTime<Utc>,
180
181 pub started_at: Option<DateTime<Utc>>,
183
184 pub completed_at: Option<DateTime<Utc>>,
186
187 pub retry_count: u32,
189
190 pub max_retries: u32,
192
193 pub next_retry_at: Option<DateTime<Utc>>,
195
196 pub error_message: Option<String>,
198
199 pub sync_stats: Option<SyncStats>,
201
202 pub worker_id: Option<String>,
204
205 pub force_full_sync: bool,
207
208 pub url_template: Option<String>,
210
211 pub language: Option<String>,
213
214 pub profile: Option<String>,
216
217 pub sparql_endpoint: Option<String>,
219}
220
221impl HarvestJob {
222 pub fn can_retry(&self) -> bool {
224 self.retry_count < self.max_retries
225 }
226
227 pub fn calculate_next_retry(&self, config: &RetryConfig) -> DateTime<Utc> {
229 let delay = config.delay_for_attempt(self.retry_count + 1);
230 Utc::now() + delay
231 }
232}
233
234#[derive(Debug, Clone)]
240pub struct CreateJobRequest {
241 pub portal_url: String,
243 pub portal_name: Option<String>,
245 pub force_full_sync: bool,
247 pub max_retries: Option<u32>,
249 pub url_template: Option<String>,
251
252 pub language: Option<String>,
254
255 pub portal_type: PortalType,
257
258 pub profile: Option<String>,
260
261 pub sparql_endpoint: Option<String>,
263}
264
265impl CreateJobRequest {
266 pub fn new(portal_url: impl Into<String>) -> Self {
268 Self {
269 portal_url: portal_url.into(),
270 portal_name: None,
271 force_full_sync: false,
272 max_retries: None,
273 url_template: None,
274 language: None,
275 portal_type: PortalType::default(),
276 profile: None,
277 sparql_endpoint: None,
278 }
279 }
280
281 pub fn with_name(mut self, name: impl Into<String>) -> Self {
283 self.portal_name = Some(name.into());
284 self
285 }
286
287 pub fn with_full_sync(mut self) -> Self {
289 self.force_full_sync = true;
290 self
291 }
292
293 pub fn with_max_retries(mut self, max: u32) -> Self {
295 self.max_retries = Some(max);
296 self
297 }
298
299 pub fn with_url_template(mut self, template: impl Into<String>) -> Self {
301 self.url_template = Some(template.into());
302 self
303 }
304
305 pub fn with_language(mut self, language: impl Into<String>) -> Self {
307 self.language = Some(language.into());
308 self
309 }
310
311 pub fn with_portal_type(mut self, portal_type: PortalType) -> Self {
313 self.portal_type = portal_type;
314 self
315 }
316
317 pub fn with_profile(mut self, profile: impl Into<String>) -> Self {
319 self.profile = Some(profile.into());
320 self
321 }
322
323 pub fn with_sparql_endpoint(mut self, sparql_endpoint: impl Into<String>) -> Self {
325 self.sparql_endpoint = Some(sparql_endpoint.into());
326 self
327 }
328}
329
330#[derive(Debug, Clone)]
336pub struct WorkerConfig {
337 pub worker_id: String,
339 pub poll_interval: std::time::Duration,
341 pub retry_config: RetryConfig,
343}
344
345impl Default for WorkerConfig {
346 fn default() -> Self {
347 Self {
348 worker_id: format!("worker-{}", Uuid::new_v4()),
349 poll_interval: std::time::Duration::from_secs(5),
350 retry_config: RetryConfig::default(),
351 }
352 }
353}
354
355impl WorkerConfig {
356 pub fn with_worker_id(mut self, id: impl Into<String>) -> Self {
358 self.worker_id = id.into();
359 self
360 }
361
362 pub fn with_poll_interval(mut self, interval: std::time::Duration) -> Self {
364 self.poll_interval = interval;
365 self
366 }
367
368 pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
370 self.retry_config = config;
371 self
372 }
373}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// Every status paired with the string it must map to and from.
    const STATUS_NAMES: [(JobStatus, &str); 5] = [
        (JobStatus::Pending, "pending"),
        (JobStatus::Running, "running"),
        (JobStatus::Completed, "completed"),
        (JobStatus::Failed, "failed"),
        (JobStatus::Cancelled, "cancelled"),
    ];

    #[test]
    fn test_job_status_as_str() {
        for &(status, name) in &STATUS_NAMES {
            assert_eq!(status.as_str(), name);
        }
    }

    #[test]
    fn test_job_status_from_str() {
        for &(status, name) in &STATUS_NAMES {
            assert_eq!(name.parse::<JobStatus>(), Ok(status));
        }
        assert!("unknown".parse::<JobStatus>().is_err());
    }

    #[test]
    fn test_job_status_is_terminal() {
        let expectations = [
            (JobStatus::Pending, false),
            (JobStatus::Running, false),
            (JobStatus::Completed, true),
            (JobStatus::Failed, true),
            (JobStatus::Cancelled, true),
        ];
        for &(status, terminal) in &expectations {
            assert_eq!(status.is_terminal(), terminal, "status {:?}", status);
        }
    }

    #[test]
    fn test_retry_config_default() {
        let RetryConfig {
            max_retries,
            max_delay,
        } = RetryConfig::default();
        assert_eq!(max_retries, 3);
        assert_eq!(max_delay, TimeDelta::minutes(60));
    }

    #[test]
    fn test_retry_delay_exponential() {
        let config = RetryConfig::default();

        // (attempt, expected delay); attempts past 3 stay at 60 minutes.
        let schedule = [
            (0u32, TimeDelta::zero()),
            (1, TimeDelta::minutes(1)),
            (2, TimeDelta::minutes(5)),
            (3, TimeDelta::minutes(30)),
            (4, TimeDelta::minutes(60)),
            (10, TimeDelta::minutes(60)),
        ];
        for &(attempt, expected) in &schedule {
            assert_eq!(
                config.delay_for_attempt(attempt),
                expected,
                "attempt {}",
                attempt
            );
        }
    }

    #[test]
    fn test_create_job_request_builder() {
        let CreateJobRequest {
            portal_url,
            portal_name,
            force_full_sync,
            max_retries,
            ..
        } = CreateJobRequest::new("https://example.com")
            .with_name("Example Portal")
            .with_full_sync()
            .with_max_retries(5);

        assert_eq!(portal_url, "https://example.com");
        assert_eq!(portal_name, Some("Example Portal".to_string()));
        assert!(force_full_sync);
        assert_eq!(max_retries, Some(5));
    }

    #[test]
    fn test_worker_config_default() {
        let config = WorkerConfig::default();
        let five_seconds = std::time::Duration::from_secs(5);

        assert!(config.worker_id.starts_with("worker-"));
        assert_eq!(config.poll_interval, five_seconds);
        assert_eq!(config.retry_config.max_retries, 3);
    }

    #[test]
    fn test_worker_config_builder() {
        let ten_seconds = std::time::Duration::from_secs(10);
        let config = WorkerConfig::default()
            .with_worker_id("my-worker")
            .with_poll_interval(ten_seconds);

        assert_eq!(config.worker_id, "my-worker");
        assert_eq!(config.poll_interval, ten_seconds);
    }

    #[test]
    fn test_harvest_job_can_retry() {
        let now = Utc::now();
        let mut job = HarvestJob {
            id: Uuid::new_v4(),
            portal_url: "https://example.com".to_string(),
            portal_name: None,
            portal_type: PortalType::default(),
            status: JobStatus::Running,
            created_at: now,
            updated_at: now,
            started_at: Some(now),
            completed_at: None,
            retry_count: 0,
            max_retries: 3,
            next_retry_at: None,
            error_message: None,
            sync_stats: None,
            worker_id: Some("worker-1".to_string()),
            force_full_sync: false,
            url_template: None,
            language: None,
            profile: None,
            sparql_endpoint: None,
        };

        // Retrying is allowed strictly below `max_retries` (3).
        for &(retries, allowed) in &[(0u32, true), (2, true), (3, false)] {
            job.retry_count = retries;
            assert_eq!(job.can_retry(), allowed, "retry_count {}", retries);
        }
    }
}